0

[原创] KCP 源码分析(上) - hellozhangjz

 6 months ago
source link: https://www.cnblogs.com/hellozhangjz/p/18075534
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

[原创] KCP 源码分析(上)

KCP 协议是一种可靠的传输协议,对比 TCP 取消了累计确认(延迟 ACK)、减小 RTO增长速度、选择性重传而非全部重传。通过用流量换取低时延。 KCP 中最重要的两个数据结构IKCPCB和IKCPSEG,一个IKCPCB对应一个 KCP 连接,通过这个结构体维护发送缓存、接收缓存、超时重传时间、窗口大小等。IKCPSEG 对应一个 KCP 数据包,包含该数据包的命令、数据、时间戳、数据长度等信息。源码地址:https://github.com/skywind3000/kcp

1742389-20240315153104041-282634368.png

KCP 数据包结构体:

struct IKCPSEG
{
	struct IQUEUEHEAD node;
	IUINT32 conv; 	// 会话 ID
	IUINT32 cmd;	// KCP 命令:
    				// IKCP_CMD_ACK:这是个 ACK 
    				// IKCP_CMD_WASK:发送方探测接收方的窗口
    				// IKCP_CMD_WINS:接收方回应自己的窗口大小
    				
	IUINT32 frg;	// fragment分段号,如果是流模式:默认为 0
	IUINT32 wnd;	// 窗口大小
	IUINT32 ts;		// 发送方:数据包的发送时间戳。 
  					// 接收方(ACK):所接受数据包的发送时间,而不是发送 ACK 的时间,方便发送方收到 ACK 后计算 rtt。
	IUINT32 sn;		// 发送方:发送数据包的序列号
  					// 接收方(ACK):ACK 号
	IUINT32 una;	// 未确认序列号:期待下次收到的数据包
	IUINT32 len;	// 数据包除去头部的字节数
  
/*-----------------以下成员不会实际发送到网络中,主要是超时重传和快速重传计算的辅助数据-----------------*/
	IUINT32 resendts;	// = current + rto, 超时重传的阈值, 当前时间超过resendts, 就要重发这个数据包
	IUINT32 rto;	// Retransmission Timeout, 下次超时重传的间隔时间, 会随着超时次数增加, 增加速率取决于是不是快速模式
	IUINT32 fastack;// 数据包被跳过次数, 快速重传功能需要
	IUINT32 xmit;	// 该数据包发送次数, transmit 的缩写, ,次数太多判断网络断开
/*-----------------以上成员不会实际发送到网络中,主要是超时重传和快速重传计算的辅助数据-----------------*/
  
	char data[1];	// 数据包携带的数据,大小根据ikcp_segment_new的参数决定
};

KCP 连接结构体:

struct IKCPCB
{
	IUINT32 conv; 	// 会话ID
	IUINT32 mtu; 	// 下层协议的最大传输单元, 一次发送若干个kcp包, 这些包的总长度不超过mtu
	IUINT32 mss; 	// 一个KCP数据包的最大数据载荷, mss+head一定不超过mtu
	IUINT32 state; 	// 连接状态

	IUINT32 snd_una; 	// snd_una之前的包对方(接收方)都已经收到了
	IUINT32 snd_nxt;  	// 下一个要从 send_que 发到 send_buf 的包序列号
	IUINT32 rcv_nxt;	// 下一个要从 rcv_buf 发到 rcv_que 的包序列号

	IUINT32 ts_recent; 	// 没用到
	IUINT32 ts_lastack;	// 没用到

	IUINT32 ssthresh;	// 拥塞窗口从慢启动转换到拥塞避免的窗口阈值
	IINT32 rx_rttval;	// 近4次rtt和srtt的平均差值,反应了rtt偏离srtt的程度
	IINT32 rx_srtt;		// 平滑的rtt,近8次rtt平均值
	IINT32 rx_rto;		// 系统的重传超时时间
	IINT32 rx_minrto; 	// 最小重传超时时间
	
	IUINT32 snd_wnd; 	// 发送窗口大小
	IUINT32 rcv_wnd; 	// 接收窗口大小
	IUINT32 rmt_wnd; 	// 对方接收窗口大小
	IUINT32 cwnd; 		// 拥塞窗口大小
	IUINT32 probe;		// 探测窗口大小

	IUINT32 current;	// 当前时间戳
	IUINT32 interval; 	// 内部flush刷新间隔
	IUINT32 ts_flush; 	// 下一次刷新输出的时间戳
	IUINT32 xmit;		// 该KCP连接超时重传次数

	IUINT32 nrcv_buf; 	// rcv_buf的长度
	IUINT32 nsnd_buf;	// snd_buf的长度
	IUINT32 nrcv_que; 	// rcv_que的长度
	IUINT32 nsnd_que; 	// snd_que的长度

	IUINT32 nodelay;	// 是否启用nodelay模式, ==2为快速模式
	IUINT32 updated;	// 是否调用过update函数
	IUINT32 ts_probe; 	// 下次探测窗口大小的时间戳
	IUINT32 probe_wait; // 探测窗口大小的间隔时间,每次探测对面窗口为0(失败), 探测时间*1.5
	IUINT32 dead_link;	// 断开连接的重传次数阈值
	IUINT32 incr; 		// k*mss , 拥塞窗口等于floor(k)
	struct IQUEUEHEAD snd_queue;// 发送队列
	struct IQUEUEHEAD rcv_queue;// 接收队列
	struct IQUEUEHEAD snd_buf; // 发送缓存, 还没收到 ACK 的包都在这里边
	struct IQUEUEHEAD rcv_buf; // 接收缓存, 将收到的数据暂存, 然后将其中连续的数据放到rcv_queue供上层读取
	IUINT32 *acklist; 	// 一个整数数组,存放要回复的ack,
  						// 结构为 [sn0(接收数据包的序号), ts0(接收数据包的发送时间), sn1, ts1, ...]
	IUINT32 ackcount; 	// 本次需要回复的ack个数
	IUINT32 ackblock; 	// acklist的大小,会动态扩容,类似于 vector
	void *user;			// 用户标识
	char *buffer; 		// 数据缓冲区
	int fastresend; 	// 快速重传的失序阈值, 发送方收到 fastresend 个冗余ACK就触发快速重传
	int fastlimit;  	// 快速重传的次数限制
	int nocwnd; 		// 0: 有拥塞控制, 1: 没有拥塞控制
	int stream;			// 流模式
	int logmask;
	int (*output)(const char *buf, int len, struct IKCPCB *kcp, void *user); // 回调函数,数据发送到下层协议
	void (*writelog)(const char *log, struct IKCPCB *kcp, void *user);
};

ikcp_send

先来看发送方的用户接口:

int ikcp_send(ikcpcb *kcp, const char *buffer, int len)
{
	IKCPSEG *seg;
	int count, // 需要装多少包
		 i;

	assert(kcp->mss > 0);
	if (len < 0) return -1;

	// 字节流模式,如果之前的包没装满,则先把之前的包装满。(粘包现象)
	if (kcp->stream != 0) { 
		if (!iqueue_is_empty(&kcp->snd_queue)) {
      // old:没有被装满的包
			IKCPSEG *old = iqueue_entry(kcp->snd_queue.prev, IKCPSEG, node);
			if (old->len < kcp->mss) { // 前一个包没塞满, 粘包
				int capacity = kcp->mss - old->len;
				int extend = (len < capacity)? len : capacity;
				seg = ikcp_segment_new(kcp, old->len + extend);
				assert(seg);
				if (seg == NULL) {
					return -2;
				}
				iqueue_add_tail(&seg->node, &kcp->snd_queue);
				memcpy(seg->data, old->data, old->len); // 把old数据转移到seg,然后把old删了
				if (buffer) {
					memcpy(seg->data + old->len, buffer, extend);
					buffer += extend;
				}
				seg->len = old->len + extend;
				seg->frg = 0;
				len -= extend;
				iqueue_del_init(&old->node); 
				ikcp_segment_delete(kcp, old); // 删除 old 节点
			}
		}
		if (len <= 0) {
			return 0;
		}
	}

	// 需要几个包来装len字节的数据, 一个包最多装mss字节
	if (len <= (int)kcp->mss) count = 1;
	else count = (len + kcp->mss - 1) / kcp->mss;

	if (count >= (int)IKCP_WND_RCV) return -2;

	if (count == 0) count = 1;

	// 将buffer数据分段装入snd_queue
	for (i = 0; i < count; i++) {
		int size = len > (int)kcp->mss ? (int)kcp->mss : len;
		seg = ikcp_segment_new(kcp, size);
		assert(seg);
		if (seg == NULL) {
			return -2;
		}
		if (buffer && len > 0) {
			memcpy(seg->data, buffer, size);
		}
		seg->len = size;
    	// 上层数据包被分段后的段号,如果开启流模式,默认段号都为 0
		seg->frg = (kcp->stream == 0)? (count - i - 1) : 0; 
		iqueue_init(&seg->node);
		iqueue_add_tail(&seg->node, &kcp->snd_queue); // 将数据包 push 进发送队列
		kcp->nsnd_que++;
		if (buffer) {
			buffer += size;
		}
		len -= size;
	}

	return 0;
}

ikcp_send 的主要逻辑就是将用户数据分段组装成 IKCPSEG 然后将其添加到发送队列。如果是流模式,则没有段号,每个包都是满的,数据有粘包需要用户自己处理。

ikcp_update

上层定时调用,主要功能是设置当前时间戳、计算下一次update 事件以及调用 ikcp_flush,ikcp_flush 才是将数据从发送队列发送到发送缓存的函数。

ikcp_flush

Step1、回应ACK

void ikcp_flush(ikcpcb *kcp)
{
    char *buffer = kcp->buffer; // 数据缓冲区
	char *ptr = buffer;
 	IKCPSEG seg;
    seg.conv = kcp->conv;
	seg.cmd = IKCP_CMD_ACK; // 命令为 ACK
	seg.frg = 0;
	seg.wnd = ikcp_wnd_unused(kcp); // 设置窗口大小
	seg.una = kcp->rcv_nxt;
	seg.len = 0;
	seg.sn = 0;
	seg.ts = 0;
    ...
	count = kcp->ackcount; // 需要回复的 ack 个数
	for (i = 0; i < count; i++) { 
		size = (int)(ptr - buffer);
        // 如果buffer 放不下 seg 的 head ,那就先把 buffer 中的数据先发到网络中
		if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
			ikcp_output(kcp, buffer, size); // 将buffer 中的数据先发到网络中
			ptr = buffer;
		}
        // 从 acklist 中取出要发送 ack 的 sn 和 ts
		ikcp_ack_get(kcp, i, &seg.sn, &seg.ts);
        // 只将 seg 的 head 拷贝到数据缓冲区里,注意回应 ack 的报文没有 data。
		ptr = ikcp_encode_seg(ptr, &seg); 
	}

	kcp->ackcount = 0;
  	...
}

这段代码主要功能就是回复 ACK,在接收数据的时候 kcp 会把需要回复的 ACK 放入 acklist,在这里检查kcp->ackcount,发现需要回复 ACK就从 acklist 中取出要发送 ack 的 sn 和 ts存入 seg 然后发送。

Step2、探测窗口

void ikcp_flush(ikcpcb *kcp)
{
    char *buffer = kcp->buffer; // 数据缓冲区
	char *ptr = buffer;
 	IKCPSEG seg;
    ...
	// 对面没有接收缓存,等待probe_wait
	if (kcp->rmt_wnd == 0) {
		if (kcp->probe_wait == 0) { // 初始化探测窗口
			kcp->probe_wait = IKCP_PROBE_INIT;
			kcp->ts_probe = kcp->current + kcp->probe_wait;
		}	
		else {
			if (_itimediff(kcp->current, kcp->ts_probe) >= 0) {  // 已经到了探测时间
				if (kcp->probe_wait < IKCP_PROBE_INIT) 
					kcp->probe_wait = IKCP_PROBE_INIT;
				kcp->probe_wait += kcp->probe_wait / 2; // 每次探测间隔增长 0.5 倍
				if (kcp->probe_wait > IKCP_PROBE_LIMIT)
					kcp->probe_wait = IKCP_PROBE_LIMIT;
				kcp->ts_probe = kcp->current + kcp->probe_wait;
				kcp->probe |= IKCP_ASK_SEND; // 标记需要探测窗口
			}
		}
	}	else { // 一旦对方有,则重置探测时间和探测间隔
		kcp->ts_probe = 0;
		kcp->probe_wait = 0;
	}
	
    // 标记需要发送探测
	if (kcp->probe & IKCP_ASK_SEND) {
		seg.cmd = IKCP_CMD_WASK; // 命令设为 IKCP_CMD_WASK,其他头信息不需要
		size = (int)(ptr - buffer);
		if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
			ikcp_output(kcp, buffer, size);
			ptr = buffer;
		}
		ptr = ikcp_encode_seg(ptr, &seg);
	}
  	...
}

当发送方发现对方窗口大小为 0,需要发送探测命令询问对方窗口大小,每次探测间隔都会增长0.5 倍,一旦对方有接收窗口,则重置探测时间和探测间隔。

Step3、回应探测窗口

void ikcp_flush(ikcpcb *kcp)
{
    char *buffer = kcp->buffer; // 数据缓冲区
	char *ptr = buffer;
 	IKCPSEG seg;
    ...
    // 需要回应窗口大小
	if (kcp->probe & IKCP_ASK_TELL) {
		seg.cmd = IKCP_CMD_WINS;  // 命令设为 IKCP_CMD_WINS,其他头信息不需要
		size = (int)(ptr - buffer);
		if (size + (int)IKCP_OVERHEAD > (int)kcp->mtu) {
			ikcp_output(kcp, buffer, size);
			ptr = buffer;
		}
		ptr = ikcp_encode_seg(ptr, &seg);
	}
    ...
}

Step4、发送数据

void ikcp_flush(ikcpcb *kcp)
{
    char *buffer = kcp->buffer; // 数据缓冲区
	char *ptr = buffer;
 	IKCPSEG seg;
    ...
    // 计算可以发多少数据
	cwnd = _imin_(kcp->snd_wnd, kcp->rmt_wnd);
    // kcp->nocwnd == 1 则关闭流控(拥塞控制)
	if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);
    // 如果 snd_nxt(下一个要从 send_que 发到 send_buf 的包序列号) 在发送窗口内
    // 就一直从 snd_que 中取出数据包放到 snd_buf 中, 直到snd_buf满或者snd_queue为空
	while (_itimediff(kcp->snd_nxt, kcp->snd_una + cwnd) < 0) {
		IKCPSEG *newseg;
		if (iqueue_is_empty(&kcp->snd_queue)) break;

		newseg = iqueue_entry(kcp->snd_queue.next, IKCPSEG, node);

		iqueue_del(&newseg->node);
		iqueue_add_tail(&newseg->node, &kcp->snd_buf);
		kcp->nsnd_que--;
		kcp->nsnd_buf++;

		newseg->conv = kcp->conv;
		newseg->cmd = IKCP_CMD_PUSH;
		newseg->wnd = seg.wnd;
		newseg->ts = current;
		newseg->sn = kcp->snd_nxt++;
		newseg->una = kcp->rcv_nxt;
		newseg->resendts = current;
		newseg->rto = kcp->rx_rto;
		newseg->fastack = 0;
		newseg->xmit = 0;
	}

	// resent:收到 resent 个失序 ACK 就会触发快速重传,TCP 里是冗余 ACK。
    // 如果没开启快速重传,则 resent 为 inf。
	resent = (kcp->fastresend > 0)? (IUINT32)kcp->fastresend : 0xffffffff;
    // 超时重传的最小超时时间
	rtomin = (kcp->nodelay == 0)? (kcp->rx_rto >> 3) : 0;

	// 遍历snd_buf里的数据包是否需要发送
	for (p = kcp->snd_buf.next; p != &kcp->snd_buf; p = p->next) {
		IKCPSEG *segment = iqueue_entry(p, IKCPSEG, node);
		int needsend = 0; 
		/* 
			该数据包是否需要发送,三种情况:
			1. 该数据包没发送过
			2. 超时没有收到ack,触发超时重传
			3. 收到resent次冗余ack,触发快速重传
		*/

		// 1. 该数据包第一次发送
		if (segment->xmit == 0) {
			needsend = 1;
			segment->xmit++;
			segment->rto = kcp->rx_rto; // 超时重传时间
			segment->resendts = current + segment->rto + rtomin;
		}

		// 2. 该数据包超时没有收到ACK, 触发超时重传
		else if (_itimediff(current, segment->resendts) >= 0) { 
			needsend = 1;
			segment->xmit++;
			kcp->xmit++;
			if (kcp->nodelay == 0) { // 普通模式,超时时间*2
				segment->rto += _imax_(segment->rto, (IUINT32)kcp->rx_rto);
			}	else { // 快速模式,超时时间*1.5
				IINT32 step = (kcp->nodelay < 2)? 
					((IINT32)(segment->rto)) : kcp->rx_rto;
				segment->rto += step / 2;
			}
			segment->resendts = current + segment->rto;
			lost = 1;
		}

		// 3. 该数据包被跳过的次数超过了fastresend, 触发快速重传
		else if (segment->fastack >= resent) {  
			if ((int)segment->xmit <= kcp->fastlimit || // 快速重传的限制,不能一直快速重传
				kcp->fastlimit <= 0) {
				needsend = 1;
				segment->xmit++;
				segment->fastack = 0;
				segment->resendts = current + segment->rto;
				change++;
			}
		}

		if (needsend) {
			int need;
			segment->ts = current;
			segment->wnd = seg.wnd;
			segment->una = kcp->rcv_nxt;

			size = (int)(ptr - buffer);
			need = IKCP_OVERHEAD + segment->len; // 该数据包长度, 最大为head+mss

			if (size + need > (int)kcp->mtu) {
				ikcp_output(kcp, buffer, size);
				ptr = buffer;
			}

			ptr = ikcp_encode_seg(ptr, segment);

			if (segment->len > 0) {
				memcpy(ptr, segment->data, segment->len);
				ptr += segment->len;
			}
			// 某个数据包的传输次数超过了dead_link,则判断当前连接断开。
			if (segment->xmit >= kcp->dead_link) {  // 断开连接
				kcp->state = (IUINT32)-1;
			}
		}
	}

	// 把没有数据缓冲区的数据发送出去
	size = (int)(ptr - buffer);
	if (size > 0) {
		ikcp_output(kcp, buffer, size);
	}

	// 如果触发了快速重传,减小拥塞窗口(快速恢复)
	if (change) {
		IUINT32 inflight = kcp->snd_nxt - kcp->snd_una;
		kcp->ssthresh = inflight / 2;
		if (kcp->ssthresh < IKCP_THRESH_MIN)
			kcp->ssthresh = IKCP_THRESH_MIN;
		kcp->cwnd = kcp->ssthresh + resent;
		kcp->incr = kcp->cwnd * kcp->mss;
	}

	// 超时重传,丢包了,重置拥塞窗口和ssthresh。
	if (lost) {
		kcp->ssthresh = cwnd / 2;
		if (kcp->ssthresh < IKCP_THRESH_MIN)
			kcp->ssthresh = IKCP_THRESH_MIN;
		kcp->cwnd = 1;
		kcp->incr = kcp->mss;
	}

	if (kcp->cwnd < 1) {
		kcp->cwnd = 1;
		kcp->incr = kcp->mss;
	}
    ...
}

首先确定发送窗口 cwnd = min(kcp->snd_wnd, kcp->rmt_wnd); 接着检查是否开启流控(拥塞控制) if (kcp->nocwnd == 0) cwnd = _imin_(kcp->cwnd, cwnd);通过取消拥塞控制可以进一步降低延迟。

然后从snd_queue中取出数据包放到snd_buf中, 直到snd_buf满或者snd_queue为空。然后依次检查snd_buf 里的数据包需不需要发送,需要发送有三种情况:

  1. 该数据包首次发送。
  2. 触发超时重传:时间超过超时重传的阈值。超时重传普通模式下:每次超时,超时重传的时间就翻倍。在快速模式下:每次超时的重传时间翻 0.5 倍。降低传输时延同时重置拥塞窗口和ssthresh。
  3. 触发快速重传:收到了该数据包的resent次的失序(冗余)ACK。该数据包被跳过的次数超过了fastresend, 触发快速重传。同时减小拥塞窗口为原来的一半,ssthresh也设置为这个值。

__EOF__


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK