当前位置: 首页 > news >正文

Flink 实时数仓(三)【DWD 层搭建(一)】

1、DWD 层搭建

DWD 层的数据来源有两部分:日志和业务数据中的事实

  • DWD 层存储的是事实表(存储在 Kafka)
  • 命名规范:dwd_数据域_表名

这里我们的数据域主要包括:流量域(日志)、交易域(交易事实)、工具域(优惠券操作)、互动域(收藏评价)和用户域(注册),供 16 张事实表:

数据域表数量表名
流量域3未经加工、独立访客、用户跳出
交易域7加购物车、订单预处理、下单、取消订单、支付、支付成功、退单退款成功
工具域3优惠券领取、满减券、打折券
互动域2收藏、评价
用户域1注册

1.1、设计分析

DWD 层的数据源:

  • 日志(topic_log):5种(启动、页面、曝光、动作。错误)
  • 业务(topic_db):十几张(所有需要处理的事实表)

比如订单相关的业务过程:订单表&订单明细表&订单明细购物券表&订单明细活动表

在离线数仓中,我们一般会选择一个最细粒度的事实表(比如订单明细表)作为 DWD 事实表的粒度,然后再和其他订单表进行 join 合并以丰富维度(可能包含维度退化);

1.1.1、ODS 消费策略

        现在我们要讨论 DWD 怎么从 ODS 去消费数据?现在我们一共有 2 个 topic,每个 topic 中包含的都是不同结构的数据(日志数据有 5 种,业务表的结构也都不一样),所以我们需要拆分开来再消费,这里有两种方案:

方案1
  • 直接使用 Flink 在 ODS 的 topic 基础之上再次消费,分流(侧输出流)之后按照表名再次写入到 Kafka 的不同 topic 中
  • 然后在此使用 Flink 消费不同的 topic(一个 topic 就是一张表的数据)并将不同主题的数据进行关联
  • 最后再写入到 Kafka 的主题当中(这里的主题相当于数据域)

优点:把不同表写入不同的 topic 中,这样当 Flink 去消费的时候数据量就会小很多

缺点:时效性可能差一点,因为需要在 ODS 基础上再次消费写入到 Kafka 才能供 Flink 去消费; 

方案2
  • Flink 直接对 ODS 的 topic 进行消费,在程序中过滤想要的数据并进行关联
  • 最后写入到 Kakfa 主题中

优点:时效性好,设计简答

缺点:数据量太大

综上,当数据量小的时候,更适合选择方案2;当数据量大的时候,适合选择方案1;所以这里对于日志数据,我们采用方案1;而业务数据采用方案2;

1.2、DWD 层实现

1.2.1、流量域未经加工事务事实表

        流量域中的数据全部来自日志主题(topic_log),因为日志数据包含 5 中类型(启动、页面、曝光、动作。错误)所以,我们需要把它先拆分成 5 个主题,也就是使用 Flink 将 topic_log 中的数据分成 5 个流,再写入 5 个不同的主题当中;

1)ETL

        数据传输过程中可能会出现部分数据丢失的情况,导致 JSON 数据结构不再完整,但是我们之前在 Flume 采集数据到 Kafka 的时候已经在 Flume 拦截器中过滤掉了,所以这里不需要再做处理;

2)新老访客状态标记修复

下面是一段日志数据:

{"common":{"ar":"370000","ba":"Xiaomi","ch":"web","is_new":"0","md":"Xiaomi 10 Pro ","mid":"mid_2190279","os":"Android 11.0","uid":"688","vc":"v2.1.134"},"page":{"during_time":11863,"item":"34,27,14","item_type":"sku_ids","page_id":"trade"},"ts":1651304100000}

        日志数据中 common 字段下的 is_new 字段是用来标记新老访客状态的,1 表示新访客,0 表示老访客。前端埋点采集到的数据可靠性无法保证,可能会出现老访客被标记为新访客的问题,因此需要对该标记进行修复。

前端埋点新老访客状态标记设置规则

以神策提供的第三方埋点服务中新老访客状态标记设置规则为例

  • Web 端:用户第一次访问埋入神策 SDK 页面的当天(即第一天),JS SDK 会在网页的 cookie 中设置一个首日访问的标记,并设置第一天 24 点之前,该标记为 true,即第一天触发的网页端所有事件中,is_new = 1。第一天之后,该标记则为 false,即第一天之后触发的网页端所有事件中,is_new = 0
  • 小程序端:用户第一天访问埋入神策 SDK 的页面时,小程序 SDK 会在 storage 缓存中创建一个首日为 true 的标记,并且设置第一天 24 点之前,该标记均为 true。即第一天触发的小程序端所有事件中,is_new = 1。第一天之后,该标记则为 false,即第一天之后触发的小程序端所有事件中,is_new = 0
  • APP 端:用户安装 App 后,第一次打开埋入神策 SDK 的 App 的当天,Android/iOS SDK 会在手机本地缓存内,创建一个首日为 true 的标记,并且设置第一天 24 点之前,该标记均为 true。即第一天触发的 APP 端所有事件中,is_new = 1。第一天之后,该标记则为 false,即第一天之后触发的 APP 端所有事件中,is_new = 0

        对于这类日志,如果首日之后用户清除了手机本地缓存中的标记,再次启动 APP 会重新设置一个首日为 true 的标记,导致本应为 0 的 is_new 字段被置为 1,可能会给相关指标带来误差。因此,有必要对新老访客状态标记进行修复。

修复思路

运用 Flink 状态编程,为每个 mid (machine id)维护一个键控状态,记录首次访问日期。

  • 如果 is_new 的值为 11代表是首日登录
  1. 如果键控状态为 null,认为本次是该访客首次访问 APP,将日志中 ts 对应的日期更新到状态中,不对 is_new 字段做修改;
  2. 如果键控状态不为 null,且首次访问日期不是当日,说明访问的是老访客,将 is_new 字段置为0
  3. 如果键控状态不为 null,且首次访问日期是当日,说明访问的是新访客,不做操作;
  • 如果 is_new 的值为 00代表是老用户
  1. 如果键控状态为 null,说明访问 APP 的是老访客但本次是该访客的页面日志首次进入程序。当前端新老访客状态标记丢失时,日志进入程序被判定为老访客,Flink 程序就可以纠正被误判的访客状态标记,只要将状态中的日期设置为今天之前即可。本程序选择将状态更新为昨日;
  2. 如果键控状态不为 null,说明程序已经维护了首次访问日期,不做操作。
创建时间工具类

        这里我们用的是自定义的时间工具类,因为 SimpleDateFormat 里面的 format 方法不是线程安全的,我们这里使用 LocalDateTime,并且封装了一些方法方便直接将时间戳转为日期字符串格式;

import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Date;public class DateFormatUtil {private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd");private static final DateTimeFormatter dtfFull = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");public static Long toTs(String dtStr, boolean isFull) {LocalDateTime localDateTime = null;if (!isFull) {dtStr = dtStr + " 00:00:00";}localDateTime = LocalDateTime.parse(dtStr, dtfFull);return localDateTime.toInstant(ZoneOffset.of("+8")).toEpochMilli();}public static Long toTs(String dtStr) {return toTs(dtStr, false);}public static String toDate(Long ts) {Date dt = new Date(ts);LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());return dtf.format(localDateTime);}public static String toYmdHms(Long ts) {Date dt = new Date(ts);LocalDateTime localDateTime = LocalDateTime.ofInstant(dt.toInstant(), ZoneId.systemDefault());return dtfFull.format(localDateTime);}public static void main(String[] args) {System.out.println(toYmdHms(System.currentTimeMillis()));}
}

接下来我们就可以对日志数据开始消费并对新老访客标记进行修复:

public class BaseLogApp {public static void main(String[] args) {// TODO 1. 获取执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1); // 生产环境中设置为kafka主题的分区数// 1.1 开启checkpointenv.enableCheckpointing(5 * 60000L, CheckpointingMode.EXACTLY_ONCE);env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/s/ck");env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // 设置最大共存的checkpoint数量env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L)); // 固定频率重启: 尝试3次重启,每5s重启一次// 1.2 设置状态后端env.setStateBackend(new HashMapStateBackend());// TODO 2. 消费 kafka topic_logString topic = "topic_log";String groupId = "basic_log_app"; // 防止消费者组重复DataStreamSource<String> logDS = env.addSource(MyKafkaUtil.getFlinkKafkaConsumer(topic, groupId));// TODO 3. ETL(flume已经设置拦截器了,这里不需要处理)// TODO 4. 新老访客标志修复(状态编程)// 按照 mid 分组得到键控流KeyedStream<String, String> keyedStream = logDS.keyBy(json -> JSONObject.parseObject(json).getJSONObject("common").getString("mid"));// 富函数有open,close等很多方法和上下文信息SingleOutputStreamOperator<JSONObject> jsonWithNewFlagDS = keyedStream.map(new RichMapFunction<String, JSONObject>() {private ValueState<String> lastVisitState;@Overridepublic void open(Configuration parameters) throws Exception {lastVisitState = getRuntimeContext().getState(new ValueStateDescriptor<String>("last-visit", String.class));}@Overridepublic JSONObject map(String value) throws Exception {JSONObject jsonObject = JSONObject.parseObject(value);// 获取 is_new 标记 和 tsString is_new = jsonObject.getJSONObject("common").getString("is_new");Long ts = jsonObject.getLong("ts");// 将时间戳转换为 yyyy-MM-ddString date = DateFormatUtil.toDate(ts);// 获取状态中的日期String lastDate = lastVisitState.value();if ("1".equals(is_new)) { // 标记为1,代表是新访客if (lastDate == null) { // 确实是新访客lastVisitState.update(date);} else if (!lastDate.equals(date)) { // 不是新访客jsonObject.getJSONObject("common").put("is_new", "0");}} else { // 标记为0,不是新访客if (lastDate == null) { // 如果状态为null,说明是老访客的标记丢失需要修复(只要比现在的日期小即可)lastVisitState.update(DateFormatUtil.toDate(ts - 24 * 60 * 60 * 1000L));}}return jsonObject;}});// TODO 5. 使用侧输出流分流// TODO 6. 提取各个输出流数据// TODO 7. 写出到对应的主题// TODO 8. 启动程序}}

上面我们使用自定义的反序列化器去读取 Kafka topic_log 中的数据,在反序列化时,我们已经排除掉了 null 的 message,所以将来 Flink 作为生产者在序列化写入到 Kafka 时不需要再对 null 值做判断,因为不可能再存在 null 值了;

 3)分流(侧输出流)

通过分流对日志数据进行拆分,生成五张事务事实表写入 Kafka

  • 流量域页面浏览事务事实表(浏览日志)
  • 流量域启动事务事实表(启动日志)
  • 流量域动作事务事实表(动作日志)
  • 流量域曝光事务事实表(曝光日志)
  • 流量域错误事务事实表(错误日志)

实现思路

前端埋点获取的 JSON 字符串(日志)可能存在 common、start、page、displays、actions、err、ts 等七种字段。其中

  • common 对应的是公共信息,是所有日志都有的字段
  • err 对应的是错误信息,所有日志都可能有的字段
  • start 对应的是启动信息,启动日志才有的字段
  • page 对应的是页面信息,页面日志才有的字段
  • displays 对应的是曝光信息,曝光日志才有的字段,曝光日志可以归为页面日志,因此必然有 page 字段
  • actions 对应的是动作信息,动作日志才有的字段,同样属于页面日志,必然有 page 字段。动作信息和曝光信息可以同时存在。
  • ts 对应的是时间戳,单位:毫秒,所有日志都有的字段

综上,我们可以将前端埋点获取的日志分为两大类:启动日志页面日志。二者都有 common 字段和 ts 字段,都可能有 err 字段。

  • 页面日志一定有 page 字段,一定没有 start 字段,可能有 displays 和 actions 字段;
  • 启动日志一定有 start 字段,一定没有 page、displays 和 actions 字段。

①所有日志数据都可能拥有 err 字段,所以首先获取 err 字段,如果返回值不为 null 则将整条日志数据发送到错误侧输出流。然后删掉 JSONObject 中的 err 字段及对应值;

②判断是否有 start 字段,如果有则说明数据为启动日志,将其发送到启动侧输出流;如果没有则说明为页面日志,进行下一步;

③页面日志必然有 page 字段、 common 字段和 ts 字段,获取它们的值,ts 封装为包装类 Long,其余两个字段的值封装为 JSONObject;

④判断是否有 displays 字段,如果有,将其值封装为 JSONArray,遍历该数组,依次获取每个元素(记为 display),封装为JSONObject。创建一个空的 JSONObject,将 display、common、page和 ts 添加到该对象中,获得处理好的曝光数据,发送到曝光侧输出流。动作日志的处理与曝光日志相同(注意:一条页面日志可能既有曝光数据又有动作数据,二者没有任何关系,因此曝光数据不为 null 时仍要对动作数据进行处理);

⑤动作日志和曝光日志处理结束后删除 displays 和 actions 字段,此时主流的 JSONObject 中只有 common 字段、 page 字段和 ts 字段,即为最终的页面日志。

处理结束后,页面日志数据位于主流,其余四种日志分别位于对应的侧输出流,将五条流的数据写入 Kafka 对应主题即可。

使用侧输出流分流
// TODO 5. 使用侧输出流分流 (页面日志放到主流,其它的4种日志放到4个侧输出流)OutputTag<String> startTag = new OutputTag<>("start");OutputTag<String> displaysTag = new OutputTag<>("displays");OutputTag<String> actionsTag = new OutputTag<>("actions");OutputTag<String> errorTag = new OutputTag<>("error");SingleOutputStreamOperator<String> pageDS = jsonWithNewFlagDS.process(new ProcessFunction<JSONObject, String>() {/*** {*      "common":{*          "ar":"370000",*          "ba":"Xiaomi",*          "ch":"web",*          "is_new":"0",*          "md":"Xiaomi 10 Pro ",*          "mid":"mid_2190279",*          "os":"Android 11.0",*          "uid":"688",*          "vc":"v2.1.134"*          },*      "page":{*          "during_time":11863,*          "item":"34,27,14",*          "item_type":"sku_ids",*          "page_id":"trade"*          },*      "ts":1651304100000*  }*/@Overridepublic void processElement(JSONObject value, Context ctx, Collector<String> out) throws Exception {// TODO 1. 尝试获取错误日志// 每种日志都可能存在错误信息String err = value.getString("err");if (err != null && !"".equals(err)){ctx.output(errorTag,value.toJSONString());}// 移除错误字段value.remove("err");// TODO 2. 尝试获取启动日志String start = value.getString("start");if (start != null && !"".equals(start)){ctx.output(startTag,value.toJSONString());}else {// TODO 3. 尝试获取曝光日志// 获取公共信息和页面id和时间戳String common = value.getString("common");String page_id = value.getJSONObject("page").getString("page_id");String ts = value.getString("ts");JSONArray displays = value.getJSONArray("displays");if (displays != null && !displays.isEmpty()){for (int i = 0; i < displays.size(); i++) {JSONObject display = displays.getJSONObject(i);display.put("common",common);display.put("page_id",page_id);display.put("ts",ts);ctx.output(displaysTag,display.toJSONString());}}// TODO 4. 尝试获取动作日志JSONArray actions = value.getJSONArray("actions");if (actions != null && !actions.isEmpty()){for (int i = 0; i < actions.size(); i++) {JSONObject action = actions.getJSONObject(i);action.put("common",common);action.put("page_id",page_id);// 动作日志本身就有时间戳ctx.output(actionsTag,action.toJSONString());}}// 移除曝光和动作字段数据value.remove("display");value.remove("actions");// TODO 5. 获取页面日志out.collect(value.toJSONString()); // 写入到主流(页面日志)}}});
提取侧输出流并写入到 Kafka 主题

这里 Flink 充当生产者来把数据从 Kafka 的 topic_log 主题写入到 Kafka 的 5 个不同日志主题中:

// TODO 6. 提取各个流的数据DataStream<String> startDS = pageDS.getSideOutput(startTag);DataStream<String> displayDS = pageDS.getSideOutput(displaysTag);DataStream<String> actionDS = pageDS.getSideOutput(actionsTag);DataStream<String> errorDS = pageDS.getSideOutput(errorTag);// TODO 7. 写入到对应主题String page_topic = "dwd_traffic_page_log";String start_topic = "dwd_traffic_start_log";String display_topic = "dwd_traffic_display_log";String action_topic = "dwd_traffic_action_log";String error_topic = "dwd_traffic_error_log";pageDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(page_topic));startDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(start_topic));displayDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(display_topic));actionDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(action_topic));errorDS.addSink(MyKafkaUtil.getFlinkKafkaProducer(error_topic));// TODO 8. 启动任务env.execute("BaseLogApp");

总结

        至此,DWD 层的第一个需求就已经实现了,我们把日志数据写到了 5 个主题中,剩下的需求下次完成;

        明天是 8.1 ,又是新的一个月,也是我们离开主校区般往龙城校区的日子,希望等到下次回来主校区时,大学三年多的努力会有满意的结果!也希望自己可以有一个新的精神面貌!

        还有就是忙过2024这一年,一定要好好锻炼身体,大二还好,最近一年运动量太少了,天天久坐和不可避免的熬夜,拿命在肝!到时候一定要好好调养一下身体,狠狠锻炼!

        emm... 不要去焦虑还没发生的事情,做好当下的每一件大事小事,相信自己,加油!

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 《人性的枷锁:菲利普的人生探索能解开枷锁吗?》
  • 树套树模板
  • PYTHON专题-(5)类的专有方法
  • 每日学术速递8.3
  • Xilinx管脚验证流程及常见问题
  • conda环境pip 安装Tensorflow-gpu 2.10.2提示nbconvert 的包依赖冲突
  • OpenStack Yoga版安装笔记(十二)nova安装(下)
  • 林轩田机器学习基石——笔记1.2 Learn to Answer Yes/No(如何进行学习)
  • Flink 实时数仓(二)【DIM 层搭建】
  • 中介子方程七十九
  • Apache Kylin数据模型设计:从ETL到多维分析
  • 自闭症儿童无法上学?专业康复机构是希望的灯塔
  • OpenCV Python 图像相加与透明色转换
  • Day13--JavaWeb学习之Servlet后端渲染界面
  • 算法学习day28
  • 11111111
  • angular组件开发
  • Java Agent 学习笔记
  • Java 多线程编程之:notify 和 wait 用法
  • Java 内存分配及垃圾回收机制初探
  • javascript从右向左截取指定位数字符的3种方法
  • PAT A1092
  • vue 配置sass、scss全局变量
  • 不上全站https的网站你们就等着被恶心死吧
  • 分享一个自己写的基于canvas的原生js图片爆炸插件
  • 使用iElevator.js模拟segmentfault的文章标题导航
  • 网页视频流m3u8/ts视频下载
  • 如何用纯 CSS 创作一个货车 loader
  • # Pytorch 中可以直接调用的Loss Functions总结:
  • # 深度解析 Socket 与 WebSocket:原理、区别与应用
  • #传输# #传输数据判断#
  • (11)MSP430F5529 定时器B
  • (二十四)Flask之flask-session组件
  • (汇总)os模块以及shutil模块对文件的操作
  • (一)Java算法:二分查找
  • (转)大型网站架构演变和知识体系
  • (转)树状数组
  • (转载)Linux网络编程入门
  • (自适应手机端)响应式新闻博客知识类pbootcms网站模板 自媒体运营博客网站源码下载
  • .net core webapi 部署iis_一键部署VS插件:让.NET开发者更幸福
  • .NET/C# 使用 ConditionalWeakTable 附加字段(CLR 版本的附加属性,也可用用来当作弱引用字典 WeakDictionary)
  • .net生成的类,跨工程调用显示注释
  • .net中调用windows performance记录性能信息
  • .set 数据导入matlab,设置变量导入选项 - MATLAB setvaropts - MathWorks 中国
  • @JsonFormat与@DateTimeFormat注解的使用
  • [4]CUDA中的向量计算与并行通信模式
  • [Algorithm][动态规划][子序列问题][最长递增子序列][摆动序列]详细讲解
  • [c#基础]DataTable的Select方法
  • [c++] 单例模式 + cyberrt TimingWheel 单例分析
  • [C++] 轻熟类和对象
  • [codeforces]Levko and Permutation
  • [Docker]五.Docker中Dockerfile详解
  • [Git].gitignore失效的原因
  • [Godot] 3D拾取
  • [HNOI2010]BUS 公交线路