redis存储脚本
# -*- coding: utf-8 -*-
import redis
import service.util_tool as tool
import settings
# Module-wide Redis client shared by the helpers below.
# The host is read from project settings; the port is fixed at 6379.
redis_connect = redis.StrictRedis(
    port=6379,
    host=settings.REDIS_HOST,
)
def save_shareholder_information_record(record_dict):
    """
    Save one enterprise 'shareholder information' record to the Redis cache.

    The record's ``str()`` form is first added to the "seen" set
    ``qimingpian_spider:enterprise_shareholder_information_seen_record``.
    Only when that SADD reports a brand-new member is a copy of the record,
    stamped with ``crawl_time``, added to the pending set
    ``qimingpian_spider:enterprise_shareholder_information_new_record``.
    That pending set is consumed elsewhere via SPOP.

    :param record_dict: the record to store. It is NOT modified; the
        ``crawl_time`` stamp is applied to a copy.
    :return: True if the record was new and was queued. False if it was
        empty, had already been seen, or a Redis error occurred (the error
        is printed).
    """
    if not record_dict:
        return False
    try:
        seen_added = redis_connect.sadd(
            'qimingpian_spider:enterprise_shareholder_information_seen_record',
            str(record_dict))
        if seen_added != 1:
            # Already seen: nothing to queue.
            return False
        # Stamp a copy. Mutating the caller's dict would change its str(),
        # so saving the same dict again would slip past the seen-set check.
        queued_record = dict(record_dict)
        queued_record['crawl_time'] = tool.now_formated_time()
        redis_connect.sadd(
            'qimingpian_spider:enterprise_shareholder_information_new_record',
            str(queued_record))
        return True
    except Exception as e:
        # Best-effort cache write: report the error and carry on.
        print(str(e))
        return False
def query_shareholder_information_record():
    """
    Pop one pending enterprise 'shareholder information' record.

    Removes and returns a random member of the Redis set
    ``qimingpian_spider:enterprise_shareholder_information_new_record``
    (SPOP). The result is the raw reply from the client (bytes, unless the
    client decodes responses), or None when the set is empty. If the call
    raises, the error is printed and None is returned.
    """
    pending_key = 'qimingpian_spider:enterprise_shareholder_information_new_record'
    try:
        return redis_connect.spop(pending_key)
    except Exception as err:
        print(str(err))
处理缓存消息
import json
import ast

# Fetch one unprocessed 'enterprise - shareholder information' record from the cache.
shareholder_information_bytes_text = rdo.query_shareholder_information_record()
if shareholder_information_bytes_text and shareholder_information_bytes_text != b'None':
    str_text = shareholder_information_bytes_text.decode('utf8')
    # save_shareholder_information_record stores records as str(dict), i.e. a
    # Python repr. ast.literal_eval is the safe inverse of that. The previous
    # "swap ' for \" then json.loads" approach broke on values containing
    # quotes or apostrophes, and on None/True/False.
    shareholder_information_json_dict = ast.literal_eval(str_text)  # dict
    shareholder_information_json_dict['id'] = tool.getUUID()
    # NOTE(review): basic_record_dict is not defined in this snippet; it is
    # presumably provided by the surrounding script.
    shareholder_information_json_dict.update(basic_record_dict)
请求后台
def get_url_content(url, form, max_try_number=5):
    """
    POST ``form`` to ``url`` and return the decoded JSON response body.

    Any exception raised while sending the request or decoding the body as
    JSON triggers a retry, up to ``max_try_number`` attempts in total. At
    least one attempt is always made.

    :param url: request URL.
    :param form: form fields, sent as application/x-www-form-urlencoded.
    :param max_try_number: maximum number of attempts before giving up.
    :return: the parsed JSON body, or None if every attempt failed.
    """
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36'
    }
    # Count failures from zero. The old code started at 5, so it gave up
    # after the very first failure regardless of max_try_number.
    failed_attempts = 0
    while True:
        try:
            response = requests.post(url=url, data=form, headers=headers, timeout=120)
            return response.json()
        except Exception as err:
            print(url, '抓取数据报错', str(err))
            failed_attempts += 1
            if failed_attempts >= max_try_number:
                print('尝试失败次数超过%s次,放弃尝试!' % max_try_number)
                return None
从Redis队列里取数据进行消费
import time
import service.util_tool as tool
import service.seeyii_data_parser as sp
import db.seeyii_redis_dao as rdo
if __name__ == '__main__':
    # Poll the cache forever. Each pending 'invested company list' record is
    # handed to the parser; when nothing usable is available, wait 5 seconds
    # before polling again.
    while True:
        print("读取'视野金服'缓存队里中的数据开始...", tool.now_formated_time())
        investedCompany_bytes_text = rdo.query_invested_company_record()
        has_record = bool(investedCompany_bytes_text) and investedCompany_bytes_text != b'None'
        if not has_record:
            time.sleep(5)
            continue
        print("插入'投资机构列表'数据 =>", investedCompany_bytes_text)
        sp.parse_data(investedCompany_bytes_text)
Redis作为消息队列
import redis
import time
class RedisQueue(object):
    """FIFO queue backed by a Redis list stored under the key ``"<namespace>:<name>"``.

    Items are appended at the right end (RPUSH) and taken from the left end
    (LPOP / BLPOP).
    """

    def __init__(self, name, namespace='queue', **redis_kwargs):
        # All keyword arguments are forwarded unchanged to redis.Redis
        # (e.g. host, port, db). The original note lists the defaults as
        # host='localhost', port=6379, db=0; `db` selects the database index.
        self.key = '%s:%s' % (namespace, name)
        self.__db = redis.Redis(**redis_kwargs)

    def qsize(self):
        """Return the number of items currently in the queue (LLEN)."""
        length = self.__db.llen(self.key)
        return length

    def put(self, item):
        """Append ``item`` to the tail (right end) of the queue (RPUSH)."""
        self.__db.rpush(self.key, item)

    def get_wait(self, timeout=None):
        """Block until an item is available, then pop it from the head (BLPOP).

        Returns the raw BLPOP reply, i.e. a ``(key, value)`` pair rather than
        the bare value, or None if ``timeout`` expires first. ``timeout`` is
        forwarded to redis-py's ``blpop`` unchanged; the original note says
        None means "wait indefinitely".
        """
        reply = self.__db.blpop(self.key, timeout=timeout)
        return reply

    def get_nowait(self):
        """Pop the head item without blocking (LPOP); None if the queue is empty."""
        return self.__db.lpop(self.key)
python redis decode_responses
redis-py 取出的结果默认是字节串(bytes);创建客户端时设置 decode_responses=True,结果会自动解码为字符串(str)。注意:开启后返回值已经是 str,不能再对其调用 .decode(),也不会再等于 b'None' 这类 bytes 字面量。
from redis import Redis
# Local Redis client whose replies are decoded to str instead of returned as bytes.
client = Redis(
    decode_responses=True,
    port=6379,
    host='127.0.0.1',
)