什么是Hystrix

Hystrix是Spring Cloud提供的一种带有熔断机制的框架,由于在微服务系统中同一个操作会由多个不同的微服务来共同完成,所以微服务与微服务之间会由很多相互的调用,由于在分布式环境中经常会出现某个微服务节点故障的情况,所以会由调用失败发生,而熔断器的作用就是当出现远程调用失败的时候提供一种机制来保证程序的正常运行而不会卡死在某一次调用,类似Java程序中的try-catch结构,而只有当异常发生的时候才会进入catch的代码块。

使用Hystrix

建立提供服务的服务器

首先构建一个提供服务的服务器项目spring-cloud-server,在其pom.xml文件中加入如下依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>tech.codemine</groupId>
    <artifactId>spring-cloud-server</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

接着在项目中新建一个名为hello的package,然后在其中建立一个名为ServerApplication.java的文件,代码如下:

package hello;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@SpringBootApplication
public class ServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ServerApplication.class);
    }

    @RequestMapping("/hello")
    public String hello() {
        return "hello from server";
    }
}

然后在resources目录下建立application.yml文件,里面设置服务的端口号,如下:

server:
  port: 8081

整个项目的结构如下:
在这里插入图片描述
然后启动项目

建立客户端

接着建立连接服务器的客户端,首先建立一个名为hystrix-in-action的项目,项目的pom.xml的依赖如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>tech.codemine</groupId>
    <artifactId>hystrix-in-action</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Finchley.SR1</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/libs-milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

</project>

接着同样建立一个名为hello的package,然后在其中建立一个名为HystrixApplication.java的文件,代码如下:

package hello;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.hystrix.EnableHystrix;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@EnableHystrix
@SpringBootApplication
public class HystrixApplication {

    @Autowired
    HystrixService hystrixService;

    public static void main(String[] args) {
        SpringApplication.run(HystrixApplication.class);
    }

    @RequestMapping("/hi")
    public String hi() {
        return hystrixService.hi();
    }
}


然后建立一个名为HystrixService.java的文件,代码如下:

package hello;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

@Service
public class HystrixService {

    @HystrixCommand(fallbackMethod = "fallback")
    public String hi() {
        return new RestTemplate().getForObject("http://localhost:8081/hello", String.class);
    }

    public String fallback() {
        return "fallback";
    }
}

整个工程的目录如下:

在这里插入图片描述
然后启动项目

测试

我们可以通过IDEA自带的REST Client工具发起HTTP请求进行测试,当然也可以直接在浏览器中输入地址进行测试。

首先输入http://localhost:8081/hi,可以看到得到了“hello from server”的结果,接着关闭spring-cloud-server工程,再次发起请求,可以发现这次得到的结果是“fallback”。

结果解释

在HystrixService类中,我们使用了文章一开始所描述的熔断器机制,在hi()方法中加入了HystrixCommand注解,在注解中我们提供了一个名为“fallback”的字段,这个方法指向的是同一个类中的fallback()方法,当hi()方法调用失败的时候就会自动转而执行fallback()方法。

Hystrix源码解析

Hystrix在底层使用了Spring提供的切面技术。

在HystrixCommandAspect.java文件中可以看到定义的切面:

/**
 * AspectJ aspect to process methods which annotated with {@link HystrixCommand} annotation.
 */
@Aspect
public class HystrixCommandAspect {

    private static final Map<HystrixPointcutType, MetaHolderFactory> META_HOLDER_FACTORY_MAP;

    static {
        META_HOLDER_FACTORY_MAP = ImmutableMap.<HystrixPointcutType, MetaHolderFactory>builder()
                .put(HystrixPointcutType.COMMAND, new CommandMetaHolderFactory())
                .put(HystrixPointcutType.COLLAPSER, new CollapserMetaHolderFactory())
                .build();
    }

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")

    public void hystrixCommandAnnotationPointcut() {
    }

    @Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
    public void hystrixCollapserAnnotationPointcut() {
    }

    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
        //通过切点获取被拦截的方法
        Method method = getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser " +
                    "annotations at the same time");
        }
        MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
        //metaholder中保存了很多和切点相关的信息,详见后文的贴图
        MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
        HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
        ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
                metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();
        
        //result中保留操作的结果,可能是成功操作的返回值也可能是fallback方法的返回值
        Object result;
        try {
            //如果返回的结果使用了observable模式则执行以下代码
            if (!metaHolder.isObservable()) {
                result = CommandExecutor.execute(invokable, executionType, metaHolder);
            } else {
            //否则执行else
                result = executeObservable(invokable, executionType, metaHolder);
            }
        } catch (HystrixBadRequestException e) {
            throw e.getCause() != null ? e.getCause() : e;
        } catch (HystrixRuntimeException e) {
            throw hystrixRuntimeExceptionToThrowable(metaHolder, e);
        }
        return result;
    }
    ..........
}

可以看到在这个类中首先是定义了两个切面,由于我们这里讨论的是HystrixCommand,所以只关注第一个切面,在这个切面里可以看到切点是被HystrixCommand注解的方法,然后对方法的具体增强是在接下来的一个名为methodsAnnotatedWithHystrixCommand()的方法里,具体的流程用注释写在了代码中。

metaHolder的内容:
在这里插入图片描述
可以看到在以上代码中,最重要的便是获取到result具体结果的那一步,由于我们的例子不是使用,所以重点放在

result = CommandExecutor.execute(invokable, executionType, metaHolder);

接下来跳转到CommandExecutor.java中的execute()方法

/**
     * Calls a method of {@link HystrixExecutable} in accordance with specified execution type.
     *
     * @param invokable  {@link HystrixInvokable}
     * @param metaHolder {@link MetaHolder}
     * @return the result of invocation of specific method.
     * @throws RuntimeException
     */
    public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
        Validate.notNull(invokable);
        Validate.notNull(metaHolder);

        switch (executionType) {
            case SYNCHRONOUS: {
                return castToExecutable(invokable, executionType).execute();
            }
            case ASYNCHRONOUS: {
                HystrixExecutable executable = castToExecutable(invokable, executionType);
                if (metaHolder.hasFallbackMethodCommand()
                        && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                    return new FutureDecorator(executable.queue());
                }
                return executable.queue();
            }
            case OBSERVABLE: {
                HystrixObservable observable = castToObservable(invokable);
                return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
            }
            default:
                throw new RuntimeException("unsupported execution type: " + executionType);
        }
    }

在execute()方法中使用了rxjava的观察者模式并最终在这里计算出结果返回。当出现失败是,会调用GenericCommand.java中的getFallBack()方法,代码如下:

/**
     * The fallback is performed whenever a command execution fails.
     * Also a fallback method will be invoked within separate command in the case if fallback method was annotated with
     * HystrixCommand annotation, otherwise current implementation throws RuntimeException and leaves the caller to deal with it
     * (see {@link super#getFallback()}).
     * The getFallback() is always processed synchronously.
     * Since getFallback() can throw only runtime exceptions thus any exceptions are thrown within getFallback() method
     * are wrapped in {@link FallbackInvocationException}.
     * A caller gets {@link com.netflix.hystrix.exception.HystrixRuntimeException}
     * and should call getCause to get original exception that was thrown in getFallback().
     *
     * @return result of invocation of fallback method or RuntimeException
     */
    @Override
    protected Object getFallback() {
        final CommandAction commandAction = getFallbackAction();
        if (commandAction != null) {
            try {
                return process(new Action() {
                    @Override
                    Object execute() {
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                        return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable e) {
                LOGGER.error(FallbackErrorMessageBuilder.create()
                        .append(commandAction, e).build());
                throw new FallbackInvocationException(unwrapCause(e));
            }
        } else {
            return super.getFallback();
        }
    }

在这个方法中首先会对是否设置了commandAction进行判断,commandAction就是我们之前所设置的fallback字段指向的方法,如果发现有降级方法,则调用降级方法获取结果,进入process()方法,代码如下:

/**
     * Executes an action. If an action has failed and an exception is ignorable then propagate it as HystrixBadRequestException
     * otherwise propagate original exception to trigger fallback method.
     * Note: If an exception occurred in a command directly extends {@link java.lang.Throwable} then this exception cannot be re-thrown
     * as original exception because HystrixCommand.run() allows throw subclasses of {@link java.lang.Exception}.
     * Thus we need to wrap cause in RuntimeException, anyway in this case the fallback logic will be triggered.
     *
     * @param action the action
     * @return result of command action execution
     */
    Object process(Action action) throws Exception {
        Object result;
        try {
            result = action.execute();
            flushCache();
        } catch (CommandActionExecutionException throwable) {
            Throwable cause = throwable.getCause();
            if (isIgnorable(cause)) {
                throw new HystrixBadRequestException(cause.getMessage(), cause);
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            } else if (cause instanceof Exception) {
                throw (Exception) cause;
            } else {
                // instance of Throwable
                throw new CommandActionExecutionException(cause);
            }
        }
        return result;
    }

最终在process()方法中获取降级方法执行的结果,而如果没有设置降级方法则调用父类的getFallBack()方法,父类的getFallBack方法会抛出一个找不到降级方法的异常,代码如下:

protected T getFallback() {
    throw new RuntimeException("No fallback available.", getExecutionException());
}

至于何时会跳转到降级方法,则是在AbstractCommand.java中,在这里定义了很多种执行失败的情况,通过rxjava框架的观察者模式对错误进行监听,根据不同的情况会进入不同的处理方法,最终这些处理方法都会调用HystrixCommand.java中的getFallbackObservable()方法,并最终进入上文所述的真正执行fallback方法的代码。

以上就是对Hystrix降级机制的说明,想要深入理解Hystrix,必须要对rxjava有比较深刻的认识,在Hystrix中大量运用了rxjava。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐