前面我们定义了二进制的通信协议,这篇文章我们我构建一个rpcserver,也就是定义服务器,同时通信协议使用我们自定义的。下面直接开始:
首先我们定义一个服务端的接口,也就是RPCServer
package org.wz.rpc.framework.server;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* RPC服务端抽象类
*
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public abstract class RpcServer {
/**
* 服务端口
*/
protected int port;
/**
* 服务协议
*/
protected String protocol;
/**
* 请求处理者
*/
protected RequestHandler handler;
/**
* 开启服务
*/
public abstract void start();
/**
* 停止服务
*/
public abstract void stop();
}这里主要有几个模块,分别是:
定义服务端启动的端口 客户端与服务端的通信协议 请求处理者如何处理 启动服务 停止服务
这里的请求处理者我们主要做以下的事情:
1、解组消息 2、查找服务对象 3、编组响应消息
上面三个处理事务就是服务端需要做的事情。所以这里我们的RequestHandler的示例代码如下:
package org.wz.rpc.framework.server;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.wz.rpc.framework.model.ServiceObject;
import org.wz.rpc.framework.protocol.MessageProtocol;
import org.wz.rpc.framework.protocol.MessageRequest;
import org.wz.rpc.framework.protocol.MessageResponse;
import org.wz.rpc.framework.protocol.MessageStatus;
import org.wz.rpc.framework.register.ServiceRegister;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
/**
* 请求处理者,提供解组请求、编组响应等操作
*
*/
@Slf4j
@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class RequestHandler {
private MessageProtocol protocol;
private ServiceRegister serviceRegister;
public byte[] handleRequest(byte[] data) throws Exception {
// 1、解组消息
MessageRequest req = this.protocol.unmarshallingRequest(data);
// 2、查找服务对象
ServiceObject so = this.serviceRegister.getServiceObject(req.getServiceName());
MessageResponse rsp = null;
if (so == null) {
rsp = new MessageResponse(MessageStatus.NOT_FOUND, null, rsp, null);
} else {
// 3、反射调用对应的过程方法
try {
Method m = so.getClazz().getMethod(req.getMethod(), req.getParameterTypes());
Object returnValue = m.invoke(so.getObj(), req.getParameters());
rsp = new MessageResponse(MessageStatus.SUCCESS, null, returnValue, null);
rsp.setReturnValue(returnValue);
} catch (Exception e) {
log.error(e.getMessage(),e);
rsp = new MessageResponse(MessageStatus.ERROR, null, e, e);
rsp.setException(e);
}
}
// 4、编组响应消息
return this.protocol.marshallingResponse(rsp);
}
}上面我们定义了RPCServer的接口,那么我们就要来实现下这个RPCServer,所以这里我们使用netty来实现这个RPCServer。具体的实现代码如下:
package org.rpc.framework.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
/**
* Netty RPC服务端,提供Netty网络服务开启、关闭的能力
*
*/
@Slf4j
public class NettyRpcServer extends RpcServer {
private Channel channel;
public NettyRpcServer(int port, String protocol, RequestHandler handler) {
super(port, protocol, handler);
}
@Override
public void start() {
// 配置服务器
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(new ChannelRequestHandler());
}
});
// 启动服务
ChannelFuture f = b.bind(port).sync();
log.info("Server started successfully.");
channel = f.channel();
// 等待服务通道关闭
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 释放线程组资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
@Override
public void stop() {
this.channel.close();
}
private class ChannelRequestHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("Channel active:{}", ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("The server receives a message: {}", msg);
ByteBuf msgBuf = (ByteBuf) msg;
byte[] req = new byte[msgBuf.readableBytes()];
msgBuf.readBytes(req);
byte[] res = handler.handleRequest(req);
log.info("Send response:{}", msg);
ByteBuf respBuf = Unpooled.buffer(res.length);
respBuf.writeBytes(res);
ctx.write(respBuf);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error(cause.getMessage(), cause);
ctx.close();
}
}
}以上我们就完成了一个rpcserver的代码编写。目前整个项目的代码示例图如下:
最后按照惯例,附上本案例的源码,登录后即可下载。










还没有评论,来说两句吧...