
Writing a Distributed Cache in Java: The RESP Server - 秋玻

source link: https://www.cnblogs.com/weloe/p/17102182.html

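In this installment we build a RESP server on top of a plain socket, which gives us a first working single-node cache.

Along the way we also need to implement dynamic routing of commands.

Source code: https://github.com/weloe/Java-Distributed-Cache

Code for this installment: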

https://github.com/weloe/Java-Distributed-Cache/tree/master/src/main/java/com/weloe/cache/server

Previous installment: cache management https://www.cnblogs.com/weloe/p/17068891.html

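The RESP protocol

RESP supports five data types: simple strings, errors, integers, bulk strings, and arrays.

The first byte identifies the data type, and the parts of a message are separated by \r\n

  • '+' : simple string
  • '-' : error
  • ':' : integer
  • '$' : bulk string
  • '*' : array

A simple string usually reports that an operation completed without problems, for example +ok\r\n

Error: -error msg\r\n

Integer: :1\r\n

Bulk string:

$4\r\n
test\r\n

Here 4 is the length in bytes of the actual data

The actual data is test

Array:

*2\r\n
$2\r\n
ab\r\n
$3\r\n
cde\r\n

This carries the string array [ab][cde]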
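To make the framing concrete, here is a minimal sketch of how a command could be encoded as a RESP array of bulk strings. The class name RespEncoder and its encode method are made up for this illustration and are not part of the article's repository.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration; not part of the article's repository. */
class RespEncoder {

    /** Encodes the arguments as one RESP array of bulk strings. */
    static String encode(String... args) {
        StringBuilder sb = new StringBuilder();
        // Array header: '*' followed by the number of elements
        sb.append('*').append(args.length).append("\r\n");
        for (String arg : args) {
            // The bulk string prefix counts bytes, not characters
            int byteLength = arg.getBytes(StandardCharsets.UTF_8).length;
            sb.append('$').append(byteLength).append("\r\n");
            sb.append(arg).append("\r\n");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Show the terminators as visible escape sequences
        System.out.println(encode("ab", "cde").replace("\r\n", "\\r\\n"));
    }
}
```

Running main prints *2\r\n$2\r\nab\r\n$3\r\ncde\r\n, the same array as in the example above. The terminator is written explicitly as \r\n rather than relying on the platform line separator.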

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/util/RESPUtil.java

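With the protocol in hand we can write the class that parses request data; we factor it out as a utility class

Code that parses the returned content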

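    /**
     * Reads the bytes of a RESP message and converts them to a String.
     *
     * @return the raw message, or null if the peer closed the connection
     */
    public static String parseRESPBytes(InputStream inputStream) {
        String result = null;
        try {
            // Block until the first byte arrives instead of spinning on available()
            int first = inputStream.read();
            if (first == -1) {
                return null;
            }
            // Assumes the rest of the message arrived together with the first byte
            byte[] rest = new byte[inputStream.available()];
            int n = inputStream.read(rest);
            result = (char) first + new String(rest, 0, Math.max(n, 0), StandardCharsets.UTF_8);

        } catch (IOException e) {
            e.printStackTrace();
        }

        return result;
    }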

    /**
     * Parses a RESP-encoded String.
     *
     * @param raw the raw RESP message
     * @return the decoded value
     */
    public static Object parseRESPString(String raw) {
        byte type = raw.getBytes()[0];
        String result = raw.substring(1);
        switch (type) {
            case '+':
                // Simple string, e.g. +ok\r\n
                // Read the single line
                return result.replace("\r\n", "");
            case '-':
                // Error, e.g. -Error msg\r\n
                throw new RuntimeException(result.replace("\r\n", ""));
            case ':':
                // Integer
                return result.replace("\r\n", "");
            case '$':
                // Bulk string: the payload follows the length line
                return result.split("\r\n")[1];
            case '*':
                // Array of bulk strings: every second line is a payload
                String[] strList = result.substring(result.indexOf("$")).split("\r\n");
                System.out.print("Multi-bulk request: ");
                List<String> list = new LinkedList<>();
                for (int i = 1; i < strList.length; i += 2) {
                    System.out.print(strList[i] + " ");
                    list.add(strList[i]);
                }
                System.out.println();
                return list;
            default:
                throw new RuntimeException("Malformed data");
        }
    }
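The parser above splits on \r\n, so a value that itself contains \r\n would be cut apart. As a sketch of an alternative, not the author's implementation, an array can be decoded by honoring the declared byte lengths. RespArrayReader and readCommand are hypothetical names introduced here.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical length-aware reader; an illustration, not the repository's code. */
class RespArrayReader {

    /** Reads one RESP array of bulk strings, such as a client command. */
    static List<String> readCommand(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        String header = readLine(data);
        if (!header.startsWith("*")) {
            throw new IOException("expected an array, got: " + header);
        }
        int count = Integer.parseInt(header.substring(1));
        List<String> parts = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            String lengthLine = readLine(data);
            if (!lengthLine.startsWith("$")) {
                throw new IOException("expected a bulk string, got: " + lengthLine);
            }
            byte[] payload = new byte[Integer.parseInt(lengthLine.substring(1))];
            // Blocks until exactly payload.length bytes have been read
            data.readFully(payload);
            // Consume the \r\n that terminates the payload
            readLine(data);
            parts.add(new String(payload, StandardCharsets.UTF_8));
        }
        return parts;
    }

    /** Reads up to the next \r\n and returns the line without the terminator. */
    private static String readLine(DataInputStream in) throws IOException {
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        int prev = -1;
        while (true) {
            int b = in.read();
            if (b == -1) {
                throw new EOFException("stream closed in the middle of a frame");
            }
            if (prev == '\r' && b == '\n') {
                break;
            }
            if (prev != -1) {
                line.write(prev);
            }
            prev = b;
        }
        return new String(line.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

Because it reads byte by byte, a real server would probably wrap the socket stream in a BufferedInputStream once per connection. Reading by length also means the method simply blocks until a complete frame has arrived, instead of assuming that one read returns the whole message.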

Code that sends a request

    /**
     * Sends a RESP request over a new connection and returns the raw reply.
     * @param host
     * @param port
     * @param args
     * @return
     * @throws IOException
     */
    public static byte[] sendRequest(String host,Integer port, String ... args) throws IOException {
        Socket socket = new Socket(host,port);
        RESPRequest request = new RESPRequest(socket);
        PrintWriter writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
        sendRequest(writer,args);

        byte[] param = request.getBytes();

        request.close();
        writer.close();
        socket.close();
        return param;
    }

    /**
     * Sends a RESP request.
     * @param writer
     * @param args
     * @throws IOException
     */
    public static void sendRequest(PrintWriter writer, String ... args) throws IOException {
        // println() uses the platform line separator, so write \r\n explicitly
        writer.print("*" + args.length + "\r\n");
        for (String arg : args) {
            writer.print("$" + arg.getBytes(StandardCharsets.UTF_8).length + "\r\n");
            writer.print(arg + "\r\n");
        }
        writer.flush();
    }

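Parsing commands

With RESP implemented we can send commands to our cache. But how do we parse a command and then carry out the matching operation?

Here we use a prefix tree (trie) for dynamic routing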

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/parser/Node.java

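First we build the tree's node

Note that only leaf nodes have a pattern

isWild records whether a node is a wildcard, so that it can match the arguments the user types. When registering a route we use : to mark a wildcard

For example, given the route set :group :k :v

and the command set g1 k1 v1

:group is matched with g1, :k with k1, and :v with v1

In the end we need to parse out group = g1, k = k1, v = v1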

/**
 * @author weloe
 */
public class Node {
    /**
     * The full command pattern to match; set only on the last level, e.g. set :group :key :value
     */
    private String pattern;

    /**
     * One part of the command pattern, e.g. :group in set :group :key :value
     */
    private String part;

    /**
     * Child nodes
     */
    private List<Node> children;

    /**
     * Whether this node is a wildcard node
     */
    private boolean isWild;

    public Node() {
        this.children = new LinkedList<>();
        this.part = "";
        this.pattern = "";
    }

    public Node(String part, boolean isWild) {
        this.part = part;
        this.isWild = isWild;
        this.children = new LinkedList<>();
    }
}

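The method that registers a node

    /**
     * Registers a node.
     *
     * @param pattern the full route pattern
     * @param parts   the pattern split into parts
     * @param height  the current depth in the tree
     */
    public void insert(String pattern, String[] parts, int height) {
        // Termination: every part has been consumed, so this is the last level
        if (parts.length == height) {
            this.pattern = pattern;
            return;
        }

        String part = parts[height];
        // Look for an existing child that matches this part
        Node child = matchChild(part);
        if (child == null) {
            // A part that starts with ":" or "*" is a wildcard
            child = new Node(part, part.startsWith(":") || part.startsWith("*"));
            // Attach the new child to the current node
            children.add(child);
        }
        child.insert(pattern, parts, height + 1);
    }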

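matchChild(), called by the registration method, finds a child node that matches the given part

    /**
     * Matches a child node against a part.
     *
     * @param part
     * @return the first matching node, or null if there is none
     */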
    public Node matchChild(String part) {
        for (Node child : children) {
            if (child.part.equals(part) || child.isWild) {
                return child;
            }
        }
        return null;
    }

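The method that recursively matches a string array (the input command) down to the corresponding leaf node

    /**
     * Finds the node that matches parts[].
     * @param parts
     * @param height
     * @return the matched node, or null if no route matches
     */
    public Node search(String[] parts, int height) {
        // Reached the end of the input, or a "*" node that swallows the rest
        if(parts.length == height || part.startsWith("*")){
            if(pattern == null){
                return null;
            }
            // Found the node
            return this;
        }

        String part = parts[height];
        // Collect the children that match this part
        List<Node> children = matchChildren(part);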

        for (Node child : children) {
            Node node = child.search(parts, height + 1);
            if(node != null){
                return node;
            }
        }

        return null;
    }

matchChildren, called by search, returns every child node that matches the given part

    /**
     * Matches child nodes against a part.
     * @param part
     * @return all matching nodes
     */
    public List<Node> matchChildren(String part) {
        ArrayList<Node> nodes = new ArrayList<>();
        for (Node child : children) {
            if (child.part.equals(part) || child.isWild) {
                nodes.add(child);
            }
        }
        return nodes;
    }

Router

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/parser/Router.java

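Router provides methods for adding routes and for matching a user command to a route

HandlerFunc is the handler function for a command; we supply it when adding a route with addRoute

The Route class is what getRoute returns: node is the matched leaf node of the tree, and map holds the matched wildcard parameters

For example, given the route set :group :k :v

and the command set g1 k1 v1

:group is matched with g1, :k with k1, and :v with v1

map is then group = g1, k = k1, v = v1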

/**
 * @author weloe
 */
public class Router {

    @FunctionalInterface
    public interface HandlerFunc {
        Object handle(RESPContext context);
    }

    public class Route{
        private Node node;
        private Map<String,String> map;

        public Route(Node node, Map<String, String> map) {
            this.node = node;
            this.map = map;
        }

        public Object handle(RESPContext context){
            context.setParamMap(map);
            String key = "command"+"-"+node.getPattern();
            return handlers.get(key).handle(context);
        }

        public Map<String, String> getMap() {
            return map;
        }

        public Node getNode() {
            return node;
        }
    }

    /**
     * Root nodes, one per method
     */
    private Map<String,Node> roots;

    private Map<String,HandlerFunc> handlers;


    public Router() {
        this.roots = new LinkedHashMap<>();
        this.handlers = new LinkedHashMap<>();
    }

    /**
     * Splits a pattern into its parts.
     * @param pattern
     * @return
     */
    public String[] parsePattern(String pattern){
        String[] patterns = pattern.split(" ");

        String[] parts = new String[patterns.length];
        for (int i = 0; i < patterns.length; i++) {
            parts[i] = patterns[i];
            if(patterns[i].charAt(0) == '*'){
                break;
            }
        }

        return parts;
    }

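    // Two-argument, chainable form; this is how Launch registers its routes
    public Router addRoute(String pattern,HandlerFunc handler){
        return addRoute("command",pattern,handler);
    }

    public Router addRoute(String method,String pattern,HandlerFunc handler){
        String[] parts = parsePattern(pattern);

        String key = method + "-" + pattern;
        Node node = roots.get(method);
        // Create the root node for this method if it does not exist yet
        if (node == null) {
            roots.put(method,new Node());
        }
        roots.get(method).insert(pattern,parts,0);
        handlers.put(key,handler);
        // Return this so that registrations can be chained
        return this;
    }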

    public Route getRoute(String path){
        return getRoute("command",path);
    }

    public Route getRoute(String method,String path){

        String[] patterns = parsePattern(path);
        Map<String, String> params = new LinkedHashMap<>();

        Node root = roots.get(method);
        if(root == null){
            return null;
        }
        Node res = root.search(patterns, 0);

        if (res == null) {
            return null;
        }
        String[] parts = parsePattern(res.getPattern());
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i];
            if (part.charAt(0) == ':') {
                params.put(part.substring(1),patterns[i]);
            }
            if(part.charAt(0) == '*' && part.length() > 1){
                String collect = Arrays.stream(patterns).skip(i).collect(Collectors.joining(" "));
                params.put(part.substring(1),collect);
                break;
            }

        }

        return new Route(res,params);
    }

}

The test below is equivalent to typing delete g1 k1 v1

The matched command is delete

The matched parameter key is k1

@Test
    void getRoute() {
        Router router = new Router();

        router.addRoute("command","set :group :key :v",null);
        router.addRoute("command","delete :group :key :v",null);
        router.addRoute("command","expire :group :key :v",null);
        router.addRoute("command","get :group :key :v",null);
        router.addRoute("command","hget :key :field",null);
        router.addRoute("command","config set maxsize :size",null);
        router.addRoute("command","config set maxnum :num",null);
        router.addRoute("command","config set cachestrategy :strategy",null);

        Router.Route route = router.getRoute("command", "delete g1 k1 v1");
        // JUnit expects (expected, actual)
        Assertions.assertEquals("delete :group :key :v",route.getNode().getPattern());
        Assertions.assertEquals("k1",route.getMap().get("key"));
    }
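The wildcard binding exercised by the test above can also be sketched in isolation, without the tree. The snippet below is a simplified illustration under the same conventions (":" binds one word, "*" binds the rest of the command). PatternBinder and bind are hypothetical names, and the linear comparison stands in for the trie lookup.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of wildcard binding; not the article's Router. */
class PatternBinder {

    /** Returns the wildcard bindings, or null when the command does not fit the pattern. */
    static Map<String, String> bind(String pattern, String command) {
        String[] parts = pattern.split(" ");
        String[] words = command.split(" ");
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            if (i >= words.length) {
                return null; // the command is shorter than the pattern
            }
            String part = parts[i];
            if (part.startsWith("*") && part.length() > 1) {
                // "*name" captures every remaining word
                String rest = String.join(" ", Arrays.copyOfRange(words, i, words.length));
                params.put(part.substring(1), rest);
                return params;
            }
            if (part.startsWith(":")) {
                // ":name" captures exactly one word
                params.put(part.substring(1), words[i]);
            } else if (!part.equals(words[i])) {
                return null; // a literal part must match exactly
            }
        }
        // Without a "*" part the lengths have to agree
        return parts.length == words.length ? params : null;
    }

    public static void main(String[] args) {
        System.out.println(bind("delete :group :key :v", "delete g1 k1 v1"));
    }
}
```

Running main prints {group=g1, key=k1, v=v1}. The trie earns its place once there are many routes, because a lookup then walks one path of the tree instead of trying each pattern in turn.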

Building on the above, we can wrap things up in the RESPContext, RESPRequest, and RESPResponse classes

RESPContext

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/resp/RESPContext.java

A server naturally has context information, so we wrap it in a Context class

/**
 * @author weloe
 */
public class RESPContext {

    private LocalDateTime startTime;

    private Socket socket;

    private RESPRequest request;

    private RESPResponse response;

    private Router.Route route;

    private Map<String,String> paramMap;

    private Group group;


    public void initServer(Socket socket, RESPRequest request, RESPResponse response) throws IOException {

        this.socket = socket;
        this.request = request;
        this.response = response;
        this.startTime = LocalDateTime.now();
    }


    /**
     * Reads the bytes of a RESP message
     *
     * @return
     */
    public String parseRESPBytes() {
        String result = request.parseRESPBytes();

        return result;
    }

    /**
     * Parses a RESP-encoded String
     *
     * @param raw
     * @return
     */
    public Object parseRESPString(String raw) {
        Object obj = request.parseRESPString(raw);
        return obj;
    }

    public void ok() {
        response.ok();
    }

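    public void ok(byte[] bytes){
        // String.valueOf(byte[]) would yield the array's identity string, so decode the bytes
        response.ok(new String(bytes, StandardCharsets.UTF_8));
    }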

    public void ok(String arg) {
        response.ok(arg);
    }

    public void ok(String... args) {
        response.ok(args);
    }

    public void error(String msg) {
        response.error(msg);
    }

    public void close() {
        if (request != null) {
            try {
                request.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        if (response != null) {
            response.close();
        }

        if (socket != null) {
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    public Socket getSocket() {
        return socket;
    }

    public RESPRequest getRequest() {
        return request;
    }

    public RESPResponse getResponse() {
        return response;
    }

    public LocalDateTime getStartTime() {
        return startTime;
    }

    public void setStartTime(LocalDateTime startTime) {
        this.startTime = startTime;
    }

    public void setParamMap(Map<String, String> paramMap) {
        this.paramMap = paramMap;
    }

    public Map<String, String> getParamMap() {
        return paramMap;
    }

    public void setRoute(Router.Route route) {
        this.route = route;
    }

    public Router.Route getRoute() {
        return route;
    }

    public void setGroup(Group group) {
        this.group = group;
    }

    public Group getGroup() {
        return group;
    }

    public String getParam(String key){
        return paramMap.get(key);
    }
}

RESPRequest

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/resp/RESPRequest.java

/**
 * @author weloe
 */
public class RESPRequest {
    private InputStream inputStream;

    private Map<String,String> params;

    public RESPRequest(Socket socket) throws IOException {
        this.inputStream = socket.getInputStream();
    }

    public String getParam(String key) {
        return params.get(key);
    }

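    /**
     * Reads the raw bytes of a RESP message.
     *
     * @return the bytes read, or null if the peer closed the connection
     */
    public byte[] getBytes() {
        byte[] bytes = null;

        try {
            // Block until the first byte arrives instead of spinning on available()
            int first = inputStream.read();
            if (first == -1) {
                return null;
            }
            // Assumes the rest of the message arrived together with the first byte
            byte[] rest = new byte[inputStream.available()];
            int n = Math.max(inputStream.read(rest), 0);
            bytes = new byte[n + 1];
            bytes[0] = (byte) first;
            System.arraycopy(rest, 0, bytes, 1, n);

        } catch (IOException e) {
            e.printStackTrace();
        }

        return bytes;
    }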

    /**
     * Reads the bytes of a RESP message and converts them to a String.
     *
     * @return the raw message, or null if the client closed the connection
     */
    public String parseRESPBytes() {
        String result = null;
        try {
            System.out.println("Waiting for data");
            // Block until the first byte arrives instead of spinning on available()
            int first = inputStream.read();
            if (first == -1) {
                return null;
            }
            // Assumes the rest of the message arrived together with the first byte
            byte[] rest = new byte[inputStream.available()];
            int n = inputStream.read(rest);
            result = (char) first + new String(rest, 0, Math.max(n, 0), StandardCharsets.UTF_8);

        } catch (IOException e) {
            e.printStackTrace();
        }

        return result;
    }

    /**
     * Parses a RESP-encoded String.
     *
     * @param raw the raw RESP message
     * @return the decoded value
     */
    public Object parseRESPString(String raw) {
        byte type = raw.getBytes()[0];
        String result = raw.substring(1);
        switch (type) {
            case '+':
                // Simple string, e.g. +ok\r\n
                // Read the single line
                return result.replace("\r\n", "");
            case '-':
                // Error, e.g. -Error msg\r\n
                throw new RuntimeException(result.replace("\r\n", ""));
            case ':':
                // Integer
                return result.replace("\r\n", "");
            case '$':
                // Bulk string: skip the length line, which may have more than one digit
                return result.split("\r\n")[1];
            case '*':
                // Array of bulk strings: every second line is a payload
                String[] strList = result.substring(result.indexOf("$")).split("\r\n");
                System.out.print("Multi-bulk request: ");
                List<String> list = new LinkedList<>();
                for (int i = 1; i < strList.length; i += 2) {
                    System.out.print(strList[i] + " ");
                    list.add(strList[i]);
                }
                System.out.println();
                return list;
            default:
                throw new RuntimeException("Malformed data");
        }
    }

    public void close() throws IOException {
        if (inputStream != null) {
            inputStream.close();
        }
    }


    public InputStream getInputStream() {
        return inputStream;
    }
}

RESPResponse

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/resp/RESPResponse.java

/**
 * @author weloe
 */
public class RESPResponse {
    private PrintWriter writer;

    public RESPResponse(Socket socket) throws IOException {
        // Character output stream over the socket
        writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
    }

    // println() uses the platform line separator, so every method writes \r\n explicitly

    public void ok() {
        writer.print(RESPStatus.OK.getMsg() + "\r\n");
        writer.flush();
    }

    public void ok(Integer arg) {
        writer.print(":" + arg + "\r\n");
        writer.flush();
    }

    public void ok(String arg) {
        writer.print("$" + arg.getBytes(StandardCharsets.UTF_8).length + "\r\n");
        writer.print(arg + "\r\n");
        writer.flush();
    }

    public void ok(String... args) {
        writer.print("*" + args.length + "\r\n");
        for (String arg : args) {
            writer.print("$" + arg.getBytes(StandardCharsets.UTF_8).length + "\r\n");
            writer.print(arg + "\r\n");
        }
        writer.flush();
    }

    public void error(String msg) {
        writer.print(RESPStatus.ERROR.getMsg() + " " + msg + "\r\n");
        writer.flush();
    }

    public void close() {
        if (writer != null) {
            writer.close();
        }
    }

    public PrintWriter getWriter() {
        return writer;
    }


}

Service

https://github.com/weloe/Java-Distributed-Cache/tree/master/src/main/java/com/weloe/cache/server/command

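Each command has a HandlerFunc, and that function needs to call the corresponding service, so we abstract the services out

The services here are thin wrappers around the classes from the previous installment on cache management, so we will not go through them one by one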

ConfigService

/**
 * Config operations
 * @author weloe
 */
public class ConfigService {

    public Object getNormalSize(Group group){
        return group.getNormalSize();
    }

    public Object getMaxSize(Group group){
        return group.getMaxSize();
    }

    public Object setMaxSize(Group group,String size) {
        group.setMaxSize(Integer.parseInt(size));
        return "";
    }

    public Object setMaxNum(Group group,String num) {
        return null;
    }

}

DeleteService

/**
 * Operations for deleting keys
 * @author weloe
 */
public class DeleteService {

    public Object delete(Group group,String key) {
        CacheObj delete = group.delete(key);
        if(delete == null){
            return null;
        }
        return "";
    }


    public Object clear(Group group) {
        group.clear();
        return "";
    }
}

ExpireService

/**
 * Sets and gets a key's expiration time, in seconds
 * @author weloe
 */
public class ExpireService {

    public Object expire(Group group,String key,String value) {
        LocalDateTime time = group.expire(key, Long.parseLong(value), ChronoUnit.SECONDS);
        return time;
    }

    public Object ttl(Group group,String key) {
        long ttl = group.ttl(key);
        return ttl;
    }
}

GroupService

/**
 * Group operations
 * @author weloe
 */
public class GroupService {

    private GroupManager groupManager = GroupManager.getInstance();

    public Object add(String groupName) {

        Group group = new Group();
        group.setName(groupName);
        group.setCache(new Cache());
        group.setGetter(k -> null);
        groupManager.put(group);

        return "";
    }


}

StringService

/**
 * Cache set and get operations
 * @author weloe
 */
public class StringService {

    public String get(Group group,String key) {
        CacheObj cacheObj = group.get(key);
        if(cacheObj != null){
            return new String(cacheObj.getData());
        }
        return null;
    }


    public Object set(Group group,String key,String value) {
        group.putCacheObj(key, new CacheObj(value.getBytes(StandardCharsets.UTF_8)));
        return "";
    }
}

ServiceFactory

To make them easier to manage, there is also a class that manages the services

public class ServiceFactory {

    Map<String,Object> map;

    public ServiceFactory() {
        map = new LinkedHashMap<>();
        map.put("str",new StringService());
        map.put("group",new GroupService());
        map.put("config",new ConfigService());
        map.put("delete",new DeleteService());
        map.put("expire",new ExpireService());
    }


    public<T> T getBean(String name){
        return (T) map.get(name);
    }
}
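getBean above relies on an unchecked cast, so asking for the wrong type only fails later, at the call site. One possible variation, shown purely as a sketch and not as the repository's design, keys the services by their class so that the lookup is checked. TypedServiceRegistry, register, and get are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical type-keyed registry; an alternative sketch, not the article's ServiceFactory. */
class TypedServiceRegistry {

    private final Map<Class<?>, Object> services = new LinkedHashMap<>();

    /** Registers an instance under its type; the signature ties the instance to the key. */
    <T> void register(Class<T> type, T instance) {
        services.put(type, instance);
    }

    /** Looks a service up by type and fails fast when it was never registered. */
    <T> T get(Class<T> type) {
        Object service = services.get(type);
        if (service == null) {
            throw new IllegalArgumentException("no service registered for " + type.getName());
        }
        // Class.cast is a checked cast, so no unchecked warning is needed
        return type.cast(service);
    }
}
```

With the article's classes this would be used as registry.register(StringService.class, new StringService()) followed by registry.get(StringService.class). The string keys of the original are shorter to type, which is a reasonable trade-off for a handful of services.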

CommandQueue

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/server/parser/CommandQueue.java

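Here we use a model in which multiple threads perform I/O and parse commands while a single thread executes them. Once a parsing thread has resolved a command to a Route, it adds the Context to a blocking queue; the consumer then takes Contexts off the queue and executes the commands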

public class CommandQueue {

    // Contexts are executed in order of their start time; compareTo also handles equal times
    private static PriorityBlockingQueue<RESPContext> commandQueue = new PriorityBlockingQueue<>(5,
            (o1, o2) -> o1.getStartTime().compareTo(o2.getStartTime()));

    public boolean add(RESPContext context){
        return commandQueue.add(context);
    }


    public void consume() {
        new Thread(() -> {
            System.out.println("Server waiting for commands...");
            while (true) {
                RESPContext respContext;
                try {
                    respContext = commandQueue.take();
                } catch (InterruptedException e) {
                    // Nothing was taken, so there is no context to reply to; stop consuming
                    Thread.currentThread().interrupt();
                    return;
                }

                System.out.println("Executing command " + respContext.getRoute().getNode().getPattern());
                // Execute the command
                Object handle = respContext.getRoute().handle(respContext);
                System.out.println(handle);
                if(handle == null){
                    respContext.ok("nil");
                }else if(handle.equals("")){
                    respContext.ok();
                }else {
                    respContext.ok(handle.toString());
                }
            }
        }).start();


    }

    public PriorityBlockingQueue<RESPContext> getCommandQueue() {
        return commandQueue;
    }
}
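The threading model used by CommandQueue can be reduced to a small, self-contained sketch: several producer threads stand in for the I/O threads, and one consumer executes everything. This sketch swaps in a plain FIFO LinkedBlockingQueue of strings for the article's PriorityBlockingQueue of RESPContext objects, and SingleConsumerDemo is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical demo of many parsing threads feeding one executing thread. */
class SingleConsumerDemo {

    /** Starts the given number of producers and returns the commands in execution order. */
    static List<String> run(int producers, int perProducer) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        int total = producers * perProducer;
        // Only the consumer thread touches this list while it is running
        List<String> executed = new ArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    // take() blocks while the queue is empty
                    executed.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        Thread[] workers = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            int id = p;
            workers[p] = new Thread(() -> {
                for (int n = 0; n < perProducer; n++) {
                    queue.add("client" + id + " cmd" + n);
                }
            });
            workers[p].start();
        }
        for (Thread worker : workers) {
            worker.join();
        }
        // join() guarantees the consumer's writes are visible to the caller
        consumer.join();
        return executed;
    }

    public static void main(String[] args) throws InterruptedException {
        run(3, 2).forEach(System.out::println);
    }
}
```

Because a single thread executes every command, the cache state needs no locking during execution, and the commands of one client are still executed in the order they were queued. How commands from different clients interleave depends on thread scheduling.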

Launch: the server startup class

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/Launch.java

public class Launch {
    private static final ExecutorService poolExecutor = new ThreadPoolExecutor(2, 5,
            30L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10));

    private static final GroupManager groupManager = GroupManager.getInstance();

    private static final ServiceFactory factory = new ServiceFactory();

    private static final Router router = new Router();

    static {
        // Register the routes
        ConfigService config = factory.getBean("config");
        StringService str = factory.getBean("str");
        DeleteService delete = factory.getBean("delete");
        ExpireService expire = factory.getBean("expire");
        GroupService groupService = factory.getBean("group");

        router.addRoute("group add :name", c -> groupService.add(c.getParam("name")));

        router.addRoute("config set maxByteSize :group :size", c -> config.setMaxSize(c.getGroup(),c.getParam("size")))
              .addRoute("config get maxByteSize :group", c -> config.getMaxSize(c.getGroup()))
              .addRoute("config get normalSize :group", c -> config.getNormalSize(c.getGroup()))
              .addRoute("config set maxNum :group :num", c -> config.setMaxNum(c.getGroup(),c.getParam("num")));


        router.addRoute("expire :group :k :n", c -> expire.expire(c.getGroup(),c.getParam("k"),c.getParam("n")))
              .addRoute("ttl :group :k", c -> expire.ttl(c.getGroup(),c.getParam("k")));

        router.addRoute("delete :group :k", c -> delete.delete(c.getGroup(),c.getParam("k")))
              .addRoute("clear :group", c -> delete.clear(c.getGroup()));

        router.addRoute("set :group :k :v", c -> str.set(c.getGroup(),c.getParam("k"),c.getParam("v")))
              .addRoute("get :group :k", c -> str.get(c.getGroup(),c.getParam("k")));


    }


    public static void main(String[] args) throws IOException {

        CommandQueue commandQueue = new CommandQueue();
        commandQueue.consume();

        ServerSocket serverSocket = new ServerSocket(8081);

        while (true) {
            // Accept a client connection
            Socket socket = serverSocket.accept();
            System.out.println(socket.getInetAddress() + ":" + socket.getPort() + " connected");
            poolExecutor.submit(() -> task(commandQueue, socket));

        }


    }

    private static void task(CommandQueue commandQueue, Socket socket) {
        RESPContext context = null;

        System.out.println("Thread " + Thread.currentThread().getId() + " running");
        try {
            while (true) {
                context = new RESPContext();
                context.initServer(socket,new RESPRequest(socket),new RESPResponse(socket));

                Object requestData = null;
                try {
                    // Handle the request
                    String res = context.parseRESPBytes();
                    if(res == null){
                        return;
                    }
                    System.out.printf("%s => %s%n", "Raw format", res.replace("\r\n", "\\r\\n"));
                    requestData = context.parseRESPString(res);

                } catch (Exception e) {
                    System.out.println(e.getMessage());
                    context.error(e.getMessage());
                    continue;
                }

                // Commands must arrive as a RESP array; anything else cannot be cast to a List
                if (!(requestData instanceof List)) {
                    context.error("Commands must be sent as a RESP array");
                    continue;
                }
                List<String> commandStr = (List<String>) requestData;

                System.out.println("Received command from " + socket.getInetAddress() + ":" + socket.getPort());

                // Resolve the command to a route
                Router.Route route = router.getRoute(String.join(" ",commandStr));
                if (route == null) {
                    context.ok("Please check your command arguments");
                    continue;
                }

                Map<String, String> paramMap = route.getMap();
                String name = paramMap.get("group");
                if(name != null){
                    if(groupManager.getGroup(name) == null){
                        context.ok("That group does not exist");
                        continue;
                    }
                    context.setGroup(groupManager.getGroup(name));
                }
                context.setRoute(route);


                // Add the command to the blocking queue
                commandQueue.add(context);
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (context != null) {
                context.close();
            }
        }
    }

}

Client

https://github.com/weloe/Java-Distributed-Cache/blob/master/src/main/java/com/weloe/cache/client/Client.java

public class ClientLaunch {
    static Socket socket;
    static PrintWriter writer;
    static BufferedReader reader;
    static InputStream inputStream;

    static String host = "127.0.0.1";
    static int port = 8081;

    public static void main(String[] args) {

        try {
            // Establish the connection
            socket = new Socket(host,port);
            // Obtain the output and input streams
            // Character output stream over the socket
            writer = new PrintWriter(new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8));
            inputStream = socket.getInputStream();

            // Create the Scanner once; the loop ends when standard input is exhausted
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String s = scanner.nextLine();
                if("exit".equals(s)){
                    System.out.println("exit cli");
                    return;
                }
                if (!s.isEmpty()) {
                    Object obj;
                    // Send the command
                    sendRequest(s.split(" "));
                    // Read the response
                    obj = inputStreamHandleByteResponse();
                    System.out.println(obj);
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Release the connection
            try {
                if(reader != null) {
                    reader.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                if(inputStream != null) {
                    inputStream.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                if(writer != null) {
                    writer.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                if(socket != null) {
                    socket.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    }

    private static void sendRequest(String ... args) {

        // println() uses the platform line separator, so write \r\n explicitly
        writer.print("*" + args.length + "\r\n");
        for (String arg : args) {
            writer.print("$" + arg.getBytes(StandardCharsets.UTF_8).length + "\r\n");
            writer.print(arg + "\r\n");
        }

        writer.flush();

    }


    public static Object inputStreamHandleByteResponse() {
        String result = RESPUtil.parseRESPBytes(inputStream);
        return RESPUtil.parseRESPString(result);
    }


}

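Finally we can start Launch and Client and try it out

Server startup

Server waiting for commands...
/127.0.0.1:53779 connected
Thread 13 running
Waiting for data

The client sends the request group add test

The server replies

ok

Server output

Raw format => *3\r\n$5\r\ngroup\r\n$3\r\nadd\r\n$4\r\ntest\r\n
Multi-bulk request: group add test 
Received command from /127.0.0.1:53779
Executing command group add :name

The client sets a cache entry with set test testKey testV

The server replies

ok

Server output

Raw format => *4\r\n$3\r\nset\r\n$4\r\ntest\r\n$7\r\ntestKey\r\n$5\r\ntestV\r\n
Multi-bulk request: set test testKey testV 
Received command from /127.0.0.1:53779
Executing command set :group :k :v

The client reads it back with get test testKey

The server replies

testV

With that, the single-node cache is essentially complete~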

