文章目录
一、前言
二、CEPTest
三、Alert
四、MonitoringEvent
五、TemperatureEvent
一、前言
本文基于 Flink CEP 库来监控数据中心中每个机柜的温度:如果在一定时间内有 2 个连续事件的温度超过设定的阈值,就产生一条警告。单条警告也许还不算很严重,但如果在同一个机柜上连续出现 2 条这样的警告,情况就比较严重了。因此可以再定义一个 Pattern,把第一个警告流的输出作为它的输入,用来识别“严重”问题。注意:下面的示例代码只演示了最基础的一步,即对单个温度事件做阈值匹配并输出警告。
二、CEPTest
package cep;
import org.apache.flink.cep.CEP; import org.apache.flink.cep.PatternSelectFunction; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.cep.pattern.conditions.SimpleCondition; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import java.util.List; import java.util.Map; public class CEPTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // DataStream : source DataStream<TemperatureEvent> inputEventStream = env.fromElements(new TemperatureEvent("xyz", 22.0), new TemperatureEvent("xyz", 20.1), new TemperatureEvent("xyz", 21.1), new TemperatureEvent("xyz", 22.2), new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.3), new TemperatureEvent("xyz", 22.1), new TemperatureEvent("xyz", 22.4), new TemperatureEvent("xyz", 22.7), new TemperatureEvent("xyz", 27.0), new TemperatureEvent("xyz", 30.0)); // 定义Pattern,检查10秒钟内温度是否高于26度 Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent>begin("start") .subtype(TemperatureEvent.class) .where(new SimpleCondition<TemperatureEvent>() { public boolean filter(TemperatureEvent subEvent) { if (subEvent.getTemperature() >= 26.0) { return true; } return false; } }) .within(Time.seconds(10)); //匹配pattern并select事件,符合条件的发生警告,即输出 DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern) .select( new PatternSelectFunction<TemperatureEvent, Alert>() { @Override public Alert select(Map<String, List<TemperatureEvent>> event) throws Exception { return new Alert("Temperature Rise Detected: " + event.get("start") + " on machine name: " + event.get("start")); } }); patternStream.print(); env.execute("CEP on Temperature Sensor"); } }
三、Alert
package cep;
public class Alert { private String message; public String getMessage() { return message; } public void setMessage(String message) { this.message = message; } public Alert(String message) { this.message = message; } @Override public String toString() { return "Alert [message=" + message + "]"; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((message == null) ? 0 : message.hashCode()); return result; } @Override public boolean equals(Object obj) { if(this == obj) return true; if(obj == null) return false; if(getClass() != obj.getClass()) return false; Alert other = (Alert) obj; if(message == null) { if(other.message != null) { return false; }else if(!message.equals(other.message)) { return false; } } return true; } }
四、MonitoringEvent
package cep;
/**
 * Base class for events reported by a named machine.
 *
 * <p>Equality is based on the runtime class and the machine name; a {@code null} name
 * is only equal to another {@code null} name.
 */
public abstract class MonitoringEvent {

    private String machineName;

    /**
     * @param machineName name of the machine that produced the event; may be {@code null}
     */
    public MonitoringEvent(String machineName) {
        super();
        this.machineName = machineName;
    }

    /** @return the machine name, possibly {@code null} */
    public String getMachineName() {
        return machineName;
    }

    /** @param machineName the new machine name; may be {@code null} */
    public void setMachineName(String machineName) {
        this.machineName = machineName;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        return prime + ((machineName == null) ? 0 : machineName.hashCode());
    }

    /**
     * Compares by runtime class and machine name.
     *
     * @param obj the object to compare with; may be {@code null}
     * @return {@code true} if {@code obj} has the same class and an equal machine name
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        MonitoringEvent other = (MonitoringEvent) obj;
        // Null-safe comparison: the previous nesting never compared non-null names and
        // dereferenced a null name when both were null.
        return (machineName == null)
                ? other.machineName == null
                : machineName.equals(other.machineName);
    }
}
五、TemperatureEvent
package cep;
/**
 * A temperature reading reported by a machine.
 *
 * <p>Equality extends the superclass comparison with a bitwise comparison of the temperature.
 */
public class TemperatureEvent extends MonitoringEvent {

    private double temperature;

    /**
     * Creates an event whose temperature keeps the field default of {@code 0.0}.
     *
     * @param machineName name of the reporting machine
     */
    public TemperatureEvent(String machineName) {
        super(machineName);
    }

    /**
     * @param machineName name of the reporting machine
     * @param temperature the measured temperature
     */
    public TemperatureEvent(String machineName, double temperature) {
        super(machineName);
        this.temperature = temperature;
    }

    /** @return the measured temperature */
    public double getTemperature() {
        return temperature;
    }

    /** @param temperature the new measured temperature */
    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = super.hashCode();
        // Fold the 64-bit representation of the double into the int hash.
        long bits = Double.doubleToLongBits(temperature);
        result = (int) (prime * result + (bits ^ (bits >>> 32)));
        return result;
    }

    /**
     * @param obj the object to compare with; may be {@code null}
     * @return {@code true} if the superclass comparison succeeds and both temperatures have
     *     the same bit representation
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!super.equals(obj) || getClass() != obj.getClass()) {
            return false;
        }
        TemperatureEvent other = (TemperatureEvent) obj;
        return Double.doubleToLongBits(temperature) == Double.doubleToLongBits(other.temperature);
    }

    @Override
    public String toString() {
        // The machine-name slot previously repeated the temperature.
        return "TemperatureEvent [getTemperature()=" + getTemperature()
                + ", getMachineName=" + getMachineName() + "]";
    }
}