3-指标监听注册梳理

Dubbo 指标模块源码分析-指标监听注册梳理

三、指标监听注册梳理

在前一章中,我们了解了不同收集器中的指标样本是如何被监听器添加进去的。接下来,我们将归纳指标监听器 的创建位置,及它们对应统计的指标。

通过之前的分析,我们已经知道指标 注册事件多播器(RegistryMetricsEventMulticaster)中定义了并绑定了服务注册相关的指标。这种绑定操作同样存在于其它几个简单指标事件多播器(SimpleMetricsEventMulticaster)的几个实现中。

转发器注册

RegistrySubDispatcher (服务注册指标转发器)注册了服务注册相关指标:

  • 应用级实例注册成功/失败/总数计数 (APPLICATION_REGISTER_…)
  • 应用级服务接口订阅成功/失败/总数计数 (APPLICATION_SUBSCRIBE_…)
  • 服务级注册成功/失败/总数计数 (SERVICE_REGISTER_…)
  • 特殊的 APPLICATION_NOTIFY_FINISH 和 APPLICATION_DIRECTORY_POST (应用服务目录变化次数)

MetadataSubDispatcher(元数据指标转发器)注册应用元数据相关指标

  • 应用推送元数据相关计数 (APPLICATION_PUSH_…)

  • 应用订阅元数据相关计数 (APPLICAITON_SUBSCRIBE_…)

  • 服务订阅元数据相关计数 (SERVICE_SUBSCRIBE_…)

ConfigCenterSubDispatcher (配置中心指标转发器) 注册配置中心配置更新次数指标

  • 配置中心推送新配置次数 (CONFIGCENTER_METRIC_TOTAL)

DefaultSubDispatcher (默认转发器) 注册核心RPC调用次数指标

  • 请求次数 (METRIC_REQUESTS)
  • 请求成功次数(METRIC_REQUESTS_SUCCEED)
  • 请求失败次数(METRIC_REQUEST_BUSINESS_FAILED)

MetricsDispatcher

MetricsDispatcher 较为特殊,它负责 ApplicationModel 下所有 MetricsCollector(前文中提到的指标收集器) 的初始化注册工作,并将它们添加到自己的监听器列表中。

public class MetricsDispatcher extends SimpleMetricsEventMulticaster {

    @SuppressWarnings({"rawtypes"})
    public MetricsDispatcher(ApplicationModel applicationModel) {
        ScopeBeanFactory beanFactory = applicationModel.getBeanFactory();
        ExtensionLoader<MetricsCollector> extensionLoader = applicationModel.getExtensionLoader(MetricsCollector.class);
        if (extensionLoader != null) {
            List<MetricsCollector> customizeCollectors = extensionLoader
                .getActivateExtensions();
            for (MetricsCollector customizeCollector : customizeCollectors) {
                beanFactory.registerBean(customizeCollector);
            }
            customizeCollectors.forEach(this::addListener);
        }
    }

}

需要注意,以上几个实现均继承自 SimpleMetricsEventMulticaster,因此它们都具有注册监听、转发事件的能力。它们将自己注册到对应领域的指标 Collector 中,并在收到指标事件时转发到自己注册的监听器中。

//SimpleMetricsEventMulticaster

    public void addListener(MetricsListener<?> listener) {
        listeners.add(listener);
    }
    
    public void publishEvent(MetricsEvent event) {
        if (event instanceof EmptyEvent) {
            return;
        }
        if (validateIfApplicationConfigExist(event)) return;
        for (MetricsListener listener : listeners) {
            if (listener.isSupport(event)) {
                listener.onEvent(event);
            }
        }
    }
//...

SubDispatcher 和 Collector 之间的对应关系:

  • MetadataSubDispatcher -> MetadataMetricsCollector 元数据指标事件
  • RegistrySubDispatcher -> RegistryMetricsCollector 服务注册指标事件
  • ConfigCenterSubDispatcher -> ConfigCenterMetricsCollector 配置中心指标事件
  • MetricsDispatcher 由 MetricsEventBus 通过 BeanFactory 加载。它是所有事件转发的入口。

事件触发

剩下的问题就是这些监听器是如何被触发的。

可以发现三大中心的指标转发器都是在它们对应的Collector中创建的:

public ConfigCenterMetricsCollector(ApplicationModel applicationModel) {
    ...
    super.setEventMulticaster(new ConfigCenterMetricsDispatcher(this));
    ...
}
public MetadataMetricsCollector(ApplicationModel applicationModel) {
   ...
   super.setEventMulticaster(new MetadataMetricsEventMulticaster(this));
   ...
}
public RegistryMetricsCollector(ApplicationModel applicationModel) {
   ...
   super.setEventMulticaster(new RegistryMetricsEventMulticaster(this));
   ...
}

这意味这想要通过它们发布事件,需要通过它们对应的 Collector 来访问。

如前文所述, MetricsDispatcher 在初始化时会尝试获取并加载所有 MetricsCollector 的SPI拓展,

三大中心的MetricsCollector (Metadata/Registry/ConfigCenter)也会在这里被初始化,并添加为 MetricsDispatcher 的监听器:

 public MetricsDispatcher(ApplicationModel applicationModel) {
...
        ExtensionLoader<MetricsCollector> extensionLoader = applicationModel.getExtensionLoader(MetricsCollector.class);
...
        for (MetricsCollector customizeCollector : customizeCollectors) {
             beanFactory.registerBean(customizeCollector);
        }
        customizeCollectors.forEach(this::addListener);
    }

对于 MetricsDispatcher,它由 MetricsEventBus 创建。而 MetricsEventBus 自身作为指标相关消息的总线,会接收所有指标消息,并将它们转发给监听者。

MetricsEvenetBus 提供了三个方法来发布指标事件:

  • publish(MetricsEvent event) ,将事件发布给所有订阅者,只发布一次且不关心事件处理结果
  • post(MetricsEvent event, Supplier<T> targetSupplier) ,将事件发布给所有订阅者,并根据是否产生异常判断事件成功或失败,调用 MetricsDispatcher 发布对应的事件。
  • post(MetricsEvent event, Supplier<T> targetSupplier, Function<T, Boolean> trFunction) ,额外的 trFunction 可用于通过业务结果判断事件成功或失败。 targetSupplier 为业务操作函数,泛型T为业务结果类型。

这三个方法均会通过 MetricsDispatcher 来转发事件。

在之前的分析中,我们知道 MetricsDispatcher 创建了所有 MetricsCollector 拓展,并将它们注册为自己的监听者。

因此,当 MetricsEventBus 接收到发布的信息时,它会将信息转发到所有 MetricsCollector 中。对于 CombMetricsCollector 的实现,它们又会调用自己创建的 MetricsEventMulticaster 再次转发消息,到具体指标的监听器。

之后,这些监听器就会根据自己的逻辑修改Collector中的指标计数。

image-20230629160012950

事件发布

接下来,我们将寻找指标事件发布的源头。

通过前文的分析,我们知道 MetricsEventBus 是所有指标事件发布的入口。具体来说,它有以下的用法:

  • AbstractDirectory
  • ServiceConfig
  • DefaultApplicationDeployer
  • ApolloDynamicConfiguration
  • NacosDynamicConfiguration
  • ZookeeperDataListener
  • AbstractMetadataReport

我们将逐个分析每个用法。

AbstractDirectory

AbstractDirectory 在修改 Invoker 状态相关的操作完成后都会通过 MetricsEventBus 发布 refreshDirectoryEvent(服务目录更新事件,类型为 RegistryEvent ),将当前目录中各种状态 Invoker 实例的最新数量作为附件添加到 RegistryEvent 中。

//AbstractDirectory

    public void recoverDisabledInvoker(Invoker<T> invoker) {
        ...
        MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
    }


    protected void setInvokers(BitList<Invoker<T>> invokers) {
        ...
        MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
    }

    protected void setInvokers(BitList<Invoker<T>> invokers) {
        ...
        MetricsEventBus.publish(RegistryEvent.refreshDirectoryEvent(applicationModel, getSummary()));
    }

    private Map<MetricsKey, Map<String, Integer>> getSummary() {
        Map<MetricsKey, Map<String, Integer>> summaryMap = new HashMap<>();
        //目录中可用的Invoker数量
        summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_VALID, groupByServiceKey(getValidInvokers()));
        //目录中不可用的Invoker数量
        summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_DISABLE, groupByServiceKey(getDisabledInvokers()));
        //目录中等待重连的Invoker数量
        summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_TO_RECONNECT, groupByServiceKey(getInvokersToReconnect()));
        summaryMap.put(MetricsKey.DIRECTORY_METRIC_NUM_ALL, groupByServiceKey(getInvokers()));
        return summaryMap;
    }
...   

该事件最终会由 RegistryMetricsCollector 中的 RegistryMetricsDispatcher 转发到关系该事件的监听器中。事件和监听器之间通过 MetricsKey匹配

最终,MetricsKey 为 **DIRECTORY_METRIC_NUM_VALID** 的监听器会处理这个事件,并更新 Collector中的计数。

//DIRECTORY_METRIC_NUM_VALID 对应的 Listener。
MetricsCat APPLICATION_DIRECTORY_POST = new MetricsCat(MetricsKey.DIRECTORY_METRIC_NUM_VALID, (key, placeType, collector) -> AbstractMetricsListener.onEvent(key,
            event ->
            {
                Map<MetricsKey, Map<String, Integer>> summaryMap = event.getAttachmentValue(ATTACHMENT_DIRECTORY_MAP);
                summaryMap.forEach((metricsKey, map) ->
                    map.forEach(
                        (k, v) -> collector.setNum(metricsKey, event.appName(), k, v)));
            }
        ));

这样,服务目录中不同状态 Invoker 的计数就通过 RegistryMetricsCollector 更新到了 ServiceStatComposite 中。

ServiceConfig

当通过 ServiceConfig 导出、注册一个服务时,它会发布一个服务导出事件。

//ServiceConfig    
    protected synchronized void doExport() {
    ...
        doExportUrls();
        exported();
    }
private void doExportUrls() {
     ...
        MetricsEventBus.post(RegistryEvent.toRsEvent(module.getApplicationModel(), getUniqueServiceName(), protocols.size() * registryURLs.size()),
            //该函数会被同步执行,如果抛出异常则触发 MetricsEvent 的 onError方法,否则触发 onFinish
            () -> {
                for (ProtocolConfig protocolConfig : protocols) {
                    String pathKey = URL.buildKey(getContextPath(protocolConfig)
                        .map(p -> p + "/" + path)
                        .orElse(path), group, version);
                    if (!serverService) {
                        repository.registerService(pathKey, interfaceClass);
                    }
                    doExportUrlsFor1Protocol(protocolConfig, registryURLs);
                }
                return null;
            }
        );
        providerModel.setServiceUrls(urls);
    }

事件发布时,该事件会被转发到 RegistryMetricsCollector,触发对应的 Listener 增加 SERVICE_REGISTER_METRIC_REQUESTS (当前服务级注册请求总数)的计数,然后执行定义的 provider 函数。根据是否抛出异常,之后执行 onError 方法 或 onFinish 方法,增加 SERVICE_REGISTER_METRIC_REQUESTS_FAILED (当前服务级注册请求失败总数)或SERVICE_REGISTER_METRIC_REQUESTS_SUCCEED (当前服务级注册请求成功总数) 的计数。

DefaultApplicationDeployer

它在应用部署过程中,初始化配置中心时,发布配置发生改变的事件。

 private void startConfigCenter() {
 ...
   compositeDynamicConfiguration.addConfiguration(
     prepareEnvironment(configCenter)
   );
 ...
 }
 private DynamicConfiguration prepareEnvironment(ConfigCenterConfig configCenter) {
...
            // Add metrics
            MetricsEventBus.publish(ConfigCenterEvent.toChangeEvent(applicationModel, configCenter.getConfigFile(), configCenter.getGroup(),configCenter.getProtocol(), ConfigChangeType.ADDED.name(), configMap.size()));
            if (isNotEmpty(appGroup)) {
                   MetricsEventBus.publish(ConfigCenterEvent.toChangeEvent(applicationModel, appConfigFile, appGroup, configCenter.getProtocol(), ConfigChangeType.ADDED.name(), appConfigMap.size()));
            }
 ...
 
    }

prepareEnvironment 方法中,会按配置中心设置的组(group)和当前应用程序的名称作为组名发布两次事件。该事件会被转发到 ConfigCenterMetricsCollector,增加 CONFIGCENTER_METRIC_TOTAL (配置中心配置变化次数)的计数。

ApolloDynamicConfiguration

动态配置功能的 Apollo 配置中心实现。

//ApolloDynamicConfiguration
@Override 
public void onChange(com.ctrip.framework.apollo.model.ConfigChangeEvent changeEvent) {
        ...
                MetricsEventBus.publish(ConfigCenterEvent.toChangeEvent(applicationModel, event.getKey(), event.getGroup(),ConfigCenterEvent.APOLLO_PROTOCOL, ConfigChangeType.ADDED.name(), SELF_INCREMENT_SIZE));
 }

当Apollo配置中心的配置发生变化时,它的 onChange 方法会被触发,并在最后发布一个 ConfigCenterEvent。该事件最终转发到ConfigCenterMetricsCollector 中,同样增加 **CONFIGCENTER_METRIC_TOTAL **的计数。

NacosDynamicConfiguration

动态配置功能的 Nacos 配置中心实现。

//NacosDynamicConfiguration
@Override
        public void innerReceive(String dataId, String group, String configInfo) {
        ...
            MetricsEventBus.publish(ConfigCenterEvent.toChangeEvent(applicationModel, event.getKey(), event.getGroup(),
                ConfigCenterEvent.NACOS_PROTOCOL, ConfigChangeType.ADDED.name(), SELF_INCREMENT_SIZE));
        }

当Nacos配置中心的配置发生变化时,它的 innerReceive 方法被触发,发布一个 ConfigCenterEvent。它的处理流程和 ApolloDynamicConfiguration 一致,最终增加 **CONFIGCENTER_METRIC_TOTAL **的计数。

ZookeeperDataListener

动态配置功能的 Zookeeper 实现。

//ZookeeperDataListener
@Override
    public void dataChanged(String path, Object value, EventType eventType) {
      ...
        MetricsEventBus.publish(ConfigCenterEvent.toChangeEvent(applicationModel, configChangeEvent.getKey(), configChangeEvent.getGroup(),
            ConfigCenterEvent.ZK_PROTOCOL, ConfigChangeType.ADDED.name(), SELF_INCREMENT_SIZE));
    }

在指标收集层面,它的行为和前文中两个配置中心一致,此处不详细展开三个配置中心具体实现的异同。

AbstractMetadataReport

元数据报告接口的抽象实现。它的三个实现 (NacosMetadataReport、RedisMetadataReport、ZookeeperMetadataReport)均使用它的指标事件逻辑。

当订阅新服务并获取它的元数据时,它会发布一个 MetadataEvent 触发相关指标的修改。

private void storeProviderMetadataTask(MetadataIdentifier providerMetadataIdentifier, ServiceDefinition serviceDefinition) {
...
        MetricsEventBus.post(metadataEvent, () ->
            {
                boolean result = true;
                try {
                ...
                    doStoreProviderMetadata(providerMetadataIdentifier, data);
                    saveProperties(providerMetadataIdentifier, data, true, !syncReport);
                ...
                } catch (Exception e) {
                ...
                    result = false;
                }
                return result;
            }, aBoolean -> aBoolean
        );
    }

在前文中,我们提到 MetricsEventBus.post 的第二个参数是实际要进行的业务操作,第三个参数则是根据业务操作返回值判断操作是否成功的逻辑。

此处的业务操作是尝试存储目标服务的元数据。执行操作之前,会先发布事件,最终增加 STORE_PROVIDER_METADATA (尝试存储服务元数据次数)的计数。如果产生异常,会增加 STORE_PROVIDER_METADATA_ERROR (存储服务元数据失败) 的计数,否则增加STORE_PROVIDER_METADATA_SUCCEED(存储服务元数据成功) 的计数。