zl程序教程

您现在的位置是:首页 >  其他

当前栏目

System|分布式|MapReduce Lab

2023-03-15 22:02:11 时间

之前写分布式系统课的Lab4的时候只顾埋头做,偶然发现居然是MIT 6.824的lab1的java移植版。正好借此机会复习并整理一下。

Framework

Mapper

public static void doMap(String jobName, int mapTask, String inFile, int nReduce, MapFunc mapF) {
        List<KeyValue> mapResult = mapF.map(inFile, readFile(Paths.get(inFile),false));
        mapResult
                .stream()
                .collect(Collectors.groupingBy(kv -> hashCode(kv.key) % nReduce))
                .forEach((reduceTask, value) -> writeFile(Paths.get(Utils.reduceName(jobName, mapTask, reduceTask)), JSON.toJSONString(value)));
    }

之前的文章讲过,map本质上就是

map (in_key, in_value) -> list(out_key, intermediate_value)

这里的这个过程通过传入的MapFunc完成,暂且滞后。readFile是我自己实现的工具函数,可忽视,参数里的false指的是读的不是临时文件不需要删除。

生成KV列表之后,通过stream转变为流,通过hashCode%R进行分组。

然后对每个分组,通过规则输出到对应的中间文件中。

    public static String reduceName(String jobName, int mapTask, int reduceTask) {
        return "mrtmp." + jobName + "-" + mapTask+ "-" + reduceTask;
    }

Reducer

public static void doReduce(String jobName, int reduceTask, String outFile, int nMap, ReduceFunc reduceF) {
        Stream<List<KeyValue>> MapResults= IntStream.range(0, nMap)
                .mapToObj(m -> JSON.parseArray(Mapper.readFile(Paths.get(Utils.reduceName(jobName, m, reduceTask)),true), KeyValue.class));
        Map<String,List<KeyValue>> MapFlatResults = MapResults.flatMap(Collection::stream).collect(Collectors.groupingBy(kv -> kv.key));
        Map<String,String> res = new TreeMap<>();
        for(Map.Entry<String,List<KeyValue>> entry : MapFlatResults.entrySet()){
            res.put(entry.getKey(),reduceF.reduce(entry.getKey(),entry.getValue().stream().map(kv -> kv.value).toArray(String[]::new)));
        }
        Mapper.writeFile(Paths.get(outFile),JSON.toJSONString(res));

    }

首先是从0->M,读取所有的中间文件,第二个参数是true表示需要删除中间文件。结果存储在Stream<List<KeyValue>>中。

然后使用flatMap对于流进行处理,合并list,按照key进行分组。

reduce (out_key, list(intermediate_value)) -> out_value

但是简单的按key分组之后,得到的是Map<String,List<KeyValue>>而不是Map<String,List<Value>>,因此用下面的步骤取出Value

entry.getValue().stream().map(kv -> kv.value).toArray(String[]::new)

这样,通过对每个<key,list<value>>执行ReduceFunc,即可得到最终的结果并汇总输出,输出结果是JSON的map。


Application

上面的过程是整个框架的抽象,相当于设计模式的模板方法,实际上执行的Reduce、Map函数都需要我们自己实现。

WordCount

统计多个文件中单词的数目

public class WordCount {

    public static List<KeyValue> mapFunc(String file, String value) {
        // Your code here (Part II)
        Pattern pattern = Pattern.compile("[a-zA-Z0-9]+");
    Matcher matcher = pattern.matcher(value);
        List<KeyValue> kvs = new ArrayList<>();
        while (matcher.find()) {
            kvs.add(new KeyValue(matcher.group(), ""));
        }
        return kvs;
}

    public static String reduceFunc(String key, String[] values) {
        // Your code here (Part II)
        return Integer.toString(values.length);
    }
}

输出则是为每个匹配的单词都创建<word,null>。

个人感觉如果是输出<word,count>会更好,Reduce部分只需要sum,而且节约空间。

map (file, content) -> list(word, null)

单纯计算传入列表的长度即可。如果上面是<word,count>,这里则进行sum。

reduce (word, list(null)) -> count

InvertedIndex

统计单词所在的文件,以及总文件数

public class InvertedIndex {

    public static List<KeyValue> mapFunc(String file, String value) {
            // Your code here (Part V)
            Pattern pattern = Pattern.compile("[a-zA-Z0-9]+");
            Matcher matcher = pattern.matcher(value);
            List<KeyValue> kvs = new ArrayList<>();
            while (matcher.find()) {
            kvs.add(new KeyValue(matcher.group(), file));
        }
        return kvs;
    }

    public static String reduceFunc(String key, String[] values) {
        //  Your code here (Part V)
        return Stream.of(values).sorted().distinct().count() + " " +
                Stream.of(values).sorted().distinct().collect(Collectors.joining(","));
    }

输出则是为每个匹配的单词都创建<word,file>,感觉这里先做一个去重可能会比较好。

map (file, content) -> list(word, file)

这里事实上应该先存Stream.of(values).sorted().distinct()的,不过懒。如果上面做了去重这里就不需要做了。

输出是将所有文件用逗号分隔,前面加上count。

reduce (word, list(file)) -> InvertedIndex

Scheduler

此外,MapReduce显然是多线程的,虽然这是单机Lab,不过还是需要考虑到worker失败的可能性。(master没考虑,论文说打log,做备份,从checkpoint开始重新启动)

public static void schedule(String jobName, String[] mapFiles, int nReduce, JobPhase phase, Channel<String> registerChan) {
        int nTasks = -1; // number of map or reduce tasks
        int nOther = -1; // number of inputs (for reduce) or outputs (for map)
        switch (phase) {
            case MAP_PHASE:
                nTasks = mapFiles.length;
                nOther = nReduce;
                break;
            case REDUCE_PHASE:
                nTasks = nReduce;
                nOther = mapFiles.length;
                break;
        }
        System.out.println(String.format("Schedule: %d %s tasks (%d I/Os)", nTasks, phase, nOther));
        /**
        // All ntasks tasks have to be scheduled on workers. Once all tasks
        // have completed successfully, schedule() should return.
        //
        // Your code here (Part III, Part IV).
        //
        */
            try {
            CountDownLatch latch = new CountDownLatch(nTasks);
            for(int i = 0; i<nTasks;i++){
                new WorkerThread(registerChan,new DoTaskArgs(jobName, mapFiles[i], phase, i, nOther),latch).start();
            }
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(String.format("Schedule: %s done", phase));

    }

这里利用CountDownLatch并发控制,当phase中的所有worker都完成任务后,返回。

这里存在设计问题,理论上只要Mapper输出了中间文件,Reduce就能读取,直到所有文件都读取完毕再开始计算。而Lab版是所有文件输出完毕之后再开始读取。效率肯定是不行的。

    private static class WorkerThread extends Thread {
        final Channel<String> workers;
        final DoTaskArgs args;
        final CountDownLatch latch;

        public WorkerThread(Channel<String> workers, DoTaskArgs args, CountDownLatch latch) {
            this.workers = workers;
            this.args = args;
            this.latch = latch;
        }

        @Override
        public void run() {
                try {
                    String worker = workers.read();
                    Call.getWorkerRpcService(worker).doTask(args);
                    workers.write(worker);
                    latch.countDown();
                } catch (InterruptedException e) {
                    Utils.debug("worker thread interrupted");
                    new WorkerThread(workers,args,latch).start();
                } catch (SofaRpcException e) {
                    Utils.debug("worker failed");
                    new WorkerThread(workers,args,latch).start();
                }
            }
    }

当worker完成之后递减CountDownLock。如果worker故障,简单地新建worker即可。

这得益于Map无状态纯函数的特点,只要输入相同,输出始终相同,幂等性。所以我们只需要和原论文一样重新调度一个worker执行相同task即可。

不过这是因为中间结果输出在本地,如果输出在分布式环境下的话,master需要通知reducer读的位置改变了。(参考论文)

另一种想法是多备份,也就是同一任务交给多个worker进行。


因为听说明年的分布式系统课改成云计算课(吐槽一波,云计算不就是分布式换皮吗),不知道本lab是不是会继续作为作业,所以助教移植的lab框架的代码就不发布了,只分享我自己写的部分。

https://github.com/sjtuzwj/DistributedSystemLab