在前面的文章里面,我们已经实现了一个服务端的定义及功能运行。这里的话我们就要开启客户端模块的编写,首先大家想想,客户端连接服务器需要做什么?
那肯定是需要这几个基础条件:
1、定义一个客户端。 2、定义一个客户端的请求协议
所以这里的话,我们来实现下这两个需求。
一、首先我们需要定义一个客户端,这里我们的服务器端是通过netty定义的,所以这里我们客户端也是用netty来定义。
package org.wz.rpc.framework.client;
import org.wz.rpc.framework.model.ZookeeperMetadataData;
/**
* 网络请求客户端,定义网络请求规范
*
*/
public interface NetClient {
byte[] sendRequest(byte[] data, ZookeeperMetadataData zookeeperMetadataData) throws InterruptedException;
}这里我们定义的客户端,相当于定义一个httpclient,让我们通过这个client来发送消息给服务器端。所以我们需要来实现下这个client的具体逻辑,再实现里面,我们主要做以下几个事情:
获取服务端的元数据信息 向服务端发送请求获取response
所以这里我们的实现代码如下:
package org.wz.rpc.framework.client;
import org.wz.rpc.framework.handler.SendHandler;
import org.wz.rpc.framework.model.ZookeeperMetadataData;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
/**
* Netty网络请求客户端,定义通过Netty实现网络请求的细则。
*
*/
@Slf4j
public class NettyNetClient implements NetClient {
/**
* 发送请求
*
* @param data 请求数据
* @param service 服务信息
* @return 响应数据
* @throws InterruptedException 异常
*/
@Override
public byte[] sendRequest(byte[] data, ZookeeperMetadataData zookeeperMetadataData) throws InterruptedException {
String[] addInfoArray = zookeeperMetadataData.getAddress().split(":");
String serverAddress = addInfoArray[0];
String serverPort = addInfoArray[1];
SendHandler sendHandler = new SendHandler(data);
byte[] respData;
// 配置客户端
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast(sendHandler);
}
});
// 启动客户端连接
b.connect(serverAddress, Integer.parseInt(serverPort)).sync();
respData = (byte[]) sendHandler.rspData();
log.info("SendRequest get reply: {}", respData);
} finally {
// 释放线程组资源
group.shutdownGracefully();
}
return respData;
}
}这里我们编写了一个SendHandler来处理具体的发送请求,代码示例如下:
package org.wz.rpc.framework.handler;
import java.util.concurrent.CountDownLatch;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
/**
* 发送处理类,定义Netty入站处理细则
*
* @author 东方雨倾
* @since 1.0.0
*/
@Slf4j
public class SendHandler extends ChannelInboundHandlerAdapter {
private CountDownLatch cdl;
private Object readMsg = null;
private byte[] data;
public SendHandler(byte[] data) {
cdl = new CountDownLatch(1);
this.data = data;
}
/**
* 当连接服务端成功后,发送请求数据
*
* @param ctx 通道上下文
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("Successful connection to server:{}", ctx);
ByteBuf reqBuf = Unpooled.buffer(data.length);
reqBuf.writeBytes(data);
log.info("Client sends message:{}", reqBuf);
ctx.writeAndFlush(reqBuf);
}
/**
* 读取数据,数据读取完毕释放CD锁
*
* @param ctx 上下文
* @param msg ByteBuf
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
log.info("Client reads message: {}", msg);
ByteBuf msgBuf = (ByteBuf) msg;
byte[] resp = new byte[msgBuf.readableBytes()];
msgBuf.readBytes(resp);
readMsg = resp;
cdl.countDown();
}
/**
* 等待读取数据完成
*
* @return 响应数据
* @throws InterruptedException 异常
*/
public Object rspData() throws InterruptedException {
cdl.await();
return readMsg;
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error(cause.getMessage(),cause);
ctx.close();
}
}以上3个代码逻辑,我们就完成了网络客户端的定义,现在的项目结构示例图如下:
最后按照惯例,附上本案例的源码,登录后即可下载:










还没有评论,来说两句吧...