"""
端口扫描技能
功能:扫描指定IP地址的常用端口,检查哪些端口是开放的,并检测可访问的Web服务
"""
import socket
import concurrent.futures
from typing import List, Dict, Tuple, Optional
import time
import urllib.request
import urllib.error
# 技能元数据(符合iFlow技能规范)
SKILL_METADATA = {
"name": "port-scanner",
"description": "Fast and eff...
"""
端口扫描技能
功能:扫描指定IP地址的常用端口,检查哪些端口是开放的,并检测可访问的Web服务
"""
import socket
import concurrent.futures
from typing import List, Dict, Tuple, Optional
import time
import urllib.request
import urllib.error
# 技能元数据(符合iFlow技能规范)
SKILL_METADATA = {
"name": "port-scanner",
"description": "Fast and efficient port scanning tool for network discovery. Scan multiple hosts for open ports, identify services, and detect accessible web services. Supports concurrent scanning with configurable timeout and custom port ranges. 重点提示:如果检测到8888端口开放,会直接告知用户访问该端口。",
"license": "MIT",
"version": "1.0.0",
"author": "Vinson"
}
class PortScanner:
"""
高性能端口扫描器
支持并发扫描多个主机的多个端口,识别常见服务,并检测可访问的Web服务。
"""
# 常用端口列表(端口 -> 服务名称映射)
COMMON_PORTS = {
21: "FTP",
22: "SSH",
23: "Telnet",
25: "SMTP",
53: "DNS",
80: "HTTP",
110: "POP3",
143: "IMAP",
443: "HTTPS",
445: "SMB",
3306: "MySQL",
3389: "RDP",
5432: "PostgreSQL",
6379: "Redis",
8080: "HTTP-Alt",
8888: "HTTP-Alt",
9000: "HTTP-Alt",
27017: "MongoDB",
}
# Web服务端口
WEB_PORTS = [80, 443, 8080, 8888, 9000]
def __init__(self, timeout: float = 1.0, max_workers: int = 50, verbose: bool = True):
"""
初始化端口扫描器
Args:
timeout: 连接超时时间(秒),默认1.0秒
max_workers: 最大并发线程数,默认50
verbose: 是否输出详细扫描信息,默认True
"""
self.timeout = timeout
self.max_workers = max_workers
self.verbose = verbose
def _log(self, message: str) -> None:
"""输出日志信息(如果verbose为True)"""
if self.verbose:
print(message)
def scan_port(self, host: str, port: int) -> Tuple[int, bool, str]:
"""
扫描单个端口
Args:
host: 目标主机IP地址或域名
port: 端口号
Returns:
元组 (端口号, 是否开放, 服务名称)
"""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(self.timeout)
result = sock.connect_ex((host, port))
sock.close()
is_open = result == 0
service = self.COMMON_PORTS.get(port, "Unknown")
return (port, is_open, service)
except Exception:
return (port, False, "Unknown")
def scan_host(self, host: str, ports: Optional[List[int]] = None) -> Dict:
"""
扫描指定主机的端口
Args:
host: 目标主机IP地址或域名
ports: 要扫描的端口列表,如果为None则扫描常用端口
Returns:
扫描结果字典,包含以下键:
- host: 主机地址
- open_ports: 开放端口列表 [(端口号, 服务名称), ...]
- scan_duration: 扫描耗时(秒)
- total_ports_scanned: 扫描的总端口数
- open_ports_count: 开放端口数量
"""
if ports is None:
ports = list(self.COMMON_PORTS.keys())
self._log(f"\n{'=' * 60}")
self._log(f"正在扫描主机: {host}")
self._log(f"扫描端口数: {len(ports)}")
self._log(f"超时时间: {self.timeout}秒")
self._log(f"并发线程数: {self.max_workers}")
self._log(f"{'-' * 60}")
open_ports = []
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
futures = {
executor.submit(self.scan_port, host, port): port
for port in ports
}
for future in concurrent.futures.as_completed(futures):
port, is_open, service = future.result()
if is_open:
open_ports.append((port, service))
self._log(f" ✓ 端口 {port} ({service}) 开放")
end_time = time.time()
scan_duration = end_time - start_time
result = {
"host": host,
"open_ports": open_ports,
"scan_duration": scan_duration,
"total_ports_scanned": len(ports),
"open_ports_count": len(open_ports)
}
self._log(f"{'-' * 60}")
self._log(f"扫描完成!耗时: {scan_duration:.2f}秒")
self._log(f"开放端口数: {len(open_ports)}/{len(ports)}")
# 检查是否有8888端口开放
port_8888_open = any(port == 8888 for port, _ in open_ports)
if port_8888_open:
self._log(f"\n{'!' * 60}")
self._log(f"⚠️ 发现8888端口开放!")
self._log(f"📌 请访问: http://{host}:8888")
self._log(f"{'!' * 60}\n")
return result
def check_web_url(self, url: str, timeout: int = 3) -> Dict:
"""
检查网页URL是否可访问,并跟踪重定向
Args:
url: 要检查的URL
timeout: 请求超时时间(秒),默认3秒
Returns:
检查结果字典,包含以下键:
- url: 原始URL
- final_url: 最终URL(可能经过重定向)
- status_code: HTTP状态码
- accessible: 是否可访问
- redirected: 是否发生了重定向
- error: 错误信息(如果访问失败)
"""
try:
# 创建请求
req = urllib.request.Request(url)
req.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36')
# 发送请求并跟踪重定向
response = urllib.request.urlopen(req, timeout=timeout)
# 获取最终URL(处理重定向)
final_url = response.geturl()
return {
"url": url,
"final_url": final_url,
"status_code": response.status,
"accessible": True,
"redirected": final_url != url
}
except urllib.error.HTTPError as e:
return {
"url": url,
"final_url": None,
"status_code": e.code,
"accessible": False,
"redirected": False,
"error": f"HTTP Error: {e.code}"
}
except urllib.error.URLError as e:
return {
"url": url,
"final_url": None,
"status_code": None,
"accessible": False,
"redirected": False,
"error": f"URL Error: {str(e.reason)}"
}
except Exception as e:
return {
"url": url,
"final_url": None,
"status_code": None,
"accessible": False,
"redirected": False,
"error": str(e)
}
def generate_web_urls(self, results: List[Dict]) -> List[Dict]:
"""
根据扫描结果生成可访问的网页地址
Args:
results: 扫描结果列表(来自scan_hosts或scan_host)
Returns:
可访问的网页地址列表,每个元素是包含详细信息的字典:
- url: 原始URL
- final_url: 最终URL
- status_code: HTTP状态码
- host: 主机地址
- port: 端口号
- service: 服务名称
- redirected: 是否重定向
"""
urls = []
checked_results = []
self._log(f"\n{'=' * 60}")
self._log("检查可访问的网页地址")
self._log(f"{'=' * 60}")
for result in results:
host = result["host"]
open_ports = result["open_ports"]
for port, service in open_ports:
if port in self.WEB_PORTS:
# 根据端口生成URL
if port == 80:
url = f"http://{host}"
elif port == 443:
url = f"https://{host}"
elif port in [8080, 8888, 9000]:
url = f"http://{host}:{port}"
else:
continue
# 检查URL是否可访问
check_result = self.check_web_url(url)
check_result["host"] = host
check_result["port"] = port
check_result["service"] = service
checked_results.append(check_result)
if check_result["accessible"]:
final_url = check_result["final_url"]
if check_result["redirected"]:
self._log(f" ✓ {url} → 重定向到 {final_url}")
else:
self._log(f" ✓ {url} ({service})")
# 如果是8888端口,额外提示
if port == 8888:
self._log(f" 🌟 重要提示:请访问此地址!")
urls.append(check_result)
else:
self._log(f" ✗ {url} - {check_result.get('error', '无法访问')}")
self._log(f"{'=' * 60}")
self._log(f"共找到 {len(urls)} 个可访问的网页地址")
self._log(f"{'=' * 60}")
return urls
def scan_hosts(self, hosts: List[str], ports: Optional[List[int]] = None) -> Dict:
"""
扫描多个主机的端口
Args:
hosts: 目标主机IP地址或域名列表
ports: 要扫描的端口列表,如果为None则扫描常用端口
Returns:
扫描结果字典,包含以下键:
- hosts: 扫描的主机列表
- results: 每个主机的扫描结果列表
- total_hosts: 总主机数
- total_open_ports: 总开放端口数
- total_scan_duration: 总扫描耗时
- web_urls: 可访问的网页地址列表
"""
results = []
self._log(f"{'=' * 60}")
self._log("端口扫描工具 v1.0")
self._log(f"{'=' * 60}")
self._log(f"目标主机数: {len(hosts)}")
self._log(f"目标主机: {', '.join(hosts)}")
self._log()
total_start_time = time.time()
for i, host in enumerate(hosts, 1):
self._log(f"[{i}/{len(hosts)}] ", end="")
result = self.scan_host(host, ports)
results.append(result)
total_end_time = time.time()
total_scan_duration = total_end_time - total_start_time
# 汇总报告
self._log(f"\n{'=' * 60}")
self._log("扫描汇总报告")
self._log(f"{'=' * 60}")
total_open_ports = sum(r["open_ports_count"] for r in results)
for result in results:
host = result["host"]
open_count = result["open_ports_count"]
open_ports = result["open_ports"]
if open_ports:
ports_str = ", ".join([f"{p}({s})" for p, s in open_ports])
self._log(f" {host}: {open_count}个开放端口 [{ports_str}]")
else:
self._log(f" {host}: 无开放端口")
self._log()
self._log(f"总扫描耗时: {total_scan_duration:.2f}秒")
self._log(f"总开放端口数: {total_open_ports}")
# 检查哪些主机有8888端口开放
hosts_with_8888 = []
for result in results:
if any(port == 8888 for port, _ in result["open_ports"]):
hosts_with_8888.append(result["host"])
if hosts_with_8888:
self._log(f"\n{'🌟' * 30}")
self._log(f"发现 {len(hosts_with_8888)} 台主机开放了8888端口!")
self._log(f"{'🌟' * 30}")
for host in hosts_with_8888:
self._log(f" 📌 请访问: http://{host}:8888")
self._log()
# 检测Web服务
web_urls = self.generate_web_urls(results)
return {
"hosts": hosts,
"results": results,
"total_hosts": len(hosts),
"total_open_ports": total_open_ports,
"total_scan_duration": total_scan_duration,
"web_urls": web_urls
}
def main():
"""
主函数 - 示例用法
演示如何使用PortScanner扫描多个主机并检测Web服务
特别关注8888端口,如果开放会直接提示用户访问
"""
# 要扫描的主机列表(示例IP地址)
hosts = [
"192.168.20.18",
"192.168.10.21",
"192.168.60.132",
"192.168.70.46"
]
# 创建扫描器实例
# timeout: 连接超时时间(秒)
# max_workers: 最大并发线程数
# verbose: 是否输出详细信息
scanner = PortScanner(timeout=1.0, max_workers=50, verbose=True)
# 扫描所有主机
scan_results = scanner.scan_hosts(hosts)
# 返回扫描结果
return scan_results
def quick_scan(host: str, ports: Optional[List[int]] = None) -> Dict:
"""
快速扫描单个主机
Args:
host: 目标主机IP地址或域名
ports: 要扫描的端口列表,如果为None则扫描常用端口
Returns:
扫描结果字典
"""
scanner = PortScanner(timeout=1.0, max_workers=50, verbose=True)
return scanner.scan_host(host, ports)
def quick_scan_multiple(hosts: List[str], ports: Optional[List[int]] = None) -> Dict:
"""
快速扫描多个主机
Args:
hosts: 目标主机IP地址或域名列表
ports: 要扫描的端口列表,如果为None则扫描常用端口
Returns:
扫描结果字典
"""
scanner = PortScanner(timeout=1.0, max_workers=50, verbose=True)
return scanner.scan_hosts(hosts, ports)
if __name__ == "__main__":
# 运行主函数
main()