zl程序教程

您现在的位置是:首页 >  大数据

当前栏目

使用MapReduce求各个城市的员工的总工资详解大数据

数据 使用 详解 员工 城市 各个 MapReduce 工资
2023-06-13 09:20:27 时间

问题分析
求各个城市员工的总工资,需要得到各个城市所有员工的工资,通过对各个城市所有员工工资求和得到总工资。首先和测试例子1类似在Mapper的Setup阶段缓存部门对应所在城市数据,然后在Mapper阶段抽取出key为城市名称(利用缓存数据把部门编号对应为所在城市名称),value为员工工资,接着在Shuffle阶段把传过来的数据处理为城市名称对应该城市所有员工工资,最后在Reduce中按照城市归组,遍历城市所有员工,求出工资总数并输出。

import java.io.BufferedReader;

import java.io.FileReader;

import java.io.IOException;

import java.net.URI;

import java.util.HashMap;

import java.util.HashSet;

import java.util.Map;

import java.util.Set;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.filecache.DistributedCache;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class Q4SumCitySalary extends Configured implements Tool {

 public static class MapClass extends Mapper LongWritable, Text, Text, Text  {

 private Map String, String  deptMap = new HashMap String, String 

 private String[] kv;

 @Override

 protected void setup(Context context) throws IOException, InterruptedException {

 BufferedReader in = null;

 try {

 URI[] paths = DistributedCache.getCacheFiles(context.getConfiguration());

 String deptIdName = null;

 for (URI path : paths) {

 if (path.toString().contains( dept )) {

 in = new BufferedReader(new FileReader(path.toString()));

 while (null != (deptIdName = in.readLine())) {

 // key为部门号,value为部门名

 deptMap.put(deptIdName.split( , )[0], deptIdName.split( , )[2]);

 }

 }

 }

 } catch (IOException e) {

 e.printStackTrace();

 } finally {

 try {

 if (in != null) {

 in.close();

 }

 } catch (IOException e) {

 e.printStackTrace();

 }

 }

 }

 // 【map阶段】通过部门号找出每个部门所有的个体的工资,并输出得到: 部门所对应的城市名,每个个体的工资 

 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

 kv = value.toString().split( , 

 if (deptMap.containsKey(kv[7])) {

 if (null != kv[5]   ! .equals(kv[5].toString())) {

 context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));

 }

 }

 }

 }

 public static class Reduce extends Reducer Text, Text, Text, LongWritable  {

 // 【reduce阶段】所干的活就是把相同key中的value做一个累加,输出: 城市名,工资总数 

 public void reduce(Text key, Iterable Text  values, Context context) throws IOException, InterruptedException {

 long sumSalary = 0;

 for (Text val : values) {

 sumSalary += Long.parseLong(val.toString());

 }

 context.write(key, new LongWritable(sumSalary));

 }

 }

 @Override

 public int run(String[] args) throws Exception {

 Job job = new Job(getConf(),  Q4SumCitySalary 

 job.setJobName( Q4SumCitySalary 

 job.setJarByClass(Q4SumCitySalary.class);

 job.setMapperClass(MapClass.class);

 job.setReducerClass(Reduce.class);

 job.setInputFormatClass(TextInputFormat.class);

 job.setOutputFormatClass(TextOutputFormat.class);

 job.setOutputKeyClass(Text.class);

 job.setOutputValueClass(Text.class);

 String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();

 DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());

 FileInputFormat.addInputPath(job, new Path(otherArgs[1]));

 FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));

 job.waitForCompletion(true);

 return job.isSuccessful() ? 0 : 1;

 }

 public static void main(String[] args) throws Exception {

 int res = ToolRunner.run(new Configuration(), new Q4SumCitySalary(), args);

 System.exit(res);

 }

}

用于计算的基础数据请参考:http://blog.ytso.com/post/17840.html

原创文章,作者:Maggie-Hunter,如若转载,请注明出处:https://blog.ytso.com/9811.html

分布式文件系统,分布式数据库区块链并行处理(MPP)数据库,数据挖掘开源大数据平台数据中台数据分析数据开发数据治理数据湖数据采集