前言
前一篇博客介绍了Kafka的性能测试方案以及测试结果。
Kafka性能测试
Kafka的性能瓶颈处于网络I/O和磁盘I/O。在网络I/O方面,如今服务器大部分是千兆网卡甚至万兆网卡,当千兆网卡时,1000M bits / s,即实时吞吐量可以达到125M/s;而万兆网卡,实时吞吐量则可以达到1250M/s。可以看到万兆网卡服务器,网络I/O基本不存在瓶颈,这时Kafka的性能瓶颈大部分则在磁盘I/O上。上一篇博客进行的性能测试,其实是进行了两轮,第一轮集群采用了万兆网卡、机械磁盘,网络I/O无瓶颈,磁盘I/O性能较差。测试之后吞吐量,TPS均不高,明显瓶颈在磁盘I/O,吞吐量上限也就200-300M/s。第二轮测试,换了NAS磁盘,性能明显提升,网络方面依然是万兆网卡。测试之后,集群吞吐量在600-700M/s,但无法持久,因为关闭了Kafka broker数据落盘/刷盘机制,交由操作系统自行决定什么时候进行数据刷盘。在测试过程中,前一分钟或半分钟数据不断累加,均存储在内存中,因此吞吐量极高,峰值可达到700M/s+。后续开始数据刷盘,吞吐量降低,至400M/s左右。
总结来说,由于目前服务器环境网络I/O基本无瓶颈,因此Kafka的性能瓶颈基本在于磁盘I/O,磁盘I/O有多快,Kafka性能就有多快。
Kafka Producer 性能测试代码
maven依赖
pom.xml文件配置如下:
1 | <dependencies> |
Constants类–常量类
1 | package com.unionpay.kafka.test.common; |
AbstractTest类–抽象并发压测类
该类主要作为并发类存在,启动主方法时,需传入三个参数,分别是threadCount(并发线程数量),loopCount(每个线程循环次数),bytesCount(单词调用消息体大小)。
定义了抽象方法如下,由子类实现,从而完成业务逻辑调用。
1 | public abstract void exec(CountDownLatch startLatch, CountDownLatch endLatch); |
完成类代码如下:
1 | package com.unionpay.kafka.test.common; |
PropertiesUtil类–参数工具类
该类由加载指定properties配置文件,返回Properties对象。而在工程的resources路径下定义了producer.properties文件,主要存储producer相关配置信息。
PropertiesUtil类完整代码如下:
1 | package com.unionpay.kafka.test.util; |
producer.properties
producer.properties文件内容如下:
1 | bootstrap.servers=172.21.195.89:9092,172.21.195.90:9092,172.21.195.91:9092 |
参数说明:
- bootstrap.servers kafka brokers集群地址。
- topic producer发送消息topic名称。根据性能测试条件,传入不同topic,如topic_log_replica_6_1表示,partitions为6,replica为1。
- acks producer发送消息,接收到acks数量,即认为消息已送达。acks可配置值为0,1,all。
- buffer.memory 发送队列buffer所占用内存大小
- batch.size 批量发送大小
- linger.ms 批量发送时间间隔?
Producer类–Kafka producer 类
该类首先加载producer.properties配置文件数据,然后初始化KafkaProducer对象,并封装Kafka producer 相关api。Producer类完整代码如下:
1 | package com.unionpay.kafka.test; |
BenchmarkMain类-性能测试主类
该类作为性能测试的入口类,继承抽象并发类AbstractTest,并实现exec()方法。exec()方法内,首先根据传入的bytesCount,构造待传入的消息字符串。然后根据传入的loopCount,循环调用Producer类的send方法。
而在main方法中,即程序启动时,首先调用BenchmarkMain类的构造方法,然后调用父类AbstractTest的start方法,提交多线程并发。该类完整代码如下:
1 | package com.unionpay.kafka.test; |