zl程序教程

您现在的位置是:首页 >  其它

当前栏目

如何在MaxCompute上运行HadoopMR作业

如何 运行 作业 MaxCompute
2023-09-14 09:00:28 时间
本文用到的

阿里云数加-大数据计算服务MaxCompute产品地址:https://www.aliyun.com/product/odps

MaxCompute(原ODPS)有一套自己的MapReduce编程模型和接口,简单说来,这套接口的输入输出都是MaxCompute中的Table,处理的数据是以Record为组织形式的,它可以很好地描述Table中的数据处理过程,然而与社区的Hadoop相比,编程接口差异较大。Hadoop用户如果要将原来的Hadoop MR作业迁移到MaxCompute的MR执行,需要重写MR的代码,使用MaxCompute的接口进行编译和调试,运行正常后再打成一个Jar包才能放到MaxCompute的平台来运行。这个过程十分繁琐,需要耗费很多的开发和测试人力。如果能够完全不改或者少量地修改原来的Hadoop MR代码就能在MaxCompute平台上跑起来,将是一个比较理想的方式。

现在MaxCompute平台提供了一个HadoopMR到MaxCompute MR的适配工具,已经在一定程度上实现了Hadoop MR作业的二进制级别的兼容,即用户可以在不改代码的情况下通过指定一些配置,就能将原来在Hadoop上运行的MR jar包拿过来直接跑在MaxCompute上。该插件的下载地址在:http://repo.aliyun.com/download/hadoop2openmr-1.0.jar,目前该插件处于测试阶段,暂时还不能支持用户自定义comparator和自定义key类型,下面将以WordCount程序为例,介绍一下这个插件的基本使用方式。

使用该插件在MaxCompute平台跑一个HadoopMR作业的基本步骤如下:

1. 下载HadoopMR的插件

通过http://repo.aliyun.com/download/hadoop2openmr-1.0.jar下载插件,包名为hadoop2openmr-1.0.jar,注意,这个jar里面已经包含hadoop-2.7.2版本的相关依赖,在作业的jar包中请不要携带hadoop的依赖,避免版本冲突。

2. 准备好HadoopMR的程序jar包

编译导出WordCount的jar包:wordcount_test.jar ,wordcount程序的源码如下:


package com.aliyun.odps.mapred.example.hadoop;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

import java.util.StringTokenizer;

public class WordCount {

 public static class TokenizerMapper

 extends Mapper Object, Text, Text, IntWritable {

 private final static IntWritable one = new IntWritable(1);

 private Text word = new Text();

 public void map(Object key, Text value, Context context

 ) throws IOException, InterruptedException {

 StringTokenizer itr = new StringTokenizer(value.toString());

 while (itr.hasMoreTokens()) {

 word.set(itr.nextToken());

 context.write(word, one);

 public static class IntSumReducer

 extends Reducer Text,IntWritable,Text,IntWritable {

 private IntWritable result = new IntWritable();

 public void reduce(Text key, Iterable IntWritable values,

 Context context

 ) throws IOException, InterruptedException {

 int sum = 0;

 for (IntWritable val : values) {

 sum += val.get();

 result.set(sum);

 context.write(key, result);

 public static void main(String[] args) throws Exception {

 Configuration conf = new Configuration();

 Job job = Job.getInstance(conf, "word count");

 job.setJarByClass(WordCount.class);

 job.setMapperClass(TokenizerMapper.class);

 job.setCombinerClass(IntSumReducer.class);

 job.setReducerClass(IntSumReducer.class);

 job.setOutputKeyClass(Text.class);

 job.setOutputValueClass(IntWritable.class);

 FileInputFormat.addInputPath(job, new Path(args[0]));

 FileOutputFormat.setOutputPath(job, new Path(args[1]));

 System.exit(job.waitForCompletion(true) ? 0 : 1);

}
3. 测试数据准备 创建输入表和输出表
create table if not exists wc_in(line string);

create table if not exists wc_out(key string, cnt bigint);
通过tunnel将数据导入输入表中

待导入文本文件data.txt的数据内容如下:

hello maxcompute

hello mapreduce

例如可以通过如下命令将data.txt的数据导入wc_in中,

tunnel upload data.txt wc_in;
4. 准备好表与hdfs文件路径的映射关系配置

配置文件命名为:wordcount-table-res.conf

{

 "file:/foo": {

 "resolver": {

 "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver",

 "properties": {

 "text.resolver.columns.combine.enable": "true",

 "text.resolver.seperator": "\t"

 "tableInfos": [

 "tblName": "wc_in",

 "partSpec": {},

 "label": "__default__"

 "matchMode": "exact"

 "file:/bar": {

 "resolver": {

 "resolver": "com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver",

 "properties": {

 "binary.resolver.input.key.class" : "org.apache.hadoop.io.Text",

 "binary.resolver.input.value.class" : "org.apache.hadoop.io.LongWritable"

 "tableInfos": [

 "tblName": "wc_out",

 "partSpec": {},

 "label": "__default__"

 "matchMode": "fuzzy"

}

配置项说明:

整个配置是一个json文件,描述hdfs上文件与maxcompute上表之间的映射关系,一般要配置输入和输出两部分,一个HDFS路径对应一个resolver配置,tableInfos配置以及matchMode配置。


resolver: 用于配置如何对待文件中的数据,目前有com.aliyun.odps.mapred.hadoop2openmr.resolver.TextFileResolver和com.aliyun.odps.mapred.hadoop2openmr.resolver.BinaryFileResolver两个内置的resolver可以选用。除了指定好resolver的名字,还需要为相应的resolver配置一些properties指导它正确的进行数据解析。

TextFileResolver: 对于纯文本的数据,输入输出都会当成纯文本对待。当作为输入resolver配置时,需要配置的properties有text.resolver.columns.combine.enable和text.resolver.seperator,当text.resolver.columns.combine.enable配置为true时,会把输入表的所有列按找text.resolver.seperator指定的分隔符组合成一个字符串作为输入。否则,会把输入表的前两列分别作为key,value。 BinaryFileResolver: 可以处理二进制的数据,自动将数据转换为maxcompute可以支持的数据类型,如bigint, bool, double等。当作为输出resolver配置时,需要配置的properties有binary.resolver.input.key.class和binary.resolver.input.value.class,分别代表中间结果的key和value类型。
tableInfos:用户配置HDFS对应的maxcompute的表,目前只支持配置表的名字tblName,而partSpec和label请保持和示例一致。 matchMode:路径的匹配模式,可选项为exact和fuzzy,分别代表精确匹配和模糊匹配,如果设置为fuzzy,则可以通过正则来匹配HDFS的输入路径 5. 使用MaxCompute命令行工具odpscmd提交作业

maxcompute命令行工具的安装和配置方法参考:http://repo.aliyun.com/odpscmd/

在maxcompute的命令行下运行如下命令:

jar -DODPS_HADOOPMR_TABLE_RES_CONF=./wordcount-table-res.conf -classpath hadoop2openmr-1.0.jar,wordcount_test.jar com.aliyun.odps.mapred.example.hadoop.WordCount /foo /bar; 

这里假设我们已经将hadoop2openmr-1.0.jar和wordcount_test.jar以及wordcount-table-res.conf已经放置到odpscmd的当前目录,否则在指定配置和-classpath的路径时需要做相应的修改。

运行过程如下图所示:

1

当作业运行完成后,可以查看结果表wc_out里面的内容,验证作业成功结束,结果符合预期。

2

欢迎加入MaxCompute钉钉群讨论

42559c7dde62e4d333c90e02efdf416257a4be27


MaxCompute中如何通过logview诊断慢作业 MaxCompute致力于批量结构化数据的存储和计算,提供海量数据仓库的解决方案及分析建模服务,在MaxCompute执行sql任务的时候有时候作业会很慢,本文通过查看logview排查具体任务慢的原因
双11史上作业数最多,人工干预最少—MaxCompute 自适应执行引擎DAG2.0为1.5亿分布式作业提供自动化保障 阿里巴巴双11史上作业数最多,但人工干预最少的一次双11技术保障: 从动态并发调整带来的单日10亿计算节点的节省,到数据智能编排带来的高级基线单个作业数十个小时执行时间的缩短,再到全新Bubble执行模式在百万作业获取的30%以上的性能提升。 本文为大家介绍,在2020年阿里巴巴集团双11海量作业数目与突变的数据特性面前,阿里云MaxCompute 新一代 DAG 2.0 执行引擎,通过其自适应的动态执行能力和新架构上的全新计算模式,如何为双11大促数据的及时产出提供自动化保障。
Maxcompute Spark作业管控利器—Cupid Console Cupid Console是MaxCompute客户端 0.33.1 及更新版本新增的一个插件,下载MaxCompute客户端最新版本:https://github.com/aliyun/aliyun-odps-console/releases,配置好后,运行odpscmd,针对spark作业的管控,Cupid Console增加了一组spark的命令,可以通过运行help spark查看这些命令的用法。
MaxCompute Studio使用心得系列7——作业对比 在数据开发过程中,我们通常需要将两个作业进行对比从而定位作业运行性能或者结果有差异的问题,但是对比作业时需要同时打开两个studio 的tab页,或者两个Logview页,不停切换进行对比,使用起来非常的不方便。
使用 top instance 命令查看运行中 MaxCompute 作业 我们都知道,在 MaxCompute Console 里,可以使用下面的命令来列出运行完成的 instance 列表。 show p|proc|processlist [from yyyy-MM-dd ] [to yyyy-MM-dd ] [-p project ] [-limit nu...
使用MaxCompute Java SDK运行安全相关命令 使用MaxCompute Console的同学,可能都使用过MaxCompute安全相关的命令。官方文档上有详细的MaxCompute安全指南,并给出了安全相关语句汇总。
简而言之,权限管理、列级别访问控制、项目空间安全配置以及跨项目空间的资源分享都属于 odps 安全命令相关的范畴。
一探究竟:善用 MaxCompute Studio 分析 SQL 作业 头疼的问题 MaxCompute 用户一个常见的问题是:同一个周期任务,为什么最近几天比之前慢了很多?或者为什么之前都能按时产出的作业最近经常破线? 通常来说,引起作业执行变慢的原因有:quota 组资源不足、输入数据量变动、数据分布情况变动、平台硬件故障引发重跑等。
在 MaxCompute UDF 中运行 Scipy 新版 MaxCompute Isolation Session 支持 Python UDF。也就是说,Python UDF 中已经可以跑二进制包。刚才以 Scipy 为例踩了一下坑,把相关的过程分享出来。
MaxCompute(原ODPS)是一项面向分析的大数据计算服务,它以Serverless架构提供快速、全托管的在线数据仓库服务,消除传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您经济并高效的分析处理海量数据。