道者编程


消息队列rabbitmq

RabbitMQ是一个在AMQP基础上完成的,可复用的企业消息系统,底层基于Erlang语言。

一:centos7安装RabbitMQ

这玩意儿安装很扯淡,官方推荐rpm安装,rpm安装本身是最简单的,但是安装RabbitMQ却不简单,很可能需要修改仓库地址。不同linux版本不一样,centos6和centos7也不一样。我这里不用rpm,手动编译Erlang,然后选择编译好的RabbitMQ。

1:安装Erlang

1):先安装几个必要的插件

$ yum -y install gcc glibc-devel make ncurses-devel openssl-devel autoconf unixODBC unixODBC-devel socat

2):Erlang下载地址:http://www.erlang.org/downloads,我这里下载21.1版本

$ wget http://erlang.org/download/otp_src_21.1.tar.gz #下载
$ tar -xvf otp_src_21.1.tar.gz #解压
$ cd otp_src_21.1/ #进入目录准备编译
$ ./configure --prefix=/usr/local/erlang --without-javac #忽略java编译
$ make #编译
$ make install #安装

 make & make install 这两步很慢,巨慢无比,耐心等待。

3):进入我们安装后的目录测试一下是否安装成功

$ /usr/local/erlang/bin/erl
安装成功

二:安装rabbitmq

1:下载地址:http://www.rabbitmq.com/download.html

2:因为我上面的Erlang是手动编译的,所以这里不选择rpm方式安装,直接下载解压包,从这里下载:https://github.com/rabbitmq/rabbitmq-server/releases

$ wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.8/rabbitmq-server-generic-unix-3.7.8.tar.xz
$ tar xvJf rabbitmq-server-generic-unix-3.7.8.tar.xz
$ mv rabbitmq_server-3.7.8/ /usr/local/rabbitmq #解压后移动到你想放到的目录
这个是编译好的,可以直接用。

3:设置环境变量,设置两个,一个是Erlang,一个是rabbitmq,打开文件/etc/profile文件,在文件最后加入以下三行:

export ERLANG_PATH=$PATH:/usr/local/erlang/bin #erlang安装目录
export RABBITMQ_PATH=$PATH:/usr/local/rabbitmq/sbin #rabbitmq安装目录
export PATH=$PATH:$ERLANG_PATH:$RABBITMQ_PATH
运行命令生效:

source /etc/profile
启动一下:

$ rabbitmq-server

成功

启动web管理后台:

$ rabbitmq-plugins enable rabbitmq_management #disable为关闭
开启防火墙,打开15672端口

firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload #重启防火墙
web界面出来了

guest用户被限制,只能通过127.0.0.1访问,当然也可以修改配置文件开启guest远程访问。这里我们新建一个用户,并授予管理员权限。
用户名:admin 密码:123

$ rabbitmqctl add_user admin 123 #添加用户
$ rabbitmqctl set_user_tags admin administrator #分配角色

登录成功

相关命令:命令在/usrlocal/rabbitmq/sbin下

rabbitmq-server -detached #后台启动
rabbitmqctl stop #关闭服务
rabbitmqctl status #查看状态
rabbitmqctl list_users #列出角色

三:安装php扩展(我用的php7.2版本)

php是用amqp调用RabbitMQ,所以先下载ampq

$ wget https://pecl.php.net/get/amqp-1.9.3.tgz #下载
$ tar -xvf amqp-1.9.3.tgz #解压
$ cd amqp-1.9.3
$ /usr/local/php/bin/phpize #用phpize生成编译文件,注意查看你的php在哪里
$ ./configure --with-php-config=/usr/local/php/bin/php-config
到这一步,我这里报错了:checking for amqp using pkg-config... configure: error: librabbitmq not found

这个错误提示还要安装一个破玩意:rabbitmq-c

去这里下载:https://github.com/alanxz/rabbitmq-c/releases

$ wget https://github.com/alanxz/rabbitmq-c/archive/v0.9.0.tar.gz
$ tar -xvf v0.9.0.tar.gz
$ cd rabbitmq-c-0.9.0/
准备configure的时候,发现没有configure,0.9改成cmake了,靠,安装一下cmake

$ yum -y install cmake
$ cmake . -DCMAKE_INSTALL_PREFIX=/usr/local/rabbitmq-c-0.9.0 #指定安装目录
$ make
$ make install

然后回过头去再编译amqp-1.9.3

$ ./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c-0.9.0
$ make
$ make install

妈的,make报错!

发现点蛛丝马迹,上面进入了/usr/local/rabbitmq-c-0.9.0/lib 目录,查看一下发现/usr/local/rabbitmq-c-0.9.0/没有lib,但有个lib64位。

处理一下:

$ cp -R /usr/local/rabbitmq-c-0.9.0/lib64/ /usr/local/rabbitmq-c-0.9.0/lib

接着干:make && make install ,OK,这也太不智能了吧。


加入到php.ini 查看一下!

extension=amqp.so 
 

扩展安装成功,这时候就可以用PHP操作RabbitMQ了。

四:一些基本术语参数

1:Message:消息。包括消息头和消息体

2:Publisher:生产者。发布消息的一方

3:Consumer:消费者。接受消息的一方

4:Connection:网络链接,TCP链接

5:Channel:信道。建立在真实的TCP连接内地虚拟连接,所有命令通过信道输入输出,多路复用一条 TCP 连接,降低TCP开销

6:ExChange:交换区。因为RabbitMQ是基于AMQP的,AMQP协议中的核心思想就是生产者和消费者隔离,也就是生产者不直接把消息发到队列,而是先发给ExChange(交换区),Exchange按照特定的策略转发到Queue(队列中)进行存储,所以ExChange的作用就是负责转发,生产者只面向 Exchange 发布消息,消费者只面向 Queue 消费消息。

Direct:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,完全相等,则发送到该Binding对应的Queue中。

Topic:模糊匹配,如果匹配上了,则发送到该Binding对应的Queue中。
* 表示可以匹配零个或多个字符(Routing key是user.# user.a user.b user都可以匹配)
# 表示可以匹配一个字符 (Routing key是user.* user.a user.b 可以匹配;user user.a.c 不可以匹配)

Fanout:直接将消息转发到所有binding的对应queue中,忽略Routing key。

Headers Exchange:将消息中的headers与该Exchange相关联的所有Binging中的参数进行匹配,如果匹配上了,则发送到该Binding对应的Queue中。

7:Binding:绑定。建立交换区与消息队列之间的关联,也就是设置规则,交换区该发送到哪个队列。

8:Queue:消息队列容器,存储消息队列的地方。durability表示是否持久化,durable表示是,transient表示否。

9:Vhost:翻译叫什么虚拟主机,其实就类似于mysql的数据库,一个意思。

五:一些基本命令

$ rabbitmq-server -detached #后台启动
$ rabbitmqctl stop [] #停止RabbitMQ服务,同时关闭erlang节点和应用程序
$ rabbitmqctl status #查看状态
$ rabbitmqctl stop []  
$ rabbitmqctl stop_app  #停止RabbitMQ服务,仅关闭erlang节点上的rabbit应用程序
$ rabbitmqctl start_app   #启动erlan node上的rabbitmq的应用
    

#用户管理
$ rabbitmqctl list_users #列出角色
$ rabbitmqctl add_user admin 123 #添加用户和密码 这里用户名:admin 密码:123
$ rabbitmqctl set_user_tags admin administrator #分配角色
$ rabbitmqctl change_password username newpassword #修改用户密码
$ rabbitmqctl delete_user username #删除用户

# vhost(Virtual host)管理,这玩意儿相当于mysql的数据库
$ rabbitmqctl add_vhost {name} #添加
$ rabbitmqctl delete_vhost {name} #删除
$ rabbitmqctl list_vhosts {name} #查看所有

六:php操作RabbitMQ

1:实践:用PHP创建交换区:goods_msm,队列名称:goods_worker,以及路由key:code1

<?php
$conn = new AMQPConnection([
    'host' => '127.0.0.1',
    'vhost' => '/',
    'port' => 5672,
    'login' => 'guest',
    'password' => 'guest'
]);
if(!$conn->connect()){//建立连接
  die('connetc error');
}
$channel = new AMQPChannel($conn); //创建channel(信道或者叫通道)


$ExChangeName = 'goods_msm'; //交换区名称
$queueName = 'goods_worker'; //队列名称
$routeName1 = 'code1'; //路由key 


//创建交换机对象     
$exChange = new AMQPExchange($channel);    
$exChange->setName($ExChangeName); 
$exChange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$exChange->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重启时交换机自动恢复
echo "Exchange Status:".$exChange->declare()."\n";   //查看如果交换机不存在则进行创建
运行,如果没有报错的话会输出:

Exchange Status:1

通过web窗口看一下

交换区建立成功

继续:

//创建队列    
$queue = new AMQPQueue($channel); 
$queue->setName($queueName);   
$queue->setFlags(AMQP_DURABLE); //队列持久化 
echo "Message Total:".$queue->declare()."\n";   //查看,如果不存在则创建
可以看到队列创建成功

继续往下走:

//绑定交换区与队列,指定路由键 
//rabbitmq不是直接发送到队列,发送到交换区,由交换区决定发给某个队列
echo 'Queue Bind: '.$queue->bind($ExChangeName, $routeName1)."\n"; //绑定路由
$conn->disconnect(); //关闭连接
可以看到绑定成功

也可以用命令查看:


这样就创建成功了,下面继续用生产者,消费者干起来。最终代码:

<?php
/**
 * 用PHP创建交换区:goods_msm,队列名称:goods_worker,以及路由key:code1
 * 此代码不是生产者,也不是消费者
**/
$conn = new AMQPConnection([
    'host' => '127.0.0.1',
    'vhost' => '/',
    'port' => 5672,
    'login' => 'guest',
    'password' => 'guest'
]);
if(!$conn->connect()){//建立连接
  die('connetc error');
}
$channel = new AMQPChannel($conn); //创建channel(信道或者叫通道)

$ExChangeName = 'goods_msm'; //交换区名称
$queueName = 'goods_worker'; //队列名称
$routeName1 = 'code1'; //路由key 

//创建交换机对象     
$exChange = new AMQPExchange($channel);    
$exChange->setName($ExChangeName); 
$exChange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$exChange->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重启时交换机自动恢复
echo "Exchange Status:".$exChange->declare()."\n";   //查看如果交换机不存在则进行创建

//创建队列    
$queue = new AMQPQueue($channel); 
$queue->setName($queueName);   
$queue->setFlags(AMQP_DURABLE); //队列持久化 
echo "Message Total:".$queue->declare()."\n";   //查看,如果不存在则创建


//绑定交换区与队列,指定路由键 
//rabbitmq不是直接发送到队列,发送到交换区,由交换区决定发给某个队列
echo 'Queue Bind: '.$queue->bind($ExChangeName, $routeName1)."\n"; //绑定路由
$conn->disconnect(); //关闭连接

2:生产者

<?php
/**
 * 生产者
 * 生产者也就是发送方
**/
$conn = new AMQPConnection([
    'host' => '127.0.0.1',
    'vhost' => '/',
    'port' => 5672,
    'login' => 'guest',
    'password' => 'guest'
]);
if(!$conn->connect()){//建立连接
  die('connetc error');
}
$channel = new AMQPChannel($conn); //创建channel(信道或者叫通道)

$ExChangeName = 'goods_msm'; //交换区名称
$routeName1 = 'code1'; //路由key 

//创建交换机对象   
$exChange = new AMQPExchange($channel);    
$exChange->setName($ExChangeName); 

$exChange->publish('第一条测试消息', $routeName1); //发送消息
$conn->disconnect(); //关闭连接
 运行一下,到web管理界面看看

消息发送成功

3:消费者

<?php
/**
 * 用PHP创建交换区:goods_msm,队列名称:goods_worker,以及路由key:code1
 * 此代码不是生产者,也不是消费者
**/
$conn = new AMQPConnection([
    'host' => '127.0.0.1',
    'vhost' => '/',
    'port' => 5672,
    'login' => 'guest',
    'password' => 'guest'
]);
if(!$conn->connect()){//建立连接
  die('connetc error');
}
$channel = new AMQPChannel($conn); //创建channel(信道或者叫通道)

$queueName = 'goods_worker'; //队列名称

//创建队列    
$queue = new AMQPQueue($channel); 
$queue->setName($queueName);   
$queue->setFlags(AMQP_DURABLE); //队列持久化 


//接受消息
$queue->consume(function ($envelope, $queue) {
    $msg = $envelope->getBody();
    echo $msg."\n"; //处理消息
}, AMQP_AUTOACK); //自动应答

$conn->disconnect(); //关闭连接

注意:这里需要注意的是这个方法:$queue->consume,queue对象有两个方法可用于取消息:consume和get。前者是阻塞的,无消息时会被挂起,适合循环中使用;后者则是非阻塞的,取消息时有则取,无则返回false。

就是说用了consume之后,会同步阻塞,该程序常驻内存,不能用nginx,apache调用。 

运行一下生产者,消费端输出:


7:实战

实战1:延时队列

(1)需求:电子商务网站下单后,30分钟如果未付款,改变订单状态

(2)原理:rabbitmq没有直接延时队列的功能,不过可以通过模拟实现此功能。用过redis的都知道,redis有一个ttl功能,就是生存周期,某个key设置生存周期,过期就会消失,rabbitmq也有此功能。rabbitmq消息过期进入死信队列,然后配置一个转发,把死信队列的消息转发到某个队列,这样就可以操作这个队列。

(3)步骤:

a:创建订单的时候,同时发送订单到消息队列,并且设置过期时间

b:mq回调,也就是改变订单状态服务,判断30分钟后是否付款,如果未付款,改变订单状态为:无效订单。

MQ也要分两步,首先是进入死信队列,但是死信队列不能直接消费,需要转发出来,利用MQ的两个特性:

a:ttl,生存周期

b:Dead Letter Exchanges(DLX)

这里设置两个地方用于转发:

x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange
x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送

具体参考MQ官方文档:http://www.rabbitmq.com/dlx.html

demo:

<?php
/**
1:创建第1个队列,此队列为下单的时候放入队列,然后设置过期时间
2:创建第2个队列,第1个队列过期后自动转入该队列
3:所以处理N久时间过期,只要处理第2个队列就行了。
**/
$conn = new AMQPConnection([
    'host' => '127.0.0.1',
    'vhost' => '/',
    'port' => 5672,
    'login' => 'guest',
    'password' => 'guest'
]);

$params = [
	//订单最开始在这里 第1个队列
    'exchangeName' => 'old_order_exchange', //交换区名称
    'queueName' => 'old_order_msg', //订单队列
    'routeKey' => 'old_order_route', //路由key 

    //过期后转发到这里 第2个队列
    'exchangeName2' => 'new_order_exchange', //交换区名称
    'queueName2' => 'new_order_msg', //订单队列
    'routeKey2' => 'new_order_route', //路由key 
];

if(!$conn->connect()){//建立连接
  die('connetc error');
}
$channel = new AMQPChannel($conn); //创建channel(信道或者叫通道)

//创建交换机对象 第1个队列
$exChange = new AMQPExchange($channel);    
$exChange->setName($params['exchangeName']); 
$exChange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$exChange->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重启时交换机自动恢复
$exChange->declare();

//创建交换机对象 第2个队列
$exChange2 = new AMQPExchange($channel);    
$exChange2->setName($params['exchangeName2']);
$exChange2->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$exChange2->setFlags(AMQP_DURABLE); //持久化 ,支持rabbitMq重启时交换机自动恢复
$exChange2->declare();

//第1个队列
$queue = new AMQPQueue($channel); 
$queue->setName($params['queueName']);   
$queue->setFlags(AMQP_DURABLE); //队列持久化 
$queue->setArguments([
	//如果过期,把订单转发到以下
	'x-dead-letter-exchange' => $params['exchangeName2'],
	'x-dead-letter-routing-key' => $params['routeKey2'],
	'x-message-ttl' => 10000, //60秒过期,注意:message是单个消息过期,mq也可以设置整个队列过期时间
]);
$queue->declare();

//第2个队列    
$queue2 = new AMQPQueue($channel); 
$queue2->setName($params['queueName2']);   
$queue2->setFlags(AMQP_DURABLE); //队列持久化 
$queue2->declare();

//绑定交换区与队列,指定路由键 第1个队列
$queue->bind($params['exchangeName'], $params['routeKey']); //绑定订单定时队列

//绑定交换区与队列,指定路由键 第2个队列
$queue2->bind($params['exchangeName2'], $params['routeKey2']); //绑定过期转发订单队列

echo $exChange->publish('订单'.date('Y-m-d H:i:s'), $params['routeKey']); //发送消息,往第1个队列发

$conn->disconnect(); //关闭连接

 运行下这个程序:

这里有1条消息

过期后转到这里来了!


最新评论:
2楼 广东省深圳市南山区 电信 发表于 2018-11-20 09:56:31
给你64分,还有36分,给你双击666
共有 1 条记录  首页 上一页 下一页 尾页 1
我要评论:

看不清楚


链接