三. 微服务源码阅读-Hystrix 源码
本文按调用顺序梳理 Hystrix 的核心源码:断路器开关、@HystrixCommand 切面、命令执行、熔断判断、信号量与线程池隔离、超时控制以及服务降级。
3. Hystrix 源码
1. 断路器开关
@SpringBootApplication(scanBasePackages = {"len.hgy"})
//注册到eureka
@EnableEurekaClient
//开启断路器功能
@EnableCircuitBreaker
//开启feign支持,clients指定哪个类开启feign
@EnableFeignClients(clients = {StudentService.class, TeacherServiceFeign.class})
public class MicroWebSecurity {
@EnableCircuitBreaker
/**
 * Annotation that enables circuit-breaker support; placing it on a class imports
 * {@link EnableCircuitBreakerImportSelector}, which decides which configuration
 * classes get registered.
 */
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {
}
/**
 * Import selector used by {@code @EnableCircuitBreaker}. The actual selection logic
 * lives in {@link SpringFactoryImportSelector}; this subclass only supplies the
 * on/off switch.
 */
public class EnableCircuitBreakerImportSelector
        extends SpringFactoryImportSelector<EnableCircuitBreaker> {

    /**
     * Reads {@code spring.cloud.circuit.breaker.enabled} from the environment.
     *
     * @return the configured value, or {@code true} when the property is absent
     */
    @Override
    protected boolean isEnabled() {
        final Boolean enabled = getEnvironment().getProperty(
                "spring.cloud.circuit.breaker.enabled", Boolean.class, Boolean.TRUE);
        return enabled;
    }
}
// org.springframework.cloud.commons.util.SpringFactoryImportSelector
/**
 * Resolves the configuration class names to import for the annotation handled by
 * this selector.
 *
 * @param metadata metadata of the class carrying the annotation
 * @return the factory names registered for the annotation class, or an empty array
 *         when the feature is disabled
 * @throws IllegalStateException if no implementation is registered and there is no
 *         default factory
 */
public String[] selectImports(AnnotationMetadata metadata) {
    if (!isEnabled()) {
        return new String[0];
    }

    AnnotationAttributes attributes = AnnotationAttributes.fromMap(
            metadata.getAnnotationAttributes(this.annotationClass.getName(), true));
    String missingAttributesMessage = "No " + getSimpleName() + " attributes found. Is "
            + metadata.getClassName() + " annotated with @" + getSimpleName() + "?";
    Assert.notNull(attributes, missingAttributesMessage);

    // Load the candidates registered under the annotation class name (spring.factories
    // lookup); the LinkedHashSet drops duplicates while keeping the original order.
    LinkedHashSet<String> uniqueNames = new LinkedHashSet<>(
            SpringFactoriesLoader.loadFactoryNames(this.annotationClass, this.beanClassLoader));
    List<String> factories = new ArrayList<>(uniqueNames);

    if (factories.isEmpty() && !hasDefaultFactory()) {
        throw new IllegalStateException("Annotation @" + getSimpleName()
                + " found, but there are no implementations. Did you forget to include a starter?");
    }

    if (factories.size() > 1) {
        // Several candidates are tolerated; conditions on the configurations are
        // expected to narrow them down later.
        this.log.warn("More than one implementation of @" + getSimpleName()
                + " (now relying on @Conditionals to pick one): " + factories);
    }

    return factories.toArray(new String[0]);
}
通过 SPI(spring.factories)加载以 EnableCircuitBreaker 为 key 注册的配置类
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration
# 断路器功能的配置 bean
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration
2. 创建 hystrixCommand 注解的切面类
/**
 * Configuration class that registers the aspect handling Hystrix command annotations.
 */
@Configuration(proxyBeanMethods = false)
public class HystrixCircuitBreakerConfiguration {

    /**
     * @return a new {@link HystrixCommandAspect} instance exposed as a bean
     */
    @Bean
    public HystrixCommandAspect hystrixCommandAspect() {
        HystrixCommandAspect commandAspect = new HystrixCommandAspect();
        return commandAspect;
    }
}
// com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect
/**
 * Advice body that wraps an annotated method call in a Hystrix invokable and runs it.
 *
 * @param joinPoint the intercepted method invocation
 * @return the result produced by the command executor (or the observable execution)
 * @throws IllegalStateException if the method carries both {@code HystrixCommand} and
 *         {@code HystrixCollapser}
 * @throws Throwable the cause of a {@code HystrixBadRequestException} (or the exception
 *         itself when it has no cause), or the throwable derived from a
 *         {@code HystrixRuntimeException}
 */
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
    Method method = getMethodFromTarget(joinPoint);
    Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
    if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
        throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                "annotations at the same time");
    }

    // Collect the invocation metadata and build the invokable from it.
    MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
    MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
    ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
            metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

    Object result;
    try {
        if (!metaHolder.isObservable()) {
            // Regular path: hand the invokable to the command executor.
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } else {
            result = executeObservable(invokable, executionType, metaHolder);
        }
    } catch (HystrixBadRequestException e) {
        // The exception may carry no cause; "throw null" would surface as a
        // NullPointerException and hide the real failure, so rethrow the
        // exception itself in that case.
        Throwable cause = e.getCause();
        throw cause != null ? cause : e;
    } catch (HystrixRuntimeException e) {
        throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
    }
    return result;
}
com.netflix.hystrix.contrib.javanica.command.CommandExecutor#execute
/**
 * Runs the given invokable according to the requested execution type.
 *
 * @param invokable     the command to run; must not be null
 * @param executionType how the command is to be executed
 * @param metaHolder    invocation metadata; must not be null
 * @return the synchronous result, a future, or an observable, depending on
 *         {@code executionType}
 * @throws RuntimeException if the execution type is not supported
 */
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
    Validate.notNull(invokable);
    Validate.notNull(metaHolder);

    switch (executionType) {
        case SYNCHRONOUS:
            // Blocking call: run and return the value directly.
            return castToExecutable(invokable, executionType).execute();

        case ASYNCHRONOUS:
            // Non-blocking call: return a future, decorated when the fallback is async too.
            HystrixExecutable executable = castToExecutable(invokable, executionType);
            if (metaHolder.hasFallbackMethodCommand()
                    && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                return new FutureDecorator(executable.queue());
            }
            return executable.queue();

        case OBSERVABLE:
            // Reactive call: eager mode uses observe(), otherwise toObservable().
            HystrixObservable observable = castToObservable(invokable);
            if (ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode()) {
                return observable.observe();
            }
            return observable.toObservable();

        default:
            throw new RuntimeException("unsupported execution type: " + executionType);
    }
}
3. 执行命令
com.netflix.hystrix.HystrixCommand#execute
/**
 * Runs the command synchronously by queueing it and blocking on the resulting future.
 *
 * @return the command result
 */
public R execute() {
    try {
        final Future<R> pending = queue();
        return pending.get();
    } catch (Exception e) {
        // Unwrap the failure and rethrow it without declaring a checked exception.
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}
/**
 * Builds the command's Observable via toObservable() and converts it into a blocking Future.
 * NOTE(review): this excerpt is abridged; the rest of the method body (including the
 * return statement) is not shown here.
 */
public Future<R> queue() {
/*
* The Future returned by Observable.toBlocking().toFuture() does not implement the
* interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
* thus, to comply with the contract of Future, we must wrap around it.
*/
// Key call to follow next: toObservable().
final Future<R> delegate = toObservable().toBlocking().toFuture();
}
com.netflix.hystrix.AbstractCommand#toObservable
// Deferred step that applies the Hystrix semantics unless the command has already
// been unsubscribed, in which case a never-emitting Observable is returned.
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
        if (!commandState.get().equals(CommandState.UNSUBSCRIBED)) {
            // Key call to follow next.
            return applyHystrixSemantics(_cmd);
        }
        return Observable.never();
    }
};
4. 熔断器判断是否允许请求
com.netflix.hystrix.AbstractCommand#applyHystrixSemantics
// Ask the circuit breaker whether this request may proceed.
if (circuitBreaker.allowRequest()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
// Flag used so the semaphore is released at most once.
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
// Only the first invocation flips the flag and releases the permit.
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
}
com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl#allowRequest
/**
 * Decides whether a request may pass through the circuit breaker.
 *
 * @return {@code false} when the circuit is forced open; {@code true} when it is
 *         forced closed or not open; otherwise the result of {@link #allowSingleTest()}
 */
public boolean allowRequest() {
    // Forced open: reject everything.
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }

    // Forced closed: still run isOpen() so it performs its calculations, but
    // ignore its answer and let all traffic through.
    if (properties.circuitBreakerForceClosed().get()) {
        isOpen();
        return true;
    }

    // Normal mode: pass when the circuit is not open, otherwise allow at most a
    // single test request.
    if (!isOpen()) {
        return true;
    }
    return allowSingleTest();
}
5. 判断请求数和失败率是否达标,如果都达标就开启熔断器
com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl#isOpen
/**
 * Reports whether the circuit is open, tripping it when both the request volume and
 * the error percentage have reached their configured thresholds.
 *
 * @return {@code true} if the circuit is (now) open, {@code false} otherwise
 */
public boolean isOpen() {
    // Already tripped: nothing to compute.
    if (circuitOpen.get()) {
        return true;
    }

    final HealthCounts health = metrics.getHealthCounts();

    // Not enough traffic yet to judge.
    final boolean belowVolumeThreshold =
            health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get();
    if (belowVolumeThreshold) {
        return false;
    }

    // Error percentage still under the threshold.
    final boolean belowErrorThreshold =
            health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get();
    if (belowErrorThreshold) {
        return false;
    }

    // Both thresholds reached: trip the circuit. Only the caller that wins the CAS
    // records the current time as the moment the circuit was opened.
    if (circuitOpen.compareAndSet(false, true)) {
        circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
    }
    return true;
}
6. 当前时间超过休眠时间窗口后,允许放行一次试探请求
com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl#allowSingleTest
处于半开状态
/**
 * Allows one test request through once the circuit has been open for longer than
 * the configured sleep window.
 *
 * @return {@code true} if this caller may issue the test request, {@code false} otherwise
 */
public boolean allowSingleTest() {
    final long openedOrLastTested = circuitOpenedOrLastTestedTime.get();

    if (!circuitOpen.get()) {
        return false;
    }

    final long now = System.currentTimeMillis();
    final long sleepWindowEnd =
            openedOrLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get();
    if (now <= sleepWindowEnd) {
        return false;
    }

    // The CAS succeeds for a single caller only, so just one request is let through
    // and the timestamp is moved forward for the next window.
    return circuitOpenedOrLastTestedTime.compareAndSet(openedOrLastTested, System.currentTimeMillis());
}
7. 如果是采用的信号量隔离级别
com.netflix.hystrix.AbstractCommand#applyHystrixSemantics
// Try to take an execution permit before running the command.
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
// Run the command; the permit is released on terminate or unsubscribe.
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
// No permit available: reject the request through the fallback path.
return handleSemaphoreRejectionViaFallback();
}
如果当前并发计数 count 超过了配置的最大并发请求数,则返回 false:拒绝该请求,转而走降级逻辑
/**
 * Attempts to take one permit without blocking.
 *
 * @return {@code true} if the permit was granted, {@code false} if the incremented
 *         count exceeded the number of permits
 */
public boolean tryAcquire() {
    // Optimistically count this caller in; the value is at least 1 after the increment.
    final int inFlight = count.incrementAndGet();
    final boolean admitted = inFlight <= numberOfPermits.get();
    if (!admitted) {
        // Over the limit: undo the increment.
        count.decrementAndGet();
    }
    return admitted;
}
8. 如果是线程池隔离级别,则这个 tryAcquire 方法就会返回 true
com.netflix.hystrix.AbstractCommand.TryableSemaphoreNoOp#tryAcquire
/**
 * No-op variant: always grants the permit.
 *
 * @return always {@code true}
 */
public boolean tryAcquire() {
return true;
}
com.netflix.hystrix.AbstractCommand#executeCommandAndObserve
Observable<R> execution;
// When the execution timeout is enabled, lift the timeout operator onto the isolated execution.
if (properties.executionTimeoutEnabled().get()) {
execution = executeCommandWithSpecifiedIsolation(_cmd)
.lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
// Timeout disabled: use the isolated execution as is.
execution = executeCommandWithSpecifiedIsolation(_cmd);
}
com.netflix.hystrix.AbstractCommand#executeCommandWithSpecifiedIsolation
// Move the thread state from NOT_USING_THREAD to STARTED; this fails if the state was already changed.
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
//we have not been unsubscribed, so should proceed
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
// store the command that is being run
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
/**
* If any of these hooks throw an exception, then it appears as if the actual execution threw an error
*/
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
// Key call to follow next: obtains the Observable that runs the user code.
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//command has already been unsubscribed, so return immediately
return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}
// com.netflix.hystrix.AbstractCommand#getUserExecutionObservable
/**
 * Obtains the Observable that runs the user code and decorates it with the
 * execution-hook operators.
 *
 * @param _cmd the command being executed
 * @return the decorated Observable; an error Observable if creating it threw
 */
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    Observable<R> source;
    try {
        // Key call to follow next.
        source = getExecutionObservable();
    } catch (Throwable ex) {
        // Turn a failure to create the Observable into an error emission.
        source = Observable.error(ex);
    }

    final Observable<R> withExecutionHook = source.lift(new ExecutionHookApplication(_cmd));
    return withExecutionHook.lift(new DeprecatedOnRunHookApplication(_cmd));
}
如果以上检查全部通过,Hystrix 就认为可以调用后端服务接口,随后通过反射调用被代理的方法
/**
 * Wraps {@code run()} in a deferred Observable so the user code only executes on
 * subscription.
 *
 * @return an Observable emitting the value returned by {@code run()}, or an error
 *         Observable if {@code run()} throws
 */
final protected Observable<R> getExecutionObservable() {
    final Func0<Observable<R>> invokeRun = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                // Key call to follow next: the user-supplied run().
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    };

    final Action0 rememberExecutionThread = new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    };

    return Observable.defer(invokeRun).doOnSubscribe(rememberExecutionThread);
}
这里的 run() 会调用到
com.netflix.hystrix.contrib.javanica.command.GenericCommand#run
/**
 * Executes the command action for the current execution type via {@code process}.
 *
 * @return the value produced by {@code process}
 * @throws Exception if the execution fails
 */
protected Object run() throws Exception {
    LOGGER.debug("execute command: {}", getCommandKey().name());

    final Action commandInvocation = new Action() {
        @Override
        Object execute() {
            return getCommandAction().execute(getExecutionType());
        }
    };
    return process(commandInvocation);
}
接下来就是反射调用了
com.netflix.hystrix.contrib.javanica.command.MethodExecutionAction#execute(com.netflix.hystrix.contrib.javanica.command.ExecutionType)
/**
 * Executes the action with the arguments held in {@code _args}.
 *
 * @param executionType the execution type to pass to {@code executeWithArgs}
 * @return the value returned by {@code executeWithArgs}
 * @throws CommandActionExecutionException if the execution fails
 */
public Object execute(ExecutionType executionType) throws CommandActionExecutionException {
return executeWithArgs(executionType, _args);
}
com.netflix.hystrix.contrib.javanica.command.MethodExecutionAction#execute(java.lang.Object, java.lang.reflect.Method, java.lang.Object...)
/**
 * Invokes the target method, either through the AJC method (compile-time weaving
 * with an AJC method available) or through plain reflection.
 *
 * @param o    the object to invoke the method on
 * @param m    the method to invoke
 * @param args the invocation arguments
 * @return the invocation result; {@code null} if {@code propagateCause} returns
 *         normally after a failure
 * @throws CommandActionExecutionException if the invocation fails
 */
private Object execute(Object o, Method m, Object... args) throws CommandActionExecutionException {
    Object outcome = null;
    try {
        m.setAccessible(true); // suppress Java language access
        final boolean useAjcMethod = isCompileWeaving() && metaHolder.getAjcMethod() != null;
        if (!useAjcMethod) {
            // Plain reflective call of the target method.
            outcome = m.invoke(o, args);
        } else {
            outcome = invokeAjcMethod(metaHolder.getAjcMethod(), o, metaHolder, args);
        }
    } catch (IllegalAccessException e) {
        propagateCause(e);
    } catch (InvocationTargetException e) {
        propagateCause(e);
    }
    return outcome;
}
9. 线程池的创建及超时控制
com.netflix.hystrix.AbstractCommand#executeCommandAndObserve
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
    // Timeout enabled: lift the timeout operator onto the isolated execution.
    execution = executeCommandWithSpecifiedIsolation(_cmd)
            .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
    // Timeout disabled: use the isolated execution as is.
    execution = executeCommandWithSpecifiedIsolation(_cmd);
}
com.netflix.hystrix.AbstractCommand#handleCommandEnd
// Invoke the end-of-execution callback, if one was stored when the command started.
if (endCurrentThreadExecutingCommand != null) {
endCurrentThreadExecutingCommand.call();
}
com.netflix.hystrix.AbstractCommand.HystrixObservableTimeoutOperator#call
// Register the timeout listener with the shared HystrixTimer and keep the returned reference.
final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
/**
 * Registers a listener whose {@code tick()} is invoked repeatedly at the listener's
 * own interval.
 *
 * @param listener the listener to tick
 * @return a reference wrapping the listener and its scheduled task
 */
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
    startThreadIfNeeded();

    // Wrap the tick so an exception is logged instead of escaping from the task.
    final Runnable tickTask = new Runnable() {
        @Override
        public void run() {
            try {
                listener.tick();
            } catch (Exception e) {
                logger.error("Failed while ticking TimerListener", e);
            }
        }
    };

    // Schedule the task at a fixed rate on the timer's thread pool; the listener's
    // interval is used both as the initial delay and as the period.
    final ScheduledFuture<?> scheduled = executor.get().getThreadPool().scheduleAtFixedRate(
            tickTask,
            listener.getIntervalTimeInMilliseconds(),
            listener.getIntervalTimeInMilliseconds(),
            TimeUnit.MILLISECONDS);
    return new TimerReference(listener, scheduled);
}
其他的,比如熔断器开启,线程池,信号量都满了,则会走到降级方法
com.netflix.hystrix.AbstractCommand#applyHystrixSemantics
// Try to take an execution permit before running the command.
if (executionSemaphore.tryAcquire()) {
try {
/* used to track userThreadExecutionTime */
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
// No permit available: reject the request through the fallback path.
return handleSemaphoreRejectionViaFallback();
}
// NOTE(review): the else below closes an enclosing if that is not part of this excerpt,
// presumably the circuitBreaker.allowRequest() check; confirm against the full method.
} else {
// Short-circuited: go straight to the fallback path.
return handleShortCircuitViaFallback();
}
这里同样会通过反射调用 fallback 方法。fallback 降级方法本身也有并发大小的控制:同一时刻能够进入 fallback 的请求数是有上限的,超过上限的降级请求会被直接拒绝。(注:在 Hystrix 中,该上限由 fallback.isolation.semaphore.maxConcurrentRequests 配置,默认值为 10,并不直接等于执行信号量或线程池的大小。)
更多推荐
所有评论(0)