长连接 、短连接、心跳机制与断线重连

简介: 概述 可承遇到,不知什么原因,一个夜晚,机房中,大片的远程调用连接断开。 第二天早上,用户访问高峰,大部分服务器都在获取连接,造成大片网络阻塞。 服务崩溃,惨不忍睹的景象。 本文将从长连接和短连接的概念切入,再到长连接与短连接的区别,以及应用场景,引出心跳机制和断线重连,给出代码实现。

概述


可承遇到,不知什么原因,一个夜晚,机房中,大片的远程调用连接断开。

第二天早上,用户访问高峰,大部分服务器都在获取连接,造成大片网络阻塞。

服务崩溃,惨不忍睹的景象。

本文将从长连接和短连接的概念切入,再到长连接与短连接的区别,以及应用场景,引出心跳机制和断线重连,给出代码实现。

从原理到实践杜绝此类现象。 

 

短连接


概念

client与server通过三次握手建立连接,client发送请求消息,server返回响应,一次连接就完成了。

这时候双方任意都可以发起close操作,不过一般都是client先发起close操作。上述可知,短连接一般只会在 client/server间传递一次请求操作。

短连接的优缺点

管理起来比较简单,存在的连接都是有用的连接,不需要额外的控制手段。

使用场景

通常浏览器访问服务器的时候就是短连接。

对于服务端来说,长连接会耗费服务端的资源,而且用户用浏览器访问服务端相对而言不是很频繁的

如果有几十万,上百万的连接,服务端的压力会非常大,甚至会崩溃。

所以对于并发量大,请求频率低的,建议使用短连接。

长连接


什么是长连接

client向server发起连接,server接受client连接,双方建立连接。

Client与server完成一次读写之后,它们之间的连接并不会主动关闭,后续的读写操作会继续使用这个连接。

长连接的生命周期

正常情况下,一条TCP长连接建立后,只要双不提出关闭请求并且不出现异常情况,这条连接是一直存在的.

操作系统不会自动去关闭它,甚至经过物理网络拓扑的改变之后仍然可以使用。

所以一条连接保持几天、几个月、几年或者更长时间都有可能,只要不出现异常情况或由用户(应用层)主动关闭。

客户端和服务单可一直使用该连接进行数据通信。

长连接的优点

长连接可以省去较多的TCP建立和关闭的操作,减少网络阻塞的影响,

当发生错误时,可以在不关闭连接的情况下进行提示,

减少CPU及内存的使用,因为不需要经常的建立及关闭连接。

长连接的缺点

连接数过多时,影响服务端的性能和并发数量。

使用场景

数据库的连接就是采用TCP长连接.

RPC,远程服务调用,在服务器,一个服务进程频繁调用另一个服务进程,可使用长连接,减少连接花费的时间。

总结

1.对于长连接和短连接的使用是需要根据应用场景来判断的

2.长连接并不是万能的,也是需要维护的,

 

长连接的实现


心跳机制

应用层协议大多都有HeartBeat机制,通常是客户端每隔一小段时间向服务器发送一个数据包,通知服务器自己仍然在线。

并传输一些可能必要的数据。使用心跳包的典型协议是IM,比如QQ/MSN/飞信等协议。

 

在TCP的机制里面,本身是存在有心跳包的机制的,也就是TCP的选项:SO_KEEPALIVE。

系统默认是设置的2小时的心跳频率。但是它检查不到机器断电、网线拔出、防火墙这些断线。

而且逻辑层处理断线可能也不是那么好处理。一般,如果只是用于保活还是可以的。

为什么需要心跳机制?

因为网络的不可靠性, 有可能在 TCP 保持长连接的过程中, 由于某些突发情况, 例如网线被拔出, 突然掉电等,

会造成服务器和客户端的连接中断. 在这些突发情况下, 如果恰好服务器和客户端之间没有交互的话, 那么它们是不能在短时间内发现对方已经掉线的.

心跳机制即可解决此类问题。

TCP协议的KeepAlive机制

默认KeepAlive状态是不打开的。

需要将setsockopt将SOL_SOCKET.SO_KEEPALIVE设置为1才是打开KeepAlive状态,

并且可以设置三个参数:

tcp_keepalive_time  ,tcp_keepalive_probes  , tcp_keepalive_intvl

分别表示:连接闲置多久开始发keepalive的ack包、发几个ack包不回复才当对方已断线、两个ack包之间的间隔。

很多网络设备,尤其是NAT路由器,由于其硬件的限制(例如内存、CPU处理能力),无法保持其上的所有连接,因此在必要的时候,会在连接池中选择一些不活跃的连接踢掉。

典型做法是LRU,把最久没有数据的连接给T掉。

通过使用TCP的KeepAlive机制(修改那个time参数),可以让连接每隔一小段时间就产生一些ack包,以降低被踢掉的风险,当然,这样的代价是额外的网络和CPU负担。

如何实现心跳机制?

两种方式实现心跳机制:

  • 使用 TCP 协议层面的 keepalive 机制.

  • 在应用层上实现自定义的心跳机制.

虽然在 TCP 协议层面上, 提供了 keepalive 保活机制, 但是使用它有几个缺点:

  1. 它不是 TCP 的标准协议, 并且是默认关闭的.

  2. TCP keepalive 机制依赖于操作系统的实现, 默认的 keepalive 心跳时间是 两个小时, 并且对 keepalive 的修改需要系统调用(或者修改系统配置), 灵活性不够.

  3. TCP keepalive 与 TCP 协议绑定, 因此如果需要更换为 UDP 协议时, keepalive 机制就失效了.

使用 TCP 层面的 keepalive 机制比自定义的应用层心跳机制节省流量,

本文的主要介绍应用层方面实现心跳机制,使用netty实现心跳和断线重连。

netty实现心跳机制


netty对心跳机制提供了机制,实现的关键是IdleStateHandler先来看一下他的构造函数

 

    public IdleStateHandler(
            long readerIdleTime, long writerIdleTime, long allIdleTime,
            TimeUnit unit) {
        this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
    }

 

实例化一个 IdleStateHandler 需要提供三个参数:

  • readerIdleTimeSeconds, 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.

  • writerIdleTimeSeconds, 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.

  • allIdleTimeSeconds, 读和写都超时. 即当在指定的时间间隔内没有读并且写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.

 

netty心跳流程

 

1. 客户端成功连接服务端。

2.在客户端中的ChannelPipeline中加入IdleStateHandler,设置写事件触发事件为5s.

3.客户端超过5s未写数据,触发写事件,向服务端发送心跳包,

4.同样,服务端要对心跳包做出响应,其实给客户端最好的回复就是“不回复”,减轻服务端的压力

5.超过三次,1过0s服务端都会收到来自客户端的心跳信息,服务端可以认为客户端挂了,可以close链路。

6.客户端恢复正常,发现链路已断,重新连接服务端。

代码实现

服务端handler:

package com.heartbreak.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

import java.util.Random;

/**
 * @author janti
 * @date 2018/6/10 12:21
 */
public class HeartbeatServerHandler extends SimpleChannelInboundHandler<String> {
    // 失败计数器:未收到client端发送的ping请求
    private int unRecPingTimes = 0;

    // 定义服务端没有收到心跳消息的最大次数
    private static final int MAX_UN_REC_PING_TIMES = 3;

    private Random random = new Random(System.currentTimeMillis());

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        if (msg!=null && msg.equals("Heartbeat")){
            System.out.println("客户端"+ctx.channel().remoteAddress()+"--心跳信息--");
        }else {
            System.out.println("客户端----请求消息----:"+msg);
            String resp = "商品的价格是:"+random.nextInt(1000);
            ctx.writeAndFlush(resp);
        }
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state()==IdleState.READER_IDLE){
                System.out.println("===服务端===(READER_IDLE 读超时)");
                // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
                if (unRecPingTimes >= MAX_UN_REC_PING_TIMES) {
                    System.out.println("===服务端===(读超时,关闭chanel)");
                    // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连
                    ctx.close();
                } else {
                    // 失败计数器加1
                    unRecPingTimes++;
                }
            }else {
                super.userEventTriggered(ctx,evt);
            }
        }
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        System.out.println("一个客户端已连接");
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        System.out.println("一个客户端已断开连接");
    }
}

 

 

服务端server:

package com.heartbreak.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.util.concurrent.TimeUnit;

/**
 * @author tangj
 * @date 2018/6/10 10:46
 */
public class HeartBeatServer {
    private static int port = 9817;

    public HeartBeatServer(int port) {
        this.port = port;
    }

    ServerBootstrap bootstrap = null;
    ChannelFuture f;

    // 检测chanel是否接受过心跳数据时间间隔(单位秒)
    private static final int READ_WAIT_SECONDS = 10;

    public static void main(String args[]) {
        HeartBeatServer heartBeatServer = new HeartBeatServer(port);
        heartBeatServer.startServer();
    }

    public void startServer() {
        EventLoopGroup bossgroup = new NioEventLoopGroup();
        EventLoopGroup workergroup = new NioEventLoopGroup();
        try {
            bootstrap = new ServerBootstrap();
            bootstrap.group(bossgroup, workergroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new HeartBeatServerInitializer());
            // 服务器绑定端口监听
            f = bootstrap.bind(port).sync();
            System.out.println("server start ,port: "+port);
            // 监听服务器关闭监听,此方法会阻塞
            f.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            bossgroup.shutdownGracefully();
            workergroup.shutdownGracefully();
        }
    }


    private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            // 监听读操作,读超时时间为5秒,超过5秒关闭channel;
            pipeline.addLast("ping", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0, TimeUnit.SECONDS));
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("encoder", new StringEncoder());

            pipeline.addLast("handler", new HeartbeatServerHandler());
        }
    }

}

 

 

 客户端handler

 

package com.heartbreak.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoop;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
 * @author tangj
 * @date 2018/6/11 22:55
 */
public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String>{
    private HeartBeatClient client;

    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:dd");

    private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
            CharsetUtil.UTF_8));

    public HeartBeatClientHandler(HeartBeatClient client) {
        this.client = client;
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("收到服务端回复:"+msg);
        if (msg.equals("Heartbeat")) {
            ctx.write("has read message from server");
            ctx.flush();
        }
        ReferenceCountUtil.release(msg);
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.WRITER_IDLE) {
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        System.err.println("客户端与服务端断开连接,断开的时间为:"+format.format(new Date()));
        // 定时线程 断线重连
        final EventLoop eventLoop = ctx.channel().eventLoop();
        eventLoop.schedule(new Runnable() {
            @Override
            public void run() {
                client.doConncet();
            }
        }, 10, TimeUnit.SECONDS);
    }


}

 

客户端启动:

package com.heartbreak.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author tangj
 * @date 2018/6/10 16:18
 */
public class HeartBeatClient {

    private Random random = new Random();
    public Channel channel;
    public Bootstrap bootstrap;

    protected String host = "127.0.0.1";
    protected int port = 9817;

    public static void main(String args[]) throws Exception {
        HeartBeatClient client = new HeartBeatClient();
        client.run();
        client.sendData();

    }

    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new SimpleClientInitializer(HeartBeatClient.this));
            doConncet();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 发送数据
     * @throws Exception
     */
    public void sendData() throws Exception {
        BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
        while (true){
            String cmd = in.readLine();
            switch (cmd){
                case "close" :
                    channel.close();
                    break;
                default:
                channel.writeAndFlush(in.readLine());
                    break;
            }
        }
    }

    /**
     * 连接服务端
     */
    public void doConncet() {
        if (channel != null && channel.isActive()) {
            return;
        }
        ChannelFuture channelFuture = bootstrap.connect(host, port);
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture futureListener) throws Exception {
                if (channelFuture.isSuccess()) {
                    channel = futureListener.channel();
                    System.out.println("connect server successfully");
                } else {
                    System.out.println("Failed to connect to server, try connect after 10s");
                    futureListener.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            doConncet();
                        }
                    }, 10, TimeUnit.SECONDS);
                }
            }
        });

    }


    private class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {

        private HeartBeatClient client;

        public SimpleClientInitializer(HeartBeatClient client) {
            this.client = client;
        }

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            pipeline.addLast(new IdleStateHandler(0, 5, 0));
            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("handler", new HeartBeatClientHandler(client));
        }
    }


}

 

 

运行结果:

1.客户端长时间未发送心跳包,服务端关闭连接

server start ,port: 9817
一个客户端已连接
===服务端===(READER_IDLE 读超时)
===服务端===(READER_IDLE 读超时)
===服务端===(READER_IDLE 读超时)
===服务端===(READER_IDLE 读超时)
===服务端===(读超时,关闭chanel)
一个客户端已断开连接

 

2.客户端发送心跳包,服务端和客户端保持心跳信息

一个客户端已连接
客户端/127.0.0.1:55436--心跳信息--
客户端/127.0.0.1:55436--心跳信息--
客户端/127.0.0.1:55436--心跳信息--
客户端/127.0.0.1:55436--心跳信息--

 

3.服务单宕机,断开连接,客户端进行重连

客户端与服务端断开连接,断开的时间为:2018-06-12 23:47:12
Failed to connect to server, try connect after 10s
Failed to connect to server, try connect after 10s
Failed to connect to server, try connect after 10s
connect server successfully

 

 

代码地址:

LearnTCP

 本文首发于个人网站:http://www.janti.cn

参考:

TCP长连接与短连接、心跳机制

Socket的长连接和短连接.

浅析 Netty 实现心跳机制与断线重连

http://tldp.org/HOWTO/html_single/TCP-Keepalive-HOWTO

个人博客网站 http://www.janti.cn
相关实践学习
通过Ingress进行灰度发布
本场景您将运行一个简单的应用,部署一个新的应用用于新的发布,并通过Ingress能力实现灰度发布。
容器应用与集群管理
欢迎来到《容器应用与集群管理》课程,本课程是“云原生容器Clouder认证“系列中的第二阶段。课程将向您介绍与容器集群相关的概念和技术,这些概念和技术可以帮助您了解阿里云容器服务ACK/ACK Serverless的使用。同时,本课程也会向您介绍可以采取的工具、方法和可操作步骤,以帮助您了解如何基于容器服务ACK Serverless构建和管理企业级应用。 学习完本课程后,您将能够: 掌握容器集群、容器编排的基本概念 掌握Kubernetes的基础概念及核心思想 掌握阿里云容器服务ACK/ACK Serverless概念及使用方法 基于容器服务ACK Serverless搭建和管理企业级网站应用
相关文章
|
8月前
|
前端开发
websocket的心跳机制
websocket的心跳机制
449 3
|
Java Windows
JavaWebSocket心跳机制详解
WebSocket是一种在Web浏览器和服务器之间进行全双工通信的协议,它提供了一种简单而强大的方式来实现实时数据传输。在使用WebSocket时,心跳机制是非常关键的,它能够保持连接的稳定性并及时发现连接的异常。本文将详细解释JavaWebSocket心跳机制的实现原理和步骤。
547 0
|
数据采集 前端开发 JavaScript
查看Socket断开原因及加入心跳机制防止自动断开连接
一般情况下,前端页面连接WebSocket服务的时候都是通过Nginx等负载均衡,然后由Nginx去代理连接后端的socket服务。如果建立连接之后不做一些措施,那么可能会有各种各样的原因会导致socket断开。
2548 0
|
应用服务中间件 nginx 前端开发
WebSocket加入心跳包防止自动断开连接
近日,在公司中开发一个使用websocket为前端推送消息的功能时,发现一个问题:就是每隔一段时间如果不传送数据的话,与前段的连接就会自动断开; 刚开始以为是session的原因,因为web session 的默认时间是30分钟;但是通过日志发现断开时间间隔时间远远不到30分钟;认真分析发现不操作间隔恰好为90秒 它就会在自动断开;随恍然大悟;原来是我们的使用nginx 代理,nginx配置了访问超时时间为90s; WebSocket是html5中用来实现长连接的一个协议。
24361 0
|
8月前
MQTT的心跳保活机制是通过`setKeepAliveInterval()`方法设置的
MQTT的心跳保活机制是通过`setKeepAliveInterval()`方法设置的
1163 2
|
8月前
MQTT协议本身是支持心跳保活机制的
MQTT协议本身是支持心跳保活机制的
745 3
|
8月前
MQTT的心跳保活机制是通过`setKeepAliveInterval()`方法设置的,
MQTT的心跳保活机制是通过`setKeepAliveInterval()`方法设置的,
640 1
|
8月前
|
移动开发 HTML5
WebSocket心跳机制
WebSocket心跳机制
94 0
|
网络协议
选择长连接 or 短连接,大量 Timewait 的产生时如何处理?
网络通讯中,常见的两个连接类型分别是长连接和短连接。长连接指在一定时间内保持连接不断开,而短连接则指每次连接只进行一次通信,通信结束后即时断开连接。在实际应用中,不同类型的连接有着不同的应用场景和优缺点,而且在网络通讯中可能会遇到大量 Timewait 的产生,这就需要针对不同情况选择不同的处理方案。
132 1
|
监控 前端开发 网络协议
HTTP - 长连接 & 短连接 & 长轮询 & 短轮询 & 心跳机制
HTTP - 长连接 & 短连接 & 长轮询 & 短轮询 & 心跳机制
2205 0
HTTP - 长连接 & 短连接 & 长轮询 & 短轮询 & 心跳机制