那个,最近在写这个Flink SQL对接HBase,但是在对接的时候,FlinkSQL需要用到我们自己实现的InputFormat接口的一个HBase的InputFormat。然后我自己写了一个继承RichInputFormat的HBaseInputFormat类,是按照JDBCInputFormat来写的。但是着实是不知道后面怎么处理了,有没有大神帮我看一下?
public class HBaseInputFormat extends RichInputFormat<ResultScanner , InputSplit> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(HBaseInputFormat.class);
private org.apache.hadoop.conf.Configuration conf = null;
private Connection connection = null;
private Admin admin = null;
private String tableName ;
private String quorum ;
private String clientPort ;
private ResultScanner scanner ;
@Override
public void configure(Configuration parameters) {
//do nothing here
}
@Override
public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
return cachedStatistics;
}
@Override
public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
if (minNumSplits < 1 ){
throw new IllegalArgumentIOException("Number of input splits must be at least 1.");
}
minNumSplits = (this instanceof NonParallelInput ) ? 1 : minNumSplits ;
GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
for (int i = 0 ; i < splits.length ; i ++ ){
splits[i] = new GenericInputSplit(i , minNumSplits) ;
}
return splits;
}
@Override
public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
return new DefaultInputSplitAssigner(inputSplits);
}
// 打开与HBase的连接
@Override
public void openInputFormat() {
System.out.println("openInputFormat...");
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum" , quorum);
conf.set("hbase.zookeeper.property.clientPort" , clientPort);
try {
connection = ConnectionFactory.createConnection(conf);
} catch (IOException e) {
throw new IllegalArgumentException("connection failed " + e.getMessage() , e) ;
}
try {
admin = connection.getAdmin() ;
} catch (IOException e) {
throw new IllegalArgumentException("admin failed " + e.getMessage() , e) ;
}
}
@Override
public void closeInputFormat() {
if (connection != null ){
try {
connection.close();
} catch (IOException e) {
throw new IllegalArgumentException("connection closed failed " + e.getMessage() , e) ;
}
}
if (admin != null ){
try {
admin.close();
} catch (IOException e) {
throw new IllegalArgumentException("admin closed failed " + e.getMessage() , e) ;
}
}
}
@Override
public void open(InputSplit split) throws IOException {
Table table = connection.getTable(TableName.valueOf(tableName));
Scan scan = new Scan();
scanner = table.getScanner(scan);
}
@Override
public boolean reachedEnd() throws IOException {
if (scanner.next() != null) {return true;}
return false ;
}
@Override
public ResultScanner nextRecord(ResultScanner reuse) throws IOException {
return reuse;
}
@Override
public void close() throws IOException {
if (scanner == null ){
return ;
}
try {
scanner.close();
}catch (Exception e){
LOG.info("Inputformat ResultScanner couldn't be closed - " + e.getMessage());
}
}
@VisibleForTesting
Admin getAdmin(){ return admin ;}
@VisibleForTesting
Connection getConnection(){return connection ;}
public static HBaseInputFormatBuilder buildHBaseInputFormat() {
return new HBaseInputFormatBuilder();
}
public static class HBaseInputFormatBuilder {
private final HBaseInputFormat format ;
public HBaseInputFormatBuilder(){
this.format = new HBaseInputFormat();
}
public HBaseInputFormatBuilder setQuorum(String quorum) {
format.quorum = quorum ;
return this;
}
public HBaseInputFormatBuilder setClientPort(String clientPort){
format.clientPort = clientPort ;
return this ;
}
public HBaseInputFormatBuilder setTableName(String tableName){
format.tableName = tableName ;
return this ;
}
public HBaseInputFormatBuilder open(){
format.openInputFormat();
return this ;
}
public HBaseInputFormat finish(){
if (format.quorum == null ){
LOG.info("quorum was not supplied separately.");
}
if (format.clientPort == null ){
LOG.info("clientPort was not supplied separately.");
}
if (format.tableName == null ){
LOG.info("tableName was not supplied separately.");
}
if (format.connection == null ){
throw new IllegalArgumentException("No connection supplied");
}
if (format.admin == null ){
throw new IllegalArgumentException("No admin supplied");
}
return format ;
}
}
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。