• Index

多线程

Reads: 526 Edit

查看运行的线程

# -*- coding: utf-8 -*-

import threading
import time


def thread_job():
    print('This is an added Thread, number is %s' % threading.current_thread())
    time.sleep(5)
    print('thread job end')


def main():
    addd_thread = threading.Thread(target=thread_job, name='thr')
    addd_thread.start()
    print(threading.active_count())  # 线程的数量
    print(threading.enumerate())  # 一共有多少个线程
    print(threading.current_thread())  # 现在运行的线程


if __name__ == '__main__':
    main()

nest-asyncio的Python项目详细描述

按设计异步does not allow 其事件循环将被嵌套。这就提出了一个实际问题: 在事件循环为 已经在运行无法运行任务并等待 为了结果。试图这样做会导致错误 “RuntimeError: This event loop is already running”。

这个问题会在不同的环境中出现,比如Web服务器, gui应用程序和jupyter笔记本。

此模块修补asyncio以允许嵌套使用asyncio.run和 loop.run_until_complete。

pip3 install nest_asyncio

协程池

import asyncio
import aiohttp
import time
import nest_asyncio
import queue
from threading import Thread


class AsyncPool(object):
    """
    1. 支持动态添加任务
    2. 支持自动停止事件循环
    3. 支持最大协程数
    """

    def __init__(self, loop=None, maxsize=0):
        """
        初始化
        :param loop:
        :param maxsize: 默认0,不限制队列
        """
        # 在jupyter需要这个,不然asyncio运行出错
        nest_asyncio.apply()

        # 获取一个事件循环
        if not loop:
            self.loop = asyncio.new_event_loop()

        # 队列,先进先出,根据队列是否为空判断,退出协程
        self.q = queue.Queue(maxsize)

        self.loop_thread = None
        if self.loop:
            self.start_thread_loop()

    def add(self, item=1):
        """
        添加任务
        :param item:
        :return:
        """
        self.q.put(item)

    def done(self, fn):
        """
        任务完成
        回调函数
        :param fn:
        :return:
        """
        if fn:
            pass
        self.q.get()
        self.q.task_done()

    def wait(self):
        """
        等待任务执行完毕
        :return:
        """
        self.q.join()

    @staticmethod
    def _start_thread_loop(loop):
        """
        运行事件循环
        :param loop: loop以参数的形式传递进来运行
        :return:
        """
        # 将当前上下文的事件循环设置为循环。
        asyncio.set_event_loop(loop)
        # 开始事件循环
        loop.run_forever()

    def start_thread_loop(self):
        """
        运行事件循环
        :return:
        """
        self.loop_thread = Thread(target=self._start_thread_loop, args=(self.loop,))
        # 设置守护进程
        self.loop_thread.setDaemon(True)
        # 运行线程,同时协程事件循环也会运行
        self.loop_thread.start()

    def stop_thread_loop(self, loop_time=1):
        """
        队列为空,则关闭线程
        :param loop_time:
        :return:
        """

        async def _close_thread_loop():
            """
            关闭线程
            :return:
            """
            while True:
                if self.q.empty():
                    self.loop.stop()
                    break
                await asyncio.sleep(loop_time)

        # 等待关闭线程
        asyncio.run_coroutine_threadsafe(_close_thread_loop(), self.loop)

    def submit(self, func, callback=None):
        """
        提交任务到事件循环
        :param func: 异步函数对象
        :param callback: 回调函数
        :return:
        """
        # 将协程注册一个到运行在线程中的循环,thread_loop 会获得一个环任务
        # 注意:run_coroutine_threadsafe 这个方法只能用在运行在线程中的循环事件使用
        future = asyncio.run_coroutine_threadsafe(func, self.loop)

        # 回调函数封装
        def callback_done(_future):
            try:
                if callback:
                    callback(_future)
            finally:
                self.done(_future)

        # 添加回调函数
        future.add_done_callback(callback_done)

    def release(self, loop_time=1):
        """
        释放线程
        :param loop_time:
        :return:
        """
        self.stop_thread_loop(loop_time)

    def running(self):
        """
        获取当前线程数
        :return:
        """
        return self.q.qsize()


async def thread_example(i):
    url = "http://127.0.0.1:8080/app04/async4?num={}".format(i)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as res:
            # print(res.status)
            # print(res.content)
            return await res.text()


def my_callback(future):
    result = future.result()
    print('返回值: ', result)


def main():
    # 任务组, 最大协程数
    pool = AsyncPool(maxsize=10000)

    # 插入任务任务
    for i in range(100000):
        pool.add()
        pool.submit(thread_example(i), my_callback)

    # 停止事件循环
    pool.release()

    # 等待
    pool.wait()

    print("等待子线程结束...")


if __name__ == '__main__':
    start_time = time.time()
    main()
    end_time = time.time()
    print("run time: ", end_time - start_time)


Comments

Make a comment

www.ultrapower.com ,王硕的博客,专注于研究互联网产品和技术,提供中文精品教程。 本网站与其它任何公司及/或商标无任何形式关联或合作。
  • Index
aaaaa