java 中counter什么意思_java容器中的几种计数方法浅谈
本文讨论java集合容器中的几种元素数量获取的方式,命题很小,但是也足以让我们思考一些东西。所谓计数:即是给出所在容器的元素总数的方式。一般能想到的就是两种方式:一是使用某个字段直接存储该计数值,二是在请求计数值时临时去计算所有元素数量。貌似本文的答案已经出来了。好吧,那我们还是从源码的角度来验证下想法吧:一般在java的集合容器中,可以分为普通容器和并发容器,我们就姑且以这种方式来划分下,验证下
本文讨论java集合容器中的几种元素数量获取的方式,命题很小,但是也足以让我们思考一些东西。
所谓计数:即是给出所在容器的元素总数的方式。一般能想到的就是两种方式:一是使用某个字段直接存储该计数值,二是在请求计数值时临时去计算所有元素数量。貌似本文的答案已经出来了。好吧,那我们还是从源码的角度来验证下想法吧:
一般在java的集合容器中,可以分为普通容器和并发容器,我们就姑且以这种方式来划分下,验证下其实现计数的方式吧!
1. 普通容器 --HashMap
一般非并发容器在进行增删改时,都会同时维护一个count值,如hashmap中的实现:
//HashMap 增加和修改都在此实现
/*** Implements Map.put and related methods
*
*@paramhash hash for key
*@paramkey the key
*@paramvalue the value to put
*@paramonlyIfAbsent if true, don't change existing value
*@paramevict if false, the table is in creation mode.
*@returnprevious value, or null if none*/
final V putVal(int hash, K key, V value, booleanonlyIfAbsent,booleanevict) {
Node[] tab; Node p; intn, i;if ((tab = table) == null || (n = tab.length) == 0)
n= (tab =resize()).length;if ((p = tab[i = (n - 1) & hash]) == null)
tab[i]= newNode(hash, key, value, null);else{
Nodee; K k;if (p.hash == hash &&((k= p.key) == key || (key != null &&key.equals(k))))
e=p;else if (p instanceofTreeNode)
e= ((TreeNode)p).putTreeVal(this, tab, hash, key, value);else{for (int binCount = 0; ; ++binCount) {if ((e = p.next) == null) {
p.next= newNode(hash, key, value, null);if (binCount >= TREEIFY_THRESHOLD - 1) //-1 for 1st
treeifyBin(tab, hash);break;
}if (e.hash == hash &&((k= e.key) == key || (key != null &&key.equals(k))))break;
p=e;
}
}if (e != null) { //existing mapping for key
V oldValue =e.value;if (!onlyIfAbsent || oldValue == null)
e.value=value;
afterNodeAccess(e);returnoldValue;
}
}++modCount;//直接对size进行增加1即可, 如果是更新key的值,则不会运行到此处,即不会进行相加
if (++size >threshold)
resize();
afterNodeInsertion(evict);return null;
}//删除元素的实现,同时维护 size 大小
/*** Implements Map.remove and related methods
*
*@paramhash hash for key
*@paramkey the key
*@paramvalue the value to match if matchValue, else ignored
*@parammatchValue if true only remove if value is equal
*@parammovable if false do not move other nodes while removing
*@returnthe node, or null if none*/
final Node removeNode(inthash, Object key, Object value,boolean matchValue, booleanmovable) {
Node[] tab; Node p; intn, index;if ((tab = table) != null && (n = tab.length) > 0 &&(p= tab[index = (n - 1) & hash]) != null) {
Node node = null, e; K k; V v;//先查找node所在的位置
if (p.hash == hash &&((k= p.key) == key || (key != null &&key.equals(k))))
node=p;else if ((e = p.next) != null) {if (p instanceofTreeNode)
node= ((TreeNode)p).getTreeNode(hash, key);else{do{if (e.hash == hash &&((k= e.key) == key ||(key!= null &&key.equals(k)))) {
node=e;break;
}
p=e;
}while ((e = e.next) != null);
}
}if (node != null && (!matchValue || (v = node.value) == value ||(value!= null &&value.equals(v)))) {if (node instanceofTreeNode)
((TreeNode)node).removeTreeNode(this, tab, movable);else if (node ==p)
tab[index]=node.next;elsep.next=node.next;++modCount;//直接减小size即可
--size;
afterNodeRemoval(node);returnnode;
}
}return null;
}
因为有了增删改时对计数器的维护,所以在想要获取总数时,就容易许多了。只需把size字段返回即可。
//HashMap.size()
/*** Returns the number of key-value mappings in this map.
*
*@returnthe number of key-value mappings in this map*/
public intsize() {returnsize;
}
所以,在这种情况下,获取计数值的方式非常简单。但是不管怎么样,size字段对外部是不可见的,因为它是容器内部的一个实现逻辑,它完全在将来的某个时刻改变掉。即 size() != size .
2. 普通容器 --LinkedList
看完hash类的计数实现,咱们再来看另外一个类型的容器LinkedList:
//LinkedList.add(E) 添加一个元素
public booleanadd(E e) {
linkLast(e);return true;
}/*** Links e as last element.*/
voidlinkLast(E e) {final Node l =last;final Node newNode = new Node<>(l, e, null);
last=newNode;if (l == null)
first=newNode;elsel.next=newNode;//同样,直接使用一个 size 计数器统计即可
size++;
modCount++;
}//删除一个元素, 同时维护 size 字段
publicE removeFirst() {final Node f =first;if (f == null)throw newNoSuchElementException();returnunlinkFirst(f);
}/*** Unlinks non-null first node f.*/
private E unlinkFirst(Nodef) {//assert f == first && f != null;
final E element =f.item;final Node next =f.next;
f.item= null;
f.next= null; //help GC
first =next;if (next == null)
last= null;elsenext.prev= null;//元素计数减1
size--;
modCount++;returnelement;
}//同样,统计元素数量时,直接返回size即可
public intsize() {returnsize;
}
可见,LinkedList 也同样是简单地维护一个计数器字段,从而实现了高效地计数方法。而这简单地实现,则是基于单线程的访问的,它同时维护一个计数字段,基本没有多少开销,却给取值时带来了便利。
总结: 普通容器直接维护一个计数器字段,可以很方便地进行大小统计操作。
3. 并发容器 --ConcurrentHashMap
而对于并发容器,则可能会不一样些,但也有一些情况是一样的。比较,HashTable, 直接使用 synchronized 来保证线程安全,则它也同样可以直接使用一个size即可完成元素大小的统计。事实上,有些版本的HashTable仅仅是在HashMap的上面加上了synchronizd锁而已(有些版本则是 不一样的哦),细节咱们无需再看。
而稍微有点不一样的如: ConcurrentHashMap.size(), 早期的 ConcurrentHashMap 使用分段锁,则需要统计各segement的元素,相加起来然后得到整体元素大小. 而jdk1.8中,已经放弃使用分段锁来实现高性能安全的hash容器了,而是直接使用 synchronized + CAS + 红黑树 实现. 那么,我们来看看其实现元素统计这一功能的实现有何不同吧!
//ConcurrentHashMap.putVal() 新增或修改一个元素
/**Implementation for put and putIfAbsent*/
final V putVal(K key, V value, booleanonlyIfAbsent) {if (key == null || value == null) throw newNullPointerException();int hash =spread(key.hashCode());int binCount = 0;for (Node[] tab =table;;) {
Node f; intn, i, fh;if (tab == null || (n = tab.length) == 0)
tab=initTable();else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node(hash, key, value, null)))break; //no lock when adding to empty bin
}else if ((fh = f.hash) ==MOVED)
tab=helpTransfer(tab, f);else{
V oldVal= null;synchronized(f) {if (tabAt(tab, i) ==f) {if (fh >= 0) {
binCount= 1;for (Node e = f;; ++binCount) {
K ek;if (e.hash == hash &&((ek= e.key) == key ||(ek!= null &&key.equals(ek)))) {
oldVal=e.val;if (!onlyIfAbsent)
e.val=value;break;
}
Node pred =e;if ((e = e.next) == null) {
pred.next= new Node(hash, key,
value,null);break;
}
}
}else if (f instanceofTreeBin) {
Nodep;
binCount= 2;if ((p = ((TreeBin)f).putTreeVal(hash, key,
value))!= null) {
oldVal=p.val;if (!onlyIfAbsent)
p.val=value;
}
}
}
}if (binCount != 0) {if (binCount >=TREEIFY_THRESHOLD)
treeifyBin(tab, i);if (oldVal != null)returnoldVal;break;
}
}
}//主要是在进行新增成功时,再进行计数器的操作, 看起来不是 ++size 这么简单了
addCount(1L, binCount);return null;
}//这个计数的相加看起来相当复杂
/*** Adds to count, and if table is too small and not already
* resizing, initiates transfer. If already resizing, helps
* perform transfer if work is available. Rechecks occupancy
* after a transfer to see if another resize is already needed
* because resizings are lagging additions.
*
*@paramx the count to add
*@paramcheck if <0, don't check resize, if <= 1 only check if uncontended*/
private final void addCount(long x, intcheck) {
CounterCell[] as;longb, s;//使用 CounterCell 来实现计数操作//使用 CAS 保证更新计数时只会有一个线程成功
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b +x)) {
CounterCell a;long v; intm;boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||
//使用一个类似随机负载均衡的方式,将计数值随机添加到 CounterCell 的某个值下面,减少多线程竞争的可能性
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
//通过cas将计数值x添加到 CounterCell 的 value 字段中
!(uncontended =U.compareAndSwapLong(a, CELLVALUE, v= a.value, v +x))) {//如果上面添加失败,则使用 fullAddCount 进行重新添加该计数
fullAddCount(x, uncontended);return;
}if (check <= 1)return;//基于 CounterCell 做一此汇总操作
s =sumCount();
}//在进行put值时, check的值都是大于等于0的
if (check >= 0) {
Node[] tab, nt; intn, sc;//rehash 处理
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n= tab.length) >> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc== rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex<= 0)break;if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs<< RESIZE_STAMP_SHIFT) + 2))//辅助进行hash扩容
transfer(tab, null);
s=sumCount();
}
}
}//fullAddCount 比较复杂, 它的目的是为了保证多线程可以快速进行添加完成, 目标很简单, 即向数组 CounterCell 中添加一个值 x//See LongAdder version for explanation
private final void fullAddCount(long x, booleanwasUncontended) {inth;if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();//force initialization
h =ThreadLocalRandom.getProbe();
wasUncontended= true;
}boolean collide = false; //True if last slot nonempty
for(;;) {
CounterCell[] as; CounterCell a;int n; longv;if ((as = counterCells) != null && (n = as.length) > 0) {if ((a = as[(n - 1) & h]) == null) {if (cellsBusy == 0) { //Try to attach new Cell
CounterCell r = new CounterCell(x); //Optimistic create
if (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean created = false;try { //Recheck under lock
CounterCell[] rs; intm, j;if ((rs = counterCells) != null &&(m= rs.length) > 0 &&rs[j= (m - 1) & h] == null) {
rs[j]=r;
created= true;
}
}finally{
cellsBusy= 0;
}if(created)break;continue; //Slot is now non-empty
}
}
collide= false;
}else if (!wasUncontended) //CAS already known to fail
wasUncontended = true; //Continue after rehash
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v +x))break;else if (counterCells != as || n >=NCPU)
collide= false; //At max size or stale
else if (!collide)
collide= true;else if (cellsBusy == 0 &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {try{if (counterCells == as) {//Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];for (int i = 0; i < n; ++i)
rs[i]=as[i];
counterCells=rs;
}
}finally{
cellsBusy= 0;
}
collide= false;continue; //Retry with expanded table
}
h=ThreadLocalRandom.advanceProbe(h);
}else if (cellsBusy == 0 && counterCells == as &&U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {boolean init = false;try { //Initialize table
if (counterCells ==as) {
CounterCell[] rs= new CounterCell[2];
rs[h& 1] = newCounterCell(x);
counterCells=rs;
init= true;
}
}finally{
cellsBusy= 0;
}if(init)break;
}else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v +x))break; //Fall back on using base
}
}//ConcurrentHashMap.remove 删除元素
/*** Implementation for the four public remove/replace methods:
* Replaces node value with v, conditional upon match of cv if
* non-null. If resulting value is null, delete.*/
finalV replaceNode(Object key, V value, Object cv) {int hash =spread(key.hashCode());for (Node[] tab =table;;) {
Node f; intn, i, fh;if (tab == null || (n = tab.length) == 0 ||(f= tabAt(tab, i = (n - 1) & hash)) == null)break;else if ((fh = f.hash) ==MOVED)
tab=helpTransfer(tab, f);else{
V oldVal= null;boolean validated = false;synchronized(f) {if (tabAt(tab, i) ==f) {if (fh >= 0) {
validated= true;for (Node e = f, pred = null;;) {
K ek;if (e.hash == hash &&((ek= e.key) == key ||(ek!= null &&key.equals(ek)))) {
V ev=e.val;if (cv == null || cv == ev ||(ev!= null &&cv.equals(ev))) {
oldVal=ev;if (value != null)
e.val=value;//删除元素
else if (pred != null)
pred.next=e.next;elsesetTabAt(tab, i, e.next);
}break;
}
pred=e;if ((e = e.next) == null)break;
}
}else if (f instanceofTreeBin) {
validated= true;
TreeBin t = (TreeBin)f;
TreeNoder, p;if ((r = t.root) != null &&(p= r.findTreeNode(hash, key, null)) != null) {
V pv=p.val;if (cv == null || cv == pv ||(pv!= null &&cv.equals(pv))) {
oldVal=pv;if (value != null)
p.val=value;//删除元素
else if(t.removeTreeNode(p))
setTabAt(tab, i, untreeify(t.first));
}
}
}
}
}if(validated) {if (oldVal != null) {//value = null, 代表需要将元素删除,所以需要对计数器做减1操作
if (value == null)
addCount(-1L, -1);returnoldVal;
}break;
}
}
}return null;
}
同样是由于在增删时,维护一个计数器(CounterCell数组), 所以对于返回计数值操作则会比较简单化:
//ConcurrentHashMap.size()
public intsize() {long n =sumCount();return ((n < 0L) ? 0:
(n> (long)Integer.MAX_VALUE) ?Integer.MAX_VALUE :
(int)n);
}//直接将 CounterCell 中的值相加起来即可
final longsumCount() {
CounterCell[] as=counterCells; CounterCell a;long sum =baseCount;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)
sum+=a.value;
}
}returnsum;
}
虽然ConcurrentHash的元素本身没有使用分段式存储了,但是其计数值还是存在了多个 CounterCell 中,目的自然是为了减少多线程竞争对计数器的更新成性能瓶颈。在进行 size() 计数时,并未有上锁操作,整个 CounterCell 使用 volatile 修饰,保证其可见性,但是整个size 却是不保证绝对准确的哦。
4. 并发容器 --ArrayBlockingQueue
下面我们再来看看另一各类型的并发容器: ArrayBlockingQueue
//ArrayBlockingQueue.offer()
/*** Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@codetrue} upon success and {@codefalse} if this queue
* is full. This method is generally preferable to method {@link#add},
* which can fail to insert an element only by throwing an exception.
*
*@throwsNullPointerException if the specified element is null*/
public booleanoffer(E e) {
checkNotNull(e);final ReentrantLock lock = this.lock;//直接上锁操作
lock.lock();try{if (count ==items.length)return false;else{//进行入队操作
enqueue(e);return true;
}
}finally{
lock.unlock();
}
}/*** Inserts element at current put position, advances, and signals.
* Call only when holding lock.*/
private voidenqueue(E x) {//assert lock.getHoldCount() == 1;//assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex]=x;if (++putIndex ==items.length)
putIndex= 0;//同样,它还是通过一个 count 的计数器完成统计工作
count++;
notEmpty.signal();
}//移除动作时,也需要维护 count 的值
/*** Deletes item at array index removeIndex.
* Utility for remove(Object) and iterator.remove.
* Call only when holding lock.*/
void removeAt(final intremoveIndex) {//assert lock.getHoldCount() == 1;//assert items[removeIndex] != null;//assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;if (removeIndex ==takeIndex) {//removing front item; just advance
items[takeIndex] = null;if (++takeIndex ==items.length)
takeIndex= 0;//移除成功, 将计数器减1
count--;if (itrs != null)
itrs.elementDequeued();
}else{//an "interior" remove//slide over all others up through putIndex.//通过轮询的方式, 必然有一个元素被删除
final int putIndex = this.putIndex;for (int i =removeIndex;;) {int next = i + 1;if (next ==items.length)
next= 0;if (next !=putIndex) {
items[i]=items[next];
i=next;
}else{
items[i]= null;this.putIndex =i;break;
}//计数器相减
count--;if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}
同样是维护了一个计数器,但是因为有上锁机制的保证,整个过程看起来就简单了许多。在获取元素大小时,自然也就简单了.
//ArrayBlockingQueue.size()
/*** Returns the number of elements in this queue.
*
*@returnthe number of elements in this queue*/
public intsize() {final ReentrantLock lock = this.lock;
lock.lock();try{returncount;
}finally{
lock.unlock();
}
}
但是它为了保证结果的准确性,在计数时,同样进行了上锁操作。可见,并发容器的实现思路也基本一致.并无太多奇淫技巧. 咱们再来看一下并发容器的实现: CopyOnWriteArrayList
5. 并发容器 --CopyOnWriteArrayList
顾名思义,是在写操作的时候,使用复制方式进行实现。
//CopyOnWriteArrayList.add()
/*** Appends the specified element to the end of this list.
*
*@parame element to be appended to this list
*@return{@codetrue} (as specified by {@linkCollection#add})*/
public booleanadd(E e) {final ReentrantLock lock = this.lock;//同样上锁保证线程安全
lock.lock();try{
Object[] elements=getArray();int len =elements.length;//将元素copy出来, 但其并非维护一个len字段
Object[] newElements = Arrays.copyOf(elements, len + 1);
newElements[len]=e;
setArray(newElements);return true;
}finally{
lock.unlock();
}
}//CopyOnWriteArrayList, 删除一个字段, 同其名称一样, 还是使用写时复制实现
public E remove(intindex) {final ReentrantLock lock = this.lock;
lock.lock();try{
Object[] elements=getArray();int len =elements.length;
E oldValue=get(elements, index);int numMoved = len - index - 1;if (numMoved == 0)
setArray(Arrays.copyOf(elements, len- 1));else{//找到移除的字段位置, 依次复制其前后元素到新数组中,完成功能
Object[] newElements = new Object[len - 1];
System.arraycopy(elements,0, newElements, 0, index);
System.arraycopy(elements, index+ 1, newElements, index,
numMoved);
setArray(newElements);
}returnoldValue;
}finally{
lock.unlock();
}
}//CopyOnWriteArrayList.size(), 直接使用数组长度字段
/*** Returns the number of elements in this list.
*
*@returnthe number of elements in this list*/
public intsize() {//获取元素大小时,直接获取所有元素,取数组的长度即可. 借用jvm提供的数组长度元信息实现
returngetArray().length;
}/*** Gets the array. Non-private so as to also be accessible
* from CopyOnWriteArraySet class.*/
finalObject[] getArray() {//该array字段一定是要保证可见性的, 即至少得是 volatile 修饰的数据
returnarray;
}
CopyOnWriteArrayList, 因为其语义决定,其在一定程度上是线程安全的,所以,在读操作时,就不需要上锁,从而性能在某些场景会比较好。
根据功能特性的不同, CopyOnWriteArrayList 采用了一个不同实现方式, 实现了元素的统计功能. 另外像 SynchronousQueue#size, 则永久返回0, 因为它的定义是当被放一个元素后,必须等到有线程消费之后才可返回,而其本身并不存储元素. 所以, 虽然元素计数道理比较简单通用, 但是还是要按照具体的场景进行相应的实现, 才能满足具体的需求. 即不可脱离场景谈技术.
6. 更多计数
类似数据库类的产品,同样的这样的计数刚性需求,各自实现方式也有不同,但大体思路也差不多。比如 redis 的计数使用在计数时临时遍历元素实现,mysql myisam 引擎使用一个表级的计数器等等。
更多推荐
所有评论(0)