浅析RocketMQ-消息重建
所谓消息重建,是指broker接收到消息并写入commitlog之后,再将消息分发给consumequeue和index,由它们各自追加新数据的过程。
本篇将分为三部分进行阐述:
- 重建服务ReputMessageService
- 追加消费队列文件CommitLogDispatcherBuildConsumeQueue
- 追加索引文件CommitLogDispatcherBuildIndex
一. ReputMessageService
ReputMessageService 继承自ServiceThread(实现了Runnable接口),在构造DefaultMessageStore对象时,会初始化并启动。
run方法内部会每隔1ms调用一次doReput方法
/**
 * Service thread that replays data stored in the CommitLog, starting at {@code reputFromOffset},
 * and hands every successfully parsed message to {@code DefaultMessageStore.doDispatch}.
 */
class ReputMessageService extends ServiceThread {
// CommitLog offset at which the next replay pass starts.
private volatile long reputFromOffset = 0;
/**
 * Runs one replay pass: reads available CommitLog data from {@code reputFromOffset}, dispatches
 * each message and advances {@code reputFromOffset} by the size of every message handled.
 */
private void doReput() {
// Never start below commitLog.getMinOffset()
// (presumably the start offset of the oldest usable CommitLog file - confirm).
if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
}
// isCommitLogAvailable() is not shown here; presumably it is true while reputFromOffset is
// below the committed end of the CommitLog - confirm.
for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
// With duplication enabled, stop once reputFromOffset has reached the confirm offset.
if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
break;
}
// Fetch the readable data that starts at reputFromOffset (null when nothing is available).
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
if (result != null) {
try {
// Re-sync to the start offset of the data actually returned.
this.reputFromOffset = result.getStartOffset();
for (int readSize = 0; readSize < result.getSize() && doNext; ) {
// Presumably reads one message from the buffer and wraps it in a DispatchRequest
// (checkMessageAndReturnSize is not shown here - confirm).
DispatchRequest dispatchRequest =
DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
// Use the buffer size when one is set (!= -1); otherwise fall back to the message size.
int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
// Only successfully parsed requests are processed below.
if (dispatchRequest.isSuccess()) {
if (size > 0) {
// Hand the message to the dispatchers (consume queue and index builders).
DefaultMessageStore.this.doDispatch(dispatchRequest);
// On a non-SLAVE broker with long polling enabled and a listener registered,
// signal that a new message has arrived.
if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()
&& DefaultMessageStore.this.messageArrivingListener != null) {
DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
notifyMessageArrive4MultiQueue(dispatchRequest);
}
// Advance both the global replay offset and the amount consumed from this buffer.
this.reputFromOffset += size;
readSize += size;
... // statistics bookkeeping omitted
} else if (size == 0) {
// Successful read of size 0: jump to the offset returned by rollNextFile and mark
// this buffer as fully read (presumably the end of the current file - confirm).
this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
readSize = result.getSize();
}
// NOTE(review): the failure branch below is empty in this excerpt, so neither readSize nor
// doNext changes when parsing fails - confirm how the full source handles this case.
} else if (!dispatchRequest.isSuccess()) {}
}
} finally {
// Drop the reference that was taken when getData produced this result.
result.release();
}
} else {
// Nothing left to replay: leave the loop.
doNext = false;
}
}
}
}
1.getData
getData方法分成两步走:
- 根据offset查找对应mappedFile
/**
 * Returns the readable CommitLog data beginning at {@code offset}.
 * Delegates to the two-argument overload and requests the "first file" fallback
 * only when {@code offset} is 0.
 *
 * @param offset absolute CommitLog offset to read from
 * @return the selected buffer, or {@code null} when nothing can be read there
 */
public SelectMappedBufferResult getData(final long offset) {
    final boolean fallBackToFirstFile = (offset == 0);
    return this.getData(offset, fallBackToFirstFile);
}
/**
 * Locates the mapped file that contains {@code offset} and selects the data from that
 * position onward.
 *
 * @param offset                absolute CommitLog offset to read from
 * @param returnFirstOnNotFound forwarded unchanged to {@code findMappedFileByOffset}
 * @return the selected buffer, or {@code null} when no mapped file is found or the
 *         position cannot be selected
 */
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
    final int commitLogFileSize =
        this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
    final MappedFile owner = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
    if (owner == null) {
        return null;
    }
    // Position inside the file = absolute offset modulo the fixed file size.
    final int positionInFile = (int) (offset % commitLogFileSize);
    return owner.selectMappedBuffer(positionInFile);
}
- mappedFile 找到对应的数据,并生成一个共享内存的缓冲区
/**
 * Selects the bytes of this mapped file from {@code pos} up to the current read position.
 * A successful call holds a reference on this file (via {@code hold()}).
 *
 * @param pos position inside this file to start from
 * @return a result wrapping a slice of the mapped buffer, or {@code null} when {@code pos}
 *         is outside {@code [0, readPosition)} or the file could not be held
 */
public SelectMappedBufferResult selectMappedBuffer(int pos) {
    final int readableEnd = getReadPosition();
    if (pos < 0 || pos >= readableEnd) {
        return null;
    }
    // hold() must succeed before the buffer is handed out.
    if (!this.hold()) {
        return null;
    }
    final int readableBytes = readableEnd - pos;
    ByteBuffer wholeFile = this.mappedByteBuffer.slice();
    wholeFile.position(pos);
    // Second slice so the returned view starts at pos and is limited to the readable bytes.
    ByteBuffer window = wholeFile.slice();
    window.limit(readableBytes);
    return new SelectMappedBufferResult(this.fileFromOffset + pos, window, readableBytes, this);
}
2.doDispatch
doDispatch 这里会将封装的数据转给对应的处理器,dispatcherList 在初始化时,放入了CommitLogDispatcherBuildConsumeQueue和CommitLogDispatcherBuildIndex两种CommitLogDispatcher 处理器
/**
 * Passes the request to every registered {@code CommitLogDispatcher}, in list order.
 *
 * @param req the dispatch request built from one CommitLog message
 */
public void doDispatch(DispatchRequest req) {
    for (final CommitLogDispatcher handler : this.dispatcherList) {
        handler.dispatch(req);
    }
}
二. CommitLogDispatcherBuildConsumeQueue
CommitLogDispatcherBuildConsumeQueue 主要做一个中转,实际由putMessagePositionInfo执行
/**
 * Dispatcher that forwards non-transactional and committed messages to
 * {@code DefaultMessageStore.putMessagePositionInfo}; other transaction types are ignored.
 */
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
    public void dispatch(DispatchRequest request) {
        final int transactionType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        final boolean visibleToConsumers =
            transactionType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || transactionType == MessageSysFlag.TRANSACTION_COMMIT_TYPE;
        // Prepared and rolled-back messages (and any other value) are not forwarded.
        if (visibleToConsumers) {
            DefaultMessageStore.this.putMessagePositionInfo(request);
        }
    }
}
/**
 * Looks up the consume queue for the request's topic and queue id, then asks that queue to
 * append the request's position info.
 *
 * @param dispatchRequest the request describing one CommitLog message
 */
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    final String topic = dispatchRequest.getTopic();
    final int queueId = dispatchRequest.getQueueId();
    final ConsumeQueue targetQueue = this.findConsumeQueue(topic, queueId);
    final boolean multiQueue = checkMultiDispatchQueue(dispatchRequest);
    targetQueue.putMessagePositionInfoWrapper(dispatchRequest, multiQueue);
}
1. 查找队列
查找的过程与查找实际文件类似,先寻找topic的文件夹,再查找该目录下queue文件夹。不过这个过程中并没有建立实际的文件路径
/**
 * Returns the consume queue registered for {@code topic}/{@code queueId}, creating and
 * registering the per-topic map and the queue itself on first use.
 * Registration uses {@code putIfAbsent}; when another entry is already present, that entry
 * is returned and the newly built one is discarded.
 *
 * @param topic   topic name
 * @param queueId queue id within the topic
 * @return the registered consume queue, never {@code null}
 */
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentMap<Integer, ConsumeQueue> queuesOfTopic = consumeQueueTable.get(topic);
    if (queuesOfTopic == null) {
        // First time this topic is seen: register a fresh map, or adopt the one already there.
        ConcurrentMap<Integer, ConsumeQueue> freshMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentMap<Integer, ConsumeQueue> registeredMap = consumeQueueTable.putIfAbsent(topic, freshMap);
        queuesOfTopic = (registeredMap != null) ? registeredMap : freshMap;
    }

    ConsumeQueue existing = queuesOfTopic.get(queueId);
    if (existing != null) {
        return existing;
    }

    // No queue yet for this id: build one and try to register it.
    ConsumeQueue created = new ConsumeQueue(
        topic,
        queueId,
        StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
        this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
        this);
    ConsumeQueue registeredQueue = queuesOfTopic.putIfAbsent(queueId, created);
    if (registeredQueue != null) {
        return registeredQueue;
    }

    // Our instance was registered; count it when the topic is an LMQ topic.
    if (MixAll.isLmq(topic)) {
        lmqConsumeQueueNum.getAndIncrement();
    }
    return created;
}
2. 追加数据
/**
 * Appends the position info of one message to this consume queue, retrying on failure.
 * If the entry cannot be appended (or the queue is not writeable at entry), the store's running
 * flags are marked with a logics-queue error.
 *
 * @param request    the dispatch request describing the message
 * @param multiQueue not used in the visible part of this excerpt
 */
public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) {
// Maximum number of append attempts.
final int maxRetries = 30;
// Whether the consume queue may be written at all; when false the loop below never runs.
boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
for (int i = 0; i < maxRetries && canWrite; i++) {
long tagsCode = request.getTagsCode();
// Optional extended-info handling; its body is elided in this excerpt.
if (isExtWriteEnable()) {...}
boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
if (result) {
...
return;
} else {
// Append failed: wait one second before the next attempt.
try {
Thread.sleep(1000);
// NOTE(review): the interrupt is swallowed here; the retry loop simply continues.
} catch (InterruptedException e) { }
}
}
// Reached only when no attempt succeeded (or none was made).
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
/**
 * Writes one consume-queue entry (CommitLog offset, message size, tags code) at the logical
 * position implied by {@code cqOffset}.
 *
 * @param offset   CommitLog offset of the message
 * @param size     size of the message in the CommitLog
 * @param tagsCode tags code stored with the entry
 * @param cqOffset index of the entry within the consume queue
 * @return {@code true} when the entry was appended or is treated as already present;
 *         {@code false} when no mapped file was available or the append failed
 */
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
// The message ends at or before the highest CommitLog offset already recorded: treat it as a
// repeated build and report success without writing.
if (offset + size <= this.maxPhysicOffset) {
return true;
}
// Stage the entry in the reusable buffer: 8-byte offset + 4-byte size + 8-byte tags code,
// limited to CQ_STORE_UNIT_SIZE bytes.
this.byteBufferIndex.flip();
this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
this.byteBufferIndex.putLong(offset);
this.byteBufferIndex.putInt(size);
this.byteBufferIndex.putLong(tagsCode);
// Logical byte offset at which this entry is expected to land.
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
// Find the mapped file for that offset (presumably created on demand when missing - confirm).
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
if (mappedFile != null) {
// First file of the queue, still empty, but the entry index is not 0: start the queue at
// the expected offset and fill the space before it.
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
this.minLogicOffset = expectLogicOffset;
this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
this.fillPreBlank(mappedFile, expectLogicOffset);
}
// Only currentLogicOffset == expectLogicOffset is the normal case.
if (cqOffset != 0) {
long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
// The expected slot lies before the current write position: treat as already built.
if (expectLogicOffset < currentLogicOffset) {
return true;
}
// NOTE(review): the mismatch branch is empty in this excerpt; the entry is still appended.
if (expectLogicOffset != currentLogicOffset) {}
}
this.maxPhysicOffset = offset + size;
// Append the staged entry to the mapped file.
return mappedFile.appendMessage(this.byteBufferIndex.array());
}
return false;
}
putMessagePositionInfo 操作只是将数据追加到mappedByteBuffer中,那么这些数据什么时候才会存入硬盘呢?
答案是FlushConsumeQueueService
3.FlushConsumeQueueService
类似于commitlog的刷盘操作,consumequeue也有一个刷盘服务FlushConsumeQueueService。
FlushConsumeQueueService默认情况是1s执行一次
/**
 * Service thread that flushes every consume queue of the enclosing store and, when a flush
 * with page threshold 0 was performed, also flushes the store checkpoint.
 */
class FlushConsumeQueueService extends ServiceThread {
    private static final int RETRY_TIMES_OVER = 3;
    private long lastFlushTimestamp = 0;

    /**
     * Flushes all consume queues, trying each queue up to {@code retryTimes} times until
     * its {@code flush} call returns {@code true}.
     *
     * @param retryTimes maximum flush attempts per queue; the value {@code RETRY_TIMES_OVER}
     *                   additionally drops the page threshold to 0
     */
    private void doFlush(int retryTimes) {
        int leastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();
        if (RETRY_TIMES_OVER == retryTimes) {
            leastPages = 0;
        }

        long checkpointTimestamp = 0;
        final int thoroughInterval =
            DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();
        final long now = System.currentTimeMillis();
        // Too long since the last thorough flush: drop the threshold to 0 for this pass and
        // remember the checkpoint's logics timestamp as it is right now.
        if (now >= this.lastFlushTimestamp + thoroughInterval) {
            this.lastFlushTimestamp = now;
            leastPages = 0;
            checkpointTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();
        }

        // Walk every queue of every topic.
        for (ConcurrentMap<Integer, ConsumeQueue> queuesOfTopic : DefaultMessageStore.this.consumeQueueTable.values()) {
            for (ConsumeQueue queue : queuesOfTopic.values()) {
                boolean flushed = false;
                int attempt = 0;
                while (attempt < retryTimes && !flushed) {
                    flushed = queue.flush(leastPages);
                    attempt++;
                }
            }
        }

        if (leastPages != 0) {
            return;
        }
        // Threshold was 0: write back the remembered timestamp (if any) and flush the checkpoint.
        if (checkpointTimestamp > 0) {
            DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(checkpointTimestamp);
        }
        DefaultMessageStore.this.getStoreCheckpoint().flush();
    }
}
三. CommitLogDispatcherBuildIndex
CommitLogDispatcherBuildIndex 用于中转构建Index操作,buildIndex逻辑大体如下:
- 查找Index文件,如果不存在则新建一个
- 根据唯一key,追加条目数据
- 根据指定的key,追加条目数据
/**
 * Dispatcher that forwards requests to {@code indexService.buildIndex} when message
 * indexing is enabled in the store configuration.
 */
class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
    public void dispatch(DispatchRequest request) {
        final boolean indexingEnabled = DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable();
        if (!indexingEnabled) {
            return;
        }
        DefaultMessageStore.this.indexService.buildIndex(request);
    }
}
/**
 * Adds index entries for one message: one for its unique key (when present) and one for
 * each non-empty user key. Index keys are produced by {@code buildKey(topic, key)}.
 * Nothing is written when no index file can be obtained, when the message's CommitLog offset
 * is below the index file's end offset, or when the message is a transaction rollback.
 *
 * @param req the dispatch request describing the message
 */
public void buildIndex(DispatchRequest req) {
    IndexFile target = retryGetAndCreateIndexFile();
    if (target == null) {
        return;
    }

    final long indexedUpTo = target.getEndPhyOffset();
    final String topic = req.getTopic();
    final String keys = req.getKeys();

    // Offset already covered by the index file: skip.
    if (req.getCommitLogOffset() < indexedUpTo) {
        return;
    }

    // Rolled-back transactional messages are not indexed; every other type is.
    final int transactionType = MessageSysFlag.getTransactionValue(req.getSysFlag());
    if (transactionType == MessageSysFlag.TRANSACTION_ROLLBACK_TYPE) {
        return;
    }

    // Entry for the unique key.
    if (req.getUniqKey() != null) {
        target = putKey(target, req, buildKey(topic, req.getUniqKey()));
        if (target == null) {
            return;
        }
    }

    // Entries for the user-supplied keys.
    if (keys == null || keys.length() == 0) {
        return;
    }
    for (String key : keys.split(MessageConst.KEY_SEPARATOR)) {
        if (key.length() == 0) {
            continue;
        }
        target = putKey(target, req, buildKey(topic, key));
        if (target == null) {
            return;
        }
    }
}
1. retryGetAndCreateIndexFile
retryGetAndCreateIndexFile 最多有3次尝试机会,每次尝试都会先从indexFileList中获取末尾的index文件,获取不到可写的文件时再新建一个
/**
 * Tries up to {@code MAX_TRY_IDX_CREATE} times to obtain a writable index file, sleeping one
 * second after each failed attempt. When every attempt fails, the store is flagged with an
 * index-file error.
 *
 * @return the index file, or {@code null} when all attempts failed
 */
public IndexFile retryGetAndCreateIndexFile() {
    IndexFile obtained = null;
    int attempt = 0;
    while (obtained == null && attempt < MAX_TRY_IDX_CREATE) {
        obtained = this.getAndCreateLastIndexFile();
        if (obtained == null) {
            // Failed attempt: pause before trying again.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {}
        }
        attempt++;
    }

    if (obtained == null) {
        this.defaultMessageStore.getAccessRights().makeIndexFileError();
    }
    return obtained;
}
/**
 * Returns the last index file when it still has room; otherwise creates a new index file,
 * appends it to {@code indexFileList} and starts a daemon thread that flushes the previous file.
 *
 * <p>Both locks are released in {@code finally} blocks that are entered only after the lock was
 * actually acquired, so a failure while constructing the new {@code IndexFile} no longer leads
 * to unlocking a write lock that was never taken.
 *
 * @return a writable index file, or {@code null} when a new file could not be created
 */
public IndexFile getAndCreateLastIndexFile() {
    IndexFile indexFile = null;
    IndexFile prevIndexFile = null;
    long lastUpdateEndPhyOffset = 0;
    long lastUpdateIndexTimestamp = 0;

    this.readWriteLock.readLock().lock();
    try {
        if (!this.indexFileList.isEmpty()) {
            // Index files exist: look at the last one.
            IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
            if (!tmp.isWriteFull()) {
                indexFile = tmp;
            } else {
                // Last file is full: carry its end offset/timestamp over to the new file.
                lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
                lastUpdateIndexTimestamp = tmp.getEndTimestamp();
                prevIndexFile = tmp;
            }
        }
    } finally {
        this.readWriteLock.readLock().unlock();
    }

    // No writable index file: create a new one.
    if (indexFile == null) {
        try {
            String fileName =
                this.storePath + File.separator
                    + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
            // Constructing the IndexFile presumably creates the backing file on disk - confirm.
            indexFile =
                new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
                    lastUpdateIndexTimestamp);
            // Take the write lock only once there is something to add, and release it in a
            // finally that is reachable only while the lock is held.
            this.readWriteLock.writeLock().lock();
            try {
                this.indexFileList.add(indexFile);
            } finally {
                this.readWriteLock.writeLock().unlock();
            }
        } catch (Exception e) {
            // Best effort, as before: when construction fails indexFile stays null, and
            // retryGetAndCreateIndexFile retries on a null result.
        }

        // A new file was created: flush the previous file on a separate daemon thread.
        // NOTE(review): prevIndexFile is null when the list was empty; flush(IndexFile) is not
        // shown here - confirm it tolerates null.
        if (indexFile != null) {
            final IndexFile flushThisFile = prevIndexFile;
            Thread flushThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    IndexService.this.flush(flushThisFile);
                }
            }, "FlushIndexFileThread");
            flushThread.setDaemon(true);
            flushThread.start();
        }
    }
    return indexFile;
}
2. putKey
putKey就是不断重试调用IndexFile的putKey方法:写入失败时重新获取(或新建)index文件后再次写入,直到成功或无法获取index文件为止
/**
 * Writes one key into the given index file. Each time the write is rejected, a writable index
 * file is obtained again through {@code retryGetAndCreateIndexFile} and the write is repeated.
 *
 * @param indexFile index file to try first
 * @param msg       request supplying the CommitLog offset and store timestamp
 * @param idxKey    key to index
 * @return the index file that accepted the key, or {@code null} when no index file could be
 *         obtained
 */
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
    while (!indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp())) {
        indexFile = retryGetAndCreateIndexFile();
        if (indexFile == null) {
            return null;
        }
    }
    return indexFile;
}
index文件的数据由header、hash槽、index条目三部分组成。其中index条目是不断追加递增的,而header和hash槽都是在固定位置上更新数据。hash槽中记录的是落在该槽的最新一条index条目的序号,而每条index条目中又保存了该槽此前记录的序号(即同一槽位的上一条index条目),从而串成一条链表
/**
 * Appends one index entry for {@code key} and links it into the key's hash slot.
 * The entry stores the key hash, the CommitLog offset, the time difference (in seconds) from the
 * header's begin timestamp, and the value previously held by the slot; the slot is then
 * overwritten with the current index count.
 *
 * @param key            key to index
 * @param phyOffset      CommitLog offset of the message
 * @param storeTimestamp store timestamp of the message
 * @return {@code true} when the entry was written; {@code false} when the index count has
 *         reached {@code indexNum} or an exception occurred
 */
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
if (this.indexHeader.getIndexCount() < this.indexNum) {
// Hash of the key.
int keyHash = indexKeyHashMethod(key);
// Slot number the key falls into.
int slotPos = keyHash % this.hashSlotNum;
// Byte position of that slot: header first, then the slot table.
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
// NOTE(review): fileLock is never assigned in this excerpt, so the release in finally never runs.
FileLock fileLock = null;
try {
// Index value currently stored in the slot.
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// Out-of-range slot values are normalised to invalidIndex (no previous entry).
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
// Time difference to the header's begin timestamp, converted from ms to seconds and clamped
// to [0, Integer.MAX_VALUE]; forced to 0 when no begin timestamp is set.
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
// Byte position of the new entry: after the header and the whole slot table, at the
// current index count.
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
// Entry field: key hash (4 bytes).
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
// Entry field: CommitLog offset (8 bytes).
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
// Entry field: time difference in seconds (4 bytes).
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
// Entry field: the slot's previous index value, linking to the earlier entry of this slot.
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
// The slot now points at the entry just written.
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
// While the index count is still <= 1, record this message as the beginning of the file.
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
// The slot had no valid entry before: count one more used slot.
if (invalidIndex == slotValue) {
this.indexHeader.incHashSlotCount();
}
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
// NOTE(review): exceptions are swallowed in this excerpt and reported only as a false return.
} catch (Exception e) { } finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {}
}
}
} else { }
return false;
}
3. flush
flush 首先更新IndexHeader的数据,这里要注意indexHeader对应的内存空间也是mappedByteBuffer 的一部分,force执行时,也会刷盘IndexHeader里面的数据。
/**
 * Writes the index header into its backing buffer and forces the mapped buffer to storage.
 * Does nothing when the mapped file cannot be held.
 *
 * <p>The reference taken by {@code hold()} is always given back through {@code release()},
 * even if updating the header or forcing the buffer throws.
 */
public void flush() {
    if (!this.mappedFile.hold()) {
        return;
    }
    try {
        // Update the header first so that force() persists it together with the entries.
        this.indexHeader.updateByteBuffer();
        this.mappedByteBuffer.force();
    } finally {
        this.mappedFile.release();
    }
}
构造IndexFile对象有如下一个操作
/**
 * Constructor excerpt; most of the body is elided.
 * The visible statement takes a slice of {@code mappedByteBuffer}
 * (presumably the buffer that backs the index header - confirm against the full source).
 */
public IndexFile(final String fileName, final int hashSlotNum, final int indexNum,
final long endPhyOffset, final long endTimestamp) throws IOException {
...
// View onto the same mapped memory as mappedByteBuffer.
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
...
}