【Hystrix权威指南三】Hystrix隔离策略源码分析一
微服务组件
·
在开始隔离策略的源码分析之前,先了解下Hystrix的类组织形式。在Spring中配置的Bean都有一个scop属性,默认是singleton 。保证在一个IOC容器中,一个class只有一个实例,减少内存的占用,同时又利用GC。另外:Spring中的单例模式和普通的单例模式是不一样的,在Spring中是一个IOC容器中只有一个实例,但是一个JVM可以有多个IOC容器。而普通的单例是指一个JVM中只能有一个实例。
Hystrix类组织形式
同样在Hystrix内部,也是用的单例模式。每个接口类文件中都有一个Factory,和该接口的实现类,通过该接口文件中的Factory的获取接口的具体实现对象,从而操作对象的相关方法。Factory中持有一个静态的ConCurrentHash类型的
常量,当需要一个接口的实例时,如果Map中没有,则new一个putIfAbsent到map中。
这里有一个小技巧,new对象是放在putIfAbsent方法里的,所以是局部变量,不存在线程安全问题。putIfAbsent又是ConcurrentHashMap提供的方法,内部的通过Segment对同步做了优化,同时又保证了线程安全,所以在初始化对象时不需要Synchornized 。
如果同时有两个线程 访问putIfAbsent方法时,只会有一个能返回instance ,另一个返回null 。 只需要对访问结果进行非null判断即可,如果返回null就表示map中已经有对象了,get即可。如:
HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
if (cbForCommand == null) {
// this means the putIfAbsent step just created a new one so let's retrieve and return it
return circuitBreakersByCommand.get(key.name());
} else {
// this means a race occurred and while attempting to 'put' another one got there before
// and we instead retrieved it and will now return it
return cbForCommand;
if (cbForCommand == null) {
// this means the putIfAbsent step just created a new one so let's retrieve and return it
return circuitBreakersByCommand.get(key.name());
} else {
// this means a race occurred and while attempting to 'put' another one got there before
// and we instead retrieved it and will now return it
return cbForCommand;
}
在上一章节中设置CommandGroupKey时,
首先需要把String转换成HystrixCommandGroupKey对象,这里是通过
withGroupKey(HystrixCommandGroupKey.Factory.asKey("hello")) ,
同样其他对象也是通过XXX.Factory.xx操作。
在Hystrix内部所有的对象都是通过xxx(接口).Factory.getInstance()操作的。
Hystrix线程池隔离源码解析
Hystrix命令的执行流程整体来说还是比较逻辑清晰的,响应式编程。首先检查是否开启了缓存,如果开启了,则直接从缓存中读取,否则正常执行。根据构造命令时设置的隔离策略.withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE)
,选择是个从线程池中获取资源还是从信号池中获取许可。
HystrixCommand类继承图:
值得注意的一点是:如果回路是闭合的,就去获取一个许可,此时是并未判断是不是信号隔离,也就是说即使是隔离策略是Thread,这里同样会去获取一个Permit 。那么问题就来了:这样的话岂不是Thread隔离也会占用Permit ? 答案是否定的。这个地方在获取Permit时根据隔离策略决定返回的是TryableSemaphoreNoOp还是TryableSemaphore ,
TryableSemaphoreNoOp是什么逻辑都不执行的,tryAcquire时永远返回true 。这里返回的显然是TryableSemaphoreNoOp了。
下面先看下整体的流程图,对执行流程有一个整体的把握:
threadPoolKey 也是线程池的名字的前缀,默认前缀是 hystrix 。在Hystrix中,核心线程数和最大线程数是一致的,减少线程临时创
建和销毁带来的性能开销。线程池的默认参数都在HystrixThreadPoolProperties中,重点讲解一下参数queueSizeRejectionThreshold 和maxQueueSize 。queueSizeRejectionThreshold默认值是5,允许在队列中的等待的任务数量。maxQueueSize默认值是-1,队列大小。如果是Fast Fail 应用,建议使用默认值。线程池饱满后直接拒绝后续的任务,不再进行等待。代码如下HystrixThreadPool类中:
@Override
public boolean isQueueSpaceAvailable() {
if (queueSize <= 0) {
// we don't have a queue so we won't look for space but instead
// let the thread-pool reject or not
return true;
} else {
return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
}
public boolean isQueueSpaceAvailable() {
if (queueSize <= 0) {
// we don't have a queue so we won't look for space but instead
// let the thread-pool reject or not
return true;
} else {
return threadPool.getQueue().size() < properties.queueSizeRejectionThreshold().get();
}
}
线程池一旦创建完成,相关参数就不会更改,存放在静态的ConcurrentHashMap中,key是对应的commandKey 。而queueSizeRejectionThreshold是每个命令都是设置的。
线程池的相关参数都保存在HystrixThreadPool这个类文件中,线程池的创建方法getThreadPool则在HystrixConcurrencyStrategy类文件中。从getThreadPool方法可以看出线程池的名字就是hystrix-threadPoolKey-threadNumber.
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
thread.setDaemon(true);
return thread;
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "hystrix-" + threadPoolKey.name() + "-" + threadNumber.incrementAndGet());
thread.setDaemon(true);
return thread;
}
在HystrixThreadPool实现类的构造方法中,并发HystrixConcurrencyStrategy实例是通过HystrixPlugins获取的,所以可以通过HystrixPlugins设置自定义插件。具体的HystrixPlugins如何使用,会在后面章节中讲解。
线程池的创建
前面说了,在Hystrix内部大部分类都是单实例,同样ThreadPool也不例外,也是单实例。并且相同commandKey的依赖还必须是使用同一个线程池。这就需要把ThreadPool保存在一个静态的map中,key是commandKey,同时要保证线程安全,Hytstrix使用了ConcurrentHashMap。关于为什么不适用HashTable保证线程安全问题的疑问请自行Google。线程池的创建在HystrixThreadPool这个类文件中的内部类Factory中的getInstance方法。
/* package */final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();
String key = threadPoolKey.name();
// this should find it for all but the first time
HystrixThreadPool previouslyCached = threadPools.get(key);
if (previouslyCached != null) {
return previouslyCached;
}
// if we get here this is the first time so we need to initialize
synchronized (HystrixThreadPool.class) {
if (!threadPools.containsKey(key)) {
threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
}
}
return threadPools.get(key);
线程池的使用
HystrixCommand类的execute()内部调用了queue() ,queue又调用了父类AbstractCommand的
toObservable方法,toObservable方法处理了是否可缓存问题后,交给了getRunObservableDecoratedForMetricsAndErrorHandling方法,这个方法设置了一系列的executionHook之后,交给了getExecutionObservableWithLifecycle,这个方法通过getExecutionObservable()获取了执行器。getExecutionObservable()是个抽象方法,具体实现放在了子类:HystrixCommand和HystrixObservableCommand类中。下面是HystrixCommand类中的getExecutionObservable方法实现:
final protected Observable<R> getExecutionObservable() {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> s) {
try {
s.onNext(run());
s.onCompleted();
} catch (Throwable e) {
s.onError(e);
}
}
});
}
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> s) {
try {
s.onNext(run());
s.onCompleted();
} catch (Throwable e) {
s.onError(e);
}
}
});
}
在这个Call方法中执行了具体的业务逻辑run() ;
更多推荐
已为社区贡献4条内容
所有评论(0)