【数仓建模过程】Spark数据清洗篇
1. 环境准备
首先,确保你有适当的Spark环境。你可以使用Spark SQL、Spark DataFrame API或Spark RDD API来处理数据。对于数据仓库任务,通常推荐使用Spark SQL或DataFrame API,因为它们更适合结构化数据处理。
2. 数据读取
从ODS层或其他数据源读取数据。Spark支持多种数据源,如Hive表、HDFS文件、关系型数据库等。
from pyspark.sql import SparkSession# 创建SparkSession
spark = SparkSession.builder \.appName("DWD Data Processing") \.enableHiveSupport() \.getOrCreate()# 从ODS层读取数据
ods_data = spark.sql("SELECT * FROM ods.your_table")
2. 数据质量检查
在进行数据清洗之前,我们应该先检查数据质量,以了解数据的现状。
from pyspark.sql.functions import col, count, when, isnan, isnull# 检查每列的空值和NaN值
null_nan_counts = ods_data.select([count(when(isnull(c) | isnan(c), c)).alias(c + '_null_or_nan_count')for c in ods_data.columns
])# 计算空值和NaN的百分比
total_count = ods_data.count()
null_nan_percentages = null_nan_counts.select([(col(c) / total_count * 100).alias(c + '_null_or_nan_percentage')for c in null_nan_counts.columns
])# 显示结果
null_nan_counts.show()
null_nan_percentages.show()# 检查唯一值
unique_counts = ods_data.agg(*[countDistinct(col(c)).alias(c + '_distinct_count') for c in ods_data.columns])
unique_counts.show()
3. 数据清洗和转换
基于数据质量检查的结果,我们可以更有针对性地进行数据清洗和转换。
from pyspark.sql.functions import col, when, count, lit
from pyspark.sql.types import IntegerType# 计算每行的非空值数量
row_counts = ods_data.select([(count(when(~(isnull(c) | isnan(c)), c)) / len(ods_data.columns)).alias('non_null_ratio')for c in ods_data.columns
])# 设定一个阈值,例如要求至少50%的列有非空值
threshold = 0.5# 筛选出满足条件的行
filtered_data = ods_data.withColumn('non_null_ratio', row_counts.non_null_ratio) \.filter(col('non_null_ratio') >= threshold) \.drop('non_null_ratio')print(f"原始数据行数: {ods_data.count()}")
print(f"过滤后数据行数: {filtered_data.count()}")# 处理剩余的空值和NaN
cleaned_data = filtered_datafor column in filtered_data.columns:# 根据数据类型和业务需求选择适当的填充值if filtered_data.schema[column].dataType == IntegerType():cleaned_data = cleaned_data.fillna({column: 0})else:cleaned_data = cleaned_data.fillna({column: "Unknown"})# 数据类型转换(如果需要)
converted_data = cleaned_data.withColumn("age", col("age").cast(IntegerType()))# 条件转换
processed_data = converted_data.withColumn("age_group", when(col("age") < 18, "Under 18").when((col("age") >= 18) & (col("age") < 60), "Adult").otherwise("Senior")
)## 4. 清洗后的数据质量检查在数据清洗和转换之后,我们应该再次检查数据质量,以确保我们的处理达到了预期效果。```python
# 再次检查空值和NaN(这次应该没有空值了)
post_clean_null_nan_counts = processed_data.select([count(when(isnull(c) | isnan(c), c)).alias(c + '_null_or_nan_count')for c in processed_data.columns
])post_clean_null_nan_counts.show()# 再次检查唯一值
post_clean_unique_counts = processed_data.agg(*[countDistinct(col(c)).alias(c + '_distinct_count') for c in processed_data.columns])
post_clean_unique_counts.show()
3. 数据清洗和转换
对数据进行必要的清洗和转换操作,以符合DWD层的要求。这可能包括处理空值、数据类型转换、去重等。
from pyspark.sql.functions import col, when, count, lit, length, to_timestamp, window, sum, from_unixtime, unix_timestamp
from pyspark.sql.types import IntegerType, TimestampType
from pyspark.sql.window import Window# 步骤1: 转换时间戳格式
filtered_data = ods_data.withColumn("timestamp",to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss")
)# 确保时间戳转换成功
filtered_data = filtered_data.filter(col("timestamp").isNotNull())print(f"时间戳转换后,数据行数: {filtered_data.count()}")# 步骤2: 删除只有user_id,其他列全为null的数据
columns_except_user_id = [c for c in filtered_data.columns if c != 'user_id']
filtered_data = filtered_data.filter(sum([col(c).isNotNull() for c in columns_except_user_id]) > 0)print(f"删除只有user_id的行后,数据行数: {filtered_data.count()}")# 步骤3: 处理user_id在同一分钟内出现多次且状态没有变化的情况
window_spec = Window.partitionBy('user_id', window('timestamp', '1 minute'))
duplicate_data = filtered_data.withColumn('count', count('user_id').over(window_spec)) \.withColumn('status_changed', sum(col('status').cast('int')).over(window_spec) > 0) \.filter((col('count') > 1) & (col('status_changed') == False))deduplicated_data = filtered_data.join(duplicate_data, ['user_id', 'timestamp'], 'left_anti')print(f"去重后,数据行数: {deduplicated_data.count()}")# 步骤4: 处理数据格式错误
cleaned_data = deduplicated_data.filter((length(col('name')) <= 20) & # 姓名不超过20个字符(col('age').between(0, 120)) & # 年龄在0到120之间(col('email').rlike('^[a-zA-Z0-9._-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,4}$')) # 邮箱格式检查
)print(f"处理数据格式错误后,数据行数: {cleaned_data.count()}")# 步骤5: 处理剩余的空值和NaN
for column in cleaned_data.columns:if cleaned_data.schema[column].dataType == IntegerType():cleaned_data = cleaned_data.fillna({column: 0})elif cleaned_data.schema[column].dataType == TimestampType():# 对于时间戳类型,我们可以选择填充为一个特定的日期,或者保持为null# 这里我们选择保持为nullpasselse:cleaned_data = cleaned_data.fillna({column: "Unknown"})# 步骤6: 数据类型转换和条件转换
converted_data = cleaned_data.withColumn("age", col("age").cast(IntegerType()))processed_data = converted_data.withColumn("age_group", when(col("age") < 18, "Under 18").when((col("age") >= 18) & (col("age") < 60), "Adult").otherwise("Senior")
)# 步骤7: 删除空值比例过高的行
row_counts = processed_data.select([(count(when(~(col(c).isNull() | isnan(col(c))), c)) / len(processed_data.columns)).alias('non_null_ratio')for c in processed_data.columns
])threshold = 0.5
final_data = processed_data.withColumn('non_null_ratio', row_counts.non_null_ratio) \.filter(col('non_null_ratio') >= threshold) \.drop('non_null_ratio')print(f"最终数据行数: {final_data.count()}")# 显示最终数据的样本,包括转换后的时间戳
final_data.select("user_id", "timestamp", "name", "age", "email", "status").show(5, truncate=False)
4. 数据质量检查
在导入DWD层之前,进行数据质量检查是很重要的。
# 示例:检查空值比例
null_counts = processed_data.select([sum(col(c).isNull().cast("int")).alias(c) for c in processed_data.columns])
null_percentages = null_counts.select([((col(c) / processed_data.count()) * 100).alias(c) for c in null_counts.columns])# 示例:检查唯一值
unique_counts = processed_data.agg(*[countDistinct(col(c)).alias(c) for c in processed_data.columns])
5. 数据写入
将处理后的数据写入DWD层。通常,DWD层的数据会以Hive表的形式存储。
# 写入Hive表
processed_data.write \.mode("overwrite") \.format("parquet") \.saveAsTable("dwd.your_dwd_table")
6. 数据分区
对于大型表,考虑使用分区来优化查询性能。
# 按日期分区写入
processed_data.write \.partitionBy("date") \.mode("overwrite") \.format("parquet") \.saveAsTable("dwd.your_dwd_table")
7. 处理增量数据
在实际场景中,我们只需要处理增量数据而不是每次全量处理。
# 假设我们有一个最后处理时间
last_process_time = get_last_process_time()# 只处理新数据
new_data = spark.sql(f"SELECT * FROM ods.your_table WHERE update_time > '{last_process_time}'")# 处理新数据...# 将新数据追加到DWD表
new_processed_data.write \.mode("append") \.format("parquet") \.saveAsTable("dwd.your_dwd_table")# 更新处理时间
update_last_process_time(current_timestamp)
8. 错误处理和日志记录
确保添加适当的错误处理和日志记录,以便于故障排除和监控。
import logginglogging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)try:# 你的Spark处理代码logger.info("Data processing completed successfully")
except Exception as e:logger.error(f"Error occurred during data processing: {str(e)}")raise
9. 优化性能
根据需要使用Spark的优化技术,如缓存频繁使用的数据、调整分区等。
# 缓存频繁使用的数据
processed_data.cache()# 重分区以优化并行度
optimized_data = processed_data.repartition(100)
10. 数据血缘和元数据管理
记录数据的血缘关系和元数据,这对于数据治理和追踪非常重要。
# 示例:记录元数据
metadata = {"source_table": "ods.your_table","target_table": "dwd.your_dwd_table","process_date": current_date,"columns_processed": processed_data.columns
}
log_metadata(metadata)
通过以上步骤,你可以使用Spark有效地处理数据并将其导入DWD层。记住,实际的实现可能需要根据你的具体需求和数据特性进行调整。同时,定期监控和优化你的Spark作业以确保其效率和可靠性也是很重要的。