当前位置: 首页 > news >正文

Hive自定义Serde,实现自定义多字符串作为分隔符

        因为Hive提供的Serde只支持单字符作为分隔符,这样可能会导致因为某些属性中的值带有特殊字符,导致将CSV等文档load到Hive 表中时数据格式出现异常,数据出现偏差。所以我自己写了一个自定义的Serde来实现多字符作为分隔符来防止出现上面的问题。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.SerDeSpec;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
//import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;//import java.io.IOException;
//import java.io.StringReader;
//import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;/**** @author Liuhd* @datetime 2024/7/11 16:05* @description 多字符分隔符CSV序列化反序列化实现* @version 1.0*/
@SerDeSpec(schemaProps = {"columns", "custom.delimiter"})
public final class MultiCharDelimiterCSVSerde2 extends AbstractSerDe {private static final Logger LOG = LoggerFactory.getLogger(MultiCharDelimiterCSVSerde2.class.getName());// 定义正则表达式特殊字符模式为私有静态常量private static final String REGEX_SPECIAL_CHARS_PATTERN = "([\\\\^$.|?*+()\\[\\]{}])";private ObjectInspector inspector;private int numCols;private List<String> row;private String customDelimiter;//如果自定义分隔符在整个表操作过程中不会改变,可以将转义后的正则表达式缓存起来,而不是每次 deserialize 调用都进行转义。private String escapedDelimiter;public MultiCharDelimiterCSVSerde2() {}/*** 初始化函数* 通过读取configuration和properties对象来设置内部状态* 主要功能是解析列名,设置对象检查器,并初始化一些内部变量** @param conf Hadoop的Configuration对象,用于获取表的配置信息* @param tbl Properties对象,包含表的元数据信息* @throws SerDeException 如果初始化过程中出现错误,如缺少或配置信息不正确*/@Overridepublic void initialize(Configuration conf, Properties tbl) throws SerDeException {// 从tbl属性中获取列名字符串String columns = tbl.getProperty("columns");// 将列名字符串分割成列表List<String> columnNames = Arrays.asList(columns.split(","));
//        List<String> columnNames = Arrays.stream(columns.split(","))
//                .map(String::trim) // 考虑列名周围可能的空格
//                .distinct() // 移除重复项
//                .filter(s -> !s.isEmpty()) // 移除空字符串
//                .collect(Collectors.toList());// 设置列数this.numCols = columnNames.size();if (this.numCols == 0) {throw new SerDeException("No valid columns found.");}// 初始化列的对象检查器列表List<ObjectInspector> columnOIs = new ArrayList<>(this.numCols);// 如果列名为空或不存在,则抛出异常if (columns == null || columns.isEmpty()) {throw new SerDeException("Columns property is missing or empty.");}// 为每一列添加默认的对象检查器(这里是字符串检查器)for (int i = 0; i < this.numCols; ++i) {columnOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);}// 使用列名和对应的对象检查器创建结构对象检查器this.inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs);// 初始化表示一行数据的列表this.row = new ArrayList<>(this.numCols);// 将每一列的初始值设为nullfor (int i = 0; i < this.numCols; ++i) {this.row.add(null);
//            this.row.add((Object) null);}// 从tbl属性中获取自定义分隔符,如果没有设置则使用默认值this.customDelimiter = tbl.getProperty("custom.delimiter", "@%!xyz!%");// 如果自定义分隔符在整个表操作过程中不会改变,可以将转义后的正则表达式缓存起来,而不是每次 deserialize 调用都进行转义。this.escapedDelimiter = escapeRegexSpecialChars(this.customDelimiter); //// 记录初始化完成的日志信息,包括列名和自定义分隔符LOG.info("Initialized with columns: {} and custom delimiter: {}", columnNames, this.customDelimiter);}@Override/*** 实现反序列化功能,将给定的Writable对象转换为文本行,并根据自定义分隔符拆分为列值。** @param blob 待反序列化的Writable对象,此处特指Text类型。* @return 反序列化后的行数据,列数据按指定分隔符分割。* @throws SerDeException 如果反序列化过程中发生错误。*/public Object deserialize(Writable blob) throws SerDeException {// 将输入的Writable对象转换为Text类型,即文本行。Text rowText = (Text) blob;// 使用正则表达式根据自定义分隔符分割文本行,-1表示保留尾部的空字符串。
//        String[] tokens = rowText.toString().split(escapeRegexSpecialChars(this.customDelimiter), -1);String[] tokens = rowText.toString().split(this.escapedDelimiter, -1);// 遍历并设置每一列的值,确保不会超出tokens数组的长度。for (int i = 0; i < this.numCols; ++i) {// 如果当前列有对应的值,则设置为相应tokens的值,否则设置为null。this.row.set(i, i < tokens.length ? tokens[i] : null);}// 记录调试信息,帮助了解反序列化过程中分割出的列数据量。LOG.debug("Deserialized row with tokens: {}", tokens.length);// 返回反序列化后的行数据。return this.row;}@Override/*** 将对象序列化为Writable类型** @param obj 要序列化的对象* @param objInspector 对象检查器,用于获取对象的结构信息* @return 序列化后的Writable对象* @throws SerDeException 如果序列化过程中出现错误** 此方法主要负责将一个结构对象序列化为文本形式它首先验证结构字段的数量是否与表列数匹配,* 如果不匹配则记录错误日志并抛出异常如果匹配,则使用自定义分隔符将各个字段值连接起来,* 并返回序列化后的Text对象*/public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {// 将对象检查器转换为结构对象检查器,用于获取结构字段信息StructObjectInspector outputRowOI = (StructObjectInspector) objInspector;// 获取所有结构字段引用List<? extends StructField> outputFieldRefs = outputRowOI.getAllStructFieldRefs();// 检查字段数量是否与表列数匹配if (outputFieldRefs.size() != this.numCols) {// 如果不匹配,记录错误日志并抛出异常LOG.error("Serialization error: Mismatch in field count. Expected: {}, Actual: {}", this.numCols, outputFieldRefs.size());throw new SerDeException("Cannot serialize the object because there are " + outputFieldRefs.size() + " fields but the table has " + this.numCols + " columns.");} else {// 如果匹配,开始构建序列化后的字符串StringBuilder writer = new StringBuilder(this.numCols * 10 );// 遍历每个字段for (int c = 0; c < this.numCols; ++c) {// 如果不是第一个字段,追加自定义分隔符if (c > 0) {writer.append(this.customDelimiter);}// 获取当前字段的值Object field = outputRowOI.getStructFieldData(obj, outputFieldRefs.get(c));// 将字段值转换为字符串形式并追加到结果中StringObjectInspector fieldOI = (StringObjectInspector) outputFieldRefs.get(c).getFieldObjectInspector();writer.append(fieldOI.getPrimitiveJavaObject(field));}// 记录调试日志,表示已序列化一行LOG.debug("Serialized row with fields: {}", outputFieldRefs.size());// 返回序列化后的文本对象return new Text(writer.toString());}}@Overridepublic ObjectInspector getObjectInspector() throws SerDeException {return this.inspector;}@Overridepublic Class<? extends Writable> getSerializedClass() {return Text.class;}@Overridepublic SerDeStats getSerDeStats() {return null;}/*** 转义正则表达式中的特殊字符** 此方法确保输入字符串中的正则表达式特殊字符被正确转义,以避免它们在正则表达式处理时被误解释为元字符* 特殊字符包括:\ ^ $ . | ? * + ( ) [ ] { }** @param input 待转义的输入字符串* @return 转义后的字符串,其中所有正则表达式特殊字符都被转义*/private String escapeRegexSpecialChars(String input) {// 对输入进行边界条件检查if (input == null || input.isEmpty()) {return input; // 返回原输入,避免对空字符串或null值进行无意义的处理}// 使用正则表达式替换所有特殊字符,使其被正确转义// 正则表达式中的特殊字符在字符串中被替换为对应的转义序列return input.replaceAll( REGEX_SPECIAL_CHARS_PATTERN , "\\\\$1");}}

编译和打包

使用Maven或其他构建工具将上述代码编译并打包成JAR文件。

将JAR文件上传到Hive服务器

        例如,将JAR文件上传到 /usr/lib/hive/lib/ 目录。

在Hive中注册和使用Serde

 JAR 文件添加到 Hive 的类路径中,你可以使用以下几种方法:

方法一:使用 ADD JAR 命令

这是最简单和常用的方法,可以在 Hive CLI 或者 Beeline 中直接执行:

ADD JAR /path/to/your/custom-serde.jar;

这样会在当前会话中添加 JAR 文件。如果你需要在每次启动 Hive 时都加载这个 JAR 文件,可以使用下面的方法。

方法二:在 Hive 配置文件中添加

你可以将 JAR 文件的路径添加到 Hive 的 hive-site.xml 配置文件中。具体步骤如下:

  1. 打开 hive-site.xml 文件(通常位于 $HIVE_HOME/conf 目录下)。
  2. 添加以下配置:
<property><name>hive.aux.jars.path</name><value>/path/to/your/custom-serde.jar</value>
</property>
  1. 保存文件并重启 Hive 服务。

方法三:将 JAR 文件放置在 Hive 的 lib 目录下

你可以直接将 JAR 文件复制到 Hive 的 lib 目录中,这样每次启动 Hive 时,都会自动加载这些 JAR 文件。具体步骤如下:

  1. 将 JAR 文件复制到 Hive 的 lib 目录:
cp /path/to/your/custom-serde.jar $HIVE_HOME/lib/
  1. 重启 Hive 服务。

方法四:通过 HADOOP_CLASSPATH 环境变量

你也可以通过设置 HADOOP_CLASSPATH 环境变量来添加 JAR 文件:

  1. 打开你的终端配置文件,例如 .bashrc 或 .bash_profile
  2. 添加以下行:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/path/to/your/custom-serde.jar

     3.保存文件并刷新环境变量:

source ~/.bashrc

     4.重启 Hive 服务。

总结

这几种方法可以帮助你将自定义的 JAR 文件添加到 Hive 的类路径中,选择合适的方法根据你的使用场景和需求。对于临时加载 JAR 文件,使用 ADD JAR 命令最为方便;对于长期使用,修改配置文件或者直接放置在 lib 目录下更为合适。

ADD JAR /usr/lib/hive/lib/custom-serde.jar;CREATE TABLE my_table (col1 STRING,col2 INT
)
ROW FORMAT SERDE 'com.example.CustomSerDe'
STORED AS TEXTFILE;-- Load data into the table and query
LOAD DATA INPATH '/path/to/datafile' INTO TABLE my_table;
SELECT * FROM my_table;

通过以上步骤,就可以在Hive中使用自定义Serde来处理自定义格式的数据。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 【C++】对象模型和this指针
  • vivado ODT
  • 【HarmonyOS NEXT星河版开发学习】小型测试案例01-今日头条置顶练习
  • 【算法速刷(4/100)】LeetCode —— 155.最小栈
  • Java反序列化漏洞实战:原理剖析与复现步骤
  • 与大语言模型Transformer的奇妙旅程
  • 手机三要素接口怎么对接呢?(二)
  • MediaHub中的卡片实现进展汇报
  • 数据结构:链表经典算法OJ题
  • 【Linux】权限理解
  • Python的lambda函数
  • dockerfile之vllm大模型镜像构建
  • Go语言加Vue3零基础入门全栈班10 Go语言+gRPC用户微服务项目实战 2024年07月31日 课程笔记
  • Hugging Face下载模型
  • 技术详解:视频美颜SDK与直播美颜插件开发指南
  • [译]Python中的类属性与实例属性的区别
  • 【Amaple教程】5. 插件
  • 【翻译】babel对TC39装饰器草案的实现
  • 【剑指offer】让抽象问题具体化
  • angular组件开发
  • CSS魔法堂:Absolute Positioning就这个样
  • Mysql优化
  • Python十分钟制作属于你自己的个性logo
  • React+TypeScript入门
  • TCP拥塞控制
  • Vue组件定义
  • 编写符合Python风格的对象
  • 湖南卫视:中国白领因网络偷菜成当代最寂寞的人?
  • ------- 计算机网络基础
  • 前端技术周刊 2018-12-10:前端自动化测试
  • 使用iElevator.js模拟segmentfault的文章标题导航
  • 项目管理碎碎念系列之一:干系人管理
  • 远离DoS攻击 Windows Server 2016发布DNS政策
  • 在Docker Swarm上部署Apache Storm:第1部分
  • 2017年360最后一道编程题
  • ​Z时代时尚SUV新宠:起亚赛图斯值不值得年轻人买?
  • ​力扣解法汇总946-验证栈序列
  • #每日一题合集#牛客JZ23-JZ33
  • #知识分享#笔记#学习方法
  • (1)Nginx简介和安装教程
  • (13):Silverlight 2 数据与通信之WebRequest
  • (WSI分类)WSI分类文献小综述 2024
  • (超详细)2-YOLOV5改进-添加SimAM注意力机制
  • (二)什么是Vite——Vite 和 Webpack 区别(冷启动)
  • (学习日记)2024.04.04:UCOSIII第三十二节:计数信号量实验
  • (转) RFS+AutoItLibrary测试web对话框
  • (转)Linux整合apache和tomcat构建Web服务器
  • .NET Core WebAPI中封装Swagger配置
  • .NET Core 将实体类转换为 SQL(ORM 映射)
  • .Net 高效开发之不可错过的实用工具
  • .Net 转战 Android 4.4 日常笔记(4)--按钮事件和国际化
  • .Net7 环境安装配置
  • .net中的Queue和Stack
  • @Transactional 详解
  • @WebServiceClient注解,wsdlLocation 可配置