又是美好的一天呀~
个人博客地址: huanghong.top
NacosWatch用于定时与Nacos服务端数据同步当前服务的metadata数据并定时发布心跳事件。
默认的taskScheduler只是定时发一个HeartbeatEvent事件,并没有定时拉取数据操作,这个定时任务只是配合服务端主动推送更新后,定时通知其他组件进行相应更新,比如网关收到这个事件后会重新初始化Route。
//创建NacosWatch实例
public NacosWatch(NacosServiceManager nacosServiceManager,NacosDiscoveryProperties properties,ObjectProvider taskScheduler) {this.nacosServiceManager = nacosServiceManager;this.properties = properties;//如果taskScheduler没有可用的就创建默认的this.taskScheduler = taskScheduler.getIfAvailable(NacosWatch::getTaskScheduler);
}//创建默认的taskScheduler
private static ThreadPoolTaskScheduler getTaskScheduler() {ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();taskScheduler.setBeanName("Nacos-Watch-Task-Scheduler");taskScheduler.initialize();return taskScheduler;
}//发布HeartbeatEvent事件
public void nacosServicesWatch() {this.publisher.publishEvent(new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
}@Override
public void start() {//定期发布事件,默认30sthis.watchFuture = this.taskScheduler.scheduleWithFixedDelay(this::nacosServicesWatch, this.properties.getWatchDelay());}@Override
public void stop() {if (this.watchFuture != null) {//停止线程池((ThreadPoolTaskScheduler) this.taskScheduler).shutdown();//停止线程任务this.watchFuture.cancel(true);}
}
复制代码
这里注册监听器主要是为了更新matadata
@Override
public void start() {//创建监听器,一般一个服务只会创建1个,但如果在线修改了service和group,就可能多个了,那之前那个难道不应该被替换?想不通为啥要用listenerMap存。EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),event -> new EventListener() {@Overridepublic void onEvent(Event event) {//只处理服务变更事件if (event instanceof NamingEvent) {List instances = ((NamingEvent) event).getInstances();Optional instanceOptional = selectCurrentInstance(instances);instanceOptional.ifPresent(currentInstance -> {resetIfNeeded(currentInstance);});}}});//获取服务中心ServiceNamingService namingService = nacosServiceManager.getNamingService(properties.getNacosProperties()); //订阅EventListener namingService.subscribe(properties.getService(),properties.getGroup(),Arrays.asList(properties.getClusterName()), eventListener);
}//key
private String buildKey() {return String.join(":", properties.getService(), properties.getGroup());
}//获取当前节点实例
private Optional selectCurrentInstance(List instances) {return instances.stream().filter(instance -> properties.getIp().equals(instance.getIp())&& properties.getPort() == instance.getPort()).findFirst();
}//更新Metadata
private void resetIfNeeded(Instance instance) {if (!properties.getMetadata().equals(instance.getMetadata())) {properties.setMetadata(instance.getMetadata());}
}
复制代码
监听器最终会被注册到EventDispatcher中的observerMap中,先来看EventDispatcher中这部分代码
public EventDispatcher() {//创建单线程池this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");thread.setDaemon(true);return thread;}});//执行Notifier线程this.executor.execute(new Notifier());
}//注册Listener
public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {List observers = Collections.synchronizedList(new ArrayList());observers.add(listener);observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);//搞不懂为啥还要添加到前一个List中if (observers != null) {observers.add(listener);}//将serviceInfo加到changedServices中,这里只会changedServices只有一个serviceInfoserviceChanged(serviceInfo);
}//添加ServiceInfo
public void serviceChanged(ServiceInfo serviceInfo) {if (serviceInfo == null) {return;}changedServices.add(serviceInfo);
}
复制代码
Notifier
Notifier是EventDispatcher的一个内部类,主要用于定时扫描ServiceInfo中的List变更情况,来通知listener
private class Notifier implements Runnable {@Overridepublic void run() {//循环while (!closed) {ServiceInfo serviceInfo = null;try {//获取队列中ServiceInfo,超时5分钟serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);} catch (Exception ignore) {}if (serviceInfo == null) {continue;}try {List listeners = observerMap.get(serviceInfo.getKey());if (!CollectionUtils.isEmpty(listeners)) {for (EventListener listener : listeners) {List hosts = Collections.unmodifiableList(serviceInfo.getHosts());//向listener发送NamingEvent事件,这里hosts就是当前服务的所有节点listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(),serviceInfo.getClusters(), hosts));}}} catch (Exception e) {NAMING_LOGGER.error("[NA] notify error for service: " + serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);}}}
}
复制代码
看到这,有个疑问,changedServices中serviceInfo取完了就没有了,说明有个地方不断调用serviceChanged方法往队列中添加serviceInfo。查到是HostReactor中有调用,原来在NacosNamingService的init方法中创建了EventDispatcher和HostReactor实例,将EventDispatcher注入到HostReactor中,当serviceInfo有变更的时候就添加到changedServices。整体流程图如下:

NacosWatch的作用就是更新当前节点的metadata和定时发送一个HearbeatEvent事件,可以监听这个事件做一些配置的动态更新操作。metadata中一般可设置一些状态标识,比如灰度发布可以设置版本号,配合网关把流量做定向分发。
感谢阅读完本篇文章!!!
个人博客地址: huanghong.top