29

使用PHP Socket开发Yar TCP服务

 4 years ago
source link: https://www.laruence.com/2020/04/01/5726.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Yar支持HTTP和TCP俩种Transporter, HTTP的是基于CURL,PHP中的Yar默认就是走的HTTP Transporter, 这个大家应该都不陌生, 但是基于TCP的, 可能大家会用的少一些。

事实上,我6年前也写过一个C的Yar server框架,叫做Yar-c, 代码地址在 Yar-C at Github , 它提供了服务启动,worker进程管理,Yar打包协议等。当时我们用这个框架,实现了高性能的微博白名单等服务,以供PHP端使用Yar Client来调用。

只不过,Yar C需要用C来写Handle, 可能对于不少PHPer来说,会稍微有点陌生,那今天我们尝试用PHP来写一个TCP的Server,来介绍下如何实现对Yar RPC协议的处理, 这个例子可以方便的结合 Swoole 等异步PHP框架,实现一个高性能的Yar TCP Server。 这个过程中, 会让大家了解Yar的RPC通信协议,以及捎带了解下Socket编程。

我们今天还是用“白名单”服务作为例子,我们提供一个接口,接受RPC客户端的请求,参数是一个用户ID,返回bool,表示是否在白名单:

function query(int $id) : bool;

首先,我们建立一个文件yar_server, 为了方便的直接执行,我们在文件写下:

#!/bin/env php7
<?php
class WhiteList {
}

然后,通过chmod a+x 给这个文件增加可执行的权限。

第一步我们需要处理服务的启动参数处理, 接受一个参数S表示要监听的IP和端口,值的格式是host:port, 我们使用PHP的 getopt 函数来处理命令行参数:

class WhiteList {
    protected $host;

    public function __construct() {
        $options = getOpt("S:");
        if (!isset($options["S"])) {
            $this->usage();
        }
    }

    protected function usage() {
        exit("Usage: yar_server -S hostname:port\n");
    }
}

这样,当用户启动yar_server的时候,没有指定S参数,我们就退出,并提示Usage。 我们还需要另外一个配置,就是指向一个词表文件,词表文件中每一行是一个在白名单中的用户ID, 我们用F表示:

class WhiteList {
    protected $host;
    protected $dicts;

    public function __construct() {
        $options = getOpt("S:F:");
        if (!isset($options["S"]) || !isset($options["F"])) {
            $this->usage();
        }
        $this->host = $options["S"];
        $this->dicts = $options["F"];
    }

    protected function usage() {
        exit("Usage: yar_server -F path_to_dict -S hostname:port\n");
    }
}

好了, 现在启动参数处理完成, 当然为了简单,我省去了对输入参数的有效性检查。

接下来, 我们需要完成俩个函数, 第一个是读取-F指定的词表文件,把所有的用户ID读入到一个数组中,因为我们的这个服务会是常驻进行, 所以不用担心性能, 它只会在启动阶段处理这个词表文件:

protected function loadDict() {
	$this->ids = array();

	$fp = fopen($this->dicts, "r");
	while (!feof($fp)) {
		$line = trim(fgets($fp));
		if ($line) {
			$this->ids[$line] = true;
		}
	}
	fclose($fp);
	echo "Loading dict successfully, ", count($this->ids), " loaded\n";

	return $this;
}

因为用户ID是整型,所以我们把它当作Hashtable的key,这样在将来查找的时候,使用isset会非常高效。 需要注意的是因为文件处理不是我们今天要讲的重点,也就省去了对文件存在行,可读性,合法性的检查。

好了, 接下来是重点了, 我们要启动一个IPV4 TCP Socket服务,监听在$host指定的地方, 为了方便大家了解Socket API,我们不采用PHP的Stream系列函数,而是采用PHP直接包装的Socket系列API, 首先我们用 socket_create 创建一个Socket套接字:

protected function listen() {
	$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
	if ($socket == false) {
		throw new Exception("socket_create() failed: reason: " . socket_strerror(socket_last_error()));
	}
}

然后,我们需要使用 socket_bind 绑定这个Socket到我们需要监听的地址, 并且使用 socket_listen 来监听请求:

protected function listen() {
	$socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
	if ($socket == false) {
		throw new Exception("socket_create() failed: reason: " . socket_strerror(socket_last_error()));
	}
	list($hostname, $port) = explode(":", $this->host);
	if (socket_bind($socket, $hostname, $port) == false) {
		throw new Exception("socket_bind() failed: reason: " . socket_strerror(socket_last_error()));
	}
	if (socket_listen($socket, 64) === false) {
		throw new Exception("socket_listen() failed: reason: " . socket_strerror(socket_last_error()));
	}
	echo "Starting Yar_Server at {$this->host}\nPresss Ctrl + C to quit\n";

	$this->socket = $socket;
	return $this;
}

好了, 如果一切没问题,接下来我们就可以 socket_accept 来监听请求了, 默认的socket是阻塞模式,如果没有请求,进程会一直阻塞等待, 对于高性能的服务来说, 最好采用非阻塞+select或者epoll的模式来同时处理多个请求, 但是我们的这个例子主要是为了介绍Yar的协议, 所以还是采用简单的阻塞模式。

接下来,我们来编写真正的RPC处理部分,首先我们通过accept接受一个请求, 然后读取请求的的内容,分析请求头中的Yar RPC Header信息, Yar RPC的协议头定义如下:

typedef struct _yar_header {
    uint32_t       id;            // transaction id
    uint16_t       version;       // protocl version
    uint32_t       magic_num;     // default is: 0x80DFEC60
    uint32_t       reserved;
    unsigned char  provider[32];  // reqeust from who
    unsigned char  token[32];     // request token, used for authentication
    uint32_t       body_len;      // request body len
}

其中, magic_num是用来验证请求有效性的一个特殊值, 合法的Yar RPC请求都会设置这个值为0x80DFEC60(我很想告诉你为啥是这个值,但我真不记得当时我为啥用这个数字了),这个头部是82个字节,可能有同学会问,不对啊一看这个Struct不应该是82啊,那是因为头部申明的时候采用pack模式,也就是不对齐, 所以确实是82个字节.

provider是一个字符串,标明了客户端的名字, 比如对于Yar扩展的Yar_Client就是"Yar PHP Cient-x.x.x"

token在设计的最初是为了做API key验证的,但是后来没用上,因为大部分都是内网应用,可以有多种办法来保证请求来源的合法性。

id是一个唯一请求id,这个是为了排查请求问题的, version默认为0,或者1,目前我没有升级过协议头,所以这个暂时我们也不用关心,reserved可以用来传递一些请求参数, 比如客户端可以说明是否保持连接。

body_len是我们需要关心的, 这个字段表明了这次请求,请求体一共多大(不包括Yar协议头部)。

所有的这些数字, 都是以网络字节序传递的, 我们采用PHP处理二进制流的 unpack 函数来解析读取进来的二进制流:

protected function parseHeader($header) {
   return 
     unpack("Nid/nversion/Nmagic_num/Nreserved/A32provider/A32token/Nbody_len", $header);
}

这个函数会返回一个上面说到的头部结构体的数组。

对应的我们也需要使用 pack 来实现生成Yar Header的方法:

protected function genHeader($id, $len) {
	$bin = pack("NnNNA32A32N",
		$id, 0, 0x80DFEC60,
		0, "Yar PHP TCP Server",
		"", $len
	);
	return $bin;
}

如刚才说的,我们需要在接受一个请求以前, 验证请求的合法性:

const YAR_MAGIC_NUM = 0x80DFEC60;
protected function validRequest($header) {
	if ($header["magic_num"] != self::YAR_MAGIC_NUM) {
		return false;
	}
	return true;
}

所以大概请求的处理整个逻辑框架是:

protected function accept() {
	while (($conn = socket_accept($this->socket))) {
		$buf = socket_read($conn, self::HEADER_SIZE, PHP_BINARY_READ);
		if ($buf === false) {
			socket_shutdown($conn);
			continue;
		}

		if (!$this->validHeader($header = $this->parseHeader($buf))) {
			$output = $this->response(1, "illegal Yar RPC request");
			goto response;
		}

		$buf = socket_read($conn, $header["body_len"], PHP_BINARY_READ);
		if ($buf === false) {
			$output = $this->response(1, "insufficient request body");
			goto response;
		}

		if (!$this->validPackager($buf)) {
			$output = $this->response(1, "unsupported packager");
			goto response;
		}

		$buf = substr($buf, 8); /* 跳过打包信息的8个字节 */
		$request = $this->parseRequest($buf);
		if ($request == false) {
			$this->response(1, "malformed request body");
			goto response;
		}

		$status = $this->handle($request, $ret);

		$output = $this->response($status, $ret);
response:
		socket_write($conn, $output, strlen($output));

		socket_shutdown($conn); /* 关闭写 */
	}
}

现在整体的框架就算完成了,我们需要完成handle,response方法就可以了,handle是要根据用户的请求中的m, 来调用指定的方法

protected function handle($request, &$ret) {
	if ($request["m"] == "query") {
		$ret = $this->query(...$request["p"]);
	} else {
		$ret = "unsupported method '" . $request["m"]. "'";
		return 1;
	}
	return 0;
}

现在来实现query方法本身, 这个会很简单,就检查下id是不是在白名单数组:

protected function query($id) {
	return isset($this->ids[$id]);
}

好了,接下来我们要完成response方法,这个方法是打包一个符合Yar协议的返回体,包括82个字节的头部,8个字节的打包信息,以及序列化后的响应体, 我们需要根据status不同,来选择设置响应体中的r还是e字段:

protected function response($status, $ret) {
	$body = array();

	$body["i"] = 0;
	$body["s"] = $status;
	if ($status == 0) {
		$body["r"] = $ret;
	} else {
		$body["e"] = $ret;
	}

	$packed = serialize($body);
	$header = $this->genHeader(0, strlen($packed) + 8);

	return $header . str_pad("PHP", 8, "\0") . $packed;
}

好了, 马上就要大功告成了,我们最后完成启动方法和析构函数(关闭socket):

public function run() {
	$this->loadDict()->listen()->accept();
}
public function __destruct() {
	if ($this->socket) {
		socket_close($this->socket);
	}
}

现在一切就绪, 我们最后在文件末尾加入:

(new Whitelist)->run();

在测试之前,我们先准备一个测试词表,比如1到1000的id:

seq 1, 1, 10000 > user_id.dict

然后启动服务, 监听在本机的9000端口:

$ ./yar_server -F user_id.dict -S127.0.0.1:9000
Loading dict successfully, 1000 loaded
Starting Yar_Server at 127.0.0.1:9000
Presss Ctrl + C to quit

不错,服务启动成功,然后我们使用Yar扩展来编写客户端(你需要首先安装好 Yar扩展 ), 测试下用户id 999和99999的调用效果:

<?php
$yar = new Yar_Client("tcp://127.0.0.1:9000");
var_dump($yar->query("999"));
var_dump($yar->query("99999"));
?>

和调用HTTP的Yar服务不同,此处我们应该使用tcp://做地址头,表示这是一个TCP的服务。

来,运行一下看看:

php7 client.php
bool(true)
bool(false)

看起来不错, 符合预期!

你也可以尝试故意构造一些错误的可能,比如调用不存在的方法之类的,来看看服务器的反应, 这个例子的代码你可以在 这里 找到.

到这里我就算介绍完了如何采用PHP来编写Yar的TCP服务, 大家应该可以很方便的把这个例子修改完善成自己希望的格式,或者嵌入Swoole。

还是要再次说明,因为本文的主要目的是为了介绍Yar RPC通信协议,所以在服务管理这块并没有做的很完善,比如socket_accept, socket_read/write等都默认采用了阻塞模式,也没有加入超时设计,服务进程也只有一个,这个如果真的想用做实际服务的话,还是需要一些功课的,不过我相信你有兴趣的话,都是可以搞定的。:)

当然,最简单的是,你可以直接使用 Yar-C服务框架 来编写C Yar TCP服务。

在这里也有一个Yar-C Server的例子 yar_server in C .

enjoy!


Recommend

  • 35
    • www.tuicool.com 5 years ago
    • Cache

    [译]TCP Socket 是如何工作的?

    原文: How TCP Sockets Work by Evan Klitzke. 本文我将从上层介绍Linux上的TCP/IP栈是如何工作的,特别是socket系统调用和内核数据结构的交互、内核和实际网络的交...

  • 27
    • www.laruence.com 4 years ago
    • Cache

    Yar-2.1.0 新功能介绍

    Yar(Yet Another RPC framework) 是一个轻量级支持并行调用的PHP RPC框架,是我还在微博的时候为了优化微博的性能而开发的一个工具,Yar的并行调用在微博被大量应用以降低用户请求耗时。...

  • 10

        以下是在实现一个高性能Socket组件总结下来的问题,如果你只需要处理几千的并发应用那代码编写上注意一下就行了,但需要面对上万或几万的并发应用.那以下问题的总结,相信对编写这方面的应用有很大的帮助. SocketAsyncEventArgs

  • 11
    • www.cnblogs.com 3 years ago
    • Cache

    C# Socket tcp 发送数据大小问题

    C# Socket tcp 发送数据大小问题     TCP/IP是可靠性传输协议,它能...

  • 7

    推荐一些socket工具,TCP、UDP调试、抓包工具 浏览:6004次  出处信息    还记得我在...

  • 1
    • www.laruence.com 3 years ago
    • Cache

    Yar-2.1 新功能介绍

    本文地址: https://www.laruence.com/2020/03/16/5578.html 转载请注明出处 Yar(Yet Another RPC framewo...

  • 10

    TCP SOCKET中backlog参数的用途是什么? 在前年时,业务中遇到好多次因为PHP-FPM的backlog参数引发的性能问题,一直想去详细研究一番,还特意在2013年总结里提到这事《为何PHP5.5.6中f...

  • 8

    Yar - 并行的RPC框架(Concurrent RPC framework) 本文地址: https://www.laruence.com/2012/09/15/2779.html 转...

  • 10

    先看一天面试的经验: 第一场: 面试官:你说一下TCP的三次握手 我:第一次Client将SYN置1……、第二次Server收……..、 第三次…….. 面试官:很难背吧? 我:……是啊,很难,要不我在和你...

  • 7
    • tonybai.com 2 years ago
    • Cache

    Go语言TCP Socket编程

    Go语言TCP Socket编程 Golang的主要 设计目标之一就是面向大规模后端服务程序,网络通信这块是服务端 程序必不可少也是至关重要的一部分。在日常应用中,我们也可以看到Go中的net以及其subdir...

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK