项目实战16——消息队列的意义

简介: 项目实战16——消息队列的意义

一.什么是消息队列

消息队列,英文简称是MQ,“消息队列”是在消息的传输过程中保存消息的容器。

二.原理

1.ArrayBockingQueue:

ArrayBlockingQueue是一个阻塞式的队列,继承自AbstractBlockingQueue,底层以数组的形式保存数据(实际上可看作一个循环数组)

ArrayBockingQueue使用场景:

1.先进先出队列:头是先进队的元素,尾是后进队的元素

2.有界队列:初始化时指定的容量,就是队列最大的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作

3.队列不支持空元素

阻塞式队列方法的四种形式:

2.Socket

创建一个Socket类的实例,用它来发送和接收字节流,发送时调用getOutputStream方法获取一个java.io.OutputStream对象,接收远程对象发送来的信息可以调用getInputStream方法来返回一个java.io.InputStream对象

3.SeverSocket

ServerSocket与Socket不同,ServerSocket是等待客户端的请求,一旦获得一个连接请求,就创建一个Socket示例来与客户端进行通信。

4.Java IO操作——BufferedReader

可以接收任意长度的数据,并且避免乱码的产生

5.java.io.PrintWriter

输出流、字符打印流

三.实现

1.例子目录结构

2.代码部分

消息处理中心 Broker

package org.example;
import java.util.concurrent.ArrayBlockingQueue;
public class Broker {
    private final static int MAX_SIZE = 3;
    private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);
    public static void produce(String msg){
        if(messageQueue.offer(msg)){
            System.out.println("已成功向消息处理中心发送消息: " + msg + ",当前缓存的消息数量是:"+ messageQueue.size());
        } else{
            System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");
        }
        System.out.println("-----------------------------");
    }
    public static String consume(){
        String msg = messageQueue.poll();
        if(msg != null){
            System.out.println("已经消费的消息:" + msg + ",当前暂存消息的数量是:" + messageQueue.size());
        } else {
            System.out.println("消息处理中心内没有可供消费的消息!");
        }
        System.out.println("-----------------------------");
        return msg;
    }
}

BrokerSever 用来提供Broker类的对外服务

(BrokerSever类实现Runnable接口,实现run方法。用new Thread(Runnable target).start()方法来启动)

package org.example;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class BrokerSever implements Runnable{
    public static int SERVICE_PORT = 9999;
    private final Socket socket;
    public BrokerSever(Socket socket){
        this.socket = socket;
    }
    @Override
    public void run() {
        try(
                BufferedReader in = new BufferedReader(new InputStreamReader(
                        socket.getInputStream()));
                PrintWriter out = new PrintWriter(socket.getOutputStream()))
        {
            while (true){
                String str = in.readLine();
                if (str == null){
                    continue;
                }
                System.out.println("接收到原始数据: " + str);
                if (str.equals("CONSUME")){
                    String message = Broker.consume();
                    out.println(message);
                    out.flush();
                }else {
                    Broker.produce(str);
                }
            }
        } catch (Exception e){
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws Exception{
        ServerSocket server = new ServerSocket(SERVICE_PORT);
        while(true){
            BrokerSever brokerServer = new BrokerSever(server.accept());
            new Thread(brokerServer).start();
        }
    }
}

ProduceClient 消息生产者

package org.example;
public class ProduceClient {
    public static void main(String[] args) throws Exception{
        MyClient client = new MyClient();
        client.produce("hello World.");
    }
}

ConsumeClient 消息消费者

package org.example;
public class ConsumeClient {
    public static void main(String[] args) throws Exception{
        MyClient client = new MyClient();
        String message = client.consume();
        System.out.println("获得的消息为: " + message);
    }
}

MyClient 与消息服务器进行通信

package org.example;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
public class MyClient {
    public static void produce(String message) throws Exception{
        Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT);
        try(
                PrintWriter out = new PrintWriter(socket.getOutputStream())
        ){
            out.println(message);
            out.flush();
        }
    }
    public static String consume() throws Exception{
        Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT);
        try(
                BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                PrintWriter out = new PrintWriter(socket.getOutputStream())
        ){
            out.println("CONSUME");
            out.flush();
            String message = in.readLine();
            return message;
        }
    }
}


相关文章
|
存储 缓存 分布式计算
Spark任务OOM问题如何解决?
大家好,我是V哥。在实际业务中,Spark任务常因数据量过大、资源分配不合理或代码瓶颈导致OOM(Out of Memory)。本文详细分析了各种业务场景下的OOM原因,并提供了优化方案,包括调整Executor内存和CPU资源、优化内存管理策略、数据切分及减少宽依赖等。通过综合运用这些方法,可有效解决Spark任务中的OOM问题。关注威哥爱编程,让编码更顺畅!
787 3
|
SQL 分布式计算 Hadoop
Hive使用Impala组件查询(1)
Hive使用Impala组件查询(1)
883 0
|
关系型数据库 MySQL 数据挖掘
MYSQL日期与时间函数的实用技巧
MYSQL日期函数与时间函数是数据库操作的关键工具,可轻松处理、查询、比较和格式化日期时间数据。它们能提取日期的年、月、日等部分,便于筛选和统计;同时,也能处理时间数据,如计算时间差、获取当前时间,助力用户更好地管理时间信息。掌握这些函数,不仅能提升数据库操作效率,还能为数据分析和报表生成提供有力支持。无论初学者还是资深数据库管理员,精通MYSQL的日期和时间函数都至关重要,以满足各种数据处理需求,确保数据的准确性和高效性。
807 0
|
SQL Java Linux
聊聊 kerberos 的 kinit 命令和 ccache 机制
聊聊 kerberos 的 kinit 命令和 ccache 机制
|
Python
掌握pandas中的transform
掌握pandas中的transform
332 3
|
SQL 分布式计算 大数据
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
大数据-94 Spark 集群 SQL DataFrame & DataSet & RDD 创建与相互转换 SparkSQL
365 0
|
存储 缓存 分布式计算
如何在 PySpark 中缓存数据以提高性能?
【8月更文挑战第13天】
648 8
|
SQL 分布式计算 安全
|
Ubuntu Linux C语言
【opencv】opencv在windows和linux的应用
【opencv】opencv在windows和linux的应用