将 MapReduce 的计算结果写入 HBase
HBase MR
2023-09-27 14:28:01 时间
新建maven项目
导入依赖
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.100</groupId>
  <artifactId>MRHbasetest</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <!-- 原文此处的 dependencies 内容缺失 -->
</project>

core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<!-- Put site-specific property overrides in this file. -->
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://master:9000</value>
  </property>
  <property>
    <name>io.file.buffer.size</name>
    <value>131072</value>
  </property>
  <property>
    <name>hadoop.tmp.dir</name>
    <value>file:/usr/temp</value>
  </property>
  <property>
    <name>hadoop.proxyuser.root.hosts</name>
    <value>*</value>
  </property>
  <property>
    <name>hadoop.proxyuser.root.groups</name>
    <value>*</value>
  </property>
</configuration>

hbase-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>master,slave1,slave2</value>
    <description>The directory shared by RegionServers.</description>
  </property>
  <property>
    <name>hbase.zookeeper.property.clientPort</name>
    <value>2181</value>
  </property>
</configuration>

log4j.properties
# Global logging configuration
log4j.rootLogger=INFO, stdout
# MyBatis logging configuration...
log4j.logger.org
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

/**
 * Word-count MapReduce job that stores its result in HBase.
 *
 * <p>Each distinct word becomes one row of table {@code bd17:wc}: the word is
 * the row key and its count is written to column family {@code c}, qualifier
 * {@code count}.
 */
public class MRToHbase {

    /** Splits every input line on whitespace and emits {@code (word, 1)} per non-empty token. */
    public static class MrToBaseMap extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static final IntWritable ONE = new IntWritable(1);

        // Reused across map() calls so no new Text is allocated per token.
        private final Text outputKey = new Text();

        /**
         * Emits {@code (word, 1)} for every non-empty token of the line.
         *
         * @param key     input key supplied by the input format (not used)
         * @param value   one line of input text
         * @param context sink for the {@code (word, 1)} pairs
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            // Splitting on a single whitespace character yields empty tokens
            // between consecutive separators; those are skipped below.
            String[] words = value.toString().split("\\s");
            for (String word : words) {
                if (word.length() != 0) {
                    outputKey.set(word);
                    context.write(outputKey, ONE);
                }
            }
        }
    }

    /**
     * Sums the counts of one word and writes the total to HBase.
     *
     * <p>The reducer has to extend the {@code TableReducer} type from the HBase API.
     */
    public static class MrToHBaseReduce extends TableReducer<Text, IntWritable, NullWritable> {

        private final NullWritable outputKey = NullWritable.get();

        /**
         * Adds up all counts for {@code key} and writes one {@link Put} for it.
         *
         * @param key     the word
         * @param values  the partial counts emitted for that word
         * @param context sink for the resulting mutation
         * @throws IOException          if writing to the context fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, NullWritable, Mutation>.Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }

            // Build the Put, i.e. the concrete content of the single row inserted
            // into HBase: row key = word, c:count = total as a decimal string.
            Put put = new Put(Bytes.toBytes(key.toString()));
            put.addColumn(Bytes.toBytes("c"), Bytes.toBytes("count"),
                    Bytes.toBytes(String.valueOf(sum)));
            context.write(outputKey, put);
        }
    }

    /**
     * Configures the HBase connection and output format, then runs the job.
     *
     * <p>Exits the JVM with status 0 when the job succeeds and 1 otherwise.
     *
     * @param args command-line arguments (not used)
     * @throws IOException            if the job cannot be set up or submitted
     * @throws ClassNotFoundException if a job class cannot be resolved
     * @throws InterruptedException   if waiting for the job is interrupted
     */
    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Create the job configuration through HBaseConfiguration.
        Configuration configuration = HBaseConfiguration.create();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(MRToHbase.class);
        job.setJobName("wordcount写入到hbase");

        job.setMapperClass(MrToBaseMap.class);
        job.setReducerClass(MrToHBaseReduce.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Mutation.class);

        // TableMapReduceUtil performs the initialisation an MR job needs to
        // write into an HBase table.
        TableMapReduceUtil.initTableReducerJob("bd17:wc", MrToHBaseReduce.class, job);
        FileInputFormat.addInputPath(job, new Path("/reversetext/reverse1.txt"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
MySQL 流增量写入 HDFS(二)——Storm + HDFS 的流式处理

一、概述

上一篇我们介绍了如何将数据从 MySQL 抛到 Kafka,这一篇专注于利用 Storm 将数据写入 HDFS 的过程。由于 Storm 写入 HDFS 时可定制的东西比较多,我们先不从 Kafka 读取,而是自己定义一个 Spout 充当数据源,下一章再进行整合。