Java多线程——ConcurrentMap、ConcurrentHashMap
ConcurrentMap是Map的子接口,是高并发下线程安全的Map集合。public interface ConcurrentMap<K, V> extends Map<K, V> {//如果map中已有指定key,返回key对应的value,否则将指定的key-value放入map中V putIfAbsent(K key, V value)...
ConcurrentMap是Map的子接口,是高并发下线程安全的Map集合。
public interface ConcurrentMap<K, V> extends Map<K, V> {
//如果map中已有指定key,返回key对应的value,否则将指定的key-value放入map中
V putIfAbsent(K key, V value);
//只有map中指定的key映射指定的value,才移除该键的条目
boolean remove(Object key, Object value);
//只有map中指定的key映射指定的value,才替换该键映射
boolean replace(K key, V oldValue, V newValue);
//只有map中包含指定的key,才将key映射为value值
V replace(K key, V value);
}
ConcurrentMap有两个实现类:ConcurrentHashMap 和 ConcurrentSkipListMap
- ConcurrentHashMap底层是通过hash实现,效率高、线程安全,采取了“锁分段”技术来细化锁的粒度:把整个map划分为一系列被成为segment的组成单元,一个segment相当于一个小的hashtable。(JDK1.8中对它做了优化,采用系统底层技术实现线程安全,Segment不再用于线程安全的保证)
- ConcurrentSkipListMap的底层是通过跳表来实现的。跳表是一个链表,但是通过使用“跳跃式”查找的方式使得插入、读取数据时复杂度变成了O(logn)。
这里主要介绍一下ConcurrentHashMap(JDK1.7版本):
ConcurrentHashMap :
ConcurrentHashMap底层是基于哈希实现的同步Map。相较于HashMap,它是线程安全的,适用于高并发;相较于HashTable,它是高效的。
构造方法:
//创建一个带有指定初始容量、加载因子和并发级别的新的空映射。
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS;
// //2的sshif次方等于ssize。ssize为segments数组的长度
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
//segmentShift 、segmentMask 用于定位segment
this.segmentShift = 32 - sshift;
this.segmentMask = ssize - 1;
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
//计算cap,cap是segment中HashEntry的数组长度,且cap为2的整数次幂
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = MIN_SEGMENT_TABLE_CAPACITY;
while (cap < c)
cap <<= 1;
// 创建segment数组,并初始化segment[0]
Segment<K,V> s0 =
new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
(HashEntry<K,V>[])new HashEntry[cap]);
Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
this.segments = ss;
}
//创建一个带有指定初始容量、加载因子和默认 concurrencyLevel (16) 的新的空映射
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}
//创建一个带有指定初始容量、默认加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射
public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
//带有默认初始容量 (16)、加载因子 (0.75) 和 concurrencyLevel (16) 的新的空映射
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
// 构造一个与给定映射具有相同映射关系的新映射
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
DEFAULT_INITIAL_CAPACITY),
DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
putAll(m);
}
ConcurrentHashMap中还定义了一些常量:
//默认初始容量
static final int DEFAULT_INITIAL_CAPACITY = 16;
//默认负载因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//默认并发级别,在构造函数中未指定时使用。
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//最大容量,必须是2的幂<= 1<<30,以确保条目可以使用int索引。
static final int MAXIMUM_CAPACITY = 1 << 30;
//每个段表的最小容量。必须为2的幂,至少为2
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;
//允许的最大段数;用于绑定构造函数参数,2的16次幂
static final int MAX_SEGMENTS = 1 << 16;
//锁定之前重试次数,这是为了避免在表进行连续修改时进行无界重试,因为修改会导致无法获得准确的结果。
static final int RETRIES_BEFORE_LOCK = 2;
从ConcurrentHashMap的构造函数中可以发现两个类:Segment 和 HashEntry。Segment继承了ReentrantLock,它属于可重入锁。Segment类似于HashMap,一个Segment中维护了一个HashEntry数组,并发环境下,对于不同的Segment的数据不需要考虑锁的竞争,以默认的ConcurrentLevel = 16为例,理论上允许16个线程并发执行;HashEntry是ConcurrentHashMap中的最小的逻辑处理单元。
ConcurrentHashMap线程安全并且提高性能原因就在于:对map中的读是并发的,无需加锁;只有在put、remove操作时才加锁,而加锁仅是对需要操作的segment加锁,不会影响其他segment的读写,由此,不同的segment之间可以并发使用,极大地提高了性能。
static final class Segment<K,V> extends ReentrantLock implements Serializable {
static final int MAX_SCAN_RETRIES =
Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
//segment中的hash表,与hashMap结构相同,表中每个元素都是一个链表。
transient volatile HashEntry<K,V>[] table;
//元素个数
transient int count;
//此段中突变操作的总数。
transient int modCount;
//当表的大小超过此阈值时,将rehash。
transient int threshold;
//哈希表的加载因子
final float loadFactor;
Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
this.loadFactor = lf;
this.threshold = threshold;
this.table = tab;
}
final V put(K key, int hash, V value, boolean onlyIfAbsent){......}
final V remove(Object key, int hash, Object value){.......}
final boolean replace(K key, int hash, V oldValue, V newValue){......}
final V replace(K key, int hash, V value){......}
final void clear(){......}
//将表的大小加倍并重新打包条目,还将给定的节点添加到新表中
private void rehash(HashEntry<K,V> node){......}
//扫描包含给定key的节点,同时尝试获取锁,没有找到,则创建并返回一个节点。返回时,确保锁被持有。
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {......}
//扫描包含给定key的节点,同时尝试获取用于删除或替换操作的锁。返回时,确保锁被持有.
private void scanAndLock(Object key, int hash){......}
}
segment是整个ConcurrentHashMap线程安全操作的核心,关于修改操作的线程安全已经被封装在segment中。想要详细了解Segment源码的同学可以参考文章:https://blog.csdn.net/zhoushimiao1990/article/details/89552440
相比之下HashEntry的结构要简单的多:
static final class HashEntry<K,V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K,V> next;
HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
this.hash = hash;
this.key = key;
this.value = value;
this.next = next;
}
final void setNext(HashEntry<K,V> n) {
UNSAFE.putOrderedObject(this, nextOffset, n);
}
static final sun.misc.Unsafe UNSAFE;
static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class k = HashEntry.class;
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
HashEntry相当于链表中的一个存储单元,记录当前节点,并指向下一个存储单元。
ConcurrentHashMap常用方法:
- void clear() 从该映射中移除所有映射关系
- V putIfAbsent(K key, V value) 如果map中已有指定key,返回key对应的value,否则将指定的key-value放入map中。
- V put(K key, V value) 将指定键映射到此表中的指定值。
- V get(Object key) 返回指定键所映射到的值,如果此映射不包含该键的映射关系,则返回 null。
- V remove(Object key) 从此映射中移除键(及其相应的值)。
- boolean remove(Object key, Object value) 只有map中指定的key映射指定的value,才移除该键的条目
- V replace(K key, V value) 只有map中包含指定的key,才将key映射为value值
- boolean replace(K key, V oldValue, V newValue) 只有map中指定的key映射指定的value,才替换该键映射
- boolean contains(Object value) 等同于containsValue(Object value)
- boolean containsKey(Object key) 测试指定对象是否为此表中的键。
- boolean containsValue(Object value) 如果此映射将一个或多个键映射到指定值,则返回 true。
- int size() 返回此映射中的键-值映射关系数。
- Enumeration<V> elements() 返回此表中值的枚举。
- Set<Map.Entry<K,V>> entrySet() 返回此映射所包含的映射关系的 Set 视图。
- boolean isEmpty() 如果此映射不包含键-值映射关系,则返回 true。
- Enumeration<K> keys() 返回此表中键的枚举。
- Set<K> keySet() 返回此映射中包含的键的 Set 视图。
- void putAll(Map<? extends K,? extends V> m) 将指定映射中所有映射关系复制到此映射中。
- Collection<V> values() 返回此映射中包含的值的 Collection 视图。
这里简单了解一下几个常用方法的源码:
put:
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
int hash = hash(key);
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false); //调用Segment中的put方法
}
remove:
public V remove(Object key) {
int hash = hash(key);
Segment<K,V> s = segmentForHash(hash);
return s == null ? null : s.remove(key, hash, null);
}
replace:
public V replace(K key, V value) {
int hash = hash(key);
if (value == null)
throw new NullPointerException();
Segment<K,V> s = segmentForHash(hash);
return s == null ? null : s.replace(key, hash, value);
}
从上面三个方法的源码可以看出,ConcurrentHashMap对链表中元素的增、删、改操作,都是在Segment内部类中完成。上面也介绍了Segment是一个可重入锁,内部对元素的修改操作都保证了线程安全性。所以在ConcurrentHashMap中直接调用Segment的修改元素的方法,实现线程安全。
ConcurrentHashMap对于读取元素的操作get方法,没有加锁,因为涉及到的共享变量都使用volatile修饰,volatile可以保证内存可见性,所以不会读取到过期数据。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)