文章目录
- 环境
- 准备
- Java
- Kafka
- 启动
- 生产和消费message
- 使用Kafka Connect来导入导出数据
- 参考
环境
- RHEL 9.4
- Java 21.0.4
- Kafka 2.13
准备
Java
在 https://www.oracle.com/cn/java/technologies/downloads/
处下载Java。我下载的是 jdk-21_linux-x64_bin.tar.gz
。
解压:
tar -zxvf jdk-21_linux-x64_bin.tar.gz
修改 ~/.bashrc
:
export PATH=/home/ding/Downloads/jdk-21.0.4/bin:$PATH
export JAVA_HOME=/home/ding/Downloads/jdk-21.0.4
保存退出,并source一下:
. ~/.bashrc
Kafka
在 https://kafka.apache.org/quickstart
处点击 Download
链接,打开下载页面,然后下载Kafka。我下载的是 kafka_2.13-3.7.1.tgz
。
解压:
tar -zxvf kafka_2.13-3.7.1.tgz
启动
Kafka可使用ZooKeeper或KRaft启动,本例中使用的是ZooKeeper(ZooKeeper已包含在Kafka下载包里)。
启动ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动Kafka(打开另一个窗口):
bin/kafka-server-start.sh config/server.properties
生产和消费message
创建topic quickstart-events
:
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
查看topic:
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events TopicId: 02qO77BpQUShHMMpEtlJfA PartitionCount: 1 ReplicationFactor: 1 Configs: Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
生产message:
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
接下来就可以交互式的生产message,比如:
>This is my first event
>This is my second event
最后按Ctrl + C退出。
消费message:
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
可见,收到了刚才生产的两条消息。
最后按Ctrl + C退出,显示:
^CProcessed a total of 2 messages
再次消费message:
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event
可见,之前已被消费的message还在,并没有从消息队列移除。
同样,如果同时有多个消费者,它们都能收到message。打开多个终端,并运行上面的命令。
新打开一个终端,并生产message:
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
输入一些message,比如:
>gogogo
>hello
>hi
切换到多个消费者终端,可见,它们都会实时收到message:
gogogo
hello
hi
同理,如果有多个生产者在同时生产message,则消费者能收到所有的message。
总结:一个生产者可以对应多个消费者,反之亦然,二者是M * N的关系。
使用Kafka Connect来导入导出数据
可使用Kafka Connect来向Kafka系统导入数据,或者从Kafka系统向外导出数据。事实上Kafka已经自带了一些connector。本例中,将从文件向Kafka topic导入数据,以及从Kafka topic向文件导出数据。
编辑 config/connect-standalone.properties
文件,添加如下内容:
plugin.path=libs/connect-file-3.7.1.jar
注:可到 libs
目录下,确认具体的文件名。
创建文件 test.txt
:
foo
bar
现在,就可以导入/导出数据了:
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
该命令中:
connect-standalone.sh
:standalone connectconnect-standalone.properties
:connect的配置connect-file-source.properties
:具体的connector,该配置文件里指定了源文件配置:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
connect-file-sink.properties
:具体的connector,该配置文件里指定了目标文件配置:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
总结:该命令使用了两个connector,一个是从指定文件导入数据,另一个是导出数据到指定文件。
运行该命令后,就会生成 test.sink.txt
文件:
[ding@192 kafka_2.13-3.7.1]$ cat test.sink.txt
foo
bar
可见,已经把数据(来源于文件)导出到目标文件。
当然,也可以通过consumer来消费这些数据:
[ding@192 kafka_2.13-3.7.1]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
注意:和前面的consumer相比,只是topic不同。
此时,如果实时修改 test.txt
文件:
echo "Another line" >> test.txt
则会立即反映出效果。
test.sink.txt
会立即更新:
[ding@192 kafka_2.13-3.7.1]$ cat test.sink.txt
foo
bar
Another line
- consumer会收到消息:
[ding@192 kafka_2.13-3.7.1]$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"Another line"}
其中,最后一行是实时出现的。
参考
https://kafka.apache.org/quickstart