开发者社区> 问答> 正文

Flink使用批处理,DataSet获取HBase的数据,自定义的数据集该怎么写?

最近在做Flink SQL对接HBase。对接的时候,Flink SQL需要用到一个由我们自己实现InputFormat接口的HBase InputFormat。于是我参照JDBCInputFormat写了一个继承RichInputFormat的HBaseInputFormat类,但是着实不知道后面该怎么处理了,有没有大神帮我看一下?

public class HBaseInputFormat extends RichInputFormat<ResultScanner , InputSplit>  {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(HBaseInputFormat.class);

    private org.apache.hadoop.conf.Configuration conf = null;
    private Connection connection = null;
    private Admin admin = null;
    private String tableName ;
    private String quorum ;
    private String clientPort ;
    private ResultScanner scanner ;

    @Override
    public void configure(Configuration parameters) {
        //do nothing here
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
        return cachedStatistics;
    }

    @Override
    public InputSplit[] createInputSplits(int minNumSplits) throws IOException {
        if (minNumSplits < 1 ){
            throw new IllegalArgumentIOException("Number of input splits must be at least 1.");
        }
        minNumSplits = (this instanceof NonParallelInput ) ? 1 : minNumSplits ;
        GenericInputSplit[] splits = new GenericInputSplit[minNumSplits];
        for (int i = 0 ; i < splits.length ; i ++ ){
            splits[i] = new GenericInputSplit(i , minNumSplits) ;
        }
        return splits;
    }

    @Override
    public InputSplitAssigner getInputSplitAssigner(InputSplit[] inputSplits) {
        return new DefaultInputSplitAssigner(inputSplits);
    }
    // 打开与HBase的连接
    @Override
    public void openInputFormat()  {
        System.out.println("openInputFormat...");
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum" , quorum);
        conf.set("hbase.zookeeper.property.clientPort" , clientPort);
        try {
            connection = ConnectionFactory.createConnection(conf);
        } catch (IOException e) {
           throw new IllegalArgumentException("connection failed " + e.getMessage() , e) ;
        }
        try {
            admin = connection.getAdmin() ;
        } catch (IOException e) {
            throw new IllegalArgumentException("admin failed " + e.getMessage() , e) ;
        }
    }

    @Override
    public void closeInputFormat()  {
        if (connection != null ){
            try {
                connection.close();
            } catch (IOException e) {
                throw new IllegalArgumentException("connection closed  failed " + e.getMessage() , e) ;
            }
        }
        if (admin != null ){
            try {
                admin.close();
            } catch (IOException e) {
                throw new IllegalArgumentException("admin closed  failed " + e.getMessage() , e) ;
            }
        }

    }

    @Override
    public void open(InputSplit split) throws IOException {
        Table table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan();
        scanner = table.getScanner(scan);
    }

    @Override
    public boolean reachedEnd() throws IOException {
        if (scanner.next() != null) {return true;}
        return false ;
    }

    @Override
    public ResultScanner nextRecord(ResultScanner reuse) throws IOException {
        return reuse;
    }

    @Override
    public void close() throws IOException {
        if (scanner == null ){
            return ;
        }
        try {
            scanner.close();
        }catch (Exception e){
            LOG.info("Inputformat ResultScanner couldn't be closed - " + e.getMessage());
        }
    }
    @VisibleForTesting
    Admin getAdmin(){ return admin ;}
    @VisibleForTesting
    Connection getConnection(){return connection ;}
    public static HBaseInputFormatBuilder buildHBaseInputFormat() {
        return new HBaseInputFormatBuilder();
    }

    public static class HBaseInputFormatBuilder {
        private final HBaseInputFormat format ;

        public HBaseInputFormatBuilder(){
            this.format = new HBaseInputFormat();
        }

        public HBaseInputFormatBuilder setQuorum(String quorum) {
            format.quorum = quorum ;
            return this;
        }
        public HBaseInputFormatBuilder setClientPort(String clientPort){
            format.clientPort = clientPort ;
            return this ;
        }
        public HBaseInputFormatBuilder setTableName(String tableName){
            format.tableName = tableName ;
            return this ;
        }
        public HBaseInputFormatBuilder open(){
            format.openInputFormat();
            return this ;
        }

        public HBaseInputFormat finish(){
            if (format.quorum == null ){
                LOG.info("quorum was not supplied separately.");
            }
            if (format.clientPort == null ){
                LOG.info("clientPort was not supplied separately.");
            }
            if (format.tableName == null ){
                LOG.info("tableName was not supplied separately.");
            }
            if (format.connection == null ){
                throw new IllegalArgumentException("No connection supplied");
            }
            if (format.admin == null ){
                throw new IllegalArgumentException("No admin supplied");
            }
            return format ;
        }

    }
}

展开
收起
IT喵看视界 2019-09-16 15:45:04 2512 0
0 条回答
写回答
取消 提交回答
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载