当前位置: 首页 > news >正文

大数据Hadoop之——Apache Hudi 数据湖实战操作(Spark,Flink与Hudi整合)

文章目录

    • 一、概述
    • 二、Hudi CLI
    • 三、Spark 与 Hudi 整合使用
      • 1)Spark 测试
      • 2)Spark 与 Hudi 整合使用
        • 1、启动spark-shell
        • 2、导入park及Hudi相关包
        • 3、定义变量
        • 4、模拟生成Trip乘车数据
        • 5、将模拟数据List转换为DataFrame数据集
        • 6、将数据写入到hudi
    • 四、Flink 与 Hudi 整合使用
      • 1)启动flink集群
      • 2) 启动flink SQL 客户端
      • 3)添加数据
      • 4)查询数据(批式查询)
      • 5)更新数据
      • 6)Streaming Query(流式查询)

一、概述

Hudi(Hadoop Upserts Deletes and Incrementals),简称Hudi,是一个流式数据湖平台,支持对海量数据快速更新,内置表格式,支持事务的存储层、 一系列表服务、数据服务(开箱即用的摄取工具)以及完善的运维监控工具,它可以以极低的延迟将数据快速存储到HDFS或云存储(S3)的工具,最主要的特点支持记录级别的插入更新(Upsert)和删除,同时还支持增量查询。

GitHub地址:https://github.com/apache/hudi

官方文档:https://hudi.apache.org/cn/docs/overview

关于Apache Hudi 数据湖 也可以参考我这篇文章:大数据Hadoop之——新一代流式数据湖平台 Apache Hudi

在这里插入图片描述

二、Hudi CLI

构建hudi后,可以通过cd hudi cli&&./hudi-cli.sh启动shell。一个hudi表驻留在DFS上的一个称为basePath的位置,我们需要这个位置才能连接到hudi表。Hudi库有效地在内部管理此表,使用.hoodie子文件夹跟踪所有元数据。

编译生成的包如下:
在这里插入图片描述

# 启动
./hudi-cli/hudi-cli.sh

在这里插入图片描述

三、Spark 与 Hudi 整合使用

Hudi 流式数据湖平台,协助管理数据,借助HDFS文件系统存储数据,使用Spark操作数据。
在这里插入图片描述

Hadoop 安装可参考我这篇文章:大数据Hadoop原理介绍+安装+实战操作(HDFS+YARN+MapReduce)
Hadoop HA安装可参考我这篇文章:大数据Hadoop之——Hadoop 3.3.4 HA(高可用)原理与实现(QJM)
Spark 环境配置可以参考我这篇文章:大数据Hadoop之——计算引擎Spark

1)Spark 测试

cd $SPARK_HOME
hdfs dfs -mkdir /tmp/
hdfs dfs -put README.md /tmp/
hdfs dfs -text /tmp/README.md

# 启动spark-shell
./bin/spark-shell --master local[2]

val datasRDD = sc.textFile("/tmp/README.md")
# 行数
datasRDD.count()
# 读取第一行数据
datasRDD.first()
val dataframe = spark.read.textFile("/tmp/README.md")
dataframe.printSchema
dataframe.show(10,false)

在这里插入图片描述

2)Spark 与 Hudi 整合使用

官方示例:https://hudi.apache.org/docs/quick-start-guide/
在spark-shell命令行,对Hudi表数据进行操作,需要运行spark-shell命令是,添加相关的依赖包,命令如下:

1、启动spark-shell

【第一种方式】在线联网下载相关jar包

### 启动spark-shell,使用spark-shell操作hudi数据湖
### 第一种方式
./bin/spark-shell \
  --master local[2] \
  --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.12.0 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

### 上述命令需要联网,基于ivy下载下载相关jar包到本地,然后加载到CLASSPATH,其中包含三个jar包。

【第二种方式】离线使用已经下载好的jar包

### 第二种方式,使用--jars
cd /opt/apache
wget https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.3.0/spark-avro_2.12-3.3.0.jar

cd $SPARK_HOME
./bin/spark-shell \
--master local[2] \
--jars  /opt/apache/hudi-0.12.0/packaging/hudi-spark-bundle/target/hudi-spark3.2-bundle_2.12-0.12.0.jar,/opt/apache/hudi-0.12.0/hudi-examples/hudi-examples-spark/target/lib/unused-1.0.0.jar,/opt/apache/spark-avro_2.12-3.3.0.jar \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"

2、导入park及Hudi相关包

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

3、定义变量

val tableName = "hudi_trips_cow"
# 存储到HDFS
val basePath = "hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/hudi_trips_cow"
# 存储到本地
# val basePath = "file:///tmp/hudi_trips_cow"

4、模拟生成Trip乘车数据

##构建DataGenerator对象,用于模拟生成10条Trip乘车数据
val dataGen = new DataGenerator
 
val inserts = convertToStringList(dataGen.generateInserts(10))

其中,DataGenerator可以用于生成测试数据,用来完成后续操作。

5、将模拟数据List转换为DataFrame数据集

##转成df
val df = spark.read.json(spark.sparkContext.parallelize(inserts,2))

##查看数据结构
df.printSchema()
##查看数据
df.show()
# 指定字段查询
df.select("rider","begin_lat","begin_lon","driver","end_lat","end_lon","fare","partitionpath","ts","uuid").show(10,truncate=false)

6、将数据写入到hudi

# 将数据保存到hudi表中,由于Hudi诞生时基于Spark框架,所以SparkSQL支持Hudi数据源,直接通过format指定数据源Source,设置相关属性保存数据即可,注意,hudi不是正真存储数据,而是管理数据。

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

## 重要参数说明
#参数:getQuickstartWriteConfigs,设置写入/更新数据至Hudi时,Shuffle时分区数目
#参数:PRECOMBINE_FIELD_OPT_KEY,数据合并时,依据主键字段
#参数:RECORDKEY_FIELD_OPT_KEY,每条记录的唯一id,支持多个字段
#参数:PARTITIONPATH_FIELD_OPT_KEY,用于存放数据的分区字段

本地存储
在这里插入图片描述
HDFS 存储
在这里插入图片描述

四、Flink 与 Hudi 整合使用

官方示例:https://hudi.apache.org/docs/flink-quick-start-guide

1)启动flink集群

下载地址:http://flink.apache.org/downloads.html

### 1、下载软件包
wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz
tar -xf flink-1.14.6-bin-scala_2.12.tgz
export FLINK_HOME=/opt/apache/flink-1.14.6

### 2、设置HADOOP_CLASSPATH
# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
export HADOOP_CONF_DIR='/opt/apache/hadoop/etc/hadoop'

### 3、启动单节点flink 集群
# Start the Flink standalone cluster,这里先修改slot数量,默认是1,这里改成4
# taskmanager.numberOfTaskSlots: 4
cd $FLINK_HOME
./bin/start-cluster.sh

# 测试可用性
./bin/flink run  examples/batch/WordCount.jar

在这里插入图片描述

2) 启动flink SQL 客户端

# 【第一种方式】指定jar包
./bin/sql-client.sh embedded -j ../hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.14-bundle-0.12.0.jar shell

# 【第二种方式】还可以将jar包放在$FINK_HOME/lib目录下
./bin/sql-client.sh embedded shell

3)添加数据

-- sets up the result mode to tableau to show the results directly in the CLI
SET 'sql-client.execution.result-mode' = 'tableau';

CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/flink-hudi-t1',
  'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
);

INSERT INTO t1 VALUES ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1');
-- insert data using values
INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

在这里插入图片描述
HDFS上查看
在这里插入图片描述

4)查询数据(批式查询)

select * from t1;

在这里插入图片描述

5)更新数据

-- this would update the record with key 'id1'
insert into t1 values
  ('id1','Danny',27,TIMESTAMP '1970-01-01 00:00:01','par1');

6)Streaming Query(流式查询)

首先创建表t2,设置相关属性,以流的方式查询读取,映射到上面表:t1

  • read.streaming.enabled 设置为true,表明通过streaming的方式读取表数据;
  • read.streaming.check-interval 指定了source监控新的commits的间隔时间4s
  • table.type 设置表类型为 MERGE_ON_READ
CREATE TABLE t2(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/flink-hudi-t1',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',  -- this option enable the streaming read
  'read.start-commit' = '20210316134557', -- specifies the start commit instant time
  'read.streaming.check-interval' = '4' -- specifies the check interval for finding new source commits, default 60s.
);

-- Then query the table in stream mode
select * from t2;

注意:查看可能会遇到如下错误:

[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat

【解决】添加hadoop-mapreduce-client-core-xxx.jarhive-exec-xxx.jar到Flink lib中。

cp /opt/apache/hadoop-3.3.2/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.2.jar $FLINK_HOME/lib
cp ./hudi-0.12.0/hudi-examples/hudi-examples-spark/target/lib/hive-exec-2.3.1-core.jar $FLINK_HOME/lib

在这里插入图片描述
Hive 与 Hudi的整合,小伙伴可以先看官网文档:https://hudi.apache.org/docs/syncing_metastore/#flink-setup

Spark 和 Hudi整合,Flink 与 Hudi整合先到这里了,还有很多其它大数据组件与Hudi的整合示例讲解会放在后面文章讲解,请小伙伴耐心等待,有任何疑问欢迎留言,会持续更新【大数据+云原生】相关的文章~

相关文章:

  • 智能机器人项目,安装人脸识别face_recognition报错解决
  • EMQX(emqtt)安装与入门
  • ansible常用模块的用法和ansible基于模块方式实现LNMP
  • ArcGIS || ENVI:如何将彩色影像拆分为R、G、B以及H、S、I(B/V)影像?
  • 【英语:基础进阶_原著扩展阅读】J6.原著阅读实战训练
  • SpringBoot学习1—安装与配置
  • 10月笔试面试记录
  • 【概率论与数理统计(研究生课程)】知识点总结9(回归分析)
  • 1-2Java程序运行机制以及运行过程
  • 初次使用Ubuntu18.04遇到的问题——笔记4 (Ubuntu18.04+Anaconda+Pycharm+Pytorch)
  • apache服务web页面执行shell脚本
  • git如何回滚,返回到之前的记录
  • Qt实现侧边栏显示隐藏以及自定义提示框
  • ESP8266/esp32接入阿里云物联网平台点灯控制类案例
  • 【从小白到大白05】c和c++内存管理
  • [数据结构]链表的实现在PHP中
  • 【5+】跨webview多页面 触发事件(二)
  • 【划重点】MySQL技术内幕:InnoDB存储引擎
  • Android开发 - 掌握ConstraintLayout(四)创建基本约束
  • ES学习笔记(10)--ES6中的函数和数组补漏
  • JavaScript设计模式与开发实践系列之策略模式
  • Redis字符串类型内部编码剖析
  • Spring核心 Bean的高级装配
  • supervisor 永不挂掉的进程 安装以及使用
  • underscore源码剖析之整体架构
  • uni-app项目数字滚动
  • Vue官网教程学习过程中值得记录的一些事情
  • 对象引论
  • 关于字符编码你应该知道的事情
  • 王永庆:技术创新改变教育未来
  • 网络应用优化——时延与带宽
  • 移动端解决方案学习记录
  • AI又要和人类“对打”,Deepmind宣布《星战Ⅱ》即将开始 ...
  • Python 之网络式编程
  • 昨天1024程序员节,我故意写了个死循环~
  • ​520就是要宠粉,你的心头书我买单
  • ​RecSys 2022 | 面向人岗匹配的双向选择偏好建模
  • ​一、什么是射频识别?二、射频识别系统组成及工作原理三、射频识别系统分类四、RFID与物联网​
  • # .NET Framework中使用命名管道进行进程间通信
  • #设计模式#4.6 Flyweight(享元) 对象结构型模式
  • (2022 CVPR) Unbiased Teacher v2
  • (22)C#传智:复习,多态虚方法抽象类接口,静态类,String与StringBuilder,集合泛型List与Dictionary,文件类,结构与类的区别
  • (Forward) Music Player: From UI Proposal to Code
  • (附程序)AD采集中的10种经典软件滤波程序优缺点分析
  • (黑客游戏)HackTheGame1.21 过关攻略
  • (六)vue-router+UI组件库
  • (转)Java socket中关闭IO流后,发生什么事?(以关闭输出流为例) .
  • *Algs4-1.5.25随机网格的倍率测试-(未读懂题)
  • .gitattributes 文件
  • .Mobi域名介绍
  • .NET 5种线程安全集合
  • .NET 8.0 中有哪些新的变化?
  • .NET CF命令行调试器MDbg入门(一)
  • .NET Core6.0 MVC+layui+SqlSugar 简单增删改查
  • .Net Framework 4.x 程序到底运行在哪个 CLR 版本之上