- 组件篇之RPC(上)
基础架构
暂时无法在文档外展示此内容
基础架构之组件篇
组件篇
- RPC
- 注册中心
- Zookeeper
- 配置中心
- Nacos
- 消息队列
- 事务消息
- 延时消息
- Spring
组件篇之RPC(上)
01.RPC原理分析
理解RPC - Remote Procedure Call
- 远程过程调用
- 基于网络表达语义和传达数据
- 通信协议
- 像调用本地方法一样调用远程服务
- 扩展了算力
- 服务治理的基础
RPC作用 - 屏蔽组包/解包
- 屏蔽数据发送/接收
- 提高开发效率
- 业务发展的必然产物
RPC核心组成 - 远程方法对象代理
- 连接管理
- 序列化/反序列化
- 寻址与负载均衡
RPC调用方式 - 同步调用
- 异步调用
RPC调用过程
暂时无法在文档外展示此内容
02.精简版RPC实现
假如没有RPC : 如果没有RPC框架支持,实现远程调用需要做哪些事情?
Client 端
建立与Server的连接
组装数据
发送数据包
接收处理结果数据包
解析返回数据包
Server 端
监听端口
响应连接请求
接收数据包
解析数据包,调用相应方法
组装请求处理结果数据包
发送结果数据包
设计“用户”服务
功能需求:用户信息管理—CRUD
调用方式:TCP长连接同步交互
协议:自定义协议
接口设计
注册: bool addUser(User user)
更新: bool updateUser(long uid, User user)
注销: bool deleteUser(long uid)
查询: User getUser(long uid)
序列化协议
远程调用涉及数据的传输,就会涉及组包和解包,需要调用方和服务方约定数据格式——序列化协议
暂时无法在文档外展示此内容
package com.naixue.client.protocol;
import com.naixue.client.entity.User;
import com.naixue.util.ByteConverter;
import java.io.*;
/**
 * Client-side frame of the custom RPC protocol.
 *
 * <p>Wire layout produced by {@link #generateByteArray()}: a {@link #HEAD_LEN}-byte header made
 * of four 4-byte integers (version, cmd, magicNum, bodyLen, in that order) followed by the body.
 * Integers are encoded with {@code ByteConverter.intToBytes} and decoded with
 * {@code ByteConverter.bytesToInt}.
 */
public class RpcProtocol implements Serializable {
    /** Command code of the "create user" request. */
    public static int CMD_CREATE_USER = 1;
    /** Header size in bytes: four 4-byte integers. */
    final public static int HEAD_LEN = 16;
    /** Serialized size of a {@link User}: 8-byte uid + 2-byte age + 2-byte sex. */
    private static final int USER_INFO_LEN = Long.BYTES + Short.BYTES + Short.BYTES;

    private int version;
    private int cmd;
    private int magicNum;
    private int bodyLen = 0;
    private byte[] body;

    public byte[] getBody() {
        return body;
    }

    public RpcProtocol setBody(byte[] body) {
        this.body = body;
        return this;
    }

    public int getVersion() {
        return version;
    }

    public RpcProtocol setVersion(int version) {
        this.version = version;
        return this;
    }

    public int getCmd() {
        return cmd;
    }

    public RpcProtocol setCmd(int cmd) {
        this.cmd = cmd;
        return this;
    }

    public int getMagicNum() {
        return magicNum;
    }

    public RpcProtocol setMagicNum(int magicNum) {
        this.magicNum = magicNum;
        return this;
    }

    public int getBodyLen() {
        return bodyLen;
    }

    public RpcProtocol setBodyLen(int bodyLen) {
        this.bodyLen = bodyLen;
        return this;
    }

    /**
     * Serializes header and body into one frame of {@code HEAD_LEN + bodyLen} bytes.
     *
     * <p>A {@code null} body is treated as empty. A body shorter than {@code bodyLen} leaves the
     * remaining bytes zero, as before.
     *
     * @return the encoded frame
     * @throws IllegalStateException if {@code bodyLen} is negative or smaller than the body
     */
    public byte[] generateByteArray() {
        int actualBodyLen = (body == null) ? 0 : body.length;
        if (bodyLen < 0 || actualBodyLen > bodyLen) {
            throw new IllegalStateException(
                    "bodyLen=" + bodyLen + " cannot hold a body of " + actualBodyLen + " bytes");
        }
        byte[] data = new byte[HEAD_LEN + bodyLen];
        int index = 0;
        index = putInt(data, index, version);
        index = putInt(data, index, cmd);
        index = putInt(data, index, magicNum);
        index = putInt(data, index, bodyLen);
        if (actualBodyLen > 0) {
            System.arraycopy(body, 0, data, index, actualBodyLen);
        }
        return data;
    }

    /** Writes one encoded int at {@code index} and returns the index just past it. */
    private static int putInt(byte[] dst, int index, int value) {
        System.arraycopy(ByteConverter.intToBytes(value), 0, dst, index, Integer.BYTES);
        return index + Integer.BYTES;
    }

    /**
     * Populates this object (header fields and body) from a received frame.
     *
     * @param data a complete frame: header followed by at least {@code bodyLen} body bytes
     * @return this instance
     * @throws IllegalArgumentException if the frame is null, shorter than the header, or its
     *         declared body length is negative or exceeds the bytes actually present
     */
    public RpcProtocol byteArrayToRpcHeader(byte[] data) {
        if (data == null || data.length < HEAD_LEN) {
            throw new IllegalArgumentException("rpc frame is shorter than the " + HEAD_LEN + "-byte header");
        }
        int index = 0;
        this.setVersion(ByteConverter.bytesToInt(data, index));
        index += Integer.BYTES;
        this.setCmd(ByteConverter.bytesToInt(data, index));
        index += Integer.BYTES;
        this.setMagicNum(ByteConverter.bytesToInt(data, index));
        index += Integer.BYTES;
        int declaredBodyLen = ByteConverter.bytesToInt(data, index);
        index += Integer.BYTES;
        if (declaredBodyLen < 0 || declaredBodyLen > data.length - HEAD_LEN) {
            throw new IllegalArgumentException(
                    "declared bodyLen=" + declaredBodyLen + " but only " + (data.length - HEAD_LEN) + " body bytes present");
        }
        this.setBodyLen(declaredBodyLen);
        this.body = new byte[declaredBodyLen];
        System.arraycopy(data, index, this.body, 0, declaredBodyLen);
        return this;
    }

    /**
     * Decodes a {@link User} from a body laid out as uid (8 bytes), age (2 bytes), sex (2 bytes).
     *
     * @throws IllegalArgumentException if {@code data} is null or shorter than that layout
     */
    public User byteArrayToUserInfo(byte[] data) {
        if (data == null || data.length < USER_INFO_LEN) {
            throw new IllegalArgumentException("user body must be at least " + USER_INFO_LEN + " bytes");
        }
        User user = new User();
        int index = 0;
        user.setUid(ByteConverter.bytesToLong(data, index));
        index += Long.BYTES;
        user.setAge(ByteConverter.bytesToShort(data, index));
        index += Short.BYTES;
        user.setSex(ByteConverter.bytesToShort(data, index));
        return user;
    }

    /** Encodes a {@link User} as uid (8 bytes), age (2 bytes), sex (2 bytes). */
    public byte[] userInfoTobyteArray(User info) {
        byte[] data = new byte[USER_INFO_LEN];
        int index = 0;
        System.arraycopy(ByteConverter.longToBytes(info.getUid()), 0, data, index, Long.BYTES);
        index += Long.BYTES;
        System.arraycopy(ByteConverter.shortToBytes(info.getAge()), 0, data, index, Short.BYTES);
        index += Short.BYTES;
        System.arraycopy(ByteConverter.shortToBytes(info.getSex()), 0, data, index, Short.BYTES);
        return data;
    }

    /**
     * Deserializes an object with Java native serialization.
     *
     * <p>SECURITY NOTE(review): native deserialization of bytes that come from the network is
     * unsafe; only feed this trusted data or install an {@code ObjectInputFilter}.
     *
     * @return the object, or {@code null} when the input is null or empty
     */
    public static Object bytes2Object(byte[] objBytes) throws Exception {
        if (objBytes == null || objBytes.length == 0) {
            return null;
        }
        try (ObjectInputStream oi = new ObjectInputStream(new ByteArrayInputStream(objBytes))) {
            return oi.readObject();
        }
    }

    /**
     * Serializes an object with Java native serialization.
     *
     * @return the serialized bytes, or {@code null} when {@code obj} is null
     */
    public static byte[] object2Bytes(Serializable obj) throws Exception {
        if (obj == null) {
            return null;
        }
        ByteArrayOutputStream bo = new ByteArrayOutputStream();
        // Closing the object stream flushes its internal buffer into bo before we read it.
        try (ObjectOutputStream oo = new ObjectOutputStream(bo)) {
            oo.writeObject(obj);
        }
        return bo.toByteArray();
    }

    /** Encodes the 4-byte body of a "create user" response carrying the result code. */
    public byte[] createUserRespTobyteArray(int result) {
        return ByteConverter.intToBytes(result);
    }
}
暂时无法在文档外展示此内容
// Command code for the "create user" request.
public static int CMD_CREATE_USER = 1;
private int version;// example value: 1
private int cmd;// example value: 0 -- NOTE(review): addUser sends CMD_CREATE_USER (1); confirm which value is intended
private int magicNum; // example value: 0x20191009 -- NOTE(review): the client/server code shown in this document uses 0x20110711
private int bodyLen = 0;// example value: 12 (serialized User: 8-byte uid + 2-byte age + 2-byte sex)
// Payload bytes that follow the header.
private byte[] body;
// Header size in bytes: four 4-byte ints (version, cmd, magicNum, bodyLen).
final public static int HEAD_LEN = 16;
暂时无法在文档外展示此内容
Consumer代码实现
- 创建代理类
- 构造请求数据
- 执行远程调用
package com.naixue;
import com.naixue.client.entity.User;
import com.naixue.client.service.UserService;
/**
Created by chendong on 2019/9/3.
*/
public class RpcClient {
    /**
     * Demo entry point: builds a user, asks the client-side {@link UserService} proxy to create
     * it remotely, and prints whether the call reported success (result code 0).
     */
    public static void main(String[] args) throws Exception {
        UserService proxyUserService = new UserService();

        User newUser = new User();
        newUser.setAge((short) 26);
        newUser.setSex((short) 1);

        int resultCode = proxyUserService.addUser(newUser);
        String outcome = (resultCode == 0)
                ? "调用远程服务创建用户成功!!!"
                : "调用远程服务创建用户失败!!!";
        System.out.println(outcome);
    }
}
addUser :
package com.naixue.client.entity;
import java.io.Serializable;
/**
Created by chendong on 2019/9/3.
*/
/**
 * Client-side user entity: a uid plus age and sex codes.
 * Setters return {@code this} so calls can be chained.
 */
public class User implements Serializable {
    private long uid;
    private short age;
    private short sex;

    // ---- accessors ----

    public long getUid() {
        return this.uid;
    }

    public short getAge() {
        return this.age;
    }

    public short getSex() {
        return this.sex;
    }

    // ---- fluent mutators ----

    public User setUid(long uid) {
        this.uid = uid;
        return this;
    }

    public User setAge(short age) {
        this.age = age;
        return this;
    }

    public User setSex(short sex) {
        this.sex = sex;
        return this;
    }
}
package com.naixue.client.connect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
public class TcpClient {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private static int MAX_PACKAGE_SIZE = 1024 * 4;
private static String SERVER_IP = “127.0.0.1”;
private static int SERVER_PORT = 58885;
private static TcpClient instance = null;
private boolean isInit = false;//private ChannelFuture channelFuture = null;SocketChannel client = null;private final static int CONNECT_TIMEOUT_MILLIS = 2000;//private Bootstrap bootstrap = new Bootstrap();public TcpClient() {}public static TcpClient GetInstance() {if (instance == null) {instance = new TcpClient();}return instance;}public void init() throws Exception{if(!isInit) {client = SocketChannel.open(new InetSocketAddress(SERVER_IP, SERVER_PORT));client.configureBlocking(true);}isInit = true;}public boolean sendData(byte[] data){ByteBuffer byteBuffer = ByteBuffer.wrap(data);byteBuffer.put(data);byteBuffer.flip();int ret = 0;try {ret = client.write(byteBuffer);} catch (IOException e) {e.printStackTrace();return false;}return true;}public byte[] recvData(){ByteBuffer byteBuffer = ByteBuffer.allocate(MAX_PACKAGE_SIZE);try {int rs = client.read(byteBuffer);byte[] bytes = new byte[rs];byteBuffer.flip();byteBuffer.get(bytes);return bytes;} catch (IOException e) {e.printStackTrace();}return null;}
}
package com.naixue.client.service;
import com.naixue.client.connect.TcpClient;
import com.naixue.client.entity.User;
import com.naixue.client.protocol.RpcProtocol;
import com.naixue.util.ByteConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class UserService {
private Logger logger = LoggerFactory.getLogger(this.getClass());
public int addUser (User userinfo) throws Exception {//初始化客户端连接TcpClient client = TcpClient.GetInstance();try {client.init();} catch (Exception e) {e.printStackTrace();logger.error("init rpc client error");}//构造请求数据组装协议数据 RpcProtocol rpcReq = new RpcProtocol();rpcReq.setCmd(RpcProtocol.CMD_CREATE_USER);rpcReq.setVersion(0x01);rpcReq.setMagicNum(0x20110711);byte[] body = rpcReq.userInfoTobyteArray(userinfo);rpcReq.setBodyLen(body.length);rpcReq.setBody(body);//序列化序列化数据 byte[] reqData = rpcReq.generateByteArray();//发送请求发送请求等待返回 client.sendData(reqData);//接收请求结果byte[] recvData = client.recvData();//反序列化结果 反序列化返回数据 RpcProtocol rpcResp = new RpcProtocol();rpcResp.byteArrayToRpcHeader(recvData);int ret = ByteConverter.bytesToInt(rpcResp.getBody(), 0);return ret;}
}
序列化/反序列化
/**
- @ClassName: ${ByteConverter}
- @Description: ${}
- @author ${wangzongsheng}
- @version V1.0
- @Date ${2019-01-14}
*/
package com.naixue.util;
/**
 * Converts short/int/long values to and from byte arrays.
 *
 * <p>All conversions are little-endian: the byte at the lowest index is the least significant.
 */
public class ByteConverter {

    /**
     * Decodes a short from the first two bytes of {@code buf}.
     *
     * @param buf source bytes, at least 2 long
     * @return the decoded value
     */
    public static short bytesToShort(byte[] buf) {
        return bytesToShort(buf, 0);
    }

    /**
     * Decodes an int from the first four bytes of {@code buf}.
     *
     * <p>Despite the name, the decoding treats {@code buf[0]} as the least significant byte, the
     * same as {@link #bytesToInt(byte[], int)}; the name is kept for existing callers.
     *
     * @param buf source bytes, at least 4 long
     * @return the decoded value
     */
    public static int bytesToIntBigEndian(byte[] buf) {
        return bytesToInt(buf, 0);
    }

    /**
     * Decodes a long from the first eight bytes of {@code buf}.
     *
     * @param buf source bytes, at least 8 long
     * @return the decoded value
     */
    public static long bytesToLong(byte[] buf) {
        return bytesToLong(buf, 0);
    }

    /**
     * Encodes a short as 2 bytes.
     *
     * @param n value to encode
     * @return a new 2-byte array
     */
    public static byte[] shortToBytes(short n) {
        byte[] buf = new byte[Short.BYTES];
        for (int i = 0; i < buf.length; i++) {
            buf[i] = (byte) (n >> (8 * i));
        }
        return buf;
    }

    /**
     * Encodes an int as 4 bytes.
     *
     * @param n value to encode
     * @return a new 4-byte array
     */
    public static byte[] intToBytes(int n) {
        byte[] buf = new byte[Integer.BYTES];
        for (int i = 0; i < buf.length; i++) {
            buf[i] = (byte) (n >> (8 * i));
        }
        return buf;
    }

    /**
     * Encodes a long as 8 bytes.
     *
     * @param n value to encode
     * @return a new 8-byte array
     */
    public static byte[] longToBytes(long n) {
        byte[] buf = new byte[Long.BYTES];
        for (int i = 0; i < buf.length; i++) {
            buf[i] = (byte) (n >> (8 * i));
        }
        return buf;
    }

    /**
     * Decodes a short from two bytes starting at {@code offset}.
     *
     * @param buf    source bytes
     * @param offset index of the least significant byte
     * @return the decoded value
     */
    public static short bytesToShort(byte[] buf, int offset) {
        return (short) ((buf[offset] & 0xff) | ((buf[offset + 1] & 0xff) << 8));
    }

    /**
     * Decodes an int from four bytes starting at {@code offset}.
     *
     * @param buf    source bytes
     * @param offset index of the least significant byte
     * @return the decoded value
     */
    public static int bytesToInt(byte[] buf, int offset) {
        return (buf[offset] & 0xff)
                | ((buf[offset + 1] & 0xff) << 8)
                | ((buf[offset + 2] & 0xff) << 16)
                | ((buf[offset + 3] & 0xff) << 24);
    }

    /**
     * Decodes a long from eight bytes starting at {@code offset}.
     *
     * @param buf    source bytes
     * @param offset index of the least significant byte
     * @return the decoded value
     */
    public static long bytesToLong(byte[] buf, int offset) {
        long value = 0L;
        // Walk from the most significant byte down so each step shifts the partial result up.
        for (int i = Long.BYTES - 1; i >= 0; i--) {
            value = (value << 8) | (buf[offset + i] & 0xffL);
        }
        return value;
    }
}
序列化过程
- 序列化请求参数到body
- 序列化RpcProtocol
反序列化过程 - 反序列化RpcProtocol
- 反序列化body
package com.naixue.util;
import com.naixue.client.protocol.RpcProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class PkgDecoder extends ByteToMessageDecoder
{
private Logger logger = LoggerFactory.getLogger(PkgDecoder.class);
public PkgDecoder(){ }@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List
}
/**
- @ClassName: ${PkgEncoder}
- @Description: ${tcp编码器}
- @author ${wangzongsheng}
- @version V1.0
- @Date ${2019-01-14}
*/
package com.naixue.util;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class PkgEncoder extends MessageToByteEncoder
{
public PkgEncoder()
{
}
@Overrideprotected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception{try {//在这之前可以实现编码工作。out.writeBytes((byte[])msg);}finally {}}
}
03.RPC服务消费方核心设计
RPC功能
- RPC基础功能
- 数据传输
- 序列化/反序列化
- 客户端代理类
- 请求映射分发
- RPC产品功能
Consumer
连接管理
负载均衡
请求路由
超时处理
Provider
队列/线程池
超时丢弃
优雅关闭
过载保护
连接管理
保持与服务提供方长连接,用于传输请求数据和返回结果。
暂时无法在文档外展示此内容
初始化时机
饿汉模式
懒汉模式
连接数维护
服务连接池
数据库连接池
思考:两类连接有什么本质区别?
心跳/断线重连
客户端线程模型
package com.naixue.server.connect;
import java.net.InetSocketAddress;
import com.naixue.server.entity.User;
import com.naixue.server.protocol.RpcProtocol;
import com.naixue.server.server.UserService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ServerHandler extends ChannelInboundHandlerAdapter {
private Logger logger = LoggerFactory.getLogger(ServerHandler.class);
private static int CMD_CREATE_USER = 1;private static int CMD_FIND_USER = 2;@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {Channel ch = ctx.channel();InetSocketAddress socketAddress = (InetSocketAddress) ch.remoteAddress();String clientIp = socketAddress.getAddress().getHostAddress();logger.info("client connect to rpc server, client's ip is: " + clientIp);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Channel ch = ctx.channel();InetSocketAddress socketAddress = (InetSocketAddress) ch.remoteAddress();String clientIp = socketAddress.getAddress().getHostAddress();logger.info("client close the connection to rpc server, client's ip is: " + clientIp);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {byte[] recvData = (byte[]) msg;if (recvData.length == 0) {logger.warn("receive request from client, but the data length is 0");return;}logger.info("receive request from client, the data length is: " + recvData.length);//反序列化请求数据RpcProtocol rpcReq = new RpcProtocol();rpcReq.byteArrayToRpcHeader(recvData);if(rpcReq.getMagicNum() != RpcProtocol.CONST_CMD_MAGIC){logger.warn("request msgic code error");return;}//解析请求,并调用处理方法int ret = -1;if(rpcReq.getCmd() == CMD_CREATE_USER){User user = rpcReq.byteArrayToUserInfo(rpcReq.getBody());UserService userService = new UserService();ret = userService.addUser(user);//构造返回数据RpcProtocol rpcResp = new RpcProtocol();rpcResp.setCmd(rpcReq.getCmd());rpcResp.setVersion(rpcReq.getVersion());rpcResp.setMagicNum(rpcReq.getMagicNum());rpcResp.setBodyLen(Integer.BYTES);byte[] body = rpcResp.createUserRespTobyteArray(ret);rpcResp.setBody(body);ByteBuf respData = Unpooled.copiedBuffer(rpcResp.generateByteArray());ctx.channel().writeAndFlush(respData);}}
}
package com.naixue.server.connect;
import com.naixue.util.PkgDecoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.ChannelInitializer;
import org.slf4j.LoggerFactory;
import org.slf4j.Logger;
public class TcpServer {
private Logger logger = LoggerFactory.getLogger(this.getClass());
private int port;private final EventLoopGroup bossGroup; //处理Accept连接事件的线程private final EventLoopGroup workerGroup; //处理handler的工作线程public TcpServer(int port) {this.port = port;this.bossGroup = new NioEventLoopGroup(1);int cores = Runtime.getRuntime().availableProcessors();this.workerGroup = new NioEventLoopGroup(cores);}public void start() throws Exception {try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup);serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024); //连接数serverBootstrap.localAddress(this.port);serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);serverBootstrap.childHandler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new PkgDecoder());pipeline.addLast(new ServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind().sync();if (channelFuture.isSuccess()) {logger.info("rpc server start success!");} else {logger.info("rpc server start fail!");}channelFuture.channel().closeFuture().sync();} catch (Exception ex) {logger.error("exception occurred exception=" + ex.getMessage());} finally {bossGroup.shutdownGracefully().sync();// 释放线程池资源workerGroup.shutdownGracefully().sync();}}
}
package com.naixue.server.entity;
/**
Created by zhuanzhuan on 2019/9/3.
*/
/**
 * Server-side user entity: a uid plus age and sex codes.
 * Setters return {@code this} so calls can be chained.
 */
public class User {
    private long uid;
    private short age;
    private short sex;

    // ---- accessors ----

    public long getUid() {
        return this.uid;
    }

    public short getAge() {
        return this.age;
    }

    public short getSex() {
        return this.sex;
    }

    // ---- fluent mutators ----

    public User setUid(long uid) {
        this.uid = uid;
        return this;
    }

    public User setAge(short age) {
        this.age = age;
        return this;
    }

    public User setSex(short sex) {
        this.sex = sex;
        return this;
    }
}
package com.naixue.server.protocol;
import com.naixue.server.entity.User;
import com.naixue.util.ByteConverter;
import java.io.*;
/**
 * Server-side frame of the custom RPC protocol.
 *
 * <p>Wire layout produced by {@link #generateByteArray()}: a {@link #HEAD_LEN}-byte header made
 * of four 4-byte integers (version, cmd, magicNum, bodyLen, in that order) followed by the body.
 * Integers are encoded with {@code ByteConverter.intToBytes} and decoded with
 * {@code ByteConverter.bytesToInt}.
 */
public class RpcProtocol {
    /** Magic number every valid request header must carry. */
    static public int CONST_CMD_MAGIC = 0x20110711;
    /** Header size in bytes: four 4-byte integers. */
    final public static int HEAD_LEN = 16;
    /** Serialized size of a {@link User}: 8-byte uid + 2-byte age + 2-byte sex. */
    private static final int USER_INFO_LEN = Long.BYTES + Short.BYTES + Short.BYTES;

    private int version;
    private int cmd;
    public int magicNum;
    private int bodyLen;
    private byte[] body;

    public byte[] getBody() {
        return body;
    }

    public RpcProtocol setBody(byte[] body) {
        this.body = body;
        return this;
    }

    public int getVersion() {
        return version;
    }

    public RpcProtocol setVersion(int version) {
        this.version = version;
        return this;
    }

    public int getCmd() {
        return cmd;
    }

    public RpcProtocol setCmd(int cmd) {
        this.cmd = cmd;
        return this;
    }

    public int getMagicNum() {
        return magicNum;
    }

    public RpcProtocol setMagicNum(int magicNum) {
        this.magicNum = magicNum;
        return this;
    }

    public int getBodyLen() {
        return bodyLen;
    }

    public RpcProtocol setBodyLen(int bodyLen) {
        this.bodyLen = bodyLen;
        return this;
    }

    /**
     * Serializes header and body into one frame of {@code HEAD_LEN + bodyLen} bytes.
     *
     * <p>A {@code null} body is treated as empty. A body shorter than {@code bodyLen} leaves the
     * remaining bytes zero, as before.
     *
     * @return the encoded frame
     * @throws IllegalStateException if {@code bodyLen} is negative or smaller than the body
     */
    public byte[] generateByteArray() {
        int actualBodyLen = (body == null) ? 0 : body.length;
        if (bodyLen < 0 || actualBodyLen > bodyLen) {
            throw new IllegalStateException(
                    "bodyLen=" + bodyLen + " cannot hold a body of " + actualBodyLen + " bytes");
        }
        byte[] data = new byte[HEAD_LEN + bodyLen];
        int index = 0;
        index = putInt(data, index, version);
        index = putInt(data, index, cmd);
        index = putInt(data, index, magicNum);
        index = putInt(data, index, bodyLen);
        if (actualBodyLen > 0) {
            System.arraycopy(body, 0, data, index, actualBodyLen);
        }
        return data;
    }

    /** Writes one encoded int at {@code index} and returns the index just past it. */
    private static int putInt(byte[] dst, int index, int value) {
        System.arraycopy(ByteConverter.intToBytes(value), 0, dst, index, Integer.BYTES);
        return index + Integer.BYTES;
    }

    /**
     * Populates this object (header fields and body) from a received frame.
     *
     * @param data a complete frame: header followed by at least {@code bodyLen} body bytes
     * @return this instance, or {@code null} when {@code data} is null or empty
     * @throws IOException if the frame is shorter than the header, or its declared body length
     *         is negative or exceeds the bytes actually present
     * @throws ClassNotFoundException never thrown by this implementation; kept in the signature
     *         for existing callers
     */
    public RpcProtocol byteArrayToRpcHeader(byte[] data) throws IOException, ClassNotFoundException {
        if (data == null || data.length == 0) {
            return null;
        }
        if (data.length < HEAD_LEN) {
            throw new IOException("rpc frame too short: " + data.length + " bytes");
        }
        int index = 0;
        this.setVersion(ByteConverter.bytesToInt(data, index));
        index += Integer.BYTES;
        this.setCmd(ByteConverter.bytesToInt(data, index));
        index += Integer.BYTES;
        this.setMagicNum(ByteConverter.bytesToInt(data, index));
        index += Integer.BYTES;
        int declaredBodyLen = ByteConverter.bytesToInt(data, index);
        index += Integer.BYTES;
        if (declaredBodyLen < 0 || declaredBodyLen > data.length - HEAD_LEN) {
            throw new IOException(
                    "declared bodyLen=" + declaredBodyLen + " but only " + (data.length - HEAD_LEN) + " body bytes present");
        }
        this.setBodyLen(declaredBodyLen);
        this.body = new byte[declaredBodyLen];
        System.arraycopy(data, index, this.body, 0, declaredBodyLen);
        return this;
    }

    /**
     * Decodes a {@link User} from a body laid out as uid (8 bytes), age (2 bytes), sex (2 bytes).
     *
     * @throws IllegalArgumentException if {@code data} is null or shorter than that layout
     */
    public User byteArrayToUserInfo(byte[] data) {
        if (data == null || data.length < USER_INFO_LEN) {
            throw new IllegalArgumentException("user body must be at least " + USER_INFO_LEN + " bytes");
        }
        User user = new User();
        int index = 0;
        user.setUid(ByteConverter.bytesToLong(data, index));
        index += Long.BYTES;
        user.setAge(ByteConverter.bytesToShort(data, index));
        index += Short.BYTES;
        user.setSex(ByteConverter.bytesToShort(data, index));
        return user;
    }

    /** Encodes a {@link User} as uid (8 bytes), age (2 bytes), sex (2 bytes). */
    public byte[] userInfoTobyteArray(User info) {
        byte[] data = new byte[USER_INFO_LEN];
        int index = 0;
        System.arraycopy(ByteConverter.longToBytes(info.getUid()), 0, data, index, Long.BYTES);
        index += Long.BYTES;
        System.arraycopy(ByteConverter.shortToBytes(info.getAge()), 0, data, index, Short.BYTES);
        index += Short.BYTES;
        System.arraycopy(ByteConverter.shortToBytes(info.getSex()), 0, data, index, Short.BYTES);
        return data;
    }

    /**
     * Deserializes an object with Java native serialization.
     *
     * <p>SECURITY NOTE(review): native deserialization of bytes that come from the network is
     * unsafe; only feed this trusted data or install an {@code ObjectInputFilter}.
     *
     * @return the object, or {@code null} when the input is null or empty
     */
    public static Object bytes2Object(byte[] objBytes) throws Exception {
        if (objBytes == null || objBytes.length == 0) {
            return null;
        }
        try (ObjectInputStream oi = new ObjectInputStream(new ByteArrayInputStream(objBytes))) {
            return oi.readObject();
        }
    }

    /**
     * Serializes an object with Java native serialization.
     *
     * @return the serialized bytes, or {@code null} when {@code obj} is null
     */
    public static byte[] object2Bytes(Serializable obj) throws Exception {
        if (obj == null) {
            return null;
        }
        ByteArrayOutputStream bo = new ByteArrayOutputStream();
        // Closing the object stream flushes its internal buffer into bo before we read it.
        try (ObjectOutputStream oo = new ObjectOutputStream(bo)) {
            oo.writeObject(obj);
        }
        return bo.toByteArray();
    }

    /** Encodes the 4-byte body of a "create user" response carrying the result code. */
    public byte[] createUserRespTobyteArray(int result) {
        return ByteConverter.intToBytes(result);
    }
}
package com.naixue.server.server;
import com.naixue.server.entity.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
Created by chendong on 2019/9/3.
*/
public class UserService {
private Logger logger = LoggerFactory.getLogger(this.getClass());public int addUser(User userinfo){
logger.debug(“create user success, uid=” + userinfo.getUid());
return 0;
}
}
package com.naixue;
import com.naixue.server.connect.TcpServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RpcServer {
private static Logger logger = LoggerFactory.getLogger(RpcServer.class);private static int SERVER_LISTEN_PORT = 58885;public static void main(String[] args) throws Exception {Thread tcpServerThread = new Thread("tcpServer") {public voidrun() {TcpServer tcpServer = new TcpServer(SERVER_LISTEN_PORT);try {tcpServer.start();} catch (Exception e) {logger.info("TcpServer start exception: " + e.getMessage());}}};tcpServerThread.start();tcpServerThread.join();}
}