当前位置: 首页> 汽车> 报价 > Kafka环境搭建

Kafka环境搭建

时间:2025/7/12 14:23:56来源:https://blog.csdn.net/duke_ding2/article/details/141085826 浏览次数: 0次

文章目录

  • 环境
  • 准备
    • Java
    • Kafka
  • 启动
  • 生产和消费message
  • 使用Kafka Connect来导入导出数据
  • 参考

环境

  • RHEL 9.4
  • Java 21.0.4
  • Kafka 2.13

准备

Java

https://www.oracle.com/cn/java/technologies/downloads/ 处下载Java。我下载的是 jdk-21_linux-x64_bin.tar.gz

解压:

tar -zxvf jdk-21_linux-x64_bin.tar.gz

修改 ~/.bashrc

export PATH=/home/ding/Downloads/jdk-21.0.4/bin:$PATH
export JAVA_HOME=/home/ding/Downloads/jdk-21.0.4

保存退出,并source一下:

. ~/.bashrc

Kafka

https://kafka.apache.org/quickstart 处点击 Download 链接,打开下载页面,然后下载Kafka。我下载的是 kafka_2.13-3.7.1.tgz

解压:

tar -zxvf kafka_2.13-3.7.1.tgz

启动

Kafka可使用ZooKeeper或KRaft启动,本例中使用的是ZooKeeper(ZooKeeper已包含在Kafka下载包里)。

启动ZooKeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

启动Kafka(打开另一个窗口):

bin/kafka-server-start.sh config/server.properties

生产和消费message

创建topic quickstart-events

bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

查看topic:

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events	TopicId: 02qO77BpQUShHMMpEtlJfA	PartitionCount: 1	ReplicationFactor: 1	Configs: Topic: quickstart-events	Partition: 0	Leader: 0	Replicas: 0	Isr: 0

生产message:

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

接下来就可以交互式的生产message,比如:

>This is my first event
>This is my second event

最后按Ctrl + C退出。

消费message:

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

可见,收到了刚才生产的两条消息。

最后按Ctrl + C退出,显示:

^CProcessed a total of 2 messages

再次消费message:

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

可见,之前已被消费的message还在,并没有从消息队列移除。

同样,如果同时有多个消费者,它们都能收到message。打开多个终端,并运行上面的命令。

新打开一个终端,并生产message:

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

输入一些message,比如:

>gogogo
>hello
>hi

切换到多个消费者终端,可见,它们都会实时收到message:

gogogo
hello
hi

同理,如果有多个生产者在同时生产message,则消费者能收到所有的message。

总结:一个生产者可以对应多个消费者,反之亦然,二者是M * N的关系。

使用Kafka Connect来导入导出数据

可使用Kafka Connect来向Kafka系统导入数据,或者从Kafka系统向外导出数据。事实上Kafka已经自带了一些connector。本例中,将从文件向Kafka topic导入数据,以及从Kafka topic向文件导出数据。

编辑 config/connect-standalone.properties 文件,添加如下内容:

plugin.path=libs/connect-file-3.7.1.jar

注:可到 libs 目录下,确认具体的文件名。

创建文件 test.txt

foo
bar

现在,就可以导入/导出数据了:

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

该命令中:

  • connect-standalone.sh :standalone connect
  • connect-standalone.properties :connect的配置
  • connect-file-source.properties :具体的connector,该配置文件里指定了源文件配置:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
  • connect-file-sink.properties :具体的connector,该配置文件里指定了目标文件配置:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt

总结:该命令使用了两个connector,一个是从指定文件导入数据,另一个是导出数据到指定文件。

运行该命令后,就会生成 test.sink.txt 文件:

[ding@192 kafka_2.13-3.7.1]$ cat test.sink.txt 
foo
bar

可见,已经把数据(来源于文件)导出到目标文件。

当然,也可以通过consumer来消费这些数据:

[ding@192 kafka_2.13-3.7.1]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}

注意:和前面的consumer相比,只是topic不同。

此时,如果实时修改 test.txt 文件:

echo "Another line" >> test.txt

则会立即反映出效果。

  • test.sink.txt 会立即更新:
[ding@192 kafka_2.13-3.7.1]$ cat test.sink.txt 
foo
bar
Another line
  • consumer会收到消息:
[ding@192 kafka_2.13-3.7.1]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"Another line"}

其中,最后一行是实时出现的。

参考

  • https://kafka.apache.org/quickstart
关键字:Kafka环境搭建

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: