当前位置: 首页 > news >正文

flink读kafka写mysql数据库

场景:从kafka读数据,通过jdbc写入mysql

示例:
#往kafka测试主题写入数据
kafka-console-producer.sh --broker-list wh01t:21007 --topic ypg_test --producer.config /client/Kafka/kafka/config/producer.properties
–创建mysql测试表
– dsg.test definition

CREATE TABLE test (
id varchar(50) NOT NULL,
c_date date DEFAULT NULL,
PRIMARY KEY (id)
) ;

flink主类:

package com.pinko.testcaseimport com.security.InitKafkaUtil
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011import scala.collection.JavaConverters.seqAsJavaListConverter/* 测试 */
object Test05FromKafkaToMysql {def main(args: Array[String]): Unit = {val prop = InitKafkaUtil.initPros()InitKafkaUtil.securityPrepare// 加载执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//参数消费多个topicval topics: List[String] = List("ypg_test")val kafkaConsumer = new FlinkKafkaConsumer011[String](topics.asJava, new SimpleStringSchema(), prop)
//    val kafka = env.fromElements("ypghello", "ypgworld")println("flink环境加载完成,开始处理数据...")/* kafka消息处理逻辑 */val kafka = env.addSource(kafkaConsumer)kafka.print()kafka.addSink(new MysqlSink())env.execute("Test05FromKafka")}
}
package com.pinko.testcaseimport org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.sink.{RichSinkFunction, SinkFunction}import java.sql.{Connection, DriverManager, PreparedStatement}class MysqlSink extends RichSinkFunction[String] {var conn: Connection = _var ps: PreparedStatement = _override def open(parameters: Configuration): Unit = {val conn_str = "jdbc:mysql://10.22.33.44:2883/testdb|test|test#123";val conns = conn_str.split("\\|")val url: String = conns(0)val username: String = conns(1)val password: String = conns(2)conn = DriverManager.getConnection(url, username, password)println(conn)}override def invoke(value: String, context: SinkFunction.Context[_]): Unit = {val sql = s"insert into test values ('$value', now()) on duplicate key update id = values(id),c_date = values(c_date)"println(sql)ps = conn.prepareStatement(sql)val rowsAffected = ps.executeUpdate()if (rowsAffected > 0) {println("更新成功")} else {println("没有进行更新操作")}}override def close(): Unit = {ps.close()conn.close()}
}

相关文章:

  • 【LeetCode】【1】两数之和(1141字)
  • 美业美容院会员服务预约店铺管理小程序的效果是什么
  • 粤嵌—2024/5/17—N 皇后 ||(✔)
  • 各大模型厂商API使用:百度、阿里、豆包、kimi、deepseek
  • AIGC 004-T2I-adapter另外一种支持多条件组合控制的文生图方案!
  • 计算机视觉与深度学习实战:以Python为工具,基于深度学习的汽车目标检测
  • C语言 | Leetcode C语言题解之第113题路径总和II
  • Java线程池机制揭秘:一文掌握核心概念与实战技巧
  • 基于小波分析和机器学习(SVM,KNN,NB,MLP)的癫痫脑电图检测(MATLAB环境)
  • Day04:CSS 进阶
  • Ubuntu18.04 OpenSSH升级
  • IT行业的现状与未来发展趋势:从云计算到量子计算的技术变革
  • 猫头虎 解析:为什么AIGC在国内适合做TOB,在国外适合做TOC?
  • 【数据结构与算法 经典例题】相交链表
  • 【java程序设计期末复习】chapter7 内部类和异常类
  • ECMAScript6(0):ES6简明参考手册
  • flutter的key在widget list的作用以及必要性
  • Github访问慢解决办法
  • JS进阶 - JS 、JS-Web-API与DOM、BOM
  • Making An Indicator With Pure CSS
  • NLPIR语义挖掘平台推动行业大数据应用服务
  • node 版本过低
  • ⭐ Unity 开发bug —— 打包后shader失效或者bug (我这里用Shader做两张图片的合并发现了问题)
  • Vue学习第二天
  • Web标准制定过程
  • 安卓应用性能调试和优化经验分享
  • 前端学习笔记之观察者模式
  • 浅谈web中前端模板引擎的使用
  • 通过获取异步加载JS文件进度实现一个canvas环形loading图
  • 微信小程序填坑清单
  • ​Kaggle X光肺炎检测比赛第二名方案解析 | CVPR 2020 Workshop
  • ​如何在iOS手机上查看应用日志
  • ​业务双活的数据切换思路设计(下)
  • # Redis 入门到精通(八)-- 服务器配置-redis.conf配置与高级数据类型
  • #php的pecl工具#
  • $nextTick的使用场景介绍
  • (173)FPGA约束:单周期时序分析或默认时序分析
  • (ibm)Java 语言的 XPath API
  • (Redis使用系列) SpringBoot 中对应2.0.x版本的Redis配置 一
  • (八)Spring源码解析:Spring MVC
  • (不用互三)AI绘画:科技赋能艺术的崭新时代
  • (苍穹外卖)day03菜品管理
  • (读书笔记)Javascript高级程序设计---ECMAScript基础
  • (二十六)Java 数据结构
  • (接口自动化)Python3操作MySQL数据库
  • (三)centos7案例实战—vmware虚拟机硬盘挂载与卸载
  • (删)Java线程同步实现一:synchronzied和wait()/notify()
  • (十八)Flink CEP 详解
  • (实测可用)(3)Git的使用——RT Thread Stdio添加的软件包,github与gitee冲突造成无法上传文件到gitee
  • (推荐)叮当——中文语音对话机器人
  • (原創) 如何將struct塞進vector? (C/C++) (STL)
  • (转载)VS2010/MFC编程入门之三十四(菜单:VS2010菜单资源详解)
  • ***php进行支付宝开发中return_url和notify_url的区别分析
  • .NET Core 将实体类转换为 SQL(ORM 映射)
  • .NET 将混合了多个不同平台(Windows Mac Linux)的文件 目录的路径格式化成同一个平台下的路径