laravel如何使用kafka,及其kafka类
本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类.
以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php
<?php namespace AppTools; use IlluminateConfigRepository; use IlluminateSupportFacadesDB; use MonologLogger; use MonologHandlerStreamHandler; use IlluminateHttpRequest; class Kafka { public $broker_list = 127.0.0.1;//配置kafka,可以用逗号隔开多个kafka public $topic = test;//管道名称 public $partition = 0; protected $producer = null; protected $consumer = null; public function __construct() { if (empty($this->broker_list)) { throw new InvalidConfigException("broker not config"); } $rk = new RdKafkaProducer(); if (empty($rk)) { throw new InvalidConfigException("producer error"); } $rk->setLogLevel(LOG_DEBUG); if (!$rk->addBrokers($this->broker_list)) { throw new InvalidConfigException("producer error"); } $this->producer = $rk; } /** * 生产者 * @param array $messages * @return mixed */ public function send($messages = [],$topic) { $topic = $this->producer->newTopic($topic); return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages)); } /** * 消费者 */ public function consumer($object, $callback){ $conf = new RdKafkaConf(); $conf->set(group.id, 0); $conf->set(metadata.broker.list, $this->broker_list); $topicConf = new RdKafkaTopicConf(); $topicConf->set(auto.offset.reset, smallest); $conf->setDefaultTopicConf($topicConf); $consumer = new RdKafkaKafkaConsumer($conf); $consumer->subscribe([$this->topic]); echo "waiting for messages..... "; while(true) { $message = $consumer->consume(120*1000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: echo "message payload...."; $object->$callback($message->payload); break; } sleep(1); } } } ?>
在控制器中如何使用:
首先再头部导入这个类:use AppToolsKafka;
下面是使用生产者实例:
public function test(){ $topic = tool;//输入使用管道名称 $data[shop_id] = 58; $data[bar_code]=586; $data[goods_num] = 1; $data[goods_unit] = 个; $Kafka = new Kafka(); $Error_Msg = $Kafka->send($data,$topic);//传入数组会自动转换json var_dump($Error_Msg); }
下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:
<?php $conf = new RdKafkaConf(); $conf->set(group.id, myConsumerGroup); $rk = new RdKafkaConsumer($conf); $rk->addBrokers("localhost:9092"); $topicConf = new RdKafkaTopicConf(); $topicConf->set(auto.commit.interval.ms, 100); $topicConf->set(offset.store.method, file); $topicConf->set(offset.store.path, sys_get_temp_dir()); $topicConf->set(auto.offset.reset, smallest); $topic = $rk->newTopic("tool", $topicConf);//读取的管道 // Start consuming partition 0 $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED); while (true) { $message = $topic->consume(0, 120*10000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: //没有错误打印信息 $message = json_decode(json_encode($message),true); $data = json_decode($message[payload],true); var_dump($data); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "等待接收信息 "; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "超时 "; break; default: throw new Exception($message->errstr(), $message->err); break; } sleep(1); } ?>