生成mapper映射类,注意插入加入了乐观锁,注意这个sql
```package com.laoyang.id.dao.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import com.laoyang.id.dao.po.IdGeneratePO;
import java.util.List;
/**
- @Author idea
- @Date: Created in 19:47 2023/5/25
@Description
*/
@Mapper
public interface IdGenerateMapper extends BaseMapper {@Update("update t_id_gengrate_config set next_threshold=next_threshold+step," +
"current_start=current_start+step,version=version+1 where id =#{id} and version=#{version}")
int updateNewIdCountAndVersion(@Param("id")int id,@Param("version")int version);
@Select("select * from t_id_gengrate_config")
List selectAll();
}
```在service下创建bo类生成有序id和无序id对象
```package com.laoyang.id.service.bo;
import java.util.concurrent.atomic.AtomicLong;
/**
- @Author idea
- @Date: Created in 20:00 2023/5/25
@Description 有序id的BO对象
*/
public class LocalSeqIdBO {private int id;
/**在内存中记录的当前有序id的值
*/
private AtomicLong currentNum;/**
- 当前id段的开始值
/
private Long currentStart;
/* 当前id段的结束值
*/
private Long nextThreshold;public int getId() {
return id;
}public void setId(int id) {
this.id = id;
}public AtomicLong getCurrentNum() {
return currentNum;
}public void setCurrentNum(AtomicLong currentNum) {
this.currentNum = currentNum;
}public Long getCurrentStart() {
return currentStart;
}public void setCurrentStart(Long currentStart) {
this.currentStart = currentStart;
}public Long getNextThreshold() {
return nextThreshold;
}public void setNextThreshold(Long nextThreshold) {
this.nextThreshold = nextThreshold;
}
}
import java.util.concurrent.ConcurrentLinkedQueue;
/**
- @Author idea
- @Date: Created in 20:32 2023/5/26
@Description 无序id的BO对象
*/
public class LocalUnSeqIdBO {private int id;
/**- 提前将无序的id存放在这条队列中
/
private ConcurrentLinkedQueue idQueue;
/* - 当前id段的开始值
/
private Long currentStart;
/* 当前id段的结束值
*/
private Long nextThreshold;public int getId() {
return id;
}public void setId(int id) {
this.id = id;
}public ConcurrentLinkedQueue getIdQueue() {
return idQueue;
}public void setIdQueue(ConcurrentLinkedQueue idQueue) {
this.idQueue = idQueue;
}public Long getCurrentStart() {
return currentStart;
}public void setCurrentStart(Long currentStart) {
this.currentStart = currentStart;
}public Long getNextThreshold() {
return nextThreshold;
}public void setNextThreshold(Long nextThreshold) {
this.nextThreshold = nextThreshold;
}
}
```生成service类生成有序id与无序id
```package com.laoyang.id.service;
- 提前将无序的id存放在这条队列中
/**
- @Author idea
- @Date: Created in 19:58 2023/5/25
@Description
*/
public interface IdGenerateService {/**
- 获取有序id
* - @param id
@return
*/
Long getSeqId(Integer id);/**
- 获取无序id
* - @param id
- @return
*/
Long getUnSeqId(Integer id);
}
```实现有序id和无序id方法(这里是关键,主要用到了原子类,一些同步类操作等等,线程池)
```package com.laoyang.id.service.impl;
- 获取有序id
import jakarta.annotation.Resource;
import com.laoyang.id.dao.mapper.IdGenerateMapper;
import com.laoyang.id.dao.po.IdGeneratePO;
import com.laoyang.id.service.IdGenerateService;
import com.laoyang.id.service.bo.LocalSeqIdBO;
import com.laoyang.id.service.bo.LocalUnSeqIdBO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
- @Author idea
- @Date: Created in 19:58 2023/5/25
@Description
*/
@Service
public class IdGenerateServiceImpl implements IdGenerateService, InitializingBean {@Resource
private IdGenerateMapper idGenerateMapper;private static final Logger LOGGER = LoggerFactory.getLogger(IdGenerateServiceImpl.class);
private static Map localSeqIdBOMap = new ConcurrentHashMap<>();
private static Map localUnSeqIdBOMap = new ConcurrentHashMap<>();
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000),new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("id-generate-thread-" + ThreadLocalRandom.current().nextInt(1000)); return thread; } });
private static final float UPDATE_RATE = 0.50f;
private static final int SEQ_ID = 1;
private static Map semaphoreMap = new ConcurrentHashMap<>();@Override
public Long getUnSeqId(Integer id) {if (id == null) { LOGGER.error("[getSeqId] id is error,id is {}", id); return null; } LocalUnSeqIdBO localUnSeqIdBO = localUnSeqIdBOMap.get(id); if (localUnSeqIdBO == null) { LOGGER.error("[getUnSeqId] localUnSeqIdBO is null,id is {}", id); return null; } Long returnId = localUnSeqIdBO.getIdQueue().poll(); if (returnId == null) { LOGGER.error("[getUnSeqId] returnId is null,id is {}", id); return null; } this.refreshLocalUnSeqId(localUnSeqIdBO); return returnId;
}
/*
- @param id 传的是对应的业务id
@return
*/
@Override
public Long getSeqId(Integer id) {
if (id == null) {LOGGER.error("[getSeqId] id is error,id is {}", id); return null;
}
LocalSeqIdBO localSeqIdBO = localSeqIdBOMap.get(id);
if (localSeqIdBO == null) {LOGGER.error("[getSeqId] localSeqIdBO is null,id is {}", id); return null;
}
this.refreshLocalSeqId(localSeqIdBO);
long returnId = localSeqIdBO.getCurrentNum().incrementAndGet();
if (returnId > localSeqIdBO.getNextThreshold()) {//同步去刷新 可能是高并发下还未更新本地数据 LOGGER.error("[getSeqId] id is over limit,id is {}", id); return null;
}
return returnId;
}/**
- 刷新本地有序id段
* @param localSeqIdBO
/
private void refreshLocalSeqId(LocalSeqIdBO localSeqIdBO) {
long step = localSeqIdBO.getNextThreshold() - localSeqIdBO.getCurrentStart();
if (localSeqIdBO.getCurrentNum().get() - localSeqIdBO.getCurrentStart() > step UPDATE_RATE) {Semaphore semaphore = semaphoreMap.get(localSeqIdBO.getId()); if (semaphore == null) { LOGGER.error("semaphore is null,id is {}", localSeqIdBO.getId()); return; } boolean acquireStatus = semaphore.tryAcquire(); if (acquireStatus) { LOGGER.info("开始尝试进行本地id段的同步操作"); //异步进行同步id段操作 threadPoolExecutor.execute(new Runnable() { @Override public void run() { try { IdGeneratePO idGeneratePO = idGenerateMapper.selectById(localSeqIdBO.getId()); tryUpdateMySQLRecord(idGeneratePO); } catch (Exception e) { LOGGER.error("[refreshLocalSeqId] error is ", e); } finally { semaphoreMap.get(localSeqIdBO.getId()).release(); LOGGER.info("本地有序id段同步完成,id is {}", localSeqIdBO.getId()); } } }); }
}
}/**
- 刷新本地无序id段
* @param localUnSeqIdBO
/
private void refreshLocalUnSeqId(LocalUnSeqIdBO localUnSeqIdBO) {
long begin = localUnSeqIdBO.getCurrentStart();
long end = localUnSeqIdBO.getNextThreshold();
long remainSize = localUnSeqIdBO.getIdQueue().size();
//如果使用剩余空间不足25%,则进行刷新
if ((end - begin) 0.35 > remainSize) {LOGGER.info("本地无序id段同步开始,id is {}", localUnSeqIdBO.getId()); Semaphore semaphore = semaphoreMap.get(localUnSeqIdBO.getId()); if (semaphore == null) { LOGGER.error("semaphore is null,id is {}", localUnSeqIdBO.getId()); return; } boolean acquireStatus = semaphore.tryAcquire(); if (acquireStatus) { threadPoolExecutor.execute(new Runnable() { @Override public void run() { try { IdGeneratePO idGeneratePO = idGenerateMapper.selectById(localUnSeqIdBO.getId()); tryUpdateMySQLRecord(idGeneratePO); } catch (Exception e) { LOGGER.error("[refreshLocalUnSeqId] error is ", e); } finally { semaphoreMap.get(localUnSeqIdBO.getId()).release(); LOGGER.info("本地无序id段同步完成,id is {}", localUnSeqIdBO.getId()); } } }); }
}
}//bean初始化的时候会回调到这里
@Override
public void afterPropertiesSet() throws Exception {
List idGeneratePOList = idGenerateMapper.selectAll();
for (IdGeneratePO idGeneratePO : idGeneratePOList) {LOGGER.info("服务刚启动,抢占新的id段"); tryUpdateMySQLRecord(idGeneratePO); semaphoreMap.put(idGeneratePO.getId(), new Semaphore(1));
}
}/**
- 更新mysql里面的分布式id的配置信息,占用相应的id段
- 同步执行,很多的网络IO,性能较慢
* @param idGeneratePO
*/
private void tryUpdateMySQLRecord(IdGeneratePO idGeneratePO) {
int updateResult = idGenerateMapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion());
if (updateResult > 0) {localIdBOHandler(idGeneratePO); return;
}
//重试进行更新
for (int i = 0; i < 3; i++) {idGeneratePO = idGenerateMapper.selectById(idGeneratePO.getId()); updateResult = idGenerateMapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion()); if (updateResult > 0) { localIdBOHandler(idGeneratePO); return; }
}
throw new RuntimeException("表id段占用失败,竞争过于激烈,id is " + idGeneratePO.getId());
}/**
- 专门处理如何将本地ID对象放入到Map中,并且进行初始化的
* - @param idGeneratePO
*/
private void localIdBOHandler(IdGeneratePO idGeneratePO) {
long currentStart = idGeneratePO.getCurrentStart();
long nextThreshold = idGeneratePO.getNextThreshold();
long currentNum = currentStart;
if (idGeneratePO.getIsSeq() == SEQ_ID) {
} else {LocalSeqIdBO localSeqIdBO = new LocalSeqIdBO(); AtomicLong atomicLong = new AtomicLong(currentNum); localSeqIdBO.setId(idGeneratePO.getId()); localSeqIdBO.setCurrentNum(atomicLong); localSeqIdBO.setCurrentStart(currentStart); localSeqIdBO.setNextThreshold(nextThreshold); localSeqIdBOMap.put(localSeqIdBO.getId(), localSeqIdBO);
}LocalUnSeqIdBO localUnSeqIdBO = new LocalUnSeqIdBO(); localUnSeqIdBO.setCurrentStart(currentStart); localUnSeqIdBO.setNextThreshold(nextThreshold); localUnSeqIdBO.setId(idGeneratePO.getId()); long begin = localUnSeqIdBO.getCurrentStart(); long end = localUnSeqIdBO.getNextThreshold(); List<Long> idList = new ArrayList<>(); for (long i = begin; i < end; i++) { idList.add(i); } //将本地id段提前打乱,然后放入到队列中 Collections.shuffle(idList); ConcurrentLinkedQueue<Long> idQueue = new ConcurrentLinkedQueue<>(); idQueue.addAll(idList); localUnSeqIdBO.setIdQueue(idQueue); localUnSeqIdBOMap.put(localUnSeqIdBO.getId(), localUnSeqIdBO);
}
}
```最后创建启动类
```package com.laoyang.id;
import jakarta.annotation.Resource;
import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import com.laoyang.id.service.IdGenerateService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import java.util.HashSet;
/**
- @Author idea
- @Date: Created in 19:45 2023/5/25
@Description
*/
@SpringBootApplication
public class IdGenerateApplication implements CommandLineRunner {private static final Logger LOGGER = LoggerFactory.getLogger(IdGenerateApplication.class);
@Resource
private IdGenerateService idGenerateService;public static void main(String[] args) {
SpringApplication springApplication = new SpringApplication(IdGenerateApplication.class); springApplication.setWebApplicationType(WebApplicationType.NONE); springApplication.run(args);
}
@Override
public void run(String... args) throws Exception {HashSet<Long> idSet = new HashSet<>(); for (int i = 0; i < 1500; i++) { Long id = idGenerateService.getSeqId(1); System.out.println(id); idSet.add(id); } System.out.println(idSet.size());
}
}
```最终会在控制台打印输出!