在websocket中,服务端主要使用的是session打交道,但是由于session无法实现序列化,不能存储到redis这些中间存储里面,因此这里我们只能把session存储在本地的内存中,那么如果是集群的话,我们如何实现session准确的发送消息呢,其实就是session共享。在websocket中,其实是无法做到session共享的,目前通用的解决方案都是通过消息中间件,实现消息的发布与订阅,也就是每一个服务端实例都订阅某个消息队列的topic,根据对应的sessionid来判断是否在本地存储,如果在本地通过sessionid找到了session,则给客户端发送消息,如果在本地找不到对应的session,那么就直接把这条消息丢弃掉。具体的如下图所示:
这里的图来自于网上,网上大多都是基于redis做发布与订阅,在真实的环境中,我们一般用kafka或者rocketmq等。根据上面的图示,我们介绍下整个流程:
1、我们同时有A,B,C,D四个websocket服务端,同时订阅消息队列的topic: test8 2、我们发送一条消息a1到消息队列的topic:test8 3、此时A,B,C,D四个websocket服务端都会收到这条消息a1 4、A根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,如果没有直接放弃掉此条消息。 5、B根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,如果没有直接放弃掉此条消息。 6、C根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,如果没有直接放弃掉此条消息。 7、D根据a1的消息体,获取到对应的sessionid,然后在本地的map中查找是否有对应的session,结果找到有对应的session,此时我们就把这条消息发送给=这个session。 8、客户端就收到了对应的消息。
整个流程就如下,下面我们使用代码演示下,这里的代码我们还是沿用上一篇文章《Websocket系列(二)解决websocket不能使用@Autowired的问题》的源码做改造。
一、创建一个公共的map,用来存放session
package com.websocket.utils; import java.util.concurrent.ConcurrentHashMap; import javax.websocket.Session; import org.springframework.stereotype.Component; @Component public class OnlineSessionCache { private ConcurrentHashMap<Integer, Session> onlines = new ConcurrentHashMap<Integer, Session>(); public void setUserSession(Integer userId, Session session) { onlines.put(userId, session); } public Session getUserSession(Integer userId) { return onlines.get(userId); } public void removeUserSession(Integer userId) { onlines.remove(userId); } public ConcurrentHashMap<Integer, Session> getAllSession() { return this.onlines; } }
二、在websocket连接和关闭的时候,把session关闭掉
@OnOpen public void onOpen(Session session,EndpointConfig config) { this.session = session; log.info("当前session id : {} 登录进来了", session.getId()); OnlineCalUtils.addOnlineCount(); onlineSessionCache.setUserSession(Integer.valueOf(session.getId()), session); log.info("存储session了多少个session:{}", onlineSessionCache.getAllSession().size()); log.info("有新连接加入!当前在线人数为 :{} ", getOnlineCount()); }
@OnClose public void onClose() { OnlineCalUtils.subOnlineCount(); log.info("有一连接关闭!当前在线人数为: {}", getOnlineCount()); onlineSessionCache.removeUserSession(Integer.valueOf(this.session.getId())); log.info("当前session id : {} 退出去了"); }
三、编写一个接口,用来给指定的用户发送消息
package com.websocket.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.websocket.model.ChatModel; import com.websocket.producer.RocketProducer; import com.websocket.utils.ChatModelUtils; import lombok.extern.slf4j.Slf4j; @RestController @Slf4j public class ChatMsgController { @Autowired private RocketProducer rocketProducer; @RequestMapping("/sendToSimpleUser") public String sendToSimpleUser(Integer fromUserId,Integer toUserId) { ChatModel model = ChatModelUtils.createNewChatModel(fromUserId, toUserId, "手动发送消息"); rocketProducer.sendDirectMessage(model); return "成功"; } }
这里我们是把消息直接发送给了rocketmq里面,发送者代码如下:
package com.websocket.producer; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSON; import com.websocket.model.ChatModel; @Component public class RocketProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendDirectMessage(ChatModel message) { String msg = JSON.toJSONString(message); rocketMQTemplate.syncSend("test8", msg); } }
四、编写消费者,获取mq的消息,并且发送消息给对应的session
package com.websocket.producer; import javax.websocket.Session; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.alibaba.fastjson.JSON; import com.websocket.model.ChatModel; import com.websocket.product.SocketServerProduct; import com.websocket.utils.OnlineSessionCache; import lombok.extern.slf4j.Slf4j; @Component @Slf4j @RocketMQMessageListener(topic = "test8", consumerGroup = "${chat.group.groupname}") public class RocketConsumer implements RocketMQListener<String>{ @Autowired private OnlineSessionCache onlineSessionCache; @Autowired private SocketServerProduct socketServerProduct; @Value("${chat.group.groupname}") private String groupName; @Override public void onMessage(String message) { log.info("监听到的topic是:{} groupname是:{}","test8",groupName); ChatModel model = JSON.parseObject(message, ChatModel.class); Integer userId = model.getToUserId(); Session session = onlineSessionCache.getUserSession(userId); if (null != session) { log.info("找到了对应的session,准备回复消息"); socketServerProduct.sendMessage(session, model.getMessage()); }else { log.info("没有找到对应的session,准备丢弃"); } } }
以上就是一个完整的关于websocket服务端集群关于session共享的解决方案。
这里我们测试一下,首先建立一个连接
此时可以看到sessionid为0的用户连接进来了
然后我们给这个用户发送消息
查看下客户端是否收到消息了
可以看到收到了消息。
最后按照惯例,附上本案例的源码,登录后即可下载。
还没有评论,来说两句吧...