zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

【hadoop学习项目】7. 实现自定义局部排序和全局排序

hadoop项目学习排序 实现 自定义 全局 局部
2023-09-11 14:20:02 时间

0. 项目结构

在这里插入图片描述
domain中存储的是继承WritableComparable的数据对象;
sort中实现的是局部排序;
totalsort中实现的是全局排序。

数据内容
sort1.txt(sort处理)

movie1 72
movie2 83
movie3 67
movie4 79
movie5 84
movie6 68
movie7 79
movie8 56
movie9 69 
movie10 57
movie11 68

sort2.txt(totalsort处理)

93 239 231
23 22 213
613 232 614
213 3939 232
4546 565 613
231 231
2339 231
1613 5656 657
61313 4324 213
613 2 232 32
393 613 4535
61321 3942 453
6133 392 453
6131 322 452
232 393 455
3613 3939 3242

1. domain

Movie

package hadoop_test.sort_test_08.domain;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// 继承WritableComparable,可对其进行排序
public class Movie implements WritableComparable<Movie> {
	
	private String name;
	private int hot;

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(name);
		out.writeInt(hot);
		
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.name=in.readUTF();
		this.hot=in.readInt();
		
	}

/*
compareTo:
	设置排序规则(this.hot = a, o.hot = b):
		返回 0   按照 ab 排序
		返回 整数   按照 ba 排序
		返回 负数  按照 ab 排序 

*/
	@Override
	public int compareTo(Movie o) {
		System.out.println("this.hot: " + this.hot +" "+ "o.hot: " + o.hot);
		System.out.println(o.hot-this.hot);
//		return this.hot-o.hot;		// 由小到大排列
		return o.hot-this.hot;		// 由大到小排列
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public int getHot() {
		return hot;
	}

	public void setHot(int hot) {
		this.hot = hot;
	}

	@Override
	public String toString() {
		return "Movie [name=" + name + ", hot=" + hot + "]";
	}
	
	

}

排序是MapReduce框架中最重要的操作之一。MapTask和ReduceTask都会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序的数据均会被排序,而不管逻辑上是否需要。默认是按照字典顺序排序,且实现该排序的方法是快速排序。

对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率到达一定阈值后,再对缓冲区中的数据进行一次快速排序,将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。

对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写到磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大的文件;如果内存中文件大小或者数据超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

拓展资料:
Hadoop之排序
MapReduce怎么优雅地实现全局排序
五、MapReduce练习----学生成绩按照总成绩降序排列,总成绩相同依次按照语文成绩、数学成绩

2. sort

目标:基于sort1.txt中电影评分,对其进行从大到小排序

SortDriver

package hadoop_test.sort_test_08.sort;

import hadoop_test.Utils_hadoop;
import hadoop_test.sort_test_08.domain.Movie;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class SortDriver {
	
	public static void main(String[] args) throws Exception {
		System.setProperty("HADOOP_USER_NAME", "root");

		Configuration conf=new Configuration();
		Job job=Job.getInstance(conf);
		
		job.setJarByClass(SortDriver.class);
		
		job.setMapperClass(SortMapper.class);
	
		job.setMapOutputKeyClass(Movie.class);
		job.setMapOutputValueClass(NullWritable.class);

		FileInputFormat.setInputPaths(job,new Path("/hadoop_test/sort_test/sort.txt"));
		FileOutputFormat.setOutputPath(job,new Path("/hadoop_test/sort_test/result1"));
		
		job.waitForCompletion(true);
	}

}

SortMapper

package hadoop_test.sort_test_08.sort;

import hadoop_test.sort_test_08.domain.Movie;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable, Text,Movie,NullWritable> {
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		Movie movie = new Movie();
		String line = value.toString();
		String name = line.split(" ")[0];
		int hot=Integer.parseInt(line.split(" ")[1]);
		movie.setName(name);
		movie.setHot(hot);
//		movie类作为key
		context.write(movie, NullWritable.get());
	}
}

输出结果
在这里插入图片描述

3. totalsort

目标:用按照不同数位划分出三个不同的Reducer来处理,并且Reducer内部都整体有序。
在这里插入图片描述

TotalSortDriver

package hadoop_test.sort_test_08.totalsort;

import hadoop_test.Utils_hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TotalSortDriver {

	public static void main(String[] args) throws Exception {
		System.setProperty("HADOOP_USER_NAME", "root");

		Configuration conf=new Configuration();
		Job job=Job.getInstance(conf);

		job.setJarByClass(TotalSortDriver.class);
		
		job.setMapperClass(TotalSortMapper.class);
		job.setReducerClass(TotalSortReducer.class);

		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(IntWritable.class);
		
		job.setNumReduceTasks(3);
		job.setPartitionerClass(TotalSortPartitioner.class);

		FileInputFormat.setInputPaths(job,new Path("/hadoop_test/sort_test/sort2.txt"));
		FileOutputFormat.setOutputPath(job,new Path("/hadoop_test/sort_test/result2"));
		
		job.waitForCompletion(true);

	}
}

TotalSortMapper

package hadoop_test.sort_test_08.totalsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TotalSortMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String line = value.toString();
		String[] data = line.split(" ");
		for(String num:data){
			context.write(new IntWritable(Integer.parseInt(num)),new IntWritable(1));
		}
	}

}

TotalSortPartitioner

package hadoop_test.sort_test_08.totalsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 *
 * 为什么有不用默认分区?因为默认是key 的hashcode分区,这达不到全排序效果,所以需要自定义分区
 * 此外,回去复习正则的知识
 * @author ysq
 *
 */

public class TotalSortPartitioner extends Partitioner<IntWritable,IntWritable> {

	@Override
	public int getPartition(IntWritable key, IntWritable value, int numPartitions) {
//		key.toString().matches("[0-9]")匹配10一下数子key.toString().matches("[0-9][0-9]匹配0-100
		if(key.toString().matches("[0-9]")|key.toString().matches("[0-9][0-9]")){
			return 0;	// 个位数和两位数在 part-r-00000
		}else if(key.toString().matches("[0-9][0-9][0-9]")){
			return 1;	// 三位数在 part-r-00001
		}else{
			return 2;	// 四位数在 part-r-00002
		}
		
	}

}

TotalSortReducer

package hadoop_test.sort_test_08.totalsort;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;


public class TotalSortReducer extends Reducer<IntWritable,IntWritable ,IntWritable, IntWritable> {
	@Override
	protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
		int result=0;
		for(IntWritable value:values){
			result=result+value.get();
		}
		context.write(key, new IntWritable(result));
	}
}

输出结果
在这里插入图片描述