folly 开源库源码学习:concurrency / ConcurrentHashMap.h
ConcurrentHashMap.h:高性能并发哈希 map,大部分读操作无等待,写操作使用细粒度锁;设计借鉴 Java 的 ConcurrentHashMap,性能接近 std::unordered_map。
·
1、介绍
- 高性能并发哈希map
- 大部分读操作无等待
- 写操作共享(细粒度锁)
- 多线程性能仅次于无锁原子map(AtomicHashMap等),除非事先知道map大小且不需要erase,否则应选ConcurrentHashMap
- 基于 Java 的 ConcurrentHashMap 设计,性能上接近 std::unordered_map
2、构造函数
// size: initial capacity hint, rounded up to the next power of two.
// max_size: optional hard cap on growth; 0 means unbounded.
explicit ConcurrentHashMap(size_t size = 8, size_t max_size = 0) {
  size_ = folly::nextPowTwo(size); // round the hint up to a power of two
  if (max_size != 0) {
    max_size_ = folly::nextPowTwo(max_size);
  }
  CHECK(max_size_ == 0 || max_size_ >= size_);
  // Segments are created lazily on first use; start every shard out null.
  for (uint64_t shard = 0; shard < NumShards; ++shard) {
    segments_[shard].store(nullptr, std::memory_order_relaxed);
  }
}
// Move construction: steal every shard pointer and the hazptr cohort from
// o, leaving o empty. The caller must have exclusive access to both maps.
ConcurrentHashMap(ConcurrentHashMap&& o) noexcept
    : size_(o.size_), max_size_(o.max_size_) {
  for (uint64_t shard = 0; shard < NumShards; ++shard) {
    // Take o's segment and null it out in one atomic exchange.
    segments_[shard].store(
        o.segments_[shard].exchange(nullptr, std::memory_order_relaxed),
        std::memory_order_relaxed);
  }
  cohort_.store(o.cohort(), std::memory_order_relaxed);
  o.cohort_.store(nullptr, std::memory_order_relaxed);
}
3、赋值函数
// Move assignment: destroy every segment this map currently owns, then
// steal o's segments and cohort, leaving o empty. Not safe for self-move,
// matching the original; caller must have exclusive access to both maps.
ConcurrentHashMap& operator=(ConcurrentHashMap&& o) {
  for (uint64_t shard = 0; shard < NumShards; ++shard) {
    // Tear down any segment we already hold in this slot.
    if (auto* owned = segments_[shard].load(std::memory_order_relaxed)) {
      owned->~SegmentT();
      Allocator().deallocate((uint8_t*)owned, sizeof(SegmentT));
    }
    // Take o's segment and null it out in one atomic exchange.
    segments_[shard].store(
        o.segments_[shard].exchange(nullptr, std::memory_order_relaxed),
        std::memory_order_relaxed);
  }
  size_ = o.size_;
  max_size_ = o.max_size_;
  cohort_shutdown_cleanup(); // release our old hazptr cohort before adopting o's
  cohort_.store(o.cohort(), std::memory_order_relaxed);
  o.cohort_.store(nullptr, std::memory_order_relaxed);
  return *this;
}
4、析构函数
// Destructor: destroy and free every lazily-created segment, then shut
// down the hazptr cohort.
~ConcurrentHashMap() {
  for (uint64_t shard = 0; shard < NumShards; ++shard) {
    if (auto* seg = segments_[shard].load(std::memory_order_relaxed)) {
      seg->~SegmentT();
      Allocator().deallocate((uint8_t*)seg, sizeof(SegmentT));
    }
  }
  cohort_shutdown_cleanup();
}
5、查找元素所在的槽
// Looks up k in its shard. Returns an iterator positioned at the element,
// or one whose segment_ is NumShards (the end() sentinel) when absent.
ConstIterator find(const KeyType& k) const {
  const auto shard = pickSegment(k);
  ConstIterator res(this, shard);
  auto* seg = segments_[shard].load(std::memory_order_acquire);
  const bool hit = seg && seg->find(res.it_, k);
  if (!hit) {
    res.segment_ = NumShards; // mark as end(): shard missing or key absent
  }
  return res;
}
……
// Maps a key's hash onto a shard index. NumShards is a power of two, so
// the reduction is a mask rather than a modulo.
uint64_t pickSegment(const KeyType& k) const {
  const uint64_t mask = NumShards - 1;
  return HashFn()(k) & mask;
}
6、插入元素
std::pair<ConstIterator, bool> insert(
std::pair<key_type, mapped_type>&& foo) {
auto segment = pickSegment(foo.first); // 获取key对应的槽位置
std::pair<ConstIterator, bool> res( // 初始化返回值
std::piecewise_construct, // piecewise_construct 是空结构,为逐段构造选择正确函数重载的标签类型
std::forward_as_tuple(this, segment), // 构造tuple,初始化res的迭代器
std::forward_as_tuple(false)); // 构造tuple,初始化res的bool值
res.second = ensureSegment(segment)->insert(res.first.it_, std::move(foo)); // ensuresegment保证第i个槽的初始化;
return res;
}
template <typename Key, typename Value>
std::pair<ConstIterator, bool> insert(Key&& k, Value&& v) {
auto segment = pickSegment(k);
std::pair<ConstIterator, bool> res(
std::piecewise_construct,
std::forward_as_tuple(this, segment),
std::forward_as_tuple(false));
res.second = ensureSegment(segment)->insert(
res.first.it_, std::forward<Key>(k), std::forward<Value>(v));
return res;
}
首先要保证元素要插入的槽已分配:
// Returns segment i, lazily creating it on first use. Multiple threads may
// race to create the same segment; a compare-exchange decides the winner
// and the loser frees its redundant allocation.
SegmentT* ensureSegment(uint64_t i) const {
  SegmentT* seg = segments_[i].load(std::memory_order_acquire);
  if (!seg) { // segment not yet allocated: build one speculatively
    auto b = ensureCohort();
    SegmentT* newseg = (SegmentT*)Allocator().allocate(sizeof(SegmentT)); // raw storage
    newseg = new (newseg) // placement-new into the raw storage
        SegmentT(size_ >> ShardBits, load_factor_, max_size_ >> ShardBits, b);
    if (!segments_[i].compare_exchange_strong(seg, newseg)) { // atomic publish (CAS)
      // Lost the race: another thread installed a segment first (seg now
      // holds the winner's pointer). Destroy and free ours.
      newseg->~SegmentT();
      Allocator().deallocate((uint8_t*)newseg, sizeof(SegmentT));
    } else {
      seg = newseg; // we won: our segment is the published one
    }
  }
  return seg;
}
先创建好新建的节点,尝试插入:
// Builds the node first, then attempts to publish it under
// InsertType::DOES_NOT_EXIST; the table takes ownership only on success,
// so a failed insert destroys the speculative node here.
template <typename Key, typename Value>
bool insert(Iterator& it, Key&& k, Value&& v) {
  auto newNode = (Node*)Allocator().allocate(sizeof(Node));
  new (newNode) Node(cohort_, std::forward<Key>(k), std::forward<Value>(v));
  const bool inserted = insert_internal(
      it,
      newNode->getItem().first,
      InsertType::DOES_NOT_EXIST,
      [](const ValueType&) { return false; },
      newNode);
  if (!inserted) {
    // Key was already present: reclaim the node we built.
    newNode->~Node();
    Allocator().deallocate((uint8_t*)newNode, sizeof(Node));
  }
  return inserted;
}
只有当元素不存在时才插入新的key:
// Thin forwarder into the segment implementation. `type` selects the insert
// policy (the wrapper above passes InsertType::DOES_NOT_EXIST), `match` is
// the predicate consulted for InsertType::MATCH, and `cur` is the pre-built
// node to publish on success.
template <typename MatchFunc, typename... Args>
bool insert_internal(
    Iterator& it,
    const KeyType& k,
    InsertType type, // insert policy (DOES_NOT_EXIST / ANY / MATCH)
    MatchFunc match,
    Node* cur) {
  return impl_.insert(it, k, type, match, cur, cohort_);
}
先预插入,判断空间是否足够,是否要rehash:
// Pre-insert bookkeeping, run while holding the segment lock: checks
// capacity (rehashing if needed), looks up any existing node for k, and
// fills the out-params (chunk_idx/tag_idx/node/chunks/ccount) the caller
// uses to finish the insert. Returns false when the insert must be
// rejected (key present for DOES_NOT_EXIST, or the MATCH predicate
// failed). Throws std::bad_alloc when growth would exceed max_size_.
template <typename MatchFunc, typename... Args>
bool prepare_insert(
    Iterator& it,
    const KeyType& k,
    InsertType type,
    MatchFunc match,
    hazptr_obj_cohort<Atom>* cohort,
    size_t& chunk_idx,
    size_t& tag_idx,
    Node*& node,
    Chunks*& chunks,
    size_t& ccount,
    const HashPair& hp) {
  ccount = chunk_count_.load(std::memory_order_relaxed);
  chunks = chunks_.load(std::memory_order_relaxed);
  if (size() >= grow_threshold_ && type == InsertType::DOES_NOT_EXIST) {
    if (max_size_ && size() << 1 > max_size_) {
      // Inserting this element would exceed the configured size limit.
      throw std::bad_alloc();
    }
    rehash_internal(ccount << 1, cohort); // double the chunk count and rehash
    ccount = chunk_count_.load(std::memory_order_relaxed);
    chunks = chunks_.load(std::memory_order_relaxed);
  }
  node = find_internal(k, hp, chunks, ccount, chunk_idx, tag_idx); // existing node for k, if any
  it.hazptrs_[0].reset(chunks); // protect the chunk array with a hazard pointer
  if (node) { // the key already exists in the table
    it.hazptrs_[1].reset(node);
    it.setNode(node, chunks, ccount, chunk_idx, tag_idx);
    if (type == InsertType::MATCH) { // conditional update: reject on predicate failure
      if (!match(node->getItem().second)) {
        return false;
      }
    } else if (type == InsertType::DOES_NOT_EXIST) { // insert-only: key exists, reject
      return false;
    }
  } else {
    if (type != InsertType::DOES_NOT_EXIST && type != InsertType::ANY) {
      it.hazptrs_[0].reset(); // nothing to point at; drop the protection
      return false;
    }
    // Already checked for rehash on DOES_NOT_EXIST, now check on ANY
    if (size() >= grow_threshold_ && type == InsertType::ANY) {
      if (max_size_ && size() << 1 > max_size_) {
        // Growth would exceed the configured size limit.
        throw std::bad_alloc();
      }
      rehash_internal(ccount << 1, cohort); // double the chunk count and rehash
      ccount = chunk_count_.load(std::memory_order_relaxed);
      chunks = chunks_.load(std::memory_order_relaxed);
      it.hazptrs_[0].reset(chunks); // re-protect: the chunk array was replaced
    }
  }
  return true;
}
空间不足时,重新分配两倍空间:
// Grows the table to new_chunk_count chunks (a power of two) and
// re-distributes every occupied node, while writers hold the segment lock.
// Concurrent readers are coordinated by the seqlock_ increments around the
// publish and by hazard pointers: the old chunk array is retired (deferred
// reclamation), never freed in place.
void rehash_internal(
    size_t new_chunk_count,
    hazptr_obj_cohort<Atom>* cohort) {
  DCHECK(isPowTwo(new_chunk_count)); // the new capacity must be a power of two
  auto old_chunk_count = chunk_count_.load(std::memory_order_relaxed);
  if (old_chunk_count >= new_chunk_count) {
    return; // already at (or beyond) the requested size: nothing to do
  }
  auto new_chunks = Chunks::create(new_chunk_count, cohort);
  auto old_chunks = chunks_.load(std::memory_order_relaxed);
  grow_threshold_ = new_chunk_count * Chunk::kCapacity * load_factor_; // element cap = chunks * per-chunk capacity * load factor
  for (size_t i = 0; i < old_chunk_count; i++) {
    Chunk* oldchunk = old_chunks->getChunk(i, old_chunk_count);
    auto occupied = oldchunk->occupiedIter();
    while (occupied.hasNext()) {
      auto idx = occupied.next();
      Node* node = oldchunk->item(idx).load(std::memory_order_relaxed);
      size_t new_chunk_idx;
      size_t new_tag_idx;
      auto h = HashFn()(node->getItem().first);
      auto hp = splitHash(h);
      std::tie(new_chunk_idx, new_tag_idx) =
          findEmptyInsertLocation(new_chunks, new_chunk_count, hp); // re-hash the node into the larger table
      Chunk* newchunk = new_chunks->getChunk(new_chunk_idx, new_chunk_count);
      newchunk->setNodeAndTag(new_tag_idx, node, hp.second);
    }
  }
  seqlock_.fetch_add(1, std::memory_order_release); // seqlock odd: update in progress
  chunk_count_.store(new_chunk_count, std::memory_order_release); // publish new count and chunk array
  chunks_.store(new_chunks, std::memory_order_release);
  seqlock_.fetch_add(1, std::memory_order_release); // seqlock even: update complete
  if (old_chunks) {
    old_chunks->retire(HazptrTableDeleter(old_chunk_count)); // deferred free once readers are done
  }
}
插入元素(加锁):
// Inserts (or conditionally replaces) node `cur` under the segment mutex.
// `type`/`match` select the policy as in prepare_insert. Returns true on
// success. A replaced node is retired AFTER the lock is released, relying
// on hazard pointers for safe deferred reclamation.
template <typename MatchFunc, typename... Args>
bool insert(
    Iterator& it,
    const KeyType& k,
    InsertType type,
    MatchFunc match,
    Node* cur,
    hazptr_obj_cohort<Atom>* cohort) {
  DCHECK(cur != nullptr);
  Node* node;
  Chunks* chunks;
  size_t ccount, chunk_idx, tag_idx;
  auto h = HashFn()(k);
  auto hp = splitHash(h);
  std::unique_lock<Mutex> g(m_); // serialize writers on this segment
  if (!prepare_insert( // capacity check / possible rehash / existing-node lookup
          it,
          k,
          type,
          match,
          cohort,
          chunk_idx,
          tag_idx,
          node,
          chunks,
          ccount,
          hp)) {
    return false;
  }
  if (!node) { // key absent: claim an empty slot and bump the element count
    std::tie(chunk_idx, tag_idx) =
        findEmptyInsertLocation(chunks, ccount, hp);
    it.setNode(cur, chunks, ccount, chunk_idx, tag_idx);
    incSize();
  }
  Chunk* chunk = chunks->getChunk(chunk_idx, ccount);
  chunk->setNodeAndTag(tag_idx, cur, hp.second); // publish the new node
  g.unlock();
  // Retire not under lock
  if (node) {
    node->retire(); // old node freed once no hazard pointer references it
  }
  return true;
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献1条内容
所有评论(0)