Writing Hive Data by Writing Files Directly

The simplest way to write data into Hive is through Hive JDBC, but it is far from the most efficient.

Hive reads its data from files stored on HDFS, so writing those HDFS files directly has the same effect as inserting through Hive, and it is currently the most efficient way to load data.

The generic approach

The most generic approach is to serialize each row with a Serializer plus a StandardStructObjectInspector, then write it out with a RecordWriter. This works for almost every file format currently supported.

StandardStructObjectInspector describes the table structure and the field types.
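For instance, a minimal sketch of an inspector for a hypothetical two-column table (id INT, name STRING), built from the factory's standard primitive inspectors:

StandardStructObjectInspector rowInspector =
        ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList("id", "name"),
                Arrays.<ObjectInspector>asList(
                        PrimitiveObjectInspectorFactory.javaIntObjectInspector,      // id INT
                        PrimitiveObjectInspectorFactory.javaStringObjectInspector)); // name STRING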

Serializer has one implementation per Hadoop file format, e.g. ParquetHiveSerDe, AvroSerDe, OrcSerde.

Creating a RecordWriter requires a HiveOutputFormat, which likewise has one implementation per Hadoop file format, e.g. OrcOutputFormat, HiveIgnoreKeyTextOutputFormat, MapredParquetOutputFormat, each writing data in its own format.

A StorageFormatDescriptor is a quick way to obtain the matching Serializer and HiveOutputFormat for a file format: calling StorageFormatFactory#get(formatType) returns the descriptor for that format type. StorageFormatDescriptor also has one implementation per format, e.g. TextFileStorageFormatDescriptor, ParquetFileStorageFormatDescriptor, and so on.

Its getSerde(), getOutputFormat() and getInputFormat() methods return the class names of the Serializer, the HiveOutputFormat and the InputFormat.
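A hedged sketch of the lookup (the exact class names returned depend on the Hive version):

StorageFormatFactory factory = new StorageFormatFactory();
// IOConstants also defines TEXTFILE, PARQUET, AVRO, ...
StorageFormatDescriptor descriptor = factory.get(IOConstants.ORC);
String serdeLib     = descriptor.getSerde();        // SerDe class name, e.g. OrcSerde
String inputFormat  = descriptor.getInputFormat();  // InputFormat class name
String outputFormat = descriptor.getOutputFormat(); // OutputFormat class name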

You can also obtain a StorageDescriptor through the Table API and read the OutputFormat and Serializer from it.
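A hedged sketch of that route, assuming a reachable metastore and an existing table default.table_name (both placeholders):

void describeStorage() throws Exception {
    IMetaStoreClient client = new HiveMetaStoreClient(new HiveConf());
    try {
        StorageDescriptor sd = client.getTable("default", "table_name").getSd();
        String outputFormatClass = sd.getOutputFormat();           // OutputFormat class name
        String serdeLib = sd.getSerdeInfo().getSerializationLib(); // SerDe class name
    } finally {
        client.close();
    }
}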

@Test
public void test2() throws ClassNotFoundException, IllegalAccessException, InstantiationException,
        HiveException, IOException, SerDeException {
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS", "");
    // Start from an empty table descriptor; the defaults set here are
    // overwritten below through the StorageFormatDescriptor
    StorageDescriptor sd = Table.getEmptyTable(null, null).getSd();
    SerDeInfo serDeInfo = new SerDeInfo();
    HashMap<String, String> parameters = new HashMap<>();
    parameters.put(serdeConstants.SERIALIZATION_FORMAT, "1");
    serDeInfo.setParameters(parameters);
    serDeInfo.setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
    sd.setInputFormat(SequenceFileInputFormat.class.getName());
    sd.setOutputFormat(HiveSequenceFileOutputFormat.class.getName());
    StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
    sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName());
    // Look up the StorageFormatDescriptor by format type; TEXT, AVRO, PARQUET
    // and ORC are the usual choices -- see IOConstants
    StorageFormatDescriptor storageFormatDescriptor =
            storageFormatFactory.get(IOConstants.TEXTFILE);
    sd.setInputFormat(storageFormatDescriptor.getInputFormat());
    sd.setOutputFormat(storageFormatDescriptor.getOutputFormat());
    String serdeLib = storageFormatDescriptor.getSerde();
    if (serdeLib != null) {
        sd.getSerdeInfo().setSerializationLib(serdeLib);
    }
    SerDeInfo serdeInfo = sd.getSerdeInfo();
    Properties tableProperties = new Properties();
    tableProperties.setProperty(serdeConstants.FIELD_DELIM, ",");
    // Optional delimiters for nested types:
    // tableProperties.setProperty(serdeConstants.COLLECTION_DELIM, "");
    // tableProperties.setProperty(serdeConstants.MAPKEY_DELIM, "");
    Serializer recordSerDe =
            (Serializer) (Class.forName(serdeInfo.getSerializationLib()).newInstance());
    SerDeUtils.initializeSerDe((Deserializer) recordSerDe, configuration, tableProperties, null);
    Class<? extends OutputFormat> outputFormatClz =
            HiveFileFormatUtils.getOutputFormatSubstitute(
                    Class.forName(storageFormatDescriptor.getOutputFormat()));
    HiveOutputFormat outputFormat = (HiveOutputFormat) outputFormatClz.newInstance();
    // Target path: the table (or partition) directory plus a random file name
    Path path = new Path(".../hive/warehouse/table_name/pt_day=12/pt_hour=12/test");
    JobConf jobConf = new JobConf(configuration);
    jobConf.setMapOutputCompressorClass(GzipCodec.class);
    jobConf.set(
            org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.COMPRESS_CODEC,
            GzipCodec.class.getName());
    FileSinkOperator.RecordWriter recordWriter =
            HiveFileFormatUtils.getRecordWriter(
                    jobConf,
                    outputFormat,
                    recordSerDe.getSerializedClass(),
                    false,
                    tableProperties,
                    path,
                    Reporter.NULL);
    // Row shape: a single column "address" of type ARRAY<INT>
    ObjectInspector intInspector =
            ObjectInspectorFactory.getReflectionObjectInspector(
                    Integer.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
    StandardListObjectInspector intListInspector =
            ObjectInspectorFactory.getStandardListObjectInspector(intInspector);
    StandardStructObjectInspector standardStructObjectInspector =
            ObjectInspectorFactory.getStandardStructObjectInspector(
                    new ArrayList<>(List.of("address")),
                    new ArrayList<>(Arrays.asList(intListInspector)));
    Object[] instance = new Object[1];
    ArrayList<Integer> address = new ArrayList<>();
    for (int i = 5; i < 10; i++) {
        address.add(i * i);
    }
    instance[0] = address;
    Writable serialize = recordSerDe.serialize(instance, standardStructObjectInspector);
    recordWriter.write(serialize);
    recordWriter.close(false);
}
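One caveat, whichever format you write: placing a file in a table or partition directory only puts the bytes on HDFS. The metastore does not know about a new partition until it is registered, e.g. with ALTER TABLE table_name ADD PARTITION (pt_day='12', pt_hour='12') or MSCK REPAIR TABLE table_name; only then will Hive queries see the data.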

Other approaches

Text format

A Text-format data file can be written through TextOutputFormat. The example below targets a table with an "id" column and an "address" column, where address is an ARRAY<INT>. Note that the serializer has to match the file format, so the row is serialized with LazySimpleSerDe.

@Test
public void testWriteText() {
    UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
    admin.doAs((PrivilegedAction<Void>) () -> {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "...");
        // Row data: id = 1, address = [25, 36, 49, 64, 81]
        Object[] instance = new Object[2];
        instance[0] = 1;
        ArrayList<Integer> address = new ArrayList<>();
        for (int i = 5; i < 10; i++) {
            address.add(i * i);
        }
        instance[1] = address;
        ObjectInspector intInspector =
                ObjectInspectorFactory.getReflectionObjectInspector(
                        Integer.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
        StandardListObjectInspector intListInspector =
                ObjectInspectorFactory.getStandardListObjectInspector(intInspector);
        StandardStructObjectInspector standardStructObjectInspector =
                ObjectInspectorFactory.getStandardStructObjectInspector(
                        new ArrayList<>(List.of("id", "address")),
                        new ArrayList<>(Arrays.asList(intInspector, intListInspector)));
        Path path = new Path(".../hive/warehouse/table_name/partition/file");
        try {
            // Use the serializer that matches the TEXTFILE format; the original
            // snippet serialized with OrcSerde, which does not produce text rows
            LazySimpleSerDe serDe = new LazySimpleSerDe();
            Properties tableProperties = new Properties();
            tableProperties.setProperty(serdeConstants.LIST_COLUMNS, "id,address");
            tableProperties.setProperty(serdeConstants.LIST_COLUMN_TYPES, "int,array<int>");
            SerDeUtils.initializeSerDe(serDe, configuration, tableProperties, null);
            Writable serialize = serDe.serialize(instance, standardStructObjectInspector);
            TextOutputFormat<Object, Object> textOutputFormat = new TextOutputFormat<>();
            JobConf jobConf = new JobConf(configuration);
            RecordWriter<Object, Object> recordWriter =
                    textOutputFormat.getRecordWriter(null, jobConf, path.toString(), Reporter.NULL);
            recordWriter.write(NullWritable.get(), serialize);
            recordWriter.close(Reporter.NULL);
        } catch (IOException | SerDeException e) {
            throw new RuntimeException(e);
        }
        return null;
    });
}

ORC format

Writing ORC is much the same as Text, so only the difference is shown: the row is serialized with OrcSerde and written through OrcOutputFormat.

Writing an INT id together with ARRAY<INT> data
@Test
public void testWriteOrc() {
    UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
    admin.doAs((PrivilegedAction<Void>) () -> {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "...");
        OrcSerde orcSerde = new OrcSerde();
        // Row data: id = 1, address = [25, 36, 49, 64, 81]
        Object[] instance = new Object[2];
        instance[0] = 1;
        ArrayList<Integer> address = new ArrayList<>();
        for (int i = 5; i < 10; i++) {
            address.add(i * i);
        }
        instance[1] = address;
        ObjectInspector intInspector =
                ObjectInspectorFactory.getReflectionObjectInspector(
                        Integer.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
        StandardListObjectInspector intListInspector =
                ObjectInspectorFactory.getStandardListObjectInspector(intInspector);
        StandardStructObjectInspector standardStructObjectInspector =
                ObjectInspectorFactory.getStandardStructObjectInspector(
                        new ArrayList<>(List.of("id", "address")),
                        new ArrayList<>(Arrays.asList(intInspector, intListInspector)));
        Writable serialize = orcSerde.serialize(instance, standardStructObjectInspector);
        OrcOutputFormat orcOutputFormat = new OrcOutputFormat();
        Path path = new Path(".../hive/warehouse/table_name/partition/file");
        try {
            JobConf jobConf = new JobConf(configuration);
            RecordWriter recordWriter =
                    orcOutputFormat.getRecordWriter(null, jobConf, path.toString(), Reporter.NULL);
            recordWriter.write(NullWritable.get(), serialize);
            recordWriter.close(Reporter.NULL);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    });
}
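As a hedged alternative to going through getRecordWriter, Hive's lower-level OrcFile API (Writer and CompressionKind live in the same org.apache.hadoop.hive.ql.io.orc package) builds the writer directly and exposes compression and other options; variable names are reused from the test above:

Writer writer = OrcFile.createWriter(path,
        OrcFile.writerOptions(configuration)
                .inspector(standardStructObjectInspector)
                .compress(CompressionKind.ZLIB));
writer.addRow(Arrays.asList(1, address)); // one row matching the struct inspector
writer.close();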

Parquet format

Parquet describes the table structure with a MessageType, stores the typed values in Groups, and finally writes them out through a ParquetWriter.

Writing an INT id together with MAP<STRING, STRING> data

The data looks like this:

id: 100
address
  key_value
    key: key0
    value: value0
  key_value
    key: key1
    value: value1
  key_value
    key: key2
    value: value4

The schema:

message Pair {
  optional int32 id;
  optional group address (MAP) {
    repeated group key_value {
      optional binary key;
      optional binary value;
    }
  }
}
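Rather than assembling this with the Types builder as the test below does, the same MessageType can be produced by parsing the textual schema with MessageTypeParser from parquet-column:

MessageType pair = MessageTypeParser.parseMessageType(
        "message Pair {\n"
      + "  optional int32 id;\n"
      + "  optional group address (MAP) {\n"
      + "    repeated group key_value {\n"
      + "      optional binary key;\n"
      + "      optional binary value;\n"
      + "    }\n"
      + "  }\n"
      + "}");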

The code:

@Test
public void testWriteIdWithMap1() {
    UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
    admin.doAs((PrivilegedAction<Void>) () -> {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "");
        try {
            Path path = new Path(".../hive/warehouse/table_name/partition/file");
            Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
            String name = "address";
            // Note: the names passed to named() here must be exactly "key" and "value"
            PrimitiveType keyType =
                    Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).named("key");
            PrimitiveType valueType =
                    Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).named("value");
            messageTypeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named("id");
            messageTypeBuilder.optionalMap().key(keyType).value(valueType).named(name);
            MessageType pair = messageTypeBuilder.named("Pair");
            SimpleGroup simpleGroup = new SimpleGroup(pair);
            ParquetWriter<Group> parquetWriter =
                    ExampleParquetWriter.builder(path)
                            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                            .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                            .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
                            .withConf(configuration)
                            .withType(pair)
                            .withDictionaryEncoding(false)
                            .withRowGroupSize(134217728L)
                            .build();
            simpleGroup.add(0, 100);               // id
            Group mapGroup = simpleGroup.addGroup(1); // the MAP group
            for (int i = 0; i < 3; i++) {
                Group entry = mapGroup.addGroup(0);   // one key_value entry
                entry.add(0, "key" + i);
                entry.add(1, "value" + i * i);
            }
            parquetWriter.write(simpleGroup);
            parquetWriter.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    });
}
Writing ARRAY<ARRAY<INT>> data
@Test
public void testWriteIdWithArrayArray2() {
    UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
    admin.doAs((PrivilegedAction<Void>) () -> {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "...");
        try {
            Path path = new Path(".../hive/warehouse/table_name/partition/file");
            Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
            PrimitiveType element =
                    Types.optional(PrimitiveType.PrimitiveTypeName.INT32).named("address");
            messageTypeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named("id");
            messageTypeBuilder
                    .optionalList()
                    .optionalListElement()
                    .element(element)
                    .named("address")
                    .named("address");
            MessageType pair = messageTypeBuilder.named("Pair");
            SimpleGroup simpleGroup = new SimpleGroup(pair);
            ParquetWriter<Group> parquetWriter =
                    ExampleParquetWriter.builder(path)
                            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                            .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                            .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
                            .withConf(configuration)
                            .withType(pair)
                            .withDictionaryEncoding(false)
                            .withRowGroupSize(134217728L)
                            .build();
            simpleGroup.add(0, 100);                 // id
            Group address = simpleGroup.addGroup(1); // outer LIST group
            for (int i = 0; i < 5; i++) {
                Group listGroup = address.addGroup(0);   // outer list entry
                Group sublist = listGroup.addGroup(0);   // inner LIST group
                for (int j = 5; j < 10; j++) {
                    Group subListGroup = sublist.addGroup(0); // inner list entry
                    // original added i * i on every inner iteration; j * j was presumably intended
                    subListGroup.add(0, j * j);
                }
            }
            parquetWriter.write(simpleGroup);
            parquetWriter.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    });
}
