0%

Kafka

术语

Topic

主题,承载消息的逻辑容器,在实际使用中用于区分业务类型。

Record

消息,Kafka处理的主要对象

Partition

分区,每个topic包含一个或多个partition,是一个有序的消息序列。物理上,每个partition对应一个文件夹,该文件夹下存储该partition的数据和索引文件。每条消息被发送到broker之前,会根据分区规则选择存储到哪个分区。

Offset

消息位移,表示分区中每条消息的位置信息,是一个单调递增且不变的值。offset是消息在分区中的唯一标识,kafka通过offset来保证消息在分区中的顺序性,kafka保证的是分区有序而不是主题有序。

image-20190819170404800

几种重要的offset:

  • LogStartOffset,第一条消息的offset,等于0
  • HW(High Watermark),高水位,标识特定的offset,消费者只能拉取这个offset之前的消息
  • LEO(Log End Offset)标识当前日志文件下一条待写入消息的offset,等于当前日志分区最后一条消息offset加1

分区ISR集合中的每个副本都会维护自身的LEO,ISR集合中最小的LEO即为分区的HW。Kafka的复制机制既不是同步复制也不是异步复制,这种ISR的方式有效权衡了数据可靠性和性能之间的关系。

Replica

副本,同一条消息被拷贝到多个地方以提供数据冗余,副本分为leader replica(领导者副本)和follower replica(追随者副本)。副本是在分区层级下的,即每个分区可以配置多个副本实现高可用。生产者和消费者只和leader副本交互,follower副本只负责消息同步。副本处于不同的broker中,当leader副本出现故障时,从follower副本中重新选举新的leader副本对外提供服务。

分区中的所有副本统称为AR(AssignedReplicas)。所有与leader副本保持一定程度同步的副本(包括leader副本在内〕组成ISROn-SyncReplicas),ISR集合是AR集合中的一个子集。消息会先发送到lead巳r副本,然后follower副本才能从leader副本中拉取消息进行同步,同步期间内follower副本相对于leader副本而言会有一定程度的滞后。“一定程度的同步”是指可忍受的滞后范围,这个范围可以通过参数进行配置。与leader副本同步滞后过多的副本(不包括leader副本)组成OSR(Out-of-SyncReplicas),由此可见,AR=ISR+OSR。在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即AR=ISR,OSR集合为空。

leader副本负责维护和跟踪ISR集合中所有follower副本的滞后状态,当follower副本落后太多或失效时,leader副本会把它从ISR集合中剔除。如果OSR集合中有follower副本“追上”了leader副本,那么leader副本会把它从OSR集合转移至ISR集合。默认情况下,当leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader,而在OSR集合中的副本则没有任何机会(不过这个原则也可以通过修改相应的参数配置来改变)。

Producer

生产者,向主题发布新消息的应用程序

Consumer

消费者,从主题订阅新消息的应用程序。kafka consumer使用拉(pull)模式从服务端拉取消息,并保存消费的具体位置。消费者宕机恢复后可根据之前保存的消费位置重新拉取需要的消息进行消费以避免消息丢失。

Consumer Offset

消费者消费进度,每个消费者都有自己的消费者位移

Consumer Group

多个消费者实例组成一个组,同时消费多个分区以实现高吞吐

Rebalance

重平衡,消费者组内某个消费者实例挂掉后,其它消费者实例自动重新分配订阅主题分区的过程,是Kafka消费端实现高可用的重要手段

Broker

服务代理节点,大多数情况下,broker可以看作一台kafka服务器,前提是这台服务器上只部署了一个kafka实例

架构

Kafka通过ZooKeeper对元数据进行管理,包括集群、broker、主题、分区等内容。

kafka architecture 架构

生产者

1
2
3
4
5
6
7
8
9
10
11
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();

KafkaProducer是线程安全的,在多个线程中使用同一个producer会有更高的性能。

send本身就是异步的,有两个重载的方法,返回的Future对象可以使调用方稍后获得发送的结果。发送消息一般可能发生两种异常:可重试异常和不可重试异常。常见的可重试异常有NetworkException、LeaderNotAvailableException、UnkonwnTopicOrPartitionException、NotEnoughRepicaException。常见的不可重试异常如RecordTooLargeException。

1
2
Future<RecordMetadata> send(ProducerRecord<K, V> record);
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

对于可重试异常,如果配置了retries参数,只要在规定重试次数内自行恢复就不会抛出异常。默认值是0,配置方式如下:

1
props.put(ProducerConfg.RETRIES_CONFIG, 3 ) ;

send方法将消息发送给broker时,有可能经过拦截器(Interceptor)、序列化器(Serializer)和分区器(Partitioner)后才真正发往broker。

image-20190819220846252

拦截器

生产者拦截器可以用来在消息发送前做一些准备工作,如按照某个规则过滤不符合要求的消息、修改消息内容等,也可以做一些定制化的需求,如统计类工作。KafkaProducer不仅可以指定一个拦截器,还可以指定多个拦截器组成拦截链。拦截链中,如果某个拦截器执行失败,下一个拦截器会接着从上一个执行成功的拦截器继续执行。

序列化器

生产者发送数据时需要通过序列化把对象转换成字节数组,消费者消费消息时要使用与生产者序列化器兼容的反序列化器将字节数组转化为对象。如果Kafka客户端提供的几种序列化器都无法满足需求,可以选择使用Avro、JSON、Thrift、ProtoBuf等通用的序列化工具来实现,或者使用自定义的序列化器实现。

分区器

分区器的作用是为消息分配分区,如果消息ProducerRecord中没有指定partition字段,producer将根据key来计算partition的值。如果key不为null,默认的分区器对key进行hash(采用MurmurHash2算法),相同key对应的消息会被写入同一个分区。如果key为null,消息将会以轮询的方式发往主题内各个可用分区。

消息收集器

生产者客户端由两个线程协调运行:主线程和Sender线程。在主线程中创建消息、通过可能的拦截器、序列化器和分区器后缓存到消息累加器(RecordAccumulator,也称作消息收集器)中。Sender线程从RecordAccumulator中获取消息并将其发送到Kafka中。

RecordAccumulator主要用来缓存消息以便于Sender线程可以批量发送,进而减少网络传输资源的消耗以提升性能。缓存大小可以通过生产者客户端参数buffer.memory配置,默认值为32MB。如果生产者发送消息的速度超过发送到服务器的速度,会导致生产者空间不足,send()方法要么被阻塞要么抛出异常。该特性由参数max.block.ms配置,默认值为60s。

RecordAccumulator内部为每个分区都维护了一个双端队列,消息写入缓存时,追加到双端队列的尾部;sender读取消息时,从双端队列的头部读取。InFlightRequests会缓存已经发出去但还没收到响应的请求,并通过还未确认的请求计算负载最小的Node(leastLoadedNode)。

元数据更新

元数据具体记录了集群中有哪些主题,这些主题有哪些分区,每个分区的lead巳r副本分配在哪个节点上,follower副本分配在哪些节点上,哪些副本在AR、ISR等集合中,集群中有哪些节点,控制器节点又是哪一个等信息。

当客户端中没有需要使用的元数据,如没有指定主题信息或超过metadata.max.age.ms(默认5分钟)时间没有更新元数据都会引起元数据的更新操作。元数据更新操作在客户端内部进行,对客户端外部使用者不可见。更新元数据时,先挑选出leastLoadedNode,向这个Node发送MetadataRequest请求获取元数据信息。这个请求由Sender线程发起,主线程也需要读取这些信息,它们之间通过synchronized和final关键字来保证。

消费者

每个消费者(Consumer)对应一个消费组(Consumer Group),可以通过增加/减少消费者个数来提高/降低整体消费能力。当消费者个数大于分区个数时,会有消费者分配不到任何分区。

消息中间件一般有两种消息投递模式:点对点模式(P2P,Point-to-Point)模式和发布/订阅(Pub/Sub)模式。发布/订阅模式在一对多广播时采用。Kafka同时支持两种消息投递模式,当消费者都属于不同的消费组时,所有消息被广播给所有消费者,每条消息都会被所有消费者处理,相当于发布订阅模式;当所有消费者都属于同一个消费组时,每个消息只会被一个消费者处理,相当于点对点模式。

参考资料

《深入理解Kafka:核心设计与实践原理》