3

基于Netty,从零开发IM(三):编码实践篇(群聊功能)

 2 years ago
source link: http://www.blogjava.net/jb2011/archive/2022/07/18/450785.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本文由作者“大白菜”分享,有较多修订和改动。注意:本系列是给IM初学者的文章,IM老油条们还望海涵,勿喷!

接上两篇《IM系统设计篇》、《编码实践篇(单聊功能)》,本篇主要讲解的是通过实战编码实现IM的群聊功能,内容涉及群聊技术实现原理、编码实践等知识。

2、写在前面

建议你在阅读本文之前,务必先读本系列的前两篇《IM系统设计篇》、《编码实践篇(单聊功能)》,在着重理解IM系统的理论设计思路之后,再来阅读实战代码则效果更好。

最后,在开始本文之前,请您务必提前了解Netty的相关基础知识,可从本系列首篇《IM系统设计篇》中的“知识准备”一章开始。

3、系列文章

本文是系列文章的第3篇,以下是系列目录:

4、本篇概述

在上篇《编码实践篇(单聊功能)》中,我们主要实现了IM的单聊功能,本节主要是实现IM群聊功能。

本篇涉及的群聊核心功能,大致如下所示:

  • 1)登录:每个客户端连接服务端的时候,都需要输入自己的账号信息,以便和连接通道进行绑定;
  • 2)创建群组:输入群组 ID 和群组名称进行创建群组。需要先根据群组 ID 进行校验,判断是否已经存在了;
  • 3)查看群组:查看目前已经创建的群组列表;
  • 4)加入群组:主要参数是群组 ID 和用户 ID,用户 ID 只需从 Channel 的绑定属性里面获取即。主要是判断群组 ID 是否存在,如果存在还需要判断该用户 ID 是否已经在群组里面了;
  • 5)退出群组:主要是判断群组 ID 是否存在,如果存在则删除相应的关系;
  • 6)查看组成员:根据群组 ID 去查询对应的成员列表;
  • 7)群发消息:选择某个群进行消息发送,该群下的成员都能收到信息。主要判断群组 ID 是否存在,如果存在再去获取其对应的成员列表。

5、群聊原理

其实群聊和单聊,整体上原理是一样的,只是做了一下细节上的升级。

在首篇《IM系统设计篇》的“6、IM群聊思路设计”设计部分也做了详细的说明了。

群聊的大概流程就是:根据群组 ID 查找到所有的成员集合,然后再遍历找到每个成员对应的连接通道。

具体的群聊架构思路如下图:

1834368-20220718145127406-1362948057.png

如上图所示,群聊通讯流程技术原理如下:

  • 1)群聊和单聊整体上的思路一致:需要保存每个用户和通道的对应关系,方便后期通过用户 ID 去查找到对应的通道,再跟进通道推送消息;
  • 2)群聊把消息发送给群员的原理:其实很简单,服务端再保存另外一份映射关系,那就是聊天室和成员的映射关系。发送消息时,首先根据聊天室 ID 找到对应的所有成员,然后再跟进各个成员的 ID 去查找到对应的通道,最后由每个通道进行消息的发送;
  • 3)群成员加入某个群聊聊的时候:往映射表新增一条记录,如果成员退群的时候则删除对应的映射记录。

6、运行效果

补充说明:因为本系列文章主要目的是引导IM初学者在基于Netty的情况下,如何一步一步从零写出IM的逻辑和思维能力,因而为了简化编码实现,本篇中编码实现的客户端都是基于控制台实现的(希望不要被嫌弃),因为理解技术的本质显然比炫酷的外在表现形式更为重要。

用户登录效果图:

1834368-20220718145140914-326536990.png

群组操作效果图:

1834368-20220718145150991-143191918.png

7、实体定义实战

7.1 服务端实体

服务端映射关系的管理,分别是:

  • 1)登录信息(用户 ID 和通道);
  • 2)群组信息(群组 ID 和群组成员关系)。

主要通过两个 Map 去维护,具体如下:

public class ServerChatGroupHandler extends ChannelInboundHandlerAdapter {

    private static Map<Integer, Channel> map=new HashMap<Integer, Channel>();

    private static Map<Integer, Group> groups=new HashMap<Integer, Group>();

//组和成员列表关系实体

@Data

public class Group implements Serializable {

    private String groupName;

    private List<GroupMember> members=new ArrayList<GroupMember>();

//成员和连接通道的关系实体

public class GroupMember implements Serializable {

    private Integer userid;

    private Channel channel;

7.2 实体和指令关系

我们准备好相应的实体,以及实体和指令的映射关系,具体如下所示:

private static Map<Byte, Class<? extends BaseBean>> map=new HashMap<Byte,Class<? extends BaseBean>>();

    static{

        //登录的请求和响应实体

        map.put(1, LoginReqBean.class);

        map.put(2, LoginResBean.class);

        //创建群组的请求和响应实体

        map.put(3, GroupCreateReqBean.class);

        map.put(4, GroupCreateResBean.class);

        //查看群组的请求和响应实体

        map.put(5, GroupListReqBean.class);

        map.put(6, GroupListResBean.class);

        //加入群组的请求和响应实体

        map.put(7,GroupAddReqBean.class);

        map.put(8,GroupAddResBean.class);

        //退出群组的请求和响应实体

        map.put(9,GroupQuitReqBean.class);

        map.put(10,GroupQuitResBean.class);

        //查看成员列表的请求和响应实体

        map.put(11,GroupMemberReqBean.class);

        map.put(12,GroupMemberResBean.class);

        //发送响应的实体(发送消息、发送响应、接受消息)

        map.put(13,GroupSendMsgReqBean.class);

        map.put(14,GroupSendMsgResBean.class);

        map.put(15,GroupRecMsgBean.class);

通过下面这张图,能看的更清晰一些:

1834368-20220718145202472-2014963050.png
 

8、Handler定义实战

IM群聊功能的实现,我们需要两个两个业务 Handler:

  • 1)分别是客户端(ClientChatGroupHandler);
  • 2)服务端(ServerChatGroupHandler)。

8.1 客户端 Handler

客户端 Handler,主要是通过判断实体类型来做不同的业务操作,当然也可以使用 SimpleChannelInboundHandler 去进行 Handler 拆分。

public class ClientChatGroupHandler extends ChannelInboundHandlerAdapter {

    @Override

    public void channelActive(ChannelHandlerContext ctx) throws Exception {

        //在链接就绪时登录

        login(ctx.channel());

    //主要是“接受服务端”的响应信息

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        if(msg instanceof LoginResBean){

            LoginResBean res=(LoginResBean) msg;

            System.out.println("登录响应:"+res.getMsg());

            if(res.getStatus()==0){

                //登录成功

                //1.给通道绑定身份

                ctx.channel().attr(AttributeKey.valueOf("userid")).set(res.getUserid());

                //2.显示操作类型【请看下面】

                deal(ctx.channel());

            }else{

                //登录失败,继续登录

                login(ctx.channel());

        }else if(msg instanceof GroupCreateResBean){

            GroupCreateResBean res=(GroupCreateResBean)msg;

            System.out.println("创建响应群组:"+res.getMsg());

        }else if(msg instanceofGroupListResBean){

            GroupListResBean res=(GroupListResBean)msg;

            System.out.println("查看群组列表:"+res.getLists());

        }elseif(msg instanceofGroupAddResBean){

            GroupAddResBean res=(GroupAddResBean)msg;

            System.out.println("加入群组响应:"+res.getMsg());

        }elseif(msg instanceof GroupQuitResBean){

            GroupQuitResBean res=(GroupQuitResBean)msg;

            System.out.println("退群群组响应:"+res.getMsg());

        }else if(msg instanceof GroupMemberResBean){

            GroupMemberResBean res=(GroupMemberResBean)msg;

            if(res.getCode()==1){

                System.out.println("查看成员列表:"+res.getMsg());

            }else{

                System.out.println("查看成员列表:"+res.getLists());

        }else if(msg instanceof GroupSendMsgResBean){

            GroupSendMsgResBean res=(GroupSendMsgResBean)msg;

            System.out.println("群发消息响应:"+res.getMsg());

        }else if(msg instanceof GroupRecMsgBean){

            GroupRecMsgBean res=(GroupRecMsgBean)msg;

            System.out.println("收到消息fromuserid="+

                               res.getFromuserid()+

                               ",msg="+res.getMsg());

通过子线程循环向输出控制台输出操作类型的方法,以下方法目前都是空方法,下面将详细讲解。

private void deal(final Channel channel){

        final Scanner scanner=new Scanner(System.in);

        new Thread(new Runnable() {

            public void run() {

                while(true){

                    System.out.println("请选择类型:0创建群组,1查看群组,2加入群组,3退出群组,4查看群成员,5群发消息");

                    int type=scanner.nextInt();

                    switch(type){

                        case 0:

                            createGroup(scanner,channel);

                            break;

                        case 1:

                            listGroup(scanner,channel);

                            break;

                        case 2:

                            addGroup(scanner,channel);

                            break;

                        case 3:

                            quitGroup(scanner,channel);

                            break;

                        case 4:

                            listMembers(scanner,channel);

                            break;

                        case 5:

                            sendMsgToGroup(scanner,channel);

                            break;

                        default:

                            System.out.println("输入的类型不存在!");

        }).start();

8.2 服务端 Handler

服务端 Handler,主要是通过判断实体类型来做不同的业务操作,当然也可以使用 SimpleChannelInboundHandler 去进行 Handler 拆分。

以下方法目前都是空方法,下面将详细讲解。

public class ServerChatGroupHandler extends ChannelInboundHandlerAdapter {

    private static Map<Integer, Channel> map=new HashMap<Integer, Channel>();

    private static Map<Integer, Group> groups=new HashMap<Integer, Group>();

    @Override

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        if(msg instanceof LoginReqBean) {

            login((LoginReqBean) msg, ctx.channel());

        }else if(msg instanceof GroupCreateReqBean){

            //创建群组

            createGroup((GroupCreateReqBean)msg,ctx.channel());

        }else if(msg instanceof GroupListReqBean){

            //查看群组列表

            listGroup((GroupListReqBean)msg,ctx.channel());

        }else if(msg instanceof GroupAddReqBean){

            //加入群组

            addGroup((GroupAddReqBean)msg,ctx.channel());

        }else if(msg instanceof GroupQuitReqBean){

            //退出群组

            quitGroup((GroupQuitReqBean)msg,ctx.channel());

        }else if(msg instanceof GroupMemberReqBean){

            //查看成员列表

            listMember((GroupMemberReqBean)msg,ctx.channel());

        }else if(msg instanceof GroupSendMsgReqBean){

            //消息发送

            sendMsg((GroupSendMsgReqBean) msg,ctx.channel());

9、具体功能编码实战

9.1 创建群组

客户端请求:

private void createGroup(Scanner scanner,Channel channel){

        System.out.println("请输入群组ID");

        Integer groupId=scanner.nextInt();

        System.out.println("请输入群组名称");

        String groupName=scanner.next();

        GroupCreateReqBean bean=new GroupCreateReqBean();

        bean.setGroupId(groupId);

        bean.setGroupName(groupName);

        channel.writeAndFlush(bean);

服务端处理:

public class ServerChatGroupHandler extends ChannelInboundHandlerAdapter {

    private static Map<Integer, Channel> map=new HashMap<Integer, Channel>();

    private static Map<Integer, Group> groups=new HashMap<Integer, Group>();

    private void createGroup(GroupCreateReqBean bean,Channel channel){

        //定义一个响应实体

        GroupCreateResBean res=new GroupCreateResBean();

        //查询groups是否已经存在

        Group group=groups.get(bean.getGroupId());

        //判断是否已经存在

        if(group==null){

            //定义群组实体

            Group g=new Group();

            //定义一个集合,专门存储成员

            List<GroupMember> members=new ArrayList<GroupMember>();

            //属性赋值

            g.setGroupName(bean.getGroupName());

            g.setMembers(members);

            //添加到Map里面

            groups.put(bean.getGroupId(),g);

            //响应信息

            res.setCode(0);

            res.setMsg("创建群组成功");

        }else{

            res.setCode(1);

            res.setMsg("该群组已经存在!");

        channel.writeAndFlush(res);

9.2 查看群组

客户端请求:

private void listGroup(Scanner scanner,Channel channel){

    GroupListReqBean bean=new GroupListReqBean();

    bean.setType("list");

    channel.writeAndFlush(bean);

服务端处理:

public class ServerChatGroupHandler extends ChannelInboundHandlerAdapter {

    private static Map<Integer, Channel> map=new HashMap<Integer, Channel>();

    private static Map<Integer, Group> groups=new HashMap<Integer, Group>();

    private void listGroup(GroupListReqBean bean,Channel channel){

        if("list".equals(bean.getType())){

            //定义一个响应实体

            GroupListResBean res=new GroupListResBean();

            //定义一个集合

            List<GroupInfo> lists=new ArrayList<GroupInfo>();

            //变量groups Map集合

            for(Map.Entry<Integer, Group> entry : groups.entrySet()){

                Integer mapKey = entry.getKey();

                Group mapValue = entry.getValue();

                GroupInfo gi=new GroupInfo();

                gi.setGroupId(mapKey);

                gi.setGroupName(mapValue.getGroupName());

                lists.add(gi);

            //把集合添加到响应实体里面

            res.setLists(lists);

            //开始写到客户端

            channel.writeAndFlush(res);

9.3 加入群组

客户端请求:

private void addGroup(Scanner scanner,Channel channel){

    System.out.println("请输入加入的群组ID");

    int groupId=scanner.nextInt();

    Integer userId=(Integer) channel.attr(AttributeKey.valueOf("userid")).get();

    GroupAddReqBean bean=new GroupAddReqBean();

    bean.setUserId(userId);

    bean.setGroupId(groupId);

    channel.writeAndFlush(bean);

服务端处理:

public class ServerChatGroupHandler extends ChannelInboundHandlerAdapter {

    private static Map<Integer, Channel> map=new HashMap<Integer, Channel>();

    private static Map<Integer, Group> groups=new HashMap<Integer, Group>();

    private void addGroup(GroupAddReqBean bean,Channel channel){

        GroupAddResBean res=new GroupAddResBean();

        //1.根据“群组ID”获取对应的“组信息”

        Group group=groups.get(bean.getGroupId());

        //2.“群组”不存在

        if(group==null){

            res.setCode(1);

            res.setMsg("groupId="+bean.getGroupId()+",不存在!");

            channel.writeAndFlush(res);

            return;

        //3.“群组”存在,则获取其底下的“成员集合”

        List<GroupMember> members=group.getMembers();

        boolean flag=false;

        //4.遍历集合,判断“用户”是否已经存在了

        for(GroupMember gm:members){

            if(gm.getUserid()==bean.getUserId()){

                flag=true;

                break;

        if(flag){

            res.setCode(1);

            res.setMsg("已经在群组里面,无法再次加入!");

        }else{

            //1.用户信息

            GroupMember gm=new GroupMember();

            gm.setUserid(bean.getUserId());

            gm.setChannel(channel);

            //2.添加到集合里面

            members.add(gm);

            //3.给“群组”重新赋值

            group.setMembers(members);

            res.setCode(0);

            res.setMsg("加入群组成功");

        channel.writeAndFlush(res);

9.4 退出群组

客户端请求:

private void quitGroup(Scanner scanner,Channel channel){

    System.out.println("请输入退出的群组ID");

    int groupId=scanner.nextInt();

    Integer userId=(Integer) channel.attr(AttributeKey.valueOf("userid")).get();

    GroupQuitReqBean bean=new GroupQuitReqBean();

    bean.setUserId(userId);

    bean.setGroupId(groupId);

    channel.writeAndFlush(bean);

服务端处理:

public class ServerChatGroupHandler extends ChannelInboundHandlerAdapter {

    private static Map<Integer, Channel> map=new HashMap<Integer, Channel>();

    private static Map<Integer, Group> groups=new HashMap<Integer, Group>();

    private void quitGroup(GroupQuitReqBean bean,Channel channel){

        GroupQuitResBean res=new GroupQuitResBean();

        //1.根据“群组ID”获取对应的“组信息”

        Group group=groups.get(bean.getGroupId());

        if(group==null){

            //2.群组不存在

            res.setCode(1);

            res.setMsg("groupId="+bean.getGroupId()+",不存在!");

            channel.writeAndFlush(res);

            return;

        //3.群组存在,则获取其底下“成员集合”

        List<GroupMember> members=group.getMembers();

        //4.遍历集合,找到“当前用户”在集合的序号

        int index=-1;

        for(inti=0;i<members.size();i++){

            if(members.get(i).getUserid()==bean.getUserId()){

                index=i;

                break;

        //5.如果序号等于-1,则表示“当前用户”不存在集合里面

        if(index==-1){

            res.setCode(1);

            res.setMsg("userid="+bean.getUserId()+",不存在该群组里面!");

            channel.writeAndFlush(res);

            return;

        //6.从集合里面删除“当前用户”

        members.remove(index);

        //7.给“群组”的“成员列表”重新赋值

        group.setMembers(members);

        res.setCode(0);

        res.setMsg("退出群组成功");

        channel.writeAndFlush(res);

9.5 查看群组成员

客户端请求:

private void listMembers(Scanner scanner,Channel channel){

    System.out.println("请输入群组ID:");

    int groupId=scanner.nextInt();

    GroupMemberReqBean bean=new GroupMemberReqBean();

    bean.setGroupId(groupId);

    channel.writeAndFlush(bean);

服务端处理:

public class ServerChatGroupHandler extends ChannelInboundHandlerAdapter {

    private static Map<Integer, Channel> map=new HashMap<Integer, Channel>();

    private static Map<Integer, Group> groups=new HashMap<Integer, Group>();

    private void listMember(GroupMemberReqBean bean,Channel channel){

        GroupMemberResBean res=new GroupMemberResBean();

        List<Integer> lists=new ArrayList<Integer>();

        //1.根据“群组ID”获取对应的“组信息”

        Group group=groups.get(bean.getGroupId());

        if(group==null){

            //2.查询的群组不存在

            res.setCode(1);

            res.setMsg("groupId="+bean.getGroupId()+",不存在!");

            channel.writeAndFlush(res);

        }else{

            //3.群组存在,则变量其底层的成员

            for(Map.Entry<Integer, Group> entry : groups.entrySet()){

                Group g = entry.getValue();

                List<GroupMember> members=g.getMembers();

                for(GroupMember gm:members){

                    lists.add(gm.getUserid());

            res.setCode(0);

            res.setMsg("查询成功");

            res.setLists(lists);

            channel.writeAndFlush(res);

9.6 群发消息

客户端请求:

private void sendMsgToGroup(Scanner scanner,Channel channel){

    System.out.println("请输入群组ID:");

    int groupId=scanner.nextInt();

    System.out.println("请输入发送消息内容:");

    String msg=scanner.next();

    Integer userId=(Integer) channel.attr(AttributeKey.valueOf("userid")).get();

    GroupSendMsgReqBean bean=new GroupSendMsgReqBean();

    bean.setFromuserid(userId);

    bean.setTogroupid(groupId);

    bean.setMsg(msg);

    channel.writeAndFlush(bean);

服务端处理:

public class ServerChatGroupHandler extends ChannelInboundHandlerAdapter {

    private static Map<Integer, Channel> map=new HashMap<Integer, Channel>();

    private static Map<Integer, Group> groups=new HashMap<Integer, Group>();

    privatevoidsendMsg(GroupSendMsgReqBean bean,Channel channel){

        GroupSendMsgResBean res=new GroupSendMsgResBean();

        //1.根据“群组ID”获取对应的“组信息”

        Group group=groups.get(bean.getTogroupid());

        //2.给“发送人”响应,通知其发送的消息是否成功

        if(group==null){

            res.setCode(1);

            res.setMsg("groupId="+bean.getTogroupid()+",不存在!");

            channel.writeAndFlush(res);

            return;

        }else{

            res.setCode(0);

            res.setMsg("群发消息成功");

            channel.writeAndFlush(res);

        //3.根据“组”下面的“成员”,变量并且逐个推送消息

        List<GroupMember> members=group.getMembers();

        for(GroupMember gm:members){

            GroupRecMsgBean rec=new GroupRecMsgBean();

            rec.setFromuserid(bean.getFromuserid());

            rec.setMsg(bean.getMsg());

            gm.getChannel().writeAndFlush(rec);

10、本篇小结

本篇中涉及的功能点稍微有点多,主要是实现了群聊的几个核心功能,分别是:创建群组、查看群组列表、加入群组、退出群组、查看成员列表、群发消息。

这些功能经过拆解,看起来就不是那么复杂了,希望大家都可以亲自动手实现一遍,加深理解,提高学习效果。

实际上,真正的产品级IM中,群聊涉及的技术细节是非常多的,有兴趣可以详读下面这几篇:

11、参考资料

[1] 手把手教你用Netty实现心跳机制、断线重连机制

[2] 自已开发IM很难?手把手教你撸一个Andriod版IM

[3] 基于Netty,从零开发一个IM服务端

[4] 拿起键盘就是干,教你徒手开发一套分布式IM系统

[5] 正确理解IM长连接、心跳及重连机制,并动手实现

[6] 手把手教你用Go快速搭建高性能、可扩展的IM系统

[7] 手把手教你用WebSocket打造Web端IM聊天

[8] 万字长文,手把手教你用Netty打造IM聊天

[9] 基于Netty实现一套分布式IM系统

[10] 基于Netty,搭建高性能IM集群(含技术思路+源码)

[11] SpringBoot集成开源IM框架MobileIMSDK,实现即时通讯IM聊天功能

(本文已同步发布于:http://www.52im.net/thread-3981-1-1.html


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK