kafka权威指南6~11
目录:
6 可靠的数据传递
6.1 可靠性保证
ACID是关系型数据库普遍支持的标准可靠性标准,指的原子性,一致性,隔离性和持久性
kafka可以保证的功能
- 分区消息的顺序,如果同一个生产者往同一个分区写入消息,而且保证后写入的消息的偏移量大于之前写入的消息
- 当消息被写入分区的所有同步副本时,被认为已提交,生产者可以选择接受不同类型的确认
- 只要有一个副本是活跃的,那么提交的消息就不会丢失
- 消费者只会消费已提交的消息
6.2 复制
分区有多个副本,消息会发送到leader副本,然后其他slave副本保持与其同步
slave被认为是活跃的,需要满足
- 与zookeeper有一个活跃会话,就是在6s内有发送过心跳
- 在过去10s内从首领获取过最新消息,并且是0延迟
如果不满足就被认为是非同步。如果集群内一个或多个副本在同步和非同步状态切换,说明集群内部出现了问题,通常是java垃圾回收导致,不恰当的垃圾回收会造成几秒的停顿,使broker与zookeeper的连接断掉
存在一个滞后的同步副本也会拖慢消费
6.3 broker配置
复制系数
topic级别为replication.factor,broker级别为default.replication.factor
复制系数提高会占用系数倍的磁盘
不完全的首领选举
unclean.leader.election在broker级别配置,默认为true
leader副本挂掉的时候,有slave副本是同步的,一个同步的slave被选举为新的leader副本,在选举过程中没有丢失数据
如果没有同步的副本会有两种选择
- 不同步的slave副本成为leader副本,topic不能写入
- 不同步的slave副本成为leader副本,写入旧leader副本的数据丢失,生产者可以写入数据,但是偏移量最开始是丢失部分的偏移量,导致多个消费者读取的数据会有不一致
unclean.leader.election为true就是允许不同步的副本成为leader,也就是不完全的首领选举
看消息重要性来决定这个配置
最少同步副本
在broker和topic级别,参数为min.insync.replicas
kafka在消息写入所有同步副本才会认为消息提交,如果同步副本只有一个,挂掉就没有了,所以要确保提交的数据写入不止一个活跃副本,如果min.insync.replicas配置为2,三副本的情况下,有两个副本不可用,broker会停止接受生产者请求,生产者收到NotEnoughReplicasException异常,但是消费者仍能读取数据,必须要恢复为两个同步副本才可以
6.4 在可靠的系统使用生产者
- 如果有三个副本,并且禁用了不完全的首领选举,生产者设置ack为1,当leader写入成功,但是还没有同步到slave的时候,leader告知生产者之后崩溃,slave还没有同步但是认为自己是同步的,所以直接接管成为leader,消费者也不知道消息的丢失,在生产者角度就丢失数据了
- 生产者也需要对leader不可用的错误做正确的处理
6.4.1 发送确认
- acks=0,生产者将数据发送出去就认为消息写入kafka,如果分区离线等都不会受到任何错误
- acks=1,在进行选举的时候会受到异常LeaderNotAvailableException,需要进行重试处理
- acks=all,最保险但是效率最低
6.4.2 配置生产者的重试次数
生产者要处理两种问题
- 一个是就是重试能解决的broker的错误LeaderNotAvailableException
- 另一个是broker返回的INVALID_CONFIG
重试可能会因为网络问题写入两次相同消息的问题,可以通过在消息中加入唯一标识在消费端解决,实现幂等
6.4.3 额外的错误处理
- 消息大小错误,认证错误
- 序列化错误
- 重试达到次数或者消息占满内存上限
6.5 在可靠的系统使用消费者
从分区读取数据的时候,消费者会先获取一批消息,检查最大的偏移量,然后从这个偏移量开始读取另一批消息,保证正确的顺序获取新数据
如果一个消费者退出,另一个消费者需要知道从什么地方开始处理,这个偏移量需要消费者进行提交,但是又会有消费者提交了偏移量却没有处理数据的问题
6.5.1 消费者可靠性配置
- group.id
- auto.offset.reset 当请求的偏移量不存在的操作
- enable.auto.commit 消费者自动提交偏移量或者代码收到提交偏移量
- auto.commit.interval.ms 提交间隔
6.5.2 显示提交偏移量
- 在处理完事件后再提交偏移量
- 在提交频率和重复消息次数之间权衡
- 再均衡
- 消费者重试 可能消费者的后端会出现不能写入的问题
6.6 验证系统可靠性
6.6.1 配置验证
- 配置是否满足需求
- 了解系统的目的
在消费过程可以测试
- leader停掉,生产者和消费者恢复的时间
- 控制器重启,需要多久恢复
- 依次重启broker是否丢失数据
- 不完全选举测试
6.6.2 应用程序验证
确定kafka服务满足,在应用程序消费期望行为是否能满足
6.6.3 在生产环境监控
略
7 构建数据管道
7.1 构建数据管道需要考虑的点
- 及时性 kafka降低了生产者和消费者的时间敏感度
- 可靠性 避免单点故障能够在各种错误中恢复
- 高吞吐
- 数据格式
- 转换 ETL提取转换和加载
- 安全性 支持加密传输,支持SASL认证和授权
- 故障处理能力
- 耦合性和灵活性
7.2 Connect API和客户端API的选择
如果是从kafka到存储系统就使用Connect API即可
7.3 kafka Connect
略
7.4 Connect之外的选择
略
8 跨集群数据镜像
略
9 管理kafka
9.1 主题操作
使用kafka-topics.sh,指定--zookeeper参数即可
9.1.1 创建主题
$ ./kafka-topics.sh --create --zookeeper 192.168.121.15:2181/kafka --replication-factor 2 --partitions 1 -topic test
- 如果指定了机架信息会分布在不同的机架,禁用需要使用
--disable-rack-aware
- 在自动化系统可以使用
--if-not-exist
,如果存在也不会报错
9.1.2 增加分区
如果增加消费者可能就需要增加分区,如果是基于键进行分区,映射也会发生变化
$ ./kafka-topics.sh --zookeeper 192.168.121.15:2181/kafka --alter --partitions 1 -topic test
分区无法减少的,因为删除了数据也跟着删除了,只能是删除主题重建
9.1.3 删除主题
broker的配置delete.topic.enable为true才能进行删除
$ ./kafka-topics.sh --zookeeper 192.168.121.15:2181/kafka --delete -topic test
9.1.4 列出集群中的所有主题
$ ./kafka-topics.sh --zookeeper 192.168.121.15:2181/kafka --list
9.1.5 列出主题详细信息
$ ./kafka-topics.sh --zookeeper 192.168.121.15:2181/kafka --describe
- 如果指定--topic则会列出当前topic的详细信息
- 使用--topics-with-overrides可以获取包含覆盖配置的主题,没懂
- 使用--under-replicated-partition列出所有包含不同步副本的分区
- 使用--unavailable-partitions列出没有leader的分区
9.2 消费者群组
使用kafka-consumer-groups.sh,对于旧版需要指定--zookeeper参数,新版需要指定--bootstrap-server
9.2.1 列出并描述群组
旧版
$ ./kafka-consumer-groups.sh --zookeeper 192.168.121.15:2181/kafka --list
新版
$ ./kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092/kafka --list
可以用--describe获取详细信息,使用--group获取指定的group信息
字段含义
- group
- topic 被消费的主题
- partition 被读取的分区
- current-offset 消费者最近提交的偏移量,消费者当前读取的位置
- log-end-offset 消费者最近读取消息的偏移量
- lag 消费者current-offset与broker的log-end-offset之间的差距
- owner 消费者id
9.2.2 删除群组
使用--delete
指定--group
9.2.3 偏移量管理
包含导出导入偏移量
9.3 动态配置变更
用于覆盖主题配置和client配置,使用kafka-config.sh
9.3.1 覆盖主体默认配置
$ ./kafka-config.sh --zookeeper 192.168.121.15:2181/kafka --alter --entity-type topic --entity-name <topic name> --add-config <key>=<value>[,<key>=<value>]
可配置的参数
配置项 | 描述 |
---|---|
cleanup.policy | 如果设置为compact,只有包含最新key的消息才会被保留,其余的丢弃 |
compression.type | broker写入磁盘的压缩格式,支持gzip,snappy和lz4 |
delete.retention.ms | 被标识为待删除数据能够保留 |
file.delete.delay.ms | 从磁盘上删除日志片段和索引之前可以保留多久 |
flush.message | 需要收到多少消息才刷新到磁盘 |
flush.ms | 将消息刷新到磁盘的等待时间 |
index.interval.bytes | 日志片段的两个索引之间能够容纳的字节数 |
max.message.bytes | 最大消息字节数 |
message.format.version | 消息格式化版本 |
message.timestamp.difference.max.ms | 消息的时间戳和broker收到时间的最大间隔,在message.timestamp.type为CreateTime生效 |
message.timestamp.type | 时间戳类型,CreateTime是客户端指定,LogAppendTime消息写入broker的时间 |
min.insync.replicas | 可用分区的最少同步副本 |
preallocate | 如果为true,为新的日志预留分配空间 |
retention.bytes | topic保留消息字节数 |
retention.ms | topic保留消息时间 |
segment.bytes | 日志片段的消息字节数 |
segment.index.bytes | 单个日志片段的最大字节数 |
segment.jitter.ms | segment.ms基础上的滚动的随机毫秒数 |
segment.ms | 日志片段滚动时间 |
unclean.leader.election.enable | 如果为true,非完全选举生效 |
9.3.2 覆盖客户端默认配置
将--entity-type
设置为client
可配置的参数
配置项 | 描述 |
---|---|
producter_bytes_rate |
单个生产者每秒可以发往broker字节数 |
consumer_bytes_rate |
单个消费者每秒可以从broker读取的字节数 |
9.3.3 列出被覆盖的配置
$ ./kafka-config.sh --zookeeper 192.168.121.15:2181/kafka --describe --entity-type topic --entity-name <topic name>
9.3.4 移除被覆盖的配置
$ ./kafka-config.sh --zookeeper 192.168.121.15:2181/kafka --alter --entity-type topic --entity-name <topic name> --delete-config <key>
9.4 分区管理
9.4.1 首选的首领选举
为啥要这么做?优化集群leader吗?
9.4.2 修改分区副本
9.4.3 修改复制系数
也是需要通过json的方式
{
"partitions": [
{
"topic": "<topic_name>",
"partition": 0,
"replicas": [
1,
2
]
}
],
"version": 1
}
9.4.4 转储日志片段
略
9.4.5 副本验证
使用kafka-replica-verification.sh
,会从分区的副本上获取消息,检查副本是否具有相同的消息,如果不提供主题名称,就会验证所有主题,并且需要显示的指定主题
$ ./kafka-replica-verification.sh --broker-list 192.168.121.21:9092,192.168.121.115:9092,192.168.121.18:9092,192.168.121.92:9092,192.168.121.29:9092 --topic-white-list 'test*'
9.5 消费和生产
9.5.1 控制台消费者
从头读取
$ ./kafka-console-consumer.sh --zookeeper 192.168.121.15:2181/kafka --topic test --from-beginning
- --formatter指定消息格式化器的类名,用于解码消息
- --max-messages NUM 退出之前最多读取的消息数
- --partition NUM 指定读取ID为NUM的分区
还有一些消费格式化器的选项
略
9.5.2 控制台生产者
$ ./kafka-console-producer.sh --broker-list 192.168.121.21:9092,192.168.121.115:9092,192.168.121.18:9092,192.168.121.92:9092,192.168.121.29:9092 --topic test
- --sync 以同步的方式
- --compression-codec STRING 压缩类型
10 监控kafka
10.1 度量指标基础
10.1.1 度量指标在哪里
zk的/brokers/ids/<ID>
包含hostname和jmx_port
作为ELK使用kafka不是过多关心这个
11 流式处理
略