Kafka Quick Start

ZooKeeper Setup

ZooKeeper Download

Download: http://apache.claz.org/zookeeper/zookeeper-3.4.10/

After the download completes, extract the archive:

tar -xvf zookeeper-3.4.10.tar.gz

ZooKeeper Configuration

ZooKeeper should be run as an ensemble (cluster), and an odd number of instances is recommended. The ensemble stays available as long as a majority (quorum) of its members are up, so deploying 2n instances tolerates the same number of failures as deploying 2n-1, namely n-1. For example, a 3-node ensemble and a 4-node ensemble each survive only one node going down, so the fourth node adds cost without adding fault tolerance. This guide uses a 3-instance deployment (IP addresses 172.21.195.89, 172.21.195.90, 172.21.195.91).

First configure the ZooKeeper instance at 172.21.195.89. Go to the conf directory under the ZooKeeper installation and edit zoo.cfg as follows:

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5

# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/home/ggzjs/zookeeper/zookeeper-3.4.10/data/data
dataLogDir=/home/ggzjs/zookeeper/zookeeper-3.4.10/data/log

# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.1=172.21.195.89:2889:3889
server.2=172.21.195.90:2889:3889
server.3=172.21.195.91:2889:3889

Parameter notes

  • tickTime: the basic time unit used by ZooKeeper, in milliseconds.
  • initLimit: a ZooKeeper ensemble contains multiple servers, one of which is the leader while the rest are followers. initLimit bounds how long a follower may take to connect to and sync with the leader during initialization. It is set to 10 here, i.e. 10 ticks: 10 * 2000 = 20000 ms = 20 s.
  • syncLimit: the maximum time allowed between the leader and a follower for sending a request and receiving an acknowledgement. Set to 5 here, i.e. 5 ticks: 5 * 2000 = 10000 ms = 10 s.
  • dataDir: the data directory; any directory works. Here it is set to /home/ggzjs/zookeeper/zookeeper-3.4.10/data/data, i.e. the data/data directory under the ZooKeeper installation.
  • dataLogDir: the transaction log directory, likewise arbitrary. If unset, it defaults to the same value as dataDir. Here it is set to /home/ggzjs/zookeeper/zookeeper-3.4.10/data/log, i.e. the data/log directory under the ZooKeeper installation.
  • clientPort: the port that listens for client connections. When several instances share one machine (pseudo-cluster mode), each needs a distinct clientPort; with one instance per machine, as here, it is simplest to use the same port, 2181, on every instance.
  • server.X=A:B:C, where X is a number identifying the server, A is the server's IP address, B is the port used to exchange messages with the leader, and C is the port used for leader election. Since each instance runs on its own machine, the same ports can be used on every server.
  • myid file: in both cluster and pseudo-cluster mode, create a file named myid under the dataDir configured above, containing the server's unique id. This instance is the first in the ensemble, so its myid contains 1. The number must match the X of the corresponding server.X line in zoo.cfg; see the sketch below.

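A minimal sketch of creating the data directories and the myid file, using the paths from the zoo.cfg above; run the equivalent on each host:

# On 172.21.195.89 (server.1)
mkdir -p /home/ggzjs/zookeeper/zookeeper-3.4.10/data/data
mkdir -p /home/ggzjs/zookeeper/zookeeper-3.4.10/data/log
echo 1 > /home/ggzjs/zookeeper/zookeeper-3.4.10/data/data/myid
# On 172.21.195.90 use "echo 2", on 172.21.195.91 use "echo 3"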

Configuring the second and third ZooKeeper instances

Since the deployment uses three instances (172.21.195.89, 172.21.195.90, 172.21.195.91) with identical installation paths and ports, zoo.cfg is exactly the same on all three machines. The only per-machine step is to create the myid file under dataDir with the instance's unique id: 1 on 172.21.195.89, 2 on 172.21.195.90, and 3 on 172.21.195.91.

Starting ZooKeeper

sh bin/zkServer.sh start
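
After starting all three instances, each server's role can be checked with the bundled status command; one instance should report "leader" and the other two "follower":

sh bin/zkServer.sh status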

Kafka Setup

Kafka Version

This guide uses version 0.11.0.0.

Kafka Download

kafka_2.11-0.11.0.0.tgz

kafka_2.12-0.11.0.0.tgz

  • kafka_2.11-0.11.0.0.tgz is built against Scala 2.11 and requires JDK 7 or JDK 8
  • kafka_2.12-0.11.0.0.tgz is built against Scala 2.12 and requires JDK 8
  • kafka_2.11-0.11.0.0.tgz is the recommended choice

Downloads for other Kafka versions

Download page for all Kafka releases

Kafka Installation

tar -xvf kafka_2.11-0.11.0.0.tgz

Kafka Configuration

The deployment again uses three instances, at 172.21.195.89, 172.21.195.90, and 172.21.195.91.

Configuring the first Kafka instance

Go to the config directory of the first Kafka instance and edit server.properties. The most important parameters are shown below; for the full set of options, see the Kafka user manual.

broker.id=0  
listeners=PLAINTEXT://172.21.195.89:9092
log.dirs=/home/ggzjs/kafka-logs
zookeeper.connect=172.21.195.89:2181,172.21.195.90:2181,172.21.195.91:2181

Parameter notes:

  • broker.id: the id of this broker within the Kafka cluster; it must be unique
  • listeners: the ip:port this broker binds to
  • log.dirs: where this broker stores its log segments (the message data)
  • zookeeper.connect: the ZooKeeper ensemble to use. Kafka depends on ZooKeeper and writes configuration, consumer state, topic metadata, and so on into ZooKeeper znodes.

The second and third Kafka instances are configured the same way; each just needs its own unique broker.id and listeners value. If the installation paths are identical, the log.dirs and zookeeper.connect settings are the same on all three instances; a sketch for the second broker follows.
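
For example, the server.properties on 172.21.195.90 would differ from the first broker's only in the first two lines (a sketch based on the values above):

broker.id=1
listeners=PLAINTEXT://172.21.195.90:9092
log.dirs=/home/ggzjs/kafka-logs
zookeeper.connect=172.21.195.89:2181,172.21.195.90:2181,172.21.195.91:2181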

Starting Kafka

sh ./bin/kafka-server-start.sh -daemon ./config/server.properties
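
Once all three brokers are up, their registration can be verified from ZooKeeper, since Kafka registers each broker under the /brokers/ids znode (zkCli.sh ships with ZooKeeper):

sh bin/zkCli.sh -server 172.21.195.89:2181
ls /brokers/ids
# once all brokers have registered, this should list: [0, 1, 2]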

Kafka Commands

1. Create a topic

sh kafka-topics.sh --create --topic topic_1 --partitions 2 --replication-factor 2 --zookeeper 172.21.195.89:2181,172.21.195.90:2181,172.21.195.91:2181

Parameter notes:

  • --topic: the topic name
  • --partitions: the number of partitions for the topic
  • --replication-factor: the number of replicas per partition
  • --zookeeper: the ZooKeeper ensemble address
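
To confirm how the partitions and replicas were assigned across the brokers, the same tool can describe the topic:

sh kafka-topics.sh --describe --topic topic_1 --zookeeper 172.21.195.89:2181,172.21.195.90:2181,172.21.195.91:2181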

2. List topics

sh kafka-topics.sh --list --zookeeper 172.21.195.89:2181,172.21.195.90:2181,172.21.195.91:2181
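
Before writing any client code, a quick end-to-end check can be done with the console producer and consumer that ship with Kafka (run each in its own terminal; lines typed into the producer should appear in the consumer):

sh kafka-console-producer.sh --broker-list 172.21.195.89:9092 --topic topic_1
sh kafka-console-consumer.sh --bootstrap-server 172.21.195.89:9092 --topic topic_1 --from-beginning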

Producer Demo

The following code is a minimal demo of a producer sending messages synchronously. The following parameters must be configured:

  • topic
  • broker cluster address

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProducerMain {
    private static final String KAFKA_TOPIC = "test_topic";

    private static final String BROKERS_ADDRESS = "172.21.195.89:9092,172.21.195.90:9092,172.21.195.91:9092";

    private static final int REQUEST_REQUIRED_ACKS = 1;

    public static final String MESSAGE_PRE = "message_";

    private KafkaProducer<String, String> producer;

    public ProducerMain() {
        producer = new KafkaProducer<String, String>(buildProperties());
    }

    private Properties buildProperties() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.ACKS_CONFIG, String.valueOf(REQUEST_REQUIRED_ACKS));

        return props;
    }

    /**
     * Produce messages.
     */
    public void run() {
        for (int i = 0; i < 1000; i++) {
            String message = ProducerMain.MESSAGE_PRE + i;

            try {
                // send() is asynchronous; calling get() on the returned Future blocks
                // until the broker acknowledges, making the send synchronous
                producer.send(new ProducerRecord<String, String>(ProducerMain.KAFKA_TOPIC, message)).get();
                System.out.println("produce: " + message + " success.");
            } catch (InterruptedException e) {
                e.printStackTrace();
                System.out.println("produce: " + message + " fail.");
            } catch (ExecutionException e) {
                e.printStackTrace();
                System.out.println("produce: " + message + " fail.");
            }
        }
        // flush any buffered records and release the producer's resources
        producer.close();
    }

    public static void main(String[] args) {
        new ProducerMain().run();
    }
}
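
The synchronous get() above blocks on every message, which limits throughput. A sketch of the asynchronous alternative using the client library's Callback interface (requires importing org.apache.kafka.clients.producer.Callback and org.apache.kafka.clients.producer.RecordMetadata; this would replace the send call inside the loop):

producer.send(new ProducerRecord<String, String>(KAFKA_TOPIC, message), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // the send failed; log and handle as appropriate
            exception.printStackTrace();
        } else {
            System.out.println("produced to partition " + metadata.partition()
                    + " at offset " + metadata.offset());
        }
    }
});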

Consumer Demo

The following code is a demo of a consumer receiving messages. The following parameters must be configured:

  • topic
  • broker cluster address
  • consumer group

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Collections;
import java.util.Properties;

/**
 * date: 2017/09/19 11:46.
 * author: Yueqi Shi
 */
public class ConsumerMain {
    private final KafkaConsumer<String, String> consumer;
    private static final String KAFKA_TOPIC = "test_topic";
    private static final String BROKERS_ADDRESS = "172.21.195.89:9092,172.21.195.90:9092,172.21.195.91:9092";
    private static final String CONSUMER_GROUP = "test_consumer_group";

    public ConsumerMain() {
        consumer = new KafkaConsumer<String, String>(this.buildProperties());
    }

    private Properties buildProperties() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS_ADDRESS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        return props;
    }

    public void run() {
        // subscribe once, before the poll loop; re-subscribing on every
        // iteration would trigger needless group rebalances
        consumer.subscribe(Collections.singletonList(KAFKA_TOPIC));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            System.out.println(records.count());
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset());
            }
        }
    }

    public static void main(String[] args) {
        new ConsumerMain().run();
    }
}
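
This demo relies on automatic offset commits (enable.auto.commit=true), so a crash between a poll and the next auto-commit can cause records to be redelivered or skipped. When that matters, offsets can be committed manually instead; a sketch of the change (disable auto-commit in buildProperties(), then commit after each batch is processed in run()):

// in buildProperties():
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

// in run(), after the for loop that processes the polled batch:
consumer.commitSync(); // commit offsets only once the batch has been processed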