Python 操作 Kafka：生产与消费
生产:
import json
import traceback

from kafka import KafkaProducer
from kafka.errors import KafkaError


# NOTE: the original note says a missing topic is created automatically on
# first send; presumably this relies on the broker permitting topic
# auto-creation -- confirm against the broker configuration.
def producer_demo():
    """Send ten ``test`` messages to the ``heno`` topic, confirming each one.

    A producer is created for the broker at ``192.168.168.168:9092``. Each
    message is sent and then waited on for up to 10 seconds, so delivery
    failures surface per message. A failed send prints its traceback and the
    loop moves on to the next message instead of aborting the run.

    The producer is always closed on exit, including when an unexpected
    exception propagates out of the loop.
    """
    producer = KafkaProducer(
        bootstrap_servers=["192.168.168.168:9092"],
        api_version=(2, 13))
    print("init done")
    try:
        # Send the messages one at a time.
        for _ in range(0, 10):
            future = producer.send(
                topic="heno",
                value="test".encode("utf-8"),
                # partition=1  (uncomment to pin messages to one partition)
            )
            try:
                # Block until the send completes so a failure is noticed here.
                future.get(timeout=10)
            except KafkaError:
                # KafkaError is the exception base class; the `kafka_errors`
                # name in kafka.errors is a lookup dict and cannot be used in
                # an `except` clause. Print the traceback rather than
                # discarding it.
                traceback.print_exc()
    finally:
        # Release network resources held by the producer.
        producer.close()


if __name__ == "__main__":
    producer_demo()
消费:
import json

from kafka import KafkaProducer, KafkaConsumer, TopicPartition


def consumer_demo():
    """Print every message received from the ``heno`` topic.

    A consumer in group ``consumer_1`` is created for the broker at
    ``192.168.201.151:9092``. Each message value is decoded from bytes with
    the default codec and printed. Iterating the consumer keeps waiting for
    new messages, so this normally runs until it is interrupted.

    The consumer is always closed on exit, including on Ctrl-C.
    """
    consumer = KafkaConsumer(
        "heno",
        bootstrap_servers="192.168.201.151:9092",
        group_id="consumer_1",
        api_version=(2, 13)
    )
    try:
        for message in consumer:
            print(message.value.decode())
    finally:
        # Close the consumer even when iteration is interrupted.
        consumer.close()


if __name__ == "__main__":
    consumer_demo()