RabbitMQ 延迟队列-基于PHP实现

RabbitMQ 延迟队列-基于PHP实现

kain
2020-07-02 / 0 评论 / 163 阅读 / 正在检测是否收录...
温馨提示:
本文最后更新于2020年07月02日,已超过1417天没有更新,文章所提及的内容可能已过时失效,所以请自行测试验证。

安装 RabbitMQ 延迟队列插件

RabbitMQ 延迟队列插件未安装直接使用的话,会报错:

unknown exchange type 'x-delayed-message'

插件下载地址:https://www.rabbitmq.com/community-plugins.html 。下载 Erlang 可执行文件之后,复制到rabbit服务的插件目录(自己的安装目录,我的是 C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.14\plugins )中,然后开启插件服务:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

测试步骤

  1. 创建测试项目的目录 mq
  2. https://github.com/php-amqplib/php-amqplib 下载AMQP库(当然也可以通过 composer 安装,这里为了简单直接自己处理了),放入 mq 目录
  3. 编写 index.php,实现自动加载
  4. 创建 test 目录,里面分别创建 mqc.php 消费者和 mqp.php 生产者两个文件
  5. 跑脚本,测试消息的生产和消费:

    • php -f index.php delayP p 3 ,生产消息,延时3秒
    • php -f index.php delayC c ,消费消息

目录结构

├─PhpAmqpLib
│  ├─Channel
│  ├─Connection
│  ├─Exception
│  ├─Exchange
│  ├─Helper
│  │  └─Protocol
│  ├─Message
│  └─Wire
│      └─IO
├─test
│  ├─delayP.php
│  └─delayC.php
└─index.php

源码

index.php

<?php

function my_autoloader($cName) {
    include(__DIR__."/".$cName.".php");
}

spl_autoload_register("my_autoloader");

print_r($argv);

if (isset($argv[2])) {
    $cname = '\test\\'.$argv[1];
    if (!class_exists($cname)) {
        exit("class (".$cname.") not exists".PHP_EOL);
    }
    $c = new $cname();
    if (!method_exists($c, $argv[2])) {
        exit("method (".$argv[2].") not exists".PHP_EOL);
    }
    if (isset($argv[3])) {
        call_user_func(array($c, $argv[2]), $argv[3]);
    } else {
        call_user_func(array($c, $argv[2]));
    }
} else if (isset($argv[1])) {
    if (!function_exists($argv[1])) {
        exit("function (".$argv[1].") not exists".PHP_EOL);
    }
    $argv[1]();
} else {
    exit("please input at least one argument".PHP_EOL);
}

delayP.php

<?php

namespace test;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class delayP {
    private $host = 'localhost';
    private $port = 5672;
    private $user = 'guest';
    private $password = 'guest';
    
    // 可能丢失
    public function p($delayTime = 5) {
        $connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, '/', false, 'AMQPLAIN', null, 'en_US', 3.0, 120.0, null, true, 60);
        $channel = $connection->channel();

        $channel->exchange_declare('delay.myExchange', 'x-delayed-message', false, true, false, false, false, new AMQPTable(['x-delayed-type' => 'direct']));
        $channel->queue_declare('delay.myQueue', false, true, false, false);
        $channel->queue_bind('delay.myQueue', 'delay.myExchange', 'my');

        // 准备消息
        $msg = new AMQPMessage(
            json_encode([
                'data' => "延迟队列数据".time(),
                'sendTime' => time(),
                'expectRunTime' => time() + $delayTime
            ]), [
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT // 持久化
            ]
        );        
        $msg->set('application_headers', new AMQPTable([
            'x-delay' => $delayTime * 1000 // 延迟时间,单位毫秒
        ]));
        $channel->basic_publish($msg, 'delay.myExchange', 'my');
        echo "basic_publish success";

        $channel->close();
        $connection->close();
    }
}

delayC.php

<?php

namespace test;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class delayC {
    private $host = 'localhost';
    private $port = 5672;
    private $user = 'guest';
    private $password = 'guest';
    
    // 自动 ACK
    public function c() {
        $connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->password, '/', false, 'AMQPLAIN', null, 'en_US', 3.0, 120.0, null, true, 60);
        $channel = $connection->channel();

        $channel->exchange_declare('delay.myExchange', 'x-delayed-message', false, true, false, false, false, new AMQPTable(['x-delayed-type' => 'direct']));
        $channel->queue_declare('delay.myQueue', false, true, false, false);
        
        //闭包回调函数
        $callback = function ($msg) {
            echo $msg->body.' '.time();
            echo PHP_EOL;
        };
        $channel->basic_qos(null, 1, null);
        $channel->basic_consume('delay.myQueue', '', false, true, false, false, $callback);

        while (count($channel->callbacks)) {
            $channel->wait();
        }
        $channel->close();
        $connection->close();
    }
}
0

评论 (0)

取消