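Nacos 2.x adds gRPC as a communication channel between server and client. Based on the 2.0.2 source code, this article briefly analyzes how that gRPC communication works:

  • Server startup
  • Client connection
  • Client heartbeat
  • Server health check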

Server

proto file

The file api/src/main/proto/nacos_grpc_service.proto:

```protobuf
syntax = "proto3";

import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";

option java_multiple_files = true;
option java_package = "com.alibaba.nacos.api.grpc.auto";

message Metadata {
  string type = 3; // actual type of the request/response
  string clientIp = 8;
  map<string, string> headers = 7;
}

// Request/response envelope of the gRPC transport layer
message Payload {
  Metadata metadata = 2;
  // Business-layer request/response body; deserialized according to `type`
  google.protobuf.Any body = 3;
}

service RequestStream {
  // build a streamRequest
  rpc requestStream (Payload) returns (stream Payload) {
  }
}

service Request {
  // Sends a commonRequest
  rpc request (Payload) returns (Payload) {
  }
}

service BiRequestStream {
  // Sends a commonRequest
  rpc requestBiStream (stream Payload) returns (stream Payload) {
  }
}
```

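This file defines the transport-layer services and message structures. Nacos serializes and deserializes business-layer requests and responses with utility classes inside RequestAcceptor/Connection, and the RequestAcceptor dispatches each business request to its handler.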
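The type-driven decoding can be modeled in plain Java without gRPC. In this minimal sketch, `Envelope` stands in for `Payload`, its `type` field plays the role of `Metadata.type`, and a map from simple class name to decoder replaces the class lookup Nacos performs. `Envelope`, `EchoRequest`, `EnvelopeCodec` and the plain-string body are all hypothetical. Nacos itself carries the serialized body in a `google.protobuf.Any`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for Payload: `type` plays the role of Metadata.type.
record Envelope(String type, String clientIp, String body) {}

// Hypothetical business request; its simple class name is what goes into `type`.
record EchoRequest(String text) {}

final class EnvelopeCodec {
    // simple class name -> decoder for the body
    private final Map<String, Function<String, Object>> decoders = new HashMap<>();

    <T> void register(Class<T> type, Function<String, T> decoder) {
        decoders.put(type.getSimpleName(), decoder::apply);
    }

    // Pick the decoder by the envelope's type name, then decode the body.
    Object unwrap(Envelope envelope) {
        Function<String, Object> decoder = decoders.get(envelope.type());
        if (decoder == null) {
            throw new IllegalArgumentException("unknown payload type: " + envelope.type());
        }
        return decoder.apply(envelope.body());
    }
}
```

An unknown `type` fails fast here. That choice belongs to the sketch and is not taken from Nacos.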

Server startup

Server class hierarchy

```
BaseRpcServer
  |-- BaseGrpcServer
        |-- GrpcSdkServer
        |-- GrpcClusterServer
```

This section looks at the GrpcSdkServer implementation.

GrpcSdkServer class

```java
@Service
public class GrpcSdkServer extends BaseGrpcServer {

    // So the SDK server listens on 8848 + 1000 = 9848 by default
    private static final int PORT_OFFSET = 1000;

    @Override
    public int rpcPortOffset() {
        return PORT_OFFSET;
    }

    @Override
    public ThreadPoolExecutor getRpcExecutor() {
        return GlobalExecutor.sdkRpcExecutor;
    }
}
```

Most of the startup logic lives in BaseGrpcServer.
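The port arithmetic in GrpcSdkServer is a small template method. The base class owns the main port, and each subclass contributes only an offset. The sketch below is a hypothetical reduction of that pattern; the `*Sketch` classes are not Nacos classes. The 1001 offset for the cluster server matches the 9849 server-to-server gRPC port documented for Nacos 2.x. Treat it as background, since this article's code does not show it.

```java
// Hypothetical reduction of the port-offset template method.
abstract class RpcServerSketch {
    private final int mainPort; // Nacos's main (HTTP) port, 8848 by default

    RpcServerSketch(int mainPort) {
        this.mainPort = mainPort;
    }

    // Each concrete server only decides how far from the main port it listens.
    abstract int rpcPortOffset();

    final int servicePort() {
        return mainPort + rpcPortOffset();
    }
}

final class SdkServerSketch extends RpcServerSketch {
    SdkServerSketch(int mainPort) { super(mainPort); }

    @Override
    int rpcPortOffset() { return 1000; } // client-facing gRPC
}

final class ClusterServerSketch extends RpcServerSketch {
    ClusterServerSketch(int mainPort) { super(mainPort); }

    @Override
    int rpcPortOffset() { return 1001; } // server-to-server gRPC
}
```

Because only the offset is fixed, moving the main port (for example to 18848) moves every gRPC port along with it.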

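BaseGrpcServer class

Most of the gRPC server's startup logic is in this class's startServer method, which:

  1. Registers the RequestAcceptors that process requests in a HandlerRegistry
    • GrpcRequestAcceptor handles ordinary business requests
    • GrpcBiStreamRequestAcceptor handles connection-setup requests. It obtains the Channel, creates a GrpcConnection and registers it with the ConnectionManager; every later message to the client is sent through this GrpcConnection
  2. Creates the gRPC Server object
    • sets the port and executor
    • sets the HandlerRegistry
    • adds a ServerTransportFilter that runs business logic when a connection is established or closed
  3. Starts the Server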

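GrpcRequestAcceptor class

This class extends the generated gRPC service and overrides its request method, which:

  1. Parses the Payload to get the data type of the request body
  2. Looks up the matching RequestHandler in the RequestHandlerRegistry
  3. Deserializes the request body into an object of that type
  4. Calls handleRequest to process the request and return a response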
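The four steps, together with the catch-all error handling in the request-handling code, can be sketched in plain Java. This is a simplified model with these assumptions:

- the `Resp`, `Handler` and `AcceptorSketch` types are hypothetical;
- decoding is folded into each handler;
- the `FAIL`/`NO_HANDLER` codes are illustrative values, not Nacos's `ResponseCode` constants.

The ordering follows the text: the handler lookup happens first, and the connection's active time is refreshed before the handler runs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical response shape: success flag, error code, message.
record Resp(boolean success, int errorCode, String message) {
    static Resp ok(String message) { return new Resp(true, 0, message); }
    static Resp fail(int code, String message) { return new Resp(false, code, message); }
}

// Hypothetical handler: receives the raw body and decodes it itself.
interface Handler {
    Resp handle(String body) throws Exception;
}

final class AcceptorSketch {
    static final int FAIL = 500;       // illustrative value only
    static final int NO_HANDLER = 404; // illustrative value only

    private final Map<String, Handler> handlers = new HashMap<>();
    private final Map<String, Long> lastActive = new HashMap<>();

    void register(String type, Handler handler) {
        handlers.put(type, handler);
    }

    long lastActiveOf(String connectionId) {
        return lastActive.getOrDefault(connectionId, -1L);
    }

    // Look up the handler by type, refresh activity, handle; always return a response.
    Resp accept(String connectionId, String type, String body, long now) {
        Handler handler = handlers.get(type);
        if (handler == null) {
            return Resp.fail(NO_HANDLER, "no handler for " + type);
        }
        try {
            lastActive.put(connectionId, now); // later read by the health check
            return handler.handle(body);
        } catch (Exception e) {
            // Like the catch block in GrpcRequestAcceptor: report, do not propagate.
            return Resp.fail(FAIL, e.getMessage());
        }
    }
}
```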

The request-handling code:

```java
Request request = (Request) parseObj;
try {
    // Look up the Connection
    Connection connection = connectionManager.getConnection(CONTEXT_KEY_CONN_ID.get());
    RequestMeta requestMeta = new RequestMeta();
    requestMeta.setClientIp(connection.getMetaInfo().getClientIp());
    requestMeta.setConnectionId(CONTEXT_KEY_CONN_ID.get());
    requestMeta.setClientVersion(connection.getMetaInfo().getVersion());
    requestMeta.setLabels(connection.getMetaInfo().getLabels());
    // Refresh the active time; the health check later relies on this timestamp
    connectionManager.refreshActiveTime(requestMeta.getConnectionId());
    // Let the RequestHandler process the request
    Response response = requestHandler.handleRequest(request, requestMeta);
    Payload payloadResponse = GrpcUtils.convert(response);
    traceIfNecessary(payloadResponse, false);
    responseObserver.onNext(payloadResponse);
    responseObserver.onCompleted();
} catch (Throwable e) {
    // Any failure becomes an error response, so the call always completes
    Payload payloadResponse = GrpcUtils.convert(buildErrorResponse(
            (e instanceof NacosException) ? ((NacosException) e).getErrCode() : ResponseCode.FAIL.getCode(),
            e.getMessage()));
    traceIfNecessary(payloadResponse, false);
    responseObserver.onNext(payloadResponse);
    responseObserver.onCompleted();
}
```

RequestHandler

The abstract class RequestHandler is Nacos's business-layer base class for handling gRPC requests:

```java
public abstract class RequestHandler<T extends Request, S extends Response> {

    @Autowired
    private RequestFilters requestFilters;

    /**
     * Handler request.
     */
    public Response handleRequest(T request, RequestMeta meta) throws NacosException {
        // Run every filter first; a failed filter result short-circuits the request
        for (AbstractRequestFilter filter : requestFilters.filters) {
            try {
                Response filterResult = filter.filter(request, meta, this.getClass());
                if (filterResult != null && !filterResult.isSuccess()) {
                    return filterResult;
                }
            } catch (Throwable throwable) {
                Loggers.REMOTE.error("filter error", throwable);
            }
        }
        return handle(request, meta);
    }

    /**
     * Handler request.
     */
    public abstract S handle(T request, RequestMeta meta) throws NacosException;
}
```
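The filter-then-handle template method is easier to see in isolation. The names in this sketch (`Req`, `Res`, `RequestFilter`, `HandlerSketch`) are illustrative, not Nacos types. The sketch follows the loop above:

- a filter returns `null` or a successful response to let the request through;
- a failed response stops the request;
- a filter that throws is logged and skipped.

```java
import java.util.List;

// Hypothetical request/response types.
record Req(String clientIp) {}
record Res(boolean success, String message) {}

// Returns null or a successful Res to pass, a failed Res to reject.
interface RequestFilter {
    Res filter(Req request);
}

abstract class HandlerSketch {
    private final List<RequestFilter> filters;

    HandlerSketch(List<RequestFilter> filters) {
        this.filters = List.copyOf(filters);
    }

    // Template method: filters first, then the subclass's handle().
    final Res handleRequest(Req request) {
        for (RequestFilter filter : filters) {
            try {
                Res result = filter.filter(request);
                if (result != null && !result.success()) {
                    return result; // a failed filter result stops the request
                }
            } catch (RuntimeException e) {
                // A broken filter is logged and skipped, as in the loop above.
                System.err.println("filter error: " + e);
            }
        }
        return handle(request);
    }

    abstract Res handle(Req request);
}
```

Making `handleRequest` final keeps subclasses from bypassing the filters; they can only supply `handle`.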

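Each concrete implementation binds T to the specific Request type it handles and S to the matching Response type.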

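Nacos manages all RequestHandlers through RequestHandlerRegistry, which is backed by a Map:

```java
// key: simple name of the Request type
// value: the RequestHandler implementation instance
Map<String, RequestHandler> registryHandlers = new HashMap<String, RequestHandler>();
```

RequestHandlerRegistry scans the Spring container for all RequestHandler beans. For each implementation it resolves the simple name of the Request type it handles and registers the handler in registryHandlers under that name.

GrpcRequestAcceptor finds the matching RequestHandler through the getByRequestType method of RequestHandlerRegistry:

```java
public RequestHandler getByRequestType(String requestType) {
    return registryHandlers.get(requestType);
}
```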
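Plain reflection shows how the registry can learn each handler's request type. It reads the first type argument of the handler's generic superclass and keys the handler by that class's simple name. The sketch makes these assumptions:

- every handler directly extends the generic base class;
- `TypedHandler`, `PingRequest`, `QueryRequest` and the other names are hypothetical;
- Spring's bean scanning is replaced by an explicit list.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical request types; only their simple names are used as keys.
class PingRequest {}
class QueryRequest {}

// Generic base in the spirit of RequestHandler<T, S>: T is the request it serves.
abstract class TypedHandler<T> {
    abstract String handle(T request);
}

class PingHandler extends TypedHandler<PingRequest> {
    @Override
    String handle(PingRequest request) { return "pong"; }
}

class QueryHandler extends TypedHandler<QueryRequest> {
    @Override
    String handle(QueryRequest request) { return "result"; }
}

final class HandlerRegistrySketch {
    private final Map<String, TypedHandler<?>> registryHandlers = new HashMap<>();

    // Read T from "extends TypedHandler<T>" and key the handler by T's simple name.
    void registerAll(List<? extends TypedHandler<?>> handlers) {
        for (TypedHandler<?> handler : handlers) {
            Type superType = handler.getClass().getGenericSuperclass();
            if (superType instanceof ParameterizedType parameterized
                    && parameterized.getActualTypeArguments()[0] instanceof Class<?> requestType) {
                registryHandlers.putIfAbsent(requestType.getSimpleName(), handler);
            }
            // Handlers that do not extend TypedHandler<T> directly are skipped here.
        }
    }

    TypedHandler<?> getByRequestType(String requestType) {
        return registryHandlers.get(requestType);
    }
}
```

Keying by simple name keeps the lookup cheap, because the acceptor already has the type name from the payload metadata. The cost is that two request classes with the same simple name in different packages would collide.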

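Establishing a connection

When the Server is initialized, Nacos registers a ServerInterceptor and a ServerTransportFilter. When a connection is established, these components bind conn_id, remote_ip, remote_port, local_port, ctx_channel and similar attributes to the Context.

Creating the GrpcConnection

Once the connection is established, the client sends a ConnectionSetupRequest, which the server handles in GrpcBiStreamRequestAcceptor:

  1. Obtain conn_id, remote_ip, remote_port, local_port, etc.
  2. Parse the request to obtain clientIp
  3. Build a GrpcConnection object. It holds basic information (conn_id, remote_ip, remote_port, local_port, clientIp, client version, and so on) together with the StreamObserver and the Channel
  4. Register the GrpcConnection with the ConnectionManager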
Creating the Client

Registering with ConnectionManager triggers the clientConnected method of ConnectionBasedClientManager, which creates the Client object:

```java
public void clientConnected(Connection connect) {
    // Connection type, "grpc" here
    String type = connect.getMetaInfo().getConnectType();
    // This returns a ConnectionBasedClientFactory
    ClientFactory clientFactory = ClientFactoryHolder.getInstance().findClientFactory(type);
    // This creates a ConnectionBasedClient
    clientConnected(clientFactory.newClient(connect.getMetaInfo().getConnectionId()));
}

public boolean clientConnected(Client client) {
    if (!clients.containsKey(client.getClientId())) {
        // Add to the client set;
        // a Map maintains the clientId -> client mapping
        clients.putIfAbsent(client.getClientId(), (ConnectionBasedClient) client);
    }
    return true;
}
```

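Health check

ConnectionManager

This class manages client connections and provides operations such as registering and removing connections. It keeps two maps:

```java
// clientIp -> connection count, used to enforce the ConnectionLimitRule
private Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<String, AtomicInteger>(16);

// connectionId -> Connection
Map<String, Connection> connections = new ConcurrentHashMap<String, Connection>();
```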
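This article does not show ConnectionManager's register side. The following is therefore only a plausible mirror of the `unregister` method shown later:

- registering increments the per-IP counter that a connection-limit rule could consult;
- unregistering decrements it and drops the entry when it reaches zero.

`Conn` and `ConnectionRegistrySketch` are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical connection metadata.
record Conn(String connectionId, String clientIp) {}

final class ConnectionRegistrySketch {
    // connectionId -> connection
    private final Map<String, Conn> connections = new ConcurrentHashMap<>();
    // clientIp -> number of live connections from that IP
    private final Map<String, AtomicInteger> connectionForClientIp = new ConcurrentHashMap<>(16);

    synchronized boolean register(Conn conn) {
        if (connections.putIfAbsent(conn.connectionId(), conn) != null) {
            return false; // already registered, do not count it twice
        }
        connectionForClientIp.computeIfAbsent(conn.clientIp(), ip -> new AtomicInteger()).incrementAndGet();
        return true;
    }

    synchronized void unregister(String connectionId) {
        Conn removed = connections.remove(connectionId);
        if (removed == null) {
            return;
        }
        AtomicInteger count = connectionForClientIp.get(removed.clientIp());
        if (count != null && count.decrementAndGet() <= 0) {
            connectionForClientIp.remove(removed.clientIp()); // no live connections left
        }
    }

    int countFor(String clientIp) {
        AtomicInteger count = connectionForClientIp.get(clientIp);
        return count == null ? 0 : count.get();
    }
}
```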

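The abstract class Connection implements the Requester interface, so it can send requests to the client. It also manages the connection state.

GrpcConnection extends the abstract class Connection.

As described above, the client sends a ConnectionSetupRequest after the connection is established. When the server receives it, the server extracts the connectionId, client IP, client port, client version, Channel, and so on, wraps them in a GrpcConnection, and registers it with the ConnectionManager.

Periodic health-check task

During startup, ConnectionManager launches a periodic task that runs every 3 seconds. It checks both the per-IP connection counts and the liveness of each connection:

  1. Iterate over all connections, use connectionLimitRule to find the connections that must be reset, and send those clients a reset request
  2. Read each connection's last active time, which every client request refreshes. If a connection has been inactive for more than 20 seconds, send the client a probe request, and close the connection if the probe fails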
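The liveness half of the periodic task comes down to a scan over last-active timestamps. `IdleCheckSketch` is a hypothetical class, not part of Nacos. For simplicity and testability it:

- takes the current time as a parameter;
- probes synchronously through a caller-supplied predicate;
- treats a successful probe as fresh activity.

The connection-limit resets are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

final class IdleCheckSketch {
    static final long IDLE_LIMIT_MS = 20_000L; // 20 s, as described above

    private final Map<String, Long> lastActive = new ConcurrentHashMap<>();
    private final Predicate<String> probe; // true if the client answered the probe

    IdleCheckSketch(Predicate<String> probe) {
        this.probe = probe;
    }

    void refresh(String connectionId, long now) {
        lastActive.put(connectionId, now);
    }

    boolean isRegistered(String connectionId) {
        return lastActive.containsKey(connectionId);
    }

    // One run of the periodic task; a scheduler would call this every 3 s.
    List<String> runOnce(long now) {
        List<String> expelled = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastActive.entrySet()) {
            String id = entry.getKey();
            if (now - entry.getValue() < IDLE_LIMIT_MS) {
                continue; // recently active, nothing to do
            }
            if (probe.test(id)) {
                lastActive.put(id, now); // probe answered: treat as active again
            } else {
                expelled.add(id);
            }
        }
        expelled.forEach(lastActive::remove); // stands in for ConnectionManager.unregister
        return expelled;
    }
}
```

To run it periodically, one could schedule `() -> check.runOnce(System.currentTimeMillis())` on a `ScheduledExecutorService` with `scheduleWithFixedDelay(..., 3, TimeUnit.SECONDS)`.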

Connection teardown

When the gRPC transport layer detects that a connection has closed, it calls transportTerminated on the ServerTransportFilter that the gRPC server registered:

```java
public void transportTerminated(Attributes transportAttrs) {
    String connectionId = null;
    try {
        connectionId = transportAttrs.get(TRANS_KEY_CONN_ID);
    } catch (Exception e) {
        // Ignore
    }
    if (StringUtils.isNotBlank(connectionId)) {
        // Remove the connection through ConnectionManager
        connectionManager.unregister(connectionId);
    }
}
```

ConnectionManager removes the connection:

```java
public synchronized void unregister(String connectionId) {
    // Remove the connection from the connection map
    Connection remove = this.connections.remove(connectionId);
    if (remove != null) {
        String clientIp = remove.getMetaInfo().clientIp;
        AtomicInteger atomicInteger = connectionForClientIp.get(clientIp);
        // Decrement the per-IP connection count
        if (atomicInteger != null) {
            int count = atomicInteger.decrementAndGet();
            if (count <= 0) {
                connectionForClientIp.remove(clientIp);
            }
        }
        remove.close();
        // Notify the ClientManager layer to remove the client object
        clientConnectionEventListenerRegistry.notifyClientDisConnected(remove);
    }
}
```

The clientDisconnected method of ConnectionBasedClientManager:

```java
public boolean clientDisconnected(String clientId) {
    ConnectionBasedClient client = clients.remove(clientId);
    if (null == client) {
        return true;
    }
    client.release();
    // Publish a ClientDisconnectEvent
    NotifyCenter.publishEvent(new ClientEvent.ClientDisconnectEvent(client));
    return true;
}
```

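Event handling

ClientDisconnectEvent (javadoc: "Client disconnect event. Happened when Client disconnect with server.") is consumed by the following subscribers:

  • ClientServiceIndexesManager – maintains the registration and subscription relationships
  • DistroClientDataProcessor – synchronizes client data to all server nodes
  • NamingMetadataManager – maintains the metadata of the services and instances registered by the client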
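Fanning one ClientDisconnectEvent out to several independent subscribers is the classic publish/subscribe pattern. The minimal synchronous sketch below shows the shape: the publisher knows nothing about who reacts, and each subscriber updates only its own state. `ClientDisconnected` and `EventBusSketch` are hypothetical, and the sketch does not describe how NotifyCenter is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical event carrying the id of the client that went away.
record ClientDisconnected(String clientId) {}

final class EventBusSketch {
    private final List<Consumer<ClientDisconnected>> subscribers = new ArrayList<>();

    void subscribe(Consumer<ClientDisconnected> subscriber) {
        subscribers.add(subscriber);
    }

    // Deliver the event to every subscriber, in registration order, on the caller's thread.
    void publish(ClientDisconnected event) {
        for (Consumer<ClientDisconnected> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }
}
```

In the article's terms, one subscriber could drop the client from a service index while another queues its data for cluster synchronization.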

Client

Establishing a connection

ServerListFactory interface

Server list factory. Use to inner client to connected and switch servers.

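It manages the set of server addresses, and RpcClient uses it to choose an available server address:

```java
public interface ServerListFactory {

    // Pick an available server address, in ip:port form
    String genNextServer();

    // Return the server address currently in use, in ip:port form
    String getCurrentServer();

    // Return the list of servers
    List<String> getServerList();
}
```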
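Round-robin over a fixed list is one simple way to satisfy `genNextServer`/`getCurrentServer`. The sketch below re-declares the interface with its generic type and implements that policy. `RoundRobinServerList` is hypothetical, and ServerListManager does not necessarily use this selection policy.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Re-declaration of the interface shown above.
interface ServerListFactory {
    String genNextServer();
    String getCurrentServer();
    List<String> getServerList();
}

// Hypothetical round-robin policy over an immutable server list.
final class RoundRobinServerList implements ServerListFactory {
    private final List<String> servers;
    private final AtomicInteger index = new AtomicInteger(0);

    RoundRobinServerList(List<String> servers) {
        if (servers.isEmpty()) {
            throw new IllegalArgumentException("server list is empty");
        }
        this.servers = List.copyOf(servers);
    }

    // Advance to the next server, wrapping around at the end of the list.
    @Override
    public String genNextServer() {
        int next = index.updateAndGet(i -> (i + 1) % servers.size());
        return servers.get(next);
    }

    @Override
    public String getCurrentServer() {
        return servers.get(index.get());
    }

    @Override
    public List<String> getServerList() {
        return servers;
    }
}
```

A client that fails to connect would call `genNextServer()` to move on. The atomic index keeps concurrent callers from reading a torn position.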

ServerListManager class

Builds the server address list from the Properties parameters.

Creating the RpcClient

```java
RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
```

The createClient method:

```java
public static RpcClient createClient(String clientName, ConnectionType connectionType,
        Map<String, String> labels) {
    return CLIENT_MAP.compute(clientName, (clientNameInner, client) -> {
        if (client == null) {
            if (ConnectionType.GRPC.equals(connectionType)) {
                // Creates a GrpcSdkClient
                client = new GrpcSdkClient(clientNameInner);
            }
            if (client == null) {
                throw new UnsupportedOperationException(
                        "unsupported connection type :" + connectionType.getType());
            }
            client.labels(labels);
        }
        return client;
    });
}
```
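The `compute` call makes `createClient` idempotent per name. `ConcurrentHashMap` runs the remapping function atomically for that key, so a second call with the same name returns the cached client with its original labels. A reduced sketch, with a hypothetical `ClientStub` in place of `RpcClient`:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for RpcClient; only the name and labels matter here.
final class ClientStub {
    final String name;
    final Map<String, String> labels;

    ClientStub(String name, Map<String, String> labels) {
        this.name = name;
        this.labels = labels;
    }
}

final class ClientCacheSketch {
    private static final Map<String, ClientStub> CLIENT_MAP = new ConcurrentHashMap<>();

    // Create on first use; later calls with the same name get the cached instance.
    static ClientStub createClient(String clientName, Map<String, String> labels) {
        return CLIENT_MAP.compute(clientName,
                (name, existing) -> existing != null ? existing : new ClientStub(name, labels));
    }
}
```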

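The client then has to be initialized:

  1. Set the ServerListFactory, which selects server addresses
  2. Register ServerRequestHandlers to handle requests sent by the server, such as service-subscription callbacks and configuration-change notifications
  3. Register the ConnectionEventListener
  4. Start the Client
    • start the ConnectionEvent processing thread
    • start the health-check (heartbeat) thread
    • create the GrpcConnection

For the naming client the calls look like this. Note that start() is invoked right after the ServerListFactory is set, before the handler and the listener are registered:

```java
rpcClient.serverListFactory(serverListFactory);
rpcClient.start();
rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
rpcClient.registerConnectionListener(namingGrpcConnectionEventListener);
```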

Creating the GrpcConnection

  1. Create the gRPC RequestFutureStub and BiRequestStreamStub
  2. Send a ServerCheckRequest to verify that the server is available
  3. Create the GrpcConnection object, wrapping serverInfo, the executor, connectionId, channel, etc.
  4. Bind request-handling logic to the BiRequestStreamStub, so that requests pushed by the server are processed by the ServerRequestHandlers
  5. Send a ConnectionSetupRequest so that the server creates and registers its GrpcConnection

The corresponding code:

```java
if (grpcExecutor == null) {
    int threadNumber = ThreadUtils.getSuitableThreadCount(8);
    grpcExecutor = new ThreadPoolExecutor(threadNumber, threadNumber, 10L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<Runnable>(10000),
            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("nacos-grpc-client-executor-%d")
                    .build());
    grpcExecutor.allowCoreThreadTimeOut(true);
}
// 8848 + 1000 by default
int port = serverInfo.getServerPort() + rpcPortOffset();
RequestGrpc.RequestFutureStub newChannelStubTemp = createNewChannelStub(serverInfo.getServerIp(), port);
// Send a ServerCheckRequest to verify that the server is available
Response response = serverCheck(serverInfo.getServerIp(), port, newChannelStubTemp);
if (response == null || !(response instanceof ServerCheckResponse)) {
    shuntDownChannel((ManagedChannel) newChannelStubTemp.getChannel());
    return null;
}

BiRequestStreamGrpc.BiRequestStreamStub biRequestStreamStub = BiRequestStreamGrpc
        .newStub(newChannelStubTemp.getChannel());
// Create the GrpcConnection, wrapping serverInfo, executor, connectionId, channel, etc.
GrpcConnection grpcConn = new GrpcConnection(serverInfo, grpcExecutor);
grpcConn.setConnectionId(((ServerCheckResponse) response).getConnectionId());

// create stream request and bind connection event to this connection
StreamObserver<Payload> payloadStreamObserver = bindRequestStream(biRequestStreamStub, grpcConn);

// stream observer to send response to server
grpcConn.setPayloadStreamObserver(payloadStreamObserver);
grpcConn.setGrpcFutureServiceStub(newChannelStubTemp);
grpcConn.setChannel((ManagedChannel) newChannelStubTemp.getChannel());

// send a setup request
ConnectionSetupRequest conSetupRequest = new ConnectionSetupRequest();
conSetupRequest.setClientVersion(VersionUtils.getFullClientVersion());
conSetupRequest.setLabels(super.getLabels());
conSetupRequest.setAbilities(super.clientAbilities);
conSetupRequest.setTenant(super.getTenant());
grpcConn.sendRequest(conSetupRequest);
```

Sending requests

Requester interface

This interface defines the methods for sending requests:

```java
public interface Requester {

    /**
     * send request.
     *
     * @param request      request.
     * @param timeoutMills mills of timeouts.
     * @return response  response returned.
     * @throws NacosException exception throw.
     */
    Response request(Request request, long timeoutMills) throws NacosException;

    /**
     * send request.
     *
     * @param request request.
     * @return request future.
     * @throws NacosException exception throw.
     */
    RequestFuture requestFuture(Request request) throws NacosException;

    /**
     * send async request.
     *
     * @param request         request.
     * @param requestCallBack callback of request.
     * @throws NacosException exception throw.
     */
    void asyncRequest(Request request, RequestCallBack requestCallBack) throws NacosException;

    /**
     * close connection.
     */
    void close();
}
```

GrpcConnection implementation

GrpcConnection implements the three request methods of the Requester interface by sending requests through gRPC stubs. Taking request as an example:

```java
public Response request(Request request, long timeouts) throws NacosException {
    Payload grpcRequest = GrpcUtils.convert(request);
    ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
    Payload grpcResponse;
    try {
        // request is synchronous, so block until the response arrives or the timeout expires
        grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
    } catch (Exception e) {
        throw new NacosException(NacosException.SERVER_ERROR, e);
    }
    return (Response) GrpcUtils.parse(grpcResponse);
}
```

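The other two methods:

  • requestFuture: sends the request with grpcFutureServiceStub.request(grpcRequest), then wraps the returned future in a RequestFuture and returns it
  • asyncRequest: sends the request with grpcFutureServiceStub.request(grpcRequest), then adds a listener callback to the returned future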
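The three calling styles differ only in what happens to the future the stub returns. This sketch makes several substitutions, and all class names in it are illustrative:

- `CompletableFuture` replaces Guava's `ListenableFuture`;
- a caller-supplied function replaces the gRPC stub;
- a hypothetical `RequestFailedException` replaces `NacosException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical checked exception standing in for NacosException.
final class RequestFailedException extends Exception {
    RequestFailedException(Throwable cause) {
        super(cause);
    }
}

final class RequesterSketch {
    // Stands in for grpcFutureServiceStub.request(payload): returns a future immediately.
    private final Function<String, CompletableFuture<String>> transport;

    RequesterSketch(Function<String, CompletableFuture<String>> transport) {
        this.transport = transport;
    }

    // Synchronous: block on the future and fold every failure into one exception type.
    String request(String payload, long timeoutMillis) throws RequestFailedException {
        try {
            return transport.apply(payload).get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new RequestFailedException(e);
        } catch (ExecutionException | TimeoutException e) {
            throw new RequestFailedException(e);
        }
    }

    // Future style: send and hand the future back to the caller.
    CompletableFuture<String> requestFuture(String payload) {
        return transport.apply(payload);
    }

    // Callback style: send and attach a listener instead of blocking.
    void asyncRequest(String payload, BiConsumer<String, Throwable> callback) {
        transport.apply(payload).whenComplete(callback);
    }
}
```

Only the synchronous variant blocks the calling thread. The other two leave the waiting to the caller or to the callback.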

Heartbeat

healthCheck

As mentioned earlier, starting the RpcClient also starts the health-check task. Each pass waits up to keepAliveTime (5 seconds by default) for a reconnect signal. If no signal arrives and the connection has not been active for at least keepAliveTime, the task health-checks the current connection:

```java
// Excerpt from the body of the keep-alive loop
// keepAliveTime defaults to 5000L
ReconnectContext reconnectContext = reconnectionSignal
        .poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
    // check alive time.
    if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
        // Health check
        boolean isHealthy = healthCheck();
        if (!isHealthy) {
            if (currentConnection == null) {
                continue;
            }

            RpcClientStatus rpcClientStatus = RpcClient.this.rpcClientStatus.get();
            if (RpcClientStatus.SHUTDOWN.equals(rpcClientStatus)) {
                break;
            }

            // Prepare to reconnect
            boolean success = RpcClient.this.rpcClientStatus
                    .compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
            if (success) {
                reconnectContext = new ReconnectContext(null, false);
            } else {
                continue;
            }
        } else {
            lastActiveTimeStamp = System.currentTimeMillis();
            continue;
        }
    } else {
        continue;
    }
}

if (reconnectContext.serverInfo != null) {
    // clear recommend server if server is not in server list.
    boolean serverExist = false;
    for (String server : getServerListFactory().getServerList()) {
        ServerInfo serverInfo = resolveServerInfo(server);
        if (serverInfo.getServerIp().equals(reconnectContext.serverInfo.getServerIp())) {
            serverExist = true;
            reconnectContext.serverInfo.serverPort = serverInfo.serverPort;
            break;
        }
    }
    if (!serverExist) {
        reconnectContext.serverInfo = null;
    }
}
// Reconnect
reconnect(reconnectContext.serverInfo, reconnectContext.onRequestFail);
```

The healthCheck method sends a HealthCheckRequest with a 3-second timeout and treats only a successful response as healthy:

```java
private boolean healthCheck() {
    HealthCheckRequest healthCheckRequest = new HealthCheckRequest();
    if (this.currentConnection == null) {
        return false;
    }
    try {
        Response response = this.currentConnection.request(healthCheckRequest, 3000L);
        // not only check server is ok ,also check connection is register.
        return response != null && response.isSuccess();
    } catch (NacosException e) {
        // ignore
    }
    return false;
}
```
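The decision made in each pass of the keep-alive loop can be isolated into a testable function. `KeepAliveSketch` and `ClientStatus` are hypothetical, not Nacos types. In this sketch:

- time is passed in explicitly;
- `healthCheck` is a supplier standing in for the `HealthCheckRequest` round trip;
- the reconnect itself is reduced to returning `RECONNECT`.

The `currentConnection == null` branch and the recommended-server handling are left out.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

// Hypothetical subset of client states.
enum ClientStatus { RUNNING, UNHEALTHY, SHUTDOWN }

final class KeepAliveSketch {
    enum Action { IDLE, HEALTHY, RECONNECT, STOP }

    private final long keepAliveTime;          // 5000 ms by default in the loop above
    private final BooleanSupplier healthCheck; // stands in for the HealthCheckRequest round trip
    private final AtomicReference<ClientStatus> status = new AtomicReference<>(ClientStatus.RUNNING);
    private long lastActiveTimeStamp;

    KeepAliveSketch(long keepAliveTime, BooleanSupplier healthCheck, long now) {
        this.keepAliveTime = keepAliveTime;
        this.healthCheck = healthCheck;
        this.lastActiveTimeStamp = now;
    }

    ClientStatus status() {
        return status.get();
    }

    void shutdown() {
        status.set(ClientStatus.SHUTDOWN);
    }

    // One pass of the loop when no reconnect signal was received.
    Action tick(long now) {
        if (now - lastActiveTimeStamp < keepAliveTime) {
            return Action.IDLE; // active recently enough, skip the check
        }
        if (healthCheck.getAsBoolean()) {
            lastActiveTimeStamp = now;
            return Action.HEALTHY;
        }
        ClientStatus current = status.get();
        if (current == ClientStatus.SHUTDOWN) {
            return Action.STOP;
        }
        // Only the caller whose CAS succeeds goes on to reconnect.
        return status.compareAndSet(current, ClientStatus.UNHEALTHY) ? Action.RECONNECT : Action.IDLE;
    }
}
```

Skipping the probe while the connection is recently active means that regular request traffic effectively doubles as the heartbeat.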