RabbitMQ Getting-Started Demo with PHP AMQPlib
Install PHP AMQPlib via Composer
composer require php-amqplib/php-amqplib
Directory structure:
├─rabbitmq
└─vendor
    ├─composer
    ├─php-amqplib
    │  └─php-amqplib
    │      └─PhpAmqpLib
    │          ├─Channel
    │          ├─Connection
    │          │  └─Heartbeat
    │          ├─Exception
    │          ├─Exchange
    │          ├─Helper
    │          │  └─Protocol
    │          ├─Message
    │          └─Wire
    │              └─IO
    └─phpseclib
        └─phpseclib
            └─phpseclib
                ├─Crypt
                ├─File
                │  └─ASN1
                ├─Math
                ├─Net
                │  └─SFTP
                └─System
                    └─SSH
                        └─Agent
Producer and consumer demo
Producer
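<?php
require __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// The message body is taken from the first command-line argument.
if ($argc < 2) {
    exit("please input argument, like `php -f directP.php mystring`");
}

// Connect to the local broker with the default guest account and vhost "/".
$conn = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/');
if (!$conn->isConnected()) {
    exit("connect err...");
}
$channel = $conn->channel();

// Persistent delivery mode asks the broker to store the message on disk.
$properties = [
    'content_type'  => 'text/plain',
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
];

// Publish to "myExchange" with routing key "my"; the last argument sets the mandatory flag.
$channel->basic_publish(new AMQPMessage($argv[1], $properties), 'myExchange', 'my', true);

$channel->close();
$conn->close();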
Consumer
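<?php
require __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

// Connect to the local broker with the default guest account and vhost "/".
$conn = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/');
if (!$conn->isConnected()) {
    exit("connect err...");
}
$channel = $conn->channel();

$callback = function ($msg) {
    // Acknowledge this message explicitly, since no_ack is false below.
    $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag'], false);
    echo $msg->body . PHP_EOL;
    // A message whose body is "quit" stops the consumer.
    if ($msg->body === 'quit') {
        exit('quit now...');
    }
    // Dump the most recent message to a file for inspection.
    file_put_contents('./test.txt', json_encode($msg));
    return true;
};

// Arguments: queue, consumer_tag, no_local, no_ack, exclusive, nowait, callback.
$channel->basic_consume('myQueue', '', false, false, false, false, $callback);

// Block and dispatch deliveries for as long as a consumer callback is registered.
while (count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$conn->close();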
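Neither script declares `myExchange`, `myQueue`, or the binding between them, so all three must already exist on the broker before the demo will work. A minimal sketch of a one-off setup script follows. The file name `setup.php`, the helper `declareDemoTopology`, the `direct` exchange type (suggested by the `directP.php` / `directC.php` file names), and the durable flags are assumptions, not part of the original demo.

```php
<?php
// setup.php: hypothetical helper script, not part of the original demo.

/**
 * Declare the exchange, queue, and binding that the producer and consumer use.
 * $channel is expected to be a php-amqplib AMQPChannel.
 */
function declareDemoTopology($channel): void
{
    // Arguments: name, type, passive, durable, auto_delete.
    // The "direct" type and durable=true are assumptions.
    $channel->exchange_declare('myExchange', 'direct', false, true, false);

    // Arguments: name, passive, durable, exclusive, auto_delete.
    // A durable queue is assumed so that persistent messages survive a broker restart.
    $channel->queue_declare('myQueue', false, true, false, false);

    // Route messages published with routing key "my" to myQueue.
    $channel->queue_bind('myQueue', 'myExchange', 'my');
}

// Connect only when the Composer autoloader is present next to the demo scripts.
$autoload = __DIR__ . '/../vendor/autoload.php';
if (is_file($autoload)) {
    require $autoload;
    $conn = new \PhpAmqpLib\Connection\AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/');
    $channel = $conn->channel();
    declareDemoTopology($channel);
    $channel->close();
    $conn->close();
} else {
    fwrite(STDERR, 'vendor/autoload.php not found; run composer install first' . PHP_EOL);
}
```

Saved in the same directory as the demo scripts, it would be run once with `php -f setup.php` before starting the consumer. If the exchange or queue already exists with different settings, the broker rejects the declaration, so the flags here would need to match the existing ones.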
Test
Start the consumer:
php -f directC.php
Publish a message:
php -f directP.php "hello world"
The message now appears in the consumer's terminal.
