在前面的文章里面,我们已经实现了一个服务端的定义及功能运行。这里的话我们就要开启客户端模块的编写,首先大家想想,客户端连接服务器需要做什么?
那肯定是需要这几个基础条件:
1、定义一个客户端。 2、定义一个客户端的请求协议
所以这里的话,我们来实现下这两个需求。
一、首先我们需要定义一个客户端,这里我们的服务器端是通过netty定义的,所以这里我们客户端也是用netty来定义。
package org.wz.rpc.framework.client; import org.wz.rpc.framework.model.ZookeeperMetadataData; /** * 网络请求客户端,定义网络请求规范 * */ public interface NetClient { byte[] sendRequest(byte[] data, ZookeeperMetadataData zookeeperMetadataData) throws InterruptedException; }
这里我们定义的客户端,相当于定义一个httpclient,让我们通过这个client来发送消息给服务器端。所以我们需要来实现下这个client的具体逻辑,再实现里面,我们主要做以下几个事情:
获取服务端的元数据信息 向服务端发送请求获取response
所以这里我们的实现代码如下:
package org.wz.rpc.framework.client; import org.wz.rpc.framework.handler.SendHandler; import org.wz.rpc.framework.model.ZookeeperMetadataData; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; /** * Netty网络请求客户端,定义通过Netty实现网络请求的细则。 * */ @Slf4j public class NettyNetClient implements NetClient { /** * 发送请求 * * @param data 请求数据 * @param service 服务信息 * @return 响应数据 * @throws InterruptedException 异常 */ @Override public byte[] sendRequest(byte[] data, ZookeeperMetadataData zookeeperMetadataData) throws InterruptedException { String[] addInfoArray = zookeeperMetadataData.getAddress().split(":"); String serverAddress = addInfoArray[0]; String serverPort = addInfoArray[1]; SendHandler sendHandler = new SendHandler(data); byte[] respData; // 配置客户端 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ChannelPipeline p = ch.pipeline(); p.addLast(sendHandler); } }); // 启动客户端连接 b.connect(serverAddress, Integer.parseInt(serverPort)).sync(); respData = (byte[]) sendHandler.rspData(); log.info("SendRequest get reply: {}", respData); } finally { // 释放线程组资源 group.shutdownGracefully(); } return respData; } }
这里我们编写了一个SendHandler来处理具体的发送请求,代码示例如下:
package org.wz.rpc.framework.handler; import java.util.concurrent.CountDownLatch; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; /** * 发送处理类,定义Netty入站处理细则 * * @author 东方雨倾 * @since 1.0.0 */ @Slf4j public class SendHandler extends ChannelInboundHandlerAdapter { private CountDownLatch cdl; private Object readMsg = null; private byte[] data; public SendHandler(byte[] data) { cdl = new CountDownLatch(1); this.data = data; } /** * 当连接服务端成功后,发送请求数据 * * @param ctx 通道上下文 */ @Override public void channelActive(ChannelHandlerContext ctx) { log.info("Successful connection to server:{}", ctx); ByteBuf reqBuf = Unpooled.buffer(data.length); reqBuf.writeBytes(data); log.info("Client sends message:{}", reqBuf); ctx.writeAndFlush(reqBuf); } /** * 读取数据,数据读取完毕释放CD锁 * * @param ctx 上下文 * @param msg ByteBuf */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { log.info("Client reads message: {}", msg); ByteBuf msgBuf = (ByteBuf) msg; byte[] resp = new byte[msgBuf.readableBytes()]; msgBuf.readBytes(resp); readMsg = resp; cdl.countDown(); } /** * 等待读取数据完成 * * @return 响应数据 * @throws InterruptedException 异常 */ public Object rspData() throws InterruptedException { cdl.await(); return readMsg; } @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { log.error(cause.getMessage(),cause); ctx.close(); } }
以上3个代码逻辑,我们就完成了网络客户端的定义,现在的项目结构示例图如下:
最后按照惯例,附上本案例的源码,登录后即可下载:
还没有评论,来说两句吧...