rabbitmq实战指南
目录:
1 Rabbitmq简介
1.1 什么是消息中间件
消息传递有两种模式,一种是点对点,另一种是发布订阅
1.2 消息中间件的作用
作用有
- 解耦:中间件在处理过程中插入了一个隐含的基于数据的接口,两边处理都要遵循这一规范,允许两边扩展处理过程
- 冗余:数据持久化直到数据被处理完成
- 扩展性:扩展只需要扩展处理过程即可
- 削峰:在访问量徒增的时候防止后端处理程序崩溃
- 可恢复性:在一部分组件丢失的时候不会影响系统,在恢复之后能继续处理
- 顺序保证:保障数据的顺序性
- 缓冲:提高消息的写入速度
- 异步通信:提供异步处理机制
1.3 Rabbitmq起源
Rabbitmq是Erlang语言实现的AMQP(高级消息队列协议)的消息中间件
之前有一些中间件的商业实现,但是由于价格高昂,为了打破壁垒使消息在平台互通,JMS应运而生,提供JavaAPI方式隐藏单独的MQ产品商的接口,解决互通的问题,activemq就是一种JMS的实现。之后06年Cisio,redhat,imatix等联合定制了AMQP标准,作为应用层的一个开放标准协议
Rabbitmq最初版本实现了AMQP的一个关键特性,对队列和交换机资源进行配置
rabbitmq的特点
- 可靠性,传递确认,发布确认,持久性等
- 灵活的路由,消息进入队列先通过交换器路由消息
- 扩展性,可以多个节点组成一个集群,并可以扩展节点
- 高可用性,队列可以在集群中设置镜像,使部分节点出问题队列仍然可用
- 多种协议,除了支持AMQP协议,还支持,STOMP,MQTT等多种消息中间件协议
- 多语言客户端,支持常用语言,Java,Python,Ruby,php,C#,JavaScript等
- 管理界面
- 插件机制,可以实现多方面的扩展,也可以编写自己的插件
1.4 Rabbitmq的安装和简单使用
- 安装Erlang
- 安装rabbitmq
- 启动服务
rabbitmq-server -detached
并用rabbitmqctl status
查看状态 - 生产和消费
# 添加新用户,用户名为root,密码为rootl23
rabbitmqctl add user root root
# 为root用户设置所有权限
rabbitmqctl set_permissions - p I root ".*" ".*" ".*"
# 设置root用户为管理员角色
rabbitmqctl set user_tags root administrator
2. Rabbitmq入门
2.1 相关概念介绍
- 生产者创建消息到Rabbitmq,消息包含消息体(payload)和标签(Label),标签主要用于表述消息,例如交换器名称和一个路由键
- 消费者订阅到Rabbitmq的队列上,消费的时候消费的消息体,标签会在路由的时候被丢弃
- Queue队列是rabbitmq的内部对象,用于存储消息,多个消费者订阅相同的队列会根据RR的模式获取数据
- 生产者将消息发送到交换器(Exchange),交换器将消息路由到一个或多个队列,如果路由不到会返回给生产者或者直接丢弃,交换器有四种类型会有四种不同的路由策略
- 路由键(routingkey),生产者发送给rabbitmq的时候一般都会指定一个路由键,与交换器类型和绑定键联合使用生效
- 绑定(binding),队列和交换器通过binding方式关联到一起,绑定的时候会指定一个bindingkey
交换器类型
- fanout 把消息路由到所有与之绑定的队列
- direct 把消息路由到BindingKey和RoutingKey完全匹配的队列
- topic 和direct类似,但是不是完全匹配,BindingKey和RoutingKey是用'
.
'分隔的字符串,使用*和#来进行模糊匹配
- header 根据消息内容的header进行路由,性能差
rabbitmq运转流程
生产者发送消息
- 生产者连接到rabbitmq的broker,并建立一个连接(Connection),开启一个信道(Channel)
- 生产者声明一个交换机,并设置相关属性(交换机类型,是否持久化)
- 生产者声明一个队列,并设置相关属性(是否排他,是否持久化,是否自动删除)
- 生产者将交换机和队列通过路由键绑定
- 生产者发送消息到broker,消息包括路由键和交换机等信息
- 响应的交换机根据接收的路由键查找匹配的队列
- 如果找到就放到相应的队列,如果没有找到就根据生产者配置属性选择丢弃还是返回生产者
- 关闭信道
- 关闭连接
消费者接受消息
- 消费者连接到rabbitmq的broker,并建立一个连接(Connection),开启一个信道(Channel)
- 消费者向指定的消费队列请求消息,可能会设置回调函数以及一些准备工作
- 等待broker回应并投递响应队列的消息,消费者接受消息
- 消费者确认接收到消息
- broker删除从队列中删除已经完成的消费的消息
- 关闭信道
- 关闭连接
连接就是tcp的连接的,可以进行复用,信道具有不同的Id,是建立在Connect上的虚拟连接,rabbitmq的每条AMQP指令都是通过信道完成的,如果信道本身流量很大就需要开辟多个Connect,将信道均摊到多个Connect
2.2 AMQP协议介绍
生产者的流转过程
消费者的流转过程
3 客户端开发向导
Connection可以用来创建多个Channel,但是Channel在线程之间不共享,因为多线程共享Channel是非线程安全的
消费模式有两种,推push和拉pull
- 推模式通过持续订阅的方式来消费消息
- 拉模式单条的获取消息
autoAck是否自动确认消息被消费,当autoAck为false的时候就会在将队列的消息分为等待投递给消费者的消息和已经投递但是未被确认的消息,对应的web管理界面的Ready和Unacked的消息数,命令rabbitmqctl list_queues name messages_ready
也可以看到
4 Rabbitmq进阶
4.1 消息的何去何从
- mandatory,如果为true,消息没有符合条件的队列会返回给生产者,如果为false就被丢弃
- immediate,如果为true,当队列没有消费者的时候不会被放到队列,而直接返给消费者,3.0被抛弃了
- 备份交换机,将没有对应队列的消息将其存储在这里,防止被丢弃
4.2 过期时间
- 消息的过期时间,channel.queueDeclare方法中加入x-message-ttl参数实现的,单位为毫秒
- 设置队列的TTL,通过 channel.queueDeclare方法中的x-expires参数可以控制队列被自动删除前处于未使用状态的时间
4.3 死信队列
死信交换机DLX(Dead Letter Exchange),当一个消息在队列中变为死信的时候就会被发送到DLX
消息变为死信的情况:
- 消息被拒绝,并设置requeue参数为false
- 消息过期
- 队列达到最大长度
通过在channel.queueDeclare方法中设置x-dead-letter-exchange参数来为这个队列添加DLX
web管理界面的
- D持久化
- TTL超时时间
- DLX为设置了死信队列
- DLK关联routingkey
4.4 延迟队列
消息发送到队列不会立刻被消费者立刻拿到
使用场景
- 订单下单后有30min支付,如果30min没有支付成功,那么这个订单就会异常处理
- 手机遥控智能设备在指定时间进行工作,就可以将用户指令发送到延迟队列,当指令设定的时间到了再推送到设备
实际是通过TTL和死信队列完成的延迟效果
4.5 优先级队列
优先级大的队列有高的优先权,优先级高的消费具备优先被消费的权限,当然在队列没有堵塞的情况下才有优先消费
可以通过设置队列的x-max-priority参数来实现,消息的优先级默认为0,最大不能超过队列的优先级
4.6 RPC实现
略
4.7 持久化
持久化分三个部分
- 交换器
- 队列
- 消息
交换机和队列持久化都靠声明队列时将durable参数置为true实现,消息的持久化需要将消息的投递方式BasicProperties中的deliveryMode属性设置为2,但是单独设置消息持久化没有用,必须队列也持久化,消息持久化会影响Rabbitmq的写入性能
最后要保证autoack为false,这样才能保证数据被消费并处理,才算数据不丢失
4.8 生产者确认
生产者的消息确认到达服务器
- 事务机制
- 发送方确认机制