本文接RabbitMQ之监控(1)。
不管是通过HTTP API接口还是客户端,获取的数据都是为了提供监控视图之用,不过这一切都基于RabbitMQ服务运行完好的情况下。虽然可以通过某些其他工具或方法来检测RabbitMQ进程是否在运行(如:ps aux | grep rabbitmq),或者5672端口是否开启(如:telnet xxx.xxx.xxx.xxx 5672),但是这样依旧不能真正的评判RabbitMQ是否还具备服务外部请求的能力。这里就需要使用AMQP协议来构建一个Ping的检测程序,这个类似于TCP协议的Ping。当这个测试程序与RabbitMQ服务无法建立TCP协议层面的连接,或者无法构建AMQP协议层面的连接,亦或者构建连接超时时则可判定RabbitMQ服务处于异常状态而无法正常的为外部应用提供相应的服务。示例程序下:
/**
* AMQP-ping测试程序返回的状态
*/
enum PING_STATUS{
OK,//正常
EXCEPTION//异常
}
public class AMQPPing {
private static String host = "localhost";
private static int port = 5672;
private static String vhost = "/";
private static String username = "guest";
private static String password = "guest";
/**
* 读取rmq_cfg.properties中的内容,如果没有配置相应的项则采用默认值
*/
static {
Properties properties = new Properties();
try {
properties.load(AMQPPing.class.getClassLoader().
getResourceAsStream("rmq_cfg.properties"));
host = properties.getProperty("host");
port = Integer.valueOf(properties.getProperty("port"));
vhost = properties.getProperty("vhost");
username = properties.getProperty("username");
password = properties.getProperty("password");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* AMQP-ping测试程序,如有IOException或者TimeoutException则说明RabbitMQ
* 服务出现异常情况。
*/
public static PING_STATUS checkAMQPPing(){
PING_STATUS ping_status = PING_STATUS.OK;
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
} catch (IOException | TimeoutException e ) {
e.printStackTrace();
ping_status = PING_STATUS.EXCEPTION;
} finally {
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
return ping_status;
}
}
上面中的示例中涉及到rmq_cfg.properties配置文件,这个文件用来灵活的配置与RabbitMQ服务的连接所需的连接信息,包括IP地址、端口号、vhost、用户名和密码等。如果没有配置相应的项则可以采用默认的值。
监控应用时,可以定时调用AMQPPing.checkAMQPPing()方法来获取检测信息,方法返回值是一个枚举类型,示例中只具备两个值:PING_STATUS.OK和PING_STATUS.EXCEPTION,分别代表RabbitMQ服务正常和异常的情况,这里可以根据实际应用情况来细分返回值的粒度。
AMQPPing这个类能够检测RabbitMQ是否能够接收新的请求和构造AMQP信道,但是要检测RabbitMQ服务是否健康还需要进一步的措施。值得庆幸的是RabbitMQ Management插件提供了/api/aliveness-test/vhost的HTTP API形式的接口,这个接口通过3个步骤来验证RabbitMQ服务的健康性:
- 创建一个以“aliveness-test”为名称的队列来接收测试消息。
- 用队列名称,即“aliveness-test”作为消息的路由键,将消息发往默认交换器。
- 当消息到达队列的时候就消费该消息,否则就报错。
这个HTTP API接口背后的检测程序,这里也称之为aliveness-test,其运行在Erlang虚拟机内部,因此它不会受到网络问题的影响。如果在虚拟机外部的话,网络问题可能会阻止外部客户端连接到RabbitMQ的5672端口。aliveness-test程序不会删除创建的队列,对于频繁调用这个接口的情况,它可以避免数以千计的队列元数据事务对Mnesia数据库造成巨大的压力。如果RabbitMQ服务完好,调用/api/aliveness-test/vhost接口会返回{“status”:”ok”},HTTP状态码为200。示例程序如下:
/**
* AlivenessTest程序返回的状态
* OK表示健康,EXCEPTION表示异常
*/
enum ALIVE_STATUS{
OK,
EXCEPTION
}
public class AlivenessTest {
public static ALIVE_STATUS checkAliveness(String url, String username, String password){
ALIVE_STATUS alive_status = ALIVE_STATUS.OK;
HttpClient client = new HttpClient();
client.getState().setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(username, password));
GetMethod getMethod = new GetMethod(url);
String data = null;
int ret = -1;
try {
ret = client.executeMethod(getMethod);
data = getMethod.getResponseBodyAsString();
if (ret != 200 || !data.equals("{\"status\":\"ok\"}")) {
alive_status = ALIVE_STATUS.EXCEPTION;
}
} catch (IOException e) {
e.printStackTrace();
alive_status = ALIVE_STATUS.EXCEPTION;
}
return alive_status;
}
}
//调用示例
// AlivenessTest.checkAliveness("http://192.168.0.2:15672/api/aliveness-test/%2F", "root", "root123");
监控应用时,可以定时调用 AlivenessTest.checkAliveness()方法来获取检测信息,方法返回值是一个枚举类型,示例中只具备两个值:ALIVE_STATUS.OK和ALIVE_STATUS.EXCEPTION,分别代表RabbitMQ服务正常和异常的情况,这里可以根据实际应用情况来细分返回值的粒度。
这里的aliveness-test程序配合前面的AMQPPing程序一起使用可以从内部和外部这两个方面来全面的监控RabbitMQ服务。表4中还提及另外两个接口/api/healthchecks/node和/api/healthchecks/node/node,这两个HTTP API接口分别表示对当前节点或指定节点进行基本的健康检查,包括RabbitMQ应用、信道、队列是否运行正常,是否有告警产生等等。使用方式可以参考/api/aliveness-test/vhost,在此不多做赘述。