Kafka Performance Testing (Part 2)

Preface

The previous post introduced the Kafka performance test plan and its results:
Kafka Performance Testing

Kafka's performance bottlenecks are network I/O and disk I/O. On the network side, most servers today have gigabit or even 10-gigabit NICs. A gigabit NIC carries 1000 Mbit/s, which translates to a sustained throughput of about 125 MB/s; a 10-gigabit NIC can sustain roughly 1250 MB/s. With a 10-gigabit NIC, network I/O is essentially no longer a bottleneck, and Kafka's bottleneck largely shifts to disk I/O.

The tests in the previous post were actually run in two rounds. In the first round, the cluster used 10-gigabit NICs with mechanical disks: the network had headroom, but disk I/O was poor. Both throughput and TPS came out low, with disk I/O the obvious bottleneck capping throughput at roughly 200-300 MB/s. In the second round, the cluster was moved to NAS storage, still on 10-gigabit NICs, and performance improved markedly: cluster throughput reached 600-700 MB/s. That rate could not be sustained, however, because the brokers' forced flush settings were disabled, leaving the OS to decide when to flush data to disk. During the first half-minute to minute of a run, data accumulated in memory, so throughput was extremely high, peaking above 700 MB/s. Once flushing began, throughput dropped to around 400 MB/s.

In short: since network I/O is rarely a bottleneck in today's server environments, Kafka's performance bottleneck is essentially disk I/O. Kafka is as fast as its disks.
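The bandwidth-to-throughput conversion used above is simple arithmetic; a quick sketch (the values are nominal NIC speeds, nothing measured):

```java
public class NicThroughput {
    public static void main(String[] args) {
        long gigabitPerSec = 1000L * 1000 * 1000;     // 1 Gbit/s NIC
        long tenGigabitPerSec = 10L * gigabitPerSec;  // 10 Gbit/s NIC
        // Divide by 8 bits per byte, then by 10^6 to express MB/s
        System.out.println(gigabitPerSec / 8 / 1_000_000 + " MB/s");     // 125 MB/s
        System.out.println(tenGigabitPerSec / 8 / 1_000_000 + " MB/s");  // 1250 MB/s
    }
}
```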

Kafka Producer Performance Test Code

Maven dependencies

The pom.xml is configured as follows:

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.11.0.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <executions>
                <execution>
                    <id>copy-dependencies</id>
                    <phase>prepare-package</phase>
                    <goals>
                        <goal>copy-dependencies</goal>
                    </goals>
                    <configuration>
                        <outputDirectory>${project.build.directory}/lib</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```

Constants class – constant definitions

```java
package com.unionpay.kafka.test.common;

/**
 * date: 2017/07/26 10:31.
 * author: Yueqi Shi
 */
public class Constants {
    public static final String PROPERTIES_FILE_NAME = "producer.properties";

    public static final String TOPIC_KEY = "topic";

    public static final String BROKERS_ADDRESS_KEY = "bootstrap.servers";

    public static final String ACKS_KEY = "acks";

    public static final String CLIENT_ID_KEY = "client.id";

    public static final String KEY_SERIALIZER_CLASS_KEY = "key.serializer";

    public static final String VALUE_SERIALIZER_CLASS_KEY = "value.serializer";

    public static final String BUFFER_MEMORY_KEY = "buffer.memory";

    public static final String BATCH_SIZE_KEY = "batch.size";

    public static final String LINGER_MS_KEY = "linger.ms";

    public static final String DEFAULT_KEY_SERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringSerializer";

    public static final String DEFAULT_VALUE_SERIALIZER_CLASS = "org.apache.kafka.common.serialization.StringSerializer";

    public static final String DEFAULT_ACKS = "1";

    public static final String DEFAULT_BRAOKERS_ADDRESS_KEY = "172.21.195.89:9092,172.21.195.90:9092,172.21.195.91:9092";

    public static final String DEFAULT_CLIENT_ID = "producer_test_id";

    public static final Long DEFAULT_BUFFER_MEMORY = 32 * 1024 * 1024L;

    public static final Integer DEFAULT_BATCH_SIZE = 16384;

    public static final Integer DEFAULT_LINGER_MS = 0;
}
```

AbstractTest class – abstract concurrency test base

This class drives the concurrency. When the main program is started, three arguments must be passed in: threadCount (number of concurrent threads), loopCount (number of iterations per thread), and bytesCount (message body size per send).

It declares the following abstract method, which subclasses implement to perform the actual work:

```java
public abstract void exec(CountDownLatch startLatch, CountDownLatch endLatch);
```

The complete class is as follows:

```java
package com.unionpay.kafka.test.common;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * date: 2016/10/19 10:25.
 * author: Yueqi Shi
 */
public abstract class AbstractTest {

    public static final CountDownLatch startLatch = new CountDownLatch(1);
    public static CountDownLatch endLatch;

    /** Number of calls per thread. */
    private int loopCount;

    /** Number of threads. */
    private int threadCount;

    /** Message size in bytes. */
    private int bytesCount;

    public AbstractTest(String[] args) {
        if (args == null || args.length != 3) {
            throw new IllegalArgumentException("Exactly 3 parameters are required: threadCount, loopCount, bytesCount");
        }

        this.threadCount = Integer.parseInt(args[0]);
        this.loopCount = Integer.parseInt(args[1]);
        this.bytesCount = Integer.parseInt(args[2]);
        endLatch = new CountDownLatch(threadCount);
    }

    public abstract void exec(CountDownLatch startLatch, CountDownLatch endLatch);

    public int getLoopCount() {
        return loopCount;
    }

    public int getThreadCount() {
        return threadCount;
    }

    public int getBytesCount() {
        return bytesCount;
    }

    public void start() {
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        for (int j = 0; j < threadCount; j++) {
            executor.submit(new Runnable() {
                public void run() {
                    exec(AbstractTest.startLatch, AbstractTest.endLatch);
                }
            });
        }

        try {
            // Release all workers at once, then wait for every thread to finish.
            startLatch.countDown();
            long startTime = System.currentTimeMillis();

            endLatch.await();

            long endTime = System.currentTimeMillis();
            long totalTime = endTime - startTime;
            double tps = threadCount * loopCount * 1000.0 / totalTime;
            double throughput = tps * bytesCount / (1024 * 1024);

            System.out.println("benchmark result:[threadCount: " + threadCount
                    + ", execute per thread: " + loopCount + ", total execute: " + threadCount * loopCount
                    + ", startTime: " + startTime + ", endTime: " + endTime + ", total time: " + totalTime + "ms"
                    + ", tps: " + tps + ", throughput: " + throughput + "]");

            executor.shutdown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```
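To make the formulas in start() concrete, here is a worked example with assumed numbers (10 threads, 100,000 sends per thread, 1024-byte messages, 20 s of wall time; none of these are measured results):

```java
public class TpsExample {
    public static void main(String[] args) {
        int threadCount = 10, loopCount = 100_000, bytesCount = 1024;  // assumed inputs
        long totalTimeMs = 20_000;                                     // assumed wall time
        double tps = threadCount * loopCount * 1000.0 / totalTimeMs;   // sends per second
        double throughputMb = tps * bytesCount / (1024 * 1024);        // MB/s
        System.out.println("tps=" + tps + ", throughput=" + throughputMb + " MB/s");
        // tps=50000.0, throughput=48.828125 MB/s
    }
}
```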

PropertiesUtil class – properties utility

This class loads the named properties file from the classpath and returns a Properties object. A producer.properties file under the project's resources directory holds the producer configuration.

The complete PropertiesUtil class:

```java
package com.unionpay.kafka.test.util;

import java.io.IOException;
import java.net.URL;
import java.util.Enumeration;
import java.util.Properties;

/**
 * date: 2017/07/26 10:30.
 * author: Yueqi Shi
 */
public class PropertiesUtil {
    public static Properties getProperties(String propertiesName) {
        Properties properties = new Properties();
        try {
            Enumeration<URL> urls = Thread.currentThread().getContextClassLoader().getResources(propertiesName);
            URL url = null;

            while (urls.hasMoreElements()) {
                url = urls.nextElement();
                if (url != null) {
                    break;
                }
            }

            // Guard against a missing resource instead of hitting a NullPointerException.
            if (url != null) {
                properties.load(url.openStream());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }

        return properties;
    }
}
```
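Since only the first matching resource is used, the same lookup can be written more compactly with getResourceAsStream and try-with-resources; a minimal alternative sketch (class name is hypothetical):

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class SimplePropertiesLoader {
    /** Loads a properties file from the classpath; returns empty Properties if absent. */
    public static Properties load(String name) {
        Properties props = new Properties();
        try (InputStream in = Thread.currentThread()
                .getContextClassLoader().getResourceAsStream(name)) {
            if (in != null) {
                props.load(in);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return props;
    }
}
```

The try-with-resources block also closes the stream, which the original openStream() call does not.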

producer.properties

The producer.properties file contains:

```properties
bootstrap.servers=172.21.195.89:9092,172.21.195.90:9092,172.21.195.91:9092
topic=topic_log_replica_6_1
acks=1
buffer.memory=67108864
batch.size=819600
linger.ms=1
```

Parameter notes:

  • bootstrap.servers: the Kafka broker cluster addresses.
  • topic: the topic the producer sends to. A different topic is passed for each test scenario; e.g. topic_log_replica_6_1 denotes 6 partitions with a replication factor of 1.
  • acks: the number of acknowledgments the producer requires before it considers a message delivered. Valid values are 0, 1, and all.
  • buffer.memory: the amount of memory the producer may use to buffer records awaiting send.
  • batch.size: the maximum size, in bytes, of a record batch.
  • linger.ms: how long the producer waits for additional records before sending a partially filled batch.
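For a rough feel of how these values interact (message size of 1 KB is an assumption, matching a typical bytesCount in these tests): a full batch at batch.size=819600 holds about 800 one-kilobyte records, and linger.ms=1 means a partially filled batch waits at most 1 ms before it is sent anyway.

```java
public class BatchMath {
    public static void main(String[] args) {
        int batchSizeBytes = 819_600;  // batch.size from producer.properties
        int msgBytes = 1024;           // assumed message size
        int msgsPerBatch = batchSizeBytes / msgBytes;
        System.out.println("records per full batch: " + msgsPerBatch);  // 800
    }
}
```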

Producer class – Kafka producer wrapper

This class loads the producer.properties configuration, initializes a KafkaProducer, and wraps the relevant producer APIs. The complete Producer class:

```java
package com.unionpay.kafka.test;

import com.unionpay.kafka.test.common.Constants;
import com.unionpay.kafka.test.util.PropertiesUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * date: 2017/07/03 16:53.
 * author: Yueqi Shi
 */
public class Producer {
    private String topic;
    private KafkaProducer<String, String> producer;

    public Producer() {
        producer = new KafkaProducer<String, String>(buildProperties());

        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName() + ":[Kafka producer graceful shutdown]");
                Producer.this.shutdown();
            }
        });
    }

    private Properties buildProperties() {
        Properties props = PropertiesUtil.getProperties(Constants.PROPERTIES_FILE_NAME);
        Properties properties = new Properties();

        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty(Constants.BROKERS_ADDRESS_KEY, Constants.DEFAULT_BRAOKERS_ADDRESS_KEY));
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, props.getProperty(Constants.KEY_SERIALIZER_CLASS_KEY, Constants.DEFAULT_KEY_SERIALIZER_CLASS));
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, props.getProperty(Constants.VALUE_SERIALIZER_CLASS_KEY, Constants.DEFAULT_VALUE_SERIALIZER_CLASS));
        properties.put(ProducerConfig.ACKS_CONFIG, props.getProperty(Constants.ACKS_KEY, Constants.DEFAULT_ACKS));
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, props.getProperty(Constants.CLIENT_ID_KEY, Constants.DEFAULT_CLIENT_ID));
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, props.getProperty(Constants.BUFFER_MEMORY_KEY, Constants.DEFAULT_BUFFER_MEMORY.toString()));
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, props.getProperty(Constants.BATCH_SIZE_KEY, Constants.DEFAULT_BATCH_SIZE.toString()));
        properties.put(ProducerConfig.LINGER_MS_CONFIG, props.getProperty(Constants.LINGER_MS_KEY, Constants.DEFAULT_LINGER_MS.toString()));

        this.topic = props.getProperty(Constants.TOPIC_KEY);

        return properties;
    }

    /**
     * Closes the underlying KafkaProducer.
     */
    public void shutdown() {
        if (null != producer) {
            producer.close();
            producer = null;
        }
    }

    public boolean send(String msg) {
        if (msg == null || msg.equals("")) {
            System.out.println("[Producer] Message must not be empty!");
            return false;
        }

        try {
            // Blocking send: get() waits for the broker acknowledgment.
            producer.send(new ProducerRecord<String, String>(this.topic, msg)).get();
            return true;
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println("produce: " + msg + " failed.");
        } catch (ExecutionException e) {
            e.printStackTrace();
            System.out.println("produce: " + msg + " failed.");
        }

        return false;
    }

    public boolean send(String partitionKey, String msg) {
        if (msg == null || msg.equals("")) {
            System.out.println("[Producer] Message must not be empty!");
            return false;
        }

        try {
            producer.send(new ProducerRecord<String, String>(this.topic, partitionKey, msg)).get();
            return true;
        } catch (InterruptedException e) {
            e.printStackTrace();
            System.out.println("produce: " + msg + " failed.");
        } catch (ExecutionException e) {
            e.printStackTrace();
            System.out.println("produce: " + msg + " failed.");
        }

        return false;
    }
}
```
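Note that both send overloads block on Future.get(), so each benchmark thread sends synchronously and its TPS is capped at roughly 1000 / per-send round-trip latency in milliseconds. A back-of-the-envelope model (the 2 ms round trip is an assumed figure, not a measurement from these tests):

```java
public class SyncSendBound {
    public static void main(String[] args) {
        double roundTripMs = 2.0;  // assumed per-send broker round trip with acks=1
        int threads = 10;
        double perThreadTps = 1000.0 / roundTripMs;  // upper bound per thread
        double totalTps = perThreadTps * threads;
        System.out.println("per-thread <= " + perThreadTps + " msg/s, total <= " + totalTps);
        // per-thread <= 500.0 msg/s, total <= 5000.0
    }
}
```

Raising measured throughput under this design therefore usually means adding threads; the alternative is the asynchronous, callback-based form of KafkaProducer.send, which does not wait per message.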

BenchmarkMain class – benchmark entry point

This is the entry class for the performance test. It extends the abstract concurrency class AbstractTest and implements exec(). Inside exec(), it first builds the message string from the given bytesCount, then calls Producer.send() loopCount times.

In main(), the program constructs a BenchmarkMain and calls the parent class's start() method to submit the concurrent worker threads. The complete class:

```java
package com.unionpay.kafka.test;

import com.unionpay.kafka.test.common.AbstractTest;
import java.util.concurrent.CountDownLatch;

/**
 * date: 2017/06/20 16:06.
 * author: Yueqi Shi
 */
public class BenchmarkMain extends AbstractTest {
    private static final String BYTE_SAMPLE = "A";

    /**
     * single benchmark test
     */
    private static final Producer producer = new Producer();

    private static final String PARTITION_KEY = "1";

    public BenchmarkMain(String[] args) {
        super(args);
    }

    public static void main(String[] args) throws InterruptedException {
        new BenchmarkMain(args).start();
    }

    @Override
    public void exec(CountDownLatch startLatch, CountDownLatch endLatch) {
        int loopCount = super.getLoopCount();
        StringBuilder messageBuilder = new StringBuilder();
        for (int i = 0; i < super.getBytesCount(); i++) {
            messageBuilder.append(BYTE_SAMPLE);
        }
        String message = messageBuilder.toString();

        try {
            startLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        long startTime = System.currentTimeMillis();

        for (int i = 0; i < loopCount; i++) {
            producer.send(message);
        }

        long endTime = System.currentTimeMillis();
        long totalTime = endTime - startTime;
        System.out.println(Thread.currentThread().getName() + ":[tps: " + super.getLoopCount() * 1000.0 / totalTime
                + ", startTime: " + startTime + ", endTime: " + endTime + ", total time: " + totalTime + "ms]");

        endLatch.countDown();
    }
}
```

References

  1. Kafka official performance test report
  2. Kafka official performance test configuration parameters
  3. Kafka Performance Testing
  4. Kafka Best Practices