zl程序教程

您现在的位置是:首页 >  其他

当前栏目

MapReduce明星搜索指数统计,找出人气王详解大数据

搜索统计数据 详解 找出 MapReduce 指数 明星
2023-06-13 09:20:22 时间

1、项目介绍

  本项目我们使用明星搜索指数数据,分别统计出搜索指数最高的男明星和女明星。

2、数据集

  image

3、分析

  基于项目的需求,我们通过以下几步完成:

  1、编写Mapper类,按需求将数据集解析为key=gender,value=name+hotIndex,然后输出。

  2、编写Combiner类,合并Mapper输出结果,然后输出给Reducer。

  3、编写Partitioner类,按性别,将结果指定给不同的Reduce执行。

  4、编写Reducer类,分别统计出男、女明星的最高搜索指数。

  5、编写run方法执行MapReduce任务

4、实现

package com.buaa; 

 

import java.io.IOException; 

 

import org.apache.hadoop.conf.Configuration; 

import org.apache.hadoop.conf.Configured; 

import org.apache.hadoop.fs.FileSystem; 

import org.apache.hadoop.fs.Path; 

import org.apache.hadoop.io.Text; 

import org.apache.hadoop.mapreduce.Job; 

import org.apache.hadoop.mapreduce.Mapper; 

import org.apache.hadoop.mapreduce.Partitioner; 

import org.apache.hadoop.mapreduce.Reducer; 

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 

import org.apache.hadoop.util.Tool; 

import org.apache.hadoop.util.ToolRunner; 

 

/** * @ProjectName CountStarSearchIndex 

* @PackageName com.buaa 

* @ClassName SearchStarIndex 

* @Description 统计分别统计出男女明星最大搜索指数 

* @Author 刘吉超 

* @Date 2016-05-12 16:30:23 

*/ public class SearchStarIndex extends Configured implements Tool { // 分隔符/t private static String TAB_SEPARATOR =  /t  // 男 private static String MALE =  male  // 女 private static String FEMALE =  female  

 /* 

 * 解析明星数据 

 */ public static class IndexMapper extends Mapper Object, Text, Text, Text  { /* 

 * 每次调用map(LongWritable key, Text value, Context context)解析一行数据。 

 * 每行数据存储在value参数值中。然后根据 /t 分隔符,解析出明星姓名,性别和搜索指数 

 */ public void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 将数据解析为数组 

 String[] tokens = value.toString().split(TAB_SEPARATOR); 

 if(tokens != null   tokens.length  = 3){ // 性别 

 String gender = tokens[1].trim(); // 名称、关注指数 

 String nameHotIndex = tokens[0].trim() + TAB_SEPARATOR + tokens[2].trim(); 

 // 输出key=gender value=name+hotIndex 

 context.write(new Text(gender), new Text(nameHotIndex)); 

 } 

 } 

 } 

 /* 

 * 根据性别对数据进行分区,将 Mapper的输出结果均匀分布在 reduce上 

 */ public static class IndexPartitioner extends Partitioner Text, Text  { 

 @Override public int getPartition(Text key, Text value, int numReduceTasks) { // 按性别分区 

 String sex = key.toString(); 

 // 默认指定分区 0 if(numReduceTasks == 0) return 0; 

 // 性别为男,选择分区0 if(MALE.equals(sex)){ return 0; 

 }else if(FEMALE.equals(sex)){ // 性别为女,选择分区1 return 1 % numReduceTasks; 

 }else // 性别未知,选择分区2 return 2 % numReduceTasks; 

 

 } 

 } 

 /* 

 * 定义Combiner,对 map端的输出结果,先进行一次合并,减少数据的网络输出 

 */ public static class IndexCombiner extends Reducer Text, Text, Text, Text  { 

 

 @Override public void reduce(Text key, Iterable Text  values, Context context)throws IOException, InterruptedException { int maxHotIndex = Integer.MIN_VALUE; 

 String name=   

 for (Text val : values) { 

 String[] valTokens = val.toString().split(TAB_SEPARATOR); 

 int hotIndex = Integer.parseInt(valTokens[1]); 

 if(hotIndex   maxHotIndex){ 

 name = valTokens[0]; 

 maxHotIndex = hotIndex; 

 } 

 } 

 

 context.write(key, new Text(name + TAB_SEPARATOR + maxHotIndex)); 

 } 

 } 

 /* 

 * 统计男、女明星最高搜索指数 

 */ public static class IndexReducer extends Reducer Text, Text, Text, Text  { /* 

 * 调用reduce(key, Iterable  Text  values, context)方法来处理每个key和values的集合。 

 * 我们在values集合中,计算出明星的最大搜索指数 

 */ 

 @Override public void reduce(Text key, Iterable Text  values, Context context)throws IOException, InterruptedException { int maxHotIndex = Integer.MIN_VALUE; 

 String name =     

 // 根据key,迭代 values集合,求出最高搜索指数 for (Text val : values) { 

 String[] valTokens = val.toString().split(TAB_SEPARATOR); 

 int hotIndex = Integer.parseInt(valTokens[1]); 

 if (hotIndex   maxHotIndex) { 

 name = valTokens[0]; 

 maxHotIndex = hotIndex; 

 } 

 } 

 

 context.write(new Text(name), new Text(key + TAB_SEPARATOR + maxHotIndex)); 

 } 

 } 

 

 @SuppressWarnings( deprecation ) 

 @Override public int run(String[] args) throws Exception { // 读取配置文件 

 Configuration conf = new Configuration(); 

 // 如果目标文件夹存在,则删除 

 Path mypath = new Path(args[1]); 

 FileSystem hdfs = mypath.getFileSystem(conf); if (hdfs.isDirectory(mypath)) { 

 hdfs.delete(mypath, true); 

 } 

 // 新建一个任务 

 Job job = new Job(conf,  searchStarIndex  // 主类 

 job.setJarByClass(SearchStarIndex.class); 

 // reduce的个数设置为2 

 job.setNumReduceTasks(2); // 设置Partitioner类 

 job.setPartitionerClass(IndexPartitioner.class); 

 // Mapper 

 job.setMapperClass(IndexMapper.class); // Reducer 

 job.setReducerClass(IndexReducer.class); 

 // map 输出key类型 

 job.setMapOutputKeyClass(Text.class); // map 输出value类型 

 job.setMapOutputValueClass(Text.class); 

 // 设置Combiner类 

 job.setCombinerClass(IndexCombiner.class); 

 // 输出结果 key类型 

 job.setOutputKeyClass(Text.class); // 输出结果 value类型 

 job.setOutputValueClass(Text.class); 

 // 输入路径 

 FileInputFormat.addInputPath(job, new Path(args[0])); // 输出路径 

 FileOutputFormat.setOutputPath(job, new Path(args[1])); 

 // 提交任务 return job.waitForCompletion(true) ? 0 : 1; 

 } 

 public static void main(String[] args) throws Exception { 

 String[] args0 = {  hdfs://ljc:9000/buaa/index/index.txt ,  hdfs://ljc:9000/buaa/index/out/  

 }; int ec = ToolRunner.run(new Configuration(), new SearchStarIndex(), args0); 

 System.exit(ec); 

 } 

}

5、运行效果

  image

7701.html

分布式文件系统,分布式数据库区块链并行处理(MPP)数据库,数据挖掘开源大数据平台数据中台数据分析数据开发数据治理数据湖数据采集