MacOS M1 上,测试使用 Alink 的 java 代码读写 Kafka 数据源,了解一下 Alink API 的基本构成。
Kafka 数据源准备
在 MacOS 上搭建 Kafka
1 | brew install Kafka |
安装过程会自动安装zookeeper,并给出zookeeper和kafka的路径、文件个数及存储大小:
1 | /opt/homebrew/Cellar/zookeeper/3.7.0_1: 430 files, 36.5MB |
进入 kafka 的 bin 目录:
1 | cd /opt/homebrew/Cellar/kafka/2.3.1/bin |
启动 zookeeper:
1 | zookeeper-server-start /opt/homebrew/etc/kafka/zookeeper.properties & |
启动 kafka:
1 | kafka-server-start /opt/homebrew/etc/kafka/server.properties & |
Topic 相关命令行
创建 Topic:
1 | kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic iris |
写入 Topic:
1 | kafka-console-producer --broker-list localhost:9092 --topic iris |
消费 Topic:
1 | kafka-console-consumer --bootstrap-server localhost:9092 --topic iris --from-beginning |
Alink 写入 Topic
Kafka011SinkStreamOp
1 | private static void writeKafka() throws Exception { |
Alink 读取 Topic
Kafka011SourceStreamOp
1 | private static void readKafka() throws Exception { |
JsonValueStreamOp 进行大 json 处理
处理从 kafka 读取的 message 大 json 字段,重新设置输出列,使用 JsonValueStreamOp ,通过设置需要提取内容的 JsonPath ,提取出各列数据,字段类型均为 String 。
1 | private static void readKafka() throws Exception { |
select 进行字段类型转换
1 | private static void readKafka() throws Exception { |