"""Repeatedly sample the PIA shadow_socks server list and emit ss:// links.

The endpoint appears to rotate through different server subsets per response,
so the script hits it many times in parallel, deduplicates (region, host)
pairs, prints a summary table plus one ss:// URL per node, builds a combined
subscription URL, and saves everything to pia_shadowsocks_nodes.txt.
"""

import base64
import json
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import quote

import requests

# ================== Configuration ==================
URL = "https://serverlist.piaservers.net/shadow_socks"
THREADS = 4           # keep at 3-4; high concurrency is flaky on OpenWrt/LEDE
TOTAL_REQUESTS = 500  # can be raised to 800-1000
TIMEOUT = 12          # per-request timeout, seconds
# ===================================================


def fetch_once():
    """Fetch and parse the server list once.

    Returns:
        The parsed JSON list on success, or None on any network or parse
        failure (the sampling loop treats None as "skip this sample").
    """
    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
        }
        r = requests.get(URL, timeout=TIMEOUT, headers=headers)
        r.raise_for_status()
        text = r.text.strip()
        # The endpoint returns "JSON\n<base64 blob>"; keep only the JSON part.
        json_text = text.split('\n', 1)[0].strip() if '\n' in text else text
        data = json.loads(json_text)
        return data if isinstance(data, list) else None
    except (requests.RequestException, ValueError):
        # ValueError covers json.JSONDecodeError. Any single failed sample is
        # non-fatal: return None and let the other requests proceed. (The old
        # bare `except Exception` also hid programming errors.)
        return None


print(f"开始使用 {THREADS} 个线程采集 {TOTAL_REQUESTS} 次...")

# region -> set of unique hosts seen across all samples
servers = defaultdict(set)

with ThreadPoolExecutor(max_workers=THREADS) as executor:
    futures = [executor.submit(fetch_once) for _ in range(TOTAL_REQUESTS)]
    for i, future in enumerate(as_completed(futures), 1):
        data = future.result()
        if data:
            for item in data:
                if isinstance(item, dict):
                    region = str(item.get("region", "")).strip()
                    host = str(item.get("host", "")).strip()
                    if region and host:
                        servers[region].add(host)
        # Progress line every 50 completions and on the final one.
        if i % 50 == 0 or i == TOTAL_REQUESTS:
            current = sum(len(hosts) for hosts in servers.values())
            print(f"已完成 {i}/{TOTAL_REQUESTS} 次请求... 当前发现节点数: {current}")

print("\n采集完成!")
total_nodes = sum(len(hosts) for hosts in servers.values())
print(f"共发现 {total_nodes} 个独特节点\n")

if total_nodes == 0:
    print("仍然采集不到节点。")
    print("可能原因:")
    print("1. 你的网络环境对这个域名做了特殊处理或被限流")
    print("2. 运行在 OpenWrt/LEDE 上,请求头或 TLS 有问题")
    print("建议:先用 curl 多测试几次,看是否每次都能正常返回 JSON")
    print("  curl -I https://serverlist.piaservers.net/shadow_socks")
else:
    # Summary table: one row per region, hosts comma-joined.
    print("地区 Host数量 Hosts")
    print("-" * 85)
    all_nodes = []
    for region in sorted(servers.keys()):
        hosts = sorted(servers[region])
        print(f"{region:<20} {len(hosts):<8} {', '.join(hosts)}")
        for host in hosts:
            all_nodes.append((region, host))

    # Build SIP002-style ss:// links: base64url("method:password@host:port").
    print(f"\n=== 生成 {len(all_nodes)} 个 ss:// 节点 ===")
    ss_links = []
    for region, host in all_nodes:
        auth = f"aes-128-gcm:shadowsocks@{host}:443"
        b64 = base64.urlsafe_b64encode(auth.encode()).decode().rstrip("=")
        # NOTE(review): split('.')[-1] is the TLD (same for every host);
        # the first label was probably intended — confirm before changing.
        name = f"PIA-{region}-{host.split('.')[-1]}"
        ss_url = f"ss://{b64}#{quote(name)}"
        ss_links.append(ss_url)
        print(ss_url)

    # Percent-encode the whole joined value: each ss:// link contains a
    # literal '#', which would otherwise terminate the query string as a
    # fragment and silently truncate the url= parameter.
    sub_url = ("https://api.ss-sub.com/sub?target=ss&url="
               + quote("|".join(ss_links), safe=""))
    print(f"\n=== 一键订阅链接 ===")
    print(sub_url)

    # Persist links + subscription URL for later use.
    with open("pia_shadowsocks_nodes.txt", "w", encoding="utf-8") as f:
        f.write("\n".join(ss_links))
        f.write(f"\n\n订阅链接:\n{sub_url}")

    print("\n所有节点已保存到 pia_shadowsocks_nodes.txt 文件中!")