查看本文前,希望你能熟读一下这部分的概念,对于后续理解会有很大的帮助。

一、基础概念

rabbitmq组成部分如下:

Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue。
Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ。
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

消息发布流程:

—–发送消息—–
1、生产者和Broker建立TCP连接。
2、生产者和Broker建立通道(也被称为信道)。
3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。
4、Exchange将消息转发到指定的Queue(队列)

—–接收消息—–

1、消费者和Broker建立TCP连接
2、消费者和Broker建立通道
3、消费者监听指定的Queue(队列)
4、当有消息到达Queue时Broker默认将消息推送给消费者。
5、消费者接收到消息。
6、使用nohup让消费者以守护进程运行

二、安装依赖包(这里不使用php扩展,而使用composer)

提示:如果你的英文足够好,可以移步:官方github文档 ;嫌速度慢就挂梯子吧。

1.composer安装

composer require php-amqplib/php-amqplib

2.安装完成后在vendor目录下会有个php-amqplib文件夹。

注意:composer下载的依赖是有问题的,很多官方提供的方法缺失,导致一度怀疑自己是不是看不懂文档...

题外话:或许笔者太菜了,不明白其他博文为什么执行一下composer就可以使用那些方法...

例如:$msg->ack();作为消息应答时,这个ack方法就没有,其他还有一些方法就不说了。

所以还得用git再拉一遍完整的代码

git clone https://github.com/php-amqplib/php-amqplib.git

拉取完毕后,替换 /vendor/php-amqplib/php-amqplib/下的内容 为git刚刚拉取下来的文件。

这样就可以使用完整的包了。

三、创建命令

因为rabbitMq的消费者行为必须是在cli模式下运行,所以我们无法用普通的url访问去启动它(并且这也不合理,往往需要用到rabbitMq这等消息队列的业务场景,也说明业务量巨大;倘若使用url请求访问,就是同步的行为了,等一个天荒地老不说,完全违背了使用rabbitMq异步操作的初衷)。

查看thinkphp5.1文档:命令行          创建自定义指令

那么我们知道第一步:在cli模式下,运行

php think make:command Order order

生成一个order指令文件如下:

四、配置rabbitMq的一些基本信息

在config目录下,创建一个rabbitmq.php文件,用于设置rabbitMq的基本配置。

<?php
// +----------------------------------------------------------------------
// | rabbitMQ 示例配置
// 
//   Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息:
//                      fanout:所有bind到此exchange的queue都可以接收消息
//                      direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息
//                      topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
//                      headers:通过headers 来决定把消息发给哪些queue(这个很少用)
// +----------------------------------------------------------------------



return [
    // 连接信息
    'AMQP' => [
        'host'     => '127.0.0.1',
        'port'     => '5672',
        'username' => 'guest',
        'password' => 'guest',
        'vhost'    => '/'
    ],
    
    //测试队列:direct 路由模式
    'test_queue' => [
        'exchange_name' => 'test_exchange',    //交换机名称
        'exchange_type' => 'direct',            //交换机运行模式(从上面四种模式中选)
        'queue_name'    => 'test_queue',       //队列名称
        'route_key'     => 'test',             //路由键,用于绑定队列与交换机
        'consumer_tag'  => 'test'              //消费标签
    ],


    //订单队列:direct 路由模式
    'order_queue' => [
        'exchange_name' => 'order_exchange',    //交换机名称
        'exchange_type' => 'direct',            //交换机运行模式(从上面四种模式中选)
        'queue_name'    => 'order_queue',       //队列名称
        'route_key'     => 'order',             //路由键,用于绑定队列与交换机
        'consumer_tag'  => 'order'              //消费标签
    ],
    

    //...等等其它队列
];

五、创建生产者

在common文件夹下创建MqProducer控制器

<?php

namespace app\common\controller;



use think\facade\Config;
//引入rabbitMq所需类
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;


class MqProducer
{
    /**
     * 生产者
     * @param $data 控制器传来的数据
     * @param $name 配置文件下rabbitmq文件前缀名
     * @throws \Exception
     */
    public function publish($data, $name)
    {
        //获取rabbitMq配置(刚刚在config文件下配置的rabbitmq信息)
        $amqp = Config::get('rabbitmq.AMQP');
        $amqpDefail = Config::get('rabbitmq.' . $name . '_queue');

        //建立连接
        $connection = new AMQPStreamConnection(
            $amqp['host'],
            $amqp['port'],
            $amqp['username'],
            $amqp['password']
        );

        //建立通道
        $channel = $connection->channel();

        //初始化交换机  
        $channel->exchange_declare($amqpDefail['exchange_name'], $amqpDefail['exchange_type'], false, true, false);

        //初始化队列
        $channel->queue_declare($amqpDefail['queue_name'], false, true, false, false);

        //绑定队列到交换机
        $channel->queue_bind($amqpDefail['queue_name'], $amqpDefail['exchange_name'], $amqpDefail['route_key']);
        
        //生成消息(json格式传输)
        $msg = new AMQPMessage(json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));

        //推送消息到交换机
        $channel->basic_publish($msg, $amqpDefail['exchange_name'], $amqpDefail['route_key']);

        //关闭通道
        $channel->close();

        //断开连接
        $connection->close();
    }
}

六、创建消费者

<?php

namespace app\common\controller;


use think\facade\Config;
use think\facade\Log;
//引入rabbitMq所需类
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;

class MqConsumer
{
    /**
     * 消费者(路由模式)
     * @param $name 配置文件下rabbitmq文件前缀名
     * @throws \ErrorException
     */
    public function consumer($name)
    {
        //获取rabbitMq配置
        $amqp = Config::get('rabbitmq.AMQP');
        $amqpDefail = Config::get('rabbitmq.' . $name . '_queue');

        //建立连接
        $connection = new AMQPStreamConnection(
            $amqp['host'],
            $amqp['port'],
            $amqp['username'],
            $amqp['password']
        );

        //建立通道
        $channel = $connection->channel();

        //流量控制(也被称为公平调度,这里设置一个消费者一次只处理200条消息)
        $channel->basic_qos(null, 200, null);

        //初始化交换机
        $channel->exchange_declare($amqpDefail['exchange_name'], $amqpDefail['exchange_type'], false, true, false);

        //初始化队列
        $channel->queue_declare($amqpDefail['queue_name'], false, true, false, false);

        //绑定队列与交换机
        $channel->queue_bind($amqpDefail['queue_name'], $amqpDefail['exchange_name'], $amqpDefail['route_key']);
        
        //消费消息
        $channel->basic_consume($amqpDefail['queue_name'], $amqpDefail['consumer_tag'], false, false, false, false, [$this, 'callback']);
        
        //注册退出函数行为(其实用不到,因为要保证消费者一直运行,所以不能断开连接))
        // register_shutdown_function([$this,'shutdown'], $channel, $connection);

        //消息未处理完毕时,循环监听并一直处理上方callback方法的逻辑
        $channel->consume();

    }
    
    /**
     * 回调后消息处理(业务逻辑放置此处)
     * @param  $msg
     */
    public function callback($msg)
    {
        //$msg->body  通过消费者传来的控制器接收的数据
        $data = json_decode($msg->body,true);//拿到数据
        //这里我拿来做了一个插库的动作
        app('weloginper')->create([
            'ToUserName' => 'test',
            'FromUserName' => 'test',
            'CreateTime' => 'test',
            'MsgType' => 'test',
            'Event' => 'test',
            'EventKey' => 'test',
            'phone' => 'test',
        ]);
        //如果有个值是quit,则让该消费者停止消费
        /*if($data['aa'] == 'quit'){
            $msg->getChannel()->basic_cancel($msg->getConsumerTag());
        }*/
        $msg->ack();  //消息应答:这波200条消息处理完毕后进行消息确认,告诉mq可以开始发下一波200条消息了
    }

    
    
    /**
     * @param $channel 信道
     * @param \PhpAmqpLib\Connection\AbstractConnection $connection
     */
    function shutdown($channel, $connection)
    {
        $channel->close(); //关闭通道
        $connection->close(); //断开连接
    }


}

七、创建一个普通的控制器,来触发生产行为

<?php

namespace app\common\controller;

use think\Console;

class Index
{
    public function index()
    {
         $page = input('get.page') ? input('get.page') : 1;//得到初始页码
         $data = [
                'page' => $page,  //当前页
         ];
         if($page <= 10){  //当前不是第10页则一直调用刚刚定义的生产者publish去生产消息
             app('producer')->publish($data, 'order');
             $url = url('/common/index/index')  . '?page=' . ++$page;
             die("<script>location.href='{$url}';</script>");
         }
         
         return json([
            'code' => 200,
            'msg' => 'order队列推送成功'
         ]);
    }



}

八、改写第三步的创建命令内容,加入消费行为

 九、去除禁止访问common模块

tp5.1默认是禁止访问common模块的,所以需要删除以保证程序能正常运行。

十、开始测试

初始情况下是没有队列和交换机的,如下:

我们来到root目录下启动cli  运行一下刚刚配置的命令如下:

这时候访问一下第七步编写的普通控制器,调用生产者往mq中推送消息。

从第1页推送到第11页,结合控制器中的代码来看,我们向生产者中推送了10条消息,如果逻辑正确的话,表中应该有10条数据。

 那么我们来看表中的数据:

确实是10条没错,那么就调用成功啦,这就是一个简单thinkphp5.1的结合rabbitmq实战。

十一、消费者以守护进程运行

刚刚那种php think order其实在退出cli后,会断开连接,这显然是不合理的,难道我们执行rabbitmq要一直开着cli么?

其实只需要一点小改动即可常驻后台,而不用担心退出cli消费者就不执行了↓↓↓

将  php think order  改为  nohup php think order > /dev/null 2 > /dev/null & 

如果 nohup php think order > /dev/null 2 > /dev/null & 没用

就得改为nohup php think order > /dev/null 2>&1 &

这段使用了nohup命令、&命令、还有重定向等。

具体命令详情移步:nohup和&后台运行,进程查看及终止

LINUX查看进程的4种方法(小结)

如下:

这里按住ctrl+c终止上段命令执行

再改用命令常驻启动之后,得到了该进程的id为18476

倘若想停止这个进程,使用kill 18476 或者 kill -9 18476 即可;如果觉得一个个杀死进程太过麻烦,可以使用 killall -9 进程名 来一次性杀死相关进程,比方说这里杀死order进程,就是killall -9 order

命令的意思在右侧,已翻译出来,大致就是:忽略nohup产生的日志信息,不写入到服务器文件中。

假如不小心退出了cli界面,也可以使用 ps -aux命令来查看启动进程id

笔者这里之前使用了php think temp命令启动了15个进程,并且退出了cli界面。这里演示怎么使用ps -aux命令,请忽略为什么不是上面的php think order 命令 (因为我换成了另一个命令...实际上是一样的,由php think order启动的进程,则会显示php think order,不要来杠)

当然你也可以使用 px aux|grep temp来只查看启动的temp进程,实际上笔者更推荐这样单独查看,而不是使用ps aux查看全局

十二、开启多个消费者

有时候一个队列里有很多消息待消费,只开启一个消费者显然速度是会很慢的,这时候就要开启多个消费者了。

怎么开启多个消费者呢?

很简单,将刚刚的 nohup php think order > /dev/null 2 > /dev/null & (这个没用就得改为:nohup php think order > /dev/null 2>&1 &)运行几次就开启了几个消费者,并且每个消费都依照消费者代码中设置的公平调度数据来处理一次处理消息(这里我在代码中设置的消息数据一次处理200条)。

这里开启8个消费者:

为了业务的更快完成,消费者的数量理论上是越多越好,但rabbitmq异步虽然消耗很少的资源,并不是说不耗费资源。如果开启太多消费者,很有可能造成服务器宕机or丢失数据的情况,所以具体请按照自身服务器配置来开启消费者数量,量力而行。

最后:

完成此博文,所参考资料来自(也有一些方法参数具体解释也可以在下面查到,因为我懒就不一一解释了):

ThinkPHP6应用RabbitMQ
学习thinkphp6.0使用rabbitmq示例

RabbitMQ的六种模式总结

tp6中使用rabbitmq

TP6框架下安装AMQP开发RabbitMQ

GitHub - php-amqplib/php-amqplib: The most widely used PHP client for RabbitMQ




2022/7/22补充:由于之前使用的仅有一台服务器做mq,并实时监听进行消费,这台服务器配置比较拉跨...在推送模板消息的时候开了15个消费者cpu一下跑到了50%,然后稳定在43%左右,所以想着说拿另一台服务器单独抽来做mq服务器,这样就不会影响主服务器功能了。

连接也很简单,在另一台服务器里装好mq,能正常访问web界面后,放开原本服务器的5672端口;在原本的服务器rabbitMq.php里将ip改成另一台服务器的ip即可。

 运行一下看看效果:

 可以看到另一台121服务器已经接收到刚刚推送的order队列数据了

生产者使用同名队列推送到mq的时候,注意传输的数据要与消费者逻辑相呼应,不然原始同名守护进程将全部死掉,导致业务崩盘;比方说你在消费者业务逻辑中test队列需要使用$data['test'],但是另一台推送服务器中的生产者的test队列并没有传该值到mq,那么原始服务器已经启动的test队列守护进程将全部死掉


2023/1/11后记

上面那种nohup配合&的方式不知道为啥总是运行一阵子后,进程就死掉了。

改用Supervisor来守护进程了,可以移步:Supervisor

利用Supervisor守护php进程  进行查看

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐