網(wǎng)上有很多關(guān)于pos機(jī)注冊網(wǎng)絡(luò)超時,一文深入理解AP架構(gòu)Nacos注冊原理的知識,也有很多人為大家解答關(guān)于pos機(jī)注冊網(wǎng)絡(luò)超時的問題,今天pos機(jī)之家(www.shineka.com)為大家整理了關(guān)于這方面的知識,讓我們一起來看下吧!
本文目錄一覽:
pos機(jī)注冊網(wǎng)絡(luò)超時
Nacos簡介Nacos是一款阿里巴巴開源用于管理分布式微服務(wù)的中間件,能夠幫助開發(fā)人員快速實現(xiàn)動態(tài)服務(wù)發(fā)現(xiàn)、服務(wù)配置、服務(wù)元數(shù)據(jù)及流量管理等。這篇文章主要剖析一下Nacos作為注冊中心時其服務(wù)注冊與發(fā)現(xiàn)原理。
為什么會需要NacosNacos作為注冊中心是為了更好更方便的管理應(yīng)用中的每一個服務(wù),是各個分布式節(jié)點之間的紐帶。其作為注冊中心主要提供以下核心功能:
服務(wù)注冊與發(fā)現(xiàn):動態(tài)的增減服務(wù)節(jié)點,服務(wù)節(jié)點增減后動態(tài)的通知服務(wù)消費者,不需要由消費者來更新配置。服務(wù)配置:動態(tài)修改服務(wù)配置,并將其推送到服務(wù)提供者和服務(wù)消費者而不需要重啟服務(wù)。健康檢查和服務(wù)摘除:主動的檢查服務(wù)健康情況,對于宕機(jī)的服務(wù)將其摘除服務(wù)列表。分布式架構(gòu)CAP理論CAP定理是分布式系統(tǒng)中最基礎(chǔ)的原則,所以理解和掌握了CAP對系統(tǒng)架構(gòu)的設(shè)計至關(guān)重要。分布式架構(gòu)下所有系統(tǒng)不可能同時滿足以下三點:Consisteny(一致性)、Availability(可用性)、Partition tolerance(分區(qū)容錯性),CAP指明了任何分布式系統(tǒng)只能同時滿足這三項中的兩項。
分布式系統(tǒng)肯定都要保證其容錯性 ,那么可用性和一致性就只能選一個了。簡單來說分布式系統(tǒng)的CAP理論就像你想買個新手機(jī),這個手機(jī)不可能功能強(qiáng)大、便宜、又好看的,它最多只能滿足兩點的,要么功能強(qiáng)大便宜、要么功能強(qiáng)大好看、要么便宜好看,不可能同時滿足三點。
幾種注冊中心的區(qū)別注冊中心在分布式應(yīng)用中是經(jīng)常用到的,也是必不可少的,那注冊中心,又分為以下幾種:Eureka、Zookeeper、Nacos等。這些注冊中心最大的區(qū)別就是其基于AP架構(gòu)還是CP架構(gòu),簡單介紹一下:
Zookeeper:用過或者了解過zk做注冊中心的同學(xué)都知道,Zookeeper集群下一旦leader節(jié)點宕機(jī)了,在短時間內(nèi)服務(wù)都不可通訊,因為它們在一定時間內(nèi)follower進(jìn)行選舉來推出新的leader,因為在這段時間內(nèi),所有的服務(wù)通信將受到影響,而且leader選取時間比較長,需要花費幾十秒甚至上百秒的時間,因此:可以理解為 Zookeeper是實現(xiàn)的CP,也就是將失去A(可用性)。Eureka:Eureka集群下每個節(jié)點之間都會定時發(fā)送心跳,定時同步數(shù)據(jù),沒有master/slave之分,是一個完全去中心化的架構(gòu)。因此每個注冊到Eureka下的實例都會定時同步ip,服務(wù)之間的調(diào)用也是根據(jù)Eureka拿到的緩存服務(wù)數(shù)據(jù)進(jìn)行調(diào)用。若一臺Eureka服務(wù)宕機(jī),其他Eureka在一定時間內(nèi)未感知到這臺Eureka服務(wù)宕機(jī),各個服務(wù)之間還是可以正常調(diào)用。Eureka的集群中,只要有一臺Eureka還在,就能保證注冊服務(wù)可用(保證可用性),只不過查到的信息可能不是最新的(不保證強(qiáng)一致性)。當(dāng)數(shù)據(jù)出現(xiàn)不一致時,雖然A, B上的注冊信息不完全相同,但每個Eureka節(jié)點依然能夠正常對外提供服務(wù),這會出現(xiàn)查詢服務(wù)信息時如果請求A查不到,但請求B就能查到。如此保證了可用性但犧牲了一致性。Nacos:同時支持CP和AP架構(gòu),根據(jù)根據(jù)服務(wù)注冊選擇臨時和永久來決定走AP模式還是CP模式。如果注冊Nacos的client節(jié)點注冊時ephemeral=true,那么Nacos集群對這個client節(jié)點的效果就是AP,采用distro協(xié)議實現(xiàn);而注冊Nacos的client節(jié)點注冊時ephemeral=false,那么Nacos集群對這個節(jié)點的效果就是CP的,采用raft協(xié)議實現(xiàn)。本篇文章主要是深入研究一下Nacos基于AP架構(gòu)微服務(wù)注冊原理,由于篇幅有限基于CP架構(gòu)的Nacos微服務(wù)注冊下次再跟你們分析。
Nacos服務(wù)注冊與發(fā)現(xiàn)的原理1.微服務(wù)在啟動將自己的服務(wù)注冊到Nacos注冊中心,同時發(fā)布http接口供其他系統(tǒng)調(diào)用,一般都是基于SpringMVC。
2.服務(wù)消費者基于Feign調(diào)用服務(wù)提供者對外發(fā)布的接口,先對調(diào)用的本地接口加上注解@FeignClient,F(xiàn)eign會針對加了該注解的接口生成動態(tài)代理,服務(wù)消費者針對Feign生成的動態(tài)代理去調(diào)用方法時,會在底層生成Http協(xié)議格式的請求,類似 /stock/deduct? productId=100。
3.Feign最終會調(diào)用Ribbon從本地的Nacos注冊表的緩存里根據(jù)服務(wù)名取出服務(wù)提供在機(jī)器的列表,然后進(jìn)行負(fù)載均衡并選擇一臺機(jī)器出來,對選出來的機(jī)器IP和端口拼接之前生成的url請求,生成調(diào)用的Http接口地址。
Nacos核心功能點服務(wù)注冊:Nacos Client會通過發(fā)送REST請求的方式向Nacos Server注冊自己的服務(wù),提供自身的元數(shù)據(jù),比如ip地址、端口等信息。Nacos Server接收到注冊請求后,就會把這些元數(shù)據(jù)信息存儲在一個雙層的內(nèi)存Map中。
服務(wù)心跳:在服務(wù)注冊后,Nacos Client會維護(hù)一個定時心跳來持續(xù)通知Nacos Server,說明服務(wù)一直處于可用狀態(tài),防止被剔除。默認(rèn)5s發(fā)送一次心跳。
服務(wù)健康檢查:Nacos Server會開啟一個定時任務(wù)用來檢查注冊服務(wù)實例的健康情況,對于超過15s沒有收到客戶端心跳的實例會將它 的healthy屬性置為false(客戶端服務(wù)發(fā)現(xiàn)時不會發(fā)現(xiàn)),如果某個實例超過30秒沒有收到心跳,直接剔除該實例(被剔除的實例如果恢復(fù) 發(fā)送心跳則會重新注冊)
服務(wù)發(fā)現(xiàn):服務(wù)消費者(Nacos Client)在調(diào)用服務(wù)提供者的服務(wù)時,會發(fā)送一個REST請求給Nacos Server,獲取上面注冊的服務(wù)清 單,并且緩存在Nacos Client本地,同時會在Nacos Client本地開啟一個定時任務(wù)定時拉取服務(wù)端最新的注冊表信息更新到本地緩存
服務(wù)同步:Nacos Server集群之間會互相同步服務(wù)實例,用來保證服務(wù)信息的一致性。
Nacos源碼分析看Nacos源碼的不難發(fā)現(xiàn),Nacos實際上就是一個基于Spring Boot的web應(yīng)用,不管是服務(wù)注冊還是發(fā)送心跳都是通過給Nacos服務(wù)端發(fā)送http請求實現(xiàn)的。下載并編譯Nacos源碼就不過多贅述了,首先需要搭建一個微服務(wù)作為Nacos的客戶端。
Nacos客戶端注冊
Nacos客戶端也是個Spring Boot項目,當(dāng)客戶端服務(wù)啟動時Spring Boot項目啟動時自動加載spring-cloud-starter-alibaba-nacos-discovery包的META-INF/spring.factories中包含自動裝配的配置信息,并將文件中的類加載成bean放入Spring容器中,我們可以先看一下spring.factories文件:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\\ com.alibaba.cloud.nacos.discovery.NacosDiscoveryAutoConfiguration,\\ com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\\ com.alibaba.cloud.nacos.registry.NacosServiceRegistryAutoConfiguration,\\ com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientConfiguration,\\ com.alibaba.cloud.nacos.discovery.reactive.NacosReactiveDiscoveryClientConfiguration,\\ com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration,\\ com.alibaba.cloud.nacos.NacosServiceAutoConfigurationorg.springframework.cloud.bootstrap.BootstrapConfiguration=\\ com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
找到Nacos注冊中心的自動配置類:NacosServiceRegistryAutoConfiguration。
NacosServiceRegistryAutoConfiguration這個類是Nacos客戶端啟動時的一個入口類,代碼如下:
@Configuration( proxyBeanMethods = false)@EnableConfigurationProperties@ConditionalOnNacosDiscoveryEnabled@ConditionalOnProperty( value = {"spring.cloud.service-registry.auto-registration.enabled"}, matchifMissing = true)@AutoConfigureAfter({AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class})public class NacosServiceRegistryAutoConfiguration { public NacosServiceRegistryAutoConfiguration() { } @Bean public NacosServiceRegistry nacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) { return new NacosServiceRegistry(nacosDiscoveryProperties); } @Bean @ConditionalOnBean({AutoServiceRegistrationProperties.class}) public NacosRegistration nacosRegistration(ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) { return new NacosRegistration((List)registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context); } @Bean @ConditionalOnBean({AutoServiceRegistrationProperties.class}) public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) { return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration); }}
看NacosServiceRegistryAutoConfiguration配置類有3個@Bean注解。
nacosServiceRegistry()方法: 定義了NacosServiceRegistry的bean,并且為其屬性nacosDiscoveryProperties賦值,即將從配置文件中讀取到的配置信息賦值進(jìn)去待用;nacosRegistration()方法主要就是定義了NacosRegistration的bean,后面會用到這個bean;nacosAutoServiceRegistration:該方法比較核心它的參數(shù)中有2個就是前面定義的兩個bean,其實就是為了這個方法服務(wù)的,由NacosAutoServiceRegistration類的構(gòu)造器傳入NacosAutoServiceRegistration類中:NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration),后面的流程都是以這句代碼作為入口。利用IDEA查看類結(jié)構(gòu),如上圖所示,NacosAutoServiceRegistration繼承AbstractAutoServiceRegistration類,而AbstractAutoServiceRegistration類又實現(xiàn)了AutoServiceRegistration和ApplicationListener接口。
ApplicationListener接口是Spring提供的事件監(jiān)聽接口,Spring會在所有bean都初始化完成之后發(fā)布一個事件,ApplicationListener會監(jiān)聽所發(fā)布的事件,這里的事件是Spring Boot自定義的WebServerInitializedEvent事件,主要是項目啟動時就會發(fā)布WebServerInitializedEvent事件,然后被AbstractAutoServiceRegistration監(jiān)聽到,從而就會執(zhí)行onApplicationEvent方法,在這個方法里就會進(jìn)行服務(wù)注冊。
這里AbstractAutoServiceRegistration類實現(xiàn)了Spring監(jiān)聽器接口ApplicationListener,并重寫了該接口的onApplicationEvent方法
public void onApplicationEvent(WebServerInitializedEvent event) { this.bind(event);}
繼續(xù)點下去看bind方法
public void bind(WebServerInitializedEvent event) { ApplicationContext context = event.getApplicationContext(); if (!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) { this.port.compareAndSet(0, event.getWebServer().getPort()); //start方法 this.start(); } }
看到這里發(fā)現(xiàn)了bind方法里有個非常重要的start()方法,繼續(xù)看該方法的register()就是真正的客戶端注冊方法
public void start() { if (!this.isEnabled()) { if (logger.isDebugEnabled()) { logger.debug("Discovery Lifecycle disabled. Not starting"); } } else { if (!this.running.get()) { this.context.publishEvent(new InstancePreRegisteredEvent(this, this.getRegistration())); //真正的客戶端注冊方法 this.register(); if (this.shouldRegisterManagement()) { this.registerManagement(); } this.context.publishEvent(new InstanceRegisteredEvent(this, this.getConfiguration())); this.running.compareAndSet(false, true); } } }
跳過一些中間非關(guān)鍵性的代碼,可以直接看該注冊方法
protected void register() { this.serviceRegistry.register(getRegistration());}
這里的serviceRegistry就是NacosServiceRegistryAutoConfiguration類中第一個@Bean定義的bean,第一個@Bean就是這里的serviceRegistry對象的實現(xiàn);其中g(shù)etRegistration()獲取的就是第二個@Bean定義的NacosRegistration的實例,這兩個bean實例都是通過第3個@Bean傳進(jìn)來的,所以這里就可以把NacosServiceRegistryAutoConfiguration類中那3個@Bean給串起來了。
public void register(Registration registration) { if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..."); } else { NamingService namingService = this.namingService(); String serviceId = registration.getServiceId(); String group = this.nacosDiscoveryProperties.getGroup(); //構(gòu)建客戶端參數(shù)ip,端口號等 Instance instance = this.getNacosInstanceFromRegistration(registration); try { //調(diào)用注冊方法 namingService.registerInstance(serviceId, group, instance); log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()}); } catch (Exception var7) { log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var7}); ReflectionUtils.rethrowRuntimeException(var7); } } } //構(gòu)建客戶端注冊參數(shù) private Instance getNacosInstanceFromRegistration(Registration registration) { Instance instance = new Instance(); instance.setIp(registration.getHost()); instance.setPort(registration.getPort()); instance.setWeight((double)this.nacosDiscoveryProperties.getWeight()); instance.setClusterName(this.nacosDiscoveryProperties.getClusterName()); instance.setEnabled(this.nacosDiscoveryProperties.isInstanceEnabled()); instance.setMetadata(registration.getMetadata()); instance.setEphemeral(this.nacosDiscoveryProperties.isEphemeral()); return instance; }
不得不說,阿里巴巴開發(fā)的中間件,其底層源碼的命名還是很規(guī)范的,register()方法從命名上來看就可以知道這是注冊的方法,事實也確實是注冊的方法,這個方法中會通過nacos-client包來調(diào)用nacos-server的服務(wù)注冊接口來實現(xiàn)服務(wù)的注冊功能。下面我看一下調(diào)用Nacos注冊接口方法:
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); if (instance.isEphemeral()) { //開啟一個異步線程向服務(wù)端發(fā)送心跳 BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance); this.beatReactor.addBeatInfo(groupedServiceName, beatInfo); } //調(diào)用服務(wù)端提供的注冊api實現(xiàn)注冊 this.serverProxy.registerService(groupedServiceName, groupName, instance); } public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance}); //構(gòu)建客戶端參數(shù) Map<String, String> params = new HashMap(16); params.put("namespaceId", this.namespaceId); params.put("serviceName", serviceName); params.put("groupName", groupName); params.put("clusterName", instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); //調(diào)用Nacos提供的api實現(xiàn)注冊 this.reqApi(UtilAndComs.nacosUrlInstance, params, "POST"); }
根據(jù)源碼可以知道beatReactor.addBeatInfo()方法作用在于創(chuàng)建心跳信息實現(xiàn)健康檢測,Nacos 服務(wù)端必須要確保注冊的服務(wù)實例是健康的,而心跳檢測就是服務(wù)健康檢測的手段。而serverProxy.registerService()實現(xiàn)服務(wù)注冊,綜上可以分析出Nacos客戶端注冊流程:
到此為止還沒有真正的實現(xiàn)服務(wù)的注冊,但是至少已經(jīng)知道了Nacos客戶端的自動注冊原理是借助了Spring Boot的自動配置功能,在項目啟動時通過自動配置類。NacosServiceRegistryAutoConfiguration將NacosServiceRegistry注入進(jìn)來,通過Spring的事件監(jiān)聽機(jī)制,調(diào)用該類的注冊方法register(registration)實現(xiàn)服務(wù)的自動注冊。
Nacos服務(wù)發(fā)現(xiàn)
1 Nacos客戶端客戶端服務(wù)發(fā)現(xiàn)
當(dāng)Nacos服務(wù)端啟動后,會先從本地緩存的serviceInfoMap中獲取服務(wù)實例信息,獲取不到則通過NamingProxy調(diào)用Nacos服務(wù)端獲取服務(wù)實例信息,最后開啟定時任務(wù)每秒請求服務(wù)端獲取實例信息列表進(jìn)而更新本地緩存serviceInfoMap,服務(wù)發(fā)現(xiàn)拉取實例信息流程圖如下:
廢話不多說,直接上服務(wù)發(fā)現(xiàn)源碼:
/** * 客戶端服務(wù)發(fā)現(xiàn) * * @param serviceName name of service * @param groupName group of service * @param clusters list of cluster * @param subscribe if subscribe the service * @return * @throws NacosException */ @Override public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException { ServiceInfo serviceInfo; if (subscribe) { // 如果本地緩存不存在服務(wù)信息,則進(jìn)行訂閱 serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } else { // 如果非訂閱模式就直接拉取服務(wù)端的注冊表 serviceInfo = hostReactor .getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } List<Instance> list; if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) { return new ArrayList<Instance>(); } return list; }
/** * 客戶端從注冊中心拉取注冊列表 * * @param serviceName * @param clusters * @return */ public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } //客戶端從本地緩存中拉群注冊表信息,第一次根據(jù)服務(wù)名從注冊表map中獲取,服務(wù)表信息肯定是為null ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); //如果拿到緩存map中的服務(wù)列表為null,如果是第一次根據(jù)服務(wù)名拉取注冊表信息,肯定為null if (null == serviceObj) { serviceObj = new ServiceInfo(serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); updatingMap.put(serviceName, new Object()); //第一次拉取注冊表信息為null后,然后調(diào)用Nacos服務(wù)端接口更新本地注冊表 updateServiceNow(serviceName, clusters); updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); } } } } /** * 定時任務(wù)拉取,每隔幾秒鐘就去拉取一次,去拉取nacos注冊表,更新客戶端本地注冊列表的map * * 為啥這里要定時任務(wù)拉取呢?因為上面到注冊表map是緩存在客戶端本地的,假如有新的服務(wù)注冊到nacos * 時,這時就要更新客戶端注冊表信息,所以這里會執(zhí)行一個訂單拉取的任務(wù) */ scheduleUpdateIfAbsent(serviceName, clusters); return serviceInfoMap.get(serviceObj.getKey()); } //異步拉取任務(wù) public void scheduleUpdateIfAbsent(String serviceName, String clusters) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } synchronized (futureMap) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } //執(zhí)行一個定時拉取任務(wù) ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters)); futureMap.put(ServiceInfo.getKey(serviceName, clusters), future); } } //定時拉取注冊表任務(wù) public class UpdateTask implements Runnable { long lastRefTime = Long.MAX_VALUE; private final String clusters; private final String serviceName; /** * the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty */ private int failCount = 0; public UpdateTask(String serviceName, String clusters) { this.serviceName = serviceName; this.clusters = clusters; } private void incFailCount() { int limit = 6; if (failCount == limit) { return; } failCount++; } private void resetFailCount() { failCount = 0; } @Override public void run() { long delayTime = DEFAULT_DELAY; try { ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); if (serviceObj == null) { //又在繼續(xù)調(diào)用拉取nacos注冊列表方法 updateService(serviceName, clusters); return; } if (serviceObj.getLastRefTime() <= lastRefTime) { //又在繼續(xù)調(diào)用拉取nacos注冊列表方法 updateService(serviceName, clusters); serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); } else { // if serviceName already updated by push, we should not override it // since the push data may be different from pull through force push refreshOnly(serviceName, clusters); } lastRefTime = serviceObj.getLastRefTime(); if (!notifier.isSubscribed(serviceName, clusters) && !futureMap .containsKey(ServiceInfo.getKey(serviceName, clusters))) { // abort the update task NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters); return; } if (CollectionUtils.isEmpty(serviceObj.getHosts())) { incFailCount(); return; } delayTime = serviceObj.getCacheMillis(); resetFailCount(); } catch (Throwable e) { incFailCount(); NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e); } finally { //最后繼續(xù)嵌套調(diào)用當(dāng)前這個任務(wù),實現(xiàn)定時拉取 executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS); } }
這里值得注意的是,Nacos客戶端拉取注冊列表方法的最后又是一個定時任務(wù)任務(wù),每隔10秒鐘就會拉取一次服務(wù)端Nacos的注冊表。為啥這里要定時任務(wù)拉取呢?因為上面到注冊表map是緩存在客戶端本地的,假如有新的服務(wù)注冊到Nacos時,這時就要更新客戶端注冊表信息,所以這里會執(zhí)行一個拉取的任務(wù)。
private void updateServiceNow(String serviceName, String clusters) { try { //拉群nacos列表,更新到本地緩存map中的注冊列表 updateService(serviceName, clusters); } catch (NacosException e) { NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e); } } /** * Update service now. * 拉取注冊列表 * * @param serviceName service name * @param clusters clusters */ public void updateService(String serviceName, String clusters) throws NacosException { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { //調(diào)用拉群列表接口 String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); if (StringUtils.isNotEmpty(result)) { //解析返回值服務(wù)表json processServiceJson(result); } } finally { if (oldService != null) { synchronized (oldService) { oldService.notifyAll(); } } } /** * Nacos客戶端查詢服務(wù)端注冊表數(shù) * * @param serviceName service name * @param clusters clusters * @param udpPort udp port * @param healthyOnly healthy only * @return instance list * @throws NacosException nacos exception */ public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException { final Map<String, String> params = new HashMap<String, String>(8); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put("clusters", clusters); params.put("udpPort", String.valueOf(udpPort)); params.put("clientIP", NetUtils.localIP()); params.put("healthyOnly", String.valueOf(healthyOnly)); //調(diào)用拉取注冊列表接口 return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET); }
2 服務(wù)端服務(wù)發(fā)現(xiàn)查詢注冊表api
上面分析了當(dāng)客戶端在其本地緩存中沒有找到注冊表信息,就會調(diào)用Nacos服務(wù)端api拉取注冊表信息,不難發(fā)現(xiàn)服務(wù)端查詢注冊表api為"/instance/list"。
/** * Get all instance of input service. * 客戶端獲取nacos所有注冊實例方法 * * @param request http request * @return list of instance * @throws Exception any error during list */ @GetMapping("/list") @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ) public ObjectNode list(HttpServletRequest request) throws Exception { String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); String agent = WebUtils.getUserAgent(request); String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY); String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY); int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0")); String env = WebUtils.optional(request, "env", StringUtils.EMPTY); boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false")); String app = WebUtils.optional(request, "app", StringUtils.EMPTY); String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY); boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false")); return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant, healthyOnly); }
這里通過doSrvIpxt()方法獲取服務(wù)列表,根據(jù)namespaceId、serviceName獲取service實例,service實例中srvIPs獲取所有服務(wù)提供者的實例信息,遍歷組裝成json字符串并返回
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP, int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception { ClientInfo clientInfo = new ClientInfo(agent); ObjectNode result = JacksonUtils.createEmptyJsonNode(); Service service = serviceManager.getService(namespaceId, serviceName); long cacheMillis = switchDomain.getDefaultCacheMillis(); // now try to enable the push try { if (udpPort > 0 && pushService.canEnablePush(agent)) { pushService .addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort), pushDataSource, tid, app); cacheMillis = switchDomain.getPushCacheMillis(serviceName); } } catch (Exception e) { Loggers.SRV_LOG .error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e); cacheMillis = switchDomain.getDefaultCacheMillis(); } if (service == null) { if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName); } result.put("name", serviceName); result.put("clusters", clusters); result.put("cacheMillis", cacheMillis); result.replace("hosts", JacksonUtils.createEmptyArrayNode()); return result; } checkIfDisabled(service); List<Instance> srvedIPs; //獲取所有實例 srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ","))); // filter ips using selector: if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) { srvedIPs = service.getSelector().select(clientIP, srvedIPs); } if (CollectionUtils.isEmpty(srvedIPs)) { if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName); } if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { result.put("dom", serviceName); } else { result.put("dom", NamingUtils.getServiceName(serviceName)); } result.put("name", serviceName); result.put("cacheMillis", cacheMillis); result.put("lastRefTime", System.currentTimeMillis()); result.put("checksum", service.getChecksum()); result.put("useSpecifiedURL", false); result.put("clusters", clusters); result.put("env", env); result.set("hosts", JacksonUtils.createEmptyArrayNode()); result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata())); return result; } Map<Boolean, List<Instance>> ipMap = new HashMap<>(2); ipMap.put(Boolean.TRUE, new ArrayList<>()); ipMap.put(Boolean.FALSE, new ArrayList<>()); for (Instance ip : srvedIPs) { ipMap.get(ip.isHealthy()).add(ip); } if (isCheck) { result.put("reachProtectThreshold", false); } double threshold = service.getProtectThreshold(); if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) { Loggers.SRV_LOG.warn("protect threshold reached, return all ips, service: {}", serviceName); if (isCheck) { result.put("reachProtectThreshold", true); } ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE)); ipMap.get(Boolean.FALSE).clear(); } if (isCheck) { result.put("protectThreshold", service.getProtectThreshold()); result.put("reachLocalSiteCallThreshold", false); return JacksonUtils.createEmptyJsonNode(); } ArrayNode hosts = JacksonUtils.createEmptyArrayNode(); for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) { List<Instance> ips = entry.getValue(); if (healthyOnly && !entry.getKey()) { continue; } for (Instance instance : ips) { // remove disabled instance: if (!instance.isEnabled()) { continue; } ObjectNode ipObj = JacksonUtils.createEmptyJsonNode(); ipObj.put("ip", instance.getIp()); ipObj.put("port", instance.getPort()); // deprecated since nacos 1.0.0: ipObj.put("valid", entry.getKey()); ipObj.put("healthy", entry.getKey()); ipObj.put("marked", instance.isMarked()); ipObj.put("instanceId", instance.getInstanceId()); ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata())); ipObj.put("enabled", instance.isEnabled()); ipObj.put("weight", instance.getWeight()); ipObj.put("clusterName", instance.getClusterName()); if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { ipObj.put("serviceName", instance.getServiceName()); } else { ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName())); } ipObj.put("ephemeral", instance.isEphemeral()); hosts.add(ipObj); } } result.replace("hosts", hosts); if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) { result.put("dom", serviceName); } else { result.put("dom", NamingUtils.getServiceName(serviceName)); } result.put("name", serviceName); result.put("cacheMillis", cacheMillis); result.put("lastRefTime", System.currentTimeMillis()); result.put("checksum", service.getChecksum()); result.put("useSpecifiedURL", false); result.put("clusters", clusters); result.put("env", env); result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata())); return result; }
最后看一下獲取服務(wù)端實例方法,最后就是將臨時實例或者持久實例放在一個集合中返回給客戶端。
public List<Instance> srvIPs(List<String> clusters) { if (CollectionUtils.isEmpty(clusters)) { clusters = new ArrayList<>(); clusters.addAll(clusterMap.keySet()); } return allIPs(clusters); } public List<Instance> allIPs(List<String> clusters) { List<Instance> result = new ArrayList<>(); for (String cluster : clusters) { Cluster clusterObj = clusterMap.get(cluster); if (clusterObj == null) { continue; } result.addAll(clusterObj.allIPs()); } return result; } public List<Instance> allIPs() { List<Instance> allInstances = new ArrayList<>(); //將nacos內(nèi)存中注冊表信息返回 allInstances.addAll(persistentInstances); allInstances.addAll(ephemeralInstances); return allInstances; }
總結(jié)一下Nacos客戶端服務(wù)發(fā)現(xiàn)的核心流程:
如果沒有開啟訂閱模式,則直接通過調(diào)用/instance/list接口獲取服務(wù)實例列表信息;如果開啟訂閱模式,則先會從本地緩存中獲取實例信息,如果不存在,則進(jìn)行訂閱獲并獲取實例信息;在獲得最新的實例信息之后,也會執(zhí)行processServiceJson(result)方法來更新內(nèi)存和本地實例緩存,并發(fā)布變更時間。開啟訂閱時,會開啟定時任務(wù),定時執(zhí)行UpdateTask獲取服務(wù)器實例信息、更新本地緩存、發(fā)布事件等;Nacos服務(wù)端注冊
服務(wù)端的注冊源碼邏輯相對客戶端的還是要復(fù)雜很多,所以這里我們先看一下Nacos服務(wù)端注冊的完整流程圖,避免一上來就看源碼被繞暈。
接下來我們就著重分析一下AP架構(gòu)Nacos服務(wù)注冊的源碼。
1 Nacos服務(wù)端注冊
Nacos服務(wù)端注冊當(dāng)然是本文的核心,那么首先我們來看一下Nacos服務(wù)端注冊源碼。從Nacos的客戶端注冊原理不難發(fā)現(xiàn),客戶端通過調(diào)用Nacos服務(wù)端提供的http接口實現(xiàn)注冊,對外提供的服務(wù)接口請求地址為nacos/v1/ns/instance,實現(xiàn)代碼咋nacos-naming模塊下的InstanceController類中:
@CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception { //從請求參數(shù)匯總獲得namespaceId(命名空間Id) final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); //從請求參數(shù)匯總獲得serviceName(服務(wù)名) final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); final Instance instance = parseInstance(request); //registerInstance注冊實例 serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok"; }
客戶端就是通過調(diào)用該api實現(xiàn)Nacos的注冊的,下面可以看一下Nacos的這個注冊api是怎么實現(xiàn)的
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { createEmptyService(namespaceId, serviceName, instance.isEphemeral()); //前面構(gòu)建過了,這里調(diào)取肯定部不為null,從serviceMap中根據(jù)namespaceId和serviceName得到一個服務(wù)對象 Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } //調(diào)用addInstance添加服務(wù)實例 //總體流程:把需要注冊的實例放到內(nèi)存阻塞隊列中,另外會起另一個線程從內(nèi)存中取出intance實例放到Service中,即注冊成功了 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }
registerInstance()干了兩件事兒,第一就是createEmptyService()方法從請求參數(shù)匯總獲得serviceName(服務(wù)名)和namespaceId(命名空間Id),第二就是調(diào)用registerInstance注冊實例。先看一下createEmptyService方法。
2 服務(wù)端構(gòu)建注冊表
Nacos的注冊表是多級存儲結(jié)構(gòu),最外層是通過namespace來實現(xiàn)環(huán)境隔離,然后是group分組,分組下就是服務(wù),一個服務(wù)有可以分為不同的集群,集群中包含多個實例。因此其注冊表結(jié)構(gòu)為一個Map,類型是:Map<String, Map<String, Service>>外層key是namespace_id,內(nèi)層key是group + serviceName,Service內(nèi)部維護(hù)一個Map,結(jié)構(gòu)是:Map<String, Cluster>的key是clusterName,其值是集群信息;Cluster內(nèi)部維護(hù)一個Set集合Set<Instance> ephemeralInstances和Set<Instance> persistentInstances,元素是Instance類型,代表集群中的多個實例。
createEmptyService()方法就是服務(wù)端構(gòu)建注冊表的方法,基于AP架構(gòu)的Nacos實際就是將注冊實例信息保存在內(nèi)存中。
/** * 1、創(chuàng)建一個Serivice對象,內(nèi)部包含了一個clusterMap。 * 2、將service對象放入到SeriviceMap中,結(jié)構(gòu)為:Map<namespaceId, Map<groupName::serviceName, Service>>。 * 3、開啟一個定時任務(wù)用來檢測實例的心跳是否超時,每5秒執(zhí)行一次。 * * @param namespaceId * @param serviceName * @param local * @throws NacosException */ public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException { createServiceIfAbsent(namespaceId, serviceName, local, null); } public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException { Service service = getService(namespaceId, serviceName); //第一次注冊進(jìn)來,從注冊表里獲取命名空間,肯定是為null,所以需要構(gòu)建一個命名空間, //設(shè)置nameSpace等信息,如果Service實例為空,則創(chuàng)建并保存到緩存中 if (service == null) { Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); service = new Service(); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(NamingUtils.getGroupName(serviceName)); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); if (cluster != null) { cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } service.validate(); //注冊和初始化,通過putService()方法將服務(wù)緩存到內(nèi)存 putServiceAndInit(service); if (!local) { addOrReplaceService(service); } } }
createEmptyService()方法主要作用如下:
創(chuàng)建一個Serivice對象,內(nèi)部包含了一個clusterMap;將service對象放入到SeriviceMap中,結(jié)構(gòu)為:Map<namespaceId, Map<groupName::serviceName, Service>>;開啟一個定時任務(wù)用來檢測實例的心跳是否超時,每5秒執(zhí)行一次。createServiceIfAbsent()方法主要作用在于第一次注冊進(jìn)來,從注冊表里獲取命名空間,肯定是為null,所以需要構(gòu)建一個命名空間,設(shè)置nameSpace等信息并保存到緩存中。這個方法里值得注意的是putServiceAndInit()方法,可以點進(jìn)來看一下這個方法:
private void putServiceAndInit(Service service) throws NacosException { //構(gòu)建注冊表雙層map,初始化serviceMap --> Map<String, Map<String, Service>> serviceMap putService(service); //初始化service,開啟心跳檢測的線程 service.init(); //實現(xiàn)數(shù)據(jù)一致性監(jiān)聽 consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson()); }
這里我著重putService(service)方法,這里實際是將注冊的實例緩存到內(nèi)存的注冊表中
/** * 通過putService()方法將服務(wù)緩存到內(nèi)存 * * @param service service */public void putService(Service service) { if (!serviceMap.containsKey(service.getNamespaceId())) { //雙檢索防止并發(fā),為了防止同一個服務(wù)多個地方同時注冊 synchronized (putServiceLock) { if (!serviceMap.containsKey(service.getNamespaceId())) { //構(gòu)建NamespaceId,Serivce對象放到了ServiceMap里面了,也就是說下次我們再調(diào)用getService(namespaceId)的時候就可以獲取到一個Service對象了 serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>()); } } } //構(gòu)建 service name serviceMap.get(service.getNamespaceId()).put(service.getName(), service);}
3 Nacos服務(wù)端心跳機(jī)制
接下來我們看一下 putServiceAndInit(Service service)方法中的,init()初始化方法是怎么保持心跳連接的
/** * service.init()建立心跳機(jī)制 */public void init() { //客戶端心跳檢查任務(wù),每隔5s執(zhí)行一次,clientBeatCheckTask是一個線程的方法 HealthCheckReactor.scheduleCheck(clientBeatCheckTask); for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { entry.getValue().setService(this); entry.getValue().init(); }}/** * Schedule client beat check task with a delay. * * @param task client beat check task */public static void scheduleCheck(ClientBeatCheckTask task) { //客戶端的心跳任務(wù),這里并沒有嵌套調(diào)用,而是開啟延遲5s的任務(wù),然后每隔5秒鐘執(zhí)行一次 futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));}public class ClientBeatCheckTask implements Runnable { private Service service; public ClientBeatCheckTask(Service service) { this.service = service; } @JsonIgnore public PushService getPushService() { return ApplicationUtils.getBean(PushService.class); } @JsonIgnore public DistroMapper getDistroMapper() { return ApplicationUtils.getBean(DistroMapper.class); } public GlobalConfig getGlobalConfig() { return ApplicationUtils.getBean(GlobalConfig.class); } public SwitchDomain getSwitchDomain() { return ApplicationUtils.getBean(SwitchDomain.class); } public String taskKey() { return KeyBuilder.buildServiceMetaKey(service.getNamespaceId(), service.getName()); } @Override public void run() { try { /** * nacos心跳在集群架構(gòu)下只允許在一臺機(jī)器上執(zhí)行健康檢查任務(wù) * * 集群中有多臺機(jī)器,本方法在于對服務(wù)名稱做hash運算再對機(jī)器數(shù)量取模后,那么 * 這里每次只有定位到一臺機(jī)器,其他機(jī)器都直接return了 * * 疑問:如果一臺機(jī)器掛了會怎么辦?這里取模會不會亂掉?那這里會不會要做一致性hash? * 在nacos集群中每臺機(jī)器之間也是存在狀態(tài)同步的,每臺機(jī)器之間都有集群節(jié)點同步任務(wù),詳見com.alibaba.nacos.naming.cluster.ServerListManager.ServerStatusReporter * */ if (!getDistroMapper().responsible(service.getName())) { return; } if (!getSwitchDomain().isHealthCheckEnabled()) { return; } //獲取服務(wù)端所有實例 List<Instance> instances = service.allIPs(true); // first set health status of instances: /** * for循環(huán)對每個實例都做健康檢查 * 在這個方法里面主要是循環(huán)當(dāng)前service的每一個臨時實例 用當(dāng)前時間減去最后一次心跳時間 是否大于心跳超時時間來判斷心跳是否超時, * 如果大于這個時間會執(zhí)行instance.setHealthy(false)將實例的健康狀態(tài)改為false;但是這個定時任務(wù)不會立即執(zhí)行,會每5秒執(zhí)行一次: */ for (Instance instance : instances) { //判斷心跳是否超時:當(dāng)前時間 - 實例上次心跳時間 > 心跳的超時時間【默認(rèn)是15秒】? if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { if (!instance.isMarked()) { if (instance.isHealthy()) { //如果大于心跳默認(rèn)時間,把實例的 healthy 設(shè)置為false【服務(wù)列表一開始不會刪掉,一開始會變成false】 instance.setHealthy(false); Loggers.EVT_LOG .info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}", instance.getIp(), instance.getPort(), instance.getClusterName(), service.getName(), UtilsAndCommons.LOCALHOST_SITE, instance.getInstanceHeartBeatTimeOut(), instance.getLastBeat()); getPushService().serviceChanged(service); ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance)); } } } } if (!getGlobalConfig().isExpireInstance()) { return; } // then remove obsolete instances: for (Instance instance : instances) { if (instance.isMarked()) { continue; } //當(dāng)前時間 - 實例上一次心跳時間 > 實例的刪除時間【默認(rèn)30s】 if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) { // delete instance Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JacksonUtils.toJson(instance)); //直接刪除實例 deleteIp(instance); } } } catch (Exception e) { Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e); } } private void deleteIp(Instance instance) { try { NamingProxy.Request request = NamingProxy.Request.newRequest(); request.appendParam("ip", instance.getIp()).appendParam("port", String.valueOf(instance.getPort())) .appendParam("ephemeral", "true").appendParam("clusterName", instance.getClusterName()) .appendParam("serviceName", service.getName()).appendParam("namespaceId", service.getNamespaceId()); //調(diào)用本地服務(wù) String url = "http://" + IPUtil.localHostIP() + IPUtil.IP_PORT_SPLITER + EnvUtil.getPort() + EnvUtil.getContextPath() + UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance?" + request.toUrl(); // /v/ns/instance // delete instance asynchronously: HttpClient.asyncHttpDelete(url, null, null, new Callback<String>() { @Override public void onReceive(RestResult<String> result) { if (!result.ok()) { Loggers.SRV_LOG .error("[IP-DEAD] failed to delete ip automatically, ip: {}, caused {}, resp code: {}", instance.toJson(), result.getMessage(), result.getCode()); } } @Override public void onError(Throwable throwable) { Loggers.SRV_LOG .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), throwable); } @Override public void onCancel() { } }); } catch (Exception e) { Loggers.SRV_LOG .error("[IP-DEAD] failed to delete ip automatically, ip: {}, error: {}", instance.toJson(), e); } }}
可以看出init方法是開啟了一個異步線程ClientBeatCheckTask去做了個周期性發(fā)送心跳的機(jī)制,方法中客戶端心跳檢查任務(wù),開啟延遲5s的任務(wù),然后每隔5秒鐘執(zhí)行一次。
service.init()方法主要通過定時任務(wù)不斷檢測當(dāng)前服務(wù)下所有實例最后發(fā)送心跳包的時間。在這個方法里面主要是循環(huán)當(dāng)前service的每一個臨時實例,用當(dāng)前時間減去最后一次心跳時間是否大于15s來判斷心跳是否超時,如果大于這個時間會執(zhí)行instance.setHealthy(false)將實例的健康狀態(tài)改為false,但是這個定時任務(wù)不會立即執(zhí)行,會每5秒執(zhí)行一次;當(dāng)前時間 - 實例上一次心跳時間 > 實例的刪除時間【默認(rèn)30s】就會刪除實例。
那么服務(wù)實例的最后心跳包更新時間是誰來觸發(fā)的呢?實際上前面在說客戶端注冊時有說到, Nacos客戶端注冊服務(wù)的同時也建立了心跳機(jī)制。
4 服務(wù)端實例注冊
上文中registerInstance注冊實例方法中還有一個最最重要的方法就是addInstance()方法,其本質(zhì)上就是把當(dāng)前注冊的服務(wù)實例保存到Service中
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); Service service = getService(namespaceId, serviceName); synchronized (service) { //將需要注冊的實例全部放到Cluster,再將Cluster放在Service里 List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); //看一下 consistencyService 對象初始化的地方就知道走的是哪個實現(xiàn) consistencyService.put(key, instances); } } public static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) { //根據(jù) ephemeral 取值默認(rèn)是 true為臨時實例,臨時實例是存放在內(nèi)存的;false即為永久實例寫到文件的,可以通過此參數(shù)區(qū)分nacos是AP還是CP架構(gòu) return ephemeral ? buildEphemeralInstanceListKey(namespaceId, serviceName) : buildPersistentInstanceListKey(namespaceId, serviceName); }
這里著重看一下這個put方法,put方法主要做了兩件事,第一對對客戶端的請求過來的實例進(jìn)行注冊,第二是Nacos集群架構(gòu)下的數(shù)據(jù)同步,Nacos默認(rèn)用的是臨時實例,也就是ephemeral = true,也就是本文的重點AP架構(gòu)的Nacos注冊原理。
public void put(String key, Record value) throws NacosException { //注冊邏輯:實際就是把實例注冊任務(wù)放到內(nèi)存阻塞隊列中 onPut(key, value); //AP 架構(gòu)下的節(jié)點數(shù)據(jù)同步 distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2); } /** * 注冊邏輯 */ public void onPut(String key, Record value) { if (KeyBuilder.matchEphemeralInstanceListKey(key)) { Datum<Instances> datum = new Datum<>(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); //把客戶端信息注冊信息更新到注冊表 dataStore.put(key, datum); } if (!listeners.containsKey(key)) { return; } //這里放的是DataOperation.CHANGE notifier.addTask(key, DataOperation.CHANGE); }
先來看一下onPut()方法,不難發(fā)現(xiàn)當(dāng)注冊實例數(shù)據(jù)有改變時,就無腦將這個實例扔到這個task內(nèi)存阻塞隊列中去,具體可以看一下addTask()方法
public class Notifier implements Runnable { private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024); //用于存放即將要注冊實例信息的內(nèi)存阻塞隊列 private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024); /** * Add new notify task to queue. * * @param datumKey data key * @param action action for data */ public void addTask(String datumKey, DataOperation action) { if (services.containsKey(datumKey) && action == DataOperation.CHANGE) { return; } if (action == DataOperation.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } /** * 把客戶端的參數(shù)封裝成pair對象后,放進(jìn)了一個內(nèi)存隊列中,注冊就結(jié)束了,看這里并沒有把客戶端的注冊信息寫進(jìn)雙層map中 * 憑經(jīng)驗?zāi)懿碌?,這里把客戶端對象放進(jìn)內(nèi)存隊列,后續(xù)肯定是通過異步起線程的方式去注冊 */ tasks.offer(Pair.with(datumKey, action)); } public int getTaskSize() { return tasks.size(); } @Override public void run() { Loggers.DISTRO.info("distro notifier started"); /** * Spring啟動時就會開啟一個線程加載 Notifier任務(wù),這里就會死循環(huán)一直從內(nèi)存隊列中拿取實例信息實現(xiàn)異步注冊 * * 問 題1:這里的for循環(huán)會占用cpu資源嗎? * 不會占用,因為tasks是個阻塞隊列,如果tasks中沒有實例信息,這里就會阻塞在這,不會無腦死循環(huán) * * 問 題2:為什么要把實例信息都無腦先放在內(nèi)存阻塞隊列中,然后另起一個線程去異步注冊呢?阿里這里為什么要這么設(shè)計? * 個人理解:nacos是在阿里內(nèi)部使用的中間件,一般是需要滿足三高特性【高并發(fā)、高性能、高可擴(kuò)展】,阿里內(nèi)部就有幾十萬臺機(jī)器, * 如果不能實現(xiàn)高并發(fā)注冊那么肯定會有很多問題。比如訂單服務(wù)A需要注冊到nacos時,是在訂單A啟動時就需要注冊,服務(wù)注冊到nacos的邏輯 * 還是比較復(fù)雜的【詳見com.alibaba.nacos.naming.core.Service#updateIPs】,假如這里不用異步注冊而是用同步注冊的方式,那么 * 服務(wù)注冊到nacos需要花費很多時間,這才是一個注冊到nacos的行為就花費了大量時間,那么如果多幾個中間需要加載的話,那得浪費多少時間? * 所以這里采用異步注冊。 * * 問 題3:內(nèi)存阻塞隊列 tasks 會不會有堆積的情況呢? * 實際上看了com.alibaba.nacos.naming.core.Cluster#updateIps注冊方法可以發(fā)現(xiàn),注冊實際上就是把實例信息寫進(jìn)一個內(nèi)存集合Set中 * 【com.alibaba.nacos.naming.core.Cluster#ephemeralInstances】這樣的操作其實是很快的,假如真有個運維寫了個批量注冊的腳本 * 把一堆機(jī)器同時注冊進(jìn)來,那這樣確實有可能會造成內(nèi)存阻塞隊列tasks的堆積現(xiàn)象,但是這種情況并沒什么關(guān)系,Eureka有時候?qū)嵗远紩兄? * 幾十秒,對當(dāng)前的nacos架構(gòu)而言,既然要實現(xiàn)高并發(fā)那么只能犧牲一點實例注冊的即使響應(yīng)時間。正常情況下,即使有幾十臺幾百臺機(jī)器同時注冊, * 由于注冊是內(nèi)存操作,速度也很快,可以說是準(zhǔn)實時,基本上正常情況下注冊信息1s就能感知到。 * */ for (; ; ) { try { //從內(nèi)存隊列中拿出任務(wù) Pair<String, DataOperation> pair = tasks.take(); //拿出pair對象中的客戶端信息去注冊 handle(pair); } catch (Throwable e) { //這個線程即使拋異常也不終止 Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } } } private void handle(Pair<String, DataOperation> pair) { try { //前面拼接的參數(shù) "com.alibaba.nacos.naming.iplist.ephemeral.namespaceId##serviceName" String datumKey = pair.getValue0(); //前面?zhèn)鞯腄ataOperation.CHANGE。 DataOperation action = pair.getValue1(); services.remove(datumKey); int count = 0; if (!listeners.containsKey(datumKey)) { return; } for (RecordListener listener : listeners.get(datumKey)) { count++; try { if (action == DataOperation.CHANGE) { /** * 拿到前面放的map中的客戶端信息dataStore.get,這里的key就是前面的 * 拼接的參數(shù) "com.alibaba.nacos.naming.iplist.ephemeral.namespaceId##serviceName" * 前面放的是DataOperation.CHANGE。 * */ listener.onChange(datumKey, dataStore.get(datumKey).value); continue; } if (action == DataOperation.DELETE) { listener.onDelete(datumKey); continue; } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e); } } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name()); } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } } }
當(dāng)有實例需要注冊時,直接調(diào)用addTask()方法將這個實例信息無腦扔進(jìn)內(nèi)存阻塞隊列中去,注冊就結(jié)束了。這個應(yīng)該算是Nacos注冊的一個精髓吧,Nacos為了提高性能其源碼使用了大量的異步任務(wù)、異步線程等操作,用這些方式對提升Nacos性能有很大幫助。不難猜到,這里把客戶端實例對象放進(jìn)內(nèi)存隊列,后續(xù)肯定是通過異步起線程的方式去注冊。
不難發(fā)現(xiàn)addTask()方法是Notifier類的方法,Notifier實現(xiàn)了Runnable接口,很明顯這就是一個異步線程,這里跟上面的猜想一致,Nacos就是通過開啟了一個異步線程實現(xiàn)注冊的,具體的注冊方法直接可以看Notifier線程的run()方法即可。那么這個Notifier線程是啥時候開啟的呢?
@DependsOn("ProtocolManager")@org.springframework.stereotype.Service("distroConsistencyService")public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor { //本類DistroConsistencyServiceImpl注入到了Spring容器中,所以項目啟動時類加載的時候就會初始化這個方法 @PostConstruct public void init() { //線程池執(zhí)行器,執(zhí)行notifier線程的任務(wù) GlobalExecutor.submitDistroNotifyTask(notifier); }}
了解Spring的應(yīng)該都知道這個是Spring加載的一種初始化方式,Spring啟動時加載這個init方法初始化數(shù)據(jù),就會開啟一個線程加載Notifier任務(wù)。
看了Notifier線程的run()方法,不免會有幾個疑問。第一、這里的for循環(huán)會占用cpu資源嗎?第二、為什么要把實例信息都無腦先放在內(nèi)存阻塞隊列中,然后另起一個線程去異步注冊呢?第三、阿里這里為什么要這么設(shè)計?這樣設(shè)計好處是什么呢?
這里第一個問題不會占用cpu資源,因為tasks是個阻塞隊列,如果tasks中沒有實例信息,這里就會阻塞在這,不會無腦死循環(huán),所以是不會占用cpu資源的;
第二個問題個人理解:Nacos是在阿里內(nèi)部使用的中間件,肯定是需要滿足高并發(fā)、高性能、高可擴(kuò)展,阿里內(nèi)部估計就有幾十萬臺機(jī)器,如果不能實現(xiàn)高并發(fā)注冊那么肯定會有很多問題。比如訂單服務(wù)需要注冊到nacos時,那么訂單啟動時就需要注冊,服務(wù)注冊到Nacos的邏輯還是比較復(fù)雜的【詳見com.alibaba.nacos.naming.core.Service#updateIPs】,假如這里不用異步注冊而是用同步注冊的方式,那么服務(wù)注冊到Nacos需要花費很多時間,這才是一個注冊到Nacos的行為就花費了大量時間,那么如果多幾個中間需要加載的話,那會浪費很多時間,所以這里采用異步注冊。
public void onChange(String key, Instances value) throws Exception { Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value); for (Instance instance : value.getInstanceList()) { if (instance == null) { // Reject this abnormal instance list: throw new RuntimeException("got null instance " + key); } //設(shè)置權(quán)重默認(rèn)值啥的 if (instance.getWeight() > 10000.0D) { instance.setWeight(10000.0D); } if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) { instance.setWeight(0.01D); } } //真正的注冊實例的方法 updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key)); recalculateChecksum(); } public void updateIPs(Collection<Instance> instances, boolean ephemeral) { Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size()); for (String clusterName : clusterMap.keySet()) { ipMap.put(clusterName, new ArrayList<>()); } for (Instance instance : instances) { try { if (instance == null) { Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null"); continue; } if (StringUtils.isEmpty(instance.getClusterName())) { instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME); } if (!clusterMap.containsKey(instance.getClusterName())) { Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); Cluster cluster = new Cluster(instance.getClusterName(), this); cluster.init(); getClusterMap().put(instance.getClusterName(), cluster); } List<Instance> clusterIPs = ipMap.get(instance.getClusterName()); if (clusterIPs == null) { clusterIPs = new LinkedList<>(); ipMap.put(instance.getClusterName(), clusterIPs); } clusterIPs.add(instance); } catch (Exception e) { Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e); } } for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) { //make every ip mine List<Instance> entryIPs = entry.getValue(); /** * 注冊邏輯updateIps,更新注冊表信息: * * 這個方法里面會將已經(jīng)注冊過的實例列表復(fù)制一份,將新的實例和老的實例都更新到一個集合中, * 最終再將這個集合更新到真正的實例列表,是一種寫時復(fù)制的思想,主要時為了解決并發(fā)沖突, * 在寫的過程中,其他線程讀到的還是舊數(shù)據(jù),等真正寫完之后再將數(shù)據(jù)更新回去。 * * 思考一下,正常情況下在寫之前都要加鎖,不然可能會有讀寫并發(fā)問題,這里為什么不在加鎖? * 假如這里加了一個鎖之后,相當(dāng)于把讀和寫操作排隊串行化執(zhí)行了,就是讀寫不能同時進(jìn)行了, * 這里并發(fā)肯定會很低,所以這里用copy on write機(jī)制,將原來的注冊表復(fù)制出一個副本,然后進(jìn) * 行修改,此時讀請求進(jìn)來還是讀老的注冊表,這樣讀寫就能并發(fā)執(zhí)行。 * * 那這里用讀寫分離和加鎖串行執(zhí)行有什么優(yōu)劣勢嗎? * 讀寫分離:寫的時候?qū)懙氖歉北?,讀的是老得數(shù)據(jù),這樣可能讀到讀不是最新數(shù)據(jù),只有當(dāng)副本寫完將 * 老數(shù)據(jù)替換,此時讀的才是最新數(shù)據(jù),讀寫分離雖然提高了讀寫并發(fā)但是對數(shù)據(jù)的一致性稍有妥協(xié),但是 * 對于此時注冊的場景而言影響不大,即使是沒有讀到最新數(shù)據(jù)也沒關(guān)系,最多就當(dāng)此服務(wù)啟動的慢一點而 * 已,當(dāng)前這個注冊場景,還是提高并發(fā)注冊能力稍重要些,若是對讀寫數(shù)據(jù)一致場景要求很高時,就必須得 * 加鎖串行執(zhí)行 * * 加鎖執(zhí)行:讀寫都是加鎖執(zhí)行,寫完后再去讀,讀的一定是最新的數(shù)據(jù),讀寫數(shù)據(jù)強(qiáng)一致,但是這里根本不需要數(shù)據(jù)強(qiáng)一致 * * 這里有個疑問注冊時每個微服務(wù)都去復(fù)制一個副本,然后將副本替換回原件時會不會有覆蓋的問題? * 不會,這里是開的一個線程去拿內(nèi)存隊列的數(shù)據(jù)進(jìn)行注冊的,所以不可能存在覆蓋并發(fā)問題。詳見 * com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#init()方法 * */ clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); } setLastModifiedMillis(System.currentTimeMillis()); //上面已經(jīng)更新過了注冊表后,這里需要發(fā)布事件,主動通知客戶端 getPushService().serviceChanged(this); StringBuilder stringBuilder = new StringBuilder(); for (Instance instance : allIPs()) { stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(","); } Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(), stringBuilder.toString()); } /** * Update instance list. * * @param ips instance list 需要注冊的實例 * @param ephemeral whether these instances are ephemeral 是哪種實例,永久實例還是臨時實例 */ public void updateIps(List<Instance> ips, boolean ephemeral) { //根據(jù)傳進(jìn)來的 ephemeral 判斷是哪種實例,直接把這種實例復(fù)制一份toUpdateInstances Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances //臨時實例 【這里這個實例是注冊表已有的實例,這里直接將原有的實例再次復(fù)制了一份,就是toUpdateInstances】 : persistentInstances; //永久實例 【這里這個實例是注冊表已有的實例,這里直接將原有的實例再次復(fù)制了一份,就是toUpdateInstances】 HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size()); //循環(huán)將上面復(fù)制的實例toUpdateInstances放oldIpMap中 for (Instance ip : toUpdateInstances) { oldIpMap.put(ip.getDatumKey(), ip); } /** * 下面就是對比新注冊的實例與副本實例的差異,判斷新的實例是新增、刪除、還是修改,這里使用了copyOnWrite機(jī)制,上面的toUpdateInstances就是 * 復(fù)制出來的副本,新增、刪除、修改都是在副本上進(jìn)行,之后再將原注冊表覆蓋 * 新增:直接新增到副本中 * 修改:直接修改副本的實例信息 * 刪除:刪除副本的實例 * */ //更新注冊表 List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values()); if (updatedIPs.size() > 0) { for (Instance ip : updatedIPs) { Instance oldIP = oldIpMap.get(ip.getDatumKey()); // do not update the ip validation status of updated ips // because the checker has the most precise result // Only when ip is not marked, don't we update the health status of IP: if (!ip.isMarked()) { ip.setHealthy(oldIP.isHealthy()); } if (ip.isHealthy() != oldIP.isHealthy()) { // ip validation status updated Loggers.EVT_LOG.info("{} {SYNC} IP-{} {}:{}@{}", getService().getName(), (ip.isHealthy() ? "ENABLED" : "DISABLED"), ip.getIp(), ip.getPort(), getName()); } if (ip.getWeight() != oldIP.getWeight()) { // ip validation status updated Loggers.EVT_LOG.info("{} {SYNC} {IP-UPDATED} {}->{}", getService().getName(), oldIP.toString(), ip.toString()); } } } //刪除注冊表 List<Instance> newIPs = subtract(ips, oldIpMap.values()); if (newIPs.size() > 0) { Loggers.EVT_LOG .info("{} {SYNC} {IP-NEW} cluster: {}, new ips size: {}, content: {}", getService().getName(), getName(), newIPs.size(), newIPs.toString()); for (Instance ip : newIPs) { HealthCheckStatus.reset(ip); } } List<Instance> deadIPs = subtract(oldIpMap.values(), ips); if (deadIPs.size() > 0) { Loggers.EVT_LOG .info("{} {SYNC} {IP-DEAD} cluster: {}, dead ips size: {}, content: {}", getService().getName(), getName(), deadIPs.size(), deadIPs.toString()); for (Instance ip : deadIPs) { HealthCheckStatus.remv(ip); } } toUpdateInstances = new HashSet<>(ips);
第三個問題:可以看上面這個寫注冊表的源碼,當(dāng)服務(wù)A需要注冊到Nacos時,并不是直接寫進(jìn)Nacos的注冊表里,實際上是先拷貝了一個副本,訂單服務(wù)注冊寫注冊表時直接寫副本的注冊表,副本寫完后才會替換原來Nacos中的注冊表,所以當(dāng)庫存服務(wù)需要從Nacos拉取服務(wù)時,拉取的是Nacos實際注冊表中的信息,這種設(shè)計方式能夠大大提高Nacos的注冊性能。
類似于CopyOnWriteArrayList的copy on write機(jī)制,也就是寫時復(fù)制、讀寫分離設(shè)計思想。這種讀寫分離對于客戶端注冊感知實時性可能會稍差點,但是這種情況并沒什么關(guān)系,Eureka有時候?qū)嵗远紩兄獛资?,對?dāng)前的nacos架構(gòu)而言,既然要實現(xiàn)高并發(fā)那么只能犧牲一點實例注冊的及時響應(yīng)時間。
當(dāng)Nacos注冊成功后,就需要發(fā)布事件,主動通知客戶端,接下來可以看一下發(fā)布事件的源碼
public void serviceChanged(Service service) { // merge some change events to reduce the push frequency: if (futureMap .containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) { return; } //時間發(fā)布,發(fā)布事件,通知客戶端 this.applicationContext.publishEvent(new ServiceChangeEvent(this, service)); }
這里Nacos會通過udp的方式將服務(wù)變動通知給訂閱的客戶端。Nacos的這種推送模式相對于zk那種利用tcp長連接而言還是會節(jié)約很多資源,即使有大量節(jié)點更新也不會使Nacos出現(xiàn)性能瓶頸。
當(dāng)Nacos客戶端接收到了udp消息后會給服務(wù)端返回一個ack,如果Nacos超時未收到ack,還會有重發(fā)機(jī)制,超過了這個超時時間就不再重發(fā)了。雖然udp是個不可靠協(xié)議不能保證消息一定能推送到客戶端,但是Nacos客戶端還是有定時輪訓(xùn)做兜底定時查詢Nacos注冊表。Nacas采用了這兩種機(jī)制,既保證了實時性,又保證了數(shù)據(jù)更新不會被漏掉。
5 Nacos集群新節(jié)點啟動數(shù)據(jù)同步
Nacos數(shù)據(jù)同步分為全量同步和增量同步,全量同步就是初始化數(shù)據(jù)一次性同步,而增量同步是指有數(shù)據(jù)增加的時候,只同步增加的數(shù)據(jù)。
a. Nacos集群全量數(shù)據(jù)同步
Nacos集群有新的節(jié)點啟動時,DistroProtocol類就會在Spring加載時調(diào)用構(gòu)造方法,同時開啟一個數(shù)據(jù)同步任務(wù),該方法會執(zhí)行startVerifyTask()和startLoadTask(),我們重點關(guān)注startLoadTask(),具體代碼如下:
@Componentpublic class DistroProtocol { /** * 當(dāng)nacos集群只有兩臺機(jī)器時,此時若又新增一臺機(jī)器,此時需要將原來兩臺機(jī)器的數(shù)據(jù)同步到新的nacos機(jī)器上 * * @param memberManager * @param distroComponentHolder * @param distroTaskEngineHolder * @param distroConfig */ public DistroProtocol(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder, DistroTaskEngineHolder distroTaskEngineHolder, DistroConfig distroConfig) { this.memberManager = memberManager; this.distroComponentHolder = distroComponentHolder; this.distroTaskEngineHolder = distroTaskEngineHolder; this.distroConfig = distroConfig; //本項目啟動時DistroProtocol類加載時需要加載本構(gòu)造方法,開啟一個數(shù)據(jù)同步任務(wù) startDistroTask(); } private void startDistroTask() { if (EnvUtil.getStandaloneMode()) { isInitialized = true; return; } //啟動startVerifyTask,做數(shù)據(jù)同步校驗 startVerifyTask(); //加載任務(wù) startLoadTask(); } private void startLoadTask() { ///處理狀態(tài)回調(diào)對象 DistroCallback loadCallback = new DistroCallback() { //處理成功 @Override public void onSuccess() { isInitialized = true; } //處理失敗 @Override public void onFailed(Throwable throwable) { isInitialized = false; } }; GlobalExecutor.submitLoadDataTask( new DistroLoadDataTask(memberManager, distroComponentHolder, distroConfig, loadCallback)); } }
上面方法會調(diào)用DistroLoadDataTask對象,而該對象其實是個線程,因此會執(zhí)行它的run方法,run方法會調(diào)用load()方法實現(xiàn)數(shù)據(jù)全量加載,代碼如下:
public class DistroLoadDataTask implements Runnable { private final ServerMemberManager memberManager; private final DistroComponentHolder distroComponentHolder; private final DistroConfig distroConfig; private final DistroCallback loadCallback; private final Map<String, Boolean> loadCompletedMap; public DistroLoadDataTask(ServerMemberManager memberManager, DistroComponentHolder distroComponentHolder, DistroConfig distroConfig, DistroCallback loadCallback) { this.memberManager = memberManager; this.distroComponentHolder = distroComponentHolder; this.distroConfig = distroConfig; this.loadCallback = loadCallback; loadCompletedMap = new HashMap<>(1); } /** * 數(shù)據(jù)加載過程 */ @Override public void run() { try { //加載數(shù)據(jù) load(); if (!checkCompleted()) { GlobalExecutor.submitLoadDataTask(this, distroConfig.getLoadDataRetryDelayMillis()); } else { loadCallback.onSuccess(); Loggers.DISTRO.info("[DISTRO-INIT] load snapshot data success"); } } catch (Exception e) { loadCallback.onFailed(e); Loggers.DISTRO.error("[DISTRO-INIT] load snapshot data failed. ", e); } }}
數(shù)據(jù)同步會通過Http請求從遠(yuǎn)程服務(wù)器獲取數(shù)據(jù),并同步到當(dāng)前服務(wù)的緩存中。執(zhí)行流程如下:
首先,loadAllDataSnapshotFromRemote()從遠(yuǎn)程加載所有數(shù)據(jù),并處理同步到本機(jī);
第二,transportAgent.getDatumSnapshot()遠(yuǎn)程加載數(shù)據(jù),通過Http請求執(zhí)行遠(yuǎn)程加載;
第三,dataProcessor.processSnapshot()處理數(shù)據(jù)同步到本地
/** * 加載數(shù)據(jù),并同步 * * @throws Exception */ private void load() throws Exception { while (memberManager.allMembersWithoutSelf().isEmpty()) { Loggers.DISTRO.info("[DISTRO-INIT] waiting server list init..."); TimeUnit.SECONDS.sleep(1); } while (distroComponentHolder.getDataStorageTypes().isEmpty()) { Loggers.DISTRO.info("[DISTRO-INIT] waiting distro data storage register..."); TimeUnit.SECONDS.sleep(1); } for (String each : distroComponentHolder.getDataStorageTypes()) { if (!loadCompletedMap.containsKey(each) || !loadCompletedMap.get(each)) { //從遠(yuǎn)端機(jī)器拉取數(shù)據(jù),從遠(yuǎn)程加載所有數(shù)據(jù),并處理同步到本機(jī) loadCompletedMap.put(each, loadAllDataSnapshotFromRemote(each)); } } } /** * 從遠(yuǎn)端機(jī)器拉取數(shù)據(jù) * * @param resourceType * @return */ private boolean loadAllDataSnapshotFromRemote(String resourceType) { DistroTransportAgent transportAgent = distroComponentHolder.findTransportAgent(resourceType); DistroDataProcessor dataProcessor = distroComponentHolder.findDataProcessor(resourceType); if (null == transportAgent || null == dataProcessor) { Loggers.DISTRO.warn("[DISTRO-INIT] Can't find component for type {}, transportAgent: {}, dataProcessor: {}", resourceType, transportAgent, dataProcessor); return false; } //拉取不包含自己的機(jī)器 for (Member each : memberManager.allMembersWithoutSelf()) { try { Loggers.DISTRO.info("[DISTRO-INIT] load snapshot {} from {}", resourceType, each.getAddress()); //調(diào)取接口獲取除自己外的所有機(jī)器 DistroData distroData = transportAgent.getDatumSnapshot(each.getAddress()); //同步數(shù)據(jù) boolean result = dataProcessor.processSnapshot(distroData); Loggers.DISTRO .info("[DISTRO-INIT] load snapshot {} from {} result: {}", resourceType, each.getAddress(), result); //同步成功直接return,從一臺機(jī)器上同步 if (result) { return true; } } catch (Exception e) { Loggers.DISTRO.error("[DISTRO-INIT] load snapshot {} from {} failed.", resourceType, each.getAddress(), e); } } return false; } private boolean checkCompleted() { if (distroComponentHolder.getDataStorageTypes().size() != loadCompletedMap.size()) { return false; } for (Boolean each : loadCompletedMap.values()) { if (!each) { return false; } } return true; }
到這為止實現(xiàn)數(shù)據(jù)全量同步,其實全量同步最終還是互相調(diào)用Nacos提供的api。總結(jié)一下全量數(shù)據(jù)同步的過程:
啟動一個定時任務(wù)線程DistroLoadDataTask加載數(shù)據(jù),調(diào)用load()方法加載數(shù)據(jù)調(diào)用loadAllDataSnapshotFromRemote()方法從遠(yuǎn)程機(jī)器同步所有的數(shù)據(jù)構(gòu)造http請求,調(diào)用httpGet方法從指定的server獲取數(shù)據(jù)同步處理數(shù)據(jù)processData并執(zhí)行監(jiān)聽器listener成功后,就更新data storeb. Nacos集群增量數(shù)據(jù)同步
當(dāng)服務(wù)注冊完成后,Nacos需要將客戶端實例信息同步到Nacos集群其他節(jié)點,可以看一下Nacos底層是怎么實現(xiàn)的。我們再次回到put方法:
public void put(String key, Record value) throws NacosException { //注冊邏輯:實際就是把實例注冊任務(wù)放到內(nèi)存阻塞隊列中 onPut(key, value); //AP 架構(gòu)下的節(jié)點數(shù)據(jù)同步 distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2); }
上文中已經(jīng)解釋過put方法中的 onPut(key, value)方法,接下來我們再了解一下AP結(jié)構(gòu)Nacos下的節(jié)點數(shù)據(jù)是同步,也就是distroProtocol.sync方法
/** * Start to sync data to all remote server. * * @param distroKey distro key of sync data * @param action the action of data operation */public void sync(DistroKey distroKey, DataOperation action, long delay) { //循環(huán)將新增實例同步到除自己外的所有實例,單機(jī)for循環(huán)都不會走,集群架構(gòu)就會走本方法 for (Member each : memberManager.allMembersWithoutSelf()) { //先把每臺機(jī)器都數(shù)據(jù)封裝稱distroKeyWithTarget對象 DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress()); DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay); //添加到task任務(wù)中 distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask); if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO.debug("[DISTRO-SCHEDULE] {} to {}", distroKey, each.getAddress()); }}/** * 將集群中除自己外其他需要同步的機(jī)器信息添加到一個tasks任務(wù)中,由前面的知識可以猜到這里也是 * 用異步開啟一個線程去拿tasks進(jìn)行同步新增實例信息到其他nacos機(jī)器中 * * @param key key of task * @param newTask */@Overridepublic void addTask(Object key, AbstractDelayTask newTask) { lock.lock(); try { AbstractDelayTask existTask = tasks.get(key); if (null != existTask) { newTask.merge(existTask); } //將集群中除自己外其他需要同步的機(jī)器信息添加到一個tasks任務(wù)中,task是個保存信息的ConcurrentHashMap tasks.put(key, newTask); } finally { lock.unlock(); }}
這里直接把需要同步的信息放在了內(nèi)存的ConcurrentHashMap中,我們看一下這里具體看一下怎么同步其他節(jié)點
@Componentpublic class DistroTaskEngineHolder { //延遲任務(wù)執(zhí)行器 private final DistroDelayTaskExecuteEngine delayTaskExecuteEngine = new DistroDelayTaskExecuteEngine(); //任務(wù)執(zhí)行引擎器 private final DistroExecuteTaskExecuteEngine executeWorkersManager = new DistroExecuteTaskExecuteEngine(); public DistroTaskEngineHolder(DistroComponentHolder distroComponentHolder) { DistroDelayTaskProcessor defaultDelayTaskProcessor = new DistroDelayTaskProcessor(this, distroComponentHolder); delayTaskExecuteEngine.setDefaultTaskProcessor(defaultDelayTaskProcessor); } public DistroDelayTaskExecuteEngine getDelayTaskExecuteEngine() { return delayTaskExecuteEngine; } public DistroExecuteTaskExecuteEngine getExecuteWorkersManager() { return executeWorkersManager; } public void registerNacosTaskProcessor(Object key, NacosTaskProcessor nacosTaskProcessor) { this.delayTaskExecuteEngine.addProcessor(key, nacosTaskProcessor); }}
這個類中會創(chuàng)建一個任務(wù)執(zhí)行引擎,代碼如下:
public class DistroDelayTaskExecuteEngine extends NacosDelayTaskExecuteEngine { public DistroDelayTaskExecuteEngine() { super(DistroDelayTaskExecuteEngine.class.getName(), Loggers.DISTRO); } @Override public void addProcessor(Object key, NacosTaskProcessor taskProcessor) { Object actualKey = getActualKey(key); super.addProcessor(actualKey, taskProcessor); } @Override public NacosTaskProcessor getProcessor(Object key) { Object actualKey = getActualKey(key); return super.getProcessor(actualKey); } private Object getActualKey(Object key) { return key instanceof DistroKey ? ((DistroKey) key).getResourceType() : key; }}//忽略一些中間省略的代碼public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) { super(logger); tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity); processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name)); //這里又執(zhí)行了一個任務(wù) processingExecutor .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);}/** * 執(zhí)行了一個任務(wù) */private class ProcessRunnable implements Runnable { @Override public void run() { try { processTasks(); } catch (Throwable e) { getEngineLog().error(e.toString(), e); } }}
public boolean process(NacosTask task) { if (!(task instanceof DistroDelayTask)) { return true; } DistroDelayTask distroDelayTask = (DistroDelayTask) task; DistroKey distroKey = distroDelayTask.getDistroKey(); if (DataOperation.CHANGE.equals(distroDelayTask.getAction())) { //將延遲任務(wù)變更成異步任務(wù),異步任務(wù)對象是一個線程 DistroSyncChangeTask syncChangeTask = new DistroSyncChangeTask(distroKey, distroComponentHolder); //將前面封裝到任務(wù)拿出來放在一個隊列中 distroTaskEngineHolder.getExecuteWorkersManager().addTask(distroKey, syncChangeTask); return true; } return false;}public void addTask(Object tag, AbstractExecuteTask task) { //拿前面其他實例到任務(wù) NacosTaskProcessor processor = getProcessor(tag); if (null != processor) { processor.process(task); return; } TaskExecuteWorker worker = getWorker(tag); //將同步數(shù)據(jù)到其他nacos實例到tasks任務(wù)放進(jìn)一個queue中 在InnerWorker.run()方法中從queue隊列中拿任務(wù)執(zhí)行 worker.process(task);}public boolean process(NacosTask task) { if (task instanceof AbstractExecuteTask) { putTask((Runnable) task); } return true;}/** * 把任務(wù)同步放進(jìn)內(nèi)存隊列中 * * @param task */private void putTask(Runnable task) { try { queue.put(task); } catch (InterruptedException ire) { log.error(ire.toString(), ire); }}
將同步數(shù)據(jù)到其他Nacos實例到tasks任務(wù)放進(jìn)一個queue中,然后在InnerWorker.run()方法中從queue隊列中拿任務(wù)執(zhí)行??匆幌戮唧w是怎么執(zhí)行同步任務(wù)的:
/** * Inner execute worker. */private class InnerWorker extends Thread { InnerWorker(String name) { setDaemon(false); setName(name); } @Override public void run() { while (!closed.get()) { try { //從queue中拿同步任務(wù) Runnable task = queue.take(); long begin = System.currentTimeMillis(); //實際就是執(zhí)行異步同步任務(wù) DistroSyncChangeTask 的run() task.run(); long duration = System.currentTimeMillis() - begin; if (duration > 1000L) { log.warn("distro task {} takes {}ms", task, duration); } } catch (Throwable e) { log.error("[DISTRO-FAILED] " + e.toString(), e); } } }}
這里從隊列里面拿出來任務(wù)執(zhí)行,不難發(fā)現(xiàn)這里的任務(wù)執(zhí)行的具體方法就是DistroSyncChangeTask類的run方法:
public class DistroSyncChangeTask extends AbstractDistroExecuteTask { private final DistroComponentHolder distroComponentHolder; public DistroSyncChangeTask(DistroKey distroKey, DistroComponentHolder distroComponentHolder) { super(distroKey); this.distroComponentHolder = distroComponentHolder; } @Override public void run() { Loggers.DISTRO.info("[DISTRO-START] {}", toString()); try { //獲取各種參數(shù) String type = getDistroKey().getResourceType(); DistroData distroData = distroComponentHolder.findDataStorage(type).getDistroData(getDistroKey()); distroData.setType(DataOperation.CHANGE); //調(diào)用http接口同步任務(wù) boolean result = distroComponentHolder.findTransportAgent(type).syncData(distroData, getDistroKey().getTargetServer()); //同步失敗會繼續(xù)重試 if (!result) { handleFailedTask(); } Loggers.DISTRO.info("[DISTRO-END] {} result: {}", toString(), result); } catch (Exception e) { Loggers.DISTRO.warn("[DISTRO] Sync data change failed.", e); handleFailedTask(); } } /** * 同步失敗會繼續(xù)重試 */ private void handleFailedTask() { String type = getDistroKey().getResourceType(); DistroFailedTaskHandler failedTaskHandler = distroComponentHolder.findFailedTaskHandler(type); if (null == failedTaskHandler) { Loggers.DISTRO.warn("[DISTRO] Can't find failed task for type {}, so discarded", type); return; } failedTaskHandler.retry(getDistroKey(), DataOperation.CHANGE); } @Override public String toString() { return "DistroSyncChangeTask for " + getDistroKey().toString(); }}
每個Nacos服務(wù)端實例都會提供這樣的一個api接口供其他Nacos實例調(diào)用,從而同步注冊實例數(shù)據(jù)。DistroSyncChangeTask類的run方法,就是調(diào)用http接口同步任務(wù)接口,將本節(jié)點的注冊實例數(shù)據(jù)同步到其他節(jié)點機(jī)器上。
總結(jié)一下上面增量數(shù)據(jù)同步方法:
DistroProtocol 使用 sync() 方法處理AP 架構(gòu)下的節(jié)點數(shù)據(jù)同步向其他節(jié)點發(fā)布廣播任務(wù)調(diào)用 distroTaskEngineHolder 發(fā)布延遲任務(wù)調(diào)用 DistroDelayTaskProcessor.process() 方法進(jìn)行任務(wù)投遞:將延遲任務(wù)轉(zhuǎn)換為異步變更任務(wù)執(zhí)行變更任務(wù) DistroSyncChangeTask.run() 方法:向指定節(jié)點發(fā)送消息總結(jié)Nacos避免并發(fā)讀寫的沖突:Nacos在更新實例列表時,會采用CopyOnWrite技術(shù),首先將老得實例列表拷貝一份,然后更新拷貝的實例列表,再用更新后的實例列表來覆蓋舊的實例列表。
Nacos提高注冊并發(fā):為了應(yīng)對阿里巴巴內(nèi)部數(shù)十萬服務(wù)的并發(fā)寫請求Nacos內(nèi)部會將服務(wù)注冊的任務(wù)放入阻塞隊列,采用線程池異步來完成實例更新,從而提高并發(fā)寫能力。
Nacos的服務(wù)發(fā)現(xiàn)分為兩種模式:主動拉取模式,消費者定期主動從Nacos服務(wù)端拉取服務(wù)列表并緩存起來,當(dāng)服務(wù)調(diào)用時優(yōu)先讀取本地緩存中的服務(wù)列表。訂閱模式,消費者訂閱Nacos中的服務(wù)列表,并基于UDP協(xié)議來接收服務(wù)變更通知。當(dāng)Nacos中的服務(wù)列表更新時,會發(fā)送UDP廣播給所有訂閱者。與Eureka相比,Nacos的訂閱模式服務(wù)狀態(tài)更新更及時,消費者更容易及時發(fā)現(xiàn)服務(wù)列表的變化,剔除故障服務(wù)。
以上就是關(guān)于pos機(jī)注冊網(wǎng)絡(luò)超時,一文深入理解AP架構(gòu)Nacos注冊原理的知識,后面我們會繼續(xù)為大家整理關(guān)于pos機(jī)注冊網(wǎng)絡(luò)超時的知識,希望能夠幫助到大家!
