这一章节将实现客户端登录的相关逻辑,客户端所涉及的模块大致如下:
首先创建chatclient模块,客户端的代码大多在该模块中实现。
ClientCommand模块
这里我们将要实现4个基础的命令,这几个命令的作用主要是用来收集用户的输入:
- 菜单命令类(ClientCommandMenu):主要用于列出支持的命令菜单,以及响应用户输入的命令类型
- 登录命令(LoginConsoleCommand)
这些Command类都会实现一个BaseCommand:
public interface BaseCommand {
* 命令的执行
* @param scanner
void exec(Scanner scanner);
* 用于识别每个命令的key
* @return
String getKey();
* 命令的提示信息
* @return
String getTip();
ClientCommandMenu
该命令的作用主要用于列出支持的命令菜单,以及响应用户输入的命令类型。首先添加spring-boot-starter
、chatocommon模块以及lombok的依赖:
<dependency>
<artifactId>chatcommon</artifactId>
<groupId>cn.didadu</groupId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
实现ClientCommandMenu类:
@Service
@Data
public class ClientCommandMenu implements BaseCommand{
public static final String KEY = "0";
private String allCommandsShow;
private String commandInput;
@Override
public void exec(Scanner scanner) {
// 使用err,立即输出
System.err.println("请输入某个操作指令:");
System.err.println(allCommandsShow);
// 获取第一个指令(next读取到空白符就结束了)
commandInput = scanner.next();
@Override
public String getKey() {
return KEY;
@Override
public String getTip() {
return "show 所有命令";
这里没有给allCommandsShow赋值,后面再看。
LoginConsoleCommand
该命令负责从Scanner控制台实例收集客户端登录的用户ID和密码,实现如下:
@Service
@Data
public class LoginConsoleCommand implements BaseCommand{
public static final String KEY = "1";
private String userName;
private String password;
@Override
public void exec(Scanner scanner) {
System.out.println("请输入用户信息(id:password) ");
String[] info = null;
while (true) {
String input = scanner.next();
info = input.split(":");
if (info.length != 2) {
System.out.println("请按照格式输入(id:password):");
} else {
break;
userName = info[0];
password = info[1];
@Override
public String getKey() {
return KEY;
@Override
public String getTip() {
return "登录";
ProtoBufBuilder模块
该模块存放的就是各种消息Bean,没有什么实际的逻辑。
BaseBuilder
Builder消息都存储在chatClient模块的protoBuilder包下,它们继承自BaseBuilder:
public class BaseBuilder {
protected ProtoMsg.HeadType type;
private long seqId;
private ClientSession session;
public BaseBuilder(ProtoMsg.HeadType type, ClientSession session) {
this.type = type;
this.session = session;
* 构建消息 基础部分
public ProtoMsg.Message buildCommon(long seqId) {
this.seqId = seqId;
ProtoMsg.Message.Builder mb =
ProtoMsg.Message
.newBuilder()
.setType(type)
.setSessionId(session.getSessionId())
.setSequence(seqId);
return mb.buildPartial();
其中用到了ClientSession,该类在chatClient模块的client包下:
@Data
@Slf4j
public class ClientSession {
public static final AttributeKey<ClientSession> SESSION_KEY = AttributeKey.valueOf("SESSION_KEY");
* 用户实现客户端会话管理的核心
private Channel channel;
* 用户信息
private User user;
* 保存登录后的服务端sessionId
private String sessionId;
* 是否已建立连接
private boolean connected = false;
* 是否已登录
private boolean login = false;
* session中存储的session 变量属性值
private Map<String, Object> map = new HashMap<String, Object>();
* 绑定通道
* @param channel
public ClientSession(Channel channel) {
this.channel = channel;
this.sessionId = String.valueOf(-1);
channel.attr(ClientSession.SESSION_KEY).set(this);
LoginMsgBuilder
public class LoginMsgBuilder extends BaseBuilder {
private final User user;
public LoginMsgBuilder(User user, ClientSession session) {
super(ProtoMsg.HeadType.LOGIN_REQUEST, session);
this.user = user;
public ProtoMsg.Message build() {
ProtoMsg.Message message = buildCommon(-1);
ProtoMsg.LoginRequest.Builder lb =
ProtoMsg.LoginRequest.newBuilder()
.setDeviceId(user.getDevId())
.setPlatform(user.getPlatform().getCode())
.setToken(user.getToken())
.setUid(user.getUid());
return message.toBuilder().setLoginRequest(lb).build();
public static ProtoMsg.Message buildLoginMsg(
User user, ClientSession session) {
LoginMsgBuilder builder = new LoginMsgBuilder(user, session);
return builder.build();
HeartBeatMsgBuilder
public class HeartBeatMsgBuilder extends BaseBuilder {
private final User user;
public HeartBeatMsgBuilder(User user, ClientSession session) {
super(ProtoMsg.HeadType.KEEPALIVE_REQUEST, session);
this.user = user;
public ProtoMsg.Message buildMsg() {
ProtoMsg.Message message = buildCommon(-1);
ProtoMsg.MessageHeartBeat.Builder lb = ProtoMsg.MessageHeartBeat.newBuilder()
.setSeq(0)
.setJson("{\"from\":\"client\"}")
.setUid(user.getUid());
return message.toBuilder().setHeartBeat(lb).build();
Sender模块
该模块的作用是用来发送数据包的。
BaseSender
所有Sender的基类:
@Slf4j
@Data
public abstract class BaseSender {
private User user;
private ClientSession session;
* 是否连接中
* @return
public boolean isConnected() {
if (null == session) {
log.info("session is null");
return false;
return session.isConnected();
* 是否登录中
* @return
public boolean isLogin() {
if (null == session) {
log.info("session is null");
return false;
return session.isLogin();
public void sendMsg(ProtoMsg.Message message) {
if (null == getSession() || !isConnected()) {
log.info("连接还没成功");
return;
Channel channel = session.getChannel();
ChannelFuture f = channel.writeAndFlush(message);
// ChannelFuture有结果了会在addListener中执行operationComplete方法
f.addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
if (f.isSuccess()) {
sendSucced(message);
} else {
sendfailed(message);
f.sync();
} catch (InterruptedException e) {
e.printStackTrace();
protected void sendSucced(ProtoMsg.Message message) {
log.info("发送成功");
protected void sendfailed(ProtoMsg.Message message) {
log.info("发送失败");
LoginSender
发送登录消息:
@Slf4j
@Service
public class LoginSender extends BaseSender {
public void sendLoginMsg() {
if (!isConnected()) {
log.info("还没有建立连接!");
return;
log.info("构造登录消息");
ProtoMsg.Message message = LoginMsgBuilder.buildLoginMsg(getUser(), getSession());
log.info("发送登录消息");
super.sendMsg(message);
Handler模块
处理器模块,主要是服务端响应处理器,都是入站处理器。
HeartBeatClientHandler
用来处理心跳消息,首先在client模块的ClientSession添加获取Sessiont的方法:
@Data
@Slf4j
public class ClientSession {
......
* 从Channel中获取ClientSession
* @param ctx
* @return
public static ClientSession getSession(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
return channel.attr(ClientSession.SESSION_KEY).get();
HeartBeatClientHandler实现:
@Slf4j
@ChannelHandler.Sharable
@Service
public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
// 心跳的时间间隔,单位为s
private static final int HEARTBEAT_INTERVAL = 100;
* 在Handler被加入到Pipeline时,开始发送心跳
* 然后每100秒递归发送信条消息
* @param ctx
* @throws Exception
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ClientSession session = ClientSession.getSession(ctx);
User user = session.getUser();
HeartBeatMsgBuilder builder = new HeartBeatMsgBuilder(user, session);
ProtoMsg.Message message = builder.buildMsg();
// 发送心跳
heartBeat(ctx, message);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断消息实例
if (null == msg || !(msg instanceof ProtoMsg.Message)) {
super.channelRead(ctx, msg);
return;
// 判断类型
ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
ProtoMsg.HeadType headType = pkg.getType();
if (headType.equals(ProtoMsg.HeadType.KEEPALIVE_RESPONSE)) {
log.info(" 收到回写的 HEART_BEAT 消息 from server");
return;
} else {
super.channelRead(ctx, msg);
* 使用定时器,发送心跳报文
* @param ctx
* @param heartbeatMsg
public void heartBeat(ChannelHandlerContext ctx, ProtoMsg.Message heartbeatMsg) {
ctx.executor().schedule(() -> {
if (ctx.channel().isActive()) {
log.info(" 发送 HEART_BEAT 消息 to server");
ctx.writeAndFlush(heartbeatMsg);
// 递归调用,发送下一次的心跳
heartBeat(ctx, heartbeatMsg);
}, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
LoginResponceHandler
用来处理登录消息的返回逻辑,首先在common模块中添加常量信息:
public class ProtoInstant {
......
@AllArgsConstructor
public enum ResultCodeEnum {
SUCCESS(0, "登录成功"),
AUTH_FAILED(1, "登录失败"),
NO_TOKEN(2, "没有授权码"),
UNKNOWN_ERROR(3, "未知错误");
@Getter
private Integer code;
@Getter
private String desc;
client模块的ClientSession添加登录方法:
@Data
@Slf4j
public class ClientSession {
......
* 登录成功之后,设置sessionId
* @param ctx
* @param pkg
public static void loginSuccess(ChannelHandlerContext ctx, ProtoMsg.Message pkg) {
Channel channel = ctx.channel();
ClientSession session = channel.attr(ClientSession.SESSION_KEY).get();
session.setSessionId(pkg.getSessionId());
session.setLogin(true);
log.info("登录成功");
LoginResponseHandler实现:
@Slf4j
@ChannelHandler.Sharable
@Service()
public class LoginResponseHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断消息实例
if (null == msg || !(msg instanceof ProtoMsg.Message)) {
// 传递消息到下一个Handler
super.channelRead(ctx, msg);
return;
// 判断类型
ProtoMsg.Message pkg = (ProtoMsg.Message) msg;
ProtoMsg.HeadType headType = ((ProtoMsg.Message) msg).getType();
if (!headType.equals(ProtoMsg.HeadType.LOGIN_RESPONSE)) {
// 如果不是LOGIN_RESPONSE,传递到下一个Handler
super.channelRead(ctx, msg);
return;
//判断返回是否成功
ProtoMsg.LoginResponse info = pkg.getLoginResponse();
ProtoInstant.ResultCodeEnum result = ProtoInstant.ResultCodeEnum.values()[info.getCode()];
if (!result.equals(ProtoInstant.ResultCodeEnum.SUCCESS)) {
//登录失败
log.info(result.getDesc());
} else {
// 登录成功,设置session id
ClientSession.loginSuccess(ctx, pkg);
ChannelPipeline p = ctx.pipeline();
// 移除登录响应处理器(从该连接的pipeline中移除LoginResponseHandler)
p.remove(this);
// 在编码器后面,动态插入心跳处理器
p.addAfter("encoder", "heartbeat", new HeartBeatClientHandler());
ExceptionHandler
用来处理Handler中抛出的异常,首先在ClientSession中添加close方法:
@Data
@Slf4j
public class ClientSession {
......
* 关闭通道
public void close() {
connected = false;
ChannelFuture future = channel.close();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
log.error("连接顺利断开");
ExceptionHandler实现,其中重连操作后面再完善:
@Slf4j
@ChannelHandler.Sharable
@Service
public class ExceptionHandler extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof InvalidFrameException) {
// 协议异常,直接关闭
log.error(cause.getMessage());
ClientSession.getSession(ctx).close();
} else {
// 其它异常,捕捉异常信息
log.error(cause.getMessage());
ctx.close();
// TODO 重连
* 通道 Read 读取 Complete 完成
* 将缓冲区中的数据刷新到对端
* @param ctx
* @throws Exception
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
Concurrent模块
在客户端进程中需要用到多线程,在common模块的cocurrent包中添加工具类。
ExecuteTask
任务接口:
public interface ExecuteTask {
void execute();
FutureTaskScheduler
线程任务:
@Slf4j
public class FutureTaskScheduler extends Thread {
* 任务队列
private ConcurrentLinkedQueue<ExecuteTask> executeTaskQueue = new ConcurrentLinkedQueue<ExecuteTask>();
* 线程休眠时间
private long sleepTime = 200;
* 固定10个的线程池
* 用来从队列中获取需要执行的CallbackTask
private ExecutorService pool = Executors.newFixedThreadPool(10);
private static FutureTaskScheduler inst = new FutureTaskScheduler();
* 私有构造函数,直接启动线程
private FutureTaskScheduler() {
this.start();
* 添加任务
* @param executeTask
public static void add(ExecuteTask executeTask) {
inst.executeTaskQueue.add(executeTask);
@Override
public void run() {
while (true) {
// 处理任务
handleTask();
threadSleep(sleepTime);
* 线程休眠
* @param time
private void threadSleep(long time) {
sleep(time);
} catch (InterruptedException e) {
log.error(e.getLocalizedMessage());
* 处理任务队列,检查其中是否有任务
private void handleTask() {
ExecuteTask executeTask;
while (executeTaskQueue.peek() != null) {
executeTask = executeTaskQueue.poll();
pool.execute(new ExecuteRunnable(executeTask));
} catch (Exception e) {
log.error(e.getLocalizedMessage());
class ExecuteRunnable implements Runnable {
ExecuteTask executeTask;
ExecuteRunnable(ExecuteTask executeTask) {
this.executeTask = executeTask;
@Override
public void run() {
executeTask.execute();
组装各个模块
NettyClient
Netty客户端实现,首先添加配置项:
server:
ip: localhost
port: 8080
@Slf4j
@Service
public class NettyClient {
* 服务器ip地址
@Value("${server.ip}")
private String host;
* 服务器端口
@Value("${server.port}")
private int port;
@Autowired
private LoginResponseHandler loginResponseHandler;
@Autowired
private ExceptionHandler exceptionHandler;
EventLoopGroup eventLoopGroup;
@Setter
private GenericFutureListener<ChannelFuture> connectedListener;
public NettyClient() {
eventLoopGroup = new NioEventLoopGroup();
public void doConnect() {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.remoteAddress(host, port);
// 设置通道初始化
bootstrap.handler(
new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast("decoder", new ProtoBufDecoder());
ch.pipeline().addLast("encoder", new ProtoBufEncoder());
ch.pipeline().addLast(loginResponseHandler);
ch.pipeline().addLast(exceptionHandler);
log.info("客户端开始连接 [疯狂创客圈IM]");
ChannelFuture f = bootstrap.connect();
f.addListener(connectedListener);
} catch (Exception e) {
log.info("客户端连接失败!" + e.getMessage());
public void close() {
eventLoopGroup.shutdownGracefully();
CommandController
负责收集用户在控制台输入的命令,根据响应的命令类型调用响应的命令处理器合收集相关的信息。
@Slf4j
@Service
public class CommandController {
* 登录命令收集类
@Autowired
private LoginConsoleCommand loginConsoleCommand;
* 菜单命令收集类
@Autowired
private ClientCommandMenu clientCommandMenu;
@Autowired
private NettyClient nettyClient;
@Autowired
private LoginSender loginSender;
* 与服务端连接的通道
private Channel channel;
* 连接状态标记
private boolean connectFlag = false;
* 客户端会话
private ClientSession session;
* 客户端命令Map
private Map<String, BaseCommand> commandMap;
* 显式在中断的字符串(展示出所有支持的命令行)
private String menuString;
private User user;
* 初始化命令Map
public void initCommandMap() {
commandMap = new HashMap<>();
commandMap.put(clientCommandMenu.getKey(), clientCommandMenu);
commandMap.put(loginConsoleCommand.getKey(), loginConsoleCommand);
Set<Map.Entry<String, BaseCommand>> entrys = commandMap.entrySet();
Iterator<Map.Entry<String, BaseCommand>> iterator = entrys.iterator();
StringBuilder menus = new StringBuilder();
menus.append("[menu] ");
while (iterator.hasNext()) {
BaseCommand next = iterator.next().getValue();
menus.append(next.getKey())
.append("->")
.append(next.getTip())
.append(" | ");
menuString = menus.toString();
// 在这里设置了ClientCommandMenu.allCommandsShow
clientCommandMenu.setAllCommandsShow(menuString);
GenericFutureListener<ChannelFuture> closeListener = (ChannelFuture f) -> {
log.info(new Date() + ": 连接已经断开……");
channel = f.channel();
// 创建会话
ClientSession session = channel.attr(ClientSession.SESSION_KEY).get();
session.close();
// 唤醒用户线程
notifyCommandThread();
* 连接通道监听
GenericFutureListener<ChannelFuture> connectedListener = (ChannelFuture f) -> {
final EventLoop eventLoop = f.channel().eventLoop();
if (!f.isSuccess()) {
log.info("连接失败!在10s之后准备尝试重连!");
eventLoop.schedule(
() -> nettyClient.doConnect(),
TimeUnit.SECONDS);
connectFlag = false;
} else {
connectFlag = true;
log.info("疯狂创客圈 IM 服务器 连接成功!");
channel = f.channel();
// 创建会话
session = new ClientSession(channel);
session.setConnected(true);
// 连接上之后添加通道关闭监听
channel.closeFuture().addListener(closeListener);
// 唤醒用户线程
notifyCommandThread();
public synchronized void notifyCommandThread() {
// 唤醒,命令收集程
this.notify();
public synchronized void waitCommandThread() {
// 休眠,命令收集线程
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
* 启动连接
public void startConnectServer() {
FutureTaskScheduler.add(() -> {
nettyClient.setConnectedListener(connectedListener);
nettyClient.doConnect();
* 启动Command线程
* @throws InterruptedException
public void startCommandThread() throws InterruptedException {
Thread.currentThread().setName("命令线程");
while (true) {
// 建立连接
while (!connectFlag) {
// 开始连接
startConnectServer();
// 暂停命令线程
waitCommandThread();
// 处理命令
while (null != session) {
// 获取命令行输入的命令key,通过key找到命令
Scanner scanner = new Scanner(System.in);
clientCommandMenu.exec(scanner);
String key = clientCommandMenu.getCommandInput();
BaseCommand command = commandMap.get(key);
if (null == command) {
System.err.println("无法识别[" + command + "]指令,请重新输入!");
continue;
switch (key) {
case LoginConsoleCommand.KEY:
command.exec(scanner);
startLogin((LoginConsoleCommand) command);
break;
private void startLogin(LoginConsoleCommand command) {
if (!connectFlag) {
log.info("连接异常,请重新建立连接");
return;
User user = User.builder()
.uid(command.getUserName())
.token(command.getPassword())
.platform(User.PlatForm.WEB)
.devId("1111").build();
this.user = user;
session.setUser(user);
loginSender.setUser(user);
loginSender.setSession(session);
loginSender.sendLoginMsg();
ClientApplication
SpringBoot启动类:
@SpringBootApplication
public class ClientApplication {
public static void main(String[] args) {
// 启动并初始化 Spring 环境及其各 Spring 组件
ApplicationContext context = SpringApplication.run(ClientApplication.class, args);
CommandController commandClient = context.getBean(CommandController.class);
commandClient.initCommandMap();
commandClient.startCommandThread();
} catch (InterruptedException e) {
e.printStackTrace();
首先启动好服务端,然后启动客户端,在命令行操作。
客户端的输出如下:
2021-03-22 11:52:51.039 INFO 52704 --- [ main] cn.didadu.chatclient.ClientApplication : Started ClientApplication in 1.942 seconds (JVM running for 3.334)
2021-03-22 11:52:51.276 INFO 52704 --- [pool-1-thread-1] cn.didadu.chatclient.client.NettyClient : 客户端开始连接 [疯狂创客圈IM]
2021-03-22 11:52:51.364 INFO 52704 --- [ntLoopGroup-3-1] c.d.chatclient.client.CommandController : 疯狂创客圈 IM 服务器 连接成功!
请输入某个操作指令:
[menu] 0->show 所有命令 | 1->登录 |
请输入用户信息(id:password)
bboyjing:123
2021-03-22 11:53:00.615 INFO 52704 --- [ 命令线程] cn.didadu.chatclient.sender.LoginSender : 构造登录消息
2021-03-22 11:53:00.712 INFO 52704 --- [ 命令线程] cn.didadu.chatclient.sender.LoginSender : 发送登录消息
2021-03-22 11:53:00.770 INFO 52704 --- [ntLoopGroup-3-1] cn.didadu.chatclient.sender.BaseSender : 发送成功
请输入某个操作指令:
[menu] 0->show 所有命令 | 1->登录 |
2021-03-22 11:53:00.830 INFO 52704 --- [ntLoopGroup-3-1] c.d.chatclient.client.ClientSession : 登录成功
2021-03-22 11:54:40.841 INFO 52704 --- [ntLoopGroup-3-1] c.d.c.handler.HeartBeatClientHandler : 发送 HEART_BEAT 消息 to server
2021-03-22 11:54:40.846 INFO 52704 --- [ntLoopGroup-3-1] c.d.c.handler.HeartBeatClientHandler : 收到回写的 HEART_BEAT 消息 from server
......
服务端的输出如下:
2021-03-22 11:52:38.501 INFO 52702 --- [ main] cn.didadu.chatserver.ServerApplication : Started ServerApplication in 1.268 seconds (JVM running for 2.025)
2021-03-22 11:52:38.681 INFO 52702 --- [ main] cn.didadu.chatserver.server.ChatServer : 疯狂创客圈 CrazyIM 服务启动, 端口 /0:0:0:0:0:0:0:0:8080
2021-03-22 11:53:00.804 INFO 52702 --- [pool-1-thread-1] cn.didadu.chatcommon.bean.User : 登录中: User(uid=bboyjing, devId=1111, token=123, nickName=null, platform=null, intPlatFrom=5, sessionId=null)
2021-03-22 11:53:00.806 INFO 52702 --- [pool-1-thread-1] c.d.chatserver.server.ServerSession : ServerSession 绑定会话 /127.0.0.1:55130
2021-03-22 11:53:00.806 INFO 52702 --- [pool-1-thread-1] cn.didadu.chatserver.server.SessionMap : 用户登录:id= bboyjing 在线总数: 1
2021-03-22 11:53:00.823 INFO 52702 --- [pool-1-thread-1] c.d.c.handler.LoginRequestHandler : 登录成功:User(uid=bboyjing, devId=1111, token=123, nickName=null, platform=null, intPlatFrom=5, sessionId=50eb2bbb34f54b0f96e3147bd88058ed)
2021-03-22 11:54:40.845 INFO 52702 --- [pool-2-thread-1] c.d.c.handler.HeartBeatServerHandler : 收到 HEART_BEAT 消息 from client
......
从log可以看出,到目前为止完成了简单的登录功能,下一章节来补充聊天功能。