Flink 代码跑在 IDEA 里,尝试连接 Kafka?Kafaka 装在 WSL2 中,怎么连接?

📅 2026/6/21 4:49:32
Flink 代码跑在 IDEA 里,尝试连接 Kafka?Kafaka 装在 WSL2 中,怎么连接?
Flink 代码跑在 IDEA 里尝试连接 KafkaKafaka 装在 WSL2 中怎么连接连接 WSL2 里的 Kafka关键在于解决网络连通性问题。你的 Flink 代码跑在 IDEA 里Windows 上的 IDE但实际执行在 WSL2 的 Java 环境所以需要找到一个 WSL2 和 Windows 都能访问的地址来配置 Kafka。这里给你一个最稳妥、能一次成功的完整操作流程1. 获取 WSL2 的正确 IP 地址最关键一步千万不要用localhost因为 WSL2 和 Windows 的网络命名空间不同直接写localhost:9092大概率会连接失败。在WSL2 终端里执行这个命令拿到你的 WSL2 网卡 IPbaship addr | grep eth0你会看到类似172.x.y.z这样的地址记下它这就是客户端我们的 Flink 程序要连接的 Kafka Broker 地址。2. 修改你的 Kafka 配置让它监听这个 IP去你的 Kafka 安装目录比如/usr/local/kafka/config/server.properties找到并修改advertised.listeners属性properties# 将这行配置的地址改为你的 WSL2 IP advertised.listenersPLAINTEXT://172.x.y.z:9092advertised.listeners这个参数决定了Kafka 告诉客户端Flink去哪里找自己不设对的话即使 Flink 找到了门Kafka 也会指错路。然后重启 Kafka让配置生效。3. 在 Flink 项目里添加 Kafka 连接器依赖在你的pom.xml的dependencies里加上这个。你用的是 Flink 1.15flink-connector-kafka的版本一般是3.0.0或3.1.0建议和你的 Flink 版本保持一致。xml!-- Kafka 连接器依赖 -- dependency groupIdorg.apache.flink/groupId artifactIdflink-connector-kafka/artifactId version3.0.0-1.17/version !-- 如果 Flink 是 1.15试试 3.0.0-1.17 或 3.1.0-1.18 -- /dependency4. 用 Flink 写一个消费者程序测试连接用下面的代码把bootstrap.servers换成你刚才记下的WSL2 IP主题名换成你 Kafka 里有的主题。这段代码是连接 Kafka 并消费数据的最小单元。javaimport org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.connector.kafka.source.KafkaSource; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public class KafkaFlinkTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); // 1. 用你的 WSL2 IP 地址替换 [WSL2_IP] String bootstrapServers PLAINTEXT://172.x.y.z:9092; String topicName 你的主题名; String consumerGroup flink-test-group; // 2. 创建 Kafka Source (推荐使用较新的 KafkaSource 构建器) KafkaSourceString source KafkaSource.Stringbuilder() .setBootstrapServers(bootstrapServers) .setTopics(topicName) .setGroupId(consumerGroup) .setStartingOffsets(OffsetsInitializer.latest()) // 从最新位置开始读 .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); // 3. 将 Source 添加到环境并打印输出 env.fromSource(source, WatermarkStrategy.noWatermarks(), Kafka Source) .print(Kafka Output); env.execute(Flink Kafka Connector Test); } }如果连接失败还有个后备方案有些版本的 WSL2 网络配置比较复杂如果直接连 IP 还是失败可以考虑在 Windows 端管理员权限配置一个端口转发让 Windows 的localhost:9092指向 WSL2 的9092端口cmdnetsh interface portproxy add v4tov4 listenport9092 listenaddress0.0.0.0 connectport9092 connectaddress172.x.y.z这样配置后你的 Flink 程序里就可以先用localhost:9092来尝试连接了。 操作顺序总结获取 IP在 WSL2 里执行ip addr | grep eth0。改 Kafka 配置修改advertised.listeners为这个 IP重启 Kafka。加依赖在你的pom.xml里添加flink-connector-kafka。运行测试代码把bootstrap.servers改成这个 IP运行你的 Flink 消费者程序。先用这个“硬核”的 IP 方式跑通之后再慢慢优化。先去试一下如果遇到报错把错误信息贴给我我帮你分析是网络问题、配置问题还是依赖版本问题