3. Hystrix 源码

1. 断路器开关

@SpringBootApplication(scanBasePackages = {"len.hgy"})
//注册到eureka
@EnableEurekaClient
//开启断路器功能
@EnableCircuitBreaker
//开启feign支持,clients指定哪个类开启feign
@EnableFeignClients(clients = {StudentService.class, TeacherServiceFeign.class})
public class MicroWebSecurity {

@EnableCircuitBreaker

/**
 * Annotation that turns on circuit-breaker support. It declares no attributes of
 * its own; its only visible effect is importing
 * {@link EnableCircuitBreakerImportSelector}.
 */
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {
}
/**
 * Import selector bound to the {@code EnableCircuitBreaker} annotation type.
 */
public class EnableCircuitBreakerImportSelector
      extends SpringFactoryImportSelector<EnableCircuitBreaker> {

   /**
    * Reads {@code spring.cloud.circuit.breaker.enabled} from the environment.
    *
    * @return the configured value, or {@code true} when the property is absent
    */
   @Override
   protected boolean isEnabled() {
      final Boolean enabled = getEnvironment().getProperty(
            "spring.cloud.circuit.breaker.enabled", Boolean.class, Boolean.TRUE);
      return enabled;
   }

}

// org.springframework.cloud.commons.util.SpringFactoryImportSelector
/**
 * Resolves the class names that should be imported for this selector's annotation.
 *
 * @param metadata annotation metadata of the class being processed
 * @return the registered factory class names, or an empty array when the
 *         selector is disabled
 * @throws IllegalStateException if nothing is registered for the annotation and
 *         there is no default factory
 */
public String[] selectImports(AnnotationMetadata metadata) {
    if (!isEnabled()) {
        return new String[0];
    }

    final String annotationName = this.annotationClass.getName();
    AnnotationAttributes annotationAttributes = AnnotationAttributes.fromMap(
        metadata.getAnnotationAttributes(annotationName, true));
    Assert.notNull(annotationAttributes, "No " + getSimpleName() + " attributes found. Is "
                   + metadata.getClassName() + " annotated with @" + getSimpleName() + "?");

    // SPI lookup: load the factory names registered for the annotation class.
    List<String> discovered =
        SpringFactoriesLoader.loadFactoryNames(this.annotationClass, this.beanClassLoader);
    // Passing through a LinkedHashSet filters duplicates while keeping the order.
    List<String> candidates = new ArrayList<>(new LinkedHashSet<>(discovered));

    if (candidates.isEmpty() && !hasDefaultFactory()) {
        throw new IllegalStateException("Annotation @" + getSimpleName()
                                        + " found, but there are no implementations. Did you forget to include a starter?");
    }

    // there should only ever be one DiscoveryClient, but there might be more than
    // one factory
    if (candidates.size() > 1) {
        this.log.warn("More than one implementation of @" + getSimpleName()
                      + " (now relying on @Conditionals to pick one): " + candidates);
    }

    return candidates.toArray(new String[0]);
}

通过 SPI 机制(SpringFactoriesLoader)加载以 EnableCircuitBreaker 为 key 注册的配置类

image-20220310234948143

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration
# 断路器配置功能的 bean
org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration

2. 创建 hystrixCommand 注解的切面类

/**
 * Configuration class that contributes the Hystrix command aspect bean.
 */
@Configuration(proxyBeanMethods = false)
public class HystrixCircuitBreakerConfiguration {

    /**
     * Creates the command aspect.
     *
     * @return a new {@link HystrixCommandAspect}
     */
    @Bean
    public HystrixCommandAspect hystrixCommandAspect() {
        final HystrixCommandAspect commandAspect = new HystrixCommandAspect();
        return commandAspect;
    }
}

// com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect
/**
 * Builds a Hystrix invokable for the intercepted method and executes it.
 *
 * @param joinPoint the intercepted invocation
 * @return the value produced by executing the invokable
 * @throws IllegalStateException if the method carries both {@code HystrixCommand}
 *         and {@code HystrixCollapser}
 * @throws Throwable the cause of a {@code HystrixBadRequestException}, or the
 *         throwable derived from a {@code HystrixRuntimeException}
 */
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
    final Method target = getMethodFromTarget(joinPoint);
    Validate.notNull(target, "failed to get method from joinPoint: %s", joinPoint);

    // The two annotations are mutually exclusive on a single method.
    if (target.isAnnotationPresent(HystrixCommand.class)
            && target.isAnnotationPresent(HystrixCollapser.class)) {
        throw new IllegalStateException(
                "method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time");
    }

    final MetaHolder holder =
            META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(target)).create(joinPoint);
    final HystrixInvokable command = HystrixCommandFactory.getInstance().create(holder);

    final ExecutionType mode;
    if (holder.isCollapserAnnotationPresent()) {
        mode = holder.getCollapserExecutionType();
    } else {
        mode = holder.getExecutionType();
    }

    try {
        if (holder.isObservable()) {
            return executeObservable(command, mode, holder);
        }
        // Non-observable commands are handed to the command executor.
        return CommandExecutor.execute(command, mode, holder);
    } catch (HystrixBadRequestException e) {
        throw e.getCause();
    } catch (HystrixRuntimeException e) {
        throw hystrixRuntimeExceptionToThrowable(holder, e);
    }
}

com.netflix.hystrix.contrib.javanica.command.CommandExecutor#execute

/**
 * Executes the invokable in the manner selected by {@code executionType}.
 *
 * @param invokable     the command to run; must not be null
 * @param executionType how to run it
 * @param metaHolder    metadata of the intercepted method; must not be null
 * @return the direct result for SYNCHRONOUS, the queued future (possibly wrapped
 *         in a {@code FutureDecorator}) for ASYNCHRONOUS, or an observable for
 *         OBSERVABLE
 * @throws RuntimeException if the execution type is not supported
 */
public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
    Validate.notNull(invokable);
    Validate.notNull(metaHolder);

    switch (executionType) {
        case SYNCHRONOUS:
            // Synchronous call.
            return castToExecutable(invokable, executionType).execute();
        case ASYNCHRONOUS: {
            // Asynchronous call: the future is decorated only when a fallback
            // command exists and that fallback is asynchronous as well.
            final HystrixExecutable asyncCommand = castToExecutable(invokable, executionType);
            final boolean asyncFallback = metaHolder.hasFallbackMethodCommand()
                    && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType();
            if (asyncFallback) {
                return new FutureDecorator(asyncCommand.queue());
            }
            return asyncCommand.queue();
        }
        case OBSERVABLE: {
            // Observable call: EAGER mode uses observe(), anything else toObservable().
            final HystrixObservable reactiveCommand = castToObservable(invokable);
            if (ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode()) {
                return reactiveCommand.observe();
            }
            return reactiveCommand.toObservable();
        }
        default:
            throw new RuntimeException("unsupported execution type: " + executionType);
    }
}

3. 执行命令

com.netflix.hystrix.HystrixCommand#execute

/**
 * Runs the command by queueing it and waiting on the resulting future.
 *
 * @return the value obtained from the queued future
 */
public R execute() {
    final R value;
    try {
        value = queue().get();
    } catch (Exception failure) {
        // Rethrow the decomposed exception via sneakyThrow.
        throw Exceptions.sneakyThrow(decomposeException(failure));
    }
    return value;
}

// Turns the command's Observable into a Future.
// NOTE: this is a truncated excerpt; the code after the first statement
// (including the return statement) is not shown here.
public Future<R> queue() {
    /*
         * The Future returned by Observable.toBlocking().toFuture() does not implement the
         * interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
         * thus, to comply with the contract of Future, we must wrap around it.
         */
    // Point of interest: toObservable() is the call followed in the next section.
    final Future<R> delegate = toObservable().toBlocking().toFuture();
}

com.netflix.hystrix.AbstractCommand#toObservable

// Deferred step: delegates to applyHystrixSemantics(_cmd) unless the command
// state is already UNSUBSCRIBED.
final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
    @Override
    public Observable<R> call() {
        if (!commandState.get().equals(CommandState.UNSUBSCRIBED)) {
            // Point of interest: this is the call followed in the next section.
            return applyHystrixSemantics(_cmd);
        }
        return Observable.never();
    }
};

4. 熔断器判断是否允许请求

com.netflix.hystrix.AbstractCommand#applyHystrixSemantics

// Check: ask the circuit breaker whether this request may go through.
if (circuitBreaker.allowRequest()) {
    final TryableSemaphore executionSemaphore = getExecutionSemaphore();
    // Flag used so that the semaphore is released at most once.
    final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
    final Action0 singleSemaphoreRelease = new Action0() {
        @Override
        public void call() {
            // Only the caller that flips the flag from false to true releases.
            if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                executionSemaphore.release();
            }
        }
    };
    // (excerpt: the rest of this branch is omitted here)
}

com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl#allowRequest

/**
 * Decides whether a request may pass the circuit breaker.
 *
 * @return {@code false} when the circuit is forced open; {@code true} when it is
 *         forced closed; otherwise {@code true} if the circuit is not open or a
 *         single test request is allowed
 */
public boolean allowRequest() {
    // properties have asked us to force the circuit open so we will allow NO requests
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }

    if (!properties.circuitBreakerForceClosed().get()) {
        // Normal path: pass while the circuit is not open, otherwise see whether
        // a single test request may be let through.
        return !isOpen() || allowSingleTest();
    }

    // Forced closed: isOpen() is still invoked so it performs its calculations,
    // but its result is ignored and all traffic is allowed through.
    isOpen();
    return true;
}

5. 判断请求数和失败率是否达标,如果都达标就开启熔断器

com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl#isOpen

/**
 * Reports whether the circuit is open, opening it when both the request-volume
 * threshold and the error-percentage threshold have been reached.
 *
 * @return {@code true} if the circuit is (now) open, {@code false} otherwise
 */
public boolean isOpen() {
    if (circuitOpen.get()) {
        return true;
    }

    final HealthCounts counts = metrics.getHealthCounts();

    // Too few requests so far: compare the total against the volume threshold.
    final boolean belowVolumeThreshold =
            counts.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get();
    if (belowVolumeThreshold) {
        return false;
    }

    // Error rate still under the configured percentage threshold.
    if (counts.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
        return false;
    }

    // Both thresholds reached: open the circuit. Only the caller whose
    // compareAndSet flips the flag from false to true records the current time.
    if (circuitOpen.compareAndSet(false, true)) {
        circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
    }
    return true;
}

6. 当前时间超过休眠时间窗口后,允许放行一次试探请求

com.netflix.hystrix.HystrixCircuitBreaker.HystrixCircuitBreakerImpl#allowSingleTest

处于半开状态

/**
 * Lets one test request through once the circuit is open and the sleep window
 * since it was opened (or last tested) has elapsed.
 *
 * @return {@code true} only for the caller whose compareAndSet updates the
 *         timestamp; {@code false} otherwise
 */
public boolean allowSingleTest() {
    final long lastOpenedOrTested = circuitOpenedOrLastTestedTime.get();

    if (!circuitOpen.get()) {
        return false;
    }
    // Still inside the sleep window: no test request yet.
    if (System.currentTimeMillis() <= lastOpenedOrTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
        return false;
    }
    // The compareAndSet succeeds only if the timestamp is still the value read above.
    return circuitOpenedOrLastTestedTime.compareAndSet(lastOpenedOrTested, System.currentTimeMillis());
}

7. 如果采用的是信号量隔离策略

com.netflix.hystrix.AbstractCommand#applyHystrixSemantics

// Semaphore handling: execution proceeds only if tryAcquire() succeeds.
if (executionSemaphore.tryAcquire()) {
    try {
        /* used to track userThreadExecutionTime */
        executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
        // singleSemaphoreRelease is registered for both terminate and unsubscribe.
        return executeCommandAndObserve(_cmd)
                .doOnError(markExceptionThrown)
                .doOnTerminate(singleSemaphoreRelease)
                .doOnUnsubscribe(singleSemaphoreRelease);
    } catch (RuntimeException e) {
        return Observable.error(e);
    }
} else {
    // Service degradation: tryAcquire() failed, so go to the fallback path.
    return handleSemaphoreRejectionViaFallback();
}

如果计数器 count 自增后大于配置的最大许可数(numberOfPermits),则回退计数并返回 false,请求不再继续执行,而是走降级

/**
 * Attempts to take one permit without blocking.
 *
 * @return {@code true} if the incremented count is within the number of permits,
 *         {@code false} otherwise (the increment is then undone)
 */
public boolean tryAcquire() {
    // incrementAndGet() adds one before returning the new count.
    final int inFlight = count.incrementAndGet();
    if (inFlight <= numberOfPermits.get()) {
        return true;
    }
    // Over the permitted number: roll the increment back and refuse.
    count.decrementAndGet();
    return false;
}

8. 如果是线程池隔离级别,则这个 tryAcquire 方法就会返回 true

com.netflix.hystrix.AbstractCommand.TryableSemaphoreNoOp#tryAcquire

// No-op variant: always reports success, so it never rejects a request.
public boolean tryAcquire() {
    return true;
}

com.netflix.hystrix.AbstractCommand#executeCommandAndObserve

// Build the execution Observable; when execution timeouts are enabled it is
// additionally lifted through HystrixObservableTimeoutOperator.
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
    // Point of interest: executeCommandWithSpecifiedIsolation is followed next.
    execution = executeCommandWithSpecifiedIsolation(_cmd)
            .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
    execution = executeCommandWithSpecifiedIsolation(_cmd);
}

com.netflix.hystrix.AbstractCommand#executeCommandWithSpecifiedIsolation

// Proceed only if the thread state can be moved from NOT_USING_THREAD to STARTED.
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
    //we have not been unsubscribed, so should proceed
    HystrixCounters.incrementGlobalConcurrentThreads();
    threadPool.markThreadExecution();
    // store the command that is being run
    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
    executionResult = executionResult.setExecutedInThread();
    /**
     * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
     */
    try {
        executionHook.onThreadStart(_cmd);
        executionHook.onRunStart(_cmd);
        executionHook.onExecutionStart(_cmd);
        // Point of interest: getUserExecutionObservable is followed next.
        return getUserExecutionObservable(_cmd);
    } catch (Throwable ex) {
        return Observable.error(ex);
    }
} else {
    //command has already been unsubscribed, so return immediately
    return Observable.error(new RuntimeException("unsubscribed before executing run()"));
}

// com.netflix.hystrix.AbstractCommand#getUserExecutionObservable
/**
 * Obtains the execution Observable and applies the two hook operators to it.
 *
 * @param _cmd the command passed to both hook operators
 * @return the execution Observable (or an error Observable if obtaining it threw)
 *         lifted through ExecutionHookApplication and DeprecatedOnRunHookApplication
 */
private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    Observable<R> source;
    try {
        // Point of interest: getExecutionObservable() is followed next.
        source = getExecutionObservable();
    } catch (Throwable failure) {
        // A failure while building the Observable becomes an error Observable.
        source = Observable.error(failure);
    }

    return source
        .lift(new ExecutionHookApplication(_cmd))
        .lift(new DeprecatedOnRunHookApplication(_cmd));
}

这里一切都合格,则 hystrix 就判断可以调用后端服务接口,则会反射调用被代理方法

/**
 * Wraps {@code run()} in a deferred Observable.
 *
 * @return an Observable that, when subscribed, emits the value returned by
 *         {@code run()} or signals the Throwable it threw
 */
final protected Observable<R> getExecutionObservable() {
    final Func0<Observable<R>> invokeRun = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                // Point of interest: run() is where the user code gets invoked.
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    };

    final Action0 rememberSubscribingThread = new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    };

    return Observable.defer(invokeRun).doOnSubscribe(rememberSubscribingThread);
}

这个 run() 钩子最终会调到

com.netflix.hystrix.contrib.javanica.command.GenericCommand#run

/**
 * Logs the command key and runs the command action through {@code process}.
 *
 * @return the value returned by {@code process}
 * @throws Exception as declared by the original signature
 */
protected Object run() throws Exception {
    LOGGER.debug("execute command: {}", getCommandKey().name());

    final Action commandInvocation = new Action() {
        @Override
        Object execute() {
            // Run the command action with this command's execution type.
            return getCommandAction().execute(getExecutionType());
        }
    };
    return process(commandInvocation);
}

接下来就是反射调用了

com.netflix.hystrix.contrib.javanica.command.MethodExecutionAction#execute(com.netflix.hystrix.contrib.javanica.command.ExecutionType)

// Delegates to executeWithArgs, passing the arguments held in the _args field.
public Object execute(ExecutionType executionType) throws CommandActionExecutionException {
    return executeWithArgs(executionType, _args);
}

com.netflix.hystrix.contrib.javanica.command.MethodExecutionAction#execute(java.lang.Object, java.lang.reflect.Method, java.lang.Object...)

/**
 * Invokes the given method on the given object, reflectively or through the
 * AJC method when compile-time weaving is active and an AJC method is present.
 *
 * @param o    the object to invoke the method on
 * @param m    the method to invoke
 * @param args the invocation arguments
 * @return the invocation result, or {@code null} if {@code propagateCause}
 *         returns normally after a reflection failure
 * @throws CommandActionExecutionException as declared by the original signature
 */
private Object execute(Object o, Method m, Object... args) throws CommandActionExecutionException {
    try {
        m.setAccessible(true); // suppress Java language access
        final boolean useAjcMethod = isCompileWeaving() && metaHolder.getAjcMethod() != null;
        if (!useAjcMethod) {
            // Point of interest: plain reflective call of the target method.
            return m.invoke(o, args);
        }
        return invokeAjcMethod(metaHolder.getAjcMethod(), o, metaHolder, args);
    } catch (IllegalAccessException e) {
        propagateCause(e);
    } catch (InvocationTargetException e) {
        propagateCause(e);
    }
    return null;
}

9. 线程池的创建及超时控制

com.netflix.hystrix.AbstractCommand#executeCommandAndObserve

// Build the execution Observable; when execution timeouts are enabled it is
// additionally lifted through HystrixObservableTimeoutOperator, which is where
// the timeout control discussed in this section comes in.
Observable<R> execution;
if (properties.executionTimeoutEnabled().get()) {
    execution = executeCommandWithSpecifiedIsolation(_cmd)
            .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
} else {
    execution = executeCommandWithSpecifiedIsolation(_cmd);
}

com.netflix.hystrix.AbstractCommand#handleCommandEnd

// If an end-of-execution callback was stored, invoke it now.
if (endCurrentThreadExecutingCommand != null) {
    endCurrentThreadExecutingCommand.call();
}

com.netflix.hystrix.AbstractCommand.HystrixObservableTimeoutOperator#call

// Call site: registers the listener with the HystrixTimer singleton and keeps the returned reference.
final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
/**
 * Schedules the listener's {@code tick()} to run repeatedly at the listener's
 * own interval.
 *
 * @param listener the listener to tick
 * @return a reference wrapping the listener and its scheduled future
 */
public Reference<TimerListener> addTimerListener(final TimerListener listener) {
    startThreadIfNeeded();

    // Wrap the listener so that an Exception thrown by tick() is logged
    // instead of escaping from the scheduled task.
    final Runnable tickTask = new Runnable() {
        @Override
        public void run() {
            try {
                listener.tick();
            } catch (Exception e) {
                logger.error("Failed while ticking TimerListener", e);
            }
        }
    };

    // Hand the task to the executor's thread pool at a fixed rate; the listener's
    // interval is used both as the initial delay and as the period.
    final ScheduledFuture<?> scheduled = executor.get().getThreadPool().scheduleAtFixedRate(
            tickTask,
            listener.getIntervalTimeInMilliseconds(),
            listener.getIntervalTimeInMilliseconds(),
            TimeUnit.MILLISECONDS);
    return new TimerReference(listener, scheduled);
}

其他情况,比如熔断器已经打开,或者线程池、信号量已满,则会走到降级方法

com.netflix.hystrix.AbstractCommand#applyHystrixSemantics

    // The circuit breaker is consulted first; only an allowed request goes on
    // to try for an execution permit.
    if (circuitBreaker.allowRequest()) {
        // (excerpt: executionSemaphore and singleSemaphoreRelease are set up here)
        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            // tryAcquire() failed: go to the fallback path.
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // The circuit breaker refused the request: go to the fallback path.
        return handleShortCircuitViaFallback();
    }

这里同样会通过反射调用到 fallback 方法。fallback 降级方法也有信号量和线程池的大小控制,也就是信号量或线程池是多少大小,fallback 降级方法也会接收多少降级的请求。(断路器的阈值也是降级开始拒绝的阈值)

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐