
Writing Hive Data by Writing Files Directly

Source: https://blog.csdn.net/HHoao/article/details/141651981

The simplest way to write data into Hive is through Hive JDBC, but that is not the most efficient way to load Hive data.

Hive reads its data from files on HDFS, so writing the data files to HDFS directly achieves the same effect as writing through Hive, and it is currently the most efficient approach.


General approach

The most general approach is to serialize each row with a Serializer, driven by a StandardStructObjectInspector, and then write the serialized rows with a RecordWriter; this works for almost every file format currently supported.

A StandardStructObjectInspector describes the table structure and the field types.
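For instance, a minimal sketch of the inspector for a hypothetical two-column table (id INT, name STRING); the column names here are illustrative only:

// Inspectors for the Java types backing each column
ObjectInspector idInspector = PrimitiveObjectInspectorFactory.javaIntObjectInspector;
ObjectInspector nameInspector = PrimitiveObjectInspectorFactory.javaStringObjectInspector;

// Struct inspector describing one row of the (hypothetical) table (id INT, name STRING)
StandardStructObjectInspector rowInspector =
        ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList("id", "name"),
                Arrays.asList(idInspector, nameInspector));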

Serializer has one implementation per Hadoop file format, for example ParquetHiveSerDe, AvroSerDe and OrcSerde.

Creating a RecordWriter requires a HiveOutputFormat, which likewise has one implementation per Hadoop file format, for example OrcOutputFormat, HiveIgnoreKeyTextOutputFormat and MapredParquetOutputFormat, each writing data in its own format.

A StorageFormatDescriptor is a quick way to obtain the Serializer and HiveOutputFormat for a file format: StorageFormatFactory#get(formatType) returns the descriptor for the given format type. StorageFormatDescriptor also has one implementation per format, such as TextFileStorageFormatDescriptor, ParquetFileStorageFormatDescriptor, and so on.

Its getSerde(), getOutputFormat() and getInputFormat() methods return the class names of the Serializer and of the output/input formats.
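For example, a minimal sketch of looking up the descriptor for ORC (the other IOConstants format names work the same way):

StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
// Format names come from IOConstants, e.g. TEXTFILE, ORC, PARQUET, AVRO
StorageFormatDescriptor descriptor = storageFormatFactory.get(IOConstants.ORC);
String serdeLib = descriptor.getSerde();            // SerDe class name; may be null for some formats
String inputFormat = descriptor.getInputFormat();   // InputFormat class name
String outputFormat = descriptor.getOutputFormat(); // OutputFormat class name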

Alternatively, you can obtain the StorageDescriptor through the Table API and read the corresponding OutputFormat and Serializer from it, as the example below does.

@Test
public void test2() throws ClassNotFoundException, IllegalAccessException, InstantiationException,
        HiveException, IOException, SerDeException {
    Configuration configuration = new Configuration();
    configuration.set("fs.defaultFS", "");

    // Start from an empty StorageDescriptor; the SequenceFile defaults set here
    // are replaced just below via the StorageFormatDescriptor
    StorageDescriptor sd = Table.getEmptyTable(null, null).getSd();
    SerDeInfo serDeInfo = new SerDeInfo();
    HashMap<String, String> parameters = new HashMap<>();
    parameters.put(serdeConstants.SERIALIZATION_FORMAT, "1");
    serDeInfo.setParameters(parameters);
    serDeInfo.setSerializationLib(MetadataTypedColumnsetSerDe.class.getName());
    sd.setInputFormat(SequenceFileInputFormat.class.getName());
    sd.setOutputFormat(HiveSequenceFileOutputFormat.class.getName());

    StorageFormatFactory storageFormatFactory = new StorageFormatFactory();
    sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName());
    // Look up the StorageFormatDescriptor by format type; typically TEXT, AVRO, PARQUET or ORC,
    // see IOConstants for the available names
    StorageFormatDescriptor storageFormatDescriptor =
            storageFormatFactory.get(IOConstants.TEXTFILE);
    sd.setInputFormat(storageFormatDescriptor.getInputFormat());
    sd.setOutputFormat(storageFormatDescriptor.getOutputFormat());
    String serdeLib = storageFormatDescriptor.getSerde();
    if (serdeLib != null) {
        sd.getSerdeInfo().setSerializationLib(serdeLib);
    }
    SerDeInfo serdeInfo = sd.getSerdeInfo();

    // Table properties used to initialize the SerDe (delimiters etc.)
    Properties tableProperties = new Properties();
    //        tableProperties.put(serdeConstants.FIELD_DELIM, (byte) 1);
    tableProperties.setProperty(serdeConstants.FIELD_DELIM, ",");
    //        tableProperties.setProperty(serdeConstants.COLLECTION_DELIM, "");
    //        tableProperties.setProperty(serdeConstants.MAPKEY_DELIM, "");

    // Instantiate and initialize the Serializer configured in the SerDeInfo
    Serializer recordSerDe =
            (Serializer) (Class.forName(serdeInfo.getSerializationLib()).newInstance());
    SerDeUtils.initializeSerDe((Deserializer) recordSerDe, configuration, tableProperties, null);

    // Resolve the HiveOutputFormat for the chosen file format
    Class<? extends OutputFormat> outputFormatClz =
            HiveFileFormatUtils.getOutputFormatSubstitute(
                    Class.forName(storageFormatDescriptor.getOutputFormat()));
    HiveOutputFormat outputFormat = (HiveOutputFormat) outputFormatClz.newInstance();

    // Target path: the table/partition directory of the Hive table plus a (random) file name
    Path path =
            new Path(".../hive/warehouse/table_name/pt_day=12/pt_hour=12/test");
    JobConf jobConf = new JobConf(configuration);
    jobConf.setMapOutputCompressorClass(GzipCodec.class);
    jobConf.set(
            org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.COMPRESS_CODEC,
            GzipCodec.class.getName());

    FileSinkOperator.RecordWriter recordWriter =
            HiveFileFormatUtils.getRecordWriter(
                    jobConf,
                    outputFormat,
                    recordSerDe.getSerializedClass(),
                    false,
                    tableProperties,
                    path,
                    Reporter.NULL);

    // Describe the row structure: a single column "address" of type ARRAY<INT>
    ObjectInspector intInspector =
            ObjectInspectorFactory.getReflectionObjectInspector(
                    Integer.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
    StandardListObjectInspector intListInspector =
            ObjectInspectorFactory.getStandardListObjectInspector(intInspector);
    StandardStructObjectInspector standardStructObjectInspector =
            ObjectInspectorFactory.getStandardStructObjectInspector(
                    new ArrayList<>(List.of("address")),
                    new ArrayList<>(Arrays.asList(intListInspector)));

    // Build one row, serialize it and write it out
    Object[] instance = new Object[1];
    ArrayList<Integer> address = new ArrayList<>();
    for (int i = 5; i < 10; i++) {
        address.add(i * i);
    }
    instance[0] = address;
    Writable serialize = recordSerDe.serialize(instance, standardStructObjectInspector);
    recordWriter.write(serialize);
    recordWriter.close(false);
}

Other approaches

Text format

The data file of a Text-format Hive table can be written with TextOutputFormat. The example below writes to a table with "id" and "address" columns, where "address" is an ARRAY<INT> column.

@Test
public void testWriteText() {
    UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
    admin.doAs((PrivilegedAction<Void>) () -> {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "...");

        // One row: id = 1, address = [25, 36, 49, 64, 81]
        Object[] instance = new Object[2];
        instance[0] = 1;
        ArrayList<Integer> address = new ArrayList<>();
        for (int i = 5; i < 10; i++) {
            address.add(i * i);
        }
        instance[1] = address;

        // Row structure: id INT, address ARRAY<INT>
        ObjectInspector intInspector =
                ObjectInspectorFactory.getReflectionObjectInspector(
                        Integer.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
        StandardListObjectInspector intListInspector =
                ObjectInspectorFactory.getStandardListObjectInspector(intInspector);
        StandardStructObjectInspector standardStructObjectInspector =
                ObjectInspectorFactory.getStandardStructObjectInspector(
                        new ArrayList<>(List.of("id", "address")),
                        new ArrayList<>(Arrays.asList(intInspector, intListInspector)));

        Path path = new Path(".../hive/warehouse/table_name/partition/file");
        try {
            // Serialize the row as delimited text with LazySimpleSerDe so that
            // TextOutputFormat writes a readable text line
            LazySimpleSerDe serDe = new LazySimpleSerDe();
            Properties tableProperties = new Properties();
            tableProperties.setProperty(serdeConstants.LIST_COLUMNS, "id,address");
            tableProperties.setProperty(serdeConstants.LIST_COLUMN_TYPES, "int,array<int>");
            tableProperties.setProperty(serdeConstants.FIELD_DELIM, ",");
            SerDeUtils.initializeSerDe(serDe, configuration, tableProperties, null);
            Writable serialize = serDe.serialize(instance, standardStructObjectInspector);

            TextOutputFormat<Object, Object> textOutputFormat = new TextOutputFormat<>();
            JobConf entries = new JobConf(configuration);
            RecordWriter<Object, Object> recordWriter =
                    textOutputFormat.getRecordWriter(null, entries, path.toString(), Reporter.NULL);
            recordWriter.write(NullWritable.get(), serialize);
            recordWriter.close(Reporter.NULL);
        } catch (IOException | SerDeException e) {
            throw new RuntimeException(e);
        }
        return null;
    });
}

ORC format

Writing ORC is very similar to Text, so without further ado: the example below writes the same row (an INT "id" plus an ARRAY<INT> "address") with OrcSerde and OrcOutputFormat.
@Test
public void testWriteOrc() {
    UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
    admin.doAs((PrivilegedAction<Void>) () -> {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "...");

        OrcSerde orcSerde = new OrcSerde();

        // One row: id = 1, address = [25, 36, 49, 64, 81]
        Object[] instance = new Object[2];
        instance[0] = 1;
        ArrayList<Integer> address = new ArrayList<>();
        for (int i = 5; i < 10; i++) {
            address.add(i * i);
        }
        instance[1] = address;

        // Row structure: id INT, address ARRAY<INT>
        ObjectInspector intInspector =
                ObjectInspectorFactory.getReflectionObjectInspector(
                        Integer.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
        StandardListObjectInspector intListInspector =
                ObjectInspectorFactory.getStandardListObjectInspector(intInspector);
        StandardStructObjectInspector standardStructObjectInspector =
                ObjectInspectorFactory.getStandardStructObjectInspector(
                        new ArrayList<>(List.of("id", "address")),
                        new ArrayList<>(Arrays.asList(intInspector, intListInspector)));
        Writable serialize = orcSerde.serialize(instance, standardStructObjectInspector);

        // Write the serialized row through OrcOutputFormat
        OrcOutputFormat orcOutputFormat = new OrcOutputFormat();
        Path path = new Path(".../hive/warehouse/table_name/partition/file");
        try {
            JobConf entries = new JobConf(configuration);
            RecordWriter recordWriter =
                    orcOutputFormat.getRecordWriter(null, entries, path.toString(), Reporter.NULL);
            recordWriter.write(NullWritable.get(), serialize);
            recordWriter.close(Reporter.NULL);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    });
}

Parquet format

Parquet describes the table structure with a MessageType, holds the column types and values in Groups, and finally writes the data out with a ParquetWriter.
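Before the nested examples, here is a minimal flat-schema sketch (the method name, path and column names are purely illustrative) showing the three pieces together; SimpleGroup and ExampleParquetWriter come from parquet-mr's example module:

public void writeFlatParquet(Configuration configuration) throws IOException {
    // Schema: id INT32, name BINARY (add a UTF8/string annotation if the reader needs one)
    MessageType schema = Types.buildMessage()
            .optional(PrimitiveType.PrimitiveTypeName.INT32).named("id")
            .optional(PrimitiveType.PrimitiveTypeName.BINARY).named("name")
            .named("Person");

    // One row held in a Group
    SimpleGroup row = new SimpleGroup(schema);
    row.add("id", 1);
    row.add("name", "tom");

    // Illustrative target: a file under the table/partition directory
    Path path = new Path(".../hive/warehouse/table_name/partition/file");
    ParquetWriter<Group> parquetWriter =
            ExampleParquetWriter.builder(path)
                    .withConf(configuration)
                    .withType(schema)
                    .build();
    parquetWriter.write(row);
    parquetWriter.close();
}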


Writing data with a MAP<STRING, STRING> column

The data to write:

id: 100
address
  key_value
    key: key0
    value: value0
  key_value
    key: key1
    value: value1
  key_value
    key: key2
    value: value4

The Parquet schema:

message Pair {
  optional int32 id;
  optional group address (MAP) {
    repeated group key_value {
      optional binary key;
      optional binary value;
    }
  }
}

The code:

@Test
public void testWriteIdWithMap1() {
    UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
    admin.doAs((PrivilegedAction<Void>) () -> {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "");
        try {
            Path path = new Path(".../hive/warehouse/table_name/partition/file");

            // Build the schema: id INT32 plus a MAP column named "address"
            Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
            String name = "address";
            // Note: the map's key/value fields must be named exactly "key" and "value"
            PrimitiveType keyType =
                    Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).named("key");
            PrimitiveType valueType =
                    Types.optional(PrimitiveType.PrimitiveTypeName.BINARY).named("value");
            messageTypeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named("id");
            messageTypeBuilder.optionalMap().key(keyType).value(valueType).named(name);
            MessageType pari = messageTypeBuilder.named("Pair");

            SimpleGroup simpleGroup = new SimpleGroup(pari);
            ParquetWriter<Group> parquetWriter =
                    ExampleParquetWriter.builder(path)
                            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                            .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                            .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
                            .withConf(configuration)
                            .withType(pari)
                            .withDictionaryEncoding(false)
                            .withRowGroupSize(134217728L)
                            .build();

            // Fill one row: id = 100 and three map entries under "address"
            simpleGroup.add(0, 100);
            Group mapGroup = simpleGroup.addGroup(1);
            for (int i = 0; i < 3; i++) {
                Group entry0 = mapGroup.addGroup(0);
                entry0.add(0, "key" + i);
                entry0.add(1, "value" + i * i);
            }
            parquetWriter.write(simpleGroup);
            parquetWriter.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    });
}
Writing ARRAY<ARRAY<INT>> data
@Test
public void testWriteIdWithArrayArray2() {
    UserGroupInformation admin = UserGroupInformation.createRemoteUser("hive");
    admin.doAs((PrivilegedAction<Void>) () -> {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "...");
        try {
            Path path = new Path(".../hive/warehouse/table_name/partition/file");

            // Build the schema: id INT32 plus a LIST of LIST of INT32 named "address"
            Types.MessageTypeBuilder messageTypeBuilder = Types.buildMessage();
            PrimitiveType named =
                    Types.optional(PrimitiveType.PrimitiveTypeName.INT32).named("address");
            messageTypeBuilder.optional(PrimitiveType.PrimitiveTypeName.INT32).named("id");
            messageTypeBuilder
                    .optionalList()
                    .optionalListElement()
                    .element(named)
                    .named("address")
                    .named("address");
            MessageType pari = messageTypeBuilder.named("Pair");

            SimpleGroup simpleGroup = new SimpleGroup(pari);
            ParquetWriter<Group> parquetWriter =
                    ExampleParquetWriter.builder(path)
                            .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                            .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                            .withCompressionCodec(CompressionCodecName.UNCOMPRESSED)
                            .withConf(configuration)
                            .withType(pari)
                            .withDictionaryEncoding(false)
                            .withRowGroupSize(134217728L)
                            .build();

            simpleGroup.add(0, 100);
            // The outer LIST group of the "address" column
            Group address = simpleGroup.addGroup(1);
            for (int i = 0; i < 5; i++) {
                // Add an entry to the outer list
                Group listGroup = address.addGroup(0);
                // The entry itself is an inner LIST group
                Group sublist = listGroup.addGroup(0);
                for (int j = 5; j < 10; j++) {
                    // Add an element to the inner list
                    Group subListGroup = sublist.addGroup(0);
                    subListGroup.add(0, i * i);
                }
            }
            parquetWriter.write(simpleGroup);
            parquetWriter.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return null;
    });
}
