磁力链接与ed2k的自动化检测处理
Published in:2025-01-23 |
Words: 6.9k | Reading time: 34min | reading:

磁力链接与ed2k的自动化检测处理

磁力链接与ed2k对比

磁力链接下载速度的影响因素 (BitTorrent 协议):

  1. 种子数量(Seeders):
    • 定义: 指的是完整拥有你想要下载的文件的用户数量。
    • 影响: 种子越多,你下载的速度通常就越快,因为你可以从多个来源同时下载数据。
    • 重要性: 种子数量是影响下载速度的最关键因素之一。如果种子数为零或很少,下载速度会非常慢甚至无法下载。
  2. 节点数量(Peers):
    • 定义: 指的是正在下载或部分拥有该文件的用户数量。
    • 影响: 节点数量越多,你获得数据的机会就越多,但节点不一定拥有完整文件,所以下载速度不一定总是更快。
    • 区别于种子: 节点可能只拥有部分文件,而种子拥有完整文件。
  3. 种子质量 (带宽和连接数):
    • 定义: 种子的上传带宽和连接数限制。
    • 影响: 如果种子用户的上传带宽很低或者连接数有限制,你下载速度会受到限制。高质量的种子上传速度快且允许更多人连接。
    • 实际情况: 很多用户上传带宽不高,这也是影响下载速度的因素。
  4. 你的网络带宽:
    • 定义: 你的网络服务提供商 (ISP) 提供的最大下载速度。
    • 影响: 如果你的网络带宽很低,下载速度会受到物理限制。即使种子很多,下载速度也不会超过你的带宽上限。
    • 限制: 很多运营商对上传和下载速度有限制,比如上行带宽通常低于下行带宽。
  5. 你的网络环境:
    • 定义: 你的网络连接质量和稳定性。
    • 影响: 如果你的网络丢包严重或者延迟很高,下载速度会受到影响。网络环境不佳会导致下载不稳定,频繁中断。
    • NAT 类型: 网络地址转换 (NAT) 类型也会影响 P2P 连接,比如严格的 NAT 会限制连接数。
  6. 下载客户端:
    • 定义: 你使用的 BitTorrent 客户端软件的设置和性能。
    • 影响: 不同的客户端在性能、连接数、优化算法等方面有所不同,会影响下载速度。
    • 优化: 好的客户端会自动寻找最优的连接,智能分配带宽。
  7. 文件热门程度:
    • 定义: 文件在 P2P 网络中的受欢迎程度。
    • 影响: 热门文件通常种子较多,下载速度更快,冷门文件可能缺少种子或种子质量不高。
  8. DHT 网络和 Tracker:
    • 定义: DHT 网络和 Tracker 服务器帮助客户端找到其他节点。
    • 影响: 如果 DHT 网络连接不好或者 Tracker 服务器不稳定,会影响节点发现和下载速度。
    • 重要性: DHT 网络和 Tracker 服务器是保证 P2P 网络正常运行的关键组件。

eD2k 链接下载速度的影响因素 (eDonkey2000 协议):

  1. 源数量 (Sources):
    • 定义: 指的是拥有你想要下载的文件(或部分)的用户数量。
    • 影响: 源越多,你下载速度通常就越快,因为你可以从多个来源同时下载数据。
    • 关键性: 源数量是影响 eD2k 下载速度的最重要因素。
  2. 用户队列(Queue):
    • 定义: 你在每个源用户处的下载队列中的位置。
    • 影响: 如果你位于队列中比较靠后的位置,你需要等待前面的用户下载完毕,才能轮到你。
    • 等待时间: 队列长度会直接影响下载的等待时间。
  3. 用户共享设置 (Credits):
    • 定义: eDonkey 网络中鼓励用户共享的机制。上传更多数据会获得更高的信用,从而在下载队列中优先。
    • 影响: 如果你没有上传足够的量,下载速度会受到限制。
    • 重要性: eDonkey 网络强调共享精神,积极上传有助于提升下载速度。
  4. 你的网络带宽:
    • 定义: 你的网络服务提供商 (ISP) 提供的最大下载速度。
    • 影响: 如果你的网络带宽很低,下载速度会受到物理限制。
  5. 你的网络环境:
    • 定义: 你的网络连接质量和稳定性。
    • 影响: 如果你的网络丢包严重或者延迟很高,下载速度会受到影响。
    • NAT 类型: NAT 类型也会影响 P2P 连接,特别是低 ID 用户(Low ID)会影响连接其他用户。
  6. 下载客户端:
    • 定义: 你使用的 eDonkey 客户端软件的设置和性能。
    • 影响: 不同的客户端在性能、连接数、优化算法等方面有所不同,会影响下载速度。
    • 高ID: 客户端获取高ID有助于提升连接效率。
  7. eD2k 服务器:
    • 定义: eD2k 服务器帮助客户端找到其他节点。
    • 影响: 如果服务器不稳定或连接较慢,会影响连接效率。
    • 重要性: 服务器是 eD2k 网络的关键组件。

工具安装

  • qbittorrent
1
2
pip install qbittorrent-api
https://www.fosshub.com/qBittorrent.html#
  • emule
1
https://sourceforge.net/projects/emule/files/latest/download

检测可用性等指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
import asyncio
import logging
import os
import csv
import urllib.parse
import socket
import time
import subprocess
import re
from logging.handlers import RotatingFileHandler

log_dir = "./logs"
log_file = os.path.join(log_dir, "man_detect.log")
max_log_size = 10 * 1024 * 1024 # 10MB
backup_count = 5 # 最多保留5个日志文件

# 创建logs目录,如果不存在
os.makedirs(log_dir, exist_ok=True)
# 创建 RotatingFileHandler 实例
log_handler = RotatingFileHandler(log_file, maxBytes=max_log_size, backupCount=backup_count, encoding='utf-8')
# 设置日志格式
log_format = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
log_handler.setFormatter(log_format)
# 获取 logger 实例
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# 添加 handler
logger.addHandler(log_handler)

# 添加控制台输出
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(log_format)
logger.addHandler(stream_handler)

async def check_tracker(tracker_url, timeout=5):
"""异步检查单个 Tracker 是否可用"""
try:
parsed_url = urllib.parse.urlparse(tracker_url)
if parsed_url.scheme not in ('udp', 'http', 'https'):
logging.warning(f"跳过不支持的协议: {parsed_url.scheme} in {tracker_url}")
return False

if parsed_url.scheme in ('http', 'https'):
return True # 对于 http和 https 协议不做测试直接返回成功。

host = parsed_url.hostname
port = parsed_url.port or 80 # udp默认53, http默认80

loop = asyncio.get_event_loop()

logging.info(f"尝试连接 tracker: {host}:{port} 协议: {parsed_url.scheme}")

future = loop.create_connection(
lambda: asyncio.Protocol(),
host,
port
)

# 使用asyncio.wait_for设置超时
connection_tuple = await asyncio.wait_for(future, timeout)
transport, protocol = connection_tuple
transport.close()
logging.info(f"Tracker {host}:{port} 连接成功")
return True

except asyncio.TimeoutError:
logging.warning(f"Tracker {tracker_url} 连接超时")
return False
except OSError as e:
logging.warning(f"Tracker {tracker_url} 连接错误: {e}")
return False

except Exception as e:
logging.error(f"处理 Tracker {tracker_url} 异常: {e}")
return False


async def check_magnet_link(magnet_link, timeout=5):
"""检查磁力链接是否可用"""
try:
parsed_magnet = urllib.parse.urlparse(magnet_link)
query_params = urllib.parse.parse_qs(parsed_magnet.query)

trackers = query_params.get('tr', []) # 获取所有 trackers
info_hash = query_params.get('xt', [])

if not trackers or not info_hash:
logging.warning(f"磁力链不完整: {magnet_link}")
return False

info_hash = info_hash[0].split(':')[2] if info_hash[0].startswith("urn:btih:") else info_hash[0]
logging.info(f"开始检查磁力链: {magnet_link} ,info_hash: {info_hash}")

tracker_tasks = [check_tracker(tracker, timeout) for tracker in trackers]

results = await asyncio.gather(*tracker_tasks)

if any(results):
logging.info(f"磁力链 {magnet_link} 有效")
return True
else:
logging.info(f"磁力链 {magnet_link} 无效")
return False

except Exception as e:
logging.error(f"处理磁力链 {magnet_link} 异常: {e}")
return False


def _check_ed2k_emule(ed2k_link, emule_path, timeout=20):
""" 同步的 eMule 命令行检测函数,不应直接在asyncio线程调用 """
try:
# 1. 添加 ed2k 链接
add_cmd = [emule_path, "-add", ed2k_link]
add_process = subprocess.run(add_cmd, capture_output=True, text=True, check=False, timeout=10)
if add_process.returncode != 0:
logging.warning(f"添加 ed2k 链接失败:{add_process.stderr.strip()}")
return False
# 2. 获取下载状态
start_time = time.time()
while time.time() - start_time < timeout:
status_cmd = [emule_path, "-show", "-complete"] # show all downloading files
status_process = subprocess.run(status_cmd, capture_output=True, text=True, check=False)
if status_process.returncode == 0:
output = status_process.stdout
# 正则匹配 ed2k 链接 或文件名
pattern = re.compile(re.escape(os.path.basename(ed2k_link)) or re.escape(ed2k_link), re.IGNORECASE)
if pattern.search(output):
# 状态匹配,如果状态存在则可能可用
if "Downloading" in output or "Connecting" in output or "Queued" in output or "Waiting" in output:
return True
else:
logging.warning(f"eMule 链接状态不在预期状态:{output}")
return False
else:
logging.warning(f"eMule 未找到指定链接:{output}")
else:
logging.warning(f"获取 eMule 状态失败:{status_process.stderr.strip()}")
return False
time.sleep(2)
return False
except Exception as e:
logging.error(f"检查 ed2k 链接时发生异常: {e}")
return False


async def check_ed2k_emule(ed2k_link, emule_path, timeout=20):
"""异步的 eMule 命令行检测函数"""
try:
result = await asyncio.to_thread(_check_ed2k_emule, ed2k_link, emule_path, timeout)
return result
except Exception as e:
logging.error(f"调用异步 eMule 检测函数发生异常:{e}")
return False


def _get_column_index(header, column_names):
""" 查找第一个匹配的列名,返回索引,否则返回-1 """
if not header:
return -1
for name in column_names:
try:
return header.index(name)
except ValueError:
continue
return -1


async def process_csv_file(file_path, magnet_column_names, ed2k_column_names=None, emule_path=None, use_ed2k=False,
timeout=5):
"""处理单个 CSV 文件,并验证其中的磁力链接和ed2k链接, 返回可用的磁力链接列表, 保留原始数据"""
logging.info(f"开始处理 CSV 文件: {file_path}")
available_rows = []
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as csvfile:
reader = csv.reader(csvfile)
header = next(reader, None)
if not header:
logging.warning(f"CSV文件为空: {file_path}")
return available_rows

magnet_column_index = _get_column_index(header, magnet_column_names)
if magnet_column_index == -1:
logging.error(f"磁力链列名列表 '{magnet_column_names}' 在 CSV 文件中未找到: {file_path}")
return available_rows
ed2k_column_index = -1 # 默认值
if use_ed2k and ed2k_column_names:
ed2k_column_index = _get_column_index(header, ed2k_column_names)
if ed2k_column_index == -1:
logging.error(f"ed2k列名列表 '{ed2k_column_names}' 在 CSV 文件中未找到: {file_path}")
use_ed2k = False # 关闭ed2k

link_tasks = []
rows = []
for row in reader:
try:
if len(row) > magnet_column_index and row[magnet_column_index]:
magnet_link = row[magnet_column_index]
link_tasks.append(check_magnet_link(magnet_link, timeout))
rows.append(row) # 存储行数据
elif use_ed2k and len(row) > ed2k_column_index and row[ed2k_column_index]:
ed2k_link = row[ed2k_column_index]
link_tasks.append(check_ed2k_emule(ed2k_link, emule_path))
rows.append(row)
else:
logging.warning(f"跳过无效行,链接索引超出范围或为空, 文件:{file_path}, 行数据: {row}")

except Exception as e:
logging.error(f"跳过无效行,处理行数据异常, 文件:{file_path}, 行数据: {row}, 异常: {e}")
results = await asyncio.gather(*link_tasks)

available_rows = [row for row, is_valid in zip(rows, results) if is_valid]
logging.info(f"CSV 文件处理完毕: {file_path}, 找到了 {len(available_rows)} 个可用的链接")

if header and available_rows: # 将header 插入
available_rows.insert(0, header)
return available_rows

except Exception as e:
logging.error(f"处理CSV文件异常:{file_path}, 异常:{e}")
return []


async def main(directory, magnet_column_names, output_directory, ed2k_column_names=None, emule_path=None,
use_ed2k=False, timeout=5):
"""主函数,遍历目录中的 CSV 文件,验证磁力链接,并将结果按文件名输出到新的 CSV 文件"""
if not os.path.isdir(directory):
logging.error(f"指定的目录不存在: {directory}")
return

if not os.path.exists(output_directory):
os.makedirs(output_directory)

csv_tasks = []
for dirpath, _, filenames in os.walk(directory):
for filename in filenames:
if filename.lower().endswith('.csv'):
file_path = os.path.join(dirpath, filename)
csv_tasks.append(
process_csv_file(file_path, magnet_column_names, ed2k_column_names, emule_path, use_ed2k, timeout))

results = await asyncio.gather(*csv_tasks)

csv_files = []
for dirpath, _, filenames in os.walk(directory):
for filename in filenames:
if filename.lower().endswith('.csv'):
csv_files.append(os.path.join(dirpath, filename))

for filename, available_rows in zip(
csv_files, results):

output_file_path = os.path.join(output_directory, os.path.relpath(filename, directory)) # 创建相对路径,保留目录结构
os.makedirs(os.path.dirname(output_file_path), exist_ok=True) # 创建输出目录
logging.info(
f"开始写入 {len(available_rows) - 1 if available_rows else 0} 条可用链接到文件:{output_file_path}")
if not available_rows:
logging.info(f"CSV文件 {filename} 中没有可用的链接")
continue # 如果当前文件没有有效链接,则跳过
try:
with open(output_file_path, "w", encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerows(available_rows) # 写入所有行
logging.info(f"成功写入可用链接到文件: {output_file_path}")
except Exception as e:
logging.error(f"写入文件 {output_file_path} 失败: {e}")


if __name__ == "__main__":
target_directory = r"D:\pythonProject\audio_record_server\src\utils\data\man_link" # 指定 CSV 文件所在的目录
magnet_column_names = ["磁力链", "磁力链接"] # 指定磁力链所在的列名列表
ed2k_column_names = ["磁力链", "磁力链接"] # 指定ed2k 所在的列名列表
output_dir = r"D:\pythonProject\audio_record_server\src\utils\data" # 指定输出目录
link_timeout = 5 # 设置连接超时时间,秒
emule_path = r"C:\Program Files (x86)\eMule\emule.exe" # 配置 eMule 可执行文件路径 (根据你的安装路径修改)
use_ed2k = True # 是否启用ed2k检测

asyncio.run(
main(target_directory, magnet_column_names, output_dir, ed2k_column_names, emule_path, use_ed2k, link_timeout))

检测速率等指标实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
# _*_ coding: utf-8 _*_
"""
Time: 2025/1/22 11:02
Author: ZhaoQi Cao(czq)
Version: V 0.1
File: link_benck_mark.py
Describe: Write during the python at zgxmt, Github link: https://github.com/caozhaoqi
link: https://www.fosshub.com/qBittorrent.html#
"""
import asyncio
import logging
import os
import csv
import urllib.parse
import socket
import time
import subprocess
import re
from qbittorrentapi import Client, LoginFailed

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


async def check_tracker(tracker_url, timeout=5):
"""异步检查单个 Tracker 是否可用"""
try:
parsed_url = urllib.parse.urlparse(tracker_url)
if parsed_url.scheme not in ('udp', 'http', 'https'):
logging.warning(f"跳过不支持的协议: {parsed_url.scheme} in {tracker_url}")
return False

if parsed_url.scheme in ('http', 'https'):
return True # 对于 http和 https 协议不做测试直接返回成功。

host = parsed_url.hostname
port = parsed_url.port or 80 # udp默认53, http默认80

loop = asyncio.get_event_loop()

logging.info(f"尝试连接 tracker: {host}:{port} 协议: {parsed_url.scheme}")

future = loop.create_connection(
lambda: asyncio.Protocol(),
host,
port
)

# 使用asyncio.wait_for设置超时
connection_tuple = await asyncio.wait_for(future, timeout)
transport, protocol = connection_tuple
transport.close()
logging.info(f"Tracker {host}:{port} 连接成功")
return True

except asyncio.TimeoutError:
logging.warning(f"Tracker {tracker_url} 连接超时")
return False
except OSError as e:
logging.warning(f"Tracker {tracker_url} 连接错误: {e}")
return False

except Exception as e:
logging.error(f"处理 Tracker {tracker_url} 异常: {e}")
return False


async def check_magnet_link(magnet_link, timeout=5):
"""检查磁力链接是否可用"""
try:
parsed_magnet = urllib.parse.urlparse(magnet_link)
query_params = urllib.parse.parse_qs(parsed_magnet.query)

trackers = query_params.get('tr', []) # 获取所有 trackers
info_hash = query_params.get('xt', [])

if not trackers or not info_hash:
logging.warning(f"磁力链不完整: {magnet_link}")
return False

info_hash = info_hash[0].split(':')[2] if info_hash[0].startswith("urn:btih:") else info_hash[0]
logging.info(f"开始检查磁力链: {magnet_link} ,info_hash: {info_hash}")

tracker_tasks = [check_tracker(tracker, timeout) for tracker in trackers]

results = await asyncio.gather(*tracker_tasks)

if any(results):
logging.info(f"磁力链 {magnet_link} 有效")
return True
else:
logging.info(f"磁力链 {magnet_link} 无效")
return False

except Exception as e:
logging.error(f"处理磁力链 {magnet_link} 异常: {e}")
return False


def _check_ed2k_emule(ed2k_link, emule_path, timeout=20):
""" 同步的 eMule 命令行检测函数,不应直接在asyncio线程调用 """
try:
# 1. 添加 ed2k 链接
add_cmd = [emule_path, "-add", ed2k_link]
add_process = subprocess.run(add_cmd, capture_output=True, text=True, check=False, timeout=10)
if add_process.returncode != 0:
logging.warning(f"添加 ed2k 链接失败:{add_process.stderr.strip()}")
return False
# 2. 获取下载状态
start_time = time.time()
while time.time() - start_time < timeout:
status_cmd = [emule_path, "-show", "-complete"] # show all downloading files
status_process = subprocess.run(status_cmd, capture_output=True, text=True, check=False)
if status_process.returncode == 0:
output = status_process.stdout
# 正则匹配 ed2k 链接 或文件名
pattern = re.compile(re.escape(os.path.basename(ed2k_link)) or re.escape(ed2k_link), re.IGNORECASE)
if pattern.search(output):
# 状态匹配,如果状态存在则可能可用
if "Downloading" in output or "Connecting" in output or "Queued" in output or "Waiting" in output:
return True
else:
logging.warning(f"eMule 链接状态不在预期状态:{output}")
return False
else:
logging.warning(f"eMule 未找到指定链接:{output}")
else:
logging.warning(f"获取 eMule 状态失败:{status_process.stderr.strip()}")
return False
time.sleep(2)
return False
except Exception as e:
logging.error(f"检查 ed2k 链接时发生异常: {e}")
return False


async def check_ed2k_emule(ed2k_link, emule_path, timeout=20):
"""异步的 eMule 命令行检测函数"""
try:
result = await asyncio.to_thread(_check_ed2k_emule, ed2k_link, emule_path, timeout)
return result
except Exception as e:
logging.error(f"调用异步 eMule 检测函数发生异常:{e}")
return False


async def get_bittorrent_info(magnet_link, qbt_host='127.0.0.1', qbt_port=8080, qbt_user='admin',
qbt_pass='adminadmin'):
"""使用 qBittorrent API 获取磁力链接信息 (示例)。"""
try:
client = Client(host=qbt_host, port=qbt_port, username=qbt_user, password=qbt_pass)
client.auth_log_in() # 登录客户端

# 添加磁力链接
torrent = client.torrents_add(magnet_link)

if torrent:
logging.info(f"成功添加磁力链到 qBittorrent: {magnet_link}")
start_time = time.time()
while time.time() - start_time < 30: # 最长等待30秒
# 等待种子添加完成
torrent = client.torrents_info(torrent_hashes=torrent.hash)[0]
if torrent.state not in ("stalledDL", "queuedDL", "checkingDL"):
break
await asyncio.sleep(2) # 防止CPU过高
if torrent.state in ("stalledDL", "queuedDL", "checkingDL"):
logging.warning(f"qBittorrent 添加磁力链后超时: {magnet_link} ,状态:{torrent.state}")
client.torrents_delete(delete_files=True, torrent_hashes=torrent.hash)
return {}

seeders = torrent.num_seeds
peers = torrent.num_peers
progress = torrent.progress
down_speed = torrent.dlspeed
up_speed = torrent.upspeed
# 删除torrent
client.torrents_delete(delete_files=True, torrent_hashes=torrent.hash)
return {
"seeders": seeders,
"peers": peers,
"progress": progress,
"down_speed": down_speed,
"up_speed": up_speed,
}

else:
logging.warning(f"qBittorrent 添加磁力链失败: {magnet_link}")
return {}
except LoginFailed:
logging.error(f"qBittorrent登录失败,请检查地址,用户名和密码:")
return {}
except Exception as e:
logging.error(f"获取 BitTorrent 信息时发生异常: {e}")
return {}


async def get_ed2k_info(ed2k_link, timeout=20):
"""
尝试解析 ed2k 链接,提取文件名,大小。
注意:这仅仅是解析链接信息,无法获取实时下载状态。
"""
try:
if not ed2k_link:
return {}
file_name = None
file_size = None

match = re.match(r'ed2k:\/\/\|file\|(.+?)\|(\d+)\|.+', ed2k_link)
if match:
file_name = match.group(1)
file_size = int(match.group(2))

return {
"file_name": file_name,
"file_size": file_size
}

except Exception as e:
logging.error(f"解析 ed2k 链接失败: {e}")
return {}


def _get_column_index(header, column_names):
""" 查找第一个匹配的列名,返回索引,否则返回-1 """
if not header:
return -1
for name in column_names:
try:
return header.index(name)
except ValueError:
continue
return -1


async def process_csv_file(file_path, magnet_column_names, ed2k_column_names=None, emule_path=None, use_ed2k=False,
qbt_host='127.0.0.1', qbt_port=8080, qbt_user='admin',
qbt_pass='adminadmin', timeout=5):
"""处理单个 CSV 文件,并验证其中的磁力链接和ed2k链接, 返回可用的磁力链接列表, 保留原始数据"""
logging.info(f"开始处理 CSV 文件: {file_path}")
available_rows = []
try:
with open(file_path, 'r', encoding='utf-8', errors='ignore') as csvfile:
reader = csv.reader(csvfile)
header = next(reader, None)
if not header:
logging.warning(f"CSV文件为空: {file_path}")
return available_rows

magnet_column_index = _get_column_index(header, magnet_column_names)
if magnet_column_index == -1:
logging.error(f"磁力链列名列表 '{magnet_column_names}' 在 CSV 文件中未找到: {file_path}")
return available_rows
ed2k_column_index = -1 # 默认值
if use_ed2k and ed2k_column_names:
ed2k_column_index = _get_column_index(header, ed2k_column_names)
if ed2k_column_index == -1:
logging.error(f"ed2k列名列表 '{ed2k_column_names}' 在 CSV 文件中未找到: {file_path}")
use_ed2k = False # 关闭ed2k

link_tasks = []
rows = []
for row in reader:
try:
if len(row) > magnet_column_index and row[magnet_column_index]:
magnet_link = row[magnet_column_index]
link_tasks.append(check_magnet_link(magnet_link, timeout))
rows.append(row) # 存储行数据
elif use_ed2k and len(row) > ed2k_column_index and row[ed2k_column_index]:
ed2k_link = row[ed2k_column_index]
link_tasks.append(check_ed2k_emule(ed2k_link, emule_path))
rows.append(row)
else:
logging.warning(f"跳过无效行,链接索引超出范围或为空, 文件:{file_path}, 行数据: {row}")

except Exception as e:
logging.error(f"跳过无效行,处理行数据异常, 文件:{file_path}, 行数据: {row}, 异常: {e}")
results = await asyncio.gather(*link_tasks)

available_rows = [row for row, is_valid in zip(rows, results) if is_valid]
logging.info(f"CSV 文件处理完毕: {file_path}, 找到了 {len(available_rows)} 个可用的链接")

# 添加额外信息
available_rows_with_info = []
if header and available_rows:
available_rows_with_info.append(
header + ["磁力链种子数", "磁力链节点数", "磁力链下载进度", "磁力链下载速度", "磁力链上传速度",
"ed2k文件名", "ed2k文件大小"])

for row, is_valid in zip(rows, results):
if is_valid:
extra_info = ["N/A", "N/A", "N/A", "N/A", "N/A", "N/A", "N/A"]
if len(row) > magnet_column_index and row[magnet_column_index]:
magnet_link = row[magnet_column_index]
torrent_info = await get_bittorrent_info(magnet_link, qbt_host, qbt_port, qbt_user, qbt_pass)
if torrent_info:
extra_info[0] = torrent_info.get('seeders', 'N/A')
extra_info[1] = torrent_info.get('peers', 'N/A')
extra_info[2] = f"{torrent_info.get('progress', 0) * 100:.2f}%"
extra_info[3] = f"{torrent_info.get('down_speed', 0) / 1024:.2f} KB/s"
extra_info[4] = f"{torrent_info.get('up_speed', 0) / 1024:.2f} KB/s"
if use_ed2k and len(row) > ed2k_column_index and row[ed2k_column_index]:
ed2k_link = row[ed2k_column_index]
ed2k_info = await get_ed2k_info(ed2k_link)
extra_info[5] = ed2k_info.get("file_name", 'N/A')
extra_info[6] = ed2k_info.get("file_size", 'N/A')
available_rows_with_info.append(row + extra_info)
return available_rows_with_info

except Exception as e:
logging.error(f"处理CSV文件异常:{file_path}, 异常:{e}")
return []


async def main(directory, magnet_column_names, output_directory, ed2k_column_names=None, emule_path=None,
use_ed2k=False, qbt_host='127.0.0.1', qbt_port=8080, qbt_user='admin',
qbt_pass='adminadmin', timeout=5):
"""主函数,遍历目录中的 CSV 文件,验证磁力链接,并将结果按文件名输出到新的 CSV 文件"""
if not os.path.isdir(directory):
logging.error(f"指定的目录不存在: {directory}")
return

if not os.path.exists(output_directory):
os.makedirs(output_directory)

csv_tasks = []
for dirpath, _, filenames in os.walk(directory):
for filename in filenames:
if filename.lower().endswith('.csv'):
file_path = os.path.join(dirpath, filename)
csv_tasks.append(
process_csv_file(file_path, magnet_column_names, ed2k_column_names, emule_path, use_ed2k, qbt_host,
qbt_port, qbt_user, qbt_pass, timeout))

results = await asyncio.gather(*csv_tasks)

csv_files = []
for dirpath, _, filenames in os.walk(directory):
for filename in filenames:
if filename.lower().endswith('.csv'):
csv_files.append(os.path.join(dirpath, filename))

for filename, available_rows in zip(
csv_files, results):

output_file_path = os.path.join(output_directory, os.path.relpath(filename, directory)) # 创建相对路径,保留目录结构
os.makedirs(os.path.dirname(output_file_path), exist_ok=True) # 创建输出目录
logging.info(
f"开始写入 {len(available_rows) - 1 if available_rows else 0} 条可用链接到文件:{output_file_path}")
if not available_rows:
logging.info(f"CSV文件 {filename} 中没有可用的链接")
continue # 如果当前文件没有有效链接,则跳过
try:
with open(output_file_path, "w", encoding='utf-8', newline='') as f:
writer = csv.writer(f)
writer.writerows(available_rows) # 写入所有行
logging.info(f"成功写入可用链接到文件: {output_file_path}")
except Exception as e:
logging.error(f"写入文件 {output_file_path} 失败: {e}")


if __name__ == "__main__":
target_directory = r"D:\pythonProject\audio_record_server\src\utils\data\man_link" # 指定 CSV 文件所在的目录
magnet_column_names = ["磁力链", "磁力链接"] # 指定磁力链所在的列名列表
ed2k_column_names = ["磁力链", "磁力链接"] # 指定ed2k 所在的列名列表
output_dir = r"D:\pythonProject\audio_record_server\src\utils\data" # 指定输出目录
link_timeout = 5 # 设置连接超时时间,秒
emule_path = r"C:\Program Files (x86)\eMule\emule.exe" # 配置 eMule 可执行文件路径 (根据你的安装路径修改)
use_ed2k = True # 是否启用ed2k检测
qbt_host = '127.0.0.1' # qBittorrent WebUI 地址
qbt_port = 8080 # qBittorrent WebUI 端口
qbt_user = 'admin' # qBittorrent WebUI 用户名
qbt_pass = 'adminadmin' # qBittorrent WebUI 密码

asyncio.run(
main(target_directory, magnet_column_names, output_dir, ed2k_column_names, emule_path, use_ed2k, qbt_host,
qbt_port, qbt_user, qbt_pass, link_timeout))

接入迅雷 自动化下载

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def download_with_thunder(thunder_path, link, output_path=None):
"""使用迅雷下载链接"""
if not thunder_path:
logging.warning("迅雷路径未找到,无法下载")
return False
try:
cmd = [thunder_path, link] # 最基础的下载命令
if output_path:
cmd.insert(1, f"--output={shlex.quote(output_path)}")
subprocess.Popen(cmd, creationflags=subprocess.CREATE_NO_WINDOW)
logging.info(f"已添加迅雷下载任务: {link}")
return True
except Exception as e:
logging.error(f"迅雷下载失败:{e}")
return False


磁力链中关键信息提取

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# _*_ coding: utf-8 _*_
"""
Time: 2025/1/22 14:11
Author: ZhaoQi Cao(czq)
Version: V 0.1
File: bit_link_msg_split.py
Describe: Write during the python at zgxmt, Github link: https://github.com/caozhaoqi
"""
import os
import subprocess
import logging
import urllib.parse
import re
import asyncio
from qbittorrentapi import Client, LoginFailed
from qbittorrentapi.torrents import TorrentDictionary
import time # 导入time模块

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


async def open_magnet_link(magnet_link, qbt_host='127.0.0.1', qbt_port=8080, qbt_user='admin', qbt_pass='adminadmin'):
"""使用指定的程序打开磁力链接并提取信息。"""
try:

client = Client(host=qbt_host, port=qbt_port, username=qbt_user, password=qbt_pass)
client.auth_log_in()

torrent = client.torrents_add(magnet_link)
if isinstance(torrent, TorrentDictionary):
torrent_hash = torrent.hash
logging.info(f"成功添加磁力链到 qBittorrent: {magnet_link}, torrent hash:{torrent_hash}")
elif isinstance(torrent, str):
torrent_hash = torrent
logging.warning(f"添加磁力链返回了字符串,尝试使用hash:{magnet_link}, torrent hash:{torrent_hash}")
if torrent in ("Fails", "Ok"):
logging.error(f"添加磁力链失败,返回了Fails或者Ok字符串:{magnet_link}")
return None
else:
logging.error(f"添加磁力链返回未知对象:{type(torrent)}{torrent}")
return None

if torrent_hash:

start_time = time.time()
while time.time() - start_time < 30: # 最长等待30秒
# 等待种子添加完成
try:
torrent = client.torrents_info(torrent_hashes=torrent_hash)[0]
if torrent.state not in ("stalledDL", "queuedDL", "checkingDL"):
break
except Exception as e:
logging.error(f"获取 torrent 信息失败,{e}")
client.torrents_delete(delete_files=True, torrent_hashes=torrent_hash)
return None

await asyncio.sleep(2) # 防止CPU过高
if torrent.state in ("stalledDL", "queuedDL", "checkingDL"):
logging.warning(f"qBittorrent 添加磁力链后超时: {magnet_link} ,状态:{torrent.state}")
client.torrents_delete(delete_files=True, torrent_hashes=torrent_hash)
return None

name = torrent.name
size = torrent.size

# 解析名称
pattern = re.compile(r"^(.*?)\.S(\d+)(?:E(\d+))?.*?(?:(\d{3,4}p))?.*$")
match = pattern.match(name)
if match:
title = match.group(1).replace(".", " ") # 替换点号为空格
season = int(match.group(2)) if match.group(2) else None
episode = int(match.group(3)) if match.group(3) else None
resolution = match.group(4) if match.group(4) else None
else:
title = name
season = None
episode = None
resolution = None
# 删除torrent
client.torrents_delete(delete_files=True, torrent_hashes=torrent_hash)

return {
"title": title.strip(),
"season": season,
"episode": episode,
"resolution": resolution,
"name": name,
"size": size
}
else:
logging.warning(f"qBittorrent 添加磁力链失败,返回hash为空 : {magnet_link}")
return None

except LoginFailed:
logging.error(f"qBittorrent登录失败,请检查地址,用户名和密码:")
return None
except Exception as e:
logging.error(f"使用 qBittorrent 打开磁力链失败,{e}")
return None


def extract_magnet_info_from_string(magnet_link):
"""从磁力链接本身解析信息"""
try:
parsed_magnet = urllib.parse.urlparse(magnet_link)
query_params = urllib.parse.parse_qs(parsed_magnet.query)

if 'dn' not in query_params:
return None

file_name = query_params['dn'][0]

# 正则表达式
pattern = re.compile(r"^(.*?)\.S(\d+)(?:E(\d+))?.*?(?:(\d{3,4}p))?.*$")

match = pattern.match(file_name)

if match:
title = match.group(1).replace(".", " ") # 替换点号为空格
season = int(match.group(2)) if match.group(2) else None
episode = int(match.group(3)) if match.group(3) else None
resolution = match.group(4) if match.group(4) else None
return {
"title": title.strip(),
"season": season,
"episode": episode,
"resolution": resolution,
"name": file_name,
}
else:
return None
except Exception as e:
print(f"解析磁力链接异常:{e}")
return None


if __name__ == '__main__':
magnet_link = "magnet:?xt=urn:btih:362f3486e3ac1df5844305e90fd5cdae01f5ac0c&tr=http://tr.cili001.com:8070/announce&tr=udp://p4p.arenabg.com:1337&tr=udp://tracker.opentrackr.org:1337/announce&tr=udp://open.demonii.com:1337"
qbt_host = '127.0.0.1'
qbt_port = 8080
qbt_user = 'admin'
qbt_pass = 'adminadmin'


async def main():

# 使用 qBittorrent
info_from_qbt = await open_magnet_link(magnet_link, qbt_host, qbt_port, qbt_user, qbt_pass)

if info_from_qbt:
print("通过 qBittorrent 获取信息:")
print(f" 标题: {info_from_qbt.get('title', 'N/A')}")
if info_from_qbt.get('season'):
print(f" 季数: 第 {info_from_qbt['season']} 季")
if info_from_qbt.get('episode'):
print(f" 集数: 第 {info_from_qbt['episode']} 集")
print(f" 清晰度: {info_from_qbt.get('resolution', '未知')}")
print(f" 文件名: {info_from_qbt.get('name', '未知')}")
print(f" 文件大小: {info_from_qbt.get('size', '未知')}")
else:

info_from_magnet = extract_magnet_info_from_string(magnet_link)
if info_from_magnet:
print("通过解析磁力链接获取信息:")
print(f" 标题: {info_from_magnet.get('title', 'N/A')}")
if info_from_magnet.get('season'):
print(f" 季数: 第 {info_from_magnet.get('season')} 季")
if info_from_magnet.get('episode'):
print(f" 集数: 第 {info_from_magnet.get('episode')} 集")
print(f" 清晰度: {info_from_magnet.get('resolution', '未知')}")
print(f" 文件名: {info_from_magnet.get('name', '未知')}")
else:
print("无法提取磁力链信息")


asyncio.run(main())
Prev:
python 可视化打包工具集锦
Next:
在ubuntu服务器搭建 NAS