之前写了一篇《socket之flume监控系统》,但由于生产环境上的端口限制,so要做一些优化和改善。那这篇就是前作的优化篇吧。
需求是:1、每隔1分钟获取客户端所运行的application的运行状态,这里采用的是通过命令获取该application的进程。客户机有AIX、LINUX、WINDOWS三种平台,AIX跟LINUX大同小异。
2、不定时给客户端发送指令,包括重启、关闭application,更新application的文件。
废话不多说,贴代码。
客户端代码:
public class MonitorClient { private static Timer timer = new Timer(); private static String configFile = "config.properties";// 配置文件 private static Properties pro = null;// 加载属性文件,读取数据库连接配置信息 private long timeout = 60000;// 发送数据间隔时间 private static final String SUCCESS_MSG = "SUCCESS";// 成功标识符 private static final String FAILED_MSG = "FAILED";// 失败标识符 private static final String REPLACE = "replace";// 操作命令 private static final String RESTART = "restart";// 操作命令 private static final String STOP = "stop";// 操作命令 private DateFormat df=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public MonitorClient(File file) { loadConfig(file); String time = pro.getProperty("timeout"); if (time != null) timeout = Long.valueOf(time); timer.schedule(new Handler(), 1000, timeout); } public void loadConfig(File file) {//加载配置文件 try { if(pro==null){ pro = new Properties(); FileInputStream fis = new FileInputStream(file); pro.load(fis); fis.close(); } } catch (FileNotFoundException e) { System.err.println("The file [" + file.getName() + "] not found!"); System.exit(0); } catch (IOException e) { e.printStackTrace(); System.exit(0); } } class Handler extends TimerTask {//操作类 private Socket s; private OutputStream out; private PrintWriter w; @Override public void run() { try { String ip = pro.getProperty("server.ip"); String port = "9099"; if (pro.getProperty("server.port") != null) port = pro.getProperty("server.port"); s = new Socket(ip, Integer.parseInt(port)); String connectTime=df.format(new Date()); System.out.println("-------connect server at "+connectTime+" -------"); s.setOOBInline(false); s.setSoTimeout(60000); out = s.getOutputStream(); w = new PrintWriter(out,true); String msg = sendMsg(s);//发送application状态 w.println(msg); w.flush(); InputStream in = s.getInputStream(); int _count = 0; while (_count == 0) { _count = in.available(); } byte[] _b = new byte[_count]; int _len= in.read(_b);//获取服务端反馈及指令 String command=new String(_b,0,_len).trim(); if("ok".equalsIgnoreCase(command)) {//打印反馈结果 System.out.println("send message success!"); }else if(REPLACE.equalsIgnoreCase(command)) {//执行更新文件命令 System.out.println("execute replace..."); int sumL = 0; OperateUtil.stop();<span style="font-family: Arial, Helvetica, sans-serif;">//关闭application</span> DataInputStream dis = new DataInputStream(in); String name = dis.readUTF(); long length = dis.readLong(); String flumePath=OperateUtil.getPath(); File file = new File(flumePath+name); if (!file.exists()) { file.createNewFile(); } byte[] buf = new byte[1024]; int read = 0; BufferedOutputStream bout = new BufferedOutputStream( new FileOutputStream(file)); System.out.println("start read file..."); while(sumL<length){//接收文件 read = dis.read(buf); sumL += read; bout.write(buf, 0, read); bout.flush(); } bout.close(); if (length == sumL){ System.out.println("done! the file size is " + sumL + " kb"); String fileName=file.getName(); String filePath=file.getParent(); System.out.println("file path:"+filePath); if(fileName.endsWith(".zip")){//如果是zip文件,需要解压 FileUtil.unZip(file, filePath); if(file.isFile()&&file.exists()){ boolean flag = file.getAbsoluteFile().delete(); System.out.println("The file[" + fileName + "]delete success:" + flag); } } String result = OperateUtil.start();//启动application w.println(result); w.flush(); } dis.close(); } else if (RESTART.equalsIgnoreCase(command)) {//执行启动命令 System.out.println("execute restart..."); String result = FAILED_MSG; OperateUtil.stop(); result = OperateUtil.start(); w.println(result); w.flush(); } else if (STOP.equalsIgnoreCase(command)) {//执行停止命令 System.out.println("execute stop..."); String result = FAILED_MSG; if (OperateUtil.stop()) { result = SUCCESS_MSG; } w.println(result); w.flush(); } if (in != null) in.close(); if (out != null) out.close(); if (w != null) w.close(); s.close(); if (s.isClosed()) { System.out.println("The connect has disconnected"); } } catch (IOException e) { if(e.getMessage().indexOf("Connection refused: connect")!=-1){ System.err.println("disconnected or server not running!"); }else{ e.printStackTrace(); } try { s.close(); timer.cancel(); } catch (IOException e1) { e1.printStackTrace(); } System.exit(0); } } private String sendMsg(Socket s){//发送消息 StringBuffer sb = new StringBuffer(); String hostName = ""; try{ hostName = InetAddress.getLocalHost().getCanonicalHostName(); }catch (UnknownHostException e) { System.err.println("Get local host failed!"+e.getMessage()); } boolean isAlive = OperateUtil.isAlive(); String status="0"; if (isAlive)status = "1"; String ip=s.getLocalAddress().toString().replaceAll("/", ""); String version = OperateUtil.getVersion(); sb.append("name=" + hostName + ","); sb.append("ip=" + ip + ","); sb.append("port=" + s.getLocalPort() + ","); sb.append("status=" + status + ","); sb.append("version=" + version); return sb.toString(); } } public static void main(String[] args) { String fileName=configFile; if(args.length>0) fileName=args[0]; //System.out.println("==========fileName:"+fileName); File file=new File(fileName); new MonitorClient(file); } }
服务端代码:
/** * 接收客户端application状态信息存进表格。先查询是否有机器的记录,如果有使用update,否则使用insert * * @author T430 * */ public class MonitorSocket { private LinkedBlockingQueue<FlumeInfo> queue = new LinkedBlockingQueue<FlumeInfo>(); private ExecutorService handler = Executors.newFixedThreadPool(20); private ExecutorService executor = Executors.newCachedThreadPool(); public static Hashtable<String, String> commandList = new Hashtable<String, String>(); public static Hashtable<String, String> resultList = new Hashtable<String, String>(); private static final String SUCCESS_MSG = "SUCCESS";// 成功标识符 private static final String FAILED_MSG = "FAILED";// 失败标识符 private ServerSocket server; private DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); public MonitorSocket(ServletContext servletContext) { executor.execute(new Handler(servletContext)); executor.execute(new Task(queue)); } public static Hashtable<String, String> getCommandList() { return commandList; } public static Hashtable<String, String> getResultList() { return resultList; } public static synchronized void setCommandList(Hashtable<String, String> commandList) { MonitorSocket.commandList = commandList; } class Handler implements Runnable {//操作类 private ServletContext servletContext; public Handler(ServletContext servletContext) { this.servletContext = servletContext; } public void run() { Socket s = null; DataOutputStream dos = null; try { String portStr = this.servletContext.getInitParameter("socketPort"); int port = Integer.parseInt(portStr); server = new ServerSocket(port); System.out.println("Waiting for client connect……"); while (true) { s = server.accept(); String connectTime = df.format(new Date()); String clien = s.getInetAddress().toString().replaceAll("/", ""); System.out.println("------" + clien + " connect at " + connectTime + " ------"); s.setSoTimeout(60000); InputStream in = s.getInputStream(); OutputStream out = s.getOutputStream(); PrintWriter w = new PrintWriter(out, true); BufferedReader br = new BufferedReader(new InputStreamReader(in)); String msg = br.readLine(); System.out.println("client msg:" + msg); if (msg.length() > 0 && !SUCCESS_MSG.equals(msg) && !FAILED_MSG.equals(msg)) { putQueue(msg); } if (!commandList.isEmpty()) { String command = commandList.get(clien); if (command != null) { commandList.remove(clien); String[] commands = command.split(";"); String cmd = commands[1]; System.out.println("server command:" + cmd); if (cmd.equals("0")) {//发送重启指令 w.println("restart"); w.flush(); } else if (cmd.equals("1")) {//发送更新文件指令 w.println("replace"); w.flush(); String fileName = commands[2]; File file = new File(fileName); if (!file.exists()) { System.out.println("file[" + fileName + "]not found"); in.close(); out.close(); w.close(); br.close(); s.close(); return; } BufferedInputStream bis = new BufferedInputStream(new FileInputStream(file)); dos = new DataOutputStream(out); String name = file.getName(); long length = file.length(); String dir = commands[3] + File.separator + name; System.out.println("dir=" + dir); dos.writeUTF(dir);//发送客户端文件存放地址 dos.writeLong(length);//文件大小 dos.flush(); int len = 0; byte[] buf = new byte[1024]; while ((len = bis.read(buf)) != -1) {//发送文件 dos.write(buf, 0, len); dos.flush(); } bis.close(); } else if (cmd.equals("2")) {//发送关闭指令 w.println("stop"); w.flush(); } int count = 0; while (count == 0) { count = in.available(); } byte[] b = new byte[count]; int len = in.read(b);//获取客户端反馈结果 String result = new String(b, 0, len).trim(); System.out.println("execute result:" + result); resultList.put(clien, result); } } w.print("ok"); w.flush(); Thread.currentThread().sleep(1000); try { do { //发送一条特殊的信息,客户端设置setOOBInline(false),则会忽略; //如果客户端已经close的话,话报错,用来判断客户端是否close s.sendUrgentData(0xFF); } while (!s.isClosed()); } catch (IOException e) { if (dos != null) dos.close(); br.close(); out.close(); w.close(); s.close(); if (s.isClosed()) { System.out.println("The connect has disconnected"); } } } } catch (Exception e) { e.printStackTrace(); } } } private void putQueue(String msg) throws Exception { String createTime = df.format(new Date()); String[] strs = msg.split(","); FlumeInfo info = new FlumeInfo(); for (String str : strs) { String[] texts = str.split("="); String s = texts[0]; if ("name".equals(s)) { info.setName(texts[1]); } else if ("ip".equals(s)) { info.setIP(texts[1]); } else if ("port".equals(s)) { info.setPort(texts[1]); } else if ("status".equals(s)) { String status = texts[1]; info.setStatus(status); } else if ("version".equals(s)) { info.setVersion(texts[1]); } } info.setCode("code"); info.setCreateTime(createTime); queue.put(info); } public void closeServerSocket() { try { server.close(); executor.shutdown(); } catch (IOException e) { System.err.println("close server error"); e.printStackTrace(); } } }
有的操作类省略了。大家可以到这里去下载完整的代码:
客户端:去下载
服务端:去下载