hadoop 使用Avro排序
发布时间:2023-09-14 09:02:30
在上例中,使用Avro框架求出数据的最大值,本例使用Avro对数据排序,输入依然是之前的样本,输出使用文本(也可以输出Avro格式)。
1、在Avro的Schema中直接设置排序方向。
dataRecord.avsc,放入resources目录下:
{ "type":"record", "name":"WeatherRecord", "doc":"A weather reading", "fields":[ {"name":"year","type":"int"}, {"name":"temperature","type":"int","order":"descending"} ] }
原常量类:
public class AvroSchemas { private Schema currentSchema; //本例中不使用常量,修改成资源中加载 public static final Schema SCHEMA = new Schema.Parser().parse("{\n" + "\t\"type\":\"record\",\n" + "\t\"name\":\"WeatherRecord\",\n" + "\t\"doc\":\"A weather reading\",\n" + "\t\"fields\":[\n" + "\t\t{\"name\":\"year\",\"type\":\"int\"},\n" + "\t\t{\"name\":\"temperature\",\"type\":\"int\",\"order\":\"descending\"}\n" + "\t]\t\n" + "}"); public AvroSchemas() throws IOException { Schema.Parser parser = new Schema.Parser(); //采用从资源文件中读取Avro数据格式 this.currentSchema = parser.parse(getClass().getResourceAsStream("dataRecord.avsc")); } public Schema getCurrentSchema() { return currentSchema; } }
2、mapper
public class AvroMapper extends Mapper<LongWritable,Text,AvroKey<GenericRecord>,AvroValue<GenericRecord>> { private RecordParser parser = new RecordParser(); // private GenericRecord record = new GenericData.Record(AvroSchemas.SCHEMA); private AvroSchemas schema; private GenericRecord record; public AvroMapper() throws IOException { schema =new AvroSchemas(); record = new GenericData.Record(schema.getCurrentSchema()); } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { parser.parse(value.toString()); if(parser.isValid()){ record.put("year",parser.getYear()); record.put("temperature",parser.getData()); context.write(new AvroKey<>(record),new AvroValue<>(record)); } } }
3、reducer
public class AvroReducer extends Reducer<AvroKey<GenericRecord>,AvroValue<GenericRecord>,IntPair,NullWritable> { //多文件输出,本例中每年一个文件 private MultipleOutputs<IntPair,NullWritable> multipleOutputs; /** * Called once at the start of the task. * * @param context */ @Override protected void setup(Context context) throws IOException, InterruptedException { multipleOutputs = new MultipleOutputs<>(context); } @Override protected void reduce(AvroKey<GenericRecord> key, Iterable<AvroValue<GenericRecord>> values, Context context) throws IOException, InterruptedException { //在混洗阶段完成排序,reducer只需直接输出数据 for (AvroValue<GenericRecord> value : values){ GenericRecord record = value.datum(); //多文件输出,每年一个文件。 multipleOutputs.write(new IntPair((Integer) record.get("year"),(Integer)(record.get("temperature"))),NullWritable.get(),record.get("year").toString()); // context.write(new IntPair((Integer) record.get("year"),(Integer)(record.get("temperature"))),NullWritable.get()); } } }
4、job
public class AvroSort extends Configured implements Tool { @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); conf.set("mapreduce.job.ubertask.enable","true"); Job job = Job.getInstance(conf,"Avro sort"); job.setJarByClass(AvroSort.class); //通过AvroJob直接设置Avro key和value的输入和输出,而不是使用Job来设置 AvroJob.setMapOutputKeySchema(job, AvroSchemas.SCHEMA); AvroJob.setMapOutputValueSchema(job,AvroSchemas.SCHEMA); // AvroJob.setOutputKeySchema(job,AvroSchemas.SCHEMA); job.setMapperClass(AvroMapper.class); job.setReducerClass(AvroReducer.class); job.setInputFormatClass(TextInputFormat.class); // job.setOutputFormatClass(AvroKeyOutputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); Path outPath = new Path(args[1]); FileSystem fileSystem = outPath.getFileSystem(conf); //删除输出路径 if(fileSystem.exists(outPath)) { fileSystem.delete(outPath,true); } return job.waitForCompletion(true) ? 0:1; } public static void main(String[] args) throws Exception{ int exitCode = ToolRunner.run(new AvroSort(),args); System.exit(exitCode); } }
相关文章
- [Hadoop]chukwa与ganglia的区别
- Hadoop Mapreduce分区、分组、二次排序过程详解
- Hadoop生态上几个技术的关系与区别:hive、pig、hbase 关系与区别 Pig
- 解决spark on yarn报错:File /tmp/hadoop-root/nm-local-dir/filecache does not exist
- Hadoop MapReduce实例:按手机上网总流量降序排序代码实现及结果演示
- Hadoop hdfs 使用流来下载文件数据代码示例
- Hadoop SSH免密登录公钥生成并实现不同主机间的免密登录
- Hadoop快速入门——第二章、分布式集群(第三节、HDFS Shell的常用命令)
- 9.2.2 hadoop全排序实例详解
- Hadoop阅读笔记(三)——深入MapReduce排序和单表连接
- 017-Hadoop Hive sql语法详解7-去重排序、数据倾斜
- 【大数据project师之路】Hadoop——MapReduce概述
- centOS6.3(64bit)Hadoop的Eclipse开发环境搭建
- 使用hadoop命令rcc生成Record 一个简单的方法来实现自己的定义writable对象
- hadoop系列-hadoop版本选择
- 大数据Hadoop之——Kafka 图形化工具 EFAK(EFAK环境部署)
- 大数据Hadoop之——基于内存型SQL查询引擎Presto(Presto-Trino环境部署)
- Hadoop_HDFS(二):Shell操作之文件的管理(上传下载删除等)