当前位置: 首页 > news >正文

实时即未来,车联网项目之将数据落地到文件系统和数据库【三】

文章目录

    • 实时读取流数据的步骤
    • 原始数据实时ETL任务分析 Hive
      • 将HDFS数据映射到Hive表
      • 自定义Sink数据写入Hive表(了解)
    • 原始数据实时ETL落地到HBase
      • HBase的rowkey设计原则
      • HBase的rowkey设计方法
      • 正常数据落地到HBase
    • 原始数据实时 ETL 任务 HBase 调优
      • 数据写入HBase优化 - 客户端优化
      • 数据写入HBase预分区
      • 数据写入HBase预写日志
      • 数据写入HBase使用压缩和编码

实时读取流数据的步骤

image-20210922085751532

原始数据实时ETL任务分析 Hive

将HDFS数据映射到Hive表

  • 需要指定的HDFS的目录

    image-20210922091210155

  • 回忆如何映射HDFS数据到Hive表中

    ① 创建表 create external table maynor_src (…) row formate delimited field terminate by ‘\t’ partitioned by(dt string) location ‘hdfs://node01…/maynor_src’;

    ② 使用数据库

    ③ 添加文件夹到指定分区

    ​ alter table maynor_src add partition(dt=‘20210922’) location ‘hdfs://node01:8020/apps/warehouse/ods.db/maynor_src/20210922’

    #!/bin/bash
    
    dt=`date -d '1 days ago' +'%Y%m%d'`
    tableName=$1
    
    ssh node03 `/export/server/hive/bin/hive -e "use maynor_ods;alter table ${tableName} add partition(dt=${dt}) location 'hdfs://node01:8020/apps/warehouse/ods.db/${tableName}/${dt}"`
    
  • 如何实现从HDFS中正确或错误的数据映射到Hive表中

  • 如何自动化HDFS数据到Hive表中

    # 使用shell 脚本
    alter table maynor_src add partition (dt="20210922") location "/apps/hive/warehouse/ods.db/maynor_src/20210922";
    
  • 如何执行 t+1 离线任务,设置调度的两种方式

    ① crontab

    ​ linux 自带调度

    ② 调度平台

    ​ azkaban airflow dolphinscheduler oozie 自研

自定义Sink数据写入Hive表(了解)

  • 实现步骤

    package cn.maynor.streaming.sink;
    
    import cn.maynor.streaming.entity.maynorDataObj;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;
    
    /**
     * Author maynor
     * Date 2021/9/22 10:02
     * Desc 将每条车辆的数据直接写入到 Hive 中
     */
    public class SaveErrorDataHiveSink extends RichSinkFunction<maynorDataObj> {
        //定义 logger
        private static final Logger logger = LoggerFactory.getLogger(SaveErrorDataHiveSink.class);
        //2.创建有参构造方法,参数包括数据库名和表名
        //定义变量
        private String dbName;
        private String tableName;
        //定义连接对象和statement对象
        private Connection conn = null;
        private Statement statement = null;
        //构造方法
        public SaveErrorDataHiveSink(String _dbName,String _tableName){
            this.dbName = _dbName;
            this.tableName = _tableName;
        }
    
        //3.重写open方法进行Hive连接的初始化
        @Override
        public void open(Configuration parameters) throws Exception {
            //3.1 将JDBC驱动 org.apache.hive.jdbc.HiveDriver 加载进来
            //获取全局参数
            ParameterTool parameterTool = (ParameterTool) getRuntimeContext()
                    .getExecutionConfig()
                    .getGlobalJobParameters();
            //获取当前上下文中 hive 的驱动
            Class.forName(parameterTool.getRequired("hive.driver"));
            //3.2 设置JDBC连接Hive的连接器,端口为10000
            conn = DriverManager.getConnection(
                    parameterTool.getRequired("hive.url"),
                    parameterTool.getRequired("hive.user"),
                    parameterTool.get("hive.password")
            );
            //3.3 创建Statement
            statement = conn.createStatement();
            //3.4 定义 schemaAndTableExists 实现库不存在创建库,表不存在创建表
            Boolean flag = schemaAndTableExists(dbName,tableName,statement);
            if(flag){
                logger.info("当前数据库和表初始化成功!");
            }else{
                logger.warn("请检查数据库和表!");
            }
        }
    
        //5.重写cloese方法 关闭连接
        @Override
        public void close() throws Exception {
            if(!statement.isClosed())statement.close();
            if(!conn.isClosed())conn.close();
        }
    
        //4.重写invoke将每条数据
        @Override
        public void invoke(maynorDataObj value, Context context) throws Exception {
            //4.1 编写SQL将数据插入到表中
            // insert into maynor_error values('11111');
            StringBuffer buffer = new StringBuffer();
            buffer.append("INSERT INTO "+tableName);
            buffer.append(" VALUES('");
            buffer.append(value.getErrorData()+"'");
            //4.2 执行statement.executeUpdate 将数据直接落地到Hive表
            statement.executeUpdate(buffer.toString());
        }
    
        //6.定义 schemaAndTableExists 方法 create database if not exists库或表, execute,选择数据库
    
        /**
         * 初始化数据库和数据表,如果初始化成功返回 true,否则 false
         * @param dbName
         * @param tableName
         * @param statement
         * @return
         */
        private Boolean schemaAndTableExists(String dbName, String tableName, Statement statement) {
            //数据库是否存在
            Boolean flag = true;
            try{
                //初始化数据库
                String createDBSQL="create database if not exists "+dbName;
                boolean executeDB = statement.execute(createDBSQL);
                if(executeDB){
                    logger.info("当前数据库创建成功");
                    flag = true;
                }else{
                    logger.info("当前数据库已经存在");
                    flag = true;
                }
                //初始化数据表
                String createTableSQL = "use "+tableName+";create table if not exists "+tableName+" (json string) partition by dt" +
                        " row formatted delimited field terminate by '\t' location '/apps/hive/warehouse/ods.db/maynor_error'";
                boolean executeTable = statement.execute(createTableSQL);
                if(executeTable){
                    logger.info("当前数据库表创建成功");
                    flag = true;
                }else{
                    logger.info("当前数据表已经存在");
                    flag = true;
                }
            }catch (Exception ex){
                logger.warn("初始化失败!");
                flag = false;
            }
            return flag;
        }
    }
    

原始数据实时ETL落地到HBase

  • 写入hbase的步骤和准备
    1. 写入的表名
    2. hbase的rowkey
    3. 写入的列簇 columnFamily
    4. 列名和列值

HBase的rowkey设计原则

① rowkey 的长度原则 , 16个字节

② rowkey 的散列原则 ,尽量保证离散

③ rowkey 的唯一原则 , rowkey不要一样

HBase的rowkey设计方法

① 加盐 —— 随机数

② Hash散列

③ 翻转字符串

正常数据落地到HBase

  • 开启 HBase 集群

    # 首先开启 hdfs ,zookeeper 
    /export/server/hbase/start-hbase.sh
    
  • 进入到 HBase命令行

    hbase shell
    
  • 创建HBase表 - maynor_src ,列簇为 cf

    # 查看hbase所有表
    list
    # 查看namespace(数据库)
    list_namespace
    # 创建数据表
    hbase(main):005:0> create 'maynor_src','cf'
    # 查看表中的数据
    scan 'maynor_src'
    
  • 开发步骤

    //1.创建 SrcDataToHBaseSink类继承 RichSinkFunction<maynorDataObj>
    //2.创建一个有参数-表名的构造方法
    //3.重写open方法
    //3.1 从上下文获取到全局的参数
    //3.2 设置hbase的配置,Zookeeper Quorum集群和端口和TableInputFormat的输入表
    //3.3 通过连接工厂创建连接
    //3.4 通过连接获取表对象
    //4.重写close方法
    //4.1 关闭hbase 表和连接资源
    //5. 重写 invoke 方法,将读取的数据写入到 hbase
    //5.1 setDataSourcePut输入参数value,返回put对象
    //6. 实现 setDataSourcePut 方法
    //6.1 如何设计rowkey VIN+时间戳翻转
    //6.2 定义列簇的名称
    //6.3 通过 rowkey 实例化 put
    //6.4 将所有的字段添加到put的字段中
    

原始数据实时 ETL 任务 HBase 调优

数据写入HBase优化 - 客户端优化

  • 为什么需要优化呢?

防止出现每条数据都读写 HBase 数据库,造成集群宕机和数据丢失。

  • 批量写入需要使用的缓存对象 - BufferedMutator 写数据的原理

    将数据按批次写入到 BufferedMutator 对象中,按时间或者按大小写入。

  • 代码逻辑优化

    BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tableName));
            params.writeBufferSize(10 * 1024 * 1024L);
            params.setWriteBufferPeriodicFlushTimeoutMs(5 * 1000L);
    
    //5.1 setDataSourcePut输入参数value,返回put对象
            try {
                Put put = setDataSourcePut(value);
                mutator.mutate(put);
                //5.2 指定时间内的数据强制刷写到hbase
                mutator.flush();
            }catch (Exception ex){
                logger.error("写入到hbase失败:"+ex.getMessage());
            }
    
  • 在主流程中将数据写入到 maynor_src

数据写入HBase预分区

  • 预分区的概念

  • 创建预分区的语法

    
    

数据写入HBase预写日志

  • 预写日志的作用

  • memstore在HBase读写作用

数据写入HBase使用压缩和编码

  • 编码压缩其实是对列数据的压缩

  • 编码压缩的优势

  • 编码类型

  • 创建一个 fast_diff 编码的 maynor_src 表

    alter 'maynor_src', { NAME => 'cf', DATA_BLOCKs_ENCODING => 'FAST_DIFF' }
    
  • 压缩算法

  • 创建一个 gz 或 snappy 压缩的 maynor_src_gz 表

    create 'maynor_src',{NAME => 'cf',COMPRESSION => 'gz'}create 'maynor_src_snappy', { NAME => 'cf', COMPRESSION => 'SNAPPY' }
    
  • 查看数据量大小

相关文章:

  • 非零基础自学Java (老师:韩顺平) 第23章 反射(reflection) 23.5 哪些类型 有 Class 对象
  • 一文精通数据库操作--mysql(25分钟)
  • 地理信息系统:绪论重点基础知识
  • deeplab v3论文精读
  • (附源码)ssm跨平台教学系统 毕业设计 280843
  • MSOX3052A是德混合信号示波器500MHz
  • python字符串拼接
  • 边缘计算:基于tflite实现安卓边缘端在线训练on device training(端侧训练)万字长文详解
  • Vue计算属性computed和监听属性watch的区别
  • Spring 单例注入其它 scope 的四种解决方法 @Scope 源码解读
  • FastAPI简介与快速体验
  • 《微信小程序案例9》小程序登录流程
  • 【Python初级人工智能精讲】用Paddlehub给一段没有标点符号的文字加上合适的标点符号
  • idea使用fiddler抓包分析,fiddler抓取https
  • (附源码)python旅游推荐系统 毕业设计 250623
  • Elasticsearch 参考指南(升级前重新索引)
  • JAVA 学习IO流
  • MyEclipse 8.0 GA 搭建 Struts2 + Spring2 + Hibernate3 (测试)
  • orm2 中文文档 3.1 模型属性
  • PAT A1120
  • spring boot下thymeleaf全局静态变量配置
  • SQLServer之索引简介
  • - 概述 - 《设计模式(极简c++版)》
  • 技术攻略】php设计模式(一):简介及创建型模式
  • 力扣(LeetCode)357
  • 巧用 TypeScript (一)
  • 深度解析利用ES6进行Promise封装总结
  • ​​​​​​​​​​​​​​Γ函数
  • ​io --- 处理流的核心工具​
  • $forceUpdate()函数
  • (20050108)又读《平凡的世界》
  • (Forward) Music Player: From UI Proposal to Code
  • (Spark3.2.0)Spark SQL 初探: 使用大数据分析2000万KF数据
  • (保姆级教程)Mysql中索引、触发器、存储过程、存储函数的概念、作用,以及如何使用索引、存储过程,代码操作演示
  • (分布式缓存)Redis分片集群
  • (附源码)springboot猪场管理系统 毕业设计 160901
  • (原創) 人會胖會瘦,都是自我要求的結果 (日記)
  • (转)linux 命令大全
  • (转载)跟我一起学习VIM - The Life Changing Editor
  • .net core 客户端缓存、服务器端响应缓存、服务器内存缓存
  • .NET Remoting Basic(10)-创建不同宿主的客户端与服务器端
  • .NET 中使用 TaskCompletionSource 作为线程同步互斥或异步操作的事件
  • .NET连接MongoDB数据库实例教程
  • .NET运行机制
  • .py文件应该怎样打开?
  • [ vulhub漏洞复现篇 ] Django SQL注入漏洞复现 CVE-2021-35042
  • [2019.3.20]BZOJ4573 [Zjoi2016]大森林
  • [51nod1610]路径计数
  • [Android Pro] android 混淆文件project.properties和proguard-project.txt
  • [Android]Android P(9) WIFI学习笔记 - 扫描 (1)
  • [Bada开发]初步入口函数介绍
  • [C]编译和预处理详解
  • [CF482B]Interesting Array
  • [CSDN首发]鱿鱼游戏的具体玩法详细介绍
  • [C语言]——C语言常见概念(1)