zl程序教程

您现在的位置是:首页 >  后端

当前栏目

大数据入门第九天——MapReduce详解(五)mapJoin、GroupingComparator与更多MR实例

实例数据入门 详解 MapReduce MR
2023-09-27 14:23:44 时间

一、数据倾斜分析——mapJoin

  1.背景

    接上一个day的Join算法,我们的解决join的方式是:在reduce端通过pid进行串接,这样的话:

--order
1001,20150710,P0001,2
1002,20150710,P0001,3
1002,20150710,P0002,3
--product
P0001,小米5,1000,2
P0002,锤子T1,1000,3

    例如订单中的小米5卖的比较好(截止博客时间,已经是米7将出的时候了。),这样的话大部分的数据都流向了P0001的这个reduce上,而P0002

的锤子的reduce确很轻松,这样,就产生了数据倾斜了!

    更多的数据倾斜的介绍,参考http://blog.csdn.net/u010039929/article/details/55044407

    我们这里用的是比较简单的map端join!也就是不需要通过reduce来串接了。具体来说就是在map端就直接拼接好,不需要reduce来拼接;那我们就需要在map的阶段进行join连接,也就是map端就需要能够连接,那就是产品全表(字典表)需要在map端就有这个字典表,放在内存而不放在输入文件。这里

mapreduce给我们提供了一个很棒的解决方案:DistributedCache,了解这个,可以参考http://blog.csdn.net/lzm1340458776/article/details/42971075

    相关的分布式缓存的用法,参考http://blog.csdn.net/qq1010885678/article/details/50751007

    当然,首先应当查看的,应该是官方文档的介绍:点击查看

  2.代码

package com.mr.mapjoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

/**
 * mapper
 *
 * @author zcc ON 2018/2/5
 **/
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    Map<String, String> infoMap = new HashMap<>();
    Text k = new Text();
    /**
     * 启动之前进行一些必要的初始化工作
     * @param context 上下文
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt")));
        String path = context.getCacheFiles()[0].getPath();
        BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
        String line;
        while (StringUtils.isNotEmpty(line = br.readLine())) {
            String[] fields = line.split(",");
            // 将字典加载进入map
            infoMap.put(fields[0], fields[1]);
        }
        // 关闭流
        br.close();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String orderLine = value.toString();
        // 切分订单信息
        String[] fields = orderLine.split("\t");
        String pName = infoMap.get(fields[1]);
        k.set(orderLine + "\t" + pName);
        // 写出去
        context.write(k, NullWritable.get());
    }
}
MapJoinMapper
package com.mr.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

/**
 * driver
 *
 * @author zcc ON 2018/2/5
 **/
public class MapJoinDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // 初始化Job
        Job job = Job.getInstance(conf);
        // job相关配置
        job.setJarByClass(MapJoinDriver.class);
        job.setMapperClass(MapJoinMapper.class);
        // 这里直接省略reduce阶段(map端已经完成)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path("F:/input"));
        FileOutputFormat.setOutputPath(job, new Path("F:/output"));
        // 指定缓存
        /* job.addArchiveToClassPath(archive); */// 缓存jar包到task运行节点的classpath中
        /* job.addFileToClassPath(file); */// 缓存普通文件到task运行节点的classpath中
        /* job.addCacheArchive(uri); */// 缓存压缩包文件到task运行节点的工作目录
        /* job.addCacheFile(uri) */// 缓存普通文件到task运行节点的工作目录
        job.addCacheFile(new URI("file:/F:/pd.txt"));
        // 指定不需要reduce
        job.setNumReduceTasks(0);
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
MapJoinDriver

   更多,待补充

二、倒排索引建立

   1.需求     

      需求:有大量的文本(文档、网页),需要建立搜索索引

     

      // 对比与wordcount的不同

     2.思路

      把单词和文件作为key

    3.代码

package com.mr.inverseindex;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * mapper
 *
 * @author zcc ON 2018/2/5
 **/
public class InverseIndexMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
    Text k = new Text();
    IntWritable v = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split(" ");
        // 得到文件名
        FileSplit fileSplit = (FileSplit) context.getInputSplit();
        String fileName = fileSplit.getPath().getName();
        for (String field : fields) {
            k.set(field + "--" + fileName);
            context.write(k, v);
        }
    }
}
InverseIndexMapper
package com.mr.inverseindex;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * reducer
 *
 * @author zcc ON 2018/2/5
 **/
public class InverseIndexReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    IntWritable v = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count += value.get();
        }
        v.set(count);
        context.write(key, v);
    }
}
InverseIndexReducer
package com.mr.inverseindex;

import com.mr.wordcount.WordCountDriver;
import com.mr.wordcount.WordCountMapper;
import com.mr.wordcount.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * driver
 *
 * @author zcc ON 2018/2/5
 **/
public class InverseIndexDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 设置本程序jar包本地位置
        job.setJarByClass(InverseIndexDriver.class);
        // 指定本业务job要使用的mapper/reducer业务类
        job.setMapperClass(InverseIndexMapper.class);
        job.setReducerClass(InverseIndexReducer.class);
        // 指定map输出的数据类型(由于可插拔的序列化机制导致)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // 指定最终输出(reduce)的的数据类型(可选,因为有时候不需要reduce)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(job, new Path("F:/input"));
        FileOutputFormat.setOutputPath(job, new Path("F:/output"));
        // 提交(将job中的相关参数以及java类所在的jar包提交给yarn运行)
        // job.submit();
        // 反馈集群信息
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
InverseIndexDriver

    输出结果:

hello--a.txt    3
hello--b.txt    2
hello--c.txt    2
jerry--a.txt    1
jerry--b.txt    3
jerry--c.txt    1
tom--a.txt    2
tom--b.txt    1
tom--c.txt    1

    可以看到,很多时候是会出现一次无法解决的情况,需要配合多次mapreduce配合!

   再次在结果上执行mapreduce:

package cn.itcast.bigdata.mr.inverindex;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class IndexStepTwo {
    public static class IndexStepTwoMapper extends Mapper<LongWritable, Text, Text, Text>{
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] files = line.split("--");
            context.write(new Text(files[0]), new Text(files[1]));
        }
    }
    public static class IndexStepTwoReducer extends Reducer<Text, Text, Text, Text>{
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer();
            for (Text text : values) {
                sb.append(text.toString().replace("\t", "-->") + "\t");
            }
            context.write(key, new Text(sb.toString()));
        }
    }
    public static void main(String[] args) throws Exception {
        
        if (args.length < 1 || args == null) {
            args = new String[]{"D:/temp/out/part-r-00000", "D:/temp/out2"};
        }
        
        Configuration config = new Configuration();
        Job job = Job.getInstance(config);
        
        job.setMapperClass(IndexStepTwoMapper.class);
        job.setReducerClass(IndexStepTwoReducer.class);
//        job.setMapOutputKeyClass(Text.class);
//        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        System.exit(job.waitForCompletion(true) ? 1:0);
    }
}
IndexStepTwo

   所以,总结诸多示例来说,使用Mapreduce的最大的关键是确定什么作为key,因为key相同的会归并到reduce进行处理,接下来的示例也都是这个思路!

三、寻找共同好友

  1.需求   

    以下是qq的好友列表数据,冒号前是一个用户,冒号后是该用户的所有好友(数据中的好友关系是单向的)

A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

   2.分析

    前面已经提到,遇到mapreduce问题,有时候一步不好解决,可以逐步逼近,多次求解!

#1.1求出哪些人都有好友c,也就是求出c是哪些人的共同好友
c --> a    b d f g
    #2.1得到有关c的共同好友的关系
    a-b c
    a-d c
    a-g c
#1.2同理,求出d是哪些人的共同好友
d --> a e g h
    #2.2同理,得到有关d的共同好友的关系
    a-e d
    a-g d

#3对第二步结果再进行mapreduce,则得到例如a-g c d这样的共同好友列表了!

  3.阶段一代码: 

package com.mr.fans;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * mapper
 *
 * @author zcc ON 2018/2/6
 **/
public class SharedFriendsMapper extends Mapper<LongWritable, Text, Text, Text>{
    Text k = new Text();
    Text v = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split(":");
        // 切割用户和好友 A:B,C,D
        String person = fields[0];
        String[] friends = fields[1].split(",");
        for (String friend : friends) {
            k.set(friend);
            v.set(person);
            // 输出K-V对,<好友,用户>
            context.write(k, v);
        }
    }
}
SharedFriendsMapper
package com.mr.fans;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * reducer
 *
 * @author zcc ON 2018/2/6
 **/
public class SharedFriendsReducer extends Reducer<Text, Text, Text, Text> {
    Text v = new Text();
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuffer sb = new StringBuffer();
        for (Text value : values) {
            sb.append(value).append("-");
        }
        // 去除最后一个字符(,)
        sb.deleteCharAt(sb.length() - 1);
        v.set(sb.toString());
        context.write(key, v);
    }
}
SharedFriendsReducer
package com.mr.fans;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * driver
 *
 * @author zcc ON 2018/2/6
 **/
public class SharedFriendsDriver {
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 设置本程序jar包本地位置
        job.setJarByClass(SharedFriendsDriver.class);
        // 指定本业务job要使用的mapper/reducer业务类
        job.setMapperClass(SharedFriendsMapper.class);
        job.setReducerClass(SharedFriendsReducer.class);
        // 指定map输出的数据类型(由于可插拔的序列化机制导致)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // 指定最终输出(reduce)的的数据类型(可选,因为有时候不需要reduce)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // 指定job的原始输入/输出目录(可以改为由外面输入,而不必写死)
        FileInputFormat.setInputPaths(job, new Path("F:/input"));
        FileOutputFormat.setOutputPath(job, new Path("F:/output"));
        // 提交(将job中的相关参数以及java类所在的jar包提交给yarn运行)
        // job.submit();
        // 反馈集群信息
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 :1);
    }
}
SharedFriendsDriver

  阶段一结果:

A    I-K-C-B-G-F-H-O-D
B    A-F-J-E
C    A-E-B-H-F-G-K
D    G-C-K-A-L-F-E-H
E    G-M-L-H-A-F-B-D
F    L-M-D-C-G-A
G    M
H    O
I    O-C
J    O
K    B
L    D-E
M    E-F
O    A-H-I-J-F

   这里提一下,经过查证,输出的k,v之间的默认分隔符是“\t”,我们也可以自己定义:

conf.set("mapred.textoutputformat.separator", ";");

    更多自定义分隔符,参考http://www.xuebuyuan.com/2132293.html

   4.阶段二代码 

package com.mr.fans;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Arrays;

/**
 * mapper step2
 *
 * @author zcc ON 2018/2/6
 **/
public class SharedFriendsMapper2 extends Mapper<LongWritable, Text, Text, Text>{
    Text k = new Text();
    Text v = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // A    B-C-D-E-F(默认分隔符\t)
        String line = value.toString();
        String[] fields = line.split("\t");
        // 切割用户和好友
        String friend = fields[0];
        String[] persons = fields[1].split("-");
        // 使用工具排序
        Arrays.sort(persons);
        for (int i = 0; i < persons.length - 2; i++) {
            for (int j = i + 1; j < persons.length -1; j++) {
                k.set(persons[i] + "-" + persons[j]);
                v.set(friend);
                // 将B-C A进行写出!,这样相同的X-Y的对会归集到同一个reduce
                context.write(k, v);
            }
        }
    }
}
SharedFriendsMapper2
package com.mr.fans;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * reducer step2
 *
 * @author zcc ON 2018/2/6
 **/
public class SharedFriendsReducer2 extends Reducer<Text, Text, Text, Text> {
    Text v = new Text();
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuffer sb = new StringBuffer();
        for (Text value : values) {
            sb.append(value).append(",");
        }
        // 去除最后一个字符(,)
        sb.deleteCharAt(sb.length() - 1);
        v.set(sb.toString());
        context.write(key, v);
    }
}
SharedFriendsReducer2
package com.mr.fans;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * step2
 *
 * @author zcc ON 2018/2/6
 **/
public class SharedFriendsDriver2 {
    public static void main(String[] args) throws Exception{
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 设置本程序jar包本地位置
        job.setJarByClass(SharedFriendsDriver2.class);
        // 指定本业务job要使用的mapper/reducer业务类
        job.setMapperClass(SharedFriendsMapper2.class);
        job.setReducerClass(SharedFriendsReducer2.class);
        // 指定map输出的数据类型(由于可插拔的序列化机制导致)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // 指定最终输出(reduce)的的数据类型(可选,因为有时候不需要reduce)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // 指定job的原始输入/输出目录(可以改为由外面输入,而不必写死)
        FileInputFormat.setInputPaths(job, new Path("F:/output"));
        FileOutputFormat.setOutputPath(job, new Path("F:/output2"));
        // 提交(将job中的相关参数以及java类所在的jar包提交给yarn运行)
        // job.submit();
        // 反馈集群信息
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 :1);
    }
}
SharedFriendsDriver2

  部分结果示例:  

A-B    C,E
A-C    F,D
A-D    E,F
A-E    B,C,D
A-F    C,D,B,E,O
A-G    D,E,F,C
A-H    E,O,C,D
A-I    O
A-K    D
A-L    F,E

四、web日志预处理

  1.需求:

    对web访问日志中的各字段识别切分

    去除日志中不合法的记录

    根据KPI统计需求,生成各类访问请求过滤数据

  部分日志示例:

194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"
183.49.46.228 - - [18/Sep/2013:06:49:23 +0000] "-" 400 0 "-" "-"
163.177.71.12 - - [18/Sep/2013:06:49:33 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
163.177.71.12 - - [18/Sep/2013:06:49:36 +0000] "HEAD / HTTP/1.1" 200 20 "-" "DNSPod-Monitor/1.0"
View Code

     总的来说,就是根据业务逻辑进行日志的清洗

   2.代码

    这里暂不深入了。给出核心代码: 

public class WebLogBean {
    
    private String remote_addr;// 记录客户端的ip地址
    private String remote_user;// 记录客户端用户名称,忽略属性"-"
    private String time_local;// 记录访问时间与时区
    private String request;// 记录请求的url与http协议
    private String status;// 记录请求状态;成功是200
    private String body_bytes_sent;// 记录发送给客户端文件主体内容大小
    private String http_referer;// 用来记录从那个页面链接访问过来的
    private String http_user_agent;// 记录客户浏览器的相关信息

    private boolean valid = true;// 判断数据是否合法

    
    
    public String getRemote_addr() {
        return remote_addr;
    }

    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }

    public String getRemote_user() {
        return remote_user;
    }

    public void setRemote_user(String remote_user) {
        this.remote_user = remote_user;
    }

    public String getTime_local() {
        return time_local;
    }

    public void setTime_local(String time_local) {
        this.time_local = time_local;
    }

    public String getRequest() {
        return request;
    }

    public void setRequest(String request) {
        this.request = request;
    }

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public String getBody_bytes_sent() {
        return body_bytes_sent;
    }

    public void setBody_bytes_sent(String body_bytes_sent) {
        this.body_bytes_sent = body_bytes_sent;
    }

    public String getHttp_referer() {
        return http_referer;
    }

    public void setHttp_referer(String http_referer) {
        this.http_referer = http_referer;
    }

    public String getHttp_user_agent() {
        return http_user_agent;
    }

    public void setHttp_user_agent(String http_user_agent) {
        this.http_user_agent = http_user_agent;
    }

    public boolean isValid() {
        return valid;
    }

    public void setValid(boolean valid) {
        this.valid = valid;
    }
    
    
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.valid);
        sb.append("\001").append(this.remote_addr);
        sb.append("\001").append(this.remote_user);
        sb.append("\001").append(this.time_local);
        sb.append("\001").append(this.request);
        sb.append("\001").append(this.status);
        sb.append("\001").append(this.body_bytes_sent);
        sb.append("\001").append(this.http_referer);
        sb.append("\001").append(this.http_user_agent);
        return sb.toString();
}
}
日志Bean
public class WebLogParser {
    public static WebLogBean parser(String line) {
        WebLogBean webLogBean = new WebLogBean();
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            webLogBean.setRemote_addr(arr[0]);
            webLogBean.setRemote_user(arr[1]);
            webLogBean.setTime_local(arr[3].substring(1));
            webLogBean.setRequest(arr[6]);
            webLogBean.setStatus(arr[8]);
            webLogBean.setBody_bytes_sent(arr[9]);
            webLogBean.setHttp_referer(arr[10]);
            
            if (arr.length > 12) {
                webLogBean.setHttp_user_agent(arr[11] + " " + arr[12]);
            } else {
                webLogBean.setHttp_user_agent(arr[11]);
            }
            if (Integer.parseInt(webLogBean.getStatus()) >= 400) {// 大于400,HTTP错误
                webLogBean.setValid(false);
            }
        } else {
            webLogBean.setValid(false);
        }
        return webLogBean;
    }
   
    public static String parserTime(String time) {
        
        time.replace("/", "-");
        return time;
        
    }
}
原始日志处理类Preser
public class WeblogPreProcess {

    static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        Text k = new Text();
        NullWritable v = NullWritable.get();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String line = value.toString();
            WebLogBean webLogBean = WebLogParser.parser(line);
            if (!webLogBean.isValid())
                return;
            k.set(webLogBean.toString());
            context.write(k, v);

        }

    }

    public static void main(String[] args) throws Exception {
        
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        
        job.setJarByClass(WeblogPreProcess.class);
        
        job.setMapperClass(WeblogPreProcessMapper.class);
        
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        
        job.waitForCompletion(true);
        
    }
}
mapreduce程序

     字符串、日期时间等的处理需要加强!

五、自定义GroupingComparator

  1.需求

    有如下订单:

    

    现在需要求出每一个订单中成交金额最大的一笔交易

  2.分析   

      1、利用“订单id和成交金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce

    2、在reduce端利用groupingcomparator将订单id相同的kv聚合成组,然后取第一个即是最大值

  注意,这不是一种常规的做法,只是我们刚好利用了它的一种机制!

   默认的分组WritableComparator是通过文本的内容是否相同来决定是否是同一个Key,从而在reduce进行分组,我们只需要修改这个,即可!

   3.代码

  核心代码:

    比较器:这里我们选择继承它的一个通用实现:WritableComparator

public class OrderComparator extends WritableComparator{
    protected OrderComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // 强转为子类
        OrderBean bean1 = (OrderBean) a;
        OrderBean bean2 = (OrderBean) b;
        // 将ID是否相同视为一组
        return bean1.getItemId().compareTo(bean2.getItemId());
    }
}
/** 
 * 自定义分组比较器 
 * @author 廖*民 
 * time : 2015年1月18日下午9:15:26 
 * @version 
 */  
class MyGroupComparator implements RawComparator<CombineKey> {  
  
    // 分组策略中,这个方法不是重点  
    public int compare(CombineKey o1, CombineKey o2) {  
        // TODO Auto-generated method stub  
        return 0;  
    }  
  
    /** 
     * b1 表示第一个参与比较的字节数组 
     * s1 表示第一个字节数组中开始比较的位置  
     * l1 表示第一个字节数组中参与比较的字节长度  
     * b2 表示第二个参与比较的字节数组  
     * s2 表示第二个字节数组中开始比较的位置  
     * l2 表示第二个字节数组参与比较的字节长度 
     */  
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {  
  
        // 这里是按第CombineKey中的第一个元素进行分组,因为是long类型,所以是8个字节  
        return WritableComparator.compareBytes(b1, s1, 8, b2, s2, 8);  
    }  
  
直接实现RawComparator

    OrderBean的比较逻辑:

  @Override
    public int compareTo(OrderBean o) {
        // 注意比较的逻辑,id相等,则比较amount,否则比较id即可,这样相同id就连在一起了
        int cmp = this.getItemId().compareTo(o.getItemId());
        if (0 == cmp) {
            return -Double.compare(this.getAmount(), o.getAmount());
        }
        return cmp;
    }

    job中进行设置分组器:

     // 关键步骤,设置分组器
        job.setGroupingComparatorClass(OrderComparator.class);

  完整代码:

package com.mr.group;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * mapper
 *
 * @author zcc ON 2018/2/6
 **/
public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
    OrderBean bean = new OrderBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split(",");
        String itemId = fields[0];
        double amount = Double.parseDouble(fields[2]);
        bean.set(itemId, amount);
        context.write(bean, NullWritable.get());
    }
}
GroupMapper
package com.mr.group;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * reducer
 *
 * @author zcc ON 2018/2/6
 **/
public class GroupReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // 设置了自定义分组策略后,到这里就是ID相同则为一组了,取出第一个key则为结果
        context.write(key, NullWritable.get());
    }
}
GroupReducer
package com.mr.group;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * compator
 *
 * @author zcc ON 2018/2/6
 **/
public class OrderComparator extends WritableComparator{
    protected OrderComparator() {
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // 强转为子类
        OrderBean bean1 = (OrderBean) a;
        OrderBean bean2 = (OrderBean) b;
        // 将ID是否相同视为一组
        return bean1.getItemId().compareTo(bean2.getItemId());
    }
}
OrderComparator
package com.mr.group;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * bean
 *
 * @author zcc ON 2018/2/6
 **/
public class OrderBean implements WritableComparable<OrderBean>{
    private String itemId;
    private Double amount;

    public OrderBean() {
    }

    public OrderBean(String itemId, Double amount) {
        this.itemId = itemId;
        this.amount = amount;
    }
    public void set(String itemId, Double amount) {
        this.itemId = itemId;
        this.amount = amount;
    }
    public String getItemId() {
        return itemId;
    }

    public void setItemId(String itemId) {
        this.itemId = itemId;
    }

    public Double getAmount() {
        return amount;
    }

    public void setAmount(Double amount) {
        this.amount = amount;
    }

    @Override
    public int compareTo(OrderBean o) {
        // 注意比较的逻辑,id相等,则比较amount,否则比较id即可,这样相同id就连在一起了
        int cmp = this.getItemId().compareTo(o.getItemId());
        if (0 == cmp) {
            return -Double.compare(this.getAmount(), o.getAmount());
        }
        return cmp;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(itemId);
        out.writeDouble(amount);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.itemId = in.readUTF();
        this.amount = in.readDouble();
    }

    @Override
    public String toString() {
        return itemId + "\t" + amount;
    }
}
OrderBean
package com.mr.group;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * driver
 *
 * @author zcc ON 2018/2/6
 **/
public class GroupDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 设置本程序jar包本地位置
        job.setJarByClass(GroupDriver.class);
        // 指定本业务job要使用的mapper/reducer业务类
        job.setMapperClass(GroupMapper.class);
        job.setReducerClass(GroupReducer.class);
        // 指定map输出的数据类型(由于可插拔的序列化机制导致)
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 指定最终输出(reduce)的的数据类型(可选,因为有时候不需要reduce)
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        // 关键步骤,设置分组器
        job.setGroupingComparatorClass(OrderComparator.class);
        FileInputFormat.setInputPaths(job, new Path("F:/input"));
        FileOutputFormat.setOutputPath(job, new Path("F:/output"));
        // 提交(将job中的相关参数以及java类所在的jar包提交给yarn运行)
        // job.submit();
        // 反馈集群信息
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
GroupDriver