Kafka用户手册

1. APIS

Kafka主要包括五个核心apis。

  • 1.生产者api(Producer api)实现应用向 kafka 集群的topics发送流数据。
  • 2.消费者api(Consumer api)实现应用从 kafka 集群的topics读取数据流。
  • 3.流式api(Streams api)实现数据流从输入topics传输到输出topics。
  • 4.连接器api(Connect api)对于实现connect api 的应用,能持续从数据源系统或者应用拉取数据拉入(pull)到kafka集群中,或者持续将 kafka 数据推(push)到数据源系统或者应用。
  • 5.AdminClient api 实现管理和监控topics,brokers,或者其他 kafka 的属性。

kafka功能的实现与语言无关的,允许多种语言编程实现。kafka中官方文档使用的java的语言,其它客户端语言请点击这里。https://cwiki.apache.org/confluence/display/KAFKA/Clients

1.1 Producer Api

生产者api(Producer api)实现应用向 Kafka 集群的 topics 发送流数据。

Producer API 的 maven 依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>

1.2 Consumer API

消费者api(Consumer api)实现应用从 Kafka 集群的 topics 读取流数据。

Consumer API 的 maven 依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>

1.3 Streams API

流式api(Streams api)实现数据流从输入topics传输到输出topics。

Streams API 的 maven 依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.11.0.0</version>
</dependency>

1.4 Connect api

连接器api(Connect api)对于实现 connect api 的应用,能持续从数据源系统或者应用拉取数据拉入(pull) 到Kafka集群中,或者持续将Kafka数据推(push) 到后续的系统或者应用。

1.5 AdminClient API

AdminClient API 支持管理和检查topics, brokers, acls 和其它 kafka 对象。

AdminClient API 的 maven 依赖:

1
2
3
4
5
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>

1.6 遗弃 APIs

Kafka 依然保留限制遗弃的apis, 这些旧的scala apis仍然可以使用,以提高更强的兼容性。

Broker 配置

以下是必须配置的配置项:

1
2
3
broker.id
log.dirs
zookeeper.connect

以下是broker其他配置项:

NAME DESCRIPTION TYPE DEFAULT VALID VALUES IMPORTANCE
zookeeper.connect zookeeper 主机(String) string 高(high)
advertised.host.name 过时的:只有在’advertise.listeners’或者’listeners’未设置的时候使用。使用’advertise.listeners’替代。zookeeeper把 Kafka 的主机的集群信息公布给所有的客户端使用。 string null 高(high)
advertised.listeners string 高(high)
advertised.port 遗弃 int null 高(high)
auto.create.topics.enable 设置为true,可自动在服务节点上创建topic boolean true 高(high)
auto.leader.rebalance.enable 设置为true时,Kafka 的后台定时任务会定时检查leader是否平衡,不平衡自动平衡 boolean true 高(high)
background.threads Kafka 的后台的任务数目。 int 10 高(high)
broker.id Kafka 集群中,broker.id要求唯一,可以在配置文件中设置,但要保持唯一。如果不设置,系统会找到已经存在的id最大值,然后当前id+1 int -1 高(high)
compression.type 用来指定一个给定的topic的压缩方式,可指定的值’gzip’,’sanppy’,’lz4’,或者不压缩,默认为producer,保持与producer一致。 string producer 高(high)
delete.topic.enable 是否可以删除topic,如何设置为fasle,通过admin tool删除对集群无影响。 boolean false 高(high)
leader.imbalance.check.interval.seconds Kafka 后台处理partition平衡的周期任务的频率 long 300 高(high)
leader.imbalance.per.broker.percentage 每个broker节点上允许leader的占比,如果超过这个比例,后台平衡任务会触发,平衡leader节点。 int 10 高(high)
listeners 监听列表,需要设定通信格式和ip:port。如果listener没有设置,需要设置listener.security.protocol.map值。如果设置为0.0.0.0绑定所有接口。如不设置表示设置为默认值。格式如下:PLAINTEXT://myhost:9092,SSL://:9091CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093 string null 高(high)
log.dir 指定日志数据存放的路径,是单路径。 string /tmp/kafka-logs 高(high)
log.dirs 指定日志数据可以放到多个路径。未设置使用默认值log.dir值 string null 高(high)
log.flush.interval.messages 日志刷到日志时缓存中消息条数 long 9223372036854775807 高(high)
log.flush.interval.ms 在缓存中数据存在的时间,可以理解为周期性将缓存数据持久化。如果未设置则使用log.flush.scheduler.interval.ms设置 long null 高(high)
log.flush.offset.checkpoint.interval.ms 日志的恢复标记位刷新的频率。(日志文件有个标志位,是记录以及写入日志的检索,这里是用来设定这个标志位的频率)。 int 60000 高(high)
log.flush.scheduler.interval.ms 周期性缓存日志文件刷入到磁盘中。 long 9223372036854775807 高(high)
log.flush.start.offset.checkpoint.interval.ms 持久化日志起始位更新的频率 int 60000 高(high)
log.retention.bytes log可存储的最大字节数。大于则删除 long -1 高(high)
log.retention.hours log保存的时间(hours),超时删除。 int 168 高(high)
log.retention.minutes log保存的时间(minutes),超时删除。 int null 高(high)
log.roll.hours 如果旧的segement未达到新建的条件,新的segement需要等待最大时间。 int 168 高(high)
log.roll.jitter.hours long null 高(high)
log.roll.jitter.ms long null 高(high)
log.roll.ms 创建一个新的segement文件需要等待的时间。 long null 高(high)
log.segment.bytes 一个日志文件最大可保持的字节数 int 1073741824 [14,…] 高(high)
log.segment.delete.delay.ms 日志文件从文件系统中删除最大等待的时间 long 60000 高(high)
message.max.bytes Kafka 批处理最大允许的字节数。 int 1000012 高(high)
min.insync.replicas 当响应acks设置为all,表示所有同步的insync partition需要同步,这个参数是设置多少个同步即可,并不是所有的。但必须大于[n/2] int 1 高(high)
num.io.threads 服务器端用来处理客户端的线程数目,包括磁盘的I/O int 8 高(high)
num.network.threads 服务器端用来响应网络请求和发送网络应答的线程数目 int 3 高(high)
num.recovery.threads.per.data.dir 用来恢复日志的每个路径下的日志文件线程数目,当日志启动或者关闭前的刷数据 int 3 高(high)
num.replica.fetchers 备份broker的并行度,表示备份brokers用来同步源broker节点上数据的数目。 int 1 高(high)
offset.metadata.max.bytes metadata实体关联的offset最大可提交的字节数 int 4096 高(high)
offsets.commit.required.acks offsets提交被确认需要的acks. short -1 高(high)
offsets.commit.timeout.ms 一个offsets被提交,需要所有的备份完成同步。这个参数是用来等待确认超时时间。 int 5000 高(high)
offsets.load.buffer.size 从offsets segement 批量读入到cache的offsets的字节大小 int 5242880 高(high)
offsets.retention.check.interval.ms 检索旧的offsets的频率 long 60000 高(high)
offsets.retention.minutes 日志窗口为offsets topic保留的时间 int 1440 高(high)
offsets.topic.compression.codec 消息的压缩的方法 int 0 高(high)
offsets.topic.num.partitions topic提交offset需要的partitions的数目 int 50 高(high)
offsets.topic.replication.factor 设置一个topic的备份的数目。 short 3 高(high)
offsets.topic.segment.bytes offsets topic segment 需要被保持的合适大小。 int 104857600 高(high)
queued.max.requests 在相应阻塞之前运行最大的请求的数目 int 500 高(high)
replica.fetch.min.bytes 每次抓取数据需要最小抓取的量,如果没有达到,则按replicaMaxWaitTimeMs时间等等,超时,返回 int 1 高(high)
replica.fetch.wait.max.ms 备份文件去抓取主备份数据允许等待的最大时间 int 500 高(high)
replica.high.watermark.checkpoint.interval.ms the high watermark被保存到磁盘的频率 long 5000 高(high)
replica.lag.time.max.ms ISR的备份broker未完成同步被移除的时间最大时间 long 1000 高(high)
replica.socket.receive.buffer.bytes 网络请求套接字buffer大小 int 65536 高(high)
replica.socket.timeout.ms 网络请求套接字超时等待时间 int 30000 高(high)
request.timeout.ms 客户端最大等待时间,如果超时,客户端会retries设定的次数直到失败 int 30000 高(high)
socket.receive.buffer.bytes 客户端最大等待时间,如果超时,客户端会retries设定的次数直到失败 int 30000 高(high)
socket.receive.buffer.bytes 请求设置服务器套接字的大小 int 102400 高(high)
socket.request.max.bytes 请求套接字的最大字节数 int 104857600 高(high)
socket.send.buffer.bytes 发送服务器套接字的大小 int 102400 高(high)
transaction.max.timeout.ms 事务超时的最大等待时间。 int 900000 高(high)
transaction.state.log.load.buffer.size 从日志段事务批读取到缓存中的字节大小。 int 5242880 高(high)
transaction.state.log.min.isr 为了保证事务性,ISR中备份需要同步的数目 int 2 高(high)
transaction.state.log.num.partitions 主partition的备份数目同步的数目 int 50 高(high)
transaction.state.log.replication.factor 主节点的从节点的备份文件数目保证事务性 short 3 高(high)
transaction.state.log.segment.bytes 事务的topic段导入内存的大小 int 104857600 高(high)
transactional.id.expiration.ms 事务协调器最大的等待时间 int 604800000 高(high)
unclean.leader.election.enable 非ISR的replicas是否可以参与选择为leader。 boolean false 高(high)
zookeeper.connection.timeout.ms 客户端最大不能连接到zookeeper的等待时间。若未设置则session的设置起作用。 int null 高(high)
zookeeper.session.timeout.ms zookeeper session 超时时间。 int 6000 高(high)
zookeeper.set.acl 设置客户端使用包含acls boolean false 高(high)
broker.id.generation.enable 检验broker id的最大值是否正确 boolean true 高(high)
broker.rack string null 高(high)
connections.max.idle.ms 空闲的套接字处理线程最大的等待时间 long 600000 高(high)
controlled.shutdown.enable 是否允许关闭controlled节点 boolean true 高(high)
controlled.shutdown.max.retries controlled节点关闭失败的最多可尝试次数 int 3 高(high)
controlled.shutdown.retry.backoff.ms controlled 关闭失败,系统需要恢复之前的状态等待的时间 long 5000 高(high)
controller.socket.timeout.ms controlled到broker channels 超时时间 int 30000 高(high)
default.replication.factor 默认的备份的复制自动创建topics的个数 int 1 高(high)
delete.records.purgatory.purge.interval.requests int 1 高(high)
fetch.purgatory.purge.interval.requests int 1000 高(high)
group.initial.rebalance.delay.ms 组协调器进行负载均衡时的最大等待时间 int 3000 高(high)
group.max.session.timeout.ms 组中的消费者的session最大允许等待时间 int 300000 高(high)
group.min.session.timeout.ms 组中的消费者的session最小允许等待时间 int 6000 medium
inter.broker.listener.name 用于brokers中相互交流监听者的名字。如果没有设置,使用securitu.inter.broker.protocol,不可同时设置。 string null medium
inter.broker.protocol.version 用来指定inter-broker所采用的协议,特别是所有的broker完成升级后,用来检测是否所有的broker所采用的协议是否合法。 string 0.11.0-IV2 medium
log.cleaner.backoff.ms 当日志完成清理,清理程序休眠的时间。 long 15000 [0,…] medium
log.cleaner.dedupe.buffer.size 所有清理日志线程用来分离日志的总共的内存大小。 long 134217728 medium
log.cleaner.delete.retention.ms 被清理的日志保存的时间。 long 86400000 medium
log.cleaner.enable 设置日志清理文件能在服务器上默认运行。 boolean true medium
log.cleaner.io.buffer.load.factor 日志清理队列载入因子,越高清理日志越快,但hash冲突概率越高。 double 0.9 medium
log.cleaner.io.buffer.size 用于日志清理是I/O通道buffer大小 int 524288 medium
log.cleaner.io.max.bytes.per.second 日志清理读写限流。读写会小于这个值 double 1.7976931348623157E308 medium
log.cleaner.min.cleanable.ratio 触发日志清理最小的比例,比例是脏日志/所有日志 double 0.5 medium
log.cleaner.min.compaction.lag.ms 一条消息未被压缩的存在最短时间。仅仅适用于需要被压缩的日志 long 0 medium
log.cleaner.threads 后台用于清理日志的数目 long 1 medium
log.cleanup.policy 日志清理策略:取值’delete’和’compact’ list delete medium
log.index.interval.bytes 加入offset检索的键值对的区间 int 4096 [0,…] medium
log.index.size.max.bytes offset检索位最大字节数 int 10485760 [4,…] medium
log.message.format.version 指定消息格式追加在日志末尾的版本, int 10485760 0.11.0-IV2 medium
log.message.timestamp.difference.max.ms broker接受到消息的时间戳和指定时间戳之间允许最大时间差;如果log.message.timestamp.type=CreateTime,消息会被拒绝,如果消息时间差超过设定阈值。 long 9223372036854775807 medium
log.message.timestamp.type 消息时间戳是创建时间或者追加在日志结尾的时间。取值’createTime’ 或者’LogAppendTime’。 String CreateTime [CreateTime, LogAppendTime] medium
log.preallocate 是否提前分配日志文件,如果是 Kafka 使用环境是windows环境,建议设置为true boolean false medium
log.retention.check.interval.ms 毫秒级别,检查日志是否需要被删除。 long 300000 medium
max.connections.per.ip 每个ip允许最大数目的连接数。 int 2147483647 [1,…] medium
max.connections.per.ip.overrides 重写主机或者ip的最大连接数 string “” medium
num.partitions 默认topic的partitions的默认值。 int 1 [1,…] medium
principal.builder.class 通过继承这个接口,建立SSL 安全协议连接 class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder medium
producer.purgatory.purge.interval.requestss class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder medium
replica.fetch.backoff.ms 当从partition获取数据发生错误时,休眠的时间。 int 1000 [0,…] medium
replica.fetch.max.bytes partition备份获取主节点的最大字节数 int 1048576 [0,…] medium
replica.fetch.response.max.bytes partition备份获取主节点的应答最大字节数 int 10485760 [0,…] medium
reserved.broker.max.id broker.id可用的最大数目 int 1000 [0,…] medium
sasl.enabled.mechanisms Kafka 服务所允许的SASL机制的列表,默认的是GSSAPI list GSSAPI medium
sasl.kerberos.kinit.cmd kerberos kinit 命令的路径 string /usr/bin/kinit medium
sasl.kerberos.min.time.before.relogin 登录线程刷新时间间隔 long 6000 medium
sasl.kerberos.principal.to.local.rules 规则列表将主名字映射到简短的名字 list DFAULT medium
sasl.kerberos.service.name kerberos在 Kafka 中运行的方式,可以是JAAS的配置,或者 Kafka 的配置。 string null medium
sasl.kerberos.ticket.renew.jitter 重生的时间锁附加的随机差的百分比 double 0.5 medium
sasl.kerberos.ticket.renew.window.factor 从指定的时间窗口到最新的时间差登录超时,登录线程自动激活。 double 0.8 medium
sasl.mechanism.inter.broker.protocol 内置broker使用SASL机制,默认是GSSAPI String GSAPI medium
security.inter.broker.protocol brokers之间使用的安全通信协议,可选值为PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。 String PLAINTEXT medium
ssl.client.auth Kafka broker应答客户端的权限控制,有三种,required,客户端应答需要认证。none表示不需要认证。 String none medium
ssl.enabled.protocols SSL连接可选用的协议列表 String TLSv1.2,TLSv1.1,TLSv1 medium
ssl.enabled.protocols SSL连接可选用的协议列表 String TLSv1.2,TLSv1.1,TLSv1 medium
ssl.keystore.type key store的文件格式,供客户端使用 String JKS medium
ssl.protocol SSLContext使用的SSL协议 String TLS medium
ssl.provider ssl连接安全的名字 String null medium
ssl.trustmanager.algorithm 用于SSL连接的信任管理器工厂算法。 默认值是为Java虚拟机配置的信任管理器工厂算法。 String PKIX medium
ssl.truststore.type 可信任的文件格式 String JKS medium
alter.config.policy.class.name alter 配置应该用于验证的策略类。 该类应该实现org.apache.kafka.server.policy.AlterConfigPolicy接口。 calss null medium
authorizer.class.name 授权继承的授权的类 string “” medium
create.topic.policy.class.name 应用于验证的创建主题策略类。 该类应该实现org.apache.kafka.server.policy.CreateTopicPolicy接口。具体来说,我们可以定义名称为INTERNAL和EXTERNAL的侦听器,该属性如下:INTERNAL:SSL,EXTERNAL:SSL class null medium
listener.security.protocol.map 在监听器名称和安全协议之间映射。 必须为同一个安全协议定义一个可用于多个端口或IP的安全协议。 例如,即使需要SSL,我们也可以分离内部和外部流量。,键和值由冒号分隔,映射条目用逗号分隔。 每个监听器名称只应在地图中显示一次。 string SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,TRACE:TRACE,SASL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT medium
metric.reporters 用作指标记录员的类的列表。 实现MetricReporter接口允许插入将被通知新的度量标准创建的类。 JmxReporter始终包含在注册JMX统计信息中。 list “” medium
metrics.num.samples 维护的样本数量用于计算度量。 int 2 [1,…] low
enable.idempotence 如果设置为true,每个消息在流中只复制一遍。 boolean false medium
interceptor.classes 用作拦截器的类的列表。 使用ProducerInterceptor接口,您可以在生成器发布到 Kafka 群集之前拦截(并且可能会变更)生产者收到的记录。 默认情况下,没有拦截器 list null medium
max.in.flight.requests.per.connection 在阻止之前,客户端将在单个连接上发送的未确认请求的最大数量。 请注意,如果此设置设置为大于1且发送失败,则会由于重试(即启用重试)而导致消息重新排序的风险。 int 5 medium
metadata.max.age.ms 以毫秒为单位的时间段,强制更新元数据,即使我们没有看到任何分区leader变化。 long 300000 [0,…] medium
metric.reporters 用作指标记录员的类的列表。 实现MetricReporter接口允许插入将被通知新的度量标准创建的类。 JmxReporter始终包含在注册JMX统计信息中。 list “” low
metrics.num.samples 维护的样本数量用于计算度量。 int 2 low
metrics.recording.level 监控的最高的记录级别。 string INFO [INFO, DEBUG] low
metrics.sample.window.ms 时间窗口计算度量标准。。 long 30000 [0,..] low
quota.window.num 保留在内存中的客户端引用数目。 int 11 [1,..] low
quota.window.size.seconds 客户配引用在内存中的时间跨度 int 1 [1,..] low
ssl.endpoint.identification.algorithm 使用服务器证书验证服务器主机名的端点识别算法。 string null low
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null low
transaction.abort.timed.out.transaction.cleanup.interval.ms 回滚已超时的交易的时间间隔 int 60000 [1,…] low
transaction.abort.timed.out.transaction.cleanup.interval.ms 由于transactional.id.expiration.ms传递而导致已过期的事务的间隔 int 3600000 [1,…] low
zookeeper.sync.time.ms zk的从节点可以落后主节点多远 int 2000 low

Topic 配置

Topic-level configuration 配置与主题相关的配置同时具有服务器默认值以及可选的每个主题的覆盖。 如果没有给出每个主题的配置,则使用服务器默认值。 可以通过提供一个或多个-config选项来在主题创建时设置覆盖。 此示例使用自定义消息大小和刷新率创建一个名为my-topic的主题:

1
2
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1

重载后的topic相关属性可以使用新的命令再次修改,下面这个例子是重载后修改消息的大小

1
2
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic
--alter --add-config max.message.bytes=128000

检查重载topic可以使用一下命令:

1
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe

移除重载可以:

1
bin/kafka-configs.sh --zookeeper localhost:2181  --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes

以下是主题级配置。 该属性的服务器的默认配置在“服务器默认属性”标题下给出。 给定的服务器默认配置值仅适用于主题,如果它没有显式主题配置覆盖。

NAME DESCRIPTION TYPE DEFAULT VALID VALUES SERVER DEFAULT PROPERTY IMPORTANCE
cleanup.policy “删除”或“合并”的字符串。 此字符串指定在旧日志段上使用的保留策略。 默认策略(“delete”)将在达到保留时间或大小限制时将废弃旧段。 “合并”设置将启用主题上的日志压缩。 list delete [compact, delete] log.cleanup.policy medium
compression.type 指定主体的最终压缩类型。 此配置接受标准压缩编解码器(’gzip’,’snappy’,lz4)。 它还接受‘uncompressed’相当于没有压缩的未压缩 ,这意味着保留由生产者设置的原始压缩编解码器。 string producer [uncompressed, snappy, lz4, gzip, producer] compression.type medium
delete.retention.ms 保留删除记录压缩主题的标记的时间量。 此设置还给出消费者如果从偏移量0开始读取的时间,以确保它们获得最终阶段的有效快照(否则,在完成扫描之前可能会收集删除的标记)。 long 86400000 [0,…] log.cleaner.delete.retention.ms medium
file.delete.delay.ms 等待文件从文件系统删除的时间 long 60000 [0,…] log.segment.delete.delay.ms medium
flush.messages 此设置允许指定我们强制fsync写入日志的数据的间隔。 例如,如果这被设置为1,我们将在每个消息之后fsync; 如果是5,我们将在每五个消息之后fsync。 一般来说,我们建议您不要设置它,并使用复制来保持持久性,并允许操作系统的后台刷新功能更高效。 可以在每个主题的基础上覆盖此设置(请参阅每个主题的配置部分)。 long 9223372036854775807 [0,…] log.flush.interval.messages medium
flush.ms 此设置允许指定我们强制fsync写入日志的数据的时间间隔。 例如,如果这设置为1000,那么在1000ms过去之后,我们将fsync。 一般来说,我们建议您不要设置它,并使用复制来保持持久性,并允许操作系统的后台刷新功能更高效。 long 9223372036854775807 [0,…] log.flush.interval.ms medium
follower.replication.throttled.replicas 日志复制应在向后侧节流的副本列表。该清单应描述一组副本的形式[的partitionid]:[BrokerId],[partitionid的]:[BrokerId]:……或可替代的通配符“*”可以被用来扼杀所有副本这个话题。 list “” [partitionId],[brokerId]:[partitionId],[brokerId]:… follower.replication.throttled.replicas medium
index.interval.bytes 此设置控制 Kafka 向其偏移索引添加索引条目的频率。 默认设置确保我们大致每4096个字节索引消息。更多的索引允许读取更接近日志中的确切位置,但使索引更大。 你可能不需要改变这个。 int 4096 [0,..] log.index.interval.bytes medium
leader.replication.throttled.replicas 日志复制应在引导端进行限制的副本列表。该清单应描述一组副本的形式[的partitionid]:[BrokerId],[partitionid的]:[BrokerId]:……或可替代的通配符“*”可以被用来扼杀所有副本这个话题。 list “” [partitionId],[brokerId]:[partitionId],[brokerId]:… leader.replication.throttled.replicas medium
max.message.bytes Kafka 允许的批处理量大小。 如果这种情况有所增加,并且消费者的版本号大于0.10.2,消费者的提取大小也必须增加,以便他们可以获取最大批次。在最新的消息格式版本中,记录总是分批进行分组以提高效率。在以前的消息格式版本中,未压缩的记录不会分组批量,并且此限制仅适用于该情况下的单个记录。 int 1000012 [0,…] message.max.bytes medium
message.format.version 指定用于将消息附加到broker的日志的消息格式版本。 该值应该是一个有效的ApiVersion。 一些例子是:0.8.2,0.9.0.0,0.10.0,更多细节检查ApiVersion。 通过设置特定的消息格式版本,用户正在证明磁盘上的所有现有消息都小于或等于指定版本。 不正确地设置此值将导致消费者使用旧版本,因为他们将接收到不了解的格式的消息。 string 0.11.0-IV2 log.message.format.version medium
message.timestamp.difference.max.ms broker接收到消息的时间戳与消息中指定的时间戳之间允许的最大差异。 如果message.timestamp.type = CreateTime,如果时间戳的差异超过此阈值,则消息将被拒绝。 如果message.timestamp.type = LogAppendTime,则忽略此配置。 long 9223372036854775807 [0,…] log.message.timestamp.difference.max.ms medium
message.timestamp.type 定义消息中的时间戳是消息创建时间还是日志追加时间。 该值应该是“CreateTime”或“LogAppendTime” string CreateTime log.message.timestamp.type medium
min.cleanable.dirty.ratio 此配置控制日志压缩程序将尝试清除日志的频率(假定启用日志压缩)。 默认情况下,我们将避免清理超过50%日志被压缩的日志。 该比率限制日志中浪费的最大空间重复(在最多50%的日志中可以是重复的50%)。 更高的比率意味着更少,更有效的清洁,但意味着日志中的浪费更多。 doublie 0.5 [0,…,1] log.cleaner.min.cleanable.ratio medium
min.compaction.lag.ms 消息在日志中保持不受压缩的最短时间。 仅适用于正在压实的日志。 long 0 [0,…,1] log.cleaner.min.compaction.lag.ms medium
min.insync.replicas 当生产者将acks设置为“all”(或“-1”)时,此配置指定必须确认写入的副本的最小数量,以使其被认为是成功的。如果无法满足此最小值,则生产者将引发异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。一起使用时,min.insync.replicas和acks允许您强化更大的耐用性保证。典型的情况是创建一个复制因子为3的主题,将min.insync.replicas设置为2,并使用acks“all”生成。 如果大多数副本没有收到写入,这将确保生产者引发异常。 int 1 [1,…] min.insync.replicas medium
preallocate 如果我们应该在创建新的日志段时在磁盘上预分配该文件,那么为真。。 boolean false log.preallocate medium
retention.bytes 如果我们使用“删除”保留策略,则此配置将控制日志可以增长的最大大小,之后我们将丢弃旧日志段以释放空间。 默认情况下,没有大小限制仅限于时间限制。 long -1 log.retention.bytes medium
retention.ms 如果我们使用“删除”保留策略,则此配置控制我们将保留日志的最长时间,然后我们将丢弃旧的日志段以释放空间。 这代表消费者必须阅读数据的时间长短。 long 604800000 log.retention.ms medium
segment.bytes 此配置控制日志的段文件大小。 一次保留和清理一个文件,因此较大的段大小意味着较少的文件,但对保留率的粒度控制较少。 int 1073741824 [14,…] log.segment.bytes medium
segment.index.bytes 此配置控制将偏移映射到文件位置的索引的大小。 我们预先分配此索引文件,并在日志滚动后收缩它。 通常您不需要更改此设置。 int 10485760 [0,…] log.index.size.max.bytes medium
segment.jitter.ms 任务segement回滚抖动时间避免不必要的回滚。 long 0 [0,…] log.roll.jitter.ms medium
segment.ms 此配置控制 Kafka 将强制日志滚动的时间段,即使段文件未满,以确保保留可以删除或压缩旧数据。 long 604800000 [0,…] log.roll.ms medium
unclean.leader.election.enable 指示是否将不在ISR集中的副本作为最后的手段选举为领导者,这样做可能会导致数据丢失。 boolean false unclean.leader.election.enable medium

Producer 配置

以下是java producer 客户端配置:

NAME DESCRIPTION TYPE DEFAULT VALID VALUES IMPORTANCE
bootstrap.servers 用于建立与 Kafka 集群的初始连接的ip/port对列表。 客户端将使用所有服务器,无论这里指定哪些服务器进行引导 - 此列表仅用于发现完整服务器集的初始主机。格式如下host1:port1,host2:port2,…:由于这些服务器仅用于初始连接以发现完整的集群成员资格(可能会动态更改),因此此列表不需要包含完整的服务器集(尽管如此,您可能需要多台服务器)。 list high
key.serializer 序列化所实现的接口:serializer class high
key.serializer key序列化所实现的接口:serializer class high
value.serializer value序列化所实现的接口:serializer class high
acks 在请求完成之前,生产者要求leader收到的确认数量。 这可以控制发送记录的可靠性。 允许以下设置:ack=0:如果设置为零,则生产者根本不会等待服务器的任何确认。 该记录将立即添加到套接字缓冲区并视为已发送。 在这种情况下,不能保证服务器已经收到记录,重试配置将不会生效(因为客户端通常不会知道有任何故障)。每个记录返回的偏移量将始终设置为-1。 ack=1:这将意味着领导者会将记录写入其本地日志,但将不会等待所有follower的全面确认。在这种情况下,如果领导者在确认记录之后立即失败,但在追随者未完成复制之前,该记录将丢失。 ack=all:这意味着领导者将等待全部follower完成复制。 只要至少有一个同步处理副本仍然存在,这将保证记录不会丢失。 这是最可靠的保证。 可以acks = -1设置。 string 1 [all, -1, 0, 1] high
buffer.memory 生产者可以用来缓冲等待发送到服务器的记录的总字节数。 如果记录发送速度比可以传送到服务器的速度快,那么制作者将阻止max.block.ms,之后它会抛出异常。此设置应大致对应于生产者将使用的总内存,但不是硬绑定,因为生产者使用的所有内存不用于缓冲。 一些额外的内存将用于压缩(如果启用压缩)以及维护飞行中请求。 long 33554432 [0,…] high
compression.type producer生成的所有数据的压缩类型。 默认值为none(即无压缩)。 有效值为none,gzip,snappy或lz4。 压缩是完整的数据,所以压缩类型也会影响压缩比(越多的批处理压缩越好)。 string none high
compression.type producer生成的所有数据的压缩类型。 默认值为none(即无压缩)。 有效值为none,gzip,snappy或lz4。 压缩是完整的数据,所以压缩类型也会影响压缩比(越多的批处理压缩越好)。 string none high
retries 设置大于零的值将导致客户端重新发送其发送失败并记录。 请注意,此重试与收到错误时客户端重新发送记录无异。 允许重试而不将max.in.flight.requests.per.connection设置为1将潜在地更改记录的排序,因为如果两个批次发送到单个分区,并且第一个失败并重试,但第二个成功,则记录 在第二批可能会出现第一。 int 0 [0,…,2147483647] high
ssl.key.password 密钥存储文件中私钥的密码。 这对于客户端是可选的。 password null high
ssl.keystore.location 密钥存储文件的位置。 这对于客户端是可选的,可以用于客户端的双向认证。 string null high
ssl.keystore.password 密钥存储文件的存储密码。 这对于客户端是可选的,只有配置了ssl.keystore.location才需要。 password null high
ssl.truststore.location 信任存储文件的位置。 string null high
ssl.truststore.password 信任存储文件的密码。 如果未设置密码,对信任库的访问仍然可用,但是完整性检查被禁用。 password null high
batch.size 每当多个记录被发送到同一个分区时,生产者将尝试将消息批处理。 这有助于客户端和服务器上的性能。 此配置控制默认批量大小(以字节为单位)。不会尝试批量记录大于此大小的记录。发送到brokers的请求将包含多个批次,每个分区有一个,可以发送数据。小批量批量将使批量不太常见,并可能降低生产量(零批量大小将完全禁用批处理)。 非常大的批量大小可能会浪费更多的内存,因为我们将始终分配指定批量大小的缓冲区以预期额外的记录。 int 16384 [0,…] high
client.id 在发出请求时传递给服务器的id字符串。 这样做的目的是通过允许在服务器端请求日志记录中包含逻辑应用程序名称来跟踪超出ip / port的请求源。 string “” high
connections.max.idle.ms 在此配置指定的毫秒数之后关闭空闲连接。 long 540000 high
linger.ms 生产者将在请求传输之间到达的任何记录组合成单个批量请求。通常情况下,只有在记录到达速度比发送速度快的情况下才会出现这种情况。然而,在某些情况下,即使在适度的负载下,客户端也可能希望减少请求数量。该设置通过添加少量人造延迟来实现,即,不是立即发出生产者将等待给定延迟的记录,以允许发送其他记录,以便发送可以批处理在一起。这可以被认为类似于Nagle在TCP中的算法。此设置给出批处理延迟的上限:一旦我们获得了一个分区的batch.size值的记录,它将立即被发送,而不管这个设置如何,但是如果我们比这个分区累积了很多字节,我们将’停留在指定的时间等待更多的记录显示。此设置默认为0(即无延迟)。例如,设置linger.ms = 5将具有减少发送的请求数量的效果,但对于在负载的缺失中发送的记录,将加起来5ms的延迟。 long 0 [0,…] high
max.block.ms 配置控制KafkaProducer.send()和KafkaProducer.partitionsFor()将阻塞多长时间。这些方法可能被阻止,因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。 long 60000 [0,…] medium
max.request.size 请求的最大大小(以字节为单位)。 此设置将限制生产者在单个请求中发送的记录批次数,以避免发送巨大的请求。 这也是最大记录批量大小的上限。 请注意,服务器拥有自己的字节大小,可能与此不同。。 int 1048576 [0,…] medium
partitioner.class partitioner继承实现partitioner 接口 class org.apache.kafka.clients.producer.internals.DefaultPartitioner [0,…] medium
receive.buffer.bytes 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。 int 32768 [-1,…] medium
request.timeout.ms 配置控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 这应该大于replica.lag.time.max.ms(代理配置),以减少由于不必要的生产者重试引起的消息重复的可能性。 int 3000 [0,…] medium
sasl.jaas.config JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。 这里描述JAAS配置文件格式。 该值的格式为:’(=)*;’ password null medium
sasl.kerberos.service.name Kafka 运行的Kerberos主体名称。 这可以在 Kafka 的JAAS配置或 Kafka 的配置中定义。 string null medium
sasl.mechanism SASL机制用于客户端连接。 这可能是安全提供者可用的任何机制。 GSSAPI是默认机制。 string GSSAPI medium
security.protocol 与brokers通信的协议,合法值: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. string PLAINTEXT medium
send.buffer.bytes 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,则将使用OS默认值。 int 131072 medium
ssl.enabled.protocols SSL连接的协议列表 list TLSv1.2,TLSv1.1,TLSv1 medium
ssl.keystore.type 密钥存储文件的文件格式。 这对于客户端是可选的。 string jks medium
ssl.protocol 用于生成SSLContext的SSL协议。 默认设置是TLS,这对大多数情况都是适用的。 最新的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 string tls medium
ssl.provider 用于SSL连接的安全提供程序的名称。 默认值是JVM的默认安全提供程序。 string null medium
ssl.truststore.type 可信任存储文件的格式。 string jks medium
enable.idempotence 当设置为“true”时,生产者将确保每个消息的正好一个副本写入流中。 如果“false”,由于broker程序故障等导致的生产者重试,可能会在流中写入重试的消息的重复。 默认设置为“false”。 请注意,启用幂等式需要将max.in.flight.requests.per.connection设置为1,重试次数不能为零。 另外acks必须设置为“all”。 如果这些值保持默认值,我们将覆盖默认值。 如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。 boolean false medium
interceptor.classes 用作拦截器的类的列表。 使用ProducerInterceptor接口,您可以在生成器发布到 Kafka 群集之前拦截(并且可能会变更)生产者收到的记录。 默认情况下,没有拦截器。 list null medium
max.in.flight.requests.per.connection 在阻止之前,客户端将在单个连接上发送的未确认请求的最大数量。 请注意,如果此设置设置为大于1且发送失败,则会由于重试(即启用重试)而导致消息重新排序的风险。 int 5 medium
metadata.max.age.ms 以毫秒为单位的时间段,我们强制更新元数据,即使我们没有看到任何分区领导变化,主动发现任何新的经纪人或分区。 long 30000 medium
metric.reporters 用作指标记录员的类的列表。 实现MetricReporter接口允许插入将被通知新的度量标准创建的类。 JmxReporter始终包含在注册JMX统计信息中。 list “” medium
metrics.num.samples 维护的样本数量用于计算度量。 int 2 medium
metrics.recording.level 指标的最高记录级别。 string info medium
metrics.sample.window.ms 时间窗口计算度量标准。 long 30000 medium
reconnect.backoff.max.ms 重新连接到重复无法连接的broker程序时等待的最大时间(毫秒)。 如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。 long 1000 medium
reconnect.backoff.ms 尝试重新连接到给定主机之前等待的基本时间量。 这避免了在紧密循环中重复连接到主机。 此退货适用于客户端对代理的所有连接尝试。 long 50 medium
sasl.kerberos.kinit.cmd Kerberos kinit命令路径。 string /usr/bin/kinit medium
sasl.kerberos.min.time.before.relogin 刷新尝试之间的登录线程睡眠时间。 long 60000 low
sasl.kerberos.ticket.renew.jitter 添加到更新时间的随机抖动百分比。 double 0.05 low
sasl.kerberos.ticket.renew.window.factor 登录线程将睡眠,直到从上次刷新到窗口到期时间的指定窗口因子为止,此时将尝试续订 double 0.8 low
ssl.cipher.suites 密码套件列表。 这是使用TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。 默认情况下,支持所有可用的密码套件。 list null low
ssl.endpoint.identification.algorithm 使用服务器证书验证服务器主机名的端点识别算法。 string null low
ssl.keymanager.algorithm 密钥管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的密钥管理器工厂算法。 string SunX509 low
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null low
ssl.trustmanager.algorithm 信任管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的信任管理器工厂算法。 string PKIX low
transaction.timeout.ms 事务协调器在主动中止正在进行的事务之前,等待事务状态更新的最大时间(以ms为单位)。如果此值大于代理中的max.transaction.timeout.ms设置,则请求将 失败,出现“InvalidTransactionTimeout”错误。 int 60000 low
transactional.id 用于事务传递的TransactionalId。 这使得可以跨越多个生产者会话的可靠性语义,因为它允许客户端保证在开始任何新事务之前使用相同的TransactionalId的事务已经完成。 如果没有提供TransactionalId,则生产者被限制为幂等传递。 请注意,如果配置了TransactionalId,则必须启用enable.idempotence。 默认值为空,这意味着无法使用事务。 string null non-empty string low

Consumer 配置

以下是新消费者的配置

NAME DESCRIPTION TYPE DEFAULT VALID VALUES IMPORTANCE
bootstrap.servers 用于建立与 Kafka 集群的初始连接的主机/端口对列表。客户端将使用所有服务器,无论这里指定哪些服务器进行引导 - 此列表用于发现完整服务器集的初始主机。 list high
key.deserializer Deserializer类用于key实现Deserializer接口。 class high
value.deserializer 解串器类实现value的Deserializer接口。 class high
fetch.min.bytes 服务器应该为获取请求返回的最小数据量。 如果没有足够的数据可用,请求将等待这么多数据积累,然后再回答该请求。 1字节的默认设置意味着只要单字节的数据可用,或者提取请求超时等待数据到达,则应答提取请求。 将其设置为大于1将导致服务器等待更大量的数据累积,这可能会以一些额外的延迟为代价提高服务器吞吐量。。 int 1 [0,…] high
group.id 一个唯一的字符串,用于标识此消费者所属的消费者组。 如果消费者通过使用订阅(主题)或基于 Kafka 的偏移量管理策略来使用组管理功能,则此属性是必需的。 string “” high
heartbeat.interval.ms 使用 Kafka group 管理,心跳与消费者协调员之间的预期时间。 心跳用于确保消费者的会话保持活动,并在新消费者加入或离开组时促进重新平衡。 该值必须设置为低于session.timeout.ms,但通常应设置为不高于该值的1/3。 它可以调整得更低以控制正常重新平衡的预期时间。 int 3000 high
max.partition.fetch.bytes 服务器将返回的每个分区的最大数据量。 记录由消费者批量获取。 如果fetch的第一个非空分区中的第一个记录批次大于此限制,则仍将返回批处理,以确保消费者可以取得进展。接受的最大记录批量大小通过message.max.bytes(broker config)或max.message.bytes(topic config)定义。 请参阅fetch.max.bytes以限制消费者请求大小。 int 1048576 [0,…] high
ssl.key.password 密钥存储文件中私钥的密码。 这对于客户端是可选的。 password null high
ssl.keystore.location 密钥存储文件的位置。 这对于客户端是可选的,可以用于客户端的双向认证。 String null high
ssl.keystore.password 密钥存储文件的存储密码。 这对于客户端是可选的,只有配置了ssl.keystore.location才需要。 password null high
ssl.truststore.location 信任存储文件的位置。 string null high
auto.offset.reset 如果 heartbeat.interval.ms 没有初始偏移量,或者当前偏移量在服务器上不再存在,采取何种策略:earliest:每次从初始位置取。latest:每次从最新位置取。none:从消费的offset值开始取,不存在,直接抛出异常。anything esle :直接抛出异常 string null high
connections.max.idle.ms 在此配置指定的毫秒数之后关闭空闲连接。 long 540000 high
enable.auto.commit 如果真的,消费者的偏移将在后台定期提交 boolean true high
exclude.internal.topics 来自内部主题的记录(如偏移量)是否应该暴露给消费者。 如果设置为true,则从内部topic接收记录的唯一方法是订阅它。 boolean true high
fetch.max.bytes 服务器为获取请求返回的最大数据量。 记录由消费者批量获取,如果第一个非空分区中的第一个记录批次大于此值,则记录批次仍将被返回,以确保消费者能够取得进展。。 int 52428800 [0,…] high
isolation.level 控制如何读取事务写入的消息。 如果设置为read_committed只返回事务消息,如果设置为read_uncommitted,consumer.poll() 返回所有消息,甚至被无法确保的事务性消息。非事务消息将以任意一种方式无条件返回。消息将始终以偏移顺序返回。 consumer.poll()将只返回消息,直到最后一个稳定的偏移量(LSO),它是小于第一个打开事务的偏移量的一个。 特别是在属于正在进行的消费的消息之后出现的任何消息将被阻塞,直到相关交易完成。 因此,在事务性处理中,read_committed的消费者将无法读取高水印。 string read_uncommitted [read_committed, read_uncommitted] high
max.poll.interval.ms 在使用消费者组管理时调用poll()的最大延迟。 这会在获取更多记录之前将消费者空闲时间的上限置于上限之内。 如果在此超时到期之前未调用poll(),则消费者被认为失败,并且该组将重新平衡,以便将分区重新分配给另一个成员。 int 300000 [0,…] high
max.poll.records 在单次调用poll()中返回的最大消息数。 int 500 [1,…] high
partition.assignment.strategy 分组分配策略的类名称,当使用组管理时,将会把消费者按照组的对应关系根据定义的分配策略分配 list class org.apache.kafka.clients.consumer.RangeAssignor high
receive.buffer.bytes 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。 int 65536 [-1,…] medium
request.timeout.ms 配置控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 int 305000 [0,…] medium
sasl.jaas.config JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。 这里描述JAAS配置文件格式。 该值的格式为:’(=)*;’ password null medium
sasl.kerberos.service.name Kafka 运行的Kerberos主体名称。 这可以在 Kafka 的JAAS配置或 Kafka 的配置中定义。 string null medium
sasl.mechanism SASL机制用于客户端连接。 这可能是安全提供者可用的任何机制。 GSSAPI是默认机制。 string GSSAPI medium
ssl.enabled.protocols 启用SSL连接的协议列表。 list TLSv1.2,TLSv1.1,TLSv1 medium
security.protocol 用于与brokers沟通的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL string PLAINTEXT medium
send.buffer.bytes 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,则将使用OS默认值。 int 131072 medium
ssl.keystore.type 密钥存储文件的文件格式。 这对于客户端是可选的。 string jks medium
ssl.protocol 用于生成SSLContext的SSL协议。 默认设置是TLS,这对大多数情况都是适用的。 最近的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 string TLS medium
ssl.provider 用于SSL连接的安全提供程序的名称。 默认值是JVM的默认安全提供程序。 string null medium
ssl.truststore.type 信任存储文件的文件格式。 string jks medium
auto.commit.interval.ms 如果enable.auto.commit设置为true,则以毫秒为单位的消费者偏移量自动提交给 Kafka 的频率。 int 5000 [0,…] medium
check.crcs 自动检查CRC32记录的消耗。 这样可以确保邮件不发生在线或磁盘损坏。 这个检查增加了一些开销,因此在寻求极端性能的情况下可能会被禁用。 boolean true low
client.id 在发出请求时传递给服务器的id字符串。 这样做的目的是通过允许在服务器端请求日志记录中包含逻辑应用程序名称来跟踪超出ip / port的请求源。 string “” low
fetch.max.wait.ms 如果没有足够的数据来满足fetch.min.bytes给出的要求,服务器将在接收到提取请求之前阻止的最长时间。 int 500 [0,…] low
interceptor.classes 用作拦截器的类的列表。 实现ConsumerInterceptor接口允许您拦截(也可能是变异)消费者收到的记录。 默认情况下,没有拦截器 list null low
metadata.max.age.ms 以毫秒为单位的时间段,强制更新元数据,即使我们没有看到任何分区领导变化,主动发现任何新的brokers或分区。 list null low
metric.reporters 用作指标记录员的类的列表。 实现MetricReporter接口允许插入将被通知新的度量标准创建的类。 JmxReporter始终包含在注册JMX统计信息中。 list “” low
metrics.num.samples 维护的样本数量用于计算度量。 int 2 [1,…] low
metrics.recording.level 指标的最高记录级别。。 string info [INFO, DEBUG] low
metrics.sample.window.ms 时间窗口计算度量标准。 long 30000 [0,…] low
reconnect.backoff.max.ms 重新连接到反复无法连接的brokers程序时,以毫秒为单位的最大时间等待时间。 如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。 long 1000 [0,…] low
reconnect.backoff.ms 尝试重新连接到给定主机之前等待的基本时间量。 这避免了在紧密循环中重复连接到主机。 此退货适用于客户端对brokers的所有连接尝试。 long 50 [0,…] low
retry.backoff.ms 尝试重试给定主题分区的失败请求之前等待的时间。 这样可以避免在某些故障情况下以严密的循环重复发送请求。 long 100 [0,…] low
sasl.kerberos.kinit.cmd kerberos kinit 命令的路径 string /usr/bin/kinit low
sasl.kerberos.min.time.before.relogin 刷新尝试之间的登录线程睡眠时间。 long 60000 low
sasl.kerberos.ticket.renew.jitter 添加到更新时间的随机抖动百分比。 double 0.05 low
sasl.kerberos.ticket.renew.window.factor 登录线程将睡眠,直到从上次刷新到票证时间到期的指定窗口因子为止,此时将尝试续订阅。 double 0.8 low
ssl.cipher.suites 密码套件列表。 这是使用TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。 默认情况下,支持所有可用的密码套件。 list null low
ssl.endpoint.identification.algorithm 使用服务器证书验证服务器主机名的端点识别算法。 string null low
ssl.keymanager.algorithm 密钥管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的密钥管理器工厂算法。 string SunX509 low
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null low
ssl.trustmanager.algorithm 信任管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的信任管理器工厂算法。 string PKIX low

低(旧)版本 Consumer 配置

必须配置的低版本 Consumer 配置如下:

  • group.id
  • zookeeper.connect
PROPERTY DEFAULT DESCRIPTION
group.id 一个唯一标识消费者所属消费者,类型是string。 通过设置相同的组ID,多个进程表示它们都是同一消费者组的一个。
zookeeper.connect 指定ZooKeeper连接字符串,格式为hostname:port,其中host和port是ZooKeeper服务器的主机和端口。 为了允许在ZooKeeper机器关闭时通过其他ZooKeeper节点进行连接,您还可以以hostname1:host1,hostname2:port2,hostname3:port3的形式指定多个主机。该服务器还可以具有ZooKeeper chroot路径,作为其ZooKeeper连接字符串的一部分,将其数据放置在全局ZooKeeper命名空间中的某个路径下。 如果是这样,消费者应该在其连接字符串中使用相同的chroot路径。 例如,要给出/ chroot / path的chroot路径,您将给出连接字符串为hostname1:port1,hostname2:port2,hostname3:port3 / chroot / path。
consumer.id null 未设置,自动增加
socket.timeout.ms 30*1000 网络请求超时时间,真实的网络超时时间max.fetch.wait + socket.timeout.ms.
socket.receive.buffer.bytes 64*1024 网络请求接受的网络字节数
fetch.message.max.bytes 1024*1024 每个topic-partition提取请求中尝试获取的消息的字节数。 这些字节将被读入每个分区的内存,因此这有助于控制消费者使用的内存。 提取请求大小必须至少与服务器允许的最大消息大小一样大,否则生产者可能发送大于消费者可以获取的消息。
num.consumer.fetchers 1 消费者获取消息的线程数
auto.commit.enable true 如果为true,定期向ZooKeeper提交消费者已经获取的消息的偏移量。 当进程失败时,将使用记录偏移量作为新消费者开始的位置。
auto.commit.interval.ms 60*1000 提交给zookeeper的消费者的offsets的频率,时间为ms。
queued.max.message.chunks 2 消息块的最大被缓冲消息数目。 每个块可以达到fetch.message.max.bytes。
rebalance.max.retries 4 当新的消费者加入消费者组时,消费者集合尝试“重新平衡”负载,为每个消费者分配分区。 如果一组消费者在这项任务发生时发生变化,则重新平衡将失败并重试。 此设置控制放弃之前的最大尝试次数。
fetch.min.bytes 1 服务器应该为获取请求返回的最小数据量。 如果没有足够的数据可用,请求将等待这么多数据积累,然后再回答该请求。
fetch.wait.max.ms 100 如果没有足够的数据来立即满足fetch.min.bytes,服务器将在应答抓取请求之前阻止的最长时间
rebalance.backoff.ms 2000 重新平衡期间重试之间的退后时间。 如果未明确设置,则使用zookeeper.sync.time.ms中的值。
refresh.leader.backoff.ms 200 退出等待时间,然后再尝试确定刚刚失去领导者的分区新的领导者。
auto.offset.reset largest 当 zookeeper中offset没有初始化或者超出值的范围应该怎么做:smallest:自动设置为最小的offset;largest:自动设置为最新的offset;*anything:在消费者抛出异常。
consumer.timeout.ms -1 如果在指定的时间间隔后没有消息可用,则向用户发出超时异常
exclude.internal.topics true 来自内部主题的消息(如偏移量)是否应该暴露给消费者。
client.id group id value 客户端ID是每个请求中发送的用户指定的字符串,用于帮助跟踪调用。 它应该逻辑地标识发出请求的应用程序。
zookeeper.session.timeout.ms 6000 ZooKeeper会话超时。 如果消费者在这段时间内没有对ZooKeeper心跳,那么它被认为是死亡的,并且会发生重新平衡。
zookeeper.connection.timeout.ms 6000 与zookeeper建立连接时客户端等待的最长时间。
zookeeper.sync.time.ms 2000 zk的follower可以落后zk leader多长时间
offsets.storage zookeeper offsets应该存储在哪里
offsets.channel.backoff.ms 1000 重新连接偏移信道或重试失败的偏移提取/提交请求时的退避周期。
offsets.channel.socket.timeout.ms 10000 读取偏移提取/提交请求的响应时的套接字超时。 此超时也用于用于查询偏移量管理器的ConsumerMetadata请求。
offsets.commit.max.retries 5 失败时重试偏移提交多次。 此重试计数仅适用于停机期间的偏移提交。 它不适用于源自自动提交线程的提交。 它也不适用于在提交偏移量之前查询偏移协调器的尝试。 即如果消费者元数据请求由于任何原因而失败,则将重试它,并且重试不计入该限制。
dual.commit.enabled true 如果您使用“kafka”作为offsets.storage,则可以向ZooKeeper(除Kafka之外)进行双重提交偏移。 在从基于zookeeper的偏移存储迁移到基于kafka的偏移量存储的过程中需要这样做。 对于任何给定的消费者组,在该组中的所有实例已迁移到提交到代理(而不是直接到ZooKeeper)的新版本之后,可以将其关闭。
partition.assignment.strategy range 在“range”或“roundrobin”策略之间选择将分区分配给消费者流。循环分区分配器布置所有可用的分区和所有可用的消费者线程。然后,继续从分区到消费者线程进行循环任务。如果所有消费者实例的订阅是相同的,则分区将被均匀分布。 (即,分区所有权计数将在所有消费者线程之间的差异仅在一个增量之内。)循环分配仅在以下情况下被允许:(a)每个主题在消费者实例中具有相同数量的流(b)集合订阅的主题对于组内的每个消费者实例都是相同的。范围分区以每个主题为基础。对于每个主题,我们按数字顺序排列可用的分区,并以字典顺序排列消费者线程。然后,我们将分区数除以消费者流(线程)的总数来确定分配给每个消费者的分区数。如果不均匀分割,那么前几个消费者将会有一个额外的分区。

Kafka Connect 配置

Kafka Connect 配置如下:

NAME DESCRIPTION TYPE DEFAULT VALID VALUES IMPORTANCE
config.storage.topic 存储连接器配置的Kafka主题的名称 string high
group.id 一个唯一的字符串,用于标识此工作所属的Connect集群组。 string high
key.converter 用于在Kafka Connect格式和写入Kafka的序列化表单之间转换的转换器类。 这可以控制写入或从卡夫卡读取的消息中的键的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 class high
offset.storage.topic 存储连接器偏移的Kafka主题的名称 string high
status.storage.topic 存储连接器和任务状态的Kafka主题的名称 string high
value.converter 用于在Kafka Connect格式和写入Kafka的序列化表单之间转换的转换器类。 这控制了写入或从Kafka读取的消息中的值的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 class high
internal.key.converter 用于在Kafka Connect格式和写入Kafka的序列化表单之间转换的转换器类。 这可以控制写入或从卡夫卡读取的消息中的键的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 此设置控制框架使用的内部记账数据的格式,如配置和偏移量,因此用户通常可以使用任何正常的转换器实现。 class high
internal.value.converter 用于在Kafka Connect格式和写入Kafka的序列化表单之间转换的转换器类。 这控制了写入或从Kafka读取的消息中的值的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 此设置控制框架使用的内部记账数据的格式,如配置和偏移量,因此用户通常可以使用任何正常的转换器实现。 class high
bootstrap.servers 用于建立与Kafka集群的初始连接的主机/端口对列表。 客户端将使用所有服务器,无论这里指定哪些服务器进行引导 - 此列表仅影响用于发现完整服务器集的初始主机。 该列表应该以host1:port1,host2:port2,….的格式。由于这些服务器仅用于初始连接才能发现完整的集群成员资格(可能会动态更改),因此此列表不需要包含完整集 的服务器(尽管如此,您可能需要多个服务器)。 list localhost:9092 high
heartbeat.interval.ms 使用Kafka集团管理设施时心跳与团队协调员之间的预期时间。 心跳用于确保工作人员的会话保持活动,并促进新成员加入或离开组时的重新平衡。 该值必须设置为低于session.timeout.ms,但通常应设置为不高于该值的1/3。 它可以调整得更低以控制正常重新平衡的预期时间。 int 3000 high
rebalance.timeout.ms 一旦重新平衡开始,每个worker最多可以加入该组。 这基本上是所有任务刷新任何未决数据和提交偏移所需的时间量的限制。 如果超时超时,那么worker将从组中删除,这将导致偏移提交失败。 int 60000 high
session.timeout.ms 用于检测worker故障的超时时间。 worker定期发送心跳以表明其对经纪人的活力。 如果在此会话超时到期之前,代理没有收到心跳线,那么代理将从组中删除该worker并启动重新平衡。 请注意,该值必须在group.min.session.timeout.ms和group.max.session.timeout.ms中的代理配置中配置的允许范围内。 int 10000 high
ssl.key.password 密钥存储文件中私钥的密码。 这对于客户端是可选的。 password null high
ssl.keystore.location 密钥存储文件的位置。 这对于客户端是可选的,可以用于客户端的双向认证。 string null high
ssl.keystore.password 密钥存储文件的存储密码。 这对于客户端是可选的,只有配置了ssl.keystore.location才需要。 password null high
ssl.truststore.location 信任存储文件的位置。 string null high
ssl.truststore.password 信任存储文件的密码。 如果未设置密码,对信任库的访问仍然可用,但是完整性检查被禁用。 password null high
connections.max.idle.ms 在此配置指定的毫秒数之后关闭空闲连接。 long 540000 high
receive.buffer.bytes 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。 int 32768 [0,…] high
request.timeout.ms 配置控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 int 40000 [0,…] high
sasl.jaas.config JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。 这里描述JAAS配置文件格式。 该值的格式为:’(=)*;’ password null high
sasl.kerberos.service.name Kafka运行的Kerberos主体名称。 这可以在Kafka的JAAS配置或Kafka的配置中定义。 string null high
sasl.mechanism SASL机制用于客户端连接。 这可能是安全提供者可用的任何机制。 GSSAPI是默认机制。 string GSSAPI high
security.protocol 用于与brokers沟通的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 string PLAINTEXT high
send.buffer.bytes 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,则将使用OS默认值。 int 131072 [0,…] high
ssl.enabled.protocols 启用SSL连接的协议列表。 list TLSv1.2,TLSv1.1,TLSv1 high
ssl.keystore.type 密钥存储文件的文件格式。 这对于客户端是可选的。 string JKS medium
ssl.protocol 用于生成SSLContext的SSL协议。 默认设置是TLS,这对大多数情况都是适用的。 最近的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 string TLS medium
ssl.provider 用于SSL连接的安全提供程序的名称。 默认值是JVM的默认安全提供程序。 string null medium
ssl.truststore.type 信任存储文件的文件格式。 string JKS medium
worker.sync.timeout.ms 当worker与其他worker不同步并需要重新同步配置时,请等待这段时间才能放弃,离开组,然后等待退伍期才能重新加入。。 int 3000 medium
worker.unsync.backoff.ms 当worker与其他worker不同步,并且无法赶上worker.sync.timeout.ms,请在重新加入之前离开Connect集群很长时间。 int 300000 medium
access.control.allow.methods 通过设置Access-Control-Allow-Methods标头来设置跨源请求支持的方法。 Access-Control-Allow-Methods标头的默认值允许GET,POST和HEAD的跨源请求。 string “” medium
access.control.allow.origin 将Access-Control-Allow-Origin标头设置为REST API请求的值。要启用跨源访问,请将其设置为应允许访问API的应用程序的域,或者“*”允许从任何 域。 默认值仅允许从REST API的域访问。 string “” medium
client.id 在发出请求时传递给服务器的id字符串。 这样做的目的是通过允许在服务器端请求日志记录中包含逻辑应用程序名称来跟踪超出ip / port的请求源。 string “” medium
config.storage.replication.factor 创建配置存储主题时使用的复制因子 short 3 [1,…] medium
metadata.max.age.ms 以毫秒为单位的时间段,我们强制更新元数据,即使我们没有看到任何分区领导变化,主动发现任何新的经纪人或分区。 long 300000 [0,…] medium
metric.reporters 用作指标记录员的类的列表。 实现MetricReporter接口允许插入将被通知新的度量标准创建的类。 JmxReporter始终包含在注册JMX统计信息中。 list “” medium
metrics.num.samples 维护的样本数量用于计算度量。 int 2 [1,…] medium
metrics.sample.window.ms 时间窗口计算度量标准。 long 30000 [0,…] medium
offset.flush.interval.ms 尝试提交任务偏移量的间隔。 long 60000 medium
offset.flush.timeout.ms 在取消进程并恢复要在将来尝试中提交的偏移量数据之前,等待记录刷新并分配要提交到偏移存储的偏移量数据的最大毫秒数。 long 5000 medium
offset.storage.partitions 创建偏移存储主题时使用的分区数 int 25 [1,…] medium
offset.storage.replication.factor 创建偏移存储主题时使用的复制因子 short 3 [1,…] medium
plugin.path 包含插件(连接器,转换器,转换)的逗号分隔的路径列表。 该列表应包含顶级目录,其中包括以下任何组合:a)立即包含jars与插件及其依赖关系的目录b)具有插件及其依赖项的uber-jars c)立即包含插件类的包目录结构的目录及其 依赖关系注意:将遵循符号链接来发现依赖关系或插件。 示例:plugin.path = / usr / local / share / java,/ usr / local / share / kafka / plugins,/ opt / connectors list null medium
reconnect.backoff.max.ms 重新连接到重复无法连接的代理程序时等待的最大时间(毫秒)。 如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。 long 1000 [0,…] medium
reconnect.backoff.ms 尝试重新连接到给定主机之前等待的基本时间量。 这避免了在紧密循环中重复连接到主机。 此退货适用于客户端对代理的所有连接尝试。 long 50 [0,…] medium
rest.advertised.host.name 如果这样设置,这将被发送给其他工作人员连接到的主机名。 string null medium
rest.advertised.port 如果这样设置,这是将被发送给其他worker连接的端口。 int null medium
rest.host.name REST API的主机名。 如果这样设置,它将只绑定到这个接口。 string null medium
rest.port 端口用于REST API侦听。 int 8083 medium
retry.backoff.ms 尝试重试给定主题分区的失败请求之前等待的时间。 这样可以避免在某些故障情况下以严密的循环重复发送请求。 long 100 [0,…] medium
sasl.kerberos.kinit.cmd Kerberos kinit命令路径。 string /usr/bin/kinit medium
sasl.kerberos.min.time.before.relogin 刷新尝试之间的登录线程睡眠时间。 long 60000 medium
sasl.kerberos.ticket.renew.jitter 添加到更新时间的随机抖动百分比。 double 0.05 medium
sasl.kerberos.ticket.renew.window.factor 登录线程将睡眠,直到从上次刷新到机票到期时间的指定窗口因子为止,此时将尝试续订ticket double 0.8 medium
ssl.cipher.suites 密码套件列表。 这是使用TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。 默认情况下,支持所有可用的密码套件。 list null medium
ssl.endpoint.identification.algorithm 使用服务器证书验证服务器主机名的端点识别算法。 string null medium
ssl.keymanager.algorithm 密钥管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的密钥管理器工厂算法。 string SunX509 medium
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null medium
ssl.trustmanager.algorithm 信任管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的信任管理器工厂算法。 string PKIX medium
status.storage.partitions 创建状态存储主题时使用的分区数 int 5 [1,…] medium
status.storage.replication.factor 创建状态存储主题时使用的复制因子 short 3 [1,…] medium
task.shutdown.graceful.timeout.ms 等待任务正常关机的时间。 这是总时间,而不是每个任务。 所有任务已关闭触发,然后依次等待。 long 5000 medium

Kafka Streams 配置

NAME DESCRIPTION TYPE DEFAULT VALID VALUES IMPORTANCE
application.id 流处理应用程序的标识符。 在Kafka群集中必须是唯一的。 用作1)默认的client-id前缀,2)用于成员资格管理的group-id,3)changelog主题前缀。 string high
bootstrap.servers 用于建立与Kafka集群的初始连接的主机/端口对列表。 客户端将使用所有服务器,无论这里指定哪些服务器进行引导 - 此列表仅影响用于发现完整服务器集的初始主机。 该列表应该以host1:port1,host2:port2,….的格式。由于这些服务器仅用于初始连接才能发现完整的集群成员资格(可能会动态更改),因此此列表不需要包含完整集 的服务器(尽管如此,您可能需要多个服务器)。 list high
replication.factor 流处理应用程序创建的更改日志主题和重新分区主题的复制因素。 int 1 high
state.dir 状态存储的位置 string /tmp/kafka-streams high
cache.max.bytes.buffering 用于所有线程缓冲的最大内存字节数 long 10485760 [0,…] medium
client.id 在发出请求时传递给服务器的id字符串。 这样做的目的是通过允许在服务器端请求日志记录中包含逻辑应用程序名称来跟踪超出ip / port的请求源。 string “” medium
default.key.serde 实现接口serde的key的默认的序列化。 class org.apache.kafka.common.serialization.Serdes$ByteArraySerde medium
default.timestamp.extractor 实现TimestampExtractor接口的默认时间戳提取器类。。 class org.apache.kafka.streams.processor.FailOnInvalidTimestamp medium
default.value.serde 用于实现Serde接口的值的默认serializer / deserializer类。 class org.apache.kafka.common.serialization.Serdes$ByteArraySerde medium
num.standby.replicas 每个任务的备用副本数。 int 0 medium
num.stream.threads 执行流处理的线程数。 int 1 medium
processing.guarantee 处理程序使用保证。 可能的值为at_least_once(默认)和exact_once。 可能的值为at_least_once(默认)和exact_once。 string at_least_once [at_least_once, exactly_once] medium
security.protocol 用于与brokers沟通的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 string PLAINTEXT medium
application.server 主机:端口对指向嵌入式用户定义的端点,可用于发现单个KafkaStreams应用程序中状态存储的位置 string “” medium
buffered.records.per.partition 每个分区缓存的最大记录数。 int 1000 medium
commit.interval.ms 用于保存处理器位置的频率。 (注意,如果’processing.guarantee’设置为’exact_once’,默认值为100,否则默认值为30000。 long 30000 medium
connections.max.idle.ms 在此配置指定的毫秒数之后关闭空闲连接。 long 540000 medium
key.serde Serializer / deserializer类用于实现Serde接口的密钥。 此配置已被弃用,请改用default.key.serde class null medium
metadata.max.age.ms 以毫秒为单位的时间段,强制更新元数据,即使没有看到任何分区领导变化或者发现任何新的broker或分区。 long 300000 [0,…] medium
metric.reporters 用作指标记录员的类的列表。 实现MetricReporter接口允许插入将被通知新的度量标准创建的类。 JmxReporter始终包含在注册JMX统计信息中。 list “” low
metrics.num.samples 维护的样本数量用于计算度量。 int 2 [1,…] low
metrics.recording.level 指标的最高记录级别。 string INFO [1,…] [INFO, DEBUG]
metrics.sample.window.ms 时间窗口计算度量标准。 long 30000 low
partition.grouper 实现PartitionGrouper接口的Partition grouper类。 class org.apache.kafka.streams.processor.DefaultPartitionGrouper low
poll.ms 以毫秒为单位等待输入的时间量。 long 100 low
receive.buffer.bytes 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。 int 32768 [0,…] low
reconnect.backoff.max.ms 重新连接到重复无法连接的代理程序时等待的最大时间(毫秒)。 如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。 long 1000 [0,…] low
reconnect.backoff.ms 尝试重新连接到给定主机之前等待的基本时间量。 这避免了在紧密循环中重复连接到主机。 此退货适用于客户端对代理的所有连接尝试。 long 50 [0,…] low
request.timeout.ms 配置控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 int 40000 [0,…] low
retry.backoff.ms 尝试重试给定主题分区的失败请求之前等待的时间。 这样可以避免在某些故障情况下以严密的循环重复发送请求。 long 100 [0,…] low
rocksdb.config.setter 一个Rocks DB配置setter类或类名实现了RocksDBConfigSetter接口 class null low
send.buffer.bytes 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,则将使用OS默认值。 int 131072 [0,…] low
state.cleanup.delay.ms 在分区迁移之前删除状态之前等待的时间(毫秒)的时间量。 至少state.cleanup.delay.ms尚未修改的状态目录将被删除 long 600000 low
timestamp.extractor Timestamp提取器类,实现Timestamp Extractor接口。 此配置已弃用,请改用default.timestamp.extractor class null low
value.serde Serializer / deserializer类,用于实现Serde接口的值。 此配置已被弃用,请改用default.value.serde class null low
windowstore.changelog.additional.retention.ms 添加到Windows系统维护Ms以确保数据未被过早地从日志中删除。 允许时钟漂移 默认为1天 long 86400000 low
zookeeper.connect Zookeeper连接字符串用于Kafka主题管理。 此配置已被弃用,将被忽略,因为Streams API不再使用Zookeeper。 string low

AdminClient 配置

以下是 Kafka Admin 客户端库的配置:

NAME DESCRIPTION TYPE DEFAULT VALID VALUES IMPORTANCE
bootstrap.servers 用于建立与Kafka集群的初始连接的主机/端口对列表。 客户端将使用所有服务器,无论这里指定哪些服务器进行引导 - 此列表仅影响用于发现完整服务器集的初始主机。 该列表应该以host1:port1,host2:port2,….的格式。由于这些服务器仅用于初始连接才能发现完整的集群成员资格(可能会动态更改),因此此列表不需要包含完整集 的服务器(尽管如此,您可能需要多个服务器)。 list high
ssl.key.password 密钥存储文件中私钥的密码。 这对于客户端是可选的 password null high
ssl.keystore.location 密钥存储文件中私钥的密码。 这对于客户端是可选的 string null high
ssl.keystore.password 密钥存储文件的存储密码。 这对于客户端是可选的,只有配置了ssl.keystore.location才需要。 password null high
ssl.truststore.location 信任存储文件的位置。 string null high
ssl.truststore.password 信任存储文件的密码。 如果未设置密码,对信任库的访问仍然可用,但是完整性检查被禁用。 password null high
client.id 在发出请求时传递给服务器的id字符串。 这样做的目的是通过允许在服务器端请求日志记录中包含逻辑应用程序名称来跟踪超出ip / port的请求源。 string “” high
connections.max.idle.ms 在此配置指定的毫秒数之后关闭空闲连接。 long 300000 high
receive.buffer.bytes 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。 int 65536 [-1,…] medium
request.timeout.ms 配置控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 int 120000 [0,…] medium
sasl.jaas.config JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。 这里描述JAAS配置文件格式。 该值的格式为:’(=)*;’ int 120000 [0,…] medium
sasl.mechanism SASL机制用于客户端连接。 这可能是安全提供者可用的任何机制。 GSSAPI是默认机制。 string GSSAPI medium
security.protocol 用于与broker沟通的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 string PLAINTEXT medium
send.buffer.bytes 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,则将使用OS默认值。 int 131072 [-1,…] medium
ssl.enabled.protocols 启用SSL连接的协议列表。 list TLSv1.2,TLSv1.1,TLSv1 medium
ssl.keystore.type 密钥存储文件的文件格式。 这对于客户端是可选的。 string JKS medium
ssl.protocol 用于生成SSLContext的SSL协议。 默认设置是TLS,这对大多数情况都是适用的。 最近的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 string TLS medium
ssl.provider 用于SSL连接的安全提供程序的名称。 默认值是JVM的默认安全提供程序。 string null medium
ssl.truststore.type 信任存储文件的文件格式。 string JKS medium
metadata.max.age.ms 以毫秒为单位的时间段,强制更新元数据,即使没有看到任何分区领导变化,主动发现任何新的broekr或分区。 long 300000 [0,…] low
metric.reporters 用作指标记录员的类的列表。 实现MetricReporter接口允许插入将被通知新的度量标准创建的类。 JmxReporter始终包含在注册JMX统计信息中。 list “” low
metrics.num.samples 维护的样本数量用于计算度量。 int 2 [1,…] low
metrics.recording.level 指标的最高记录级别。 string INFO [INFO, DEBUG] low
metrics.sample.window.ms 时间窗口计算度量标准。 long 30000 [0,…] low
reconnect.backoff.max.ms 重新连接到重复无法连接的代理程序时等待的最大时间(毫秒)。 如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。 long 1000 [0,…] low
reconnect.backoff.ms 尝试重新连接到给定主机之前等待的基本时间量。 这避免了在紧密循环中重复连接到主机。 此退货适用于客户端对代理的所有连接尝试。 long 50 [0,…] low
retries 在失败之前重试呼叫的最大次数。 int 5 [0,…] low
retry.backoff.ms 尝试重试失败的请求之前等待的时间。 这样可以避免在某些故障情况下以严密的循环重复发送请求。 long 100 [0,…] low
sasl.kerberos.kinit.cmd Kerberos kinit命令路径。 string /usr/bin/kinit low
sasl.kerberos.min.time.before.relogin 刷新尝试之间的登录线程睡眠时间。 long 60000 low
sasl.kerberos.ticket.renew.jitter 添加到更新时间的随机抖动百分比。 double 0.05 low
sasl.kerberos.ticket.renew.window.factor 登录线程将睡眠,直到从上次刷新到tickets到期时间的指定窗口因子为止,此时将尝试续订tickets。 double 0.8 low
ssl.cipher.suites 密码套件列表。 这是使用TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。 默认情况下,支持所有可用的密码套件。 list null low
ssl.endpoint.identification.algorithm 使用服务器证书验证服务器主机名的端点识别算法。 string null low
ssl.keymanager.algorithm 密钥管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的密钥管理器工厂算法。 string SunX509 low
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null low
ssl.trustmanager.algorithm 信任管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的信任管理器工厂算法。 string PKIX low

官方文档

http://kafka.apache.org/documentation/