当Nacos服务端启动时怎么知道集群中有哪些节点?当新的节点加入集群或者集群中有节点下线了,集群之间可以通过健康检查发现。健康检查的频率是怎么样的?节点的状态又是如何变动的?状态的变动又会触发什么动作。
当Nacos服务端启动时怎么知道集群中有哪些节点?
在配置集群时,会在配置文件cluster.conf中指定集群中各个节点的IP和端口,Nacos服务端启动时会读取这个配置文件并解析,下面来看看这个解析过程。
com.alibaba.nacos.core.cluster.ServerMemberManager#ServerMemberManager
public ServerMemberManager(ServletContext servletContext) throws Exception {this.serverList = new ConcurrentSkipListMap<>();EnvUtil.setContextPath(servletContext.getContextPath());init();}protected void init() throws NacosException {Loggers.CORE.info("Nacos-related cluster resource initialization");this.port = EnvUtil.getProperty("server.port", Integer.class, 8848);this.localAddress = InetUtils.getSelfIP() + ":" + port;this.self = MemberUtil.singleParse(this.localAddress);this.self.setExtendVal(MemberMetaDataConstants.VERSION, VersionUtils.version);serverList.put(self.getAddress(), self);// register NodeChangeEvent publisher to NotifyManager// 注册MembersChangeEvent事件registerClusterEvent();// Initializes the lookup mode// 初始化节点initAndStartLookup();if (serverList.isEmpty()) {throw new NacosException(NacosException.SERVER_ERROR, "cannot get serverlist, so exit.");}Loggers.CORE.info("The cluster resource is initialized");}
ServerMemberManager#registerClusterEvent
注册MembersChangeEvent的Publisher。
监听IPChangeEvent事件。
com.alibaba.nacos.core.cluster.ServerMemberManager#registerClusterEvent
private void registerClusterEvent() {// Register node change eventsNotifyCenter.registerToPublisher(MembersChangeEvent.class, EnvUtil.getProperty("nacos.member-change-event.queue.size", Integer.class, 128));// The address information of this node needs to be dynamically modified// when registering the IP change of this nodeNotifyCenter.registerSubscriber(new Subscriber<InetUtils.IPChangeEvent>() {@Overridepublic void onEvent(InetUtils.IPChangeEvent event) {String newAddress = event.getNewIP() + ":" + port;ServerMemberManager.this.localAddress = newAddress;EnvUtil.setLocalAddress(localAddress);Member self = ServerMemberManager.this.self;self.setIp(event.getNewIP());String oldAddress = event.getOldIP() + ":" + port;// 维护服务列表ServerMemberManager.this.serverList.remove(oldAddress);ServerMemberManager.this.serverList.put(newAddress, self);ServerMemberManager.this.memberAddressInfos.remove(oldAddress);ServerMemberManager.this.memberAddressInfos.add(newAddress);}@Overridepublic Class<? extends Event> subscribeType() {return InetUtils.IPChangeEvent.class;}});}
ServerMemberManager#initAndStartLookup
com.alibaba.nacos.core.cluster.ServerMemberManager#initAndStartLookup
private void initAndStartLookup() throws NacosException {this.lookup = LookupFactory.createLookUp(this);/** * @see com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#start() */this.lookup.start();}
FileConfigMemberLookup#start
com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#start
public void start() throws NacosException {if (start.compareAndSet(false, true)) {// 读取cluster.conf文件readClusterConfFromDisk();// Use the inotify mechanism to monitor file changes and automatically// trigger the reading of cluster.conftry {// 监听文件的变化WatchFileCenter.registerWatcher(EnvUtil.getConfPath(), watcher);} catch (Throwable e) {Loggers.CLUSTER.error("An exception occurred in the launch file monitor : {}", e.getMessage());}}}
FileConfigMemberLookup#readClusterConfFromDisk
com.alibaba.nacos.core.cluster.lookup.FileConfigMemberLookup#readClusterConfFromDisk
private void readClusterConfFromDisk() {Collection<Member> tmpMembers = new ArrayList<>();try {List<String> tmp = EnvUtil.readClusterConf();// 读取cluster.conf文件tmpMembers = MemberUtil.readServerConf(tmp);} catch (Throwable e) {Loggers.CLUSTER.error("nacos-XXXX [serverlist] failed to get serverlist from disk!, error : {}", e.getMessage());}/** * 发布MembersChangeEvent事件 */afterLookup(tmpMembers);}
AbstractMemberLookup#afterLookup
com.alibaba.nacos.core.cluster.AbstractMemberLookup#afterLookup
public void afterLookup(Collection<Member> members) {this.memberManager.memberChange(members);}
ServerMemberManager#memberChange
com.alibaba.nacos.core.cluster.ServerMemberManager#memberChange
synchronized boolean memberChange(Collection<Member> members) {if (members == null || members.isEmpty()) {return false;}// 判断自己是否在集群中boolean isContainSelfIp = members.stream().anyMatch(ipPortTmp -> Objects.equals(localAddress, ipPortTmp.getAddress()));if (isContainSelfIp) {isInIpList = true;} else {isInIpList = false;// 如果自己不在集群中,把自己加入members.add(this.self);Loggers.CLUSTER.warn("[serverlist] self ip {} not in serverlist {}", self, members);}// If the number of old and new clusters is different, the cluster information// must have changed; if the number of clusters is the same, then compare whether// there is a difference; if there is a difference, then the cluster node changes// are involved and all recipients need to be notified of the node change event// 判断集群的状态是否已变更boolean hasChange = members.size() != serverList.size();ConcurrentSkipListMap<String, Member> tmpMap = new ConcurrentSkipListMap<>();Set<String> tmpAddressInfo = new ConcurrentHashSet<>();for (Member member : members) {final String address = member.getAddress();if (!serverList.containsKey(address)) {hasChange = true;// If the cluster information in cluster.conf or address-server has been changed,// while the corresponding nacos-server has not been started yet, the member's state// should be set to DOWN. If the corresponding nacos-server has been started, the// member's state will be set to UP after detection in a few seconds.member.setState(NodeState.DOWN);} else {//fix issue # 4925member.setState(serverList.get(address).getState());}// Ensure that the node is created only oncetmpMap.put(address, member);if (NodeState.UP.equals(member.getState())) {tmpAddressInfo.add(address);}}serverList = tmpMap;memberAddressInfos = tmpAddressInfo;Collection<Member> finalMembers = allMembers();Loggers.CLUSTER.warn("[serverlist] updated to : {}", finalMembers);// Persist the current cluster node information to cluster.conf// need to put the event publication into a synchronized block to ensure// that the event publication is sequentialif (hasChange) {MemberUtil.syncToFile(finalMembers);// 发布MembersChangeEvent事件Event event = MembersChangeEvent.builder().members(finalMembers).build();NotifyCenter.publishEvent(event);}return hasChange;}
集群间的节点怎么维持心跳?
ServerMemberManager监听了Spring Boot启动过程中发出的WebServerInitializedEvent事件,然后启动集群节点之间的健康检查任务MemberInfoReportTask。
com.alibaba.nacos.core.cluster.ServerMemberManager#onApplicationEvent
public void onApplicationEvent(WebServerInitializedEvent event) {getSelf().setState(NodeState.UP);if (!EnvUtil.getStandaloneMode()) {// 发送服务节点之间的心跳包GlobalExecutor.scheduleByCommon(this.infoReportTask, 5_000L);}EnvUtil.setPort(event.getWebServer().getPort());EnvUtil.setLocalAddress(this.localAddress);Loggers.CLUSTER.info("This node is ready to provide external services");}
下面分析MemberInfoReportTask任务的执行过程。
Task#run
MemberInfoReportTask实现了Task,Task实现了Runnable接口,在Task中会调用子类的方法executeBody()。
com.alibaba.nacos.core.cluster.Task#run
public void run() {if (shutdown) {return;}try {executeBody();} catch (Throwable t) {Loggers.CORE.error("this task execute has error : {}", ExceptionUtil.getStackTrace(t));} finally {if (!shutdown) {after();}}}
MemberInfoReportTask#executeBody
遍历集群中的所有的节点,给每个节点发送心跳包。
com.alibaba.nacos.core.cluster.ServerMemberManager.MemberInfoReportTask#executeBody
protected void executeBody() {// 获取除自己外的所有节点List<Member> members = ServerMemberManager.this.allMembersWithoutSelf();if (members.isEmpty()) {return;}// 每次+1this.cursor = (this.cursor + 1) % members.size();Member target = members.get(cursor);Loggers.CLUSTER.debug("report the metadata to the node : {}", target.getAddress());// /nacos/v1/core/cluster/reportfinal String url = HttpUtils.buildUrl(false, target.getAddress(), EnvUtil.getContextPath(), Commons.NACOS_CORE_CONTEXT,"/cluster/report");try {Header header = Header.newInstance().addParam(Constants.NACOS_SERVER_HEADER, VersionUtils.version);AuthHeaderUtil.addIdentityToHeader(header);asyncRestTemplate.post(url, header,Query.EMPTY, getSelf(), reference.getType(), new Callback<String>() {@Overridepublic void onReceive(RestResult<String> result) {if (result.getCode() == HttpStatus.NOT_IMPLEMENTED.value()|| result.getCode() == HttpStatus.NOT_FOUND.value()) {Loggers.CLUSTER.warn("{} version is too low, it is recommended to upgrade the version : {}",target, VersionUtils.version);return;}if (result.ok()) {// 成功MemberUtil.onSuccess(ServerMemberManager.this, target);} else {Loggers.CLUSTER.warn("failed to report new info to target node : {}, result : {}",target.getAddress(), result);// 失败MemberUtil.onFail(ServerMemberManager.this, target);}}@Overridepublic void onError(Throwable throwable) {Loggers.CLUSTER.error("failed to report new info to target node : {}, error : {}", target.getAddress(), ExceptionUtil.getAllExceptionMsg(throwable));// 失败MemberUtil.onFail(ServerMemberManager.this, target, throwable);}@Overridepublic void onCancel() {}});} catch (Throwable ex) {Loggers.CLUSTER.error("failed to report new info to target node : {}, error : {}", target.getAddress(),ExceptionUtil.getAllExceptionMsg(ex));}}
MemberUtil#onSuccess
com.alibaba.nacos.core.cluster.MemberUtil#onSuccess
public static void onSuccess(final ServerMemberManager manager, final Member member) {final NodeState old = member.getState();manager.getMemberAddressInfos().add(member.getAddress());// 将节点状态改为UPmember.setState(NodeState.UP);member.setFailAccessCnt(0);if (!Objects.equals(old, member.getState())) {// 发布MembersChangeEvent事件manager.notifyMemberChange();}}
MemberUtil#onFail
com.alibaba.nacos.core.cluster.MemberUtil#onFail(com.alibaba.nacos.core.cluster.ServerMemberManager, com.alibaba.nacos.core.cluster.Member, java.lang.Throwable)
public static void onFail(final ServerMemberManager manager, final Member member, Throwable ex) {manager.getMemberAddressInfos().remove(member.getAddress());final NodeState old = member.getState();// 将节点状态改为SUSPICIOUSmember.setState(NodeState.SUSPICIOUS);member.setFailAccessCnt(member.getFailAccessCnt() + 1);int maxFailAccessCnt = EnvUtil.getProperty("nacos.core.member.fail-access-cnt", Integer.class, 3);// If the number of consecutive failures to access the target node reaches// a maximum, or the link request is rejected, the state is directly downif (member.getFailAccessCnt() > maxFailAccessCnt || StringUtils.containsIgnoreCase(ex.getMessage(), TARGET_MEMBER_CONNECT_REFUSE_ERRMSG)) {// 错误次数超过3次,将节点状态改为DOWNmember.setState(NodeState.DOWN);}if (!Objects.equals(old, member.getState())) {// 发布MembersChangeEvent事件manager.notifyMemberChange();}}
节点对心跳包的处理
集群的节点之间每隔5s会给集群中的其他节点发送心跳包,下面来看看节点收到这个心跳包后是怎么处理的。
请求的接口地址:/nacos/v1/core/cluster/report。
com.alibaba.nacos.core.controller.NacosClusterController#report
@PostMapping(value = {"/report"})public RestResult<String> report(@RequestBody Member node) {// 服务接收其他节点心跳包入口if (!node.check()) {return RestResultUtils.failedWithMsg(400, "Node information is illegal");}LoggerUtils.printIfDebugEnabled(Loggers.CLUSTER, "node state report, receive info : {}", node);node.setState(NodeState.UP);node.setFailAccessCnt(0);// 更新节点状态boolean result = memberManager.update(node);return RestResultUtils.success(Boolean.toString(result));}
com.alibaba.nacos.core.cluster.ServerMemberManager#update
public boolean update(Member newMember) {Loggers.CLUSTER.debug("member information update : {}", newMember);String address = newMember.getAddress();if (!serverList.containsKey(address)) {return false;}serverList.computeIfPresent(address, (s, member) -> {if (NodeState.DOWN.equals(newMember.getState())) {memberAddressInfos.remove(newMember.getAddress());}boolean isPublishChangeEvent = MemberUtil.isBasicInfoChanged(newMember, member);newMember.setExtendVal(MemberMetaDataConstants.LAST_REFRESH_TIME, System.currentTimeMillis());MemberUtil.copy(newMember, member);if (isPublishChangeEvent) {// member basic data changes and all listeners need to be notified// 如果节点有变更,发布MembersChangeEvent事件notifyMemberChange();}return member;});return true;}