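Nacos 2.x adds gRPC-based communication between the server and the client. Using the 2.0.2 source code, this article briefly walks through the gRPC communication path:
- Server startup
- Client connection
- Client heartbeat
- Server health check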
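Proto file
The file api/src/main/proto/nacos_grpc_service.proto:
```proto
syntax = "proto3";

import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";

option java_multiple_files = true;
option java_package = "com.alibaba.nacos.api.grpc.auto";

message Metadata {
  string type = 3;  // actual type of the request/response
  string clientIp = 8;
  map<string, string> headers = 7;
}

// Request/response envelope at the gRPC transport layer
message Payload {
  Metadata metadata = 2;
  // Business-layer request/response body; deserialized according to metadata.type
  google.protobuf.Any body = 3;
}

service RequestStream {
  // build a streamRequest
  rpc requestStream (Payload) returns (stream Payload) {
  }
}

service Request {
  // Sends a commonRequest
  rpc request (Payload) returns (Payload) {
  }
}

service BiRequestStream {
  // Sends a commonRequest
  rpc requestBiStream (stream Payload) returns (stream Payload) {
  }
}
```
The file defines only the transport-layer services and messages. Nacos serializes and deserializes business-layer requests and responses with utility classes called from RequestAcceptor and Connection, and RequestAcceptor dispatches each business request to its handler. In the code below, the unary Request service carries ordinary requests. BiRequestStream carries the connection-setup request and the requests the server pushes to the client.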
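The envelope pattern above can be sketched in plain Java: the transport only sees a type tag and an opaque body, and the receiver picks a decoder from the tag. This is a minimal sketch that assumes the tag is the business class's simple name. It models the body as a String instead of google.protobuf.Any, and PayloadSketch, PayloadCodecSketch and HealthCheckRequestSketch are hypothetical names, not Nacos APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the proto Payload: a type tag (metadata.type) plus an opaque body.
// The real body is a google.protobuf.Any; a String is used here for brevity.
final class PayloadSketch {
    final String type;
    final String body;

    PayloadSketch(String type, String body) {
        this.type = type;
        this.body = body;
    }
}

// Hypothetical business-layer request.
final class HealthCheckRequestSketch {
}

final class PayloadCodecSketch {
    // type tag -> decoder that rebuilds the business object from the body
    private final Map<String, Function<String, Object>> decoders = new HashMap<>();

    void register(Class<?> type, Function<String, Object> decoder) {
        decoders.put(type.getSimpleName(), decoder);
    }

    // The transport layer only sees the tag and the serialized body.
    PayloadSketch wrap(Object request, String serializedBody) {
        return new PayloadSketch(request.getClass().getSimpleName(), serializedBody);
    }

    // The receiver chooses the decoder from the tag, like RequestAcceptor does with metadata.type.
    Object unwrap(PayloadSketch payload) {
        Function<String, Object> decoder = decoders.get(payload.type);
        if (decoder == null) {
            throw new IllegalArgumentException("unknown payload type: " + payload.type);
        }
        return decoder.apply(payload.body);
    }
}
```

Keeping the business type out of the proto schema means new request types need no .proto change, only a new class and decoder.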
Server startup
Server class hierarchy
```
BaseRpcServer
 |-- BaseGrpcServer
      |-- GrpcSdkServer
      |-- GrpcClusterServer
```
This section looks at the GrpcSdkServer implementation.
GrpcSdkServer class
```java
@Service
public class GrpcSdkServer extends BaseGrpcServer {

    // Offset added to the main server port (8848 by default), so the SDK server listens on 9848
    private static final int PORT_OFFSET = 1000;

    @Override
    public int rpcPortOffset() {
        return PORT_OFFSET;
    }

    @Override
    public ThreadPoolExecutor getRpcExecutor() {
        return GlobalExecutor.sdkRpcExecutor;
    }
}
```
Most of the startup logic lives in BaseGrpcServer.
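The port arithmetic is a small template method. The base class owns the main port and each subclass contributes an offset. Below is a minimal sketch that assumes a main port of 8848. RpcServerSketch, SdkServerSketch and rpcListenPort are hypothetical names that only mirror rpcPortOffset().

```java
// Hypothetical mirror of the rpcPortOffset() template method; not the Nacos class hierarchy.
abstract class RpcServerSketch {
    private final int mainPort;

    RpcServerSketch(int mainPort) {
        this.mainPort = mainPort;
    }

    // Each concrete server decides how far its gRPC port sits from the main port.
    abstract int rpcPortOffset();

    int rpcListenPort() {
        return mainPort + rpcPortOffset();
    }
}

final class SdkServerSketch extends RpcServerSketch {
    // Same value as GrpcSdkServer.PORT_OFFSET
    private static final int PORT_OFFSET = 1000;

    SdkServerSketch(int mainPort) {
        super(mainPort);
    }

    @Override
    int rpcPortOffset() {
        return PORT_OFFSET;
    }
}
```

The client adds the same offset when it connects, so the gRPC port follows any change to the main port.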
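BaseGrpcServer class
Most of the gRPC server startup logic is in this class's startServer method:
- Register the RequestAcceptors that handle requests with a HandlerRegistry
  - GrpcRequestAcceptor handles ordinary business requests
  - GrpcBiStreamRequestAcceptor handles connection-setup requests. It takes the Channel, creates a GrpcConnection and registers it with ConnectionManager, and every later message to the client is sent through that GrpcConnection
- Create the gRPC Server object
  - Set the port and executor
  - Set the HandlerRegistry
  - Add a ServerTransportFilter that runs business logic when connections are established and closed
- Start the Server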
GrpcRequestAcceptor class
This class extends the gRPC service and overrides its request method:
- Parse the Payload to get the data type of the request body
- Look up the matching RequestHandler in RequestHandlerRegistry
- Deserialize the request body into an object of that type
- Call handleRequest to process the request and return a response
The request-handling code:
```java
Request request = (Request) parseObj;
try {
    // Look up the Connection bound to this call
    Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
    RequestMeta requestMeta = new RequestMeta();
    requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
    requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
    requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
    requestMeta.setLabels(connection.getMetaInfo().getLabels());
    // Refresh the active time; the health check later relies on this timestamp
    connectionManager.refreshActiveTime(requestMeta.getConnectionId());
    // Process the request with the RequestHandler
    Response response = requestHandler.handleRequest(request, requestMeta);
    Payload payloadResponse = GrpcUtils.convert(response);
    traceIfNecessary(payloadResponse, false);
    responseObserver.onNext(payloadResponse);
    responseObserver.onCompleted();
} catch (Throwable e) {
    // Any failure is returned to the client as an error response
    Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(
            (e instanceof NacosException) ? ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
            e.getMessage()));
    traceIfNecessary(payloadResponse, false);
    responseObserver.onNext(payloadResponse);
    responseObserver.onCompleted();
}
```
RequestHandler
RequestHandler is the abstract class Nacos uses to process gRPC requests at the business layer:
```java
public abstract class RequestHandler<T extends Request, S extends Response> {

    @Autowired
    private RequestFilters requestFilters;

    /**
     * Handler request.
     */
    public Response handleRequest(T request, RequestMeta meta) throws NacosException {
        // Run every filter first; a non-null, unsuccessful result short-circuits the handler
        for (AbstractRequestFilter filter : requestFilters.filters) {
            try {
                Response filterResult = filter.filter(request, meta, this.getClass());
                if (filterResult != null && !filterResult.isSuccess()) {
                    return filterResult;
                }
            } catch (Throwable throwable) {
                // A filter that throws is logged and skipped, not treated as a rejection
                Loggers.REMOTE.error("filter error", throwable);
            }
        }
        return handle(request, meta);
    }

    /**
     * Handler request.
     */
    public abstract S handle(T request, RequestMeta meta) throws NacosException;
}
```
[Figure: RequestHandler implementation classes]
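Nacos manages all RequestHandlers in RequestHandlerRegistry, which is backed by a Map:
```java
// key: simple name of the Request type
// value: the RequestHandler implementation instance
Map<String, RequestHandler> registryHandlers = new HashMap<>();
```
RequestHandlerRegistry scans all RequestHandler beans in the Spring container. For each implementation it resolves the simple name of the Request type it handles and registers the handler under that name in registryHandlers.
To find the matching RequestHandler, GrpcRequestAcceptor calls RequestHandlerRegistry's getByRequestType method:
```java
public RequestHandler getByRequestType(String requestType) {
    return registryHandlers.get(requestType);
}
```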
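The article does not show how the Request type's simple name is extracted from a handler. One plausible approach, offered here as an assumption, is to read the first type argument of the handler's generic superclass via reflection. All classes below are simplified, hypothetical stand-ins. Handlers are passed in directly instead of being collected from the Spring container.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for Nacos's Request/Response hierarchy.
abstract class BaseRequestSketch { }
abstract class BaseResponseSketch { }
final class HealthCheckRequestSketch extends BaseRequestSketch { }
final class HealthCheckResponseSketch extends BaseResponseSketch { }

abstract class RequestHandlerSketch<T extends BaseRequestSketch, S extends BaseResponseSketch> {
    abstract S handle(T request);
}

final class HealthCheckHandlerSketch
        extends RequestHandlerSketch<HealthCheckRequestSketch, HealthCheckResponseSketch> {
    @Override
    HealthCheckResponseSketch handle(HealthCheckRequestSketch request) {
        return new HealthCheckResponseSketch();
    }
}

final class HandlerRegistrySketch {
    // simple name of the Request type -> handler
    private final Map<String, RequestHandlerSketch<?, ?>> registryHandlers = new HashMap<>();

    // Assumption: handlers are handed in directly rather than scanned from Spring.
    void registerAll(List<? extends RequestHandlerSketch<?, ?>> handlers) {
        for (RequestHandlerSketch<?, ?> handler : handlers) {
            // Works for direct subclasses whose type arguments are concrete classes.
            Type superType = handler.getClass().getGenericSuperclass();
            Type requestType = ((ParameterizedType) superType).getActualTypeArguments()[0];
            registryHandlers.put(((Class<?>) requestType).getSimpleName(), handler);
        }
    }

    RequestHandlerSketch<?, ?> getByRequestType(String requestType) {
        return registryHandlers.get(requestType);
    }
}
```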
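Establishing a connection
During server initialization, Nacos registers a ServerInterceptor and a ServerTransportFilter. When a connection is established, these components bind conn_id, remote_ip, remote_port, local_port, ctx_channel and similar values to the gRPC Context.
Creating the GrpcConnection
After the connection is established, the client sends a ConnectionSetupRequest, which the server handles with GrpcBiStreamRequestAcceptor:
- Read conn_id, remote_ip, remote_port, local_port and so on
- Parse the request to get clientIp
- Build a GrpcConnection holding the basic information (conn_id, remote_ip, remote_port, local_port, clientIp, client version, etc.) plus the StreamObserver and the Channel
- Register the GrpcConnection with ConnectionManager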
Creating the Client
Registering with ConnectionManager triggers ConnectionBasedClientManager's clientConnected method, which creates the Client object:
```java
public void clientConnected(Connection connect) {
    // the gRPC connection type
    String type = connect.getMetaInfo().getConnectType();
    // resolves to a ConnectionBasedClientFactory
    ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);
    // creates a ConnectionBasedClient
    clientConnected(clientFactory.newClient(connect.getMetaInfo().getConnectionId()));
}

public boolean clientConnected(Client client) {
    if (!clients.containsKey(client.getClientId())) {
        // add to the client set; a Map keeps the clientId -> Client mapping
        clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client);
    }
    return true;
}
```
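Health check
ConnectionManager
This class manages client connections and provides operations such as registering and removing connections:
```java
// clientIp -> connection count, used to enforce ConnectionLimitRule
private Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<>(16);

// connectionId -> Connection
Map<String, Connection> connections = new ConcurrentHashMap<>();
```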
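The two maps work together. A connection is keyed by its id, and the per-IP counter is what ConnectionLimitRule checks. Below is a minimal sketch of the bookkeeping. It assumes register is the mirror image of the unregister code shown later in this article. ConnSketch and ConnectionManagerSketch are hypothetical, and limit enforcement, closing, and event notification are omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, stripped-down connection record: only the fields the bookkeeping needs.
final class ConnSketch {
    final String connectionId;
    final String clientIp;

    ConnSketch(String connectionId, String clientIp) {
        this.connectionId = connectionId;
        this.clientIp = clientIp;
    }
}

final class ConnectionManagerSketch {
    // clientIp -> number of live connections from that IP
    final Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<>(16);
    // connectionId -> connection
    final Map<String, ConnSketch> connections = new ConcurrentHashMap<>();

    synchronized boolean register(ConnSketch conn) {
        if (connections.putIfAbsent(conn.connectionId, conn) != null) {
            return false; // this id is already registered
        }
        connectionForClientIp.computeIfAbsent(conn.clientIp, ip -> new AtomicInteger()).incrementAndGet();
        return true;
    }

    synchronized void unregister(String connectionId) {
        ConnSketch removed = connections.remove(connectionId);
        if (removed == null) {
            return;
        }
        AtomicInteger count = connectionForClientIp.get(removed.clientIp);
        // drop the IP entry once its last connection is gone
        if (count != null && count.decrementAndGet() <= 0) {
            connectionForClientIp.remove(removed.clientIp);
        }
    }

    int countForIp(String ip) {
        AtomicInteger count = connectionForClientIp.get(ip);
        return count == null ? 0 : count.get();
    }
}
```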
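The abstract class Connection implements the Requester interface, so it can send requests to the client and manage the connection state.
GrpcConnection is the gRPC implementation of Connection.
After the connection is established, the client sends a ConnectionSetupRequest. On receiving it, the server extracts the connectionId, client IP, client port, client version, Channel and so on. It wraps them in a GrpcConnection and registers that with ConnectionManager.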
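Periodic health-check task
At startup, ConnectionManager launches a periodic task that runs every 3 seconds and checks per-IP connection counts and connection liveness:
- Iterate over the connections, use connectionLimitRule to find connections that must be reset, and send those clients a reset request
- Read each connection's last active time, which is updated on every client request. If a connection has been idle for more than 20 seconds, send the client a probe request, and disconnect it if the probe fails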
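The idle-connection part of this task can be sketched with a ScheduledExecutorService. This is a simplification under stated assumptions. The clock and the probe are injected, and the probe stands in for the probe request and is treated as synchronous. A failed probe only drops the entry here instead of calling unregister. LivenessCheckSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

final class LivenessCheckSketch {
    // Idle threshold and period taken from the description above.
    static final long KEEP_ALIVE_MS = 20_000L;
    static final long PERIOD_SECONDS = 3L;

    // connectionId -> last active time, refreshed on every client request
    final Map<String, Long> lastActive = new ConcurrentHashMap<>();
    private final LongSupplier clock;
    // Hypothetical probe: true if the client answered the probe request
    private final Predicate<String> probe;

    LivenessCheckSketch(LongSupplier clock, Predicate<String> probe) {
        this.clock = clock;
        this.probe = probe;
    }

    void refreshActiveTime(String connectionId) {
        lastActive.put(connectionId, clock.getAsLong());
    }

    // One round of the check; returns the ids that were dropped.
    List<String> checkOnce() {
        long now = clock.getAsLong();
        List<String> dropped = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastActive.entrySet()) {
            if (now - entry.getValue() < KEEP_ALIVE_MS) {
                continue; // recently active, nothing to do
            }
            if (probe.test(entry.getKey())) {
                lastActive.put(entry.getKey(), now); // client answered: count as active
            } else {
                dropped.add(entry.getKey()); // probe failed: disconnect
            }
        }
        dropped.forEach(lastActive::remove);
        return dropped;
    }

    ScheduledExecutorService start() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "liveness-check-sketch");
            t.setDaemon(true);
            return t;
        });
        scheduler.scheduleWithFixedDelay(this::checkOnce, PERIOD_SECONDS, PERIOD_SECONDS, TimeUnit.SECONDS);
        return scheduler;
    }
}
```

Injecting the clock lets the check be exercised without waiting 20 real seconds.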
Disconnect handling
When the gRPC transport layer detects that a connection has closed, it triggers the transportTerminated callback of the ServerTransportFilter registered on the gRPC server:
```java
public void transportTerminated(Attributes transportAttrs) {
    String connectionId = null;
    try {
        connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
    } catch (Exception e) {
        // Ignore
    }
    if (StringUtils.isNotBlank(connectionId)) {
        // remove the connection through ConnectionManager
        connectionManager.unregister(connectionId);
    }
}
```
ConnectionManager removes the connection:
```java
public synchronized void unregister(String connectionId) {
    // remove the connection from the connection set
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        // decrement the per-IP connection count
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
        // notify the ClientManager layer so it removes the Client object
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}
```
ConnectionBasedClientManager's clientDisconnected method:
```java
public boolean clientDisconnected(String clientId) {
    ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    // publish a ClientDisconnectEvent
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
    return true;
}
```
Event handling
The Javadoc of ClientDisconnectEvent reads: "Client disconnect event. Happened when Client disconnect with server." The event is handled by:
- ClientServiceIndexesManager: maintains registration and subscription relationships
- DistroClientDataProcessor: syncs client data to all server nodes
- NamingMetadataManager: maintains the service and instance metadata registered by clients
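The fan-out from one published event to the three subscribers above is a publish/subscribe pattern. Below is a minimal sketch that assumes subscribers register per event class. NotifyCenterSketch and ClientDisconnectEventSketch are hypothetical. Delivery here is synchronous on the publishing thread, which is a simplification and not necessarily how the real NotifyCenter dispatches events.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical event carrying the id of the client that went away.
final class ClientDisconnectEventSketch {
    final String clientId;

    ClientDisconnectEventSketch(String clientId) {
        this.clientId = clientId;
    }
}

final class NotifyCenterSketch {
    // event class -> subscribers interested in that class
    private final Map<Class<?>, List<Consumer<Object>>> subscribers = new ConcurrentHashMap<>();

    <E> void subscribe(Class<E> eventType, Consumer<? super E> subscriber) {
        subscribers.computeIfAbsent(eventType, k -> new CopyOnWriteArrayList<>())
                .add(event -> subscriber.accept(eventType.cast(event)));
    }

    // Synchronous fan-out on the caller's thread (a simplification).
    void publishEvent(Object event) {
        for (Consumer<Object> subscriber : subscribers.getOrDefault(event.getClass(), List.of())) {
            subscriber.accept(event);
        }
    }
}
```

Because the publisher never references the subscribers directly, ClientManager does not need to know which components react to a disconnect.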
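Client connection setup
ServerListFactory interface
Its Javadoc reads: "Server list factory. Use to inner client to connected and switch servers."
It manages the set of server addresses, and RpcClient uses it to pick an available server.
```java
public interface ServerListFactory {

    // choose an available server address, in ip:port format
    String genNextServer();

    // return the server address currently in use, in ip:port format
    String getCurrentServer();

    // return the full server list
    List<String> getServerList();
}
```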
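To show what an implementation of this interface has to do, here is a minimal round-robin sketch. The round-robin policy and the class name RoundRobinServerListFactory are assumptions for illustration. ServerListManager's actual selection logic may differ.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Same three methods as the interface above.
interface ServerListFactory {
    String genNextServer();

    String getCurrentServer();

    List<String> getServerList();
}

// Hypothetical round-robin implementation over a fixed list of "ip:port" strings.
final class RoundRobinServerListFactory implements ServerListFactory {
    private final List<String> servers;
    // index of the server in use; -1 means no server has been chosen yet
    private final AtomicInteger index = new AtomicInteger(-1);

    RoundRobinServerListFactory(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("server list is empty");
        }
        this.servers = List.copyOf(servers);
    }

    @Override
    public String genNextServer() {
        // advance atomically so concurrent callers still rotate through the list
        int next = index.updateAndGet(i -> (i + 1) % servers.size());
        return servers.get(next);
    }

    @Override
    public String getCurrentServer() {
        int i = index.get();
        return servers.get(i < 0 ? 0 : i);
    }

    @Override
    public List<String> getServerList() {
        return servers;
    }
}
```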
ServerListManager class
It parses the Properties parameters into the set of server addresses.
Creating the RpcClient
```java
RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
```
The createClient method:
```java
public static RpcClient createClient(String clientName, ConnectionType connectionType, Map<String, String> labels) {
    return CLIENT_MAP.compute(clientName, (clientNameInner, client) -> {
        if (client == null) {
            if (ConnectionType.GRPC.equals(connectionType)) {
                // creates a GrpcSdkClient
                client = new GrpcSdkClient(clientNameInner);
            }
            if (client == null) {
                throw new UnsupportedOperationException(
                        "unsupported connection type :" + connectionType.getType());
            }
            client.labels(labels);
        }
        return client;
    });
}
```
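The key property of createClient is that ConcurrentHashMap.compute runs the remapping function atomically per key. A second call with the same name gets the existing client back. If the function throws, the mapping is left unchanged. Below is a minimal sketch of that behavior. RpcClientSketch, ConnectionTypeSketch and RpcClientFactorySketch are hypothetical stand-ins, and label handling is omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

enum ConnectionTypeSketch { GRPC, HTTP }

// Hypothetical client stand-in.
final class RpcClientSketch {
    final String name;

    RpcClientSketch(String name) {
        this.name = name;
    }
}

final class RpcClientFactorySketch {
    private static final Map<String, RpcClientSketch> CLIENT_MAP = new ConcurrentHashMap<>();

    static RpcClientSketch createClient(String clientName, ConnectionTypeSketch type) {
        // compute is atomic per key: concurrent callers with the same name share one client
        return CLIENT_MAP.compute(clientName, (name, existing) -> {
            if (existing != null) {
                return existing; // reuse, regardless of the requested type
            }
            if (type != ConnectionTypeSketch.GRPC) {
                // thrown from compute: the exception propagates and no mapping is added
                throw new UnsupportedOperationException("unsupported connection type :" + type);
            }
            return new RpcClientSketch(name);
        });
    }

    static int size() {
        return CLIENT_MAP.size();
    }
}
```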
The client then needs to be initialized:
- Set the ServerListFactory used to choose a server address
- Register ServerRequestHandlers that handle requests sent by the server, such as service-subscription callbacks and configuration-change notifications
- Register a ConnectionEventListener
```java
rpcClient.serverListFactory(serverListFactory);
rpcClient.start();
rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
rpcClient.registerConnectionListener(namingGrpcConnectionEventListener);
```
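Starting the client
- Start the ConnectionEvent processing thread
- Start the health-check (heartbeat) thread
- Create the GrpcConnection
Creating the GrpcConnection
- Create the gRPC RequestFutureStub and BiRequestStreamStub
- Send a ServerCheckRequest to verify that the server is available
- Create the GrpcConnection object, wrapping serverInfo, the executor, connectionId, the channel and so on
- Bind request-handling logic to the BiRequestStreamStub, so that requests sent by the server are processed by the ServerRequestHandlers
- Send a ConnectionSetupRequest so that the server creates and registers its own GrpcConnection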
```java
if (grpcExecutor == null) {
    int threadNumber = ThreadUtils.getSuitableThreadCount(8);
    grpcExecutor = new ThreadPoolExecutor(threadNumber, threadNumber, 10L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10000),
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("nacos-grpc-client-executor-%d")
                    .build());
    grpcExecutor.allowCoreThreadTimeOut(true);
}
// 8848 + 1000
int port = serverInfo.getServerPort() + rpcPortOffset();
RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port);
// send a ServerCheckRequest to verify that the server is available
Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
if (response == null || !(response instanceof ServerCheckResponse)) {
    shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
    return null;
}

BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
        .newStub(newChannelStubTemp.getChannel());
// create the GrpcConnection, wrapping serverInfo, executor, connectionId, channel, etc.
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());

// create stream request and bind connection event to this connection
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);

// stream observer to send response to server
grpcConn.setPayloadStreamObserver(payloadStreamObserver);
grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());

// send a setup request
ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels());
conSetupRequest.setAbilities(super.clientAbilities);
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);
```
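The executor setup at the top of that code can be reproduced with only the JDK, which makes its settings explicit: a fixed-size pool, a bounded queue of 10000 tasks, named daemon threads, and core threads that time out after 10 idle seconds. Below is a minimal sketch. suitableThreadCount is a hypothetical stand-in, because the formula behind ThreadUtils.getSuitableThreadCount(8) is not shown in this article. The lambda ThreadFactory replaces Guava's ThreadFactoryBuilder.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class GrpcExecutorSketch {
    // Hypothetical stand-in for ThreadUtils.getSuitableThreadCount(multiple).
    static int suitableThreadCount(int multiple) {
        return Runtime.getRuntime().availableProcessors() * multiple;
    }

    static ThreadPoolExecutor create() {
        int threadNumber = suitableThreadCount(8);
        AtomicInteger seq = new AtomicInteger();
        // JDK-only equivalent of ThreadFactoryBuilder().setDaemon(true).setNameFormat("...-%d")
        ThreadFactory factory = r -> {
            Thread t = new Thread(r, "nacos-grpc-client-executor-" + seq.getAndIncrement());
            t.setDaemon(true); // daemon threads never keep the JVM alive
            return t;
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(threadNumber, threadNumber,
                10L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10000), factory);
        // let idle core threads exit after 10s so an idle client holds no threads
        executor.allowCoreThreadTimeOut(true);
        return executor;
    }
}
```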
Sending requests
Requester interface
This interface defines the methods for sending requests:
```java
public interface Requester {

    /**
     * send request.
     *
     * @param request      request.
     * @param timeoutMills mills of timeouts.
     * @return response response returned.
     * @throws NacosException exception throw.
     */
    Response request(Request request, long timeoutMills) throws NacosException;

    /**
     * send request.
     *
     * @param request request.
     * @return request future.
     * @throws NacosException exception throw.
     */
    RequestFuture requestFuture(Request request) throws NacosException;

    /**
     * send async request.
     *
     * @param request         request.
     * @param requestCallBack callback of request.
     * @throws NacosException exception throw.
     */
    void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException;

    /**
     * close connection.
     */
    void close();
}
```
GrpcConnection implementation
GrpcConnection implements the three request methods of Requester on top of the gRPC stub. Taking request as an example:
```java
public Response request(Request request, long timeouts) throws NacosException {
    Payload grpcRequest = GrpcUtils.convert(request);
    ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
    Payload grpcResponse;
    try {
        // request is synchronous, so block here until the response arrives or the timeout expires
        grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }
    return (Response) GrpcUtils.parse(grpcResponse);
}
```
The other two methods differ only in what happens after the request is sent:
- requestFuture: after grpcFutureServiceStub.request(grpcRequest) sends the request, it creates and returns a RequestFuture
- asyncRequest: after grpcFutureServiceStub.request(grpcRequest) sends the request, it attaches a listener callback to the returned future
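The three request styles differ only in how the caller consumes the pending response: block with a timeout, take the future, or register a callback. Below is a minimal sketch on CompletableFuture. It assumes a hypothetical transport function in place of grpcFutureServiceStub.request(...). Strings stand in for Request/Response, and an unchecked exception replaces the checked NacosException for brevity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;

final class RequesterSketch {
    // Hypothetical transport: sends a request and returns a future response,
    // playing the role of grpcFutureServiceStub.request(grpcRequest).
    private final Function<String, CompletableFuture<String>> transport;

    RequesterSketch(Function<String, CompletableFuture<String>> transport) {
        this.transport = transport;
    }

    // request(): send, then block up to timeoutMillis for the response.
    String request(String request, long timeoutMillis) {
        try {
            return transport.apply(request).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            throw new IllegalStateException("interrupted: " + request, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("request failed: " + request, e);
        }
    }

    // requestFuture(): send and return the pending future without waiting.
    CompletableFuture<String> requestFuture(String request) {
        return transport.apply(request);
    }

    // asyncRequest(): send and register a callback for the response or the failure.
    void asyncRequest(String request, BiConsumer<String, Throwable> callback) {
        transport.apply(request).whenComplete(callback);
    }
}
```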
Heartbeat (healthCheck)
As mentioned earlier, starting RpcClient also starts the health-check task. The task waits up to keepAliveTime (5 seconds by default) for a reconnect signal. If none arrives and the connection has been idle for at least keepAliveTime, it health-checks the client's current connection and reconnects on failure. The body of the task's loop:
```java
// keepAliveTime defaults to 5000L
ReconnectContext reconnectContext = reconnectionSignal
        .poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
    // check alive time.
    if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
        // health check
        boolean isHealthy = healthCheck();
        if (!isHealthy) {
            if (currentConnection == null) {
                continue;
            }

            RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
            if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                break;
            }
            // mark the client UNHEALTHY and prepare to reconnect
            boolean success = RpcClient.this.rpcClientStatus
                    .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
            if (success) {
                reconnectContext = new ReconnectContext(null, false);
            } else {
                continue;
            }
        } else {
            lastActiveTimeStamp = System.currentTimeMillis();
            continue;
        }
    } else {
        continue;
    }
}

if (reconnectContext.serverInfo != null) {
    // clear recommend server if server is not in server list.
    boolean serverExist = false;
    for (String server : getServerListFactory().getServerList()) {
        ServerInfo serverInfo = resolveServerInfo(server);
        if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
            serverExist = true;
            reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
            break;
        }
    }
    if (!serverExist) {
        reconnectContext.serverInfo = null;
    }
}
// reconnect
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
```
The healthCheck method:
```java
private boolean healthCheck() {
    HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
    if (this.currentConnection == null) {
        return false;
    }
    try {
        Response response = this.currentConnection.request(healthCheckRequest, 3000L);
        // not only check server is ok ,also check connection is register.
        return response != null && response.isSuccess();
    } catch (NacosException e) {
        // ignore
    }
    return false;
}
```
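The loop above combines two triggers: an explicit reconnect signal arriving on a BlockingQueue, and a timed health check when poll times out. Below is a minimal single-iteration sketch of that control flow. It assumes an injected clock and a BooleanSupplier standing in for healthCheck(). It deliberately leaves out the RpcClientStatus CAS, the SHUTDOWN exit, and the server-list validation. KeepAliveLoopSketch is a hypothetical name.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

final class KeepAliveLoopSketch {
    // Other components offer a target server here to force a reconnect.
    final BlockingQueue<String> reconnectionSignal = new LinkedBlockingQueue<>();
    private final long keepAliveTimeMs;
    private final LongSupplier clock;
    private final BooleanSupplier healthCheck; // stand-in for healthCheck()
    long lastActiveTimeStamp;
    int reconnects;

    KeepAliveLoopSketch(long keepAliveTimeMs, LongSupplier clock, BooleanSupplier healthCheck) {
        this.keepAliveTimeMs = keepAliveTimeMs;
        this.clock = clock;
        this.healthCheck = healthCheck;
        this.lastActiveTimeStamp = clock.getAsLong();
    }

    // One iteration of the loop; returns true when a reconnect was triggered.
    boolean runOnce() {
        String signal;
        try {
            // Wait for an explicit signal, but never longer than keepAliveTime.
            signal = reconnectionSignal.poll(keepAliveTimeMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        if (signal == null) {
            if (clock.getAsLong() - lastActiveTimeStamp < keepAliveTimeMs) {
                return false; // recently active, skip the check
            }
            if (healthCheck.getAsBoolean()) {
                lastActiveTimeStamp = clock.getAsLong(); // healthy: reset the idle timer
                return false;
            }
        }
        reconnects++; // stand-in for reconnect(serverInfo, onRequestFail)
        return true;
    }
}
```

Using poll with a timeout lets one thread serve as both the reconnect worker and the heartbeat timer. A signal wakes it immediately, and silence wakes it after keepAliveTime.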