当前位置: 首页> 娱乐> 影视 > 总结:Flink任务执行

总结:Flink任务执行

时间:2025/9/7 20:45:43来源:https://blog.csdn.net/w2009211777/article/details/140189268 浏览次数:0次

一、背景

一直很好奇,Flink是如何运行我们的java类任务的,今天先记录下。

二、步骤

1、项目中添加Maven 依赖项
<dependencies><!-- Flink Core --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.15.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.15.0</version></dependency><!-- Flink Kafka Connector --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.12</artifactId><version>1.15.0</version></dependency><!-- Flink MySQL Connector --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_2.12</artifactId><version>1.15.0</version></dependency><!-- MySQL JDBC Driver --><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.26</version></dependency>
</dependencies>
2、编写 Flink 任务


创建一个 Flink 任务类,该类将从 Kafka 读取数据,处理数据,并将结果写入 MySQL。

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.jdbc.JdbcConnectionOptions;
import org.apache.flink.streaming.connectors.jdbc.JdbcExecutionOptions;
import org.apache.flink.streaming.connectors.jdbc.JdbcSink;
import org.apache.flink.streaming.connectors.jdbc.JdbcStatementBuilder;import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;public class KafkaToMySQLJob {public static void main(String[] args) throws Exception {// 创建执行环境final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 配置 Kafka 消费者Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9092");properties.setProperty("group.id", "flink-group");FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("kafka-topic",new SimpleStringSchema(),properties);// 从 Kafka 读取数据DataStream<String> stream = env.addSource(kafkaConsumer);// 处理数据(这里简单地将字符串转换为大写)DataStream<String> processedStream = stream.map(String::toUpperCase);// 将数据写入 MySQLprocessedStream.addSink(JdbcSink.sink("INSERT INTO your_table (column) VALUES (?)",new JdbcStatementBuilder<String>() {@Overridepublic void accept(PreparedStatement ps, String str) throws SQLException {ps.setString(1, str);}},new JdbcExecutionOptions.Builder().withBatchSize(100).withBatchIntervalMs(200).withMaxRetries(5).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://localhost:3306/your_database").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("your_username").withPassword("your_password").build()));// 执行任务env.execute("Kafka to MySQL Flink Job");}
}
3、打包和部署


使用 Maven 或 Gradle 打包你的项目,生成一个 JAR 文件。然后,将 JAR 文件提交到 Flink 集群执行。

mvn clean package
./bin/flink run -c your.package.KafkaToMySQLJob target/your-project-1.0-SNAPSHOT.jar

通过以上步骤,你就可以创建一个从 Kafka 读取数据并写入 MySQL 的 Flink 任务。你可以根据实际需求调整 Kafka 和 MySQL 的配置,以及数据处理逻辑。

关键字:总结:Flink任务执行

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com

责任编辑: