前面我们定义了二进制的通信协议,这篇文章我们来构建一个 RPC Server,也就是定义服务端,同时通信协议使用我们自定义的协议。下面直接开始:
首先我们定义一个服务端的抽象类,也就是RpcServer
package org.wz.rpc.framework.server;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;

/**
 * Abstract base class for an RPC server.
 *
 * <p>Holds the configuration shared by every server implementation (port,
 * protocol name and request handler) and declares the lifecycle operations
 * that a concrete transport must provide.
 */
@Data
@ToString
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
public abstract class RpcServer {

    /** Service port. */
    protected int port;

    /** Service protocol. */
    protected String protocol;

    /** Request handler. */
    protected RequestHandler handler;

    /** Starts the service. */
    public abstract void start();

    /** Stops the service. */
    public abstract void stop();
}
这里主要有几个模块,分别是:
定义服务端启动的端口;客户端与服务端的通信协议;请求处理者如何处理;启动服务;停止服务。
这里的请求处理者我们主要做以下的事情:
1、解组消息 2、查找服务对象 3、反射调用对应的方法 4、编组响应消息
上面四个处理步骤就是服务端需要做的事情。所以这里我们的RequestHandler的示例代码如下:
package org.wz.rpc.framework.server; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import org.wz.rpc.framework.model.ServiceObject; import org.wz.rpc.framework.protocol.MessageProtocol; import org.wz.rpc.framework.protocol.MessageRequest; import org.wz.rpc.framework.protocol.MessageResponse; import org.wz.rpc.framework.protocol.MessageStatus; import org.wz.rpc.framework.register.ServiceRegister; import lombok.AllArgsConstructor; import lombok.Data; import lombok.EqualsAndHashCode; import lombok.ToString; import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; /** * 请求处理者,提供解组请求、编组响应等操作 * */ @Slf4j @Data @NoArgsConstructor @AllArgsConstructor @EqualsAndHashCode @ToString public class RequestHandler { private MessageProtocol protocol; private ServiceRegister serviceRegister; public byte[] handleRequest(byte[] data) throws Exception { // 1、解组消息 MessageRequest req = this.protocol.unmarshallingRequest(data); // 2、查找服务对象 ServiceObject so = this.serviceRegister.getServiceObject(req.getServiceName()); MessageResponse rsp = null; if (so == null) { rsp = new MessageResponse(MessageStatus.NOT_FOUND, null, rsp, null); } else { // 3、反射调用对应的过程方法 try { Method m = so.getClazz().getMethod(req.getMethod(), req.getParameterTypes()); Object returnValue = m.invoke(so.getObj(), req.getParameters()); rsp = new MessageResponse(MessageStatus.SUCCESS, null, returnValue, null); rsp.setReturnValue(returnValue); } catch (Exception e) { log.error(e.getMessage(),e); rsp = new MessageResponse(MessageStatus.ERROR, null, e, e); rsp.setException(e); } } // 4、编组响应消息 return this.protocol.marshallingResponse(rsp); } }
上面我们定义了RpcServer这个抽象类,接下来就要实现它,这里我们使用netty来实现这个RpcServer。具体的实现代码如下:
package org.rpc.framework.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import lombok.extern.slf4j.Slf4j; /** * Netty RPC服务端,提供Netty网络服务开启、关闭的能力 * */ @Slf4j public class NettyRpcServer extends RpcServer { private Channel channel; public NettyRpcServer(int port, String protocol, RequestHandler handler) { super(port, protocol, handler); } @Override public void start() { // 配置服务器 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); p.addLast(new ChannelRequestHandler()); } }); // 启动服务 ChannelFuture f = b.bind(port).sync(); log.info("Server started successfully."); channel = f.channel(); // 等待服务通道关闭 f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 释放线程组资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } @Override public void stop() { this.channel.close(); } private class ChannelRequestHandler extends ChannelInboundHandlerAdapter { @Override public void 
channelActive(ChannelHandlerContext ctx) { log.info("Channel active:{}", ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("The server receives a message: {}", msg); ByteBuf msgBuf = (ByteBuf) msg; byte[] req = new byte[msgBuf.readableBytes()]; msgBuf.readBytes(req); byte[] res = handler.handleRequest(req); log.info("Send response:{}", msg); ByteBuf respBuf = Unpooled.buffer(res.length); respBuf.writeBytes(res); ctx.write(respBuf); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error(cause.getMessage(), cause); ctx.close(); } } }
以上我们就完成了一个rpcserver的代码编写。目前整个项目的代码示例图如下:
最后按照惯例,附上本案例的源码,登录后即可下载。
还没有评论,来说两句吧...