I. Background
I've always been curious how Flink actually runs our Java job classes, so today I'm writing down the basic workflow.
II. Steps
1. Add the Maven dependencies
Note that from Flink 1.15 onward the Java API and connector artifacts no longer carry a Scala suffix (such as _2.12):
<dependencies>
    <!-- Flink Core -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.15.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.15.0</version>
    </dependency>
    <!-- Flink Kafka Connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka</artifactId>
        <version>1.15.0</version>
    </dependency>
    <!-- Flink JDBC Connector (used for MySQL) -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc</artifactId>
        <version>1.15.0</version>
    </dependency>
    <!-- MySQL JDBC Driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.26</version>
    </dependency>
</dependencies>
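The Kafka and JDBC connectors are not bundled with the Flink distribution, so the job JAR usually has to include them. A minimal maven-shade-plugin configuration along these lines builds such a fat JAR (a sketch; the plugin version and main class are assumptions, adjust them to your project):

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals><goal>shade</goal></goals>
                    <configuration>
                        <transformers>
                            <!-- assumed main class; match it to your own package -->
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>your.package.KafkaToMySQLJob</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

In a real deployment you would typically also mark flink-java and flink-streaming-java as <scope>provided</scope>, since the Flink runtime already ships them.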
2. Write the Flink job:
Create a Flink job class that reads data from Kafka, processes it, and writes the results to MySQL.
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

public class KafkaToMySQLJob {

    public static void main(String[] args) throws Exception {
        // Create the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Configure the Kafka consumer
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "flink-group");

        // FlinkKafkaConsumer is deprecated in 1.15 in favor of KafkaSource, but still works
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "kafka-topic",
                new SimpleStringSchema(),
                properties);

        // Read data from Kafka
        DataStream<String> stream = env.addSource(kafkaConsumer);

        // Process the data (here we simply upper-case each string)
        DataStream<String> processedStream = stream.map(String::toUpperCase);

        // Write the data to MySQL
        processedStream.addSink(JdbcSink.sink(
                "INSERT INTO your_table (your_column) VALUES (?)",
                new JdbcStatementBuilder<String>() {
                    @Override
                    public void accept(PreparedStatement ps, String str) throws SQLException {
                        ps.setString(1, str);
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(100)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/your_database")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("your_username")
                        .withPassword("your_password")
                        .build()));

        // Execute the job
        env.execute("Kafka to MySQL Flink Job");
    }
}
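The JdbcSink assumes the target table already exists. A minimal definition matching the placeholder names used above (your_table and your_column are just placeholders; adapt them to your schema):

CREATE TABLE your_table (
    id INT AUTO_INCREMENT PRIMARY KEY,
    your_column VARCHAR(255) NOT NULL
);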
3. Package and deploy:
Package the project with Maven (or Gradle) to produce a JAR file (the shade configuration from step 1 bundles the connector dependencies), then submit the JAR to the Flink cluster:
mvn clean package
./bin/flink run -c your.package.KafkaToMySQLJob target/your-project-1.0-SNAPSHOT.jar
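After submitting, you can verify that the job is running and push some test data through it. The Flink CLI's list command and Kafka's console producer are enough for a quick smoke test (paths depend on where Flink and Kafka are installed):

./bin/flink list
# from the Kafka installation directory, produce a test message:
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic kafka-topic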
With these steps, you have a working Flink job that reads from Kafka and writes to MySQL. You can adjust the Kafka and MySQL settings, as well as the processing logic, to fit your actual requirements.
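For example, the simple upper-casing step can be swapped for whatever transformation you need. A hypothetical variant that also drops empty messages before upper-casing (a sketch, not part of the original job):

DataStream<String> processedStream = stream
        .filter(value -> !value.isEmpty())   // drop empty messages
        .map(String::toUpperCase);           // same upper-casing as before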