RabbitMQ 入门 DEMO - 基于 PHP AMQPlib

通过 composer 安装 PHP AMQPlib

composer require php-amqplib/php-amqplib

目录结构:

├─rabbitmq
└─vendor
    ├─composer
    ├─php-amqplib
    │  └─php-amqplib
    │      └─PhpAmqpLib
    │          ├─Channel
    │          ├─Connection
    │          │  └─Heartbeat
    │          ├─Exception
    │          ├─Exchange
    │          ├─Helper
    │          │  └─Protocol
    │          ├─Message
    │          └─Wire
    │              └─IO
    └─phpseclib
        └─phpseclib
            └─phpseclib
                ├─Crypt
                ├─File
                │  └─ASN1
                ├─Math
                ├─Net
                │  └─SFTP
                └─System
                    └─SSH
                        └─Agent

生产者和消费者demo

生产者

<?php

require "../vendor/autoload.php";

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

if ($argc < 2) {
          
   
	exit("please input argument, like `php -f directP.php mystring`");
}

$conn = new AMQPStreamConnection(127.0.0.1, 5672, guest, guest, /);
if (!$conn->isConnected()) {
          
   
	exit("connect err...");
}
$channel = $conn->channel();

$properties = [
	content_type => text/plain,
	delivery_mode => AMQPMessage::DELIVERY_MODE_PERSISTENT
];
$channel->basic_publish(new AMQPMessage($argv[1], $properties), "myExchange", "my", true);

$channel->close();
$conn->close();

消费者

<?php

require "../vendor/autoload.php";

use PhpAmqpLibConnectionAMQPStreamConnection;

$conn = new AMQPStreamConnection(127.0.0.1, 5672, guest, guest, /);
if (!$conn->isConnected()) {
          
   
	exit("connect err...");
}
$channel = $conn->channel();


$callback = function($msg) {
          
   
    $msg->delivery_info[channel]->basic_ack($msg->delivery_info[delivery_tag], false); // default ack this msg
    echo $msg->body . PHP_EOL;
	if ($msg->body == quit) {
          
   
		exit(quit now...);
	}
    file_put_contents(./test.txt, json_encode($msg));
    return true;
};


$channel->basic_consume(myQueue, , false, false, false, false, $callback);

while(count($channel->callbacks)) {
          
   
	$channel->wait();
}


$channel->close();
$conn->close();

测试

开启消费者:

php -f directC.php

生产消息:

php -f directP.php "hello world"

此时可以在消费者终端看到消息。

经验分享 程序员 微信小程序 职场和发展