ConcurrentMap是Map的子接口,是高并发下线程安全的Map集合。

public interface ConcurrentMap<K, V> extends Map<K, V> {

    //如果map中已有指定key,返回key对应的value,否则将指定的key-value放入map中
    V putIfAbsent(K key, V value);

    //只有map中指定的key映射指定的value,才移除该键的条目
    boolean remove(Object key, Object value);

    //只有map中指定的key映射指定的value,才替换该键映射
    boolean replace(K key, V oldValue, V newValue);

    //只有map中包含指定的key,才将key映射为value值
    V replace(K key, V value);
}

ConcurrentMap有两个实现类:ConcurrentHashMap 和 ConcurrentSkipListMap

  • ConcurrentHashMap底层是通过hash实现,效率高、线程安全,采取了“锁分段”技术来细化锁的粒度:把整个map划分为一系列被成为segment的组成单元,一个segment相当于一个小的hashtable。(JDK1.8中对它做了优化,采用系统底层技术实现线程安全,Segment不再用于线程安全的保证)
  • ConcurrentSkipListMap的底层是通过跳表来实现的。跳表是一个链表,但是通过使用“跳跃式”查找的方式使得插入、读取数据时复杂度变成了O(logn)。

这里主要介绍一下ConcurrentHashMap(JDK1.7版本):

ConcurrentHashMap :

ConcurrentHashMap底层是基于哈希实现的同步Map。相较于HashMap,它是线程安全的,适用于高并发;相较于HashTable,它是高效的。

构造方法:

    //创建一个带有指定初始容量、加载因子和并发级别的新的空映射。
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;

        // //2的sshif次方等于ssize。ssize为segments数组的长度
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }

        //segmentShift 、segmentMask 用于定位segment
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;

        //计算cap,cap是segment中HashEntry的数组长度,且cap为2的整数次幂
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;

        // 创建segment数组,并初始化segment[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
        this.segments = ss;
    }

    //创建一个带有指定初始容量、加载因子和默认 concurrencyLevel (16) 的新的空映射
    public ConcurrentHashMap(int initialCapacity, float loadFactor) {
        this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
    }

    //创建一个带有指定初始容量、默认加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射
    public ConcurrentHashMap(int initialCapacity) {
        this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    //带有默认初始容量 (16)、加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射
    public ConcurrentHashMap() {
        this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    }

    // 构造一个与给定映射具有相同映射关系的新映射
    public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
        this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
                      DEFAULT_INITIAL_CAPACITY),
             DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
        putAll(m);
    }

ConcurrentHashMap中还定义了一些常量:

	//默认初始容量
	static final int DEFAULT_INITIAL_CAPACITY = 16;

    //默认负载因子
    static final float DEFAULT_LOAD_FACTOR = 0.75f;

    //默认并发级别,在构造函数中未指定时使用。
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    //最大容量,必须是2的幂<= 1<<30,以确保条目可以使用int索引。
    static final int MAXIMUM_CAPACITY = 1 << 30;

    //每个段表的最小容量。必须为2的幂,至少为2
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

    //允许的最大段数;用于绑定构造函数参数,2的16次幂
    static final int MAX_SEGMENTS = 1 << 16; 

    //锁定之前重试次数,这是为了避免在表进行连续修改时进行无界重试,因为修改会导致无法获得准确的结果。
    static final int RETRIES_BEFORE_LOCK = 2;

从ConcurrentHashMap的构造函数中可以发现两个类:Segment 和 HashEntry。Segment继承了ReentrantLock,它属于可重入锁。Segment类似于HashMap,一个Segment中维护了一个HashEntry数组,并发环境下,对于不同的Segment的数据不需要考虑锁的竞争,以默认的ConcurrentLevel = 16为例,理论上允许16个线程并发执行;HashEntry是ConcurrentHashMap中的最小的逻辑处理单元。

ConcurrentHashMap线程安全并且提高性能原因就在于:对map中的读是并发的,无需加锁;只有在put、remove操作时才加锁,而加锁仅是对需要操作的segment加锁,不会影响其他segment的读写,由此,不同的segment之间可以并发使用,极大地提高了性能。

  
     static final class Segment<K,V> extends ReentrantLock implements Serializable {
        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
        
        //segment中的hash表,与hashMap结构相同,表中每个元素都是一个链表。
        transient volatile HashEntry<K,V>[] table;

        //元素个数
        transient int count;

        //此段中突变操作的总数。
        transient int modCount;
		
        //当表的大小超过此阈值时,将rehash。
        transient int threshold;

        //哈希表的加载因子
        final float loadFactor;

        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        }

        final V put(K key, int hash, V value, boolean onlyIfAbsent){......}

        final V remove(Object key, int hash, Object value){.......}

        final boolean replace(K key, int hash, V oldValue, V newValue){......}

        final V replace(K key, int hash, V value){......}

        final void clear(){......}

        //将表的大小加倍并重新打包条目,还将给定的节点添加到新表中
        private void rehash(HashEntry<K,V> node){......}
        
        //扫描包含给定key的节点,同时尝试获取锁,没有找到,则创建并返回一个节点。返回时,确保锁被持有。
        private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {......}

        //扫描包含给定key的节点,同时尝试获取用于删除或替换操作的锁。返回时,确保锁被持有.
        private void scanAndLock(Object key, int hash){......}

    }

segment是整个ConcurrentHashMap线程安全操作的核心,关于修改操作的线程安全已经被封装在segment中。想要详细了解Segment源码的同学可以参考文章:https://blog.csdn.net/zhoushimiao1990/article/details/89552440

相比之下HashEntry的结构要简单的多:

    
    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;

        HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
            this.hash = hash;
            this.key = key;
            this.value = value;
            this.next = next;
        }

        final void setNext(HashEntry<K,V> n) {
            UNSAFE.putOrderedObject(this, nextOffset, n);
        }

        static final sun.misc.Unsafe UNSAFE;
        static final long nextOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = HashEntry.class;
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

HashEntry相当于链表中的一个存储单元,记录当前节点,并指向下一个存储单元。

ConcurrentHashMap常用方法:

  •  void clear()   从该映射中移除所有映射关系 
  •  V putIfAbsent(K key, V value)   如果map中已有指定key,返回key对应的value,否则将指定的key-value放入map中。
  •  V put(K key, V value)   将指定键映射到此表中的指定值。 
  •  V get(Object key)    返回指定键所映射到的值,如果此映射不包含该键的映射关系,则返回 null。  
  •  V remove(Object key)    从此映射中移除键(及其相应的值)。 
  •  boolean remove(Object key, Object value)    只有map中指定的key映射指定的value,才移除该键的条目
  •  V replace(K key, V value)    只有map中包含指定的key,才将key映射为value值
  •  boolean replace(K key, V oldValue, V newValue)    只有map中指定的key映射指定的value,才替换该键映射
  •  boolean contains(Object value)   等同于containsValue(Object value)
  •  boolean containsKey(Object key)   测试指定对象是否为此表中的键。 
  •  boolean containsValue(Object value)    如果此映射将一个或多个键映射到指定值,则返回 true。 
  •  int size()    返回此映射中的键-值映射关系数。 
  •  Enumeration<V> elements()   返回此表中值的枚举。 
  •  Set<Map.Entry<K,V>> entrySet()   返回此映射所包含的映射关系的 Set 视图。 
  •  boolean isEmpty()   如果此映射不包含键-值映射关系,则返回 true。 
  •  Enumeration<K> keys()    返回此表中键的枚举。 
  •  Set<K> keySet()   返回此映射中包含的键的 Set 视图。 
  •  void putAll(Map<? extends K,? extends V> m)    将指定映射中所有映射关系复制到此映射中。 
  •  Collection<V> values()    返回此映射中包含的值的 Collection 视图。 

这里简单了解一下几个常用方法的源码:

put:

    public V put(K key, V value) {
        Segment<K,V> s;
        if (value == null)
            throw new NullPointerException();
        int hash = hash(key);
        int j = (hash >>> segmentShift) & segmentMask;
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
            s = ensureSegment(j);
        return s.put(key, hash, value, false); //调用Segment中的put方法
    }

remove:

    public V remove(Object key) {
        int hash = hash(key);
        Segment<K,V> s = segmentForHash(hash);
        return s == null ? null : s.remove(key, hash, null);
    }

replace:

    public V replace(K key, V value) {
        int hash = hash(key);
        if (value == null)
            throw new NullPointerException();
        Segment<K,V> s = segmentForHash(hash);
        return s == null ? null : s.replace(key, hash, value);
    }

从上面三个方法的源码可以看出,ConcurrentHashMap对链表中元素的增、删、改操作,都是在Segment内部类中完成。上面也介绍了Segment是一个可重入锁,内部对元素的修改操作都保证了线程安全性。所以在ConcurrentHashMap中直接调用Segment的修改元素的方法,实现线程安全。

ConcurrentHashMap对于读取元素的操作get方法,没有加锁,因为涉及到的共享变量都使用volatile修饰,volatile可以保证内存可见性,所以不会读取到过期数据。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐