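HBase的Thrift API定义可以通过链接 http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift?view=markup 查看(HBase源码现已迁移到Git,新版本中该文件位于 hbase-thrift/src/main/resources/org/apache/hadoop/hbase/thrift/Hbase.thrift;请使用与所部署HBase版本一致的Hbase.thrift)。要生成HBase跨语言的API,需要先安装Thrift编译器。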
首先下载上面链接的内容,保存为Hbase.thrift。
然后,执行如下命令,生成不同编程语言的HBase API:
# Generate client code for every Thrift target. Only gen-java is used in the rest of
# this article; the other targets are shown for completeness.
[hadoop@master hbase]$ thrift --gen cpp Hbase.thrift
[hadoop@master hbase]$ thrift --gen java Hbase.thrift
[hadoop@master hbase]$ thrift --gen py Hbase.thrift
[hadoop@master hbase]$ thrift --gen perl Hbase.thrift
[hadoop@master hbase]$ thrift --gen csharp Hbase.thrift
[hadoop@master hbase]$ thrift --gen php Hbase.thrift
[hadoop@master hbase]$ thrift --gen js Hbase.thrift
[hadoop@master hbase]$ thrift --gen go Hbase.thrift
[hadoop@master hbase]$ thrift --gen erl Hbase.thrift
[hadoop@master hbase]$ thrift --gen delphi Hbase.thrift
[hadoop@master hbase]$ thrift --gen hs Hbase.thrift
[hadoop@master hbase]$ thrift --gen html Hbase.thrift
[hadoop@master hbase]$ thrift --gen c_glib Hbase.thrift
[hadoop@master hbase]$ thrift --gen cocoa Hbase.thrift
[hadoop@master hbase]$ thrift --gen rb Hbase.thrift
[hadoop@master hbase]$ thrift --gen st Hbase.thrift
[hadoop@master hbase]$ thrift --gen xsd Hbase.thrift
[hadoop@master hbase]$ thrift --gen as3 Hbase.thrift
[hadoop@master hbase]$ ls
gen-as3     gen-cocoa  gen-csharp  gen-erl  gen-hs    gen-java  gen-perl  gen-py  gen-st   Hbase.thrift
gen-c_glib  gen-cpp    gen-delphi  gen-go   gen-html  gen-js    gen-php   gen-rb  gen-xsd
这里,我们基于Java语言,使用HBase的Thrift客户端API访问HBase表。事实上,如果用Java操作HBase表,最好直接使用HBase的原生Java API,无论在性能还是便利性方面都会有更好的体验。Thrift API实际上是在HBase原生API之上又做了一层封装,初次使用可能会感觉比较别扭,有时还需要参考Thrift服务端的实现代码。
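准备工作如下:
- 下载Thrift软件包,解压缩后,将thrift-0.9.0/lib/java/src下面的代码拷贝到工作区(开发工具中);也可以直接在工程中引入同版本的libthrift jar包(Maven坐标 org.apache.thrift:libthrift:0.9.0)。注意生成代码所用的Thrift编译器版本应与libthrift版本保持一致。
- 将上面生成的gen-java目录中的代码拷贝到工作区
- 保证HBase集群正常运行,接着启动HBase的Thrift服务,执行如下命令: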
# Start the HBase Thrift server bound to host "master" and listening on port 9090.
# Thrift clients connect to this port, not to any HBase Master/RegionServer port.
bin/hbase thrift -b master -p 9090 start
上面的命令中,HBase的Thrift服务监听端口为9090。后面通过Thrift API访问时,客户端连接的就是这个端口,而不是HBase Master的服务端口(旧版本默认为60000)。
接着,实现一个简单的例子来访问HBase表。
首先,我们通过HBase Shell创建一个表:
# Create table test_info with a single column family named info.
create 'test_info', 'info'
表名为test_info,列族(column family)名称为info。
然后,我们开始基于上面生成的Thrift代码来实现对HBase表的操作。
我们在客户端做了一层抽象,以便于传递各种参数。抽象类为AbstractHBaseThriftService,代码如下所示:
01 |
package org.shirdrn.cloud.hbase.thrift; |
03 |
import java.util.List; |
06 |
import org.apache.hadoop.hbase.thrift.generated.Hbase; |
07 |
import org.apache.hadoop.hbase.thrift.generated.TRowResult; |
08 |
import org.apache.thrift.TException; |
09 |
import org.apache.thrift.protocol.TBinaryProtocol; |
10 |
import org.apache.thrift.protocol.TProtocol; |
11 |
import org.apache.thrift.transport.TSocket; |
12 |
import org.apache.thrift.transport.TTransport; |
13 |
import org.apache.thrift.transport.TTransportException; |
15 |
public abstract class AbstractHBaseThriftService { |
17 |
protected static final String CHARSET = "UTF-8" ; |
18 |
private String host = "localhost" ; |
19 |
private int port = 9090 ; |
20 |
private final TTransport transport; |
21 |
protected final Hbase.Client client; |
23 |
public AbstractHBaseThriftService() { |
24 |
transport = new TSocket(host, port); |
25 |
TProtocol protocol = new TBinaryProtocol(transport, true , true ); |
26 |
client = new Hbase.Client(protocol); |
29 |
public AbstractHBaseThriftService(String host, int port) { |
31 |
transport = new TSocket(host, port); |
32 |
TProtocol protocol = new TBinaryProtocol(transport, true , true ); |
33 |
client = new Hbase.Client(protocol); |
36 |
public void open() throws TTransportException { |
37 |
if (transport != null ) { |
43 |
if (transport != null ) { |
48 |
public abstract List<String> getTables() throws TException; |
50 |
public abstract void update(String table, String rowKey, boolean writeToWal, |
51 |
String fieldName, String fieldValue, Map<String, String> attributes) throws TException; |
52 |
public abstract void update(String table, String rowKey, boolean writeToWal, |
53 |
Map<String, String> fieldNameValues, Map<String, String> attributes) throws TException; |
55 |
public abstract void deleteCell(String table, String rowKey, boolean writeToWal, |
56 |
String column, Map<String, String> attributes) throws TException; |
57 |
public abstract void deleteCells(String table, String rowKey, boolean writeToWal, |
58 |
List<String> columns, Map<String, String> attributes) throws TException; |
60 |
public abstract void deleteRow(String table, String rowKey, |
61 |
Map<String, String> attributes) throws TException; |
63 |
public abstract int scannerOpen(String table, String startRow, List<String> columns, |
64 |
Map<String, String> attributes) throws TException; |
65 |
public abstract int scannerOpen(String table, String startRow, String stopRow, List<String> columns, |
66 |
Map<String, String> attributes) throws TException; |
67 |
public abstract int scannerOpenWithPrefix(String table, String startAndPrefix, |
68 |
List<String> columns, Map<String, String> attributes) throws TException; |
69 |
public abstract int scannerOpenTs(String table, String startRow, |
70 |
List<String> columns, long timestamp, Map<String, String> attributes) throws TException; |
71 |
public abstract int scannerOpenTs(String table, String startRow, String stopRow, |
72 |
List<String> columns, long timestamp, Map<String, String> attributes) throws TException; |
74 |
public abstract List<TRowResult> scannerGetList( int id, int nbRows) throws TException; |
75 |
public abstract List<TRowResult> scannerGet( int id) throws TException; |
77 |
public abstract List<TRowResult> getRow(String table, String row, |
78 |
Map<String, String> attributes) throws TException; |
79 |
public abstract List<TRowResult> getRows(String table, |
80 |
List<String> rows, Map<String, String> attributes) throws TException; |
81 |
public abstract List<TRowResult> getRowsWithColumns(String table, |
82 |
List<String> rows, List<String> columns, Map<String, String> attributes) throws TException; |
84 |
public abstract void scannerClose( int id) throws TException; |
87 |
* Iterate result rows(just for test purpose) |
90 |
public abstract void iterateResults(TRowResult result); |
这里,简单叙述一下,我们提供的客户端API的基本功能:
- 建立到Thrift服务的连接:open()
- 获取到HBase中的所有表名:getTables()
- 更新HBase表记录:update()
- 删除HBase表中某一行的指定单元格(cell)数据:deleteCell()和deleteCells()
- 删除HBase表中一行记录:deleteRow()
- 打开一个Scanner,返回id:scannerOpen()、scannerOpenWithPrefix()和scannerOpenTs();然后用返回的id迭代记录:scannerGetList()和scannerGet()
- 获取一行记录结果:getRow()、getRows()和getRowsWithColumns()
- 关闭一个Scanner:scannerClose()
- 迭代结果,用于调试:iterateResults()
比如,要实现分页逻辑,做法与传统的关系型数据库有所不同。基于HBase表的实现方式是:首先打开一个Scanner(例如调用scannerOpen()),得到一个id;然后使用该id反复调用scannerGetList()方法(通过参数nbRows指定每次最多返回的记录条数),每次返回一个记录列表,直到返回的列表为空为止;最后调用scannerClose()关闭该Scanner,释放服务端资源。后面会通过测试用例来实际体会。
现在,我们基于上面抽象出来的客户端操作接口给出一个基本实现,代码如下所示:
001 |
package org.shirdrn.cloud.hbase.thrift; |
003 |
import java.io.UnsupportedEncodingException; |
004 |
import java.nio.ByteBuffer; |
005 |
import java.util.ArrayList; |
006 |
import java.util.HashMap; |
007 |
import java.util.Iterator; |
008 |
import java.util.List; |
009 |
import java.util.Map; |
010 |
import java.util.Map.Entry; |
012 |
import org.apache.hadoop.hbase.thrift.generated.IOError; |
013 |
import org.apache.hadoop.hbase.thrift.generated.Mutation; |
014 |
import org.apache.hadoop.hbase.thrift.generated.TCell; |
015 |
import org.apache.hadoop.hbase.thrift.generated.TRowResult; |
016 |
import org.apache.thrift.TException; |
019 |
public class HBaseThriftClient extends AbstractHBaseThriftService { |
021 |
public HBaseThriftClient() { |
025 |
public HBaseThriftClient(String host, int port) { |
030 |
public List<String> getTables() throws TException { |
031 |
List<String> list = new ArrayList<String>( 0 ); |
032 |
for (ByteBuffer buf : client.getTableNames()) { |
033 |
byte [] name = decode(buf); |
034 |
list.add( new String(name)); |
039 |
static ByteBuffer wrap(String value) { |
040 |
ByteBuffer bb = null ; |
042 |
bb = ByteBuffer.wrap(value.getBytes(CHARSET)); |
043 |
} catch (UnsupportedEncodingException e) { |
049 |
protected byte [] decode(ByteBuffer buffer) { |
050 |
byte [] bytes = new byte [buffer.limit()]; |
051 |
for ( int i = 0 ; i < buffer.limit(); i++) { |
052 |
bytes[i] = buffer.get(); |
058 |
public void update(String table, String rowKey, boolean writeToWal, |
059 |
Map<String, String> fieldNameValues, Map<String, String> attributes) throws TException { |
060 |
List<Mutation> mutations = new ArrayList<Mutation>(); |
061 |
for (Map.Entry<String, String> entry : fieldNameValues.entrySet()) { |
062 |
mutations.add( new Mutation( false , wrap(entry.getKey()), wrap(entry.getValue()), writeToWal)); |
064 |
Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes); |
065 |
ByteBuffer tableName = wrap(table); |
066 |
ByteBuffer row = wrap(rowKey); |
067 |
client.mutateRow(tableName, row, mutations, wrappedAttributes); |
071 |
public void update(String table, String rowKey, boolean writeToWal, |
072 |
String fieldName, String fieldValue, Map<String, String> attributes) throws IOError, TException { |
073 |
Map<String, String> fieldNameValues = new HashMap<String, String>(); |
074 |
fieldNameValues.put(fieldName, fieldValue); |
075 |
update(table, rowKey, writeToWal, fieldNameValues, attributes); |
080 |
public void deleteCells(String table, String rowKey, boolean writeToWal, |
081 |
List<String> columns, Map<String, String> attributes) throws TException { |
082 |
List<Mutation> mutations = new ArrayList<Mutation>(); |
083 |
for (String column : columns) { |
084 |
mutations.add( new Mutation( false , wrap(column), null , writeToWal)); |
086 |
Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes); |
087 |
ByteBuffer tableName = wrap(table); |
088 |
ByteBuffer row = wrap(rowKey); |
089 |
client.mutateRow(tableName, row, mutations, wrappedAttributes); |
093 |
public void deleteCell(String table, String rowKey, boolean writeToWal, |
094 |
String column, Map<String, String> attributes) throws TException { |
095 |
List<String> columns = new ArrayList<String>( 1 ); |
097 |
deleteCells(table, rowKey, writeToWal, columns, attributes); |
102 |
public void deleteRow(String table, String rowKey, Map<String, String> attributes) throws TException { |
103 |
ByteBuffer tableName = wrap(table); |
104 |
ByteBuffer row = wrap(rowKey); |
105 |
Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes); |
106 |
client.deleteAllRow(tableName, row, wrappedAttributes); |
111 |
public int scannerOpen(String table, String startRow, List<String> columns, |
112 |
Map<String, String> attributes) throws TException { |
113 |
ByteBuffer tableName = wrap(table); |
114 |
List<ByteBuffer> fl = encodeColumns(columns); |
115 |
Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes); |
116 |
return client.scannerOpen(tableName, wrap(startRow), fl, wrappedAttributes); |
120 |
public int scannerOpen(String table, String startRow, String stopRow, List<String> columns, |
121 |
Map<String, String> attributes) throws TException { |
122 |
ByteBuffer tableName = wrap(table); |
123 |
List<ByteBuffer> fl = encodeColumns(columns); |
124 |
Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes); |
125 |
return client.scannerOpenWithStop(tableName, wrap(startRow), wrap(stopRow), fl, wrappedAttributes); |
129 |
public int scannerOpenWithPrefix(String table, String startAndPrefix, List<String> columns, |
130 |
Map<String, String> attributes) throws TException { |
131 |
ByteBuffer tableName = wrap(table); |
132 |
List<ByteBuffer> fl = encodeColumns(columns); |
133 |
Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes); |
134 |
return client.scannerOpenWithPrefix(tableName, wrap(startAndPrefix), fl, wrappedAttributes); |
138 |
public int scannerOpenTs(String table, String startRow, List<String> columns, |
139 |
long timestamp, Map<String, String> attributes) throws TException { |
140 |
ByteBuffer tableName = wrap(table); |
141 |
List<ByteBuffer> fl = encodeColumns(columns); |
142 |
Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes); |
143 |
return client.scannerOpenTs(tableName, wrap(startRow), fl, timestamp, wrappedAttributes); |
147 |
public int scannerOpenTs(String table, String startRow, String stopRow, List<String> columns, |
148 |
long timestamp, Map<String, String> attributes) throws TException { |
149 |
ByteBuffer tableName = wrap(table); |
150 |
List<ByteBuffer> fl = encodeColumns(columns); |
151 |
Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes); |
152 |
return client.scannerOpenWithStopTs(tableName, wrap(startRow), wrap(stopRow), fl, timestamp, wrappedAttributes); |
156 |
public List<TRowResult> scannerGetList( int id, int nbRows) throws TException { |
157 |
return client.scannerGetList(id, nbRows); |
161 |
public List<TRowResult> scannerGet( int id) throws TException { |
162 |
return client.scannerGetList(id, 1 ); |
166 |
public List<TRowResult> getRow(String table, String row, |
167 |
Map<String, String> attributes) throws TException { |
168 |
ByteBuffer tableName = wrap(table); |
169 |
Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes); |
170 |
return client.getRow(tableName, wrap(row), wrappedAttributes); |
174 |
public List<TRowResult> getRows(String table, List<String> rows, |
175 |
Map<String, String> attributes) throws TException { |
176 |
ByteBuffer tableName = wrap(table); |
177 |
Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes); |
178 |
List<ByteBuffer> wrappedRows = encodeRows(rows); |
179 |
return client.getRows(tableName, wrappedRows, wrappedAttributes); |
183 |
public List<TRowResult> getRowsWithColumns(String table, List<String> rows, |
184 |
List<String> columns, Map<String, String> attributes) throws TException { |
185 |
ByteBuffer tableName = wrap(table); |
186 |
List<ByteBuffer> wrappedRows = encodeRows(rows); |
187 |
List<ByteBuffer> wrappedColumns = encodeColumns(columns); |
188 |
Map<ByteBuffer, ByteBuffer> wrappedAttributes = encodeAttributes(attributes); |
189 |
return client.getRowsWithColumns(tableName, wrappedRows, wrappedColumns, wrappedAttributes); |
192 |
private List<ByteBuffer> encodeColumns(List<String> columns) { |
193 |
List<ByteBuffer> fl = new ArrayList<ByteBuffer>( 0 ); |
194 |
for (String column : columns) { |
195 |
fl.add(wrap(column)); |
200 |
private Map<ByteBuffer, ByteBuffer> encodeAttributes(Map<String, String> attributes) { |
201 |
Map<ByteBuffer, ByteBuffer> wrappedAttributes = null ; |
202 |
if (attributes != null && !attributes.isEmpty()) { |
203 |
wrappedAttributes = new HashMap<ByteBuffer, ByteBuffer>( 1 ); |
204 |
for (Map.Entry<String, String> entry : attributes.entrySet()) { |
205 |
wrappedAttributes.put(wrap(entry.getKey()), wrap(entry.getValue())); |
208 |
return wrappedAttributes; |
211 |
private List<ByteBuffer> encodeRows(List<String> rows) { |
212 |
List<ByteBuffer> list = new ArrayList<ByteBuffer>( 0 ); |
213 |
for (String row : rows) { |
220 |
public void iterateResults(TRowResult result) { |
221 |
Iterator<Entry<ByteBuffer, TCell>> iter = result.columns.entrySet().iterator(); |
222 |
System.out.println( "RowKey=" + new String(result.getRow())); |
223 |
while (iter.hasNext()) { |
224 |
Entry<ByteBuffer, TCell> entry = iter.next(); |
225 |
System.out.println( "\tCol=" + new String(decode(entry.getKey())) + ", Value=" + new String(entry.getValue().getValue())); |
230 |
public void scannerClose( int id) throws TException { |
231 |
client.scannerClose(id); |
上面代码,给出了基本的实现,接着我们给出测试用例,调用我们实现的客户端操作,与HBase表进行交互。实现的测试用例类如下所示:
001 |
package org.shirdrn.cloud.hbase.thrift; |
003 |
import java.io.UnsupportedEncodingException; |
004 |
import java.nio.ByteBuffer; |
005 |
import java.text.DecimalFormat; |
006 |
import java.util.ArrayList; |
007 |
import java.util.HashMap; |
008 |
import java.util.List; |
009 |
import java.util.Map; |
010 |
import java.util.Random; |
012 |
import org.apache.hadoop.hbase.thrift.generated.IOError; |
013 |
import org.apache.hadoop.hbase.thrift.generated.IllegalArgument; |
014 |
import org.apache.hadoop.hbase.thrift.generated.TRowResult; |
015 |
import org.apache.thrift.TException; |
016 |
import org.apache.thrift.transport.TTransportException; |
020 |
private static final String CHARSET = "UTF-8" ; |
021 |
static DecimalFormat formatter = new DecimalFormat( "00" ); |
022 |
private final AbstractHBaseThriftService client; |
024 |
public Test(String host, int port) { |
025 |
client = new HBaseThriftClient(host, port); |
028 |
} catch (TTransportException e) { |
034 |
this ( "master" , 9090 ); |
037 |
static String randomlyBirthday() { |
038 |
Random r = new Random(); |
039 |
int year = 1900 + r.nextInt( 100 ); |
040 |
int month = 1 + r.nextInt( 12 ); |
041 |
int date = 1 + r.nextInt( 30 ); |
042 |
return String.valueOf(year + "-" + formatter.format(month) + "-" + formatter.format(date)); |
045 |
static String randomlyGender() { |
046 |
Random r = new Random(); |
047 |
int flag = r.nextInt( 2 ); |
048 |
return flag == 0 ? "M" : "F" ; |
051 |
static String randomlyUserType() { |
052 |
Random r = new Random(); |
053 |
int flag = 1 + r.nextInt( 10 ); |
054 |
return String.valueOf(flag); |
057 |
static ByteBuffer wrap(String value) { |
058 |
ByteBuffer bb = null ; |
060 |
bb = ByteBuffer.wrap(value.getBytes(CHARSET)); |
061 |
} catch (UnsupportedEncodingException e) { |
067 |
static DecimalFormat rowKeyFormatter = new DecimalFormat( "0000" ); |
069 |
public void caseForUpdate() throws TException { |
070 |
boolean writeToWal = false ; |
071 |
Map<String, String> attributes = new HashMap<String, String>( 0 ); |
072 |
String table = setTable(); |
074 |
for ( long i = 0 ; i < 10000 ; i++) { |
075 |
String rowKey = rowKeyFormatter.format(i); |
076 |
Map<String, String> fieldNameValues = new HashMap<String, String>(); |
077 |
fieldNameValues.put( "info:birthday" , randomlyBirthday()); |
078 |
fieldNameValues.put( "info:user_type" , randomlyUserType()); |
079 |
fieldNameValues.put( "info:gender" , randomlyGender()); |
080 |
client.update(table, rowKey, writeToWal, fieldNameValues, attributes); |
084 |
public void caseForDeleteCells() throws TException { |
085 |
boolean writeToWal = false ; |
086 |
Map<String, String> attributes = new HashMap<String, String>( 0 ); |
087 |
String table = setTable(); |
089 |
for ( long i = 5 ; i < 10 ; i++) { |
090 |
String rowKey = rowKeyFormatter.format(i); |
091 |
List<String> columns = new ArrayList<String>( 0 ); |
092 |
columns.add( "info:birthday" ); |
093 |
client.deleteCells(table, rowKey, writeToWal, columns, attributes); |
097 |
private String setTable() { |
098 |
String table = "test_info" ; |
102 |
public void caseForDeleteRow() throws TException { |
103 |
Map<String, String> attributes = new HashMap<String, String>( 0 ); |
104 |
String table = setTable(); |
106 |
for ( long i = 5 ; i < 10 ; i++) { |
107 |
String rowKey = rowKeyFormatter.format(i); |
108 |
client.deleteRow(table, rowKey, attributes); |
112 |
public void caseForScan() throws TException { |
113 |
Map<String, String> attributes = new HashMap<String, String>( 0 ); |
114 |
String table = setTable(); |
115 |
String startRow = "0005" ; |
116 |
String stopRow = "0015" ; |
117 |
List<String> columns = new ArrayList<String>( 0 ); |
118 |
columns.add( "info:birthday" ); |
119 |
int id = client.scannerOpen(table, startRow, stopRow, columns, attributes); |
121 |
List<TRowResult> results = client.scannerGetList(id, nbRows); |
122 |
while (results != null && !results.isEmpty()) { |
123 |
for (TRowResult result : results) { |
124 |
client.iterateResults(result); |
126 |
results = client.scannerGetList(id, nbRows); |
128 |
client.scannerClose(id); |
131 |
public void caseForGet() throws TException { |
132 |
Map<String, String> attributes = new HashMap<String, String>( 0 ); |
133 |
String table = setTable(); |
134 |
List<String> rows = new ArrayList<String>( 0 ); |
138 |
List<String> columns = new ArrayList<String>( 0 ); |
139 |
columns.add( "info:birthday" ); |
140 |
columns.add( "info:gender" ); |
141 |
List<TRowResult> results = client.getRowsWithColumns(table, rows, columns, attributes); |
142 |
for (TRowResult result : results) { |
143 |
client.iterateResults(result); |
147 |
public static void main(String[] args) |
148 |
throws IOError, IllegalArgument, TException, UnsupportedEncodingException { |
149 |
Test test = new Test(); |
上面的测试用例可以完成对HBase表数据的操作。另外,在生成的Thrift客户端代码中,org.apache.hadoop.hbase.thrift.generated.Hbase.Iface给出了全部的服务接口,可以根据需要选用;客户端类org.apache.hadoop.hbase.thrift.generated.Hbase.Client实现了与Thrift服务交互的逻辑,通过该类的对象即可调用HBase提供的Thrift服务。