当前位置: 首页 > news >正文

创建第一个 Flink 项目

一、运行环境介绍

Flink执行环境主要分为本地环境和集群环境,本地环境主要为了方便用户编写和调试代码使用,而集群环境则被用于正式环境中,可以借助Hadoop Yarnk8sMesos等不同的资源管理器部署自己的应用。

环境依赖:
【1】JDK环境:Flink核心模块均使用 Java开发,所以运行环境需要依赖JDKJDK版本需要保证在1.8以上。
【2】Maven编译环境:Flink的源代码目前仅支持通过 Maven进行编译,所以如果需要对源代码进行编译,或通过IDE开发Flink Application,则建议使用Maven作为项目工程编译方式。需要注意的是,Flink程序需要Maven的版本在3.0.4及以上,否则项目编译可能会出问题,建议用户根据要求进行环境的搭建。
【3】IDEA:需要安装scala插件以及scala环境等;

二、Flink项目 Scala版 DataSet 有界流

需求:同进文件文件中的单词出现的次数;

【1】创建Maven项目,pom.xml文件中配置如下依赖

<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.12</artifactId><version>1.10.0</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.12</artifactId><version>1.10.0</version></dependency>
</dependencies><build><plugins><!-- 该插件用于将Scala代码编译成class文件 --><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.4.6</version><executions><execution><goals><!--声明绑定到 maven 的compile阶段--><goal>compile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>

【2】resource目录中添加需要进行统计的文件文件及内容
[点击并拖拽以移动] ​

【3】WordCount.java文件内容如下,需要注意隐私转换问题,需要引入scala._

 import org.apache.flink.api.scala._/**
* @Description 批处理 word count
* @Author zhengzhaoxiang
* @Date 2020/7/12 18:55
* @Param
* @Return
*/
object WordCount {def main(args: Array[String]): Unit = {//创建一个批处理的执行环境val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment//从文件中读取数据var inputDateSet: DataSet[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")//基于Dataset 做转换,首先按空格打散,然后按照 word作为key做group byval resultDataSet: DataSet[(String,Int)] = inputDateSet.flatMap(_.split(" "))//分词得到所有 word构成的数据集.map((_,1))//_表示当前 word 转换成一个二元组(word,count).groupBy(0)//以二元组中第一个元素作为key.sum(1) //1表示聚合二元组的第二个元素的值//打印输出resultDataSet.print()}
}

【4】统计结果展示:
[点击并拖拽以移动] ​

三、Flink项目 Scala版 DataStream 无界流

【1】StreamWordCount.java文件内容如下

package com.zzx.flinkimport org.apache.flink.streaming.api.scala._object StreamWordCount {def main(args: Array[String]): Unit = {// 创建一个流处理执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment// 接受 socket 文本流val inputDataStream: DataStream[String] = env.socketTextStream("hadoop1",6666);//定义转换操作 word countval resultDataStream: DataStream[(String,Int)] = inputDataStream.flatMap(_.split(" "))//以空格分词,得到所有的 word.filter(_.nonEmpty).map((_,1))//转换成 word count 二元组.keyBy(0)//按照第一个元素分组.sum(1)//按照第二个元素求和resultDataStream.print()//上面的只是定义了处理流程,同时定义一个名称。不会让任务结束env.execute("stream word count word")}
}

【2】我这里在Hadoop1中通过nc -lk xxx打开一个socket通信
点击并拖拽以移动​

【3】查看IDEA输出统计内容如下:输出word的顺序不是按照输入的顺序,是因为它有并行度(多线程)是并行执行的。最前面的数字是并行子任务的编号类似线程号。最大的数字其实跟你cpu核数是息息相关的。这个并行度也可以通过env.setParallelism进行设置。我们也可以给每一个任务(算子)设置不同的并行度;
[点击并拖拽以移动] ​

【4】当我们需要将Java文件打包上传到Flink的时候,这里的hostport可以从参数中进行获取,代码修改如下:

package com.zzx.flinkimport org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._object StreamWordCount {def main(args: Array[String]): Unit = {// 创建一个流处理执行环境val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment// 接受 socket 文本流  hostname:prot 从程序运行参数中读取val params: ParameterTool = ParameterTool.fromArgs(args);val hostname: String = params.get("host");val port: Int = params.getInt("port");val inputDataStream: DataStream[String] = env.socketTextStream(hostname,port);//定义转换操作 word countval resultDataStream: DataStream[(String,Int)] = inputDataStream.flatMap(_.split(" "))//以空格分词,得到所有的 word.filter(_.nonEmpty).map((_,1))//转换成 word count 二元组.keyBy(0)//按照第一个元素分组.sum(1)//按照第二个元素求和resultDataStream.print()//上面的只是定义了处理流程,同时定义一个名称。不会让任务结束env.execute("stream word count word")}
}

相关文章:

  • XUbuntu22.04之8款免费UML工具(一百九十七)
  • 【Flink on k8s】- 12 - Flink kubernetes operator 的高级特性
  • 在目标检测的图框标注中注意一下几点
  • java中用thumbnailator依赖写一个压缩图片的类,只要图片大小超过1M就无线循环下去的详细代码实例?
  • 【ARM Trace32(劳特巴赫) 使用介绍 13 -- Trace32 变量篇】
  • 得帆云助力容百科技构建CRM系统,实现LTC全流程管理
  • 【管理运筹学】背诵手册(七)| 网络计划与排队论
  • 游戏架构之面向对象模型和组件模型
  • 【ML】softmax简单理解。
  • 【IC前端虚拟项目】工程目录组织说明
  • ospf选路
  • git 常用部分方法
  • node.js出现version `GLIBC_2.27‘ not found的解决方案
  • Java 使用html2image将html生成缩略图图片
  • Liunx Centos 防火墙操作
  • ----------
  • 【跃迁之路】【477天】刻意练习系列236(2018.05.28)
  • git 常用命令
  • JavaScript实现分页效果
  • Java小白进阶笔记(3)-初级面向对象
  • PhantomJS 安装
  • Rancher-k8s加速安装文档
  • Redis 中的布隆过滤器
  • Vue.js 移动端适配之 vw 解决方案
  • 翻译:Hystrix - How To Use
  • 前端自动化解决方案
  • 手写双向链表LinkedList的几个常用功能
  • 一道闭包题引发的思考
  • #pragam once 和 #ifndef 预编译头
  • $redis-setphp_redis Set命令,php操作Redis Set函数介绍
  • (39)STM32——FLASH闪存
  • (Redis使用系列) SpirngBoot中关于Redis的值的各种方式的存储与取出 三
  • (zt)最盛行的警世狂言(爆笑)
  • (二)七种元启发算法(DBO、LO、SWO、COA、LSO、KOA、GRO)求解无人机路径规划MATLAB
  • (附源码)计算机毕业设计SSM教师教学质量评价系统
  • (力扣题库)跳跃游戏II(c++)
  • (一)Spring Cloud 直击微服务作用、架构应用、hystrix降级
  • (转)memcache、redis缓存
  • .babyk勒索病毒解析:恶意更新如何威胁您的数据安全
  • .NET DataGridView数据绑定说明
  • .NET Remoting学习笔记(三)信道
  • .NET 应用启用与禁用自动生成绑定重定向 (bindingRedirect),解决不同版本 dll 的依赖问题
  • .NET/C# 利用 Walterlv.WeakEvents 高性能地中转一个自定义的弱事件(可让任意 CLR 事件成为弱事件)
  • .NET/MSBuild 中的发布路径在哪里呢?如何在扩展编译的时候修改发布路径中的文件呢?
  • @软考考生,这份软考高分攻略你须知道
  • [AX]AX2012 AIF(四):文档服务应用实例
  • [BZOJ 4598][Sdoi2016]模式字符串
  • [DM复习]Apriori算法-国会投票记录关联规则挖掘(上)
  • [elastic 8.x]java客户端连接elasticsearch与操作索引与文档
  • [go] 策略模式
  • [hive] posexplode函数
  • [iHooya]2023年1月30日作业解析
  • [Oh My C++ Diary]用cout输出时后endl的使用
  • [Power Query] 分组依据
  • [Redis实战]分布式锁-redission