zl程序教程

您现在的位置是:首页 >  后端

当前栏目

python使用多线程来执行函数

Python多线程执行 函数 使用
2023-09-11 14:15:15 时间

使用多线程来高效执行程序!

示例代码1:

generate_data.py

import random

for i in range(100):
    a = random.randint(10, 99)
    b = random.randint(10, 99)
    data = f'第{i+1}条数据\n'
    with open('text.txt', 'a', encoding='utf-8') as f:
        f.write(data)

main.py

import threading
import time
from queue import Queue


# 定义消费者
class Consumer(threading.Thread):

    def run(self) -> None:
        global queue
        while queue.qsize() > 0:
            msg = self.name + "消费了" + queue.get().replace('\n', '')
            print(msg)
            time.sleep(1)


def read_file():
    file_data = open('text.txt', 'r', encoding='utf-8')
    global queue
    for line in file_data:
        queue.put(line)
    file_data.close()


if __name__ == '__main__':
    queue = Queue()
    read_file()
    consumer_lst = []
    for _ in range(5):
        consumer = Consumer()
        consumer.start()
        consumer_lst.append(consumer)

    for consumer in consumer_lst:
        consumer.join()
    print("程序执行完毕!!!")

运行结果:

        在上面输出的结果过,使用print()打印的数据比较混乱,比较影响观看效果。这时对混乱输出可以做一定的处理。

        python多线程时并发并非并行,线程之间并非严格遵守顺序,这就会造成线程不安全的情况,例如print是自动添加换行的,在换行时可能线程不安全,导致换行和下一句输出发生混乱。

解决方案:

  • 手动输入换行符,例如print(url+'\n',end=''),以空字符结尾,在输出内容后面主动加入换行符,这样就不会存在错位的问题。
  • 对 print() 加锁

示例代码2:

import threading
import time
from queue import Queue

mutex = threading.RLock()


# 定义消费者
class Consumer(threading.Thread):

    # 定义多线程有序输出
    def order_output(self, data):
        print("+" * 10)
        print(data)
        print("*" * 10)

    def run(self) -> None:
        global queue
        while queue.qsize() > 0:
            msg = self.name + "消费了" + queue.get().replace('\n', '')
            # print(msg)
            # 方法一:手动输入换行符
            # print(msg + '\n', end='')
            # 方法二:使用锁
            with mutex:
                self.order_output(msg)
            time.sleep(1)


def read_file():
    file_data = open('text.txt', 'r', encoding='utf-8')
    global queue
    for line in file_data:
        queue.put(line)
    file_data.close()


if __name__ == '__main__':
    queue = Queue()
    read_file()
    consumer_lst = []
    for _ in range(5):
        consumer = Consumer()
        consumer.start()
        consumer_lst.append(consumer)

    for consumer in consumer_lst:
        consumer.join()
    print("程序执行完毕!!!")

运行结果:

有时候想知道程序执行的速度,我们可以添加进度条来显示出来。可以借助tqdm库来实现。

示例代码3:

import threading
import time
from queue import Queue
from tqdm import tqdm

mutex = threading.RLock()


# 定义消费者
class Consumer(threading.Thread):

    # 定义多线程有序输出
    def order_output(self, data):
        print("+" * 10)
        print(data)
        print("*" * 10)

    def run(self) -> None:
        global queue
        while queue.qsize() > 0:
            pbar.update(1)
            msg = self.name + "消费了" + queue.get().replace('\n', '')
            # print(msg)
            # 方法一:手动输入换行符
            print(msg + '\n', end='')
            # 方法二:使用锁
            # with mutex:
            #     self.order_output(msg)
            time.sleep(5)


def read_file():
    file_data = open('text.txt', 'r', encoding='utf-8')
    global queue
    total = 0
    for line in file_data:
        queue.put(line)
        total += 1
    file_data.close()
    return total


if __name__ == '__main__':
    queue = Queue()
    total = read_file()
    # 使用tqdm进度条
    pbar = tqdm(total=total, desc='Consumer')
    pbar.set_postfix(aa=total, bb=total - 1)
    consumer_lst = []
    for _ in range(5):
        consumer = Consumer()
        consumer.start()
        consumer_lst.append(consumer)

    for consumer in consumer_lst:
        consumer.join()
    print("程序执行完毕!!!")

运行结果: