当前位置: 首页 > news >正文

【数仓建模过程】Spark数据清洗篇

1. 环境准备

首先,确保你有适当的Spark环境。你可以使用Spark SQL、Spark DataFrame API或Spark RDD API来处理数据。对于数据仓库任务,通常推荐使用Spark SQL或DataFrame API,因为它们更适合结构化数据处理。

2. 数据读取

从ODS层或其他数据源读取数据。Spark支持多种数据源,如Hive表、HDFS文件、关系型数据库等。

from pyspark.sql import SparkSession# 创建SparkSession
spark = SparkSession.builder \.appName("DWD Data Processing") \.enableHiveSupport() \.getOrCreate()# 从ODS层读取数据
ods_data = spark.sql("SELECT * FROM ods.your_table")

2. 数据质量检查

在进行数据清洗之前,我们应该先检查数据质量,以了解数据的现状。

from pyspark.sql.functions import col, count, when, isnan, isnull# 检查每列的空值和NaN值
null_nan_counts = ods_data.select([count(when(isnull(c) | isnan(c), c)).alias(c + '_null_or_nan_count')for c in ods_data.columns
])# 计算空值和NaN的百分比
total_count = ods_data.count()
null_nan_percentages = null_nan_counts.select([(col(c) / total_count * 100).alias(c + '_null_or_nan_percentage')for c in null_nan_counts.columns
])# 显示结果
null_nan_counts.show()
null_nan_percentages.show()# 检查唯一值
unique_counts = ods_data.agg(*[countDistinct(col(c)).alias(c + '_distinct_count') for c in ods_data.columns])
unique_counts.show()

3. 数据清洗和转换

基于数据质量检查的结果,我们可以更有针对性地进行数据清洗和转换。

from pyspark.sql.functions import col, when, count, lit
from pyspark.sql.types import IntegerType# 计算每行的非空值数量
row_counts = ods_data.select([(count(when(~(isnull(c) | isnan(c)), c)) / len(ods_data.columns)).alias('non_null_ratio')for c in ods_data.columns
])# 设定一个阈值,例如要求至少50%的列有非空值
threshold = 0.5# 筛选出满足条件的行
filtered_data = ods_data.withColumn('non_null_ratio', row_counts.non_null_ratio) \.filter(col('non_null_ratio') >= threshold) \.drop('non_null_ratio')print(f"原始数据行数: {ods_data.count()}")
print(f"过滤后数据行数: {filtered_data.count()}")# 处理剩余的空值和NaN
cleaned_data = filtered_datafor column in filtered_data.columns:# 根据数据类型和业务需求选择适当的填充值if filtered_data.schema[column].dataType == IntegerType():cleaned_data = cleaned_data.fillna({column: 0})else:cleaned_data = cleaned_data.fillna({column: "Unknown"})# 数据类型转换(如果需要)
converted_data = cleaned_data.withColumn("age", col("age").cast(IntegerType()))# 条件转换
processed_data = converted_data.withColumn("age_group", when(col("age") < 18, "Under 18").when((col("age") >= 18) & (col("age") < 60), "Adult").otherwise("Senior")
)## 4. 清洗后的数据质量检查在数据清洗和转换之后,我们应该再次检查数据质量,以确保我们的处理达到了预期效果。```python
# 再次检查空值和NaN(这次应该没有空值了)
post_clean_null_nan_counts = processed_data.select([count(when(isnull(c) | isnan(c), c)).alias(c + '_null_or_nan_count')for c in processed_data.columns
])post_clean_null_nan_counts.show()# 再次检查唯一值
post_clean_unique_counts = processed_data.agg(*[countDistinct(col(c)).alias(c + '_distinct_count') for c in processed_data.columns])
post_clean_unique_counts.show()

3. 数据清洗和转换

对数据进行必要的清洗和转换操作,以符合DWD层的要求。这可能包括处理空值、数据类型转换、去重等。

from pyspark.sql.functions import col, when, count, lit, length, to_timestamp, window, sum, from_unixtime, unix_timestamp
from pyspark.sql.types import IntegerType, TimestampType
from pyspark.sql.window import Window# 步骤1: 转换时间戳格式
filtered_data = ods_data.withColumn("timestamp",to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss")
)# 确保时间戳转换成功
filtered_data = filtered_data.filter(col("timestamp").isNotNull())print(f"时间戳转换后,数据行数: {filtered_data.count()}")# 步骤2: 删除只有user_id,其他列全为null的数据
columns_except_user_id = [c for c in filtered_data.columns if c != 'user_id']
filtered_data = filtered_data.filter(sum([col(c).isNotNull() for c in columns_except_user_id]) > 0)print(f"删除只有user_id的行后,数据行数: {filtered_data.count()}")# 步骤3: 处理user_id在同一分钟内出现多次且状态没有变化的情况
window_spec = Window.partitionBy('user_id', window('timestamp', '1 minute'))
duplicate_data = filtered_data.withColumn('count', count('user_id').over(window_spec)) \.withColumn('status_changed', sum(col('status').cast('int')).over(window_spec) > 0) \.filter((col('count') > 1) & (col('status_changed') == False))deduplicated_data = filtered_data.join(duplicate_data, ['user_id', 'timestamp'], 'left_anti')print(f"去重后,数据行数: {deduplicated_data.count()}")# 步骤4: 处理数据格式错误
cleaned_data = deduplicated_data.filter((length(col('name')) <= 20) &  # 姓名不超过20个字符(col('age').between(0, 120)) &  # 年龄在0到120之间(col('email').rlike('^[a-zA-Z0-9._-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,4}$'))  # 邮箱格式检查
)print(f"处理数据格式错误后,数据行数: {cleaned_data.count()}")# 步骤5: 处理剩余的空值和NaN
for column in cleaned_data.columns:if cleaned_data.schema[column].dataType == IntegerType():cleaned_data = cleaned_data.fillna({column: 0})elif cleaned_data.schema[column].dataType == TimestampType():# 对于时间戳类型,我们可以选择填充为一个特定的日期,或者保持为null# 这里我们选择保持为nullpasselse:cleaned_data = cleaned_data.fillna({column: "Unknown"})# 步骤6: 数据类型转换和条件转换
converted_data = cleaned_data.withColumn("age", col("age").cast(IntegerType()))processed_data = converted_data.withColumn("age_group", when(col("age") < 18, "Under 18").when((col("age") >= 18) & (col("age") < 60), "Adult").otherwise("Senior")
)# 步骤7: 删除空值比例过高的行
row_counts = processed_data.select([(count(when(~(col(c).isNull() | isnan(col(c))), c)) / len(processed_data.columns)).alias('non_null_ratio')for c in processed_data.columns
])threshold = 0.5
final_data = processed_data.withColumn('non_null_ratio', row_counts.non_null_ratio) \.filter(col('non_null_ratio') >= threshold) \.drop('non_null_ratio')print(f"最终数据行数: {final_data.count()}")# 显示最终数据的样本,包括转换后的时间戳
final_data.select("user_id", "timestamp", "name", "age", "email", "status").show(5, truncate=False)

4. 数据质量检查

在导入DWD层之前,进行数据质量检查是很重要的。

# 示例:检查空值比例
null_counts = processed_data.select([sum(col(c).isNull().cast("int")).alias(c) for c in processed_data.columns])
null_percentages = null_counts.select([((col(c) / processed_data.count()) * 100).alias(c) for c in null_counts.columns])# 示例:检查唯一值
unique_counts = processed_data.agg(*[countDistinct(col(c)).alias(c) for c in processed_data.columns])

5. 数据写入

将处理后的数据写入DWD层。通常,DWD层的数据会以Hive表的形式存储。

# 写入Hive表
processed_data.write \.mode("overwrite") \.format("parquet") \.saveAsTable("dwd.your_dwd_table")

6. 数据分区

对于大型表,考虑使用分区来优化查询性能。

# 按日期分区写入
processed_data.write \.partitionBy("date") \.mode("overwrite") \.format("parquet") \.saveAsTable("dwd.your_dwd_table")

7. 处理增量数据

在实际场景中,我们只需要处理增量数据而不是每次全量处理。

# 假设我们有一个最后处理时间
last_process_time = get_last_process_time()# 只处理新数据
new_data = spark.sql(f"SELECT * FROM ods.your_table WHERE update_time > '{last_process_time}'")# 处理新数据...# 将新数据追加到DWD表
new_processed_data.write \.mode("append") \.format("parquet") \.saveAsTable("dwd.your_dwd_table")# 更新处理时间
update_last_process_time(current_timestamp)

8. 错误处理和日志记录

确保添加适当的错误处理和日志记录,以便于故障排除和监控。

import logginglogging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)try:# 你的Spark处理代码logger.info("Data processing completed successfully")
except Exception as e:logger.error(f"Error occurred during data processing: {str(e)}")raise

9. 优化性能

根据需要使用Spark的优化技术,如缓存频繁使用的数据、调整分区等。

# 缓存频繁使用的数据
processed_data.cache()# 重分区以优化并行度
optimized_data = processed_data.repartition(100)

10. 数据血缘和元数据管理

记录数据的血缘关系和元数据,这对于数据治理和追踪非常重要。

# 示例:记录元数据
metadata = {"source_table": "ods.your_table","target_table": "dwd.your_dwd_table","process_date": current_date,"columns_processed": processed_data.columns
}
log_metadata(metadata)

通过以上步骤,你可以使用Spark有效地处理数据并将其导入DWD层。记住,实际的实现可能需要根据你的具体需求和数据特性进行调整。同时,定期监控和优化你的Spark作业以确保其效率和可靠性也是很重要的。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 深入理解DPO(Direct Preference Optimization)算法
  • erlang学习:用OTP构建系统1
  • 单链表——随机链表的复制
  • Mask R-CNN论文原理讲解
  • 【C#】静态成员(static)与实例成员(非静态成员)的理解
  • macos USB外接键盘ctrl键绑定方法 解决外接USB键盘与mac键盘不一致问题
  • JVM【面试题】2024最新
  • 【C++ | 设计模式】工厂方法模式的详解与实现
  • Kompose工具:转换Compose项目为K8S项目
  • 深度强化学习算法(三)(附带MATLAB程序)
  • priority_queue模拟
  • 【动态规划】区间dp
  • 通过SynchronousQueue方式实现线程间数据传递
  • 算法笔记|Day37动态规划X
  • Websocket笔记
  • 3.7、@ResponseBody 和 @RestController
  • Android 架构优化~MVP 架构改造
  • Angular4 模板式表单用法以及验证
  • Brief introduction of how to 'Call, Apply and Bind'
  • ECMAScript入门(七)--Module语法
  • nodejs调试方法
  • overflow: hidden IE7无效
  • spring security oauth2 password授权模式
  • Three.js 再探 - 写一个跳一跳极简版游戏
  • Web Storage相关
  • 阿里云ubuntu14.04 Nginx反向代理Nodejs
  • 当SetTimeout遇到了字符串
  • 排序(1):冒泡排序
  • 推荐一个React的管理后台框架
  • 我看到的前端
  • 无服务器化是企业 IT 架构的未来吗?
  • 终端用户监控:真实用户监控还是模拟监控?
  • 走向全栈之MongoDB的使用
  • Oracle Portal 11g Diagnostics using Remote Diagnostic Agent (RDA) [ID 1059805.
  • 曾刷新两项世界纪录,腾讯优图人脸检测算法 DSFD 正式开源 ...
  • ​如何防止网络攻击?
  • #传输# #传输数据判断#
  • #我与Java虚拟机的故事#连载10: 如何在阿里、腾讯、百度、及字节跳动等公司面试中脱颖而出...
  • (04)odoo视图操作
  • (1)安装hadoop之虚拟机准备(配置IP与主机名)
  • (35)远程识别(又称无人机识别)(二)
  • (aiohttp-asyncio-FFmpeg-Docker-SRS)实现异步摄像头转码服务器
  • (C语言)strcpy与strcpy详解,与模拟实现
  • (ZT)出版业改革:该死的死,该生的生
  • (附源码)springboot家庭装修管理系统 毕业设计 613205
  • (学习日记)2024.03.25:UCOSIII第二十二节:系统启动流程详解
  • (转)Scala的“=”符号简介
  • (转)全文检索技术学习(三)——Lucene支持中文分词
  • (自适应手机端)行业协会机构网站模板
  • *++p:p先自+,然后*p,最终为3 ++*p:先*p,即arr[0]=1,然后再++,最终为2 *p++:值为arr[0],即1,该语句执行完毕后,p指向arr[1]
  • .form文件_SSM框架文件上传篇
  • .NET MVC、 WebAPI、 WebService【ws】、NVVM、WCF、Remoting
  • .NET/C# 使用 ConditionalWeakTable 附加字段(CLR 版本的附加属性,也可用用来当作弱引用字典 WeakDictionary)
  • ::
  • @SpringBootConfiguration重复加载报错