[Java] 容器类源码分析-Map篇

继承Collection和Map的容器类源码一窥

Posted by Penistrong on December 24, 2022

Java容器类源码分析-Map篇

基于JDK8src.zip对常用的容器类源码进行简单剖析

HashMap

HashMap采用了一个Node类型(实现了Map.Entry接口)的数组作为存放哈希表的基础数据结构,利用位运算(除留余数法的简化版,见#第三节)计算哈希地址,并且使用拉链法处理哈希冲突问题

存储结构

  • 不会被序列化的哈希表数组

    transient Node<K,V>[] table;
    
  • 数组的初始化大小为16,且必须为2的幂,这是为了使用哈希函数时提高效率

      /* The default initial capacity - MUST be a power of two. */
      static final int DEFAULT_INITIAL_CAPACITY = 1 << 4; // aka 16
    
  • Node是一种单向链表节点结构,同时存储哈希值、键值对和下一个具有同义哈希的节点

    static class Node<K,V> implements Map.Entry<K,V> {
        final int hash;
        final K key;
        V value;
        Node<K,V> next;
    
        Node(int hash, K key, V value, Node<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }
    
        public final K getKey()        { return key; }
        public final V getValue()      { return value; }
        public final String toString() { return key + "=" + value; }
    
        public final int hashCode() {
            return Objects.hashCode(key) ^ Objects.hashCode(value);
        }
    
        public final V setValue(V newValue) {
            V oldValue = value;
            value = newValue;
            return oldValue;
        }
    
        public final boolean equals(Object o) {
            if (o == this)
                return true;
    
            return o instanceof Map.Entry<?, ?> e
                    && Objects.equals(key, e.getKey())
                    && Objects.equals(value, e.getValue());
        }
    }
    

键值对的存储流程

当调用HashMap::put时,会使用内建的HashMap::putVal函数

public V put(K key, V value) { return putVal(hash(key), key, value, false, true); }

final V putVal(int hash, K key, V value, boolean onlyIfAbsent,
                   boolean evict) {
    Node<K,V>[] tab; Node<K,V> p; int n, i;
    if ((tab = table) == null || (n = tab.length) == 0)
        n = (tab = resize()).length;
    if ((p = tab[i = (n - 1) & hash]) == null)
        tab[i] = newNode(hash, key, value, null);
    else {
        Node<K,V> e; K k;
        if (p.hash == hash &&
            ((k = p.key) == key || (key != null && key.equals(k))))
            e = p;
        else if (p instanceof TreeNode)
            e = ((TreeNode<K,V>)p).putTreeVal(this, tab, hash, key, value);
        else {
            for (int binCount = 0; ; ++binCount) {
                if ((e = p.next) == null) {
                    p.next = newNode(hash, key, value, null);
                    if (binCount >= TREEIFY_THRESHOLD - 1) // -1 for 1st
                        treeifyBin(tab, hash);
                    break;
                }
                if (e.hash == hash &&
                    ((k = e.key) == key || (key != null && key.equals(k))))
                    break;
                p = e;
            }
        }
        if (e != null) { // existing mapping for key
            V oldValue = e.value;
            if (!onlyIfAbsent || oldValue == null)
                e.value = value;
            afterNodeAccess(e);
            return oldValue;
        }
    }
    ++modCount;
    if (++size > threshold)
        resize();
    afterNodeInsertion(evict);
    return null;
}

首先计算哈希地址

存储键值对时,首先会使用hash()函数计算键的哈希值

static final int hash(Object key) {
    int h;
    return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
}

HashMap在存入键为null的K-V对时,会将该K-V对存放在哈希表中索引为0的桶里,否则会将键的hashCode右移16位后再和其本身做异或

hash()函数的JavaDoc里可以看到这么做是为了减少哈希冲突,需要结合下面的散列操作一起理解

if ((p = tab[i = (n - 1) & hash]) == null) { ... }

一般的哈希函数为除留余数法,除数为哈希表长度。假设当前哈希数组的容量为n,它被规定为2的幂次,即 $n=2^p$ (注意n在二进制下一共有p+1位)

取余操作如下: 让hash去除 $n$,在二进制下商就是hash右移 $p$ 位,被右移出去的这 $p$ 个数位组成的就是余数

因为除法操作被编译后其计算速度远不如位运算,为了提高效率HashMap用位运算代替取余操作: n-1在二进制下是一串连续的1,可以看作是二进制掩码,将n-1直接与hash相与,在二进制下相当于提取了hash的低 $p$ 位作为余数

虽然位运算取余快捷,但是产生的哈希冲突会比较多,如果直接使用存入键的hashCode作为hash去取余,由于采用掩码操作,其高位信息相当于都被丢弃了。比如,当哈希数组初始容量为n=16n-1即 $(1111)_b$ 只会保留hash的低4位信息

hashCode为32位int类型,可能出现多个键的hashcode低p位完全相同而高位大相径庭的情况,但它们都被存在同一个索引的桶中,性能大幅降低

因此在对键初步取哈希值时,会将其hashCode的高位右移16位到int类型的低16位上,再和其本身进行异或得到最终的哈希值hash,这样高位信息和低位信息获得了混合,就可以减少散列冲突、增大哈希表的散列程度

(h = key.hashCode()) ^ (h >>> 16)

接着处理哈希冲突

获得哈希地址即键值对要被存放进的桶下标后

  • 如果当前桶为空,则新建1个Node节点放入桶中

  • 如果不为空,则在桶中已有节点的单向链表上进行顺序查找,对每个节点的key执行判等操作:使用==判断两者地址是否相同或者调用equals函数继续判断两个键是否相同

    if (p.hash == hash &&
        ((k = p.key) == key || (key != null && key.equals(k))))
    
    1. 如果某个已有节点的key与要插入的newKey相同,则将已有节点的value更新为newValue
    2. 否则采用尾插法,新建一个节点追加到该桶中链表的尾部

JDK8开始,插入桶中链表的操作从JDK7头插法改成了尾插法,主要是为了防止对HashMap并发插入时,如果触发了数组扩容,扩容时需要重新分配所有元素的哈希地址,可能会出现同一桶中链表里的两个节点互相指向对方导致死循环

键值对的查找流程

  1. 计算查找键的哈希地址,时间复杂度为$O(1)$
  2. 在对应桶中的单向链表上执行顺序查找,时间复杂度为$O(N)$

假设表长为$M$,键值对总数size=$N$,在哈希表比较均匀的情况下,每个桶里链表的平均长度大约是$\frac{N}{M}$,查找的平均复杂度为$O(\frac{N}{M})$

扩容原理

HashMap的内置构造函数为

public HashMap(int initialCapacity, float loadFactor) {
    if (initialCapacity < 0)
        throw new IllegalArgumentException("Illegal initial capacity: " +
                                          initialCapacity);
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    if (loadFactor <= 0 || Float.isNaN(loadFactor))
        throw new IllegalArgumentException("Illegal load factor: " +
                                          loadFactor);
    this.loadFactor = loadFactor;
    this.threshold = tableSizeFor(initialCapacity);
}

其他构造函数包括无参和单参的,其中loadFactor默认为0.75f,初始容量默认为16。HashMap允许传入一个不为2的幂次的初始容量,它会调用tableSizeFor()函数转换到距离最近的2的幂

参数 含义
capacity 数组容量,为2的幂次,默认为16
size 实际存储的键值对数量,也就是对外暴露的Map::size
threshold size到达该阈值时就会触发数组扩容,构造HashMap时会通过tableSizeFor()计算得到,每次执行resize()时以newThr=(int)newCap * loadFactor进行更新
loadFactor 装载因子,即哈希表能够装载的元素比例,默认为0.75f

计算扩容后新数组的容量与阈值

由哈希表的查找流程可知,为了使$O(N/M)$尽可能小,需要哈希表的容量$M$尽可能得大。所以HashMap为了兼顾时间与空间上的效率,采用动态扩容的方式,每当存储的键值对数量size超过阈值threshold后便执行resize()函数

// resize()函数里,新哈希数组的容量及其阈值的计算部分如下
final Node<K,V>[] resize() {
    Node<K,V>[] oldTab = table;
    int oldCap = (oldTab == null) ? 0 : oldTab.length;
    int oldThr = threshold;
    int newCap, newThr = 0;
    if (oldCap > 0) {
        if (oldCap >= MAXIMUM_CAPACITY) {
            threshold = Integer.MAX_VALUE;
            return oldTab;
        }
        else if ((newCap = oldCap << 1) < MAXIMUM_CAPACITY &&
                  oldCap >= DEFAULT_INITIAL_CAPACITY)
            newThr = oldThr << 1; // double threshold
    }
    else if (oldThr > 0) // initial capacity was placed in threshold
        newCap = oldThr;
    else {               // zero initial threshold signifies using defaults
        newCap = DEFAULT_INITIAL_CAPACITY;
        newThr = (int)(DEFAULT_LOAD_FACTOR * DEFAULT_INITIAL_CAPACITY);
    }
    if (newThr == 0) {
        float ft = (float)newCap * loadFactor;
        newThr = (newCap < MAXIMUM_CAPACITY && ft < (float)MAXIMUM_CAPACITY ?
                  (int)ft : Integer.MAX_VALUE);
    }
    threshold = newThr;

    ...
}

如果哈希数组的旧容量oldCap大于等于默认初始容量16,则每次扩容都是将原数组扩大到2倍,相应的,新阈值newThr也更新为旧阈值oldThr的2倍。在不超出最大容量上限MaximumCapacity的其他情况下,新阈值也可以通过(int)newCap * loadFactor得到

对旧数组中每个桶中的节点再哈希

扩容后的数组容量和阈值计算完毕后,新建1个新的数组,并遍历旧数组的每个桶,对里面存在的节点执行rehash操作:

// resize()函数里,rehash再哈希的处理部分如下
final Node<K,V>[] resize() {
    ...

    @SuppressWarnings({"rawtypes","unchecked"})
    Node<K,V>[] newTab = (Node<K,V>[])new Node[newCap];
    table = newTab;
    if (oldTab != null) {
        for (int j = 0; j < oldCap; ++j) {
            Node<K,V> e;
            if ((e = oldTab[j]) != null) {
                oldTab[j] = null;
                if (e.next == null)
                    newTab[e.hash & (newCap - 1)] = e;
                else if (e instanceof TreeNode)
                    ((TreeNode<K,V>)e).split(this, newTab, j, oldCap);
                else { // preserve order
                    Node<K,V> loHead = null, loTail = null;
                    Node<K,V> hiHead = null, hiTail = null;
                    Node<K,V> next;
                    do {
                        next = e.next;
                        if ((e.hash & oldCap) == 0) {
                            if (loTail == null)
                                loHead = e;
                            else
                                loTail.next = e;
                            loTail = e;
                        }
                        else {
                            if (hiTail == null)
                                hiHead = e;
                            else
                                hiTail.next = e;
                            hiTail = e;
                        }
                    } while ((e = next) != null);
                    if (loTail != null) {
                        loTail.next = null;
                        newTab[j] = loHead;
                    }
                    if (hiTail != null) {
                        hiTail.next = null;
                        newTab[j + oldCap] = hiHead;
                    }
                }
            }
        }
    }
    return newTab;
}
  1. 如果当前桶里的节点唯一,将节点存入对应的新桶中,地址为hash & (newCap - 1)

  2. 如果当前桶里的链表存在,无需每次都使用新容量去计算散列地址,因为newCap=$2^{p+1}$刚好为oldCap=$2^{p}$的2倍,且它们都是2的幂,则newCap - 1刚好比oldCap - 1多出一个p(最低位从0开始)。对每个节点判断其键hash在二进制表示中第p位上的值,注意到oldCap的二进制表示下只有第p位为1,其他数位都是0,因此直接使用位运算e.hash & oldCap就能够提取第p位上的数值:

    • 如果为0, hash & (newCap - 1) == hash & (oldCap - 1),散列地址不变,插入临时低位链表中(loHead后面)

    • 如果为1, hash & (newCap - 1) == hash & (oldCap - 1) + pow(2, p),而$2^{p}$的大小就是oldCap的值,因此其散列地址就是原散列地址j加上oldCap,插入临时高位链表中(hiHead后面)

    • 最后将低位链表放在散列地址不变的桶中newTab[j] = loHead, 高位链表放在原散列地址加上原数组容量的新桶中newTab[j + oldCap] = hiHead

    再散列的过程中都是采用尾插的方式将旧节点插入到新链表的尾部,这样可以保证节点原来的有序性不被破坏

桶中链表转换红黑树

HashMap源码中还定义了如下变量

参数 默认值 含义
TREEIFY_THRESHOLD 8 桶中节点数超过该阈值后便将链表转换为红黑树(调用treeifyBin())
UNTREEIFY_THRESHOLD 6 扩容分桶时使用,原桶里的红黑树结构因为分桶导致节点数减小,小于该阈值后就将红黑树转换为链表(调用untreeify())
MIN_TREEIFY_CAPACITY 64 treeifyBin()中的关键阈值,只有当数组容量n超过该阈值后后才会执行转换操作,否则只调用resize()

默认的TREEIFY_THRESHOLD也说明里自JDK8之后,虽然处理哈希冲突时采用时间复杂度较高的尾插法,但是只要桶中节点数超过8且数组总容量超过64后就会将桶中链表转换为红黑树,所以尾插法对性能的影响也不算大

但是红黑树建树还是比较消耗性能的

HashSet

你可能会有疑问,HashSet不是实现了Set接口吗,而Set接口继承了Collection接口,明明跟Map接口下的容器类八竿子打不着啊?

其实HashSet内部数据结构使用的就是HashMap,只是它暴露给外界的方法等都是CollectionSet等提供的公用方法

private transient HashMap<E,Object> map;

// Dummy value to associate with an Object in the backing Map
private static final Object PRESENT = new Object();

HashSet的键值对,键是放入的对象,而值是一个Dummy Value: 名为Present的Object对象

add()contains()remove()等方法都是调用内部的HashMap的相关方法,在上面做了一层封装罢了,凡是需要值的地方都使用PRESENT对象代替

ConcurrentHashMap

  • JDK7旧版

    JDK7ConcurrentHashMap采用的是分段锁机制,基础数据结构Segment继承自可重入锁ReentrantLock,每个分段锁都维护着几个桶HashEntry,不同线程可以同时访问不同分段锁上的桶,使并发度更高(并发度就是Segment的个数),默认为16

  • JDK8新版

    JDK8取消了分段锁机制,与HashMap一样都是采用Node数组保存数据,但是会利用桶中节点(链表头节点或者红黑树根节点)作为锁对每个桶里的数据加锁,进一步减少并发冲突、提高并发度

对于并发的处理,JDK8版本的ConcurrentMap引入了CAS机制(Compare And Swap, 比较并交换): 某线程要修改变量$x$的值,首先它会从内存中读取$x$的值设为a(称作期待值),再次从内存中读取$x$的值设为b,如果a == b,则将$x$更新为目标值,否则不进行修改。CAS的本质是一种基于冲突检测的乐观锁策略: 先进行操作,如果没有其他线程争用共享数据,那么操作就成功了,否则采取补偿措施。CAS是非阻塞同步操作,不像synchronizedReentrantLock这两个悲观互斥锁需要通过阻塞其他线程的运行以实现同步

以下都是基于JDK8的ConcurrentHashMap源码进行分析

存储结构

与HashMap一样都是使用Node<K, V>作为基础结构,维护1个数组+链表/红黑树的拉链式哈希表结构,但是在值属性和指向下一个节点的引用属性上都使用了关键字volatile作为修饰,目的是在访问节点时实现不用加锁的无锁读

static class Node<K,V> implements Map.Entry<K,V> {
    final int hash;
    final K key;
    volatile V val;
    volatile Node<K,V> next;
    ...
}

get无锁读

JMM(Java Memory Model, Java内存模型)如下图所示,所有变量都存于主存区域中,每个线程都拥有自己的工作内存,其中包含主内存中共享变量的副本。线程进行的所有操作都是在其对应的工作内存中进行,不能直接操作主存也不能操作其他线程的工作内存

Java内存模型

当变量$x$没有volatile关键字修饰时,线程操作变量的流程如下:

如果线程1要修改$x$的值,它会从主存区将$x$拷贝到自己的工作内存中,然后更新$x$再写回主存中;如果线程2要读取$x$的值,也是去主存区将$x$拷贝进自己的工作内存中再进行读取

这样就会导致可见性问题:比如线程1修改了$x$的值但还没有写回主存,同时线程2要对$x$进行操作,此时线程1所做的修改对线程2不可见。这种因工作内存与主内存存在同步延迟的情况导致了可见性问题

当变量$x$被volatile关键字修饰时,相当于标记变量$x$“应当在主存区进行读写操作”,基于内存屏障(Memory Barrier)实现volatile变量的内存可见性

对一个volatile变量进行写操作时(赋值),通过jitwatch工具可以查看其编译后得到的汇编代码,在mov指令后还有一条lock cmpxchg的汇编指令,在多核处理器上执行该指令时会引发两个操作:

  1. 将当前核中缓存(Java工作内存)行对应的数据写回到系统内存(Java主存)中
  2. 写回系统内存时会使其他核里缓存相同内存地址的数据无效

CPU的不同核之间为了保证缓存一致,实现了缓存一致性协议(MESI),每个核会嗅探在总线上传播的信号以检查自己缓存的数据是否过期,当它发现某个缓存行对应的内存地址被修改,就会将其设置成无效状态,下次要对这个数据执行任何操作时就会去系统内存中重新将数据读取到缓存里,保证数据是最新的变量副本

volatile关键字保证了多线程下操作的可见性,这样在调用ConcurrentHashMap::get方法时,可以不用加锁直接进行无锁读,提高并发效率

ConcurrentHashMap::get源码如下

public V get(Object key) {
    Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
    int h = spread(key.hashCode());
    if ((tab = table) != null && (n = tab.length) > 0 &&
        (e = tabAt(tab, (n - 1) & h)) != null) {
        if ((eh = e.hash) == h) {
            if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                return e.val;
        }
        else if (eh < 0)
            return (p = e.find(h, key)) != null ? p.val : null;
        while ((e = e.next) != null) {
            if (e.hash == h &&
                ((ek = e.key) == key || (ek != null && key.equals(ek))))
                return e.val;
        }
    }
    return null;
}

根据给定的键key计算Hash以定位对应桶的下标,利用ConcurrentHashMap::tabAt封装的volatile读操作,获取对应桶的头节点,如果key不匹配,则继续通过Node的volatile属性next遍历桶里链表中的所有节点,直到key匹配再去读取volatile属性val

以上的3个操作读取的都是volatile修饰的变量,在不加锁的情况下保证了其对应value的内存可见性,读操作也因此线程安全

ConcurrentHashMap::tabAt源码如下

// Unsafe mechanics
private static final Unsafe U = Unsafe.getUnsafe();

static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getReferenceAcquire(tab, ((long)i << ASHIFT) + ABASE);
}

/* In jdk.internal.misc.Unsafe */
@IntrinsicCandidate
public final Object getReferenceAcquire(Object o, long offset) {
    return getReferenceVolatile(o, offset);
}

/**
  * Fetches a reference value from a given Java variable, with volatile
  * load semantics. Otherwise identical to {@link #getReference(Object, long)}
  */
@IntrinsicCandidate
public native Object getReferenceVolatile(Object o, long offset);

put流程

put的调用过程大致分为两部分:

  • 对非空桶的头节点加上互斥锁synchronized,再在其对应的链表上寻找键匹配的节点,对值进行替换

  • 对于空桶,采用基于乐观锁策略的CAS机制往桶里添加新的头节点

put函数调用的还是内建的putVal函数:

final V putVal(K key, V value, boolean onlyIfAbsent) {
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh; K fk; V fv;
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                break;                   // no lock when adding to empty bin
        }
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        else if (onlyIfAbsent // check first node without acquiring lock
                    && fh == hash
                    && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                    && (fv = f.val) != null)
            return fv;
        else {
            V oldVal = null;
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    if (fh >= 0) {
                        binCount = 1;
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                    (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key, value);
                                break;
                            }
                        }
                    }
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                        value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
            if (binCount != 0) {
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    addCount(1L, binCount);
    return null;
}

put非空桶

找到非空桶后,对其中的头节点加上互斥锁,这样其他线程就无法对该桶内的链表进行操作,后续的覆盖或者插入流程与HashMap一致

注意,仔细观察会发现源码中有以下两行:

else if ((fh = f.hash) == MOVED)
    tab = helpTransfer(tab, f);

这个意思是如果当前桶内节点hash值为MOVED,说明当前节点实际上是个ForwardingNode桥接节点,表示有其他线程正在对CHM进行扩容,所以会调用helpTransfer()函数让当前线程也参与扩容,这样子并发扩容能够加速扩容(其实就是迁移旧哈希表中的桶到新哈希表中)所消耗的时间

详见下一节CHM扩容机制

put空桶

找到空桶后,调用casTabAt()函数,其封装的是Unsafe类提供的native方法compareAndSetReference(),它与volatile具有相同的内存读写语义

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSetReference(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

/* In jdk.internal.misc.Unsafe */
@IntrinsicCandidate
public final native boolean compareAndSetReference(Object o, long offset,
                                                   Object expected,
                                                   Object x);

compareAndSetReference()函数会比较引用对象o在内存中的地址加上偏移offset处的值是否等于期待值expected,是的话则将该地址的值替换为给定对象x的地址(相当于设置了指向对象x的引用),并返回true

在并发环境下,Put空桶操作基于自旋锁+CAS实现:假设有两个线程都在进行put操作,它们插入的键值对相同,且桶内为空:

  1. 线程1执行casTabAt(tab, i, null, node1)后,期待值null与主内存中tab[i]的值相等,则将tab[i]里的内容替换为node1对象所在的地址,并返回true。新的节点插入成功,此时返回true后执行到break一行后结束for死循环for (Node<K, V>[] tab = table;;)

  2. 线程2在线程1插入键值对之前通过tabAt得知相同位置为空桶,但是线程2稍微落后执行casTabAt(tab, i, null, node2),由于期待值null与此时tab[i]在内存中的实际值不相同(现在tab[i]处为node1的引用地址),说明另有线程对tab[i]执行了修改,继续执行for死循环,在第二次循环中执行前置的tabAt()操作后发现tab[i]这个桶不为空,执行put非空桶分支

扩容机制

ConcurrentHashMap支持并发扩容,且这个过程跟sizeCtl变量息息相关,其源码中的文档如下所示:

/**
 * Table initialization and resizing control.  When negative, the
 * table is being initialized or resized: -1 for initialization,
 * else -(1 + the number of active resizing threads).  Otherwise,
 * when table is null, holds the initial table size to use upon
 * creation, or 0 for default. After initialization, holds the
 * next element count value upon which to resize the table.
 */
private transient volatile int sizeCtl;

根据文档可以得知,sizeCtl为负值时:

  • $-1$ 表示当前ConcurrentHashMap对象正在初始化(执行构造函数)
  • $-(1 + \textrm{正在扩容的线程数})$ 表示正在扩容,并发扩容的线程数可以通过反向计算 $-\textrm{sizeCtl} - 1$ 得到

当内部用来存放桶的tablenull(说明此时对象还没初始化),sizeCtl存储初始的哈希表大小以备构造时使用(为0的话表示采用默认值DEFAULT_CAPACITY),以参数为另一个Map的构造函数为例:

public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this.sizeCtl = DEFAULT_CAPACITY;
    putAll(m);
}

初始化完成后,sizeCtl储存扩容阈值,用以参与判断哈希表是否需要扩容

addCount()方法

在put流程的putVal()函数返回前,会调用addCount()方法,增加当前CHM里的元素总数,并判断当前哈希表是否需要被扩容?如果需要扩容:

  • 当前没有线程在扩容,则初始化transfer()方法

  • 当前有其他线程正在扩容,则本线程也帮助进行扩容,执行helpTransfer()方法

ConcurrentHashMap::addCount源码如下

/**
 * Adds to count, and if table is too small and not already
 * resizing, initiates transfer. If already resizing, helps
 * perform transfer if work is available.  Rechecks occupancy
 * after a transfer to see if another resize is already needed
 * because resizings are lagging additions.
 *
 * @param x the count to add
 * @param check if <0, don't check resize, if <= 1 only check if uncontended
 */
private final void addCount(long x, int check) {
    CounterCell[] cs; long b, s;
    if ((cs = counterCells) != null ||
        !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
        CounterCell c; long v; int m;
        boolean uncontended = true;
        if (cs == null || (m = cs.length - 1) < 0 ||
            (c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
            !(uncontended =
                U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
            fullAddCount(x, uncontended);
            return;
        }
        if (check <= 1)
            return;
        s = sumCount();
    }
    if (check >= 0) {
        Node<K,V>[] tab, nt; int n, sc;
        while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
                (n = tab.length) < MAXIMUM_CAPACITY) {
            int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
            if (sc < 0) {
                if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
                    (nt = nextTable) == null || transferIndex <= 0)
                    break;
                if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
                    transfer(tab, nt);
            }
            else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
                transfer(tab, null);
            s = sumCount();
        }
    }
}

putVal函数调用addCount()时,将put时插入后的桶中元素个数binCount传入形参check,控制本次addCount()方法是否要检查需不需要扩容

方法体中,前半部分是利用CounterCell计算当前哈希表中的总计元素个数,后半部分检查是否要执行扩容方法transfer(),这两个部分有点复杂,所以下文分开梳理

元素计数器baseCount与CounterCell[]

addCount方法接收的第一个参数long x表示本次添加的元素个数,尔后通过CAS操作更新baseCount字段的值,如果本次CAS操作失败,说明有其他线程在竞争执行addCount()方法,就会转而使用CounterCell[]数组分摊并发情况下多个线程操作单一字段baseCount的竞争压力

CounterCell[] cs; long b, s;
if ((cs = counterCells) != null ||
    !U.compareAndSetLong(this, BASECOUNT, b = baseCount, s = b + x)) {
    CounterCell c; long v; int m;
    boolean uncontended = true;
    if (cs == null || (m = cs.length - 1) < 0 ||
        (c = cs[ThreadLocalRandom.getProbe() & m]) == null ||
        !(uncontended =
            U.compareAndSetLong(c, CELLVALUE, v = c.value, v + x))) {
        fullAddCount(x, uncontended);
        return;
    }
    if (check <= 1)
        return;
    s = sumCount();
}

上述代码中第3行使用Unsafe类提供的compareAndSetLong这个CAS更新Long类型值的函数,类静态常量BASECOUNT为字段baseCount在CHM对象实例中的内存偏移量

CAS函数根据当前CHM对象的内存起始地址加上basecount字段的实际偏移量BASECOUNT,取出主内存中baseCount的实际值与当前线程工作内存的baseCount预期值进行比较,相符则说明没有其他线程在竞争,于是更新为新的元素个数

/**
 * Base counter value, used mainly when there is no contention,
 * but also as a fallback during table initialization
 * races. Updated via CAS.
 */
private transient volatile long baseCount;

private static final long BASECOUNT
    = U.objectFieldOffset(ConcurrentHashMap.class, "baseCount");

/**
 * A padded cell for distributing counts.  Adapted from LongAdder
 * and Striped64.  See their internal docs for explanation.
 */
private static final long CELLVALUE
        = U.objectFieldOffset(CounterCell.class, "value");

@jdk.internal.vm.annotation.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

如果不相符,则本次CAS失败,设置uncontended标志变量表示”没有竞争”这个状态,利用当前线程内置的ThreadLocalRandom这个线程独占seed种子的随机数生成器快速产生一个随机数,尔后和当前CHM对象的counterCells[]数组大小m=length-1相与,得到要操作的下标i,即CounterCell c = counterCells[i]

当前线程只要操作counterCells[i]完成元素个数更新,最后再调用fullAddCount()方法将counterCells[]数组中每个CounterCell.value累加,再加上旧baseCount的值后更新元素总数

同时观察这部分代码可以发现,当出现线程竞争后,操作完counterCells[i]后无视了check扩容过程直接返回,这是因为,既然出现了线程竞争的高并发状态,说明其他线程也在执行fullAddCount()的计数过程,如果当前线程计数完成后还继续检测是否需要扩容,就会将高并发压力延续到扩容流程里,导致压力持续增大,所以源码中选择直接return减少高并发,等到下一次某个线程在竞争不激烈的状态下完成元素个数更新,再检测是否需要扩容

[题外话] 为什么要使用Unsafe类计算偏移量呢? 难道这些字段不可以根据其类型对应的字节数按照字段出现顺序计算其起始偏移地址吗?

由于JVM对齐填充的存在(对应虚拟机选项-XX:ObjectAlignmentInBytes,默认为8字节),它不仅作用于对象与对象之间,还存在于对象中的字段之间(以下均是针对64位JVM):

  1. 对象与对象之间: 任何对象的大小都是8的倍数,若不满足则进行对象填充以对齐

  2. 对象中的字段之间: 64位的long类型、double类型以及不开启压缩指针时的对象引用类型这三种字段的起始地址必须为8的倍数(若开启压缩指针,则对象引用类型的偏移量只需为4的倍数)

对齐填充是为了保证不出现跨CPU缓存行的字段,提高执行效率。而JVM为了做到这一点,就会使用字段重排列方法,重新分配对象中字段的先后顺序,以达到内存对齐的目的

所以如果仅按照字段出现顺序计算偏移量,会导致偏移量不准确,所以需要使用Unsafe::objectFieldOffset()函数计算字段在同一类但是在不同JVM中的实际偏移量

另外,上面修饰CounterCell类的注解@Contended是用来解决对象字段之间的虚共享(False Sharing)问题:

同一对象中不同的volatile字段,逻辑上它们并没有共享内容,不需要同步,但是如果这两个字段恰好处于同一CPU缓存行中,那么对这些字段进行写操作就会导致缓存行写回主内存中,造成了实质物理上的共享

JVM会让被@Contended注解的不同字段分配到不同的缓存行中以解决虚共享问题

检查是否需要扩容

addCount()函数的第二部分执行的是扩容检查逻辑:

if (check >= 0) {
    Node<K,V>[] tab, nt; int n, sc;
    while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
            (n = tab.length) < MAXIMUM_CAPACITY) {
        int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT;
        if (sc < 0) {
            if (sc == rs + MAX_RESIZERS || sc == rs + 1 ||
                (nt = nextTable) == null || transferIndex <= 0)
                break;
            if (U.compareAndSetInt(this, SIZECTL, sc, sc + 1))
                transfer(tab, nt);
        }
        else if (U.compareAndSetInt(this, SIZECTL, sc, rs + 2))
            transfer(tab, null);
        s = sumCount();
    }
}

上述代码中,判断更新后的元素总个数s是否大于当前sizeCtl扩容阈值,如果大于阈值说明需要扩容,调用resizeStamp()函数根据哈希表大小n = tab.length生成一个唯一的扩容戳rs:

static final int resizeStamp(int n) {
    return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

private final int RESIZE_STAMP_BITS = 16;

首先利用Integer.numberOfLeadingZeros()得到n的32位Int类型二进制表示中的前导0个数,然后与二进制中第15位为1的扩容标记掩码相或,生成一个唯一的扩容戳

n = 16为例,调用resizeStamp(n)有如下计算:

\[n = [0000, 0000, 0000, 0000, 0000, 0000, 000\colorbox{red}{1}, 0000]\]
  1. Integer.numberOfLeadingZeros(n) = $28$,一共有28个前导0,28的二进制表示如下:

    \[\textrm{leadingZeros} = [0000, 0000, 0000, 0000, 0000, 0000, 000\colorbox{yellow}{1}, \colorbox{yellow}{1}\colorbox{yellow}{1}00]\]
  2. 1 << (RESIZE_STAMP_BITS - 1):

    \[\textrm{mask} = [0000, 0000, 0000, 0000, \colorbox{red}{1}000, 0000, 0000, 0000]\]
  3. 两者相或得到低16位的扩容戳标记:

    \[\textrm{rs}_\textrm{low16bit} = [0000, 0000, 0000, 0000, \colorbox{red}{1}000, 0000, 000\colorbox{yellow}{1}, \colorbox{yellow}{1}\colorbox{yellow}{1}00]\]

最后,使用int rs = resizeStamp(n) << RESIZE_STAMP_SHIFT,将得到的低16位扩容戳标记左移16位,这样rs的高16位就是扩容标记:

\[\textrm{rs} = [\colorbox{red}{1}000, 0000, 000\colorbox{yellow}{1}, \colorbox{yellow}{1}\colorbox{yellow}{1}00, 0000, 0000, 0000, 0000]\]

那么低16位的作用呢?存储并行扩容时的线程数:

高 RESIZE_STAMP_BITS 位 低RESIZE_STAMP_BITS 位
扩容标记 并行扩容线程个数

rs的二进制表示中最高位为1,即rs是逻辑意义上的负数

然后判断sc = sizeCtl是否小于0:

  • 大于等于0:说明没有其他线程在扩容,当前线程是马上要执行扩容的第一个线程,利用CAS操作将sizeCtl的值更新为扩容戳rs + 2

    为什么是+2呢?因为CHM对象初始化时,sizeCtl = -1,即低16位的最低位为1,而各个函数中处理sizeCtl都是以位运算方式进行的,最低位的1留作初始化标志位,所以当前线程作为第一个扩容线程其扩容戳的线程个数要从+2开始计算。这也应证了官方文档对sizeCtl的解释,利用-sizeCtl - 1即可获得当前正在并发扩容的线程总数

  • 小于0:说明已经有其他线程正在执行扩容,利用CAS操作将sizeCtl的值+1,而sizeCtl被第一个扩容线程更新为扩容戳,低16位储存的是正在并行扩容的线程个数,+1表示当前线程也将参与扩容

扩容主函数transfer()

详细源码自行定位到源码中的ConcurrentHashMap::transfer函数查看,下面分块解释源码作用

int n = tab.length, stride;
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
    stride = MIN_TRANSFER_STRIDE; // subdivide range

stride为每个物理机上CPU处理的桶数量,用当前哈希表大小n除以8(即右移3位)再除以CPU的个数,当然stride最小为MIN_TRANSFER_STRIDE=16,也就是说每个扩容线程至少要处理16个桶

初始化nextTable:

扩容是将旧的哈希表tab中的桶迁移到新的哈希表nextTab中,如果当前是第一个进入本次扩容的线程,那么nextTab为空,需要初始化,其过程在如下代码块中:

if (nextTab == null) {            // initiating
    try {
        @SuppressWarnings("unchecked")
        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
        nextTab = nt;
    } catch (Throwable ex) {      // try to cope with OOME
        sizeCtl = Integer.MAX_VALUE;
        return;
    }
    nextTable = nextTab;
    transferIndex = n;
}

可以看到会新建一个nextTable,其大小为旧哈希表大小n左移1位即哈希表容量扩容为2倍,这与HashMap的扩容倍率一致。同时,初始化时还将迁移区间的起始索引transferIndex设为n,设置为哈希表的长度是为了让扩容线程逆向遍历它所要处理的区间,详见下面解析

划分处理区间:

然后计算当前扩容线程分得的处理区间,用bound表示处理的下标区间的最小值:

boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
    Node<K,V> f; int fh;
    while (advance) {
        int nextIndex, nextBound;
        if (--i >= bound || finishing)
            advance = false;
        else if ((nextIndex = transferIndex) <= 0) {
            i = -1;
            advance = false;
        }
        else if (U.compareAndSetInt
                    (this, TRANSFERINDEX, nextIndex,
                    nextBound = (nextIndex > stride ?
                                nextIndex - stride : 0))) {
            bound = nextBound;
            i = nextIndex - 1;
            advance = false;
        }
    }
    ... // 先省略,下面再分析
}

代码中的nextBoundnextIndex分别表示本次处理区间的索引最小值和索引最大值+1,即处理区间为:

\[\textrm{table}\left[\textrm{nextBound}, \textrm{nextIndex}\right)\]

advance = true时,不断进入该划分区间的循环:

  • 首先检查当前处理的桶下标i是否刚好为处理区间的索引最小值或者当前是否已完成迁移,是的话则设置advance=false结束区间划分循环

  • 其次将transferIndex的值赋给nextIndex后,判断当前要处理的区间索引最大值是否小于等于0,若是的话将i置为-1,说明当前线程没有拿到新的空闲可处理区间,设置advance=false结束区间划分循环

  • 若以上都完成,则使用CAS操作将ConcurrentHashMap.transferIndex更新为nextBound的值(注意nextBound被赋值为nextIndex - stride了),这里使用CAS操作的原因是:

    如果有其他线程也在执行扩容函数transfer(),当它们也使用CAS操作修改了transferIndex的值,说明当前线程准备处理的桶下标区间已被其他线程接管。所以CAS会失败并继续进行下一趟划分区间的循环,直到当前线程竞争下一段处理区间成功。这种自旋的操作保证了不同线程执行扩容时,各自负责一块处理区间,而不会产生竞争

    若是CAS成功,则更新boundi的值,设置advance=false结束区间划分循环

边界处理:

划分完区间后,如果要处理的桶下标i不在旧哈希表的索引区间 $[0, n)$ 中,则说明存在2种情况:

  1. 对区间 $[0, \textrm{stride}-1]$ 进行处理的线程已经完成迁移,但此时整体的迁移可能还没完成

  2. 当前线程没有竞争到空闲的处理区间,即所有区间都有线程在进行迁移(即上一步划分区间的循环中将i赋为-1的分支)

if (i < 0 || i >= n || i + n >= nextn) {
    int sc;
    if (finishing) {
        nextTable = null;
        table = nextTab;
        sizeCtl = (n << 1) - (n >>> 1);
        return;
    }
    if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
        finishing = advance = true;
        i = n; // recheck before commit
    }
}

首先对第一种情况进行判断,如果全局扩容完成标志finishing为true,说明所有扩容线程都已经完成了扩容操作,这个时候可以将旧哈希表table更新为新的哈希表nextTab,并将扩容线程的共享变量nextTable置为null,其他线程可以判断扩容是否结束。同时,将sizeCtl设置下次触发扩容的阈值: (n << 1) - (n >>> 1) 即原来哈希表大小n的1.5倍,也是新哈希表大小的0.75(和HashMap中的默认装载因子0.75一致!)

如果全局扩容还没有完成,说明此时只是当前线程完成了其负责区间的扩容,所以使用CAS操作将sizeCtl的值更新为sizeCtl - 1,即从扩容戳的低16位中减去1,代表正在扩容的线程数已减少1个

如果减去当前线程后,判断 sc - 2的值是否等于 原始扩容标记 rs(扩容前初始化sc = rs + 2的逆运算),是的话则说明所有线程都扩容完毕,将全局扩容完成标志finishing置为true,同时这里又将advance置为true且处理的桶下标i置为旧哈希表大小n,这么做的目的是为了多线程环境下的Double-Check,从区间终点从头开始,检查每个桶是否都被迁移完成

桶迁移:

历经千辛万苦,终于来到真正进行桶迁移的部分了捏

当前扩容线程正在处理下标为i的桶table[i],利用tabAt()函数获得下标为i的桶中节点f

ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
... // 中间省略部分为上述小节讲解的源码块
else if ((f = tabAt(tab, i)) == null)
    advance = casTabAt(tab, i, null, fwd);

如果桶为空,即(f = tabAt(tab, i)) == null,直接使用CAS操作在该桶中放入一个ForwardingNode节点,该节点为指向新哈希表nextTable的桥梁,使得CHM在扩容的同时,若其他线程对CHM执行get()时若访问到这个下标为i的桶,则可以通过这个桥接节点访问nextTable

else if ((fh = f.hash) == MOVED)
    advance = true; // already processed

如果(fh = f.hash) == MOVED成立,说明该桶中的节点f已被迁移到nextTable中,设置advance为true,在下一次大循环体中执行区间划分循环,确定当前线程是否需要继续处理下一区间

else {
    synchronized (f) {
        if (tabAt(tab, i) == f) {
            Node<K,V> ln, hn;
            if (fh >= 0) {
                int runBit = fh & n;
                Node<K,V> lastRun = f;
                for (Node<K,V> p = f.next; p != null; p = p.next) {
                    int b = p.hash & n;
                    if (b != runBit) {
                        runBit = b;
                        lastRun = p;
                    }
                }
                if (runBit == 0) {
                    ln = lastRun;
                    hn = null;
                }
                else {
                    hn = lastRun;
                    ln = null;
                }
                for (Node<K,V> p = f; p != lastRun; p = p.next) {
                    int ph = p.hash; K pk = p.key; V pv = p.val;
                    if ((ph & n) == 0)
                        ln = new Node<K,V>(ph, pk, pv, ln);
                    else
                        hn = new Node<K,V>(ph, pk, pv, hn);
                }
                setTabAt(nextTab, i, ln);
                setTabAt(nextTab, i + n, hn);
                setTabAt(tab, i, fwd);
                advance = true;
            }
            else if (f instanceof TreeBin) {
                ... // 红黑树节点的迁移略去,逻辑类似
            }
            else if (f instanceof ReservationNode)
                throw new IllegalStateException("Recursive update");
        }
    }
}

如果该桶中节点f还没有被迁移过,则使用synchronized关键字锁住该节点,保证迁移时的原子性(仅靠CAS乐观锁无法防止多线程运行时可能会出现的ABA问题)

锁住f后,按照Double-Check思想,再使用一次tabAt(tab, i)保证节点f还是之前使用tabAt()时取出的原始数据

后面的迁移过程与HashMap类似,桶中链表上的各个节点的hash值需要判断是否会被分到新哈希表中的i + n位置,将不会被分到i + n位置的节点尾插到ln代表的low_n链表里,其他节点尾插到hn代表的high_n链表里

如果不清楚为什么需要这么做,可以回看HashMap扩容原理一节中的再哈希rehash部分

最后使用setTabAt()函数将lnhn分别放入新哈希表中的索引为ii + n的桶中,并将当前桶table[i]替换为桥接节点fwd

总结:

CHM并发扩容的过程,是拆分Node<K,V>[] table,让每个线程处理各个区域,每个线程至少可以分到16个桶(即MIN_TRANSFER_STRIDE),每个线程通过区间逆向遍历以实现扩容,一个已经完成迁移的桶会被替换成ForwardingNode节点,桥接到nextTab中,这样即使在扩容时,其他线程对CHM当前(旧)的table进行get()时,如果读到了桥接节点,就会自动桥接到正在执行扩容的nextTab中获取对应元素

LinkedHashMap

存储结构

LinkedHashMap继承自HashMap

public class LinkedHashMap<K,V> extends HashMap<K,V> implements Map<K,V> { ... }

其存储键值对的基础数据结构Entry也是继承自HashMap::Node,但是它还添加了前一个节点引用before和下一个节点引用after,是一个双向链表的基础节点

static class Entry<K,V> extends HashMap.Node<K,V> {
    Entry<K,V> before, after;
    Entry(int hash, K key, V value, Node<K,V> next) {
        super(hash, key, value, next);
    }
}

LinkedHashMap内部还维护了另一个双向链表(储存链表的首尾节点),维护键值对的插入顺序或者LRU顺序

/* The head (eldest) of the doubly linked list. */
transient LinkedHashMap.Entry<K,V> head;

/* The tail (youngest) of the doubly linked list. */
transient LinkedHashMap.Entry<K,V> tail;

LRU(Least Recently Used)即最近最少使用算法,它是内存管理的一种经典算法,在LinkedHashMap中它主要用来淘汰长时间未被访问的节点

源码中的accessOrder变量决定了具体维护的顺序,true为LRU顺序,false为插入顺序

/** The iteration ordering method for this linked hash map:
  * {@code true} for access-order, {@code false} for insertion-order. */
final boolean accessOrder;

每次执行节点访问或插入时,会按照当前设定的顺序accessOrder去维护内部双向链表,具体在以下两个函数中实现

void afterNodeInsertion(boolean evict) {}

void afterNodeAccess(Node<K,V> e) {}

LRU顺序链表维护过程

afterNodeInsertion

每次执行put插入操作后会调用该函数,唯一参数evict只有在构造LinkedHashMap时为false,后面的操作阶段都为true,注意LinkedHashMap使用的仍是其父类HashMap定义的put函数,插入后该回调在HashMap::put中被调用,见上一大节中的HashMap::putVal源码

LinkedHashMap::removeEldestEntry方法返回true时,它会移除内部维护的”LRU顺序“双向链表的头节点,即最近最久未被访问的节点

但是removeEldestEntry方法默认返回false,如果想维护LRU顺序,则需要继承LinkedHashMap并重写该方法使其返回true

void afterNodeInsertion(boolean evict) { // possibly remove eldest
    LinkedHashMap.Entry<K,V> first;
    if (evict && (first = head) != null && removeEldestEntry(first)) {
        K key = first.key;
        removeNode(hash(key), key, null, false, true);
    }
}

protected boolean removeEldestEntry(Map.Entry<K,V> eldest) {
    return false;
}

afterNodeAccess

accessOrder=true即指定维护LRU顺序时,每当一个节点e被访问后(调用LinkedHashMap::get),会将该节点放到LRU顺序链表的尾部,保证链表尾部是最近访问的节点,这样一来链表头部自然就是最近最久未访问的节点了

public V get(Object key) {
    Node<K,V> e;
    if ((e = getNode(key)) == null)
        return null;
    if (accessOrder)
        afterNodeAccess(e);
    return e.value;
}

void afterNodeAccess(Node<K,V> e) { // move node to last
    LinkedHashMap.Entry<K,V> last;
    if (accessOrder && (last = tail) != e) {
        LinkedHashMap.Entry<K,V> p =
            (LinkedHashMap.Entry<K,V>)e, b = p.before, a = p.after;
        p.after = null;
        if (b == null)
            head = a;
        else
            b.after = a;
        if (a != null)
            a.before = b;
        else
            last = b;
        if (last == null)
            head = p;
        else {
            p.before = last;
            last.after = p;
        }
        tail = p;
        ++modCount;
    }
}

继承LinkedHashMap实现LRU缓存

根据上一节的分析,如果想使用LRU缓存这一数据结构,可以继承LinkedHashMap,下面是一个demo

  • 设定缓存空间上限为MAX_ENTRIES=3
  • 调用父类的构造函数时,将accessOrder设为true,开启LRU顺序维护
  • 重写removeEldestEntry方法,当节点数多于缓存空间上限时使该方法返回true
public class LRUCacheDemo<K, V> extends LinkedHashMap<K, V> {
    private static final int MAX_ENTRIES = 3;

    LRUCacheDemo() {
        super(MAX_ENTRIES, 0.75f, true)
    }

    protected boolean removeEldestEntry(Map.Entry eldest) {
        return size() > MAX_ENTRIES;
    }
}

测试一下是否满足LRU顺序,注意默认节点插入顺序跟HashMap一样都是尾插

public static void main(String[] args) {
    LRUCacheDemo<Integer, String> cache = new LRUCacheDemo<>();

    cache.put(1, "1");  // 插入1: 1
    cache.put(2, "2");  // 插入2: 1->2
    cache.put(3, "3");  // 插入3: 1->2->3
    cache.get(1);       // 访问1,按照LRU算法,节点1被放到当前链表的末尾: 2->3->1
    cache.put(4, "4");  // 插入4,超过了缓存空间上限,移除最近最久未使用: 3->1->4

    System.out.println(cache.keySet()); // 结果为: 3->1->4
}

ThreadLocalMap

ThreadLocalMap是ThreadLocal类内部持有的线程私有变量的哈希表,其中细节很多,见博文ThreadLocal详解