当前位置: 首页 > news >正文

Flink 自定义 KeyedProcessFunction,使用状态求历史总和(Scala)

Elasticsearch 7 连接器的 Maven 依赖(IDEA 中的 Maven 项目)。注意:代码中使用的 FlinkKafkaConsumer 还需要同版本的 flink-connector-kafka_2.11 依赖。

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch7_2.11</artifactId>
<version>1.11.1</version>
</dependency>


import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch7.{ElasticsearchSink, RestClientFactory}
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.flink.util.Collector
import org.apache.http.HttpHost
import org.apache.http.client.config.RequestConfig
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.client.{Requests, RestClientBuilder}

import java.nio.charset.StandardCharsets
import java.time.Duration
import java.util.Properties

/**
 * Flink streaming job: reads tab-separated records from Kafka, keeps a per-`id` running
 * total of `value` in keyed state, and writes one document per `id` to Elasticsearch.
 *
 * Expected Kafka record layout (UTF-8): `id \t data \t value \t ts`.
 */
object Test {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // When state must be checkpointed, enable the settings below:
    // env.setStateBackend(new RocksDBStateBackend(s"hdfs://${namenodeID}", true)) // HDFS as the state backend
    // env.enableCheckpointing(10 * 60 * 1000L)
    // env.getCheckpointConfig.setCheckpointTimeout(10 * 60 * 1000L)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // event time

    val props = new Properties
    props.setProperty("bootstrap.servers", "host:6667") // some clusters use port 9092 instead
    props.setProperty("group.id", "groupId")
    // The original "retries" (a producer-only setting) and "retries.backoff.ms" (not a Kafka key)
    // were ignored by the consumer; the consumer's backoff key is "retry.backoff.ms".
    props.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, "100")
    props.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000")
    // If the Kafka cluster requires authentication, add the following settings:
    // props.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username='' password='';")
    // props.setProperty("security.protocol", "SASL_PLAINTEXT")
    // props.setProperty("sasl.mechanism", "PLAIN")

    val httpHosts = new java.util.ArrayList[HttpHost]
    httpHosts.add(new HttpHost("esIPOne", 9200, "http"))
    httpHosts.add(new HttpHost("esIPTwo", 9200, "http"))
    httpHosts.add(new HttpHost("esIPThree", 9200, "http"))

    val esSinkBuilder = new ElasticsearchSink.Builder[ResultBean](
      httpHosts,
      new ElasticsearchSinkFunction[ResultBean] {
        override def process(element: ResultBean, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
          val json = new java.util.HashMap[String, Any]
          json.put("@timestamp", element.ts)
          json.put("data", element.data)
          json.put("sum", element.sum)
          // Document id = record id, so each new result overwrites the previous one for that id.
          val rqst = Requests.indexRequest()
            .index("indexName")
            .id(element.id)
            .source(json)
            .opType(DocWriteRequest.OpType.INDEX)
          indexer.add(rqst)
        }
      })
    setESConf(esSinkBuilder, 5000)

    val myConsumer = new FlinkKafkaConsumer[DemoBean]("topicName", new DemoKafka(), props)
      .setStartFromEarliest() // where to start reading: the earliest available offset

    val source = env
      .addSource(myConsumer).uid("source-data").name("数据源")
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[DemoBean](Duration.ofSeconds(1))
          .withTimestampAssigner(new SerializableTimestampAssigner[DemoBean] {
            override def extractTimestamp(element: DemoBean, recordTimestamp: Long): Long = element.ts
          })
          .withIdleness(Duration.ofSeconds(5)))
      .uid("water-marks").name("注册水位线")

    source
      .keyBy(k => k.id)
      .process(new DemoProcess()).uid("demo-process").name("process 示例")
      .addSink(esSinkBuilder.build()).uid("es-sink").name("数据写入es")

    env.execute("任务名")
  }

  /** Parses a tab-separated Kafka record value into a [[DemoBean]]. */
  private class DemoKafka() extends KafkaDeserializationSchema[DemoBean] {
    override def isEndOfStream(t: DemoBean): Boolean = false

    override def deserialize(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]]): DemoBean = {
      // Decode explicitly as UTF-8 instead of the JVM's platform-default charset.
      val value = new String(consumerRecord.value(), StandardCharsets.UTF_8)
      val list = value.split("\t")
      DemoBean(list(0), list(1), list(2).toInt, list(3).toLong)
    }

    // Use the Scala case-class type information (the same one the Scala DataStream API derives)
    // rather than TypeExtractor, which treats a Scala case class as a generic/Kryo type.
    override def getProducedType: TypeInformation[DemoBean] = createTypeInformation[DemoBean]
  }

  /**
   * Keeps a per-key running total of `value` in keyed state and emits the updated total
   * for every incoming record.
   */
  private class DemoProcess extends KeyedProcessFunction[String, DemoBean, ResultBean] {
    // Boxed Integer so "no state yet" is visible as null. With a Scala Int, `== null` is
    // always false and null is silently unboxed to 0.
    private var hisSumState: ValueState[java.lang.Integer] = _

    override def open(parameters: Configuration): Unit = {
      hisSumState = getRuntimeContext.getState(
        new ValueStateDescriptor[java.lang.Integer]("his-sum", classOf[java.lang.Integer]))
    }

    override def processElement(data: DemoBean,
                                ctx: KeyedProcessFunction[String, DemoBean, ResultBean]#Context,
                                out: Collector[ResultBean]): Unit = {
      val previousSum: Int = Option(hisSumState.value()).map(_.intValue).getOrElse(0)
      // NOTE(review): Int can overflow for long histories; widen to Long if that matters.
      val newSum = previousSum + data.value
      // Store the accumulated total (not just the latest value) so the next record adds to it.
      hisSumState.update(java.lang.Integer.valueOf(newSum))
      // Carry the record's event timestamp, which the sink writes to "@timestamp".
      out.collect(ResultBean(data.id, data.data, newSum, data.ts))
    }
  }

  /**
   * Applies bulk-flush, backoff-retry and REST-client timeout settings to an ES sink builder.
   *
   * @param esSinkBuilder builder configured in place
   * @param numMaxActions number of buffered actions that triggers a bulk flush
   */
  def setESConf[T](esSinkBuilder: ElasticsearchSink.Builder[T], numMaxActions: Int): Unit = {
    esSinkBuilder.setBulkFlushMaxActions(numMaxActions)
    esSinkBuilder.setBulkFlushMaxSizeMb(10)
    esSinkBuilder.setBulkFlushInterval(10000)
    esSinkBuilder.setBulkFlushBackoff(true)
    esSinkBuilder.setBulkFlushBackoffDelay(2)
    esSinkBuilder.setBulkFlushBackoffRetries(3)
    esSinkBuilder.setRestClientFactory(new RestClientFactory {
      override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = {
        restClientBuilder.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
          override def customizeRequestConfig(requestConfigBuilder: RequestConfig.Builder): RequestConfig.Builder =
            requestConfigBuilder.setConnectTimeout(12000).setSocketTimeout(90000)
        })
      }
    })
  }

  /** One parsed Kafka record; `ts` is used as the event-time timestamp for watermarks. */
  private case class DemoBean(id: String, data: String, value: Int, ts: Long)

  /** Output document: running `sum` of `value` for `id`, stamped with the record's `ts`. */
  private case class ResultBean(id: String, data: String, sum: Int, ts: Long)
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • OpenSSH从7.4升级到9.8的过程 亲测--图文详解
  • 安卓13设置动态显示隐藏第一页的某一项 动态显示隐藏无障碍 android13设置动态显示隐藏第一页的某一项
  • 4款音频转文字在线转换工具帮你解锁新的记录模式。
  • RabbitMQ 高级特性——发送方确认
  • 力扣239 滑动窗口最大值 Java版本
  • C++ 新特性
  • Ceph官方文档_02_Ceph初学者指南
  • 基于php的小说阅读系统
  • docker安装及使用
  • CS61B学习 part1
  • Python Web 中间件开发与优化指南
  • 使用sqoop报错
  • JavaScript可视化
  • Hive企业级调优[3]—— Explain 查看执行计划
  • Linux(ubuntu)(文件IO——fopen)
  • 【JavaScript】通过闭包创建具有私有属性的实例对象
  • JavaScript 基础知识 - 入门篇(一)
  • Js基础——数据类型之Null和Undefined
  • leetcode386. Lexicographical Numbers
  • Mysql优化
  • Node + FFmpeg 实现Canvas动画导出视频
  • node.js
  • pdf文件如何在线转换为jpg图片
  • Solarized Scheme
  • Zsh 开发指南(第十四篇 文件读写)
  • 百度贴吧爬虫node+vue baidu_tieba_crawler
  • 给新手的新浪微博 SDK 集成教程【一】
  • 面试题:给你个id,去拿到name,多叉树遍历
  • 浅谈JavaScript的面向对象和它的封装、继承、多态
  • 微信公众号开发小记——5.python微信红包
  • 责任链模式的两种实现
  • 正则表达式
  • 最近的计划
  • 测评:对于写作的人来说,Markdown是你最好的朋友 ...
  • ​linux启动进程的方式
  • ​力扣解法汇总1802. 有界数组中指定下标处的最大值
  • # 职场生活之道:善于团结
  • #快捷键# 大学四年我常用的软件快捷键大全,教你成为电脑高手!!
  • (delphi11最新学习资料) Object Pascal 学习笔记---第5章第5节(delphi中的指针)
  • (done) 两个矩阵 “相似” 是什么意思?
  • (MATLAB)第五章-矩阵运算
  • (pytorch进阶之路)扩散概率模型
  • (Redis使用系列) Springboot 在redis中使用BloomFilter布隆过滤器机制 六
  • (solr系列:一)使用tomcat部署solr服务
  • (补)B+树一些思想
  • (二)Pytorch快速搭建神经网络模型实现气温预测回归(代码+详细注解)
  • (附源码)spring boot基于小程序酒店疫情系统 毕业设计 091931
  • (每日持续更新)jdk api之FileFilter基础、应用、实战
  • (求助)用傲游上csdn博客时标签栏和网址栏一直显示袁萌 的头像
  • (实战篇)如何缓存数据
  • (续)使用Django搭建一个完整的项目(Centos7+Nginx)
  • (一)80c52学习之旅-起始篇
  • (原創) 如何解决make kernel时『clock skew detected』的warning? (OS) (Linux)
  • (转)LINQ之路
  • **Java有哪些悲观锁的实现_乐观锁、悲观锁、Redis分布式锁和Zookeeper分布式锁的实现以及流程原理...