zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

使用MapReduce列出工资最高的头三名员工姓名及其工资详解大数据

数据 使用 详解 及其 员工 最高 MapReduce 工资
2023-06-13 09:20:27 时间

问题分析
求工资最高的头三名员工姓名及工资,可以通过冒泡法得到。在Mapper阶段输出经理数据和员工对应经理表数据,其中经理数据key为0值、value为”员工姓名,员工工资”;最后在Reduce中通过冒泡法遍历所有员工,比较员工工资多少,求出前三名。

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class Q8SalaryTop3Salary extends Configured implements Tool {

 public static class MapClass extends Mapper LongWritable, Text, IntWritable, Text  {

 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

 String[] kv = value.toString().split( , 

 //  员工姓名,工资 

 context.write(new IntWritable(0), new Text(kv[1].trim() +  ,  + kv[5].trim()));

 }

 }

 public static class Reduce extends Reducer IntWritable, Text, Text, Text  {

 public void reduce(IntWritable key, Iterable Text  values, Context context) throws IOException, InterruptedException {

 String empName;

 String firstEmpName =  

 String secondEmpName =  

 String thirdEmpName =  

 

 long empSalary = 0;

 long firstEmpSalary = 0;

 long secondEmpSalary = 0;

 long thirdEmpSalary = 0;

 for (Text val : values) {

 empName = val.toString().split( , )[0];

 empSalary = Long.parseLong(val.toString().split( , )[1]);

 

 if(empSalary   firstEmpSalary) {

 thirdEmpName = secondEmpName;

 thirdEmpSalary = secondEmpSalary;

 secondEmpName = firstEmpName;

 secondEmpSalary = firstEmpSalary;

 firstEmpName = empName;

 firstEmpSalary = empSalary;

 } else if (empSalary   secondEmpSalary) {

 thirdEmpName = secondEmpName;

 thirdEmpSalary = secondEmpSalary;

 secondEmpName = empName;

 secondEmpSalary = empSalary;

 } else if (empSalary   thirdEmpSalary) {

 thirdEmpName = empName;

 thirdEmpSalary = empSalary;

 }

 }

 

 context.write(new Text(  First employee name:  + firstEmpName), new Text( Salary:  + firstEmpSalary));

 context.write(new Text(  Second employee name:  + secondEmpName), new Text( Salary:  + secondEmpSalary));

 context.write(new Text(  Third employee name:  + thirdEmpName), new Text( Salary:  + thirdEmpSalary));

 }

 }

 @Override

 public int run(String[] args) throws Exception {

 Job job = new Job(getConf(),  Q8SalaryTop3Salary 

 job.setJobName( Q8SalaryTop3Salary 

 job.setJarByClass(Q8SalaryTop3Salary.class);

 job.setMapperClass(MapClass.class);

 job.setReducerClass(Reduce.class);

 job.setMapOutputKeyClass(IntWritable.class); 

 job.setMapOutputValueClass(Text.class);

 job.setInputFormatClass(TextInputFormat.class);

 job.setOutputKeyClass(Text.class);

 job.setOutputFormatClass(TextOutputFormat.class);

 job.setOutputValueClass(Text.class);

 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();

 FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

 FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

 job.waitForCompletion(true);

 return job.isSuccessful() ? 0 : 1;

 }

 public static void main(String[] args) throws Exception {

 int res = ToolRunner.run(new Configuration(), new Q8SalaryTop3Salary(), args);

 System.exit(res);

 }

}

用于计算的基础数据请参考:http://blog.ytso.com/post/17840.html

9815.html

分布式文件系统,分布式数据库区块链并行处理(MPP)数据库,数据挖掘开源大数据平台数据中台数据分析数据开发数据治理数据湖数据采集