kafka权威指南6~11

时间:Sept. 20, 2019 分类:

目录:

6 可靠的数据传递

6.1 可靠性保证

ACID是关系型数据库普遍支持的标准可靠性标准,指的原子性,一致性,隔离性和持久性

kafka可以保证的功能

  • 分区消息的顺序,如果同一个生产者往同一个分区写入消息,而且保证后写入的消息的偏移量大于之前写入的消息
  • 当消息被写入分区的所有同步副本时,被认为已提交,生产者可以选择接受不同类型的确认
  • 只要有一个副本是活跃的,那么提交的消息就不会丢失
  • 消费者只会消费已提交的消息

6.2 复制

分区有多个副本,消息会发送到leader副本,然后其他slave副本保持与其同步

slave被认为是活跃的,需要满足

  • 与zookeeper有一个活跃会话,就是在6s内有发送过心跳
  • 在过去10s内从首领获取过最新消息,并且是0延迟

如果不满足就被认为是非同步。如果集群内一个或多个副本在同步和非同步状态切换,说明集群内部出现了问题,通常是java垃圾回收导致,不恰当的垃圾回收会造成几秒的停顿,使broker与zookeeper的连接断掉

存在一个滞后的同步副本也会拖慢消费

6.3 broker配置

复制系数

topic级别为replication.factor,broker级别为default.replication.factor

复制系数提高会占用系数倍的磁盘

不完全的首领选举

unclean.leader.election在broker级别配置,默认为true

leader副本挂掉的时候,有slave副本是同步的,一个同步的slave被选举为新的leader副本,在选举过程中没有丢失数据

如果没有同步的副本会有两种选择

  • 不同步的slave副本成为leader副本,topic不能写入
  • 不同步的slave副本成为leader副本,写入旧leader副本的数据丢失,生产者可以写入数据,但是偏移量最开始是丢失部分的偏移量,导致多个消费者读取的数据会有不一致

unclean.leader.election为true就是允许不同步的副本成为leader,也就是不完全的首领选举

看消息重要性来决定这个配置

最少同步副本

在broker和topic级别,参数为min.insync.replicas

kafka在消息写入所有同步副本才会认为消息提交,如果同步副本只有一个,挂掉就没有了,所以要确保提交的数据写入不止一个活跃副本,如果min.insync.replicas配置为2,三副本的情况下,有两个副本不可用,broker会停止接受生产者请求,生产者收到NotEnoughReplicasException异常,但是消费者仍能读取数据,必须要恢复为两个同步副本才可以

6.4 在可靠的系统使用生产者

  • 如果有三个副本,并且禁用了不完全的首领选举,生产者设置ack为1,当leader写入成功,但是还没有同步到slave的时候,leader告知生产者之后崩溃,slave还没有同步但是认为自己是同步的,所以直接接管成为leader,消费者也不知道消息的丢失,在生产者角度就丢失数据了
  • 生产者也需要对leader不可用的错误做正确的处理

6.4.1 发送确认

  • acks=0,生产者将数据发送出去就认为消息写入kafka,如果分区离线等都不会受到任何错误
  • acks=1,在进行选举的时候会受到异常LeaderNotAvailableException,需要进行重试处理
  • acks=all,最保险但是效率最低

6.4.2 配置生产者的重试次数

生产者要处理两种问题

  • 一个是就是重试能解决的broker的错误LeaderNotAvailableException
  • 另一个是broker返回的INVALID_CONFIG

重试可能会因为网络问题写入两次相同消息的问题,可以通过在消息中加入唯一标识在消费端解决,实现幂等

6.4.3 额外的错误处理

  1. 消息大小错误,认证错误
  2. 序列化错误
  3. 重试达到次数或者消息占满内存上限

6.5 在可靠的系统使用消费者

从分区读取数据的时候,消费者会先获取一批消息,检查最大的偏移量,然后从这个偏移量开始读取另一批消息,保证正确的顺序获取新数据

如果一个消费者退出,另一个消费者需要知道从什么地方开始处理,这个偏移量需要消费者进行提交,但是又会有消费者提交了偏移量却没有处理数据的问题

6.5.1 消费者可靠性配置

  1. group.id
  2. auto.offset.reset 当请求的偏移量不存在的操作
  3. enable.auto.commit 消费者自动提交偏移量或者代码收到提交偏移量
  4. auto.commit.interval.ms 提交间隔

6.5.2 显示提交偏移量

  1. 在处理完事件后再提交偏移量
  2. 在提交频率和重复消息次数之间权衡
  3. 再均衡
  4. 消费者重试 可能消费者的后端会出现不能写入的问题

6.6 验证系统可靠性

6.6.1 配置验证

  • 配置是否满足需求
  • 了解系统的目的

在消费过程可以测试

  • leader停掉,生产者和消费者恢复的时间
  • 控制器重启,需要多久恢复
  • 依次重启broker是否丢失数据
  • 不完全选举测试

6.6.2 应用程序验证

确定kafka服务满足,在应用程序消费期望行为是否能满足

6.6.3 在生产环境监控

7 构建数据管道

7.1 构建数据管道需要考虑的点

  1. 及时性 kafka降低了生产者和消费者的时间敏感度
  2. 可靠性 避免单点故障能够在各种错误中恢复
  3. 高吞吐
  4. 数据格式
  5. 转换 ETL提取转换和加载
  6. 安全性 支持加密传输,支持SASL认证和授权
  7. 故障处理能力
  8. 耦合性和灵活性

7.2 Connect API和客户端API的选择

如果是从kafka到存储系统就使用Connect API即可

7.3 kafka Connect

7.4 Connect之外的选择

8 跨集群数据镜像

9 管理kafka

9.1 主题操作

使用kafka-topics.sh,指定--zookeeper参数即可

9.1.1 创建主题

$ ./kafka-topics.sh --create --zookeeper 192.168.121.15:2181/kafka --replication-factor 2 --partitions 1 -topic test
  • 如果指定了机架信息会分布在不同的机架,禁用需要使用--disable-rack-aware
  • 在自动化系统可以使用--if-not-exist,如果存在也不会报错

9.1.2 增加分区

如果增加消费者可能就需要增加分区,如果是基于键进行分区,映射也会发生变化

$ ./kafka-topics.sh --zookeeper 192.168.121.15:2181/kafka --alter --partitions 1 -topic test

分区无法减少的,因为删除了数据也跟着删除了,只能是删除主题重建

9.1.3 删除主题

broker的配置delete.topic.enable为true才能进行删除

$ ./kafka-topics.sh --zookeeper 192.168.121.15:2181/kafka --delete -topic test

9.1.4 列出集群中的所有主题

$ ./kafka-topics.sh --zookeeper 192.168.121.15:2181/kafka --list 

9.1.5 列出主题详细信息

$ ./kafka-topics.sh --zookeeper 192.168.121.15:2181/kafka --describe
  • 如果指定--topic则会列出当前topic的详细信息
  • 使用--topics-with-overrides可以获取包含覆盖配置的主题,没懂
  • 使用--under-replicated-partition列出所有包含不同步副本的分区
  • 使用--unavailable-partitions列出没有leader的分区

9.2 消费者群组

使用kafka-consumer-groups.sh,对于旧版需要指定--zookeeper参数,新版需要指定--bootstrap-server

9.2.1 列出并描述群组

旧版

$ ./kafka-consumer-groups.sh --zookeeper 192.168.121.15:2181/kafka --list

新版

$ ./kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092/kafka --list

可以用--describe获取详细信息,使用--group获取指定的group信息

字段含义

  • group
  • topic 被消费的主题
  • partition 被读取的分区
  • current-offset 消费者最近提交的偏移量,消费者当前读取的位置
  • log-end-offset 消费者最近读取消息的偏移量
  • lag 消费者current-offset与broker的log-end-offset之间的差距
  • owner 消费者id

9.2.2 删除群组

使用--delete指定--group

9.2.3 偏移量管理

包含导出导入偏移量

9.3 动态配置变更

用于覆盖主题配置和client配置,使用kafka-config.sh

9.3.1 覆盖主体默认配置

$ ./kafka-config.sh --zookeeper 192.168.121.15:2181/kafka --alter --entity-type topic --entity-name <topic name> --add-config <key>=<value>[,<key>=<value>]

可配置的参数

配置项 描述
cleanup.policy 如果设置为compact,只有包含最新key的消息才会被保留,其余的丢弃
compression.type broker写入磁盘的压缩格式,支持gzip,snappy和lz4
delete.retention.ms 被标识为待删除数据能够保留
file.delete.delay.ms 从磁盘上删除日志片段和索引之前可以保留多久
flush.message 需要收到多少消息才刷新到磁盘
flush.ms 将消息刷新到磁盘的等待时间
index.interval.bytes 日志片段的两个索引之间能够容纳的字节数
max.message.bytes 最大消息字节数
message.format.version 消息格式化版本
message.timestamp.difference.max.ms 消息的时间戳和broker收到时间的最大间隔,在message.timestamp.type为CreateTime生效
message.timestamp.type 时间戳类型,CreateTime是客户端指定,LogAppendTime消息写入broker的时间
min.insync.replicas 可用分区的最少同步副本
preallocate 如果为true,为新的日志预留分配空间
retention.bytes topic保留消息字节数
retention.ms topic保留消息时间
segment.bytes 日志片段的消息字节数
segment.index.bytes 单个日志片段的最大字节数
segment.jitter.ms segment.ms基础上的滚动的随机毫秒数
segment.ms 日志片段滚动时间
unclean.leader.election.enable 如果为true,非完全选举生效

9.3.2 覆盖客户端默认配置

--entity-type设置为client

可配置的参数

配置项 描述
producter_bytes_rate 单个生产者每秒可以发往broker字节数
consumer_bytes_rate 单个消费者每秒可以从broker读取的字节数

9.3.3 列出被覆盖的配置

$ ./kafka-config.sh --zookeeper 192.168.121.15:2181/kafka --describe --entity-type topic --entity-name <topic name> 

9.3.4 移除被覆盖的配置

$ ./kafka-config.sh --zookeeper 192.168.121.15:2181/kafka --alter --entity-type topic --entity-name <topic name> --delete-config <key>

9.4 分区管理

9.4.1 首选的首领选举

为啥要这么做?优化集群leader吗?

9.4.2 修改分区副本

参考kafka节点迁移

9.4.3 修改复制系数

也是需要通过json的方式

{
    "partitions": [
        {
            "topic": "<topic_name>",
            "partition": 0,
            "replicas": [
                1,
                2
            ]
        }
    ],
    "version": 1
}

9.4.4 转储日志片段

9.4.5 副本验证

使用kafka-replica-verification.sh,会从分区的副本上获取消息,检查副本是否具有相同的消息,如果不提供主题名称,就会验证所有主题,并且需要显示的指定主题

$ ./kafka-replica-verification.sh --broker-list 192.168.121.21:9092,192.168.121.115:9092,192.168.121.18:9092,192.168.121.92:9092,192.168.121.29:9092 --topic-white-list 'test*'

9.5 消费和生产

9.5.1 控制台消费者

从头读取

$ ./kafka-console-consumer.sh --zookeeper 192.168.121.15:2181/kafka --topic test --from-beginning
  • --formatter指定消息格式化器的类名,用于解码消息
  • --max-messages NUM 退出之前最多读取的消息数
  • --partition NUM 指定读取ID为NUM的分区

还有一些消费格式化器的选项

9.5.2 控制台生产者

$ ./kafka-console-producer.sh --broker-list 192.168.121.21:9092,192.168.121.115:9092,192.168.121.18:9092,192.168.121.92:9092,192.168.121.29:9092 --topic test
  • --sync 以同步的方式
  • --compression-codec STRING 压缩类型

10 监控kafka

10.1 度量指标基础

10.1.1 度量指标在哪里

zk的/brokers/ids/<ID>包含hostname和jmx_port

作为ELK使用kafka不是过多关心这个

11 流式处理