开发者社区> 问答> 正文

flink-metric中的自定义reporter是什么啊?

flink-metric中的自定义reporter是什么啊?

展开
收起
游客vwuxaq6iqaowc 2021-12-09 13:37:22 618 0
1 条回答
写回答
取消 提交回答
  • public class InfluxdbReporter extends AbstractReporter implements Scheduled {

    private String database;
    private String retentionPolicy;
    private InfluxDB influxDB;
    
    public InfluxdbReporter() {
    	super(new MeasurementInfoProvider());
    }
    
    @Override
    public void open(MetricConfig config) {
    	String host = getString(config, HOST);
    	int port = getInteger(config, PORT);
    	if (!isValidHost(host) || !isValidPort(port)) {
    		throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
    	}
    	String database = getString(config, DB);
    	if (database == null) {
    		throw new IllegalArgumentException("'" + DB.key() + "' configuration option is not set");
    	}
    	String url = String.format("http://%s:%d", host, port);
    	String username = getString(config, USERNAME);
    	String password = getString(config, PASSWORD);
    
    	this.database = database;
    	this.retentionPolicy = getString(config, RETENTION_POLICY);
    	if (username != null && password != null) {
    		influxDB = InfluxDBFactory.connect(url, username, password);
    	} else {
    		influxDB = InfluxDBFactory.connect(url);
    	}
    
    	log.info("Configured InfluxDBReporter with {host:{}, port:{}, db:{}, and retentionPolicy:{}}", host, port, database, retentionPolicy);
    }
    
    @Override
    public void close() {
    	if (influxDB != null) {
    		influxDB.close();
    		influxDB = null;
    	}
    }
    
    @Override
    public void report() {
    	BatchPoints report = buildReport();
    	if (report != null) {
    		influxDB.write(report);
    	}
    }
    
    @Nullable
    private BatchPoints buildReport() {
    	Instant timestamp = Instant.now();
    	BatchPoints.Builder report = BatchPoints.database(database);
    	report.retentionPolicy(retentionPolicy);
    	try {
    		for (Map.Entry<Gauge<?>, MeasurementInfo> entry : gauges.entrySet()) {
    			report.point(MetricMapper.map(entry.getValue(), timestamp, entry.getKey()));
    		}
    
    		for (Map.Entry<Counter, MeasurementInfo> entry : counters.entrySet()) {
    			report.point(MetricMapper.map(entry.getValue(), timestamp, entry.getKey()));
    		}
    
    		for (Map.Entry<Histogram, MeasurementInfo> entry : histograms.entrySet()) {
    			report.point(MetricMapper.map(entry.getValue(), timestamp, entry.getKey()));
    		}
    
    		for (Map.Entry<Meter, MeasurementInfo> entry : meters.entrySet()) {
    			report.point(MetricMapper.map(entry.getValue(), timestamp, entry.getKey()));
    		}
    	}
    	catch (ConcurrentModificationException | NoSuchElementException e) {
    		// ignore - may happen when metrics are concurrently added or removed
    		// report next time
    		return null;
    	}
    	return report.build();
    }
    
    private static boolean isValidHost(String host) {
    	return host != null && !host.isEmpty();
    }
    
    private static boolean isValidPort(int port) {
    	return 0 < port && port <= 65535;
    }
    

    }

    2021-12-09 13:37:57
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载