当前位置: 首页 > news >正文

基于AWS Serverless的Glue服务进行ETL(提取、转换和加载)数据分析(二)——数据清洗、转换

2 数据清洗、转换

此实验使用S3作为数据源

ETL:

E    extract         输入
T    transform     转换
L    load             输出

大纲

  • 2 数据清洗、转换
    • 2.1 架构图
    • 2.2 数据清洗
    • 2.3 编辑脚本
      • 2.3.1 连接数据源(s3)
      • 2.3.2. 数据结构转换
      • 2.3.2 数据结构拆分、定义
      • 2.3.3 清洗后的数据写入新s3
      • 2.3.4 运行作业
    • 2.4 数据分区
      • 2.4.1 编辑脚本
      • 2.4.2 运行脚本
    • 2.5 总结

2.1 架构图

在这里插入图片描述

2.2 数据清洗

此步会将S3中的原始数据清洗成我们想要的自定义结构的数据。之后,我们可通过APIGateway+Lambda+Athena来实现一个无服务器的数据分析服务。

步骤图例
1、入口在这里插入图片描述
2、创建Job(s3作为数据源,则Type选择Spark,若为Kinesis等,选择Stream Spark)在这里插入图片描述
3、IAM角色需要有s3与Glue的权限在这里插入图片描述
4、选择s3脚本位置,若已经完成脚本的编写工作,则可以选择第二项或第三项,若无则Glue会提供默认脚本在这里插入图片描述
5、安全配置参数在这里插入图片描述建议:添加参数–enable-auto-scaling为true。每次在我们执行Job任务时,会根据运行 ETL 任务的数据处理单元(DPU)的个数来分配动态IP,在我们子网的动态IP数低于DPU数时,Job将会执行失败。此参数将会动态分配IP。
6、数据源()在这里插入图片描述
7、数据目标(我们会将清洗后的数据存储到新的s3桶)在这里插入图片描述
8、设计架构(在本案例中,我们会自定义脚本。所以不再在此处设计架构)(此处设计后,脚本会自动生成相关代码)在这里插入图片描述
9、保存在这里插入图片描述

2.3 编辑脚本

脚本中的args参数的键值需要从Job的安全配置参数中定义

2.3.1 连接数据源(s3)

#数据源
datasource = glueContext.create_dynamic_frame.from_catalog(database = args['db_name'], table_name = tableName, transformation_ctx = "datasource")

2.3.2. 数据结构转换

mapped_readings = ApplyMapping.apply(frame = datasource, mappings = [("lclid", "string", "meter_id", "string"), \("datetime", "string", "reading_time", "string"), \("KWH/hh (per half hour)", "double", "reading_value", "double")], \transformation_ctx = "mapped_readings")

2.3.2 数据结构拆分、定义

mapped_readings_df = DynamicFrame.toDF(mapped_readings)mapped_readings_df = mapped_readings_df.withColumn("obis_code", lit(""))
mapped_readings_df = mapped_readings_df.withColumn("reading_type", lit("INT"))reading_time = to_timestamp(col("reading_time"), "yyyy-MM-dd HH:mm:ss")
mapped_readings_df = mapped_readings_df \.withColumn("week_of_year", weekofyear(reading_time)) \.withColumn("date_str", regexp_replace(col("reading_time").substr(1,10), "-", "")) \.withColumn("day_of_month", dayofmonth(reading_time)) \.withColumn("month", month(reading_time)) \.withColumn("year", year(reading_time)) \.withColumn("hour", hour(reading_time)) \.withColumn("minute", minute(reading_time)) \.withColumn("reading_date_time", reading_time) \.drop("reading_time")

2.3.3 清洗后的数据写入新s3

# write data to S3
filteredMeterReads = DynamicFrame.fromDF(mapped_readings_df, glueContext, "filteredMeterReads")s3_clean_path = "s3://" + args['clean_data_bucket']glueContext.write_dynamic_frame.from_options(frame = filteredMeterReads,connection_type = "s3",connection_options = {"path": s3_clean_path},format = "parquet",transformation_ctx = "s3CleanDatasink")

2.3.4 运行作业

    执行成功后,状态将变为"SUCCESS",失败将会给出失败信息,可在CloudWatch 中查看详情

在这里插入图片描述

在这里插入图片描述


清洗后的数据保存到了s3


在这里插入图片描述
数据清洗完毕后,可通过上一篇中的爬网程序步骤,将清洗后的数据的结构创建表到数据目录中,
此时我们可以使用Athena对清洗后的数据进行分析。

2.4 数据分区

接下来我们对数据进行分区处理(此处只提供了按天分区
重新进行数据清洗中的创建Job操作后,重写脚本

2.4.1 编辑脚本

连接数据源。表为上一步最后重新爬取生成的新表。

cleanedMeterDataSource = glueContext.create_dynamic_frame.from_catalog(database = args['db_name'], table_name = tableName, transformation_ctx = "cleanedMeterDataSource")

根据type与data_str分区

business_zone_bucket_path_daily = "s3://{}/daily".format(args['business_zone_bucket'])businessZone = glueContext.write_dynamic_frame.from_options(frame = cleanedMeterDataSource, \connection_type = "s3", \connection_options = {"path": business_zone_bucket_path_daily, "partitionKeys": ["reading_type", "date_str"]},\format = "parquet", \transformation_ctx = "businessZone")

2.4.2 运行脚本

分区后的数据结果:
在这里插入图片描述
再次创建、运行爬网程序,将会在数据目录中生成新的分区表。

2.5 总结

到这一步,我们已经使用Glue ETL对s3桶中的数据进行了清洗、分区操作。在进行上篇中的Athena操作后,我们已经可以通过Athena直接查询到清洗、分区后的数据集了。
接下来,我们会通过使用APIGateway+Lambda+Athena来构建一个无服务器的数据查询分析服务。

相关文章:

  • 单片机怎么实现真正的多线程?
  • 第一个小记录达成:第一个年费会员用户
  • 微服务--一篇入门kubernets
  • 【重点】【区间问题】56.合并区间
  • Unity 获取物体的子物体的方法
  • 11、pytest断言预期异常
  • JVM之jps虚拟机进程状态工具
  • MySQL之时间戳(DateTime和TimeStamp)
  • 深度学习设计基于Tensorflow卷积神经网络猫的品种识别系统
  • GAN:SNGAN-谱归一化GANs
  • BearPi Std 板从入门到放弃 - 引气入体篇(3)(上两篇前言)
  • STM32单片机项目实例:基于TouchGFX的智能手表设计(1)项目介绍及GUI界面基础
  • 创建conan包-打包现有二进制文件
  • 用HeidiSQL在MySQL中新建用户
  • JIRA 重建索引
  • [分享]iOS开发-关于在xcode中引用文件夹右边出现问号的解决办法
  • 【译】React性能工程(下) -- 深入研究React性能调试
  • android 一些 utils
  • Android开发 - 掌握ConstraintLayout(四)创建基本约束
  • cookie和session
  • CSS 三角实现
  • ES6--对象的扩展
  • go语言学习初探(一)
  • Linux编程学习笔记 | Linux多线程学习[2] - 线程的同步
  • vue总结
  • 阿里云Kubernetes容器服务上体验Knative
  • 从零开始的webpack生活-0x009:FilesLoader装载文件
  • 订阅Forge Viewer所有的事件
  • 解析 Webpack中import、require、按需加载的执行过程
  • 让你的分享飞起来——极光推出社会化分享组件
  • 使用阿里云发布分布式网站,开发时候应该注意什么?
  • 手写双向链表LinkedList的几个常用功能
  • 通过获取异步加载JS文件进度实现一个canvas环形loading图
  • 问题之ssh中Host key verification failed的解决
  • 我从编程教室毕业
  • #QT(一种朴素的计算器实现方法)
  • (10)ATF MMU转换表
  • (9)STL算法之逆转旋转
  • (BFS)hdoj2377-Bus Pass
  • (js)循环条件满足时终止循环
  • (附表设计)不是我吹!超级全面的权限系统设计方案面世了
  • (附源码)php投票系统 毕业设计 121500
  • (附源码)spring boot公选课在线选课系统 毕业设计 142011
  • (附源码)计算机毕业设计ssm基于B_S的汽车售后服务管理系统
  • (附源码)计算机毕业设计SSM智能化管理的仓库管理
  • (转)es进行聚合操作时提示Fielddata is disabled on text fields by default
  • (转载)在C#用WM_COPYDATA消息来实现两个进程之间传递数据
  • *Django中的Ajax 纯js的书写样式1
  • .“空心村”成因分析及解决对策122344
  • .net 微服务 服务保护 自动重试 Polly
  • .NetCore Flurl.Http 升级到4.0后 https 无法建立SSL连接
  • .net操作Excel出错解决
  • .NET中两种OCR方式对比
  • .sh文件怎么运行_创建优化的Go镜像文件以及踩过的坑
  • @Repository 注解