本文详细解析了Java中线程安全的HashMap实现——ConcurrentHashMap的工作原理。通过深入分析其内部源码,我们阐述了ConcurrentHashMap如何利用分段锁、CAS操作、扩容机制、近似计数等技术实现高并发和线程安全。同时,我们还提供了一些实际的使用示例,帮助读者更好地理解和掌握ConcurrentHashMap的使用方法。

本文主要分成两部分:第一部分讲述ConcurrentHashMap的简介和用法;第二部分结合源码实现,具体阐述ConcurrentHashMap如何实现高并发和线程安全。

一、简介和用法

1.1 ConcurrentHashMap简介

ConcurrentHashMap是Java中提供的一个线程安全的HashMap实现,它采用分段锁和CAS(Compare and Swap)操作等技术来实现高并发和线程安全。下面我们结合ConcurrentHashMap的内部源码解释分段锁、CAS操作、扩容机制、近似计数等技术如何实现的。

1.2 并发操作方法

ConcurrentHashMap提供了一些用于并发操作的方法,如putIfAbsent()、replace()、remove()等。这些方法可以在一个原子操作中完成检查和更新,从而避免多线程环境下的竞争条件。

例如,下面的代码展示了使用putIfAbsent()方法来实现一个线程安全的缓存:

import java.util.concurrent.ConcurrentHashMap;

public class Cache {
    private ConcurrentHashMap<String, Object> cache = new ConcurrentHashMap<>();

    public Object get(String key) {
        return cache.get(key);
    }

    public void putIfAbsent(String key, Object value) {
        cache.putIfAbsent(key, value);
    }
}

在这个示例中,我们创建了一个Cache类,它使用ConcurrentHashMap来存储缓存数据。当我们需要添加一个键值对时,可以使用putIfAbsent()方法,这个方法会在键不存在时才添加键值对,从而避免覆盖已存在的值。

1.3 遍历ConcurrentHashMap

ConcurrentHashMap的遍历操作也是线程安全的。它提供了keySet、values和entrySet等方法,可以返回Map的键集、值集或键值对集。这些方法返回的集合是ConcurrentHashMap的视图,它们会反映ConcurrentHashMap的实时状态。也就是说,你在遍历这些集合的过程中,其他线程对ConcurrentHashMap的修改操作是可见的。

例如,下面的代码展示了如何遍历ConcurrentHashMap:

import java.util.concurrent.ConcurrentHashMap;

public class ConcurrentHashMapExample {
    public static void main(String[] args) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        // 添加键值对
        map.put("one", 1);
        map.put("two", 2);
        map.put("three", 3);

        // 遍历ConcurrentHashMap
        for (String key : map.keySet()) {
            System.out.println("Key: " + key + ", Value: " + map.get(key));
        }
    }
}

在这个示例中,我们创建了一个ConcurrentHashMap实例,然后使用put方法添加了一些键值对,最后使用for-each循环遍历了整个ConcurrentHashMap。这个遍历操作是线程安全的,即使在遍历过程中有其他线程修改ConcurrentHashMap,也不会抛出ConcurrentModificationException。

1.4 扩展方法介绍

ConcurrentHashMap还提供了一些并发编程中常用的扩展方法,如compute、merge等。这些方法可以在一个原子操作中完成复杂的更新逻辑,从而避免多线程环境下的竞争条件。

例如,下面的代码展示了使用compute方法来实现一个线程安全的计数器:

import java.util.concurrent.ConcurrentHashMap;

public class Counter {
    private ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

    public void increment(String key) {
        map.compute(key, (k, v) -> (v == null) ? 1 : v + 1);
    }

    public int getCount(String key) {
        return map.getOrDefault(key, 0);
    }
}

在这个示例中,我们创建了一个Counter类,它使用ConcurrentHashMap来存储计数数据。当我们需要增加一个键的计数时,可以使用compute方法,这个方法会在键存在时增加计数,否则初始化计数为1。

1.5 并发性能分析

由于ConcurrentHashMap采用了分段锁和CAS操作等技术,它在高并发环境下具有很好的性能。相比于同步的HashMap(如Hashtable或使用Collections.synchronizedMap包装的HashMap),ConcurrentHashMap在读操作上几乎没有锁的开销,在写操作上也只需要锁定部分段,因此并发性能更高。

然而,ConcurrentHashMap并不是万能的。在数据量较小或并发访问较低的情况下,简单的HashMap可能会更快。此外,ConcurrentHashMap也不能保证所有操作的全局有序性。如果需要全局有序性,可以考虑使用同步的Map实现,或者使用锁和其他同步工具来协调并发操作。

1.6 局限性与适用场景

虽然ConcurrentHashMap在并发环境下提供了很好的性能,但它也有一些局限性。首先,ConcurrentHashMap的所有操作都是线程安全的,但如果你需要执行复合操作(例如,先检查一个键是否存在,然后根据结果进行更新操作),那么就需要额外的同步措施来保证这些操作的原子性。因为在两个操作之间,可能有其他线程修改了ConcurrentHashMap的状态。

其次,ConcurrentHashMap的size方法和isEmpty方法返回的结果是近似的,它们可能不会立即反映其他线程的修改操作。这是因为为了提高性能,ConcurrentHashMap没有使用全局锁来同步这些方法。

最后,虽然ConcurrentHashMap的并发性能很好,但如果你的应用场景中读操作远多于写操作,那么使用Read-Write Locks可能会获得更好的性能。Read-Write Locks允许多个线程同时读,但只允许一个线程写,这对于读多写少的场景是非常有效的。

二、源码分析

2.1 数据结构

2.1.1 Java 7的数据结构

Java 7:ConcurrentHashMap的源码中可以看到Segment的定义,每个Segment都有一个HashEntry数组和一个ReentrantLock锁。当我们插入一个新元素时,会根据元素的哈希值确定要插入的Segment,然后再在该Segment的HashEntry数组中找到合适的位置插入元素。

源码中的HashEntry定义如下:

static final class HashEntry<K,V> {
    final K key;
    final int hash;
    volatile V value;
    final HashEntry<K,V> next;
}

源码中的Segment定义如下:

static final class Segment<K,V> extends ReentrantLock implements Serializable {
    private static final long serialVersionUID = 2249069246763182397L;

    // HashEntry 数组
    transient volatile HashEntry<K,V>[] table;

    // ...
}
2.1.2 Java 8的数据结构

Java 8:ConcurrentHashMap的源码中,Segment的定义已经被移除。现在的数据结构是一个Node数组,每个Node可以是一个链表或者红黑树。

当链表长度超过TREEIFY_THRESHOLD(默认为8)时,链表会转为红黑树。这部分代码位于 ConcurrentHashMap 类的 treeifyBin 方法中。以下是源码片段:

final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        // 如果数组长度小于 MIN_TREEIFY_CAPACITY(默认为64),则进行扩容而不是转为红黑树
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    // 遍历链表,将链表节点转换为树节点
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p = new TreeNode<K,V>
                            (e.hash, e.key, e.val, null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    // 将链表转换后的树节点设置到数组对应的位置
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}

当链表长度达到 TREEIFY_THRESHOLD 时,putVal 方法会调用 treeifyBin 方法。在 treeifyBin 方法中,首先检查数组长度是否小于 MIN_TREEIFY_CAPACITY(默认为64)。如果数组长度小于这个值,那么进行扩容操作。否则,遍历链表,将链表节点转换为树节点,并将转换后的树节点设置到数组对应的位置。这样,原来的链表就被转换为了红黑树。

2.2 锁机制

2.2.1 Java 7的锁机制

Java 7:在ConcurrentHashMap的源码中,每个Segment都有一个独立的ReentrantLock锁。在插入、删除和更新操作时,只需要锁定特定的Segment。

2.2.2 Java 8的锁机制

Java 8:锁机制的改进主要体现在putVal方法中。在Java 8的实现中,插入一个新元素时,如果计算出的位置当前没有元素,那么直接使用CAS操作插入新元素;如果计算出的位置已经有其他元素(存在哈希冲突),那么需要锁定这个位置,然后再进行插入操作。

final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 检查 key 和 value 是否为 null,如果为 null 则抛出空指针异常
    if (key == null || value == null) throw new NullPointerException();
    // 计算 key 的哈希值
    int hash = spread(key.hashCode());
    // 计数器,用于记录链表中节点的数量
    int binCount = 0;
    // 无限循环,直到插入成功或者更新成功
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; int n, i, fh; K fk; V fv;
        // 如果 table 未初始化或长度为 0,则初始化 table
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 如果计算出的位置为空,则尝试使用 CAS 操作插入新节点
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
                break; // 插入成功,跳出循环
        }
        // 如果当前位置的节点的哈希值为 MOVED(表示在进行扩容),则帮助进行数据迁移
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // 如果 onlyIfAbsent 为 true,且当前位置的节点的 key 与待插入的 key 相等,则返回当前节点的 value
        else if (onlyIfAbsent
                 && fh == hash
                 && ((fk = f.key) == key || (fk != null && key.equals(fk)))
                 && (fv = f.val) != null)
            return fv;
        // 如果当前位置的节点的 key 与待插入的 key 不相等,则需要进行同步操作
        else {
            V oldVal = null;
            // 锁定当前位置的节点
            synchronized (f) {
                // 再次检查当前位置的节点是否未发生变化
                if (tabAt(tab, i) == f) {
                    // 如果当前位置的节点是普通节点(链表节点)
                    if (fh >= 0) {
                        binCount = 1;
                        // 遍历链表,查找 key 相等的节点
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                 (ek != null && key.equals(ek)))) {
                                // 找到相等的 key,更新 value
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            if ((e = e.next) == null) {
                                // 遍历到链表末尾,插入新节点
                                pred.next = new Node<K,V>(hash, key, value);
                                break;
                            }
                        }
                    }
                    // 如果当前位置的节点是红黑树节点
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        // 将新的 key-value 插入红黑树
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                       value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                    // 如果当前位置的节点是预留节点(用于扩容时的占位),抛出异常
                    else if (f instanceof ReservationNode)
                        throw new IllegalStateException("Recursive update");
                }
            }
            // 如果 binCount 不为 0,表示已经插入或更新成功
            if (binCount != 0) {
                // 如果链表长度达到阈值,将链表转换为红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                // 如果 oldVal 不为 null,表示更新成功,返回旧的 value
                if (oldVal != null)
                    return oldVal;
                // 插入成功,跳出
                break;
            }
        }
    }
    // 元素数量增加,如果超过阈值,进行扩容
    addCount(1L, binCount);
    // 插入成功,返回 null
    return null;
}

这段代码是 ConcurrentHashMap 的 putVal 方法。这个方法主要用于插入或者更新 key-value 对。如果 key 不存在,那么插入新的 key-value 对;如果 key 存在,那么更新对应的 value。如果 onlyIfAbsent 参数为 true,那么只有当 key 不存在时才插入新的 key-value 对。

这个方法首先计算 key 的哈希值,然后根据哈希值找到在数组中的位置。如果这个位置为空,那么直接插入新的节点;如果这个位置不为空,那么需要根据节点的类型(普通节点、红黑树节点、预留节点)进行相应的操作。

如果插入或更新成功,那么更新元素的数量,并检查是否需要进行扩容。如果需要扩容,那么进行扩容操作。

最后,如果插入成功,返回 null;如果更新成功,返回旧的 value。

2.3 扩容机制原理

当ConcurrentHashMap的某个Segment的填充程度超过阈值时,为了保持性能,ConcurrentHashMap会对该Segment进行扩容。扩容操作涉及创建一个新的、更大的HashEntry数组,并将旧数组中的所有键值对重新插入到新数组中。这个过程称为“rehashing”。

源码中的扩容操作如下:
在Java 8的ConcurrentHashMap中,rehashing过程主要在transfer方法中实现,这个方法在ConcurrentHashMap需要扩容时被调用。以下是transfer方法的部分源码:

以下是对代码的注释和解释:

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    // 计算旧哈希表的长度和迁移的步长
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range
    if (nextTab == null) { // 初始化新哈希表
        try {
            @SuppressWarnings("unchecked")
            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
            nextTab = nt;
        } catch (Throwable ex) { // 处理内存溢出异常
            sizeCtl = Integer.MAX_VALUE;
            return;
        }
        nextTable = nextTab;
        transferIndex = n;
    }
    int nextn = nextTab.length;
    ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    boolean advance = true;
    boolean finishing = false; // 确保在提交nextTab之前扫描
    for (int i = 0, bound = 0;;) {
        Node<K,V> f; int fh;
        while (advance) {
            int nextIndex, nextBound;
            if (--i >= bound || finishing)
                advance = false;
            else if ((nextIndex = transferIndex) <= 0) {
                i = -1;
                advance = false;
            }
            else if (U.compareAndSetInt
                     (this, TRANSFERINDEX, nextIndex,
                      nextBound = (nextIndex > stride ?
                                   nextIndex - stride : 0))) {
                bound = nextBound;
                i = nextIndex - 1;
                advance = false;
            }
        }
        // 检查是否完成迁移
        if (i < 0 || i >= n || i + n >= nextn) {
            int sc;
            if (finishing) {
                nextTable = null;
                table = nextTab;
                sizeCtl = (n << 1) - (n >>> 1);
                return;
            }
            // 尝试将sizeCtl减1,以便其他线程可以继续迁移
            if (U.compareAndSetInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                    return;
                finishing = advance = true;
                i = n; // 在提交之前重新检查
            }
        }
        else if ((f = tabAt(tab, i)) == null)
            advance = casTabAt(tab, i, null, fwd);
        else if ((fh = f.hash) == MOVED)
            advance = true; // 已经处理过
        else { // 对节点进行同步操作
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    Node<K,V> ln, hn;
                    // 处理普通链表节点
                    if (fh >= 0) {
                        // ...
                        // ...
                    }
                    // 处理红黑树节点
                    else if (f instanceof TreeBin) {
                        // ...
                        // ...
                    }
                }
            }
        }
    }
}

在这段代码中,transfer方法主要负责将旧哈希表(tab)中的元素重新计算哈希值并放入新的哈希表(nextTab)。这个过程包括以下几个步骤:

  1. 初始化新的哈希表,大小为旧哈希表的两倍。
  2. 遍历旧哈希表中的元素,根据元素的哈希值和新哈希表的大小计算新的索引位置。
  3. 将元素插入新哈希表的相应位置。如果旧哈希表中的元素是链表节点,那么在新哈希表中也使用链表节点;如果旧哈希表中的元素是红黑树节点,那么在新哈希表中也使用红黑树节点。
  4. 将旧哈希表中的元素设置为ForwardingNode,表示已经迁移完成。

这段代码实现了ConcurrentHashMap中的rehashing过程,即在扩容时将旧哈希表中的元素重新计算哈希值并放入新的哈希表中。这个过程包括初始化新的哈希表、遍历旧哈希表中的元素、将元素插入新哈希表的相应位置、将旧哈希表中的元素设置为ForwardingNode等步骤。在Java 8的ConcurrentHashMap中,这个过程是并发进行的,多个线程可以同时迁移不同的元素,从而提高扩容操作的性能。

2.4 近似计数原理

ConcurrentHashMap提供了一些用于统计的方法,如size()、isEmpty()等。这些方法在ConcurrentHashMap的实现中,采用了一种近似计算的策略。由于ConcurrentHashMap是高并发的,要精确地计算元素个数会带来很大的性能开销。因此,ConcurrentHashMap允许这些统计方法返回一个近似值,从而在保持性能的同时,还能提供一定程度的准确性。

源码中的近似计数方法如下:

public int size() {
    // 调用sumCount()方法获取ConcurrentHashMap中元素的总数
    long n = sumCount();
    // 如果元素总数小于0,返回0
    // 如果元素总数大于Integer.MAX_VALUE,返回Integer.MAX_VALUE
    // 否则,返回元素总数的整数值
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

final long sumCount() {
    // 获取CounterCell数组,这个数组用于存储元素计数
    CounterCell[] cs = counterCells;
    // 获取基础计数器的值,这是一个基础计数,表示没有使用CounterCell的情况下的元素计数
    long sum = baseCount;
    // 如果CounterCell数组不为空,遍历数组
    if (cs != null) {
        for (CounterCell c : cs)
            // 如果当前CounterCell不为空,将其值累加到sum中
            if (c != null)
                sum += c.value;
    }
    // 返回元素总数
    return sum;
}

这两个方法是ConcurrentHashMap中获取当前映射中元素数量的方法。size()方法返回ConcurrentHashMap中的元素数量。sumCount()方法用于计算ConcurrentHashMap中的元素总数,这个方法首先获取baseCount的值,然后遍历counterCells数组,将数组中每个CounterCell的值累加到sum变量中。最后返回sum作为元素总数。

sumCount() 方法计算的元素总数是一个近似值。在高并发环境下,由于多个线程可能同时在 ConcurrentHashMap 中进行插入、删除和更新操作,计算出的元素总数可能与实际元素总数有一定的偏差。

ConcurrentHashMap 的设计目标是为了提供高并发性能,因此在计算元素总数时,并没有使用全局锁或其他同步机制来确保计数的精确性。相反,它使用了一种分散计数的策略(通过 CounterCell 数组和 baseCount),这种策略在高并发场景下可以减少锁竞争,从而提高性能。

因此,当我们使用 ConcurrentHashMap 的 size() 方法或 sumCount() 方法时,需要注意它们返回的值可能是一个近似值,而不是精确值。在大多数情况下,这种近似值已经足够满足需求,但在对精确计数有严格要求的场景下,需要考虑其他数据结构或同步策略。

三、总结

ConcurrentHashMap是Java中提供的一个高性能、线程安全的HashMap实现。它采用了分段锁、CAS操作、扩容机制、近似计数等技术,实现了高并发和线程安全。在需要处理并发访问的场景中,ConcurrentHashMap是一个非常实用的工具。

在实际应用中,我们需要根据具体的场景和需求来选择合适的数据结构。

  • 如果需要高并发访问和更新,那么ConcurrentHashMap是一个很好的选择。
  • 如果数据量较小或并发访问较低,简单的HashMap可能会更快。
  • 如果需要全局有序性,可以考虑使用同步的Map实现,或者使用锁和其他同步工具来协调并发操作。

推荐阅读

图解ConcurrentHashMap

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐