Map这种映射关系的集合,由于它继承了数组和双向链表的优势,适合增删改查,在平时的编程中使用的还是比较多的。所以有必要很清楚的了解map集合在使用时的一些坑。

        本文基于常用的jdk版本1.7和1.8来分析下常用的两种类型map集合:hashMap和ConcurrentHashMap。

一、HashMap

       在jdk1.7中,hashMap的主干是个Entry数组。Entry是hashMap基本组成单元,每个Entry包含一个key-value键值对。HashMap由数组+链表组成的,数组是HashMap的主体,链表则是主要为了解决哈希冲突而存在的,如果定位到的数组位置不含链表(当前entry的next指向null),那么查找,添加等操作很快,仅需一次寻址即可;如果定位到的数组包含链表,对于添加操作,其时间复杂度为O(n),首先遍历链表,存在即覆盖,否则新增;对于查找操作来讲,仍需遍历链表,然后通过key对象的equals方法逐一比对查找。所以,性能考虑,HashMap中的链表出现越少,性能才会越好。

        hashMap结构如下图:

up-8ce221595732b9dc802a2d850ac7226aa69.png

        在jdk1.7中,引入了链表作为存储同样hash值key的value,虽然解决了hash冲突问题,但是在当链表过长时,会影响查询value的效率,因为链表不适合查询,适合增加、删除操作。

        在jdk1.8中,对于value的存储结构引入了红黑树(选择红黑树,主要还是考虑查询的效率,因为红黑树的机构,不会受极端值的影响,如二叉树,在value值比较极端时,就相当于是个链表),当链表的的size大于8时,会用红黑树来代替链表存储。

up-90b28c3861178545342ffd9779fcb986c99.png

 二、ConcurrentHashMap

         ConcurrentHashMap是线程安全的,这个是都清楚的,怎么实现其线程安全的访问,在jdk1.7和jdk1.8中的实现却不一样,存储结构,类似hashMap,这里主要讨论jdk1.7和jdk1.8分别是怎么实现ConcurrentHashMap的线程安全。

2.1 jdk1.7中实现

        在jdk1.7中,concurrentHashMap的实现结构为:Segment(锁数组)+hashEntry(hash数组)+链表(hashEntry节点)。

        结构图如下:

         上图可以看出,Segments数组,是在原来的hashMap存储结构的基础上又多加了一层,每个segment类继承了ReentrantLock锁,管理hash数组的其中几个数组,这样可以实现并发线程的修改。每个segment类似一个hashMap的结构,可以内部扩容。segment默认是16个,也就是最大可以支持16个线程并发读写

2.1.1 主要方法

        get()方法

  • 第一次hash找到对应的segment对象,调用segment的get方法。
  • 再次hash找到对应的链表
  • 在链表中查询对应的值。

        set()方法

  • 首先确定segment段的位置,调用Segment的put方法。
  • 加锁
  • 检查当前Segment数组中包含的HashEntry节点的个数,如果超过阈值,就需要重新hash
  • 然后再次hash确定放的链表
  • 在对应的链表中查找是否有相同的节点,有则覆盖,没有则放到尾部。

        在set方法中会设计重新hash的方法,只对segment对象中的hashEntry数组进行重新hash

2.2 jdk1.8中实现

        在jdk1.8中,使用优化过的synchronized锁配合cas乐观锁,来实现concurrentHashMap的线程安全,concurrentHashMap的存储结构对标jdk1.8对hashMap的优化,使用hashEntry+链表/红黑树的方式,优化了链表过大时影响查询效率,删除了segment的实现。

volatile Node<K,V>[] table;   // Node数组用于存放链表或者树的头结点
static final int TREEIFY_THRESHOLD = 8;   // 链表转红黑树的阈值 > 8 时
static final int UNTREEIFY_THRESHOLD = 6;  // 红黑树转链表的阈值  <= 6 时
static final int TREEBIN   = -2;    // 树根节点的hash值
static final float LOAD_FACTOR = 0.75f;// 负载因子
static final int DEFAULT_CAPACITY = 16;   // 默认大小为16
//内部类 
class Node<K,V> implements Map.Entry<K,V> {
    int hash;       
   final K key;       
   volatile V val;
   volatile Node<K,V> next;
}

2.2.1 主要方法

        new()方法

ConcurrentHashMap map = new ConcurrentHashMap(16);

        跟进源码,可以看到new方法的源码实现如下,并没有创建一个ConcurrentHashMap对象,只是检查传入的hashEntry(jdk1.8中为node数组)大小,最终确定其大小。 

public ConcurrentHashMap(int initialCapacity) {
        if (initialCapacity < 0)
            throw new IllegalArgumentException();
        // 如果传入的数值>= 最大容量的一半,就使用最大容量,否则使用
       //1.5*initialCapacity +1 ,然后向上取最近的 2 的 n 次方数;
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
                   MAXIMUM_CAPACITY :
                   tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
        this.sizeCtl = cap;
    }

        put方法

  • 检查key或者value是否为null
  • 得到key的hash值
  • 如果node数组为空,则开始initTable()
  • 如果找的对应的下标位置为空,直接new一个node,并放入,break
  • 如果对应头节点不为空,进入同步代码块
    • 判断此头节点的hash值是否大于零,大于零则说明链表的头节点在链表中寻找。
    • 如果有相同的hash值并且key相同,直接覆盖,返回旧值,结束
    • 如果没有,直接放在链表尾部
    • 此头结点的Hash值小于零,则说明此节点是红黑二叉树的根节点
    • 调用树的添加元素方法
    • 判断数组是否需要转红黑树

        put方法源码

final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            //判断tab是否为null
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            //数组中当前位置是null就创建一个新的Node,通过CAS写入数组指定位置
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            //当前节点正在扩容
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            //指定位置不为null
            else {
                V oldVal = null;
            //加synchronized 锁,
            //关注这一块代码,当链表/红黑树不为空时,通过synchronized代码块将数据存储到链表/红黑树
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                       //链表
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                //尾插法
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                       //红黑树
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    //大于8转红黑树
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }
 

         get()方法

  • 首先获取key的hash值
  • 找到对应的数组下标元素
  • 如果该元素是要找的,直接返回
  • 如果是null,返回null
  • 如果key的值小于0,是红黑树

        源码注释如下

public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());   //获得Hash值
        if ((tab = table) != null && (n = tab.length) > 0 &&
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {  // 比较 此头结点e是否是我们需要的元素
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;   // 如果是,就返回
            }
            else if (eh < 0)   // 如果小于零,说明此节点是红黑树 
                return (p = e.find(h, key)) != null ? p.val : null;
            while ((e = e.next) != null) {
                // 开始循环 查找
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }

        tryPresize()扩容

        扩容后,node数组容量变为原来的2倍

private final void tryPresize(int size) {
        int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
            tableSizeFor(size + (size >>> 1) + 1);
        int sc;
        while ((sc = sizeCtl) >= 0) {
            Node<K,V>[] tab = table; int n;
            if (tab == null || (n = tab.length) == 0) {
                n = (sc > c) ? sc : c;
                if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                    try {
                        if (table == tab) {
                            @SuppressWarnings("unchecked")
                            Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                            table = nt;
                            sc = n - (n >>> 2);
                        }
                    } finally {
                        sizeCtl = sc;
                    }
                }
            }
            else if (c <= sc || n >= MAXIMUM_CAPACITY)
                break;
            else if (tab == table) {
                int rs = resizeStamp(n);
                if (sc < 0) {
                    Node<K,V>[] nt;
                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                        sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
                        transferIndex <= 0)
                        break;
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);
                }
                else if (U.compareAndSwapInt(this, SIZECTL, sc,
                                             (rs << RESIZE_STAMP_SHIFT) + 2))
                    transfer(tab, null);
            }
        }
    }

        这里多说下Node的数据结构,在jdk1.8中,已经将hashEntry直接存储key的hash值,修改为Node对象,Node类实现了Map.Entry接口,对value和next属性设置了volatile锁。

2.3 jdk1.7和jdk.18实现线程安全方式比较

        jdk1.7:对整个hashEntry数组进行了分割,分割成多个Segement,对每个Segement设置分段锁(ReentrantLock),实现属于不同段的并发线程并发修改存储数据

        jdk1.8:只要put方法会加锁,get方法没有锁,通过阅读put方法的源码,当数组中当前位置为null,使用CAS新建Node对象,写入数组对应的位置,当数组中指定位置不为null时,通过synchronized锁的代码块来讲Node节点添加入数组(链表<8)或者红黑树(链表>=8)

三、HashMap的死锁问题

        多说点,在jdk1.7中,hashMap会出现死锁问题,面试的时候,会问到原因,这里就多说一嘴,主要还是因为hashMap在扩容完成后,重新hash,然后数据迁移时,并发线程会造成链表的死循环,及A.next=B,B.next=A这种场景,只要一调用get()方法,就会发生死锁。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐