
Importing HDFS Text Data into HBase with MapReduce

source link: https://songlee24.github.io/2015/08/13/hdfs-import-to-hbase/

Published 2015-08-13  |  Category: Big Data - HBase

HBase itself offers several ways to import data. Two are commonly used:

  1. Use the TableOutputFormat provided by HBase, i.e. write the data into HBase through a MapReduce job.
  2. Use the native HBase client API (a minimal sketch of this approach follows below).

This post demonstrates the first approach: a MapReduce job that reads records from a file on HDFS and writes them into HBase.
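
For comparison, here is a minimal sketch (not from the original post) of the second approach, using the same old-generation client API as the code below, where HTable and put.add were still current; the class name ClientApiDemo and the cell values are illustrative only:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative only: write one cell into 'mytable' with the native client API.
public class ClientApiDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create(); // reads hbase-site.xml from the classpath
        HTable table = new HTable(conf, "mytable");
        Put put = new Put(Bytes.toBytes("r1"));
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("c1"), Bytes.toBytes("value1"));
        table.put(put);
        table.close();
    }
}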

First start Hadoop and HBase, then create an empty table that the data will be imported into later:

hbase(main):006:0> create 'mytable','cf'
0 row(s) in 10.8310 seconds

=> Hbase::Table - mytable
hbase(main):007:0> list
TABLE
mytable
1 row(s) in 0.1220 seconds

=> ["mytable"]
hbase(main):008:0> scan 'mytable'
ROW COLUMN+CELL
0 row(s) in 0.2130 seconds

1. The example program

The example program below uses TableOutputFormat to import text data in a fixed format from HDFS into HBase.

First create the MapReduce project; the directory layout is as follows:

Hdfs2HBase/
├── classes
└── src
    ├── Hdfs2HBase.java
    ├── Hdfs2HBaseMapper.java
    └── Hdfs2HBaseReducer.java

Hdfs2HBaseMapper.java

package com.lisong.hdfs2hbase;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class Hdfs2HBaseMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void map(LongWritable key, Text line, Context context)
            throws IOException, InterruptedException {
        // Each input line has the form rowkey:family:qualifier:value.
        // Emit the row key, passing the remainder ("family:qualifier:value") to the reducer.
        String lineStr = line.toString();
        int index = lineStr.indexOf(":");
        String rowkey = lineStr.substring(0, index);
        String left = lineStr.substring(index + 1);
        context.write(new Text(rowkey), new Text(left));
    }
}

Hdfs2HBaseReducer.java

package com.lisong.hdfs2hbase;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class Hdfs2HBaseReducer extends Reducer<Text, Text, ImmutableBytesWritable, Put> {
    @Override
    public void reduce(Text rowkey, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        String k = rowkey.toString();
        for (Text val : value) {
            // Each value has the form family:qualifier:value.
            Put put = new Put(k.getBytes());
            String[] strs = val.toString().split(":");
            String family = strs[0];
            String qualifier = strs[1];
            String v = strs[2];
            put.add(family.getBytes(), qualifier.getBytes(), v.getBytes());
            context.write(new ImmutableBytesWritable(k.getBytes()), put);
        }
    }
}

Hdfs2HBase.java

package com.lisong.hdfs2hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Hdfs2HBase {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: hdfs2hbase <infile> <table>");
            System.exit(2);
        }

        Job job = new Job(conf, "hdfs2hbase");
        job.setJarByClass(Hdfs2HBase.class);
        job.setMapperClass(Hdfs2HBaseMapper.class);
        job.setReducerClass(Hdfs2HBaseReducer.class);

        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        // Tell TableOutputFormat which HBase table to write to.
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, otherArgs[1]);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Configure the javac compile-time dependencies:

$HADOOP_HOME/share/hadoop/common/hadoop-common-2.4.1.jar
$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.4.1.jar
$HADOOP_HOME/share/hadoop/common/lib/commons-cli-1.2.jar

Because this job writes to HBase, we also need the jars under $HBASE_HOME/lib in addition to the three jars above. For convenience, include all of the dependencies in the CLASSPATH defined in /etc/profile:

TEMP=`ls /home/hadoop/hbase/lib/*.jar`
HBASE_JARS=`echo $TEMP | sed 's/ /:/g'`
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:/home/hadoop/hadoop/share/hadoop/common/hadoop-common-2.6.0.jar:/home/hadoop/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.6.0.jar:/home/hadoop/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar:$HBASE_JARS

Compile:

$ javac -d classes/ src/*.java

Package:

$ jar -cvf hdfs2hbase.jar classes

Run

Create a data.txt file with the following content (the column family is the cf family created when the table was built):

r1:cf:c1:value1 
r2:cf:c2:value2
r3:cf:c3:value3
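
To make the record format concrete, here is a small standalone snippet (a hypothetical SplitDemo class, not part of the job) showing how one such line is taken apart: the mapper keeps everything before the first ':' as the row key, and the reducer splits the remainder into family, qualifier and value:

public class SplitDemo {
    public static void main(String[] args) {
        String line = "r1:cf:c1:value1";
        // Mapper side: the row key is everything before the first ':'.
        int index = line.indexOf(":");
        String rowkey = line.substring(0, index);  // "r1"
        String left = line.substring(index + 1);   // "cf:c1:value1"
        // Reducer side: split the remainder into family, qualifier and value.
        String[] strs = left.split(":");
        System.out.println(rowkey + " -> " + strs[0] + ", " + strs[1] + ", " + strs[2]);
    }
}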

Copy the file to HDFS:

$ hadoop/bin/hadoop fs -put data.txt /hbase

Run the MapReduce job:

$ hadoop/bin/hadoop jar Hdfs2HBase/hdfs2hbase.jar com.lisong.hdfs2hbase.Hdfs2HBase /hbase/data.txt mytable

This fails with a NoClassDefFoundError (class definition not found):

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/io/ImmutableBytesWritable
at com.lisong.hdfs2hbase.Hdfs2HBase.main(Hdfs2HBase.java:30)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 7 more

The cause was that I had not added the HBase jars to the Hadoop classpath in hadoop-env.sh:

TEMP=`ls /home/hadoop/hbase/lib/*.jar`
HBASE_JARS=`echo $TEMP | sed 's/ /:/g'`
HADOOP_CLASSPATH=$HBASE_JARS

Running again, the job failed with an "Unable to initialize MapOutputCollector" error:

15/08/10 08:55:44 WARN mapred.MapTask: Unable to initialize MapOutputCollector org.apache.hadoop.mapred.MapTask$MapOutputBuffer
java.lang.NullPointerException
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1008)
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:401)
...
at java.lang.Thread.run(Thread.java:745)
15/08/10 08:55:44 INFO mapred.LocalJobRunner: map task executor complete.
15/08/10 08:55:44 WARN mapred.LocalJobRunner: job_local2138114942_0001
java.lang.Exception: java.io.IOException: Unable to initialize any output collector
at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Unable to initialize any output collector
at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:412)
...
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/08/10 08:55:44 INFO mapreduce.Job: Job job_local2138114942_0001 failed with state FAILED due to: NA
15/08/10 08:55:45 INFO mapreduce.Job: Counters: 0

The cause was that I had not declared the map output key/value types. Add the following two lines to Hdfs2HBase.java:

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

If the mapper's output types are not declared explicitly, job.setOutputKeyClass and job.setOutputValueClass set the output types for both the mapper and the reducer:

job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(Put.class);

But Hdfs2HBaseMapper outputs Text/Text, so the map output types have to be specified separately here.


The revised Hdfs2HBase.java:

package com.lisong.hdfs2hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Hdfs2HBase {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: hdfs2hbase <infile> <table>");
            System.exit(2);
        }

        Job job = new Job(conf, "hdfs2hbase");
        job.setJarByClass(Hdfs2HBase.class);
        job.setMapperClass(Hdfs2HBaseMapper.class);
        job.setReducerClass(Hdfs2HBaseReducer.class);

        job.setMapOutputKeyClass(Text.class);     // + added: map output key type
        job.setMapOutputValueClass(Text.class);   // + added: map output value type

        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);

        job.setOutputFormatClass(TableOutputFormat.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        // Tell TableOutputFormat which HBase table to write to.
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, otherArgs[1]);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Recompile, repackage, and run again; this time the job succeeds.

Scan the HBase table to verify that the data has been imported:

hbase(main):001:0> scan 'mytable'
ROW COLUMN+CELL
r1 column=cf:c1, timestamp=1439223857492, value=value1
r2 column=cf:c2, timestamp=1439223857492, value=value2
r3 column=cf:c3, timestamp=1439223857492, value=value3
3 row(s) in 1.3820 seconds

As you can see, the data was imported successfully.

Because it has to communicate frequently with the RegionServers that store the data and therefore consumes considerable resources, TableOutputFormat is not efficient when loading a large amount of data at once.

2. Extension: TableReducer

Hdfs2HBaseReducer.java can also be written as follows, with the same effect:

package com.lisong.hdfs2hbase;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class Hdfs2HBaseReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    public void reduce(Text rowkey, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        String k = rowkey.toString();
        for (Text val : value) {
            // Each value has the form family:qualifier:value.
            Put put = new Put(k.getBytes());
            String[] strs = val.toString().split(":");
            String family = strs[0];
            String qualifier = strs[1];
            String v = strs[2];
            put.add(family.getBytes(), qualifier.getBytes(), v.getBytes());
            context.write(new ImmutableBytesWritable(k.getBytes()), put);
        }
    }
}

Here the reducer extends TableReducer directly. TableReducer is a partially specialized Reducer with only three type parameters: the input key/value types correspond to the mapper's output types, the output key can be any type, but the output value must be a Put or Delete instance.
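
As a side note (not from the original post), when the reducer extends TableReducer the driver is often wired up with HBase's TableMapReduceUtil helper instead of configuring TableOutputFormat and OUTPUT_TABLE by hand. A sketch of the relevant driver lines, assuming the same otherArgs as in Hdfs2HBase.java above:

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

// Replaces job.setOutputFormatClass(TableOutputFormat.class) and the
// TableOutputFormat.OUTPUT_TABLE setting shown earlier.
TableMapReduceUtil.initTableReducerJob(
        otherArgs[1],             // target HBase table name
        Hdfs2HBaseReducer.class,  // the TableReducer subclass above
        job);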

Compile, package, and run; the result is the same as before.


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK