Redis 作为消息中间件

Reads: 304 Edit

redis存储脚本

# -*- coding: utf-8 -*-

import redis
import service.util_tool as tool
import settings

redis_connect = redis.StrictRedis(host=settings.REDIS_HOST, port=6379)  # 连接 redis

def save_shareholder_information_record(record_dict):
    """
    保存企业的'股东信息'数据到 redis 缓存队列
    :param record_str:
    :return:
    """

    if record_dict:
        try:
            # 将 record_str 放入 qimingpian_spider:enterprise_shareholder_information_seen_record
            sadd_result = redis_connect.sadd('qimingpian_spider:enterprise_shareholder_information_seen_record', str(record_dict))
            # print('sadd_result=' , sadd_result)

            if sadd_result == 1:
                # 爬取时间
                record_dict['crawl_time'] = tool.now_formated_time()
                # 把格式化的数据放入队列缓存 qimingpian_spider:enterprise_shareholder_information_new_record
                result = redis_connect.sadd('qimingpian_spider:enterprise_shareholder_information_new_record', str(record_dict))
        except Exception as e:
            print(str(e))

def query_shareholder_information_record():
    """
    读取企业的'股东信息'数据的缓存队列数据
    :return:
    """
    try:
        bytes_text = redis_connect.spop('qimingpian_spider:enterprise_shareholder_information_new_record')  # bytes
        return bytes_text
    except Exception as e:
        print(str(e))



处理缓存消息

import json

 # 从缓存队里获得未处理的'企业-股东信息'数据
            shareholder_information_bytes_text = rdo.query_shareholder_information_record()
            if(shareholder_information_bytes_text and shareholder_information_bytes_text != b'None'):
                str_text = shareholder_information_bytes_text.decode('utf8').replace("'", '"')
                shareholder_information_json_dict = json.loads(str_text)  # dict
                shareholder_information_json_dict['id'] = tool.getUUID()
                shareholder_information_json_dict.update(basic_record_dict)

请求后台

def get_url_content(url, form , max_try_number=5):
    try_num = 5
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/73.0.3683.86 Safari/537.36'
    }

    while True:
        try:
            response = requests.post(url=url, data=form, headers=headers, timeout=120)
            return response.json()
        except Exception as err:
            print(url, '抓取数据报错',str(err))
            try_num = try_num + 1
            if try_num >= max_try_number:
                print('尝试失败次数超过5次,放弃尝试!')
                return None

从Redis队列里取数据进行消费

import time
import service.util_tool as tool
import service.seeyii_data_parser as sp
import db.seeyii_redis_dao as rdo

if __name__ == '__main__':
    while True:
        print("读取'视野金服'缓存队里中的数据开始..." , tool.now_formated_time() )
        sleepFlag = False  # 旗标
        # 从缓存队里获得未处理的'投资机构列表'数据
        investedCompany_bytes_text = rdo.query_invested_company_record()

        if (investedCompany_bytes_text and investedCompany_bytes_text != b'None'):
            print("插入'投资机构列表'数据 =>" , investedCompany_bytes_text)
            sp.parse_data(investedCompany_bytes_text)
        else:
            sleepFlag = True

        if sleepFlag:
            # 每5秒查询一次缓存队列
            time.sleep(5)

Redis作为消息队列

import redis
import time


class RedisQueue(object):
    def __init__(self, name, namespace='queue', **redis_kwargs):
        # redis的默认参数为:host='localhost', port=6379, db=0, 其中db为定义redis database的数量
        self.__db = redis.Redis(**redis_kwargs)
        self.key = '%s:%s' % (namespace, name)
        # self.key = 'aaa:bbb'

    def qsize(self):
        return self.__db.llen(self.key)  # 返回队列里面list内元素的数量

    def put(self, item):
        self.__db.rpush(self.key, item)  # 添加新元素到队列最右方

    def get_wait(self, timeout=None):
        # 返回队列第一个元素,如果为空则等待至有元素被加入队列(超时时间阈值为timeout,如果为None则一直等待)
        item = self.__db.blpop(self.key, timeout=timeout)
        # if item:
        #     item = item[1]  # 返回值为一个tuple
        return item

    def get_nowait(self):
        # 直接返回队列第一个元素,如果队列为空返回的是None
        item = self.__db.lpop(self.key)
        return item

python redis decode_responses

redis 取出的结果默认是字节,设置 decode_responses=True 改成字符串

from redis import Redis

client = Redis(host='127.0.0.1', port=6379,decode_responses=True)


Comments

Make a comment

www.ultrapower.com ,王硕的博客,专注于研究互联网产品和技术,提供中文精品教程。 本网站与其它任何公司及/或商标无任何形式关联或合作。