
Source Code Analysis of MongoDB Chunk Migration - xinghebuluo

 2 years ago
source link: https://www.cnblogs.com/xinghebuluo/p/16461068.html

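The previous article covered the basic concepts of MongoDB chunks and the main flow of chunk migration. This article walks through the source code that implements it.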

2. Migration sequence diagram

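A chunk migration request is sent by the config server to the source shard (the donor). The donor then sends a migration request to the target shard (the recipient), and the two of them carry out the rest of the migration together.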

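When the data migration finishes, the donor commits the result to the config server. The sequence diagram for the three parties is shown below: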

[Figure: sequence diagram of the interaction among the config server, the donor, and the recipient]

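The five steps in the sequence diagram correspond to the five steps of the migration flow described in the previous article. On the recipient, the flow is driven by the _migrateDriver method in migration_destination_manager.cpp. On the donor, it is driven by the _runImpl method in move_chunk_command.cpp, shown below: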

static void _runImpl(OperationContext* opCtx, const MoveChunkRequest& moveChunkRequest) {
        const auto writeConcernForRangeDeleter =
            uassertStatusOK(ChunkMoveWriteConcernOptions::getEffectiveWriteConcern(
                opCtx, moveChunkRequest.getSecondaryThrottle()));

        // Resolve the donor and recipient shards and their connection string
        auto const shardRegistry = Grid::get(opCtx)->shardRegistry();
        // Prepare the connections to the donor and the recipient
        const auto donorConnStr =
            uassertStatusOK(shardRegistry->getShard(opCtx, moveChunkRequest.getFromShardId()))
                ->getConnString();
        const auto recipientHost = uassertStatusOK([&] {
            auto recipientShard =
                uassertStatusOK(shardRegistry->getShard(opCtx, moveChunkRequest.getToShardId()));

            return recipientShard->getTargeter()->findHost(
                opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly});
        }());

        std::string unusedErrMsg;
        // Records how long each step takes
        MoveTimingHelper moveTimingHelper(opCtx,
                                          "from",
                                          moveChunkRequest.getNss().ns(),
                                          moveChunkRequest.getMinKey(),
                                          moveChunkRequest.getMaxKey(),
                                          6,  // Total number of steps
                                          &unusedErrMsg,
                                          moveChunkRequest.getToShardId(),
                                          moveChunkRequest.getFromShardId());

        moveTimingHelper.done(1);
        moveChunkHangAtStep1.pauseWhileSet();

        if (moveChunkRequest.getFromShardId() == moveChunkRequest.getToShardId()) {
            // TODO: SERVER-46669 handle wait for delete.
            return;
        }
        // Construct the migration source manager
        MigrationSourceManager migrationSourceManager(
            opCtx, moveChunkRequest, donorConnStr, recipientHost);

        moveTimingHelper.done(2);
        moveChunkHangAtStep2.pauseWhileSet();

        // Send the migration command to the recipient
        uassertStatusOKWithWarning(migrationSourceManager.startClone());
        moveTimingHelper.done(3);
        moveChunkHangAtStep3.pauseWhileSet();

        // Wait until both the chunk data and the changes made during cloning have been copied
        uassertStatusOKWithWarning(migrationSourceManager.awaitToCatchUp());
        moveTimingHelper.done(4);
        moveChunkHangAtStep4.pauseWhileSet();

        // Enter the critical section
        uassertStatusOKWithWarning(migrationSourceManager.enterCriticalSection());

        // Notify the recipient
        uassertStatusOKWithWarning(migrationSourceManager.commitChunkOnRecipient());
        moveTimingHelper.done(5);
        moveChunkHangAtStep5.pauseWhileSet();

        // Commit the chunk metadata on the config server
        uassertStatusOKWithWarning(migrationSourceManager.commitChunkMetadataOnConfig());
        moveTimingHelper.done(6);
        moveChunkHangAtStep6.pauseWhileSet();
    }
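
To make the ordering of these calls easier to see, here is a minimal sketch of the donor flow as a state machine. It is not MongoDB code: DonorFlowSketch and runDonorFlow are hypothetical names, the states kCloning through kCommittingOnConfig are taken from the snippets in this article, and kCreated and kDone are assumed here for illustration. Each step checks the state it expects, much like the invariant() calls in MigrationSourceManager.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

// Donor-side states. kCloning through kCommittingOnConfig appear in the
// article's code; kCreated and kDone are assumed here for illustration.
enum class DonorState {
    kCreated,
    kCloning,
    kCloneCaughtUp,
    kCriticalSection,
    kCloneCompleted,
    kCommittingOnConfig,
    kDone
};

// Hypothetical stand-in for MigrationSourceManager that only tracks state.
class DonorFlowSketch {
public:
    void startClone() { advance(DonorState::kCreated, DonorState::kCloning, "startClone"); }
    void awaitToCatchUp() {
        advance(DonorState::kCloning, DonorState::kCloneCaughtUp, "awaitToCatchUp");
    }
    void enterCriticalSection() {
        advance(DonorState::kCloneCaughtUp, DonorState::kCriticalSection,
                "enterCriticalSection");
    }
    void commitChunkOnRecipient() {
        advance(DonorState::kCriticalSection, DonorState::kCloneCompleted,
                "commitChunkOnRecipient");
    }
    void commitChunkMetadataOnConfig() {
        // The sketch passes through kCommittingOnConfig and finishes at once;
        // the real method talks to the config server in between.
        advance(DonorState::kCloneCompleted, DonorState::kCommittingOnConfig,
                "commitChunkMetadataOnConfig");
        _state = DonorState::kDone;
    }

    DonorState state() const { return _state; }
    const std::vector<std::string>& steps() const { return _steps; }

private:
    // Refuses to run a step from the wrong state, then records and advances.
    void advance(DonorState expected, DonorState next, const std::string& step) {
        if (_state != expected) {
            throw std::logic_error("step out of order: " + step);
        }
        _state = next;
        _steps.push_back(step);
    }

    DonorState _state = DonorState::kCreated;
    std::vector<std::string> _steps;
};

// Runs the steps in the same order as _runImpl.
DonorFlowSketch runDonorFlow() {
    DonorFlowSketch flow;
    flow.startClone();
    flow.awaitToCatchUp();
    flow.enterCriticalSection();
    flow.commitChunkOnRecipient();
    flow.commitChunkMetadataOnConfig();
    return flow;
}
```

Calling a step out of order, for example enterCriticalSection() on a fresh object, throws std::logic_error in this sketch. The real code would hit an invariant instead.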

The following sections analyze the code for each step.

3. Source code analysis of each step

3.1 Starting the migration (_recvChunkStart)

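During startup, the donor does three main things:

1. Validates the parameters. This happens in the MigrationSourceManager constructor and is not covered here.

2. Registers a listener that records the changes made to the chunk while the migration is in progress.

3. Sends the _recvChunkStart migration command to the recipient.

Steps 2 and 3 are implemented in a single method, shown below: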

Status MigrationSourceManager::startClone() {
    ...// part of the code omitted

    _cloneAndCommitTimer.reset();

    auto replCoord = repl::ReplicationCoordinator::get(_opCtx);
    auto replEnabled = replCoord->isReplEnabled();

    {
        const auto metadata = _getCurrentMetadataAndCheckEpoch();

        // Having the metadata manager registered on the collection sharding state is what indicates
        // that a chunk on that collection is being migrated. With an active migration, write
        // operations require the cloner to be present in order to track changes to the chunk which
        // needs to be transmitted to the recipient.
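        // Register the listener. Besides cloning the data, _cloneDriver also records incremental
        // changes to the chunk during the migration (for example, newly inserted documents).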
        _cloneDriver = std::make_unique<MigrationChunkClonerSourceLegacy>(
            _args, metadata.getKeyPattern(), _donorConnStr, _recipientHost);

        AutoGetCollection autoColl(_opCtx,
                                   getNss(),
                                   replEnabled ? MODE_IX : MODE_X,
                                   AutoGetCollectionViewMode::kViewsForbidden,
                                   _opCtx->getServiceContext()->getPreciseClockSource()->now() +
                                       Milliseconds(migrationLockAcquisitionMaxWaitMS.load()));

        auto csr = CollectionShardingRuntime::get(_opCtx, getNss());
        auto lockedCsr = CollectionShardingRuntime::CSRLock::lockExclusive(_opCtx, csr);
        invariant(nullptr == std::exchange(msmForCsr(csr), this));

        _coordinator = std::make_unique<migrationutil::MigrationCoordinator>(
            _cloneDriver->getSessionId(),
            _args.getFromShardId(),
            _args.getToShardId(),
            getNss(),
            *_collectionUUID,
            ChunkRange(_args.getMinKey(), _args.getMaxKey()),
            _chunkVersion,
            _args.getWaitForDelete());

        _state = kCloning;
    }

    if (replEnabled) {
        auto const readConcernArgs = repl::ReadConcernArgs(
            replCoord->getMyLastAppliedOpTime(), repl::ReadConcernLevel::kLocalReadConcern);

        // Check that this node can satisfy repl::ReadConcernLevel::kLocalReadConcern
        auto waitForReadConcernStatus =
            waitForReadConcern(_opCtx, readConcernArgs, StringData(), false);
        if (!waitForReadConcernStatus.isOK()) {
            return waitForReadConcernStatus;
        }
        setPrepareConflictBehaviorForReadConcern(
            _opCtx, readConcernArgs, PrepareConflictBehavior::kEnforce);
    }

    _coordinator->startMigration(_opCtx);

    // Send the recipient the command to start copying data (_recvChunkStart)
    Status startCloneStatus = _cloneDriver->startClone(_opCtx,
                                                       _coordinator->getMigrationId(),
                                                       _coordinator->getLsid(),
                                                       _coordinator->getTxnNumber());
    if (!startCloneStatus.isOK()) {
        return startCloneStatus;
    }

    scopedGuard.dismiss();
    return Status::OK();
}

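When the recipient receives the migration request, it first checks whether the collection exists locally. If it does not, the recipient creates the collection and then creates its indexes: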

void MigrationDestinationManager::cloneCollectionIndexesAndOptions(
    OperationContext* opCtx,
    const NamespaceString& nss,
    const CollectionOptionsAndIndexes& collectionOptionsAndIndexes) {
    {
        // 1. Create the collection (if it doesn't already exist) and create any indexes we are
        // missing (auto-heal indexes).

        ...// part of the code omitted

        {
            AutoGetCollection collection(opCtx, nss, MODE_IS);
            // If the collection exists and no indexes are missing, return early
            if (collection) {
                checkUUIDsMatch(collection.getCollection());
                auto indexSpecs =
                    checkEmptyOrGetMissingIndexesFromDonor(collection.getCollection());
                if (indexSpecs.empty()) {
                    return;
                }
            }
        }

        // Take the exclusive database lock if the collection does not exist or indexes are missing
        // (needs auto-heal).
        // Creating the collection requires an exclusive lock on the database
        AutoGetDb autoDb(opCtx, nss.db(), MODE_X);
        auto db = autoDb.ensureDbExists();

        auto collection = CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss);
        if (collection) {
            checkUUIDsMatch(collection);
        } else {
            ...// part of the code omitted
            // We do not have a collection by this name. Create the collection with the donor's
            // options.
            OperationShardingState::ScopedAllowImplicitCollectionCreate_UNSAFE
                unsafeCreateCollection(opCtx);
            WriteUnitOfWork wuow(opCtx);
            CollectionOptions collectionOptions = uassertStatusOK(
                CollectionOptions::parse(collectionOptionsAndIndexes.options,
                                         CollectionOptions::ParseKind::parseForStorage));
            const bool createDefaultIndexes = true;
            uassertStatusOK(db->userCreateNS(opCtx,
                                             nss,
                                             collectionOptions,
                                             createDefaultIndexes,
                                             collectionOptionsAndIndexes.idIndexSpec));
            wuow.commit();
            collection = CollectionCatalog::get(opCtx)->lookupCollectionByNamespace(opCtx, nss);
        }
        // Create the missing indexes
        auto indexSpecs = checkEmptyOrGetMissingIndexesFromDonor(collection);
        if (!indexSpecs.empty()) {
            WriteUnitOfWork wunit(opCtx);
            auto fromMigrate = true;
            CollectionWriter collWriter(opCtx, collection->uuid());
            IndexBuildsCoordinator::get(opCtx)->createIndexesOnEmptyCollection(
                opCtx, collWriter, indexSpecs, fromMigrate);
            wunit.commit();
        }
    }
}
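
The decision the recipient makes here can be sketched in a few lines. This is only a simplified model under stated assumptions: LocalCollectionSketch, PrepPlanSketch, and planRecipientPrep are hypothetical names, indexes are reduced to plain name strings, and the default index created with a new collection is assumed to be called "_id_". Lock handling and the UUID check are left out.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// What the recipient knows about its local copy of the collection.
struct LocalCollectionSketch {
    bool exists;
    std::set<std::string> indexes;
};

// What the recipient decides to do before cloning starts.
struct PrepPlanSketch {
    bool createCollection;
    std::vector<std::string> indexesToCreate;
};

// Hypothetical planner mirroring the flow above: create the collection if it
// is missing, then create whichever donor indexes are still absent.
PrepPlanSketch planRecipientPrep(const LocalCollectionSketch& local,
                                 const std::vector<std::string>& donorIndexes) {
    PrepPlanSketch plan{!local.exists, {}};
    std::set<std::string> present = local.indexes;
    if (plan.createCollection) {
        // Assumption: creating the collection also creates the default "_id_" index.
        present = {"_id_"};
    }
    for (const auto& name : donorIndexes) {
        if (present.count(name) == 0) {
            plan.indexesToCreate.push_back(name);
        }
    }
    return plan;
}
```

When the collection already exists and nothing is missing, the plan is empty, which corresponds to the early return in the first block of the real function.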

3.2 The recipient pulls the existing data (_migrateClone)

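To pull the existing data, the recipient does six things:

1. Defines a function that inserts documents in batches.

2. Defines a function that fetches a batch of data from the donor.

3. Defines a producer/consumer queue.

4. Starts a writer thread that consumes batches from the queue and calls the batch-insert function to store the documents locally.

5. Repeatedly sends fetch requests to the donor (using the function from step 2) and pushes the results onto the queue from step 3.

6. Once fetching finishes (an empty batch is pushed onto the queue, which ends the loop in step 5 and tells the writer thread to exit), waits for the thread from step 4 to finish.

The detailed code is as follows: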

// 1. Define the batch-insert function
        auto insertBatchFn = [&](OperationContext* opCtx, BSONObj arr) {
            auto it = arr.begin();
            while (it != arr.end()) {
                int batchNumCloned = 0;
                int batchClonedBytes = 0;
                const int batchMaxCloned = migrateCloneInsertionBatchSize.load();

                assertNotAborted(opCtx);

                write_ops::InsertCommandRequest insertOp(_nss);
                insertOp.getWriteCommandRequestBase().setOrdered(true);
                insertOp.setDocuments([&] {
                    std::vector<BSONObj> toInsert;
                    while (it != arr.end() &&
                           (batchMaxCloned <= 0 || batchNumCloned < batchMaxCloned)) {
                        const auto& doc = *it;
                        BSONObj docToClone = doc.Obj();
                        toInsert.push_back(docToClone);
                        batchNumCloned++;
                        batchClonedBytes += docToClone.objsize();
                        ++it;
                    }
                    return toInsert;
                }());

                const auto reply =
                    write_ops_exec::performInserts(opCtx, insertOp, OperationSource::kFromMigrate);

                for (unsigned long i = 0; i < reply.results.size(); ++i) {
                    uassertStatusOKWithContext(
                        reply.results[i],
                        str::stream() << "Insert of " << insertOp.getDocuments()[i] << " failed.");
                }

                {
                    stdx::lock_guard<Latch> statsLock(_mutex);
                    _numCloned += batchNumCloned;
                    ShardingStatistics::get(opCtx).countDocsClonedOnRecipient.addAndFetch(
                        batchNumCloned);
                    _clonedBytes += batchClonedBytes;
                }
                if (_writeConcern.needToWaitForOtherNodes()) {
                    runWithoutSession(outerOpCtx, [&] {
                        repl::ReplicationCoordinator::StatusAndDuration replStatus =
                            repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
                                opCtx,
                                repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
                                _writeConcern);
                        if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
                            LOGV2_WARNING(
                                22011,
                                "secondaryThrottle on, but doc insert timed out; continuing",
                                "migrationId"_attr = _migrationId->toBSON());
                        } else {
                            uassertStatusOK(replStatus.status);
                        }
                    });
                }

                sleepmillis(migrateCloneInsertionBatchDelayMS.load());
            }
        };
        // 2. Define the batch-fetch function
        auto fetchBatchFn = [&](OperationContext* opCtx) {
            auto res = uassertStatusOKWithContext(
                fromShard->runCommand(opCtx,
                                      ReadPreferenceSetting(ReadPreference::PrimaryOnly),
                                      "admin",
                                      migrateCloneRequest,
                                      Shard::RetryPolicy::kNoRetry),
                "_migrateClone failed: ");

            uassertStatusOKWithContext(Shard::CommandResponse::getEffectiveStatus(res),
                                       "_migrateClone failed: ");

            return res.response;
        };

    SingleProducerSingleConsumerQueue<BSONObj>::Options options;
    options.maxQueueDepth = 1;
    // 3. Use a producer/consumer queue to hand the fetched data to the local writer
    SingleProducerSingleConsumerQueue<BSONObj> batches(options);
    repl::OpTime lastOpApplied;

    // 4. Start the writer thread. It reads batches from the queue and writes them to the local
    // node, and exits when there is no more data to copy.
    stdx::thread inserterThread{[&] {
        Client::initThread("chunkInserter", opCtx->getServiceContext(), nullptr);
        auto client = Client::getCurrent();
        {
            stdx::lock_guard lk(*client);
            client->setSystemOperationKillableByStepdown(lk);
        }
        auto executor =
            Grid::get(opCtx->getServiceContext())->getExecutorPool()->getFixedExecutor();
        auto inserterOpCtx = CancelableOperationContext(
            cc().makeOperationContext(), opCtx->getCancellationToken(), executor);

        auto consumerGuard = makeGuard([&] {
            batches.closeConsumerEnd();
            lastOpApplied = repl::ReplClientInfo::forClient(inserterOpCtx->getClient()).getLastOp();
        });

        try {
            while (true) {
                auto nextBatch = batches.pop(inserterOpCtx.get());
                auto arr = nextBatch["objects"].Obj();
                if (arr.isEmpty()) {
                    return;
                }
                insertBatchFn(inserterOpCtx.get(), arr);
            }
        } catch (...) {
            stdx::lock_guard<Client> lk(*opCtx->getClient());
            opCtx->getServiceContext()->killOperation(lk, opCtx, ErrorCodes::Error(51008));
            LOGV2(21999,
                  "Batch insertion failed: {error}",
                  "Batch insertion failed",
                  "error"_attr = redact(exceptionToStatus()));
        }
    }};


    {
        // 6. makeGuard defers inserterThread.join() until this scope exits
        auto inserterThreadJoinGuard = makeGuard([&] {
            batches.closeProducerEnd();
            inserterThread.join();
        });
        // 5. Send fetch requests to the donor and push the results onto the queue
        while (true) {
            auto res = fetchBatchFn(opCtx);
            try {
                batches.push(res.getOwned(), opCtx);
                auto arr = res["objects"].Obj();
                if (arr.isEmpty()) {
                    break;
                }
            } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueEndClosed>&) {
                break;
            }
        }
    }  // This scope ensures that the guard is destroyed
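
The producer/consumer arrangement above can be sketched with the standard library alone. This is a rough model, not the real SingleProducerSingleConsumerQueue API: BoundedQueueSketch, BatchSketch, and cloneWithPipeline are hypothetical names, a batch is just a vector of ints, and there is no interruption or error handling. With a depth of 1, as in the article's options.maxQueueDepth, the fetching side can be at most one batch ahead of the inserting side.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Minimal bounded blocking queue for one producer and one consumer.
template <typename T>
class BoundedQueueSketch {
public:
    explicit BoundedQueueSketch(std::size_t maxDepth) : _maxDepth(maxDepth) {}

    // Blocks while the queue is full, so the producer cannot run ahead.
    void push(T item) {
        std::unique_lock<std::mutex> lk(_mutex);
        _notFull.wait(lk, [&] { return _items.size() < _maxDepth; });
        _items.push_back(std::move(item));
        _notEmpty.notify_one();
    }

    // Blocks while the queue is empty.
    T pop() {
        std::unique_lock<std::mutex> lk(_mutex);
        _notEmpty.wait(lk, [&] { return !_items.empty(); });
        T item = std::move(_items.front());
        _items.pop_front();
        _notFull.notify_one();
        return item;
    }

private:
    const std::size_t _maxDepth;
    std::mutex _mutex;
    std::condition_variable _notFull;
    std::condition_variable _notEmpty;
    std::deque<T> _items;
};

using BatchSketch = std::vector<int>;

// Pushes donor batches on the calling thread and inserts them on a second
// thread. An empty batch marks the end of the data, as in the loop above.
std::vector<int> cloneWithPipeline(const std::vector<BatchSketch>& donorBatches) {
    BoundedQueueSketch<BatchSketch> batches(1);
    std::vector<int> inserted;

    std::thread inserter([&] {
        while (true) {
            BatchSketch next = batches.pop();
            if (next.empty()) {
                return;  // empty batch: nothing left to clone
            }
            inserted.insert(inserted.end(), next.begin(), next.end());
        }
    });

    bool sentEnd = false;
    for (const auto& batch : donorBatches) {
        batches.push(batch);
        if (batch.empty()) {
            sentEnd = true;
            break;
        }
    }
    if (!sentEnd) {
        batches.push(BatchSketch{});  // make sure the consumer can exit
    }

    inserter.join();  // the article defers this with makeGuard
    return inserted;
}
```

On some toolchains a program that uses std::thread has to be built with a threading flag such as -pthread.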

3.3 The recipient pulls the changes (_transferMods)

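In this step, the recipient pulls the changes, that is, the inserts, updates, and deletes applied to this chunk on the donor while the earlier cloning was in progress. The code is as follows: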

// Synchronize the changes (_transferMods)
    const BSONObj xferModsRequest = createTransferModsRequest(_nss, *_sessionId);

    {
        // 5. Do bulk of mods
        // Pull the changes in batches, looping until none are left
        _setState(CATCHUP);

        while (true) {
            auto res = uassertStatusOKWithContext(
                fromShard->runCommand(opCtx,
                                      ReadPreferenceSetting(ReadPreference::PrimaryOnly),
                                      "admin",
                                      xferModsRequest,
                                      Shard::RetryPolicy::kNoRetry),
                "_transferMods failed: ");

            uassertStatusOKWithContext(Shard::CommandResponse::getEffectiveStatus(res),
                                       "_transferMods failed: ");

            const auto& mods = res.response;

            if (mods["size"].number() == 0) {
                // There are no more pending modifications to be applied. End the catchup phase
                // Stop looping once there are no more changes
                break;
            }
            // Apply the changes that were pulled
            if (!_applyMigrateOp(opCtx, mods, &lastOpApplied)) {
                continue;
            }

            const int maxIterations = 3600 * 50;

            // Wait for the secondaries to replicate the data
            int i;
            for (i = 0; i < maxIterations; i++) {
                opCtx->checkForInterrupt();
                outerOpCtx->checkForInterrupt();

                if (getState() == ABORT) {
                    LOGV2(22002,
                          "Migration aborted while waiting for replication at catch up stage",
                          "migrationId"_attr = _migrationId->toBSON());
                    return;
                }

                if (runWithoutSession(outerOpCtx, [&] {
                        return opReplicatedEnough(opCtx, lastOpApplied, _writeConcern);
                    })) {
                    break;
                }

                if (i > 100) {
                    LOGV2(22003,
                          "secondaries having hard time keeping up with migrate",
                          "migrationId"_attr = _migrationId->toBSON());
                }

                sleepmillis(20);
            }

            if (i == maxIterations) {
                _setStateFail("secondary can't keep up with migrate");
                return;
            }
        }

        timing.done(5);
        migrateThreadHangAtStep5.pauseWhileSet();
    }
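
Two details of this loop are worth spelling out. First, the replication wait is bounded: 3600 * 50 = 180000 polls with a 20 ms sleep each add up to 3,600,000 ms, so the recipient sleeps for about one hour at most (not counting the time spent in the checks) before it fails the migration. Second, the outer loop simply drains batches until the donor returns one of size 0. The sketch below models both with hypothetical helpers (maxReplicationWaitMillis, waitForReplicationSketch, catchUpSketch). It leaves out the sleep and the abort checks so that it runs instantly, and it reduces each batch to its size.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <functional>

// Constants copied from the article's code.
constexpr std::int64_t kMaxIterations = 3600 * 50;
constexpr std::int64_t kSleepMillis = 20;

// Upper bound on the time spent sleeping while waiting for replication.
constexpr std::int64_t maxReplicationWaitMillis() {
    return kMaxIterations * kSleepMillis;
}

// Polls replicatedEnough up to maxIterations times. Returns the index of the
// poll that succeeded, or -1 if the limit was reached (the real code then
// marks the migration as failed).
int waitForReplicationSketch(const std::function<bool()>& replicatedEnough,
                             int maxIterations) {
    for (int i = 0; i < maxIterations; ++i) {
        if (replicatedEnough()) {
            return i;
        }
    }
    return -1;
}

// Applies batches of changes until the donor reports a batch of size 0.
// Returns how many changes were applied in total.
int catchUpSketch(std::deque<int> pendingBatchSizes) {
    int applied = 0;
    while (true) {
        int size = 0;
        if (!pendingBatchSizes.empty()) {
            size = pendingBatchSizes.front();
            pendingBatchSizes.pop_front();
        }
        if (size == 0) {
            break;  // no more pending changes: end of the catch-up phase
        }
        applied += size;
    }
    return applied;
}
```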

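Once the changes have been pulled, the recipient waits for the donor to enter the critical section. Inside the critical section the donor blocks write requests, so until the donor has entered it the recipient must keep pulling changes: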

        // 6. Wait for commit
        // Wait for the donor to enter the critical section
        _setState(STEADY);

        bool transferAfterCommit = false;
        while (getState() == STEADY || getState() == COMMIT_START) {
            opCtx->checkForInterrupt();
            outerOpCtx->checkForInterrupt();

            // Make sure we do at least one transfer after recv'ing the commit message. If we
            // aren't sure that at least one transfer happens *after* our state changes to
            // COMMIT_START, there could be mods still on the FROM shard that got logged
            // *after* our _transferMods but *before* the critical section.
            if (getState() == COMMIT_START) {
                transferAfterCommit = true;
            }

            auto res = uassertStatusOKWithContext(
                fromShard->runCommand(opCtx,
                                      ReadPreferenceSetting(ReadPreference::PrimaryOnly),
                                      "admin",
                                      xferModsRequest,
                                      Shard::RetryPolicy::kNoRetry),
                "_transferMods failed in STEADY STATE: ");

            uassertStatusOKWithContext(Shard::CommandResponse::getEffectiveStatus(res),
                                       "_transferMods failed in STEADY STATE: ");

            auto mods = res.response;

            // If changes were returned, apply them locally and keep requesting more until all
            // changes have been transferred
            if (mods["size"].number() > 0 && _applyMigrateOp(opCtx, mods, &lastOpApplied)) {
                continue;
            }

            if (getState() == ABORT) {
                LOGV2(22006,
                      "Migration aborted while transferring mods",
                      "migrationId"_attr = _migrationId->toBSON());
                return;
            }

            // We know we're finished when:
            // 1) The from side has told us that it has locked writes (COMMIT_START)
            // 2) We've checked at least one more time for un-transmitted mods
            // transferAfterCommit is checked because the changes must be pulled at least once
            // more after entering COMMIT_START (the critical section)
            if (getState() == COMMIT_START && transferAfterCommit == true) {
                // Once all data has been replicated to the secondaries, the data transfer is done
                if (runWithoutSession(outerOpCtx,
                                      [&] { return _flushPendingWrites(opCtx, lastOpApplied); })) {
                    break;
                }
            }

            // Only sleep if we aren't committing
            if (getState() == STEADY)
                sleepmillis(10);
        }
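
The reason for the transferAfterCommit flag is easiest to see with a small simulation. The state can flip from STEADY to COMMIT_START between the top of an iteration and the exit check at the bottom. Without the flag the loop could exit on a transfer that was sent before the donor blocked writes, and miss changes logged in between. The sketch below is a simplified model with hypothetical names (PollSketch, pollsUntilExit). Each scripted poll records the state seen at the top, the number of changes returned, and the state seen at the bottom. The abort check and _flushPendingWrites are left out.

```cpp
#include <cassert>
#include <vector>

enum class RecipientStateSketch { kSteady, kCommitStart };

// One simulated loop iteration.
struct PollSketch {
    RecipientStateSketch stateAtTop;     // state seen before _transferMods is sent
    int modsReturned;                    // number of changes in the reply
    RecipientStateSketch stateAtBottom;  // state seen at the exit check
};

// Returns the 1-based number of the poll on which the loop exits, or -1 if
// the script ends first. Passing false for requireTransferAfterCommit shows
// what would happen without the flag.
int pollsUntilExit(const std::vector<PollSketch>& script,
                   bool requireTransferAfterCommit) {
    bool transferAfterCommit = false;
    int polls = 0;
    for (const auto& p : script) {
        ++polls;
        if (p.stateAtTop == RecipientStateSketch::kCommitStart) {
            transferAfterCommit = true;
        }
        if (p.modsReturned > 0) {
            continue;  // changes were applied, so ask again
        }
        const bool flagOk = transferAfterCommit || !requireTransferAfterCommit;
        if (p.stateAtBottom == RecipientStateSketch::kCommitStart && flagOk) {
            return polls;
        }
    }
    return -1;
}
```

In a script where the first poll starts in STEADY, returns nothing, and ends in COMMIT_START, the flagged version keeps polling and picks up a change returned by the second poll, while the unflagged version would exit after the first poll.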

3.4 Entering the critical section (_recvChunkStatus, _recvChunkCommit)

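In this step, the donor does three main things:

1. Waits for the recipient to finish synchronizing the data (_recvChunkStatus).

2. Marks itself as being in the critical section, which blocks write operations.

3. Tells the recipient to enter the critical section (_recvChunkCommit).

The relevant code is as follows: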

Status MigrationSourceManager::awaitToCatchUp() {
    invariant(!_opCtx->lockState()->isLocked());
    invariant(_state == kCloning);
    auto scopedGuard = makeGuard([&] { cleanupOnError(); });
    _stats.totalDonorChunkCloneTimeMillis.addAndFetch(_cloneAndCommitTimer.millis());
    _cloneAndCommitTimer.reset();

    // Block until the cloner deems it appropriate to enter the critical section.
    // Wait for the copy to finish. This sends _recvChunkStatus to the recipient and checks
    // whether the recipient's state is STEADY.
    Status catchUpStatus = _cloneDriver->awaitUntilCriticalSectionIsAppropriate(
        _opCtx, kMaxWaitToEnterCriticalSectionTimeout);
    if (!catchUpStatus.isOK()) {
        return catchUpStatus;
    }

    _state = kCloneCaughtUp;
    scopedGuard.dismiss();
    return Status::OK();
}

// Enter the critical section
Status MigrationSourceManager::enterCriticalSection() {
    ...// part of the code omitted

    // Mark the critical section as entered. Subsequent write operations are blocked (they check
    // this flag through ShardingMigrationCriticalSection::getSignal()).
    _critSec.emplace(_opCtx, _args.getNss(), _critSecReason);

    _state = kCriticalSection;

    // Persist a signal to secondaries that we've entered the critical section. This will cause
    // secondaries to refresh their routing table when next accessed, which will block behind the
    // critical section. This ensures causal consistency by preventing a stale mongos with a cluster
    // time inclusive of the migration config commit update from accessing secondary data.
    // Note: this write must occur after the critSec flag is set, to ensure the secondary refresh
    // will stall behind the flag.
    // In short: tell the secondaries that the primary is now in the critical section, so that
    // they refresh their routing information on the next access.
    Status signalStatus = shardmetadatautil::updateShardCollectionsEntry(
        _opCtx,
        BSON(ShardCollectionType::kNssFieldName << getNss().ns()),
        BSON("$inc" << BSON(ShardCollectionType::kEnterCriticalSectionCounterFieldName << 1)),
        false /*upsert*/);
    if (!signalStatus.isOK()) {
        return {
            ErrorCodes::OperationFailed,
            str::stream() << "Failed to persist critical section signal for secondaries due to: "
                          << signalStatus.toString()};
    }

    LOGV2(22017,
          "Migration successfully entered critical section",
          "migrationId"_attr = _coordinator->getMigrationId());

    scopedGuard.dismiss();
    return Status::OK();
}

Status MigrationSourceManager::commitChunkOnRecipient() {
  invariant(!_opCtx->lockState()->isLocked());
  invariant(_state == kCriticalSection);
  auto scopedGuard = makeGuard([&] { cleanupOnError(); });

  // Tell the recipient shard to fetch the latest changes.
  // Tell the recipient to enter the critical section and pull the changes once more.
  auto commitCloneStatus = _cloneDriver->commitClone(_opCtx);

  if (MONGO_unlikely(failMigrationCommit.shouldFail()) && commitCloneStatus.isOK()) {
    commitCloneStatus = {ErrorCodes::InternalError,
                         "Failing _recvChunkCommit due to failpoint."};
  }

  if (!commitCloneStatus.isOK()) {
    return commitCloneStatus.getStatus().withContext("commit clone failed");
  }

  _recipientCloneCounts = commitCloneStatus.getValue()["counts"].Obj().getOwned();

  _state = kCloneCompleted;
  scopedGuard.dismiss();
  return Status::OK();
}
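
The blocking behavior of the critical section can be modeled as a small phase flag that operations consult before they run. The sketch below is a simplified illustration and not the ShardingMigrationCriticalSection API: CriticalSectionSketch and its methods are hypothetical names, and mustWait returns a plain bool where the real getSignal() hands back something an operation can wait on. Per the article, writes are blocked as soon as the critical section is entered, and reads are blocked as well once the commit phase starts (the _critSec->enterCommitPhase() call in section 3.5).

```cpp
#include <cassert>

enum class OpKindSketch { kRead, kWrite };

// Hypothetical model of the donor's critical section.
class CriticalSectionSketch {
public:
    // Entered before the recipient is told to commit: writes start to block.
    void enterCatchUpPhase() { _phase = Phase::kCatchUp; }

    // Entered just before the commit is sent to the config server:
    // reads block as well.
    void enterCommitPhase() { _phase = Phase::kCommit; }

    // Left once the migration has committed or failed.
    void leave() { _phase = Phase::kNone; }

    // Tells an operation whether it has to wait behind the critical section.
    bool mustWait(OpKindSketch op) const {
        switch (_phase) {
            case Phase::kNone:
                return false;
            case Phase::kCatchUp:
                return op == OpKindSketch::kWrite;
            case Phase::kCommit:
                return true;
        }
        return false;
    }

private:
    enum class Phase { kNone, kCatchUp, kCommit };
    Phase _phase = Phase::kNone;
};
```

Keeping reads available until the commit phase shortens the window in which the chunk is completely unavailable on the donor.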

3.5 Committing the migration result (_configsvrCommitChunkMigration)

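At this point all of the data has been migrated. The donor commits the migration result to the config server, which updates the sharding metadata stored there. The code is as follows: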

    BSONObjBuilder builder;

    {
        const auto metadata = _getCurrentMetadataAndCheckEpoch();

        ChunkType migratedChunkType;
        migratedChunkType.setMin(_args.getMinKey());
        migratedChunkType.setMax(_args.getMaxKey());
        migratedChunkType.setVersion(_chunkVersion);

        // Build the request that commits the metadata update
        const auto currentTime = VectorClock::get(_opCtx)->getTime();
        CommitChunkMigrationRequest::appendAsCommand(&builder,
                                                     getNss(),
                                                     _args.getFromShardId(),
                                                     _args.getToShardId(),
                                                     migratedChunkType,
                                                     metadata.getCollVersion(),
                                                     currentTime.clusterTime().asTimestamp());

        builder.append(kWriteConcernField, kMajorityWriteConcern.toBSON());
    }

    // Read operations must begin to wait on the critical section just before we send the commit
    // operation to the config server
    // Entering the commit phase blocks read requests as well; this works much like the blocking
    // of writes
    _critSec->enterCommitPhase();

    _state = kCommittingOnConfig;

    Timer t;

    // Send the metadata update request to the config server
    auto commitChunkMigrationResponse =
        Grid::get(_opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
            _opCtx,
            ReadPreferenceSetting{ReadPreference::PrimaryOnly},
            "admin",
            builder.obj(),
            Shard::RetryPolicy::kIdempotent);

    if (MONGO_unlikely(migrationCommitNetworkError.shouldFail())) {
        commitChunkMigrationResponse = Status(
            ErrorCodes::InternalError, "Failpoint 'migrationCommitNetworkError' generated error");
    }

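That largely completes the source code analysis of MongoDB chunk migration. As a supplement, here is how the change tracking is implemented.

As mentioned earlier, change tracking is handled by _cloneDriver. Its interface is defined as follows: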

class MigrationChunkClonerSourceLegacy final : public MigrationChunkClonerSource {
    ...// part of the code omitted

    StatusWith<BSONObj> commitClone(OperationContext* opCtx) override;

    void cancelClone(OperationContext* opCtx) override;

    bool isDocumentInMigratingChunk(const BSONObj& doc) override;

    // The class defines three methods that are called when the donor handles an insert, an
    // update, or a delete, respectively.
    void onInsertOp(OperationContext* opCtx,
                    const BSONObj& insertedDoc,
                    const repl::OpTime& opTime) override;

    void onUpdateOp(OperationContext* opCtx,
                    boost::optional<BSONObj> preImageDoc,
                    const BSONObj& postImageDoc,
                    const repl::OpTime& opTime,
                    const repl::OpTime& prePostImageOpTime) override;

    void onDeleteOp(OperationContext* opCtx,
                    const BSONObj& deletedDocId,
                    const repl::OpTime& opTime,
                    const repl::OpTime& preImageOpTime) override;

Taking onInsertOp as an example, here is its implementation:

void MigrationChunkClonerSourceLegacy::onInsertOp(OperationContext* opCtx,
                                                  const BSONObj& insertedDoc,
                                                  const repl::OpTime& opTime) {
    dassert(opCtx->lockState()->isCollectionLockedForMode(_args.getNss(), MODE_IX));

    BSONElement idElement = insertedDoc["_id"];
   
    // Check whether the document falls within the range of the chunk being migrated; if not,
    // return immediately
    if (!isInRange(insertedDoc, _args.getMinKey(), _args.getMaxKey(), _shardKeyPattern)) {
        return;
    }

    if (!_addedOperationToOutstandingOperationTrackRequests()) {
        return;
    }

    // Record the document's _id so that the change can be pulled later
    if (opCtx->getTxnNumber()) {
        opCtx->recoveryUnit()->registerChange(std::make_unique<LogOpForShardingHandler>(
            this, idElement.wrap(), 'i', opTime, repl::OpTime()));
    } else {
        opCtx->recoveryUnit()->registerChange(std::make_unique<LogOpForShardingHandler>(
            this, idElement.wrap(), 'i', repl::OpTime(), repl::OpTime()));
    }
}
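
As a rough illustration of the idea, the sketch below keeps a log of (operation type, _id) pairs for writes that fall inside the migrating chunk and hands the log over when it is drained, which stands in for answering a _transferMods request. All names are hypothetical (ChunkRangeSketch, LoggedOpSketch, ChangeTrackerSketch). Shard keys and ids are plain ints, the range is treated as half-open [min, max), and the type codes 'u' and 'd' are assumed by analogy with the 'i' shown above. Unlike the real code, the sketch records immediately instead of registering a handler with the recovery unit.

```cpp
#include <cassert>
#include <vector>

// Half-open range [min, max) on an integer shard key.
struct ChunkRangeSketch {
    int min;
    int max;
    bool contains(int key) const { return key >= min && key < max; }
};

// One recorded change: the operation type and the document's _id.
struct LoggedOpSketch {
    char type;  // 'i' insert, 'u' update, 'd' delete (the last two are assumed)
    int id;
};

// Hypothetical change tracker for a single migrating chunk.
class ChangeTrackerSketch {
public:
    explicit ChangeTrackerSketch(ChunkRangeSketch range) : _range(range) {}

    void onInsertOp(int shardKey, int id) { record('i', shardKey, id); }
    void onUpdateOp(int shardKey, int id) { record('u', shardKey, id); }
    void onDeleteOp(int shardKey, int id) { record('d', shardKey, id); }

    // Hands over everything recorded so far and clears the log.
    std::vector<LoggedOpSketch> drain() {
        std::vector<LoggedOpSketch> out;
        out.swap(_ops);
        return out;
    }

private:
    // Ignores writes outside the migrating chunk, like the isInRange check.
    void record(char type, int shardKey, int id) {
        if (!_range.contains(shardKey)) {
            return;
        }
        _ops.push_back(LoggedOpSketch{type, id});
    }

    ChunkRangeSketch _range;
    std::vector<LoggedOpSketch> _ops;
};
```

Only the _id is stored, not the document itself, so the current version of each document has to be looked up when the log is drained.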
