1.利用HashMap加同步

  1. 说明:把HashMap当作缓存容器。每缓存一个key的时候,都进行同步。

  2. 代码:

package memory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MemorySecond<K, V> implements Computable<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Computable<K, V> c;

    public MemorySecond(Computable<K, V> c) {
        this.c = c;
    }

    @Override
    public V compute(K arg) throws InterruptedException {
        V result = cache.get(arg);

        if (result == null) {
            result = c.compute(arg);
            cache.put(arg, result);
        }

        return result;
    }
}
  1. 缺点:由于HashMap并非线程安全,因此每一次计算都使用同步机制确保线程安全。很明显,这种方式伸缩性比较差。因为一个线程正在计算结果,其它所有线程都在等待,即使对应的arg是不同的。

2.用ConcurrentHashMap代替HashMap

  1. 说明:ConcurrentHashMap是线程安全的,并且同步并非对整个Map进行同步而是对每一个分段进行同步,所以并发性也可以大大提升。
  2. 代码:
package memory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class MemorySecond<K, V> implements Computable<K, V> {
    private final Map<K, V> cache = new ConcurrentHashMap<>();
    private final Computable<K, V> c;

    public MemorySecond(Computable<K, V> c) {
        this.c = c;
    }

    @Override
    public V compute(K arg) throws InterruptedException {
        V result = cache.get(arg);

        if (result == null) {
            result = c.compute(arg);
            cache.put(arg, result);
        }

        return result;
    }
}
  1. 缺点:相比第一个设计方案。这种方案已经有很大的提升了。但是如果一个compute的计算开销很大,恰巧有另一个同一个arg的线程同时请求compute,则会造成重复计算,重复put的情况。所以我们希望如果有一个线程正在计算的时候另一个线程正在等待而不是重复计算。

3.利用FutureTask

1.说明

利用FutrueTask, 如果get到结果则返回,如果正在计算则利用FutureTask的特性阻塞。否则计算。
(2)代码

package memory;

import org.slf4j.Logger;

import java.util.Map;
import java.util.concurrent.*;

import static memory.ErrorHandler.launderThrowable;

public class MemoryThird<K, V> implements Computable<K, V> {
    private final Map<K, Future<V>> cache = new ConcurrentHashMap<>();
    private final Computable<K, V> c;

    public MemoryThird(Computable<K, V> c) {
        this.c = c;
    }

    @Override
    public V compute(final K arg) throws InterruptedException {
        Future<V> f = cache.get(arg);
        if (f == null) {
            Callable<V> eval = new Callable<V>() {
                @Override
                public V call() throws Exception {
                    return c.compute(arg);
                }
            };

            FutureTask<V> ft = new FutureTask<>(eval);
            f = ft;
            cache.put(arg, ft);
            ft.run(); // start compute
        }
        try {
            return f.get();
        } catch (ExecutionException e) {
            throw launderThrowable(e.getCause());
        }
    }
}

(3)缺点

只有一个缺陷,仍然存在两个线程计算出相同值的漏洞。就是由于compute方法中的if代码块仍然是非原子的“先检查,再执行”,因此仍然有可能两个线程在同一时间计算一个不存在的arg。原因是get方法和put方法是对底层的Map操作,所以无法保证原子性。由于cache里面的是future而不是真正的值,所以将有可能导致缓存污染(cache pollution)问题,即如果某个计算过程被取消或者失败,那么缓存存入的Future是有缺陷的。

4.最终方案:

  1. 说明:使用putIfAbsent代替put,以保证原子性。如果发现Future计算被取消或失败则删除,从而缓存不会消耗过多内存。

  2. 代码:

package memory;

import java.util.Map;
import java.util.concurrent.*;

import static memory.ErrorHandler.launderThrowable;

public class Memory<K, V> implements Computable<K, V> {
    private Map<K, Future<V>> cache = new ConcurrentHashMap<>();
    private Computable<K, V> c;

    public Memory(Computable<K, V> c) {
        this.c = c;
    }

    @Override
    public V compute(K arg) throws InterruptedException {
        while (true) {
            Future<V> f = cache.get(arg);

            if (f == null) {
                Callable<V> eval = new Callable<V>() {
                    @Override
                    public V call() throws Exception {
                        return c.compute(arg);
                    }
                };

                FutureTask<V> ft = new FutureTask<>(eval);

                f = cache.putIfAbsent(arg, ft); //double check
                if (f == null) {
                    f = ft;
                    ft.run(); //start compute
                }
            }
            try {
                return f.get();
            } catch (CancellationException e) {
                cache.remove(arg);
            } catch (ExecutionException e) {
                throw launderThrowable(e.getCause());
            }
        }
    }
}
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐