• Index

aiomysql

Reads: 2100 Edit

#1,python获取当前计算机cpu数量的方法

# Snippet: report how many logical CPUs this machine exposes.
from multiprocessing import cpu_count

num_cpus = cpu_count()
print(num_cpus)

#2,aiomysql例子

import asyncio, aiomysql
from loguru import logger
# 单独建立一个py文件,存放数据库相关的信息
#from tests.settings import DB_HOST, DB_NAME, DB_USER, DB_PASSWD, DB_PORT, DB_MAXSIZE, DB_MINSIZE, DB_COMMIT, DB_CHARSET

from faker import Faker

import random
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
from concurrent.futures import as_completed
import threading
from datetime import datetime
from multiprocessing import Pool


# --- Database connection settings ----------------------------------------
# For real projects keep these in a separate settings module (see the
# commented-out tests.settings import above).
DB_HOST =  '127.0.0.1'
DB_USER = 'root'
DB_PASSWD = 'root'
DB_NAME = 'spider'
DB_PORT = 3306
DB_MINSIZE = 1          # minimum connections kept in the aiomysql pool
DB_MAXSIZE = 100        # maximum connections allowed in the pool
DB_COMMIT = True        # autocommit flag passed to aiomysql
DB_CHARSET = 'utf8mb4'  # full 4-byte UTF-8 support



class AsyncMysql(object):
    """Async MySQL helper built around an aiomysql connection pool.

    Typical use::

        async with AsyncMysql(loop) as db:
            results = await db.exe_sql_task('select', sql_list)
    """

    def __init__(self, loop):
        # Connection settings come from the module-level DB_* constants.
        self._host = DB_HOST
        self._port = DB_PORT
        self._db = DB_NAME
        self._user = DB_USER
        self._password = DB_PASSWD
        self._minsize = DB_MINSIZE
        self._maxsize = DB_MAXSIZE
        self._autocommit = DB_COMMIT
        self._charset = DB_CHARSET
        self._pool = None  # created lazily by get_db_pool()
        self._loop = loop

    async def get_db_pool(self):
        """Create the aiomysql connection pool.

        Callers guard on ``self._pool is None`` to keep this idempotent.
        """
        logger.info('begin create pool')
        # BUG FIX: the pool previously hard-coded charset='utf8', silently
        # ignoring DB_CHARSET ('utf8mb4'); minsize/maxsize were stored in
        # __init__ but never passed through either.
        self._pool = await aiomysql.create_pool(host=self._host, port=self._port,
                                                user=self._user, password=self._password,
                                                cursorclass=aiomysql.DictCursor,
                                                db=self._db, loop=self._loop, autocommit=self._autocommit,
                                                minsize=self._minsize, maxsize=self._maxsize,
                                                charset=self._charset)
        logger.info('create_aiomysql_pool success')

    async def exe_sql_task(self, sqltype='insert', sql_list=None):
        """Run a batch of SQL statements concurrently.

        Callers should pass batches of roughly 500-1000 statements at a time.

        :param sqltype: 'insert' or 'select'.
        :param sql_list: list of SQL strings to execute.
        :return: for 'select', the set of finished tasks (each task's
                 ``.result()`` holds the fetched rows); None on error or
                 empty input.
        """
        # BUG FIX: the original used a mutable default argument
        # (sql_list=[]), which is shared across all calls.
        if sql_list is None:
            sql_list = []
        if not isinstance(sql_list, list):
            logger.error('exe_sql_task方法传入的sql与param应为list')
            return None
        # BUG FIX: wrap coroutines explicitly -- asyncio.wait() no longer
        # accepts bare coroutines as of Python 3.11.  Passing sqltype
        # through also avoids the original's UnboundLocalError when an
        # unknown sqltype was supplied.
        task_list = [asyncio.ensure_future(self.exe_sql(sqltype=sqltype, sql=sql))
                     for sql in sql_list]
        if not task_list:
            logger.info('Error constructing SQL list')
            return None
        logger.info('start execute sql')
        result, pending = await asyncio.wait(task_list)
        logger.info('execute sql down')
        if pending:
            logger.info('canceling tasks')
            logger.error(pending)
            for t in pending:
                t.cancel()
        return result if result else None

    async def exe_sql(self, sqltype='insert', sql=''):
        """Execute a single SQL statement on a pooled connection.

        :param sqltype: 'insert' (execute + commit) or 'select' (fetchall).
        :param sql: the SQL text to run.
        :return: list of row dicts for 'select'; otherwise None.  Also
                 returns None on any execution error (after rollback when
                 autocommit is off).
        """
        if self._pool is None:
            await self.get_db_pool()
        async with self._pool.acquire() as conn:
            async with conn.cursor() as cur:
                result = None
                try:
                    if sqltype == 'insert':
                        await cur.execute(sql)
                        await conn.commit()  # explicit commit even with autocommit on
                    elif sqltype == 'select':
                        await cur.execute(sql)
                        result = await cur.fetchall()
                except Exception as ex:
                    # Roll back only when autocommit is disabled.
                    if not self._autocommit and conn:
                        await conn.rollback()
                    # TODO: persist failed statements (unique id, sql, error)
                    # to an error table as the original comment suggested.
                    logger.info('SQL:{}\n 执行异常,错误原因为:'.format(sql))
                    logger.error(ex)
                    return None
                return result

    async def __aenter__(self):
        # Lazily build the pool so `async with AsyncMysql(loop)` just works.
        if self._pool is None:
            await self.get_db_pool()
        return self

    async def _close(self):
        """Close the pool and wait for all connections to be released."""
        if self._pool is not None:
            self._pool.close()
            await self._pool.wait_closed()
            logger.info('close_aiomysql_pool success')

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._close()


async def exec_insert(event_loop, sqltype='insert', sqllist=None):
    """Open an AsyncMysql context and run the given batch of statements.

    :param event_loop: asyncio loop handed to AsyncMysql.
    :param sqltype: 'insert' or 'select'.
    :param sqllist: list of SQL strings.  BUG FIX: the original declared a
        mutable default (``sqllist=[]``) shared across calls.
    :return: whatever AsyncMysql.exe_sql_task returns.
    """
    if sqllist is None:
        sqllist = []
    async with AsyncMysql(event_loop) as ae:
        results = await ae.exe_sql_task(sqltype, sqllist)
    return results


# Module-level Faker instance producing Chinese-locale sample data.
fake = Faker('zh_CN')


def create_fake_data():
    """Return one fake record as (name, age, address, date, phone)."""
    # Keep the fake.* call order unchanged so seeded runs stay reproducible.
    person = fake.name()
    addr = str(fake.address()).strip().replace('\n', '').replace('\r', '')
    born = fake.date()
    tel = fake.phone_number()
    years = random.randint(1, 100)
    return person, years, addr, born, tel


def insert_demo1():
    """Demo: insert 5,000,000 rows, one INSERT statement per row.

    Builds a single-row INSERT per record and hands the whole list to the
    async pool at once.  Compare with insert_demo2 for batched VALUES.
    """
    # FIX: dropped the redundant local `from datetime import datetime`
    # (already imported at module level, which it shadowed).
    logger.info(' --- begin ---')
    maxline = 500 * 10000
    sqllist = []
    for i in range(maxline):
        # Static data keeps the benchmark deterministic; swap in
        # create_fake_data() for realistic rows.
        name, age, address, date, phone = 'wang-' + str(i), i, 'bj-' + str(i), '2021-07-16', '131231111'
        # NOTE: values are interpolated directly; acceptable for this trusted
        # demo data, but use parameterized queries for untrusted input.
        insert_sql = f"insert into spider_test(name, age, address, date, phone) values('{name}', {age}, '{address}', '{date}', '{phone}')"
        sqllist.append(insert_sql)
    logger.info('ready sql for insert')

    x = datetime.now()
    # FIX: use new_event_loop()/set_event_loop() for consistency with the
    # demo3 functions; get_event_loop() is deprecated for this use.
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    event_loop.run_until_complete(exec_insert(event_loop, sqltype='insert', sqllist=sqllist))
    event_loop.close()
    logger.info('单次插入' + str(maxline) + '条记录,' + '共耗时' + str(datetime.now() - x))


def insert_demo2():
    """Demo: insert 5,000,000 rows using multi-row INSERT statements.

    Rows are grouped ``batch_size`` at a time into a single
    ``INSERT ... VALUES (...),(...)`` statement, which is far faster than
    one statement per row (see insert_demo1).
    """
    # FIX: removed the redundant shadowing `from datetime import datetime`,
    # the unused `batch_list` variable, and the manual `idx` counter.
    logger.info(' --- begin ---')
    maxline = 500 * 10000
    batch_size = 2000

    insert_sql = "INSERT INTO spider_test(name, age, address, date, phone) VALUES"
    sqllist = []
    values = []
    for i in range(maxline):
        name, age, address, date, phone = 'wang-' + str(i), i, 'bj-' + str(i), '2021-07-16', '131231111'
        values.append(f"('{name}',{age},'{address}','{date}','{phone}' )")
        if len(values) == batch_size:
            # Batch is full: emit one multi-row INSERT, start the next batch.
            sqllist.append(insert_sql + ",".join(values))
            values.clear()
    if values:
        # Flush the final partial batch (empty here: maxline % batch_size == 0).
        sqllist.append(insert_sql + ",".join(values))

    logger.info('ready sql for insert')

    x = datetime.now()
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(exec_insert(event_loop, sqltype='insert', sqllist=sqllist))
    event_loop.close()
    logger.info('批次插入' + str(maxline) + '条记录,' + str(batch_size) + '条/批' + '共耗时' + str(datetime.now() - x))

def insert_demo3_1():
    """Demo: batched insert into spider_test on a private event loop.

    Identical to insert_demo2 except it creates its own event loop, so it
    is safe to run on a worker thread or child process (see main1/main2).
    """
    # FIX: removed the redundant shadowing `from datetime import datetime`,
    # the unused `batch_list` variable, and the manual `idx` counter.
    logger.info(' --- begin ---')
    maxline = 500 * 10000
    batch_size = 2000

    insert_sql = "INSERT INTO spider_test(name, age, address, date, phone) VALUES"
    sqllist = []
    values = []
    for i in range(maxline):
        name, age, address, date, phone = 'wang-' + str(i), i, 'bj-' + str(i), '2021-07-16', '131231111'
        values.append(f"('{name}',{age},'{address}','{date}','{phone}' )")
        if len(values) == batch_size:
            # Batch is full: emit one multi-row INSERT, start the next batch.
            sqllist.append(insert_sql + ",".join(values))
            values.clear()
    if values:
        # Flush the final partial batch (empty here: maxline % batch_size == 0).
        sqllist.append(insert_sql + ",".join(values))

    logger.info('ready sql for insert')

    x = datetime.now()
    # A fresh loop per call: get_event_loop() would fail on non-main threads.
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    event_loop.run_until_complete(exec_insert(event_loop, sqltype='insert', sqllist=sqllist))
    event_loop.close()
    logger.info('批次插入' + str(maxline) + '条记录,' + str(batch_size) + '条/批' + '共耗时' + str(datetime.now() - x))

def insert_demo3_2():
    """Demo: batched insert into spider_test2 on a private event loop.

    Mirror of insert_demo3_1 but targets the spider_test2 table, so the two
    can run in parallel without contending on one table.
    """
    # FIX: removed the redundant shadowing `from datetime import datetime`,
    # the unused `batch_list` variable, and the manual `idx` counter.
    logger.info(' --- begin ---')
    maxline = 500 * 10000
    batch_size = 2000

    insert_sql = "INSERT INTO spider_test2(name, age, address, date, phone) VALUES"
    sqllist = []
    values = []
    for i in range(maxline):
        name, age, address, date, phone = 'wang-' + str(i), i, 'bj-' + str(i), '2021-07-16', '131231111'
        values.append(f"('{name}',{age},'{address}','{date}','{phone}' )")
        if len(values) == batch_size:
            # Batch is full: emit one multi-row INSERT, start the next batch.
            sqllist.append(insert_sql + ",".join(values))
            values.clear()
    if values:
        # Flush the final partial batch (empty here: maxline % batch_size == 0).
        sqllist.append(insert_sql + ",".join(values))

    logger.info('ready sql for insert')

    x = datetime.now()
    # A fresh loop per call: get_event_loop() would fail on non-main threads.
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    event_loop.run_until_complete(exec_insert(event_loop, sqltype='insert', sqllist=sqllist))
    event_loop.close()
    logger.info('批次插入' + str(maxline) + '条记录,' + str(batch_size) + '条/批' + '共耗时' + str(datetime.now() - x))

def select_demo1():
    """Demo: run two SELECTs concurrently and log every returned row."""
    # FIX: dropped the redundant local `from datetime import datetime` and
    # the needless create_task()/task.result() round-trip --
    # run_until_complete() already returns the coroutine's result.
    logger.info(' --- begin ---')
    sqllist = ['select * from spider_test limit 0,2', 'select * from spider_test limit 3,3']
    x = datetime.now()
    event_loop = asyncio.get_event_loop()
    results = event_loop.run_until_complete(
        exec_insert(event_loop, sqltype='select', sqllist=sqllist))
    event_loop.close()
    logger.info('共耗时' + str(datetime.now() - x))

    # `results` is a set of finished asyncio Tasks; each .result() is the
    # row list produced by one SELECT (or None on error).
    if results:
        for rs in results:
            if rs.result():
                for item in rs.result():
                    logger.info(item)


def select_demo2():
    """Demo: run a COUNT query and extract the scalar result."""
    # FIX: dropped the redundant local `from datetime import datetime` and
    # the needless create_task()/task.result() round-trip.
    logger.info(' --- begin ---')
    sqllist = ['select count(*) as count  from ( select * from spider_test limit 100) t ']
    x = datetime.now()
    event_loop = asyncio.get_event_loop()
    results = event_loop.run_until_complete(
        exec_insert(event_loop, sqltype='select', sqllist=sqllist))
    event_loop.close()
    logger.info('共耗时' + str(datetime.now() - x))

    count_num = -1
    if results:
        for rs in results:
            if rs.result():
                # DictCursor rows: take the 'count' column of the first row.
                count_num = rs.result()[0]['count']
                break

    logger.info(f'count_num={count_num}')


def example():
    """Demo: minimal end-to-end select call.

    BUG FIX: the original first built a list of 1,000,000 INSERT statements
    and then immediately discarded it by rebinding ``sqllist``; that dead
    (and very expensive) work is removed.  Also dropped the redundant
    shadowing local ``from datetime import datetime``.
    """
    x = datetime.now()
    sqllist = ['select * from xxx', 'select * from xxx']
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(exec_insert(event_loop, sqltype='select', sqllist=sqllist))
    logger.info('共耗时' + str(datetime.now() - x))


# Module-level thread pools used by main1(); each demo gets its own pool so
# the two batch inserts run on separate worker threads.
executor1 = ThreadPoolExecutor(5)
executor2 = ThreadPoolExecutor(10)

def main1():
    """Run the two batch-insert demos concurrently on the thread pools.

    insert_demo3_1 / insert_demo3_2 each create their own event loop, so
    they are safe to run on separate worker threads.

    FIX: deleted the two large blocks of commented-out experiment code
    (alternative ThreadPoolExecutor and raw-threading variants) that made
    the function hard to read.
    """
    x = datetime.now()
    future_set = {
        executor1.submit(insert_demo3_1),
        executor2.submit(insert_demo3_2),
    }

    # Surface any exception raised inside a worker thread; future.exception()
    # also blocks until that future is done.
    for future in as_completed(future_set):
        error = future.exception()
        if error is not None:
            logger.error(error)

    logger.info('--- 主线程 结束---')
    logger.info('*** 共耗时' + str(datetime.now() - x))


def main2():
    """Run the two batch-insert demos in separate processes.

    BUG FIX: the original discarded the AsyncResult objects returned by
    apply_async(), so any exception raised in a child process was silently
    swallowed; calling .get() re-raises it here, where we log it.
    """
    x = datetime.now()
    pool = Pool(processes=4)
    async_results = [pool.apply_async(insert_demo3_1),
                     pool.apply_async(insert_demo3_2)]
    pool.close()
    pool.join()
    for ar in async_results:
        try:
            ar.get()  # re-raises any child-process exception
        except Exception as ex:
            logger.error(ex)
    logger.info('*** 共耗时' + str(datetime.now() - x))


if __name__ == '__main__':
    # Uncomment exactly one demo at a time; main1() races the two batched
    # insert demos on the module-level thread pools.
    #insert_demo1()
    #insert_demo2()
    main1()
    #main2()

    #select_demo1()
    #select_demo2()






Comments

Make a comment

www.ultrapower.com ,王硕的博客,专注于研究互联网产品和技术,提供中文精品教程。 本网站与其它任何公司及/或商标无任何形式关联或合作。
  • Index
aaaaa