5

MongoDB Command命令处理模块源码实现一

 3 years ago
source link: https://mongoing.com/archives/77592
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

AFZFJf7.png!mobile

1. 背景

<<transport_layer网络传输层模块源码实现>> 中分享了MongoDB内核底层网络IO处理相关实现,包括套接字初始化、一个完整MongoDB报文的读取、获取到DB数据发送给客户端等。MongoDB支持多种增、删、改、查、聚合处理、cluster处理等操作,每个操作在内核实现中对应一个command,每个command有不同的功能,MongoDB内核如何进行command源码处理将是本文分析的重点。

此外,MongoDB提供了mongostat工具来监控当前集群的各种操作统计。Mongostat监控统计如下图所示:

MjMbmaz.png!mobile

其中,insert、delete、update、query这四项统计比较好理解,分别对应增、删、改、查。但是,comand、getmore不是很好理解,command代表什么统计?getMore代表什么统计?这两项相对比较难理解。

此外,通过本文字分析,我们将搞明白这六项统计的具体含义,同时弄清这六项统计由那些操作进行计数。

Command命令处理模块分为:mongos操作命令、mongod操作命令、MongoDB集群内部命令,具体定义如下:

①mongos操作命令,客户端可以通过mongos访问集群相关的命令。

②mongod操作命令:客户端可以通过mongod复制集和cfg server访问集群的相关命令。

③MongoDB集群内部命令:mongos、mongod、mongo-cfg集群实例之间交互的命令。

Command命令处理模块核心代码实现如下:

FNr2UfR.png!mobile

《Command命令处理模块源码实现》相关文章重点分析命令处理模块核心代码实现,也就是上面截图中的命令处理源码文件实现。

2. <<transport_layer网络传输层模块源码实现>>衔接回顾

<<transport_layer网络传输层模块源码实现三>> 一文中,我们对service_state_machine状态机调度子模块进行了分析,该模块中的dealTask任务进行MongoDB内部业务逻辑处理,其核心实现如下:

//dealTask处理  
void ServiceStateMachine::_processMessage(ThreadGuard guard) {  
  ......
  //command处理、DB访问后的数据通过dbresponse返回  
  DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);  
  ......
}

上面的 sep对应mongod或者mongos实例的服务入口实现,该 seq成员分别在如下代码中初始化为ServiceEntryPointMongod和ServiceEntryPointMongod类实现。SSM状态机的_seq成员初始化赋值核心代码实现如下:

//mongos实例启动初始化  
static ExitCode runMongosServer() {  
  ......  
  //mongos实例对应sep为ServiceEntryPointMongos  
  auto sep = stdx::make_unique<ServiceEntryPointMongos>(getGlobalServiceContext());  
  getGlobalServiceContext()->setServiceEntryPoint(std::move(sep));  
  ......  
}  
 
//mongod实例启动初始化  
ExitCode _initAndListen(int listenPort) {  
  ......  
  //mongod实例对应sep为ServiceEntryPointMongod  
  serviceContext->setServiceEntryPoint(  
      stdx::make_unique<ServiceEntryPointMongod>(serviceContext));  
  ......  
}  
 
//SSM状态机初始化  
ServiceStateMachine::ServiceStateMachine(...)  
  : _state{State::Created},  
    //mongod和mongos实例的服务入口通过这里赋值给_seq成员变量  
    _sep{svcContext->getServiceEntryPoint()},  
    ......  
}

通过上面的几个核心接口实现,把mongos和mongod两个实例的服务入口与状态机SSM(ServiceStateMachine)联系起来,最终和下面的command命令处理模块关联。

dealTask进行一次MongoDB请求的内部逻辑处理,该处理由_sep->handleRequest()接口实现。由于mongos和mongod服务入口分别由ServiceEntryPointMongos和ServiceEntryPointMongod两个类实现,因此dealTask也就演变为如下接口处理:

①mongos实例:ServiceEntryPointMongos::handleRequest(…)

②Mongod实例::ServiceEntryPointMongod::handleRequest(…)

这两个接口入参都是OperationContext和Message,分别对应操作上下文、请求原始数据内容。下文会分析Message解析实现、OperationContext服务上下文实现将在后续章节分析。

Mongod和mongos实例服务入口类都继承自网络传输模块中的ServiceEntryPointImpl类,如下图所示:

YBBniy.png!mobile

Tips: mongos和mongod服务入口类为何要继承网络传输模块服务入口类?

原因是一个请求对应一个链接session,该session对应的请求又和SSM状态机唯一对应。所有客户端请求对应的SSM状态机信息全部保存再ServiceEntryPointImpl._sessions成员中,而command命令处理模块为SSM状态机任务中的dealTask任务,通过该继承关系,ServiceEntryPointMongod和ServiceEntryPointMongos子类也就可以和状态机及任务处理关联起来,同时也可以获取当前请求对应的session链接信息。

3. MongoDB协议解析

《transport_layer网络传输层模块源码实现二》 中的数据收发子模块完成了一个完整MongoDB报文的接收,一个MongoDB报文由Header头部+opCode包体组成,如下图所示:

i2EVziz.png!mobile

上图中各个字段说明如下表:

Header or body 字段名 msg header messageLength 整个message长度,包括header长度和body长度 requestID 该请求id信息 responseTo 应答id opCode 操作类型:OP_UPDATE、OP_INSERT、OP_QUERY、OP_DELETE、OP_MSG等 msg body Body 不同opCode对应的包体内容

opCode取值比较多,早期版本中OP_INSERT、OP_DELETE、OP_UPDATE、OP_QUERY分别针对增删改查请求,MongoDB从3.6版本开始默认使用OP_MSG操作作为默认opCode,是一种可扩展的消息格式,旨在包含其他操作码的功能,新版本读写请求协议都对应该操作码。本文以OP_MSG操作码对应协议为例进行分析,其他操作码协议分析过程类似,OP_MSG请求协议格式如下:

OP_MSG {  
  //MongoDB报文头部  
  MsgHeader header;            
  //位图,用于标识报文是否需要校验 是否需要应答等  
  uint32 flagBits;           // message flags  
  //报文内容,例如find write等命令内容通过bson格式存在于该结构中  
  Sections[] sections;       // data sections  
  //报文CRC校验  
  optional<uint32> checksum; // optional CRC-32C checksum  
}  

OP_MSG各个字段说明如下表:

字段名 功能说明 header MongoDB报文头部,详见msg header说明 flagBits OP_MSG位图信息,其中0-15位只有第0和第1位有效,16-31位为可选位,各位功能说明如下:Bit-0: 如果该位置1,则说明报文需要校验  Bit-1: 标记是否需要应答客户端  Bit-[2-15]: 第2-第15位,必须位0,否则解析异常  Bit-[16-31]:可选位,3.6版本意义不大。 sections 各个请求的命令内容都在该结构中,sections=kind+body组成,kind分为两种类型:Kind-0:客户端的真正请求内容,kind后紧跟的bson格式数据即为命令请求  Kind-1:Document Sequence,文档序列号,暂时不知道具体用途。 checksum 如果有校验标记,用于报文校验,默认4字节

一个完整OP_MSG请求格式如下:

qQvANza.png!mobile

除了通用头部header外,客户端命令请求实际上都保存于sections字段中,该字段存放的是请求的原始bson格式数据。BSON是由10gen开发的一个数据格式,目前主要用于MongoDB中,是MongoDB的数据存储格式。BSON基于JSON格式,选择JSON进行改造的原因主要是JSON的通用性及JSON的schemaless的特性。BSON相比JSON具有以下特性:

①Lightweight(更轻量级)

②Traversable(易操作)

③Efficient(高效性能)

本文重点不是分析bson协议格式,bson协议实现细节将在后续章节分享。bson协议更多设计细节详见: http://bsonspec.org/

总结:一个完整MongoDB报文由header+body组成,其中header长度固定为16字节,body长度等于messageLength-16。Header部分协议解析由message.cpp和message.h两源码文件实现,body部分对应的OP_MSG类请求解析由op_msg.cpp和op_msg.h两源码文件实现。

4. MongoDB报文通用头部解析及封装源码实现

Header头部解析由src/mongo/util/net目录下message.cpp和message.h两文件完成,该类主要完成通用header头部和body部分的解析、封装。因此报文头部核心代码分为以下两类:

①报文头部内容解析及封装(MSGHEADER命名空间实现)

②头部和body内容解析及封装(MsgData命名空间实现)

4.1 MongoDB报文头部解析及封装核心代码实现

MongoDB报文头部解析由namespace MSGHEADER {…}实现,该类主要成员及接口实现如下:

namespace MSGHEADER {  
//header头部各个字段信息  
struct Layout {  
  //整个message长度,包括header长度和body长度  
  int32_t messageLength;    
  //requestID 该请求id信息  
  int32_t requestID;        
  //getResponseToMsgId解析  
  int32_t responseTo;        
  //操作类型:OP_UPDATE、OP_INSERT、OP_QUERY、OP_DELETE、OP_MSG等  
  int32_t opCode;  
};  
 
//ConstView实现header头部数据解析  
class ConstView {  
public:  
  ......  
  //初始化构造  
  ConstView(const char* data) : _data(data) {}  
  //获取_data地址  
  const char* view2ptr() const {  
      return data().view();  
  }  
  //TransportLayerASIO::ASIOSourceTicket::_headerCallback调用  
  //解析header头部的messageLength字段  
  int32_t getMessageLength() const {  
      return data().read<LittleEndian<int32_t>>(offsetof(Layout, messageLength));  
  }  
  //解析header头部的requestID字段  
  int32_t getRequestMsgId() const {  
      return data().read<LittleEndian<int32_t>>(offsetof(Layout, requestID));  
  }  
  //解析header头部的getResponseToMsgId字段  
  int32_t getResponseToMsgId() const {  
      return data().read<LittleEndian<int32_t>>(offsetof(Layout, responseTo));  
  }  
  //解析header头部的opCode字段  
  int32_t getOpCode() const {  
      return data().read<LittleEndian<int32_t>>(offsetof(Layout, opCode));  
  }  
 
protected:  
  //MongoDB报文数据起始地址  
  const view_type& data() const {  
      return _data;  
  }  
private:  
  //数据部分  
  view_type _data;  
};  
 
//View填充header头部数据  
class View : public ConstView {  
public:  
  ......  
  //构造初始化  
  View(char* data) : ConstView(data) {}  
  //header起始地址  
  char* view2ptr() {  
      return data().view();  
  }  
  //以下四个接口进行header填充  
  //填充header头部messageLength字段  
  void setMessageLength(int32_t value) {  
      data().write(tagLittleEndian(value), offsetof(Layout, messageLength));  
  }  
  //填充header头部requestID字段  
  void setRequestMsgId(int32_t value) {  
      data().write(tagLittleEndian(value), offsetof(Layout, requestID));  
  }  
  //填充header头部responseTo字段  
  void setResponseToMsgId(int32_t value) {  
      data().write(tagLittleEndian(value), offsetof(Layout, responseTo));  
  }  
  //填充header头部opCode字段  
  void setOpCode(int32_t value) {  
      data().write(tagLittleEndian(value), offsetof(Layout, opCode));  
  }  
private:  
  //指向header起始地址  
  view_type data() const {  
      return const_cast<char*>(ConstView::view2ptr());  
  }  
};  
}

从上面的header头部解析、填充的实现类可以看出,header头部解析由MSGHEADER::ConstView实现;header头部填充由MSGHEADER::View完成。实际上代码实现上,通过offsetof来进行移位,从而快速定位到头部对应字段。

4.2 MongoDB报文头部+body解析封装核心代码实现

Namespace MSGHEADER{…}命名空间只负责header头部的处理,namespace MsgData{…}命名空间相对MSGHEADER命名空间更加完善,除了处理头部解析封装外,还负责body数据起始地址维护、body数据封装、数据长度检查等。MsgData命名空间核心代码实现如下:

mespace MsgData {  
ruct Layout {  
//数据填充组成:header部分  
MSGHEADER::Layout header;  
//数据填充组成: body部分,body先用data占位置  
char data[4];  
 

解析header字段信息及body其实地址信息  
ass ConstView {  
blic:  
//初始化构造  
ConstView(const char* storage) : _storage(storage) {}  
//获取数据起始地址  
const char* view2ptr() const {  
    return storage().view();  
}  

//以下四个接口间接执行前面的MSGHEADER中的头部字段解析  
//填充header头部messageLength字段  
int32_t getLen() const {  
    return header().getMessageLength();  
}  
//填充header头部requestID字段  
int32_t getId() const {  
    return header().getRequestMsgId();  
}  
//填充header头部responseTo字段  
int32_t getResponseToMsgId() const {  
    return header().getResponseToMsgId();  
}  
//获取网络数据报文中的opCode字段  
NetworkOp getNetworkOp() const {  
    return NetworkOp(header().getOpCode());  
}  
//指向body起始地址  
const char* data() const {  
    return storage().view(offsetof(Layout, data));  
}  
//messageLength长度检查,opcode检查  
bool valid() const {  
    if (getLen() <= 0 || getLen() > (4 * BSONObjMaxInternalSize))  
        return false;  
    if (getNetworkOp() < 0 || getNetworkOp() > 30000)  
        return false;  
    return true;  
}  
......  
otected:  
//获取_storage  
const ConstDataView& storage() const {  
    return _storage;  
}  
//指向header起始地址  
MSGHEADER::ConstView header() const {  
    return storage().view(offsetof(Layout, header));  
}  
ivate:  
//MongoDB报文存储在这里  
ConstDataView _storage;  
 

填充数据,包括Header和body  
ass View : public ConstView {  
blic:  
//构造初始化  
View(char* storage) : ConstView(storage) {}  
......  
//获取报文起始地址  
char* view2ptr() {  
    return storage().view();  
}  

//以下四个接口间接执行前面的MSGHEADER中的头部字段构造  
//以下四个接口完成msg header赋值  
//填充header头部messageLength字段  
void setLen(int value) {  
    return header().setMessageLength(value);  
}  
//填充header头部messageLength字段  
void setId(int32_t value) {  
    return header().setRequestMsgId(value);  
}  
//填充header头部messageLength字段  
void setResponseToMsgId(int32_t value) {  
    return header().setResponseToMsgId(value);  
}  
//填充header头部messageLength字段  
void setOperation(int value) {  
    return header().setOpCode(value);  
}  

using ConstView::data;  
//指向data  
char* data() {  
    return storage().view(offsetof(Layout, data));  
}  
ivate:  
//也就是报文起始地址  
  DataView storage() const {  
      return const_cast<char*>(ConstView::view2ptr());  
  }  
  //指向header头部  
  MSGHEADER::View header() const {  
      return storage().view(offsetof(Layout, header));  
  }  
};  
 
......  
//Value为前面的Layout,减4是因为有4字节填充data,所以这个就是header长度  
const int MsgDataHeaderSize = sizeof(Value) - 4;  
 
//除去头部后的数据部分长度  
inline int ConstView::dataLen() const {  
  return getLen() - MsgDataHeaderSize;  
}  
} // namespace MsgData  

和MSGHEADER命名空间相比,MsgData这个namespace命名空间接口实现和前面的MSGHEADER命名空间实现大同小异。MsgData不仅仅处理header头部的解析组装,还负责body部分数据头部指针指向、头部长度检查、opCode检查、数据填充等。其中,MsgData命名空间中header头部的解析构造底层依赖MSGHEADER实现。

4.3 Message/DbMessage核心代码实现

《transport_layer网络传输层模块源码实现二》 中,从底层ASIO库接收到的MongoDB报文是存放在Message结构中存储,最终存放在ServiceStateMachine._inMessage成员中。

在前面第2章我们知道mongod和mongso实例的服务入口接口handleRequest(…)中都带有Message入参,也就是接收到的Message数据通过该接口处理。Message类主要接口实现如下:

//DbMessage._msg成员为该类型  
class Message {  
public:  
  //message初始化  
  explicit Message(SharedBuffer data) : _buf(std::move(data)) {}  
  //头部header数据  
  MsgData::View header() const {  
      verify(!empty());  
      return _buf.get();  
  }  
  //获取网络数据报文中的op字段  
  NetworkOp operation() const {  
      return header().getNetworkOp();  
  }  
  //_buf释放为空  
  bool empty() const {  
      return !_buf;  
  }  
  //获取报文总长度messageLength  
  int size() const {  
      if (_buf) {  
          return MsgData::ConstView(_buf.get()).getLen();  
      }  
      return 0;  
  }  
  //body长度  
  int dataSize() const {  
      return size() - sizeof(MSGHEADER::Value);  
  }  
  //buf重置  
  void reset() {  
      _buf = {};  
  }  
  // use to set first buffer if empty  
  //_buf直接使用buf空间  
  void setData(SharedBuffer buf) {  
      verify(empty());  
      _buf = std::move(buf);  
  }  
    //把msgtxt拷贝到_buf中  
  void setData(int operation, const char* msgtxt) {  
      setData(operation, msgtxt, strlen(msgtxt) + 1);  
  }  
  //根据operation和msgdata构造一个完整MongoDB报文  
  void setData(int operation, const char* msgdata, size_t len) {  
      verify(empty());  
      size_t dataLen = len + sizeof(MsgData::Value) - 4;  
      _buf = SharedBuffer::allocate(dataLen);  
      MsgData::View d = _buf.get();  
      if (len)  
          memcpy(d.data(), msgdata, len);  
      d.setLen(dataLen);  
      d.setOperation(operation);  
  }  
  ......  
  //获取_buf对应指针  
  const char* buf() const {  
      return _buf.get();  
  }  
 
private:  
  //存放接收数据的buf  
  SharedBuffer _buf;  
};  

Message是操作MongoDB收发报文最直接的实现类,该类主要完成一个完整MongoDB报文封装。有关MongoDB报文头后面的body更多的解析实现在DbMessage类中完成,DbMessage类包含Message类成员 msg。实际上,Message报文信息在handleRequest(…)实例服务入口中赋值给DbMessage. msg,报文后续的body处理继续由DbMessage类相关接口完成处理。DbMessage和Message类关系如下:

class DbMessage {  
  ......  
  //包含Message成员变量  
  const Message& _msg;  
  //MongoDB报文起始地址
  const char* _nsStart;
  //报文结束地址
  const char* _theEnd;
}  
 
DbMessage::DbMessage(const Message& msg) : _msg(msg),  
_nsStart(NULL), _mark(NULL), _nsLen(0) {  
  //一个MongoDB报文(header+body)数据的结束地址  
  _theEnd = _msg.singleData().data() + _msg.singleData().dataLen();  
  //报文起始地址 [_nextjsobj, _theEnd ]之间的数据就是一个完整MongoDB报文  
  _nextjsobj = _msg.singleData().data();  
  ......  
}

DbMessage. msg成员为DbMessage 类型,DbMessage的 nsStart和_theEnd成员分别记录完整MongoDB报文的起始地址和结束地址,通过这两个指针就可以获取一个完整MongoDB报文的全部内容,包括header和body。

注意:DbMessage是早期MongoDB版本(version<3.6)中用于报文body解析封装的类,这些类针对opCode=[dbUpdate, dbDelete]这个区间的操作。在MongoDB新版本(version>=3.6)中,body解析及封装由op_msg.h和op_msg.cpp代码文件中的clase OpMsgRequest{}完成处理。

4.4 OpMsg报文解析封装核心代码实现

MongoDB从3.6版本开始默认使用OP_MSG操作作为默认opCode,是一种可扩展的消息格式,旨在包含其他操作码的功能,新版本读写请求协议都对应该操作码。OP_MSG对应MongoDB报文body解析封装处理由OpMsg类相关接口完成,OpMsg::parse(Message)从Message中解析出报文body内容,其核心代码实现如下:

struct OpMsg {  
    ......  
  //msg解析赋值见OpMsg::parse    
  //各种命令(insert update find等)都存放在该body中  
  BSONObj body;    
  //sequences用法暂时没看懂,感觉没什么用?先跳过  
  std::vector<DocumentSequence> sequences; //赋值见OpMsg::parse  
}  
//从message中解析出OpMsg信息  
OpMsg OpMsg::parse(const Message& message) try {  
  //message不能为空,并且opCode必须为dbMsg  
  invariant(!message.empty());  
  invariant(message.operation() == dbMsg);  
  //获取flagBits  
  const uint32_t flags = OpMsg::flags(message);  
  //flagBits有效性检查,bit 0-15中只能对第0和第1位操作  
  uassert(ErrorCodes::IllegalOpMsgFlag,  
          str::stream() << "Message contains illegal flags value: Ob"  
                        << std::bitset<32>(flags).to_string(),  
          !containsUnknownRequiredFlags(flags));  
 
  //校验码默认4字节  
  constexpr int kCrc32Size = 4;  
  //判断该mongo报文body内容是否启用了校验功能  
  const bool haveChecksum = flags & kChecksumPresent;  
  //如果有启用校验功能,则报文末尾4字节为校验码  
  const int checksumSize = haveChecksum ? kCrc32Size : 0;  
  //sections字段内容  
  BufReader sectionsBuf(message.singleData().data() + sizeof(flags),  
                        message.dataSize() - sizeof(flags) - checksumSize);  
 
  //默认先设置位false  
  bool haveBody = false;  
  OpMsg msg;  
  //解析sections对应命令请求数据  
  while (!sectionsBuf.atEof()) {  
      //BufReader::read读取kind内容,一个字节  
      const auto sectionKind = sectionsBuf.read<Section>();  
      //kind为0对应命令请求body内容,内容通过bson报错  
      switch (sectionKind) {  
          //sections第一个字节是0说明是body  
          case Section::kBody: {  
              //默认只能有一个body  
              uassert(40430, "Multiple body sections in message", !haveBody);  
              haveBody = true;  
              //命令请求的bson信息保存在这里  
              msg.body = sectionsBuf.read<Validated<BSONObj>>();  
              break;  
          }  
 
          //DocSequence暂时没看明白,用到的地方很少,跳过,后续等  
          //该系列文章主流功能分析完成后,从头再回首分析  
          case Section::kDocSequence: {  
                ......  
          }  
      }  
  }  
  //OP_MSG必须有body内容  
  uassert(40587, "OP_MSG messages must have a body", haveBody);  
  //body和sequence去重判断  
  for (const auto& docSeq : msg.sequences) {  
      ......  
  }  
  return msg;  
}

OpMsg类被OpMsgRequest类继承,OpMsgRequest类中核心接口就是解析出OpMsg.body中的库信息和表信息,OpMsgRequest类代码实现如下:

//协议解析得时候会用到,见runCommands  
struct OpMsgRequest : public OpMsg {  
  ......  
  //构造初始化  
  explicit OpMsgRequest(OpMsg&& generic) : OpMsg(std::move(generic)) {}  
  //opMsgRequestFromAnyProtocol->OpMsgRequest::parse  
  //从message中解析出OpMsg所需成员信息  
  static OpMsgRequest parse(const Message& message) {  
      //OpMsg::parse  
      return OpMsgRequest(OpMsg::parse(message));  
  }  
  //根据db body extraFields填充OpMsgRequest  
  static OpMsgRequest fromDBAndBody(... {  
      OpMsgRequest request;  
      request.body = ([&] {  
          //填充request.body  
          ......  
      }());  
      return request;  
  }  
  //从body中获取db name  
  StringData getDatabase() const {  
      if (auto elem = body["$db"])  
          return elem.checkAndGetStringData();  
      uasserted(40571, "OP_MSG requests require a $db argument");  
  }  
  //find insert 等命令信息 body中的第一个elem就是command 名  
  StringData getCommandName() const {  
      return body.firstElementFieldName();  
  }  
};

OpMsgRequest通过OpMsg::parse(message)解析出OpMsg信息,从而获取到body内容,GetCommandName()接口和getDatabase()则分别从body中获取库DB信息、命令名信息。通过该类相关接口,命令名(find、write、update等)和DB库都获取到了。

OpMsg模块除了OP_MSG相关报文解析外,还负责OP_MSG报文组装填充,该模块接口功能大全如下表:

类名 接口名 功能说明 containsUnknownRequiredFlags OP_MSG报文体flags检查 OpMsg OpMsg::flags(…) 获取message中的flag OpMsg::replaceFlags(…) 设置message中的flags OpMsg::parse(…) 从message中解析出OpMsg成员信息 OpMsg::serialize() OpMsg序列化 OpMsg::shareOwnershipWith(…) 共享buffer设置 OpMsgBuilder OpMsgBuilder::beginDocSequence(…) 填充kDocSequence类型的name数据 OpMsgBuilder::finishDocumentStream(…) 完成流式处理 OpMsgBuilder::beginBody() 填充kBody类型数据 OpMsgBuilder::resumeBody() 获取body数据 OpMsgBuilder::finish() 构造message数据

5. Mongod实例服务入口核心代码实现

Mongod实例服务入口类ServiceEntryPointMongod继承ServiceEntryPointImpl类,mongod实例的报文解析处理、命令解析、命令执行都由该类负责处理。ServiceEntryPointMongod核心接口可以细分为:opCode解析及回调处理、命令解析及查找、命令执行三个子模块。

5.1 opCode解析及回调处理

OpCode操作码解析及其回调处理由ServiceEntryPointMongod::handleRequest(…)接口实现,核心代码实现如下:

//mongod服务对于客户端请求的处理    
//通过状态机SSM模块的如下接口调用:ServiceStateMachine::_processMessage  
DbResponse ServiceEntryPointMongod::handleRequest(OperationContext* opCtx, const Message& m) {  
  //获取opCode,3.6版本对应客户端默认使用OP_MSG  
  NetworkOp op = m.operation();  
  ......  
  //根据message构造DbMessage  
  DbMessage dbmsg(m);  
  //根据操作上下文获取对应的client  
  Client& c = *opCtx->getClient();    
  ......  
  //获取库.表信息,注意只有dbUpdate<opCode<dbDelete的opCode请求才通过dbmsg直接获取库和表信息  
  const char* ns = dbmsg.messageShouldHaveNs() ? dbmsg.getns() : NULL;  
  const NamespaceString nsString = ns ? NamespaceString(ns) : NamespaceString();  
  ....  
  //CurOp::debug 初始化opDebug,慢日志相关记录  
  OpDebug& debug = currentOp.debug();  
  //慢日志阀值  
  long long logThresholdMs = serverGlobalParams.slowMS;  
  //时MongoDB将记录这次慢操作,1为只记录慢操作,即操作时间大于了设置的配置,2表示记录所有操作    
  bool shouldLogOpDebug = shouldLog(logger::LogSeverity::Debug(1));  
  DbResponse dbresponse;  
  if (op == dbMsg || op == dbCommand || (op == dbQuery && isCommand)) {  
      //新版本op=dbMsg,因此走这里  
      //从DB获取数据,获取到的数据通过dbresponse返回  
      dbresponse = runCommands(opCtx, m);    
  } else if (op == dbQuery) {  
      ......  
      //早期MongoDB版本查询走这里  
      dbresponse = receivedQuery(opCtx, nsString, c, m);  
  } else if (op == dbGetMore) {    
      //早期MongoDB版本查询走这里  
      dbresponse = receivedGetMore(opCtx, m, currentOp, &shouldLogOpDebug);  
  } else {  
      ......  
      //早期版本增 删 改走这里处理  
        if (op == dbInsert) {  
            receivedInsert(opCtx, nsString, m); //插入操作入口   新版本CmdInsert::runImpl  
        } else if (op == dbUpdate) {  
            receivedUpdate(opCtx, nsString, m); //更新操作入口    
        } else if (op == dbDelete) {  
            receivedDelete(opCtx, nsString, m); //删除操作入口    
        }  
  }  
  //获取runCommands执行时间,也就是内部处理时间  
  debug.executionTimeMicros = durationCount<Microseconds>(currentOp.elapsedTimeExcludingPauses());  
  ......  
  //慢日志记录  
  if (shouldLogOpDebug || (shouldSample && debug.executionTimeMicros > logThresholdMs * 1000LL)) {  
      Locker::LockerInfo lockerInfo;    
      //OperationContext::lockState LockerImpl<>::getLockerInfo  
      opCtx->lockState()->getLockerInfo(&lockerInfo);  
 
  //OpDebug::report 记录慢日志到日志文件  
      log() << debug.report(&c, currentOp, lockerInfo.stats);  
  }  
  //各种统计信息  
  recordCurOpMetrics(opCtx);  
}

Mongod的handleRequest()接口主要完成以下工作:

①从Message中获取OpCode,早期版本每个命令又对应取值,例如增删改查早期版本分别对应:dbInsert、dbDelete、dbUpdate、dbQuery;MongoDB 3.6开始,默认请求对应OpCode都是OP_MSG,本文默认只分析OpCode=OP_MSG相关的处理。

②获取本操作对应的Client客户端信息。

③如果是早期版本,通过Message构造DbMessage,同时解析出库.表信息。

④根据不同OpCode执行对应回调操作,OP_MSG对应操作为runCommands(…),获取的数据通过dbresponse返回。

⑤获取到db层返回的数据后,进行慢日志判断,如果db层数据访问超过阀值,记录慢日志。

⑥设置debug的各种统计信息。

5.2 命令解析及查找

从上面的分析可以看出,接口最后调用runCommands(…),该接口核心代码实现如下所示:

//message解析出对应command执行  
DbResponse runCommands(OperationContext* opCtx, const Message& message) {  
  //获取message对应的ReplyBuilder,3.6默认对应OpMsgReplyBuilder  
  //应答数据通过该类构造  
  auto replyBuilder = rpc::makeReplyBuilder(rpc::protocolForMessage(message));  
  [&] {  
      OpMsgRequest request;  
      try { // Parse.  
          //协议解析 根据message获取对应OpMsgRequest  
          request = rpc::opMsgRequestFromAnyProtocol(message);  
      }  
  }  
  try { // Execute.  
      //opCtx初始化  
      curOpCommandSetup(opCtx, request);  
      //command初始化为Null  
      Command* c = nullptr;  
      //OpMsgRequest::getCommandName查找  
      if (!(c = Command::findCommand(request.getCommandName()))) {  
            //没有找到相应的command的后续异常处理  
            ......  
      }  
      //执行command命令,获取到的数据通过replyBuilder.get()返回  
      execCommandDatabase(opCtx, c, request, replyBuilder.get());  
  }  
  //OpMsgReplyBuilder::done对数据进行序列化操作  
  auto response = replyBuilder->done();  
  //responseLength赋值  
  CurOp::get(opCtx)->debug().responseLength = response.header().dataLen();  
  // 返回  
  return DbResponse{std::move(response)};  
}  

RunCommands(…)接口从message中解析出OpMsg信息,然后获取该OpMsg对应的command命令信息,最后执行该命令对应的后续处理操作。主要功能说明如下:

①获取该OpCode对应replyBuilder,OP_MSG操作对应builder为OpMsgReplyBuilder。

②根据message解析出OpMsgRequest数据,OpMsgRequest来中包含了真正的命令请求bson信息。

③opCtx初始化操作。

④通过request.getCommandName()返回命令信息(如“find”、“update”等字符串)。

⑤通过Command::findCommand(command name)从CommandMap这个map表中查找是否支持该command命令。如果没找到说明不支持,如果找到说明支持。

⑥调用execCommandDatabase(…)执行该命令,并获取命令的执行结果。

⑦根据command执行结果构造response并返回

5.3 命令执行

void execCommandDatabase(...) {  
  ......  
  //获取dbname  
  const auto dbname = request.getDatabase().toString();  
  ......  
  //mab表存放从bson中解析出的elem信息  
  StringMap<int> topLevelFields;  
  //body elem解析  
  for (auto&& element : request.body) {  
      //获取bson中的elem信息  
      StringData fieldName = element.fieldNameStringData();  
      //如果elem信息重复,则异常处理  
      ......  
  }  
  //如果是help命令,则给出help提示  
  if (Command::isHelpRequest(helpField)) {  
      //给出help提示  
      Command::generateHelpResponse(opCtx, replyBuilder, *command);  
      return;  
  }  
  //权限认证检查,检查该命令执行权限  
  uassertStatusOK(Command::checkAuthorization(command, opCtx, request));  
  ......  
 
  //该命令执行次数统计 db.serverStatus().metrics.commands可以获取统计信息  
  command->incrementCommandsExecuted();  
  //真正的命令执行在这里面  
  retval = runCommandImpl(opCtx, command, request, replyBuilder, startOperationTime);  
  //该命令执行失败次数统计  
  if (!retval) {  
      command->incrementCommandsFailed();  
    }  
    ......  
}

execCommandDatabase(…)最终调用RunCommandImpl(…)进行对应命令的真正处理,该接口核心代码实现如下:

bool runCommandImpl(...) {  
  //获取命令请求内容body  
  BSONObj cmd = request.body;  
  //获取请求中的DB库信息  
  const std::string db = request.getDatabase().toString();  
  //ReadConcern检查  
  Status rcStatus = waitForReadConcern(  
      opCtx, repl::ReadConcernArgs::get(opCtx), command->allowsAfterClusterTime(cmd));  
  //ReadConcern检查不通过,直接异常提示处理  
  if (!rcStatus.isOK()) {  
        //异常处理  
        return;  
  }  
  if (!command->supportsWriteConcern(cmd)) {  
      //命令不支持WriteConcern,但是对应的请求中却带有WriteConcern配置,直接报错不支持  
      if (commandSpecifiesWriteConcern(cmd)) {  
          //异常处理"Command does not support writeConcern"  
          ......  
          return result;  
      }  
  //调用Command::publicRun执行不同命令操作  
      result = command->publicRun(opCtx, request, inPlaceReplyBob);  
  }  
  //提取WriteConcernOptions信息  
  auto wcResult = extractWriteConcern(opCtx, cmd, db);  
  //提取异常,直接异常处理  
  if (!wcResult.isOK()) {  
      //异常处理  
      ......  
      return result;  
  }  
  ......  
  //执行对应的命令Command::publicRun,执行不同命令操作  
  result = command->publicRun(opCtx, request, inPlaceReplyBob);  
  ......  
}

RunCommandImpl(…)接口最终调用该接口入参的command,执行 command->publicRun(…)接口,也就是命令模块的公共publicRun。

5.4 总结

Mongod服务入口首先从message中解析出opCode操作码,3.6版本对应客户端默认操作码为OP_MSQ,解析出该操作对应OpMsgRequest信息。然后从message原始数据中解析出command命令字符串后,继续通过全局Map表种查找是否支持该命令操作,如果支持则执行该命令;如果不支持,直接异常打印,同时返回。

6.  Mongos实例服务入口核心代码实现

mongos服务入口核心代码实现过程和mongod服务入口代码实现流程几乎相同,mongos实例message解析、OP_MSG操作码处理、command命令查找等流程和上一章节mongod实例处理过程类似,本章节不在详细分析。Mongos实例服务入口处理调用流程如下:

ServiceEntryPointMongos::handleRequest(…)->Strategy::clientCommand(…)–>runCommand(…)->execCommandClient(…)

EbqQz2.png!mobile

最后的接口核心代码实现如下:

void runCommand(...) {  
  ......  
  //获取请求命令name  
  auto const commandName = request.getCommandName();  
  //从全局map表中查找  
  auto const command = Command::findCommand(commandName);  
  //没有对应的command存在,抛异常说明不支持该命令  
  if (!command) {  
      ......  
      return;  
  }  
  ......  
  //执行命令  
  execCommandClient(opCtx, command, request, builder);  
  ......  
}  

void execCommandClient(...)  
{  
  ......  
  //认证检查,是否有操作该command命令的权限,没有则异常提示  
  Status status = Command::checkAuthorization(c, opCtx, request);    
  if (!status.isOK()) {  
      Command::appendCommandStatus(result, status);  
      return;  
  }  
  //该命令的执行次数自增,代理上面也是要计数的  
  c->incrementCommandsExecuted();  
  //如果需要command统计,则加1  
  if (c->shouldAffectCommandCounter()) {  
      globalOpCounters.gotCommand();  
  }  
  ......  
  //有部分命令不支持writeconcern配置,报错  
  bool supportsWriteConcern = c->supportsWriteConcern(request.body);  
  //不支持writeconcern又带有该参数的请求,直接异常处理"Command does not support writeConcern"  
  if (!supportsWriteConcern && !wcResult.getValue().usedDefault) {  
      ......  
      return;  
  }  
  //执行本命令对应的公共publicRun接口,Command::publicRun  
  ok = c->publicRun(opCtx, request, result);  
  ......  
}

Tips: mongos和mongod实例服务入口核心代码实现的一点小区别

①Mongod实例opCode操作码解析、OpMsg解析、command查找及对应命令调用处理都由class ServiceEntryPointMongod{…}类一起完成。

②mongos实例则把opCode操作码解析交由class ServiceEntryPointMongos{…}类实现,OpMsg解析、command查找及对应命令调用处理放到了clase Strategy{…}类来处理。

7. 总结

MongoDB报文解析及组装流程总结:

①一个完整MongoDB报文由通用报文header头部+body部分组成。

②Body部分内容,根据报文头部的opCode来决定不同的body内容。

③3.6版本对应客户端请求opCode默认为OP_MSG,该操作码对应body部分由flagBits + sections + checksum组成,其中sections 中存放的是真正的命令请求信息,已bson数据格式保存。

④Header头部和body报文体封装及解析过程由class Message {…}类实现

⑤Body中对应command命令名、库名、表名的解析在MongoDB(version<3.6)低版本协议中由class DbMessage {…}类实现

⑥Body中对应command命令名、库名、表名的解析在MongoDB(version<3.6)低版

本协议中由struct OpMsgRequest{…}结构和struct OpMsg {…}类实现。

Mongos和mongod实例的服务入口处理流程大同小异,整体处理流程如下:

①从message解析出opCode操作码,根据不同操作码执行对应操作码回调。

②根据message解析出OpMsg request信息,MongoDB报文的命令信息就存储在该body中,该body已bson格式存储。

③从body中解析出command命令字符串信息(如“insert”、“update”等)。

④从全局_commands MAP表中查找是否支持该命令,如果支持则执行该命令处理,如果不支持则直接报错提示。

⑤最终找到对应command命令后,执行command的功能run接口。

说明:第3章的协议解析及封装过程实际上应该算是网络处理模块范畴,本文为了分析command命令处理模块方便,把该部分实现归纳到了命令处理模块,这样方便理解。

Tips:下期继续分享不同command命令执行细节。

8. 遗留问题

第1章节中的统计信息,将在command模块核心代码分析完毕后揭晓答案,《MongoDB command命令处理模块源码实现二》中继续分析,敬请关注。

更多文章:

常用高并发网络线程模型设计及MongoDB线程模型优化实践

MongoDB网络传输处理源码实现及性能调优-体验内核性能极致设计

OPPO百万级高并发MongoDB集群性能数十倍提升优化实践

盘点 2020 | 我要为分布式数据库 MongoDB 在国内影响力提升及推广做点事

MongoDB网络传输层模块源码实现二

MongoDB网络传输层模块源码实现三

MongoDB网络传输层模块源码实现四

作者:杨亚洲

前滴滴出行技术专家,现任OPPO文档数据库MongoDB负责人,负责oppo千万级峰值TPS/十万亿级数据量文档数据库MongoDB内核研发及运维工作,一直专注于分布式缓存、高性能服务端、数据库、中间件等相关研发。Github账号地址:

https://github.com/y123456yz

aE7BZnb.png!mobileBVfEva.png!mobileUzyqIfi.png!mobileNvuYj2N.png!mobile

MongoDB中文手册翻译正在进行中,欢迎更多朋友在自己的空闲时间学习并进行文档翻译,您的翻译将由社区专家进行审阅,并拥有署名权更新到中文用户手册和发布到社区微信内容平台。

更多问题可以添加社区助理小芒果微信(mongoingcom)咨询,进入社区微信交流群请备注“mongo”。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK