美团Leaf分布式ID源码启动部署
1 概述
美团点评(Leaf)分布式主键支持数据库号段模式(segment mode)和雪花算法模式(Snowflake mode),可以根据不同业务场景灵活切换。
2 下载源码
Meituan-Dianping master分支
3 修改源码
3.1 项目结构
3.2 修改pom
mysql数据库版本支持8.X版本,需要升级leaf-server和leaf-core模块的mysql-connector以及druid的版本,升级版本号如下:
<mysql-connector-java.version>8.0.13</mysql-connector-java.version>
<druid.version>1.1.10</druid.version>
4 号段模式(Segment mode)
4.1 修改leaf-server配置文件leaf.properties
# Segment mode
leaf.name=com.sankuai.leaf.opensource.test
leaf.segment.enable=true
leaf.jdbc.url=jdbc:mysql://localhost:3306/meituan-leaf?autoReconnect=true&useUnicode=true&characterEncoding=utf-8&&zeroDateTimeBehavior=CONVERT_TO_NULL&&serverTimezone=GMT%2B8
leaf.jdbc.username=root
leaf.jdbc.password=123456
leaf.snowflake.enable=false
4.2 新建MySQL数据库和表
数据库:meituan-leaf
表:leaf_alloc
DROP TABLE IF EXISTS `leaf_alloc`;
CREATE TABLE `leaf_alloc` (
`biz_tag` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL DEFAULT '' COMMENT '不同业务,用biz_tag字段来隔离,如果需要扩容时,对biz_tag分库分表即可',
`max_id` bigint NOT NULL DEFAULT '1' COMMENT '当前业务号段的最大值,用于计算下一个号段',
`step` int NOT NULL COMMENT '步长,每次获取新ID的数量',
`description` varchar(256) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci DEFAULT NULL COMMENT '对于业务的描述',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`biz_tag`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
insert into leaf_alloc(biz_tag, max_id, step, description) values('biz-01-segment', 1, 10, '某某业务号段模式获取ID')
4.3 启动项目
1) 访问 http://127.0.0.1:8080/cache 业务号段已经分配ID信息监控
2) 访问 http://127.0.0.1:8080/db 数据库信息监控
3)访问 http://127.0.0.1:8080/api/segment/get/biz-01-segment 获取分布式ID
4 雪花模式(Snowflake mode)
4.1 修改leaf-server配置文件leaf.properties
# Snowflake mode
leaf.name=com.sankuai.leaf.opensource.test
leaf.segment.enable=false
leaf.snowflake.enable=true
# zk地址
leaf.snowflake.zk.address=127.0.0.1
# zk端口
leaf.snowflake.port=2181
4.2 启动zookeeper-3.4.11
.4.3 启动项目
1) 访问 http://127.0.0.1:8080/api/snowflake/get/biz-01-segment 获取分布式ID
5 客户端服务获取分布式ID
5.1 远程FeignClient接口
@FeignClient(value = "meituanleafid", fallback = MeiTuanLeafIdApiFallback.class)
public interface MeiTuanLeafIdApi {
// 雪花模式
@PostMapping("/snowflake/mode/get")
Result<List<Long>> getSnowflakeModeIds(BizIdDto dto);
// 号段模式
@PostMapping("/segment/mode/get")
Result<SegmentDto> getSegmentModeIds(BizIdDto dto);
}
@Data
public class BizIdDto {
@NotBlank(message = "业务id不能为空")
private String bizId;
@NotNull(message = "数量不能为空")
@Min(value = 1, message = "数量必须是正整数!")
@Max(value = 20000, message = "数量不能超过2万!")
private Integer size;
}
@Data
@Builder
public class SegmentDto {
// 当前号段的分布式id值
private AtomicLong value;
// 分布式id最大值
private volatile long max;
}
5.2 定时任务
public class DistributedIdService {
private final ConcurrentLinkedQueue<Long> cacheIds = new ConcurrentLinkedQueue();
private final AtomicBoolean isSnowflakeModePulling= new AtomicBoolean(false);
private final ConcurrentLinkedQueue<SegmentDto> cacheSegment = new ConcurrentLinkedQueue();
private volatile SegmentDto segmentDto = null;
private final Semaphore segmentModeSemaphore= new Semaphore(1);
private final ExecutorService service=new ThreadPoolExecutor(4, 4, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue(10));
// 本地缓存最大分布式ID数量
private int cacheMaxSize;
// 因子
private float loadFactor;
// 每段的步长
private volatile int pullSegmentStepSize = 1000;
public DistributedIdService () {
}
@PostConstruct
private void init() {
this.cacheMaxSize= 10000;
this.loadFactor= 0.7F;
this.checkPullSnowflakeIds();
this.checkPullSegment();
}
// 开启异步线程检查是本地分布式ID缓存是否快消耗完毕,从服务器拉取新的分布式ID集合,本地缓存
private void checkPullSnowflakeIds() {
// 本地缓存的数量 < 阀值,请求新的分布式ID
if ((float)this.cacheIds.size() < (float)this.cacheMaxSize * this.loadFactor && this.isSnowflakeModePulling.compareAndSet(false, true)) {
this.service.execute(new Runnable() {
public void run() {
this.pullSnowflakeIds();
}
});
}
}
// 开启异步线程检查本地业务号段缓存是否快消耗完毕,从服务器拉取新的号段集合,本地缓存
private void checkPullSegment() {
// 本地缓存的号段*号段步长 < 阀值,请求新的号段
if ((float)(this.cacheSegment.size() * this.pullSegmentStepSize) < (float)this.cacheMaxSize * this.loadFactor && this.segmentModeSemaphore.tryAcquire()) {
this.service.execute(() -> {
try {
this.pullSegments();
} finally {
segmentModeSemaphore.release();
}
});
}
}
// 获取最新的雪花分布式ID集合进行缓存
private void pullSnowflakeIds() {
if (this.cacheIds.size() > this.cacheMaxSize) {
this.isSnowflakeModePulling.compareAndSet(true, false);
} else {
try {
// 请求新的分布式ID,本地缓存
List<Long> ids = this.requestPullSnowflakeIds(this.pullOrderedSize);
this.cacheIds.addAll(ids);
} finally {
this.isSnowflakeModePulling.compareAndSet(true, false);
}
}
}
// 获取最新的号段进行缓存
private void pullSegments() {
if (this.cacheSegment.size() * this.pullSegmentStepSize > this.cacheMaxSize) {
log.info("号段拉数据段被取消");
} else {
// 请求新的号段,本地缓存
SegmentDto segmentDto = this.requestPullSegment(this.pullOrderedSize);
this.cacheSegment.add(segmentDto);
}
}
// 请求远程服务器获取最新的雪花分布式ID集合
private List<Long> requestPullSnowflakeIds(int size) {
BizIdDto bizIdDto = new BizIdDto();
bizIdDto.setServiceId(""biz-01-segment);
bizIdDto.setSize(1000);
Result<List<Long>> result = this.meiTuanLeafIdApi.getSnowflakeModeIds(bizIdDto);
if (ResultHelp.success(result)) {
return (List)result.getData();
} else {
throw new CusException("从服务端获取雪花分布式ID失败");
}
}
// 请求远程服务器获取最新的号段
private SegmentDto requestPullSegment(int size) {
BizIdDto bizIdDto = new BizIdDto();
bizIdDto.setServiceId(""biz-01-segment);
bizIdDto.setSize(1000);
Result<SegmentDto> result = this.meiTuanLeafIdApi.getSegmentModeIds(bizIdDto);
if (ResultHelp.success(result)) {
return (SegmentDto)result.getData();
} else {
throw new CusException("从服务端获取号段失败");
}
}
}