
Implementing Temperature Sorting with MapReduce (Big Data), Explained

KeyPair.java:

package temperaturesort;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite key (year, temperature). Natural ordering: year ascending,
 * then temperature descending within the same year.
 */
public class KeyPair implements WritableComparable<KeyPair> {

    private int hot;
    private int year;

    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getHot() {
        return hot;
    }

    public void setHot(int hot) {
        this.hot = hot;
    }

    @Override
    public int compareTo(KeyPair o) {
        // Year ascending first.
        int result = this.year - o.getYear();
        if (result != 0) {
            return result < 0 ? -1 : 1;
        }
        // Same year: temperature descending.
        return -(this.hot < o.getHot() ? -1 : (this.hot == o.getHot() ? 0 : 1));
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeInt(year);
        dataOutput.writeInt(hot);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.year = dataInput.readInt();
        this.hot = dataInput.readInt();
    }

    @Override
    public String toString() {
        return year + "\t" + hot;
    }

    @Override
    public int hashCode() {
        return Integer.valueOf(year + hot).hashCode();
    }
}
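The post does not include a test for this ordering, so here is a minimal sketch (class name and values are made up for illustration): two keys from the same year compare by temperature in descending order.

package temperaturesort;

// Hypothetical quick check of KeyPair's ordering (not part of the original post).
public class KeyPairOrderDemo {
    public static void main(String[] args) {
        KeyPair cooler = new KeyPair();
        cooler.setYear(1950);
        cooler.setHot(10);

        KeyPair hotter = new KeyPair();
        hotter.setYear(1950);
        hotter.setHot(32);

        // Same year, so the hotter record sorts first: compareTo returns a
        // positive value for the cooler key, meaning it comes later.
        System.out.println(cooler.compareTo(hotter)); // prints 1
    }
}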

Sort.java:

package temperaturesort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Sort comparator used during the shuffle: year ascending,
 * temperature descending within the same year.
 */
public class Sort extends WritableComparator {

    public Sort() {
        super(KeyPair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyPair key1 = (KeyPair) a;
        KeyPair key2 = (KeyPair) b;
        // Year ascending first.
        int result = key1.getYear() - key2.getYear();
        if (result != 0) {
            return result < 0 ? -1 : 1;
        }
        // Same year: temperature descending.
        return key1.getHot() < key2.getHot() ? 1 : (key1.getHot() == key2.getHot() ? 0 : -1);
    }
}

Partition.java:

package temperaturesort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitioner: records are assigned to reducers by year only,
 * so every record of a given year lands on the same reducer.
 */
public class Partition extends Partitioner<KeyPair, Text> {

    @Override
    public int getPartition(KeyPair keyPair, Text text, int num) {
        return keyPair.getYear() * 127 % num;
    }
}
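A minimal sketch of the partitioning behaviour (the year and reducer count below are illustrative, matching the three reduce tasks configured in RunJob): the partition depends only on the year, never on the temperature.

package temperaturesort;

import org.apache.hadoop.io.Text;

// Hypothetical check that partitioning ignores the temperature (not part of the original post).
public class PartitionDemo {
    public static void main(String[] args) {
        Partition partition = new Partition();
        KeyPair key = new KeyPair();
        key.setYear(1950);
        key.setHot(10);
        // (1950 * 127) % 3 == 0, regardless of the temperature stored in the key.
        System.out.println(partition.getPartition(key, new Text(""), 3)); // prints 0
    }
}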

Group.java:

package temperaturesort;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator: keys are grouped by year only, so all records
 * of one year are handed to a single reduce() call.
 */
public class Group extends WritableComparator {

    public Group() {
        super(KeyPair.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        KeyPair key1 = (KeyPair) a;
        KeyPair key2 = (KeyPair) b;
        return key1.getYear() < key2.getYear() ? -1 : (key1.getYear() == key2.getYear() ? 0 : 1);
    }
}
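As a minimal sketch (illustrative values, not from the original post), two keys from the same year but with different temperatures compare as equal under the grouping comparator, which is what puts them into the same reduce group.

package temperaturesort;

// Hypothetical check of the grouping behaviour (not part of the original post).
public class GroupDemo {
    public static void main(String[] args) {
        KeyPair first = new KeyPair();
        first.setYear(1950);
        first.setHot(32);

        KeyPair second = new KeyPair();
        second.setYear(1950);
        second.setHot(10);

        // 0 means "same group": both values go to one reduce() call,
        // already ordered by temperature thanks to the Sort comparator.
        System.out.println(new Group().compare(first, second)); // prints 0
    }
}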

RunJob.java:

package temperaturesort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

public class RunJob {

    public static class TempSortMapper extends Mapper<Object, Text, KeyPair, Text> {

        static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] strArr = line.split("\t");
            if (strArr.length == 2) {
                try {
                    // Extract the year from the timestamp field.
                    Date date = simpleDateFormat.parse(strArr[0]);
                    Calendar calendar = Calendar.getInstance();
                    calendar.setTime(date);
                    int year = calendar.get(Calendar.YEAR);
                    // Strip the trailing "C" from the temperature field.
                    int hot = Integer.parseInt(strArr[1].substring(0, strArr[1].indexOf("C")));
                    KeyPair keyPair = new KeyPair();
                    keyPair.setHot(hot);
                    keyPair.setYear(year);
                    context.write(keyPair, value);
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class TempSortReducer extends Reducer<KeyPair, Text, KeyPair, Text> {

        @Override
        protected void reduce(KeyPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            for (Text text : values) {
                context.write(key, text);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        //System.setProperty("hadoop.home.dir", "E://softs//majorSoft//hadoop-2.7.5");
        Configuration conf = new Configuration();
        conf.set("mapreduce.app-submission.cross-platform", "true");
        Path fileInput = new Path("hdfs://mycluster/testFile/hot.txt");
        Path fileOutput = new Path("hdfs://mycluster/output/hot");

        Job job = Job.getInstance(conf, "temperatureSort");
        job.setJar("E://bigData//hadoopDemo//out//artifacts//wordCount_jar//hadoopDemo.jar");
        job.setJarByClass(RunJob.class);
        job.setMapperClass(TempSortMapper.class);
        job.setReducerClass(TempSortReducer.class);
        job.setMapOutputKeyClass(KeyPair.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(3);

        // Custom sort, partition, and grouping classes for the secondary sort.
        job.setSortComparatorClass(Sort.class);
        job.setPartitionerClass(Partition.class);
        job.setGroupingComparatorClass(Group.class);

        FileInputFormat.addInputPath(job, fileInput);
        FileOutputFormat.setOutputPath(job, fileOutput);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
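The post does not show hot.txt itself; judging from the map logic, each input line is presumably a timestamp, a tab, and a temperature ending in "C". Here is a standalone sketch of that parsing step, using a made-up line.

package temperaturesort;

import java.text.SimpleDateFormat;
import java.util.Calendar;

// Hypothetical illustration of the map-side parsing; the input line below is invented.
public class ParseDemo {
    public static void main(String[] args) throws Exception {
        String line = "1949-10-01 14:21:02\t34C";
        String[] strArr = line.split("\t");
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(strArr[0]));
        int year = calendar.get(Calendar.YEAR);
        int hot = Integer.parseInt(strArr[1].substring(0, strArr[1].indexOf("C")));
        System.out.println(year + "\t" + hot); // 1949    34
    }
}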

Of these, the custom Sort comparator and Partitioner are applied after the map task (on the map side of the shuffle), while the Group comparator is used during the reduce task.
