iMessage群发虚拟机分布式任务调度系统:Spring Boot完整代码实战

简介: iMessage群发虚拟机作为一种高效的企业级消息触达解决方案,在跨境营销、客户服务、内部通知等场景中得到了广泛应用。传统的单节点iMessage群发方案存在着并发能力有限、单点故障风险高、资源利用率低等问题,难以满足大规模消息发送的需求。本文将详细介绍如何基于Spring Boot、Redis和RabbitMQ构建一个高可用、可扩展的iMessage群发虚拟机分布式任务调度系统,通过分布式架构实现任务的自动分发、负载均衡和故障转移,大幅提升系统的整体性能和稳定性。

iMessage群发虚拟机分布式任务调度系统代码实战

iMessage群发虚拟机作为一种高效的企业级消息触达解决方案,在跨境营销、客户服务、内部通知等场景中得到了广泛应用。传统的单节点iMessage群发方案存在着并发能力有限、单点故障风险高、资源利用率低等问题,难以满足大规模消息发送的需求。本文将详细介绍如何基于Spring Boot、Redis和RabbitMQ构建一个高可用、可扩展的iMessage群发虚拟机分布式任务调度系统,通过分布式架构实现任务的自动分发、负载均衡和故障转移,大幅提升系统的整体性能和稳定性。

一、系统整体架构设计

本系统采用分层架构设计,主要分为控制层、调度层、执行层和存储层四个核心层次。控制层负责接收用户的群发任务请求,进行任务参数校验和任务创建;调度层基于Quartz框架实现定时任务调度,并通过Redis分布式锁保证集群环境下任务的唯一性;执行层由多个 iMessage群发虚拟机节点组成,负责实际的消息发送工作;存储层使用MySQL存储任务信息和发送记录,Redis存储节点状态和任务缓存,RabbitMQ实现任务的异步分发。
系统采用主从架构模式,调度中心作为主节点负责任务的统一调度和分发,多个执行节点作为从节点负责任务的执行。当有新的任务提交时,调度中心会根据各个执行节点的负载情况,将任务分配给负载最轻的节点执行。如果某个执行节点发生故障,调度中心会自动将该节点上的任务重新分配给其他健康节点,确保任务能够顺利完成。

二、核心技术选型与环境准备

本系统的核心技术选型如下:后端框架采用Spring Boot 2.7.12,提供快速开发和自动配置能力;任务调度框架采用Quartz 2.3.2,支持复杂的定时任务配置;分布式锁采用Redis 6.2.7,利用其原子操作特性实现高效的分布式锁机制;消息队列采用RabbitMQ 3.10.0,实现任务的异步分发和解耦;数据库采用MySQL 8.0.32,存储任务和发送记录;虚拟化技术采用Docker,方便iMessage群发虚拟机节点的快速部署和管理。
环境准备方面,需要准备至少一台服务器作为调度中心,多台服务器作为执行节点。每台执行节点需要安装Docker环境,并配置好 iMessage群发虚拟机镜像。同时需要部署Redis和RabbitMQ服务,建议采用集群部署方式以提高系统的可用性。开发环境推荐使用IntelliJ IDEA,JDK版本为1.8或以上,Maven版本为3.6.3或以上。

三、虚拟机节点管理模块实现

虚拟机节点管理模块是系统的核心模块之一,负责管理所有iMessage群发虚拟机节点的生命周期和状态。该模块主要包括节点注册、节点心跳检测、节点状态更新和节点负载计算四个功能。节点启动时会自动向调度中心注册,上报自己的IP地址、端口号、最大并发数等信息。调度中心会为每个节点分配一个唯一的节点ID,并将节点信息存储在Redis中。
为了实时监控节点的健康状态,系统采用心跳检测机制。每个节点每隔5秒向调度中心发送一次心跳包,上报当前的负载情况和任务执行状态。如果调度中心在30秒内没有收到某个节点的心跳包,就会将该节点标记为离线状态,并将该节点上的所有任务重新分配给其他健康节点。节点负载计算主要考虑CPU使用率、内存使用率和当前正在执行的任务数三个因素,通过加权算法计算出每个节点的综合负载值,为任务调度提供依据。

四、分布式任务调度核心逻辑

分布式任务调度核心逻辑是系统的灵魂,负责将用户提交的群发任务按照预定的策略分发到各个执行节点。当用户提交一个群发任务时,系统会首先将任务信息保存到MySQL数据库中,并生成一个唯一的任务ID。然后调度中心会根据任务的优先级和执行时间,将任务加入到Quartz调度队列中。
当任务到达执行时间时,Quartz会触发任务执行。为了避免集群环境下多个调度中心同时执行同一个任务,系统使用Redis分布式锁来保证任务的唯一性。只有获取到分布式锁的调度中心才能执行任务分发操作。任务分发时,调度中心会从Redis中获取所有健康节点的信息,计算每个节点的综合负载值,选择负载最轻的节点作为任务执行节点。然后将任务信息发送到RabbitMQ的对应队列中,由执行节点进行消费。

五、消息队列与任务分发机制

消息队列在系统中起到了至关重要的作用,它实现了调度中心和执行节点之间的解耦,提高了系统的可扩展性和可靠性。本系统采用RabbitMQ作为消息中间件,使用Direct交换机模式,为每个执行节点创建一个独立的队列。调度中心将任务消息发送到指定的队列中,对应的执行节点从队列中消费任务并执行。
为了提高消息的可靠性,系统开启了RabbitMQ的消息确认机制和持久化机制。消息确认机制确保消息能够被正确投递到队列中,如果投递失败,RabbitMQ会将消息返回给生产者进行重发。持久化机制确保即使RabbitMQ服务器发生故障,消息也不会丢失。同时,系统还实现了消息重试机制,如果执行节点在处理消息时发生异常,会将消息重新放回队列中,等待下次重试。重试次数达到上限后,会将消息发送到死信队列中,由人工进行处理。

六、部署与性能测试

系统部署采用Docker容器化部署方式,使用Docker Compose进行服务编排。调度中心和执行节点都打包成Docker镜像,可以快速部署到任意支持Docker的服务器上。部署时只需要修改配置文件中的数据库、Redis和RabbitMQ连接信息,然后执行docker-compose up命令即可启动整个系统。
性能测试方面,我们搭建了一个包含1个调度中心和10个执行节点的测试环境。每个执行节点配置为2核4G内存,最大并发数为50。测试结果显示,系统每秒可以处理约500条消息,单日可以处理约4300万条消息。当有节点发生故障时,系统可以在30秒内完成任务重分配,不会影响整体的发送进度。同时,系统的CPU和内存使用率保持在合理范围内,具有良好的稳定性和可扩展性。
完整代码实现
```// 项目结构
// src/main/java/com/imessage/scheduler/
// ├── ImessageSchedulerApplication.java
// ├── config/
// │ ├── QuartzConfig.java
// │ ├── RedisConfig.java
// │ ├── RabbitMQConfig.java
// │ └── MyBatisConfig.java
// ├── controller/
// │ └── TaskController.java
// ├── service/
// │ ├── TaskService.java
// │ ├── NodeService.java
// │ ├── SchedulerService.java
// │ └── MessageService.java
// ├── repository/
// │ ├── TaskRepository.java
// │ ├── NodeRepository.java
// │ └── SendRecordRepository.java
// ├── entity/
// │ ├── Task.java
// │ ├── Node.java
// │ └── SendRecord.java
// ├── dto/
// │ ├── TaskRequest.java
// │ └── TaskResponse.java
// ├── exception/
// │ ├── BusinessException.java
// │ └── GlobalExceptionHandler.java
// └── util/
// ├── RedisUtil.java
// ├── SnowflakeIdGenerator.java
// └── HttpUtil.java

// 主程序入口
package com.imessage.scheduler;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@MapperScan("com.imessage.scheduler.repository")
@EnableScheduling
public class ImessageSchedulerApplication {
public static void main(String[] args) {
SpringApplication.run(ImessageSchedulerApplication.class, args);
}
}

// 配置类 - QuartzConfig
package com.imessage.scheduler.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

import javax.sql.DataSource;
import java.util.Properties;

@Configuration
public class QuartzConfig {

@Bean
public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource) {
    SchedulerFactoryBean factory = new SchedulerFactoryBean();
    factory.setDataSource(dataSource);

    Properties props = new Properties();
    props.put("org.quartz.scheduler.instanceName", "ImessageScheduler");
    props.put("org.quartz.scheduler.instanceId", "AUTO");
    props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
    props.put("org.quartz.threadPool.threadCount", "20");
    props.put("org.quartz.threadPool.threadPriority", "5");
    props.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
    props.put("org.quartz.jobStore.isClustered", "true");
    props.put("org.quartz.jobStore.clusterCheckinInterval", "10000");
    props.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
    props.put("org.quartz.jobStore.tablePrefix", "QRTZ_");

    factory.setQuartzProperties(props);
    factory.setStartupDelay(10);
    factory.setOverwriteExistingJobs(true);

    return factory;
}

}

// 配置类 - RedisConfig
package com.imessage.scheduler.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
    RedisTemplate<String, Object> template = new RedisTemplate<>();
    template.setConnectionFactory(factory);

    StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
    template.setKeySerializer(stringRedisSerializer);
    template.setHashKeySerializer(stringRedisSerializer);

    Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
    ObjectMapper om = new ObjectMapper();
    om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    om.activateDefaultTyping(om.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
    jackson2JsonRedisSerializer.setObjectMapper(om);

    template.setValueSerializer(jackson2JsonRedisSerializer);
    template.setHashValueSerializer(jackson2JsonRedisSerializer);
    template.afterPropertiesSet();

    return template;
}

}

// 配置类 - RabbitMQConfig
package com.imessage.scheduler.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {

public static final String EXCHANGE_NAME = "imessage.task.exchange";
public static final String DEAD_LETTER_EXCHANGE = "imessage.task.dlx.exchange";
public static final String DEAD_LETTER_QUEUE = "imessage.task.dlx.queue";
public static final String DEAD_LETTER_ROUTING_KEY = "imessage.task.dlx.routingkey";

@Bean
public DirectExchange taskExchange() {
    return new DirectExchange(EXCHANGE_NAME, true, false);
}

@Bean
public DirectExchange deadLetterExchange() {
    return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
}

@Bean
public Queue deadLetterQueue() {
    return new Queue(DEAD_LETTER_QUEUE, true);
}

@Bean
public Binding deadLetterBinding() {
    return BindingBuilder.bind(deadLetterQueue())
            .to(deadLetterExchange())
            .with(DEAD_LETTER_ROUTING_KEY);
}

@Bean
public Queue node1Queue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
    args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
    args.put("x-message-ttl", 60000);
    return new Queue("imessage.task.queue.node1", true, false, false, args);
}

@Bean
public Queue node2Queue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
    args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
    args.put("x-message-ttl", 60000);
    return new Queue("imessage.task.queue.node2", true, false, false, args);
}

@Bean
public Queue node3Queue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
    args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
    args.put("x-message-ttl", 60000);
    return new Queue("imessage.task.queue.node3", true, false, false, args);
}

@Bean
public Binding node1Binding() {
    return BindingBuilder.bind(node1Queue())
            .to(taskExchange())
            .with("node1");
}

@Bean
public Binding node2Binding() {
    return BindingBuilder.bind(node2Queue())
            .to(taskExchange())
            .with("node2");
}

@Bean
public Binding node3Binding() {
    return BindingBuilder.bind(node3Queue())
            .to(taskExchange())
            .with("node3");
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (!ack) {
            System.err.println("消息发送失败: " + cause);
        }
    });
    rabbitTemplate.setReturnsCallback(returned -> {
        System.err.println("消息被退回: " + returned.getMessage());
    });
    return rabbitTemplate;
}

}

// 实体类 - Task
package com.imessage.scheduler.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.time.LocalDateTime;

@Data
@TableName("im_task")
public class Task {
@TableId(type = IdType.ASSIGN_ID)
private Long id;
private String taskName;
private String content;
private String phoneNumbers;
private Integer totalCount;
private Integer successCount;
private Integer failedCount;
private Integer status; // 0-待执行, 1-执行中, 2-已完成, 3-已失败
private LocalDateTime scheduleTime;
private LocalDateTime startTime;
private LocalDateTime endTime;
private String assignedNode;
private Integer priority;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}

// 实体类 - Node
package com.imessage.scheduler.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.time.LocalDateTime;

@Data
@TableName("im_node")
public class Node {
@TableId(type = IdType.ASSIGN_ID)
private Long id;
private String nodeId;
private String ipAddress;
private Integer port;
private Integer maxConcurrency;
private Integer currentConcurrency;
private Double cpuUsage;
private Double memoryUsage;
private Integer status; // 0-离线, 1-在线, 2-繁忙
private LocalDateTime lastHeartbeat;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}

// 实体类 - SendRecord
package com.imessage.scheduler.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.time.LocalDateTime;

@Data
@TableName("im_send_record")
public class SendRecord {
@TableId(type = IdType.ASSIGN_ID)
private Long id;
private Long taskId;
private String phoneNumber;
private String content;
private Integer status; // 0-待发送, 1-发送成功, 2-发送失败
private String failReason;
private String nodeId;
private LocalDateTime sendTime;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}

// 工具类 - RedisUtil
package com.imessage.scheduler.util;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Component
public class RedisUtil {

@Resource
private RedisTemplate<String, Object> redisTemplate;

public boolean set(String key, Object value) {
    try {
        redisTemplate.opsForValue().set(key, value);
        return true;
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    }
}

public boolean set(String key, Object value, long time) {
    try {
        if (time > 0) {
            redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
        } else {
            set(key, value);
        }
        return true;
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    }
}

public Object get(String key) {
    return key == null ? null : redisTemplate.opsForValue().get(key);
}

public boolean del(String... key) {
    if (key != null && key.length > 0) {
        if (key.length == 1) {
            redisTemplate.delete(key[0]);
        } else {
            redisTemplate.delete(CollectionUtils.arrayToList(key));
        }
    }
    return true;
}

public boolean hasKey(String key) {
    try {
        return redisTemplate.hasKey(key);
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    }
}

public boolean tryLock(String key, String value, long expireTime) {
    try {
        Boolean result = redisTemplate.opsForValue()
                .setIfAbsent(key, value, expireTime, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(result);
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    }
}

public void unlock(String key, String value) {
    try {
        Object currentValue = redisTemplate.opsForValue().get(key);
        if (value.equals(currentValue)) {
            redisTemplate.delete(key);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

}

// 工具类 - SnowflakeIdGenerator
package com.imessage.scheduler.util;

import org.springframework.stereotype.Component;

@Component
public class SnowflakeIdGenerator {
private static final long START_TIMESTAMP = 1672531200000L; // 2023-01-01 00:00:00
private static final long WORKER_ID_BITS = 5L;
private static final long DATA_CENTER_ID_BITS = 5L;
private static final long SEQUENCE_BITS = 12L;

private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);
private static final long MAX_DATA_CENTER_ID = ~(-1L << DATA_CENTER_ID_BITS);

private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;
private static final long DATA_CENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS;
private static final long TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATA_CENTER_ID_BITS;

private static final long SEQUENCE_MASK = ~(-1L << SEQUENCE_BITS);

private long workerId = 1L;
private long dataCenterId = 1L;
private long sequence = 0L;
private long lastTimestamp = -1L;

public synchronized long nextId() {
    long timestamp = timeGen();

    if (timestamp < lastTimestamp) {
        throw new RuntimeException("时钟回拨,无法生成ID");
    }

    if (lastTimestamp == timestamp) {
        sequence = (sequence + 1) & SEQUENCE_MASK;
        if (sequence == 0) {
            timestamp = tilNextMillis(lastTimestamp);
        }
    } else {
        sequence = 0L;
    }

    lastTimestamp = timestamp;

    return ((timestamp - START_TIMESTAMP) << TIMESTAMP_SHIFT)
            | (dataCenterId << DATA_CENTER_ID_SHIFT)
            | (workerId << WORKER_ID_SHIFT)
            | sequence;
}

private long tilNextMillis(long lastTimestamp) {
    long timestamp = timeGen();
    while (timestamp <= lastTimestamp) {
        timestamp = timeGen();
    }
    return timestamp;
}

private long timeGen() {
    return System.currentTimeMillis();
}

}

// 服务类 - NodeService
package com.imessage.scheduler.service;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.imessage.scheduler.entity.Node;
import com.imessage.scheduler.repository.NodeRepository;
import com.imessage.scheduler.util.RedisUtil;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

@Service
public class NodeService {

private static final String NODE_HEARTBEAT_PREFIX = "im:node:heartbeat:";
private static final int HEARTBEAT_TIMEOUT = 30; // 30秒超时

@Resource
private NodeRepository nodeRepository;

@Resource
private RedisUtil redisUtil;

public void registerNode(Node node) {
    LambdaQueryWrapper<Node> wrapper = new LambdaQueryWrapper<>();
    wrapper.eq(Node::getIpAddress, node.getIpAddress())
            .eq(Node::getPort, node.getPort());

    Node existingNode = nodeRepository.selectOne(wrapper);
    if (existingNode != null) {
        existingNode.setStatus(1);
        existingNode.setLastHeartbeat(LocalDateTime.now());
        existingNode.setUpdateTime(LocalDateTime.now());
        nodeRepository.updateById(existingNode);
    } else {
        node.setNodeId("node-" + System.currentTimeMillis());
        node.setStatus(1);
        node.setCurrentConcurrency(0);
        node.setLastHeartbeat(LocalDateTime.now());
        node.setCreateTime(LocalDateTime.now());
        node.setUpdateTime(LocalDateTime.now());
        nodeRepository.insert(node);
    }

    redisUtil.set(NODE_HEARTBEAT_PREFIX + node.getNodeId(), 
            node.getNodeId(), HEARTBEAT_TIMEOUT);
}

public void heartbeat(String nodeId, double cpuUsage, double memoryUsage, int currentConcurrency) {
    LambdaQueryWrapper<Node> wrapper = new LambdaQueryWrapper<>();
    wrapper.eq(Node::getNodeId, nodeId);

    Node node = nodeRepository.selectOne(wrapper);
    if (node != null) {
        node.setCpuUsage(cpuUsage);
        node.setMemoryUsage(memoryUsage);
        node.setCurrentConcurrency(currentConcurrency);
        node.setLastHeartbeat(LocalDateTime.now());
        node.setUpdateTime(LocalDateTime.now());
        nodeRepository.updateById(node);

        redisUtil.set(NODE_HEARTBEAT_PREFIX + nodeId, 
                nodeId, HEARTBEAT_TIMEOUT);
    }
}

public List<Node> getHealthyNodes() {
    LambdaQueryWrapper<Node> wrapper = new LambdaQueryWrapper<>();
    wrapper.eq(Node::getStatus, 1);

    List<Node> nodes = nodeRepository.selectList(wrapper);
    return nodes.stream()
            .filter(node -> redisUtil.hasKey(NODE_HEARTBEAT_PREFIX + node.getNodeId()))
            .collect(Collectors.toList());
}

public Node selectBestNode() {
    List<Node> healthyNodes = getHealthyNodes();
    if (healthyNodes.isEmpty()) {
        return null;
    }

    return healthyNodes.stream()
            .min(Comparator.comparingDouble(this::calculateLoad))
            .orElse(null);
}

private double calculateLoad(Node node) {
    double cpuWeight = 0.3;
    double memoryWeight = 0.3;
    double concurrencyWeight = 0.4;

    double concurrencyLoad = (double) node.getCurrentConcurrency() / node.getMaxConcurrency();

    return node.getCpuUsage() * cpuWeight 
            + node.getMemoryUsage() * memoryWeight 
            + concurrencyLoad * concurrencyWeight;
}

@Scheduled(fixedRate = 10000) // 每10秒检查一次
public void checkNodeHealth() {
    LambdaQueryWrapper<Node> wrapper = new LambdaQueryWrapper<>();
    wrapper.eq(Node::getStatus, 1);

    List<Node> nodes = nodeRepository.selectList(wrapper);
    for (Node node : nodes) {
        if (!redisUtil.hasKey(NODE_HEARTBEAT_PREFIX + node.getNodeId())) {
            node.setStatus(0);
            node.setUpdateTime(LocalDateTime.now());
            nodeRepository.updateById(node);
            System.out.println("节点离线: " + node.getNodeId());
        }
    }
}

}

// 服务类 - TaskService
package com.imessage.scheduler.service;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.imessage.scheduler.dto.TaskRequest;
import com.imessage.scheduler.entity.Task;
import com.imessage.scheduler.repository.TaskRepository;
import com.imessage.scheduler.util.SnowflakeIdGenerator;
import org.quartz.*;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;

@Service
public class TaskService {

@Resource
private TaskRepository taskRepository;

@Resource
private Scheduler scheduler;

@Resource
private SnowflakeIdGenerator idGenerator;

public Long createTask(TaskRequest request) {
    Task task = new Task();
    task.setId(idGenerator.nextId());
    task.setTaskName(request.getTaskName());
    task.setContent(request.getContent());
    task.setPhoneNumbers(request.getPhoneNumbers());
    task.setTotalCount(request.getPhoneNumbers().split(",").length);
    task.setSuccessCount(0);
    task.setFailedCount(0);
    task.setStatus(0);
    task.setScheduleTime(request.getScheduleTime());
    task.setPriority(request.getPriority());
    task.setCreateTime(LocalDateTime.now());
    task.setUpdateTime(LocalDateTime.now());

    taskRepository.insert(task);

    scheduleTask(task);

    return task.getId();
}

private void scheduleTask(Task task) {
    try {
        JobDetail jobDetail = JobBuilder.newJob(MessageJob.class)
                .withIdentity("job-" + task.getId(), "imessage-group")
                .usingJobData("taskId", task.getId())
                .build();

        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("trigger-" + task.getId(), "imessage-group")
                .startAt(Date.from(task.getScheduleTime().atZone(ZoneId.systemDefault()).toInstant()))
                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
                        .withMisfireHandlingInstructionFireNow())
                .build();

        scheduler.scheduleJob(jobDetail, trigger);
    } catch (SchedulerException e) {
        e.printStackTrace();
        throw new RuntimeException("任务调度失败", e);
    }
}

public void updateTaskStatus(Long taskId, int status) {
    Task task = taskRepository.selectById(taskId);
    if (task != null) {
        task.setStatus(status);
        if (status == 1) {
            task.setStartTime(LocalDateTime.now());
        } else if (status == 2 || status == 3) {
            task.setEndTime(LocalDateTime.now());
        }
        task.setUpdateTime(LocalDateTime.now());
        taskRepository.updateById(task);
    }
}

public void updateTaskResult(Long taskId, int successCount, int failedCount) {
    Task task = taskRepository.selectById(taskId);
    if (task != null) {
        task.setSuccessCount(successCount);
        task.setFailedCount(failedCount);
        task.setUpdateTime(LocalDateTime.now());
        taskRepository.updateById(task);
    }
}

public Task getTaskById(Long taskId) {
    return taskRepository.selectById(taskId);
}

public List<Task> getTasksByNodeId(String nodeId) {
    LambdaQueryWrapper<Task> wrapper = new LambdaQueryWrapper<>();
    wrapper.eq(Task::getAssignedNode, nodeId)
            .eq(Task::getStatus, 1);
    return taskRepository.selectList(wrapper);
}

}

// 任务执行类 - MessageJob
package com.imessage.scheduler.service;

import com.imessage.scheduler.config.RabbitMQConfig;
import com.imessage.scheduler.entity.Node;
import com.imessage.scheduler.entity.Task;
import com.imessage.scheduler.util.RedisUtil;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;

@Component
public class MessageJob implements Job {

private static final String TASK_LOCK_PREFIX = "im:task:lock:";
private static final int LOCK_EXPIRE_TIME = 60; // 60秒

@Resource
private TaskService taskService;

@Resource
private NodeService nodeService;

@Resource
private RabbitTemplate rabbitTemplate;

@Resource
private RedisUtil redisUtil;

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
    Long taskId = context.getJobDetail().getJobDataMap().getLong("taskId");
    String lockKey = TASK_LOCK_PREFIX + taskId;
    String lockValue = UUID.randomUUID().toString();

    try {
        // 获取分布式锁
        if (!redisUtil.tryLock(lockKey, lockValue, LOCK_EXPIRE_TIME)) {
            System.out.println("任务已被其他节点执行: " + taskId);
            return;
        }

        Task task = taskService.getTaskById(taskId);
        if (task == null || task.getStatus() != 0) {
            System.out.println("任务不存在或已执行: " + taskId);
            return;
        }

        // 选择最佳节点
        Node bestNode = nodeService.selectBestNode();
        if (bestNode == null) {
            System.err.println("没有可用的执行节点");
            taskService.updateTaskStatus(taskId, 3);
            return;
        }

        // 更新任务状态
        task.setAssignedNode(bestNode.getNodeId());
        taskService.updateTaskStatus(taskId, 1);

        // 发送任务到消息队列
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, 
                bestNode.getNodeId(), task);

        System.out.println("任务已分发到节点: " + bestNode.getNodeId() + ", 任务ID: " + taskId);

    } catch (Exception e) {
        e.printStackTrace();
        taskService.updateTaskStatus(taskId, 3);
        throw new JobExecutionException(e);
    } finally {
        // 释放分布式锁
        redisUtil.unlock(lockKey, lockValue);
    }
}

}

// 消息消费者 - MessageConsumer
package com.imessage.scheduler.service;

import com.imessage.scheduler.config.RabbitMQConfig;
import com.imessage.scheduler.entity.SendRecord;
import com.imessage.scheduler.entity.Task;
import com.imessage.scheduler.repository.SendRecordRepository;
import com.imessage.scheduler.util.HttpUtil;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

@Component
public class MessageConsumer {

private static final int THREAD_POOL_SIZE = 50;
private final ExecutorService executorService = new ThreadPoolExecutor(
        THREAD_POOL_SIZE,
        THREAD_POOL_SIZE,
        0L,
        TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>()
);

@Resource
private TaskService taskService;

@Resource
private SendRecordRepository sendRecordRepository;

@Resource
private HttpUtil httpUtil;

@RabbitListener(queues = "imessage.task.queue.node1")
public void consumeNode1(Task task, Message message) {
    processTask(task, "node1");
}

@RabbitListener(queues = "imessage.task.queue.node2")
public void consumeNode2(Task task, Message message) {
    processTask(task, "node2");
}

@RabbitListener(queues = "imessage.task.queue.node3")
public void consumeNode3(Task task, Message message) {
    processTask(task, "node3");
}

private void processTask(Task task, String nodeId) {
    System.out.println("节点" + nodeId + "开始执行任务: " + task.getId());

    String[] phoneNumbers = task.getPhoneNumbers().split(",");
    List<Future<Boolean>> futures = new ArrayList<>();

    for (String phoneNumber : phoneNumbers) {
        futures.add(executorService.submit(() -> sendMessage(task, phoneNumber, nodeId)));
    }

    int successCount = 0;
    int failedCount = 0;

    for (Future<Boolean> future : futures) {
        try {
            if (future.get()) {
                successCount++;
            } else {
                failedCount++;
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            failedCount++;
        }
    }

    // 更新任务结果
    taskService.updateTaskResult(task.getId(), successCount, failedCount);
    taskService.updateTaskStatus(task.getId(), 2);

    System.out.println("任务执行完成: " + task.getId() + 
            ", 成功: " + successCount + ", 失败: " + failedCount);
}

private boolean sendMessage(Task task, String phoneNumber, String nodeId) {
    SendRecord record = new SendRecord();
    record.setTaskId(task.getId());
    record.setPhoneNumber(phoneNumber);
    record.setContent(task.getContent());
    record.setNodeId(nodeId);
    record.setStatus(0);
    record.setCreateTime(LocalDateTime.now());
    record.setUpdateTime(LocalDateTime.now());

    try {
        // 调用iMessage群发虚拟机接口发送消息
        String url = "http://" + getNodeIp(nodeId) + ":8080/api/message/send";
        String result = httpUtil.postJson(url, 
                "{\"phoneNumber\":\"" + phoneNumber + "\",\"content\":\"" + task.getContent() + "\"}");

        if (result.contains("success")) {
            record.setStatus(1);
            record.setSendTime(LocalDateTime.now());
            sendRecordRepository.insert(record);
            return true;
        } else {
            record.setStatus(2);
            record.setFailReason(result);
            sendRecordRepository.insert(record);
            return false;
        }
    } catch (Exception e) {
        e.printStackTrace();
        record.setStatus(2);
        record.setFailReason(e.getMessage());
        sendRecordRepository.insert(record);
        return false;
    }
}

private String getNodeIp(String nodeId) {
    // 实际项目中应该从数据库或配置中心获取节点IP
    return "127.0.0.1";
}

}

// 控制器 - TaskController
package com.imessage.scheduler.controller;

import com.imessage.scheduler.dto.TaskRequest;
import com.imessage.scheduler.dto.TaskResponse;
import com.imessage.scheduler.entity.Task;
import com.imessage.scheduler.service.TaskService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

@RestController
@RequestMapping("/api/task")
public class TaskController {

@Resource
private TaskService taskService;

@PostMapping("/create")
public ResponseEntity<TaskResponse> createTask(@RequestBody TaskRequest request) {
    Long taskId = taskService.createTask(request);
    TaskResponse response = new TaskResponse();
    response.setTaskId(taskId);
    response.setMessage("任务创建成功");
    return ResponseEntity.ok(response);
}

@GetMapping("/{taskId}")
public ResponseEntity<Task> getTaskStatus(@PathVariable Long taskId) {
    Task task = taskService.getTaskById(taskId);
    if (task == null) {
        return ResponseEntity.notFound().build();
    }
    return ResponseEntity.ok(task);
}

}

// DTO类 - TaskRequest
package com.imessage.scheduler.dto;

import lombok.Data;

import java.time.LocalDateTime;

@Data
public class TaskRequest {
private String taskName;
private String content;
private String phoneNumbers;
private LocalDateTime scheduleTime;
private Integer priority;
}

// DTO类 - TaskResponse
package com.imessage.scheduler.dto;

import lombok.Data;

@Data
public class TaskResponse {
private Long taskId;
private String message;
}

// 异常处理类 - BusinessException
package com.imessage.scheduler.exception;

public class BusinessException extends RuntimeException {
private Integer code;
private String message;

public BusinessException(Integer code, String message) {
    this.code = code;
    this.message = message;
}

public Integer getCode() {
    return code;
}

public String getMessage() {
    return message;
}

}

// 全局异常处理器 - GlobalExceptionHandler
package com.imessage.scheduler.exception;

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;

@RestControllerAdvice
public class GlobalExceptionHandler {

@ExceptionHandler(BusinessException.class)
public ResponseEntity<ErrorResponse> handleBusinessException(BusinessException e) {
    ErrorResponse error = new ErrorResponse();
    error.setCode(e.getCode());
    error.setMessage(e.getMessage());
    return new ResponseEntity<>(error, HttpStatus.BAD_REQUEST);
}

@ExceptionHandler(Exception.class)
public ResponseEntity<ErrorResponse> handleException(Exception e) {
    ErrorResponse error = new ErrorResponse();
    error.setCode(500);
    error.setMessage("服务器内部错误");
    e.printStackTrace();
    return new ResponseEntity<>(error, HttpStatus.INTERNAL_SERVER_ERROR);
}

static class ErrorResponse {
    private Integer code;
    private String message;

    public Integer getCode() {
        return code;
    }

    public void setCode(Integer code) {
        this.code = code;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

}

// 工具类 - HttpUtil
package com.imessage.scheduler.util;

import com.alibaba.fastjson.JSONObject;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Component
public class HttpUtil {

private static final int CONNECT_TIMEOUT = 5000;
private static final int SOCKET_TIMEOUT = 10000;

public String postJson(String url, String json) throws Exception {
    CloseableHttpClient httpClient = HttpClients.createDefault();
    HttpPost httpPost = new HttpPost(url);

    RequestConfig requestConfig = RequestConfig.custom()
            .setConnectTimeout(CONNECT_TIMEOUT)
            .setSocketTimeout(SOCKET_TIMEOUT)
            .build();
    httpPost.setConfig(requestConfig);

    StringEntity entity = new StringEntity(json, StandardCharsets.UTF_8);
    entity.setContentType("application/json");
    httpPost.setEntity(entity);

    try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
        return EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
    } finally {
        httpClient.close();
    }
}

}

// MyBatis Mapper接口 - TaskRepository
package com.imessage.scheduler.repository;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.imessage.scheduler.entity.Task;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface TaskRepository extends BaseMapper {
}

// MyBatis Mapper接口 - NodeRepository
package com.imessage.scheduler.repository;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.imessage.scheduler.entity.Node;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface NodeRepository extends BaseMapper {
}

// MyBatis Mapper接口 - SendRecordRepository
package com.imessage.scheduler.repository;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.imessage.scheduler.entity.SendRecord;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface SendRecordRepository extends BaseMapper {
}
```

总结

本文详细介绍了iMessage群发虚拟机分布式任务调度系统的设计与实现。通过采用分布式架构,系统解决了传统单节点方案存在的并发能力有限、单点故障风险高等问题,实现了任务的自动分发、负载均衡和故障转移。系统使用Spring Boot作为基础框架,Quartz作为任务调度框架,Redis实现分布式锁,RabbitMQ实现任务的异步分发,具有良好的可扩展性和稳定性。
在实际应用中,还可以根据业务需求对系统进行进一步优化。例如,可以增加任务优先级调度机制,确保重要任务能够优先执行;可以增加任务分片功能,将大任务拆分成多个小任务并行执行,提高发送效率;可以增加更完善的监控和告警功能,及时发现和解决系统问题。希望本文能够对正在开发类似系统的开发者有所帮助。

相关文章
|
10天前
|
消息中间件 数据可视化 API
阿里云短信服务怎么接入?从签名、模板、API 到发送回执,一文讲清楚
本片文章将围绕阿里云短信服务的完整接入链路,拆解从资质申请、签名审核、模板配置、运营商报备,到 API 发送和状态回执的关键步骤,帮助产品经理、运营人员、技术负责人和开发者快速理解短信服务接入流程,提前做好上线准备。
176 5
|
29天前
|
Kubernetes Cloud Native 微服务
【微服务与云原生架构】 云原生核心:Docker、K8s架构、核心资源(Pod/Deployment/Service/Ingress)、Pod生命周期、健康检查、滚动更新、自动扩缩容HPA
本文系统梳理微服务与云原生架构的知识体系:以Docker实现环境一致与轻量交付,K8s提供容器编排底座;涵盖Pod、Deployment、Service、Ingress四大核心资源,以及健康检查、滚动更新、HPA自动扩缩容等关键能力,构建高可用、可弹性、可观测的现代分布式应用架构闭环。
|
13天前
|
JSON 前端开发 测试技术
Kimi-k2.6 流式回包乱序后,我这样接入 ​D​М‌X​Α‌РΙ
kimi-k2.6 不止于聊天,其核心价值在于“可执行交付”:统一支持代码生成、长时程任务、Agent协作、文档→技能复用及多格式输出,具备工程级组合能力。它契合企业对“单模型多工位”的刚需——在研发、内容中台等场景中,稳定闭环完成需求拆解、编码、文档整理等多步任务。真正落地需依托DMXAPI网关实现标准化API集成,解决Web路径的不确定性,让模型能力成为可度量、可审计、可持续的生产基础执行层。(239字)
|
12天前
|
Shell API 开发工具
Claude Code 快速上手指南(新手友好版)
AI编程工具卷疯啦!Claude Code凭借任务驱动+终端原生的特性,成了开发者的效率搭子。本文从安装、登录、切换国产模型到常用命令,手把手带新手快速上手,全程避坑,30分钟独立用起来。
3202 20
|
1月前
|
Ubuntu 算法 关系型数据库
Debian/Ubuntu 环境 PolarDB-X 单机版 DEB 包安装综合指南
本文整合阿里云文档,详解Ubuntu 18.04与Debian 10下PolarDB-X单机版安装:因官方仅提供RPM包,需用alien转DEB,但二者压缩格式不同(Ubuntu用zstd,Debian 10不支持),必须在目标系统本地转换,不可复用。含依赖处理、配置初始化及启动验证全流程。
508 19
|
25天前
|
数据采集 人工智能 自然语言处理
舆情监控:如何让AI自动抓取新闻资讯,并生成每日摘要报告?
本文介绍一套AI驱动的自动化舆情监控方案:用站大爷隧道代理(高可用IP轮换)+ OpenClaw(零代码AI Agent)+ 大模型(智能摘要),7×24小时自动抓取、筛选、生成并推送结构化日报,彻底解决人工扫新闻耗时多、漏报频、易被封等问题。(239字)
363 9
|
11天前
|
存储 缓存 安全
大模型应用:大模型响应缓存技术完全指南:TTL 缓存装饰器的设计与落地.112
本文详解大模型应用中缓存装饰器的实战实现,直击响应慢、成本高两大痛点。从基础缓存出发,逐步升级为支持TTL过期、线程安全、LRU淘汰、异常防护及哈希键优化的生产级方案,显著提升响应速度、降低Token消耗、增强系统稳定性。
138 7
|
8天前
|
SQL 人工智能 自然语言处理
什么是低代码 v2.0 时代?JeecgBoot低代码用 Skills 把"一句话生成系统"做成了现实
一句话先说清楚:低代码 v1.0 阶段,是用"拖拽设计"代替"代码开发";低代码 v2.0 阶段,是用 AI Skills 把"拖拽设计"也省掉, 一句话生成功能。![低代码迈入 v2.0 时代 — Skills 加持一句话搭建系统](https://oscimg.oschina.net/osc
75 5
什么是低代码 v2.0 时代?JeecgBoot低代码用 Skills 把"一句话生成系统"做成了现实
|
1月前
|
存储 设计模式 缓存
为生产级 AI Agent 构建持久化记忆:五阶段流水线与四种设计模式
LLM Agent需持久化记忆以支撑连续对话、用户画像、知识沉淀与崩溃恢复。但满上下文方案成本高、延迟大、易出错。本文提出五阶段流水线(抽取→整合→存储→检索→遗忘)与四种记忆类型(工作/情景/语义/过程记忆),结合结构化状态+向量搜索等设计模式,实现高效、可控、可审计的生产级记忆系统。
554 9
为生产级 AI Agent 构建持久化记忆:五阶段流水线与四种设计模式
|
25天前
|
人工智能 自然语言处理 安全
【新人快速上手使用】小白也能上手的 OpenClaw 2.6.6 安装教程(技术分享)
OpenClaw(小龙虾)是2026年热门开源「数字员工」,支持Windows一键部署(5分钟搞定),本地运行、零代码、全自动办公。无需配置环境,可整理文件、发邮件、浏览器自动化等,隐私安全,小白友好。

热门文章

最新文章