又是美好的一天呀~
个人博客地址: huanghong.top

往下看看~

    • 服务端配置修改的事件推送
      • publishConfig
      • onEvent
      • AsyncRpcTask
      • dump
      • process
      • configDump

服务端配置修改的事件推送

通过nacos ui界面新增或修改配置信息(接口访问同理)

#post请求#示例urlhttp://localhost:8848/nacos/v1/cs/configs?accessToken=eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJuYWNvcyIsImV4cCI6MTY3MjgzMjQ3Nn0.7Ph4LSo2z4FYO09bZP145HRxXJJI5ZjepDodGT2LS4s&message=true#示例请求体dataId: config.propertiesgroup: DEFAULT_GROUPcontent: key%3Dvalueconfig_tags: type: propertiesappName: tenant: namespaceId: 

请求会进入到com.alibaba.nacos.config.server.controller.ConfigController#publishConfig方法中

publishConfig

//com.alibaba.nacos.config.server.controller.ConfigController#publishConfig//根据传入接口的参数查询config_info表,判断配置信息是插入还是更新persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, false);//发布ConfigDataChangeEvent事件ConfigChangePublisher.notifyConfigChange(        new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));

onEvent

//com.alibaba.nacos.config.server.service.notify.AsyncNotifyService//com.alibaba.nacos.common.notify.listener.Subscriber#onEventpublic void onEvent(Event event) {    // Generate ConfigDataChangeEvent concurrently    if (event instanceof ConfigDataChangeEvent) {        ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;        long dumpTs = evt.lastModifiedTs;        String dataId = evt.dataId;        String group = evt.group;        String tenant = evt.tenant;        String tag = evt.tag;        //获取所有nacos服务端地址        Collection<Member> ipList = memberManager.allMembers();        // In fact, any type of queue here can be        Queue<NotifySingleTask> httpQueue = new LinkedList<>();        Queue<NotifySingleRpcTask> rpcQueue = new LinkedList<>();        for (Member member : ipList) {            //检查nacos服务端是否支持长连接            //支持长连接            if (!MemberUtil.isSupportedLongCon(member)) {                httpQueue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),                                                   evt.isBeta));            } else {                //不支持长连接                rpcQueue.add(                    new NotifySingleRpcTask(dataId, group, tenant, tag, dumpTs, evt.isBeta, member));            }        }        if (!httpQueue.isEmpty()) {            //执行异步任务            ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, httpQueue));        }        if (!rpcQueue.isEmpty()) {            //执行异步任务            ConfigExecutor.executeAsyncNotify(new AsyncRpcTask(rpcQueue));        }    }}

AsyncRpcTask

//com.alibaba.nacos.config.server.service.notify.AsyncNotifyService.AsyncRpcTaskclass AsyncRpcTask implements Runnable {        private Queue<NotifySingleRpcTask> queue;        public AsyncRpcTask(Queue<NotifySingleRpcTask> queue) {        this.queue = queue;    }        @Override    public void run() {        while (!queue.isEmpty()) {            //获取任务            NotifySingleRpcTask task = queue.poll();            //构建异步请求            ConfigChangeClusterSyncRequest syncRequest = new ConfigChangeClusterSyncRequest();            syncRequest.setDataId(task.getDataId());            syncRequest.setGroup(task.getGroup());            syncRequest.setBeta(task.isBeta);            syncRequest.setLastModified(task.getLastModified());            syncRequest.setTag(task.tag);            syncRequest.setTenant(task.getTenant());            //获取nacos服务端            Member member = task.member;            if (memberManager.getSelf().equals(member)) {                if (syncRequest.isBeta()) {                    dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),                            syncRequest.getLastModified(), NetUtils.localIP(), true);                } else {                    //添加dump任务至TaskManager,任务会异步执行(100ms执行一次)                    dumpService.dump(syncRequest.getDataId(), syncRequest.getGroup(), syncRequest.getTenant(),                            syncRequest.getTag(), syncRequest.getLastModified(), NetUtils.localIP());                }                continue;            }                        if (memberManager.hasMember(member.getAddress())) {                // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify                boolean unHealthNeedDelay = memberManager.isUnHealth(member.getAddress());                if (unHealthNeedDelay) {                    // target ip is unhealthy, then put it in the notification list                    ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,                            task.getLastModified(), InetUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,                            0, member.getAddress());                    // get delay time and set fail count to the task                    asyncTaskExecute(task);                } else {                    if (!MemberUtil.isSupportedLongCon(member)) {                        asyncTaskExecute(                                new NotifySingleTask(task.getDataId(), task.getGroup(), task.getTenant(), task.tag,                                        task.getLastModified(), member.getAddress(), task.isBeta));                    } else {                        try {                            configClusterRpcClientProxy                                    .syncConfigChange(member, syncRequest, new AsyncRpcNotifyCallBack(task));                        } catch (Exception e) {                            MetricsMonitor.getConfigNotifyException().increment();                            asyncTaskExecute(task);                        }                    }                                  }            } else {                //No nothig if  member has offline.            }                    }    }}

dump

//com.alibaba.nacos.config.server.service.dump.DumpService#dump(java.lang.String, java.lang.String, java.lang.String, java.lang.String, long, java.lang.String, boolean)public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,        boolean isBeta) {    //构建groupkey    String groupKey = GroupKey2.getKey(dataId, group, tenant);    String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);    //添加任务至TaskManager    dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));    DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);}

TaskManager异步任务执行源码简要分析

  1. EmbeddedDumpService初始化会进行TaskManager的初始化

  2. TaskManager构造器中调用父类NacosDelayTaskExecuteEngine构造器,初始化父类

    public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {    super(logger);    tasks = new ConcurrentHashMap<>(initCapacity);    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));    //processInterval = 100,延迟100ms开始执行异步任务,每100ms执行一次    processingExecutor            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);}
  3. ProcessRunnable实现Runnable接口,执行processTasks方法

    //com.alibaba.nacos.config.server.manager.TaskManager#processTasksprotected void processTasks() {    super.processTasks();    MetricsMonitor.getDumpTaskMonitor().set(tasks.size());    if (tasks.isEmpty()) {        this.lock.lock();        try {            this.notEmpty.signalAll();        } finally {            this.lock.unlock();        }    }}//com.alibaba.nacos.common.task.engine.NacosDelayTaskExecuteEngine#processTasksprotected void processTasks() {    //获取所有taskKey    Collection<Object> keys = getAllTaskKeys();    for (Object taskKey : keys) {        //获取并移除任务        AbstractDelayTask task = removeTask(taskKey);        if (null == task) {            continue;        }        //获取任务对应processor        NacosTaskProcessor processor = getProcessor(taskKey);        if (null == processor) {            getEngineLog().error("processor not found for task, so discarded. " + task);            continue;        }        try {            // ReAdd task if process failed            //执行异步任务,任务执行失败则重入队列            if (!processor.process(task)) {                retryFailedTask(taskKey, task);            }        } catch (Throwable e) {            getEngineLog().error("Nacos task execute error ", e);            retryFailedTask(taskKey, task);        }    }}

process

初始化TaskManager时

//com.alibaba.nacos.config.server.service.dump.processor.DumpProcessor#processpublic boolean process(NacosTask task) {    final PersistService persistService = dumpService.getPersistService();    DumpTask dumpTask = (DumpTask) task;    String[] pair = GroupKey2.parseKey(dumpTask.getGroupKey());    String dataId = pair[0];    String group = pair[1];    String tenant = pair[2];    long lastModified = dumpTask.getLastModified();    String handleIp = dumpTask.getHandleIp();    boolean isBeta = dumpTask.isBeta();    String tag = dumpTask.getTag();    //根据配置信息构建ConfigDumpEventBuilder对象    ConfigDumpEvent.ConfigDumpEventBuilder build = ConfigDumpEvent.builder().namespaceId(tenant).dataId(dataId)            .group(group).isBeta(isBeta).tag(tag).lastModifiedTs(lastModified).handleIp(handleIp);    //如果发布beta配置,则进行保存配置并更新beta缓存    if (isBeta) {        // if publish beta, then dump config, update beta cache        ConfigInfo4Beta cf = persistService.findConfigInfo4Beta(dataId, group, tenant);                build.remove(Objects.isNull(cf));        build.betaIps(Objects.isNull(cf) ? null : cf.getBetaIps());        build.content(Objects.isNull(cf) ? null : cf.getContent());        build.encryptedDataKey(Objects.isNull(cf) ? null : cf.getEncryptedDataKey());                return DumpConfigHandler.configDump(build.build());    }    if (StringUtils.isBlank(tag)) {        //根据dataId, group, tenant查询config_info表获取配置信息        ConfigInfo cf = persistService.findConfigInfo(dataId, group, tenant);        //属性设置        build.remove(Objects.isNull(cf));        build.content(Objects.isNull(cf) ? null : cf.getContent());        build.type(Objects.isNull(cf) ? null : cf.getType());        build.encryptedDataKey(Objects.isNull(cf) ? null : cf.getEncryptedDataKey());    } else {        ConfigInfo4Tag cf = persistService.findConfigInfo4Tag(dataId, group, tenant, tag);                build.remove(Objects.isNull(cf));        build.content(Objects.isNull(cf) ? null : cf.getContent());            }    //触发配置持久化事件    return DumpConfigHandler.configDump(build.build());}

configDump

//com.alibaba.nacos.config.server.service.dump.DumpConfigHandler#configDumppublic static boolean configDump(ConfigDumpEvent event) {    final String dataId = event.getDataId();    final String group = event.getGroup();    final String namespaceId = event.getNamespaceId();    final String content = event.getContent();    final String type = event.getType();    final long lastModified = event.getLastModifiedTs();    final String encryptedDataKey = event.getEncryptedDataKey();    ...    if (StringUtils.isBlank(event.getTag())) {        //聚合白名单内容处理        if (dataId.equals(AggrWhitelist.AGGRIDS_METADATA)) {            AggrWhitelist.load(content);        }                if (dataId.equals(ClientIpWhiteList.CLIENT_IP_WHITELIST_METADATA)) {            ClientIpWhiteList.load(content);        }                if (dataId.equals(SwitchService.SWITCH_META_DATAID)) {            SwitchService.load(content);        }                boolean result;        //事件没有被移除        if (!event.isRemove()) {            //配置信息持久化            //同服务端启动推送配置目录中的dump方法解析            result = ConfigCacheService                    .dump(dataId, group, namespaceId, content, lastModified, type, encryptedDataKey);            //日志记录            if (result) {                ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),                        ConfigTraceService.DUMP_EVENT_OK, System.currentTimeMillis() - lastModified,                        content.length());            }        } else {            result = ConfigCacheService.remove(dataId, group, namespaceId);                        if (result) {                ConfigTraceService.logDumpEvent(dataId, group, namespaceId, null, lastModified, event.getHandleIp(),                        ConfigTraceService.DUMP_EVENT_REMOVE_OK, System.currentTimeMillis() - lastModified, 0);            }        }        return result;    }    ...}

感谢阅读完本篇文章!!!
个人博客地址: huanghong.top