ribbon源码分析之七种负载均衡策略

news/2024/7/10 3:25:35 标签: ribbon, 负载均衡, spring cloud

源码版本为2.1.0RELEASE

书接上文,接下来详细介绍LoadBalancerClient

public interface LoadBalancerClient extends ServiceInstanceChooser {

	/**
	 * Executes request using a ServiceInstance from the LoadBalancer for the specified
	 * service.
	 * @param serviceId The service ID to look up the LoadBalancer.
	 * @param request Allows implementations to execute pre and post actions, such as
	 * incrementing metrics.
	 * @return The result of the LoadBalancerRequest callback on the selected
	 * ServiceInstance.
	 */
	<T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;

	/**
	 * Executes request using a ServiceInstance from the LoadBalancer for the specified
	 * service.
	 * @param serviceId The service ID to look up the LoadBalancer.
	 * @param serviceInstance The service to execute the request to.
	 * @param request Allows implementations to execute pre and post actions, such as
	 * incrementing metrics.
	 * @return The result of the LoadBalancerRequest callback on the selected
	 * ServiceInstance.
	 */
	<T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;

	/**
	 * Creates a proper URI with a real host and port for systems to utilize.
	 * Some systems use a URI with the logical service name as the host,
	 * such as http://myservice/path/to/service.  This will replace the
	 * service name with the host:port from the ServiceInstance.
	 * @param instance
	 * @param original A URI with the host as a logical service name.
	 * @return A reconstructed URI.
	 */
	URI reconstructURI(ServiceInstance instance, URI original);
}

reconstructURI

首先看reconstructURI方法

使用真实主机和端口创建适当的 URI,供系统使用。某些系统使用带有逻辑服务名称的 URI 作为主机,例如 http:myservicepathtoservice。这将使用来自 ServiceInstance 的 host:port 替换服务名称。参数:实例 – 原始 – 以主机作为逻辑服务名称的 URI。返回: 重建的 URI。

public URI reconstructURI(ServiceInstance instance, URI original) {
		Assert.notNull(instance, "instance can not be null");
		String serviceId = instance.getServiceId();
		RibbonLoadBalancerContext context = this.clientFactory
				.getLoadBalancerContext(serviceId);

		URI uri;
		Server server;
		if (instance instanceof RibbonServer) {
			RibbonServer ribbonServer = (RibbonServer) instance;
			server = ribbonServer.getServer();
			uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
		} else {
			server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
			IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
			ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
			uri = updateToSecureConnectionIfNeeded(original, clientConfig,
					serverIntrospector, server);
		}
		return context.reconstructURIWithServer(server, uri);
	}

如果需要,将方案替换为安全变体。如果 unsecureSchemeMapping 映射包含 uri 方案并且 isSecure(IClientConfig, ServerIntrospector, Server) 为 true,则更新方案,其中unsecureSchemeMapping内容如下:

private static final Map<String, String> unsecureSchemeMapping;
	static
	{
		unsecureSchemeMapping = new HashMap<>();
		unsecureSchemeMapping.put("http", "https");
		unsecureSchemeMapping.put("ws", "wss");
	}

upgradeConnection的作用是把url中的’+‘替换为’%20’

private static URI upgradeConnection(URI uri, String scheme) {
		UriComponentsBuilder uriComponentsBuilder = UriComponentsBuilder.fromUri(uri).scheme(scheme);
		if (uri.getRawQuery() != null) {
			// When building the URI, UriComponentsBuilder verify the allowed characters and does not
			// support the '+' so we replace it for its equivalent '%20'.
			// See issue https://jira.spring.io/browse/SPR-10172
			uriComponentsBuilder.replaceQuery(uri.getRawQuery().replace("+", "%20"));
		}
		return uriComponentsBuilder.build(true).toUri();
	}

reconstructURIWithServer方法则负责最终的url重组,比如

public URI reconstructURIWithServer(Server server, URI original) {
        String host = server.getHost();
        int port = server.getPort();
        String scheme = server.getScheme();
        
        if (host.equals(original.getHost()) 
                && port == original.getPort()
                && scheme == original.getScheme()) {
            return original;
        }
        if (scheme == null) {
            scheme = original.getScheme();
        }
        if (scheme == null) {
            scheme = deriveSchemeAndPortFromPartialUri(original).first();
        }

        try {
            StringBuilder sb = new StringBuilder();
            sb.append(scheme).append("://");
            if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
                sb.append(original.getRawUserInfo()).append("@");
            }
            sb.append(host);
            if (port >= 0) {
                sb.append(":").append(port);
            }
            sb.append(original.getRawPath());
            if (!Strings.isNullOrEmpty(original.getRawQuery())) {
                sb.append("?").append(original.getRawQuery());
            }
            if (!Strings.isNullOrEmpty(original.getRawFragment())) {
                sb.append("#").append(original.getRawFragment());
            }
            URI newURI = new URI(sb.toString());
            return newURI;            
        } catch (URISyntaxException e) {
            throw new RuntimeException(e);
        }
    }

值得注意的是deriveSchemeAndPortFromPartialUri方法会根据server的host和port检查url的host和url,对于https请求如果端口小于0则修改为443,如果是http请求如果端口小于0则修改为80,如果url只包含"/",则应返回“http”和 80

execute

excute方法有两个,我们直接看两个入参的方法

public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
	    return execute(serviceId, request, null);
	}

直接调用了三个入参的方法,区别就在于调用之前通过getServer方法获得server,作为成员变量new了一个ribbbonServer

public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException {
		ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
		Server server = getServer(loadBalancer, hint);
		if (server == null) {
			throw new IllegalStateException("No instances available for " + serviceId);
		}
		RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server,
				serviceId), serverIntrospector(serviceId).getMetadata(server));
		//调用三个参数的方法,其实就是多了一步获取ribbonServer的过程
		return execute(serviceId, ribbonServer, request);
	}

那么就看看getServer方法做了什么

protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
		if (loadBalancer == null) {
			return null;
		}
		// Use 'default' on a null hint, or just pass it on?
		return loadBalancer.chooseServer(hint != null ? hint : "default");
	}

没错,调用了负载均衡器的chooseServer方法,而这个方法,就是本文重点,如下图,默认的负载均衡器是ZoneAwareLoadBalancer
在这里插入图片描述

eureka提供了region和zone两个概念来进行分区,这两个概念均来自于亚马逊的AWS:

region:可以简单理解为地理上的分区,比如亚洲地区,或者华北地区,再或者北京等等,没有具体大小的限制。根据项目具体的情况,可以自行合理划分region。
zone:可以简单理解为region内的具体机房,比如说region划分为北京,然后北京有两个机房,就可以在此region之下划分出zone1,zone2两个zone。

在只有一个分区的情况下,直接调用父类的chooseServer方法

if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
            logger.debug("Zone aware logic disabled or there is only one zone");
            return super.chooseServer(key);
        }

深入此方法

public Server chooseServer(Object key) {
        if (counter == null) {
            counter = createCounter();
        }
        counter.increment();
        if (rule == null) {
            return null;
        } else {
            try {
                return rule.choose(key);
            } catch (Exception e) {
                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
                return null;
            }
        }
    }

可以看到维护了一个volatile修饰的Counter计数器,每次请求计数器加一,然后调用持有的IRule对象的choose方法,IRule的继承关系如下
在这里插入图片描述

那么接下来就重点介绍每个子类

AbstractLoadBalancerRule

负载均衡策略的抽象类,在该抽象类中定义了负载均衡器ILoadBalancer对象,该对象能够在具体实现选择服务策略时,获取到一些负载均衡器中维护的信息来作为分配依据,并以此设计一些算法来实现针对特定场景高效策略。

public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {

    private ILoadBalancer lb;
        
    @Override
    public void setLoadBalancer(ILoadBalancer lb){
        this.lb = lb;
    }
    
    @Override
    public ILoadBalancer getLoadBalancer(){
        return lb;
    }      
}

ClientConfigEnabledRoundRobinRule

RoundRobinRule roundRobinRule = new RoundRobinRule();

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        roundRobinRule = new RoundRobinRule();
    }

    @Override
    public void setLoadBalancer(ILoadBalancer lb) {
    	super.setLoadBalancer(lb);
    	roundRobinRule.setLoadBalancer(lb);
    }
    
    @Override
    public Server choose(Object key) {
        if (roundRobinRule != null) {
            return roundRobinRule.choose(key);
        } else {
            throw new IllegalArgumentException(
                    "This class has not been initialized with the RoundRobinRule class");
        }
    }

继承AbstractLoadBalancerRule,内部维护了一个RoundRobinRule,该策略比较特殊,我们一般不直接使用它。虽然我们不会直接使用该策略,但是通过继承该策略,默认的choose就实现了线性轮询机制,在子类中做一些高级策略时通常有可能会存在一些无法实施的情况,那么就可以用父类的实现作为备选。

RoundRobinRule

上面已经提到了RoundRobinRule,那么就先看一下它的实现

public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }

        Server server = null;
        int count = 0;
        while (server == null && count++ < 10) {
            List<Server> reachableServers = lb.getReachableServers();
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();

            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }

            int nextServerIndex = incrementAndGetModulo(serverCount);
            server = allServers.get(nextServerIndex);

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }

            // Next.
            server = null;
        }

        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }

该策略实现了按照线性轮询的方式一次选择每个服务实例的功能。从循环条件中,我们可以维护了一个count计数变量,该变量会在每次循环后累加,也就是说,如果一直选择不到server超过10次,那么就会结束尝试。

WeightedResponseTimeRule

public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            return null;
        }
        Server server = null;

        while (server == null) {
            // get hold of the current reference in case it is changed from the other thread
            List<Double> currentWeights = accumulatedWeights;
            if (Thread.interrupted()) {
                return null;
            }
            List<Server> allList = lb.getAllServers();

            int serverCount = allList.size();

            if (serverCount == 0) {
                return null;
            }

            int serverIndex = 0;

            // last one in the list is the sum of all weights
            double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); 
            // No server has been hit yet and total weight is not initialized
            // fallback to use round robin
            if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
                server =  super.choose(getLoadBalancer(), key);
                if(server == null) {
                    return server;
                }
            } else {
                // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
                double randomWeight = random.nextDouble() * maxTotalWeight;
                // pick the server index based on the randomIndex
                int n = 0;
                for (Double d : currentWeights) {
                    if (d >= randomWeight) {
                        serverIndex = n;
                        break;
                    } else {
                        n++;
                    }
                }

                server = allList.get(serverIndex);
            }

            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }

            if (server.isAlive()) {
                return (server);
            }

            // Next.
            server = null;
        }
        return server;
    }

该策略是对RoundRobinRule的扩展,增加了根据实例的运行情况来计算权重,并根据权重来挑选实例,以达到更优的分配效果,它的实现主要有三个核心内容。

定时任务:

WeightedResponseTimeRule策略在初始化的时候会通过serverWeightTimer启动一个定时任务,用来为每个服务实例计算权重,该任务默认30秒执行一次。

public void maintainWeights() {
            ILoadBalancer lb = getLoadBalancer();
            if (lb == null) {
                return;
            }
            
            if (!serverWeightAssignmentInProgress.compareAndSet(false,  true))  {
                return; 
            }
            
            try {
                logger.info("Weight adjusting job started");
                AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;
                LoadBalancerStats stats = nlb.getLoadBalancerStats();
                if (stats == null) {
                    // no statistics, nothing to do
                    return;
                }
                double totalResponseTime = 0;
                // find maximal 95% response time
                for (Server server : nlb.getAllServers()) {
                    // this will automatically load the stats if not in cache
                    ServerStats ss = stats.getSingleServerStat(server);
                    totalResponseTime += ss.getResponseTimeAvg();
                }
                // weight for each server is (sum of responseTime of all servers - responseTime)
                // so that the longer the response time, the less the weight and the less likely to be chosen
                Double weightSoFar = 0.0;
                
                // create new list and hot swap the reference
                List<Double> finalWeights = new ArrayList<Double>();
                for (Server server : nlb.getAllServers()) {
                    ServerStats ss = stats.getSingleServerStat(server);
                    double weight = totalResponseTime - ss.getResponseTimeAvg();
                    weightSoFar += weight;
                    finalWeights.add(weightSoFar);   
                }
                setWeights(finalWeights);
            } catch (Exception e) {
                logger.error("Error calculating server weights", e);
            } finally {
                serverWeightAssignmentInProgress.set(false);
            }

        }

如何计算权重

其中totalResponseTime记录平均响应时间

根据LoadBalancerStats中记录的每个实例的统计信息,累加所有实例的平均响应时间,得到总平均响应时间,再通过以下代码为负载均衡器中维护的实例清单逐个计算权重

for (Server server : nlb.getAllServers()) {
                    ServerStats ss = stats.getSingleServerStat(server);
                    double weight = totalResponseTime - ss.getResponseTimeAvg();
                    weightSoFar += weight;
                    finalWeights.add(weightSoFar);   
                }

每个服务器的权重是(所有服务器的响应时间之和 - 响应时间),因此响应时间越长,权重越小,被选中的可能性越小这里为什么要累加而不是直接存在list中呢?带着问题继续看下去

如何进行实例选择:

获取权重区间的最后一个值,其实也就是总权重,因为计算权重时,每个位置都是累加的,最后一个既是总权重。
如果总权重,低于0.01d,采用线性轮询的策略
如果总权重大于等于0.01,就产生一个权重区间内的随机数
遍历维护的权重清单,若权重大于等于随机得到的数值,就选择这个实例

// last one in the list is the sum of all weights
            double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); 
// No server has been hit yet and total weight is not initialized
            // fallback to use round robin
            if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {
                server =  super.choose(getLoadBalancer(), key);
                if(server == null) {
                    return server;
                }
            } else {
                // generate a random weight between 0 (inclusive) to maxTotalWeight (exclusive)
                double randomWeight = random.nextDouble() * maxTotalWeight;
                // pick the server index based on the randomIndex
                int n = 0;
                for (Double d : currentWeights) {
                    if (d >= randomWeight) {
                        serverIndex = n;
                        break;
                    } else {
                        n++;
                    }
                }

                server = allList.get(serverIndex);
            }

这个算法非常巧妙,我们权重列表中的最后一个元素(累加的总权重)maxTotalWeight,再通过乘以一个随机数,得到一个0到maxTotalWeight之间的一个randomWeight,然后遍历权重列表,当第一次遇到的权重列表的大于randomWeight的元素,其下标对应的服务器就是被选中的服务器。(仔细思考下就会发现,当响应时间越短,则当前权重跟之前的权重差越大,也就是说增量大,随机值randomWeight落在这个增量区间的概率也就越大)。

ResponseTimeWeightedRule

ResponseTimeWeightedRule已经废弃,改用上面的WeightedResponseTimeRule

BestAvailableRule

继续看BestAvailableRule,该类继承了ClientConfigEnabledRoundRobinRule在实现中它注入了负载均衡器的统计对象LoadBalancerStats,同时在具体的choose算法中利用LoadBalancerStats保存的实例统计信息来选择满足要求的实例。从如下源码中我们可以看到,它通过遍历负载据衡器中维护的所有服务shillings,会过滤掉故障的实例,并找出并发请求最小的一个,所以该策略的特性是可选出最空闲的实例。

public Server choose(Object key) {
        if (loadBalancerStats == null) {
            return super.choose(key);
        }
        List<Server> serverList = getLoadBalancer().getAllServers();
        int minimalConcurrentConnections = Integer.MAX_VALUE;
        long currentTime = System.currentTimeMillis();
        Server chosen = null;
        for (Server server: serverList) {
            ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);
            if (!serverStats.isCircuitBreakerTripped(currentTime)) {
                int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);
                if (concurrentConnections < minimalConcurrentConnections) {
                    minimalConcurrentConnections = concurrentConnections;
                    chosen = server;
                }
            }
        }
        if (chosen == null) {
            return super.choose(key);
        } else {
            return chosen;
        }
    }

看起来是选择负载最小的实例,但是在高并发的情况下,应该要考虑会不会同一时间大量请求打在一台服务器,从而达到相反的效果呢?

PredicateBasedRule

这是一个抽象策略,继承了ClientConfigEnabledRoundRobinRule,从命名中可以猜出这是一个基于Predicate实现的策略

​ 如下面的源码所示,它定义了一个抽象函数getPredicate来获取AbstractServerPredicate对象的实现,而在choose函数中,通过AbstractServerPredicate的chooseRandomlyAfterFiltering函数来选出具体的服务实例。从该函数的命名我们也能大致猜到他的基础逻辑:先通过子类实现的Predicate逻辑来过滤一部分服务实例,然后再以线性轮询的方式从过滤后的实例清单中选出一个

public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {
   
    /**
     * Method that provides an instance of {@link AbstractServerPredicate} to be used by this class.
     * 
     */
    public abstract AbstractServerPredicate getPredicate();
        
    /**
     * Get a server by calling {@link AbstractServerPredicate#chooseRandomlyAfterFiltering(java.util.List, Object)}.
     * The performance for this method is O(n) where n is number of servers to be filtered.
     */
    @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();
        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
        if (server.isPresent()) {
            return server.get();
        } else {
            return null;
        }       
    }
}

看一下chooseRoundRobinAfterFiltering方法是如何进行过滤的

public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
        List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
        if (eligible.size() == 0) {
            return Optional.absent();
        }
        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
    }

getEligibleServers获得合格的服务器

 public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
        if (loadBalancerKey == null) {
            return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));            
        } else {
            List<Server> results = Lists.newArrayList();
            for (Server server: servers) {
                if (this.apply(new PredicateKey(loadBalancerKey, server))) {
                    results.add(server);
                }
            }
            return results;            
        }
    }

因为AbstractServerPredicate实现了Predicate接口,而apply方法是该接口中的定义,主要用来实现过滤条件的判断逻辑,它输入的参数则是过滤条件需要用到的一些信息。所以这里chooseRoundRobinAfterFiltering只是定义了一个模板策略:”先过滤清单,再轮询选择“。对于如何过滤,需要我们在AbstractServerPredicate的子类中实现apply方法来确定具体的策略

最后通过incrementAndGetModulo对AbstractServerPredicate的子类返回的结果进行轮询选择

private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = nextIndex.get();
            int next = (current + 1) % modulo;
            if (nextIndex.compareAndSet(current, next) && current < modulo)
                return current;
        }
    }

AvailabilityFilteringRule

这个类自身对choose方法进行了一些改进,它并没有像在父类PredicateBasedRule那样,先遍历所有的节点进行过滤,然后在过滤后的集合中选择实例,而是先以线性的方式选择一个实例,接着用过滤条件来判断该实例是否满足要求,若满足就直接使用该实例,如不满足要求就再选择下一个实例,如此循环进行,当这个过程重复了10次还是没有找到符合要求的实例,就采用父类实现的方案

@Override
    public Server choose(Object key) {
        int count = 0;
        Server server = roundRobinRule.choose(key);
        while (count++ <= 10) {
            if (predicate.apply(new PredicateKey(server))) {
                return server;
            }
            server = roundRobinRule.choose(key);
        }
        return super.choose(key);
    }

apply方法是初始化了一个AbstractServerPredicate的子类AvailabilityPredicate

private AbstractServerPredicate predicate;
    
    public AvailabilityFilteringRule() {
    	super();
    	predicate = CompositePredicate.withPredicate(new AvailabilityPredicate(this, null))
                .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
                .build();
    }

深入apply方法,会有一个检查

@Override
    public boolean apply(@Nullable PredicateKey input) {
        LoadBalancerStats stats = getLBStats();
        if (stats == null) {
            return true;
        }
        return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));
    }
    
    
    private boolean shouldSkipServer(ServerStats stats) {        
        if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) 
                || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {
            return true;
        }
        return false;
    }

如果使用了断路器并且已经断开或者实例的并发请求数大于阈值则跳过这个server

StickyRule

每次都返回同一个实例

public Server choose(Object key) {
		final List<Server> instances = getLoadBalancer().getServerList(true);
		log.debug(String.format("Instances taken from load balancer [%s]", instances));
		Server localOurInstance = this.ourInstance.get();
		log.debug(String.format("Current saved instance [%s]", localOurInstance));
		//如果存活实例不包含本地保存的实例,则先将本地实例置空
		if (!instances.contains(localOurInstance)) {
			this.ourInstance.compareAndSet(localOurInstance, null);
		}
		//根据构造参数传进来的策略选择一个实例并保存到本地
		if (this.ourInstance.get() == null) {
			Server instance = this.masterStrategy.choose(key);
			if (this.ourInstance.compareAndSet(null, instance)) {
				this.instanceNumber.incrementAndGet();
			}
		}
		//返回本地实例
		return this.ourInstance.get();
	}

如果存活实例不包含本地保存的实例,则先将本地实例置空,否则根据构造参数传进来的策略选择一个实例并保存到本地

RetryRule

该策略实现了一个具备重试机制的实例选择功能。在其内部还定义了一个IRule对象,默认使用了RoundRobinRule实例。而在choose方法中则实现了对内部定义策略反复进行尝试的策略,若期间能够选择到实例就返回,若选择不到就根据设置的尝试时间为阈值,当超过该阈值后就返回null。

public Server choose(ILoadBalancer lb, Object key) {
		long requestTime = System.currentTimeMillis();
		long deadline = requestTime + maxRetryMillis;

		Server answer = null;

		answer = subRule.choose(key);

		if (((answer == null) || (!answer.isAlive()))
				&& (System.currentTimeMillis() < deadline)) {

			InterruptTask task = new InterruptTask(deadline
					- System.currentTimeMillis());

			while (!Thread.interrupted()) {
				answer = subRule.choose(key);

				if (((answer == null) || (!answer.isAlive()))
						&& (System.currentTimeMillis() < deadline)) {
					/* pause and retry hoping it's transient */
					Thread.yield();
				} else {
					break;
				}
			}

			task.cancel();
		}

		if ((answer == null) || (!answer.isAlive())) {
			return null;
		} else {
			return answer;
		}
	}
IRule subRule = new RoundRobinRule();
	long maxRetryMillis = 500;

http://www.niftyadmin.cn/n/1243089.html

相关文章

【我的Android进阶之旅】如何在Retrofit2 中创建动态 URL?

一、需求描述 最近刚有个小需求,是这样的: 我们要post某些数据到业务服务器,而且是需要post相同的数据到不同的业务服务器。这个需要的原因是因为服务那边的架构问题,导致没有在中间做一个中间层来转发不同的微服务去,所以就很恶心的客户端post数据的时候,需要上传到不同…

Day42.String常用类复习概要 -Java常用类#、集合、IO

String类 1.概述 String&#xff1a;字符串&#xff0c;使用一对""引起来表示 ①String声明为final的&#xff0c;不可被继承 ②String 实现了Serializable接口&#xff1a;表示字符串是可支持序列化的。 实现了Comparable接口&#xff1a;表示字符串可比较大小。 …

Ribbon源码之负载均衡器ILoadBalancer

在前面的文章中已经了解了ribbon的基本配置,各种组件的作用&#xff0c;负载均衡策略以及如何配合注册中心实现服务的注册发现&#xff0c;现在还遗留一个问题&#xff0c;服务的新增和更新ribbon到底是如何来支持的呢&#xff1f; ILoadBalancer 定义软件负载均衡器操作接口…

【我的Android进阶之旅】OKHttp出现错误 java.lang.IllegalStateException: Expected Android API level 21+ but was 19

一、错误描述 因为使用kotlin的协程搭配retrofit写http请求接口的时候,提示我自动升级Retrofit到2.6.1版本,OKhttp到4.2.0版本,然后运行一切正常。编写完代码自测的时候,当跑到一台Android4.4(SDK19)设备的时候,出现了如下所述的错误: Caused by: java.lang.IllegalSt…

Day43.计算机体系结构的基本概念(一) -计算机体系结构

&#x1f4d6;计算机体系结构的基本概念&#xff08;一&#xff09; &#x1f33f;1.1.1存储程序计算机 &#x1f33f;1.1.2计算机体系 &#x1f33f;1.1.3计算机系统中的层次概念 机器语言、操作系统、汇编语言构成硬件和软件的界面 翻译&#xff1a; 将一组代码翻译…

【我的Android进阶之旅】 adb logcat 输出同一个进程的所有输出

一、需求描述 最近提测一个需求,因为是后台服务,没有任何界面提示之类的。测试不好验证效果,所以建议他查看日志打印是否正常。 但是测试人员那边没有安装android studio,不好使用adb logcat 命令来打印日志,得告知他如何打印我这个进程的日志。 二、实现 2.1 打印该进…

ribbon源码之服务列表过滤器以及ZoneAwareLoadBalancer

在LoadBalancer的学习中&#xff0c;我们最后还看到了filter的身影,接下来就在本文一探究竟 为什么要了解&#xff1f; 因为在运行过程中&#xff0c;并不是每台Server一直都持续可用&#xff0c;另外多台Server很有可能分部在不同的可用区zone&#xff0c;而很多时候我们希望…

Day43.二叉树的定义性质、存储结构、先中后序递归遍历和非递归遍历 -数据结构

&#x1f4d6;二叉树的定义及性质 n0n1n2-10n01n12n →n0&#xff1d;n21 &#x1f4d6;二叉树的存储结构 二叉树可以通过数组来实现吗&#xff1f;可以 &#x1f4d6;二叉树的先序中序后序遍历 采用递归的方法 &#x1f4d6;二叉树的非递归遍历 学习资源来源&…