又是美好的一天呀~
个人博客地址: huanghong.top
往下看看~
- 服务端配置修改的事件推送
- publishConfig
- onEvent
- AsyncRpcTask
- dump
- process
- configDump
服务端配置修改的事件推送
通过nacos ui界面新增或修改配置信息(接口访问同理)
#post请求#示例urlhttp://localhost:8848/nacos/v1/cs/configs?accessToken=eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJuYWNvcyIsImV4cCI6MTY3MjgzMjQ3Nn0.7Ph4LSo2z4FYO09bZP145HRxXJJI5ZjepDodGT2LS4s&message=true#示例请求体dataId: config.propertiesgroup: DEFAULT_GROUPcontent: key%3Dvalueconfig_tags: type: propertiesappName: tenant: namespaceId:
请求会进入到com.alibaba.nacos.config.server.controller.ConfigController#publishConfig方法中
publishConfig
//com.alibaba.nacos.config.server.controller.ConfigController#publishConfig//根据传入接口的参数查询config_info表,判断配置信息是插入还是更新persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);//发布ConfigDataChangeEvent事件ConfigChangePublisher.notifyConfigChange( new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
onEvent
//com.alibaba.nacos.config.server.service.notify.AsyncNotifyService//com.alibaba.nacos.common.notify.listener.Subscriber#onEventpublic void onEvent(Event event) { // Generate ConfigDataChangeEvent concurrently if (event instanceof ConfigDataChangeEvent) { ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event; long dumpTs = evt.lastModifiedTs; String dataId = evt.dataId; String group = evt.group; String tenant = evt.tenant; String tag = evt.tag; //获取所有nacos服务端地址 Collection<Member> ipList = memberManager.allMembers(); // In fact, any type of queue here can be Queue<NotifySingleTask> httpQueue = new LinkedList<>(); Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<>(); for (Member member : ipList) { //检查nacos服务端是否支持长连接 //支持长连接 if (!MemberUtil.isSupportedLongCon(member)) { httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(), evt.isBeta)); } else { //不支持长连接 rpcQueue.add( new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member)); } } if (!httpQueue.isEmpty()) { //执行异步任务 ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue)); } if (!rpcQueue.isEmpty()) { //执行异步任务 ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue)); } }}
AsyncRpcTask
//com.alibaba.nacos.config.server.service.notify.AsyncNotifyService.AsyncRpcTaskclass AsyncRpcTask implements Runnable { private Queue<NotifySingleRpcTask> queue; public AsyncRpcTask(Queue<NotifySingleRpcTask> queue) { this.queue = queue; } @Override public void run() { while (!queue.isEmpty()) { //获取任务 NotifySingleRpcTask task = queue.poll(); //构建异步请求 ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest(); syncRequest.setDataId(task.getDataId()); syncRequest.setGroup(task.getGroup()); syncRequest.setBeta(task.isBeta); syncRequest.setLastModified(task.getLastModified()); syncRequest.setTag(task.tag); syncRequest.setTenant(task.getTenant()); //获取nacos服务端 Member member = task.member; if (memberManager.getSelf().equals(member)) { if (syncRequest.isBeta()) { dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(), syncRequest.getLastModified(), NetUtils.localIP(), true); } else { //添加dump任务至TaskManager,任务会异步执行(100ms执行一次) dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(), syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP()); } continue; } if (memberManager.hasMember(member.getAddress())) { // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress()); if (unHealthNeedDelay) { // target ip is unhealthy, then put it in the notification list ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null, task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH, 0, member.getAddress()); // get delay time and set fail count to the task asyncTaskExecute(task); } else { if (!MemberUtil.isSupportedLongCon(member)) { asyncTaskExecute( new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag, task.getLastModified(), member.getAddress(), task.isBeta)); } else { try { configClusterRpcClientProxy .syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task)); } catch (Exception e) { MetricsMonitor.getConfigNotifyException().increment(); asyncTaskExecute(task); } } } } else { //No nothig if member has offline. } } }}
dump
//com.alibaba.nacos.config.server.service.dump.DumpService#dump(java.lang.String, java.lang.String, java.lang.String, java.lang.String, long, java.lang.String, boolean)public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp, boolean isBeta) { //构建groupkey String groupKey = GroupKey2.getKey(dataId, group, tenant); String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag); //添加任务至TaskManager dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta)); DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);}
TaskManager异步任务执行源码简要分析
EmbeddedDumpService初始化会进行TaskManager的初始化
TaskManager构造器中调用父类NacosDelayTaskExecuteEngine构造器,初始化父类
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { super(logger); tasks = new ConcurrentHashMap<>(initCapacity); processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); //processInterval = 100,延迟100ms开始执行异步任务,每100ms执行一次 processingExecutor .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);}
ProcessRunnable实现Runnable接口,执行processTasks方法
//com.alibaba.nacos.config.server.manager.TaskManager#processTasksprotected void processTasks() { super.processTasks(); MetricsMonitor.getDumpTaskMonitor().set(tasks.size()); if (tasks.isEmpty()) { this.lock.lock(); try { this.notEmpty.signalAll(); } finally { this.lock.unlock(); } }}//com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine#processTasksprotected void processTasks() { //获取所有taskKey Collection<Object> keys = getAllTaskKeys(); for (Object taskKey : keys) { //获取并移除任务 AbstractDelayTask task = removeTask(taskKey); if (null == task) { continue; } //获取任务对应processor NacosTaskProcessor processor = getProcessor(taskKey); if (null == processor) { getEngineLog().error("processor not found for task, so discarded. " + task); continue; } try { // ReAdd task if process failed //执行异步任务,任务执行失败则重入队列 if (!processor.process(task)) { retryFailedTask(taskKey, task); } } catch (Throwable e) { getEngineLog().error("Nacos task execute error ", e); retryFailedTask(taskKey, task); } }}
process
初始化TaskManager时
//com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#processpublic boolean process(NacosTask task) { final PersistService persistService = dumpService.getPersistService(); DumpTask dumpTask = (DumpTask) task; String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey()); String dataId = pair[0]; String group = pair[1]; String tenant = pair[2]; long lastModified = dumpTask.getLastModified(); String handleIp = dumpTask.getHandleIp(); boolean isBeta = dumpTask.isBeta(); String tag = dumpTask.getTag(); //根据配置信息构建ConfigDumpEventBuilder对象 ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId) .group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp); //如果发布beta配置,则进行保存配置并更新beta缓存 if (isBeta) { // if publish beta, then dump config, update beta cache ConfigInfo4Beta cf = persistService.findConfigInfo4Beta(dataId, group, tenant); build.remove(Objects.isNull(cf)); build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps()); build.content(Objects.isNull(cf) ? null : cf.getContent()); build.encryptedDataKey(Objects.isNull(cf) ? null : cf.getEncryptedDataKey()); return DumpConfigHandler.configDump(build.build()); } if (StringUtils.isBlank(tag)) { //根据dataId, group, tenant查询config_info表获取配置信息 ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant); //属性设置 build.remove(Objects.isNull(cf)); build.content(Objects.isNull(cf) ? null : cf.getContent()); build.type(Objects.isNull(cf) ? null : cf.getType()); build.encryptedDataKey(Objects.isNull(cf) ? null : cf.getEncryptedDataKey()); } else { ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag); build.remove(Objects.isNull(cf)); build.content(Objects.isNull(cf) ? null : cf.getContent()); } //触发配置持久化事件 return DumpConfigHandler.configDump(build.build());}
configDump
//com.alibaba.nacos.config.server.service.dump.DumpConfigHandler#configDumppublic static boolean configDump(ConfigDumpEvent event) { final String dataId = event.getDataId(); final String group = event.getGroup(); final String namespaceId = event.getNamespaceId(); final String content = event.getContent(); final String type = event.getType(); final long lastModified = event.getLastModifiedTs(); final String encryptedDataKey = event.getEncryptedDataKey(); ... if (StringUtils.isBlank(event.getTag())) { //聚合白名单内容处理 if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) { AggrWhitelist.load(content); } if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) { ClientIpWhiteList.load(content); } if (dataId.equals(SwitchService.SWITCH_META_DATAID)) { SwitchService.load(content); } boolean result; //事件没有被移除 if (!event.isRemove()) { //配置信息持久化 //同服务端启动推送配置目录中的dump方法解析 result = ConfigCacheService .dump(dataId, group, namespaceId, content, lastModified, type, encryptedDataKey); //日志记录 if (result) { ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(), ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified, content.length()); } } else { result = ConfigCacheService.remove(dataId, group, namespaceId); if (result) { ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(), ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0); } } return result; } ...}
感谢阅读完本篇文章!!!
个人博客地址: huanghong.top