使用MapReduce求任何两名员工信息传递所需要经过的中间节点数详解大数据
2023-06-13 09:20:27 时间
问题分析
该公司所有员工可以形成入下图的树形结构,求两个员工的沟通的中间节点数,可转换在员工树中求两个节点连通所经过的节点数,即从其中一节点到汇合节点经过节点数加上另一节点到汇合节点经过节点数。例如求M到Q所需节点数,可以先找出M到A经过的节点数,然后找出Q到A经过的节点数,两者相加得到M到Q所需节点数。
求M到Q所需节点数
在作业中首先在Mapper阶段所有员工数据,其中经理数据key为0值、value为”员工编号,员工经理编号”,然后在Reduce阶段把所有员工放到员工列表和员工对应经理链表Map中,最后在Reduce的Cleanup中按照上面说所算法对任意两个员工计算出沟通的路径长度并输出。
import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Q10MiddlePersonsCountForComm extends Configured implements Tool { public static class MapClass extends Mapper LongWritable, Text, IntWritable, Text { // 在Mapper阶段所有员工数据,其中经理数据key为0值、value为 员工编号,员工经理编号 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] kv = value.toString().split( , context.write(new IntWritable(0), new Text(kv[0] + , + ( .equals(kv[3]) ? : kv[3]))); } } // 在Reduce阶段把所有员工放到员工列表和员工对应经理链表Map中 public static class Reduce extends Reducer IntWritable, Text, NullWritable, Text { List String employeeList = new ArrayList String // 存放员工编号的链表 Map String, String employeeToManagerMap = new HashMap String, String // 存放员工编号与其上司经理编号的映射关系 public void reduce(IntWritable key, Iterable Text values, Context context) throws IOException, InterruptedException { for (Text value : values) { employeeList.add(value.toString().split( , )[0].trim()); employeeToManagerMap.put(value.toString().split( , )[0].trim(), value.toString().split( , )[1].trim()); } } @Override // 最后在Reduce的Cleanup中按照上面所说算法对任意两个员工计算出沟通的路径长度并输出。 protected void cleanup(Context context) throws IOException, InterruptedException { int totalEmployee = employeeList.size(); int i, j; int distance; System.out.println(employeeList); System.out.println(employeeToManagerMap); // 输出任意两个员工之间的连通路径长度 for (i = 0; i (totalEmployee - 1); i++) { for (j = (i + 1); j totalEmployee; j++) { distance = calculateDistance(i, j); String value = employeeList.get(i) + and + employeeList.get(j) + = + distance; context.write(NullWritable.get(), new Text(value)); } } } private int calculateDistance(int i, int j) { String employeeA = employeeList.get(i); String employeeB = employeeList.get(j); int distance = 0; if (employeeToManagerMap.get(employeeA).equals(employeeB) || employeeToManagerMap.get(employeeB).equals(employeeA)) { distance = 0; } else if (employeeToManagerMap.get(employeeA).equals(employeeToManagerMap.get(employeeB))) { distance = 0; } else { List String employeeA_ManagerList = new ArrayList String List String employeeB_ManagerList = new ArrayList String employeeA_ManagerList.add(employeeA); // A链表中插入A员工编号 String current = employeeA; while (false == employeeToManagerMap.get(current).isEmpty()) { current = employeeToManagerMap.get(current); employeeA_ManagerList.add(current); // 通过A员工编号得到对应的上司经理的编号,并插入链表,最终得到从A开始,且存放向上追溯相关联的经理节点的链表 } employeeB_ManagerList.add(employeeB); // B链表中插入B员工编号 current = employeeB; while (false == employeeToManagerMap.get(current).isEmpty()) { current = employeeToManagerMap.get(current); employeeB_ManagerList.add(current); // 通过B员工编号得到对应的上司经理的编号,并插入链表,最终得到从B开始,且存放向上追溯相关联的经理节点的链表 } int ii = 0, jj = 0; String currentA_manager, currentB_manager; boolean found = false; // 找到A和B链表交集,得到路径长度 for (ii = 0; ii employeeA_ManagerList.size(); ii++) { currentA_manager = employeeA_ManagerList.get(ii); for (jj = 0; jj employeeB_ManagerList.size(); jj++) { currentB_manager = employeeB_ManagerList.get(jj); if (currentA_manager.equals(currentB_manager)) { found = true; break; } } if (found) { break; } } distance = ii + jj - 1; } return distance; } } @Override public int run(String[] args) throws Exception { Job job = new Job(getConf(), Q10MiddlePersonsCountForComm job.setJobName( Q10MiddlePersonsCountForComm job.setJarByClass(Q10MiddlePersonsCountForComm.class); job.setMapperClass(MapClass.class); job.setReducerClass(Reduce.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputFormatClass(TextOutputFormat.class); job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs(); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); job.waitForCompletion(true); return job.isSuccessful() ? 0 : 1; } public static void main(String[] args) throws Exception { int res = ToolRunner.run(new Configuration(), new Q10MiddlePersonsCountForComm(), args); System.exit(res); } }
用于计算的基础数据请参考:http://blog.ytso.com/post/17840.html
9817.html
分布式文件系统,分布式数据库区块链并行处理(MPP)数据库,数据挖掘开源大数据平台数据中台数据分析数据开发数据治理数据湖数据采集相关文章
- 腾讯云MySQL数据库架构双节点、三节点和单节点区别对比
- 大数据NiFi(四):NiFi单节点安装
- 【Groovy】自定义 Xml 生成器 BuilderSupport ( 构造 Xml 节点类 | 封装节点名称、节点值、节点属性、子节点 | 将封装的节点数据转为 Xml 字符串 )
- SQLServer 错误 8974 表错误:对象 ID O_ID,索引 ID I_ID,分区 ID PN_ID,分配单元 ID A_ID (类型为 TYPE)。 页 P_ID2,槽 S_ID2 和页 P_ID3,槽 P_ID3 都指向了位于页 P_ID1,槽 S_ID1,文本 ID TEXT_ID 的行外数据节点。 故障 处理 修复 支持远程
- hadoop datanode节点超时时间设置详解大数据
- hadoop添加节点和删除节点详解大数据
- HDFS源码分析(四)—–节点Decommission机制详解大数据
- 深入了解Oracle分布式事务机制,优化多节点系统并确保数据一致性(oracle分布式事务)
- Redis集群:如何增加新节点?(redis增加节点)
- MySQL实现获取父节点数据的递归查询(mysql递归查询父节点)
- MySQL实现递归子节点,让查询数据更方便(mysql递归子节点)
- 使用SQL Server软件构建数据节点,少花钱更高效!(sqlserver 费用)
- Oracle内存告警尽早预防错过恢复节点(oracle内存告警)
- Redis集群实现跨节点数据同步(redis集群间数据同步)
- 据恢复红色神奇故障节点下的数据复原(redis集群节点故障数)
- 搭建16节点Redis集群实现可靠数据存储(redis集群16节点)
- 复制Redis跨节点复制实现数据同步的最佳实践(redis 跨节点)
- 数据使用Redis实现父子节点数据的获取(redis 获取父子节点)
- 借助Redis实现节点数据高一致性(redis节点数据一致性)
- 深入探索Redis节点如何查看数据(redis节点怎么看数据)
- xpath的数据和节点类型以及XPath中节点匹配的基本方法
- 在javascript中关于节点内容加强
- js创建元素(节点)示例
- js树插件zTree获取所有选中节点数据的方法