本文讨论java集合容器中的几种元素数量获取的方式,命题很小,但是也足以让我们思考一些东西。

所谓计数:即是给出所在容器的元素总数的方式。一般能想到的就是两种方式:一是使用某个字段直接存储该计数值,二是在请求计数值时临时去计算所有元素数量。貌似本文的答案已经出来了。好吧,那我们还是从源码的角度来验证下想法吧:

一般在java的集合容器中,可以分为普通容器和并发容器,我们就姑且以这种方式来划分下,验证下其实现计数的方式吧!

1. 普通容器 --HashMap

一般非并发容器在进行增删改时,都会同时维护一个count值,如hashmap中的实现:

//HashMap 增加和修改都在此实现

/*** Implements Map.put and related methods

*

*@paramhash hash for key

*@paramkey the key

*@paramvalue the value to put

*@paramonlyIfAbsent if true, don't change existing value

*@paramevict if false, the table is in creation mode.

*@returnprevious value, or null if none*/

final V putVal(int hash, K key, V value, booleanonlyIfAbsent,booleanevict) {

Node[] tab; Node p; intn, i;if ((tab = table) == null || (n = tab.length) == 0)

n= (tab =resize()).length;if ((p = tab[i = (n - 1) & hash]) == null)

tab[i]= newNode(hash, key, value, null);else{

Nodee; K k;if (p.hash == hash &&((k= p.key) == key || (key != null &&key.equals(k))))

e=p;else if (p instanceofTreeNode)

e= ((TreeNode)p).putTreeVal(this, tab, hash, key, value);else{for (int binCount = 0; ; ++binCount) {if ((e = p.next) == null) {

p.next= newNode(hash, key, value, null);if (binCount >= TREEIFY_THRESHOLD - 1) //-1 for 1st

treeifyBin(tab, hash);break;

}if (e.hash == hash &&((k= e.key) == key || (key != null &&key.equals(k))))break;

p=e;

}

}if (e != null) { //existing mapping for key

V oldValue =e.value;if (!onlyIfAbsent || oldValue == null)

e.value=value;

afterNodeAccess(e);returnoldValue;

}

}++modCount;//直接对size进行增加1即可, 如果是更新key的值,则不会运行到此处,即不会进行相加

if (++size >threshold)

resize();

afterNodeInsertion(evict);return null;

}//删除元素的实现,同时维护 size 大小

/*** Implements Map.remove and related methods

*

*@paramhash hash for key

*@paramkey the key

*@paramvalue the value to match if matchValue, else ignored

*@parammatchValue if true only remove if value is equal

*@parammovable if false do not move other nodes while removing

*@returnthe node, or null if none*/

final Node removeNode(inthash, Object key, Object value,boolean matchValue, booleanmovable) {

Node[] tab; Node p; intn, index;if ((tab = table) != null && (n = tab.length) > 0 &&(p= tab[index = (n - 1) & hash]) != null) {

Node node = null, e; K k; V v;//先查找node所在的位置

if (p.hash == hash &&((k= p.key) == key || (key != null &&key.equals(k))))

node=p;else if ((e = p.next) != null) {if (p instanceofTreeNode)

node= ((TreeNode)p).getTreeNode(hash, key);else{do{if (e.hash == hash &&((k= e.key) == key ||(key!= null &&key.equals(k)))) {

node=e;break;

}

p=e;

}while ((e = e.next) != null);

}

}if (node != null && (!matchValue || (v = node.value) == value ||(value!= null &&value.equals(v)))) {if (node instanceofTreeNode)

((TreeNode)node).removeTreeNode(this, tab, movable);else if (node ==p)

tab[index]=node.next;elsep.next=node.next;++modCount;//直接减小size即可

--size;

afterNodeRemoval(node);returnnode;

}

}return null;

}

因为有了增删改时对计数器的维护,所以在想要获取总数时,就容易许多了。只需把size字段返回即可。

//HashMap.size()

/*** Returns the number of key-value mappings in this map.

*

*@returnthe number of key-value mappings in this map*/

public intsize() {returnsize;

}

所以,在这种情况下,获取计数值的方式非常简单。但是不管怎么样,size字段对外部是不可见的,因为它是容器内部的一个实现逻辑,它完全在将来的某个时刻改变掉。即 size() != size .

2. 普通容器 --LinkedList

看完hash类的计数实现,咱们再来看另外一个类型的容器LinkedList:

//LinkedList.add(E) 添加一个元素

public booleanadd(E e) {

linkLast(e);return true;

}/*** Links e as last element.*/

voidlinkLast(E e) {final Node l =last;final Node newNode = new Node<>(l, e, null);

last=newNode;if (l == null)

first=newNode;elsel.next=newNode;//同样,直接使用一个 size 计数器统计即可

size++;

modCount++;

}//删除一个元素, 同时维护 size 字段

publicE removeFirst() {final Node f =first;if (f == null)throw newNoSuchElementException();returnunlinkFirst(f);

}/*** Unlinks non-null first node f.*/

private E unlinkFirst(Nodef) {//assert f == first && f != null;

final E element =f.item;final Node next =f.next;

f.item= null;

f.next= null; //help GC

first =next;if (next == null)

last= null;elsenext.prev= null;//元素计数减1

size--;

modCount++;returnelement;

}//同样,统计元素数量时,直接返回size即可

public intsize() {returnsize;

}

可见,LinkedList 也同样是简单地维护一个计数器字段,从而实现了高效地计数方法。而这简单地实现,则是基于单线程的访问的,它同时维护一个计数字段,基本没有多少开销,却给取值时带来了便利。

总结: 普通容器直接维护一个计数器字段,可以很方便地进行大小统计操作。

3. 并发容器 --ConcurrentHashMap

而对于并发容器,则可能会不一样些,但也有一些情况是一样的。比较,HashTable, 直接使用 synchronized 来保证线程安全,则它也同样可以直接使用一个size即可完成元素大小的统计。事实上,有些版本的HashTable仅仅是在HashMap的上面加上了synchronizd锁而已(有些版本则是 不一样的哦),细节咱们无需再看。

而稍微有点不一样的如: ConcurrentHashMap.size(), 早期的 ConcurrentHashMap 使用分段锁,则需要统计各segement的元素,相加起来然后得到整体元素大小. 而jdk1.8中,已经放弃使用分段锁来实现高性能安全的hash容器了,而是直接使用 synchronized + CAS + 红黑树 实现. 那么,我们来看看其实现元素统计这一功能的实现有何不同吧!

//ConcurrentHashMap.putVal() 新增或修改一个元素

/**Implementation for put and putIfAbsent*/

final V putVal(K key, V value, booleanonlyIfAbsent) {if (key == null || value == null) throw newNullPointerException();int hash =spread(key.hashCode());int binCount = 0;for (Node[] tab =table;;) {

Node f; intn, i, fh;if (tab == null || (n = tab.length) == 0)

tab=initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node(hash, key, value, null)))break; //no lock when adding to empty bin

}else if ((fh = f.hash) ==MOVED)

tab=helpTransfer(tab, f);else{

V oldVal= null;synchronized(f) {if (tabAt(tab, i) ==f) {if (fh >= 0) {

binCount= 1;for (Node e = f;; ++binCount) {

K ek;if (e.hash == hash &&((ek= e.key) == key ||(ek!= null &&key.equals(ek)))) {

oldVal=e.val;if (!onlyIfAbsent)

e.val=value;break;

}

Node pred =e;if ((e = e.next) == null) {

pred.next= new Node(hash, key,

value,null);break;

}

}

}else if (f instanceofTreeBin) {

Nodep;

binCount= 2;if ((p = ((TreeBin)f).putTreeVal(hash, key,

value))!= null) {

oldVal=p.val;if (!onlyIfAbsent)

p.val=value;

}

}

}

}if (binCount != 0) {if (binCount >=TREEIFY_THRESHOLD)

treeifyBin(tab, i);if (oldVal != null)returnoldVal;break;

}

}

}//主要是在进行新增成功时,再进行计数器的操作, 看起来不是 ++size 这么简单了

addCount(1L, binCount);return null;

}//这个计数的相加看起来相当复杂

/*** Adds to count, and if table is too small and not already

* resizing, initiates transfer. If already resizing, helps

* perform transfer if work is available. Rechecks occupancy

* after a transfer to see if another resize is already needed

* because resizings are lagging additions.

*

*@paramx the count to add

*@paramcheck if <0, don't check resize, if <= 1 only check if uncontended*/

private final void addCount(long x, intcheck) {

CounterCell[] as;longb, s;//使用 CounterCell 来实现计数操作//使用 CAS 保证更新计数时只会有一个线程成功

if ((as = counterCells) != null ||

!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b +x)) {

CounterCell a;long v; intm;boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||

//使用一个类似随机负载均衡的方式,将计数值随机添加到 CounterCell 的某个值下面,减少多线程竞争的可能性

(a = as[ThreadLocalRandom.getProbe() & m]) == null ||

//通过cas将计数值x添加到 CounterCell 的 value 字段中

!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v= a.value, v +x))) {//如果上面添加失败,则使用 fullAddCount 进行重新添加该计数

fullAddCount(x, uncontended);return;

}if (check <= 1)return;//基于 CounterCell 做一此汇总操作

s =sumCount();

}//在进行put值时, check的值都是大于等于0的

if (check >= 0) {

Node[] tab, nt; intn, sc;//rehash 处理

while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n= tab.length) >> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc== rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex<= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))

transfer(tab, nt);

}else if (U.compareAndSwapInt(this, SIZECTL, sc,

(rs<< RESIZE_STAMP_SHIFT) + 2))//辅助进行hash扩容

transfer(tab, null);

s=sumCount();

}

}

}//fullAddCount 比较复杂, 它的目的是为了保证多线程可以快速进行添加完成, 目标很简单, 即向数组 CounterCell 中添加一个值 x//See LongAdder version for explanation

private final void fullAddCount(long x, booleanwasUncontended) {inth;if ((h = ThreadLocalRandom.getProbe()) == 0) {

ThreadLocalRandom.localInit();//force initialization

h =ThreadLocalRandom.getProbe();

wasUncontended= true;

}boolean collide = false; //True if last slot nonempty

for(;;) {

CounterCell[] as; CounterCell a;int n; longv;if ((as = counterCells) != null && (n = as.length) > 0) {if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) { //Try to attach new Cell

CounterCell r = new CounterCell(x); //Optimistic create

if (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean created = false;try { //Recheck under lock

CounterCell[] rs; intm, j;if ((rs = counterCells) != null &&(m= rs.length) > 0 &&rs[j= (m - 1) & h] == null) {

rs[j]=r;

created= true;

}

}finally{

cellsBusy= 0;

}if(created)break;continue; //Slot is now non-empty

}

}

collide= false;

}else if (!wasUncontended) //CAS already known to fail

wasUncontended = true; //Continue after rehash

else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v +x))break;else if (counterCells != as || n >=NCPU)

collide= false; //At max size or stale

else if (!collide)

collide= true;else if (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {try{if (counterCells == as) {//Expand table unless stale

CounterCell[] rs = new CounterCell[n << 1];for (int i = 0; i < n; ++i)

rs[i]=as[i];

counterCells=rs;

}

}finally{

cellsBusy= 0;

}

collide= false;continue; //Retry with expanded table

}

h=ThreadLocalRandom.advanceProbe(h);

}else if (cellsBusy == 0 && counterCells == as &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean init = false;try { //Initialize table

if (counterCells ==as) {

CounterCell[] rs= new CounterCell[2];

rs[h& 1] = newCounterCell(x);

counterCells=rs;

init= true;

}

}finally{

cellsBusy= 0;

}if(init)break;

}else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v +x))break; //Fall back on using base

}

}//ConcurrentHashMap.remove 删除元素

/*** Implementation for the four public remove/replace methods:

* Replaces node value with v, conditional upon match of cv if

* non-null. If resulting value is null, delete.*/

finalV replaceNode(Object key, V value, Object cv) {int hash =spread(key.hashCode());for (Node[] tab =table;;) {

Node f; intn, i, fh;if (tab == null || (n = tab.length) == 0 ||(f= tabAt(tab, i = (n - 1) & hash)) == null)break;else if ((fh = f.hash) ==MOVED)

tab=helpTransfer(tab, f);else{

V oldVal= null;boolean validated = false;synchronized(f) {if (tabAt(tab, i) ==f) {if (fh >= 0) {

validated= true;for (Node e = f, pred = null;;) {

K ek;if (e.hash == hash &&((ek= e.key) == key ||(ek!= null &&key.equals(ek)))) {

V ev=e.val;if (cv == null || cv == ev ||(ev!= null &&cv.equals(ev))) {

oldVal=ev;if (value != null)

e.val=value;//删除元素

else if (pred != null)

pred.next=e.next;elsesetTabAt(tab, i, e.next);

}break;

}

pred=e;if ((e = e.next) == null)break;

}

}else if (f instanceofTreeBin) {

validated= true;

TreeBin t = (TreeBin)f;

TreeNoder, p;if ((r = t.root) != null &&(p= r.findTreeNode(hash, key, null)) != null) {

V pv=p.val;if (cv == null || cv == pv ||(pv!= null &&cv.equals(pv))) {

oldVal=pv;if (value != null)

p.val=value;//删除元素

else if(t.removeTreeNode(p))

setTabAt(tab, i, untreeify(t.first));

}

}

}

}

}if(validated) {if (oldVal != null) {//value = null, 代表需要将元素删除,所以需要对计数器做减1操作

if (value == null)

addCount(-1L, -1);returnoldVal;

}break;

}

}

}return null;

}

同样是由于在增删时,维护一个计数器(CounterCell数组), 所以对于返回计数值操作则会比较简单化:

//ConcurrentHashMap.size()

public intsize() {long n =sumCount();return ((n < 0L) ? 0:

(n> (long)Integer.MAX_VALUE) ?Integer.MAX_VALUE :

(int)n);

}//直接将 CounterCell 中的值相加起来即可

final longsumCount() {

CounterCell[] as=counterCells; CounterCell a;long sum =baseCount;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)

sum+=a.value;

}

}returnsum;

}

虽然ConcurrentHash的元素本身没有使用分段式存储了,但是其计数值还是存在了多个 CounterCell 中,目的自然是为了减少多线程竞争对计数器的更新成性能瓶颈。在进行 size() 计数时,并未有上锁操作,整个 CounterCell 使用 volatile 修饰,保证其可见性,但是整个size 却是不保证绝对准确的哦。

4. 并发容器 --ArrayBlockingQueue

下面我们再来看看另一各类型的并发容器: ArrayBlockingQueue

//ArrayBlockingQueue.offer()

/*** Inserts the specified element at the tail of this queue if it is

* possible to do so immediately without exceeding the queue's capacity,

* returning {@codetrue} upon success and {@codefalse} if this queue

* is full. This method is generally preferable to method {@link#add},

* which can fail to insert an element only by throwing an exception.

*

*@throwsNullPointerException if the specified element is null*/

public booleanoffer(E e) {

checkNotNull(e);final ReentrantLock lock = this.lock;//直接上锁操作

lock.lock();try{if (count ==items.length)return false;else{//进行入队操作

enqueue(e);return true;

}

}finally{

lock.unlock();

}

}/*** Inserts element at current put position, advances, and signals.

* Call only when holding lock.*/

private voidenqueue(E x) {//assert lock.getHoldCount() == 1;//assert items[putIndex] == null;

final Object[] items = this.items;

items[putIndex]=x;if (++putIndex ==items.length)

putIndex= 0;//同样,它还是通过一个 count 的计数器完成统计工作

count++;

notEmpty.signal();

}//移除动作时,也需要维护 count 的值

/*** Deletes item at array index removeIndex.

* Utility for remove(Object) and iterator.remove.

* Call only when holding lock.*/

void removeAt(final intremoveIndex) {//assert lock.getHoldCount() == 1;//assert items[removeIndex] != null;//assert removeIndex >= 0 && removeIndex < items.length;

final Object[] items = this.items;if (removeIndex ==takeIndex) {//removing front item; just advance

items[takeIndex] = null;if (++takeIndex ==items.length)

takeIndex= 0;//移除成功, 将计数器减1

count--;if (itrs != null)

itrs.elementDequeued();

}else{//an "interior" remove//slide over all others up through putIndex.//通过轮询的方式, 必然有一个元素被删除

final int putIndex = this.putIndex;for (int i =removeIndex;;) {int next = i + 1;if (next ==items.length)

next= 0;if (next !=putIndex) {

items[i]=items[next];

i=next;

}else{

items[i]= null;this.putIndex =i;break;

}//计数器相减

count--;if (itrs != null)

itrs.removedAt(removeIndex);

}

notFull.signal();

}

同样是维护了一个计数器,但是因为有上锁机制的保证,整个过程看起来就简单了许多。在获取元素大小时,自然也就简单了.

//ArrayBlockingQueue.size()

/*** Returns the number of elements in this queue.

*

*@returnthe number of elements in this queue*/

public intsize() {final ReentrantLock lock = this.lock;

lock.lock();try{returncount;

}finally{

lock.unlock();

}

}

但是它为了保证结果的准确性,在计数时,同样进行了上锁操作。可见,并发容器的实现思路也基本一致.并无太多奇淫技巧. 咱们再来看一下并发容器的实现: CopyOnWriteArrayList

5. 并发容器 --CopyOnWriteArrayList

顾名思义,是在写操作的时候,使用复制方式进行实现。

//CopyOnWriteArrayList.add()

/*** Appends the specified element to the end of this list.

*

*@parame element to be appended to this list

*@return{@codetrue} (as specified by {@linkCollection#add})*/

public booleanadd(E e) {final ReentrantLock lock = this.lock;//同样上锁保证线程安全

lock.lock();try{

Object[] elements=getArray();int len =elements.length;//将元素copy出来, 但其并非维护一个len字段

Object[] newElements = Arrays.copyOf(elements, len + 1);

newElements[len]=e;

setArray(newElements);return true;

}finally{

lock.unlock();

}

}//CopyOnWriteArrayList, 删除一个字段, 同其名称一样, 还是使用写时复制实现

public E remove(intindex) {final ReentrantLock lock = this.lock;

lock.lock();try{

Object[] elements=getArray();int len =elements.length;

E oldValue=get(elements, index);int numMoved = len - index - 1;if (numMoved == 0)

setArray(Arrays.copyOf(elements, len- 1));else{//找到移除的字段位置, 依次复制其前后元素到新数组中,完成功能

Object[] newElements = new Object[len - 1];

System.arraycopy(elements,0, newElements, 0, index);

System.arraycopy(elements, index+ 1, newElements, index,

numMoved);

setArray(newElements);

}returnoldValue;

}finally{

lock.unlock();

}

}//CopyOnWriteArrayList.size(), 直接使用数组长度字段

/*** Returns the number of elements in this list.

*

*@returnthe number of elements in this list*/

public intsize() {//获取元素大小时,直接获取所有元素,取数组的长度即可. 借用jvm提供的数组长度元信息实现

returngetArray().length;

}/*** Gets the array. Non-private so as to also be accessible

* from CopyOnWriteArraySet class.*/

finalObject[] getArray() {//该array字段一定是要保证可见性的, 即至少得是 volatile 修饰的数据

returnarray;

}

CopyOnWriteArrayList, 因为其语义决定,其在一定程度上是线程安全的,所以,在读操作时,就不需要上锁,从而性能在某些场景会比较好。

根据功能特性的不同, CopyOnWriteArrayList 采用了一个不同实现方式, 实现了元素的统计功能. 另外像 SynchronousQueue#size, 则永久返回0, 因为它的定义是当被放一个元素后,必须等到有线程消费之后才可返回,而其本身并不存储元素. 所以, 虽然元素计数道理比较简单通用, 但是还是要按照具体的场景进行相应的实现, 才能满足具体的需求. 即不可脱离场景谈技术.

6. 更多计数

类似数据库类的产品,同样的这样的计数刚性需求,各自实现方式也有不同,但大体思路也差不多。比如 redis 的计数使用在计数时临时遍历元素实现,mysql myisam 引擎使用一个表级的计数器等等。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐