Kafka Best Practices

Broker Configuration

The broker configuration file is config/server.properties. Its contents fall into the following modules; for other detailed settings, see the Kafka User Manual.

Server Basics

Basic Kafka server configuration

  • broker.id: the unique identifier of each broker in the Kafka cluster.
  • delete.topic.enable: whether topic deletion is enabled; decide based on your requirements. Recommended on for development and test environments, off for production.

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=false

Socket Server Settings

Kafka network-related configuration

  • listeners: the protocol, IP, and port the broker listens on, configured by the user.
  • For the other options, the defaults are fine for development and test environments; the settings below are recommended for production.

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://172.21.195.89:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=8

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

Log Basics

Basic Kafka log configuration

  • log.dirs: the directory (or directories) where log files are stored.
  • num.partitions: the default number of partitions per topic. The partition count is normally specified when a topic is created (see the sketch after the config block below), in which case this setting has no effect; setting it to 3 is recommended as a safeguard for topics created without an explicit partition count.
  • The defaults are recommended for the other options.

# A comma separated list of directories under which to store log files
log.dirs=/home/ggzjs/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
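
Since the partition count is normally given when a topic is created, a minimal sketch of explicit topic creation with the Java AdminClient is shown below; the topic name "example-topic" and the replication factor of 3 are illustrative assumptions, not values mandated by this guide.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.195.89:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Request 3 partitions and replication factor 3 explicitly,
            // so the broker-side num.partitions default is never consulted.
            NewTopic topic = new NewTopic("example-topic", 3, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}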

Internal Topic Settings

Kafka internal topic configuration

  • For development and test environments, the defaults (all 1) are recommended.
  • For production, the settings below are recommended: a replication factor of 3 and a minimum ISR of 2.

# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 (such as 3) is recommended to ensure availability.
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2

Log Flush Policy

Kafka log flush policy

  • log.flush.interval.messages: the number of messages to accept before forcing a flush of the log to disk.
  • log.flush.interval.ms: the flush interval in milliseconds; the log is flushed to disk after at most this much time.
  • For development, test, and production environments alike, the defaults are strongly recommended, i.e. leave these unset and let the operating system decide when to flush, which gives better performance.
  • Applications with strict message-durability requirements can set these properties at the topic level instead (see the sketch after the config block below).

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
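
If one topic needs stronger durability guarantees than the cluster-wide default, the flush thresholds can be overridden per topic via the topic-level flush.messages / flush.ms settings. A minimal sketch using the Java AdminClient follows; the topic name "critical-topic" and the threshold of 10000 messages are illustrative assumptions.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class TopicFlushOverrideExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.195.89:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Fsync this topic every 10000 messages while the rest of the
            // cluster keeps the OS-managed default flush policy.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "critical-topic");
            Config flushConfig = new Config(
                    Collections.singletonList(new ConfigEntry("flush.messages", "10000")));
            admin.alterConfigs(Collections.singletonMap(topic, flushConfig)).all().get();
        }
    }
}

Note that alterConfigs replaces the full set of topic-level overrides for the resource, so any existing overrides should be included in the same call; the same override can also be applied with the kafka-configs.sh tool.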

Log Retention Policy

Kafka log retention policy configuration

  • log.retention.hours: log retention time in hours. Only one of this and log.retention.minutes needs to be set.
  • log.retention.minutes: log retention time in minutes. Only one of this and log.retention.hours needs to be set.
  • log.retention.bytes: size-based retention. When the total size of all log segments of one partition of a topic reaches this value, log cleanup runs. Cleanup is triggered as soon as either the retention-time or the retention-size condition is met.
  • log.segment.bytes: the log segment size. The log of each partition of a topic is split into segments; when a segment reaches this size it is rolled and a new log file is started.
  • log.retention.check.interval.ms: how often, in milliseconds, the retention policies are checked.
  • log.segment.delete.delay.ms: the delay, in milliseconds, before a deleted log segment is removed from the filesystem.
  • Retention size, retention time, and segment size can be chosen according to the server's disk capacity and the business scenario.

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
#log.retention.hours=168
log.retention.minutes=10

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
log.retention.bytes=5368709120

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

log.segment.delete.delay.ms=60000

log.cleaner.enable=true

Zookeeper

Kafka ZooKeeper configuration

  • zookeeper.connect: the ZooKeeper connection string.
  • zookeeper.connection.timeout.ms: the ZooKeeper connection timeout, 6 seconds by default. It can be changed to suit the application; the configuration below is one example.

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma-separated list of host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=172.21.195.89:2181,172.21.195.90:2181,172.21.195.91:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=60000

Group Coordinator Settings

Kafka consumer group coordinator configuration

  • 3000 is recommended for production environments.
  • 0 is recommended for development and test environments.

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=3000

Producer Configuration

  • ProducerConfig.ACKS_CONFIG: the server-side acknowledgement policy for records sent by the producer. Valid values are 0, 1, and all. With 0 the producer sends without waiting for any acknowledgement from the server, which suits applications insensitive to message reliability; with 1 the partition leader on the server must acknowledge the write, which suits applications moderately sensitive to reliability; with all, every in-sync replica must acknowledge the write, which suits applications highly sensitive to reliability. Choose the policy according to the application's scenario (a send sketch follows the properties below).
  • The defaults are recommended for the other settings.

For full code examples, see the Kafka Quick Start document.

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.195.89:9092,172.21.195.90:9092,172.21.195.91:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.ACKS_CONFIG, "1");
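
A minimal sketch of sending a record with these properties and observing the acknowledgement, continuing from the props above (the topic name "test_topic" and the key/value are illustrative assumptions; imports from org.apache.kafka.clients.producer are omitted to match the fragment style above). With acks=1 the callback fires once the partition leader has written the record:

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("test_topic", "key", "value");
// The callback runs after the broker acknowledges the record according to the acks setting.
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();  // send failed or no acknowledgement received
    } else {
        System.out.printf("acked: partition=%d offset=%d%n", metadata.partition(), metadata.offset());
    }
});
producer.flush();
producer.close();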

Consumer Configuration

  • ConsumerConfig.GROUP_ID_CONFIG: the consumer's group id, configured individually by each application.
  • ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG: whether the consumer automatically commits consumed offsets. It can also be switched to manual commits (see the sketch after the properties below), as the application requires.
  • The defaults are recommended for the other settings.

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.21.195.89:9092,172.21.195.90:9092,172.21.195.91:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_consumer_group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
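
If manual commits are preferred, set ENABLE_AUTO_COMMIT_CONFIG to "false" and commit after processing. A minimal sketch continuing from the props above (the topic name "test_topic" and the poll timeout are illustrative assumptions; imports from org.apache.kafka.clients.consumer are omitted to match the fragment style above):

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("test_topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset=%d key=%s value=%s%n", record.offset(), record.key(), record.value());
    }
    // Commit synchronously only after the batch has been processed,
    // so offsets are never committed ahead of the application's work.
    consumer.commitSync();
}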

References

  1. Kafka official performance test report
  2. Kafka official performance test configuration parameters
  3. Kafka performance test report
  4. Kafka User Manual