一、负载均衡介绍

负载均衡是指将客户端的请求均匀地分配给一组服务器的过程,以确保没有一台服务器承担过重的负载,从而提高系统的整体性能和可用性。负载均衡器通过各种算法智能地将请求分发到不同的服务器上,以达到最佳的资源利用效率。
有了API Gateway和Eureka或Nacos注册中心,为什么还需要负载均衡器?负载均衡器在哪呢?具体答案我们还是通过如何访问业务服务节点的两个场景来理解一下。

  • 场景一:API Gateway如何转发到下游服务节点?
    在这里插入图片描述
    从上面的图示,我们发现Gateway里有个LoadBalancer接口专门用来解决下游节点的选址等事项。

  • 场景二:服务间调用RestTemplate如何访问到具体的下游节点?
    在这里插入图片描述
    从上面的图示,我们发现服务间调用如RestTemplate,通过注解@LoadBalanced注入LoadBalancerInterceptor来调用LoadBalancer解决下游节点的选址等事项。

二、负载均衡器Ribbon

Spring Cloud Hoxton.SR1版本之前,Spring Cloud将Ribbon作为默认负载均衡器内置在框架里面,配合Eureka客户端去使用,引入Spring Cloud Hoxton.SR1版本后,开始推荐使用Spring Cloud LoadBalancer,甚至在Spring Cloud 2020.0.0版本开始移除Ribbon或标记为不推荐使用。
本章介绍spring cloud的负载均衡器Ribbon,Ribbon有以下特点:
1.客户端负载均衡
通常的服务发现模式下,客户端调用服务时并不知道实际的IP地址和端口,而是通过服务名来访问。Ribbon 允许客户端通过配置的服务名来透明地进行请求转发,而不需要关心具体的IP和端口信息。
2.支持多种负载均衡策略
Ribbon 提供了多种内置的负载均衡算法,例如轮询(Round Robin)、随机(Random)以及最小连接数(Least Connections)等,还可以自定义负载均衡策略。
支持权重策略(Weighted Response Time),根据服务实例的响应时间动态分配请求。
可以根据服务实例的健康状况和可用性来选择实例。
3.配置灵活性
Ribbon 允许对连接超时、重试次数等进行详细的配置,这使得开发者可以根据应用的具体需求进行微调。
4.集成性
Ribbon 通常与 Spring Cloud 的其他组件如 Eureka、Feign 等结合使用,提供服务发现、断路器等功能。
Feign 客户端本身就集成了 Ribbon,因此使用 Feign 进行 HTTP 请求时,会自动使用 Ribbon 进行负载均衡。
5.易于使用
开发者可以通过简单的注解(如 @LoadBalanced)来启用 Ribbon 的负载均衡能力,而无需编写复杂的代码。

三、Spring Cloud Gateway负载均衡源码分析

  • Ribbon(Spring Cloud Hoxton.SR1之前版本,自带的Ribbon负载均衡器)

我们一般通过网关来访问下游服务节点的API,如下图所示:
在这里插入图片描述
上图可以看出来,在A-Server多节点集群情况下,Gateway是通过Ribbon组件的负载策略去选择相应的具体服务节点的(注意并不是Eureka)。

我们来分析一下源码,Spring Cloud Gateway 里有一个LoadBalancerClientFilter过滤器,这个LoadBalancerClientFilter有个LoadBalancerClient属性,默认是RibbonLoadBalancerClient这个实现的。下面是LoadBalancerClientFilterfilterchoose方法的代码片段:

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
	URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
	String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
	if (url == null || (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
		return chain.filter(exchange);
	}
	//preserve the original url
	addOriginalRequestUrl(exchange, url);

	log.trace("LoadBalancerClientFilter url before: " + url);

	final ServiceInstance instance = choose(exchange);

	if (instance == null) {
		throw new NotFoundException("Unable to find instance for " + url.getHost());
	}

	URI uri = exchange.getRequest().getURI();

	// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,
	// if the loadbalancer doesn't provide one.
	String overrideScheme = null;
	if (schemePrefix != null) {
		overrideScheme = url.getScheme();
	}

	URI requestUrl = loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);

	log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
	exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
	return chain.filter(exchange);
}
	
protected ServiceInstance choose(ServerWebExchange exchange) {
		return loadBalancer.choose(((URI) exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR)).getHost());
}

我们可以看到LoadBalancerClientFilter通过loadBalancer的choose方法开始选择对应的服务节点,来进行转发(这里的loadBalancer是LoadBalancerClient类型)。通过调试,我们可以发现这里的client其实是RibbonLoadBalancerClient实现的。(这里不具体介绍引入org.springframework.cloud.netflix.ribbon组件的一些自动注入操作如RibbonAutoConfiguration,RibbonClientConfigurationd等,可以自行阅读源码了解)

我们接着往下看会发现不同的Server节点可以动态实现不同的负载策略,如A-Server,B-Server都可以设置自己的负载策略配置。具体配置可以参考如下:

demo-server:
  ribbon:
    # 设置负载均衡策略
    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RoundRobinRule
    # 设置连接超时时间(单位毫秒)
    ConnectTimeout: 2000
    # 设置读取超时时间(单位毫秒)
    ReadTimeout: 5000
    # 设置是否对所有操作都进行重试
    OkToRetryOnAllOperations: false
    # 设置是否启用重试
    RetryEnabled: true
    # 设置重试次数(当连接失败时)
    MaxAutoRetries: 2
    # 设置重试次数(当连接到另一个服务器时)
    MaxAutoRetriesNextServer: 2
    # 设置健康检查类
    NFLoadBalancerPingClassName: com.netflix.loadbalancer.PingUrl
    # 设置连接池的最大连接数
    MaxTotalConnections: 200
    # 设置每台服务器的最大连接数
    MaxConnectionsPerHost:
      default: 20
      enabled: true
    # 设置线程池中的线程数量
    NumThreadsPerHost: 10
    # 设置日志级别
    LogLevel: FULL
    # 设置是否显示客户端统计信息
    ShowClientStats: true
    # 设置是否启用服务器列表刷新
    ServerListRefreshEnabled: true
    # 设置服务器列表刷新间隔时间(单位毫秒)
    ServerListRefreshInterval: 30000
    # 设置是否允许在所有操作上重试
    OkToRetryOnAllOperations: false
    # 设置负载均衡统计信息发送频率(单位毫秒)
    StatsCollectorBucketSize: 5000
    # 设置负载均衡统计信息发送间隔
    StatsCollectorNumBuckets: 5
    # 设置是否启用统计信息收集
    EnableStatsCollection: true
  • Spring Cloud LoadBalancer(Spring Cloud Hoxton.SR1之后版本自带)
    我们一般通过网关访问下游节点如下图:
    在这里插入图片描述

上图可以看出来,在A-Server多节点集群情况下,Gateway是通过LoadBalancer组件的负载策略去选择相应的具体服务节点的。
我们来分析一下源码,新版的Spring Cloud Gateway 里有一个ReactiveLoadBalancerClientFilter过滤器,这个ReactiveLoadBalancerClientFilter有个LoadBalancerClientFactory属性,默认是通过LoadBalancerClientFactory来找到ReactorLoadBalancer。下面是ReactiveLoadBalancerClientFilterfilterchoose方法的代码片段:

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI url = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String schemePrefix = (String)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_SCHEME_PREFIX_ATTR);
        if (url != null && ("lb".equals(url.getScheme()) || "lb".equals(schemePrefix))) {
            ServerWebExchangeUtils.addOriginalRequestUrl(exchange, url);
            if (log.isTraceEnabled()) {
                log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName() + " url before: " + url);
            }

            URI requestUri = (URI)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
            String serviceId = requestUri.getHost();
            Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(this.clientFactory.getInstances(serviceId, LoadBalancerLifecycle.class), RequestDataContext.class, ResponseData.class, ServiceInstance.class);
            DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest(new RequestDataContext(new RequestData(exchange.getRequest()), this.getHint(serviceId)));
            LoadBalancerProperties loadBalancerProperties = this.clientFactory.getProperties(serviceId);
            return this.choose(lbRequest, serviceId, supportedLifecycleProcessors).doOnNext((response) -> {
                if (!response.hasServer()) {
                    supportedLifecycleProcessors.forEach((lifecycle) -> {
                        lifecycle.onComplete(new CompletionContext(Status.DISCARD, lbRequest, response));
                    });
                    throw NotFoundException.create(this.properties.isUse404(), "Unable to find instance for " + url.getHost());
                } else {
                    ServiceInstance retrievedInstance = (ServiceInstance)response.getServer();
                    URI uri = exchange.getRequest().getURI();
                    String overrideScheme = retrievedInstance.isSecure() ? "https" : "http";
                    if (schemePrefix != null) {
                        overrideScheme = url.getScheme();
                    }

                    DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(retrievedInstance, overrideScheme);
                    URI requestUrl = this.reconstructURI(serviceInstance, uri);
                    if (log.isTraceEnabled()) {
                        log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
                    }

                    exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR, requestUrl);
                    exchange.getAttributes().put(ServerWebExchangeUtils.GATEWAY_LOADBALANCER_RESPONSE_ATTR, response);
                    supportedLifecycleProcessors.forEach((lifecycle) -> {
                        lifecycle.onStartRequest(lbRequest, response);
                    });
                }
            }).then(chain.filter(exchange)).doOnError((throwable) -> {
                supportedLifecycleProcessors.forEach((lifecycle) -> {
                    lifecycle.onComplete(new CompletionContext(Status.FAILED, throwable, lbRequest, (Response)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_LOADBALANCER_RESPONSE_ATTR)));
                });
            }).doOnSuccess((aVoid) -> {
                supportedLifecycleProcessors.forEach((lifecycle) -> {
                    lifecycle.onComplete(new CompletionContext(Status.SUCCESS, lbRequest, (Response)exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_LOADBALANCER_RESPONSE_ATTR), this.buildResponseData(exchange, loadBalancerProperties.isUseRawStatusCodeInResponseData())));
                });
            });
        } else {
            return chain.filter(exchange);
        }
    }

private Mono<Response<ServiceInstance>> choose(Request<RequestDataContext> lbRequest, String serviceId, Set<LoadBalancerLifecycle> supportedLifecycleProcessors) {
        ReactorLoadBalancer<ServiceInstance> loadBalancer = (ReactorLoadBalancer)this.clientFactory.getInstance(serviceId, ReactorServiceInstanceLoadBalancer.class);
        if (loadBalancer == null) {
            throw new NotFoundException("No loadbalancer available for " + serviceId);
        } else {
            supportedLifecycleProcessors.forEach((lifecycle) -> {
                lifecycle.onStart(lbRequest);
            });
            return loadBalancer.choose(lbRequest);
        }
    }

四、总结

通过上述相关介绍,我们可以发现不管是Gateway的路由转发和服务间调用(RestTemplate或WebClient方式)真正的负载其实是内置的负载均衡器(Ribbon、LoadBalancer),Eureka客户端或者Nacos客户端仅提供下游服务的服务列表清单而已。因此在涉及到负载的相关策略应调整的是负载均衡器的相关配置,而非注册中心客户端的相关配置。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐