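The Nacos client sends an HTTP request, both at startup and from a scheduled task, to the Nacos server endpoint /nacos/v1/ns/instance to register a service instance.
Following the URL, we can locate the server-side entry point for service registration in the source: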
com.alibaba.nacos.naming.controllers.InstanceController#register
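The request is wrapped into an Instance. Note that serviceName uses @@ to separate the group name from the service name, e.g. DEFAULT_GROUP@@user, and the serviceName stored in the registry also includes the group name.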
com.alibaba.nacos.naming.controllers.InstanceController#register
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    // Entry point of service registration
    // Namespace
    final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // Service name; group name and service name are separated by @@, e.g. DEFAULT_GROUP@@user
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    // Wrap the request into an Instance holding the IP, port, service name, metadata, etc.
    // serviceName was already read above; passing it into parseInstance() would avoid
    // reading it a second time inside parseInstance()
    final Instance instance = parseInstance(request);
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}
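To make the @@ naming convention concrete, here is a minimal sketch of splitting and joining a grouped service name. GroupedName and its methods are hypothetical helpers written for illustration; they are not the Nacos NamingUtils class, whose exact behavior may differ.

```java
// Hypothetical helper for illustration only; not part of Nacos.
final class GroupedName {
    // Separator between group name and service name, as described in the text.
    static final String SEPARATOR = "@@";

    private GroupedName() {
    }

    // Builds "group@@service" from its two parts.
    static String join(String group, String service) {
        return group + SEPARATOR + service;
    }

    // Returns the part before the first "@@".
    static String groupOf(String groupedName) {
        return groupedName.substring(0, separatorIndex(groupedName));
    }

    // Returns the part after the first "@@".
    static String serviceOf(String groupedName) {
        return groupedName.substring(separatorIndex(groupedName) + SEPARATOR.length());
    }

    private static int separatorIndex(String groupedName) {
        int idx = groupedName.indexOf(SEPARATOR);
        if (idx < 0) {
            throw new IllegalArgumentException("missing " + SEPARATOR + " in: " + groupedName);
        }
        return idx;
    }
}
```

With this sketch, GroupedName.groupOf("DEFAULT_GROUP@@user") yields the group part and GroupedName.serviceOf("DEFAULT_GROUP@@user") yields the service part.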
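Create an empty Service and put it into serviceMap, then add the instance to the service. Note that this only updates the cache; the registry itself has not been changed yet.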
com.alibaba.nacos.naming.core.ServiceManager#registerInstance
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    /**
     * Map(namespace, Map(group::serviceName, Service)).
     * Create an empty Service and put it into serviceMap
     */
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // Look up the Service by namespaceId and serviceName
    // The Service was just created by createEmptyService(); returning it from there
    // would save this extra lookup
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    // Add the instance to the service
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
If the service does not exist, create one.
com.alibaba.nacos.naming.core.ServiceManager#createEmptyService
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
    createServiceIfAbsent(namespaceId, serviceName, local, null);
}

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
        throws NacosException {
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            // The cluster is set up on the service here
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        // Put the service into serviceMap and initialize it
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}
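The create-if-absent step over the two-level map Map(namespace, Map(group@@serviceName, Service)) can be sketched in isolation as follows. RegistrySketch and ServiceEntry are hypothetical names, and the use of computeIfAbsent is an assumption of this sketch rather than what ServiceManager does. Unlike createEmptyService(), this version returns the entry, which avoids the second lookup noted in the comments above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical two-level registry: namespace -> (grouped service name -> entry).
final class RegistrySketch {

    // Stand-in for the real Service class; it only carries its identity.
    static final class ServiceEntry {
        final String namespaceId;
        final String name;

        ServiceEntry(String namespaceId, String name) {
            this.namespaceId = namespaceId;
            this.name = name;
        }
    }

    private final Map<String, Map<String, ServiceEntry>> serviceMap = new ConcurrentHashMap<>();

    // Returns the entry, or null when the namespace or the service is unknown.
    ServiceEntry get(String namespaceId, String name) {
        Map<String, ServiceEntry> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(name);
    }

    // Atomically creates the inner map and the entry when missing, and returns the entry.
    ServiceEntry createIfAbsent(String namespaceId, String name) {
        return serviceMap
                .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
                .computeIfAbsent(name, n -> new ServiceEntry(namespaceId, n));
    }
}
```

Calling createIfAbsent twice with the same arguments returns the same ServiceEntry object, so concurrent registrations of the same service converge on one entry.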
An empty service is added to the registry and initialized. At this point the service contains no instances.
com.alibaba.nacos.naming.core.ServiceManager#putServiceAndInit
private void putServiceAndInit(Service service) throws NacosException {
    // Put the service into serviceMap
    putService(service);
    service = getService(service.getNamespaceId(), service.getName());
    // Start the client heartbeat check task
    service.init();
    /**
     * Where these listeners are called back
     *
     * @see DistroConsistencyServiceImpl.Notifier#handle(org.javatuples.Pair)
     */
    // Two listeners are registered here: one for the ephemeral key, one for the persistent key
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
This only merges the new instance with the existing instances and puts the result into the cache.
com.alibaba.nacos.naming.core.ServiceManager#addInstance
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
        throws NacosException {
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    // The Service is looked up yet again
    Service service = getService(namespaceId, serviceName);
    synchronized (service) {
        // Merge the instances in the cache with those in the registry into instanceMap,
        // then add the current instance to instanceMap
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // Put the instance list into the cache
        /**
         * @see DelegateConsistencyServiceImpl#put(String, Record)
         */
        consistencyService.put(key, instances);
    }
}
How the new instance is merged with the old instance list.
com.alibaba.nacos.naming.core.ServiceManager#addIpAddresses
private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
    return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}

public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips)
        throws NacosException {
    // Look up the instances in the cache
    Datum datum = consistencyService
            .get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
    // Look up the instances in the registry
    List<Instance> currentIPs = service.allIPs(ephemeral);
    // Registry instances keyed by instance.toIpAddr()
    Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    // Set of instance IDs
    Set<String> currentInstanceIds = Sets.newHashSet();
    for (Instance instance : currentIPs) {
        currentInstances.put(instance.toIpAddr(), instance);
        currentInstanceIds.add(instance.getInstanceId());
    }
    // Merge the instances in the cache with those in the registry
    Map<String, Instance> instanceMap;
    if (datum != null && null != datum.value) {
        instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    } else {
        instanceMap = new HashMap<>(ips.length);
    }
    for (Instance instance : ips) {
        if (!service.getClusterMap().containsKey(instance.getClusterName())) {
            Cluster cluster = new Cluster(instance.getClusterName(), service);
            cluster.init();
            service.getClusterMap().put(instance.getClusterName(), cluster);
            Loggers.SRV_LOG
                    .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                            instance.getClusterName(), instance.toJson());
        }
        if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
            // Remove the instance from instanceMap
            instanceMap.remove(instance.getDatumKey());
        } else {
            // Put the instance into instanceMap, keeping the old instance ID if there is one
            Instance oldInstance = instanceMap.get(instance.getDatumKey());
            if (oldInstance != null) {
                instance.setInstanceId(oldInstance.getInstanceId());
            } else {
                instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
            }
            instanceMap.put(instance.getDatumKey(), instance);
        }
    }
    if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
        throw new IllegalArgumentException(
                "ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils
                        .toJson(instanceMap.values()));
    }
    // Return the merged instances
    return new ArrayList<>(instanceMap.values());
}
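Stripped of clusters and health state, the merge above comes down to a map keyed by instance address: an add overwrites the entry for that key while keeping the old instance ID, and a remove deletes it. The following is a simplified sketch under those assumptions. Inst, its key() method and the generated ID format are hypothetical; the real getDatumKey() and generateInstanceId() carry more information.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class InstanceMergeSketch {

    // Minimal stand-in for Instance; the key is simplified to "ip:port".
    static final class Inst {
        final String ip;
        final int port;
        String instanceId;

        Inst(String ip, int port) {
            this.ip = ip;
            this.port = port;
        }

        String key() {
            return ip + ":" + port;
        }
    }

    // Merges the changes into the current list; remove=true deletes instead of adding.
    static List<Inst> merge(List<Inst> current, boolean remove, Inst... changes) {
        Map<String, Inst> byKey = new LinkedHashMap<>();
        for (Inst inst : current) {
            byKey.put(inst.key(), inst);
        }
        for (Inst change : changes) {
            if (remove) {
                byKey.remove(change.key());
                continue;
            }
            Inst old = byKey.get(change.key());
            // Reuse the existing ID for a known address, otherwise generate a new one.
            change.instanceId = (old != null) ? old.instanceId : "generated-" + change.key();
            byKey.put(change.key(), change);
        }
        return new ArrayList<>(byKey.values());
    }
}
```

Because the map is keyed by address, registering the same address again replaces the stored instance rather than duplicating it.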
This is only a delegate: it routes each consistency protocol to a different service class.
com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put
public void put(String key, Record value) throws NacosException {
    /**
     * ephemeral: true  AP  ephemeralConsistencyService, implemented by DistroConsistencyServiceImpl
     * ephemeral: false CP  persistentConsistencyService: StandalonePersistentServiceProcessor on a single node,
     *                      PersistentServiceProcessor in a cluster
     *
     * @see DistroConsistencyServiceImpl#put(java.lang.String, com.alibaba.nacos.naming.pojo.Record)
     * @see com.alibaba.nacos.naming.consistency.persistent.impl.PersistentServiceProcessor#put(String, Record)
     * @see com.alibaba.nacos.naming.consistency.persistent.impl.StandalonePersistentServiceProcessor#put(String, Record)
     */
    mapConsistencyService(key).put(key, value);
}
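The delegate idea can be shown with a small sketch that picks a backing store from the key. Everything here is hypothetical: KvStore, MapStore, DelegatingStore and the "ephemeral." marker are illustrative names, and the real key format built by KeyBuilder is not reproduced.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal store abstraction standing in for a consistency service.
interface KvStore {
    void put(String key, String value);
}

// Simple in-memory store used for both branches in this sketch.
final class MapStore implements KvStore {
    final Map<String, String> data = new HashMap<>();

    @Override
    public void put(String key, String value) {
        data.put(key, value);
    }
}

// Routes each write to the ephemeral or the persistent store based on the key.
final class DelegatingStore implements KvStore {
    // Assumed marker inside ephemeral keys; chosen for illustration only.
    static final String EPHEMERAL_MARKER = "ephemeral.";

    private final KvStore ephemeralStore;
    private final KvStore persistentStore;

    DelegatingStore(KvStore ephemeralStore, KvStore persistentStore) {
        this.ephemeralStore = ephemeralStore;
        this.persistentStore = persistentStore;
    }

    @Override
    public void put(String key, String value) {
        select(key).put(key, value);
    }

    private KvStore select(String key) {
        return key.contains(EPHEMERAL_MARKER) ? ephemeralStore : persistentStore;
    }
}
```

The caller only sees one KvStore, so the choice between the AP and CP paths stays hidden behind the key.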
By default the AP protocol is used, so the call goes through DistroConsistencyServiceImpl.
com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put
public void put(String key, Record value) throws NacosException {
    // Put the data into the cache and add an instance-change task
    onPut(key, value);
    // Sync the instance change to the other nodes
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE,
            globalConfig.getTaskDispatchPeriod() / 2);
}
Here the instance list is put into the cache, and then an instance-change task is added.
com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#onPut
public void onPut(String key, Record value) {
    // Put into the cache
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        // Store the instance-change data in dataStore
        dataStore.put(key, datum);
    }
    if (!listeners.containsKey(key)) {
        return;
    }
    // Add the instance-change task
    notifier.addTask(key, DataOperation.CHANGE);
}
The instance-change task ends up in a blocking queue. So when is it taken off the queue and executed?
com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#addTask
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

/**
 * Add new notify task to queue.
 *
 * @param datumKey data key
 * @param action   action for data
 */
public void addTask(String datumKey, DataOperation action) {
    if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
        return;
    }
    if (action == DataOperation.CHANGE) {
        // Record the key in services to avoid adding a duplicate task
        services.put(datumKey, StringUtils.EMPTY);
    }
    /**
     * Where these tasks are taken from the queue
     *
     * @see Notifier#run()
     */
    tasks.offer(Pair.with(datumKey, action));
}
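The combination of a marker map and a bounded queue used by addTask can be sketched on its own. DedupTaskQueue is a hypothetical class. It uses putIfAbsent so that the check and the marking happen in one atomic step, which is a substitution made in this sketch and differs from the separate containsKey and put calls in the code above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical queue that holds at most one pending change task per key.
final class DedupTaskQueue {
    private final ConcurrentMap<String, Boolean> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);

    // Returns true if a task was queued, false if one is already pending or the queue is full.
    boolean addChange(String key) {
        if (pending.putIfAbsent(key, Boolean.TRUE) != null) {
            return false;
        }
        boolean queued = tasks.offer(key);
        if (!queued) {
            // Queue full: drop the marker so a later call can retry.
            pending.remove(key);
        }
        return queued;
    }

    // Non-blocking take: returns null when no task is waiting.
    String poll() {
        String key = tasks.poll();
        if (key != null) {
            // Clear the marker so new changes for this key can be queued again.
            pending.remove(key);
        }
        return key;
    }

    int size() {
        return tasks.size();
    }
}
```

Coalescing is safe in this design because the consumer reads the latest data for the key when it handles the task, so one pending task covers every change made before it runs.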
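At this point the registration request itself is complete, but we have not yet seen the instance being put into the service. That step happens in the asynchronous processing below.
Notifier is a member field of DistroConsistencyServiceImpl. The DistroConsistencyServiceImpl#init method submits the Notifier to a thread pool, so Notifier must implement the Runnable interface.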
com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#init
private volatile Notifier notifier = new Notifier();

@PostConstruct
public void init() {
    GlobalExecutor.submitDistroNotifyTask(notifier);
}
So how does Notifier#run execute? It takes tasks off the blocking queue one at a time and handles each of them.
com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl.Notifier#run
@Override
public void run() {
    Loggers.DISTRO.info("distro notifier started");
    for (; ; ) {
        try {
            // Take a task from the blocking queue
            Pair<String, DataOperation> pair = tasks.take();
            // Handle the instance change
            handle(pair);
        } catch (Throwable e) {
            Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
        }
    }
}

private void handle(Pair<String, DataOperation> pair) {
    try {
        String datumKey = pair.getValue0();
        DataOperation action = pair.getValue1();
        services.remove(datumKey);
        int count = 0;
        if (!listeners.containsKey(datumKey)) {
            return;
        }
        for (RecordListener listener : listeners.get(datumKey)) {
            count++;
            try {
                if (action == DataOperation.CHANGE) {
                    // Handle a change
                    // Fetch the changed instance data from dataStore by datumKey
                    /**
                     * @see com.alibaba.nacos.naming.core.Service#onChange(java.lang.String, com.alibaba.nacos.naming.core.Instances)
                     */
                    listener.onChange(datumKey, dataStore.get(datumKey).value);
                    continue;
                }
                if (action == DataOperation.DELETE) {
                    // Handle a deletion
                    listener.onDelete(datumKey);
                    continue;
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
            }
        }
        if (Loggers.DISTRO.isDebugEnabled()) {
            Loggers.DISTRO
                    .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}",
                            datumKey, count, action.name());
        }
    } catch (Throwable e) {
        Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
    }
}
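The single-consumer loop can be reduced to the sketch below: one worker thread blocks on the queue and fans each key out to the listeners registered for it. NotifierSketch, listen, addTask and demo are hypothetical names, and the listener type is simplified to Consumer<String> instead of RecordListener. Unlike the endless loop above, this worker stops when it is interrupted.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical notifier: one worker thread drains the queue and calls the listeners.
final class NotifierSketch implements Runnable {
    private final BlockingQueue<String> tasks = new LinkedBlockingQueue<>();
    private final Map<String, List<Consumer<String>>> listeners = new ConcurrentHashMap<>();

    void listen(String key, Consumer<String> listener) {
        listeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    void addTask(String key) {
        tasks.offer(key);
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // Blocks until a task is available.
                String key = tasks.take();
                List<Consumer<String>> targets = listeners.getOrDefault(key, Collections.emptyList());
                for (Consumer<String> listener : targets) {
                    try {
                        listener.accept(key);
                    } catch (RuntimeException e) {
                        // One failing listener must not stop the others or the loop.
                    }
                }
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition ends the worker.
                Thread.currentThread().interrupt();
            }
        }
    }

    // Small self-check: returns true if the listener was notified within two seconds.
    static boolean demo() {
        NotifierSketch notifier = new NotifierSketch();
        CountDownLatch notified = new CountDownLatch(1);
        notifier.listen("svc", key -> notified.countDown());
        Thread worker = new Thread(notifier, "notifier-sketch");
        worker.setDaemon(true);
        worker.start();
        notifier.addTask("svc");
        try {
            return notified.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            worker.interrupt();
        }
    }
}
```

The request thread only enqueues a key and returns, which is why the registration call can answer "ok" before the registry has been updated.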
com.alibaba.nacos.naming.core.Service#onChange
public void onChange(String key, Instances value) throws Exception {
    Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
    for (Instance instance : value.getInstanceList()) {
        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }
        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }
        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }
    // This is where the registry is actually updated
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    recalculateChecksum();
}
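The weight normalization in onChange is easy to misread, so here it is as a standalone function. WeightClamp is a hypothetical helper; the bounds 10000.0 and 0.01 are taken from the code above. A weight of exactly 0 is left unchanged, because the lower bound only applies to values strictly greater than 0.

```java
// Hypothetical helper mirroring the weight checks in Service#onChange.
final class WeightClamp {
    static final double MAX_WEIGHT = 10000.0D;
    static final double MIN_WEIGHT = 0.01D;

    private WeightClamp() {
    }

    static double clamp(double weight) {
        if (weight > MAX_WEIGHT) {
            return MAX_WEIGHT;
        }
        // Only positive weights below the minimum are raised; 0 stays 0.
        if (weight > 0.0D && weight < MIN_WEIGHT) {
            return MIN_WEIGHT;
        }
        return weight;
    }
}
```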
The instance list obtained from the cache is grouped by clusterName, and the registry is then updated one cluster at a time.
com.alibaba.nacos.naming.core.Service#updateIPs
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    // Group the instances by clusterName
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }
    for (Instance instance : instances) {
        try {
            if (instance == null) {
                Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                continue;
            }
            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }
            if (!clusterMap.containsKey(instance.getClusterName())) {
                Loggers.SRV_LOG
                        .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.",
                                instance.getClusterName(), instance.toJson());
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }
            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }
            clusterIPs.add(instance);
        } catch (Exception e) {
            Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
        }
    }
    // Iterate cluster by cluster
    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        //make every ip mine
        List<Instance> entryIPs = entry.getValue();
        // Update the instances of this cluster
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }
    setLastModifiedMillis(System.currentTimeMillis());
    getPushService().serviceChanged(this);
    StringBuilder stringBuilder = new StringBuilder();
    for (Instance instance : allIPs()) {
        stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
    }
    Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(),
            stringBuilder.toString());
}
com.alibaba.nacos.naming.core.Cluster#updateIps
public void updateIps(List<Instance> ips, boolean ephemeral) {
    Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
    HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
    for (Instance ip : toUpdateInstances) {
        oldIpMap.put(ip.getDatumKey(), ip);
    }
    // The long section below only works out which instances were updated, added, or removed,
    // and then adjusts the health status of the instances
    List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values());
    if (updatedIPs.size() > 0) {
        for (Instance ip : updatedIPs) {
            Instance oldIP = oldIpMap.get(ip.getDatumKey());
            // do not update the ip validation status of updated ips
            // because the checker has the most precise result
            // Only when ip is not marked, don't we update the health status of IP:
            if (!ip.isMarked()) {
                ip.setHealthy(oldIP.isHealthy());
            }
            if (ip.isHealthy() != oldIP.isHealthy()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(),
                        (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName());
            }
            if (ip.getWeight() != oldIP.getWeight()) {
                // ip validation status updated
                Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(),
                        ip.toString());
            }
        }
    }
    List<Instance> newIPs = subtract(ips, oldIpMap.values());
    if (newIPs.size() > 0) {
        Loggers.EVT_LOG
                .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(),
                        getName(), newIPs.size(), newIPs.toString());
        for (Instance ip : newIPs) {
            HealthCheckStatus.reset(ip);
        }
    }
    List<Instance> deadIPs = subtract(oldIpMap.values(), ips);
    if (deadIPs.size() > 0) {
        Loggers.EVT_LOG
                .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(),
                        getName(), deadIPs.size(), deadIPs.toString());
        for (Instance ip : deadIPs) {
            HealthCheckStatus.remv(ip);
        }
    }
    toUpdateInstances = new HashSet<>(ips);
    // Overwrite the original reference directly (copy-on-write)
    if (ephemeral) {
        ephemeralInstances = toUpdateInstances;
    } else {
        persistentInstances = toUpdateInstances;
    }
}
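The reference swap at the end of Cluster#updateIps is a copy-on-write update: the new set is built off to the side and then published by a single assignment, so readers never see a half-updated collection. A minimal sketch of that idea follows. CopyOnWriteCluster is a hypothetical class, instances are reduced to strings, and the volatile modifier and unmodifiable wrapper are assumptions of this sketch, not a claim about how the Nacos fields are declared.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical copy-on-write holder for a cluster's instance set.
final class CopyOnWriteCluster {
    // volatile so that a reader thread sees the newly published set.
    private volatile Set<String> instances = Collections.emptySet();

    // Readers get the current snapshot without locking; it is never mutated afterwards.
    Set<String> snapshot() {
        return instances;
    }

    // Writers build a fresh set and publish it with one reference assignment.
    synchronized void update(Collection<String> latest) {
        Set<String> next = Collections.unmodifiableSet(new HashSet<>(latest));
        instances = next;
    }
}
```

A reader that obtained a snapshot before an update keeps iterating over the old set, while later readers see the new one. The cost is one full copy per write, which suits data that is read far more often than it is written.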
How does the server keep registry reads and writes safe under high concurrency?