简介
Apollo(阿波罗)是一款可靠的分布式配置管理中心,诞生于携程框架研发部,能够集中化管理应用不同环境、不同集群的配置,配置修改后能够实时推送到应用端,并且具备规范的权限、流程治理等特性,适用于微服务配置管理场景。Apollo 采用 CP 架构。
配置中心原理
客户端向服务端发起一个获取配置信息的 Http 请求并将服务端返回的配置信息保存到本地磁盘与缓存中。
之后每隔 5 分钟发起一次获取配置信息的 Http 请求。这是一个 fallback 机制,为了防止推送机制失效导致配置不更新。可以在运行时指定系统参数 apollo.refreshInterval 来覆盖,单位为分钟。一般情况下,服务端都会返回 304 – Not Modified。
服务端接收到请求后,会保持住 60 秒。如果 60 秒之内有客户端关注的配置发生变化,则将客户端请求返回并告知客户端有配置发生变化的 namespace 信息。如果 60 秒之内没有客户端关注的配置发生变化,则返回 Http 状态码 304 给客户端。因为考虑到会有数万客户端向服务端发起长轮询,因此服务端使用了 Spring 的 DeferredResult 来处理客户端的长轮询请求。
客户端向服务端发起获取对应 namespace 的最新配置的 Http 请求。
ReleaseMessage 表的 Message 字段记录 appid + cluster + namespace 三个维度的信息。服务端通过对比客户端传递的 notificationId 与数据库的 ReleaseMessage 表中相同 appid+cluster+namespace 维度下的最大的主键 id 是否相等,来判断命名空间是否有配置发生了变化。
功能
Apollo 支持如下功能:
统一管理不同环境、不同集群的配置
Apollo 提供了一个统一界面集中式管理不同环境、不同集群、不同命名空间的配置。
同一份代码部署在不同的集群,可以有不同的配置。
通过命名空间可以很方便支持多个不同应用共享同一份配置,同时还允许应用对共享配置进行覆盖。
配置修改实时生效(热发布)
用户在 Apollo 修改完配置并发布后,客户端能实时(1秒)接收到最新的配置,并通知到应用程序。
版本发布管理
所有的配置发布都有版本概念,从而可以方便地支持配置的回滚。
灰度发布
支持配置的灰度发布,比如点了发布后,只对部分应用实例生效,等观察一段时间没问题后再推给所有的应用实例。
权限管理、发布审核、操作审计
应用和配置的管理都有完善的权限管理机制,对配置的管理还分为了编辑和发布两个环节,从而减少人为的错误。
所有的操作都有审计日志,可以方便地追踪问题。
客户端配置信息监控
可以在界面上方便地看到配置在哪些实例使用。
提供Java和.Net原生客户端
提供了Java和.Net原生客户端,方便应用集成。
支持 Spring Placeholder,Annotation 和 Spring Boot 的 ConfigurationProperties,方便应用使用。
同时提供了 Http 接口,非 Java 和 .Net 应用也可以方便地使用。
提供开放平台API
Apollo 自身提供了比较完善的统一配置管理界面,支持多环境、多数据中心配置管理、权限、流程治理等特性。不过 Apollo 出于通用性考虑,不会对配置的修改做过多限制,只要符合基本的格式就能保存,不会针对不同的配置值进行针对性的校验。
部署简单
目前唯一的外部依赖是 MySQL,所以部署非常简单。
Apollo 还提供了打包脚本,一键就可以生成所有需要的安装包,并且支持自定义运行时参数。
总体设计
Apollo 的总体设计如下:
Config Service 提供配置的读取、推送等功能,服务对象是 Apollo 客户端。
Admin Service 提供配置的修改、发布等功能,服务对象是 Apollo Portal(管理界面)。
Config Service 和 Admin Service 都是多实例、无状态部署,所以需要将自己注册到 Eureka 中并保持心跳。
Meta Server 在 Eureka 之上架了一层,用于封装 Eureka 的服务发现接口。
Client 通过域名访问 Meta Server 获取 Config Service 服务列表(IP+Port),而后直接通过 IP+Port 访问服务,同时在 Client 侧会做 load balance、错误重试。
Portal 通过域名访问 Meta Server 获取 Admin Service 服务列表(IP+Port),而后直接通过 IP+Port 访问服务,同时在 Portal 侧会做 load balance、错误重试。
为了简化部署,实际上会把 Config Service、Eureka、Meta Server 三个逻辑角色部署在同一个 jvm 进程中。
核心概念
Apollo 支持四个维度管理 key-value 格式的配置 – application(应用)、environment(环境)、cluster(集群)、namespace(命名空间)。
1、application(应用)
实际使用配置的应用,Apollo 客户端在运行时需要知道当前应用是谁,从而可以去获取对应的配置。
每个应用都需要唯一的身份标识,即 appId。
2、environment(环境)
配置对应的环境,Apollo 客户端在运行时需要知道当前应用处于哪个环境,从而可以去获取应用的配置。
3、cluster(集群)
一个应用下不同实例的分组。比如典型的可以按照数据中心分,把上海机房的应用实例分为一个集群,把北京机房的应用实例分为另一个集群。
对于不同的集群,同一个配置可以有不一样的值。
集群默认是通过读取机器上的配置(server.properties 中的 idc 属性)指定的,不过也支持运行时通过 System Property 指定。
4、namespace(命名空间)
一个应用下不同配置的分组,可以简单把命名空间类比为文件,不同类型的配置存放在不同的文件中。
应用可以直接读取到公共组件的配置。
应用也可以通过继承公共组件的配置来对公共组件的配置做出调整。
Namespace
什么是Namespace
Namespace 是配置项的集合,类似于一个配置文件的概念。
Apollo 在创建项目的时候,都会默认创建一个 application 的 namespace。对于 90% 的应用来说,application 的 namespace 已经满足日常配置的使用场景了。
客户端获取 application namespace 的代码如下:
Config config = ConfigService.getAppConfig();
客户端获取非 application namespace 的代码如下:
Config config = ConfigService.getConfig(namespace);
Namespace的格式
支持 properties、xml、yml、yaml、json 等,默认是 properties。
Namespace的获取权限
Namespace 的获取权限分为两种 – private(私有的)、public(公共的)。
获取权限是相对于 Apollo 客户端而言的。
private权限
private 权限的 namespace,只能被所属的应用获取到。一个应用尝试获取其它应用的 private 权限的 namespace,Apollo 会报 404 异常。
public权限
public 权限的 namespace,能被所有应用获取到。
Namespace的类型
Namespace 的类型有三种:私有类型、公共类型、关联类型(继承类型)。
私有类型
私有类型的 namespace 具有 private 权限。
公共类型
公共类型的 namespace 具有 public 权限。公共类型的 namespace 名称必须全局唯一。
使用场景:
- 部门级别共享的配置
- 小组级别共享的配置
- 几个项目之间共享的配置
- 中间件客户端的配置
关联类型
关联类型也称为继承类型,具有 private 权限。关联类型的 namespace 继承于指定的公共类型 namespace,用于覆盖公共 namespace 的某些配置。
例如公共的 namespace 有两个配置项
k1 = v1k2 = v2
然后应用 a 有一个关联类型的 namespace 关联了此公共的 namespace,并且覆盖了配置项 k1,新值为 v3。那么在应用 a 实际运行时,获取到的公共 namespace 的配置为:
k1 = v3k2 = v2
使用场景:
- 对于默认的公共配置可以动态调整。
可用性
场景 | 影响 | 降级 | 原因 |
---|---|---|---|
某台 Config Service 下线 | 无影响 | Config Service 无状态,客户端重连其它 Config Service | |
所有 Config Service 下线 | 客户端无法读取最新配置,Portal 无影响 | 客户端重启时,可以读取本地缓存配置文件 | |
某台 Admin Service 下线 | 无影响 | Admin Service 无状态,Portal 重连其它 Admin Service | |
所有 Admin Service 下线 | 客户端无影响,Portal 无法更新配置 | ||
某台 Portal 下线 | 无影响 | Portal 域名通过 slb 绑定多台服务器,重试后指向可用的服务器 | |
全部 Portal 下线 | 客户端无影响,Portal 无法更新配置 | ||
某个数据中心下线 | 无影响 | 多数据中心部署,数据完全同步,Meta Server/Portal 域名通过 slb 自动切换到其它存活的数据中心 |
源码分析 – 客户端
一、ApolloApplicationContextInitializer
实现了 ApplicationContextInitializer 接口。ApplicationContextInitializer 参考文章
在 Spring 容器初始化时,ApplicationContextInitializer 接口的所有实现类都会被实例化。在 Spring 容器刷新之前,调用 ApplicationContextInitializer 接口的所有实现类的 initialize 方法,可以对上下文做一些操作。
@Overridepublic void initialize(ConfigurableApplicationContext context) {ConfigurableEnvironment environment = context.getEnvironment();// 判断 apollo.bootstrap.enabled 属性值,默认 falseif (!environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_ENABLED, Boolean.class, false)) {logger.debug("Apollo bootstrap config is not enabled for context {}, see property: ${{}}", context, PropertySourcesConstants.APOLLO_BOOTSTRAP_ENABLED);return;}logger.debug("Apollo bootstrap config is enabled for context {}", context);initialize(environment);}
接着看下 initialize 的重载方法的内部逻辑。
protected void initialize(ConfigurableEnvironment environment) {if (environment.getPropertySources().contains(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME)) {return;}String namespaces = environment.getProperty(PropertySourcesConstants.APOLLO_BOOTSTRAP_NAMESPACES, ConfigConsts.NAMESPACE_APPLICATION);logger.debug("Apollo bootstrap namespaces: {}", namespaces);List<String> namespaceList = NAMESPACE_SPLITTER.splitToList(namespaces);CompositePropertySource composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME);for (String namespace : namespaceList) {// 获取指定命名空间的配置信息Config config = ConfigService.getConfig(namespace);composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));}environment.getPropertySources().addFirst(composite);}
二、ConfigService
public static Config getConfig(String namespace) {return s_instance.getManager().getConfig(namespace);}
三、DefaultConfigManager
@Overridepublic Config getConfig(String namespace) {Config config = m_configs.get(namespace);if (config == null) {synchronized (this) {config = m_configs.get(namespace);if (config == null) {ConfigFactory factory = m_factoryManager.getFactory(namespace);config = factory.create(namespace);m_configs.put(namespace, config);}}}return config;}
使用双重检查锁,创建 Config 实例。
四、DefaultConfigFactory
@Overridepublic Config create(String namespace) {ConfigFileFormat format = determineFileFormat(namespace);// 如果配置文件是 yaml、yml 格式的if (ConfigFileFormat.isPropertiesCompatible(format)) {return new DefaultConfig(namespace, createPropertiesCompatibleFileConfigRepository(namespace, format));}return new DefaultConfig(namespace, createLocalConfigRepository(namespace));}
createLocalConfigRespository
LocalFileConfigRepository createLocalConfigRepository(String namespace) {// 判断环境是否是 LOCALif (m_configUtil.isInLocalMode()) {logger.warn("==== Apollo is in local mode! Won't pull configs from remote server for namespace {} ! ====",namespace);return new LocalFileConfigRepository(namespace);}return new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));}
createRemoteConfigRepository
RemoteConfigRepository createRemoteConfigRepository(String namespace) {return new RemoteConfigRepository(namespace);}
接下来分别分析 LocalFileConfigRepository、RemoteConfigRepository。
五、RemoteConfigRepository
先看下 RemoteConfigRepository 构造器方法。
public RemoteConfigRepository(String namespace) {m_namespace = namespace;m_configCache = new AtomicReference<>();m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);m_longPollServiceDto = new AtomicReference<>();m_remoteMessages = new AtomicReference<>();m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());m_configNeedForceRefresh = new AtomicBoolean(true);m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),m_configUtil.getOnErrorRetryInterval() * 8);// 客户端从Apollo配置中心服务端获取到应用的最新配置后,会保存在内存中,同时持久化到磁盘中// 应用程序可以从客户端获取最新的配置、订阅配置更新通知this.trySync();// 客户端定时调度处理this.schedulePeriodicRefresh();// 客户端长轮询处理this.scheduleLongPollingRefresh();}
1、trySync 方法
首先看下 trySync 方法的处理逻辑。
protected boolean trySync() {try {sync();return true;} catch (Throwable ex) {Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));logger.warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil.getDetailMessage(ex));}return false;}
进入 sync 方法内部一窥究竟。
@Overrideprotected synchronized void sync() {Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");try {// 从 m_configCache 缓存中获取本地配置ApolloConfig previous = m_configCache.get();// 从服务端加载远程配置ApolloConfig current = loadApolloConfig();// 如果本地配置与远程配置不一致,即远程配置发生了变化if (previous != current) {logger.debug("Remote Config refreshed!");// 更新 m_configCache 缓存m_configCache.set(current);// 回调所有 RepositoryChangeListener 的 onRepositoryChange 方法this.fireRepositoryChange(m_namespace, this.getConfig());}if (current != null) {Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()),current.getReleaseKey());}transaction.setStatus(Transaction.SUCCESS);} catch (Throwable ex) {transaction.setStatus(ex);throw ex;} finally {transaction.complete();}}
客户端从 Apollo 服务端获取到应用的最新配置后,会更新本地缓存。
2、schedulePeriodicRefresh 方法
初始延迟 5 分钟,之后每隔 5 分钟重复调度一次 trySync 方法。
private void schedulePeriodicRefresh() {logger.debug("Schedule periodic refresh with interval: {} {}",m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());// 默认初始延迟5分钟,之后每隔5分钟重复调度一次// 可以通过 apollo.refreshInterval 属性修改默认值m_executorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));logger.debug("refresh config for namespace: {}", m_namespace);trySync();Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);}}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),m_configUtil.getRefreshIntervalTimeUnit());}
客户端定时从服务端拉取应用的最新配置。
3、scheduleLongPollingRefresh 方法
客户端向服务端发起长轮询请求。实际上客户端和服务端保持了一个长连接,从而能第一时间获得配置更新的推送。
private void scheduleLongPollingRefresh() {// 交给 RemoteConfigLongPollService 处理remoteConfigLongPollService.submit(m_namespace, this);}
接着看下 RemoteConfigLongPollService 的 submit 方法是如何处理的。
public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {// 更新 m_longPollNamespaces 缓存boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);// 更新 m_notifications 缓存m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);if (!m_longPollStarted.get()) {// 执行 startLongPolling 方法startLongPolling();}return added;}
接着看下 startLongPolling 方法的处理逻辑。
private void startLongPolling() {if (!m_longPollStarted.compareAndSet(false, true)) {return;}try {final String appId = m_configUtil.getAppId();final String cluster = m_configUtil.getCluster();final String dataCenter = m_configUtil.getDataCenter();final String secret = m_configUtil.getAccessKeySecret();// 默认 2000 毫秒final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();m_longPollingService.submit(new Runnable() {@Overridepublic void run() {if (longPollingInitialDelayInMills > 0) {try {logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);} catch (InterruptedException e) {//ignore}}// 执行 doLongPollingRefresh 方法doLongPollingRefresh(appId, cluster, dataCenter, secret);}});} catch (Throwable ex) {m_longPollStarted.set(false);ApolloConfigException exception =new ApolloConfigException("Schedule long polling refresh failed", ex);Tracer.logError(exception);logger.warn(ExceptionUtil.getDetailMessage(exception));}}
接着看下 doLongPollingRefresh 方法的处理逻辑。
private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {final Random random = new Random();ServiceDTO lastServiceDto = null;while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {// 限流判断if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {try {TimeUnit.SECONDS.sleep(5);} catch (InterruptedException e) {}}Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");String url = null;try {if (lastServiceDto == null) {// 向服务端发起一个 /services/config 的 GET 请求,从响应体中得到 ServiceDTO 列表List<ServiceDTO> configServices = getConfigServices();// 采用随机策略从 ServiceDTO 列表中选出一个 ServiceDTO 实例lastServiceDto = configServices.get(random.nextInt(configServices.size()));}// 组装请求的 URLurl = assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,m_notifications);logger.debug("Long polling from {}", url);HttpRequest request = new HttpRequest(url);// 默认读操作的超时时间为 90 秒request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);if (!StringUtils.isBlank(secret)) {Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);request.setHeaders(headers);}transaction.addData("Url", url);// 1、底层采用 HttpURLConnection 向服务端发起一个 /notifications/v2 的 GET 请求final HttpResponse<List<ApolloConfigNotification>> response =m_httpUtil.doGet(request, m_responseType);logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);if (response.getStatusCode() == 200 && response.getBody() != null) {// 2、更新客户端本地 m_notifications 缓存updateNotifications(response.getBody());// 3、更新客户端本地 m_remoteNotificationMessages 缓存updateRemoteNotifications(response.getBody());transaction.addData("Result", response.getBody().toString());// 4、对比客户端本地缓存与服务端返回的配置信息,如果发生变化,则更新本地缓存并触发监听器回调notify(lastServiceDto, response.getBody());}if (response.getStatusCode() == 304 && random.nextBoolean()) {lastServiceDto = null;}m_longPollFailSchedulePolicyInSecond.success();transaction.addData("StatusCode", response.getStatusCode());transaction.setStatus(Transaction.SUCCESS);} catch (Throwable ex) {lastServiceDto = null;Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));transaction.setStatus(ex);long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();logger.warn("Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));try {TimeUnit.SECONDS.sleep(sleepTimeInSecond);} catch (InterruptedException ie) {//ignore}} finally {transaction.complete();}}}
默认的监听器是 LocalFileConfigRepository。
六、LocalFileConfigRepository
先看下 LocalFileConfigRepository 的构造器方法。
public LocalFileConfigRepository(String namespace, ConfigRepository upstream) {m_namespace = namespace;m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);// 获取本地磁盘文件的路径,并创建本地磁盘文件关联的目录this.setLocalCacheDir(findLocalCacheDir(), false);// 将服务端返回的最新配置同步到本地磁盘文件中,并添加监听器this.setUpstreamRepository(upstream);// 将本地磁盘文件的内容同步到本地缓存中this.trySync();}
findLocalCacheDir
获取本地磁盘文件的路径
private File findLocalCacheDir() {try {// 获取自定义本地磁盘路径String defaultCacheDir = m_configUtil.getDefaultLocalCacheDir();Path path = Paths.get(defaultCacheDir);if (!Files.exists(path)) {// 创建目录Files.createDirectories(path);}// 如果目录创建成功,并且处于可写的状态if (Files.exists(path) && Files.isWritable(path)) {// 在父目录基础上创建 config-cache 目录return new File(defaultCacheDir, CONFIG_DIR);}} catch (Throwable ex) {//ignore}return new File(ClassLoaderUtil.getClassPath(), CONFIG_DIR);}
默认的本地磁盘文件的目录如下:
- Mac / Linux:
/opt/data/{appId}/config-cache
- Windows:
C:\opt\data\{appId}\config-cache
getDefaultLocalCacheDir
public String getDefaultLocalCacheDir() {// 获取自定义本地磁盘文件的路径(通过 apollo.cacheDir 属性指定,详情可以查看 Apollo 官方文档)String cacheRoot = getCustomizedCacheRoot();// 如果有指定本地磁盘文件的路径,则追加 {appId} 目录if (!Strings.isNullOrEmpty(cacheRoot)) {return cacheRoot + File.separator + getAppId();}// 否则使用 Apollo 指定的本地磁盘文件的路径cacheRoot = isOSWindows() " />"C:\\opt\\data\\%s" : "/opt/data/%s";return String.format(cacheRoot, getAppId());}
setLocalCacheDir
创建本地磁盘文件关联的目录
void setLocalCacheDir(File baseDir, boolean syncImmediately) {m_baseDir = baseDir;// 检查本地磁盘文件关联的目录是否已创建,如果没有创建,则进行创建this.checkLocalConfigCacheDir(m_baseDir);if (syncImmediately) {this.trySync();}}
setUpstreamRepository
@Overridepublic void setUpstreamRepository(ConfigRepository upstreamConfigRepository) {if (upstreamConfigRepository == null) {return;}if (m_upstream != null) {// 删除旧的监听器m_upstream.removeChangeListener(this);}m_upstream = upstreamConfigRepository;// 将服务端返回的最新配置同步到本地磁盘中trySyncFromUpstream();// 添加监听器upstreamConfigRepository.addChangeListener(this);}
trySyncFromUpstream
将服务端返回的最新配置同步到本地磁盘文件中
private boolean trySyncFromUpstream() {if (m_upstream == null) {return false;}try {updateFileProperties(m_upstream.getConfig(), m_upstream.getSourceType());return true;} catch (Throwable ex) {Tracer.logError(ex);logger.warn("Sync config from upstream repository {} failed, reason: {}", m_upstream.getClass(),ExceptionUtil.getDetailMessage(ex));}return false;}
底层使用 Properties 的 store(OutputStream out, String comments) 方法将远程服务端的配置同步到本地磁盘文件中。
本文磁盘文件名称的格式如下:
{appId}+{cluster}+{namespace}.properties
另外,值得注意的是,LocalFileConfigRepository 作为一个 RepositoryChangeListener,看下 onRepositoryChange 方法,它是如何处理的。
@Overridepublic void onRepositoryChange(String namespace, Properties newProperties) {if (newProperties.equals(m_fileProperties)) {return;}Properties newFileProperties = propertiesFactory.getPropertiesInstance();newFileProperties.putAll(newProperties);// 1、将配置持久化到磁盘中updateFileProperties(newFileProperties, m_upstream.getSourceType());// 2、触发监听器回调this.fireRepositoryChange(namespace, newProperties);}
1、updateFileProperties 方法
private synchronized void updateFileProperties(Properties newProperties, ConfigSourceType sourceType) {this.m_sourceType = sourceType;if (newProperties.equals(m_fileProperties)) {return;}this.m_fileProperties = newProperties;persistLocalCacheFile(m_baseDir, m_namespace);}
进入 persistLocalCacheFile 方法一窥究竟。
void persistLocalCacheFile(File baseDir, String namespace) {if (baseDir == null) {return;}// 文件名为 ${appId} + ${cluster} + ${namespace}.propertiesFile file = assembleLocalCacheFile(baseDir, namespace);OutputStream out = null;Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "persistLocalConfigFile");transaction.addData("LocalConfigFile", file.getAbsolutePath());try {out = new FileOutputStream(file);// 底层调用 Properties 的 store 方法将配置持久化到磁盘中m_fileProperties.store(out, "Persisted by DefaultConfig");transaction.setStatus(Transaction.SUCCESS);} catch (IOException ex) {ApolloConfigException exception =new ApolloConfigException(String.format("Persist local cache file %s failed", file.getAbsolutePath()), ex);Tracer.logError(exception);transaction.setStatus(exception);logger.warn("Persist local cache file {} failed, reason: {}.", file.getAbsolutePath(),ExceptionUtil.getDetailMessage(ex));} finally {if (out != null) {try {out.close();} catch (IOException ex) {//ignore}}transaction.complete();}}
简单看下 Properties 的 store 方法是如何持久化的。
public void store(OutputStream out, String comments)throws IOException{store0(new BufferedWriter(new OutputStreamWriter(out, "8859_1")), comments, true);}private void store0(BufferedWriter bw, String comments, boolean escUnicode)throws IOException{if (comments != null) {writeComments(bw, comments);}bw.write("#" + new Date().toString());bw.newLine();synchronized (this) {for (Enumeration<?> e = keys(); e.hasMoreElements();) {String key = (String)e.nextElement();String val = (String)get(key);key = saveConvert(key, true, escUnicode);val = saveConvert(val, false, escUnicode);bw.write(key + "=" + val);bw.newLine();}}bw.flush();}
可以看出底层使用 BufferedWriter 的 write 方法将数据写入到磁盘中。
2、fireRepositoryChange 方法
应用程序可以从 Apollo 客户端获取最新的配置、订阅配置更新通知。
protected void fireRepositoryChange(String namespace, Properties newProperties) {for (RepositoryChangeListener listener : m_listeners) {try {listener.onRepositoryChange(namespace, newProperties);} catch (Throwable ex) {Tracer.logError(ex);logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);}}}
默认的监听器是 DefaultConfig。
七、DefaultConfig
看下 DefaultConfig 是如何回调处理的。
@Overridepublic synchronized void onRepositoryChange(String namespace, Properties newProperties) {if (newProperties.equals(m_configProperties.get())) {return;}ConfigSourceType sourceType = m_configRepository.getSourceType();Properties newConfigProperties = propertiesFactory.getPropertiesInstance();newConfigProperties.putAll(newProperties);Map<String, ConfigChange> actualChanges = updateAndCalcConfigChanges(newConfigProperties, sourceType);if (actualChanges.isEmpty()) {return;}// 监听器回调处理this.fireConfigChange(new ConfigChangeEvent(m_namespace, actualChanges));Tracer.logEvent("Apollo.Client.ConfigChanges", m_namespace);}
进入 fireConfigChange 方法一窥究竟。
protected void fireConfigChange(final ConfigChangeEvent changeEvent) {for (final ConfigChangeListener listener : m_listeners) {if (!isConfigChangeListenerInterested(listener, changeEvent)) {continue;}m_executorService.submit(new Runnable() {@Overridepublic void run() {String listenerName = listener.getClass().getName();Transaction transaction = Tracer.newTransaction("Apollo.ConfigChangeListener", listenerName);try {// 触发监听器回调listener.onChange(changeEvent);transaction.setStatus(Transaction.SUCCESS);} catch (Throwable ex) {transaction.setStatus(ex);Tracer.logError(ex);logger.error("Failed to invoke config change listener {}", listenerName, ex);} finally {transaction.complete();}}});}
监听器默认是 AutoUpdateConfigChangeListener。
八、AutoUpdateConfigChangeListener
看下 AutoUpdateConfigChangeListener 是如何回调处理的。
@Overridepublic void onChange(ConfigChangeEvent changeEvent) {// 获取发生变化的属性集合Set<String> keys = changeEvent.changedKeys();if (CollectionUtils.isEmpty(keys)) {return;}for (String key : keys) {// 获取原有的属性以及属性值集合Collection<SpringValue> targetValues = springValueRegistry.get(beanFactory, key);if (targetValues == null || targetValues.isEmpty()) {continue;}// 更新属性值for (SpringValue val : targetValues) {updateSpringValue(val);}}}
进入 updateSpringValue 方法一窥究竟。
private void updateSpringValue(SpringValue springValue) {try {// 获取经过解析后的属性值Object value = resolvePropertyValue(springValue);// 更新属性值springValue.update(value);// 日志打印logger.info("Auto update apollo changed value successfully, new value: {}, {}", value,springValue);} catch (Throwable ex) {logger.error("Auto update apollo changed value failed, {}", springValue.toString(), ex);}}
九、SpringValue
简单看下,如何更新属性值的。
public void update(Object newVal) throws IllegalAccessException, InvocationTargetException {if (isField()) {injectField(newVal);} else {injectMethod(newVal);}}private void injectField(Object newVal) throws IllegalAccessException {Object bean = beanRef.get();if (bean == null) {return;}boolean accessible = field.isAccessible();field.setAccessible(true);// 底层使用 Field 的 set 方法更新属性值field.set(bean, newVal);field.setAccessible(accessible);}private void injectMethod(Object newVal)throws InvocationTargetException, IllegalAccessException {Object bean = beanRef.get();if (bean == null) {return;}// 底层使用 Method 的 invoke 方法更新属性值methodParameter.getMethod().invoke(bean, newVal);}
源码分析 – 服务端
一、NotificationControllerV2
看下服务端是如何处理客户端的长轮询请求的。
pollNotification 方法
@GetMappingpublic DeferredResult<ResponseEntity<List<ApolloConfigNotification>>> pollNotification(@RequestParam(value = "appId") String appId,@RequestParam(value = "cluster") String cluster,@RequestParam(value = "notifications") String notificationsAsString,@RequestParam(value = "dataCenter", required = false) String dataCenter,@RequestParam(value = "ip", required = false) String clientIp) {List<ApolloConfigNotification> notifications = null;try {// 将客户端的 notifications 解析为 ApolloConfigNotification 列表notifications =gson.fromJson(notificationsAsString, notificationsTypeReference);} catch (Throwable ex) {Tracer.logError(ex);}if (CollectionUtils.isEmpty(notifications)) {throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);}// 过滤 ApolloConfigNotificationMap<String, ApolloConfigNotification> filteredNotifications = filterNotifications(appId, notifications);if (CollectionUtils.isEmpty(filteredNotifications)) {throw new BadRequestException("Invalid format of notifications: " + notificationsAsString);}// 构建 DeferredResultWrapper 实例DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(bizConfig.longPollingTimeoutInMilli());Set<String> namespaces = Sets.newHashSetWithExpectedSize(filteredNotifications.size());Map<String, Long> clientSideNotifications = Maps.newHashMapWithExpectedSize(filteredNotifications.size());for (Map.Entry<String, ApolloConfigNotification> notificationEntry : filteredNotifications.entrySet()) {String normalizedNamespace = notificationEntry.getKey();ApolloConfigNotification notification = notificationEntry.getValue();namespaces.add(normalizedNamespace);clientSideNotifications.put(normalizedNamespace, notification.getNotificationId());if (!Objects.equals(notification.getNamespaceName(), normalizedNamespace)) {deferredResultWrapper.recordNamespaceNameNormalizedResult(notification.getNamespaceName(), normalizedNamespace);}}Multimap<String, String> watchedKeysMap =watchKeysUtil.assembleAllWatchKeys(appId, cluster, namespaces, dataCenter);// 获取客户端需要关注的key集合Set<String> watchedKeys = Sets.newHashSet(watchedKeysMap.values());deferredResultWrapper.onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys"));deferredResultWrapper.onCompletion(() -> {for (String key : watchedKeys) {deferredResults.remove(key, deferredResultWrapper);}logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys");});for (String key : watchedKeys) {this.deferredResults.put(key, deferredResultWrapper);}logWatchedKeys(watchedKeys, "Apollo.LongPoll.RegisteredKeys");logger.debug("Listening {} from appId: {}, cluster: {}, namespace: {}, datacenter: {}",watchedKeys, appId, cluster, namespaces, dataCenter);// 从 releaseMessageCache 缓存获取对应的 ReleaseMessage 实例,并组装成集合List<ReleaseMessage> latestReleaseMessages =releaseMessageService.findLatestReleaseMessagesGroupByMessages(watchedKeys);entityManagerUtil.closeEntityManager();// 获取服务端发生变化的 ApolloConfigNotification 列表(实际上是比较客户端传递的notificationId与服务端对应ReleaseMessage表中的主键id)List<ApolloConfigNotification> newNotifications =getApolloConfigNotifications(namespaces, clientSideNotifications, watchedKeysMap,latestReleaseMessages);if (!CollectionUtils.isEmpty(newNotifications)) {// 底层调用 DeferredResult 的 setResult 方法deferredResultWrapper.setResult(newNotifications);}// 返回服务端对于客户端请求的响应return deferredResultWrapper.getResult();}
getApolloConfigNotifications 方法
private List<ApolloConfigNotification> getApolloConfigNotifications(Set<String> namespaces,Map<String, Long> clientSideNotifications,Multimap<String, String> watchedKeysMap,List<ReleaseMessage> latestReleaseMessages) {List<ApolloConfigNotification> newNotifications = Lists.newArrayList();if (!CollectionUtils.isEmpty(latestReleaseMessages)) {Map<String, Long> latestNotifications = Maps.newHashMap();for (ReleaseMessage releaseMessage : latestReleaseMessages) {latestNotifications.put(releaseMessage.getMessage(), releaseMessage.getId());}for (String namespace : namespaces) {long clientSideId = clientSideNotifications.get(namespace);long latestId = ConfigConsts.NOTIFICATION_ID_PLACEHOLDER;Collection<String> namespaceWatchedKeys = watchedKeysMap.get(namespace);for (String namespaceWatchedKey : namespaceWatchedKeys) {long namespaceNotificationId =latestNotifications.getOrDefault(namespaceWatchedKey, ConfigConsts.NOTIFICATION_ID_PLACEHOLDER);if (namespaceNotificationId > latestId) {// latestId记录最大的notificationIdlatestId = namespaceNotificationId;}}// 说明服务端的命名空间的配置发生了变化if (latestId > clientSideId) {ApolloConfigNotification notification = new ApolloConfigNotification(namespace, latestId);namespaceWatchedKeys.stream().filter(latestNotifications::containsKey).forEach(namespaceWatchedKey ->notification.addMessage(namespaceWatchedKey, latestNotifications.get(namespaceWatchedKey)));newNotifications.add(notification);}}}return newNotifications;}
可见,服务端向客户端返回的主要是 ReleaseMessage 表中的主键 id + namespace 信息。
最后重点看下 releaseMessageCache 缓存存储 ReleaseMessage 实例的时机。
二、ReleaseMessageScanner
由于它实现了 InitializingBean 接口,我们重点看下它的 afterPropertiesSet 方法的实现。
在 Bean 被实例化并且属性注入后,会调用 InitializingBean 的 afterPropertiesSet 方法。与 init-method 相似,但是略早于 init-method。它们两个的执行时机在 BeanPostProcessor 的 postProcessBeforeInitailization 方法和 postProcessAfterInitialization 方法之间。 InitializingBean 的参考文章
@Overridepublic void afterPropertiesSet() throws Exception {databaseScanInterval = bizConfig.releaseMessageScanIntervalInMilli();maxIdScanned = loadLargestMessageId();executorService.scheduleWithFixedDelay(() -> {Transaction transaction = Tracer.newTransaction("Apollo.ReleaseMessageScanner", "scanMessage");try {scanMessages();transaction.setStatus(Transaction.SUCCESS);} catch (Throwable ex) {transaction.setStatus(ex);logger.error("Scan and send message failed", ex);} finally {transaction.complete();}}, databaseScanInterval, databaseScanInterval, TimeUnit.MILLISECONDS);}
scanMessages 方法
private void scanMessages() {boolean hasMoreMessages = true;while (hasMoreMessages && !Thread.currentThread().isInterrupted()) {hasMoreMessages = scanAndSendMessages();}}
scanAndSendMessages 方法
private boolean scanAndSendMessages() {// 从数据库中获取大于maxIdScanned的ReleaseMessage列表List<ReleaseMessage> releaseMessages =releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);if (CollectionUtils.isEmpty(releaseMessages)) {return false;}// 触发监听器的回调fireMessageScanned(releaseMessages);int messageScanned = releaseMessages.size();maxIdScanned = releaseMessages.get(messageScanned - 1).getId();return messageScanned == 500;}
fireMessageScanned 方法
private void fireMessageScanned(List<ReleaseMessage> messages) {for (ReleaseMessage message : messages) {for (ReleaseMessageListener listener : listeners) {try {// 触发监听器的回调listener.handleMessage(message, Topics.APOLLO_RELEASE_TOPIC);} catch (Throwable ex) {Tracer.logError(ex);logger.error("Failed to invoke message listener {}", listener.getClass(), ex);}}}}
ReleaseMessageServiceWithCache 就是其中一个监听器。
三、ReleaseMessageServiceWithCache
@Overridepublic void handleMessage(ReleaseMessage message, String channel) {doScan.set(false);logger.info("message received - channel: {}, message: {}", channel, message);String content = message.getMessage();Tracer.logEvent("Apollo.ReleaseMessageService.UpdateCache", String.valueOf(message.getId()));if (!Topics.APOLLO_RELEASE_TOPIC.equals(channel) || Strings.isNullOrEmpty(content)) {return;}long gap = message.getId() - maxIdScanned;if (gap == 1) {mergeReleaseMessage(message);} else if (gap > 1) {loadReleaseMessages(maxIdScanned);}}
先看下 mergeReleaseMessage 方法的处理。
private synchronized void mergeReleaseMessage(ReleaseMessage releaseMessage) {ReleaseMessage old = releaseMessageCache.get(releaseMessage.getMessage());if (old == null || releaseMessage.getId() > old.getId()) {releaseMessageCache.put(releaseMessage.getMessage(), releaseMessage);maxIdScanned = releaseMessage.getId();}}
可见,往 releaseMessageCache 缓存里添加 ReleaseMessage 相关信息。
然后看下 loadReleaseMessages 方法的处理。
private void loadReleaseMessages(long startId) {boolean hasMore = true;while (hasMore && !Thread.currentThread().isInterrupted()) {// 从数据库中获取大于maxIdScanned的ReleaseMessage列表List<ReleaseMessage> releaseMessages = releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(startId);if (CollectionUtils.isEmpty(releaseMessages)) {break;}// 调用 mergeReleaseMessage 方法releaseMessages.forEach(this::mergeReleaseMessage);int scanned = releaseMessages.size();startId = releaseMessages.get(scanned - 1).getId();hasMore = scanned == 500;logger.info("Loaded {} release messages with startId {}", scanned, startId);}}