# 使用pecl命令安装 $ pecl install amqp
# 使用composer安装 $ composer require php-amqplib/php-amqplib
require_once __DIR__ . '/vendor/autoload.php'; $connection = new PhpAmqpLib\Connection\AMQPStreamConnection( 'localhost', 5672, 'guest', 'guest' ); $channel = $connection->channel();
$queueName = 'my_queue'; // 队列名称 $channel->queue_declare($queueName, false, true, false, false);
$msg = new PhpAmqpLib\Message\AMQPMessage('Hello RabbitMQ!'); $channel->basic_publish($msg, '', $queueName);
$callback = function($msg) { echo 'Received message: ' . $msg->body . "\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_consume($queueName, '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); }
在此代码中,我们创建了一个回调函数来接收和处理消息。处理完消息后,通过 basic_ack 方法来确认消息已被成功处理,这样即使消费者发生故障,消息也不会丢失。
$channel->close(); $connection->close();