当前位置: 首页 > news >正文

实时即未来,车联网项目之原始终端数据实时ETL【二】

文章目录

    • Flink 将报文解析后的数据推送到 kafka 中
    • 实时ETL开发
    • 原始数据的实时ETL设置
      • 开发的流程
      • 开发的类名 —— KafkaSourceDataTask
      • 设置 checkpoint 中 statebackend
      • 数据积压和反压机制
      • 抽象 BaseTask 用于处理数据流和读取kafka数据

Flink 将报文解析后的数据推送到 kafka 中

  • 步骤

    1. 开启 kafka 集群

      # 三台节点都要开启 kafka 
      [root@node01 kafka]# bin/kafka-server-start.sh -daemon config/server.properties
      
    2. 使用 kafka tool 连接 kafka 集群,创建 topic

      # 第1种方式通过命令
      bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic vehicledata --replication-factor 2 --partitions 3
      # 查看 kafka topic 的列表
      bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --list
      # 第2种 kafka tool 工具
      

image-20210920091015186

  1. 通过 flink 将解析后的报文 json 字符串推送到 kafka 中

    package cn.maynor.flink.source;
    
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import javax.annotation.Nullable;
    import java.util.Properties;
    
    /**
     * Author maynor
     * Date 2021/9/20 9:11
     * 实现flink将数据写入到kafka集群中
     * 开发步骤:
     * 1.开启流处理环境
     * 2.设置并行度、chk、重启策略等参数
     * 3.创建FlinkKafkaProducer类
     * 3.1.配置属性
     * 4.设置数据源
     * 5.执行流处理环境
     */
    public class FlinkKafkaWriter {
        public static void main(String[] args) {
            //1.开启流处理环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //2.设置并行度、chk、重启策略等参数
            env.setParallelism(1);
            //2.1.读取车辆 json 数据
            DataStreamSource<String> source = env
                    .readTextFile("F:\\1.授课视频\\4-车联网项目\\05_深圳24期\\全部讲义\\2-星途车联网系统第二章-原始终端数据实时ETL\\原始数据\\sourcedata.txt");
            //3.创建FlinkKafkaProducer类
            //3.1.配置属性
            Properties props = new Properties();
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
            props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "5");
            props.setProperty(ProducerConfig.ACKS_CONFIG, "0");
            //props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.flink.api.common.serialization.SimpleStringSchema");
    
            //3.2.实例化FlinkKafkaProducer
            FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                    "vehicledata",
                    new KafkaSerializationSchema<String>() {
                        @Override
                        public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
                            return new ProducerRecord(
                                    "vehicledata",
                                    element.getBytes()
                            );
                        }
                    },
                    props,
                    FlinkKafkaProducer.Semantic.NONE
            );
    
            //4.设置数据源
            source.addSink(producer);
            //5.执行流处理环境
            try {
                env.execute();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

实时ETL开发

  • 创建模块 —— StreamingAnalysis
  • 导入项目的 pom 依赖
  • 常见包的含义 task , source ,sink ,entity
  • 配置文件的导入 conf.properties 和 logback.xml
  • 工具类的走读
    • 日期处理
    • 读取配置文件 静态代码块
    • 字符串常用工具 - 字符串翻转
    • JSON 字符串转对象

原始数据的实时ETL设置

开发的流程

image-20210920113801725

开发的类名 —— KafkaSourceDataTask

		//todo 1.创建流执行环境
        //todo 2.设置并行度 ①配置文件并行度设置 ②客户端设置 flink run -p 2 ③在程序中 env.setParallel(2) ④算子上并行度(级别最高)
        //todo 3.开启checkpoint及相应的配置,最大容忍次数,最大并行checkpoint个数,checkpoint间最短间隔时间,checkpoint的最大
        //todo 容忍的超时时间,checkpoint如果取消是否删除checkpoint 等
        //todo 4.开启重启策略
        //todo 5. 读取kafka中的数据
        //todo 5.1 设置 FlinkKafkaConsumer
        //todo 5.2 配置参数
        //todo 5.3 消费 kafka 的offset 提交给 flink 来管理
        //todo 6 env.addSource
        //todo 7 打印输出
        //todo 8 将读取出来的 json 字符串转换成 maynorDataObj
        //todo 9 将数据拆分成正确的数据和异常的数据
        //todo 10 将正确的数据保存到 hdfs
        //todo 11 将错误的数据保存到 hdfs 上
        //todo 12 将正确的数据写入到 hbase 中
        //todo 8 执行流环境

设置 checkpoint 中 statebackend

  • 配置的地方有两种

    1. 配置文件中 flink-conf.yaml
    2. 在 job 中配置 env.setStateBackend()
  • 配置的方式三种

    1. memorystatebackend
    2. fsStatebackend
    3. rocksdbStatebackend(状态特别大的使用)
  • 配置读取kafka的数据的设置

    
    

数据积压和反压机制

  • 就是生产的数据大于消费的数据的速度,造成数据的积压

  • 解决反压机制的方法

    image-20210809160703505

    通过 credit 和 反压策略解决数据堆积问题

    image-20210920162227213

抽象 BaseTask 用于处理数据流和读取kafka数据

  • 将公共的固定的代码抽象出来 BaseTask 抽象类
  • 使用 Flink 的自带的 ParameterTool 来接收 client 或 配置文件中的配置

相关文章:

  • python 的re.findall的Bug以及解决方法
  • 在Windows系统上部署DHCP服务器
  • Java多线程~CAS的原理及其应用
  • [CSS]盒子模型
  • 【 C++ 】开散列哈希桶的模拟实现
  • 阿里云数据库(RDS)查看空间使用情况
  • 【C++编程语言】之 文件操作
  • 人生模式 - 如何跟潜意识对话
  • ubuntu18.04安装redis
  • 02 LaTeX文字实战应用
  • Flash:Flash动画设计软件界面的简介、Flash AS 3.0代码编程入门教程之详细攻略
  • C语言进阶——自定义类型
  • 微信公众号网课查题系统
  • golang学习笔记系列之函数
  • VJ_Dressing_思维
  • 9月CHINA-PUB-OPENDAY技术沙龙——IPHONE
  • 【162天】黑马程序员27天视频学习笔记【Day02-上】
  • 【附node操作实例】redis简明入门系列—字符串类型
  • 【前端学习】-粗谈选择器
  • GitUp, 你不可错过的秀外慧中的git工具
  • Hibernate最全面试题
  • JavaScript学习总结——原型
  • Java方法详解
  • Spring核心 Bean的高级装配
  • 案例分享〡三拾众筹持续交付开发流程支撑创新业务
  • 前端性能优化——回流与重绘
  • 世界编程语言排行榜2008年06月(ActionScript 挺进20强)
  • 腾讯优测优分享 | 你是否体验过Android手机插入耳机后仍外放的尴尬?
  • 运行时添加log4j2的appender
  • media数据库操作,可以进行增删改查,实现回收站,隐私照片功能 SharedPreferences存储地址:
  • 《TCP IP 详解卷1:协议》阅读笔记 - 第六章
  • 阿里云ACE认证学习知识点梳理
  • 基于django的视频点播网站开发-step3-注册登录功能 ...
  • ​3ds Max插件CG MAGIC图形板块为您提升线条效率!
  • ​ubuntu下安装kvm虚拟机
  • #我与Java虚拟机的故事#连载03:面试过的百度,滴滴,快手都问了这些问题
  • #我与Java虚拟机的故事#连载06:收获颇多的经典之作
  • #我与Java虚拟机的故事#连载18:JAVA成长之路
  • (01)ORB-SLAM2源码无死角解析-(66) BA优化(g2o)→闭环线程:Optimizer::GlobalBundleAdjustemnt→全局优化
  • (14)目标检测_SSD训练代码基于pytorch搭建代码
  • (4)事件处理——(7)简单事件(Simple events)
  • (BFS)hdoj2377-Bus Pass
  • (NO.00004)iOS实现打砖块游戏(九):游戏中小球与反弹棒的碰撞
  • (zt)最盛行的警世狂言(爆笑)
  • (二)fiber的基本认识
  • (附源码)spring boot基于小程序酒店疫情系统 毕业设计 091931
  • (附源码)springboot 基于HTML5的个人网页的网站设计与实现 毕业设计 031623
  • (附源码)SSM环卫人员管理平台 计算机毕设36412
  • (三)uboot源码分析
  • (深度全面解析)ChatGPT的重大更新给创业者带来了哪些红利机会
  • (十八)用JAVA编写MP3解码器——迷你播放器
  • (算法)前K大的和
  • (一)基于IDEA的JAVA基础1
  • (转)视频码率,帧率和分辨率的联系与区别
  • (转载)利用webkit抓取动态网页和链接