目 录CONTENT

文章目录

SpringBoot搭建Netty实现消息发送

芈亓
2022-03-11 / 0 评论 / 1 点赞 / 685 阅读 / 6,700 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-04-12,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

SpringBoot搭建Netty实现消息发送

一.导入Netty依赖

<dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.25.Final</version>
        </dependency>

二.搭建websocket服务器

@Component
public class WebSocketServer {

    /**
     * 主线程池
     */
    private EventLoopGroup bossGroup;
    /**
     * 工作线程池
     */
    private EventLoopGroup workerGroup;
    /**
     * 服务器
     */
    private ServerBootstrap server;
    /**
     *  回调
     */
    private ChannelFuture future;

    public void start() {
        future = server.bind(9001);
        System.out.println("netty server - 启动成功");
    }

    public WebSocketServer() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();

        server = new ServerBootstrap();
        server.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new WebsocketInitializer());
    }
}

三.初始化Websocket

public class WebsocketInitializer extends ChannelInitializer<SocketChannel> {
  
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        // ------------------
        // 用于支持Http协议
        // ------------------
        // websocket基于http协议,需要有http的编解码器
        pipeline.addLast(new HttpServerCodec());
        // 对写大数据流的支持
        pipeline.addLast(new ChunkedWriteHandler());
        // 添加对HTTP请求和响应的聚合器:只要使用Netty进行Http编程都需要使用
        //设置单次请求的文件的大小
        pipeline.addLast(new HttpObjectAggregator(1024 * 64));
        //webSocket 服务器处理的协议,用于指定给客户端连接访问的路由 :/ws
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
        // 添加Netty空闲超时检查的支持
        // 1. 读空闲超时(超过一定的时间会发送对应的事件消息)
        // 2. 写空闲超时
        // 3. 读写空闲超时
        pipeline.addLast(new IdleStateHandler(4, 8, 12));
        //添加心跳处理
        pipeline.addLast(new HearBeatHandler());
        // 添加自定义的handler
        pipeline.addLast(new ChatHandler());

    }
}

四.创建Netty监听器

@Component
public class NettyListener implements ApplicationListener<ContextRefreshedEvent> {

    @Resource
    private WebSocketServer websocketServer;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if(event.getApplicationContext().getParent() == null) {
            try {
                websocketServer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

五.建立消息通道

public class UserChannelMap {
    /**
     * 用户保存用户id与通道的Map对象
     */
//    private static Map<String, Channel> userChannelMap;

   /* static {
        userChannelMap = new HashMap<String, Channel>();
    }*/

    /**
     * 定义一个channel组,管理所有的channel
     * GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例
     */
    private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * 存放用户与Chanel的对应信息,用于给指定用户发送消息
     */
    private static ConcurrentHashMap<String,Channel> userChannelMap = new ConcurrentHashMap<>();

    private UserChannelMap(){}
    /**
     * 添加用户id与channel的关联
     * @param userNum
     * @param channel
     */
    public static void put(String userNum, Channel channel) {
        userChannelMap.put(userNum, channel);
    }

    /**
     * 根据用户id移除用户id与channel的关联
     * @param userNum
     */
    public static void remove(String userNum) {
        userChannelMap.remove(userNum);
    }

    /**
     * 根据通道id移除用户与channel的关联
     * @param channelId 通道的id
     */
    public static void removeByChannelId(String channelId) {
        if(!StringUtils.isNotBlank(channelId)) {
            return;
        }
        for (String s : userChannelMap.keySet()) {
            Channel channel = userChannelMap.get(s);
            if(channelId.equals(channel.id().asLongText())) {
                System.out.println("客户端连接断开,取消用户" + s + "与通道" + channelId + "的关联");
                userChannelMap.remove(s);
                UserService userService = SpringUtil.getBean(UserService.class);
                userService.logout(s);
                break;
            }
        }
    }

    /**
     * 打印所有的用户与通道的关联数据
     */
    public static void print() {
        for (String s : userChannelMap.keySet()) {
            System.out.println("用户id:" + s + " 通道:" + userChannelMap.get(s).id());
        }
    }

    /**
     * 根据好友id获取对应的通道
     * @param receiverNum 接收人编号
     * @return Netty通道
     */
    public static Channel get(String receiverNum) {
        return userChannelMap.get(receiverNum);
    }

    /**
     * 获取channel组
     * @return
     */
    public static ChannelGroup getChannelGroup() {
        return channelGroup;
    }

    /**
     * 获取用户channel map
     * @return
     */
    public static ConcurrentHashMap<String,Channel> getUserChannelMap(){
        return userChannelMap;
    }
}

六.自定义消息类型


public class Message {
    /**
     * 消息类型
     */
    private Integer type;
    /**
     * 聊天消息
     */
    private String message;
    /**
     * 扩展消息字段
     */
    private Object ext;
    public Integer getType() {
        return type;
    }

    public void setType(Integer type) {
        this.type = type;
    }

    public MarketChatRecord getChatRecord() {
        return marketChatRecord;
    }
    public void setChatRecord(MarketChatRecord chatRecord) {
        this.marketChatRecord = chatRecord;
    }

    public Object getExt() {
        return ext;
    }

    public void setExt(Object ext) {
        this.ext = ext;
    }

    @Override
    public String toString() {
        return "Message{" +
                "type=" + type +
                ", marketChatRecord=" + marketChatRecord +
                ", ext=" + ext +
                '}';
    }

}

七.创建处理消息的handler

public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);


    /**
     * 用来保存所有的客户端连接
     */
    private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     *当Channel中有新的事件消息会自动调用
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // 当接收到数据后会自动调用
        // 获取客户端发送过来的文本消息
        Gson gson = new Gson();
        log.info("服务器收到消息:{}",msg.text());
        System.out.println("接收到消息数据为:" + msg.text());
        Message message = gson.fromJson(msg.text(), Message.class);    
//根据业务要求进行消息处理
        switch (message.getType()) {
            // 处理客户端连接的消息
            case 0:
                // 建立用户与通道的关联
            // 处理客户端发送好友消息
             break;
            case 1:
            // 处理客户端的签收消息
             break;
            case 2:
                // 将消息记录设置为已读
                break;
            case 3:
                // 接收心跳消息
                break;
            default:
                break;
        }

    }

    // 当有新的客户端连接服务器之后,会自动调用这个方法
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerAdded 被调用"+ctx.channel().id().asLongText());
        // 添加到channelGroup 通道组
        UserChannelMap.getChannelGroup().add(ctx.channel());
//        clients.add(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.info("{异常:}"+cause.getMessage());
        // 删除通道
        UserChannelMap.getChannelGroup().remove(ctx.channel());
        UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
        ctx.channel().close();
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        log.info("handlerRemoved 被调用"+ctx.channel().id().asLongText());
        //删除通道
        UserChannelMap.getChannelGroup().remove(ctx.channel());
        UserChannelMap.removeByChannelId(ctx.channel().id().asLongText());
        UserChannelMap.print();
    }

}

八.处理心跳

public class HearBeatHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if(evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent)evt;

            if(idleStateEvent.state() == IdleState.READER_IDLE) {
                System.out.println("读空闲事件触发...");
            }
            else if(idleStateEvent.state() == IdleState.WRITER_IDLE) {
                System.out.println("写空闲事件触发...");
            }
            else if(idleStateEvent.state() == IdleState.ALL_IDLE) {
                System.out.println("---------------");
                System.out.println("读写空闲事件触发");
                System.out.println("关闭通道资源");
                ctx.channel().close();
            }
        }
    }
}

搭建完成后调用测试

1.页面访问http://localhost:9001/ws
2.端口号9001和访问路径ws都是我们在上边配置的,然后传入我们自定义的消息message类型。
3.大概流程:消息发送 :用户1先连接通道,然后发送消息给用户2,用户2若是在线直接可以发送给用户,若没在线可以将消息暂存在redis或者通道里,用户2链接通道的话,两者可以直接通讯。
消息推送 :用户1连接通道,根据通道id查询要推送的人是否在线,或者推送给所有人,这里我只推送给指定的人。

1

评论区