1. APIS
Kafka主要包括五个核心apis。
- 1.生产者api(Producer api)实现应用向 kafka 集群的topics发送流数据。
- 2.消费者api(Consumer api)实现应用从 kafka 集群的topics读取数据流。
- 3.流式api(Streams api)实现数据流从输入topics传输到输出topics。
- 4.连接器api(Connect api)对于实现connect api 的应用,能持续从数据源系统或者应用拉取数据拉入(pull)到kafka集群中,或者持续将 kafka 数据推(push)到数据源系统或者应用。
- 5.AdminClient api 实现管理和监控topics,brokers,或者其他 kafka 的属性。
kafka功能的实现与语言无关的,允许多种语言编程实现。kafka中官方文档使用的java的语言,其它客户端语言请点击这里。https://cwiki.apache.org/confluence/display/KAFKA/Clients
1.1 Producer Api
生产者api(Producer api)实现应用向 Kafka 集群的 topics 发送流数据。
Producer API 的 maven 依赖:
1 | <dependency> |
1.2 Consumer API
消费者api(Consumer api)实现应用从 Kafka 集群的 topics 读取流数据。
Consumer API 的 maven 依赖:
1 | <dependency> |
1.3 Streams API
流式api(Streams api)实现数据流从输入topics传输到输出topics。
Streams API 的 maven 依赖:
1 | <dependency> |
1.4 Connect api
连接器api(Connect api)对于实现 connect api 的应用,能持续从数据源系统或者应用拉取数据拉入(pull) 到Kafka集群中,或者持续将Kafka数据推(push) 到后续的系统或者应用。
1.5 AdminClient API
AdminClient API 支持管理和检查topics, brokers, acls 和其它 kafka 对象。
AdminClient API 的 maven 依赖:
1 | <dependency> |
1.6 遗弃 APIs
Kafka 依然保留限制遗弃的apis, 这些旧的scala apis仍然可以使用,以提高更强的兼容性。
Broker 配置
以下是必须配置的配置项:
1 | broker.id |
以下是broker其他配置项:
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
zookeeper.connect | zookeeper 主机(String) | string | 高(high) | ||
advertised.host.name | 过时的:只有在’advertise.listeners’或者’listeners’未设置的时候使用。使用’advertise.listeners’替代。zookeeeper把 Kafka 的主机的集群信息公布给所有的客户端使用。 | string | null | 高(high) | |
advertised.listeners | string | 高(high) | |||
advertised.port | 遗弃 | int | null | 高(high) | |
auto.create.topics.enable | 设置为true,可自动在服务节点上创建topic | boolean | true | 高(high) | |
auto.leader.rebalance.enable | 设置为true时,Kafka 的后台定时任务会定时检查leader是否平衡,不平衡自动平衡 | boolean | true | 高(high) | |
background.threads | Kafka 的后台的任务数目。 | int | 10 | 高(high) | |
broker.id | Kafka 集群中,broker.id要求唯一,可以在配置文件中设置,但要保持唯一。如果不设置,系统会找到已经存在的id最大值,然后当前id+1 | int | -1 | 高(high) | |
compression.type | 用来指定一个给定的topic的压缩方式,可指定的值’gzip’,’sanppy’,’lz4’,或者不压缩,默认为producer,保持与producer一致。 | string | producer | 高(high) | |
delete.topic.enable | 是否可以删除topic,如何设置为fasle,通过admin tool删除对集群无影响。 | boolean | false | 高(high) | |
leader.imbalance.check.interval.seconds | Kafka 后台处理partition平衡的周期任务的频率 | long | 300 | 高(high) | |
leader.imbalance.per.broker.percentage | 每个broker节点上允许leader的占比,如果超过这个比例,后台平衡任务会触发,平衡leader节点。 | int | 10 | 高(high) | |
listeners | 监听列表,需要设定通信格式和ip:port。如果listener没有设置,需要设置listener.security.protocol.map值。如果设置为0.0.0.0绑定所有接口。如不设置表示设置为默认值。格式如下:PLAINTEXT://myhost:9092,SSL://:9091CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093 | string | null | 高(high) | |
log.dir | 指定日志数据存放的路径,是单路径。 | string | /tmp/kafka-logs | 高(high) | |
log.dirs | 指定日志数据可以放到多个路径。未设置使用默认值log.dir值 | string | null | 高(high) | |
log.flush.interval.messages | 日志刷到日志时缓存中消息条数 | long | 9223372036854775807 | 高(high) | |
log.flush.interval.ms | 在缓存中数据存在的时间,可以理解为周期性将缓存数据持久化。如果未设置则使用log.flush.scheduler.interval.ms设置 | long | null | 高(high) | |
log.flush.offset.checkpoint.interval.ms | 日志的恢复标记位刷新的频率。(日志文件有个标志位,是记录以及写入日志的检索,这里是用来设定这个标志位的频率)。 | int | 60000 | 高(high) | |
log.flush.scheduler.interval.ms | 周期性缓存日志文件刷入到磁盘中。 | long | 9223372036854775807 | 高(high) | |
log.flush.start.offset.checkpoint.interval.ms | 持久化日志起始位更新的频率 | int | 60000 | 高(high) | |
log.retention.bytes | log可存储的最大字节数。大于则删除 | long | -1 | 高(high) | |
log.retention.hours | log保存的时间(hours),超时删除。 | int | 168 | 高(high) | |
log.retention.minutes | log保存的时间(minutes),超时删除。 | int | null | 高(high) | |
log.roll.hours | 如果旧的segement未达到新建的条件,新的segement需要等待最大时间。 | int | 168 | 高(high) | |
log.roll.jitter.hours | long | null | 高(high) | ||
log.roll.jitter.ms | long | null | 高(high) | ||
log.roll.ms | 创建一个新的segement文件需要等待的时间。 | long | null | 高(high) | |
log.segment.bytes | 一个日志文件最大可保持的字节数 | int | 1073741824 | [14,…] | 高(high) |
log.segment.delete.delay.ms | 日志文件从文件系统中删除最大等待的时间 | long | 60000 | 高(high) | |
message.max.bytes | Kafka 批处理最大允许的字节数。 | int | 1000012 | 高(high) | |
min.insync.replicas | 当响应acks设置为all,表示所有同步的insync partition需要同步,这个参数是设置多少个同步即可,并不是所有的。但必须大于[n/2] | int | 1 | 高(high) | |
num.io.threads | 服务器端用来处理客户端的线程数目,包括磁盘的I/O | int | 8 | 高(high) | |
num.network.threads | 服务器端用来响应网络请求和发送网络应答的线程数目 | int | 3 | 高(high) | |
num.recovery.threads.per.data.dir | 用来恢复日志的每个路径下的日志文件线程数目,当日志启动或者关闭前的刷数据 | int | 3 | 高(high) | |
num.replica.fetchers | 备份broker的并行度,表示备份brokers用来同步源broker节点上数据的数目。 | int | 1 | 高(high) | |
offset.metadata.max.bytes | metadata实体关联的offset最大可提交的字节数 | int | 4096 | 高(high) | |
offsets.commit.required.acks | offsets提交被确认需要的acks. | short | -1 | 高(high) | |
offsets.commit.timeout.ms | 一个offsets被提交,需要所有的备份完成同步。这个参数是用来等待确认超时时间。 | int | 5000 | 高(high) | |
offsets.load.buffer.size | 从offsets segement 批量读入到cache的offsets的字节大小 | int | 5242880 | 高(high) | |
offsets.retention.check.interval.ms | 检索旧的offsets的频率 | long | 60000 | 高(high) | |
offsets.retention.minutes | 日志窗口为offsets topic保留的时间 | int | 1440 | 高(high) | |
offsets.topic.compression.codec | 消息的压缩的方法 | int | 0 | 高(high) | |
offsets.topic.num.partitions | topic提交offset需要的partitions的数目 | int | 50 | 高(high) | |
offsets.topic.replication.factor | 设置一个topic的备份的数目。 | short | 3 | 高(high) | |
offsets.topic.segment.bytes | offsets topic segment 需要被保持的合适大小。 | int | 104857600 | 高(high) | |
queued.max.requests | 在相应阻塞之前运行最大的请求的数目 | int | 500 | 高(high) | |
replica.fetch.min.bytes | 每次抓取数据需要最小抓取的量,如果没有达到,则按replicaMaxWaitTimeMs时间等等,超时,返回 | int | 1 | 高(high) | |
replica.fetch.wait.max.ms | 备份文件去抓取主备份数据允许等待的最大时间 | int | 500 | 高(high) | |
replica.high.watermark.checkpoint.interval.ms | the high watermark被保存到磁盘的频率 | long | 5000 | 高(high) | |
replica.lag.time.max.ms | ISR的备份broker未完成同步被移除的时间最大时间 | long | 1000 | 高(high) | |
replica.socket.receive.buffer.bytes | 网络请求套接字buffer大小 | int | 65536 | 高(high) | |
replica.socket.timeout.ms | 网络请求套接字超时等待时间 | int | 30000 | 高(high) | |
request.timeout.ms | 客户端最大等待时间,如果超时,客户端会retries设定的次数直到失败 | int | 30000 | 高(high) | |
socket.receive.buffer.bytes | 客户端最大等待时间,如果超时,客户端会retries设定的次数直到失败 | int | 30000 | 高(high) | |
socket.receive.buffer.bytes | 请求设置服务器套接字的大小 | int | 102400 | 高(high) | |
socket.request.max.bytes | 请求套接字的最大字节数 | int | 104857600 | 高(high) | |
socket.send.buffer.bytes | 发送服务器套接字的大小 | int | 102400 | 高(high) | |
transaction.max.timeout.ms | 事务超时的最大等待时间。 | int | 900000 | 高(high) | |
transaction.state.log.load.buffer.size | 从日志段事务批读取到缓存中的字节大小。 | int | 5242880 | 高(high) | |
transaction.state.log.min.isr | 为了保证事务性,ISR中备份需要同步的数目 | int | 2 | 高(high) | |
transaction.state.log.num.partitions | 主partition的备份数目同步的数目 | int | 50 | 高(high) | |
transaction.state.log.replication.factor | 主节点的从节点的备份文件数目保证事务性 | short | 3 | 高(high) | |
transaction.state.log.segment.bytes | 事务的topic段导入内存的大小 | int | 104857600 | 高(high) | |
transactional.id.expiration.ms | 事务协调器最大的等待时间 | int | 604800000 | 高(high) | |
unclean.leader.election.enable | 非ISR的replicas是否可以参与选择为leader。 | boolean | false | 高(high) | |
zookeeper.connection.timeout.ms | 客户端最大不能连接到zookeeper的等待时间。若未设置则session的设置起作用。 | int | null | 高(high) | |
zookeeper.session.timeout.ms | zookeeper session 超时时间。 | int | 6000 | 高(high) | |
zookeeper.set.acl | 设置客户端使用包含acls | boolean | false | 高(high) | |
broker.id.generation.enable | 检验broker id的最大值是否正确 | boolean | true | 高(high) | |
broker.rack | string | null | 高(high) | ||
connections.max.idle.ms | 空闲的套接字处理线程最大的等待时间 | long | 600000 | 高(high) | |
controlled.shutdown.enable | 是否允许关闭controlled节点 | boolean | true | 高(high) | |
controlled.shutdown.max.retries | controlled节点关闭失败的最多可尝试次数 | int | 3 | 高(high) | |
controlled.shutdown.retry.backoff.ms | controlled 关闭失败,系统需要恢复之前的状态等待的时间 | long | 5000 | 高(high) | |
controller.socket.timeout.ms | controlled到broker channels 超时时间 | int | 30000 | 高(high) | |
default.replication.factor | 默认的备份的复制自动创建topics的个数 | int | 1 | 高(high) | |
delete.records.purgatory.purge.interval.requests | int | 1 | 高(high) | ||
fetch.purgatory.purge.interval.requests | int | 1000 | 高(high) | ||
group.initial.rebalance.delay.ms | 组协调器进行负载均衡时的最大等待时间 | int | 3000 | 高(high) | |
group.max.session.timeout.ms | 组中的消费者的session最大允许等待时间 | int | 300000 | 高(high) | |
group.min.session.timeout.ms | 组中的消费者的session最小允许等待时间 | int | 6000 | medium | |
inter.broker.listener.name | 用于brokers中相互交流监听者的名字。如果没有设置,使用securitu.inter.broker.protocol,不可同时设置。 | string | null | medium | |
inter.broker.protocol.version | 用来指定inter-broker所采用的协议,特别是所有的broker完成升级后,用来检测是否所有的broker所采用的协议是否合法。 | string | 0.11.0-IV2 | medium | |
log.cleaner.backoff.ms | 当日志完成清理,清理程序休眠的时间。 | long | 15000 | [0,…] | medium |
log.cleaner.dedupe.buffer.size | 所有清理日志线程用来分离日志的总共的内存大小。 | long | 134217728 | medium | |
log.cleaner.delete.retention.ms | 被清理的日志保存的时间。 | long | 86400000 | medium | |
log.cleaner.enable | 设置日志清理文件能在服务器上默认运行。 | boolean | true | medium | |
log.cleaner.io.buffer.load.factor | 日志清理队列载入因子,越高清理日志越快,但hash冲突概率越高。 | double | 0.9 | medium | |
log.cleaner.io.buffer.size | 用于日志清理是I/O通道buffer大小 | int | 524288 | medium | |
log.cleaner.io.max.bytes.per.second | 日志清理读写限流。读写会小于这个值 | double | 1.7976931348623157E308 | medium | |
log.cleaner.min.cleanable.ratio | 触发日志清理最小的比例,比例是脏日志/所有日志 | double | 0.5 | medium | |
log.cleaner.min.compaction.lag.ms | 一条消息未被压缩的存在最短时间。仅仅适用于需要被压缩的日志 | long | 0 | medium | |
log.cleaner.threads | 后台用于清理日志的数目 | long | 1 | medium | |
log.cleanup.policy | 日志清理策略:取值’delete’和’compact’ | list | delete | medium | |
log.index.interval.bytes | 加入offset检索的键值对的区间 | int | 4096 | [0,…] | medium |
log.index.size.max.bytes | offset检索位最大字节数 | int | 10485760 | [4,…] | medium |
log.message.format.version | 指定消息格式追加在日志末尾的版本, | int | 10485760 | 0.11.0-IV2 | medium |
log.message.timestamp.difference.max.ms | broker接受到消息的时间戳和指定时间戳之间允许最大时间差;如果log.message.timestamp.type=CreateTime,消息会被拒绝,如果消息时间差超过设定阈值。 | long | 9223372036854775807 | medium | |
log.message.timestamp.type | 消息时间戳是创建时间或者追加在日志结尾的时间。取值’createTime’ 或者’LogAppendTime’。 | String | CreateTime | [CreateTime, LogAppendTime] | medium |
log.preallocate | 是否提前分配日志文件,如果是 Kafka 使用环境是windows环境,建议设置为true | boolean | false | medium | |
log.retention.check.interval.ms | 毫秒级别,检查日志是否需要被删除。 | long | 300000 | medium | |
max.connections.per.ip | 每个ip允许最大数目的连接数。 | int | 2147483647 | [1,…] | medium |
max.connections.per.ip.overrides | 重写主机或者ip的最大连接数 | string | “” | medium | |
num.partitions | 默认topic的partitions的默认值。 | int | 1 | [1,…] | medium |
principal.builder.class | 通过继承这个接口,建立SSL 安全协议连接 | class | org.apache.kafka.common.security.auth.DefaultPrincipalBuilder | medium | |
producer.purgatory.purge.interval.requestss | class | org.apache.kafka.common.security.auth.DefaultPrincipalBuilder | medium | ||
replica.fetch.backoff.ms | 当从partition获取数据发生错误时,休眠的时间。 | int | 1000 | [0,…] | medium |
replica.fetch.max.bytes | partition备份获取主节点的最大字节数 | int | 1048576 | [0,…] | medium |
replica.fetch.response.max.bytes | partition备份获取主节点的应答最大字节数 | int | 10485760 | [0,…] | medium |
reserved.broker.max.id | broker.id可用的最大数目 | int | 1000 | [0,…] | medium |
sasl.enabled.mechanisms | Kafka 服务所允许的SASL机制的列表,默认的是GSSAPI | list | GSSAPI | medium | |
sasl.kerberos.kinit.cmd | kerberos kinit 命令的路径 | string | /usr/bin/kinit | medium | |
sasl.kerberos.min.time.before.relogin | 登录线程刷新时间间隔 | long | 6000 | medium | |
sasl.kerberos.principal.to.local.rules | 规则列表将主名字映射到简短的名字 | list | DFAULT | medium | |
sasl.kerberos.service.name | kerberos在 Kafka 中运行的方式,可以是JAAS的配置,或者 Kafka 的配置。 | string | null | medium | |
sasl.kerberos.ticket.renew.jitter | 重生的时间锁附加的随机差的百分比 | double | 0.5 | medium | |
sasl.kerberos.ticket.renew.window.factor | 从指定的时间窗口到最新的时间差登录超时,登录线程自动激活。 | double | 0.8 | medium | |
sasl.mechanism.inter.broker.protocol | 内置broker使用SASL机制,默认是GSSAPI | String | GSAPI | medium | |
security.inter.broker.protocol | brokers之间使用的安全通信协议,可选值为PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。 | String | PLAINTEXT | medium | |
ssl.client.auth | Kafka broker应答客户端的权限控制,有三种,required,客户端应答需要认证。none表示不需要认证。 | String | none | medium | |
ssl.enabled.protocols | SSL连接可选用的协议列表 | String | TLSv1.2,TLSv1.1,TLSv1 | medium | |
ssl.enabled.protocols | SSL连接可选用的协议列表 | String | TLSv1.2,TLSv1.1,TLSv1 | medium | |
ssl.keystore.type | key store的文件格式,供客户端使用 | String | JKS | medium | |
ssl.protocol | SSLContext使用的SSL协议 | String | TLS | medium | |
ssl.provider | ssl连接安全的名字 | String | null | medium | |
ssl.trustmanager.algorithm | 用于SSL连接的信任管理器工厂算法。 默认值是为Java虚拟机配置的信任管理器工厂算法。 | String | PKIX | medium | |
ssl.truststore.type | 可信任的文件格式 | String | JKS | medium | |
alter.config.policy.class.name | alter 配置应该用于验证的策略类。 该类应该实现org.apache.kafka.server.policy.AlterConfigPolicy接口。 | calss | null | medium | |
authorizer.class.name | 授权继承的授权的类 | string | “” | medium | |
create.topic.policy.class.name | 应用于验证的创建主题策略类。 该类应该实现org.apache.kafka.server.policy.CreateTopicPolicy接口。具体来说,我们可以定义名称为INTERNAL和EXTERNAL的侦听器,该属性如下:INTERNAL:SSL,EXTERNAL:SSL 。 |
class | null | medium | |
listener.security.protocol.map | 在监听器名称和安全协议之间映射。 必须为同一个安全协议定义一个可用于多个端口或IP的安全协议。 例如,即使需要SSL,我们也可以分离内部和外部流量。,键和值由冒号分隔,映射条目用逗号分隔。 每个监听器名称只应在地图中显示一次。 | string | SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,TRACE:TRACE,SASL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT | medium | |
metric.reporters | 用作指标记录员的类的列表。 实现MetricReporter接口允许插入将被通知新的度量标准创建的类。 JmxReporter始终包含在注册JMX统计信息中。 | list | “” | medium | |
metrics.num.samples | 维护的样本数量用于计算度量。 | int | 2 | [1,…] | low |
enable.idempotence | 如果设置为true,每个消息在流中只复制一遍。 | boolean | false | medium | |
interceptor.classes | 用作拦截器的类的列表。 使用ProducerInterceptor接口,您可以在生成器发布到 Kafka 群集之前拦截(并且可能会变更)生产者收到的记录。 默认情况下,没有拦截器 | list | null | medium | |
max.in.flight.requests.per.connection | 在阻止之前,客户端将在单个连接上发送的未确认请求的最大数量。 请注意,如果此设置设置为大于1且发送失败,则会由于重试(即启用重试)而导致消息重新排序的风险。 | int | 5 | medium | |
metadata.max.age.ms | 以毫秒为单位的时间段,强制更新元数据,即使我们没有看到任何分区leader变化。 | long | 300000 | [0,…] | medium |
metric.reporters | 用作指标记录员的类的列表。 实现MetricReporter接口允许插入将被通知新的度量标准创建的类。 JmxReporter始终包含在注册JMX统计信息中。 | list | “” | low | |
metrics.num.samples | 维护的样本数量用于计算度量。 | int | 2 | low | |
metrics.recording.level | 监控的最高的记录级别。 | string | INFO | [INFO, DEBUG] | low |
metrics.sample.window.ms | 时间窗口计算度量标准。。 | long | 30000 | [0,..] | low |
quota.window.num | 保留在内存中的客户端引用数目。 | int | 11 | [1,..] | low |
quota.window.size.seconds | 客户配引用在内存中的时间跨度 | int | 1 | [1,..] | low |
ssl.endpoint.identification.algorithm | 使用服务器证书验证服务器主机名的端点识别算法。 | string | null | low | |
ssl.secure.random.implementation | 用于SSL加密操作的SecureRandom PRNG实现。 | string | null | low | |
transaction.abort.timed.out.transaction.cleanup.interval.ms | 回滚已超时的交易的时间间隔 | int | 60000 | [1,…] | low |
transaction.abort.timed.out.transaction.cleanup.interval.ms | 由于transactional.id.expiration.ms传递而导致已过期的事务的间隔 | int | 3600000 | [1,…] | low |
zookeeper.sync.time.ms | zk的从节点可以落后主节点多远 | int | 2000 | low |
Topic 配置
Topic-level configuration 配置与主题相关的配置同时具有服务器默认值以及可选的每个主题的覆盖。 如果没有给出每个主题的配置,则使用服务器默认值。 可以通过提供一个或多个-config选项来在主题创建时设置覆盖。 此示例使用自定义消息大小和刷新率创建一个名为my-topic的主题:1
2> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
重载后的topic相关属性可以使用新的命令再次修改,下面这个例子是重载后修改消息的大小1
2bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic
--alter --add-config max.message.bytes=128000
检查重载topic可以使用一下命令:1
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe
移除重载可以:1
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
以下是主题级配置。 该属性的服务器的默认配置在“服务器默认属性”标题下给出。 给定的服务器默认配置值仅适用于主题,如果它没有显式主题配置覆盖。
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | SERVER DEFAULT PROPERTY | IMPORTANCE |
---|---|---|---|---|---|---|
cleanup.policy | “删除”或“合并”的字符串。 此字符串指定在旧日志段上使用的保留策略。 默认策略(“delete”)将在达到保留时间或大小限制时将废弃旧段。 “合并”设置将启用主题上的日志压缩。 | list | delete | [compact, delete] | log.cleanup.policy | medium |
compression.type | 指定主体的最终压缩类型。 此配置接受标准压缩编解码器(’gzip’,’snappy’,lz4)。 它还接受‘uncompressed’相当于没有压缩的未压缩 ,这意味着保留由生产者设置的原始压缩编解码器。 | string | producer | [uncompressed, snappy, lz4, gzip, producer] | compression.type | medium |
delete.retention.ms | 保留删除记录压缩主题的标记的时间量。 此设置还给出消费者如果从偏移量0开始读取的时间,以确保它们获得最终阶段的有效快照(否则,在完成扫描之前可能会收集删除的标记)。 | long | 86400000 | [0,…] | log.cleaner.delete.retention.ms | medium |
file.delete.delay.ms | 等待文件从文件系统删除的时间 | long | 60000 | [0,…] | log.segment.delete.delay.ms | medium |
flush.messages | 此设置允许指定我们强制fsync写入日志的数据的间隔。 例如,如果这被设置为1,我们将在每个消息之后fsync; 如果是5,我们将在每五个消息之后fsync。 一般来说,我们建议您不要设置它,并使用复制来保持持久性,并允许操作系统的后台刷新功能更高效。 可以在每个主题的基础上覆盖此设置(请参阅每个主题的配置部分)。 | long | 9223372036854775807 | [0,…] | log.flush.interval.messages | medium |
flush.ms | 此设置允许指定我们强制fsync写入日志的数据的时间间隔。 例如,如果这设置为1000,那么在1000ms过去之后,我们将fsync。 一般来说,我们建议您不要设置它,并使用复制来保持持久性,并允许操作系统的后台刷新功能更高效。 | long | 9223372036854775807 | [0,…] | log.flush.interval.ms | medium |
follower.replication.throttled.replicas | 日志复制应在向后侧节流的副本列表。该清单应描述一组副本的形式[的partitionid]:[BrokerId],[partitionid的]:[BrokerId]:……或可替代的通配符“*”可以被用来扼杀所有副本这个话题。 | list | “” | [partitionId],[brokerId]:[partitionId],[brokerId]:… | follower.replication.throttled.replicas | medium |
index.interval.bytes | 此设置控制 Kafka 向其偏移索引添加索引条目的频率。 默认设置确保我们大致每4096个字节索引消息。更多的索引允许读取更接近日志中的确切位置,但使索引更大。 你可能不需要改变这个。 | int | 4096 | [0,..] | log.index.interval.bytes | medium |
leader.replication.throttled.replicas | 日志复制应在引导端进行限制的副本列表。该清单应描述一组副本的形式[的partitionid]:[BrokerId],[partitionid的]:[BrokerId]:……或可替代的通配符“*”可以被用来扼杀所有副本这个话题。 | list | “” | [partitionId],[brokerId]:[partitionId],[brokerId]:… | leader.replication.throttled.replicas | medium |
max.message.bytes | Kafka 允许的批处理量大小。 如果这种情况有所增加,并且消费者的版本号大于0.10.2,消费者的提取大小也必须增加,以便他们可以获取最大批次。在最新的消息格式版本中,记录总是分批进行分组以提高效率。在以前的消息格式版本中,未压缩的记录不会分组批量,并且此限制仅适用于该情况下的单个记录。 | int | 1000012 | [0,…] | message.max.bytes | medium |
message.format.version | 指定用于将消息附加到broker的日志的消息格式版本。 该值应该是一个有效的ApiVersion。 一些例子是:0.8.2,0.9.0.0,0.10.0,更多细节检查ApiVersion。 通过设置特定的消息格式版本,用户正在证明磁盘上的所有现有消息都小于或等于指定版本。 不正确地设置此值将导致消费者使用旧版本,因为他们将接收到不了解的格式的消息。 | string | 0.11.0-IV2 | log.message.format.version | medium | |
message.timestamp.difference.max.ms | broker接收到消息的时间戳与消息中指定的时间戳之间允许的最大差异。 如果message.timestamp.type = CreateTime,如果时间戳的差异超过此阈值,则消息将被拒绝。 如果message.timestamp.type = LogAppendTime,则忽略此配置。 | long | 9223372036854775807 | [0,…] | log.message.timestamp.difference.max.ms | medium |
message.timestamp.type | 定义消息中的时间戳是消息创建时间还是日志追加时间。 该值应该是“CreateTime”或“LogAppendTime” | string | CreateTime | log.message.timestamp.type | medium | |
min.cleanable.dirty.ratio | 此配置控制日志压缩程序将尝试清除日志的频率(假定启用日志压缩)。 默认情况下,我们将避免清理超过50%日志被压缩的日志。 该比率限制日志中浪费的最大空间重复(在最多50%的日志中可以是重复的50%)。 更高的比率意味着更少,更有效的清洁,但意味着日志中的浪费更多。 | doublie | 0.5 | [0,…,1] | log.cleaner.min.cleanable.ratio | medium |
min.compaction.lag.ms | 消息在日志中保持不受压缩的最短时间。 仅适用于正在压实的日志。 | long | 0 | [0,…,1] | log.cleaner.min.compaction.lag.ms | medium |
min.insync.replicas | 当生产者将acks设置为“all”(或“-1”)时,此配置指定必须确认写入的副本的最小数量,以使其被认为是成功的。如果无法满足此最小值,则生产者将引发异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。一起使用时,min.insync.replicas和acks允许您强化更大的耐用性保证。典型的情况是创建一个复制因子为3的主题,将min.insync.replicas设置为2,并使用acks“all”生成。 如果大多数副本没有收到写入,这将确保生产者引发异常。 | int | 1 | [1,…] | min.insync.replicas | medium |
preallocate | 如果我们应该在创建新的日志段时在磁盘上预分配该文件,那么为真。。 | boolean | false | log.preallocate | medium | |
retention.bytes | 如果我们使用“删除”保留策略,则此配置将控制日志可以增长的最大大小,之后我们将丢弃旧日志段以释放空间。 默认情况下,没有大小限制仅限于时间限制。 | long | -1 | log.retention.bytes | medium | |
retention.ms | 如果我们使用“删除”保留策略,则此配置控制我们将保留日志的最长时间,然后我们将丢弃旧的日志段以释放空间。 这代表消费者必须阅读数据的时间长短。 | long | 604800000 | log.retention.ms | medium | |
segment.bytes | 此配置控制日志的段文件大小。 一次保留和清理一个文件,因此较大的段大小意味着较少的文件,但对保留率的粒度控制较少。 | int | 1073741824 | [14,…] | log.segment.bytes | medium |
segment.index.bytes | 此配置控制将偏移映射到文件位置的索引的大小。 我们预先分配此索引文件,并在日志滚动后收缩它。 通常您不需要更改此设置。 | int | 10485760 | [0,…] | log.index.size.max.bytes | medium |
segment.jitter.ms | 任务segement回滚抖动时间避免不必要的回滚。 | long | 0 | [0,…] | log.roll.jitter.ms | medium |
segment.ms | 此配置控制 Kafka 将强制日志滚动的时间段,即使段文件未满,以确保保留可以删除或压缩旧数据。 | long | 604800000 | [0,…] | log.roll.ms | medium |
unclean.leader.election.enable | 指示是否将不在ISR集中的副本作为最后的手段选举为领导者,这样做可能会导致数据丢失。 | boolean | false | unclean.leader.election.enable | medium |
Producer 配置
以下是java producer 客户端配置:
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
bootstrap.servers | 用于建立与 Kafka 集群的初始连接的ip/port对列表。 客户端将使用所有服务器,无论这里指定哪些服务器进行引导 - 此列表仅用于发现完整服务器集的初始主机。格式如下host1:port1,host2:port2,…:由于这些服务器仅用于初始连接以发现完整的集群成员资格(可能会动态更改),因此此列表不需要包含完整的服务器集(尽管如此,您可能需要多台服务器)。 | list | high | ||
key.serializer | 序列化所实现的接口:serializer | class | high | ||
key.serializer | key序列化所实现的接口:serializer | class | high | ||
value.serializer | value序列化所实现的接口:serializer | class | high | ||
acks | 在请求完成之前,生产者要求leader收到的确认数量。 这可以控制发送记录的可靠性。 允许以下设置:ack=0:如果设置为零,则生产者根本不会等待服务器的任何确认。 该记录将立即添加到套接字缓冲区并视为已发送。 在这种情况下,不能保证服务器已经收到记录,重试配置将不会生效(因为客户端通常不会知道有任何故障)。每个记录返回的偏移量将始终设置为-1。 ack=1:这将意味着领导者会将记录写入其本地日志,但将不会等待所有follower的全面确认。在这种情况下,如果领导者在确认记录之后立即失败,但在追随者未完成复制之前,该记录将丢失。 ack=all:这意味着领导者将等待全部follower完成复制。 只要至少有一个同步处理副本仍然存在,这将保证记录不会丢失。 这是最可靠的保证。 可以acks = -1设置。 | string | 1 | [all, -1, 0, 1] | high |
buffer.memory | 生产者可以用来缓冲等待发送到服务器的记录的总字节数。 如果记录发送速度比可以传送到服务器的速度快,那么制作者将阻止max.block.ms,之后它会抛出异常。此设置应大致对应于生产者将使用的总内存,但不是硬绑定,因为生产者使用的所有内存不用于缓冲。 一些额外的内存将用于压缩(如果启用压缩)以及维护飞行中请求。 | long | 33554432 | [0,…] | high |
compression.type | producer生成的所有数据的压缩类型。 默认值为none(即无压缩)。 有效值为none,gzip,snappy或lz4。 压缩是完整的数据,所以压缩类型也会影响压缩比(越多的批处理压缩越好)。 | string | none | high | |
compression.type | producer生成的所有数据的压缩类型。 默认值为none(即无压缩)。 有效值为none,gzip,snappy或lz4。 压缩是完整的数据,所以压缩类型也会影响压缩比(越多的批处理压缩越好)。 | string | none | high | |
retries | 设置大于零的值将导致客户端重新发送其发送失败并记录。 请注意,此重试与收到错误时客户端重新发送记录无异。 允许重试而不将max.in.flight.requests.per.connection设置为1将潜在地更改记录的排序,因为如果两个批次发送到单个分区,并且第一个失败并重试,但第二个成功,则记录 在第二批可能会出现第一。 | int | 0 | [0,…,2147483647] | high |
ssl.key.password | 密钥存储文件中私钥的密码。 这对于客户端是可选的。 | password | null | high | |
ssl.keystore.location | 密钥存储文件的位置。 这对于客户端是可选的,可以用于客户端的双向认证。 | string | null | high | |
ssl.keystore.password | 密钥存储文件的存储密码。 这对于客户端是可选的,只有配置了ssl.keystore.location才需要。 | password | null | high | |
ssl.truststore.location | 信任存储文件的位置。 | string | null | high | |
ssl.truststore.password | 信任存储文件的密码。 如果未设置密码,对信任库的访问仍然可用,但是完整性检查被禁用。 | password | null | high | |
batch.size | 每当多个记录被发送到同一个分区时,生产者将尝试将消息批处理。 这有助于客户端和服务器上的性能。 此配置控制默认批量大小(以字节为单位)。不会尝试批量记录大于此大小的记录。发送到brokers的请求将包含多个批次,每个分区有一个,可以发送数据。小批量批量将使批量不太常见,并可能降低生产量(零批量大小将完全禁用批处理)。 非常大的批量大小可能会浪费更多的内存,因为我们将始终分配指定批量大小的缓冲区以预期额外的记录。 | int | 16384 | [0,…] | high |
client.id | 在发出请求时传递给服务器的id字符串。 这样做的目的是通过允许在服务器端请求日志记录中包含逻辑应用程序名称来跟踪超出ip / port的请求源。 | string | “” | high | |
connections.max.idle.ms | 在此配置指定的毫秒数之后关闭空闲连接。 | long | 540000 | high | |
linger.ms | 生产者将在请求传输之间到达的任何记录组合成单个批量请求。通常情况下,只有在记录到达速度比发送速度快的情况下才会出现这种情况。然而,在某些情况下,即使在适度的负载下,客户端也可能希望减少请求数量。该设置通过添加少量人造延迟来实现,即,不是立即发出生产者将等待给定延迟的记录,以允许发送其他记录,以便发送可以批处理在一起。这可以被认为类似于Nagle在TCP中的算法。此设置给出批处理延迟的上限:一旦我们获得了一个分区的batch.size值的记录,它将立即被发送,而不管这个设置如何,但是如果我们比这个分区累积了很多字节,我们将’停留在指定的时间等待更多的记录显示。此设置默认为0(即无延迟)。例如,设置linger.ms = 5将具有减少发送的请求数量的效果,但对于在负载的缺失中发送的记录,将加起来5ms的延迟。 | long | 0 | [0,…] | high |
max.block.ms | 配置控制KafkaProducer.send()和KafkaProducer.partitionsFor()将阻塞多长时间。这些方法可能被阻止,因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。 | long | 60000 | [0,…] | medium |
max.request.size | 请求的最大大小(以字节为单位)。 此设置将限制生产者在单个请求中发送的记录批次数,以避免发送巨大的请求。 这也是最大记录批量大小的上限。 请注意,服务器拥有自己的字节大小,可能与此不同。。 | int | 1048576 | [0,…] | medium |
partitioner.class | partitioner继承实现partitioner 接口 | class | org.apache.kafka.clients.producer.internals.DefaultPartitioner | [0,…] | medium |
receive.buffer.bytes | 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。 | int | 32768 | [-1,…] | medium |
request.timeout.ms | 配置控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 这应该大于replica.lag.time.max.ms(代理配置),以减少由于不必要的生产者重试引起的消息重复的可能性。 | int | 3000 | [0,…] | medium |
sasl.jaas.config | JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。 这里描述JAAS配置文件格式。 该值的格式为:’(=)*;’ | password | null | medium | |
sasl.kerberos.service.name | Kafka 运行的Kerberos主体名称。 这可以在 Kafka 的JAAS配置或 Kafka 的配置中定义。 | string | null | medium | |
sasl.mechanism | SASL机制用于客户端连接。 这可能是安全提供者可用的任何机制。 GSSAPI是默认机制。 | string | GSSAPI | medium | |
security.protocol | 与brokers通信的协议,合法值: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. | string | PLAINTEXT | medium | |
send.buffer.bytes | 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,则将使用OS默认值。 | int | 131072 | medium | |
ssl.enabled.protocols | SSL连接的协议列表 | list | TLSv1.2,TLSv1.1,TLSv1 | medium | |
ssl.keystore.type | 密钥存储文件的文件格式。 这对于客户端是可选的。 | string | jks | medium | |
ssl.protocol | 用于生成SSLContext的SSL协议。 默认设置是TLS,这对大多数情况都是适用的。 最新的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 | string | tls | medium | |
ssl.provider | 用于SSL连接的安全提供程序的名称。 默认值是JVM的默认安全提供程序。 | string | null | medium | |
ssl.truststore.type | 可信任存储文件的格式。 | string | jks | medium | |
enable.idempotence | 当设置为“true”时,生产者将确保每个消息的正好一个副本写入流中。 如果“false”,由于broker程序故障等导致的生产者重试,可能会在流中写入重试的消息的重复。 默认设置为“false”。 请注意,启用幂等式需要将max.in.flight.requests.per.connection设置为1,重试次数不能为零。 另外acks必须设置为“all”。 如果这些值保持默认值,我们将覆盖默认值。 如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。 | boolean | false | medium | |
interceptor.classes | 用作拦截器的类的列表。 使用ProducerInterceptor接口,您可以在生成器发布到 Kafka 群集之前拦截(并且可能会变更)生产者收到的记录。 默认情况下,没有拦截器。 | list | null | medium | |
max.in.flight.requests.per.connection | 在阻止之前,客户端将在单个连接上发送的未确认请求的最大数量。 请注意,如果此设置设置为大于1且发送失败,则会由于重试(即启用重试)而导致消息重新排序的风险。 | int | 5 | medium | |
metadata.max.age.ms | 以毫秒为单位的时间段,我们强制更新元数据,即使我们没有看到任何分区领导变化,主动发现任何新的经纪人或分区。 | long | 30000 | medium | |
metric.reporters | 用作指标记录员的类的列表。 实现MetricReporter接口允许插入将被通知新的度量标准创建的类。 JmxReporter始终包含在注册JMX统计信息中。 | list | “” | medium | |
metrics.num.samples | 维护的样本数量用于计算度量。 | int | 2 | medium | |
metrics.recording.level | 指标的最高记录级别。 | string | info | medium | |
metrics.sample.window.ms | 时间窗口计算度量标准。 | long | 30000 | medium | |
reconnect.backoff.max.ms | 重新连接到重复无法连接的broker程序时等待的最大时间(毫秒)。 如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。 | long | 1000 | medium | |
reconnect.backoff.ms | 尝试重新连接到给定主机之前等待的基本时间量。 这避免了在紧密循环中重复连接到主机。 此退货适用于客户端对代理的所有连接尝试。 | long | 50 | medium | |
sasl.kerberos.kinit.cmd | Kerberos kinit命令路径。 | string | /usr/bin/kinit | medium | |
sasl.kerberos.min.time.before.relogin | 刷新尝试之间的登录线程睡眠时间。 | long | 60000 | low | |
sasl.kerberos.ticket.renew.jitter | 添加到更新时间的随机抖动百分比。 | double | 0.05 | low | |
sasl.kerberos.ticket.renew.window.factor | 登录线程将睡眠,直到从上次刷新到窗口到期时间的指定窗口因子为止,此时将尝试续订 | double | 0.8 | low | |
ssl.cipher.suites | 密码套件列表。 这是使用TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。 默认情况下,支持所有可用的密码套件。 | list | null | low | |
ssl.endpoint.identification.algorithm | 使用服务器证书验证服务器主机名的端点识别算法。 | string | null | low | |
ssl.keymanager.algorithm | 密钥管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的密钥管理器工厂算法。 | string | SunX509 | low | |
ssl.secure.random.implementation | 用于SSL加密操作的SecureRandom PRNG实现。 | string | null | low | |
ssl.trustmanager.algorithm | 信任管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的信任管理器工厂算法。 | string | PKIX | low | |
transaction.timeout.ms | 事务协调器在主动中止正在进行的事务之前,等待事务状态更新的最大时间(以ms为单位)。如果此值大于代理中的max.transaction.timeout.ms设置,则请求将 失败,出现“InvalidTransactionTimeout”错误。 | int | 60000 | low | |
transactional.id | 用于事务传递的TransactionalId。 这使得可以跨越多个生产者会话的可靠性语义,因为它允许客户端保证在开始任何新事务之前使用相同的TransactionalId的事务已经完成。 如果没有提供TransactionalId,则生产者被限制为幂等传递。 请注意,如果配置了TransactionalId,则必须启用enable.idempotence。 默认值为空,这意味着无法使用事务。 | string | null | non-empty string | low |
Consumer 配置
以下是新消费者的配置
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
bootstrap.servers | 用于建立与 Kafka 集群的初始连接的主机/端口对列表。客户端将使用所有服务器,无论这里指定哪些服务器进行引导 - 此列表用于发现完整服务器集的初始主机。 | list | high | ||
key.deserializer | Deserializer类用于key实现Deserializer接口。 | class | high | ||
value.deserializer | 解串器类实现value的Deserializer接口。 | class | high | ||
fetch.min.bytes | 服务器应该为获取请求返回的最小数据量。 如果没有足够的数据可用,请求将等待这么多数据积累,然后再回答该请求。 1字节的默认设置意味着只要单字节的数据可用,或者提取请求超时等待数据到达,则应答提取请求。 将其设置为大于1将导致服务器等待更大量的数据累积,这可能会以一些额外的延迟为代价提高服务器吞吐量。。 | int | 1 | [0,…] | high |
group.id | 一个唯一的字符串,用于标识此消费者所属的消费者组。 如果消费者通过使用订阅(主题)或基于 Kafka 的偏移量管理策略来使用组管理功能,则此属性是必需的。 | string | “” | high | |
heartbeat.interval.ms | 使用 Kafka group 管理,心跳与消费者协调员之间的预期时间。 心跳用于确保消费者的会话保持活动,并在新消费者加入或离开组时促进重新平衡。 该值必须设置为低于session.timeout.ms,但通常应设置为不高于该值的1/3。 它可以调整得更低以控制正常重新平衡的预期时间。 | int | 3000 | high | |
max.partition.fetch.bytes | 服务器将返回的每个分区的最大数据量。 记录由消费者批量获取。 如果fetch的第一个非空分区中的第一个记录批次大于此限制,则仍将返回批处理,以确保消费者可以取得进展。接受的最大记录批量大小通过message.max.bytes(broker config)或max.message.bytes(topic config)定义。 请参阅fetch.max.bytes以限制消费者请求大小。 | int | 1048576 | [0,…] | high |
ssl.key.password | 密钥存储文件中私钥的密码。 这对于客户端是可选的。 | password | null | high | |
ssl.keystore.location | 密钥存储文件的位置。 这对于客户端是可选的,可以用于客户端的双向认证。 | String | null | high | |
ssl.keystore.password | 密钥存储文件的存储密码。 这对于客户端是可选的,只有配置了ssl.keystore.location才需要。 | password | null | high | |
ssl.truststore.location | 信任存储文件的位置。 | string | null | high | |
auto.offset.reset | 如果 heartbeat.interval.ms 没有初始偏移量,或者当前偏移量在服务器上不再存在,采取何种策略:earliest:每次从初始位置取。latest:每次从最新位置取。none:从消费的offset值开始取,不存在,直接抛出异常。anything esle :直接抛出异常 | string | null | high | |
connections.max.idle.ms | 在此配置指定的毫秒数之后关闭空闲连接。 | long | 540000 | high | |
enable.auto.commit | 如果真的,消费者的偏移将在后台定期提交 | boolean | true | high | |
exclude.internal.topics | 来自内部主题的记录(如偏移量)是否应该暴露给消费者。 如果设置为true,则从内部topic接收记录的唯一方法是订阅它。 | boolean | true | high | |
fetch.max.bytes | 服务器为获取请求返回的最大数据量。 记录由消费者批量获取,如果第一个非空分区中的第一个记录批次大于此值,则记录批次仍将被返回,以确保消费者能够取得进展。。 | int | 52428800 | [0,…] | high |
isolation.level | 控制如何读取事务写入的消息。 如果设置为read_committed只返回事务消息,如果设置为read_uncommitted,consumer.poll() 返回所有消息,甚至被无法确保的事务性消息。非事务消息将以任意一种方式无条件返回。消息将始终以偏移顺序返回。 consumer.poll()将只返回消息,直到最后一个稳定的偏移量(LSO),它是小于第一个打开事务的偏移量的一个。 特别是在属于正在进行的消费的消息之后出现的任何消息将被阻塞,直到相关交易完成。 因此,在事务性处理中,read_committed的消费者将无法读取高水印。 | string | read_uncommitted | [read_committed, read_uncommitted] | high |
max.poll.interval.ms | 在使用消费者组管理时调用poll()的最大延迟。 这会在获取更多记录之前将消费者空闲时间的上限置于上限之内。 如果在此超时到期之前未调用poll(),则消费者被认为失败,并且该组将重新平衡,以便将分区重新分配给另一个成员。 | int | 300000 | [0,…] | high |
max.poll.records | 在单次调用poll()中返回的最大消息数。 | int | 500 | [1,…] | high |
partition.assignment.strategy | 分组分配策略的类名称,当使用组管理时,将会把消费者按照组的对应关系根据定义的分配策略分配 | list | class org.apache.kafka.clients.consumer.RangeAssignor | high | |
receive.buffer.bytes | 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。 | int | 65536 | [-1,…] | medium |
request.timeout.ms | 配置控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 | int | 305000 | [0,…] | medium |
sasl.jaas.config | JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。 这里描述JAAS配置文件格式。 该值的格式为:’(=)*;’ | password | null | medium | |
sasl.kerberos.service.name | Kafka 运行的Kerberos主体名称。 这可以在 Kafka 的JAAS配置或 Kafka 的配置中定义。 | string | null | medium | |
sasl.mechanism | SASL机制用于客户端连接。 这可能是安全提供者可用的任何机制。 GSSAPI是默认机制。 | string | GSSAPI | medium | |
ssl.enabled.protocols | 启用SSL连接的协议列表。 | list | TLSv1.2,TLSv1.1,TLSv1 | medium | |
security.protocol | 用于与brokers沟通的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL | string | PLAINTEXT | medium | |
send.buffer.bytes | 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,则将使用OS默认值。 | int | 131072 | medium | |
ssl.keystore.type | 密钥存储文件的文件格式。 这对于客户端是可选的。 | string | jks | medium | |
ssl.protocol | 用于生成SSLContext的SSL协议。 默认设置是TLS,这对大多数情况都是适用的。 最近的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 | string | TLS | medium | |
ssl.provider | 用于SSL连接的安全提供程序的名称。 默认值是JVM的默认安全提供程序。 | string | null | medium | |
ssl.truststore.type | 信任存储文件的文件格式。 | string | jks | medium | |
auto.commit.interval.ms | 如果enable.auto.commit设置为true,则以毫秒为单位的消费者偏移量自动提交给 Kafka 的频率。 | int | 5000 | [0,…] | medium |
check.crcs | 自动检查CRC32记录的消耗。 这样可以确保邮件不发生在线或磁盘损坏。 这个检查增加了一些开销,因此在寻求极端性能的情况下可能会被禁用。 | boolean | true | low | |
client.id | 在发出请求时传递给服务器的id字符串。 这样做的目的是通过允许在服务器端请求日志记录中包含逻辑应用程序名称来跟踪超出ip / port的请求源。 | string | “” | low | |
fetch.max.wait.ms | 如果没有足够的数据来满足fetch.min.bytes给出的要求,服务器将在接收到提取请求之前阻止的最长时间。 | int | 500 | [0,…] | low |
interceptor.classes | 用作拦截器的类的列表。 实现ConsumerInterceptor接口允许您拦截(也可能是变异)消费者收到的记录。 默认情况下,没有拦截器 | list | null | low | |
metadata.max.age.ms | 以毫秒为单位的时间段,强制更新元数据,即使我们没有看到任何分区领导变化,主动发现任何新的brokers或分区。 | list | null | low | |
metric.reporters | 用作指标记录员的类的列表。 实现MetricReporter接口允许插入将被通知新的度量标准创建的类。 JmxReporter始终包含在注册JMX统计信息中。 | list | “” | low | |
metrics.num.samples | 维护的样本数量用于计算度量。 | int | 2 | [1,…] | low |
metrics.recording.level | 指标的最高记录级别。。 | string | info | [INFO, DEBUG] | low |
metrics.sample.window.ms | 时间窗口计算度量标准。 | long | 30000 | [0,…] | low |
reconnect.backoff.max.ms | 重新连接到反复无法连接的brokers程序时,以毫秒为单位的最大时间等待时间。 如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。 | long | 1000 | [0,…] | low |
reconnect.backoff.ms | 尝试重新连接到给定主机之前等待的基本时间量。 这避免了在紧密循环中重复连接到主机。 此退货适用于客户端对brokers的所有连接尝试。 | long | 50 | [0,…] | low |
retry.backoff.ms | 尝试重试给定主题分区的失败请求之前等待的时间。 这样可以避免在某些故障情况下以严密的循环重复发送请求。 | long | 100 | [0,…] | low |
sasl.kerberos.kinit.cmd | kerberos kinit 命令的路径 | string | /usr/bin/kinit | low | |
sasl.kerberos.min.time.before.relogin | 刷新尝试之间的登录线程睡眠时间。 | long | 60000 | low | |
sasl.kerberos.ticket.renew.jitter | 添加到更新时间的随机抖动百分比。 | double | 0.05 | low | |
sasl.kerberos.ticket.renew.window.factor | 登录线程将睡眠,直到从上次刷新到票证时间到期的指定窗口因子为止,此时将尝试续订阅。 | double | 0.8 | low | |
ssl.cipher.suites | 密码套件列表。 这是使用TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。 默认情况下,支持所有可用的密码套件。 | list | null | low | |
ssl.endpoint.identification.algorithm | 使用服务器证书验证服务器主机名的端点识别算法。 | string | null | low | |
ssl.keymanager.algorithm | 密钥管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的密钥管理器工厂算法。 | string | SunX509 | low | |
ssl.secure.random.implementation | 用于SSL加密操作的SecureRandom PRNG实现。 | string | null | low | |
ssl.trustmanager.algorithm | 信任管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的信任管理器工厂算法。 | string | PKIX | low |
低(旧)版本 Consumer 配置
必须配置的低版本 Consumer 配置如下:
- group.id
- zookeeper.connect
PROPERTY | DEFAULT | DESCRIPTION |
---|---|---|
group.id | 一个唯一标识消费者所属消费者,类型是string。 通过设置相同的组ID,多个进程表示它们都是同一消费者组的一个。 | |
zookeeper.connect | 指定ZooKeeper连接字符串,格式为hostname:port,其中host和port是ZooKeeper服务器的主机和端口。 为了允许在ZooKeeper机器关闭时通过其他ZooKeeper节点进行连接,您还可以以hostname1:host1,hostname2:port2,hostname3:port3的形式指定多个主机。该服务器还可以具有ZooKeeper chroot路径,作为其ZooKeeper连接字符串的一部分,将其数据放置在全局ZooKeeper命名空间中的某个路径下。 如果是这样,消费者应该在其连接字符串中使用相同的chroot路径。 例如,要给出/ chroot / path的chroot路径,您将给出连接字符串为hostname1:port1,hostname2:port2,hostname3:port3 / chroot / path。 | |
consumer.id | null | 未设置,自动增加 |
socket.timeout.ms | 30*1000 | 网络请求超时时间,真实的网络超时时间max.fetch.wait + socket.timeout.ms. |
socket.receive.buffer.bytes | 64*1024 | 网络请求接受的网络字节数 |
fetch.message.max.bytes | 1024*1024 | 每个topic-partition提取请求中尝试获取的消息的字节数。 这些字节将被读入每个分区的内存,因此这有助于控制消费者使用的内存。 提取请求大小必须至少与服务器允许的最大消息大小一样大,否则生产者可能发送大于消费者可以获取的消息。 |
num.consumer.fetchers | 1 | 消费者获取消息的线程数 |
auto.commit.enable | true | 如果为true,定期向ZooKeeper提交消费者已经获取的消息的偏移量。 当进程失败时,将使用记录偏移量作为新消费者开始的位置。 |
auto.commit.interval.ms | 60*1000 | 提交给zookeeper的消费者的offsets的频率,时间为ms。 |
queued.max.message.chunks | 2 | 消息块的最大被缓冲消息数目。 每个块可以达到fetch.message.max.bytes。 |
rebalance.max.retries | 4 | 当新的消费者加入消费者组时,消费者集合尝试“重新平衡”负载,为每个消费者分配分区。 如果一组消费者在这项任务发生时发生变化,则重新平衡将失败并重试。 此设置控制放弃之前的最大尝试次数。 |
fetch.min.bytes | 1 | 服务器应该为获取请求返回的最小数据量。 如果没有足够的数据可用,请求将等待这么多数据积累,然后再回答该请求。 |
fetch.wait.max.ms | 100 | 如果没有足够的数据来立即满足fetch.min.bytes,服务器将在应答抓取请求之前阻止的最长时间 |
rebalance.backoff.ms | 2000 | 重新平衡期间重试之间的退后时间。 如果未明确设置,则使用zookeeper.sync.time.ms中的值。 |
refresh.leader.backoff.ms | 200 | 退出等待时间,然后再尝试确定刚刚失去领导者的分区新的领导者。 |
auto.offset.reset | largest | 当 zookeeper中offset没有初始化或者超出值的范围应该怎么做:smallest:自动设置为最小的offset;largest:自动设置为最新的offset;*anything:在消费者抛出异常。 |
consumer.timeout.ms | -1 | 如果在指定的时间间隔后没有消息可用,则向用户发出超时异常 |
exclude.internal.topics | true | 来自内部主题的消息(如偏移量)是否应该暴露给消费者。 |
client.id | group id value | 客户端ID是每个请求中发送的用户指定的字符串,用于帮助跟踪调用。 它应该逻辑地标识发出请求的应用程序。 |
zookeeper.session.timeout.ms | 6000 | ZooKeeper会话超时。 如果消费者在这段时间内没有对ZooKeeper心跳,那么它被认为是死亡的,并且会发生重新平衡。 |
zookeeper.connection.timeout.ms | 6000 | 与zookeeper建立连接时客户端等待的最长时间。 |
zookeeper.sync.time.ms | 2000 | zk的follower可以落后zk leader多长时间 |
offsets.storage | zookeeper | offsets应该存储在哪里 |
offsets.channel.backoff.ms | 1000 | 重新连接偏移信道或重试失败的偏移提取/提交请求时的退避周期。 |
offsets.channel.socket.timeout.ms | 10000 | 读取偏移提取/提交请求的响应时的套接字超时。 此超时也用于用于查询偏移量管理器的ConsumerMetadata请求。 |
offsets.commit.max.retries | 5 | 失败时重试偏移提交多次。 此重试计数仅适用于停机期间的偏移提交。 它不适用于源自自动提交线程的提交。 它也不适用于在提交偏移量之前查询偏移协调器的尝试。 即如果消费者元数据请求由于任何原因而失败,则将重试它,并且重试不计入该限制。 |
dual.commit.enabled | true | 如果您使用“kafka”作为offsets.storage,则可以向ZooKeeper(除Kafka之外)进行双重提交偏移。 在从基于zookeeper的偏移存储迁移到基于kafka的偏移量存储的过程中需要这样做。 对于任何给定的消费者组,在该组中的所有实例已迁移到提交到代理(而不是直接到ZooKeeper)的新版本之后,可以将其关闭。 |
partition.assignment.strategy | range | 在“range”或“roundrobin”策略之间选择将分区分配给消费者流。循环分区分配器布置所有可用的分区和所有可用的消费者线程。然后,继续从分区到消费者线程进行循环任务。如果所有消费者实例的订阅是相同的,则分区将被均匀分布。 (即,分区所有权计数将在所有消费者线程之间的差异仅在一个增量之内。)循环分配仅在以下情况下被允许:(a)每个主题在消费者实例中具有相同数量的流(b)集合订阅的主题对于组内的每个消费者实例都是相同的。范围分区以每个主题为基础。对于每个主题,我们按数字顺序排列可用的分区,并以字典顺序排列消费者线程。然后,我们将分区数除以消费者流(线程)的总数来确定分配给每个消费者的分区数。如果不均匀分割,那么前几个消费者将会有一个额外的分区。 |
Kafka Connect 配置
Kafka Connect 配置如下:
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
config.storage.topic | 存储连接器配置的Kafka主题的名称 | string | high | ||
group.id | 一个唯一的字符串,用于标识此工作所属的Connect集群组。 | string | high | ||
key.converter | 用于在Kafka Connect格式和写入Kafka的序列化表单之间转换的转换器类。 这可以控制写入或从卡夫卡读取的消息中的键的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 | class | high | ||
offset.storage.topic | 存储连接器偏移的Kafka主题的名称 | string | high | ||
status.storage.topic | 存储连接器和任务状态的Kafka主题的名称 | string | high | ||
value.converter | 用于在Kafka Connect格式和写入Kafka的序列化表单之间转换的转换器类。 这控制了写入或从Kafka读取的消息中的值的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 | class | high | ||
internal.key.converter | 用于在Kafka Connect格式和写入Kafka的序列化表单之间转换的转换器类。 这可以控制写入或从卡夫卡读取的消息中的键的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 此设置控制框架使用的内部记账数据的格式,如配置和偏移量,因此用户通常可以使用任何正常的转换器实现。 | class | high | ||
internal.value.converter | 用于在Kafka Connect格式和写入Kafka的序列化表单之间转换的转换器类。 这控制了写入或从Kafka读取的消息中的值的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 此设置控制框架使用的内部记账数据的格式,如配置和偏移量,因此用户通常可以使用任何正常的转换器实现。 | class | high | ||
bootstrap.servers | 用于建立与Kafka集群的初始连接的主机/端口对列表。 客户端将使用所有服务器,无论这里指定哪些服务器进行引导 - 此列表仅影响用于发现完整服务器集的初始主机。 该列表应该以host1:port1,host2:port2,….的格式。由于这些服务器仅用于初始连接才能发现完整的集群成员资格(可能会动态更改),因此此列表不需要包含完整集 的服务器(尽管如此,您可能需要多个服务器)。 | list | localhost:9092 | high | |
heartbeat.interval.ms | 使用Kafka集团管理设施时心跳与团队协调员之间的预期时间。 心跳用于确保工作人员的会话保持活动,并促进新成员加入或离开组时的重新平衡。 该值必须设置为低于session.timeout.ms,但通常应设置为不高于该值的1/3。 它可以调整得更低以控制正常重新平衡的预期时间。 | int | 3000 | high | |
rebalance.timeout.ms | 一旦重新平衡开始,每个worker最多可以加入该组。 这基本上是所有任务刷新任何未决数据和提交偏移所需的时间量的限制。 如果超时超时,那么worker将从组中删除,这将导致偏移提交失败。 | int | 60000 | high | |
session.timeout.ms | 用于检测worker故障的超时时间。 worker定期发送心跳以表明其对经纪人的活力。 如果在此会话超时到期之前,代理没有收到心跳线,那么代理将从组中删除该worker并启动重新平衡。 请注意,该值必须在group.min.session.timeout.ms和group.max.session.timeout.ms中的代理配置中配置的允许范围内。 | int | 10000 | high | |
ssl.key.password | 密钥存储文件中私钥的密码。 这对于客户端是可选的。 | password | null | high | |
ssl.keystore.location | 密钥存储文件的位置。 这对于客户端是可选的,可以用于客户端的双向认证。 | string | null | high | |
ssl.keystore.password | 密钥存储文件的存储密码。 这对于客户端是可选的,只有配置了ssl.keystore.location才需要。 | password | null | high | |
ssl.truststore.location | 信任存储文件的位置。 | string | null | high | |
ssl.truststore.password | 信任存储文件的密码。 如果未设置密码,对信任库的访问仍然可用,但是完整性检查被禁用。 | password | null | high | |
connections.max.idle.ms | 在此配置指定的毫秒数之后关闭空闲连接。 | long | 540000 | high | |
receive.buffer.bytes | 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。 | int | 32768 | [0,…] | high |
request.timeout.ms | 配置控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 | int | 40000 | [0,…] | high |
sasl.jaas.config | JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。 这里描述JAAS配置文件格式。 该值的格式为:’(=)*;’ | password | null | high | |
sasl.kerberos.service.name | Kafka运行的Kerberos主体名称。 这可以在Kafka的JAAS配置或Kafka的配置中定义。 | string | null | high | |
sasl.mechanism | SASL机制用于客户端连接。 这可能是安全提供者可用的任何机制。 GSSAPI是默认机制。 | string | GSSAPI | high | |
security.protocol | 用于与brokers沟通的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 | string | PLAINTEXT | high | |
send.buffer.bytes | 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,则将使用OS默认值。 | int | 131072 | [0,…] | high |
ssl.enabled.protocols | 启用SSL连接的协议列表。 | list | TLSv1.2,TLSv1.1,TLSv1 | high | |
ssl.keystore.type | 密钥存储文件的文件格式。 这对于客户端是可选的。 | string | JKS | medium | |
ssl.protocol | 用于生成SSLContext的SSL协议。 默认设置是TLS,这对大多数情况都是适用的。 最近的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 | string | TLS | medium | |
ssl.provider | 用于SSL连接的安全提供程序的名称。 默认值是JVM的默认安全提供程序。 | string | null | medium | |
ssl.truststore.type | 信任存储文件的文件格式。 | string | JKS | medium | |
worker.sync.timeout.ms | 当worker与其他worker不同步并需要重新同步配置时,请等待这段时间才能放弃,离开组,然后等待退伍期才能重新加入。。 | int | 3000 | medium | |
worker.unsync.backoff.ms | 当worker与其他worker不同步,并且无法赶上worker.sync.timeout.ms,请在重新加入之前离开Connect集群很长时间。 | int | 300000 | medium | |
access.control.allow.methods | 通过设置Access-Control-Allow-Methods标头来设置跨源请求支持的方法。 Access-Control-Allow-Methods标头的默认值允许GET,POST和HEAD的跨源请求。 | string | “” | medium | |
access.control.allow.origin | 将Access-Control-Allow-Origin标头设置为REST API请求的值。要启用跨源访问,请将其设置为应允许访问API的应用程序的域,或者“*”允许从任何 域。 默认值仅允许从REST API的域访问。 | string | “” | medium | |
client.id | 在发出请求时传递给服务器的id字符串。 这样做的目的是通过允许在服务器端请求日志记录中包含逻辑应用程序名称来跟踪超出ip / port的请求源。 | string | “” | medium | |
config.storage.replication.factor | 创建配置存储主题时使用的复制因子 | short | 3 | [1,…] | medium |
metadata.max.age.ms | 以毫秒为单位的时间段,我们强制更新元数据,即使我们没有看到任何分区领导变化,主动发现任何新的经纪人或分区。 | long | 300000 | [0,…] | medium |
metric.reporters | 用作指标记录员的类的列表。 实现MetricReporter接口允许插入将被通知新的度量标准创建的类。 JmxReporter始终包含在注册JMX统计信息中。 | list | “” | medium | |
metrics.num.samples | 维护的样本数量用于计算度量。 | int | 2 | [1,…] | medium |
metrics.sample.window.ms | 时间窗口计算度量标准。 | long | 30000 | [0,…] | medium |
offset.flush.interval.ms | 尝试提交任务偏移量的间隔。 | long | 60000 | medium | |
offset.flush.timeout.ms | 在取消进程并恢复要在将来尝试中提交的偏移量数据之前,等待记录刷新并分配要提交到偏移存储的偏移量数据的最大毫秒数。 | long | 5000 | medium | |
offset.storage.partitions | 创建偏移存储主题时使用的分区数 | int | 25 | [1,…] | medium |
offset.storage.replication.factor | 创建偏移存储主题时使用的复制因子 | short | 3 | [1,…] | medium |
plugin.path | 包含插件(连接器,转换器,转换)的逗号分隔的路径列表。 该列表应包含顶级目录,其中包括以下任何组合:a)立即包含jars与插件及其依赖关系的目录b)具有插件及其依赖项的uber-jars c)立即包含插件类的包目录结构的目录及其 依赖关系注意:将遵循符号链接来发现依赖关系或插件。 示例:plugin.path = / usr / local / share / java,/ usr / local / share / kafka / plugins,/ opt / connectors | list | null | medium | |
reconnect.backoff.max.ms | 重新连接到重复无法连接的代理程序时等待的最大时间(毫秒)。 如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。 | long | 1000 | [0,…] | medium |
reconnect.backoff.ms | 尝试重新连接到给定主机之前等待的基本时间量。 这避免了在紧密循环中重复连接到主机。 此退货适用于客户端对代理的所有连接尝试。 | long | 50 | [0,…] | medium |
rest.advertised.host.name | 如果这样设置,这将被发送给其他工作人员连接到的主机名。 | string | null | medium | |
rest.advertised.port | 如果这样设置,这是将被发送给其他worker连接的端口。 | int | null | medium | |
rest.host.name | REST API的主机名。 如果这样设置,它将只绑定到这个接口。 | string | null | medium | |
rest.port | 端口用于REST API侦听。 | int | 8083 | medium | |
retry.backoff.ms | 尝试重试给定主题分区的失败请求之前等待的时间。 这样可以避免在某些故障情况下以严密的循环重复发送请求。 | long | 100 | [0,…] | medium |
sasl.kerberos.kinit.cmd | Kerberos kinit命令路径。 | string | /usr/bin/kinit | medium | |
sasl.kerberos.min.time.before.relogin | 刷新尝试之间的登录线程睡眠时间。 | long | 60000 | medium | |
sasl.kerberos.ticket.renew.jitter | 添加到更新时间的随机抖动百分比。 | double | 0.05 | medium | |
sasl.kerberos.ticket.renew.window.factor | 登录线程将睡眠,直到从上次刷新到机票到期时间的指定窗口因子为止,此时将尝试续订ticket | double | 0.8 | medium | |
ssl.cipher.suites | 密码套件列表。 这是使用TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。 默认情况下,支持所有可用的密码套件。 | list | null | medium | |
ssl.endpoint.identification.algorithm | 使用服务器证书验证服务器主机名的端点识别算法。 | string | null | medium | |
ssl.keymanager.algorithm | 密钥管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的密钥管理器工厂算法。 | string | SunX509 | medium | |
ssl.secure.random.implementation | 用于SSL加密操作的SecureRandom PRNG实现。 | string | null | medium | |
ssl.trustmanager.algorithm | 信任管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的信任管理器工厂算法。 | string | PKIX | medium | |
status.storage.partitions | 创建状态存储主题时使用的分区数 | int | 5 | [1,…] | medium |
status.storage.replication.factor | 创建状态存储主题时使用的复制因子 | short | 3 | [1,…] | medium |
task.shutdown.graceful.timeout.ms | 等待任务正常关机的时间。 这是总时间,而不是每个任务。 所有任务已关闭触发,然后依次等待。 | long | 5000 | medium |
Kafka Streams 配置
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
application.id | 流处理应用程序的标识符。 在Kafka群集中必须是唯一的。 用作1)默认的client-id前缀,2)用于成员资格管理的group-id,3)changelog主题前缀。 | string | high | ||
bootstrap.servers | 用于建立与Kafka集群的初始连接的主机/端口对列表。 客户端将使用所有服务器,无论这里指定哪些服务器进行引导 - 此列表仅影响用于发现完整服务器集的初始主机。 该列表应该以host1:port1,host2:port2,….的格式。由于这些服务器仅用于初始连接才能发现完整的集群成员资格(可能会动态更改),因此此列表不需要包含完整集 的服务器(尽管如此,您可能需要多个服务器)。 | list | high | ||
replication.factor | 流处理应用程序创建的更改日志主题和重新分区主题的复制因素。 | int | 1 | high | |
state.dir | 状态存储的位置 | string | /tmp/kafka-streams | high | |
cache.max.bytes.buffering | 用于所有线程缓冲的最大内存字节数 | long | 10485760 | [0,…] | medium |
client.id | 在发出请求时传递给服务器的id字符串。 这样做的目的是通过允许在服务器端请求日志记录中包含逻辑应用程序名称来跟踪超出ip / port的请求源。 | string | “” | medium | |
default.key.serde | 实现接口serde的key的默认的序列化。 | class | org.apache.kafka.common.serialization.Serdes$ByteArraySerde | medium | |
default.timestamp.extractor | 实现TimestampExtractor接口的默认时间戳提取器类。。 | class | org.apache.kafka.streams.processor.FailOnInvalidTimestamp | medium | |
default.value.serde | 用于实现Serde接口的值的默认serializer / deserializer类。 | class | org.apache.kafka.common.serialization.Serdes$ByteArraySerde | medium | |
num.standby.replicas | 每个任务的备用副本数。 | int | 0 | medium | |
num.stream.threads | 执行流处理的线程数。 | int | 1 | medium | |
processing.guarantee | 处理程序使用保证。 可能的值为at_least_once(默认)和exact_once。 可能的值为at_least_once(默认)和exact_once。 | string | at_least_once | [at_least_once, exactly_once] | medium |
security.protocol | 用于与brokers沟通的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 | string | PLAINTEXT | medium | |
application.server | 主机:端口对指向嵌入式用户定义的端点,可用于发现单个KafkaStreams应用程序中状态存储的位置 | string | “” | medium | |
buffered.records.per.partition | 每个分区缓存的最大记录数。 | int | 1000 | medium | |
commit.interval.ms | 用于保存处理器位置的频率。 (注意,如果’processing.guarantee’设置为’exact_once’,默认值为100,否则默认值为30000。 | long | 30000 | medium | |
connections.max.idle.ms | 在此配置指定的毫秒数之后关闭空闲连接。 | long | 540000 | medium | |
key.serde | Serializer / deserializer类用于实现Serde接口的密钥。 此配置已被弃用,请改用default.key.serde | class | null | medium | |
metadata.max.age.ms | 以毫秒为单位的时间段,强制更新元数据,即使没有看到任何分区领导变化或者发现任何新的broker或分区。 | long | 300000 | [0,…] | medium |
metric.reporters | 用作指标记录员的类的列表。 实现MetricReporter接口允许插入将被通知新的度量标准创建的类。 JmxReporter始终包含在注册JMX统计信息中。 | list | “” | low | |
metrics.num.samples | 维护的样本数量用于计算度量。 | int | 2 | [1,…] | low |
metrics.recording.level | 指标的最高记录级别。 | string | INFO | [1,…] | [INFO, DEBUG] |
metrics.sample.window.ms | 时间窗口计算度量标准。 | long | 30000 | low | |
partition.grouper | 实现PartitionGrouper接口的Partition grouper类。 | class | org.apache.kafka.streams.processor.DefaultPartitionGrouper | low | |
poll.ms | 以毫秒为单位等待输入的时间量。 | long | 100 | low | |
receive.buffer.bytes | 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。 | int | 32768 | [0,…] | low |
reconnect.backoff.max.ms | 重新连接到重复无法连接的代理程序时等待的最大时间(毫秒)。 如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。 | long | 1000 | [0,…] | low |
reconnect.backoff.ms | 尝试重新连接到给定主机之前等待的基本时间量。 这避免了在紧密循环中重复连接到主机。 此退货适用于客户端对代理的所有连接尝试。 | long | 50 | [0,…] | low |
request.timeout.ms | 配置控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 | int | 40000 | [0,…] | low |
retry.backoff.ms | 尝试重试给定主题分区的失败请求之前等待的时间。 这样可以避免在某些故障情况下以严密的循环重复发送请求。 | long | 100 | [0,…] | low |
rocksdb.config.setter | 一个Rocks DB配置setter类或类名实现了RocksDBConfigSetter接口 | class | null | low | |
send.buffer.bytes | 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,则将使用OS默认值。 | int | 131072 | [0,…] | low |
state.cleanup.delay.ms | 在分区迁移之前删除状态之前等待的时间(毫秒)的时间量。 至少state.cleanup.delay.ms尚未修改的状态目录将被删除 | long | 600000 | low | |
timestamp.extractor | Timestamp提取器类,实现Timestamp Extractor接口。 此配置已弃用,请改用default.timestamp.extractor | class | null | low | |
value.serde | Serializer / deserializer类,用于实现Serde接口的值。 此配置已被弃用,请改用default.value.serde | class | null | low | |
windowstore.changelog.additional.retention.ms | 添加到Windows系统维护Ms以确保数据未被过早地从日志中删除。 允许时钟漂移 默认为1天 | long | 86400000 | low | |
zookeeper.connect | Zookeeper连接字符串用于Kafka主题管理。 此配置已被弃用,将被忽略,因为Streams API不再使用Zookeeper。 | string | low |
AdminClient 配置
以下是 Kafka Admin 客户端库的配置:
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
bootstrap.servers | 用于建立与Kafka集群的初始连接的主机/端口对列表。 客户端将使用所有服务器,无论这里指定哪些服务器进行引导 - 此列表仅影响用于发现完整服务器集的初始主机。 该列表应该以host1:port1,host2:port2,….的格式。由于这些服务器仅用于初始连接才能发现完整的集群成员资格(可能会动态更改),因此此列表不需要包含完整集 的服务器(尽管如此,您可能需要多个服务器)。 | list | high | ||
ssl.key.password | 密钥存储文件中私钥的密码。 这对于客户端是可选的 | password | null | high | |
ssl.keystore.location | 密钥存储文件中私钥的密码。 这对于客户端是可选的 | string | null | high | |
ssl.keystore.password | 密钥存储文件的存储密码。 这对于客户端是可选的,只有配置了ssl.keystore.location才需要。 | password | null | high | |
ssl.truststore.location | 信任存储文件的位置。 | string | null | high | |
ssl.truststore.password | 信任存储文件的密码。 如果未设置密码,对信任库的访问仍然可用,但是完整性检查被禁用。 | password | null | high | |
client.id | 在发出请求时传递给服务器的id字符串。 这样做的目的是通过允许在服务器端请求日志记录中包含逻辑应用程序名称来跟踪超出ip / port的请求源。 | string | “” | high | |
connections.max.idle.ms | 在此配置指定的毫秒数之后关闭空闲连接。 | long | 300000 | high | |
receive.buffer.bytes | 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。 如果值为-1,则将使用OS默认值。 | int | 65536 | [-1,…] | medium |
request.timeout.ms | 配置控制客户端等待请求响应的最长时间。 如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 | int | 120000 | [0,…] | medium |
sasl.jaas.config | JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。 这里描述JAAS配置文件格式。 该值的格式为:’(=)*;’ | int | 120000 | [0,…] | medium |
sasl.mechanism | SASL机制用于客户端连接。 这可能是安全提供者可用的任何机制。 GSSAPI是默认机制。 | string | GSSAPI | medium | |
security.protocol | 用于与broker沟通的协议。 有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 | string | PLAINTEXT | medium | |
send.buffer.bytes | 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。 如果值为-1,则将使用OS默认值。 | int | 131072 | [-1,…] | medium |
ssl.enabled.protocols | 启用SSL连接的协议列表。 | list | TLSv1.2,TLSv1.1,TLSv1 | medium | |
ssl.keystore.type | 密钥存储文件的文件格式。 这对于客户端是可选的。 | string | JKS | medium | |
ssl.protocol | 用于生成SSLContext的SSL协议。 默认设置是TLS,这对大多数情况都是适用的。 最近的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 | string | TLS | medium | |
ssl.provider | 用于SSL连接的安全提供程序的名称。 默认值是JVM的默认安全提供程序。 | string | null | medium | |
ssl.truststore.type | 信任存储文件的文件格式。 | string | JKS | medium | |
metadata.max.age.ms | 以毫秒为单位的时间段,强制更新元数据,即使没有看到任何分区领导变化,主动发现任何新的broekr或分区。 | long | 300000 | [0,…] | low |
metric.reporters | 用作指标记录员的类的列表。 实现MetricReporter接口允许插入将被通知新的度量标准创建的类。 JmxReporter始终包含在注册JMX统计信息中。 | list | “” | low | |
metrics.num.samples | 维护的样本数量用于计算度量。 | int | 2 | [1,…] | low |
metrics.recording.level | 指标的最高记录级别。 | string | INFO | [INFO, DEBUG] | low |
metrics.sample.window.ms | 时间窗口计算度量标准。 | long | 30000 | [0,…] | low |
reconnect.backoff.max.ms | 重新连接到重复无法连接的代理程序时等待的最大时间(毫秒)。 如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。 | long | 1000 | [0,…] | low |
reconnect.backoff.ms | 尝试重新连接到给定主机之前等待的基本时间量。 这避免了在紧密循环中重复连接到主机。 此退货适用于客户端对代理的所有连接尝试。 | long | 50 | [0,…] | low |
retries | 在失败之前重试呼叫的最大次数。 | int | 5 | [0,…] | low |
retry.backoff.ms | 尝试重试失败的请求之前等待的时间。 这样可以避免在某些故障情况下以严密的循环重复发送请求。 | long | 100 | [0,…] | low |
sasl.kerberos.kinit.cmd | Kerberos kinit命令路径。 | string | /usr/bin/kinit | low | |
sasl.kerberos.min.time.before.relogin | 刷新尝试之间的登录线程睡眠时间。 | long | 60000 | low | |
sasl.kerberos.ticket.renew.jitter | 添加到更新时间的随机抖动百分比。 | double | 0.05 | low | |
sasl.kerberos.ticket.renew.window.factor | 登录线程将睡眠,直到从上次刷新到tickets到期时间的指定窗口因子为止,此时将尝试续订tickets。 | double | 0.8 | low | |
ssl.cipher.suites | 密码套件列表。 这是使用TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。 默认情况下,支持所有可用的密码套件。 | list | null | low | |
ssl.endpoint.identification.algorithm | 使用服务器证书验证服务器主机名的端点识别算法。 | string | null | low | |
ssl.keymanager.algorithm | 密钥管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的密钥管理器工厂算法。 | string | SunX509 | low | |
ssl.secure.random.implementation | 用于SSL加密操作的SecureRandom PRNG实现。 | string | null | low | |
ssl.trustmanager.algorithm | 信任管理器工厂用于SSL连接的算法。 默认值是为Java虚拟机配置的信任管理器工厂算法。 | string | PKIX | low |