当前位置: 首页 > news >正文

Flink 入门案例介绍

一、工程搭建

  • 在 IDEA 中创建一个 Maven 工程:FlinkTutorial

  • 在 pom 文件中引入依赖:

    <dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.10.1</version></dependency><!-- 2.12 是scala版本 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.10.1</version></dependency>
    </dependencies>
    

二、批处理 WordCount 案例

package com.app.wc// 批处理 WordCount
public class WordCount {public static void main(String[] args) throws Exception {// 1.创建 flink 执行环境ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 2.读取文件数据// DataSource 是 Operator 的子类,Operator 是 DataSet 的子类// Flink 的批处理是基于 DataSet 类型的 API 来处理DataSource<String> inputData = env.readTextFile("datas/word.txt");// 3.执行数据处理(按空格分词并转换成 (word, 1) 这样的二元组格式),分组聚合DataSet<Tuple2<String, Integer>> result = inputData.flatMap(new MyFlatMap())  //需要传入FlatMapFunction接口的实现类.groupBy(0)  //可以传入KeySelector实现类或位置索引或字段名.sum(1);  // 传入进行聚合计算的位置索引// 4.输出result.print();}// 自定义FlatMapFunction接口的实现类,并定义输入和输出泛型,实现 flatMap 方法// Tuple2 是 flink 包下的,区别于 Scala 中的 Tuple2public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {@overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 按空格分词String[] words = value.split(" ");// 遍历数组并转换为二元组输出for(String word : words) {out.collect(new Tuple2(word, 1));}}}
}

三、有界流处理 WordCount 案例

package com.app.wc// 流处理WordCount
public class StreamWordCount {public static void main(String[] args) throws Exception {// 1.创建flink流处理执行环境对象StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// env.setParallelism(8); // 设置并发度// 2.读取文件StreamDataSource<String> inputData = env.readTextFile("datas/word.txt");// 3.处理数据(分词,转换结构),并分组聚合DataStream<Tuple2<String, Integer>> result = inputData.flatMap(new MyFlatMap()).keyBy(0).sum(1);// 4.输出result.print();// 5.执行任务(流处理是事件触发的)env.execute();}// 自定义FlatMapFunction接口的实现类,并定义输入和输出泛型,实现 flatMap 方法// Tuple2 是 flink 包下的,区别于 Scala 中的 Tuple2public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {@overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 按空格分词String[] words = value.split(" ");// 遍历数组并转换为二元组输出for(String word : words) {out.collect(new Tuple2(word, 1));}}}
}

四、无界流处理 WordCount 案例

方便生产环境部署

package com.app.wcpublic class StreamWordCount2 {public static void main(String[] args) throws Exception {// 1.创建flink流处理执行环境对象StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// env.setParallelism(8); // 设置并发度// 2.监听 7777 端口服务(nc -lk 7777)// 2.1 使用 ParameterTool 类从启动参数中获取配置项ParameterTool tool = ParameterTool.formArgs(args);String hostname = tool.get("hostname");int port = tool.getInt("port");// 2.2 获取数据流DataStream<String> inputData = env.socketTextFile(hostname, port);// 3.处理数据(分词,转换结构),并分组聚合DataStream<Tuple2<String, Integer>> result = inputData.flatMap(new MyFlatMap()).keyBy(0).sum(1);// 4.输出result.print();// 5.执行任务(流处理是事件触发的)env.execute();}// 自定义FlatMapFunction接口的实现类,并定义输入和输出泛型,实现 flatMap 方法// Tuple2 是 flink 包下的,区别于 Scala 中的 Tuple2public class MyFlatMap implements FlatMapFunction<String, Tuple2<String, Integer>> {@overridepublic void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {// 按空格分词String[] words = value.split(" ");// 遍历数组并转换为二元组输出for(String word : words) {out.collect(new Tuple2(word, 1));}}}
}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Autoxjs 实践-Spring Boot 集成 WebSocket
  • NMF算法
  • el-input中change事件造成的坑
  • AI大数据处理与分析实战--体育问卷分析
  • 评价GPT-4的方案
  • blender从视频中动作捕捉,绑定到人物模型
  • 【调度算法】Boltzmann选择
  • Spring Boot 实现动态数据源配置
  • 嵌入式中C语言经典的面试题分享
  • 《TCP/IP网络编程》(第十三章)多种I/O函数(2)
  • GPT-4欺骗人类的惊人成功率达99.16%!
  • 华为坤灵路由器配置SSH
  • LeetCode 2356, 238, 141
  • 1025 反转链表
  • 大模型PEFT(二) 之 大模型LoRA指令微调实践
  • [译]如何构建服务器端web组件,为何要构建?
  • 【许晓笛】 EOS 智能合约案例解析(3)
  • 2017前端实习生面试总结
  • 5分钟即可掌握的前端高效利器:JavaScript 策略模式
  • ABAP的include关键字,Java的import, C的include和C4C ABSL 的import比较
  • C语言笔记(第一章:C语言编程)
  • iOS动画编程-View动画[ 1 ] 基础View动画
  • java 多线程基础, 我觉得还是有必要看看的
  • JS实现简单的MVC模式开发小游戏
  • Magento 1.x 中文订单打印乱码
  • ReactNativeweexDeviceOne对比
  • React-redux的原理以及使用
  • Spring-boot 启动时碰到的错误
  • Travix是如何部署应用程序到Kubernetes上的
  • Vue官网教程学习过程中值得记录的一些事情
  • vue中实现单选
  • yii2中session跨域名的问题
  • 目录与文件属性:编写ls
  • 你不可错过的前端面试题(一)
  • 使用Tinker来调试Laravel应用程序的数据以及使用Tinker一些总结
  • 微信公众号开发小记——5.python微信红包
  • SAP CRM里Lead通过工作流自动创建Opportunity的原理讲解 ...
  • ​如何使用ArcGIS Pro制作渐变河流效果
  • #14vue3生成表单并跳转到外部地址的方式
  • #if和#ifdef区别
  • ( 10 )MySQL中的外键
  • (1)STL算法之遍历容器
  • (11)工业界推荐系统-小红书推荐场景及内部实践【粗排三塔模型】
  • (NO.00004)iOS实现打砖块游戏(十二):伸缩自如,我是如意金箍棒(上)!
  • (附源码)spring boot公选课在线选课系统 毕业设计 142011
  • (十六)串口UART
  • (微服务实战)预付卡平台支付交易系统卡充值业务流程设计
  • (一)插入排序
  • .bat批处理(五):遍历指定目录下资源文件并更新
  • .net core IResultFilter 的 OnResultExecuted和OnResultExecuting的区别
  • .Net Remoting(分离服务程序实现) - Part.3
  • .NET/ASP.NETMVC 大型站点架构设计—迁移Model元数据设置项(自定义元数据提供程序)...
  • .NET平台开源项目速览(15)文档数据库RavenDB-介绍与初体验
  • .NET性能优化(文摘)
  • .Net中的集合