zl程序教程

您现在的位置是:首页 >  其他

当前栏目

8.2.1输入分片InputSplit和输入处理格式FileInputFormat

输入 处理 格式 分片 8.2
2023-09-14 09:07:37 时间

 

1.1.1         输入分片和记录

 

1)输入分片InputSplit接口

 

输入分片一般是文件,也可以数据库中的若干行。记录对应一行数据。输入分片在java表示为InputSplit接口,getlength函数返回大小,用于分片排序,大的先处理。Getlocation函数返回分片位置,让map任务尽量本地化。分片并不包含数据本身,而是指向数据的索引。

 

public abstract class InputSplit {

 

  /**

 

   * Get the size of the split, so that the input splits can be sorted by size.

 

   * @return the number of bytes in the split

 

   * @throws IOException

 

   * @throws InterruptedException

 

   * split的长度用byte表示

 

   */

 

  public abstract long getLength() throws IOException, InterruptedException;

 

 

 

  /**

 

   * Get the list of nodes by name where the data for the split would be local.

 

   * The locations do not need to be serialized.

 

   * 获取split所在的节点

 

   * @return a new array of the node nodes.

 

   * @throws IOException

 

   * @throws InterruptedException

 

   */

 

  public abstract

 

    String[] getLocations() throws IOException, InterruptedException;

 

 

 

  /**

 

   * Gets info about which nodes the input split is stored on and how it is

 

   * stored at each location.

 

   * 返回split所在的节点信息以及在该节点上如何存储 memory

 

   * @return list of <code>SplitLocationInfo</code>s describing how the split

 

   *    data is stored at each location. A null value indicates that all the

 

   *    locations have the data stored on disk.

 

   * @throws IOException

 

   */

 

  @Evolving

 

  public SplitLocationInfo[] getLocationInfo() throws IOException {

 

    return null;

 

  }

 

}

 

2InputFormat输入处理类

 

应用开发人员不直接处理InputSplit,而是InputFormat创建分片并将分片分割为记录。所有InputFormat都要直接或间接的继承InputFormat抽象类。包含分片切割函数getSplits()和创建读取记录操作对象的函数createRecordReader函数。

 

getSplits()方法

 

此方法接受JobContext接受环境信息,得到要处理的文件信息后,进行逻辑切割,产生InputSplit集合返回。然后将分片发送给application master,AM根据分片位置创建map任务。

 

List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;

 

createRecordReader函数

 

根据传入的分片,创建RecordReader对象,RecordReader相当于迭代器,取出记录生成键值对,传给Mapper的 run方法。Run方法中通过context.nextKeyValue()切换迭代器指向的键值。

 

public abstract RecordReader<K,V> createRecordReader(InputSplit split,TaskAttemptContext context) throws IOException, InterruptedException;

 

3FileInputFormat

 

继承于InputFormat,以文件作为数据的源的基类,作用是指出输入文件的位置和实现输入分片的代码,把分片分割成记录是FileInputFormat子类的完成,子类包括CombineFileInputFormat、TextInputFormat、KeyValue TextInputFormat、NLineInputFormat等

 

 

 

4FileInputFormat类的输入路径

 

输入路径:Add方法添加路径,set方法设置路径。一条路径可以是一个文件、一个目录、或二者的集合。默认不递归子目录,需要将mapreduce.input.fileinputformat.input.dir.recuresive设置为true才会递归子目录。public static void addInputPath(Job job,Path path)

 

public static void addInputPaths(Job job,String commaseparatePaths)

 

public static void setInputPaths(Job job,Path… inputPaths)

 

public static void setInputPaths(Job job,String commaseparatePaths)

 

过滤器:还可以用FileInputFormat的setInputPathFilter()方法设置过滤器来过滤非隐藏的文件,默认过滤排除隐藏文件(名称中以“.”和“_”开头文件)。

 

属性设置路径和过滤器:路径和过滤器可以用属性来设置,String任务可以通过-input选项设置路径

 

 

 

5FileInputFormat类的输入分片

 

FileInputFormat会对超过HDFS块的输入进行分片,默认分片大小等于块大小,也可以通过属性设置,分片大小计算:max(minimumSize,min(maximumSize, blockSize))。可以通过调节maximumSize的大小来调节分片大小。

 

 

 

 

 

6)小文件与CombineFileInputFormat

 

Hadoop适合处理大文件,对于大量的小文件,如果每个文件创建一个map任务,会导致map任务开销太大,增加小文件的寻址次数,尽量避免大量小文件处理,所以用CombineInputFormat把多个小文件打包到一个分片,并且考虑多个文件的机架节点关系。

 

7)避免切分

 

如果不希望文件被切分,例如判断文件中记录是否有序,可以让minimumSize值大于最大文件的大小,但是文件的大小不能超过blockSize,或者重写FileInputFormat方法isSplitable()返回为false。下面介绍将多个小文件合成一个大的序列文件的例子:

 

1)自定义完整文件输入处理类如下:

 

Public class WholeFileInputFormat extends FileInputFormat<NullWritable, ByteWritable>

 

{

 

@override//不得分片

 

protected boolean isSplitable(JobContext context,Path file){return false;}

 

       @override

 

       public RecordReader<NullWritable,BytesWritable> createRecordReader ( InputSplit split,TaskAttemptContext context )throws IOException,InterruptedException

 

{

 

  WholeFileRecordReader reader=new WholeFileRecordReader();

 

  reader.initialize(split,context);

 

  return reader;

 

}

 

}

 

2)自定义完整文件读取类WholeFileRecordReader

 

WholeFileRecordReader类通过initialize()方法传入文件信息,然后调用nextKeyValue()方法一次性读取整个文件的内容,通过布尔值processed判断是否读取执行过。其他函数都是返回值。将FileSplit转为一条记录,键为null,值为文件内容。

 

package org.edu.bupt.xiaoye.hadooptest;

 

 

 

import java.io.IOException;

 

 

 

import org.apache.hadoop.conf.Configuration;

 

import org.apache.hadoop.fs.FSDataInputStream;

 

import org.apache.hadoop.fs.FileSystem;

 

import org.apache.hadoop.fs.Path;

 

import org.apache.hadoop.io.BytesWritable;

 

import org.apache.hadoop.io.IOUtils;

 

import org.apache.hadoop.io.NullWritable;

 

import org.apache.hadoop.mapreduce.InputSplit;

 

import org.apache.hadoop.mapreduce.RecordReader;

 

import org.apache.hadoop.mapreduce.TaskAttemptContext;

 

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

 

 

 

/**

 

 * 继承RecordReader

 

 * 该类用来将分片分割成记录,从而生成key和value。例如TextInputFormat中的key和value就是RecordReader的子类产生的。

 

 * 在这里,我们继承了这个类,将重写了key和value的生成算法。对一个分片来说,只生成一个key-value对。其中key为空,value为该分片

 

 * 的所有内容

 

 * @author Xiaoye

 

 */

 

public class WholeFileRecordReader extends

 

              RecordReader<NullWritable, BytesWritable> {

 

       // 用来盛放传递过来的分片

 

       private FileSplit fileSplit;

 

       private Configuration conf;

 

       //将作为key-value中的value值返回

 

       private BytesWritable value = new BytesWritable();

 

       // 因为只生成一条记录,所以只需要调用一次。因此第一次调用过后将processed赋值为true,从而结束key和value的生成

 

       private boolean processed = false;

 

 

 

       /**

 

        * 设置RecordReader的分片和配置对象。

 

        */

 

       @Override

 

       public void initialize(InputSplit split, TaskAttemptContext context)

 

                     throws IOException, InterruptedException {

 

              this.fileSplit = (FileSplit) split;

 

              this.conf = context.getConfiguration();

 

       }

 

 

 

       /**

 

        * 核心算法

 

        * 用来产生key-value值

 

        * 一次读取整个文件内容注入value对象

 

        */

 

       @Override

 

       public boolean nextKeyValue() throws IOException, InterruptedException {

 

              if (!processed) {

 

                     /*

 

                      * 注意这儿,fileSplit中只是存放着待处理内容的位置 大小等信息,并没有实际的内容

 

                      * 因此这里会通过fileSplit找到待处理文件,然后再读入内容到value中

 

                      */

 

                     byte[] contents = new byte[(int) fileSplit.getLength()];

 

                     Path file = fileSplit.getPath();

 

                     FileSystem fs = file.getFileSystem(conf);

 

                     FSDataInputStream in = null;

 

                     try {

 

                            in = fs.open(file);

 

                            IOUtils.readFully(in, contents, 0, contents.length);

 

                            value.set(contents, 0, contents.length);

 

                     } finally {

 

                            IOUtils.closeStream(in);

 

                     }

 

                     processed = true;

 

                     return true;

 

              }

 

              return false;

 

       }

 

 

 

       @Override

 

       public NullWritable getCurrentKey() throws IOException,

 

                     InterruptedException {

 

              return NullWritable.get();

 

       }

 

 

 

       @Override

 

       public BytesWritable getCurrentValue() throws IOException,

 

                     InterruptedException {

 

              return value;

 

       }

 

 

 

       @Override

 

       public float getProgress() throws IOException, InterruptedException {

 

              return processed ? 1.0f : 0.0f;

 

       }

 

 

 

       @Override

 

       public void close() throws IOException {

 

              //do nothing

 

       }

 

3)将若干个小文件打包成顺序文件的mapreduce作业

 

通过WholeFileRecordReader类读取所有小文件的内容,以文件名称为输出键,以内容未一条记录,然后合并成一个大的顺序文件。

 

public class SmallFilesToSequenceFileConverter extends configured implement Tool

 

{

 

       package com.pzoom.mr.sequence;

 

 

 

import java.io.IOException;

 

import java.util.Random;

 

 

 

import org.apache.hadoop.conf.Configuration;

 

import org.apache.hadoop.fs.Path;

 

import org.apache.hadoop.io.BytesWritable;

 

import org.apache.hadoop.io.NullWritable;

 

import org.apache.hadoop.io.Text;

 

import org.apache.hadoop.mapreduce.InputSplit;

 

import org.apache.hadoop.mapreduce.Job;

 

import org.apache.hadoop.mapreduce.Mapper;

 

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

 

import org.apache.hadoop.mapreduce.lib.input.FileSplit;

 

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 

import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

 

 

 

public class SmallFilesToSequenceFileConverter {

 

///定义map函数

 

       static class SequenceFileMapper extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {

 

              private Text filenameKey;

 

              //定义设置文件函数

 

              @Override

 

              protected void setup(Context context) throws IOException,

 

              InterruptedException {

 

                     InputSplit split = context.getInputSplit();

 

                     Path path = ((FileSplit)split).getPath();

 

                     filenameKey = new Text(path.toString());

 

              }

 

 //定义map函数

 

              @Override

 

              protected void map(NullWritable key, BytesWritable value,

 

                            Context context) throws IOException, InterruptedException {

 

                     context.write(filenameKey, value);

 

              }

 

//定义run函数

 

@Override

 

public int run (String[] args)throws IOException {

 

       Configuration conf = getConf();

 

       if(conf==null)

 

{

 

       return -1;

 

}

 

       Job job=JobBuilder.parseInputAndOutput(this,conf,args);

 

              job.setInputFormatClass(WholeFileInputFormat.class);

 

              job.setOutputFormatClass(SequenceFileOutputFormat.class);输出序列file

 

              job.setOutputKeyClass(Text.class);

 

              job.setOutputValueClass(BytesWritable.class);

 

              job.setMapperClass(SequenceFileMapper.class);

 

             

 

              return job.waitForCompletion(true)? 0:1;}

 

//args传入输入输出路径

 

 public static void main(String[] args) throws IOException{

 

              int exitCode=ToolRunner.run(new SmallFilesToSequenceFileConverter(),args);

 

System.exit(exitCode);

 

                     }

 

}

 

}

 

4)执行小文件合并为大文件的命令

 

各参数含义:采用本地配置文件,两个reduces任务,输入文件夹,输出文件夹

 

%hadoop jar job.jar SmallFilesToSequenceFileConverter –conf conf/Hadoop-localhost.xml –D mapreduece.job.reduces-2 input/smallfiles output

 

5)通过命令来查看输出结果

 

%hadoop fs –conf conf/Hadoop-localhost.xml –text output/part-r-00000

 

 

输出结果是以小文件路径为键,以内容为值的合并序列文件

 

8)分片中的文件信息

 

可以调用Mapper的Context对象的getInputSplit()方法获取InputSplit对象,强制转为FileSplit,可以从对象中获取分片所属文件的信息。

 

自己开发了一个股票智能分析软件,功能很强大,需要的点击下面的链接获取:

https://www.cnblogs.com/bclshuai/p/11380657.html