JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)

简介: 【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。

JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)

RRPC指的是调用该接口向指定设备发送请求消息,并同步返回响应

在物联网场景下,如果想要做到Java服务与硬件同步通信的效果,那么一般会依赖MQTT来实现通信

比如Java服务向硬件发送请求,请求查询硬件相关信息

  1. Java服务和硬件要提前订阅对应的Topic
  2. Java服务先将消息发送到MQTT上(硬件订阅的Topic上)
  3. 硬件订阅Topic收到消息后进行消费,消费完再发送ack响应消息到MQTT上(Java服务订阅的Topic上)

image.png

在这个同步通信的过程中,Java服务发送完消息是需要等待直到ack响应的,那么这个过程在Java服务端该如何实现这种等待/唤醒的模式呢?

本文就结合JUC组件来实现Java与硬件(通过MQTT)同步通信的组件

(为了简化流程,我们代码中使用阻塞队列代替MQTT)

整体流程

整体流程可以想象成远程调用的流程,只不过消费端是硬件,并且它们是通过MQTT转发消息来做到通信的

举例:把Java服务当作A端、把硬件当作B端,它们需要提前订阅MQTT上的topic

  1. A端发送消息到B端订阅的Topic上,并进入等待状态(等待收到响应后唤醒)
  2. B端订阅Topic收到消息后消费,响应并发送到A端订阅的Topic
  3. A端订阅Topic的线程收到消息后进行解析,如果消息是当前节点需要处理的,则唤醒A端发送消息的线程

    image.png

在这个过程中主要涉及四个线程:

  1. A端发送消息的业务线程
  2. B端接收消息并响应的线程
  3. A端接收消息并唤醒的业务线程
  4. A端定时删除超时的任务,防止内存泄漏

由于MQTT中间件太大,为了简化流程,我使用LinkedBlockingQueue进行模拟MQTT通信

    /**
     * 模拟MQTT通信时 A端 接收消息的 Topic
     */
    private static final LinkedBlockingQueue<String> mqttTopicA = new LinkedBlockingQueue<>();

    /**
     * 模拟MQTT通信时 B端 接收消息的 Topic
     */
    private static final LinkedBlockingQueue<String> mqttTopicB = new LinkedBlockingQueue<>();

流程代码:

  1. 先启动B端消费线程
  2. 在启动A端接收线程
  3. 在模拟A端业务线程发送消息
public static void main(String[] args) {
   
    //1.开启消费线程 模拟B端消费消息
    Thread bConsumerThread = new Thread(() -> {
   
        while (true) {
   

            //获取消息
            String msgId = null;
            try {
   
                msgId = mqttTopicB.take();
            } catch (InterruptedException e) {
   
                throw new RuntimeException(e);
            }

            //消费..
            System.out.println("2." + Thread.currentThread().getName() + "消费消息:" + msgId);

            //消费完响应
            mqttTopicA.offer(msgId);

        }
    }, "B端消费线程");
    bConsumerThread.setDaemon(true);
    bConsumerThread.start();

    //2.开启接收线程 模拟A端接收消息
    Thread aReceivedThread = new Thread(() -> {
   
        while (true) {
   

            //获取消息
            String msgId = null;
            try {
   
                msgId = mqttTopicA.take();
            } catch (InterruptedException e) {
   
                throw new RuntimeException(e);
            }

            //唤醒业务处理线程
            MsgResponse msgResponse = new MsgResponse();
            msgResponse.setMsgId(msgId);
            //实际上消息通信会传输消息内容的,这里图方便 不想解析消息 就用msgID当作消息内容传输
            msgResponse.setMsgBodyJson("JSON消息内容");
            if (DefaultFuture.received(msgResponse)) {
   
                System.out.println("3." + Thread.currentThread().getName() + "唤醒消息:" + msgId);
            }
        }
    }, "A端接收消息线程");
    aReceivedThread.setDaemon(true);
    aReceivedThread.start();


    //3.业务处理线程发送消息
    String msgId = UUID.randomUUID().toString();
    System.out.println("1.A端业务线程开始发送消息");
    MsgResponse msgResponse = sendMsg(msgId);
    System.out.println("4.同步通信完毕,得到消息内容:" + msgResponse);
}

在sendMsg发送消息中,主要是创建DefaultFuture任务再发送消息,最后get阻塞等待结果到达时被唤醒

private static MsgResponse sendMsg(String msgId) {
   
    DefaultFuture future = new DefaultFuture(msgId);
    //模拟MQTT通信 发送消息给B端 让其消费消息
    mqttTopicB.offer(msgId);
    MsgResponse msgResponse;
    try {
   
        //阻塞等待 响应到达时被唤醒
        msgResponse = future.get();
    } catch (InterruptedException e) {
   
        throw new RuntimeException(e);
    } catch (ExecutionException e) {
   
        throw new RuntimeException(e);
    }
    return msgResponse;
}

DefaultFuture是我们要实现的任务组件、而MsgResponse是消息响应

类设计

Java服务发送完消息需要等待,直到响应到达;在这个过程中,非常与生产者、消费者模型类似

Java(生产者)发送完消息,进入等待状态,直到收到MQTT消息(相当于收到响应,消费完),唤醒接收响应

经典的生产者与消费者模型,可以考虑使用 synchronized + wait / notify 实现等待通知 或者 JUC下的Lock(AQS) + Condition 实现等待通知

但如果硬件一直不响应,请求一直等待会造成堆积,从而影响服务端性能,因此需要给等待设置超时时间,而synchronized并不支持,因此使用JUC下的 ReentrantLock(AQS阻塞队列)以及Condition(等待队列)

同时需要两个超时相关的字段:超时时间、时间单位

为了判断任务是否超时,需要记录任务开始的时间

在这个过程正好类似JUC下Future接口的流程,可以对Future接口进行实现

在分布式系统下,Java服务通常是多节点的,在同步通信的过程中由于MQTT是发布订阅模型,多节点都会收到响应,如何区分收到的响应消息是不是当前节点发送的呢?

因此需要增加消息的唯一标识(消息ID)用于判断消息,同时本地需要存储消息的唯一标识,存储可能是并发操作,因此考虑使用ConcurrentHashMap 相对线程安全的哈希表,能够根据消息ID快速判断当前节点是否需要接收

为了方便操作,我们将哈希表中的Value设置为我们的Future

由于实现Future接口,还可以取消任务,需要一个字段判断当前任务是否取消

同时需要一个字段存储收到的消息结果,并作为get的返回值

类设计完毕后,给我们的类取上名称——DefaultFuture

public class DefaultFuture implements Future<MsgResponse> {
   

    public DefaultFuture(String msgId) {
   
        this(msgId, 200L, TimeUnit.MILLISECONDS);
    }

    public DefaultFuture(String msgId, long timeout, TimeUnit timeUnit) {
   
        this.msgId = msgId;
        this.timeout = timeout;
        this.timeUnit = timeUnit;

        futures.put(msgId, this);
        startTimeMillis = System.currentTimeMillis();
    }

    /**
     * 通过MQTT发布、订阅模型通信
     * 多Java节点的情况下无法分辨消息是不是当前节点发送的
     * 因此需要唯一标识 消息ID来判断
     * 并且本地还要进行存储,因此考虑使用KV的容器进行存储
     * 由于是会被并发访问,因此使用ConcurrentHashMap
     * K存储消息ID,V存储DefaultFuture,通过消息ID可以获取DefaultFuture
     */
    private static final Map<String, DefaultFuture> futures = new ConcurrentHashMap<>();

    /**
     * 消息唯一标识
     */
    private String msgId;

    /**
     * 用于判断任务是否被取消
     * true被取消  false未被取消
     */
    private boolean isCancel;

    /**
     * 启动时间用于判断是否超时
     */
    private long startTimeMillis;

    private long timeout;

    private TimeUnit timeUnit;

    /**
     * 消息通信的结果
     * 收到消息后 将消息封装成MsgResponse对象 存储到msgResponse中
     * 用于判断任务是否完成
     */
    private MsgResponse msgResponse;

    /**
     * 每个任务对应一把锁 和 一个等待队列
     * 用于实现等待通知模式
     */
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
}

方法实现

实现Future

Future接口提供5个方法:cancel取消任务、isCancelled任务是否被取消、isDone任务是否已完成、剩下两个get重载方法阻塞获取结果

我们依次进行实现:

boolean cancel(boolean mayInterruptIfRunning)

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
   
    //取消任务
    futures.remove(msgId);
    isCancel = true;
    return true;
}

方法中的参数mayInterruptIfRunning为true时需要主动打断任务,因为我们通过MQTT通信,消息发送后无法打断,因此不需要管

只需要在容器中删除消息和标记消息已取消

boolean isCancelled()

@Override
public boolean isCancelled() {
   
    return isCancel;
}

返回是否取消的标记即可

boolean isDone();

@Override
public boolean isDone() {
   
    //结果不为空 说明消息通信完成
    return Objects.nonNull(msgResponse);
}

是否完成只需要判断消息结果是否为空

V get() throws InterruptedException, ExecutionException;

@Override
public MsgResponse get() throws InterruptedException, ExecutionException {
   
    return get(timeout, timeUnit);
}

默认超时时间为200ms

V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

@Override
public MsgResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
   
    //参数校验
    if (timeout <= 0) {
   
        throw new RuntimeException("超时时间有误");
    }

    //覆盖超时字段
    this.timeout = timeout;
    this.timeUnit = unit;

    //如果任务完成就返回结果 否则阻塞等待任务完成
    if (isDone()) {
   
        return msgResponse;
    }

    //等待前要加锁
    lock.lock();
    try {
   
        condition.await(timeout, unit);
    } finally {
   
        lock.unlock();
    }

    return msgResponse;
}

先进行参数校验、再判断是否完成,完成返回结果,未完成则加锁等待

由于只有收到消息才会调用唤醒方法并且也是只执行一次,这里没有使用循环防止虚假唤醒

其他方法

除了future接口的方法外,还需要其他方法来满足我们的需求

public static boolean received(MsgResponse msg) {
   
    //接收时删除
    DefaultFuture future = futures.remove(msg.getMsgId());

    //当前节点不需要处理
    if (Objects.isNull(future)) {
   
        return false;
    }

    //唤醒
    future.doReceived(msg);
    return true;
}

当收到消息时调用接收方法,received会先通过哈希表判断是否需要处理,如果删除获取的结果为空,说明当前节点没维护,不是当前节点不需要处理,否则需要去进行唤醒

private void doReceived(MsgResponse msg) {
   
    lock.lock();
    try {
   
        msgResponse = msg;
        condition.signal();
    } finally {
   
        lock.unlock();
    }
}

唤醒前也需要加锁,并把结果赋值给字段msgResponse

补偿机制

在这个流程中,如果任务超时则会自动被唤醒,导致获取的结果为空,从而抛出异常

当超时的情况发生时,并没有清理哈希表中的记录,这就是常说的内存泄漏,当大量内存泄漏则会发生内存溢出

因此需要启动定时任务做补偿机制,循环判断任务是否已超时

    public void startJob() {
   
        Thread timeoutJobThread = new Thread(() -> {
   
            while (true) {
   
                try {
   
                    for (DefaultFuture future : futures.values()) {
   
                        if (future == null || future.isDone()) {
   
                            continue;
                        }

                        //如果超时就去清理 以防内存泄漏
                        if (System.currentTimeMillis() - future.startTimeMillis > future.timeUnit.toMillis(timeout)) {
   
                            MsgResponse msgResponse = new MsgResponse();
                            msgResponse.setMsgId(future.msgId);
                            msgResponse.setMsgBodyJson("timeout");
                            DefaultFuture.received(msgResponse);
                        }
                    }
                    Thread.sleep(10000);
                } catch (Throwable e) {
   
                    //日志
                }
            }
        },"超时任务线程");

        timeoutJobThread.setDaemon(true);
        timeoutJobThread.start();
    }

总结

本文结合JUC下的Lock、Condition与Future实现MQTT同步通信的组件

其中Lock与Condition是为了阻塞等待,但程序中的DefaultFuture是当作局部变量被使用的,并不存在并发

因此,如果只是为了等待而加锁是没必要的,可以考虑使用LockSupport.park/unpark进行等待

但是LockSupport.unpark唤醒时需要具体的线程,因此需要增加一个容器对任务与业务线程进行绑定存储

具体代码放在git仓库了,感兴趣的同学可以查看

🌠最后(点赞、收藏、关注求求啦~)

本篇文章被收入专栏 由点到线,由线到面,深入浅出构建Java并发编程知识体系,感兴趣的同学可以持续关注喔

本篇文章笔记以及案例被收入 Gitee-CaiCaiJavaGithub-CaiCaiJava,除此之外还有更多Java进阶相关知识,感兴趣的同学可以starred持续关注喔~

有什么问题可以在评论区交流,如果觉得菜菜写的不错,可以点赞、关注、收藏支持一下~

关注菜菜,分享更多技术干货,公众号:菜菜的后端私房菜

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
3月前
|
存储 Java 开发者
Java Map实战:用HashMap和TreeMap轻松解决复杂数据结构问题!
【10月更文挑战第17天】本文深入探讨了Java中HashMap和TreeMap两种Map类型的特性和应用场景。HashMap基于哈希表实现,支持高效的数据操作且允许键值为null;TreeMap基于红黑树实现,支持自然排序或自定义排序,确保元素有序。文章通过具体示例展示了两者的实战应用,帮助开发者根据实际需求选择合适的数据结构,提高开发效率。
89 2
|
17天前
|
Java
Java基础却常被忽略:全面讲解this的实战技巧!
本次分享来自于一道Java基础的面试试题,对this的各种妙用进行了深度讲解,并分析了一些关于this的常见面试陷阱,主要包括以下几方面内容: 1.什么是this 2.this的场景化使用案例 3.关于this的误区 4.总结与练习
|
1月前
|
Java 程序员
Java基础却常被忽略:全面讲解this的实战技巧!
小米,29岁程序员,分享Java中`this`关键字的用法。`this`代表当前对象引用,用于区分成员变量与局部变量、构造方法间调用、支持链式调用及作为参数传递。文章还探讨了`this`在静态方法和匿名内部类中的使用误区,并提供了练习题。
34 1
|
2月前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
70 6
|
2月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
2月前
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
45 3
|
3月前
|
Java 调度
[Java]线程生命周期与线程通信
本文详细探讨了线程生命周期与线程通信。文章首先分析了线程的五个基本状态及其转换过程,结合JDK1.8版本的特点进行了深入讲解。接着,通过多个实例介绍了线程通信的几种实现方式,包括使用`volatile`关键字、`Object`类的`wait()`和`notify()`方法、`CountDownLatch`、`ReentrantLock`结合`Condition`以及`LockSupport`等工具。全文旨在帮助读者理解线程管理的核心概念和技术细节。
44 1
[Java]线程生命周期与线程通信
|
2月前
|
Java
JAVA多线程通信:为何wait()与notify()如此重要?
在Java多线程编程中,`wait()` 和 `notify()/notifyAll()` 方法是实现线程间通信的核心机制。它们通过基于锁的方式,使线程在条件不满足时进入休眠状态,并在条件满足时被唤醒,从而确保数据一致性和同步。相比其他通信方式,如忙等待,这些方法更高效灵活。 示例代码展示了如何在生产者-消费者模型中使用这些方法实现线程间的协调和同步。
43 3
|
2月前
|
Java 调度
Java 线程同步的四种方式,最全详解,建议收藏!
本文详细解析了Java线程同步的四种方式:synchronized关键字、ReentrantLock、原子变量和ThreadLocal,通过实例代码和对比分析,帮助你深入理解线程同步机制。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
Java 线程同步的四种方式,最全详解,建议收藏!
|
3月前
|
安全 Java
Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧
【10月更文挑战第20天】Java多线程通信新解:本文通过生产者-消费者模型案例,深入解析wait()、notify()、notifyAll()方法的实用技巧,包括避免在循环外调用wait()、优先使用notifyAll()、确保线程安全及处理InterruptedException等,帮助读者更好地掌握这些方法的应用。
27 1