2

基于Netty的TCP服务框架 - DaFanJoy

 1 year ago
source link: https://www.cnblogs.com/dafanjoy/p/16653128.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

19年写的一个基础的TCP服务框架,内置了一个简单IOC容器,当时的目标是一方面能作为组件供第三方集成实现TCP通讯相关功能,另一方面作为提供一种服务框架范式。所以框架核心点主要还是通过适度的封装,隐藏底层的通讯细节,最终调用者接受到的是经过合包分包处理的字节数组,不涉及具体的协议解析,大家如果使用可以再基于业务进行适度的封装。

好,废话不多说,简单介绍下整个架构和源码细节。

Jtcp-cmmon

Jtcp-cmmon主要放置一些基础配置与工具类。 1、这里注意的服务配置类与默认配置项 JtcpConfig、JtcpOptions,JtcpConfig 顾名思义就是配置类,而JtcpOptions则定义了默认值; 2、RouteEnum枚举中列出了几种通用的网络通讯事件类型,作为注解中的字段定义路由

    public enum RouteEnum {
        OnConnect, //链接
        OnDisconnect, //链接断开
        OnRecevie, //数据接收
        OnSessionTimeOut, //超时
        OnException //异常
    }

Jtcp-transport

Jtcp-transport 基于Netty提供了TCP服务与报文解析功能,这里我针对常规固定字节起始的协议,通过递归方式对报文粘包、半包等进行了处理

     /**
     * state = 0 开始解析
     * state = 1 解析(递归处理粘包)
     * state = 2 半包
     */
    private void parseCompletePackets(ChannelHandlerContext ctx, byte[] bytesReady, List<Object> out,
            int magicByteBegin, int magicByteEnd) throws IOException {
        if (state == 0) { // 开始解析
            dataStream = new ByteArrayOutputStream();
            // 包数据开始状态,查找开始标识
            if (bytesReady[0] != magicByteBegin) {//第一包必须从协议报文头开始
                return;
            }
            state = 1;
        }
        if (state > 0) {
            int pos = indexOfMagicByte(bytesReady, magicByteEnd);//寻找尾部标识index,跳过头部标识位从1开始
            if(state == 2) {//半包状态
                if(bytesReady[0] == magicByteEnd) {//半包状态,但下段报文7E开头,明显是不正常的
                    dataStream.reset(); //只能清除目前已累积的所有数据
                }
            }
            if (pos != -1) {
                // 结束标识
                dataStream.write(bytesReady, 0, pos);
                
                byte[] ad = dataStream.toByteArray();
                // 读取完整一个报文
                out.add(ad);
                // 重置为包开始处理状态
                state = 0;
                // 将剩余字节写入内存字节流中
                if (pos != bytesReady.length) {
                    byte[] remainBytes = new byte[bytesReady.length - pos];
                    System.arraycopy(bytesReady, pos, remainBytes, 0, remainBytes.length);
                    parseCompletePackets(ctx, remainBytes, out, magicByteBegin, magicByteEnd);
                }
            } else {
                // 无结束标识,非完成报文,继续后续处理
                state = 2; //报文体读取状态,直接将当前数据写内存字节流中
                // 在下一次数据过来时处理结束标识
                dataStream.write(bytesReady, 0, bytesReady.length);
            }
        }
    }

Jtcp-core

自定义实现一个IOC容器,可对消息处理handler进行管理,并通过注解的方式制定消息转发机制 首先遍历main函数下所有class类,并缓存所有指定注解@JtcpComponet的class类对象并注入sproutBeanFactory实例工厂

    /**
     * 缓存所有指定注解的class<?>类对象
     * @param packageName
     * @return
     * @throws Exception
     */
    public static Map<String, Class<?>> getBean(String packageName) throws Exception {

        if (componetMap == null) {
            Set<Class<?>> clsList = getClasses(packageName);

            if (clsList == null || clsList.isEmpty()) {
                return componetMap;
            }

            componetMap = new HashMap<>(16);
            for (Class<?> cls : clsList) {

                Annotation annotation = cls.getAnnotation(JtcpComponet.class);
                if (annotation == null) {
                    continue;
                }

                JtcpComponet sproutComponet = (JtcpComponet) annotation;
                componetMap.put(sproutComponet.value() == null ? cls.getName() : sproutComponet.value(), cls);

            }
        }
        return componetMap;
    }

实现方法路由,通过@JtcpRoute并结合上面定义链接、断开、消息接收、超时、异常等事件枚举类型,把触发的网络通信事件转发至指定的业务方法中处理

    /**
     * 根据注解调用方法
     * @param method
     * @param annotation
     * @param args
     * @throws Exception
     */
    public void invoke(RouteEnum routeEnum, Object[] args) throws Exception {
        Method method = RouterScanner.getInstance().routeMethod(routeEnum);
        if (method == null) {
            return;
        }
        Object bean = applicationContext.getBean(method.getDeclaringClass().getName());
        if (args == null) {
            method.invoke(bean);
        } else {
            method.invoke(bean, args);
        }
    }

channelRead接收数据并转发

        /**
         * 接收消息事件
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object source) {
            try {
                byte[] dataBytes = (byte[]) source;
                JtcpContext sproutContext = new JtcpContext(ctx, dataBytes);
                RouteMethod.getInstance().invoke(RouteEnum.OnRecevie, new Object[] { sproutContext });
            } catch (Exception ex) {
            }
        }

Jtcp-example

    public static void main(String[] args) throws Exception {
        JtcpBootstrap bootstrap = new JtcpBootstrap();
        bootstrap.config().setHost("127.0.0.1");
        bootstrap.config().setPort(8030);
        bootstrap.start();
    }

    @JtcpComponet
    public class DemoHandler{

        @JtcpRoute(RouteEnum.OnRecevie)
        public void res(JtcpContext jtcpContext) {
            jtcpContext.context.writeAndFlush(jtcpContext.getRecvBytes());
            //System.err.println(BytesUtils.toHexString(context.getRecvBytes()));
        }

        @JtcpRoute(RouteEnum.OnConnect)
        public void onConnect(JtcpContext context ) {
            System.err.println("连接成功");
        }
    }

好的以上就是框架代码的基本构造,涉及到了Netty的应用、粘包半包处理,实例缓存与方法路由等内容,整体并不复杂,这里只是提供了一种服务端编码的思路,供初学者参考。

github地址:https://github.com/dafanjoy/jtcp


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK