laravel如何使用kafka,及其kafka类

本文并没有kafka的安装教程,本文是针对已经安装kafka及其配置好kafka的php拓展并且使用laravel框架进行开发项目,配置一个可供laravel框架使用的生产及消费者类.

以下代码修改自本站的YII框架关于kafka类的代码,经过测试使用在本人的项目中,可正常运行,larvael版本:5.6 代码放置larvael框架位置:app/Tools/Kafka.php

<?php
namespace AppTools;

use IlluminateConfigRepository;

use IlluminateSupportFacadesDB;
use MonologLogger;
use MonologHandlerStreamHandler;

use IlluminateHttpRequest;

class Kafka
{
    public $broker_list = 127.0.0.1;//配置kafka,可以用逗号隔开多个kafka
    public $topic = test;//管道名称
    public $partition = 0;

    protected $producer = null;
    protected $consumer = null;

    public function __construct()
    {
        if (empty($this->broker_list)) {
            throw new InvalidConfigException("broker not config");
        }
        $rk = new RdKafkaProducer();
        if (empty($rk)) {
            throw new InvalidConfigException("producer error");
        }
        $rk->setLogLevel(LOG_DEBUG);
        if (!$rk->addBrokers($this->broker_list)) {
            throw new InvalidConfigException("producer error");
        }
        $this->producer = $rk;
    }

    /**
     * 生产者
     * @param array $messages
     * @return mixed
     */
    public function send($messages = [],$topic)
    {
        $topic = $this->producer->newTopic($topic);
        return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));
    }

    /**
     * 消费者
     */
    public function consumer($object, $callback){
        $conf = new RdKafkaConf();
        $conf->set(group.id, 0);
        $conf->set(metadata.broker.list, $this->broker_list);

        $topicConf = new RdKafkaTopicConf();
        $topicConf->set(auto.offset.reset, smallest);

        $conf->setDefaultTopicConf($topicConf);

        $consumer = new RdKafkaKafkaConsumer($conf);

        $consumer->subscribe([$this->topic]);

        echo "waiting for messages.....
";
        while(true) {
            $message = $consumer->consume(120*1000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    echo "message payload....";
                    $object->$callback($message->payload);
                    break;
            }
            sleep(1);
        }
    }
}
?>

在控制器中如何使用:

首先再头部导入这个类:use AppToolsKafka;

下面是使用生产者实例:

public function test(){

     $topic = tool;//输入使用管道名称
     $data[shop_id] = 58;
     $data[bar_code]=586;
     $data[goods_num] = 1;
     $data[goods_unit] = 个;

$Kafka = new Kafka();
$Error_Msg = $Kafka->send($data,$topic);//传入数组会自动转换json
var_dump($Error_Msg);


   }

下面是消费者实例,消费者我这里使用了的是php脚本进行的操作:

<?php

$conf = new RdKafkaConf();

$conf->set(group.id, myConsumerGroup);

$rk = new RdKafkaConsumer($conf);
$rk->addBrokers("localhost:9092");

$topicConf = new RdKafkaTopicConf();
$topicConf->set(auto.commit.interval.ms, 100);
$topicConf->set(offset.store.method, file);
$topicConf->set(offset.store.path, sys_get_temp_dir());
$topicConf->set(auto.offset.reset, smallest);

$topic = $rk->newTopic("tool", $topicConf);//读取的管道

// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    $message = $topic->consume(0, 120*10000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
        //没有错误打印信息
           $message = json_decode(json_encode($message),true);
           $data = json_decode($message[payload],true);
           var_dump($data);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "等待接收信息
";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "超时
";
            break;
        default:
           throw new Exception($message->errstr(), $message->err);
            break;
    }
 sleep(1);
}

?>
经验分享 程序员 微信小程序 职场和发展