Kafka-简介(一)

一、Kafka背景及架构

Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。目前越来越多的开源分布式处理系统如Cloudera、Apache Storm、Spark都支持与Kafka集成。InfoQ一直在紧密关注Kafka的应用以及发展,“Kafka剖析”专栏将会从架构设计、实现、应用场景、性能等方面深度解析Kafka。

Kafka创建背景

Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。现在它已被多家不同类型的公司 作为多种类型的数据管道和消息系统使用。

活动流数据是几乎所有站点在对其网站使用情况做报表时都要用到的数据中最常规的部分。活动数据包括页面访问量(Page View)、被查看内容方面的信息以及搜索情况等内容。这种数据通常的处理方式是先把各种活动以日志的形式写入某种文件,然后周期性地对这些文件进行统计分析。运营数据指的是服务器的性能数据(CPU、IO使用率、请求时间、服务日志等等数据)。运营数据的统计方法种类繁多。

近年来,活动和运营数据处理已经成为了网站软件产品特性中一个至关重要的组成部分,这就需要一套稍微更加复杂的基础设施对其提供支持。

Kafka简介

kafka_introduction

  • 发布-订阅消息系统
  • 数据流平台
  • 存储

Kafka作为发布/订阅的消息中间件时,主要设计目标如下:

  • 支持单节点以及集群通信的功能;
  • 支持一对一、一对多、多对一、多对多通信模式;
  • 支持消息持久化,可以定期清除;
  • 支持消息重新消费;
  • 支持生产者与消费者的动态增加,支持服务端动态扩展,支持多种语言客户端;
  • 支持消息负载均衡;
  • 以时间复杂度为O(1)的方式提供消息持久化能力;
  • 对TB级以上数据也能保证常数时间复杂度的访问性能;
  • 极致的高性能与高吞吐量;
  • 支持消息分区及分布式消费。

Kafka作为数据流平台和存储时,主要设计目标如下:

  • 支持高效实时的数据流;
  • 支持离线数据处理和实时数据处理。
  • 支持与分布式大数据流处理系统如Hadoop,storm,spark,flink集成;
  • 支持高可靠的分布式数据备份集群存储模式。

kafka_streams

Spark+Flume+Kafka整合的系统中:

  • Flume相当于Producer,收集分布式日志数据,并且sink或者push到Kafka Cluster中
  • Spark相当于Consumer,Pull主动拉取Kafka中的数据,并且通过Spark Streaming等框架来处理数据

为何使用消息系统

  • 解耦

    在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

  • 冗余

    有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

  • 扩展性

    因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。

  • 灵活性 & 峰值处理能力

    在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

  • 可恢复性

    系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

  • 顺序保证

    在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。

  • 缓冲

    在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。

  • 异步通信

    很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

常用Message Queue对比

  • RabbitMQ

    RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。

  • Redis

    Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。

  • ZeroMQ

    ZeroMQ号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZeroMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演这个服务器角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果宕机,数据将会丢失。其中,Twitter的Storm 0.9.0以前的版本中默认使用ZeroMQ作为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty作为传输模块)。

  • ActiveMQ

    ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。

  • Kafka/Jafka

    Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

Kafka架构

  • Broker

    Kafka集群包含一个或多个服务器,这种服务器被称为broker

  • Topic

    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

  • Partition

    Parition是物理上的概念,每个Topic包含一个或多个Partition.

  • Producer

    负责发布消息到Kafka broker

  • Consumer

    消息消费者,向Kafka broker读取消息的客户端。

  • Consumer Group

    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

Kafka拓扑结构

Kafka_architecture

如上图所示,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

Topic & Partition

Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。若创建topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹(本文所用集群共8个节点,此处topic1和topic2 replication-factor均为1),如下图所示。

Kafka_Topic_Partition

每个日志文件都是一个log entrie序列,每个log entrie包含一个4字节整型数值(值为N+5),1个字节的”magic value”,4个字节的CRC校验码,其后跟N个字节的消息体。每条消息都有一个当前Partition下唯一的64字节的offset,它指明了这条消息的起始位置。磁盘上存储的消息格式如下:

message length : 4 bytes (value: 1+4+n)
“magic” value : 1 byte
crc : 4 bytes
payload : n bytes

这个log entries并非由一个文件构成,而是分成多个segment,每个segment以该segment第一条消息的offset命名并以“.kafka”为后缀。另外会有一个索引文件,它标明了每个segment下包含的log entry的offset范围,如下图所示。

Kafka_Log_Segment日志分段

因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。

Kafka_log_new_msg_write

对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可在Partition文件超过1GB时删除旧数据,配置如下所示。

1
2
3
4
5
6
7
8
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according to the retention policies
log.retention.check.interval.ms=300000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

这里要注意,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高Kafka性能无关。选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每一个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。这个offset由Consumer控制。正常情况下Consumer会在消费完一条消息后递增该offset。当然,Consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由Consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。

Producer消息路由

Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置合理,所有消息可以均匀分布到不同的Partition里,这样就实现了负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。可以在$KAFKA_HOME/config/server.properties中通过配置项num.partitions来指定新建Topic的默认Partition数量,也可在创建Topic时通过参数指定,同时也可以在Topic创建之后通过Kafka提供的工具修改。

在发送一条消息时,可以指定这条消息的key,Producer根据这个key和Partition机制来判断应该将这条消息发送到哪个Parition。Paritition机制可以通过指定Producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。本例中如果key可以被解析为整数则将对应的整数与Partition总数取余,该消息会被发送到该数对应的Partition。(每个Parition都会有个序号,序号从0开始)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class JasonPartitioner<T> implements Partitioner {

public JasonPartitioner(VerifiableProperties verifiableProperties) {}

@Override
public int partition(Object key, int numPartitions) {
try {
int partitionNum = Integer.parseInt((String) key);
return Math.abs(Integer.parseInt((String) key) % numPartitions);
} catch (Exception e) {
return Math.abs(key.hashCode() % numPartitions);
}
}
}

如果将上例中的类作为partition.class,并通过如下代码发送20条消息(key分别为0,1,2,3)至topic3(包含4个Partition)。

1
2
3
4
5
6
7
8
9
10
public void sendMessage() throws InterruptedException{
  for(int i = 1; i <= 5; i++){
   List messageList = new ArrayList<KeyedMessage<String, String>>();
   for(int j = 0; j < 4; j++){
   messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));
   }
   producer.send(messageList);
}
  producer.close();
}

则key相同的消息会被发送并存储到同一个partition里,而且key的序号正好和Partition序号相同。(Partition序号从0开始,本例中的key也从0开始)。下图所示是通过Java程序调用Consumer后打印出的消息列表。

Kafka_message_key

Consumer Group

(本节所有描述都是基于Consumer hight level API而非low level API)。

使用Consumer high level API时,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息。

Kafka_consumer_group

这是Kafka用来实现一个Topic消息的广播(发给所有的Consumer)和单播(发给某一个Consumer)的手段。一个Topic可以对应多个Consumer Group。如果需要实现广播,只要每个Consumer有一个独立的Group就可以了。要实现单播只要所有的Consumer在同一个Group里。用Consumer Group还可以将Consumer进行自由的分组而不需要多次发送消息到不同的Topic。

实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可以使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还可以同时将数据实时备份到另一个数据中心,只需要保证这三个操作所使用的Consumer属于不同的Consumer Group即可。

下面这个例子更清晰地展示了Kafka Consumer Group的特性。首先创建一个Topic (名为topic1,包含3个Partition),然后创建一个属于group1的Consumer实例,并创建三个属于group2的Consumer实例,最后通过Producer向topic1发送key分别为1,2,3的消息。结果发现属于group1的Consumer收到了所有的这三条消息,同时group2中的3个Consumer分别收到了key为1,2,3的消息。如下图所示。

Kafka_consumer_group_example

Push vs. Pull

作为一个消息系统,Kafka遵循了传统的方式,选择由Producer向broker push消息并由Consumer从broker pull消息。一些logging-centric system,比如Facebook的Scribe和Cloudera的Flume,采用push模式。事实上,push模式和pull模式各有优劣。

push模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成Consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据Consumer的消费能力以适当的速率消费消息。

对于Kafka而言,pull模式更合适。

  • pull模式可简化broker的设计
  • Consumer可自主控制消费消息的速率
  • Consumer可以自己控制消费方式——即可批量消费也可逐条消费,从指定partition或位置(offset)开始消费等

Kafka delivery guarantee

有这么几种可能的delivery guarantee:

  • At most once 消息可能会丢,但绝不会重复传输
  • At least one 消息绝不会丢,但可能会重复传输
  • Exactly once 每条消息肯定会被传输一次且仅传输一次,很多时候这是用户所想要的。

当Producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。但是如果Producer发送数据给broker后,遇到网络问题而造成通信中断,那Producer就无法判断该条消息是否已经commit。虽然Kafka无法确定网络故障期间发生了什么,但是Producer可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了Exactly once。截止到目前(Kafka 0.8.2版本,2015-03-04),这一Feature还并未实现,有希望在Kafka未来的版本中实现。(所以目前默认情况下一条消息从Producer到broker是确保了At least once,可通过设置Producer异步发送实现At most once)。

接下来讨论的是消息从broker到Consumer的delivery guarantee语义。(仅针对Kafka consumer high level API)。Consumer在从broker读取消息后,可以选择commit,该操作会在Zookeeper中保存该Consumer在该Partition中读取的消息的offset。该Consumer下一次再读该Partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit之后的开始位置相同。当然可以将Consumer设置为autocommit,即Consumer一旦读到数据立即自动commit。如果只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际使用中应用程序并非在Consumer读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。

读完消息先commit再处理消息。这种模式下,如果Consumer在commit后还没来得及处理消息就crash了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于At most once

读完消息先处理再commit。这种模式下,如果在处理完消息之后commit之前Consumer crash了,下次重新开始工作时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once。在很多使用场景下,消息都有一个主键,所以消息的处理往往具有幂等性,即多次处理这一条消息跟只处理一次是等效的,那就可以认为是Exactly once。(笔者认为这种说法比较牵强,毕竟它不是Kafka本身提供的机制,主键本身也并不能完全保证操作的幂等性。而且实际上我们说delivery guarantee 语义是讨论被处理多少次,而非处理结果怎样,因为处理方式多种多样,我们不应该把处理过程的特性——如是否幂等性,当成Kafka本身的Feature)

如果一定要做到Exactly once,就需要协调offset和实际操作的输出。精典的做法是引入两阶段提交。如果能让offset和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,Consumer拿到数据后可能把数据放到HDFS,如果把最新的offset和数据本身一起写到HDFS,那就可以保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,无法存于HDFS,而low level API的offset是由自己去维护的,可以将之存于HDFS中)

总之,Kafka默认保证At least once,并且允许通过设置Producer异步提交来实现At most once。而Exactly once要求与外部存储系统协作,幸运的是Kafka提供的offset可以非常直接非常容易得使用这种方式。

有哪些环节会造成消息的重复消费?如果避免不了,如何去减少重复?

  • producer端重复发送
    producer端因发送超时等等原因做重试操作,目前broker端做重复请求的判断还是很难的,目前kafka也没有去做,而是存储完消息之后,如果开启了Log compaction,它会通过kafka消息中的key来判定是否是重复消息,是的话则会删除。

  • consumer消费后,未及时提交消费的offset便挂了,下次恢复后就会重复消费

    这个目前来说并没有通用的解决办法,先消费后提交offset可能会重复,先提交offset后消费可能造成消息丢失,所以一般还是优先保证消息不丢,在业务上去做去重判断。

二、Kafka性能

LinkedIn官方性能测试数据。

如下图,随着数据的持续写入,集群吞吐量呈现稳定趋势,并不会出现较大的吞吐量波动。
benchmark_recordsize_throughput

较小的消息是消息中间件的困难问题,因为它们放大了消息中间件网络传输等的开销。但是kafka在消息传递时,做了一定的优化,将若干小消息合并发送。可以通过下面两张趋势图,分别是 Records/s和 MB/s的吞吐量图形,来显示集群的吞吐量变化趋势。
benchmark_recordsize_throughput

benchmark_recordsize_throughput

通过上面两幅图可以看到,随着单条消息大小的增加,TPS随之减少;但集群整体数据吞吐量随之增加。

性能测试配置信息:

1
2
3
4
5
集群:3
CPU:Intel Xeon 2.5 GHz 6
内存:32G
硬盘:Six 7200 RPM SATA drives
网卡:1000Mb/s

性能测试结果:

producer性能测试结果:

1个producer,1个topic,6个partitions,无replication。 821,557 records/sec. 78.3 MB/sec
1个producer,1个topic,6个partitions,3个异步replication。 786,980 records/sec. 75.1 MB/sec
1个producer,1个topic,6个partitions,3个同步replication。 421,823 records/sec. 40.2 MB/sec
3个producer,1个topic,6个partitions,3个异步replication。 2,024,032 records/sec. 193.0 MB/sec

consumer性能测试结果:

1个consumer,1个topic,6个partitions,3个replication。 940,521 records/sec. 89.7 MB/sec
3个consumer,1个topic,6个partitions,3个replication。 2,615,968 records/sec. 249.5 MB/sec

InfoQ 测试

测试共使用6台安装Red Hat 6.6的虚拟机,3台作为Broker,另外3台作为Producer或者Consumer。每台虚拟机配置如下:

1
2
3
4
集群:3
CPU:8 vCPU, Intel(R) Xeon(R) CPU E5-2680 v2 @ 2.80GHz,2 Sockets,4 Cores per socket,1 Thread per core
内存:16 GB
磁盘:500 GB

Producer Number VS. Throughput

实验条件:3个Broker,1个Topic,6个Partition,无Replication,异步模式,消息Payload为100字节。

测试项目:分别测试1,2,3个Producer时的吞吐量。

测试结果:使用不同个数Producer时的总吞吐率如下图所示

Kafka_producer_num_throughtput

测试结论:随着Producer个数的提升,每秒总共发送的消息量线性提升

Message Size VS. Throughput

实验条件:3个Broker,1个Topic,6个Partition,无Replication,异步模式,3个Producer。

测试项目:分别测试消息长度为10,20,40,60,80,100,150,200,400,800,1000,2000,5000,10000字节时的集群总吞吐量。

测试结果:不同消息长度时的集群总吞吐率如下图所示:

Kafka_message_size_throughput

测试结论:消息越长,每秒所能发送的消息数越少,而每秒所能发送的消息的量(MB)越大。另外,每条消息除了Payload外,还包含其它Metadata,所以每秒所发送的消息量比每秒发送的消息数乘以100字节大,而Payload越大,这些Metadata占比越小,同时发送时的批量发送的消息体积越大,越容易得到更高的每秒消息量(MB/s)

Partition Number VS. Throughput

实验条件:3个Broker,1个Topic,无Replication,异步模式,3个Producer,消息Payload为100字节。

测试项目:分别测试1到9个Partition时的吞吐量。

测试结果:不同Partition数量时的集群总吞吐率如下图所示:

Kafka_partition_num_throughput

测试结论:当Partition数量小于Broker个数(3个)时,Partition数量越大,吞吐率越高,且呈线性提升。本文所有实验中,只启动3个Broker,而一个Partition只能存在于1个Broker上(不考虑Replication。即使有Replication,也只有其Leader接受读写请求),故当某个Topic只包含1个Partition时,实际只有1个Broker在为该Topic工作。如之前文章所讲,Kafka会将所有Partition均匀分布到所有Broker上,所以当只有2个Partition时,会有2个Broker为该Topic服务。3个Partition时同理会有3个Broker为该Topic服务。换言之,Partition数量小于等于3个时,越多的Partition代表越多的Broker为该Topic服务。如前几篇文章所述,不同Broker上的数据并行插入,这就解释了当Partition数量小于等于3个时,吞吐率随Partition数量的增加线性提升。

当Partition数量多于Broker个数时,总吞吐量并未有所提升,甚至还有所下降。可能的原因是,当Partition数量为4和5时,不同Broker上的Partition数量不同,而Producer会将数据均匀发送到各Partition上,这就造成各Broker的负载不同,不能最大化集群吞吐量。而上图中当Partition数量为Broker数量整数倍时吞吐量明显比其它情况高,也证实了这一点。

Replica Number VS. Throughput

实验条件:3个Broker,1个Topic,6个Partition,异步模式,3个Producer,消息Payload为100字节。

测试项目:分别测试1到3个Replica时的吞吐率。

测试结果:如下图所示:

Kafka_replica_num_throughput

测试结论:随着Replica数量的增加,吞吐率随之下降。但吞吐率的下降并非线性下降,因为多个Follower的数据复制是并行进行的,而非串行进行。

Consumer Only

实验条件:3个Broker,1个Topic,6个Partition,无Replication,异步模式,消息Payload为100字节。

测试项目:分别测试1到3个Consumer时的集群总吞吐率。

测试结果:在集群中已有大量消息的情况下,使用1到3个Consumer时的集群总吞吐量如下图所示:

Kafka_consumer_num_throughput

测试结论:单个Consumer每秒可消费306万条消息,该数量远大于单个Producer每秒可消费的消息数量,这保证了在合理的配置下,消息可被及时处理。并且随着Consumer数量的增加,集群总吞吐量线性增加。

注意: 多Consumer消费消息时以Partition为分配单位,当只有1个Consumer时,该Consumer需要同时从6个Partition拉取消息,该Consumer所在机器的I/O成为整个消费过程的瓶颈,而当Consumer个数增加至2个至3个时,多个Consumer同时从集群拉取消息,充分利用了集群的吞吐率。

三、Kafka高可用和一致性

Kafka在0.8以前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上所有Partition都无法继续提供服务。若该Broker永远不能再恢复,亦或磁盘故障,则其上数据将丢失。而Kafka的设计目标之一即是提供数据持久化,同时对于分布式系统来说,尤其当集群规模上升到一定程度后,一台或者多台机器宕机的可能性大大提高,对Failover要求非常高。因此,Kafka从0.8开始提供High Availability机制。本章节主要从Data Replication(数据备份)和Leader Election(选举)两方面介绍了Kafka的HA机制。

为何需要Replication

在Kafka在0.8以前的版本中,是没有Replication的,一旦某一个Broker宕机,则其上所有的Partition数据都不可被消费,这与Kafka数据持久性及Delivery Guarantee的设计目标相悖。同时Producer都不能再将数据存于这些Partition中。

如果Producer使用同步模式则Producer会在尝试重新发送message.send.max.retries(默认值为3)次后抛出Exception,用户可以选择停止发送后续数据也可选择继续选择发送。而前者会造成数据的阻塞,后者会造成本应发往该Broker的数据的丢失。
如果Producer使用异步模式,则Producer会尝试重新发送message.send.max.retries(默认值为3)次后记录该异常并继续发送后续数据,这会造成数据丢失并且用户只能通过日志发现该问题。同时,Kafka的Producer并未对异步模式提供callback接口。
由此可见,在没有Replication的情况下,一旦某机器宕机或者某个Broker停止工作则会造成整个系统的可用性降低。随着集群规模的增加,整个集群中出现该类异常的几率大大增加,因此对于生产系统而言Replication机制的引入非常重要。

为何需要Leader Election

注意:本章节所述Leader Election主要指Replica之间的Leader Election。

引入Replication之后,同一个Partition可能会有多个Replica,而这时需要在这些Replication之间选出一个Leader,Producer和Consumer只与这个Leader交互,其它Replica作为Follower从Leader中复制数据。

因为需要保证同一个Partition的多个Replica之间的数据一致性(其中一个宕机后其它Replica必须要能继续服务并且即不能造成数据重复也不能造成数据丢失)。如果没有一个Leader,所有Replica都可同时读/写数据,那就需要保证多个Replica之间互相(N×N条通路)同步数据,数据的一致性和有序性非常难保证,大大增加了Replication实现的复杂性,同时也增加了出现异常的几率。而引入Leader后,只有Leader负责数据读写,Follower只向Leader顺序Fetch数据(N条通路),系统更加简单且高效。

高可用解析–如何将所有Replica均匀分布到整个集群

为了更好的做负载均衡,Kafka尽量将所有的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。同时为了提高Kafka的容错能力,也需要将同一个Partition的Replica尽量分散到不同的机器。实际上,如果所有的Replica都在同一个Broker上,那一旦该Broker宕机,该Partition的所有Replica都无法工作,也就达不到高可用的效果。同时,如果某个Broker宕机了,需要保证它上面的负载可以被均匀的分配到其它幸存的所有Broker上。

Kafka分配Replica的算法如下:

  • 将所有Broker(假设共n个Broker)和待分配的Partition排序
  • 将第i个Partition分配到第(i mod n)个Broker上
  • 将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上

Data Replication

Kafka的Data Replication需要解决如下问题:

  • 怎样Propagate(传递,同步)消息
  • 在向Producer发送ACK前需要保证有多少个Replica已经收到该消息
  • 怎样处理某个Replica不工作的情况
  • 怎样处理Failed Replica恢复回来的情况

Propagate消息

Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。一旦Leader收到了ISR中的所有Replica的ACK(ACK策略不同,处理方式不同),该消息就被认为已经commit了,Leader将增加HW并且向Producer发送ACK。

为了提高性能,每个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。因此,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能完全保证异常发生后该条消息一定能被Consumer消费。但考虑到这种场景非常少见,可以认为这种方式在性能和数据持久化上做了一个比较好的平衡。在将来的版本中,Kafka会考虑提供更高的持久性。

Consumer读消息也是从Leader读取,只有被commit过的消息,才会暴露给Consumer。

Kafka Replication的数据流如下图所示:

Kafka_replication

###ACK前需要保证有多少个备份

和大部分分布式系统一样,Kafka处理失败需要明确定义一个Broker是否“活着”。对于Kafka而言,Kafka存活包含两个条件,一是它必须维护与ZooKeeper的session(这个通过ZooKeeper的Heartbeat机制来实现)。二是Follower必须能够及时将Leader的消息复制过来,不能“落后太多”。

Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)。如果一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。

这里的落后太多是指

  • replica.lag.time.max.ms:在0.9.0.0之前表示follower如果在此时间间隔内没有向leader发送fetch请求,则该follower就会被剔除isr列表,在0.9.0.0之后表示如果该follower在此时间间隔内一直没有追上过leader的所有消息,则该follower就会被剔除isr列表
  • replica.lag.max.messages(0.9.0.0版本中已被废除):follower如果落后leader的消息个数超过该值,则该follower就会被剔除isr列表。 废除的主要原因是:目前这个配置是个统一配置,不同的topic速率生产速率不太一样,没办法来指定一个具体的值来应用到所有的topic上。将来可以将这个配置下放到topic级别,关于这个问题,可以见这里的讨论Automate replica lag tuning

Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,完全同步复制要求所有能工作的Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka非常重要的一个特性)。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种情况下如果Follower都复制完都落后于Leader,而如果Leader突然宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower可以批量的从Leader复制数据,这样极大的提高复制性能(批量写磁盘),极大减少了Follower与Leader的差距。

一条消息只有被ISR里的所有Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而造成数据丢失(Consumer无法消费这些数据)。而对于Producer而言,它可以选择是否等待消息commit,这可以通过request.required.acks来设置。这种机制确保了只要ISR有一个或以上的Follower,一条被commit的消息就不会丢失。每一个producer发送消息给某个分区的leader副本,其他follower副本又会复制该消息。

request.required.acks配置参数说明如下:

  • acks=0:表示producer不需要leader发送响应,即producer只管发不管发送成功与否。延迟低,容易丢失数据。
  • acks=1:表示leader写入成功(但是并没有刷新到磁盘)后即向producer响应。延迟中等,一旦leader副本挂了,就会丢失数据。
  • acks=-1:表示leader会等待isr列表中所有副本都写入成功才向producer发送响应。延迟高、可靠性高。

同时对于isr列表的数量要求也有一个配置,该配置更好的平衡了吞吐量和高可靠性以及一致性问题。

  • min.insync.replicas:默认是1。当acks=-1的时候,leader在处理新消息前,会先判断当前isr列表的的size是否小于这个值,如果小于的话,则不允许写入,返回NotEnoughReplicasException异常。同时,一旦允许写入了之后,在响应producer之前也会判断当前isr列表的size是否小于该值,如果小于返回NotEnoughReplicasAfterAppendException异常

Leader Election算法

上文说明了Kafka是如何做Replication的,另外一个很重要的问题是当Leader宕机了,怎样在Follower中选举出新的Leader。因为Follower可能落后许多或者crash了,所以必须确保选择“最新”的Follower作为新的Leader。一个基本的原则就是,如果Leader不在了,新的Leader必须拥有原来的Leader commit过的所有消息。这就需要作一个折衷,如果Leader在标明一条消息被commit前等待更多的Follower确认,那在它宕机之后就有更多的Follower可以作为新的Leader,但这也会造成吞吐率的下降。

一种非常常用的选举leader的方式是“Majority Vote”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,如果我们有2f+1个Replica(包含Leader和Follower),那在commit之前必须保证有f+1个Replica复制完消息,为了保证正确选出新的Leader,fail的Replica不能超过f个。因为在剩下的任意f+1个Replica里,至少有一个Replica包含有最新的所有消息。这种方式有个很大的优势,系统的latency只取决于最快的几个Broker,而非最慢那个。Majority Vote也有一些劣势,为了保证Leader Election的正常进行,它所能容忍的fail的follower个数比较少。如果要容忍1个follower挂掉,必须要有3个以上的Replica,如果要容忍2个Follower挂掉,必须要有5个以上的Replica。也就是说,在生产环境下为了保证较高的容错程度,必须要有大量的Replica,而大量的Replica又会在大数据量下导致性能的急剧下降。这就是这种算法更多用在ZooKeeper这种共享集群配置的系统中而很少在需要存储大量数据的系统中使用的原因。例如HDFS的HA Feature是基于majority-vote-based journal,但是它的数据存储并没有使用这种方式。

实际上,Leader Election算法非常多,比如ZooKeeper的Zab, Raft和Viewstamped Replication。而Kafka所使用的Leader Election算法更像微软的PacificA算法。

Kafka在ZooKeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的所有Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。在这种模式下,对于f+1个Replica,一个Partition能在保证不丢失已经commit的消息的前提下容忍f个Replica的失败。在大多数使用场景中,这种模式是非常有利的。事实上,为了容忍f个Replica的失败,Majority Vote和ISR在commit前需要等待的Replica数量是一样的,但是ISR需要的总的Replica的个数几乎是Majority Vote的一半。

虽然Majority Vote与ISR相比有不需等待最慢的Broker这一优势,但是Kafka作者认为Kafka可以通过Producer选择是否被commit阻塞来改善这一问题,并且节省下来的Replica和磁盘使得ISR模式仍然值得。

如何处理所有Replica都不工作

上文提到,在ISR中至少有一个follower时,Kafka可以确保已经commit的数据不丢失,但如果某个Partition的所有Replica都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:

  • 等待ISR中的任一个Replica“活”过来,并且选它作为Leader
  • 选择第一个“活”过来的Replica(不一定是ISR中的)作为Leader

这就需要在可用性和一致性当中作出一个简单的折衷。如果一定要等待ISR中的Replica“活”过来,那不可用的时间就可能会相对较长。而且如果ISR中的所有Replica都无法“活”过来了,或者数据都丢失了,这个Partition将永远不可用。选择第一个“活”过来的Replica作为Leader,而这个Replica不是ISR中的Replica,那即使它并不保证已经包含了所有已commit的消息,它也会成为Leader而作为consumer的数据源(前文有说明,所有读写都由Leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在以后的版本中,Kafka支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。

如何选举Leader

最简单最直观的方案是,所有Follower都在ZooKeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时所有Follower都尝试创建该节点,而创建成功者(ZooKeeper保证只有一个能创建成功)即是新的Leader,其它Replica即为Follower。

但是该方法会有3个问题:

  • split-brain 这是由ZooKeeper的特性引起的,虽然ZooKeeper能保证所有Watch按顺序触发,但并不能保证同一时刻所有Replica“看”到的状态是一样的,这就可能造成不同Replica的响应不一致
  • herd effect 如果宕机的那个Broker上的Partition比较多,会造成多个Watch被触发,造成集群内大量的调整
  • ZooKeeper负载过重 每个Replica都要为此在ZooKeeper上注册一个Watch,当集群规模增加到几千个Partition时ZooKeeper负载会过重。

Kafka 0.8.*的Leader Election方案解决了上述问题,它在所有broker中选出一个controller,所有Partition的Leader选举都由controller决定。controller会将Leader的改变直接通过RPC的方式(比ZooKeeper Queue的方式更高效)通知需为为此作为响应的Broker。同时controller也负责增删Topic以及Replica的重新分配。

KafkaController负责:

  • 监控所有broker的存活,以及向他们发送相关的执行命令。
  • 分区的状态维护:负责分区的新增、下线等,分区副本的leader选举
  • 副本的状态维护:负责副本的新增、下线等

如果某个broker挂了,leader副本在该broker上的分区就要重新进行leader选举。来简要描述下leader选举的过程

  • KafkaController会监听ZooKeeper的/brokers/ids节点路径,一旦发现有broker挂了,执行下面的逻辑。这里暂时先不考虑KafkaController所在broker挂了的情况,KafkaController挂了,各个broker会重新leader选举出新KafkaController
  • leader副本在该broker上的分区就要重新进行leader选举,目前的选举策略是
  • 优先从isr列表中选出第一个作为leader副本
  • 如果isr列表为空,则查看该topic的unclean.leader.election.enable配置。

    unclean.leader.election.enable:为true则代表允许选用非isr列表的副本作为leader,那么此时就意味着数据可能丢失。
    为false的话,则表示不允许,直接抛出NoReplicaOnlineException异常,造成leader副本选举失败。

  • 如果上述配置为true,则从其他副本中选出一个作为leader副本,并且isr列表只包含该leader副本。

  • 一旦选举成功,则将选举后的leader和isr和其他副本信息写入到该分区的对应的zk路径上。
  • KafkaController向上述相关的broker上发送LeaderAndIsr请求,将新分配的leader、isr、全部副本等信息传给他们。同时将向所有的broker发送UpdateMetadata请求,更新每个broker的缓存的metadata数据。
  • 如果是leader副本,更新该分区的leader、isr、所有副本等信息。
  • 如果是follower副本,更新该分区的leader、isr、所有副本等信息。
  • 最后创建新的fetch请求线程,向新leader不断发送fetch请求。

    四、Kafka存储

Kafka中的Message是以topic为基本单位组织的,不同的topic之间是相互独立的。每个topic又可以分成几个不同的partition(每个topic有几个partition是在创建topic时指定的),每个partition存储一部分Message。借用官方的一张图,可以直观地看到topic和partition的关系。

Kafka_log_new_msg_write

partition是以文件的形式存储在文件系统中,比如,创建了一个名为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样5个目录: page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,其命名规则为-,里面存储的分别就是这5个partition的数据。

Partition的数据文件

Partition中的每条Message由offset来表示它在这个partition中的偏移量,这个offset不是该Message在partition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了partition中的一条Message。因此,可以认为offset是partition中Message的id。partition中的每条Message包含了以下三个属性:

  • offset
  • MessageSize
  • data

其中offset为long型,MessageSize为int32,表示data有多大,data为message的具体内容。

如果一个partition只有一个数据文件会怎么样?

  • 新数据是添加在文件末尾(调用FileMessageSet的append方法),不论文件数据文件有多大,这个操作永远都是O(1)的。
  • 查找某个offset的Message(调用FileMessageSet的searchFor方法)是顺序查找的。因此,如果数据文件很大的话,查找的效率就低。

那Kafka是如何解决查找效率的的问题呢?有两大法宝:1) 分段 2) 索引。

数据文件的分段

Kafka解决查询效率的手段之一是将数据文件分段,比如有100条Message,它们的offset是从0到99。假设将数据文件分成5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就可以定位到该Message在哪个段中。

为数据文件建索引

Kafka_Log_Segment

数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka为每个分段后的数据文件建立了索引文件,文件名与数据文件的名字是一样的,只是文件扩展名为.index。
索引文件中包含若干个索引条目,每个条目表示数据文件中一条Message的索引。索引包含两个部分(均为4个字节的数字),分别为相对offset和position。

  • 相对offset:因为数据文件分段以后,每个数据文件的起始offset不为0,相对offset表示这条Message相对于其所属数据文件中最小的offset的大小。举例,分段后的一个数据文件的offset是从20开始,那么offset为25的Message在index文件中的相对offset就是25-20 = 5。存储相对offset可以减小索引文件占用的空间。
  • position,表示该条Message在数据文件中的绝对位置。只要打开文件并移动文件指针到这个position就可以读取对应的Message了。

index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。

五、Kafka PageCache

Kafka PageCache简介

Kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache,同时标记Page属性为Dirty。当读操作发生时,先从PageCache中查找,如果发生缺页才进行磁盘调度,最终返回需要的数据。实际上PageCache是把尽可能多的空闲内存都当做了磁盘缓存来使用。同时如果有其他进程申请内存,回收PageCache的代价又很小,所以现代的OS都支持PageCache。

使用PageCache功能同时可以避免在JVM内部缓存数据,JVM为我们提供了强大的GC能力,同时也引入了一些问题不适用与Kafka的设计。

如果在Heap内管理缓存,JVM的GC线程会频繁扫描Heap空间,带来不必要的开销。如果Heap过大,执行一次Full GC对系统的可用性来说将是极大的挑战。

所有的In-Process Cache在OS中都有一份同样的PageCache。所以通过将缓存只放在PageCache,可以至少让可用缓存空间翻倍。

如果Kafka重启,所有的In-Process Cache都会失效,为了防止内存中的数据随着kafka重启而丢失,因此Kafka将消息数据存储在PageCache中,而在遇到上述状况,Kafka重启后,OS管理的PageCache依然可以继续使用。

Kafka Sendfile技术

PageCache还只是第一步,Kafka为了进一步的优化性能还采用了Sendfile技术。在解释Sendfile之前,首先介绍一下传统的网络I/O操作流程,大体上分为以下4步。

  1. OS 从硬盘把数据读到内核区的PageCache。
  2. 用户进程把数据从内核区Copy到用户区。
  3. 然后用户进程再把数据写入到Socket,数据流入内核区的Socket Buffer上。
  4. OS 再把数据从Buffer中Copy到网卡的Buffer上,这样完成一次发送。

传统IO流程

整个过程共经历两次Context Switch,四次System Call。同一份数据在内核Buffer与用户Buffer之间重复拷贝,效率低下。其中2、3两步没有必要,完全可以直接在内核区完成数据拷贝。这也正是Sendfile所解决的问题,经过Sendfile优化后,整个I/O过程就变成了下面这个样子。

Kafka的IO流程

通过以上的介绍不难看出,Kafka的设计初衷是尽一切努力在内存中完成数据交换,无论是对外作为一整个消息系统,或是内部同底层操作系统的交互。如果Producer和Consumer之间生产和消费进度上配合得当,完全可以实现数据交换零I/O。这也就是为什么说Kafka使用“硬盘”并没有带来过多性能损失的原因。

只要生产者与消费者的速度相差不大,消费者会直接读取之前生产者写入Page Cache的数据,大家在内存里完成接力,根本没有磁盘访问。

而比起在内存中维护一份消息数据的传统做法,这既不会重复浪费一倍的内存,Page Cache又不需要GC,而且即使Kafka重启了,Page Cache还依然在。

下图是实际生产情况的统计(20 Brokers, 75 Partitions per Broker, 110k msg/s):

Kafka_cluster_statistics

此时的集群只有写,没有读操作。10M/s左右的Send的流量是Partition之间进行Replicate而产生的。

而在有Read Request进来的时候分为两种情况,第一种是内存中完成数据交换,如下图:

Kafka_cluster_statistics_io

Send流量从平均10M/s增加到了到平均60M/s,而磁盘Read只有不超过50KB/s。PageCache降低磁盘I/O效果非常明显。

接下来是读一些收到了一段时间,已经从内存中被换出刷写到磁盘上的老数据。

Kafka_cluster_statistics_io_read

其他指标还是老样子,而磁盘Read已经飚高到40+MB/s。此时全部的数据都已经是走硬盘了。但也依然没有任何性能问题。

Tips

  1. Kafka官方并不建议通过Broker端的log.flush.interval.messages和log.flush.interval.ms来强制写盘,认为数据的可靠性应该通过Replica来保证,而强制Flush数据到磁盘会对整体性能产生影响。
  2. 可以通过调整/proc/sys/vm/dirty_background_ratio和/proc/sys/vm/dirty_ratio来调优性能。
  3. 脏页率超过第一个指标会启动pdflush开始Flush Dirty PageCache。
  4. 脏页率超过第二个指标会阻塞所有的写操作来进行Flush。
  5. 根据不同的业务需求可以适当的降低dirty_background_ratio和提高dirty_ratio。

结论

  1. Kafka在运行过程中,会尽可能多的把空闲内存都当做了磁盘缓存来使用。
  2. 当上层有写操作时,操作系统只是将数据写入PageCache,同时标记Page属性为Dirty。
  3. 当读操作发生时,先从PageCache中查找,如果发生缺页才进行磁盘调度,再写回Page Cache,最终返回需要的数据。
  4. 同时如果有其他进程申请内存,会回收抢占一部分PageCache,但也会导致Kafka吞吐量下降会不稳定。这个已在测试工作中遇到。
  5. Kafka使用PageCache功能同时可以避免在JVM内部缓存数据
  6. Kafka重启后,OS管理的PageCache不会被释放,依然可以继续使用。
  7. Kafka使用了Sendfile技术,使数据交换的I/O过程均处于内核态运行。
  8. Kafka的刷盘机制,会将PageCache中的消息数据刷到磁盘当中,保证数据不会丢失。

六、Kafka监控

Kafka-web-console

  1. Kafka-web-console开源项目已不再继续维护,开源开发者推荐使用Kafka-manage进行替代。
  2. Scala语言开发。
  3. Github: https://github.com/claudemamo/kafka-web-console

kafka_web_console

支持以下功能:

  1. Brokers列表
  2. Kafka 集群中 Topic列表,及对应的Partition、LogSiz e等信息
  3. 点击Topic,可以浏览对应的Consumer Groups、Offset、Lag等信息

KafkaOffsetMonitor

  1. KafkaOffsetMonitor开源项目已不再继续开发。功能简单,直观,提供查询功能,jar包部署。
  2. Scala语言开发。
  3. Github: https://github.com/quantifind/KafkaOffsetMonitor

kafka_offset_monitor

支持以下功能:

  1. Topic、Consumer Group列表
  2. 图形化展示topic和consumer之间的关系
  3. 图形化展示consumer的Offset、Lag等信息

Availability-Monitor-for-Kafka

  1. Availability-Monitor-for-Kafka是微软Microsoft开源的Kafka监控项目。使用简单,jar包部署。
  2. 社区一般活跃,暂无release版本。
  3. Java语言开发;与maven集成;与SQL Server集成。
  4. 最新支持到Kafka 0.8.1.1版本
  5. Github: https://github.com/Microsoft/Availability-Monitor-for-Kafka

Availability_Monitor_for_Kafka

Availability_Monitor_for_Kafka

支持以下功能:

  1. 非常low的一款监控工具,无web页面,只能通过控制台去观察
  2. 查看topic,partition等相关信息。
  3. 可监控发送与读取数据的可用性、延迟;
  4. Producer可用性=所有主题和分区成功发送消息数/尝试发送消息数
  5. Consumer可用性=所有主题和分区消息成功读取数/消息尝试读取数
  6. 通过往Producer发送数据测试Producer的可用性与延迟,消息以CanaryMessage_为前缀,可以通过producerProperties.json修改;
  7. 通过从所有分区的尾部读取数据来衡量Consumer的可用性与延迟

Kafka-monitor

  1. Kafka-monitor是Linkedin官方提供Kafka监控工具。代码提供3个branch分别对Kafka0.8,0.9,0.10提供支持。
  2. 社区一般活跃,暂时只发布一个release版本。2016年4月开始开发,开源时间较短。
  3. Java语言开发。与gradle集成。
  4. 最新支持到Kafka 0.10.0.1
  5. Github: https://github.com/linkedin/kafka-monitor

kafka_monitor

支持以下功能:

  1. 只能看,不能改
  2. kafka-monitor启动后会启动一个produce、一个consume,broker过时时间为10分钟; 用于捕获服务的可用性、消息丢失率、延迟率等,可监控集群与单个kafka;
  3. kafka-monitor默认自动创建监控topic,可以修改为已经存在的topic,自动增加partition
  4. produce用于生成消息到kafka,并产生生成速率、可用性度量数据
  5. consume从kafka中消费消息,并产生消息丢失率、消息重复率、端到端延迟 依赖生产服务来提供消息内嵌的消息序列号和时间戳。
  6. 支持restful接口

Kafka-manage

  1. Kafka-manage是Yahoo官方提供的Kafka监控工具。
  2. 开源项目社区非常活跃,已发布21个release版本。2015年1月29日开始开发,总计446 commits。
  3. 功能非常强大、全面。对新版Kafka有一定支持,但不一定会支持到最新版本功能。如0.10版本消息携带timestamp即时间戳字段,可以根据时间戳进行消息查询(暂不确定,待确认)。
  4. Scala语言开发。
  5. 要求Kafka 0.8.1.1 或 0.8.2. 或 0.9.0. 或0.10.0.*。
  6. 要求JDK8。
  7. 最新支持到Kafka 0.10.1.0
  8. Github: https://github.com/yahoo/kafka-manager

kafka_manager

支持以下功能:

  1. 管理多个集群(clusters)
  2. 群集状态的简单检查–主题、消费者、偏移量、服务实例、副本分布、分区分布 (topics, consumers, offsets, brokers, replica distribution, partition distribution)
  3. 运行leader副本选举
  4. 生成partition的分配,(通过选项来选择使用哪个broker)
  5. 运行Partition的重新分配(基于4.生成partition的分配)
  6. 通过可选的topic配置,创建一个topic(0.8.1.1与0.8.2 +有不同的配置)
  7. 删除topic(仅在0.8.2 +版本中支持,且需要在broker配置delete.topic.enable=true)
  8. topic列表现在显示标记为删除的内容(仅在0.8.2 +版本中支持)
  9. 批量生成多个topic的partition分配,(可以通过选项来选择使用哪个broker)
  10. 运行多topic的partition的批量重分配
  11. 为已存在的topic新增partitions
  12. 更新现已存在topic的配置
  13. 选择是否为broker级别和topic级别的指标而启用JMX轮询。
  14. 选择过滤出在zookeeper中没有ids/owners/&offsets/directories的消费者。

结论

  1. 前三个开源Kafka监控项目基本无需考虑。
  2. Kafka-monitor优势在于官方出品,属嫡系产品。虽然项目起步较晚,但未来前景可观。
  3. Kafka-manage优势在于社区活跃,功能支持强大,release版本众多,饱经生产考验。

七、Kafka Producer

Kafka Producer有sync和async两种发送方式。

Producer sync发送消息流程。

kafka_producer_sync

具体流程:

  1. 每new一个Java的Producer类,就会有创建Producer、ProducerPool等。
  2. ProducerPool为连接不同kafka broker的池,初始连接个数有broker.list参数决定。
  3. 发送消息时,首先序列化所有消息。
  4. 调用partitionAndCollate方法。
  5. 作用:获取所有partitions的leader所在leaderBrokerId(就是在该partiionid的leader分布在哪个broker上)
  6. NIO发送消息

Producer async发送消息流程
kafka_producer_async

Kafka中Producer异步发送消息是基于同步发送消息的接口来实现的,异步发送消息的实现很简单,客户端消息发送过来以后,先放入到一个队列中然后就返回了。Producer再开启一个线程(ProducerSendThread)不断从队列中取出消息,然后调用同步发送消息的接口将消息发送给Broker。

八、Kafka Consumer

kafka_consumer

消费者线程(consumer thread), 队列,拉取线程(fetch thread)三者之间关系

每一个topic至少需要创建一个consumer thread,如果有多个partitions,则可以创建多个consumer thread线程,consumer thread <= partitions数量,否则会有consumer thread空闲。

具体说明一下三者关系:

  • topic的partitions分布规则

    paritions是按照kafka brokerId有序分配的。

    例如现在有三个node安装了kafka broker服务端程序,brokerId分别设置为1,2,3,现在准备一个topic为test-string-topic,并且分配12个partitons,此时partitions的kafka broker节点分布情况为 ,partitions索引编号为0,3,6,9等4个partitions在brokerId=1上,1,4,7,10在brokerId=2上,2,5,8,11在brokerId=3上。

  • 创建consumer thread(consumer thread数量由客户端用于配置)

    consumer thread数量与BlockingQueue一一对应。即创建了几个consumer线程,即创建几个阻塞队列。

  • fetch thread

    拉取线程。topic分布在几个node上就有几个fetch thread,每个fetch thread会于kafka broker建立一个连接。即该consumer client与消费几个broker的partitions,即有几个fetch thread。

  • BlockingQueue

    阻塞队列,实际存储消息的队列。

  • 当consumer thread count=1时,按照上面的例子

    此时创建一个阻塞队列,即有一个blockingQueue1。三个fetch thread线程,该topic分布在三台node上,就有三个fetch thread,每个fetch thread会于kafka broker建立一个连接。3个fetch thread线程去拉取消息数据,最终放到blockingQueue1中,等待consumer thread来消费。

    消费者线程,缓冲队列,partitions分布列表如下:

    | | | |
    | ———— | ———— | ———— |
    | consumer线程 | Blocking Queue | partitions |
    | consumer thread 1 | blocking queue 1 | 0,1,2,3,4,5,6,7,8,9,10,11 |

    fetch thread与partitions分布列表如下:

    | | |
    | ———— | ———— |
    | fetch线程 | partitions |
    | fetch thread 1 | 0,3,6,9 |
    | fetch thread 2 | 1,4,7,10 |
    | fetch thread 3 | 2,5,8,11 |

  • 当consumer thread count=2时

    此时有consumerThread1和consumerThread2分别对应2个队列blockingQueue1,blockingQueue2,这2个消费者线程消费partitions依次为:0,1,2,3,4,5与6,7,8,9,10,11;

    消费者线程,缓冲队列,partitions分布列表如下:

    | | | |
    | ———— | ———— | ———— |
    | consumer线程 | Blocking Queue | partitions |
    | consumer thread 1 | blocking queue 1 | 0,1,2,3,4,5 |
    | consumer thread 2 | blocking queue 2 | 6,7,8,9,10,11 |

    fetch thread与partitions分布列表如下:

    | | |
    | ———— | ———— |
    | fetch线程 | partitions |
    | fetch thread 1 | 0,3,6,9 |
    | fetch thread 2 | 1,4,7,10 |
    | fetch thread 3 | 2,5,8,11 |

  • 当consumer thread count=4时

    消费者线程,缓冲队列,partitions分布列表如下:

    | | | |
    | ———— | ———— | ———— |
    | consumer线程 | Blocking Queue | partitions |
    | consumer thread 1 | blocking queue 1 | 0,1,2 |
    | consumer thread 2 | blocking queue 2 | 3,4,5 |
    | consumer thread 3 | blocking queue 3 | 6,7,8 |
    | consumer thread 4 | blocking queue 4 | 9,10,11 |

    fetch thread与partitions分布列表如下:

    | | |
    | ———— | ———— |
    | fetch线程 | partitions |
    | fetch thread 1 | 0,3,6,9 |
    | fetch thread 2 | 1,4,7,10 |
    | fetch thread 3 | 2,5,8,11 |

  • 同理当消费线程consumer thread count=n,都是安装上述分布规则来处理的。

九、Kafka Zookeeper

kafka_zk_node