Hive自定义Serde,实现自定义多字符串作为分隔符
因为Hive提供的Serde只支持单字符作为分隔符,这样可能会导致因为某些属性中的值带有特殊字符,导致将CSV等文档load到Hive 表中时数据格式出现异常,数据出现偏差。所以我自己写了一个自定义的Serde来实现多字符作为分隔符来防止出现上面的问题。
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.SerDeSpec;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
//import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;//import java.io.IOException;
//import java.io.StringReader;
//import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;/**** @author Liuhd* @datetime 2024/7/11 16:05* @description 多字符分隔符CSV序列化反序列化实现* @version 1.0*/
@SerDeSpec(schemaProps = {"columns", "custom.delimiter"})
public final class MultiCharDelimiterCSVSerde2 extends AbstractSerDe {private static final Logger LOG = LoggerFactory.getLogger(MultiCharDelimiterCSVSerde2.class.getName());// 定义正则表达式特殊字符模式为私有静态常量private static final String REGEX_SPECIAL_CHARS_PATTERN = "([\\\\^$.|?*+()\\[\\]{}])";private ObjectInspector inspector;private int numCols;private List<String> row;private String customDelimiter;//如果自定义分隔符在整个表操作过程中不会改变,可以将转义后的正则表达式缓存起来,而不是每次 deserialize 调用都进行转义。private String escapedDelimiter;public MultiCharDelimiterCSVSerde2() {}/*** 初始化函数* 通过读取configuration和properties对象来设置内部状态* 主要功能是解析列名,设置对象检查器,并初始化一些内部变量** @param conf Hadoop的Configuration对象,用于获取表的配置信息* @param tbl Properties对象,包含表的元数据信息* @throws SerDeException 如果初始化过程中出现错误,如缺少或配置信息不正确*/@Overridepublic void initialize(Configuration conf, Properties tbl) throws SerDeException {// 从tbl属性中获取列名字符串String columns = tbl.getProperty("columns");// 将列名字符串分割成列表List<String> columnNames = Arrays.asList(columns.split(","));
// List<String> columnNames = Arrays.stream(columns.split(","))
// .map(String::trim) // 考虑列名周围可能的空格
// .distinct() // 移除重复项
// .filter(s -> !s.isEmpty()) // 移除空字符串
// .collect(Collectors.toList());// 设置列数this.numCols = columnNames.size();if (this.numCols == 0) {throw new SerDeException("No valid columns found.");}// 初始化列的对象检查器列表List<ObjectInspector> columnOIs = new ArrayList<>(this.numCols);// 如果列名为空或不存在,则抛出异常if (columns == null || columns.isEmpty()) {throw new SerDeException("Columns property is missing or empty.");}// 为每一列添加默认的对象检查器(这里是字符串检查器)for (int i = 0; i < this.numCols; ++i) {columnOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);}// 使用列名和对应的对象检查器创建结构对象检查器this.inspector = ObjectInspectorFactory.getStandardStructObjectInspector(columnNames, columnOIs);// 初始化表示一行数据的列表this.row = new ArrayList<>(this.numCols);// 将每一列的初始值设为nullfor (int i = 0; i < this.numCols; ++i) {this.row.add(null);
// this.row.add((Object) null);}// 从tbl属性中获取自定义分隔符,如果没有设置则使用默认值this.customDelimiter = tbl.getProperty("custom.delimiter", "@%!xyz!%");// 如果自定义分隔符在整个表操作过程中不会改变,可以将转义后的正则表达式缓存起来,而不是每次 deserialize 调用都进行转义。this.escapedDelimiter = escapeRegexSpecialChars(this.customDelimiter); //// 记录初始化完成的日志信息,包括列名和自定义分隔符LOG.info("Initialized with columns: {} and custom delimiter: {}", columnNames, this.customDelimiter);}@Override/*** 实现反序列化功能,将给定的Writable对象转换为文本行,并根据自定义分隔符拆分为列值。** @param blob 待反序列化的Writable对象,此处特指Text类型。* @return 反序列化后的行数据,列数据按指定分隔符分割。* @throws SerDeException 如果反序列化过程中发生错误。*/public Object deserialize(Writable blob) throws SerDeException {// 将输入的Writable对象转换为Text类型,即文本行。Text rowText = (Text) blob;// 使用正则表达式根据自定义分隔符分割文本行,-1表示保留尾部的空字符串。
// String[] tokens = rowText.toString().split(escapeRegexSpecialChars(this.customDelimiter), -1);String[] tokens = rowText.toString().split(this.escapedDelimiter, -1);// 遍历并设置每一列的值,确保不会超出tokens数组的长度。for (int i = 0; i < this.numCols; ++i) {// 如果当前列有对应的值,则设置为相应tokens的值,否则设置为null。this.row.set(i, i < tokens.length ? tokens[i] : null);}// 记录调试信息,帮助了解反序列化过程中分割出的列数据量。LOG.debug("Deserialized row with tokens: {}", tokens.length);// 返回反序列化后的行数据。return this.row;}@Override/*** 将对象序列化为Writable类型** @param obj 要序列化的对象* @param objInspector 对象检查器,用于获取对象的结构信息* @return 序列化后的Writable对象* @throws SerDeException 如果序列化过程中出现错误** 此方法主要负责将一个结构对象序列化为文本形式它首先验证结构字段的数量是否与表列数匹配,* 如果不匹配则记录错误日志并抛出异常如果匹配,则使用自定义分隔符将各个字段值连接起来,* 并返回序列化后的Text对象*/public Writable serialize(Object obj, ObjectInspector objInspector) throws SerDeException {// 将对象检查器转换为结构对象检查器,用于获取结构字段信息StructObjectInspector outputRowOI = (StructObjectInspector) objInspector;// 获取所有结构字段引用List<? extends StructField> outputFieldRefs = outputRowOI.getAllStructFieldRefs();// 检查字段数量是否与表列数匹配if (outputFieldRefs.size() != this.numCols) {// 如果不匹配,记录错误日志并抛出异常LOG.error("Serialization error: Mismatch in field count. Expected: {}, Actual: {}", this.numCols, outputFieldRefs.size());throw new SerDeException("Cannot serialize the object because there are " + outputFieldRefs.size() + " fields but the table has " + this.numCols + " columns.");} else {// 如果匹配,开始构建序列化后的字符串StringBuilder writer = new StringBuilder(this.numCols * 10 );// 遍历每个字段for (int c = 0; c < this.numCols; ++c) {// 如果不是第一个字段,追加自定义分隔符if (c > 0) {writer.append(this.customDelimiter);}// 获取当前字段的值Object field = outputRowOI.getStructFieldData(obj, outputFieldRefs.get(c));// 将字段值转换为字符串形式并追加到结果中StringObjectInspector fieldOI = (StringObjectInspector) outputFieldRefs.get(c).getFieldObjectInspector();writer.append(fieldOI.getPrimitiveJavaObject(field));}// 记录调试日志,表示已序列化一行LOG.debug("Serialized row with fields: {}", outputFieldRefs.size());// 返回序列化后的文本对象return new Text(writer.toString());}}@Overridepublic ObjectInspector getObjectInspector() throws SerDeException {return this.inspector;}@Overridepublic Class<? extends Writable> getSerializedClass() {return Text.class;}@Overridepublic SerDeStats getSerDeStats() {return null;}/*** 转义正则表达式中的特殊字符** 此方法确保输入字符串中的正则表达式特殊字符被正确转义,以避免它们在正则表达式处理时被误解释为元字符* 特殊字符包括:\ ^ $ . | ? * + ( ) [ ] { }** @param input 待转义的输入字符串* @return 转义后的字符串,其中所有正则表达式特殊字符都被转义*/private String escapeRegexSpecialChars(String input) {// 对输入进行边界条件检查if (input == null || input.isEmpty()) {return input; // 返回原输入,避免对空字符串或null值进行无意义的处理}// 使用正则表达式替换所有特殊字符,使其被正确转义// 正则表达式中的特殊字符在字符串中被替换为对应的转义序列return input.replaceAll( REGEX_SPECIAL_CHARS_PATTERN , "\\\\$1");}}
编译和打包
使用Maven或其他构建工具将上述代码编译并打包成JAR文件。
将JAR文件上传到Hive服务器
例如,将JAR文件上传到 /usr/lib/hive/lib/
目录。
在Hive中注册和使用Serde
JAR 文件添加到 Hive 的类路径中,你可以使用以下几种方法:
方法一:使用 ADD JAR
命令
这是最简单和常用的方法,可以在 Hive CLI 或者 Beeline 中直接执行:
ADD JAR /path/to/your/custom-serde.jar;
这样会在当前会话中添加 JAR 文件。如果你需要在每次启动 Hive 时都加载这个 JAR 文件,可以使用下面的方法。
方法二:在 Hive 配置文件中添加
你可以将 JAR 文件的路径添加到 Hive 的 hive-site.xml
配置文件中。具体步骤如下:
- 打开
hive-site.xml
文件(通常位于$HIVE_HOME/conf
目录下)。 - 添加以下配置:
<property><name>hive.aux.jars.path</name><value>/path/to/your/custom-serde.jar</value>
</property>
- 保存文件并重启 Hive 服务。
方法三:将 JAR 文件放置在 Hive 的 lib 目录下
你可以直接将 JAR 文件复制到 Hive 的 lib
目录中,这样每次启动 Hive 时,都会自动加载这些 JAR 文件。具体步骤如下:
- 将 JAR 文件复制到 Hive 的
lib
目录:
cp /path/to/your/custom-serde.jar $HIVE_HOME/lib/
- 重启 Hive 服务。
方法四:通过 HADOOP_CLASSPATH
环境变量
你也可以通过设置 HADOOP_CLASSPATH
环境变量来添加 JAR 文件:
- 打开你的终端配置文件,例如
.bashrc
或.bash_profile
。 - 添加以下行:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/path/to/your/custom-serde.jar
3.保存文件并刷新环境变量:
source ~/.bashrc
4.重启 Hive 服务。
总结
这几种方法可以帮助你将自定义的 JAR 文件添加到 Hive 的类路径中,选择合适的方法根据你的使用场景和需求。对于临时加载 JAR 文件,使用 ADD JAR
命令最为方便;对于长期使用,修改配置文件或者直接放置在 lib
目录下更为合适。
ADD JAR /usr/lib/hive/lib/custom-serde.jar;CREATE TABLE my_table (col1 STRING,col2 INT
)
ROW FORMAT SERDE 'com.example.CustomSerDe'
STORED AS TEXTFILE;-- Load data into the table and query
LOAD DATA INPATH '/path/to/datafile' INTO TABLE my_table;
SELECT * FROM my_table;
通过以上步骤,就可以在Hive中使用自定义Serde来处理自定义格式的数据。