RocketMQ Message Storage, Revisited
Published: 2019-06-28



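After RocketMQ receives a message request through netty, it hands the request straight to a processor module such as SendMessageProcessor.

This processor is mainly responsible for handling send-message requests from clients.

The class implements the com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor interface, which declares two methods: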

RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception;
boolean rejectRequest();

The method set in bold in the original post, processRequest, is the one we discuss next.

The class also extends com.alibaba.rocketmq.broker.processor.AbstractSendMessageProcessor.
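To make the role of the two interface methods concrete, here is a minimal sketch of how a remoting layer could route requests to processors. It assumes a table keyed by request code with a fallback processor, and that rejectRequest() is consulted before processRequest(); neither detail is shown in this article. The RequestProcessor interface below is a simplified stand-in for NettyRequestProcessor (a String replaces RemotingCommand), and ProcessorTable, Tagging and the request code 10 are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

final class ProcessorTable {
    /** Simplified stand-in for NettyRequestProcessor: a String replaces RemotingCommand. */
    interface RequestProcessor {
        String processRequest(String request);
        boolean rejectRequest();
    }

    /** Hypothetical processor that tags each request with its own name. */
    static final class Tagging implements RequestProcessor {
        private final String name;
        private final boolean busy;

        Tagging(String name, boolean busy) {
            this.name = name;
            this.busy = busy;
        }

        @Override
        public String processRequest(String request) {
            return name + ":" + request;
        }

        @Override
        public boolean rejectRequest() {
            return busy;
        }
    }

    private final Map<Integer, RequestProcessor> table = new HashMap<>();
    private final RequestProcessor fallback;

    ProcessorTable(RequestProcessor fallback) {
        this.fallback = fallback;
    }

    void register(int requestCode, RequestProcessor processor) {
        table.put(requestCode, processor);
    }

    /** Picks the processor for the code, lets it refuse, otherwise runs it. */
    String dispatch(int requestCode, String request) {
        RequestProcessor p = table.getOrDefault(requestCode, fallback);
        if (p.rejectRequest()) {
            return "REJECTED";
        }
        return p.processRequest(request);
    }

    public static void main(String[] args) {
        ProcessorTable t = new ProcessorTable(new Tagging("default", false));
        t.register(10, new Tagging("send", false)); // 10 is an arbitrary code for this sketch
        System.out.println(t.dispatch(10, "hello"));
        System.out.println(t.dispatch(99, "hello"));
    }
}
```

In this sketch an unknown request code falls through to the fallback processor, and a processor that reports itself busy never sees the request.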

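First, a look at the call chain for this process:

This blog post explains it very well: http://www.cnblogs.com/sunshine-2015/p/6291116.html

I will add a few things on top of it.

The first method entered in the SendMessageProcessor class is:

When is this class invoked? It must be called by the netty communication module once a message has been received; specifically, here:

The spot marked by the arrow is where the method above is called.

Here is the complete path from the netty communication module receiving a message to this point:

Now we arrive at the very familiar Handler.

Reading this from bottom to top is more intuitive.

Now let's continue toward message storage.

We enter this method: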

private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
                                    final RemotingCommand request,
                                    final SendMessageContext sendMessageContext,
                                    final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
    final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();

    response.setOpaque(request.getOpaque());
    response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());

    if (log.isDebugEnabled()) {
        log.debug("receive SendMessage request command, " + request);
    }

    final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
    if (this.brokerController.getMessageStore().now() < startTimstamp) {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
        return response;
    }

    response.setCode(-1);
    super.msgCheck(ctx, requestHeader, response);
    if (response.getCode() != -1) {
        return response;
    }

    final byte[] body = request.getBody();

    int queueIdInt = requestHeader.getQueueId();
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

    if (queueIdInt < 0) {
        queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
    }

    int sysFlag = requestHeader.getSysFlag();
    if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
        sysFlag |= MessageSysFlag.MultiTagsFlag;
    }

    String newTopic = requestHeader.getTopic();
    if ((null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX))) {
        String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length());
        SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
        if (null == subscriptionGroupConfig) {
            response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
            response.setRemark(
                    "subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
            return response;
        }

        int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
        if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
            maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
        }
        int reconsumeTimes = requestHeader.getReconsumeTimes();
        if (reconsumeTimes >= maxReconsumeTimes) {
            newTopic = MixAll.getDLQTopic(groupName);
            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
            topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                    DLQ_NUMS_PER_GROUP,
                    PermName.PERM_WRITE, 0
            );
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return response;
            }
        }
    }

    MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
    msgInner.setTopic(newTopic);
    msgInner.setBody(body);
    msgInner.setFlag(requestHeader.getFlag());
    MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
    msgInner.setPropertiesString(requestHeader.getProperties());
    msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(topicConfig.getTopicFilterType(), msgInner.getTags()));

    msgInner.setQueueId(queueIdInt);
    msgInner.setSysFlag(sysFlag);
    msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
    msgInner.setBornHost(ctx.channel().remoteAddress());
    msgInner.setStoreHost(this.getStoreHost());
    msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());

    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        String traFlag = msgInner.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
        if (traFlag != null) {
            response.setCode(ResponseCode.NO_PERMISSION);
            response.setRemark(
                    "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending transaction message is forbidden");
            return response;
        }
    }

    // Everything above is system checks and data preparation; message storage starts here.
    PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); // anchor: putMessage
    if (putMessageResult != null) {
        boolean sendOK = false;

        switch (putMessageResult.getPutMessageStatus()) {
            // Success
            case PUT_OK:
                sendOK = true;
                response.setCode(ResponseCode.SUCCESS);
                break;
            case FLUSH_DISK_TIMEOUT:
                response.setCode(ResponseCode.FLUSH_DISK_TIMEOUT);
                sendOK = true;
                break;
            case FLUSH_SLAVE_TIMEOUT:
                response.setCode(ResponseCode.FLUSH_SLAVE_TIMEOUT);
                sendOK = true;
                break;
            case SLAVE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
                sendOK = true;
                break;

            // Failed
            case CREATE_MAPEDFILE_FAILED:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("create maped file failed, please make sure OS and JDK both 64bit.");
                break;
            case MESSAGE_ILLEGAL:
            case PROPERTIES_SIZE_EXCEEDED:
                response.setCode(ResponseCode.MESSAGE_ILLEGAL);
                response.setRemark(
                        "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k, msg properties length limit 32k.");
                break;
            case SERVICE_NOT_AVAILABLE:
                response.setCode(ResponseCode.SERVICE_NOT_AVAILABLE);
                response.setRemark(
                        "service not available now, maybe disk full, " + diskUtil() + ", maybe your broker machine memory too small.");
                break;
            case OS_PAGECACHE_BUSY:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
                break;
            case UNKNOWN_ERROR:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR");
                break;
            default:
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UNKNOWN_ERROR DEFAULT");
                break;
        }

        String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
        if (sendOK) {
            this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
            this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
                    putMessageResult.getAppendMessageResult().getWroteBytes());
            this.brokerController.getBrokerStatsManager().incBrokerPutNums();

            response.setRemark(null);

            responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
            responseHeader.setQueueId(queueIdInt);
            responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());

            doResponse(ctx, request, response);

            if (hasSendMessageHook()) {
                sendMessageContext.setMsgId(responseHeader.getMsgId());
                sendMessageContext.setQueueId(responseHeader.getQueueId());
                sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());

                int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
                int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
                sendMessageContext.setCommercialSendTimes(incValue);
                sendMessageContext.setCommercialSendSize(wroteSize);
                sendMessageContext.setCommercialOwner(owner);
            }
            return null;
        } else {
            if (hasSendMessageHook()) {
                int wroteSize = request.getBody().length;
                int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
                sendMessageContext.setCommercialSendTimes(incValue);
                sendMessageContext.setCommercialSendSize(wroteSize);
                sendMessageContext.setCommercialOwner(owner);
            }
        }
    } else {
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("store putMessage return null");
    }

    return response;
}
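Two routing decisions in sendMessage are easy to miss inside the long method: a random queue is picked when the client sends a negative queue id, and a retry message that has used up its retries is redirected to a dead-letter topic. The sketch below isolates both. The class name SendRouting is hypothetical, and the prefix strings "%RETRY%" and "%DLQ%" are assumed values for MixAll.RETRY_GROUP_TOPIC_PREFIX and the prefix used by MixAll.getDLQTopic; the article does not show them.

```java
import java.util.Random;

final class SendRouting {
    static final String RETRY_PREFIX = "%RETRY%"; // assumed value, not shown in the article
    static final String DLQ_PREFIX = "%DLQ%";     // assumed value, not shown in the article

    /** Keeps an explicit queue id; otherwise picks one in [0, writeQueueNums). */
    static int pickQueue(int requestedQueueId, int writeQueueNums, Random random) {
        if (requestedQueueId >= 0) {
            return requestedQueueId;
        }
        // Same expression as in sendMessage: the inner modulo keeps Math.abs away from Integer.MIN_VALUE.
        return Math.abs(random.nextInt() % 99999999) % writeQueueNums;
    }

    /** Redirects an exhausted retry message to the group's dead-letter topic. */
    static String resolveTopic(String topic, int reconsumeTimes, int maxReconsumeTimes) {
        if (topic != null && topic.startsWith(RETRY_PREFIX) && reconsumeTimes >= maxReconsumeTimes) {
            String groupName = topic.substring(RETRY_PREFIX.length());
            return DLQ_PREFIX + groupName;
        }
        return topic;
    }

    public static void main(String[] args) {
        System.out.println(pickQueue(-1, 4, new Random()));
        System.out.println(resolveTopic(RETRY_PREFIX + "groupA", 16, 16));
    }
}
```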

From the line PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);

we step into the MessageStore.putMessage(msgInner) method.

The MessageStore interface has three implementing classes:

Of these, only DefaultMessageStore implements the method above, so we look at DefaultMessageStore.putMessage(MessageExtBrokerInner msg).

public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    if (this.shutdown) {
        log.warn("message store has shutdown, so putMessage is forbidden");
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is slave mode, so putMessage is forbidden ");
        }
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    }

    if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("message store is not writeable, so putMessage is forbidden " + this.runningFlags.getFlagBits());
        }
        return new PutMessageResult(PutMessageStatus.SERVICE_NOT_AVAILABLE, null);
    } else {
        this.printTimes.set(0);
    }

    // Validate the topic length
    if (msg.getTopic().length() > Byte.MAX_VALUE) {
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
    }

    // Validate the properties length
    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return new PutMessageResult(PutMessageStatus.PROPERTIES_SIZE_EXCEEDED, null);
    }

    if (this.isOSPageCacheBusy()) {
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    long beginTime = this.getSystemClock().now();
    // Everything above is system checks; the message is stored below.
    PutMessageResult result = this.commitLog.putMessage(msg);

    // Performance statistics
    long eclipseTime = this.getSystemClock().now() - beginTime;
    if (eclipseTime > 1000) {
        log.warn("putMessage not in lock eclipse time(ms)={}, bodyLength={}", eclipseTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(eclipseTime);

    if (null == result || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }

    return result;
}
Here commitLog is initialized in the constructor: this.commitLog = new CommitLog(this);
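The two length limits in DefaultMessageStore.putMessage line up with the field widths used when the record is serialized later in doAppend, where the topic length is written as a single byte and the properties length as a short. A minimal sketch of the same checks follows; PutPreconditions, Status and repeat are hypothetical names for this illustration. Note that these checks count Java chars, while doAppend counts UTF-8 bytes, which is presumably why doAppend checks the properties length again.

```java
final class PutPreconditions {
    enum Status { OK, MESSAGE_ILLEGAL, PROPERTIES_SIZE_EXCEEDED }

    /** Mirrors the topic and properties length checks shown above. */
    static Status check(String topic, String properties) {
        if (topic.length() > Byte.MAX_VALUE) {          // 127: length must fit in one byte
            return Status.MESSAGE_ILLEGAL;
        }
        if (properties != null && properties.length() > Short.MAX_VALUE) { // 32767: fits in a short
            return Status.PROPERTIES_SIZE_EXCEEDED;
        }
        return Status.OK;
    }

    /** Helper for the demo: a string of n copies of 'a'. */
    static String repeat(int n) {
        return new String(new char[n]).replace('\0', 'a');
    }

    public static void main(String[] args) {
        System.out.println(check(repeat(127), null));
        System.out.println(check(repeat(128), null));
        System.out.println(check("TopicTest", repeat(32768)));
    }
}
```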

Next we step into commitLog.putMessage(final MessageExtBrokerInner msg).

Once you understand this method, you understand the core of RocketMQ's message storage process.

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
    if (tranType == MessageSysFlag.TransactionNotType
            || tranType == MessageSysFlag.TransactionCommitType) {
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    long eclipseTimeInLock = 0;
    MapedFile unlockMapedFile = null;
    MapedFile mapedFile = this.mapedFileQueue.getLastMapedFileWithLock();
    synchronized (this) {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);

        if (null == mapedFile || mapedFile.isFull()) {
            mapedFile = this.mapedFileQueue.getLastMapedFile();
        }
        if (null == mapedFile) {
            log.error("create maped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }

        result = mapedFile.appendMessage(msg, this.appendMessageCallback);
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                unlockMapedFile = mapedFile;
                // Create a new file, re-write the message
                mapedFile = this.mapedFileQueue.getLastMapedFile();
                if (null == mapedFile) {
                    // XXX: warn and notify me
                    log.error("create maped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                result = mapedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }

        eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } // end of synchronized

    if (eclipseTimeInLock > 1000) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result);
    }

    if (null != unlockMapedFile) {
        this.defaultMessageStore.unlockMapedFile(unlockMapedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

    GroupCommitRequest request = null;

    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (msg.isWaitStoreMsgOK()) {
            request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
                        + " client address: " + msg.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        this.flushCommitLogService.wakeup();
    }

    // Synchronous write double
    if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
        HAService service = this.defaultMessageStore.getHaService();
        if (msg.isWaitStoreMsgOK()) {
            // Determine whether to wait
            if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
                if (null == request) {
                    request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                }
                service.putRequest(request);

                service.getWaitNotifyObject().wakeupAll();

                boolean flushOK =
                        // TODO
                        request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do sync transfer other node, wait return, but failed, topic: " + msg.getTopic() + " tags: "
                            + msg.getTags() + " client address: " + msg.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
                }
            }
            // Slave problem
            else {
                // Tell the producer, slave not available
                putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
            }
        }
    }

    return putMessageResult;
}
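The END_OF_FILE branch in the method above is the roll-over step: when the current file cannot hold the message, the file is closed off, a new one is obtained, and the append is retried once. The following is a minimal in-memory sketch of that idea, not the CommitLog implementation. RollingLog is a hypothetical name, heap ByteBuffers stand in for mapped files, and the tail of a full file is simply skipped instead of being filled with a blank record.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

final class RollingLog {
    private final int fileSize;
    private final List<ByteBuffer> files = new ArrayList<>();

    RollingLog(int fileSize) {
        this.fileSize = fileSize;
    }

    /** Appends a record and returns the global offset at which it starts. */
    synchronized long append(byte[] record) {
        if (record.length > fileSize) {
            throw new IllegalArgumentException("record larger than a file");
        }
        ByteBuffer last = files.isEmpty() ? null : files.get(files.size() - 1);
        if (last == null || last.remaining() < record.length) {
            if (last != null) {
                last.position(last.limit()); // the unused tail counts as padding (END_OF_FILE)
            }
            last = ByteBuffer.allocate(fileSize); // roll over to a fresh "file"
            files.add(last);
        }
        long offset = (long) (files.size() - 1) * fileSize + last.position();
        last.put(record);
        return offset;
    }

    synchronized int fileCount() {
        return files.size();
    }

    public static void main(String[] args) {
        RollingLog log = new RollingLog(10);
        System.out.println(log.append(new byte[6]));
        System.out.println(log.append(new byte[6]));
        System.out.println(log.fileCount());
    }
}
```

Because a record never straddles two files, the global offset of a record is always the file's starting offset plus the position inside that file.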

Let's step into the method highlighted in the original post, mapedFile.appendMessage:

/**
 * Appends a message to the MappedFile and returns the result.
 * MappedFile can be read literally: put another way, it is the file's buffer.
 * Data is written to the buffer before it is flushed to disk, which is how NIO works.
 * @param msg
 * @param cb
 * @return
 */
public AppendMessageResult appendMessage(final Object msg, final AppendMessageCallback cb) {
    assert msg != null;
    assert cb != null;

    int currentPos = this.wrotePostion.get();

    if (currentPos < this.fileSize) {
        // Get the byte buffer to write into. For why a writeBuffer != null check
        // selects a different buffer, see FlushCommitLogService.
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result =
                cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, msg);
        this.wrotePostion.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }

    log.error("MapedFile.appendMessage return null, wrotePostion: " + currentPos + " fileSize: "
            + this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
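appendMessage never moves the position of the mapped buffer itself. It takes a slice(), positions the slice at the current write position, and tracks progress in an atomic counter. The sketch below shows that pattern with a heap ByteBuffer standing in for the MappedByteBuffer; SliceAppend and its accessors are hypothetical names for this illustration.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

final class SliceAppend {
    private final ByteBuffer backing; // stand-in for the MappedByteBuffer
    private final AtomicInteger wrotePosition = new AtomicInteger(0);

    SliceAppend(int size) {
        this.backing = ByteBuffer.allocate(size);
    }

    /** Returns the number of bytes written, or -1 if the data does not fit. */
    int append(byte[] data) {
        int currentPos = wrotePosition.get();
        if (currentPos + data.length > backing.capacity()) {
            return -1;
        }
        ByteBuffer view = backing.slice(); // shares the bytes, has its own position and limit
        view.position(currentPos);
        view.put(data);
        wrotePosition.addAndGet(data.length);
        return data.length;
    }

    int wrotePosition() {
        return wrotePosition.get();
    }

    int backingPosition() {
        return backing.position();
    }

    byte byteAt(int index) {
        return backing.get(index);
    }

    public static void main(String[] args) {
        SliceAppend file = new SliceAppend(16);
        file.append(new byte[] {1, 2, 3});
        file.append(new byte[] {4});
        System.out.println(file.wrotePosition() + " " + file.backingPosition());
    }
}
```

The writes made through the slice are visible in the backing buffer, yet the backing buffer's own position stays at zero, so readers and the flush path can work from the shared counter alone.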

Next, look at the method highlighted in the code above, cb.doAppend:

public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final Object msg) {
    // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
    MessageExtBrokerInner msgInner = (MessageExtBrokerInner) msg;
    // PHY OFFSET
    long wroteOffset = fileFromOffset + byteBuffer.position();
    String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(), wroteOffset);

    // Record ConsumeQueue information
    String key = msgInner.getTopic() + "-" + msgInner.getQueueId();
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }

    // Transaction messages that require special handling
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) {
        // Prepared and Rollback message is not consumed, will not enter the
        // consumer queuec
        case MessageSysFlag.TransactionPreparedType:
        case MessageSysFlag.TransactionRollbackType:
            queueOffset = 0L;
            break;
        case MessageSysFlag.TransactionNotType:
        case MessageSysFlag.TransactionCommitType:
        default:
            break;
    }

    /**
     * Serialize message
     */
    final byte[] propertiesData =
            msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
    if (propertiesData.length > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long. length={}", propertiesData.length);
        return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
    }

    final short propertiesLength = propertiesData == null ? 0 : (short) propertiesData.length;

    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    final int topicLength = topicData == null ? 0 : topicData.length;

    final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

    final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

    // Exceeds the maximum message
    if (msgLen > this.maxMessageSize) {
        CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                + ", maxMessageSize: " + this.maxMessageSize);
        return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
    }

    // Determines whether there is sufficient free space
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
        this.resetMsgStoreItemMemory(maxBlank);
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2 MAGICCODE
        this.msgStoreItemMemory.putInt(CommitLog.BlankMagicCode);
        // 3 The remaining space may be any value
        //

        // Here the length of the specially set maxBlank
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }

    // Initialization of storage space
    this.resetMsgStoreItemMemory(msgLen);
    // 1 TOTALSIZE
    this.msgStoreItemMemory.putInt(msgLen);
    // 2 MAGICCODE
    this.msgStoreItemMemory.putInt(CommitLog.MessageMagicCode);
    // 3 BODYCRC
    this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
    // 4 QUEUEID
    this.msgStoreItemMemory.putInt(msgInner.getQueueId());
    // 5 FLAG
    this.msgStoreItemMemory.putInt(msgInner.getFlag());
    // 6 QUEUEOFFSET
    this.msgStoreItemMemory.putLong(queueOffset);
    // 7 PHYSICALOFFSET
    this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
    // 8 SYSFLAG
    this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
    // 9 BORNTIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
    // 10 BORNHOST
    this.msgStoreItemMemory.put(msgInner.getBornHostBytes());
    // 11 STORETIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
    // 12 STOREHOSTADDRESS
    this.msgStoreItemMemory.put(msgInner.getStoreHostBytes());
    // 13 RECONSUMETIMES
    this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
    // 14 Prepared Transaction Offset
    this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
    // 15 BODY
    this.msgStoreItemMemory.putInt(bodyLength);
    if (bodyLength > 0)
        this.msgStoreItemMemory.put(msgInner.getBody());
    // 16 TOPIC
    this.msgStoreItemMemory.put((byte) topicLength);
    this.msgStoreItemMemory.put(topicData);
    // 17 PROPERTIES
    this.msgStoreItemMemory.putShort(propertiesLength);
    if (propertiesLength > 0)
        this.msgStoreItemMemory.put(propertiesData);

    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // message:>>>>>>>:Hello RocketMQ 0
    System.out.println("message:>>>>>>>:" + new String(msgInner.getBody()));
    // Write messages to the queue buffer
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
            msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

    switch (tranType) {
        case MessageSysFlag.TransactionPreparedType:
        case MessageSysFlag.TransactionRollbackType:
            break;
        case MessageSysFlag.TransactionNotType:
        case MessageSysFlag.TransactionCommitType:
            // The next update ConsumeQueue information
            CommitLog.this.topicQueueTable.put(key, ++queueOffset);
            break;
        default:
            break;
    }

    return result;
}

private void resetMsgStoreItemMemory(final int length) {
    this.msgStoreItemMemory.flip();
    this.msgStoreItemMemory.limit(length);
}
} // closes the enclosing inner class (its declaration is not shown)
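Stripped of its seventeen fields, doAppend does two things: it builds a length-prefixed record in a reusable scratch buffer and copies it into the file buffer, or, when the record plus a minimum blank area does not fit, it writes a blank marker that fills the rest of the file. The following sketch keeps only that skeleton. FrameWriter, its reduced three-field header and both magic values are hypothetical, and clear() is used in place of the flip() followed by limit() seen in resetMsgStoreItemMemory.

```java
import java.nio.ByteBuffer;

final class FrameWriter {
    enum Status { PUT_OK, END_OF_FILE, MESSAGE_SIZE_EXCEEDED }

    static final int MESSAGE_MAGIC = 0x1234ABCD; // hypothetical value
    static final int BLANK_MAGIC = 0x0BADF00D;   // hypothetical value
    static final int MIN_BLANK = 8;              // room for TOTALSIZE + MAGICCODE
    static final int HEADER = 12;                // TOTALSIZE + MAGICCODE + BODYLENGTH

    private final int maxRecord;
    private final ByteBuffer scratch;            // heap buffer reused for every record

    FrameWriter(int maxRecord) {
        this.maxRecord = maxRecord;
        this.scratch = ByteBuffer.allocate(maxRecord + MIN_BLANK);
    }

    Status write(ByteBuffer target, byte[] body) {
        int maxBlank = target.remaining();
        int msgLen = HEADER + body.length;
        if (msgLen > maxRecord) {
            return Status.MESSAGE_SIZE_EXCEEDED;
        }
        if (maxBlank < MIN_BLANK) {
            throw new IllegalStateException("target is full; roll to a new file first");
        }
        if (msgLen + MIN_BLANK > maxBlank) {
            // Not enough room: mark the rest of the file as blank and fill it.
            scratch.clear();
            scratch.limit(maxBlank);
            scratch.putInt(maxBlank);     // 1 TOTALSIZE
            scratch.putInt(BLANK_MAGIC);  // 2 MAGICCODE, the remaining bytes may hold anything
            target.put(scratch.array(), 0, maxBlank);
            return Status.END_OF_FILE;
        }
        scratch.clear();
        scratch.putInt(msgLen);           // 1 TOTALSIZE
        scratch.putInt(MESSAGE_MAGIC);    // 2 MAGICCODE
        scratch.putInt(body.length);      // 3 BODYLENGTH
        scratch.put(body);
        target.put(scratch.array(), 0, msgLen);
        return Status.PUT_OK;
    }

    public static void main(String[] args) {
        FrameWriter writer = new FrameWriter(64);
        ByteBuffer file = ByteBuffer.allocate(40);
        System.out.println(writer.write(file, new byte[10]));
        System.out.println(writer.write(file, new byte[10]));
    }
}
```

Since every successful write leaves at least MIN_BLANK bytes free, there is always room for the blank marker, which lets a reader scanning the file tell where real records end.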

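Note that the message is first assembled in msgStoreItemMemory, a scratch ByteBuffer (the code calls array() on it, so it is heap-backed rather than memory-mapped). It is then copied by byteBuffer.put(...) into byteBuffer, which is a slice of the file's memory-mapped MappedByteBuffer.

For background, see http://www.cnblogs.com/guazi/p/6829487.html

The MappedByteBuffer is declared in MapedFile, the class discussed above, whose job matches its name exactly.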

// The mapped memory object: the memory the file is mapped into
private final MappedByteBuffer mappedByteBuffer;
// The FileChannel being mapped (blocking NIO)
private FileChannel fileChannel;

Both are initialized in the constructor:

this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
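The same two calls work on their own with the standard library, which makes the write path easy to try out. The sketch below maps a temporary file, writes through the mapping, calls force() to push the change to disk, and then reads the bytes back with ordinary file I/O. MmapDemo, the temp file prefix and the 4096-byte size are choices made for this example only, and the text is assumed to be shorter than the mapped size.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Arrays;

final class MmapDemo {
    /** Writes text through a memory mapping and reads it back from the file. */
    static String roundTrip(String text) {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        int fileSize = 4096; // arbitrary size for the demo
        try {
            File file = File.createTempFile("mmap-demo", ".bin");
            file.deleteOnExit();
            try (RandomAccessFile raf = new RandomAccessFile(file, "rw");
                 FileChannel channel = raf.getChannel()) {
                // Mapping READ_WRITE beyond the current length grows the file to fileSize.
                MappedByteBuffer mapped = channel.map(MapMode.READ_WRITE, 0, fileSize);
                mapped.put(data); // lands in the mapping, not necessarily on disk yet
                mapped.force();   // the flush step: write the mapped changes to storage
            }
            byte[] onDisk = Files.readAllBytes(file.toPath());
            return new String(Arrays.copyOf(onDisk, data.length), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("Hello RocketMQ"));
    }
}
```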

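With the message written there, control works its way back toward the anchor in the sendMessage method we started from.

Before that, once the message is in the mapped buffer, the flush thread is triggered. This happens in CommitLog.putMessage rather than in sendMessage, and the flush to disk follows: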

...    ...
    // Flush the message, that is, persist it to the file. The append above has not
    // actually written anything to disk. What happens here depends on the flush
    // strategy; for details see FlushCommitLogService.
    // Perform a synchronous or asynchronous flush/commit
    GroupCommitRequest request = null;

    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        if (msg.isWaitStoreMsgOK()) {
            request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
            service.putRequest(request);
            boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            if (!flushOK) {
                log.error("do groupcommit, wait for flush failed, topic: " + msg.getTopic() + " tags: " + msg.getTags()
                        + " client address: " + msg.getBornHostString());
                putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
            }
        } else {
            service.wakeup();
        }
    }
    // Asynchronous flush
    else {
        // important: wake up the commitLog thread to flush; see the flushCommitLogService implementations
        this.flushCommitLogService.wakeup();
    }
    ......

There are two flush modes here: synchronous (FlushDiskType.SYNC_FLUSH) and asynchronous.
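In the synchronous mode the sending thread blocks in waitForFlush until the flush thread reports back or the timeout expires. One common way to build such a hand-off is a CountDownLatch, sketched below. This is an illustration of the waiting pattern and is not claimed to be how GroupCommitRequest is written; FlushTicket and its method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class FlushTicket {
    private final long nextOffset;
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    FlushTicket(long nextOffset) {
        this.nextOffset = nextOffset;
    }

    /** Offset that must be on disk before the sender may be released. */
    long getNextOffset() {
        return nextOffset;
    }

    /** Called by the flush thread when it has finished with this ticket. */
    void wakeup(boolean ok) {
        this.flushOK = ok;
        done.countDown();
    }

    /** Called by the sender; returns false on timeout, interruption or failed flush. */
    boolean waitForFlush(long timeoutMillis) {
        try {
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return flushOK;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        FlushTicket ticket = new FlushTicket(1024);
        new Thread(() -> ticket.wakeup(true)).start(); // stands in for the flush thread
        System.out.println(ticket.waitForFlush(1000));
    }
}
```

In the asynchronous mode no ticket is created at all: the sender only wakes the flush thread and returns without waiting.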

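How the system is configured for synchronous or asynchronous flushing can be seen here:

In the synchronous case the flush logic is handled by this thread: GroupCommitService service = (GroupCommitService) this.flushCommitLogService;

Let's go straight to its flush code: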

private void doCommit() {
    if (!this.requestsRead.isEmpty()) {
        for (GroupCommitRequest req : this.requestsRead) {
            // There may be a message in the next file, so a maximum of
            // two times the flush
            boolean flushOK = false;
            for (int i = 0; (i < 2) && !flushOK; i++) {
                flushOK = (CommitLog.this.mapedFileQueue.getCommittedWhere() >= req.getNextOffset());

                if (!flushOK) {
                    CommitLog.this.mapedFileQueue.commit(0);
                }
            }

            req.wakeupCustomer(flushOK);
        }

        long storeTimestamp = CommitLog.this.mapedFileQueue.getStoreTimestamp();
        if (storeTimestamp > 0) {
            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
        }

        this.requestsRead.clear();
    } else {
        // Because of individual messages is set to not sync flush, it
        // will come to this process
        CommitLog.this.mapedFileQueue.commit(0);
    }
}

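In the end it calls CommitLog.this.mapedFileQueue.commit(0); to write the data to disk.

// The message file queue, which holds all the files stored on disk. A MappedFile maps one-to-one to a file, so the two classes are very closely related.
com.alibaba.rocketmq.store.MapedFileQueue

This, too, is initialized in the CommitLog constructor.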

MapedFileQueue mapedFileQueue;

this.mapedFileQueue = new MapedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
        defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMapedFileService());

public MapedFileQueue(final String storePath, int mapedFileSize,
                      AllocateMapedFileService allocateMapedFileService) {
    this.storePath = storePath;
    this.mapedFileSize = mapedFileSize;
    this.allocateMapedFileService = allocateMapedFileService;
}

 

Now look at the commit(final int flushLeastPages) method:

public boolean commit(final int flushLeastPages) {
    boolean result = true;
    MapedFile mapedFile = this.findMapedFileByOffset(this.committedWhere, true);
    if (mapedFile != null) {
        long tmpTimeStamp = mapedFile.getStoreTimestamp();
        int offset = mapedFile.commit(flushLeastPages);
        long where = mapedFile.getFileFromOffset() + offset;
        result = (where == this.committedWhere);
        this.committedWhere = where;
        if (0 == flushLeastPages) {
            this.storeTimestamp = tmpTimeStamp;
        }
    }

    return result;
}

 

public MapedFile findMapedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
    try {
        this.readWriteLock.readLock().lock();
        MapedFile mapedFile = this.getFirstMapedFile();
        if (mapedFile != null) {
            int index =
                    (int) ((offset / this.mapedFileSize) - (mapedFile.getFileFromOffset() / this.mapedFileSize));
            if (index < 0 || index >= this.mapedFiles.size()) {
                logError.warn(
                        "findMapedFileByOffset offset not matched, request Offset: {}, index: {}, mapedFileSize: {}, mapedFiles count: {}, StackTrace: {}",
                        offset,
                        index,
                        this.mapedFileSize,
                        this.mapedFiles.size(),
                        UtilAll.currentStackTrace());
            }
            try {
                return this.mapedFiles.get(index);
            } catch (Exception e) {
                if (returnFirstOnNotFound) {
                    return mapedFile;
                }
            }
        }
    } catch (Exception e) {
        log.error("findMapedFileByOffset Exception", e);
    } finally {
        this.readWriteLock.readLock().unlock();
    }
    return null;
}

private MapedFile getFirstMapedFile() {
    if (this.mapedFiles.isEmpty()) {
        return null;
    }
    return this.mapedFiles.get(0);
}
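The index arithmetic in findMapedFileByOffset is worth a worked example. Because older files can be deleted, the first file in the list does not necessarily start at offset 0, which is why the first file's own index is subtracted. The class name FileIndexSketch and the file size of 1024 below are made up for illustration:

```java
public class FileIndexSketch {
    /** Same arithmetic as findMapedFileByOffset: which list slot holds this global offset? */
    static int indexOf(long offset, long firstFileFromOffset, int mapedFileSize) {
        return (int) ((offset / mapedFileSize) - (firstFileFromOffset / mapedFileSize));
    }

    public static void main(String[] args) {
        int size = 1024;         // hypothetical file size
        long first = 2L * size;  // the oldest remaining file starts at offset 2048

        System.out.println(indexOf(2048, first, size));  // 0: first byte of the first file
        System.out.println(indexOf(3500, first, size));  // 1: falls in the second file
        System.out.println(indexOf(1000, first, size));  // -2: before the first file
    }
}
```

An out-of-range index such as -2 makes mapedFiles.get(index) throw; the method catches that and returns the first file when returnFirstOnNotFound is true, otherwise null.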

Finally, we need to find the method that populates mapedFiles. It is load():

com.alibaba.rocketmq.store.MapedFileQueue

public boolean load() {
    File dir = new File(this.storePath);
    File[] files = dir.listFiles();
    if (files != null) {
        // ascending order
        Arrays.sort(files);
        for (File file : files) {
            if (file.length() != this.mapedFileSize) {
                log.warn(file + "\t" + file.length()
                        + " length not matched message store config value, ignore it");
                return true;
            }
            try {
                MapedFile mapedFile = new MapedFile(file.getPath(), mapedFileSize);
                mapedFile.setWrotePostion(this.mapedFileSize);
                mapedFile.setCommittedPosition(this.mapedFileSize);
                this.mapedFiles.add(mapedFile);
                log.info("load " + file.getPath() + " OK");
            } catch (IOException e) {
                log.error("load file " + file + " error", e);
                return false;
            }
        }
    }
    return true;
}

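Now let's look at the MapedFile constructor.

One internal field deserves attention:

// The file's starting offset, i.e. the global offset of this file; it is parsed from the file name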

private final long fileFromOffset;

public MapedFile(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;
    ensureDirOK(this.file.getParent());
    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TotalMapedVitualMemory.addAndGet(fileSize);
        TotalMapedFiles.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("create file channel " + this.fileName + " Failed. ", e);
        throw e;
    } catch (IOException e) {
        log.error("map file " + this.fileName + " Failed. ", e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}
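Since the constructor derives fileFromOffset with Long.parseLong(this.file.getName()), the file name itself has to be a number. The sketch below shows the round trip. The 20-digit zero-padded format is an assumption about the naming convention (it is not shown in this article), and FileNameOffsetSketch with its two helpers is a hypothetical name:

```java
public class FileNameOffsetSketch {
    /** Assumed convention: the file name is the start offset, zero-padded to 20 digits. */
    static String offsetToFileName(long offset) {
        return String.format("%020d", offset);
    }

    /** What the MapedFile constructor does with the name: parse it back into the offset. */
    static long fileNameToOffset(String fileName) {
        return Long.parseLong(fileName);
    }

    public static void main(String[] args) {
        long oneGiB = 1024L * 1024 * 1024;
        String name = offsetToFileName(oneGiB);
        System.out.println(name);                    // 00000000001073741824
        System.out.println(fileNameToOffset(name));  // 1073741824
    }
}
```

Zero padding also explains why load() can get away with a plain Arrays.sort(files): with fixed-width names, lexicographic order is the same as offset order.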

Now, back to this call:

 int offset = mapedFile.commit(flushLeastPages)

Let's follow it into MapedFile:

/**
 * Flush messages to disk.
 *
 * @param flushLeastPages
 *            minimum number of pages to flush
 *
 * @return the committed position after the flush
 */
public int commit(final int flushLeastPages) {
    // Check whether a flush is possible right now
    if (this.isAbleToFlush(flushLeastPages)) {
        // hold() works like a reference-counted smart pointer: it guards the
        // mapped file while the flush is in progress
        if (this.hold()) {
            int value = this.wrotePostion.get();
            // Flush: memory to disk
            this.mappedByteBuffer.force();
            this.committedPosition.set(value);
            // Release the reference
            this.release();
        } else {
            log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            this.committedPosition.set(this.wrotePostion.get());
        }
    }
    return this.getCommittedPosition();
}
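The core of the flush is the pairing of FileChannel.map with MappedByteBuffer.force(), both standard java.nio calls. Here is a minimal standalone sketch of that pairing, using a temp file and a 4096-byte mapping chosen purely for illustration; the class and method names are hypothetical and none of RocketMQ's position tracking is reproduced:

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class MmapFlushSketch {
    /** Writes text through a memory mapping, forces it to disk, then reads it back from the file. */
    static String writeFlushAndReadBack(String text) {
        byte[] payload = text.getBytes(StandardCharsets.UTF_8);
        try {
            Path path = Files.createTempFile("mmap-sketch", ".dat");
            path.toFile().deleteOnExit();
            try (RandomAccessFile raf = new RandomAccessFile(path.toFile(), "rw");
                 FileChannel channel = raf.getChannel()) {
                // Mapping beyond the current length grows the file to 4096 bytes.
                MappedByteBuffer buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, 4096);
                buffer.put(payload);  // lands in the page cache, not necessarily on disk yet
                buffer.force();       // ask the OS to write the dirty pages to the device
            }
            byte[] onDisk = Files.readAllBytes(path);
            return new String(onDisk, 0, payload.length, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(writeFlushAndReadBack("hello commitlog"));  // hello commitlog
    }
}
```

In MapedFile.commit the same force() call is wrapped in extra bookkeeping: wrotePostion is read before the flush and stored into committedPosition afterwards, so the committed position never claims more than what was written when the flush began.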

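At this point the flush to disk is complete. Of course, a lot was skipped along the way, which could not be helped; I will try to find time to split this article into smaller parts.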

 

Reposted from: https://www.cnblogs.com/guazi/p/6822939.html
