不积跬步,无以至千里!
这几天狂看mapreduce对hbase进行操作的例子,消化吸收,熟能生巧,掌握mapreduce和hbase交互的各个细节,以及整体流程等,整体流程等年前写一篇总结下,这几天先狂看吧
看,复制,修改,运行,遇到各种问题,解决,慢慢的就熟了。
这个类是干啥的呢,其实就是对hbase的某表进行简单操作,不过用的是mapreduce,即效率高,当然,看这个,主要吸收其用mapreduce操作hbase的思路及如何编程。
这个类功能是:将表中所有行 列族f1的某列q1的值倒序后放在另一列族f2的列q2,所谓倒序即 abc变成cba,456变成654,其中q1和q2是可以相同的,因为不同列族嘛。
上代码了:
[java] view plaincopyprint?
package mapreduce.hbase;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class TestMRHBase {
private static final Log LOG = LogFactory.getLog(TestMRHBase.class);
static final String MULTI_REGION_TABLE_NAME = "mr_test"; // 表名
static final byte[] INPUT_FAMILY = Bytes.toBytes("contents");// 原列族名
static final byte[] OUTPUT_FAMILY = Bytes.toBytes("text");// 写入的列族名
static final byte[] QUALIFIER_NAME = Bytes.toBytes("data");// 列名
public static class TestMap extends
TableMapper {
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException, InterruptedException {
if (value.size() != 1) {
throw new IOException("There should only be one column! ");
}
Map<byte[], NavigableMap<byte[], NavigableMapbyte[]>>> cf = value
.getMap();
if (!cf.containsKey(INPUT_FAMILY)) {
throw new IOException("Wrong input columns. Missing: '"
+ Bytes.toString(INPUT_FAMILY) + "'.");
}
// Get the original value and reverse it
String originalValue = new String(value.getValue(INPUT_FAMILY,
QUALIFIER_NAME), HConstants.UTF8_ENCODING);
StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse();
// Now set the value to be collected
Put outval = new Put(key.get());
outval.add(OUTPUT_FAMILY, QUALIFIER_NAME, Bytes.toBytes(newValue
.toString()));
context.write(key, outval);
}
}
public void testMultiRegionTable() throws IOException,
InterruptedException, ClassNotFoundException {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "node2,node4,node3");
runTestOnTable(new HTable(new Configuration(conf),
MULTI_REGION_TABLE_NAME));
}
private void runTestOnTable(HTable table) throws IOException,
InterruptedException, ClassNotFoundException {
Job job = null;
try {
LOG.info("Before map/reduce startup");
job = new Job(table.getConfiguration(), "process column contents");
job.setNumReduceTasks(1);
job.setJarByClass(TestMRHBase.class);
Scan scan = new Scan();
scan.addFamily(INPUT_FAMILY);
TableMapReduceUtil.initTableMapperJob(Bytes.toString(table
.getTableName()), scan, TestMap.class,
ImmutableBytesWritable.class, Put.class, job);
TableMapReduceUtil.initTableReducerJob(Bytes.toString(table
.getTableName()), IdentityTableReducer.class, job);
FileOutputFormat.setOutputPath(job, new Path("test"));
LOG.info("Started " + Bytes.toString(table.getTableName()));
job.waitForCompletion(true);
LOG.info("After map/reduce completion");
// verify map-reduce results
// verify(Bytes.toString(table.getTableName()));
} finally {
// mrCluster.shutdown();
if (job != null) {
FileUtil.fullyDelete(new File(job.getConfiguration().get(
"hadoop.tmp.dir")));
}
}
}
public static void main(String[] args) throws Exception {
TestMRHBase testMRHBase = new TestMRHBase();
testMRHBase.testMultiRegionTable();
}
}
过程中遇到一些异常,唉…… 一定要长记性啊,否则,大好青春不能总被同样的错误耗费掉啊
1.classnotfoundexception:又是这个,这次的错误是因为没job.setJarByClass
2.java.lang.NoSuchMethodException: mapreduce.hbase.TestMRHBase$TestMap.&lt;init&gt;()
这是为什么呢?
因为类中类TestMap没有设置成static类型,结果map任务时找不到此map的init,只要类中map类设成static即可
其他的一个小问题就不说了。
在拿来别人的东西时,如果想修改下,一定要读透了
取其精华,去其糟粕