序
本文主要研究一下eureka client的serviceUrl
serviceUrl解析
DiscoveryClient.scheduleServerEndpointTask
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/DiscoveryClient.java
private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
AbstractDiscoveryClientOptionalArgs args) {
Collection<?> additionalFilters = args == null
? Collections.emptyList()
: args.additionalFilters;
EurekaJerseyClient providedJerseyClient = args == null
? null
: args.eurekaJerseyClient;
TransportClientFactories argsTransportClientFactories = null;
if (args != null && args.getTransportClientFactories() != null) {
argsTransportClientFactories = args.getTransportClientFactories();
}
// Ignore the raw types warnings since the client filter interface changed between jersey 1/2
@SuppressWarnings("rawtypes")
TransportClientFactories transportClientFactories = argsTransportClientFactories == null
? new Jersey1TransportClientFactories()
: argsTransportClientFactories;
Optional<SSLContext> sslContext = args == null
? Optional.empty()
: args.getSSLContext();
Optional<HostnameVerifier> hostnameVerifier = args == null
? Optional.empty()
: args.getHostnameVerifier();
// If the transport factory was not supplied with args, assume they are using jersey 1 for passivity
eurekaTransport.transportClientFactory = providedJerseyClient == null
? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo(), sslContext, hostnameVerifier)
: transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient);
ApplicationsResolver.ApplicationsSource applicationsSource = new ApplicationsResolver.ApplicationsSource() {
@Override
public Applications getApplications(int stalenessThreshold, TimeUnit timeUnit) {
long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit);
long delay = getLastSuccessfulRegistryFetchTimePeriod();
if (delay > thresholdInMs) {
logger.info("Local registry is too stale for local lookup. Threshold:{}, actual:{}",
thresholdInMs, delay);
return null;
} else {
return localRegionApps.get();
}
}
};
eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(
clientConfig,
transportConfig,
eurekaTransport.transportClientFactory,
applicationInfoManager.getInfo(),
applicationsSource
);
if (clientConfig.shouldRegisterWithEureka()) {
EurekaHttpClientFactory newRegistrationClientFactory = null;
EurekaHttpClient newRegistrationClient = null;
try {
newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
eurekaTransport.bootstrapResolver,
eurekaTransport.transportClientFactory,
transportConfig
);
newRegistrationClient = newRegistrationClientFactory.newClient();
} catch (Exception e) {
logger.warn("Transport initialization failure", e);
}
eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
eurekaTransport.registrationClient = newRegistrationClient;
}
// new method (resolve from primary servers for read)
// Configure new transport layer (candidate for injecting in the future)
if (clientConfig.shouldFetchRegistry()) {
EurekaHttpClientFactory newQueryClientFactory = null;
EurekaHttpClient newQueryClient = null;
try {
newQueryClientFactory = EurekaHttpClients.queryClientFactory(
eurekaTransport.bootstrapResolver,
eurekaTransport.transportClientFactory,
clientConfig,
transportConfig,
applicationInfoManager.getInfo(),
applicationsSource
);
newQueryClient = newQueryClientFactory.newClient();
} catch (Exception e) {
logger.warn("Transport initialization failure", e);
}
eurekaTransport.queryClientFactory = newQueryClientFactory;
eurekaTransport.queryClient = newQueryClient;
}
}
复制代码
构造器里头直接初始化scheduleServerEndpointTask,然后newBootstrapResolver
EurekaHttpClients.newBootstrapResolver
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/transport/EurekaHttpClients.java
public static ClosableResolver<AwsEndpoint> newBootstrapResolver(
final EurekaClientConfig clientConfig,
final EurekaTransportConfig transportConfig,
final TransportClientFactory transportClientFactory,
final InstanceInfo myInstanceInfo,
final ApplicationsResolver.ApplicationsSource applicationsSource)
{
if (COMPOSITE_BOOTSTRAP_STRATEGY.equals(transportConfig.getBootstrapResolverStrategy())) {
if (clientConfig.shouldFetchRegistry()) {
return compositeBootstrapResolver(
clientConfig,
transportConfig,
transportClientFactory,
myInstanceInfo,
applicationsSource
);
} else {
logger.warn("Cannot create a composite bootstrap resolver if registry fetch is disabled." +
" Falling back to using a default bootstrap resolver.");
}
}
// if all else fails, return the default
return defaultBootstrapResolver(clientConfig, myInstanceInfo);
}
复制代码
这里执行defaultBootstrapResolver
/**
* @return a bootstrap resolver that resolves eureka server endpoints based on either DNS or static config,
* depending on configuration for one or the other. This resolver will warm up at the start.
*/
static ClosableResolver<AwsEndpoint> defaultBootstrapResolver(final EurekaClientConfig clientConfig,
final InstanceInfo myInstanceInfo) {
String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
String myZone = InstanceInfo.getZone(availZones, myInstanceInfo);
ClusterResolver<AwsEndpoint> delegateResolver = new ZoneAffinityClusterResolver(
new ConfigClusterResolver(clientConfig, myInstanceInfo),
myZone,
true
);
List<AwsEndpoint> initialValue = delegateResolver.getClusterEndpoints();
if (initialValue.isEmpty()) {
String msg = "Initial resolution of Eureka server endpoints failed. Check ConfigClusterResolver logs for more info";
logger.error(msg);
failFastOnInitCheck(clientConfig, msg);
}
return new AsyncResolver<>(
EurekaClientNames.BOOTSTRAP,
delegateResolver,
initialValue,
1,
clientConfig.getEurekaServiceUrlPollIntervalSeconds() * 1000
);
}
复制代码
从注释可以看到,这里从DNS或者配置文件来解析eureka server的配置,这里的delegateResolver为ZoneAffinityClusterResolver
AsyncResolver
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/resolver/AsyncResolver.java
this.backgroundTask = new TimedSupervisorTask(
this.getClass().getSimpleName(),
executorService,
threadPoolExecutor,
refreshIntervalMs,
TimeUnit.MILLISECONDS,
5,
updateTask
);
private final Runnable updateTask = new Runnable() {
@Override
public void run() {
try {
List<T> newList = delegate.getClusterEndpoints();
if (newList != null) {
resultsRef.getAndSet(newList);
lastLoadTimestamp = System.currentTimeMillis();
} else {
logger.warn("Delegate returned null list of cluster endpoints");
}
logger.debug("Resolved to {}", newList);
} catch (Exception e) {
logger.warn("Failed to retrieve cluster endpoints from the delegate", e);
}
}
};
复制代码
这里有个backgroundTask去更新ClusterEndpoints,间隔值为clientConfig.getEurekaServiceUrlPollIntervalSeconds() * 1000毫秒,默认为5分钟 配置项为eureka.client.eureka-service-url-poll-interval-seconds
ZoneAffinityClusterResolver
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/resolver/aws/ZoneAffinityClusterResolver.java
public List<AwsEndpoint> getClusterEndpoints() {
List<AwsEndpoint>[] parts = ResolverUtils.splitByZone(delegate.getClusterEndpoints(), myZone);
List<AwsEndpoint> myZoneEndpoints = parts[0];
List<AwsEndpoint> remainingEndpoints = parts[1];
List<AwsEndpoint> randomizedList = randomizeAndMerge(myZoneEndpoints, remainingEndpoints);
if (!zoneAffinity) {
Collections.reverse(randomizedList);
}
logger.debug("Local zone={}; resolved to: {}", myZone, randomizedList);
return randomizedList;
}
复制代码
这里调用了delegate.getClusterEndpoints(),这里的delegate为ConfigClusterResolver 这里对得到的List进行randomizeAndMerge
private static List<AwsEndpoint> randomizeAndMerge(List<AwsEndpoint> myZoneEndpoints, List<AwsEndpoint> remainingEndpoints) {
if (myZoneEndpoints.isEmpty()) {
return ResolverUtils.randomize(remainingEndpoints);
}
if (remainingEndpoints.isEmpty()) {
return ResolverUtils.randomize(myZoneEndpoints);
}
List<AwsEndpoint> mergedList = ResolverUtils.randomize(myZoneEndpoints);
mergedList.addAll(ResolverUtils.randomize(remainingEndpoints));
return mergedList;
}
复制代码
ConfigClusterResolver
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/resolver/aws/ConfigClusterResolver.java
public List<AwsEndpoint> getClusterEndpoints() {
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
if (logger.isInfoEnabled()) {
logger.info("Resolving eureka endpoints via DNS: {}", getDNSName());
}
return getClusterEndpointsFromDns();
} else {
logger.info("Resolving eureka endpoints via configuration");
return getClusterEndpointsFromConfig();
}
}
private List<AwsEndpoint> getClusterEndpointsFromConfig() {
String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
String myZone = InstanceInfo.getZone(availZones, myInstanceInfo);
Map<String, List<String>> serviceUrls = EndpointUtils
.getServiceUrlsMapFromConfig(clientConfig, myZone, clientConfig.shouldPreferSameZoneEureka());
List<AwsEndpoint> endpoints = new ArrayList<>();
for (String zone : serviceUrls.keySet()) {
for (String url : serviceUrls.get(zone)) {
try {
endpoints.add(new AwsEndpoint(url, getRegion(), zone));
} catch (Exception ignore) {
logger.warn("Invalid eureka server URI: {}; removing from the server pool", url);
}
}
}
logger.debug("Config resolved to {}", endpoints);
if (endpoints.isEmpty()) {
logger.error("Cannot resolve to any endpoints from provided configuration: {}", serviceUrls);
}
return endpoints;
}
复制代码
这里通过EndpointUtils.getServiceUrlsMapFromConfig解析serviceUrl 假设serviceUrl是
eureka:
client:
serviceUrl:
defaultZone: http://127.0.0.1:8761/eureka/,http://127.0.0.1:8762/eureka/
复制代码
则最后得到两个AwsEndpoint
{ serviceUrl='http://127.0.0.1:8761/eureka/', region='us-east-1', zone='defaultZone'}
{ serviceUrl='http://127.0.0.1:8762/eureka/', region='us-east-1', zone='defaultZone'}
复制代码
EndpointUtils.getServiceUrlsMapFromConfig
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/endpoint/EndpointUtils.java
/**
* Get the list of all eureka service urls from properties file for the eureka client to talk to.
*
* @param clientConfig the clientConfig to use
* @param instanceZone The zone in which the client resides
* @param preferSameZone true if we have to prefer the same zone as the client, false otherwise
* @return an (ordered) map of zone -> list of urls mappings, with the preferred zone first in iteration order
*/
public static Map<String, List<String>> getServiceUrlsMapFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {
Map<String, List<String>> orderedUrls = new LinkedHashMap<>();
String region = getRegion(clientConfig);
String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
if (availZones == null || availZones.length == 0) {
availZones = new String[1];
availZones[0] = DEFAULT_ZONE;
}
logger.debug("The availability zone for the given region {} are {}", region, availZones);
int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
String zone = availZones[myZoneOffset];
List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);
if (serviceUrls != null) {
orderedUrls.put(zone, serviceUrls);
}
int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1);
while (currentOffset != myZoneOffset) {
zone = availZones[currentOffset];
serviceUrls = clientConfig.getEurekaServerServiceUrls(zone);
if (serviceUrls != null) {
orderedUrls.put(zone, serviceUrls);
}
if (currentOffset == (availZones.length - 1)) {
currentOffset = 0;
} else {
currentOffset++;
}
}
if (orderedUrls.size() < 1) {
throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");
}
return orderedUrls;
}
复制代码
这里调用clientConfig.getEurekaServerServiceUrls(zone)解析serviceUrl
EurekaClientConfigBean.getEurekaServerServiceUrls
spring-cloud-netflix-eureka-client-2.0.0.RC1-sources.jar!/org/springframework/cloud/netflix/eureka/EurekaClientConfigBean.java
public List<String> getEurekaServerServiceUrls(String myZone) {
String serviceUrls = this.serviceUrl.get(myZone);
if (serviceUrls == null || serviceUrls.isEmpty()) {
serviceUrls = this.serviceUrl.get(DEFAULT_ZONE);
}
if (!StringUtils.isEmpty(serviceUrls)) {
final String[] serviceUrlsSplit = StringUtils.commaDelimitedListToStringArray(serviceUrls);
List<String> eurekaServiceUrls = new ArrayList<>(serviceUrlsSplit.length);
for (String eurekaServiceUrl : serviceUrlsSplit) {
if (!endsWithSlash(eurekaServiceUrl)) {
eurekaServiceUrl += "/";
}
eurekaServiceUrls.add(eurekaServiceUrl);
}
return eurekaServiceUrls;
}
return new ArrayList<>();
}
复制代码
可以看到这里对多个eureka server的地址采用逗号分割
serviceUrl使用
RetryableEurekaHttpClient.execute
eureka-client-1.8.8-sources.jar!/com/netflix/discovery/shared/transport/decorator/RetryableEurekaHttpClient.java
protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
List<EurekaEndpoint> candidateHosts = null;
int endpointIdx = 0;
for (int retry = 0; retry < numberOfRetries; retry++) {
EurekaHttpClient currentHttpClient = delegate.get();
EurekaEndpoint currentEndpoint = null;
if (currentHttpClient == null) {
if (candidateHosts == null) {
candidateHosts = getHostCandidates();
if (candidateHosts.isEmpty()) {
throw new TransportException("There is no known eureka server; cluster server list is empty");
}
}
if (endpointIdx >= candidateHosts.size()) {
throw new TransportException("Cannot execute request on any known server");
}
currentEndpoint = candidateHosts.get(endpointIdx++);
currentHttpClient = clientFactory.newClient(currentEndpoint);
}
try {
EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
delegate.set(currentHttpClient);
if (retry > 0) {
logger.info("Request execution succeeded on retry #{}", retry);
}
return response;
}
logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
} catch (Exception e) {
logger.warn("Request execution failed with message: {}", e.getMessage()); // just log message as the underlying client should log the stacktrace
}
// Connection error or 5xx from the server that must be retried on another server
delegate.compareAndSet(currentHttpClient, null);
if (currentEndpoint != null) {
quarantineSet.add(currentEndpoint);
}
}
throw new TransportException("Retry limit reached; giving up on completing the request");
}
复制代码
可以看到这里先获取candidateHosts,然后支持失败重试,重试默认是3次(
int DEFAULT_NUMBER_OF_RETRIES = 3
) 每重试一次失败,就换下一个eureka server的地址,如果endpointIdx >= candidateHosts.size(),则抛出TransportException("Cannot execute request on any known server")
RetryableEurekaHttpClient.getHostCandidates
private List<EurekaEndpoint> getHostCandidates() {
List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();
quarantineSet.retainAll(candidateHosts);
// If enough hosts are bad, we have no choice but start over again
int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());
//Prevent threshold is too large
if (threshold > candidateHosts.size()) {
threshold = candidateHosts.size();
}
if (quarantineSet.isEmpty()) {
// no-op
} else if (quarantineSet.size() >= threshold) {
logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
quarantineSet.clear();
} else {
List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
for (EurekaEndpoint endpoint : candidateHosts) {
if (!quarantineSet.contains(endpoint)) {
remainingHosts.add(endpoint);
}
}
candidateHosts = remainingHosts;
}
return candidateHosts;
}
复制代码
这里quarantineSet = new ConcurrentSkipListSet<>(),它维护的是不可用的eureka server列表(
Connection error or 5xx
) 这里有个threshold,是依据eureka.client.transport.retryableClientQuarantineRefreshPercentage来计算的,默认是0.66*candidateHosts.size(),大小最大为candidateHosts.size(),防止quarantineSet.size() < threshold导致不会清空quarantineSet,最后导致remainingHosts为空。
小结
- client端的serviceUrl配置多个eureka server的话,默认是使用随机之后的list中的第一个,如果改server请求都成功,则不会轮到list中的第二个,不过这个list是会定时更新而且随机化的。
- AsyncResolver有个backgroundTask(
默认5分钟执行一次,取决于eureka.client.eureka-service-url-poll-interval-seconds配置
)来刷新eureka server的list,默认还是走ZoneAffinityClusterResolver,然后走ConfigClusterResolver去获取,会重新随机list
所有评论(0)