
Loading Dimension Data in Flink


1. Why load dimension data?

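When building a real-time data warehouse, fact data alone is not enough: dimension data must also be loaded to give the facts a concrete meaning. With only fact data, all we see is values that keep changing, without knowing what they actually represent. For example, a record carrying city ID 1 says little until a dimension table maps 1 to nanjing. That is why dimension data should be loaded as well.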
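To make the idea concrete, here is a minimal plain-Java sketch (no Flink involved) of what enriching a fact with a dimension means: each fact carries only an opaque city ID and a temperature, and a lookup in a dimension table turns the ID into a readable name. The class `DimensionEnrichment`, the helper `enrich`, and the `"unknown"` fallback are hypothetical names chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DimensionEnrichment {
    /**
     * Hypothetical helper: each fact is {cityId, temperature}; the dimension map
     * translates cityId into a readable name ("unknown" if the ID is missing).
     */
    static List<String> enrich(List<int[]> facts, Map<Integer, String> cityDim) {
        List<String> enriched = new ArrayList<>();
        for (int[] fact : facts) {
            String city = cityDim.getOrDefault(fact[0], "unknown");
            enriched.add(city + "," + fact[1]);
        }
        return enriched;
    }

    public static void main(String[] args) {
        // Dimension table: city ID -> city name
        Map<Integer, String> cityDim = new HashMap<>();
        cityDim.put(1, "nanjing");
        cityDim.put(2, "suzhou");
        // Facts on their own: only IDs and temperatures
        List<int[]> facts = Arrays.asList(new int[]{1, 34}, new int[]{2, 36});
        System.out.println(enrich(facts, cityDim)); // prints [nanjing,34, suzhou,36]
    }
}
```

Both approaches below do exactly this lookup inside a Flink operator; they differ only in how the dimension table reaches the parallel tasks.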

2. Ways to load dimension data

Two common ways of loading dimension data are presented here.

Approach 1: Cached file (distributed cache)

district.txt (stored in the resources directory):

1   nanjing
2   suzhou
3   changzhou
4   xuzhou
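Each line holds an integer ID and a district name separated by whitespace. The following standalone sketch parses that format the same way the `open()` method in the code below does (`split("\\s+")`), but reads from a `java.io.Reader` instead of the Flink distributed cache so it can run on its own. The class name `DistrictFileParser` is hypothetical, and skipping blank lines is an added assumption rather than something the original file requires.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;

class DistrictFileParser {
    /** Parses lines of the form "<id><whitespace><name>" into an id -> name map. */
    static Map<Integer, String> parse(Reader source) throws IOException {
        Map<Integer, String> idName = new HashMap<>();
        try (BufferedReader br = new BufferedReader(source)) {
            String line;
            while ((line = br.readLine()) != null) {
                String trimmed = line.trim();
                if (trimmed.isEmpty()) {
                    continue; // tolerate blank or trailing lines (assumption)
                }
                String[] parts = trimmed.split("\\s+"); // same split as in the Flink open() method
                idName.put(Integer.valueOf(parts[0]), parts[1]);
            }
        }
        return idName;
    }

    public static void main(String[] args) throws IOException {
        String content = "1   nanjing\n2   suzhou\n3   changzhou\n4   xuzhou\n";
        Map<Integer, String> idName = parse(new StringReader(content));
        System.out.println(idName.get(3)); // prints changzhou
    }
}
```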

Main code

```java
package recovery;

import modules.env.Environments;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import scala.Tuple3;

import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
 * Registering and retrieving a distributed cache file.
 */
public class TestCache {
    public static void main(String[] args) throws Exception {
        // Create the environment (Environments is the author's own builder helper)
        StreamExecutionEnvironment see = new Environments()
                .build()
                .enableCheckpoint("file:///D:/phase/flink_state_backend", 3, 1, 1)
                .enableRetries(3, 1)
                .enableStateBackend("hashmap", true, false)
                .finish(RuntimeExecutionMode.STREAMING, 1, 3);

        // 1. Register the cache file
        // Path of the static file district.txt on the classpath
        String path = Thread.currentThread().getContextClassLoader().getResource("district.txt").getPath();
        see.registerCachedFile(path, "district"); // cache it in the environment under the name "district"

        // 2. Register a job listener
        see.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable) {
                // On submission: jobClient is set on success, throwable on failure
                if (Objects.nonNull(jobClient)) {
                    System.out.println(jobClient.getJobID().toString()); // job ID
                    try {
                        // job status
                        System.err.println(jobClient.getJobStatus().get(10, TimeUnit.SECONDS).name());
                    } catch (Exception e) {
                        System.err.println(e.getMessage());
                    }
                } else if (Objects.nonNull(throwable)) {
                    System.err.println(throwable.getMessage());
                }
            }

            @Override
            public void onJobExecuted(@Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable) {
                // After execution: jobExecutionResult on success, throwable on failure
                if (Objects.nonNull(jobExecutionResult)) {
                    System.out.println(jobExecutionResult);
                } else if (Objects.nonNull(throwable)) {
                    System.err.println(throwable.getMessage());
                }
            }
        });

        // 3. Data: (ID, temperature, timestamp); event time comes from the third field
        TimestampAssignerSupplier<Tuple3> supplier = new TimestampAssignerSupplier<Tuple3>() {
            @Override
            public TimestampAssigner<Tuple3> createTimestampAssigner(Context context) {
                return (element, recordTimestamp) -> (Long) element._3();
            }
        };
        WatermarkStrategy<Tuple3> watermark =
                WatermarkStrategy.<Tuple3>forMonotonousTimestamps().withTimestampAssigner(supplier);

        see.fromCollection(Arrays.asList(
                        new Tuple3(1, 34, System.currentTimeMillis()),
                        new Tuple3(2, 36, System.currentTimeMillis() + 1000),
                        new Tuple3(1, 35, System.currentTimeMillis() + 2000),
                        new Tuple3(3, 32, System.currentTimeMillis() + 3000),
                        new Tuple3(2, 33, System.currentTimeMillis() + 4000)))
                .setParallelism(1)
                .assignTimestampsAndWatermarks(watermark)
                // 4. Replace the ID in each record with the district name from the cache file (join on ID)
                .map(new RichMapFunction<Tuple3, Tuple3>() {
                    private final Map<Integer, String> idName = new HashMap<>(); // per-task lookup table

                    // Initialize resources: load the cached file once per parallel task
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        File district = getRuntimeContext().getDistributedCache().getFile("district");
                        // try-with-resources closes the reader automatically; I/O errors now fail the
                        // task instead of silently leaving the map empty
                        try (BufferedReader br = new BufferedReader(new FileReader(district))) {
                            String line;
                            while (Objects.nonNull(line = br.readLine())) {
                                if (line.trim().isEmpty()) {
                                    continue; // skip blank lines
                                }
                                String[] s = line.trim().split("\\s+");
                                idName.put(Integer.valueOf(s[0]), s[1]);
                            }
                        }
                    }

                    @Override
                    public Tuple3 map(Tuple3 value) throws Exception {
                        return new Tuple3(idName.get(value._1()), value._2(), value._3());
                    }

                    // Release resources
                    @Override
                    public void close() throws Exception {
                        idName.clear();
                    }
                })
                .print();

        see.execute("cache-test");
    }
}
```

Output

(nanjing,34,1727094791401)
(suzhou,36,1727094792401)
(nanjing,35,1727094793401)
(changzhou,32,1727094794401)
(suzhou,33,1727094795401)

Approach 2: Broadcast variables (broadcast state)

Main code

```java
package recovery;

import modules.env.Environments;
import modules.time.Timer;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.WindowStagger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import scala.Tuple2;
import scala.Tuple3;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

/**
 * Sending and reading a broadcast variable (broadcast state) with connect.
 */
public class TestBroadcastConnect {
    public static void main(String[] args) throws Exception {
        // 1. Create the environment (Environments is the author's own builder helper)
        StreamExecutionEnvironment see = new Environments()
                .build()
                .enableCheckpoint("file:///D:/phase/flink_state_backend", 3, 1, 1)
                .enableRetries(3, 1)
                .enableStateBackend("hashmap", true, false)
                .finish(RuntimeExecutionMode.STREAMING, 1, 3);

        // 2. Broadcast variable: the descriptor names the broadcast state and its key/value types
        MapStateDescriptor desc1 = new MapStateDescriptor("idCity", Integer.class, String.class);
        BroadcastStream<Tuple2> broadcastStream = see.fromCollection(Arrays.asList(
                        // Content to broadcast: (ID, city)
                        new Tuple2(1, "nanjing"),
                        new Tuple2(2, "suzhou"),
                        new Tuple2(3, "wuxi")))
                .broadcast(desc1); // broadcast stream

        // 3. Data: (ID, temperature, timestamp)
        see.fromCollection(Arrays.asList(
                        new Tuple3(1, 34, System.currentTimeMillis()),
                        new Tuple3(2, 36, System.currentTimeMillis() + 1000),
                        new Tuple3(1, 35, System.currentTimeMillis() + 2000),
                        new Tuple3(3, 32, System.currentTimeMillis() + 3000),
                        new Tuple3(2, 33, System.currentTimeMillis() + 4000)))
                .setParallelism(1)
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3>forMonotonousTimestamps()
                        .withTimestampAssigner((SerializableTimestampAssigner<Tuple3>)
                                (element, recordTimestamp) -> (Long) element._3()))
                // 4. Connect with the broadcast stream (yields a broadcast connected stream)
                .connect(broadcastStream)
                // 5. Replace the ID with the city name from the broadcast state (join on ID)
                .process(new BroadcastProcessFunction<Tuple3, Tuple2, Tuple3>() {
                    @Override
                    public void processElement(Tuple3 value, ReadOnlyContext ctx, Collector<Tuple3> out) throws Exception {
                        // Read. Broadcast elements are not guaranteed to arrive before the data, so the
                        // lookup may miss; fall back to the raw ID so keyBy below never sees a null key
                        Object city = ctx.getBroadcastState(desc1).get(value._1());
                        out.collect(new Tuple3(city != null ? city : value._1(), value._2(), value._3()));
                    }

                    @Override
                    public void processBroadcastElement(Tuple2 value, Context ctx, Collector<Tuple3> out) throws Exception {
                        ctx.getBroadcastState(desc1).put(value._1(), value._2()); // write
                    }
                })
                // 6. Business logic: average temperature per city per window
                .keyBy(t3 -> t3._1().toString())
                .window(Timer.tumbling(5, 0, TimeUnit.SECONDS, WindowStagger.NATURAL)) // author's helper
                .process(new ProcessWindowFunction<Tuple3, Tuple2, String, TimeWindow>() {
                    @Override
                    public void process(String city, Context context, Iterable<Tuple3> elements, Collector<Tuple2> out) throws Exception {
                        float avg = 0.0f;
                        int count = 0;
                        for (Tuple3 t : elements) {
                            count++;
                            avg += (Integer) t._2();
                        }
                        avg /= count;
                        out.collect(new Tuple2(city, avg)); // send the average downstream
                    }
                })
                // Equivalent to print()
                .addSink(new SinkFunction<Tuple2>() {
                    @Override
                    public void invoke(Tuple2 value, Context context) throws Exception {
                        System.out.println(value);
                    }
                });

        see.execute("broadcast-connect");
    }
}
```

Output

(nanjing,34.5)
(suzhou,36.0)
(wuxi,32.0)
(suzhou,33.0)
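The grouping in this output follows from the tumbling event-time windows: readings for the same city are averaged only when their timestamps fall into the same 5-second window. The sketch below simulates steps 5 and 6 in plain Java to show this; it is not Flink code. It assumes windows aligned to multiples of 5000 ms with offset 0, and uses fixed, hypothetical timestamps (7500 to 11500 ms) chosen so that a window boundary at 10000 ms separates the third and fourth records. The real job uses `System.currentTimeMillis()` and the author's `Timer.tumbling(...)` helper with `WindowStagger.NATURAL`, so where the boundary falls, and therefore how records are grouped, can differ between runs. The class `TumblingAverageSimulation` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class TumblingAverageSimulation {
    static final long WINDOW_SIZE_MS = 5_000L;

    /** Start of the epoch-aligned tumbling window (offset 0) that contains ts. */
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /**
     * facts: rows of {id, temperature, timestampMs}; dim: id -> city, standing in for the broadcast state.
     * Returns "(city,avg)" strings ordered by window start, then by first appearance of the city in that window.
     */
    static List<String> run(long[][] facts, Map<Long, String> dim) {
        // window start -> (city -> {sum, count})
        Map<Long, Map<String, double[]>> windows = new TreeMap<>();
        for (long[] f : facts) {
            // Step 5: look up the city; fall back to the raw ID when the dimension is missing
            String city = dim.getOrDefault(f[0], String.valueOf(f[0]));
            // Step 6: assign the record to its window and accumulate per city
            double[] acc = windows
                    .computeIfAbsent(windowStart(f[2], WINDOW_SIZE_MS), k -> new LinkedHashMap<>())
                    .computeIfAbsent(city, k -> new double[2]);
            acc[0] += f[1];
            acc[1] += 1;
        }
        List<String> results = new ArrayList<>();
        for (Map<String, double[]> perCity : windows.values()) {
            for (Map.Entry<String, double[]> e : perCity.entrySet()) {
                float avg = (float) (e.getValue()[0] / e.getValue()[1]); // float, as in the Flink job
                results.add("(" + e.getKey() + "," + avg + ")");
            }
        }
        return results;
    }

    public static void main(String[] args) {
        Map<Long, String> dim = new HashMap<>();
        dim.put(1L, "nanjing");
        dim.put(2L, "suzhou");
        dim.put(3L, "wuxi");
        // Fixed timestamps: the 10000 ms boundary falls between the 3rd and 4th record
        long[][] facts = {
                {1, 34, 7_500}, {2, 36, 8_500}, {1, 35, 9_500}, {3, 32, 10_500}, {2, 33, 11_500}
        };
        System.out.println(run(facts, dim)); // prints [(nanjing,34.5), (suzhou,36.0), (wuxi,32.0), (suzhou,33.0)]
    }
}
```

With these inputs the simulation yields the same four averages as the output above: nanjing's two readings (34 and 35) share a window, while suzhou's readings (36 and 33) land in different windows and are averaged separately.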
