当前位置: 首页 > news >正文

湖仓一体电商项目(十四):实时任务执行流程

文章目录

实时任务执行流程

一、准备环境

二、启动Flink代码

三、启动数据采集接口代码

四、启动模拟数据代码


实时任务执行流程

目前暂时将项目在本地执行,执行顺序如下:

一、准备环境

这里默认HDFS、Hive、HBase、Kafka环境已经准备,启动maxwell组件监控mysql业务库数据:

#在Kafka中创建好对应的kafka topic(已创建的topic,可忽略,避免重复创建)
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DB-BUSSINESS-DATA --partitions 3 --replication-factor 3

./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-ODS-TOPIC --partitions 3 --replication-factor 3

./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DIM-TOPIC --partitions 3 --replication-factor 3

./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DWD-USER-LOGIN-TOPIC --partitions 3 --replication-factor 3

./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic KAFKA-DWS-USER-LOGIN-WIDE-TOPIC --partitions 3 --replication-factor 3

#启动maxwell
[root@node3 ~]# cd /software/maxwell-1.28.2/bin
[root@node3 bin]#  maxwell --config ../config.properties

#在Hive中创建好需要的Iceberg各层的表
add jar /software/hive-3.1.2/lib/iceberg-hive-runtime-0.12.1.jar;
add jar /software/hive-3.1.2/lib/libfb303-0.9.3.jar;

CREATE TABLE ODS_MEMBER_INFO  (
id string,
user_id string,
member_growth_score string,
member_level string,
balance string,
gmt_create string,
gmt_modified string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_MEMBER_INFO/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);


CREATE TABLE ODS_MEMBER_ADDRESS  (
id string,
user_id string,
province string,
city string,
area string,
address string,
log string,
lat string,
phone_number string,
consignee_name string,
gmt_create string,
gmt_modified string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_MEMBER_ADDRESS/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);

CREATE TABLE ODS_USER_LOGIN (
id string,
user_id string,
ip string,
login_tm string,
logout_tm string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/ODS_USER_LOGIN/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);

CREATE TABLE DWD_USER_LOGIN (
id string,
user_id string,
ip string,
login_tm string,
logout_tm string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/DWD_USER_LOGIN/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);

CREATE TABLE DWS_USER_LOGIN (
user_id string,
ip string,
gmt_create string,
login_tm string,
logout_tm string,
member_level string,
province string,
city string,
area string,
address string,
member_points string,
balance string,
member_growth_score string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://mycluster/lakehousedata/icebergdb/DWS_USER_LOGIN/' 
TBLPROPERTIES ('iceberg.catalog'='location_based_table',
'write.metadata.delete-after-commit.enabled'= 'true',
'write.metadata.previous-versions-max' = '3'
);


#启动Clickhouse
[root@node1 ~]# service clickhouse-server start

#在Clickhouse中创建好对应表
create table dm_user_login_info(
 dt String,
 province String,
 city String,
 user_id String,
 login_tm String,
 gmt_create String
) engine = MergeTree() order by dt;

二、启动Flink代码

依次启动如下Flink代码:”ProduceKafkaDBDataToODS.scala”、“DimDataToHBase.scala”、“ProduceKafkaODSDataToDWD.scala”、“ProduceUserLogInToDWS.scala”、“ProcessUserLoginInfoToDM.scala”代码。各个代码中Kafka Connector属性“scan.startup.mode”设置为“latest-offset”,从最新位置消费数据。

注意:代码执行时可以设置使用内存参数:-Xmx300m -Xms300m

三、启动数据采集接口代码

启动项目“LakeHouseDataPublish”发布数据。

四、启动模拟数据代码

启动项目“LakeHouseMockData”中模拟向数据库中生产数据代码“RTMockDBData.java”。


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨

相关文章:

  • 猿创征文|Java中的IO流大家族 (两万字详解)
  • SQL Server 2014安装教程(保姆级图解教程)
  • 大白话理解-微信小程序获取授权
  • Spring常用注解。
  • Spring之拦截器
  • 【知识图谱】Louvain、LPA等5类经典社区发现算法 Python 实战
  • SQL server 2008 r2 安装教程
  • python wechat --- 企业微信机器人API
  • 阿里云ossutil使用
  • Spring——事务注解@Transactional【建议收藏】
  • scanf、printf使用详解
  • 基于文化算法优化的神经网络预测研究(Matlab代码实现)
  • ubuntu安装mysql详细过程
  • SpringCloud面试题及答案 300道,springcloud面试题总结 (持续更新)
  • 图文详解核方法——以核支持向量机KVSM为例
  • 【React系列】如何构建React应用程序
  • 【跃迁之路】【477天】刻意练习系列236(2018.05.28)
  • 230. Kth Smallest Element in a BST
  • CentOS学习笔记 - 12. Nginx搭建Centos7.5远程repo
  • Django 博客开发教程 16 - 统计文章阅读量
  • download使用浅析
  • Java知识点总结(JavaIO-打印流)
  • js
  • Laravel 中的一个后期静态绑定
  • Odoo domain写法及运用
  • python docx文档转html页面
  • 代理模式
  • 对超线程几个不同角度的解释
  • 分享一份非常强势的Android面试题
  • 关于使用markdown的方法(引自CSDN教程)
  • 前端知识点整理(待续)
  • 视频flv转mp4最快的几种方法(就是不用格式工厂)
  • 测评:对于写作的人来说,Markdown是你最好的朋友 ...
  • # .NET Framework中使用命名管道进行进程间通信
  • #stm32驱动外设模块总结w5500模块
  • $HTTP_POST_VARS['']和$_POST['']的区别
  • (Matlab)使用竞争神经网络实现数据聚类
  • (二)Pytorch快速搭建神经网络模型实现气温预测回归(代码+详细注解)
  • (考研湖科大教书匠计算机网络)第一章概述-第五节1:计算机网络体系结构之分层思想和举例
  • (算法)Game
  • (一)认识微服务
  • (转)关于多人操作数据的处理策略
  • (转)人的集合论——移山之道
  • (转)为C# Windows服务添加安装程序
  • .net core 6 集成 elasticsearch 并 使用分词器
  • .NET 发展历程
  • .net 获取url的方法
  • .NET/C# 利用 Walterlv.WeakEvents 高性能地中转一个自定义的弱事件(可让任意 CLR 事件成为弱事件)
  • .net6 webapi log4net完整配置使用流程
  • .NetCore实践篇:分布式监控Zipkin持久化之殇
  • .net操作Excel出错解决
  • .NET使用存储过程实现对数据库的增删改查
  • .sh 的运行
  • @Autowired 与@Resource的区别
  • @data注解_SpringBoot 使用WebSocket打造在线聊天室(基于注解)