# 1. How to get the number of CPUs on the current machine in Python
from multiprocessing import cpu_count
print(cpu_count())
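# For reference, the standard library's os module exposes the same information;
# a minimal sketch (returns the same value as multiprocessing.cpu_count()):
import os
print(os.cpu_count())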
# 2. aiomysql example
import asyncio, aiomysql
from loguru import logger
# In practice, keep the database connection settings in a separate .py file, e.g.:
#from tests.settings import DB_HOST, DB_NAME, DB_USER, DB_PASSWD, DB_PORT, DB_MAXSIZE, DB_MINSIZE, DB_COMMIT, DB_CHARSET
from faker import Faker
import random
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED
from concurrent.futures import as_completed
import threading
from datetime import datetime
from multiprocessing import Pool
DB_HOST = '127.0.0.1'
DB_USER = 'root'
DB_PASSWD = 'root'
DB_NAME = 'spider'
DB_PORT = 3306
DB_MINSIZE = 1
DB_MAXSIZE = 100
DB_COMMIT = True
DB_CHARSET = 'utf8mb4'
class AsyncMysql(object):
    def __init__(self, loop):
        self._host = DB_HOST
        self._port = DB_PORT
        self._db = DB_NAME
        self._user = DB_USER
        self._password = DB_PASSWD
        self._minsize = DB_MINSIZE
        self._maxsize = DB_MAXSIZE
        self._autocommit = DB_COMMIT
        self._charset = DB_CHARSET
        self._pool = None
        self._loop = loop
    async def get_db_pool(self):
        logger.info('begin create pool')
        self._pool = await aiomysql.create_pool(host=self._host, port=self._port,
                                                user=self._user, password=self._password,
                                                cursorclass=aiomysql.DictCursor,
                                                db=self._db, loop=self._loop,
                                                autocommit=self._autocommit,
                                                minsize=self._minsize, maxsize=self._maxsize,
                                                charset=self._charset)
        logger.info('create_aiomysql_pool success')
    async def exe_sql_task(self, sqltype='insert', sql_list=None):
        """Call this method from outside; pass a list of roughly 500-1000 SQL statements per call."""
        if isinstance(sql_list, list):
            if sqltype == 'insert':
                task_list = [asyncio.ensure_future(self.exe_sql(sql=sql)) for sql in sql_list]
            elif sqltype == 'select':
                task_list = [asyncio.ensure_future(self.exe_sql(sqltype=sqltype, sql=sql)) for sql in sql_list]
            else:
                task_list = []
            sql_list = None
            if len(task_list) > 0:
                logger.info('start execute sql')
                done, pending = await asyncio.wait(task_list)
                logger.info('execute sql done')
                if pending:
                    logger.info('canceling tasks')
                    logger.error(pending)
                    for t in pending:
                        t.cancel()
                if done:
                    return done
                else:
                    return None
            else:
                logger.info('Error constructing SQL list')
        else:
            logger.error('sql_list passed to exe_sql_task must be a list')
    async def exe_sql(self, sqltype='insert', sql=''):
        if self._pool is None:
            await self.get_db_pool()
        async with self._pool.acquire() as conn:
            async with conn.cursor() as cur:
                result = None
                try:
                    if sqltype == 'insert':
                        await cur.execute(sql)
                        await conn.commit()
                    elif sqltype == 'select':
                        await cur.execute(sql)
                        result = await cur.fetchall()
                except Exception as ex:
                    if not self._autocommit:
                        if conn:
                            await conn.rollback()
                    # Possible extension: record failed statements in a table with
                    # a unique id, the sql text and the exception message.
                    logger.info('SQL: {}\nexecution failed, reason:'.format(sql))
                    logger.error(ex)
                    return None
                return result
    async def __aenter__(self):
        if self._pool is None:
            await self.get_db_pool()
        return self

    async def _close(self):
        if self._pool is not None:
            self._pool.close()
            await self._pool.wait_closed()
            logger.info('close_aiomysql_pool success')

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self._close()
async def exec_insert(event_loop, sqltype='insert', sqllist=None):
    async with AsyncMysql(event_loop) as ae:
        results = await ae.exe_sql_task(sqltype, sqllist)
        return results
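# On Python 3.7+ the same helper can also be driven with asyncio.run instead of
# managing the loop by hand; a minimal sketch (the query text is only illustrative):
async def run_select_example():
    loop = asyncio.get_running_loop()
    return await exec_insert(loop, sqltype='select', sqllist=['select 1'])
# results = asyncio.run(run_select_example())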
fake = Faker('zh_CN')

def create_fake_data():
    name = fake.name()
    address = str(fake.address()).strip().replace('\n', '').replace('\r', '')
    date = fake.date()
    phone = fake.phone_number()
    age = random.randint(1, 100)
    return name, age, address, date, phone
def insert_demo1():
    logger.info(' --- begin ---')
    maxline = 500 * 10000
    #sqllist = [f"insert into spider_test(name, age, address) values('li-{i}',{i}, 'bj-{i}' )" for i in range(0, maxline)]
    sqllist = []
    for i in range(0, maxline):
        #name, age, address, date, phone = create_fake_data()
        name, age, address, date, phone = 'wang-' + str(i), i, 'bj-' + str(i), '2021-07-16', '131231111'
        insert_sql = f"insert into spider_test(name, age, address, date, phone) values('{name}', {age}, '{address}', '{date}', '{phone}')"
        sqllist.append(insert_sql)
        #print(insert_sql)
    logger.info('ready sql for insert')
    x = datetime.now()
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(exec_insert(event_loop, sqltype='insert', sqllist=sqllist))
    event_loop.close()
    logger.info('Single-row inserts of ' + str(maxline) + ' records took ' + str(datetime.now() - x))
def insert_demo2():
    logger.info(' --- begin ---')
    maxline = 500 * 10000
    batch_size = 2000
    sqllist = []
    insert_sql = "INSERT INTO spider_test(name, age, address, date, phone) VALUES"
    idx = 0
    tmp_ls = []
    for i in range(0, maxline):
        #name, age, address, date, phone = create_fake_data()
        name, age, address, date, phone = 'wang-' + str(i), i, 'bj-' + str(i), '2021-07-16', '131231111'
        value_sql = f"('{name}',{age},'{address}','{date}','{phone}' )"
        idx += 1
        if idx < batch_size:
            tmp_ls.append(value_sql)
        elif idx == batch_size:
            # A full batch: build one multi-value INSERT statement
            tmp_ls.append(value_sql)
            result_sql = insert_sql + ",".join(tmp_ls)
            sqllist.append(result_sql)
            #print('* inner batch tmp_ls=', ','.join(tmp_ls))
            tmp_ls.clear()
            idx = 0
    if tmp_ls:
        # Remaining rows that did not fill a whole batch
        #print('* outer batch tmp_ls=', ','.join(tmp_ls))
        result_sql = insert_sql + ",".join(tmp_ls)
        sqllist.append(result_sql)
    logger.info('ready sql for insert')
    #print(sqllist)
    x = datetime.now()
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(exec_insert(event_loop, sqltype='insert', sqllist=sqllist))
    event_loop.close()
    logger.info('Batched inserts of ' + str(maxline) + ' records, ' + str(batch_size) + ' rows/batch, took ' + str(datetime.now() - x))
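# Note: insert_demo2 batches rows by splicing literal values into the SQL text.
# With untrusted data, a parameterized variant using aiomysql's executemany avoids
# quoting/injection issues; a minimal sketch, assuming the same spider_test table
# and an already created aiomysql pool (function name is illustrative):
async def insert_batch_parameterized(pool, rows, batch_size=2000):
    # rows: list of (name, age, address, date, phone) tuples
    sql = "INSERT INTO spider_test(name, age, address, date, phone) VALUES (%s, %s, %s, %s, %s)"
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            for start in range(0, len(rows), batch_size):
                await cur.executemany(sql, rows[start:start + batch_size])
            await conn.commit()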
def insert_demo3_1():
    logger.info(' --- begin ---')
    maxline = 500 * 10000
    batch_size = 2000
    sqllist = []
    insert_sql = "INSERT INTO spider_test(name, age, address, date, phone) VALUES"
    idx = 0
    tmp_ls = []
    for i in range(0, maxline):
        #name, age, address, date, phone = create_fake_data()
        name, age, address, date, phone = 'wang-' + str(i), i, 'bj-' + str(i), '2021-07-16', '131231111'
        value_sql = f"('{name}',{age},'{address}','{date}','{phone}' )"
        idx += 1
        if idx < batch_size:
            tmp_ls.append(value_sql)
        elif idx == batch_size:
            # A full batch: build one multi-value INSERT statement
            tmp_ls.append(value_sql)
            result_sql = insert_sql + ",".join(tmp_ls)
            sqllist.append(result_sql)
            #print('* inner batch tmp_ls=', ','.join(tmp_ls))
            tmp_ls.clear()
            idx = 0
    if tmp_ls:
        # Remaining rows that did not fill a whole batch
        #print('* outer batch tmp_ls=', ','.join(tmp_ls))
        result_sql = insert_sql + ",".join(tmp_ls)
        sqllist.append(result_sql)
    logger.info('ready sql for insert')
    #print(sqllist)
    x = datetime.now()
    # Create a private event loop so this function can also run inside a worker
    # thread or child process (get_event_loop only works in the main thread).
    #event_loop = asyncio.get_event_loop()
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    event_loop.run_until_complete(exec_insert(event_loop, sqltype='insert', sqllist=sqllist))
    event_loop.close()
    logger.info('Batched inserts of ' + str(maxline) + ' records, ' + str(batch_size) + ' rows/batch, took ' + str(datetime.now() - x))
def insert_demo3_2():
    logger.info(' --- begin ---')
    maxline = 500 * 10000
    batch_size = 2000
    sqllist = []
    insert_sql = "INSERT INTO spider_test2(name, age, address, date, phone) VALUES"
    idx = 0
    tmp_ls = []
    for i in range(0, maxline):
        #name, age, address, date, phone = create_fake_data()
        name, age, address, date, phone = 'wang-' + str(i), i, 'bj-' + str(i), '2021-07-16', '131231111'
        value_sql = f"('{name}',{age},'{address}','{date}','{phone}' )"
        idx += 1
        if idx < batch_size:
            tmp_ls.append(value_sql)
        elif idx == batch_size:
            # A full batch: build one multi-value INSERT statement
            tmp_ls.append(value_sql)
            result_sql = insert_sql + ",".join(tmp_ls)
            sqllist.append(result_sql)
            #print('* inner batch tmp_ls=', ','.join(tmp_ls))
            tmp_ls.clear()
            idx = 0
    if tmp_ls:
        # Remaining rows that did not fill a whole batch
        #print('* outer batch tmp_ls=', ','.join(tmp_ls))
        result_sql = insert_sql + ",".join(tmp_ls)
        sqllist.append(result_sql)
    logger.info('ready sql for insert')
    #print(sqllist)
    x = datetime.now()
    #event_loop = asyncio.get_event_loop()
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    event_loop.run_until_complete(exec_insert(event_loop, sqltype='insert', sqllist=sqllist))
    event_loop.close()
    logger.info('Batched inserts of ' + str(maxline) + ' records, ' + str(batch_size) + ' rows/batch, took ' + str(datetime.now() - x))
def select_demo1():
    logger.info(' --- begin ---')
    #sqllist = ['select * from spider_test limit 2' for i in range(5)]
    sqllist = ['select * from spider_test limit 0,2', 'select * from spider_test limit 3,3']
    x = datetime.now()
    event_loop = asyncio.get_event_loop()
    task = event_loop.create_task(exec_insert(event_loop, sqltype='select', sqllist=sqllist))
    event_loop.run_until_complete(task)
    event_loop.close()
    logger.info('Took ' + str(datetime.now() - x))
    results = task.result()
    if results:
        for rs in results:
            if rs.result():
                for item in rs.result():
                    logger.info(item)
def select_demo2():
    logger.info(' --- begin ---')
    sqllist = ['select count(*) as count from ( select * from spider_test limit 100) t ']
    x = datetime.now()
    event_loop = asyncio.get_event_loop()
    task = event_loop.create_task(exec_insert(event_loop, sqltype='select', sqllist=sqllist))
    event_loop.run_until_complete(task)
    event_loop.close()
    logger.info('Took ' + str(datetime.now() - x))
    results = task.result()
    count_num = -1
    if results:
        for rs in results:
            if rs.result():
                # DictCursor returns each row as a dict, so columns can be read by name
                for item in rs.result():
                    count_num = item['count']
                    break
    logger.info(f'count_num={count_num}')
def example():
    x = datetime.now()
    # The first sqllist below (1,000,000 generated INSERT statements) is immediately
    # overwritten by the second assignment; keep whichever one you want to test.
    sqllist = [f"insert into sales values({i},'test{i}',10,20),({i},'test{i}',10,20),({i},'test{i}',10,20),({i},'test{i}',10,20),({i},'test{i}',10,20),({i},'test{i}',10,20),({i},'test{i}',10,20),({i},'test{i}',10,20),({i},'test{i}',10,20),({i},'test{i}',10,20)" for i in range(0, 1000000)]
    sqllist = ['select * from xxx', 'select * from xxx']
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(exec_insert(event_loop, sqltype='select', sqllist=sqllist))
    logger.info('Took ' + str(datetime.now() - x))
executor1 = ThreadPoolExecutor(5)
executor2 = ThreadPoolExecutor(10)

def main1():
    x = datetime.now()
    future_set = set()
    # Each demo creates its own event loop, so they can run in separate threads.
    future1 = executor1.submit(insert_demo3_1)
    future2 = executor2.submit(insert_demo3_2)
    future_set.add(future1)
    future_set.add(future2)
    for future in as_completed(future_set):
        error = future.exception()
        if error is not None:
            logger.error(error)

    # Test example 1
    # tasks = [insert_demo3, insert_demo3]
    # future_set = set()
    # x = datetime.now()
    # with ThreadPoolExecutor(len(tasks)) as executor:
    #     for task in tasks:
    #         future = executor.submit(task)
    #         future_set.add(future)
    #     for future in as_completed(future_set):
    #         error = future.exception()
    #         if error is not None:
    #             logger.error(error)
    #
    # results = []
    # for future in future_set:
    #     results.append(future.result())
    # logger.info('* results => ', results)

    # Test example 2
    # x = datetime.now()
    # thr_ls = []
    # for i in range(5):
    #     thr = threading.Thread(target=insert_demo3)
    #     thr.daemon = True
    #     thr.start()
    #     thr_ls.append(thr)
    #
    # for thr in thr_ls:
    #     thr.join()

    logger.info('--- main thread finished ---')
    logger.info('*** total time: ' + str(datetime.now() - x))
def main2():
    x = datetime.now()
    # multiprocessing variant: run each demo in its own worker process
    pool = Pool(processes=4)
    pool.apply_async(insert_demo3_1)
    pool.apply_async(insert_demo3_2)
    pool.close()
    pool.join()
    logger.info('*** total time: ' + str(datetime.now() - x))
if __name__ == '__main__':
    #insert_demo1()
    #insert_demo2()
    main1()
    #main2()
    #select_demo1()
    #select_demo2()