rabbitmq

时间:March 29, 2019 分类:

目录:

消息中间件

消息中间件是在消息传输过程中保存消息的容器,消息中间件再将消息从它的源中继到它的目标时充当中间人的用,队列的主要目的是提供路由并保证消息的传递,如果发送的消息时接收者不可用,消息队列会保留消息,直到可以成功的传递它为止,当然消息队列保存的消息也是有期限的。

强调就是提供路由,并且有期限即租约的概念

  • 消息发送者可以发送一个消息而无需等待响应,消息发送者将消息发送到一条虚拟的通道(主题或者队列),消费者只需要监听或者订阅该通道,一条消息可以发给一个或者多个消息接收者,这些接收者都无需对消息发送者做出同步的回应,这个过程是一个异步的过程。

最简单的就是用户注册功能,在注册完成的同时,会写入后台数据库,会发邮件提醒,数据库和邮件服务器都是接收者。

  • 应用程序和应用程序之间是松耦合的关系,发送者和接受者甚至都可以不同时在线

例如在线交易系统为了保证数据的最终一致,在支付系统处理过后会把支付的结果放到消息中间件中,通知订单系统修改订单的支付状态,这两个系统就通过次过程解耦,而支付和消息中间件是一个事务,要强一致

两个程序之间可以是程序——>消息中间件——>网络——>消息中间件——>程序

点对点模型

点对点模型主要用户消息生产者和消息的消费者之间点对点之间通信,消息生产者讲消息发送给特定名字标识的消费者,这个名字实际上是对应于消费服务中的一个队列,在消息传递给消费者之前它被存储在这一消息队列中,队列消息可以放在内存中,也可以放在磁盘中持久化,以确保消息服务出现故障的时候仍然能够进行消息的传递

特点为

  1. 每个消息只有一个消费者
  2. 发送者和接受者没有时间依赖
  3. 接受者确认消息接受和处理成功

在接受者确认数据接受并成功处理后,消息中间件才会进行数据的删除

发布-订阅模型

所有的订阅者都是像一个特定的主题进行订阅,当消息产生后,会发送给消息中间件,然后广播给所有的订阅者

特点为

  1. 每个消息可以有0到多个订阅者
  2. 发布者和订阅者有时间依赖
  3. 订阅者必须保持持续的活动状态以接收消息,如果订阅者无法进行连接就不会再给该订阅者发送该数据(非持久订阅),持久性订阅则建立订阅关系后,消息就不会消失,不管订阅者是否在线,消息中间件会有订阅者列表,和每个消息的订阅者的反馈,是成功还是失败还是未知。对于失败和未知会持续进行发送数据,直到可以连通之后会把未发到的数据进行发送给订阅者。

一般生产环境中会使用持久订阅,而非持久订阅一般用于像网站活动进行内容推送

消息中间件的应用场景

  1. 网站用户注册,在注册成功后,可以写入数据库,然后会有短信或者邮件提醒,写入数据库和短信和邮件提醒就是通过消息中间件进行发送
  2. 日志分析,很多应用服务日志都写入到相同的数据中间件进行数据汇总,然后进行PV分析,或者行为分析
  3. 消息暂存,通过消息中间件进行消息暂存,可以用于后期进行压测,当然也可以通过tcpcopy等方式
  4. 消息延迟发送,用于限时秒杀等活动

AMQP协议

消息中间件主要靠一个AMQP协议,AMQP协议是一个异步消息传递所使用的应用层协议规范。

分两种模式,一个是push模式推送消息,一个是pull模式拉取消息。

push模式需要消息中间件记录订阅关系和推送轨迹,但是当消费者故障的时候会存在服务端积压消息和订阅者过多造成的推送压力过大。

pull模式的实时性不好,需要客户端记录pull的偏移量等数据

rabbitMQ

rabbitMQ实在AMQP协议标准的基础上完成,遵循Mozila Public License,采用Erlang实现的工业级的消息队列服务器。

其核心组件为Exchange(路由规则)和Queue(队列)

  • Service(broker):接收客户端连接,实现AMQP消息队列和路由功能,可以包含多个virtual host
  • virtual host:一个虚拟概念,类似于权限控制组,一个virtual host里面可以有若干个Exchange和Queue,但是权限控制的最小细粒度为virtual host
  • Exchange:接收生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列,ExchangeType决定了Exchange的路由消息行为,例如,在RabbitMQ中,ExchangeType有direct,Fanout和topic三种,不同类型的Exchange路由行为是不一样的
  • Queue:消息队列,用于存储还未被消费者消费的消息,非持久存储会在写在内存,重启后失效,而持久化存储会写入到磁盘
  • Message:由Header和Body组成,Header是由生产者添加的各种属性的集合,包括message是否持久化,由哪个Message Queue接收,优先级是多少,而Body是真正传输数据
  • BindingKey:所谓绑定就是将一个特定的Exchange和一个特定的Queue绑定起来,绑定的关键字为BindingKey

direct Exchange 直接交互式处理路由键(routing key),需要将一个队列绑定到交换机上,要求消息与一个特定的路由键完全匹配,如果一个队列绑定到交换机上要求为路由键为'why',那么只有被标记为'why'的消息才被转发,不会转发其他任何路由键,包括'why.domain','why.root'等在why的基础上扩展的路由键都不进行转发。

Fanout Exchange 广播式路由键,需要将队列绑定到交换机上,每个发送到交换机的消息都会被转发到该交换机绑定的所有队列上,相当于复制了多份,不过Fanout转发是最快的

topic Exchange 主题式路由键,通过消息的路由关键字和绑定关键字的匹配模式,将消息路由到匹配(部分匹配或全部匹配)的队列中,这种路由器可以被支持经典的发布/订阅传输模型,使用主题名字空间作为消息寻址模式,将消息传递给那些部分匹配或者全部匹配的多个消费者,*匹配一个词组,#匹配零个或多个词组

队列方式

简单队列

Work Queue

一个消息只能被一个消费者获取

有两种模式,一种是RR轮询的方式,另一种是能者多劳的方式,能者多劳就要求消息的确认方式为手动确认

自动确认是消息从队列中读取就认为消费成功,而手动确认需要等待消费者反馈才认为消费成功,在这之前队列都会将该消息标记为不可用状态

订阅模式

一个生产者和多个消费者,每个消费者有自己的队列,而生产者将消息发送到交换机,由交换机发送个各个队列(交换机不会存储数据)

主题匹配模式

安装

$ yum install -y epel-release
$ yum install -y rabbitmq-server
$ rabbitmq-plugins list
$ rabbitmq-plugins enable rabbitmq_management
The following plugins have been enabled:
  mochiweb
  webmachine
  rabbitmq_web_dispatch
  amqp_client
  rabbitmq_management_agent
  rabbitmq_management
Plugin configuration has changed. Restart RabbitMQ for changes to take effect.
$ systemctl start rabbitmq-server.service
$ rabbitmqctl list_queues
Listing queues ...
...done.

可以看到启动了15672端口,可以通过默认的guest用户登录,密码也是guest

常见命令

rabbitmqctl add_vhost vhostname 创建vhost
rabbitmqctl delete_vhost vhostname
rabbitmqctl list_vhost
rabbitmqctl add_user username password 创建用户
rabbitmqctl change_password username password 修改密码 
rabbitmqctl set_permissions -p v_host user ".*" ".*" ".*" 授权
rabbitmqctl set_user_tags username administrator 设置用户权限
rabbitmqctl list_queues 查看队列

常用配置

/etc/rabbitmq/rabbitmq-env.conf

  • RABBITMQ_NODE_IP_ADDRESS 指定IP地址
  • RABBITMQ_NODE_PORT 指定端口号,默认5672
  • RABBITMQ_CONFIG_FILE 配置文件目录,配置文件的后缀必须为.config
  • RABBITMQ_LOG_BASE 日志文件路径

config采用了标准的erlang的配置文件

  • tcp_listeners设置rabbitmq的监听端口
  • disk_free_limit磁盘低水位线,磁盘容量低于指定值则停止接收数据
  • vm_memory_high_watermark内存的低水位线,低于则会开启流控机制,默认值为0.4,即40%

rabbitmq会维持大量的网络连接,所以系统允许同时打开的最大文件数需要调整。推荐的允许同时打开的最大文件数为65535。有两个地方需要设置:

  • 系统每个用户允许的最大同时打开文件数,ulimit -n,可以通过ulimit -n size设置
  • 系统允许的最大同时打开文件数,fs.file-max