3

java实现朴素rpc - 余充数

 11 months ago
source link: https://www.cnblogs.com/aolob/p/java-rpc.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

java实现朴素rpc

posted @ 2023-10-14 11:59  余充数  阅读(68)  评论(0)  编辑  收藏  举报

五层协议中,RPC在第几层?

五层协议
应用层
传输层
网络层
链路层
物理层

我不知道,我要去大气层!

远程过程调用(RPC),比较朴素的说法就是,从某台机器调用另一台机器的一段代码,并获取返回结果。

这之前的一个基层问题就是进程间通信方式(IPC),从是否设计网络通信分为:

  • 基于信号量和共享内存实现的管道和消息队列和其本身(不涉及IP端口)
  • Socket(IP端口)

和共享内存不同,Socket实现不并不是只依靠内存屏障,它还额外需要物理/虚拟网卡设备。

关于网卡,只需要知道网卡可以帮助我们从网络中读写信息,这也是RPC的基础。

jRPC实现

远程过程调用,不如先来研究调用。

回声服务实现

先来一段普通的代码。

public class EchoService {

	public static EchoResponse echo(EchoRequest req) throws Exception {
		return new EchoResponse("echo:" + req.content);
	}

	public static void main(String[] args) throws Exception {
		System.out.println(EchoService.echo(new EchoRequest("ping")).content); // echo:ping
	}
}

class EchoRequest {
	String content;

	public EchoRequest(String content) {
		this.content = content;
	}
}

class EchoResponse {
	String content;

	public EchoResponse() {
	}

	public EchoResponse(String content) {
		this.content = content;
	}
}

回声服务对传入参数直接返回,就像你在山谷中的回声一样。

现在如果使用远程传输,我们需要给网卡注册自己的IP和端口,以便和服务端建立连接。连接建立后,我们还需要确定数据如何传输。

服务端实现

为了朴素性,我们假设只有10台机器和我们进行连接。

public Runnable apply(Integer port) {
	return () -> {
		try {
			try (ServerSocket serverSocket = new ServerSocket(port)) {
				for (;;) {
					Socket clientSocket = serverSocket.accept();
					new Thread(() -> {
						// 数据如何传输
					}).start();
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	};
}

根据Socket的文档,我们可以很快迭代出一台服务器应该如何与他的客户端连接。对于每个客户端,我们提供了独立的线程支持两台机器间的长连接。

试想一下,此时的长连接如果是百万甚至千万,为每个连接分配一个线程不可取,有什么好办法可以支持到呢?这个问题这里不解了,有兴趣自行研究下。

Serializable

一说起序列化,最怕异口同声json

使用json就难免会使用到 第三方库,如果没有必要,并不希望引入。除了json外,java其实本身就有Serializable实现,他和synchronized一样,java官方提供并维护。

public class EchoService {

	public static EchoResponse echo(EchoRequest req) throws Exception {
		throw new UnsupportedOperationException();
	}
}

class EchoRequest implements Serializable {
	String content;

	public EchoRequest(String content) {
		this.content = content;
	}
}

class EchoResponse implements Serializable {
	String content;

	public EchoResponse() {
	}

	public EchoResponse(String content) {
		this.content = content;
	}
}

除了参数外,一个rpc需要知道,ip、端口、服务名、方法名。

ip和端口在调用时应该已经知道,为此还需要支持一个header来完成服务名和方法名的指定。

class Header implements Serializable {
	String stub;
	String method;

	public Header(String stub, String method) {
		this.stub = stub;
		this.method = method;
	}
}

通过编码解码器对Serializable的数据编码和解码。

public class Codec {
	Socket clientSocket;
	ObjectInputStream objectInputStream;
	ObjectOutputStream objectOutputStream;

	public Codec(Socket clientSocket)
		throws Exception {
		this.clientSocket = clientSocket;
		this.objectOutputStream = new ObjectOutputStream(clientSocket.getOutputStream());
		this.objectInputStream = new ObjectInputStream(clientSocket.getInputStream());
	}

	public Header header() throws Exception {
		return (Header) this.objectInputStream.readObject();
	}

	public Object read() throws Exception {
		return this.objectInputStream.readObject();
	}

	public void write(Header header, Object obj) throws Exception {
		this.objectOutputStream.writeObject(header);
		this.objectOutputStream.writeObject(obj);
	}
}

回到服务端,将空缺的地方通过反射补全。

Codec codec = new Codec(clientSocket);
for (;;) {
	Header header = codec.header();
	Class<?> stub = Class.forName(header.stub);
	Map<String, Method> methods = Arrays.asList(stub.getDeclaredMethods()).stream()
		.collect(Collectors.toMap(t -> t.getName(), t -> t));
	Method method = methods.get(header.method);
	codec.write(header, method.invoke(null, header, codec.read()));
}

通过codec解码stub和method来找到对应的方法,调用对应方法,获取结果后再通过编码返回客户端。

高性能客户端

想一下,如果一个客户端发送了10个请求,其中第2个由于种种原因被阻塞掉,后面的请求会被卡在阻塞的请求之后而无法获得响应。

简单的处理方法,就是抽象掉调用过程,并给其唯一标识。需要一个map来存全部的调用请求。

class Call {
    Long seq;
    Object req;
    Object rsp;
    Thread thread;

    public Call(Long seq, Object req) {
        this.seq = seq;
        this.req = req;
    }
}

对call抽象后,对client也就迎刃而解了。

我知道了,map,用map解。

Long seq;
Codec codec;
ReentrantLock clock;
Map<Long, Call> calls;
ReentrantLock metux;

在map之上提供对seq的操作。

Call register(Call call) {
	try {
		clock.lock();
		call.seq = seq;
		calls.put(seq, call);
		seq++;
		return call;
	} finally {
		clock.unlock();
	}
}

Call remove(Call call) {
	try {
		clock.lock();
		call.seq = seq;
		calls.remove(seq);
		return call;
	} finally {
		clock.unlock();
	}
}

对服务端的响应监听,唤醒阻塞的线程。

void receive() throws Exception {
	for (;;) {
		Header header = codec.header();
		Call call = calls.remove(header.seq);
		Object rsp = codec.read();
		call.rsp = rsp;
		LockSupport.unpark(call.thread);
	}
}

最后就是发起客户端调用的代码。

FutureTask<Object> start(Header header, Object req) throws Exception {
	Call call = new Call(seq, req);
	try {
		metux.lock();
		final Call fcall = register(call);
		header.seq = call.seq;
		codec.write(header, req);
		FutureTask<Object> task = new FutureTask<>(() -> {
			fcall.thread = Thread.currentThread();
			LockSupport.park();
			return fcall.rsp;
		});
		task.run();
		return task;
	} finally {
		metux.unlock();
	}
}

你好,世界

public static void main(String[] args) throws UnknownHostException, IOException, Exception {
	new Thread(new Server().apply(8080)).start(); // 服务端启动
	// 模拟调用
	ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(10);
	Client client = new Client(new Codec(new Socket("127.0.0.1", 8080)));
	for (int i = 0; i < 100; i++) {
		newFixedThreadPool.submit(() -> {
			try {
				FutureTask<Object> call = client.start(
					new Header("EchoService", "echo"),
					new EchoRequest("~hello"));
				EchoResponse rsp = (EchoResponse) call.get();
				System.out.println(rsp.content);
			} catch (Exception e) {
				e.printStackTrace();
			}
		});
	}
}

Output

RPC echo~hello 0
RPC echo~hello 1
RPC echo~hello 2
RPC echo~hello 3
RPC echo~hello 4
RPC echo~hello 6
RPC echo~hello 5
RPC echo~hello 7
RPC echo~hello 9
RPC echo~hello 8

至此,只是实现了rpc的通信过程,完成度比较高。

  • 针对大流量的服务端还有优化空间,比如NIO的使用来管理长连接会更加有效。
  • 没有实现注册中心。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK