JMX与kafka监控(一)

1.前言

Kafka可以配置使用JMX进行运行状态的监控,既可以通过JDK自带Jconsole来观察结果,也可以通过Java API的方式来进行监控数据的调用获取,从而完成自己的监控平台。
关于监控指标的描述

2.JMX简介

JMX的全称为Java Management Extensions。 顾名思义,是管理Java的一种扩展。这种机制可以方便的管理、监控正在运行中的Java程序。常用于管理线程,内存,日志Level,服务重启,系统环境等。

JMX架构

jmx_architecture

从图中可以看到,JMX的结构一共分为三层:

1、基础层:主要是MBean,被管理的资源。

基础层包括了一系列的接口定义和属性描述,通常JMX所管理的资源有一个或多个MBean组成,因此这个资源可以是任何由Java语言开发的组件。

MBean:是Managed Bean的简称,可以翻译为“管理构件”。在JMX中MBean代表一个被管理的资源实例,通过MBean中暴露的方法和属性,外界可以获取被管理的资源的状态和操纵MBean的行为。事实上,MBean就是一个Java Object,同JavaBean模型一样,外界可获取Object的值和使用反射来调用Object的方法,只是MBean更为复杂和高级一些。MBean通过公共方法以及遵从特定的设计模式封装了属性和操作,以便暴露给管理应用程序。例如,一个只读属性在管理构件中只有Get方法,既有Get又有Set方法表示是一个可读写的属性。MBean分为如下四种,接下来主要介绍standard MBean。

类型 描述
standard MBean 这种类型的MBean最简单,它能管理的资源(包括属性,方法,时间)必须定义在接口中,然后MBean必须实现这个接口。它的命名也必须遵循一定的规范,例如我们的MBean为Hello,则接口必须为HelloMBean。
dynamic MBean 必须实现javax.management.DynamicMBean接口,所有的属性,方法都在运行时定义
open MBean 此MBean的规范还不完善,正在改进中
model MBean 与标准和动态MBean相比,你可以不用写MBean类,只需使用javax.management.modelmbean.RequiredModelMBean即可。RequiredModelMBean实现了ModelMBean接口,而ModelMBean扩展了DynamicMBean接口,因此与DynamicMBean相似,Model MBean的管理资源也是在运行时定义的。与DynamicMBean不同的是,DynamicMBean管理的资源一般定义在DynamicMBean中(运行时才决定管理那些资源),而model MBean管理的资源并不在MBean中,而是在外部(通常是一个类),只有在运行时,才通过set方法将其加入到model MBean中。

2、代理层:MBeanServer,主要是提供对资源的注册和管理。

MBeanServer: MBean生存在一个MBeanServer中。MBeanServer管理这些MBean,并且代理外界对它们的访问。并且MBeanServer提供了一种注册机制,使得外界可以通过名字来得到相应的MBean实例。

Agent层管理相应的MBean资源,并且为远端用户提供访问的接口。Agent层主要定义了各种服务以及通信模型。该层的核心是MBeanServer,所有的MBean都要向它注册,才能被管理。注册在MBeanServer上的MBean并不直接和远程应用程序进行通信,他们通过协议适配器(Adapter)和连接器(Connector)进行通信。通常Agent层由一个MBeanServer和多个系统服务组成,JMX Agent并不关心它所管理的资源是什么。

3、接入层:提供远程访问的入口。

接入层关心Agent如何被远端用户访问的细节。它定义了一系列用来访问Agent的接口和组件,包括Adapter和Connector的描述。

如果一个Java对象可以由一个遵循JMX规范的管理器应用管理,那么这个Java对象就可以由JMX管理资源。要使一个Java对象可管理,则必须创建相应的MBean对象,并通过这些MBean对象管理相应的Java对象。当拥有MBean类后,需要将其实例化并注册到MBeanServer上。

简单JMX Server Demo

一个MBean定义一个接口,而且这个接口的名字必须是其被管理的资源的对象类的名称后面加上”MBean”。

1
2
3
4
5
6
public interface HelloMBean {
public String getName();
public void setName(String name);
public void printHello();
public void printHello(String whoName);
}

Hello是实现MBean接口的类,然后将其注册到MBeanServer中就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Hello implements HelloMBean {
private String name;

public Hello() {
}

public Hello(String name) {
this.name = name;
}

@Override
public String getName() {
return name;
}

@Override
public void setName(String name) {
this.name = name;
}

@Override
public void printHello() {
System.out.println("Hello world, "+ name);
}

@Override
public void printHello(String whoName) {
System.out.println("Hello, "+whoName);
}
}

注册MBean,启动 connector。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class SimpleJmxServer {
private static final String NAME = "HelloWorld";
private static final int JMX_PORT = 9999;
private static final String DOMAIN_NAME = "jmxrmi";

public static void main(String[] args) throws MalformedObjectNameException,
NotCompliantMBeanException, InstanceAlreadyExistsException,
MBeanRegistrationException, IOException {


// 首先建立一个MBeanServer,MBeanServer用来管理我们的MBean,通常是通过MBeanServer来获取我们MBean的信息,
// 间接调用MBean的方法,然后生产我们的资源的一个对象。
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();

Hello hello = new Hello(NAME);
ObjectName helloName = new ObjectName(DOMAIN_NAME + ":name=" + NAME);

// 将 hello 这个对象注册到MBeanServer上去
mbs.registerMBean(hello, helloName);

Registry registry = LocateRegistry.createRegistry(JMX_PORT);

JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + JMX_PORT + "/" + DOMAIN_NAME);
JMXConnectorServer jmxConnector = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs);
jmxConnector.start();

// 使用 jconsole 连接 service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi
// 或直接使用 jconsole 连接 localhost:9999
}
}

使用JConsole查看注册到JMX的MBean。
jmx_jconsole_server_demo

JVM参数启动JMX

因为java默认自带的了JMX RMI的连接器。所以,只需要在启动java程序的时候带上运行参数,就可以开启Agent的RMI协议的连接器。

1
-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.port=9999

JMX Client Demo

也可以通过API的方式,来连接JMXConnectorServer管理MBean.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class SimpleJmxClient {

private static final int JMX_PORT = 9999;
private static final String DOMAIN_NAME = "jmxrmi";

public static void main(String[] args) throws IOException,
MalformedObjectNameException, InstanceNotFoundException,
AttributeNotFoundException, InvalidAttributeValueException,
MBeanException, ReflectionException, IntrospectionException {


JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:" + JMX_PORT + "/" + DOMAIN_NAME);
JMXConnector jmxc = JMXConnectorFactory.connect(url);
MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();

//print domains
System.out.println("----------Domains----------");
String domains[] = mbsc.getDomains();
for (int i = 0; i < domains.length; i++) {
System.out.println("Domain[" + i + "] = " + domains[i]);
}

//MBean count
System.out.println("----------MBeans Count----------");
System.out.println("MBean count: " + mbsc.getMBeanCount());

//process attribute
System.out.println("----------process attribute----------");
ObjectName mBeanName = new ObjectName(DOMAIN_NAME + ":name=HelloWorld");
mbsc.setAttribute(mBeanName, new Attribute("Name", "test_mbean_name"));//注意这里是Name而不是name
System.out.println("Name:" + mbsc.getAttribute(mBeanName, "Name"));

// 通过代理,反射调用Hello中的printHello方法
System.out.println("----------proxy----------");
HelloMBean proxy = MBeanServerInvocationHandler.newProxyInstance(mbsc, mBeanName, HelloMBean.class, false);
proxy.printHello();
proxy.printHello("proxy name");

// 通过rmi的方式执行,反射调用
mbsc.invoke(mBeanName, "printHello", null, null);
mbsc.invoke(mBeanName, "printHello", new String[]{"proxy name2"}, new String[]{String.class.getName()});

//get mbean information
System.out.println("----------mbean information----------");
MBeanInfo info = mbsc.getMBeanInfo(mBeanName);
System.out.println("Hello Class: " + info.getClassName());
for (int i = 0; i < info.getAttributes().length; i++) {
System.out.println("Hello Attribute:" + info.getAttributes()[i].getName());
}
for (int i = 0; i < info.getOperations().length; i++) {
System.out.println("Hello Operation:" + info.getOperations()[i].getName());
}

//ObjectName of MBean
System.out.println("----------all ObjectName----------");
Set<ObjectInstance> set = mbsc.queryMBeans(null, null);
for (Iterator<ObjectInstance> it = set.iterator(); it.hasNext(); ) {
ObjectInstance oi = it.next();
System.out.println(oi.getObjectName());
}
jmxc.close();
}
}

3.kafka JMX

开启JMX端口

修改bin/kafka-server-start.sh,添加JMX_PORT参数,添加后样子如下:

1
2
3
4
5

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT="9999"
fi

可以看到jmx_port信息注册到zk结点。
jmx_kafka_zk_node

JConsole监控kafka

通过Jconsole连接
jmx_jconsole_kafka_connect
jmx_jconsole_kafka

API监控kafka

Kafka Mbean定义参考:http://kafka.apache.org/documentation.html#monitoring

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class kafkaMonitor {
private static final String MESSAGE_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec";
private static final String BYTES_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec";
private static final String BYTES_OUT_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec";
private static final String BYTES_REJECTED_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesRejectedPerSec";
private static final String FAILED_FETCH_REQUESTS_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=FailedFetchRequestsPerSec";
private static final String FAILED_PRODUCE_REQUESTS_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=FailedProduceRequestsPerSec";
private static final String PRODUCE_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce";

private static final String JXM_URL = "service:jmx:rmi:///jndi/rmi://172.18.55.21:9999/jmxrmi";

public void extractMonitorData() {
try {
MBeanServerConnection jmxConnection = this.getMBeanServerConnection(JXM_URL);

ObjectName messageCountObj = new ObjectName(MESSAGE_IN_PER_SEC);
ObjectName bytesInPerSecObj = new ObjectName(BYTES_IN_PER_SEC);
ObjectName bytesOutPerSecObj = new ObjectName(BYTES_OUT_PER_SEC);
ObjectName bytesRejectedPerSecObj = new ObjectName(BYTES_REJECTED_PER_SEC);
ObjectName failedFetchRequestsPerSecObj = new ObjectName(FAILED_FETCH_REQUESTS_PER_SEC);
ObjectName failedProduceRequestsPerSecObj = new ObjectName(FAILED_PRODUCE_REQUESTS_PER_SEC);
ObjectName produceRequestPerSecObj = new ObjectName(PRODUCE_REQUEST_PER_SEC);

printObjectNameDetails(messageCountObj, "Messages in /sec", jmxConnection);
printObjectNameDetails(bytesInPerSecObj, "Bytes in /sec", jmxConnection);
printObjectNameDetails(bytesOutPerSecObj, "Bytes out /sec", jmxConnection);
printObjectNameDetails(bytesRejectedPerSecObj, "Bytes rejected /sec", jmxConnection);
printObjectNameDetails(failedFetchRequestsPerSecObj, "Failed fetch request /sec", jmxConnection);
printObjectNameDetails(failedProduceRequestsPerSecObj, "Failed produce request /sec", jmxConnection);
printObjectNameDetails(produceRequestPerSecObj, "Produce request in /sec", jmxConnection);
} catch (Exception e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
new kafkaMonitor().extractMonitorData();
}

/**
* 获得 MBeanServer 的连接
*
* @param jmxUrl
* @return
* @throws IOException
*/

private MBeanServerConnection getMBeanServerConnection(String jmxUrl) throws IOException {
JMXServiceURL url = new JMXServiceURL(jmxUrl);
JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
return mbsc;
}

/**
* 打印 ObjectName 对象详细信息
* @param objectName
* @param printTitle
* @param jmxConnection
*/

private void printObjectNameDetails(ObjectName objectName, String printTitle, MBeanServerConnection jmxConnection) {
try {
System.out.println("----------"+ printTitle +"----------");
System.out.println("TotalCount: " + (Long) jmxConnection.getAttribute(objectName, "Count"));
System.out.println("MeanRate: " + String.format("%.2f", (double) jmxConnection.getAttribute(objectName, "MeanRate")));
System.out.println("OneMinuteRate: " + String.format("%.2f", (double) jmxConnection.getAttribute(objectName, "OneMinuteRate")));
System.out.println("FiveMinuteRate: " + String.format("%.2f", (double) jmxConnection.getAttribute(objectName, "FiveMinuteRate")));
System.out.println("FifteenMinuteRate: " + String.format("%.2f", (double) jmxConnection.getAttribute(objectName, "FifteenMinuteRate")));
} catch (Exception e) {
e.printStackTrace();
}
}
}

API监控结果

运行程序,可以看到打印如下信息:
分别是消息接收情况统计,bytes接收统计,bytes输出统计,bytes拒绝统计,失败拉取请求统计,失败生产消息统计,生产消息统计。

每一项统计均包括总量统计,平均速率,最近一分钟速率,最近5分钟速率,最近15分钟速率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
----------Messages in /sec----------
TotalCount: 106883
MeanRate: 1.30
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Bytes in /sec----------
TotalCount: 8795044
MeanRate: 107.26
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Bytes out /sec----------
TotalCount: 8763647
MeanRate: 106.88
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Bytes rejected /sec----------
TotalCount: 0
MeanRate: 0.00
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Failed fetch request /sec----------
TotalCount: 0
MeanRate: 0.00
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Failed produce request /sec----------
TotalCount: 0
MeanRate: 0.00
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00
----------Produce request in /sec----------
TotalCount: 100000
MeanRate: 1.22
OneMinuteRate: 0.00
FiveMinuteRate: 0.00
FifteenMinuteRate: 0.00

kafka 监控平台

Kafka-manager使用JMX与Kafka集群进行通信,获取统计监控信息。可以看到监控信息正是刚刚使用JAVA API获取的监控信息。
jmx_kafka_manager

4.参考链接

  1. https://github.com/apache/kafka/
  2. http://kafka.apache.org/
  3. http://kafka.apache.org/documentation.html#monitoring