Python 操作 Kafka：生产与消费示例
生产:
from kafka import KafkaProducer
from kafka.errors import kafka_errors
import traceback
import json
# The topic is created automatically if it does not exist (per the original
# note; this relies on the broker permitting topic auto-creation).
def producer_demo():
    """Send ten ``test`` messages to the ``heno`` topic, confirming each one.

    A ``KafkaProducer`` is connected to the hard-coded broker address, and
    each message is sent and then awaited for up to 10 seconds. A failed
    send prints its traceback and the loop moves on to the next message.
    The producer is closed on exit, including when an unexpected error
    propagates.
    """
    # Imported locally so this function does not depend on edits to the
    # file-level imports. In kafka-python, ``kafka.errors.kafka_errors`` is a
    # mapping of error codes to classes, not an exception class, so it cannot
    # be used in an ``except`` clause; ``KafkaError`` is the base exception.
    from kafka.errors import KafkaError

    producer = KafkaProducer(
        bootstrap_servers=["192.168.168.168:9092"],
        api_version=(2, 13),
    )
    print("init done")
    try:
        # Send the messages one at a time.
        for _ in range(10):
            future = producer.send(
                topic="heno",
                value="test".encode("utf-8"),
                # partition=1
            )
            try:
                # Block until the broker acknowledges the send (or it fails).
                future.get(timeout=10)
            except KafkaError:
                # Report the failure instead of silently discarding it.
                traceback.print_exc()
    finally:
        producer.close()
# Run the producer demo only when this file is executed as a script.
if __name__ == "__main__":
    producer_demo()
消费:
from kafka import KafkaProducer, KafkaConsumer, TopicPartition
import json
def consumer_demo():
    """Consume messages from the ``heno`` topic and print each decoded value.

    A ``KafkaConsumer`` joins the consumer group ``consumer_1`` on the
    hard-coded broker address, then iterates over incoming messages and
    prints each message value decoded from bytes to text. The consumer is
    closed when iteration ends or an error/interrupt propagates.
    """
    # NOTE(review): this broker address differs from the one used by the
    # producer example — confirm both point at the intended cluster.
    consumer = KafkaConsumer(
        "heno",
        bootstrap_servers="192.168.201.151:9092",
        group_id="consumer_1",
        api_version=(2, 13),
    )
    try:
        for message in consumer:
            print(message.value.decode())
    finally:
        consumer.close()
# Run the consumer demo only when this file is executed as a script.
if __name__ == "__main__":
    consumer_demo()
