flink-metric中的自定义reporter是什么啊?
public class InfluxdbReporter extends AbstractReporter implements Scheduled {
private String database;
private String retentionPolicy;
private InfluxDB influxDB;
public InfluxdbReporter() {
super(new MeasurementInfoProvider());
}
@Override
public void open(MetricConfig config) {
String host = getString(config, HOST);
int port = getInteger(config, PORT);
if (!isValidHost(host) || !isValidPort(port)) {
throw new IllegalArgumentException("Invalid host/port configuration. Host: " + host + " Port: " + port);
}
String database = getString(config, DB);
if (database == null) {
throw new IllegalArgumentException("'" + DB.key() + "' configuration option is not set");
}
String url = String.format("http://%s:%d", host, port);
String username = getString(config, USERNAME);
String password = getString(config, PASSWORD);
this.database = database;
this.retentionPolicy = getString(config, RETENTION_POLICY);
if (username != null && password != null) {
influxDB = InfluxDBFactory.connect(url, username, password);
} else {
influxDB = InfluxDBFactory.connect(url);
}
log.info("Configured InfluxDBReporter with {host:{}, port:{}, db:{}, and retentionPolicy:{}}", host, port, database, retentionPolicy);
}
@Override
public void close() {
if (influxDB != null) {
influxDB.close();
influxDB = null;
}
}
@Override
public void report() {
BatchPoints report = buildReport();
if (report != null) {
influxDB.write(report);
}
}
@Nullable
private BatchPoints buildReport() {
Instant timestamp = Instant.now();
BatchPoints.Builder report = BatchPoints.database(database);
report.retentionPolicy(retentionPolicy);
try {
for (Map.Entry<Gauge<?>, MeasurementInfo> entry : gauges.entrySet()) {
report.point(MetricMapper.map(entry.getValue(), timestamp, entry.getKey()));
}
for (Map.Entry<Counter, MeasurementInfo> entry : counters.entrySet()) {
report.point(MetricMapper.map(entry.getValue(), timestamp, entry.getKey()));
}
for (Map.Entry<Histogram, MeasurementInfo> entry : histograms.entrySet()) {
report.point(MetricMapper.map(entry.getValue(), timestamp, entry.getKey()));
}
for (Map.Entry<Meter, MeasurementInfo> entry : meters.entrySet()) {
report.point(MetricMapper.map(entry.getValue(), timestamp, entry.getKey()));
}
}
catch (ConcurrentModificationException | NoSuchElementException e) {
// ignore - may happen when metrics are concurrently added or removed
// report next time
return null;
}
return report.build();
}
private static boolean isValidHost(String host) {
return host != null && !host.isEmpty();
}
private static boolean isValidPort(int port) {
return 0 < port && port <= 65535;
}
}
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。