最近接触到一个银行的项目对接,原以为使用现在成熟的http进行交互,结果银行提供的文档是socket的,因此需要我们编写socket服务端供银行进行调用。所以没办法,需要来操作一下这个socket的服务端项目。下面直接开始演示
一、创建一个maven项目,并且引入下面的依赖
<dependency> <groupId>com.corundumstudio.socketio</groupId> <artifactId>netty-socketio</artifactId> <version>1.7.7</version> </dependency>
这里我们直接使用封装好的netty-socketio进行对接。所以我们引入的是netty-socketio的代码。
二、创建socket的配置文件
这里使用socket的话,我们需要创建对应的配置文件,然后在socket初始化的时候读取对应的配置进行初始化,所以这里我们编写读取配置文件的类,完整代码如下:
package com.socket.demo.config; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.corundumstudio.socketio.SocketConfig; import com.corundumstudio.socketio.SocketIOServer; @Configuration public class SocketIOConfig { @Value("${socketio.host}") private String host; @Value("${socketio.port}") private Integer port; @Value("${socketio.bossCount}") private int bossCount; @Value("${socketio.workCount}") private int workCount; @Value("${socketio.allowCustomRequests}") private boolean allowCustomRequests; @Value("${socketio.upgradeTimeout}") private int upgradeTimeout; @Value("${socketio.pingTimeout}") private int pingTimeout; @Value("${socketio.pingInterval}") private int pingInterval; /** * 在config中new一个socket * @return */ @Bean public SocketIOServer socketIOServer() { SocketConfig socketConfig = new SocketConfig(); //开启Nagle算法,提高较慢的广域网传输效率,减小小分组的报文个数 socketConfig.setTcpNoDelay(true); //不管当前有没有还未发送给对方的数据就直接关闭链接 socketConfig.setSoLinger(0); com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration(); config.setSocketConfig(socketConfig); config.setHostname(host); config.setPort(port); config.setBossThreads(bossCount); config.setWorkerThreads(workCount); config.setAllowCustomRequests(allowCustomRequests); config.setUpgradeTimeout(upgradeTimeout); config.setPingTimeout(pingTimeout); config.setPingInterval(pingInterval); return new SocketIOServer(config); } }
可以看到我们读取了8个属性,所以下面我们需要在application.yml文件中创建相关的配置
# netty-socketio 配置 socketio: host: 127.0.0.1 port: 8888 # 设置最大每帧处理数据的长度,防止他人利用大数据来攻击服务器 maxFramePayloadLength: 1048576 # 设置http交互最大内容长度 maxHttpContentLength: 1048576 # socket连接数大小(如只监听一个端口boss线程组为1即可) bossCount: 1 workCount: 100 allowCustomRequests: true # 协议升级超时时间(毫秒),默认10秒。HTTP握手升级为ws协议超时时间 upgradeTimeout: 1000000 # Ping消息超时时间(毫秒),默认60秒,这个时间间隔内没有接收到心跳消息就会发送超时事件 pingTimeout: 6000000 # Ping消息间隔(毫秒),默认25秒。客户端向服务器发送一条心跳消息间隔 pingInterval: 25000
三、创建soket的开启,连接监听
这里我们需要初始化把socket启动起来,并且开启socket的监听,因此首先需要一个interface,完整代码如下:
package com.socket.demo.service; public interface ISocketIOService { /** * 启动服务 */ void start(); /** * 停止服务 */ void stop(); /** * 推送信息给指定客户端 * * @param userId: 客户端唯一标识 * @param msgContent: 消息内容 */ void pushMessageToUser(String userId, String msgContent); }
这里我们有3个方法,一个是启动服务,一个是停止服务,一个是给某个客户端发送消息。接下来我们需要实现下这个接口,也就是实现上诉的3个方法
package com.socket.demo.service.impl; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.SocketIOServer; import com.socket.demo.service.ISocketIOService; import lombok.extern.slf4j.Slf4j; @Service @Slf4j public class SocketIOServiceImpl implements ISocketIOService { /** * 存放已连接的客户端 */ private static Map<String, SocketIOClient> clientMap = new ConcurrentHashMap<>(); /** * 自定义事件`push_data_event`,用于服务端与客户端通信 */ private static final String PUSH_DATA_EVENT = "push_data_event"; @Autowired private SocketIOServer socketIOServer; /** * Spring IoC容器创建之后,在加载SocketIOServiceImpl Bean之后启动 */ @PostConstruct private void autoStartup() { start(); } /** * Spring IoC容器在销毁SocketIOServiceImpl Bean之前关闭,避免重启项目服务端口占用问题 */ @PreDestroy private void autoStop() { stop(); } @Override public void start() { // 监听客户端连接 socketIOServer.addConnectListener(client -> { log.info("************ 客户端: " + getIpByClient(client) + " 已连接 ************"); // 自定义事件`connected` -> 与客户端通信 (也可以使用内置事件,如:Socket.EVENT_CONNECT) client.sendEvent("connected", "你成功连接上了哦..."); String userId = getParamsByClient(client); if (userId != null) { clientMap.put(userId, client); } }); // 监听客户端断开连接 socketIOServer.addDisconnectListener(client -> { String clientIp = getIpByClient(client); log.info(clientIp + " *********************** " + "客户端已断开连接"); String userId = getParamsByClient(client); if (userId != null) { clientMap.remove(userId); client.disconnect(); } }); // 自定义事件`client_info_event` -> 监听客户端消息 socketIOServer.addEventListener(PUSH_DATA_EVENT, String.class, (client, data, ackSender) -> { // 客户端推送`client_info_event`事件时,onData接受数据,这里是string类型的json数据,还可以为Byte[],object其他类型 String clientIp = getIpByClient(client); log.info(clientIp + " ************ 客户端:" + data); }); // 启动服务 socketIOServer.start(); } @Override public void stop() { if (socketIOServer != null) { socketIOServer.stop(); socketIOServer = null; } } @Override public void pushMessageToUser(String userId, String msgContent) { SocketIOClient client = clientMap.get(userId); if (client != null) { client.sendEvent(PUSH_DATA_EVENT, msgContent); } } /** * 获取客户端url中的userId参数(这里根据个人需求和客户端对应修改即可) * * @param client: 客户端 * @return: java.lang.String */ private String getParamsByClient(SocketIOClient client) { String userId = null; // 获取客户端url参数(这里的userId是唯一标识) Map<String, List<String>> params = client.getHandshakeData().getUrlParams(); List<String> userIdList = params.get("userId"); if (!CollectionUtils.isEmpty(userIdList)) { userId = userIdList.get(0); } log.info("获取到的当前用户id是:{} ", userId); return userId; } /** * 获取连接的客户端ip地址 * * @param client: 客户端 * @return: java.lang.String */ private String getIpByClient(SocketIOClient client) { String sa = client.getRemoteAddress().toString(); String clientIp = sa.substring(1, sa.indexOf(":")); return clientIp; } }
此时我们完整的服务端就编写完成了,接着我们启动下服务端,查看日志
可以看到启动了两个端口,分别是8888(socket的端口)和8081(springboot的web端口)。到此我们的服务端就配置好了。
接着我们编写一个客户端来连接我们的服务端,客户端的代码如下:
package com.client.test; import io.socket.client.IO; import io.socket.client.Socket; import lombok.extern.slf4j.Slf4j; @Slf4j public class SocketIOClientTest{ public static void main(String[] args) { // 服务端socket.io连接通信地址 String url = "http://127.0.0.1:8888?userId=1"; try { IO.Options options = new IO.Options(); options.transports = new String[] { "websocket" }; options.reconnectionAttempts = 2; // 失败重连的时间间隔 options.reconnectionDelay = 1000; // 连接超时时间(ms) options.timeout = 500; // userId: 唯一标识 传给服务端存储 final Socket socket = IO.socket(url, options); socket.on(Socket.EVENT_CONNECT, args1 -> socket.send("hello...")); // 自定义事件`connected` -> 接收服务端成功连接消息 socket.on("connected", objects -> log.info("服务端:" + objects[0].toString())); // 自定义事件`push_data_event` -> 接收服务端消息 socket.on("push_data_event", objects -> log.info("服务端:" + objects[0].toString())); //测试发送一条消息给服务端 socket.emit("push_data_event", "客户端主动发送消息"); socket.connect(); } catch (Exception e) { log.error(e.getMessage(),e); } } }
我们再启动下客户端,可以看到客户端的日志为:你已经连接上了
然后我们再来看看服务器端的日志:
服务端会显示某个客户端已经连接上了,并且我们在客户端的url中添加了userid,服务端连接之后能获取到userid。还有我们在客户端里面发送了一条测试消息,也能看到我们服务端收到了。
以上我们完整的socket服务端和客户端都已经完整的实现了。但是在真实业务中,我们经常会遇到服务端收到一条消息后,做了相关的逻辑处理之后,需要响应给客户端,所以这块相对于业务来说是还不完善的,因此我们在服务端里面添加一个接口,即主动给客户端发消息,完整代码如下:
package com.socket.demo.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import com.socket.demo.service.ISocketIOService; import lombok.extern.slf4j.Slf4j; @RestController @RequestMapping("/api/socket") @Slf4j public class SocketIOController { @Autowired private ISocketIOService socketIOService; @PostMapping(value = "/pushMessageToUser") public String pushMessageToUser(@RequestParam String userId, @RequestParam String msgContent) { log.info("准备推送给用户:{},的消息是: {}", userId, msgContent); socketIOService.pushMessageToUser(userId, msgContent); return "ok"; } }
然后我们再启动服务端,调用 http://${ip}:8081/api/socket/pushMessageToUser
我们的客户端的userid是1,所以这里我们给用户为1的用户发送消息
这里用postman测试完全没有问题,我们回到客户端控制台查看客户端的日志
可以看到客户端收到了服务端发送的消息,所以这时候客户端可以根据服务端的返回信息进行相关的处理。
再来一个问题,这时候启动的客户端的userid是1,我们如果给userid为2的客户端发消息,userid为1的客户端能不能收到消息呢?
可以看到postman是请求成功的,然后我们看看服务端的日志:
看到有消息推送给客户2,我们再去userid为1的客户端看日志:
可以看到userid为1的客户端是没有收到消息的,因此不会出现乱收消息的情况。
以上就是完整的socket服务端和客户端的代码及测试情况。
备注:
1、在实际的情况中我们还会涉及到服务端对所有的客户端发送广播消息,这时候使用的代码如下:
socketIOServer.getBroadcastOperations().sendEvent("broadcast", "广播消息");
对于广播来说,根据实际情况使用就可。
最后按照惯例附上本案例的源码,登录后即可下载。
还没有评论,来说两句吧...