使用MapReduce实现温度排序详解大数据
2023-06-13 09:20:26 时间
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class KeyPair implements WritableComparable KeyPair {
private int hot;
private int year;
public int getYear() {
return year;
public void setYear(int year) {
this.year = year;
public int getHot() {
return hot;
public void setHot(int hot) {
this.hot = hot;
public int compareTo(KeyPair o) {
int result = this.year-o.getYear();
if(result!=0){
return result 0?-1:1;
return -( this.hot o.getHot() ? -1 :(this.hot == o.getHot()?0:1));
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(year);
dataOutput.writeInt(hot);
public void readFields(DataInput dataInput) throws IOException {
this.year=dataInput.readInt();
this.hot=dataInput.readInt();
@Override
public String toString() {
return year+"/t"+hot;
@Override
public int hashCode() {
return new Integer(year+hot).hashCode();
}
Sort.java:
package temperaturesort; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class Sort extends WritableComparator { public Sort(){ super(KeyPair.class,true); @Override public int compare(WritableComparable a, WritableComparable b) { KeyPair key1 = (KeyPair)a; KeyPair key2 = (KeyPair)b; int result = key1.getYear()-key2.getYear(); if(result!=0){ return result 0?-1:1; return key1.getHot() key2.getHot() ? 1 :(key1.getHot() == key2.getHot()?0:-1); }
Partition.java:
package temperaturesort; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; public class Partition extends Partitioner KeyPair,Text { @Override public int getPartition(KeyPair keyPair, Text text, int num) { return keyPair.getYear()*127 % num; }
Group.java:
package temperaturesort; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.io.WritableComparator; public class Group extends WritableComparator { public Group(){ super(KeyPair.class,true); @Override public int compare(WritableComparable a, WritableComparable b) { KeyPair key1 = (KeyPair)a; KeyPair key2 = (KeyPair)b; return key1.getYear() key2.getYear() ? -1 : (key1.getYear()==key2.getYear()?0:1); }
RunJob.java:
package temperaturesort; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date;
public class RunJob { public static class TempSortMapper extends Mapper Object,Text,KeyPair,Text { static SimpleDateFormat simpleDateFormat =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] strArr=line.split("/t"); if(strArr.length==2){ try { Date date = simpleDateFormat.parse(strArr[0]); Calendar calendar = Calendar.getInstance(); calendar.setTime(date); int year = calendar.get(1); int hot = Integer.parseInt(strArr[1].substring(0,strArr[1].indexOf("C"))); KeyPair keyPair =new KeyPair(); keyPair.setHot(hot); keyPair.setYear(year); /*System.out.println("-------------------------------------------------------------------"); System.out.println(keyPair);*/ context.write(keyPair,value); } catch (ParseException e) { e.printStackTrace(); public static class TempSortReducer extends Reducer KeyPair,Text,KeyPair,Text { @Override protected void reduce(KeyPair key, Iterable Text values, Context context) throws IOException, InterruptedException { for(Text text:values) context.write(key,text); public static void main(String[] args) throws Exception { //System.setProperty("hadoop.home.dir","E://softs//majorSoft//hadoop-2.7.5"); Configuration conf = new Configuration(); conf.set("mapreduce.app-submission.cross-platform", "true"); Path fileInput = new Path("hdfs://mycluster/testFile/hot.txt"); Path fileOutput = new Path("hdfs://mycluster/output/hot"); Job job =Job.getInstance(conf ,"temperatureSort"); job.setJar("E://bigData//hadoopDemo//out//artifacts//wordCount_jar//hadoopDemo.jar"); job.setJarByClass(RunJob.class); job.setMapperClass(TempSortMapper.class); job.setReducerClass(TempSortReducer.class); job.setMapOutputKeyClass(KeyPair.class); job.setMapOutputValueClass(Text.class); job.setNumReduceTasks(3); job.setSortComparatorClass(Sort.class); job.setPartitionerClass(Partition.class); job.setGroupingComparatorClass(Group.class); FileInputFormat.addInputPath(job,fileInput); FileOutputFormat.setOutputPath(job,fileOutput); System.exit(job.waitForCompletion(true)?0:1); }
其中自定义的sort和parititon是在mapTask任务之后使用的,而Group是在reduce任务使用的。
9430.html
分布式文件系统,分布式数据库区块链并行处理(MPP)数据库,数据挖掘开源大数据平台数据中台数据分析数据开发数据治理数据湖数据采集相关文章
- 数据透视表上线!如何在纯前端实现这个强大的数据分析功能?
- MySQL之数据排序技巧(mysql排序)
- 使用SQL Server实现数据排序(sqlserver排序)
- 利用 Oracle 中的排序函数实现数据排序(oracle排序函数)
- MySQL实现按组取Top1数据(mysql分组排序取第一条)
- 利用 Oracle 视图实现数据的排序功能(oracle视图排序)
- 利用Oracle排序语句实现数据排序(oracle排序语句)
- 如何使用MySQL进行数据排序——降序与升序的比较(mysql降序升序)
- 排序以SQL Server实现数据逆序排序(sqlserver 逆序)
- SQL Server表排序:花式排列数据增强读取效率(sqlserver表排序)
- Mongodb实现数据排序的方法(mongodb的sort)
- SQL Server支持数据逆序排序的特性(逆序 sqlserver)
- MySQL数据排序如何使用ASC进行升序排列(mysql中asc升序)
- MySQL实现数据去重排序方法(mysql中取不重复)
- Oracle中利用降序排序实现数据有效排序(oracle中降排序)
- Mysql实现数据排序上移下移操作详解(mysql上移下移)
- MySQL如何应对上亿数据的排序问题(mysql 上亿数据排序)
- Oracle中对数据按百分比排序的分析(oracle以百分比排序)
- Oracle中如何利用排序号实现数据排序(oracle中排序号)
- Oracle中排名函数有效果实现数据排序(oracle中排名函数)
- Oracle中一列数据排序的简单实现(oracle一列数排序)
- Redis实现数据查询与排序(redis 获取数据排序)
- sql获取分组排序后数据的脚本