Spring Cloud Ribbon源码分析

原创 吴就业 98 0 2020-06-27

本文为博主原创文章,未经博主允许不得转载。

本文链接:https://www.wujiuye.com/article/7ac94a204b934f10aee3af4387ba1ec1

作者:吴就业
链接:https://www.wujiuye.com/article/7ac94a204b934f10aee3af4387ba1ec1
来源:吴就业的网络日记
本文为博主原创文章,未经博主允许不得转载。

本篇文章写于2020年06月27日,从公众号|掘金|CSDN手工同步过来(博客搬家),本篇为原创文章。

Spring Cloud Kubernetes微服务实战与源码分析

在项目中使用Ribbon的目的是在客户端(服务消费端)实现负载均衡。

在上一篇《Spring Cloud OpenFeign源码分析,为什么不导入Ribbon应用会启动不起来?》中我们分析了为什么使用OpenFeign时,不配置url,且不导入Ribbon的依赖会报错。本篇继续分析OpenFeign是如何与Ribbon整合、Ribbon是如何实现负载均衡的、Ribbon是如何从注册中心获取服务的。

OpenFeignRibbon整合后的接口调用流程

OpenFeignRibbon整合实现负载均衡调用接口的流程如下:

spring-cloud-openfeign-core模块: * 1、调用LoadBalancerFeignClientexecute调用远程方法; * 2、调用FeignLoadBalancerexecuteWithLoadBalancer方法实现负载均衡调用。

ribbon-core模块: * 3、调用LoadBalancerCommandsubmit方法实现异步调用同步阻塞等待结果。 * 4、调用LoadBalancerCommandselectServer方法从多个服务提供者中负载均衡选择一个调用;

ribbon-loadbalancer模块: * 5、调用ILoadBalancerchooseServer方法选择服务; * 6、调用IRulechoose方法按某种算法选择一个服务,如随机算法、轮询算法;

OpenFeign是如何与Ribbon整合的

sck-demo项目项目地址:https://github.com/wujiuye/share-projects/tree/master/sck-demo

当我们使用openfeign时,如果不配置@FeignClienturl属性,那么就需要导入spring-cloud-starter-kubernetes-ribbon的依赖,使用LoadBalancerFeignClient调用接口。如果我们不需要使用Ribbon来实现负载均衡,那么我们可以直接将@FeignClienturl属性配置为:http://{serviceId},而不用添加Ribbon的依赖。

sck-demo项目中添加spring-cloud-starter-kubernetes-ribbon依赖,非Spring Cloud Kubernetes项目只需添加spring-cloud-starter-netflix-ribbon

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-kubernetes-ribbon</artifactId>
</dependency>

当我们在项目中添加spring-cloud-starter-kubernetes-ribbon依赖配置时,会将spring-cloud-starter-netflix-ribbonspring-cloud-kubernetes-ribbon都会导入到项目中,如下图所示。

当项目中使用openfeign并添加spring-cloud-starter-netflix-ribbon后,Ribbon就能通过自动配置与openfeign整合,为项目注入ILoadBalancer的实现类实例。默认使用的是ZoneAwareLoadBalancer,这是spring-cloud-netflix-ribbon下的类。

spring-cloud-netflix-ribbonMETA-INF目录下的spring.factories文件内容如下:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.ribbon.RibbonAutoConfiguration

可以说spring-cloud-netflix-ribbonspring-cloud-commonsloadbalancer接口的实现。

RibbonAutoConfiguration会注入一个LoadBalancerClientLoadBalancerClientspring-cloud-commons定义的负载均衡接口,RibbonLoadBalancerClientRibbon实现spring-cloud-commons负载均衡接口LoadBalancerClient的实现类,是提供给代码中使用@LoadBalanced注解使用的。

    @Bean
	@ConditionalOnMissingBean(LoadBalancerClient.class)
	public LoadBalancerClient loadBalancerClient() {
		return new RibbonLoadBalancerClient(springClientFactory());
	}

在创建RibbonLoadBalancerClient时调用springClientFactory方法创建SpringClientFactory

    @Bean
	public SpringClientFactory springClientFactory() {
		SpringClientFactory factory = new SpringClientFactory();
		factory.setConfigurations(this.configurations);
		return factory;
	}

SpringClientFactoryNamedContextFactory的子类,其构建方法调用父类构造方法时传入了一个配置类RibbonClientConfiguration.class,这是RibbonClientConfiguration配置类生效的原因。

public class SpringClientFactory extends NamedContextFactory<RibbonClientSpecification> {

	static final String NAMESPACE = "ribbon";

	public SpringClientFactory() {
		super(RibbonClientConfiguration.class, NAMESPACE, "ribbon.client.name");
	}
}

SpringClientFactory会为每个服务提供者创建一个ApplicationContext,实现bean的隔离,解决bean名称冲突问题,以及实现使用不同配置。

在创建ApplicationContext时会注册defaultConfigTypebean工厂,该defaultConfigType就是构造方法传递进来的RibbonClientConfiguration.class

protected AnnotationConfigApplicationContext createContext(String name) {
        // 创建ApplicationContext
		AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
		......
		// 注册多个Configuration类
		context.register(PropertyPlaceholderAutoConfiguration.class,
		                this.defaultConfigType);
		......
		// 调用ApplicationContext的refresh方法
		context.refresh();
		return context;
	}

createContext方法是什么时候被调用的呢?

sck-demo中服务消费者调用服务提供者接口为例:

@Service
public class DemoInvokeServiceImpl implements DemoInvokeService {

    @Resource
    private DemoService demoService;

    @Override
    public ListGenericResponse<DemoDto> invokeDemo() {
        return demoService.getServices();
    }

}

DemoService是被@FeignClient注解声明的接口,当我们调用DemoService的某个方法时,经过《Spring Cloud OpenFeign源码分析》我们知道,最终会调用到LoadBalancerFeignClientexecute方法时。

public class LoadBalancerFeignClient implements Client {
    //............
	private SpringClientFactory clientFactory;

       // 后面再分析execute方法
	@Override
	public Response execute(Request request, Request.Options options) throws IOException{
	    // .....
	    IClientConfig requestConfig = getClientConfig(options, clientName);
	    // .....
	}
}

execute方法中需要调用getClientConfig方法从SpringClientFactory获取IClientConfig实例,即获取客户端配置。getClientConfig方法就是要从服务提供者的ApplicationContext工厂中获取实现了IClientConfig接口的bean

当首次调用某个服务提供者的接口时,由于并未初始化AnnotationConfigApplicationContext,因此会先调用createContext方法创建ApplicationContext,该方法将RibbonClientConfiguration类注册到ApplicationContext,最后调用context.refresh();时就会调用到RibbonClientConfiguration的被@Bean注释的方法。

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
		RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {
    // ........
    
	@RibbonClientName
	private String name = "client";

	@Autowired
	private PropertiesFactory propertiesFactory;

    // IClientConfig实例,配置client的连接超时、读超时等
	@Bean
	@ConditionalOnMissingBean
	public IClientConfig ribbonClientConfig() {
		DefaultClientConfigImpl config = new DefaultClientConfigImpl();
		config.loadProperties(this.name);
		config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
		config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
		config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
		return config;
	}

    // 配置Ribbon使用的负载均衡算法,默认使用ZoneAvoidanceRule
	@Bean
	@ConditionalOnMissingBean
	public IRule ribbonRule(IClientConfig config) {
		if (this.propertiesFactory.isSet(IRule.class, name)) {
			return this.propertiesFactory.get(IRule.class, config, name);
		}
		ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
		rule.initWithNiwsConfig(config);
		return rule;
	}

    // 配置服务更新器,定时从注册中心拉去服务,由ILoadBalancer启动
	@Bean
	@ConditionalOnMissingBean
	public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
		return new PollingServerListUpdater(config);
	}

    // 配置ribbon的负载均衡器,默认使用ZoneAwareLoadBalancer
	@Bean
	@ConditionalOnMissingBean
	public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
			ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
			IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
		if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
			return this.propertiesFactory.get(ILoadBalancer.class, config, name);
		}
		return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
				serverListFilter, serverListUpdater);
	}
	
	// ......其它的暂时不去理解
}

ILoadBalancerRibbon定义的负载均衡接口。ZoneAwareLoadBalancerDynamicServerListLoadBalancer的子类,DynamicServerListLoadBalancer封装了服务更新逻辑。

DynamicServerListLoadBalancer在构造方法中调用enableAndInitLearnNewServersFeature方法开启服务更新器ServerListUpdaterServerListUpdater定时从注册中心拉取可用的服务更新服务列表缓存。

public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
    protected volatile ServerListUpdater serverListUpdater;
    protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
        @Override
        public void doUpdate() {
            updateListOfServers();
        }
    };
    public void enableAndInitLearnNewServersFeature(){
        serverListUpdater.start(updateAction);
    }
    // 调用ServerList获取服务
    @VisibleForTesting
    public void updateListOfServers() {
        List<T> servers = new ArrayList<T>();
        if (serverListImpl != null) {
            servers = serverListImpl.getUpdatedListOfServers();
            // 如果需要过滤
            if (filter != null) {
                servers = filter.getFilteredListOfServers(servers);
            }
        }
        updateAllServerList(servers);
    }
}

Ribbon是如何实现负载均衡的

ServerList我们后面再讲解,先搞清楚openfeginribbon整合后的整个调用链路。我们继续从LoadBalancerFeignClientexecute方法继续分析。(LoadBalancerFeignClient是由FeignRibbonClientAutoConfiguration自动配置类配置的,如果忘记的话可以再看下上一篇。)

public class LoadBalancerFeignClient implements Client {
    //............
	private SpringClientFactory clientFactory;

	@Override
	public Response execute(Request request, Request.Options options) throws IOException {
		try {
			URI asUri = URI.create(request.url());
			// 拿到的是服务的名称,如:sck-demo-prodiver
			String clientName = asUri.getHost();
			URI uriWithoutHost = cleanUrl(request.url(), clientName);
			// delegate是:class Default implements Client {}
			FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
					this.delegate, request, uriWithoutHost);
			// 首先获取客户端配置
			IClientConfig requestConfig = getClientConfig(options, clientName);
			return
			    // 负载均衡选择一个服务提供者
			    lbClient(clientName)
			    // 调用接口获取响应结果
				.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
		}
		catch (ClientException e) {
			IOException io = findIOException(e);
			if (io != null) {
				throw io;
			}
			throw new RuntimeException(e);
		}
	}
}

lbClient创建一个FeignLoadBalancer对象,调用FeignLoadBalancerexecuteWithLoadBalancer方法实现负载均衡调用接口,最终会调用到FeignLoadBalancerexecute方法。Ribbon使用RxJava实现异步调用转同步阻塞获取结果。

public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        // command也封装了负载均衡的实现逻辑
        LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
        try {
            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            // 调用FeignLoadBalancer的execute方法
                            return Observable.just(
                            AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)
                            );
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
           .....
        }
        
    }

LoadBalancerCommandsubmit方法代码比较多,逻辑也比较复杂,因此就不展开说明了。

public Observable<T> submit(final ServerOperation<T> operation) {
    // Use the load balancer
    Observable<T> o = (server == null ? selectServer() : Observable.just(server))
}

selectServer方法返回一个Observable<Server>ObservableRxJavaAPI,我们跳过这部分。

private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                    // 调用LoadBalancerContext的getServerFromLoadBalancer方法
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }

selectServer方法中调用LoadBalancerContextgetServerFromLoadBalancer方法获取一个服务提供者,此LoadBalancerContext实际是FeignLoadBalancer(在buildLoadBalancerCommand方法中可以找到答案)。

getServerFromLoadBalancer方法部分代码如下:

public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
    ILoadBalancer lb = getLoadBalancer();
    if (host == null) {
        if (lb != null){
            Server svc = lb.chooseServer(loadBalancerKey);
            return svc;
        } 
        // .....
    }
}

由于Ribbon默认使用的ILoadBalancerZoneAwareLoadBalancer,因此getLoadBalancer方法返回的是ZoneAwareLoadBalancer。获取到负载均衡器后调用负载均衡器的chooseServer选择一个服务提供者。

ZoneAwareLoadBalancerchooseServer方法:

    @Override
    public Server chooseServer(Object key) {
        if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }
    }

if条件成立时,调用的是父类BaseLoadBalancerchooseServer方法:

public class BaseLoadBalancer extends AbstractLoadBalancer implements
        PrimeConnections.PrimeConnectionListener, IClientConfigAware {
        
        protected IRule rule = DEFAULT_RULE;
        
    public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                // 调用IRule的choose方法,rule是在创建ZoneAwareLoadBalancer时通过构造方法注入的
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }
}

IRule是服务选择器、是负载均衡算法的实现。在RibbonAutoConfiguration配置类中注入。

    // 配置Ribbon使用的负载均衡算法,默认使用ZoneAvoidanceRule
	@Bean
	@ConditionalOnMissingBean
	public IRule ribbonRule(IClientConfig config) {
		if (this.propertiesFactory.isSet(IRule.class, name)) {
			return this.propertiesFactory.get(IRule.class, config, name);
		}
		ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
		rule.initWithNiwsConfig(config);
		return rule;
	}

在创建ZoneAwareLoadBalancer时通过构造方法注入。

    @Bean
	@ConditionalOnMissingBean
	public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
			ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
			IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
	        // ......
		return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
				serverListFilter, serverListUpdater);
	}

至于ZoneAvoidanceRule是怎么从多个服务提供者中选择一个调用的,这就是负载均衡算法的实现,本篇不做分析。

Ribbon是如何从注册中心获取服务提供者的

前面我们分析到,ZoneAwareLoadBalancerDynamicServerListLoadBalancer的子类,DynamicServerListLoadBalancer封装了服务更新逻辑,定时调用ServerListgetUpdatedListOfServers方法从注册中心拉取服务。

ServerListribbon-loadbalancer包下的类,并不是spring-cloud的接口,所以与spring-cloud的服务发现接口是没有关系的。

public interface ServerList<T extends Server> {

    public List<T> getInitialListOfServers();
    
    /**
     * Return updated list of servers. This is called say every 30 secs
     * (configurable) by the Loadbalancer's Ping cycle
     * 
     */
    public List<T> getUpdatedListOfServers();   

}

在分析RibbonClientConfiguration时,我们发现有一个方法会注册一个ServerList<Server>,但这个方法必不会执行到。

public class RibbonClientConfiguration {
    
    @Bean
	@ConditionalOnMissingBean
	public ServerList<Server> ribbonServerList(IClientConfig config) {
		if (this.propertiesFactory.isSet(ServerList.class, name)) {
			return this.propertiesFactory.get(ServerList.class, config, name);
		}
		ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
		serverList.initWithNiwsConfig(config);
		return serverList;
	}
	
}

因为我们在sck-demo项目中使用的是spring-cloud-starter-kubernetes-ribbon,所以我们现在来看下spring-cloud-kubernetes-ribbon负责做什么。首先从spring-cloud-starter-kubernetes-ribbonspring.factories文件中找到自动配置类。

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.kubernetes.ribbon.RibbonKubernetesAutoConfiguration

自动配置类RibbonKubernetesAutoConfiguration的源码如下:

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnBean(SpringClientFactory.class)
@ConditionalOnProperty(value = "spring.cloud.kubernetes.ribbon.enabled",matchIfMissing = true)
@AutoConfigureAfter(RibbonAutoConfiguration.class)
@RibbonClients(defaultConfiguration = KubernetesRibbonClientConfiguration.class)
public class RibbonKubernetesAutoConfiguration {
}

SpringClientFactory我们分析过了,RibbonAutoConfiguration我们也分析过了,只剩下KubernetesRibbonClientConfiguration这个配置类。

KubernetesRibbonClientConfiguration是使用@RibbonClients注解导入的配置类,也就是通过ImportBeanDefinitionRegistrar注册的。

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties(KubernetesRibbonProperties.class)
public class KubernetesRibbonClientConfiguration {

	@Bean
	@ConditionalOnMissingBean
	public ServerList<?> ribbonServerList(KubernetesClient client, IClientConfig config,
			KubernetesRibbonProperties properties) {
		KubernetesServerList serverList;
		if (properties.getMode() == KubernetesRibbonMode.SERVICE) {
			serverList = new KubernetesServicesServerList(client, properties);
		}
		else {
			serverList = new KubernetesEndpointsServerList(client, properties);
		}
		serverList.initWithNiwsConfig(config);
		return serverList;
	}

}

看到这我们就明白了,spring-cloud-kubernetes-ribbon负责实现ribbon的服务列表接口ServerList<Server>。当spring.cloud.kubernetes.ribbon.mode配置为SERVICE时,使用KubernetesServicesServerList,否则使用KubernetesEndpointsServerList。默认modePOD

@ConfigurationProperties(prefix = "spring.cloud.kubernetes.ribbon")
public class KubernetesRibbonProperties {
	/**
	 * Ribbon enabled,default true.
	 */
	private Boolean enabled = true;
	/**
	 * {@link KubernetesRibbonMode} setting ribbon server list with ip of pod or service
	 * name. default value is POD.
	 */
	private KubernetesRibbonMode mode = KubernetesRibbonMode.POD;
	/**
	 * cluster domain.
	 */
	private String clusterDomain = "cluster.local";
}

KubernetesRibbonMode是个枚举类,支持podservice

public enum KubernetesRibbonMode {

	/**
	 * using pod ip and port.
	 */
	POD,
	/**
	 * using kubernetes service name and port.
	 */
	SERVICE

}

什么意思呢? 当modeservice时,就是获取服务提供者在kubernetes中的service的名称和端口,使用这种模式会导致Ribbon的负载均衡失效,转而使用kubernetes的负载均衡。而当modepod时,就是获取服务提供者的podip和端口,该ipkubernetes集群的内部ip,只要服务消费者是部署在同一个kubernetes集群内就能通过podip和服务提供者暴露的端口访问pod上的服务提供者。

如果我们不想使用Ribbon实现负载均衡,那么我们可以在配置文件中添加如下配置项:

spring:
  cloud:
    kubernetes:
      ribbon:
        mode: SERVICE

你学会了吗?下一篇我们了解Spring Cloud Kubernetes的服务注册。

#后端

声明:公众号、CSDN、掘金的曾用名:“Java艺术”,因此您可能看到一些早期的文章的图片有“Java艺术”的水印。

文章推荐

Spring Cloud Kubernetes服务注册与发现实现原理与源码分析

本篇分析Spring Cloud Kubernetes服务注册与发现实现原理,以及Spring Cloud Kubernetes Core&Discovery源码分析。

Ribbon重试策略RetryHandler的配置与源码分析

本篇我们再对Ribbon的重试机制地实现做详细分析,从源码分析找出我们想要地答案,即如何配置Ribbon实现调用每个服务的接口使用不一样的重试策略,如配置失败重试多少次,以及自定义重试策略RetryHandler。

OpenFeign与Ribbon源码分析总结与面试题

本篇介绍OpenFeign与Feign的关系、Feign底层实现原理、Ribbon是什么、Ribbon底层实现原理、Ribbon是如何实现失败重试的?

Spring Cloud OpenFeign源码分析,为什么不导入Ribbon应用会启动不起来?

如果指定了URL,那么getOptional方法不会返回null,且返回的Client是LoadBalancerFeignClient,但不会抛出异常。如果不指定URL,则走负载均衡逻辑,走的是loadBalance方法,且抛出异常。

将分布式项目sck-demo部署到本地kubernetes

本篇介绍如何搭建本地Kubernetes集群,以及将分布式项目sck-demo部署到本地kubernetes,以及实现版本升级和回滚。

Spring Cloud kubernetes入门项目sck-demo

本篇我们将从一个简单的demo上手Spring Cloud kubernetes,当然,我们只用到Spring Cloud kubernetes的服务注册与发现、配置中心模块。