← 返回首页
存储策略和容错机制
发表时间:2023-10-29 14:29:17
存储策略和容错机制

存储策略和容错机制。

1.存储策略

在kafka中每个topic包含1到多个partition,每个partition存储一部分Message。每条Message包含三个属性,其中有一个是offset。

offset相当于partition中这个message的唯一id,那么如何通过id高效的找到message?

两大法宝:分段+索引,kafak中数据的存储方式是这样的: 1. 每个partition由多个segment【片段】组成,每个segment中存储多条消息。 2. 每个partition在内存中对应一个index,记录每个segment中的第一条消息偏移量。

Kafka中数据的存储流程是这样的:生产者生产的消息会被发送到topic的多个partition上,topic收到消息后往对应partition的最后一个segment上添加该消息,segment达到一定的大小后会创建新的segment。

来看这个图,可以认为是针对topic中某个partition的描述

2.容错机制

1).当Kafka集群中的一个Broker节点宕机,会出现什么现象?

我们使用kill -9 杀掉master中的broker进程测试

[root@master kafka]# jps
30502 Jps
16361 QuorumPeerMain
17118 Kafka
[root@master kafka]# kill -9 17118

我们可以先通过zookeeper来查看一下,因为当kafka集群中的broker节点启动之后,会自动向zookeeper中进行注册,保存当前节点信息。

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[cluster, controller_epoch, controller, brokers, zookeeper, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config, hbase]
[zk: localhost:2181(CONNECTED) 1] ls /brokers/ids
[1, 2]

[zk: localhost:2181(CONNECTED) 2] get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://slave1:9092"],"jmx_port":-1,"host":"slave1","timestamp":"1698675025841","port":9092,"version":4}
cZxid = 0x400000021
ctime = Mon Oct 30 22:10:25 CST 2023
mZxid = 0x400000021
mtime = Mon Oct 30 22:10:25 CST 2023
pZxid = 0x400000021
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0xc643d0000
dataLength = 182
numChildren = 0
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/2
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://slave2:9092"],"jmx_port":-1,"host":"slave2","timestamp":"1698675028579","port":9092,"version":4}
cZxid = 0x400000031
ctime = Mon Oct 30 22:10:28 CST 2023
mZxid = 0x400000031
mtime = Mon Oct 30 22:10:28 CST 2023
pZxid = 0x400000031
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x200000c235d0000
dataLength = 182
numChildren = 0

此时发现zookeeper的/brokers/ids下面只有2个节点信息, 0号broker(master主机的broker) 已经删除。

然后再使用describe查询topic的详细信息,会发现此时的分区的leader全部变成了目前存活的另外两个节点。

[root@master kafka]# bin/kafka-topics.sh --describe --zookeeper master:2181 --topic mytopic1
Topic: mytopic1 PartitionCount: 5       ReplicationFactor: 2    Configs: 
        Topic: mytopic1 Partition: 0    Leader: 1       Replicas: 0,1   Isr: 1
        Topic: mytopic1 Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: mytopic1 Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2
        Topic: mytopic1 Partition: 3    Leader: 2       Replicas: 0,2   Isr: 2
        Topic: mytopic1 Partition: 4    Leader: 1       Replicas: 1,0   Isr: 1

此时可以发现Isr中的内容和Replicas中的不一样了,因为Isr中显示的是目前正常运行的节点 所以当Kafka集群中的一个Broker节点宕机之后,对整个集群而言没有什么特别的大影响,此时集群会给partition重新选出来一些新的Leader节点。

2).当Kafka集群中新增一个Broker节点,会出现什么现象?

新加入一个broker节点,zookeeper会自动识别并在适当的机会选择此节点提供服务,再次启动master节点中的broker进程测试。

[root@master ~]# cd $KAFKA_HOME
[root@master kafka]# bin/kafka-server-start.sh -daemon  config/server.properties

此时到zookeeper中查看一下。

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[0, 1, 2]

发现broker.id为0的这个节点信息也有了,在通过describe查看topic的描述信息,Isr中的信息和Replicas中的内容是一样的了。

[root@master kafka]# bin/kafka-topics.sh --describe --zookeeper master:2181 --topic mytopic1
Topic: mytopic1 PartitionCount: 5       ReplicationFactor: 2    Configs: 
        Topic: mytopic1 Partition: 0    Leader: 1       Replicas: 0,1   Isr: 1,0
        Topic: mytopic1 Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: mytopic1 Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0
        Topic: mytopic1 Partition: 3    Leader: 2       Replicas: 0,2   Isr: 2,0
        Topic: mytopic1 Partition: 4    Leader: 1       Replicas: 1,0   Isr: 1,0

但是有个问题:发现新启动的这个节点不会是任何分区的leader?怎么重新均匀分配呢?

有以下两种方案:

1).Broker中的自动均衡策略(默认已经有)

auto.leader.rebalance.enable=true
leader.imbalance.check.interval.seconds 默认值:300

2).手动执行均衡策略

[root@master kafka]# bin/kafka-leader-election.sh --bootstrap-server master:9092 --election-type preferred --all-topic-partitions

[root@master kafka]# bin/kafka-topics.sh --describe --zookeeper master:2181 --topic mytopic1
Topic: mytopic1 PartitionCount: 5       ReplicationFactor: 2    Configs: 
        Topic: mytopic1 Partition: 0    Leader: 0       Replicas: 0,1   Isr: 1,0
        Topic: mytopic1 Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2
        Topic: mytopic1 Partition: 2    Leader: 2       Replicas: 2,0   Isr: 2,0
        Topic: mytopic1 Partition: 3    Leader: 0       Replicas: 0,2   Isr: 2,0
        Topic: mytopic1 Partition: 4    Leader: 1       Replicas: 1,0   Isr: 1,0

执行后我们发现leader就实现了均匀分配。