消息队列rabbitmq
RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统,底层基于Erlang语言。
一:centos7安装RabbitMQ
这玩意儿安装很扯淡,官方推荐rpm安装,rpm安装本身是最简单的,但是安装RabbitMQ却不简单,很可能需要修改仓库地址。不同linux版本不一样,centos6和centos7也不一样。我这里不用rpm,手动编译Erlang,然后选择编译好的RabbitMQ。
1:安装Erlang
1):先安装几个必要的插件
$ yum -y install gcc glibc-devel make ncurses-devel openssl-devel autoconf unixODBC unixODBC-devel socat
2):Erlang下载地址:http://www.erlang.org/downloads,我这里下载21.1版本
$ wget http://erlang.org/download/otp_src_21.1.tar.gz #下载 $ tar -xvf otp_src_21.1.tar.gz #解压 $ cd otp_src_21.1/ #进入目录准备编译 $ ./configure --prefix=/usr/local/erlang --without-javac #忽略java编译 $ make #编译 $ make install #安装
make & make install 这两步很慢,巨慢无比,耐心等待。
3):进入我们安装后的目录测试一下是否安装成功
$ /usr/local/erlang/bin/erl安装成功
二:安装rabbitmq
1:下载地址:http://www.rabbitmq.com/download.html
2:因为我上面的Erlang是手动编译的,所以这里不选择rpm方式安装,直接下载解压包,从这里下载:https://github.com/rabbitmq/rabbitmq-server/releases
$ wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.8/rabbitmq-server-generic-unix-3.7.8.tar.xz $ tar xvJf rabbitmq-server-generic-unix-3.7.8.tar.xz $ mv rabbitmq_server-3.7.8/ /usr/local/rabbitmq #解压后移动到你想放到的目录这个是编译好的,可以直接用。
3:设置环境变量,设置两个,一个是Erlang,一个是rabbitmq,打开文件/etc/profile文件,在文件最后加入以下三行:
export ERLANG_PATH=$PATH:/usr/local/erlang/bin #erlang安装目录 export RABBITMQ_PATH=$PATH:/usr/local/rabbitmq/sbin #rabbitmq安装目录 export PATH=$PATH:$ERLANG_PATH:$RABBITMQ_PATH运行命令生效:
source /etc/profile启动一下:
$ rabbitmq-server
成功
启动web管理后台:
$ rabbitmq-plugins enable rabbitmq_management #disable为关闭开启防火墙,打开15672端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent firewall-cmd --reload #重启防火墙web界面出来了
guest用户被限制,只能通过127.0.0.1访问,当然也可以修改配置文件开启guest远程访问。这里我们新建一个用户,并授予管理员权限。
用户名:admin 密码:123
$ rabbitmqctl add_user admin 123 #添加用户 $ rabbitmqctl set_user_tags admin administrator #分配角色
登录成功
相关命令:命令在/usrlocal/rabbitmq/sbin下
rabbitmq-server -detached #后台启动 rabbitmqctl stop #关闭服务 rabbitmqctl status #查看状态 rabbitmqctl list_users #列出角色
三:安装php扩展(我用的php7.2版本)
php是用amqp调用RabbitMQ,所以先下载ampq
$ wget https://pecl.php.net/get/amqp-1.9.3.tgz #下载 $ tar -xvf amqp-1.9.3.tgz #解压 $ cd amqp-1.9.3 $ /usr/local/php/bin/phpize #用phpize生成编译文件,注意查看你的php在哪里 $ ./configure --with-php-config=/usr/local/php/bin/php-config到这一步,我这里报错了:checking for amqp using pkg-config... configure: error: librabbitmq not found
这个错误提示还要安装一个破玩意:rabbitmq-c
去这里下载:https://github.com/alanxz/rabbitmq-c/releases
$ wget https://github.com/alanxz/rabbitmq-c/archive/v0.9.0.tar.gz $ tar -xvf v0.9.0.tar.gz $ cd rabbitmq-c-0.9.0/准备configure的时候,发现没有configure,0.9改成cmake了,靠,安装一下cmake
$ yum -y install cmake $ cmake . -DCMAKE_INSTALL_PREFIX=/usr/local/rabbitmq-c-0.9.0 #指定安装目录 $ make $ make install
然后回过头去再编译amqp-1.9.3
$ ./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c-0.9.0 $ make $ make install
妈的,make报错!
发现点蛛丝马迹,上面进入了/usr/local/rabbitmq-c-0.9.0/lib 目录,查看一下发现/usr/local/rabbitmq-c-0.9.0/没有lib,但有个lib64位。
处理一下:
$ cp -R /usr/local/rabbitmq-c-0.9.0/lib64/ /usr/local/rabbitmq-c-0.9.0/lib
接着干:make && make install ,OK,这也太不智能了吧。
加入到php.ini 查看一下!
extension=amqp.so
扩展安装成功,这时候就可以用PHP操作RabbitMQ了。
四:一些基本术语参数
1:Message:消息。包括消息头和消息体
2:Publisher:生产者。发布消息的一方
3:Consumer:消费者。接受消息的一方
4:Connection:网络链接,TCP链接
5:Channel:信道。建立在真实的TCP连接内地虚拟连接,所有命令通过信道输入输出,多路复用一条 TCP 连接,降低TCP开销
6:ExChange:交换区。因为RabbitMQ是基于AMQP的,AMQP协议中的核心思想就是生产者和消费者隔离,也就是生产者不直接把消息发到队列,而是先发给ExChange(交换区),Exchange按照特定的策略转发到Queue(队列中)进行存储,所以ExChange的作用就是负责转发,生产者只面向 Exchange 发布消息,消费者只面向 Queue 消费消息。
Direct:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,完全相等,则发送到该Binding对应的Queue中。
Topic:模糊匹配,如果匹配上了,则发送到该Binding对应的Queue中。
* 表示可以匹配零个或多个字符(Routing key是user.# user.a user.b user都可以匹配)
# 表示可以匹配一个字符 (Routing key是user.* user.a user.b 可以匹配;user user.a.c 不可以匹配)
Fanout:直接将消息转发到所有binding的对应queue中,忽略Routing key。
Headers Exchange:将消息中的headers与该Exchange相关联的所有Binging中的参数进行匹配,如果匹配上了,则发送到该Binding对应的Queue中。
7:Binding:绑定。建立交换区与消息队列之间的关联,也就是设置规则,交换区该发送到哪个队列。
8:Queue:消息队列容器,存储消息队列的地方。durability表示是否持久化,durable表示是,transient表示否。
9:Vhost:翻译叫什么虚拟主机,其实就类似于mysql的数据库,一个意思。
五:一些基本命令
$ rabbitmq-server -detached #后台启动 $ rabbitmqctl stop [] #停止RabbitMQ服务,同时关闭erlang节点和应用程序 $ rabbitmqctl status #查看状态 $ rabbitmqctl stop [ ] $ rabbitmqctl stop_app #停止RabbitMQ服务,仅关闭erlang节点上的rabbit应用程序 $ rabbitmqctl start_app #启动erlan node上的rabbitmq的应用 #用户管理 $ rabbitmqctl list_users #列出角色 $ rabbitmqctl add_user admin 123 #添加用户和密码 这里用户名:admin 密码:123 $ rabbitmqctl set_user_tags admin administrator #分配角色 $ rabbitmqctl change_password username newpassword #修改用户密码 $ rabbitmqctl delete_user username #删除用户 # vhost(Virtual host)管理,这玩意儿相当于mysql的数据库 $ rabbitmqctl add_vhost {name} #添加 $ rabbitmqctl delete_vhost {name} #删除 $ rabbitmqctl list_vhosts {name} #查看所有
六:php操作RabbitMQ
1:实践:用PHP创建交换区:goods_msm,队列名称:goods_worker,以及路由key:code1
<?php $conn = new AMQPConnection([ 'host' => '127.0.0.1', 'vhost' => '/', 'port' => 5672, 'login' => 'guest', 'password' => 'guest' ]); if(!$conn->connect()){//建立连接 die('connetc error'); } $channel = new AMQPChannel($conn); //创建channel(信道或者叫通道) $ExChangeName = 'goods_msm'; //交换区名称 $queueName = 'goods_worker'; //队列名称 $routeName1 = 'code1'; //路由key //创建交换机对象 $exChange = new AMQPExchange($channel); $exChange->setName($ExChangeName); $exChange->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $exChange->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重启时交换机自动恢复 echo "Exchange Status:".$exChange->declare()."\n"; //查看如果交换机不存在则进行创建运行,如果没有报错的话会输出:
Exchange Status:1
通过web窗口看一下
交换区建立成功
继续:
//创建队列 $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->setFlags(AMQP_DURABLE); //队列持久化 echo "Message Total:".$queue->declare()."\n"; //查看,如果不存在则创建可以看到队列创建成功
继续往下走:
//绑定交换区与队列,指定路由键 //rabbitmq不是直接发送到队列,发送到交换区,由交换区决定发给某个队列 echo 'Queue Bind: '.$queue->bind($ExChangeName, $routeName1)."\n"; //绑定路由 $conn->disconnect(); //关闭连接可以看到绑定成功
也可以用命令查看:
这样就创建成功了,下面继续用生产者,消费者干起来。最终代码:
<?php /** * 用PHP创建交换区:goods_msm,队列名称:goods_worker,以及路由key:code1 * 此代码不是生产者,也不是消费者 **/ $conn = new AMQPConnection([ 'host' => '127.0.0.1', 'vhost' => '/', 'port' => 5672, 'login' => 'guest', 'password' => 'guest' ]); if(!$conn->connect()){//建立连接 die('connetc error'); } $channel = new AMQPChannel($conn); //创建channel(信道或者叫通道) $ExChangeName = 'goods_msm'; //交换区名称 $queueName = 'goods_worker'; //队列名称 $routeName1 = 'code1'; //路由key //创建交换机对象 $exChange = new AMQPExchange($channel); $exChange->setName($ExChangeName); $exChange->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $exChange->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重启时交换机自动恢复 echo "Exchange Status:".$exChange->declare()."\n"; //查看如果交换机不存在则进行创建 //创建队列 $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->setFlags(AMQP_DURABLE); //队列持久化 echo "Message Total:".$queue->declare()."\n"; //查看,如果不存在则创建 //绑定交换区与队列,指定路由键 //rabbitmq不是直接发送到队列,发送到交换区,由交换区决定发给某个队列 echo 'Queue Bind: '.$queue->bind($ExChangeName, $routeName1)."\n"; //绑定路由 $conn->disconnect(); //关闭连接
2:生产者
<?php /** * 生产者 * 生产者也就是发送方 **/ $conn = new AMQPConnection([ 'host' => '127.0.0.1', 'vhost' => '/', 'port' => 5672, 'login' => 'guest', 'password' => 'guest' ]); if(!$conn->connect()){//建立连接 die('connetc error'); } $channel = new AMQPChannel($conn); //创建channel(信道或者叫通道) $ExChangeName = 'goods_msm'; //交换区名称 $routeName1 = 'code1'; //路由key //创建交换机对象 $exChange = new AMQPExchange($channel); $exChange->setName($ExChangeName); $exChange->publish('第一条测试消息', $routeName1); //发送消息 $conn->disconnect(); //关闭连接运行一下,到web管理界面看看
消息发送成功
3:消费者
<?php /** * 用PHP创建交换区:goods_msm,队列名称:goods_worker,以及路由key:code1 * 此代码不是生产者,也不是消费者 **/ $conn = new AMQPConnection([ 'host' => '127.0.0.1', 'vhost' => '/', 'port' => 5672, 'login' => 'guest', 'password' => 'guest' ]); if(!$conn->connect()){//建立连接 die('connetc error'); } $channel = new AMQPChannel($conn); //创建channel(信道或者叫通道) $queueName = 'goods_worker'; //队列名称 //创建队列 $queue = new AMQPQueue($channel); $queue->setName($queueName); $queue->setFlags(AMQP_DURABLE); //队列持久化 //接受消息 $queue->consume(function ($envelope, $queue) { $msg = $envelope->getBody(); echo $msg."\n"; //处理消息 }, AMQP_AUTOACK); //自动应答 $conn->disconnect(); //关闭连接
注意:这里需要注意的是这个方法:$queue->consume,queue对象有两个方法可用于取消息:consume和get。前者是阻塞的,无消息时会被挂起,适合循环中使用;后者则是非阻塞的,取消息时有则取,无则返回false。
就是说用了consume之后,会同步阻塞,该程序常驻内存,不能用nginx,apache调用。
运行一下生产者,消费端输出:
7:实战
实战1:延时队列
(1)需求:电子商务网站下单后,30分钟如果未付款,改变订单状态
(2)原理:rabbitmq没有直接延时队列的功能,不过可以通过模拟实现此功能。用过redis的都知道,redis有一个ttl功能,就是生存周期,某个key设置生存周期,过期就会消失,rabbitmq也有此功能。rabbitmq消息过期进入死信队列,然后配置一个转发,把死信队列的消息转发到某个队列,这样就可以操作这个队列。
(3)步骤:
a:创建订单的时候,同时发送订单到消息队列,并且设置过期时间
b:mq回调,也就是改变订单状态服务,判断30分钟后是否付款,如果未付款,改变订单状态为:无效订单。
MQ也要分两步,首先是进入死信队列,但是死信队列不能直接消费,需要转发出来,利用MQ的两个特性:
a:ttl,生存周期
b:Dead Letter Exchanges(DLX)
这里设置两个地方用于转发:
x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送
具体参考MQ官方文档:http://www.rabbitmq.com/dlx.html
demo:
<?php /** 1:创建第1个队列,此队列为下单的时候放入队列,然后设置过期时间 2:创建第2个队列,第1个队列过期后自动转入该队列 3:所以处理N久时间过期,只要处理第2个队列就行了。 **/ $conn = new AMQPConnection([ 'host' => '127.0.0.1', 'vhost' => '/', 'port' => 5672, 'login' => 'guest', 'password' => 'guest' ]); $params = [ //订单最开始在这里 第1个队列 'exchangeName' => 'old_order_exchange', //交换区名称 'queueName' => 'old_order_msg', //订单队列 'routeKey' => 'old_order_route', //路由key //过期后转发到这里 第2个队列 'exchangeName2' => 'new_order_exchange', //交换区名称 'queueName2' => 'new_order_msg', //订单队列 'routeKey2' => 'new_order_route', //路由key ]; if(!$conn->connect()){//建立连接 die('connetc error'); } $channel = new AMQPChannel($conn); //创建channel(信道或者叫通道) //创建交换机对象 第1个队列 $exChange = new AMQPExchange($channel); $exChange->setName($params['exchangeName']); $exChange->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $exChange->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重启时交换机自动恢复 $exChange->declare(); //创建交换机对象 第2个队列 $exChange2 = new AMQPExchange($channel); $exChange2->setName($params['exchangeName2']); $exChange2->setType(AMQP_EX_TYPE_DIRECT); //direct类型 $exChange2->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重启时交换机自动恢复 $exChange2->declare(); //第1个队列 $queue = new AMQPQueue($channel); $queue->setName($params['queueName']); $queue->setFlags(AMQP_DURABLE); //队列持久化 $queue->setArguments([ //如果过期,把订单转发到以下 'x-dead-letter-exchange' => $params['exchangeName2'], 'x-dead-letter-routing-key' => $params['routeKey2'], 'x-message-ttl' => 10000, //60秒过期,注意:message是单个消息过期,mq也可以设置整个队列过期时间 ]); $queue->declare(); //第2个队列 $queue2 = new AMQPQueue($channel); $queue2->setName($params['queueName2']); $queue2->setFlags(AMQP_DURABLE); //队列持久化 $queue2->declare(); //绑定交换区与队列,指定路由键 第1个队列 $queue->bind($params['exchangeName'], $params['routeKey']); //绑定订单定时队列 //绑定交换区与队列,指定路由键 第2个队列 $queue2->bind($params['exchangeName2'], $params['routeKey2']); //绑定过期转发订单队列 echo $exChange->publish('订单'.date('Y-m-d H:i:s'), $params['routeKey']); //发送消息,往第1个队列发 $conn->disconnect(); //关闭连接
运行下这个程序:
这里有1条消息
过期后转到这里来了!