python中使用自定义线程
Published in:2024-05-07 |
Words: 1k | Reading time: 4min | reading:

python中使用自定义线程

背景

Python中的多线程是一种编程模型,它允许在单个进程中同时执行多个线程。线程是程序执行流的最小单元,是进程内的一条执行路径。Python中的多线程主要是利用threading模块来实现的。

全局解释器锁(GIL):这是Python多线程模型中最重要的一部分。由于Python的设计,同一时间只能有一个线程在执行Python字节码。这意味着即使使用多线程,CPU密集型任务的执行速度也可能不会显著提高。但是,多线程在进行I/O密集型操作时(如文件读写、网络通信等)仍然非常有用,因为它们可以在一个线程等待I/O操作完成时让另一个线程继续执行。
线程创建与销毁:在Python中,创建和销毁线程的开销相对较小,因此可以创建大量的线程来处理并发任务。
线程同步:为了协调多个线程的执行,Python提供了几种线程同步机制,包括锁(Lock)、条件变量(Condition)、信号量(Semaphore)等。

守护线程(Daemon Threads):Python中的线程可以被设置为守护线程。当主线程结束时,守护线程也会随之结束,即使它们还没有完成执行。

自定义python线程

  • 代码示例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import os
import sys


sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

import threading
import time

from loguru import logger
from run import constants


class SISThreading(threading.Thread):
def __init__(self, target, args=(), kwargs=None, *args_thread, **kwargs_thread):
"""

:param target:
:param args:
:param kwargs:
:param args_thread:
:param kwargs_thread:
"""
super(SISThreading, self).__init__(*args_thread, **kwargs_thread)
if kwargs is None:
kwargs = {}
self.target = target # 存储目标函数
self.args = args # 存储位置参数
self.kwargs = kwargs # 存储关键字参数
self.__flag = threading.Event() # 用于暂停线程的标识
self.__flag.set() # 设置为True
self.__running = threading.Event() # 用于停止线程的标识
self.__running.set() # 将running设置为True

def run(self):
"""

:return:
"""
while self.__running.is_set():
if constants.stop_spider_url_flag:
logger.warning("stop spider url in threading run.")
break
logger.info("thread is running!")
self.__flag.wait() # 为True时立即返回, 为False时阻塞直到self.__flag为True后返回
# logger.debug("run = ", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
# 调用目标函数并传递参数
self.target(*self.args, **self.kwargs)
time.sleep(constants.search_delta_time)

def pause(self):
"""

:return:
"""
logger.error("thread is pause")
self.__flag.clear() # 设置为False, 让线程阻塞

def resume(self):
"""

:return:
"""
logger.warning("thread is resume")
self.__flag.set() # 设置为True, 让线程停止阻塞

def stop(self):
"""

:return:
"""
logger.error("thread is stop")
self.__flag.set() # 将线程从暂停状态恢复, 如果已经暂停的话
self.__running.clear() # 设置为False

def is_running(self):
"""检查线程是否正在运行"""
return self.__running.is_set()
  • 调用实现线程暂停,恢复运行
1
2
3
4
5
6
7
8
def test:
log_mon_war_thread_obj = threading.Thread(
target=log_mon_war,
args=(spider_thread_obj,))
log_mon_war_thread_obj.start()
spider_thread_obj.pause()
spider_thread_obj.resume()
return

线程间通信

  • 通过队列实现

demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

import queue
import threading
import time

# 创建一个队列
q = queue.Queue()

# 生产者线程
def producer():
for i in range(5):
print(f"生产者生产了 {i}")
q.put(i)
time.sleep(1)

# 消费者线程
def consumer():
while True:
item = q.get()
if item is None:
break
print(f"消费者消费了 {item}")
time.sleep(2)

# 创建线程
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)

# 启动线程
t1.start()
t2.start()

# 等待线程结束
t1.join()

# 向队列中添加一个 None,表示结束信号
q.put(None)

t2.join()
  • test result
1
2
3
4
5
6
7
8
9
10
11
12
13
14
./test/multi_threading_queue.py
2024-05-07 15:47:04: 生产者生产了 0
2024-05-07 15:47:04: 消费者消费了 0
2024-05-07 15:47:05: 生产者生产了 1
2024-05-07 15:47:06: 消费者消费了 1
2024-05-07 15:47:06: 生产者生产了 2
2024-05-07 15:47:07: 生产者生产了 3
2024-05-07 15:47:08: 消费者消费了 2
2024-05-07 15:47:08: 生产者生产了 4
2024-05-07 15:47:10: 消费者消费了 3
2024-05-07 15:47:12: 消费者消费了 4

Process finished with exit code 0

Prev:
在Python中使用loguru库捕获异常出现bug的修正方法
Next:
python使用setuptools和nsis工具打包nuitka工程