python中使用自定义线程
背景
Python中的多线程是一种编程模型,它允许在单个进程中同时执行多个线程。线程是程序执行流的最小单元,是进程内的一条执行路径。Python中的多线程主要是利用threading模块来实现的。
全局解释器锁(GIL):这是Python多线程模型中最重要的一部分。由于Python的设计,同一时间只能有一个线程在执行Python字节码。这意味着即使使用多线程,CPU密集型任务的执行速度也可能不会显著提高。但是,多线程在进行I/O密集型操作时(如文件读写、网络通信等)仍然非常有用,因为它们可以在一个线程等待I/O操作完成时让另一个线程继续执行。
线程创建与销毁:在Python中,创建和销毁线程的开销相对较小,因此可以创建大量的线程来处理并发任务。
线程同步:为了协调多个线程的执行,Python提供了几种线程同步机制,包括锁(Lock)、条件变量(Condition)、信号量(Semaphore)等。
守护线程(Daemon Threads):Python中的线程可以被设置为守护线程。当主线程结束时,守护线程也会随之结束,即使它们还没有完成执行。
自定义python线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| import os import sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import threading import time
from loguru import logger from run import constants
class SISThreading(threading.Thread): def __init__(self, target, args=(), kwargs=None, *args_thread, **kwargs_thread): """
:param target: :param args: :param kwargs: :param args_thread: :param kwargs_thread: """ super(SISThreading, self).__init__(*args_thread, **kwargs_thread) if kwargs is None: kwargs = {} self.target = target self.args = args self.kwargs = kwargs self.__flag = threading.Event() self.__flag.set() self.__running = threading.Event() self.__running.set()
def run(self): """
:return: """ while self.__running.is_set(): if constants.stop_spider_url_flag: logger.warning("stop spider url in threading run.") break logger.info("thread is running!") self.__flag.wait() self.target(*self.args, **self.kwargs) time.sleep(constants.search_delta_time)
def pause(self): """
:return: """ logger.error("thread is pause") self.__flag.clear()
def resume(self): """
:return: """ logger.warning("thread is resume") self.__flag.set()
def stop(self): """
:return: """ logger.error("thread is stop") self.__flag.set() self.__running.clear()
def is_running(self): """检查线程是否正在运行""" return self.__running.is_set()
|
1 2 3 4 5 6 7 8
| def test: log_mon_war_thread_obj = threading.Thread( target=log_mon_war, args=(spider_thread_obj,)) log_mon_war_thread_obj.start() spider_thread_obj.pause() spider_thread_obj.resume() return
|
线程间通信
demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| import queue import threading import time
q = queue.Queue()
def producer(): for i in range(5): print(f"生产者生产了 {i}") q.put(i) time.sleep(1)
def consumer(): while True: item = q.get() if item is None: break print(f"消费者消费了 {item}") time.sleep(2)
t1 = threading.Thread(target=producer) t2 = threading.Thread(target=consumer)
t1.start() t2.start()
t1.join()
q.put(None)
t2.join()
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| ./test/multi_threading_queue.py 2024-05-07 15:47:04: 生产者生产了 0 2024-05-07 15:47:04: 消费者消费了 0 2024-05-07 15:47:05: 生产者生产了 1 2024-05-07 15:47:06: 消费者消费了 1 2024-05-07 15:47:06: 生产者生产了 2 2024-05-07 15:47:07: 生产者生产了 3 2024-05-07 15:47:08: 消费者消费了 2 2024-05-07 15:47:08: 生产者生产了 4 2024-05-07 15:47:10: 消费者消费了 3 2024-05-07 15:47:12: 消费者消费了 4
Process finished with exit code 0
|