RabbitMQ Getting-Started Demo with PHP AMQPlib
Install PHP AMQPlib via Composer
composer require php-amqplib/php-amqplib
Directory structure:
├─rabbitmq
└─vendor
    ├─composer
    ├─php-amqplib
    │  └─php-amqplib
    │      └─PhpAmqpLib
    │          ├─Channel
    │          ├─Connection
    │          │  └─Heartbeat
    │          ├─Exception
    │          ├─Exchange
    │          ├─Helper
    │          │  └─Protocol
    │          ├─Message
    │          └─Wire
    │              └─IO
    └─phpseclib
        └─phpseclib
            └─phpseclib
                ├─Crypt
                ├─File
                │  └─ASN1
                ├─Math
                ├─Net
                │  └─SFTP
                └─System
                    └─SSH
                        └─Agent
Producer and consumer demo
Producer
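<?php
require __DIR__ . "/../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// The message body is taken from the first command-line argument.
if ($argc < 2) {
    exit("please input argument, like `php -f directP.php mystring`");
}

// Connect to the local broker with the default guest account and vhost "/".
$conn = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/');
if (!$conn->isConnected()) {
    exit("connect err...");
}

$channel = $conn->channel();

// Mark the message as persistent plain text.
$properties = [
    'content_type'  => 'text/plain',
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
];

// Publish to exchange "myExchange" with routing key "my"; the last argument is the mandatory flag.
$channel->basic_publish(new AMQPMessage($argv[1], $properties), "myExchange", "my", true);

$channel->close();
$conn->close();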
Consumer
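<?php
require __DIR__ . "/../vendor/autoload.php";

use PhpAmqpLib\Connection\AMQPStreamConnection;

// Connect to the local broker with the default guest account and vhost "/".
$conn = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/');
if (!$conn->isConnected()) {
    exit("connect err...");
}

$channel = $conn->channel();

$callback = function ($msg) {
    // Acknowledge this message only (second argument: multiple = false).
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag'], false);
    echo $msg->body . PHP_EOL;
    // A message whose body is "quit" stops the consumer.
    if ($msg->body == 'quit') {
        exit('quit now...');
    }
    // Save only the body; the message object also carries a channel reference
    // that may not encode to JSON cleanly.
    file_put_contents('./test.txt', json_encode(['body' => $msg->body]));
    return true;
};

// Arguments: queue, consumer tag, no_local, no_ack, exclusive, nowait, callback.
// no_ack is false, so the callback must acknowledge each message itself.
$channel->basic_consume('myQueue', '', false, false, false, false, $callback);

// Block and dispatch deliveries for as long as a consumer is registered.
while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$conn->close();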
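Declaring the exchange and queue (optional sketch)
The producer and consumer above never declare myExchange or myQueue, so both scripts assume they already exist on the broker and are bound with routing key "my". If they do not exist yet, a one-off setup script along the following lines could create them. This is only a sketch: the file name setup.php, the direct exchange type (guessed from the directP/directC file names), and the durable settings are assumptions, not something the original demo specifies.

```php
<?php
// Hypothetical setup script (e.g. setup.php, next to directP.php); run once before the demo.
require __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$conn = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/');
$channel = $conn->channel();

// Assumption: a durable direct exchange.
// Arguments: name, type, passive, durable, auto_delete.
$channel->exchange_declare('myExchange', 'direct', false, true, false);

// Assumption: a durable, non-exclusive queue that is not auto-deleted.
// Arguments: name, passive, durable, exclusive, auto_delete.
$channel->queue_declare('myQueue', false, true, false, false);

// Route messages published with routing key "my" into myQueue.
$channel->queue_bind('myQueue', 'myExchange', 'my');

echo "declared myExchange, myQueue and the binding" . PHP_EOL;

$channel->close();
$conn->close();
```

If the exchange or queue already exists with different settings, the broker rejects the redeclaration, so the flags here would need to match whatever is already in place.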
Test
Start the consumer:
php -f directC.php
Publish a message:
php -f directP.php "hello world"
The message now appears in the consumer's terminal.