
[Flink-Kafka-To-Mysql] Using Flink to Write Kafka Data to MySQL (Insert, Update, or Delete Based on the Operation Type)


  • 1) Import dependencies
  • 2)resources
    • 2.1.appconfig.yml
    • 2.2.application.properties
    • 2.3.log4j.properties
    • 2.4.log4j2.xml
  • 3)util
    • 3.1.KafkaMysqlUtils
    • 3.2.CustomDeSerializationSchema
  • 4)po
    • 4.1.TableBean
  • 5)kafkacdc2mysql
    • 5.1.Kafka2MysqlApp

Requirements:

1. Data is read from Kafka and written to MySQL.

2. The related configuration is stored in MySQL and is read from it dynamically.

3. The Kafka cluster in this example uses Kerberos authentication; adjust or remove that part if you don't need it.

4. The Kafka messages are JSON; each record is inserted, updated, or deleted according to its operation-type field (a sample message is shown after this list).

5. Reading uses a custom Source, and writing uses a custom Sink.

6. A custom deserialization schema is used when consuming the Kafka data.
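
For reference, the rest of the post assumes a payload shaped roughly like the one below. Only operationType (read by the sink) and the primary-key column names (alarm_id / id, configured in section 5.1) come from the original code; the other field names are made-up placeholders:

{"operationType":"INSERT","alarm_id":"1001","alarm_level":"HIGH","alarm_time":"2023-12-01 10:00:00"}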

1) Import dependencies

The dependency list here is fairly redundant; trim or keep entries according to your own needs.

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>gaei.cn.x5l</groupId>
    <artifactId>x8vbusiness</artifactId>
    <version>1.0.0</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <target.java.version>1.8</target.java.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12.10</scala.version>
        <flink.version>1.14.0</flink.version>
        <log4j.version>2.17.2</log4j.version>
        <hadoop.version>3.1.2</hadoop.version>
        <hive.version>3.1.2</hive.version>
        <mongo.driver.version>3.12.6</mongo.driver.version>
        <mongo.driver.core.version>4.3.1</mongo.driver.core.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
            <!--<exclusions>-->
            <!--    <exclusion>-->
            <!--        <groupId>mysql</groupId>-->
            <!--        <artifactId>mysql-connector-java</artifactId>-->
            <!--    </exclusion>-->
            <!--</exclusions>-->
        </dependency>
        <dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>2.9.0</version></dependency>

        <!-- core dependencies: start -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <!-- core dependencies: end -->

        <!-- Table API: start -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId><version>1.14.0</version><scope>provided</scope></dependency>
        <!-- comment this one out when using Hive SQL; otherwise it can stay -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-cep_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <!-- Table API: end -->

        <!-- SQL: start -->
        <!-- SQL parsing: start -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <!-- SQL parsing: end -->
        <!-- SQL connector for Kafka -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency>
        <!-- SQL: end -->

        <!-- checkpointing -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-state-processor-api_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <dependency><groupId>commons-lang</groupId><artifactId>commons-lang</artifactId><version>2.5</version><scope>compile</scope></dependency>

        <!-- local web UI for monitoring: start -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>
        <!-- local web UI for monitoring: end -->

        <!-- DataStream / logging: start -->
        <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
        <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>
        <dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version><scope>runtime</scope></dependency>

        <!-- hdfs -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.avro</groupId>
                    <artifactId>avro</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- important: an easily overlooked jar -->
        <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-auth</artifactId><version>${hadoop.version}</version></dependency>

        <!-- rocksdb_2 -->
        <dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency>

        <!-- misc -->
        <dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.23</version></dependency>
        <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version><scope>provided</scope></dependency>
        <dependency><groupId>org.jyaml</groupId><artifactId>jyaml</artifactId><version>1.3</version></dependency>

        <!-- Table API (blink planner): start -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <!--<version>${flink.version}</version>-->
            <version>1.13.5</version>
            <scope>provided</scope>
        </dependency>
        <!-- Table API (blink planner): end -->

        <dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.3</version></dependency>
        <dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mongodb-cdc</artifactId><version>2.3.0</version></dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <!--<version>5.1.44</version>-->
            <version>8.0.27</version>
            <scope>runtime</scope>
        </dependency>
        <dependency><groupId>com.alibaba</groupId><artifactId>druid</artifactId><version>1.2.8</version></dependency>
        <dependency><groupId>org.mongodb</groupId><artifactId>bson</artifactId><version>${mongo.driver.core.version}</version></dependency>
        <dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver-core</artifactId><version>${mongo.driver.core.version}</version></dependency>
        <!-- custom-mongo-core repackaged from mongodb-driver -->
        <dependency><groupId>org.mongodb</groupId><artifactId>mongodb-driver</artifactId><version>3.12.6</version></dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                    <exclude>org.apache.flink:flink-runtime-web_2.11</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.owp.flink.kafka.KafkaSourceDemo</mainClass>
                                </transformer>
                                <!-- required for Flink SQL -->
                                <!-- The service transformer is needed to merge META-INF/services files -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <!-- ... -->
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>
                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.0.0,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
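
One thing to double-check: the shade plugin's <mainClass> above still points to com.owp.flink.kafka.KafkaSourceDemo, apparently left over from another demo. For this project it should reference the entry class from section 5.1 (the package below is only a placeholder):

<mainClass>your.package.kafkacdc2mysql.Kafka2MysqlApp</mainClass>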

2)resources

2.1.appconfig.yml

mysql.url: "jdbc:mysql://1.1.1.1:3306/test?useSSL=false"
mysql.username: "test"
mysql.password: "123456"
mysql.driver: "com.mysql.jdbc.Driver"
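
Before wiring these values into the Flink job, it can be worth verifying them with a plain JDBC connection. The snippet below is only a sanity-check sketch using the values from appconfig.yml above; it is not part of the original post:

import java.sql.Connection;
import java.sql.DriverManager;

public class MysqlConnCheck {
    public static void main(String[] args) throws Exception {
        // values taken from appconfig.yml; replace host and credentials with your own
        String url = "jdbc:mysql://1.1.1.1:3306/test?useSSL=false";
        // com.mysql.jdbc.Driver still works with mysql-connector-java 8.x (it forwards to com.mysql.cj.jdbc.Driver)
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection(url, "test", "123456")) {
            System.out.println("Connected: " + !conn.isClosed());
        }
    }
}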

2.2.application.properties

url=mongodb://test:test123456@10.1.1.1:34516/?authSource=admin
#database=diagnosis
#collection=diagnosisEntiry
maxConnectionIdleTime=1000000
batchSize=1

# flink
checkpoint.interval=300000
checkpoint.minPauseBetweenCheckpoints=10000
checkpoint.checkpointTimeout=400000
maxConcurrentCheckpoints=1
restartInterval=120
restartStrategy=3
checkpointDataUri=hdfs://nameserver/user/flink/rocksdbcheckpoint_mongo

mysql.url=jdbc:mysql://1.1.1.1:3306/test?useSSL=false
mysql.username=test
mysql.password=123456

#envType=PRE
envType=PRD

# MySQL Druid connection-pool settings (production)
druid.driverClassName=com.mysql.jdbc.Driver
# production
druid.url=jdbc:mysql://1.1.1.1:3306/test
druid.username=test
druid.password=123456
# initial number of connections
druid.initialSize=1
# maximum number of connections
druid.maxActive=5
# maximum wait time (ms)
druid.maxWait=3000
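
For reference, this is how the druid.* keys above would map onto a programmatically configured pool. This is only a sketch under the assumption that you want to build the pool by hand; the utility class in section 3.1 builds the equivalent pool from a Properties object through DruidDataSourceFactory instead.

import com.alibaba.druid.pool.DruidDataSource;
import javax.sql.DataSource;

public class DruidPoolSketch {
    public static DataSource build() {
        DruidDataSource ds = new DruidDataSource();
        ds.setDriverClassName("com.mysql.jdbc.Driver"); // druid.driverClassName
        ds.setUrl("jdbc:mysql://1.1.1.1:3306/test");    // druid.url
        ds.setUsername("test");                         // druid.username
        ds.setPassword("123456");                       // druid.password
        ds.setInitialSize(1);                           // druid.initialSize (initial connections)
        ds.setMaxActive(5);                             // druid.maxActive   (maximum connections)
        ds.setMaxWait(3000);                            // druid.maxWait     (maximum wait, ms)
        return ds;
    }
}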

2.3.log4j.properties

log4j.rootLogger=info, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

2.4.log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5">
    <Properties>
        <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
        <property name="LOG_LEVEL" value="ERROR" />
    </Properties>
    <appenders>
        <console name="console" target="SYSTEM_OUT">
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/>
        </console>
        <File name="log" fileName="tmp/log/job.log" append="false">
            <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
        </File>
    </appenders>
    <loggers>
        <root level="${LOG_LEVEL}">
            <appender-ref ref="console"/>
            <appender-ref ref="log"/>
        </root>
    </loggers>
</configuration>

3)util

3.1.KafkaMysqlUtils

public class KafkaUtils {

    public static FlinkKafkaConsumer<ConsumerRecord<String, String>> getKafkaConsumer(List<String> topic) throws IOException {
        Properties prop1 = confFromYaml();
        // environment used for Kerberos authentication (PRE / PRD)
        String envType = prop1.getProperty("envType");
        Properties prop = new Properties();
        System.setProperty("java.security.krb5.conf", "/opt/conf/krb5.conf");
        prop.put("security.protocol", "SASL_PLAINTEXT");
        prop.put("sasl.jaas.config", "com.sun.security.auth.module.Krb5LoginModule required "
                + "useTicketCache=false  "
                + "serviceName=\"" + "kafka" + "\" "
                + "useKeyTab=true "
                + "keyTab=\"" + "/opt/conf/test.keytab" + "\" "
                + "principal=\"" + getKafkaKerberos(envType).get("principal") + "\";");
        prop.put("bootstrap.servers", getKafkaKerberos(envType).get("bootstrap.servers"));
        prop.put("group.id", "Kafka2Mysql_test");
        prop.put("auto.offset.reset", "earliest");
        prop.put("enable.auto.commit", "false");
        prop.put("max.poll.interval.ms", "60000");
        prop.put("max.poll.records", "3000");
        prop.put("session.timeout.ms", "600000");
//        List<String> topics = Stream.of(prop.getProperty("topics").split(",", -1))
//                .collect(Collectors.toList());
        // note: for a consumer these keys would normally be key.deserializer / value.deserializer;
        // the Flink Kafka connector supplies its own ByteArrayDeserializer either way
        prop.put("key.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
        prop.put("value.serializer", "org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.ByteArrayDeserializer");
        FlinkKafkaConsumer<ConsumerRecord<String, String>> consumer =
                new FlinkKafkaConsumer<ConsumerRecord<String, String>>(topic, new CustomDeSerializationSchema(), prop);
        consumer.setStartFromGroupOffsets();
        consumer.setCommitOffsetsOnCheckpoints(true);
        return consumer;
    }

    public static void main(String[] args) throws Exception {
        Properties druidConf = KafkaUtils.getDruidConf();
        if (druidConf == null) {
            throw new RuntimeException("Missing Druid configuration, please check");
        }
        DataSource dataSource = DruidDataSourceFactory.createDataSource(druidConf);
        Connection connection = dataSource.getConnection();
        PreparedStatement showDatabases = connection.prepareStatement("\n" +
                "select count(*) from tab_factory");
        ResultSet resultSet = showDatabases.executeQuery();
        while (resultSet.next()) {
            String string = resultSet.getString(1);
            System.out.println(string);
        }
        resultSet.close();
        showDatabases.close();
        connection.close();
    }

    public static Properties getDruidConf() {
        try {
            Properties prop = confFromYaml();
            String driverClassName = prop.get("druid.driverClassName").toString();
            String url = prop.get("druid.url").toString();
            String username = prop.get("druid.username").toString();
            String password = prop.get("druid.password").toString();
            String initialSize = prop.get("druid.initialSize").toString();
            String maxActive = prop.get("druid.maxActive").toString();
            String maxWait = prop.get("druid.maxWait").toString();
            Properties p = new Properties();
            p.put("driverClassName", driverClassName);
            p.put("url", url);
            p.put("username", username);
            p.put("password", password);
            p.put("initialSize", initialSize);
            p.put("maxActive", maxActive);
            p.put("maxWait", maxWait);
//            p.forEach((k, v) -> System.out.println("pool property " + k + "=" + v));
            return p;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    // envType: PRE or PRD
    public static Map<String, String> getKafkaKerberos(String envType) {
        Map<String, String> map = new HashMap<>();
        if ("PRD".equalsIgnoreCase(envType)) {
            map.put("principal", "prd@PRD.PRD.COM");
            map.put("bootstrap.servers", "kfk01.prd:9092,kfk02.prd:9092,kfk03.prd:9092,kfk04.prd:9092,kfk05.prd:9092,kfk06.prd:9092");
        } else if ("PRE".equalsIgnoreCase(envType)) {
            map.put("principal", "pre@PRE.PRE.COM");
            map.put("bootstrap.servers", "kfk01.pre:9092,kfk02.pre:9092,kfk03.pre:9092");
        } /*else if ("TEST".equalsIgnoreCase(envType)) {
            map.put("principal", "test@TEST.TEST.COM");
            map.put("bootstrap.servers", "test@TEST.TEST.COM");
        } */ else {
            System.out.println("Unknown environment: " + envType);
            throw new RuntimeException("Unknown environment: " + envType);
        }
        return map;
    }

    public static StreamExecutionEnvironment setupFlinkEnv(StreamExecutionEnvironment env) throws IOException {
        Properties prop = confFromYaml();

        // offsets are committed on checkpoints, so this introduces a delay in offset commits
        env.enableCheckpointing(Long.valueOf(prop.getProperty("checkpoint.interval")), CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(Long.valueOf(prop.getProperty("checkpoint.minPauseBetweenCheckpoints")));
        env.getCheckpointConfig().setCheckpointTimeout(Long.valueOf(prop.getProperty("checkpoint.checkpointTimeout")));
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(Integer.valueOf(prop.getProperty("maxConcurrentCheckpoints")));
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                Integer.valueOf(prop.getProperty("restartStrategy")), // number of restart attempts; don't set this too low, distributed jobs fail occasionally even under normal conditions, 3-5 is recommended
                Time.of(Integer.valueOf(prop.getProperty("restartInterval")), TimeUnit.SECONDS) // delay between attempts
        ));
        // state backend
//        env.setStateBackend(new RocksDBStateBackend((String) prop.getProperty("checkPointPath"), true));
//        env.setStateBackend(new MemoryStateBackend());
        env.setStateBackend(new RocksDBStateBackend(String.valueOf(prop.getProperty("checkpointDataUri")), true));
        return env;
    }

    public static Properties confFromYaml() {
        Properties prop = new Properties();
        InputStream resourceStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("application.properties");
        try {
            prop.load(resourceStream);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (resourceStream != null) {
                    resourceStream.close();
                }
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
        return prop;
    }
}

3.2.CustomDeSerializationSchema

public class CustomDeSerializationSchema implements KafkaDeserializationSchema<ConsumerRecord<String, String>> {

    private static String encoding = "UTF8";

    // whether this element marks the end of the stream; returning false means data keeps arriving
    @Override
    public boolean isEndOfStream(ConsumerRecord<String, String> nextElement) {
        return false;
    }

    // return a ConsumerRecord<String, String> that carries topic, partition, offset, etc. in addition to the payload
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        byte[] key = (record.key() == null ? "".getBytes() : record.key());
        return new ConsumerRecord<String, String>(
                record.topic(),
                record.partition(),
                record.offset(),
                record.timestamp(),
                record.timestampType(),
                record.checksum(),
                record.serializedKeySize(),
                record.serializedValueSize(),
                /* the value is not null-checked here; be sure to handle that in production */
                new String(key, encoding),
                new String(record.value(), encoding));
    }

    // type information of the produced records
    @Override
    public TypeInformation<ConsumerRecord<String, String>> getProducedType() {
        return TypeInformation.of(new TypeHint<ConsumerRecord<String, String>>() {
        });
    }
}
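
The comment inside deserialize() points out that record.value() is not null-checked. As a minimal sketch (an assumption, not part of the original post), a drop-in variant that also tolerates tombstone records (null keys or values) could look like this, reusing the same deprecated ConsumerRecord constructor:

    // Null-safe variant of deserialize(): null keys/values become empty strings instead of
    // triggering a NullPointerException when the record is decoded.
    @Override
    public ConsumerRecord<String, String> deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        String key = record.key() == null ? "" : new String(record.key(), encoding);
        String value = record.value() == null ? "" : new String(record.value(), encoding);
        return new ConsumerRecord<>(record.topic(), record.partition(), record.offset(),
                record.timestamp(), record.timestampType(), record.checksum(),
                record.serializedKeySize(), record.serializedValueSize(), key, value);
    }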

4)po

4.1.TableBean

@Data
public class TableBean {

    private String database;
    private String table;
    private String primaryKey;

    private TableBean() {
    }

    public TableBean(String database, String table, String primaryKey) {
        this.database = '`' + database + '`';
        this.table = '`' + table + '`';
        this.primaryKey = primaryKey;
    }
}
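
The constructor wraps the database and table names in backticks. In the application below the table argument is left empty and the topic name is used as the table name, so in practice an entry maps a topic to a database plus a primary-key column. A hypothetical example:

    // topic "mysql_tab1" -> database `db1`, table name taken from the topic, primary key alarm_id
    TableBean bean = new TableBean("db1", "", "alarm_id");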

5)kafkacdc2mysql

5.1.Kafka2MysqlApp

public class Kafka2MysqlApp {

    // key: the topic name; value: the database and primary-key column of the corresponding table
    private static final Map<String, TableBean> map = new HashMap<>();

    static {
        // the table name is not configured here; fill it in later according to your actual business
        map.put("mysql_tab1", new TableBean("db1", "", "alarm_id"));
        map.put("mysql_tab2", new TableBean("db2", "", "id"));
    }

    public static void main(String[] args) throws Exception {
        ArrayList<String> topicList = new ArrayList<>(map.keySet());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().disableOperatorChaining();
        KafkaUtils.setupFlinkEnv(env);

        RichSinkFunction<ConsumerRecord<String, String>> sinkFunction =
                new RichSinkFunction<ConsumerRecord<String, String>>() {

                    DataSource dataSource = null;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        initDruidDataSource();
                    }

                    private void initDruidDataSource() throws Exception {
                        Properties druidConf = KafkaUtils.getDruidConf();
                        if (druidConf == null) {
                            throw new RuntimeException("Missing Druid configuration, please check");
                        }
                        dataSource = DruidDataSourceFactory.createDataSource(druidConf);
                    }

                    @Override
                    public void close() throws Exception {
                    }

                    @Override
                    public void invoke(ConsumerRecord<String, String> record, Context context) throws Exception {
                        if (dataSource == null) {
                            throw new RuntimeException("Connection pool is not initialized");
                        }
                        String operationType = "";
                        String keyId = "";
                        String sql = "";
                        try (Connection connection = dataSource.getConnection()) {
                            // the topic name is used as the table name
                            String table_name = record.topic();
                            JSONObject jsonObject = JSONObject.parseObject(record.value());
                            operationType = jsonObject.getString("operationType");
                            jsonObject.remove("operationType");
                            String primaryKey = map.get(record.topic()).getPrimaryKey();
                            String database = map.get(record.topic()).getDatabase();
                            keyId = jsonObject.getString(primaryKey);
                            List<String> columns = new ArrayList<>();
                            List<String> columnValues = new ArrayList<>();
                            jsonObject.forEach((k, v) -> {
                                columns.add(k);
                                columnValues.add(v.toString());
                            });
                            if ("INSERT".equals(operationType)) {
                                // delete any existing row with the same primary key first
                                try {
                                    sql = "delete from " + database + "." + table_name + " where " + primaryKey + "= ?";
                                    PreparedStatement preparedStatement = connection.prepareStatement(sql);
                                    preparedStatement.setObject(1, keyId);
                                    preparedStatement.executeUpdate();
                                    preparedStatement.close();
                                } catch (Exception ignore) {
                                }
                                StringBuilder sb = new StringBuilder();
                                sb.append("insert into ").append(database).append(".").append(table_name).append("(");
                                for (String column : columns) {
                                    sb.append("`").append(column).append("`,");
                                }
                                sb.append(") values(");
                                for (String columnValue : columnValues) {
                                    sb.append("?,");
                                }
                                sb.append(")");
                                // strip the trailing commas
                                sql = sb.toString().replace(",)", ")");
                                PreparedStatement preparedStatement = connection.prepareStatement(sql);
                                for (int i = 0; i < columnValues.size(); i++) {
                                    preparedStatement.setObject(i + 1, columnValues.get(i));
                                }
                                preparedStatement.executeUpdate();
                                preparedStatement.close();
                            } else if ("UPDATE".equals(operationType)) {
                                StringBuilder sb = new StringBuilder();
                                sb.append("update ").append(database).append(".").append(table_name).append(" set ");
                                for (String column : columns) {
                                    sb.append("`").append(column).append("`= ?,");
                                }
                                String sqlPre = sb.substring(0, sb.length() - 1);
                                sql = sqlPre + " where " + primaryKey + "='" + keyId + "'";
                                PreparedStatement preparedStatement = connection.prepareStatement(sql);
                                for (int i = 0; i < columnValues.size(); i++) {
                                    preparedStatement.setObject(i + 1, columnValues.get(i));
                                }
                                preparedStatement.executeUpdate();
                                preparedStatement.close();
                            } else if ("DELETE".equals(operationType)) {
                                sql = "delete from " + database + "." + table_name + " where " + primaryKey + "= ?";
                                PreparedStatement preparedStatement = connection.prepareStatement(sql);
                                preparedStatement.setObject(1, keyId);
                                preparedStatement.executeUpdate();
                                preparedStatement.close();
                            }
                        } catch (Exception e) {
                            System.out.printf("MySQL sync operation (%s) failed, primary key: %s, cause: %s, source record: %s%n",
                                    operationType, keyId, e.getMessage(), record);
                            System.out.println("Executed SQL: " + sql);
                            throw new RuntimeException(e);
                        }
                    }
                };

        env.addSource(KafkaUtils.getKafkaConsumer(topicList)).addSink(sinkFunction);
        env.execute("kafka2mysql synchronization " + topicList.toString());
    }
}
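
To make the sink logic concrete, here is roughly what it builds for the sample message from the requirements section arriving on topic mysql_tab1 (column names other than alarm_id are placeholders, and the actual column order follows fastjson's iteration order): operationType is stripped from the JSON, the remaining keys become columns, and one of the following statements is executed. Note that INSERT first deletes any existing row with the same primary key, so inserts effectively behave like upserts.

INSERT:  insert into db1.mysql_tab1(`alarm_id`,`alarm_level`,`alarm_time`) values(?,?,?)
UPDATE:  update db1.mysql_tab1 set `alarm_id`= ?,`alarm_level`= ?,`alarm_time`= ? where alarm_id='1001'
DELETE:  delete from db1.mysql_tab1 where alarm_id= ?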
