Java容器类源码分析-Map篇
基于JDK8的src.zip
对常用的容器类源码进行简单剖析
HashMap
HashMap采用了一个Node
类型(实现了Map.Entry
接口)的数组作为存放哈希表的基础数据结构,利用位运算(除留余数法的简化版,见#第三节)计算哈希地址,并且使用拉链法处理哈希冲突问题
存储结构
-
不会被序列化的哈希表数组
transient Node<K,V>[] table;
-
数组的初始化大小为16,且必须为2的幂,这是为了使用哈希函数时提高效率
/* The default initial capacity - MUST be a power of two. */ static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16
-
Node是一种单向链表节点结构,同时存储哈希值、键值对和下一个具有同义哈希的节点
static class Node<K,V> implements Map.Entry<K,V> { final int hash; final K key; V value; Node<K,V> next; Node(int hash, K key, V value, Node<K,V> next) { this.hash = hash; this.key = key; this.value = value; this.next = next; } public final K getKey() { return key; } public final V getValue() { return value; } public final String toString() { return key + "=" + value; } public final int hashCode() { return Objects.hashCode(key) ^ Objects.hashCode(value); } public final V setValue(V newValue) { V oldValue = value; value = newValue; return oldValue; } public final boolean equals(Object o) { if (o == this) return true; return o instanceof Map.Entry<?, ?> e && Objects.equals(key, e.getKey()) && Objects.equals(value, e.getValue()); } }
键值对的存储流程
当调用HashMap::put
时,会使用内建的HashMap::putVal
函数
public V put(K key, V value) { return putVal(hash(key), key, value, false, true); }
final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
boolean evict) {
Node<K,V>[] tab; Node<K,V> p; int n, i;
if ((tab = table) == null || (n = tab.length) == 0)
n = (tab = resize()).length;
if ((p = tab[i = (n - 1) & hash]) == null)
tab[i] = newNode(hash, key, value, null);
else {
Node<K,V> e; K k;
if (p.hash == hash &&
((k = p.key) == key || (key != null && key.equals(k))))
e = p;
else if (p instanceof TreeNode)
e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
else {
for (int binCount = 0; ; ++binCount) {
if ((e = p.next) == null) {
p.next = newNode(hash, key, value, null);
if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
treeifyBin(tab, hash);
break;
}
if (e.hash == hash &&
((k = e.key) == key || (key != null && key.equals(k))))
break;
p = e;
}
}
if (e != null) { // existing mapping for key
V oldValue = e.value;
if (!onlyIfAbsent || oldValue == null)
e.value = value;
afterNodeAccess(e);
return oldValue;
}
}
++modCount;
if (++size > threshold)
resize();
afterNodeInsertion(evict);
return null;
}
首先计算哈希地址
存储键值对时,首先会使用hash()
函数计算键的哈希值
static final int hash(Object key) {
int h;
return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}
HashMap在存入键为null的K-V对时,会将该K-V对存放在哈希表中索引为0的桶里,否则会将键的hashCode右移16位后再和其本身做异或
在hash()
函数的JavaDoc里可以看到这么做是为了减少哈希冲突,需要结合下面的散列操作一起理解
if ((p = tab[i = (n - 1) & hash]) == null) { ... }
一般的哈希函数为除留余数法,除数为哈希表长度。假设当前哈希数组的容量为n
,它被规定为2的幂次,即 $n=2^p$ (注意n在二进制下一共有p+1
位)
取余操作如下: 让hash
去除 $n$,在二进制下商就是hash
右移 $p$ 位,被右移出去的这 $p$ 个数位组成的就是余数
因为除法操作被编译后其计算速度远不如位运算,为了提高效率HashMap用位运算代替取余操作: n-1
在二进制下是一串连续的1,可以看作是二进制掩码,将n-1
直接与hash
相与,在二进制下相当于提取了hash
的低 $p$ 位作为余数
虽然位运算取余快捷,但是产生的哈希冲突会比较多,如果直接使用存入键的hashCode作为hash
去取余,由于采用掩码操作,其高位信息相当于都被丢弃了。比如,当哈希数组初始容量为n=16
,n-1
即 $(1111)_b$ 只会保留hash
的低4位信息
hashCode为32位int类型,可能出现多个键的hashcode低p
位完全相同而高位大相径庭的情况,但它们都被存在同一个索引的桶中,性能大幅降低
因此在对键初步取哈希值时,会将其hashCode的高位右移16位到int类型的低16位上,再和其本身进行异或得到最终的哈希值hash
,这样高位信息和低位信息获得了混合,就可以减少散列冲突、增大哈希表的散列程度
(h = key.hashCode()) ^ (h >>> 16)
接着处理哈希冲突
获得哈希地址即键值对要被存放进的桶下标后
-
如果当前桶为空,则新建1个Node节点放入桶中
-
如果不为空,则在桶中已有节点的单向链表上进行顺序查找,对每个节点的key执行判等操作:使用
==
判断两者地址是否相同或者调用equals
函数继续判断两个键是否相同if (p.hash == hash && ((k = p.key) == key || (key != null && key.equals(k))))
- 如果某个已有节点的key与要插入的newKey相同,则将已有节点的value更新为newValue
- 否则采用尾插法,新建一个节点追加到该桶中链表的尾部
从JDK8开始,插入桶中链表的操作从JDK7的头插法改成了尾插法,主要是为了防止对HashMap并发插入时,如果触发了数组扩容,扩容时需要重新分配所有元素的哈希地址,可能会出现同一桶中链表里的两个节点互相指向对方导致死循环
键值对的查找流程
- 计算查找键的哈希地址,时间复杂度为$O(1)$
- 在对应桶中的单向链表上执行顺序查找,时间复杂度为$O(N)$
假设表长为$M$,键值对总数size
=$N$,在哈希表比较均匀的情况下,每个桶里链表的平均长度大约是$\frac{N}{M}$,查找的平均复杂度为$O(\frac{N}{M})$
扩容原理
HashMap的内置构造函数为
public HashMap(int initialCapacity, float loadFactor) {
if (initialCapacity < 0)
throw new IllegalArgumentException("Illegal initial capacity: " +
initialCapacity);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
if (loadFactor <= 0 || Float.isNaN(loadFactor))
throw new IllegalArgumentException("Illegal load factor: " +
loadFactor);
this.loadFactor = loadFactor;
this.threshold = tableSizeFor(initialCapacity);
}
其他构造函数包括无参和单参的,其中loadFactor
默认为0.75f
,初始容量默认为16
。HashMap允许传入一个不为2的幂次的初始容量,它会调用tableSizeFor()
函数转换到距离最近的2的幂
参数 | 含义 |
---|---|
capacity | 数组容量,为2的幂次,默认为16 |
size | 实际存储的键值对数量,也就是对外暴露的Map::size |
threshold | size到达该阈值时就会触发数组扩容,构造HashMap时会通过tableSizeFor() 计算得到,每次执行resize() 时以newThr=(int)newCap * loadFactor 进行更新 |
loadFactor | 装载因子,即哈希表能够装载的元素比例,默认为0.75f |
计算扩容后新数组的容量与阈值
由哈希表的查找流程可知,为了使$O(N/M)$尽可能小,需要哈希表的容量$M$尽可能得大。所以HashMap为了兼顾时间与空间上的效率,采用动态扩容的方式,每当存储的键值对数量size
超过阈值threshold
后便执行resize()
函数
// resize()函数里,新哈希数组的容量及其阈值的计算部分如下
final Node<K,V>[] resize() {
Node<K,V>[] oldTab = table;
int oldCap = (oldTab == null) ? 0 : oldTab.length;
int oldThr = threshold;
int newCap, newThr = 0;
if (oldCap > 0) {
if (oldCap >= MAXIMUM_CAPACITY) {
threshold = Integer.MAX_VALUE;
return oldTab;
}
else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
oldCap >= DEFAULT_INITIAL_CAPACITY)
newThr = oldThr << 1; // double threshold
}
else if (oldThr > 0) // initial capacity was placed in threshold
newCap = oldThr;
else { // zero initial threshold signifies using defaults
newCap = DEFAULT_INITIAL_CAPACITY;
newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
}
if (newThr == 0) {
float ft = (float)newCap * loadFactor;
newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
(int)ft : Integer.MAX_VALUE);
}
threshold = newThr;
...
}
如果哈希数组的旧容量oldCap
大于等于默认初始容量16,则每次扩容都是将原数组扩大到2倍,相应的,新阈值newThr
也更新为旧阈值oldThr
的2倍。在不超出最大容量上限MaximumCapacity
的其他情况下,新阈值也可以通过(int)newCap * loadFactor
得到
对旧数组中每个桶中的节点再哈希
扩容后的数组容量和阈值计算完毕后,新建1个新的数组,并遍历旧数组的每个桶,对里面存在的节点执行rehash操作:
// resize()函数里,rehash再哈希的处理部分如下
final Node<K,V>[] resize() {
...
@SuppressWarnings({"rawtypes","unchecked"})
Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
table = newTab;
if (oldTab != null) {
for (int j = 0; j < oldCap; ++j) {
Node<K,V> e;
if ((e = oldTab[j]) != null) {
oldTab[j] = null;
if (e.next == null)
newTab[e.hash & (newCap - 1)] = e;
else if (e instanceof TreeNode)
((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
else { // preserve order
Node<K,V> loHead = null, loTail = null;
Node<K,V> hiHead = null, hiTail = null;
Node<K,V> next;
do {
next = e.next;
if ((e.hash & oldCap) == 0) {
if (loTail == null)
loHead = e;
else
loTail.next = e;
loTail = e;
}
else {
if (hiTail == null)
hiHead = e;
else
hiTail.next = e;
hiTail = e;
}
} while ((e = next) != null);
if (loTail != null) {
loTail.next = null;
newTab[j] = loHead;
}
if (hiTail != null) {
hiTail.next = null;
newTab[j + oldCap] = hiHead;
}
}
}
}
}
return newTab;
}
-
如果当前桶里的节点唯一,将节点存入对应的新桶中,地址为
hash & (newCap - 1)
-
如果当前桶里的链表存在,无需每次都使用新容量去计算散列地址,因为
newCap
=$2^{p+1}$刚好为oldCap
=$2^{p}$的2倍,且它们都是2的幂,则newCap - 1
刚好比oldCap - 1
多出一个第p
位(最低位从0开始)。对每个节点判断其键hash在二进制表示中第p
位上的值,注意到oldCap
的二进制表示下只有第p
位为1,其他数位都是0,因此直接使用位运算e.hash & oldCap
就能够提取第p
位上的数值:-
如果为0,
hash & (newCap - 1) == hash & (oldCap - 1)
,散列地址不变,插入临时低位链表中(loHead
后面) -
如果为1,
hash & (newCap - 1) == hash & (oldCap - 1) + pow(2, p)
,而$2^{p}$的大小就是oldCap
的值,因此其散列地址就是原散列地址j
加上oldCap
,插入临时高位链表中(hiHead
后面) -
最后将低位链表放在散列地址不变的桶中
newTab[j] = loHead
, 高位链表放在原散列地址加上原数组容量的新桶中newTab[j + oldCap] = hiHead
再散列的过程中都是采用尾插的方式将旧节点插入到新链表的尾部,这样可以保证节点原来的有序性不被破坏
-
桶中链表转换红黑树
HashMap源码中还定义了如下变量
参数 | 默认值 | 含义 |
---|---|---|
TREEIFY_THRESHOLD | 8 | 桶中节点数超过该阈值后便将链表转换为红黑树(调用treeifyBin() ) |
UNTREEIFY_THRESHOLD | 6 | 扩容分桶时使用,原桶里的红黑树结构因为分桶导致节点数减小,小于该阈值后就将红黑树转换为链表(调用untreeify() ) |
MIN_TREEIFY_CAPACITY | 64 | treeifyBin() 中的关键阈值,只有当数组容量n 超过该阈值后后才会执行转换操作,否则只调用resize() |
默认的TREEIFY_THRESHOLD
也说明里自JDK8之后,虽然处理哈希冲突时采用时间复杂度较高的尾插法,但是只要桶中节点数超过8
且数组总容量超过64
后就会将桶中链表转换为红黑树,所以尾插法对性能的影响也不算大
但是红黑树建树还是比较消耗性能的
HashSet
你可能会有疑问,HashSet不是实现了Set
接口吗,而Set
接口继承了Collection
接口,明明跟Map
接口下的容器类八竿子打不着啊?
其实HashSet内部数据结构使用的就是HashMap,只是它暴露给外界的方法等都是Collection
和Set
等提供的公用方法
private transient HashMap<E,Object> map;
// Dummy value to associate with an Object in the backing Map
private static final Object PRESENT = new Object();
HashSet的键值对,键是放入的对象,而值是一个Dummy Value: 名为Present
的Object对象
add()
、contains()
、remove()
等方法都是调用内部的HashMap的相关方法,在上面做了一层封装罢了,凡是需要值的地方都使用PRESENT
对象代替
ConcurrentHashMap
-
JDK7旧版
JDK7的ConcurrentHashMap采用的是分段锁机制,基础数据结构
Segment
继承自可重入锁ReentrantLock
,每个分段锁都维护着几个桶HashEntry
,不同线程可以同时访问不同分段锁上的桶,使并发度更高(并发度就是Segment
的个数),默认为16 -
JDK8新版
JDK8取消了分段锁机制,与HashMap一样都是采用Node数组保存数据,但是会利用桶中节点(链表头节点或者红黑树根节点)作为锁对每个桶里的数据加锁,进一步减少并发冲突、提高并发度
对于并发的处理,JDK8版本的ConcurrentMap引入了CAS机制(Compare And Swap, 比较并交换): 某线程要修改变量$x$的值,首先它会从内存中读取$x$的值设为a
(称作期待值),再次从内存中读取$x$的值设为b
,如果a == b
,则将$x$更新为目标值,否则不进行修改。CAS的本质是一种基于冲突检测的乐观锁策略: 先进行操作,如果没有其他线程争用共享数据,那么操作就成功了,否则采取补偿措施。CAS是非阻塞同步操作,不像synchronized
和ReentrantLock
这两个悲观互斥锁需要通过阻塞其他线程的运行以实现同步
以下都是基于JDK8的ConcurrentHashMap源码进行分析
存储结构
与HashMap一样都是使用Node<K, V>
作为基础结构,维护1个数组+链表/红黑树的拉链式哈希表结构,但是在值属性和指向下一个节点的引用属性上都使用了关键字volatile
作为修饰,目的是在访问节点时实现不用加锁的无锁读
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
...
}
get无锁读
JMM(Java Memory Model, Java内存模型)如下图所示,所有变量都存于主存区域中,每个线程都拥有自己的工作内存,其中包含主内存中共享变量的副本。线程进行的所有操作都是在其对应的工作内存中进行,不能直接操作主存也不能操作其他线程的工作内存
当变量$x$没有被volatile
关键字修饰时,线程操作变量的流程如下:
如果线程1要修改$x$的值,它会从主存区将$x$拷贝到自己的工作内存中,然后更新$x$再写回主存中;如果线程2要读取$x$的值,也是去主存区将$x$拷贝进自己的工作内存中再进行读取
这样就会导致可见性问题:比如线程1修改了$x$的值但还没有写回主存,同时线程2要对$x$进行操作,此时线程1所做的修改对线程2不可见。这种因工作内存与主内存存在同步延迟的情况导致了可见性问题
当变量$x$被volatile
关键字修饰时,相当于标记变量$x$“应当在主存区进行读写操作”,基于内存屏障(Memory Barrier)实现volatile
变量的内存可见性
对一个volatile
变量进行写操作时(赋值),通过jitwatch工具可以查看其编译后得到的汇编代码,在mov
指令后还有一条lock cmpxchg
的汇编指令,在多核处理器上执行该指令时会引发两个操作:
- 将当前核中缓存(Java工作内存)行对应的数据写回到系统内存(Java主存)中
- 写回系统内存时会使其他核里缓存相同内存地址的数据无效
CPU的不同核之间为了保证缓存一致,实现了缓存一致性协议(MESI),每个核会嗅探在总线上传播的信号以检查自己缓存的数据是否过期,当它发现某个缓存行对应的内存地址被修改,就会将其设置成无效状态,下次要对这个数据执行任何操作时就会去系统内存中重新将数据读取到缓存里,保证数据是最新的变量副本
volatile关键字保证了多线程下操作的可见性,这样在调用ConcurrentHashMap::get
方法时,可以不用加锁直接进行无锁读,提高并发效率
ConcurrentHashMap::get
源码如下
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
根据给定的键key计算Hash以定位对应桶的下标,利用ConcurrentHashMap::tabAt
封装的volatile读操作,获取对应桶的头节点,如果key不匹配,则继续通过Node的volatile属性next遍历桶里链表中的所有节点,直到key匹配再去读取volatile属性val
以上的3个操作读取的都是volatile修饰的变量,在不加锁的情况下保证了其对应value的内存可见性,读操作也因此线程安全
ConcurrentHashMap::tabAt
源码如下
// Unsafe mechanics
private static final Unsafe U = Unsafe.getUnsafe();
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);
}
/* In jdk.internal.misc.Unsafe */
@IntrinsicCandidate
public final Object getReferenceAcquire(Object o, long offset) {
return getReferenceVolatile(o, offset);
}
/**
* Fetches a reference value from a given Java variable, with volatile
* load semantics. Otherwise identical to {@link #getReference(Object, long)}
*/
@IntrinsicCandidate
public native Object getReferenceVolatile(Object o, long offset);
put流程
put的调用过程大致分为两部分:
-
对非空桶的头节点加上互斥锁
synchronized
,再在其对应的链表上寻找键匹配的节点,对值进行替换 -
对于空桶,采用基于乐观锁策略的CAS机制往桶里添加新的头节点
put
函数调用的还是内建的putVal
函数:
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
if (tab == null || (n = tab.length) == 0)
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
else if (onlyIfAbsent // check first node without acquiring lock
&& fh == hash
&& ((fk = f.key) == key || (fk != null && key.equals(fk)))
&& (fv = f.val) != null)
return fv;
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value);
break;
}
}
}
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
put非空桶
找到非空桶后,对其中的头节点加上互斥锁,这样其他线程就无法对该桶内的链表进行操作,后续的覆盖或者插入流程与HashMap一致
注意,仔细观察会发现源码中有以下两行:
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
这个意思是如果当前桶内节点hash值为MOVED
,说明当前节点实际上是个ForwardingNode
桥接节点,表示有其他线程正在对CHM进行扩容,所以会调用helpTransfer()
函数让当前线程也参与扩容,这样子并发扩容能够加速扩容(其实就是迁移旧哈希表中的桶到新哈希表中)所消耗的时间
详见下一节CHM扩容机制
put空桶
找到空桶后,调用casTabAt()
函数,其封装的是Unsafe类提供的native方法compareAndSetReference()
,它与volatile具有相同的内存读写语义
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
/* In jdk.internal.misc.Unsafe */
@IntrinsicCandidate
public final native boolean compareAndSetReference(Object o, long offset,
Object expected,
Object x);
compareAndSetReference()
函数会比较引用对象o在内存中的地址加上偏移offset
处的值是否等于期待值expected
,是的话则将该地址的值替换为给定对象x
的地址(相当于设置了指向对象x
的引用),并返回true
在并发环境下,Put空桶操作基于自旋锁+CAS实现:假设有两个线程都在进行put操作,它们插入的键值对相同,且桶内为空:
-
线程1执行
casTabAt(tab, i, null, node1)
后,期待值null
与主内存中tab[i]
的值相等,则将tab[i]里的内容替换为node1
对象所在的地址,并返回true。新的节点插入成功,此时返回true后执行到break
一行后结束for死循环for (Node<K, V>[] tab = table;;)
-
线程2在线程1插入键值对之前通过
tabAt
得知相同位置为空桶,但是线程2稍微落后执行casTabAt(tab, i, null, node2)
,由于期待值null
与此时tab[i]
在内存中的实际值不相同(现在tab[i]
处为node1
的引用地址),说明另有线程对tab[i]
执行了修改,继续执行for死循环,在第二次循环中执行前置的tabAt()
操作后发现tab[i]
这个桶不为空,执行put非空桶分支
扩容机制
ConcurrentHashMap支持并发扩容,且这个过程跟sizeCtl
变量息息相关,其源码中的文档如下所示:
/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
*/
private transient volatile int sizeCtl;
根据文档可以得知,sizeCtl
为负值时:
- $-1$ 表示当前ConcurrentHashMap对象正在初始化(执行构造函数)
- $-(1 + \textrm{正在扩容的线程数})$ 表示正在扩容,并发扩容的线程数可以通过反向计算 $-\textrm{sizeCtl} - 1$ 得到
当内部用来存放桶的table
为null
(说明此时对象还没初始化),sizeCtl
存储初始的哈希表大小以备构造时使用(为0的话表示采用默认值DEFAULT_CAPACITY
),以参数为另一个Map的构造函数为例:
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this.sizeCtl = DEFAULT_CAPACITY;
putAll(m);
}
初始化完成后,sizeCtl
储存扩容阈值,用以参与判断哈希表是否需要扩容
addCount()方法
在put流程的putVal()
函数返回前,会调用addCount()
方法,增加当前CHM里的元素总数,并判断当前哈希表是否需要被扩容?如果需要扩容:
-
当前没有线程在扩容,则初始化
transfer()
方法 -
当前有其他线程正在扩容,则本线程也帮助进行扩容,执行
helpTransfer()
方法
ConcurrentHashMap::addCount
源码如下
/**
* Adds to count, and if table is too small and not already
* resizing, initiates transfer. If already resizing, helps
* perform transfer if work is available. Rechecks occupancy
* after a transfer to see if another resize is already needed
* because resizings are lagging additions.
*
* @param x the count to add
* @param check if <0, don't check resize, if <= 1 only check if uncontended
*/
private final void addCount(long x, int check) {
CounterCell[] cs; long b, s;
if ((cs = counterCells) != null ||
!U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell c; long v; int m;
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
if (sc < 0) {
if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
(nt = nextTable) == null || transferIndex <= 0)
break;
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
transfer(tab, null);
s = sumCount();
}
}
}
putVal
函数调用addCount()
时,将put时插入后的桶中元素个数binCount
传入形参check
,控制本次addCount()
方法是否要检查需不需要扩容
方法体中,前半部分是利用CounterCell
计算当前哈希表中的总计元素个数,后半部分检查是否要执行扩容方法transfer()
,这两个部分有点复杂,所以下文分开梳理
元素计数器baseCount与CounterCell[]
addCount方法接收的第一个参数long x
表示本次添加的元素个数,尔后通过CAS操作更新baseCount
字段的值,如果本次CAS操作失败,说明有其他线程在竞争执行addCount()
方法,就会转而使用CounterCell[]
数组分摊并发情况下多个线程操作单一字段baseCount
的竞争压力
CounterCell[] cs; long b, s;
if ((cs = counterCells) != null ||
!U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell c; long v; int m;
boolean uncontended = true;
if (cs == null || (m = cs.length - 1) < 0 ||
(c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
上述代码中第3行使用Unsafe类提供的compareAndSetLong
这个CAS更新Long类型值的函数,类静态常量BASECOUNT
为字段baseCount
在CHM对象实例中的内存偏移量
CAS函数根据当前CHM对象的内存起始地址加上basecount
字段的实际偏移量BASECOUNT
,取出主内存中baseCount
的实际值与当前线程工作内存的baseCount
预期值进行比较,相符则说明没有其他线程在竞争,于是更新为新的元素个数
/**
* Base counter value, used mainly when there is no contention,
* but also as a fallback during table initialization
* races. Updated via CAS.
*/
private transient volatile long baseCount;
private static final long BASECOUNT
= U.objectFieldOffset(ConcurrentHashMap.class, "baseCount");
/**
* A padded cell for distributing counts. Adapted from LongAdder
* and Striped64. See their internal docs for explanation.
*/
private static final long CELLVALUE
= U.objectFieldOffset(CounterCell.class, "value");
@jdk.internal.vm.annotation.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
如果不相符,则本次CAS失败,设置uncontended
标志变量表示”没有竞争”这个状态,利用当前线程内置的ThreadLocalRandom
这个线程独占seed种子的随机数生成器快速产生一个随机数,尔后和当前CHM对象的counterCells[]
数组大小m=length-1
相与,得到要操作的下标i
,即CounterCell c = counterCells[i]
当前线程只要操作counterCells[i]
完成元素个数更新,最后再调用fullAddCount()
方法将counterCells[]
数组中每个CounterCell.value
累加,再加上旧baseCount
的值后更新元素总数
同时观察这部分代码可以发现,当出现线程竞争后,操作完counterCells[i]
后无视了check
扩容过程直接返回,这是因为,既然出现了线程竞争的高并发状态,说明其他线程也在执行fullAddCount()
的计数过程,如果当前线程计数完成后还继续检测是否需要扩容,就会将高并发压力延续到扩容流程里,导致压力持续增大,所以源码中选择直接return减少高并发,等到下一次某个线程在竞争不激烈的状态下完成元素个数更新,再检测是否需要扩容
[题外话] 为什么要使用Unsafe类计算偏移量呢? 难道这些字段不可以根据其类型对应的字节数按照字段出现顺序计算其起始偏移地址吗?
由于JVM对齐填充的存在(对应虚拟机选项-XX:ObjectAlignmentInBytes
,默认为8字节),它不仅作用于对象与对象之间,还存在于对象中的字段之间(以下均是针对64位JVM):
-
对象与对象之间: 任何对象的大小都是8的倍数,若不满足则进行对象填充以对齐
-
对象中的字段之间: 64位的long类型、double类型以及不开启压缩指针时的对象引用类型这三种字段的起始地址必须为8的倍数(若开启压缩指针,则对象引用类型的偏移量只需为4的倍数)
对齐填充是为了保证不出现跨CPU缓存行的字段,提高执行效率。而JVM为了做到这一点,就会使用字段重排列方法,重新分配对象中字段的先后顺序,以达到内存对齐的目的
所以如果仅按照字段出现顺序计算偏移量,会导致偏移量不准确,所以需要使用Unsafe::objectFieldOffset()
函数计算字段在同一类但是在不同JVM中的实际偏移量
另外,上面修饰CounterCell
类的注解@Contended
是用来解决对象字段之间的虚共享(False Sharing)问题:
同一对象中不同的
volatile
字段,逻辑上它们并没有共享内容,不需要同步,但是如果这两个字段恰好处于同一CPU缓存行中,那么对这些字段进行写操作就会导致缓存行写回主内存中,造成了实质物理上的共享
JVM会让被@Contended
注解的不同字段分配到不同的缓存行中以解决虚共享问题
检查是否需要扩容
addCount()
函数的第二部分执行的是扩容检查逻辑:
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
if (sc < 0) {
if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
(nt = nextTable) == null || transferIndex <= 0)
break;
if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
transfer(tab, null);
s = sumCount();
}
}
上述代码中,判断更新后的元素总个数s
是否大于当前sizeCtl
扩容阈值,如果大于阈值说明需要扩容,调用resizeStamp()
函数根据哈希表大小n = tab.length
生成一个唯一的扩容戳rs
:
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
private final int RESIZE_STAMP_BITS = 16;
首先利用Integer.numberOfLeadingZeros()
得到n
的32位Int类型二进制表示中的前导0个数,然后与二进制中第15位为1的扩容标记掩码相或,生成一个唯一的扩容戳
以n = 16
为例,调用resizeStamp(n)
有如下计算:
-
\[\textrm{leadingZeros} = [0000, 0000, 0000, 0000, 0000, 0000, 000\colorbox{yellow}{1}, \colorbox{yellow}{1}\colorbox{yellow}{1}00]\]Integer.numberOfLeadingZeros(n)
= $28$,一共有28个前导0,28的二进制表示如下: -
\[\textrm{mask} = [0000, 0000, 0000, 0000, \colorbox{red}{1}000, 0000, 0000, 0000]\]1 << (RESIZE_STAMP_BITS - 1)
: -
两者相或得到低16位的扩容戳标记:
\[\textrm{rs}_\textrm{low16bit} = [0000, 0000, 0000, 0000, \colorbox{red}{1}000, 0000, 000\colorbox{yellow}{1}, \colorbox{yellow}{1}\colorbox{yellow}{1}00]\]
最后,使用int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT
,将得到的低16位扩容戳标记左移16位,这样rs的高16位就是扩容标记:
那么低16位的作用呢?存储并行扩容时的线程数:
高 RESIZE_STAMP_BITS 位 | 低RESIZE_STAMP_BITS 位 |
---|---|
扩容标记 | 并行扩容线程个数 |
rs
的二进制表示中最高位为1,即rs是逻辑意义上的负数
然后判断sc = sizeCtl
是否小于0:
-
大于等于0:说明没有其他线程在扩容,当前线程是马上要执行扩容的第一个线程,利用CAS操作将
sizeCtl
的值更新为扩容戳rs
+ 2为什么是+2呢?因为CHM对象初始化时,
sizeCtl = -1
,即低16位的最低位为1,而各个函数中处理sizeCtl
都是以位运算方式进行的,最低位的1留作初始化标志位,所以当前线程作为第一个扩容线程其扩容戳的线程个数要从+2开始计算。这也应证了官方文档对sizeCtl
的解释,利用-sizeCtl - 1
即可获得当前正在并发扩容的线程总数 -
小于0:说明已经有其他线程正在执行扩容,利用CAS操作将
sizeCtl
的值+1,而sizeCtl
被第一个扩容线程更新为扩容戳,低16位储存的是正在并行扩容的线程个数,+1表示当前线程也将参与扩容
扩容主函数transfer()
详细源码自行定位到源码中的ConcurrentHashMap::transfer
函数查看,下面分块解释源码作用
int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
stride
为每个物理机上CPU处理的桶数量,用当前哈希表大小n
除以8(即右移3位)再除以CPU的个数,当然stride
最小为MIN_TRANSFER_STRIDE=16
,也就是说每个扩容线程至少要处理16个桶
初始化nextTable:
扩容是将旧的哈希表tab
中的桶迁移到新的哈希表nextTab
中,如果当前是第一个进入本次扩容的线程,那么nextTab
为空,需要初始化,其过程在如下代码块中:
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
可以看到会新建一个nextTable
,其大小为旧哈希表大小n
左移1位即哈希表容量扩容为2倍,这与HashMap
的扩容倍率一致。同时,初始化时还将迁移区间的起始索引transferIndex
设为n
,设置为哈希表的长度是为了让扩容线程逆向遍历它所要处理的区间,详见下面解析
划分处理区间:
然后计算当前扩容线程分得的处理区间,用bound
表示处理的下标区间的最小值:
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
else if (U.compareAndSetInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
... // 先省略,下面再分析
}
代码中的nextBound
和nextIndex
分别表示本次处理区间的索引最小值和索引最大值+1,即处理区间为:
advance = true
时,不断进入该划分区间的循环:
-
首先检查当前处理的桶下标
i
是否刚好为处理区间的索引最小值或者当前是否已完成迁移,是的话则设置advance=false
结束区间划分循环 -
其次将
transferIndex
的值赋给nextIndex
后,判断当前要处理的区间索引最大值是否小于等于0,若是的话将i
置为-1,说明当前线程没有拿到新的空闲可处理区间,设置advance=false
结束区间划分循环 -
若以上都完成,则使用CAS操作将
ConcurrentHashMap.transferIndex
更新为nextBound
的值(注意nextBound
被赋值为nextIndex - stride
了),这里使用CAS操作的原因是:如果有其他线程也在执行扩容函数
transfer()
,当它们也使用CAS操作修改了transferIndex
的值,说明当前线程准备处理的桶下标区间已被其他线程接管。所以CAS会失败并继续进行下一趟划分区间的循环,直到当前线程竞争下一段处理区间成功。这种自旋的操作保证了不同线程执行扩容时,各自负责一块处理区间,而不会产生竞争若是CAS成功,则更新
bound
和i
的值,设置advance=false
结束区间划分循环
边界处理:
划分完区间后,如果要处理的桶下标i
不在旧哈希表的索引区间 $[0, n)$ 中,则说明存在2种情况:
-
对区间 $[0, \textrm{stride}-1]$ 进行处理的线程已经完成迁移,但此时整体的迁移可能还没完成
-
当前线程没有竞争到空闲的处理区间,即所有区间都有线程在进行迁移(即上一步划分区间的循环中将
i
赋为-1的分支)
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
nextTable = null;
table = nextTab;
sizeCtl = (n << 1) - (n >>> 1);
return;
}
if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
首先对第一种情况进行判断,如果全局扩容完成标志finishing
为true,说明所有扩容线程都已经完成了扩容操作,这个时候可以将旧哈希表table
更新为新的哈希表nextTab
,并将扩容线程的共享变量nextTable
置为null,其他线程可以判断扩容是否结束。同时,将sizeCtl
设置下次触发扩容的阈值: (n << 1) - (n >>> 1)
即原来哈希表大小n
的1.5倍,也是新哈希表大小的0.75(和HashMap
中的默认装载因子0.75一致!)
如果全局扩容还没有完成,说明此时只是当前线程完成了其负责区间的扩容,所以使用CAS操作将sizeCtl
的值更新为sizeCtl - 1
,即从扩容戳的低16位中减去1,代表正在扩容的线程数已减少1个
如果减去当前线程后,判断 sc - 2
的值是否等于 原始扩容标记 rs
(扩容前初始化sc = rs + 2
的逆运算),是的话则说明所有线程都扩容完毕,将全局扩容完成标志finishing
置为true,同时这里又将advance
置为true且处理的桶下标i
置为旧哈希表大小n
,这么做的目的是为了多线程环境下的Double-Check,从区间终点从头开始,检查每个桶是否都被迁移完成
桶迁移:
历经千辛万苦,终于来到真正进行桶迁移的部分了捏
当前扩容线程正在处理下标为i
的桶table[i]
,利用tabAt()
函数获得下标为i
的桶中节点f
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
... // 中间省略部分为上述小节讲解的源码块
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
如果桶为空,即(f = tabAt(tab, i)) == null
,直接使用CAS操作在该桶中放入一个ForwardingNode
节点,该节点为指向新哈希表nextTable
的桥梁,使得CHM在扩容的同时,若其他线程对CHM执行get()
时若访问到这个下标为i
的桶,则可以通过这个桥接节点访问nextTable
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
如果(fh = f.hash) == MOVED
成立,说明该桶中的节点f
已被迁移到nextTable
中,设置advance为true,在下一次大循环体中执行区间划分循环,确定当前线程是否需要继续处理下一区间
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
else if (f instanceof TreeBin) {
... // 红黑树节点的迁移略去,逻辑类似
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
}
如果该桶中节点f
还没有被迁移过,则使用synchronized
关键字锁住该节点,保证迁移时的原子性(仅靠CAS乐观锁无法防止多线程运行时可能会出现的ABA问题)
锁住f
后,按照Double-Check思想,再使用一次tabAt(tab, i)
保证节点f
还是之前使用tabAt()
时取出的原始数据
后面的迁移过程与HashMap类似,桶中链表上的各个节点的hash
值需要判断是否会被分到新哈希表中的i + n
位置,将不会被分到i + n
位置的节点尾插到ln
代表的low_n链表里,其他节点尾插到hn
代表的high_n链表里
如果不清楚为什么需要这么做,可以回看HashMap扩容原理一节中的再哈希rehash部分
最后使用setTabAt()
函数将ln
与hn
分别放入新哈希表中的索引为i
和i + n
的桶中,并将当前桶table[i]
替换为桥接节点fwd
总结:
CHM并发扩容的过程,是拆分Node<K,V>[] table
,让每个线程处理各个区域,每个线程至少可以分到16个桶(即MIN_TRANSFER_STRIDE
),每个线程通过区间逆向遍历以实现扩容,一个已经完成迁移的桶会被替换成ForwardingNode
节点,桥接到nextTab
中,这样即使在扩容时,其他线程对CHM当前(旧)的table
进行get()
时,如果读到了桥接节点,就会自动桥接到正在执行扩容的nextTab
中获取对应元素
LinkedHashMap
存储结构
LinkedHashMap继承自HashMap
public class LinkedHashMap<K,V> extends HashMap<K,V> implements Map<K,V> { ... }
其存储键值对的基础数据结构Entry
也是继承自HashMap::Node
,但是它还添加了前一个节点引用before
和下一个节点引用after
,是一个双向链表的基础节点
static class Entry<K,V> extends HashMap.Node<K,V> {
Entry<K,V> before, after;
Entry(int hash, K key, V value, Node<K,V> next) {
super(hash, key, value, next);
}
}
LinkedHashMap内部还维护了另一个双向链表(储存链表的首尾节点),维护键值对的插入顺序或者LRU顺序
/* The head (eldest) of the doubly linked list. */
transient LinkedHashMap.Entry<K,V> head;
/* The tail (youngest) of the doubly linked list. */
transient LinkedHashMap.Entry<K,V> tail;
LRU(Least Recently Used)即最近最少使用算法,它是内存管理的一种经典算法,在LinkedHashMap中它主要用来淘汰长时间未被访问的节点
源码中的accessOrder
变量决定了具体维护的顺序,true
为LRU顺序,false
为插入顺序
/** The iteration ordering method for this linked hash map:
* {@code true} for access-order, {@code false} for insertion-order. */
final boolean accessOrder;
每次执行节点访问或插入时,会按照当前设定的顺序accessOrder
去维护内部双向链表,具体在以下两个函数中实现
void afterNodeInsertion(boolean evict) {}
void afterNodeAccess(Node<K,V> e) {}
LRU顺序链表维护过程
afterNodeInsertion
每次执行put
插入操作后会调用该函数,唯一参数evict
只有在构造LinkedHashMap时为false
,后面的操作阶段都为true
,注意LinkedHashMap使用的仍是其父类HashMap定义的put
函数,插入后该回调在HashMap::put
中被调用,见上一大节中的HashMap::putVal源码
当LinkedHashMap::removeEldestEntry
方法返回true
时,它会移除内部维护的”LRU顺序“双向链表的头节点,即最近最久未被访问的节点
但是
removeEldestEntry
方法默认返回false
,如果想维护LRU顺序,则需要继承LinkedHashMap并重写该方法使其返回true
void afterNodeInsertion(boolean evict) { // possibly remove eldest
LinkedHashMap.Entry<K,V> first;
if (evict && (first = head) != null && removeEldestEntry(first)) {
K key = first.key;
removeNode(hash(key), key, null, false, true);
}
}
protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
return false;
}
afterNodeAccess
当accessOrder=true
即指定维护LRU顺序时,每当一个节点e被访问后(调用LinkedHashMap::get
),会将该节点放到LRU顺序链表的尾部,保证链表尾部是最近访问的节点,这样一来链表头部自然就是最近最久未访问的节点了
public V get(Object key) {
Node<K,V> e;
if ((e = getNode(key)) == null)
return null;
if (accessOrder)
afterNodeAccess(e);
return e.value;
}
void afterNodeAccess(Node<K,V> e) { // move node to last
LinkedHashMap.Entry<K,V> last;
if (accessOrder && (last = tail) != e) {
LinkedHashMap.Entry<K,V> p =
(LinkedHashMap.Entry<K,V>)e, b = p.before, a = p.after;
p.after = null;
if (b == null)
head = a;
else
b.after = a;
if (a != null)
a.before = b;
else
last = b;
if (last == null)
head = p;
else {
p.before = last;
last.after = p;
}
tail = p;
++modCount;
}
}
继承LinkedHashMap实现LRU缓存
根据上一节的分析,如果想使用LRU缓存这一数据结构,可以继承LinkedHashMap,下面是一个demo
- 设定缓存空间上限为
MAX_ENTRIES=3
- 调用父类的构造函数时,将
accessOrder
设为true
,开启LRU顺序维护 - 重写
removeEldestEntry
方法,当节点数多于缓存空间上限时使该方法返回true
public class LRUCacheDemo<K, V> extends LinkedHashMap<K, V> {
private static final int MAX_ENTRIES = 3;
LRUCacheDemo() {
super(MAX_ENTRIES, 0.75f, true)
}
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > MAX_ENTRIES;
}
}
测试一下是否满足LRU顺序,注意默认节点插入顺序跟HashMap
一样都是尾插
public static void main(String[] args) {
LRUCacheDemo<Integer, String> cache = new LRUCacheDemo<>();
cache.put(1, "1"); // 插入1: 1
cache.put(2, "2"); // 插入2: 1->2
cache.put(3, "3"); // 插入3: 1->2->3
cache.get(1); // 访问1,按照LRU算法,节点1被放到当前链表的末尾: 2->3->1
cache.put(4, "4"); // 插入4,超过了缓存空间上限,移除最近最久未使用: 3->1->4
System.out.println(cache.keySet()); // 结果为: 3->1->4
}
ThreadLocalMap
ThreadLocalMap是ThreadLocal类内部持有的线程私有变量的哈希表,其中细节很多,见博文ThreadLocal详解