Connecting Alink Code to a Kafka Data Source

On an Apple M1 Mac, test reading and writing a Kafka data source with Alink Java code to get a feel for the basic structure of the Alink API.

Preparing the Kafka data source

Setting up Kafka on macOS

brew install kafka

# On an M1 (Apple Silicon) Mac, build from source instead
brew install -s kafka

The installation automatically pulls in ZooKeeper as well, and prints the install paths, file counts, and sizes for zookeeper and kafka:

/opt/homebrew/Cellar/zookeeper/3.7.0_1: 430 files, 36.5MB
/opt/homebrew/Cellar/kafka/3.0.0: 171 files, 59.5MB

Change into Kafka's bin directory (use the version Homebrew just reported):

cd /opt/homebrew/Cellar/kafka/3.0.0/bin

Start ZooKeeper:

zookeeper-server-start /opt/homebrew/etc/kafka/zookeeper.properties &

Start Kafka:

kafka-server-start /opt/homebrew/etc/kafka/server.properties &

Topic-related command-line operations

Create a topic, then list topics to confirm:

kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic iris

kafka-topics --list --bootstrap-server localhost:9092
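
The topic can also be created programmatically. Below is a minimal sketch using the AdminClient from the standard kafka-clients library (not part of Alink); the class name is made up, while the topic name, partition count, and replication factor mirror the command above.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateIrisTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1, same as the CLI command above
            NewTopic iris = new NewTopic("iris", 1, (short) 1);
            admin.createTopics(Collections.singletonList(iris)).all().get();

            // list topics to confirm the creation
            System.out.println(admin.listTopics().names().get());
        }
    }
}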

Write to the topic:

kafka-console-producer --bootstrap-server localhost:9092 --topic iris
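
To push test records from Java instead of the console producer, a plain kafka-clients KafkaProducer works as well. This is a minimal sketch; the class name and the sample JSON value are only illustrative.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class IrisProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send one JSON record to the "iris" topic (the value is a made-up sample)
            String value = "{\"sepal_length\":5.1,\"sepal_width\":3.5,"
                + "\"petal_length\":1.4,\"petal_width\":0.2,\"category\":\"Iris-setosa\"}";
            producer.send(new ProducerRecord<>("iris", value));
            producer.flush();
        }
    }
}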

Consume from the topic:

kafka-console-consumer --bootstrap-server localhost:9092 --topic iris --from-beginning
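
For completeness, the same check can be done from Java with a kafka-clients KafkaConsumer that reads the topic from the beginning and prints each value; this sketch uses a made-up group id and is not part of the Alink pipeline.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class IrisConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "iris_console_check");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("iris"));
            // poll forever and print each message value (stop with Ctrl+C)
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}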

Kafka011SinkStreamOp

private static void writeKafka() throws Exception {
    // iris dataset, 150 rows in total
    String URL = "https://alink-release.oss-cn-beijing.aliyuncs.com/data-files/iris.csv";
    String SCHEMA_STR
        = "sepal_length double, sepal_width double, petal_length double, petal_width double, category string";
    CsvSourceStreamOp csvStreamSource = new CsvSourceStreamOp().setFilePath(URL).setSchemaStr(SCHEMA_STR);

    // sink every row to the "iris" topic as a JSON message
    Kafka011SinkStreamOp kafkaStreamSink = new Kafka011SinkStreamOp()
        .setBootstrapServers("localhost:9092")
        .setDataFormat("json")
        .setTopic("iris");

    csvStreamSource.link(kafkaStreamSink);

    StreamOperator.execute();
}
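
With setDataFormat("json"), each row should land in the topic as a JSON string keyed by column name, which is what the later JsonValueStreamOp step parses. A message would look roughly like this (an illustrative example based on a typical iris record; the exact field order and formatting may differ):

{"sepal_length":5.1,"sepal_width":3.5,"petal_length":1.4,"petal_width":0.2,"category":"Iris-setosa"}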

Kafka011SourceStreamOp

private static void readKafka() throws Exception {
    // read the "iris" topic from the earliest offset and print the raw Kafka records
    Kafka011SourceStreamOp kafkaStreamSource = new Kafka011SourceStreamOp()
        .setBootstrapServers("localhost:9092")
        .setTopic("iris")
        .setStartupMode("EARLIEST")
        .setGroupId("alink_group");

    kafkaStreamSource.print();

    StreamOperator.execute();
}

Handling the large JSON with JsonValueStreamOp

To handle the large JSON in the message field read from Kafka and redefine the output columns, use JsonValueStreamOp: set the JsonPath expressions for the values you want to extract, and each column is pulled out of the JSON. All extracted fields are of type String.

private static void readKafka() throws Exception {
    Kafka011SourceStreamOp kafkaStreamSource = new Kafka011SourceStreamOp()
        .setBootstrapServers("localhost:9092")
        .setTopic("iris")
        .setStartupMode("EARLIEST")
        .setGroupId("alink_group");

    // extract each iris field from the JSON in the "message" column via JsonPath;
    // every output column is of type String
    StreamOperator jsonValueExtractor = kafkaStreamSource
        .link(
            new JsonValueStreamOp()
                .setSelectedCol("message")
                .setReservedCols(new String[] {})
                .setOutputCols(
                    new String[] {"sepal_length", "sepal_width", "petal_length", "petal_width", "category"})
                .setJsonPath(new String[] {"$.sepal_length", "$.sepal_width", "$.petal_length", "$.petal_width",
                    "$.category"})
        );

    System.out.print(jsonValueExtractor.getSchema());

    jsonValueExtractor.print();

    StreamOperator.execute();
}

Converting column types with select

private static void readKafka() throws Exception {
    Kafka011SourceStreamOp kafkaStreamSource = new Kafka011SourceStreamOp()
        .setBootstrapServers("localhost:9092")
        .setTopic("iris")
        .setStartupMode("EARLIEST")
        .setGroupId("alink_group");

    StreamOperator jsonValueExtractor = kafkaStreamSource
        .link(
            new JsonValueStreamOp()
                .setSelectedCol("message")
                .setReservedCols(new String[] {})
                .setOutputCols(
                    new String[] {"sepal_length", "sepal_width", "petal_length", "petal_width", "category"})
                .setJsonPath(new String[] {"$.sepal_length", "$.sepal_width", "$.petal_length", "$.petal_width",
                    "$.category"})
        )
        // JsonValueStreamOp outputs strings, so cast the numeric columns back to DOUBLE
        .select("CAST(sepal_length AS DOUBLE) AS sepal_length, "
            + "CAST(sepal_width AS DOUBLE) AS sepal_width, "
            + "CAST(petal_length AS DOUBLE) AS petal_length, "
            + "CAST(petal_width AS DOUBLE) AS petal_width, category"
        );

    System.out.print(jsonValueExtractor.getSchema());

    jsonValueExtractor.print();

    StreamOperator.execute();
}