kafka权威指南1~5

时间:Sept. 18, 2019 分类:

目录:

kafka最初是LinkedIn的一个内部基础设施系统,不是解决传统的数据存储系统,而是把数据看成持续变化和不断增长的流,基于这种想法构建的数据系统,而在想法实现后比预先有更广的适用性

kafka是一个分布式系统,可以自由伸缩,可以存储数据,提供了数据层的可复制,持久化和存储时间自由定义。

所以kafka并不是简单的消息队列

1. 初始kafka

1.1 发布和订阅消息系统

发布和订阅是一个经典的场景了,生产者根据某些方式进行分发(当然很多的都不分发直接发送到一个队列),订阅者通过订阅来接受指定的消息,中心点broker

1.2 kafka登场

kafka的数据按照一定的顺序持久化保存

1.2.1 消息和批次

kafka的数据单元为消息,包括键和值。消息写入的时候可以通过hash键的方式进行分发到不同分区,保证相同的键会到相同的分区。

为了提高效率,属于同一主题和分区的消息会分批次写入kafka,可以减少网络开销,不过时延就会增大

1.2.2 模式

消息模式是用来定义消息内容,例如json和xml格式

1.2.3 主题和分区

kafka的消息通过topic主题进行分类

topic可以分为好多partition分区,消息以追加的方式写入分区,因为分区的问题无法保持主题范围内的消息读取顺序,然后保持先进先出的方式顺序读取。

分区也是kafka实现数据冗余和伸缩性的方式。

1.2.4 生产者和消费者

生产者创建消息,一般一个消息会发布到一个topic上,会在默认情况下就将消息均匀的分布到主题的所有分区上,而不用关心是写到那个分区

消息键和分区器可以实现为键生成一个散列值,将其映射到指定分区,生产者也可以自定义分区器,根据业务规则将消息映射到分区

消费者读取消息,可以订阅一个或者多个topic,然后按照消息顺序读取,通过消息的偏移量来区分读取过的信息。

偏移量是另一种元数据,在给定分区每个消息的偏移量都是唯一的,消费者将每个消息偏移量保存在zookeeper或者kafka上,消费者关闭或者重启不会消失。

消费者是消费群组的一部分,消费者读取各自所属的分区,这种映射为所有权关系

1.2.5 broker和集群

集群内节点被称为broker,负责接收消息,为消息设置偏移量,并提交到磁盘保存,为消费者提供服务,对分区请求响应,返回磁盘上的消息。

每个集群内部会有一个broker充当控制管理器,负责将分区分配到broker和监控broker

一个分区属于一个broker,这个broker就是分区的leader

一个分区会分配个多个broker,就会产生分区复制,为分区提供了消息冗余

保留消息是在broker层面,保留一定时间或者一定大小,达到上限就删除,而主题层面定义保留策略,可以为消息保留到不再使用

1.2.6 多集群

使用了kafka提供的MirrorMaker

1.3 为什么使用kafka

  • 多生产者
  • 多消费者
  • 基于磁盘的数据存储
  • 伸缩性
  • 好性能

2 安装kafka

2.1 安装zk(略)

2.2 安装kafka的broker(略)

启动kafka

$ ./kafka-server-start.sh -daemon

创建topic

$ ./kafka-topics.sh --create --zookeeper 192.168.121.15:2181/kafka --replication-factor 2 --partitions 1 -topic test
Created topic "test".
$ ./kafka-topics.sh  --list --zookeeper 192.168.121.15:2181/kafka | grep test
test
$ ./kafka-topics.sh  --zookeeper 192.168.121.45:2181/kafka --describe --topic test
Topic:test  PartitionCount:1    ReplicationFactor:2 Configs:
    Topic: test Partition: 0    Leader: 2   Replicas: 2,6   Isr: 2,6

主要是配置的server.properties中zookeeper.connect=192.168.121.45:2181,192.168.121.4:2181,192.168.121.5:2181,192.168.121.14:2181,192.168.121.15:2181/kafka

往测试topic发送数据

$ ./kafka-console-producer.sh --broker-list 192.168.121.21:9092,192.168.121.115:9092,192.168.121.18:9092,192.168.121.92:9092,192.168.121.29:9092 --topic test
test message 1
test message 2

查看集群内的broker可以使用

$ ./kafka-broker-api-versions.sh --bootstrap-server 192.168.121.92:9092

测试读取数据

$. /kafka-console-consumer.sh --zookeeper 192.168.121.15:2181/kafka --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
test message 1
test message 2

2.3 broker配置

2.3.1 常规配置

  • broker.id
  • port
  • zookeeper.connect
  • log.dirs
  • num.recovery.threads.per.data.dir 这个配置是每个目录的线程,对broker崩溃进行恢复的时候可以节省时间
  • auto.create.topics.enable 如果是手动创建topic需要将其设置为false

2.3.2 topic的默认设置

kafka为topic设置了很多的默认配置,包括log.retention.hours.per.topic,log.retention.bytes.per.topic和log.seg.ment.bytes.per.topic

1.number.partition

参数指定了topic包含多少partition,默认topic自动创建功能是开启的,partition就是该参数的值,默认是1,分区数可以增加但是不能减少,如果如要小于配置值,需要手动创建主题了

选定分区数量考虑的因素

  • topic的吞吐量
  • 一般写入数据库也不会超过每秒50MB,单分区吞吐一般也不会超过50MB
  • 参考broker的磁盘,网络带宽
  • 单个broker对分区的个数有限制,分区越多,内存占用越多,会影响选举时间

2.log.retention.ms

kafka根据时间保留数据,默认是168小时(一周),一共有log.retention.hour|minutes|ms三个参数,但是是以文件最后修改时间为准,如果迁移过分区就不准确了

3.log.retention.bytes

kafka根据大小保留数据,默认是一个分区1GB,以时间和大小最先达到的为准

4.log.segment.bytes

kafka日志分段大小,默认是1GB

5.log.segment.ms

kafka日志分段时间,没有默认值

6.message.max.bytes

限制单个消息的大小,默认是1000000,也就是1MB,指的是压缩后的大小

如果生产者生产的消息大于这个值,broker会返回错误,消费者也最好设置与之相等,如果小于会存在不能消费的情况

同时需要调整的有replicas.fetch.max.byte,用于同步的消息大小,为啥不设置为一个?

2.4 硬件的选择

kafka的jvm不需要很大,内存都用作页缓存即可

2.5 云端kafka(略)

2.6 kafka集群(略)

2.7 生产环境的注意事项(略)

3. 生产者

3.1 生产者概览

  1. 创建一个ProducerRecord发送给分区器
  2. 分区器确定分区后将记录添加到记录批次(属于同一主题和分区的属于一个批次)
  3. 生产者的独立线程将其发送到响应的broker
  4. server收到响应之后会返回一个RecordMetaData对象包含主题分区和偏移量,如果写入失败会返回错误,生产者重试,重试多次都失败就返回错误信息

3.2 创建kafka生产者(略)

3.3 发送消息到kafka(略)

3.4 生产者配置

1.acks

  • 0 生产者不会等待有多少个副本收到消息
  • 1 生产者等待leader写入回复消息
  • all 生产者等待所有副本写入回复消息

2.buffer.memory

生产者内存缓冲区大小,当满了之后要不send阻塞,要不抛出异常,取决于block.on.buffer.full,在0.9版本更换为max.block.ms表示可以阻塞多久

3.compression.type

默认情况下消息不进行压缩,可以配置snappy,gzip和lz4等

  • snappy占用cpu较少,但是压缩比更高
  • gzip占用cpu较多,但是压缩后更小

4.retries

重试次数,重试间隔100ms,可以通过retry.backoff.ms配置(可以测试故障恢复或者选举的时间)

5.batch.size

同一批次可用的内存大小,如果配置小了,内存满了一定会发送,配置大了,生产者可能不会等到满就进行发送,由linger.ms配置

6.linger.ms

上述

7.client.id

用于表示,可以是任意字符串

8.max.in.flight.requests.per.connection

生产者在收到服务器响应之前可以发送多少消息,值越高就会占用越多的内存,但是也会提高吞吐量,timeout.ms指定了同步副本返回消息确认的时间

9.timeout.ms,request.timeout.ms

上述

10.max.block.ms

用于控制生产者获取元数据信息的阻塞时间

11.max.request.size

生产者发送的消息最大大小,也可以是一个批次的最大大小

12.receive.buffer.bytes和send.buffer.bytes

定义生产者接收和发送数据包的缓冲区大小,如果为-1使用操作系统默认值

3.5 序列化器

3.6 分区

4. 消费者

4.1 kafkaconsumer的概念

4.1.1 消费者和消费者群组

消费者属于消费者群组,订阅是由群组完成

当有四个分区

  • 一个消费者,这个消费者消费四个分区
  • 两个消费者就是一对二
  • 四个消费者就是一对一
  • 五个消费者也是一对一,但是有一个消费者没有分区

多个群组可以订阅同一个topic,群组之间互不影响

4.1.2 消费者群组和分区再平衡

当消费者群组中的消费者崩溃的时候,这个分区将由群组中的其他消费者来进行读取,这个过程被称为再平衡,在再平衡的期间的时候会造成群组消费短时间不可用,因为需要进行重新获取消费的偏移量

消费者通过群组协调器的broker(每个群组都有一个协调器)发送心跳,用于维护群组从属关系和分区所有权。消费者在轮训消息或者提交偏移量时发送心跳,在0.10版本出现单独的心跳线程,用于固定频率发送心跳

4.2 创建消费者

4.3 订阅主题

4.4 轮序主题

4.5 消费者配置

1.fetch.min.bytes

消费者从服务器获取消息的最小字节数,当有足够的数据才会发送给消费者,否则在数据量大的时候频番发送会造成CPU升高

2.fetch.max.wait.ms

消费者从服务器获取消息的最长等待时间,和fetch.min.bytes中第一个生效的

3.max.partition.fetch.bytes

每个分区返回给消费者的最大字节数,默认是1MB

4.session.timeout.ms

心跳超时时间,默认是3s,消费者发送的频率通过heartbeat.interval.ms,一般为session.timeout.ms的1/3

5.auto.offset.reset

当偏移量失效的读取位置,默认是latest是最新的,另一个earliest从分区起始位置

6.enable.auto.commit

提交偏移量的方式,more你是true,为了避免重复数据和数据丢失可以设置为false,由自己控制提交偏移量

7.partition.assignment.strategy

分区和消费者间的分配策略,有Range和RoundRobin

8.client.id

用来标识发送的消息

9.max.poll.records

单次调用call方法能返回的记录数

10.receive.buffer.bytes和send.buffer.bytes

读写的缓冲区大小,可以设置为-1使用操作系统的默认配置

4.6 提交和偏移量

4.7 再均衡监听器

4.8 从特定偏移量处开始处理记录

4.9 如何退出

4.10 反序列化器

4.11 独立消费者

5. 深入kafka

5.1 集群成员关系

kafka使用zookeeper维护成员关系,每个broker都有一个单独的标识符,会使用这个id注册到zookeeper上,kafka组件通过订阅zookeeper的/brokers/ids路径,当有broker退出或者加入集群,组件会获得通知

如果启动一个相同id的broker会在zookeeper无法注册

当broker宕机或者网络原因无法连通时,broker会与zookeeper的连接断掉,broker启动时注册的节点会在zookeeper上移除,监听broker列表的kafka组件也会告知broker移除

当broker关闭,id等数据还在zookeeper中保存,如果启动一个与之相同id的broker会拥有与之前一致的分区和topic(但是旧数据不会被拷贝???)

5.2 控制器

控制器其实就是broker,但是负责分区leader的选举,集群第一个启动的broker会在zookeeper里创建/controller让自己成为控制器,其他节点启动的时候也会尝试,但是会返回一个节点已经存在的异常,明白有控制器之后就会去watch控制节点创建的对象,获取节点变更的通知

当控制节点关闭,zookeeper上/controller节点小时,其他节点watch不到就会得到节点消失的消息,然后进行注册,第一个注册成功成为控制器,其他节点获取到存在控制器节点就开始watch这个控制器节点。控制节点会获取到一个全新的更大的Controller epoch,其他broker获取到这个值之后,小于这个值的消息就会忽略

当控制器发现broker关闭离开集群,对于leader为这个broker的分区,会遍历这些分区来为其分配新的leader,就是分区副本列表中的下一个副本(但是不会创建新的副本吗???)

当控制器发现一个broker加入集群,就会使用broker的id来检查是否包含现有分区的副本,如果有就直接,把变更消息发送给新来的broker和其他broker,新的broker上的副本从leader获取消息

如果有多个控制节点,通过Controller epoch防止脑裂

5.3 复制

topic包含多个分区,分区包含多个副本

副本分两种

  • leader副本 每个分区都有一个首领副本,为了保证数据的一致性,所有请求者和消费者都过这个副本
  • slave副本 leader副本以外的所有副本,不处理client的请求,只是从leader副本获取数据,与leader副本保持一致,如果其中一个崩溃,其中一个就会被变为leader副本

leader的另一个作用是搞清楚哪个副本与自己的状态是一致的。

slave副本为了和leader保持一致,会向leader发送获取数据的请求,这种请求和消费者读取消息而发送的请求一致,leader将消息发送给slave。

slave如果10s没有进行请求消息或者10s内没有请求最新的数据就会认为是不同步的,如果slave与leader不同步是不会成新的leader

5.4 处理请求

broker的大部分工作是处理客户端,分区副本和控制器发送分区首领的请求

kafka提供了一个基于TCP的二进制协议,指定了请求消息的格式以及broker如何对请求进行相应

请求消息格式包括

  • Request type(API key)
  • Request version(broker可以处理不同版本的客户端请求,并根据client版本做出不同的响应)
  • Correlation ID(具有唯一性的数字,用于标识请求消息,也会出现在响应消息和错误日志)
  • Client ID(用于标识发送请求的client)

broker会在它所监听的每个端口运行一个Acceptor线程,这个线程会创建一个连接,并将其交给Processor的线程(网络线程)去处理,Processor线程负责从Client获取消息, 并将其放到请求队列,然后从响应队列获取响应数据发送给client

请求消息被放到队列之后IO线程会进行处理会负责处理

  • 生产请求 生产者发送的写入broker请求
  • 获取请求 消费者或者slave副本从broker读取数据

client访问leader副本是需要发往正确的broker,这些信息是从任意一个broker获取,broker会缓存这些信息,client也会缓存,并且会根据metadata.max.age.ms参数来定时更新这些信息

5.4.1 生产请求

acks配置参数指定了要多少个broker确认才可以认为一个消息是写入成功,如果acks为1则leader收到消息就认为写入成功,如果为all则需要所有同步副本收到消息才算写入成功,为0则是生产者把消息发送出去即可,完全不等broker回应

broker在接收请求前会判断发送用户权限,acks值是否有效,acks为all的时候是否有足够多的副本保证消息写入

broker在收到请求后,如果acks为0或1的时候返回响应,额如果为all则等待所有写入后返回响应,消息写入文件系统缓存,然后被刷写到磁盘。

5.4.2 获取请求

获取请求使用topic的那个分区从某一偏移量开始的数据,也可以设置broker最多可以返回多少数据

broker接收到的是一个不存在的偏移量等会返回错误

如果请求的偏移量存在,broker按照client指定的数量上限从分区读取消息,再返回客户端,也可以设置下限,当有一定量消息再进行发送减少CPU和网络开销,也可以定义超时时间,达到多久发送数据

kafka使用了零复制技术,kafka直接将其发送到网络通道,而不经过任何缓冲区,其他数据库会将其保存在本地缓存,这样避免了字节复制,也不需要管理内存缓冲区。

当leader副本写入数据,但是slave副本没有写入数据的时候,也不会将这些数据返回给slave,replica.lag.time.max.ms参数控制副本在复制消息的最大超时时间

5.4.3 其他请求

5.5 物理存储

存储目录通过log.dirs参数进行配置

5.5.1 分区分配

  • broker之间平均分配分区副本
  • 每个分区的每个副本分布在不同的broker
  • 如果broker指定了机架信息,每个分区的每个副本分布在不同的机架

分区的时候没有考虑可用空间和工作负载的问题

5.5.2 文件管理

分区被分为若干的片段,默认一个片段为1GB或者一周的数据,以较小的为准,达到上限就会关闭当前文件并重新开始写一个新文件,新的文件叫做活跃片段,活跃片段不会被删除

5.5.3 文件格式

保存在磁盘上的数据格式和从发送者发送的数据格式,发送给消费者的数据格式是一致的,所以可以使用零复制,也避免了已压缩的数据进行解压和再压缩

5.5.4 索引

kafka为每个分区维护一个索引,索引把偏移量映射到片段文件,索引也是分段的,在删除消息的时候删除对应索引

kafka不维护索引的校验和,如果索引损毁,会重新读取消息并录制偏移量和位置生成索引,如果有必要可以删除索引,是安全的。

5.5.5 清理

5.5.6 清理工作原理

日志片段可以分为两个部分

  • 干净的部分
  • 污浊的部分

主要应对的就是使用kafka存储一些服务的状态,可以合并保留最新的状态,而不关心中间态

5.5.7 被删除的事件

topic删除需要先将其置为null,然后保留一段事件才会被删除

5.5.8 何时会清理主题(略)