ConcurrentHashMap.h

1、介绍

  • 高性能并发哈希map
  • 大部分读操作无等待
  • 写操作按槽位分片加锁(细粒度锁,不同槽位的写互不阻塞)
  • 多线程性能仅次于无锁原子map(AtomicHashMap等),除非事先知道map大小且不需要erase,否则应选ConcurrentHashMap
  • 设计借鉴 Java 的 ConcurrentHashMap,性能上接近 std::unordered_map

 

2、构造函数

  /// Construct the map with an initial size hint and an optional maximum
  /// size. Both are rounded up to the next power of two. Shards are NOT
  /// allocated here — they are created lazily on first insert (see
  /// ensureSegment).
  explicit ConcurrentHashMap(size_t size = 8, size_t max_size = 0) {
    size_ = folly::nextPowTwo(size); // round up to the first power of two >= size
    if (max_size != 0) {
      max_size_ = folly::nextPowTwo(max_size);
    }
    CHECK(max_size_ == 0 || max_size_ >= size_); // a configured max must cover the initial size
    for (uint64_t i = 0; i < NumShards; i++) {
      segments_[i].store(nullptr, std::memory_order_relaxed); // segments_ is an array of std::atomic pointers; relaxed is sufficient here since no other thread can see the map during construction
    }
  }
  /// Move constructor: steals the source map's shards and hazard-pointer
  /// cohort, leaving the source empty. Like other moves, this must not race
  /// with concurrent operations on either map.
  ConcurrentHashMap(ConcurrentHashMap&& o) noexcept // parameter is an rvalue reference
      : size_(o.size_), max_size_(o.max_size_) {
    for (uint64_t i = 0; i < NumShards; i++) {
      segments_[i].store(
          o.segments_[i].load(std::memory_order_relaxed), // atomically read shard i from the source
          std::memory_order_relaxed);
      o.segments_[i].store(nullptr, std::memory_order_relaxed); // atomically clear shard i in the source so its destructor won't free it
    }
    cohort_.store(o.cohort(), std::memory_order_relaxed);
    o.cohort_.store(nullptr, std::memory_order_relaxed);
  }

 

3、赋值函数

  /// Move assignment: destroys this map's own shards, then steals the
  /// source's shards and cohort, clearing the source.
  /// NOTE(review): there is no self-move guard — `m = std::move(m)` would
  /// destroy each shard and then reload it from the same (already-cleared)
  /// slot; callers must avoid self-move. Must not race with concurrent
  /// operations on either map.
  ConcurrentHashMap& operator=(ConcurrentHashMap&& o) {
    for (uint64_t i = 0; i < NumShards; i++) {
      auto seg = segments_[i].load(std::memory_order_relaxed);
      if (seg) { // shard i was allocated: destroy it and free its storage
        seg->~SegmentT();
        Allocator().deallocate((uint8_t*)seg, sizeof(SegmentT));
      }
      segments_[i].store( // take ownership of the source's shard and null it out there
          o.segments_[i].load(std::memory_order_relaxed),
          std::memory_order_relaxed);
      o.segments_[i].store(nullptr, std::memory_order_relaxed);
    }
    size_ = o.size_;
    max_size_ = o.max_size_;
    cohort_shutdown_cleanup(); // release this map's old cohort before adopting the source's
    cohort_.store(o.cohort(), std::memory_order_relaxed);
    o.cohort_.store(nullptr, std::memory_order_relaxed);
    return *this;
  }

 

4、析构函数

  /// Destructor: destroys and frees every lazily-allocated shard, then
  /// tears down the hazard-pointer cohort. Must not race with any other
  /// operation on this map.
  ~ConcurrentHashMap() {
    for (uint64_t i = 0; i < NumShards; i++) {
      auto seg = segments_[i].load(std::memory_order_relaxed);
      if (seg) { // shard was allocated: run its destructor and release the raw storage
        seg->~SegmentT();
        Allocator().deallocate((uint8_t*)seg, sizeof(SegmentT));
      }
    }
    cohort_shutdown_cleanup();
  }

 

5、查找元素所在的槽

  /// Look up key `k`. Returns an iterator to the element, or an end-style
  /// iterator (segment_ set to NumShards) when the key is absent.
  ConstIterator find(const KeyType& k) const {
    auto segment = pickSegment(k); // hash the key to its shard index
    ConstIterator res(this, segment);
    auto seg = segments_[segment].load(std::memory_order_acquire); // acquire so a freshly-published shard's contents are visible
    if (!seg || !seg->find(res.it_, k)) { // shard never allocated, or key not found inside it
      res.segment_ = NumShards; // convert res into an end() iterator
    }
    return res;
  }

……
  /// Map a key to its shard index: hash it and keep the low
  /// log2(NumShards) bits (NumShards is a power of two, so the mask
  /// is equivalent to `h % NumShards`).
  uint64_t pickSegment(const KeyType& k) const {
    return HashFn()(k) & (NumShards - 1);
  }

 

6、插入元素

  /// Insert a key/value pair (moved in). Returns {iterator, true} when a
  /// new element was inserted, or {iterator-to-existing, false} when the
  /// key was already present.
  std::pair<ConstIterator, bool> insert(
      std::pair<key_type, mapped_type>&& foo) {
    auto segment = pickSegment(foo.first); // shard index for this key
    std::pair<ConstIterator, bool> res( // build the return pair in place
        std::piecewise_construct, // tag type selecting the piecewise-construct pair overload
        std::forward_as_tuple(this, segment), // constructor args for the ConstIterator member
        std::forward_as_tuple(false)); // constructor args for the bool member
    res.second = ensureSegment(segment)->insert(res.first.it_, std::move(foo)); // ensureSegment lazily allocates shard `segment` before inserting
    return res;
  }

  template <typename Key, typename Value>
  std::pair<ConstIterator, bool> insert(Key&& k, Value&& v) {
    auto segment = pickSegment(k);
    std::pair<ConstIterator, bool> res(
        std::piecewise_construct,
        std::forward_as_tuple(this, segment),
        std::forward_as_tuple(false));
    res.second = ensureSegment(segment)->insert(
        res.first.it_, std::forward<Key>(k), std::forward<Value>(v));
    return res;
  }

首先要保证元素要插入的槽已分配:

  /// Return shard i, allocating and publishing it first if it does not yet
  /// exist. Safe under races: each racing thread builds a candidate, one
  /// wins the CAS, and the losers free their candidate and use the winner's.
  SegmentT* ensureSegment(uint64_t i) const {
    SegmentT* seg = segments_[i].load(std::memory_order_acquire);
    if (!seg) { // shard not yet allocated: build one and try to publish it
      auto b = ensureCohort();
      SegmentT* newseg = (SegmentT*)Allocator().allocate(sizeof(SegmentT)); // raw storage for the shard
      newseg = new (newseg) // placement new: construct the shard in that storage
          SegmentT(size_ >> ShardBits, load_factor_, max_size_ >> ShardBits, b);
      if (!segments_[i].compare_exchange_strong(seg, newseg)) { // atomic CAS publish; on failure `seg` is updated to the winner's pointer
        // Another thread published first — destroy and free our candidate.
        newseg->~SegmentT();
        Allocator().deallocate((uint8_t*)newseg, sizeof(SegmentT));
      } else {
        seg = newseg;
      }
    }
    return seg;
  }

先创建好新建的节点,尝试插入:

  /// Allocate a node holding (k, v) up front and try to insert it with
  /// DOES_NOT_EXIST semantics; if the key already exists the node is
  /// destroyed and freed. Returns true iff the node was inserted.
  template <typename Key, typename Value>
  bool insert(Iterator& it, Key&& k, Value&& v) {
    auto node = (Node*)Allocator().allocate(sizeof(Node)); // raw storage for the node
    new (node) Node(cohort_, std::forward<Key>(k), std::forward<Value>(v));
    auto res = insert_internal( // only inserts when the key is absent
        it,
        node->getItem().first,
        InsertType::DOES_NOT_EXIST,
        [](const ValueType&) { return false; }, // match functor is unused for DOES_NOT_EXIST
        node);
    if (!res) { // insert failed (key existed): roll back the node we built
      node->~Node();
      Allocator().deallocate((uint8_t*)node, sizeof(Node));
    }
    return res;
  }

只有当元素不存在时才插入新的key:

  /// Thin forwarder into the underlying table implementation, supplying
  /// this map's hazard-pointer cohort along with the caller's arguments.
  template <typename MatchFunc, typename... Args>
  bool insert_internal(
      Iterator& it,
      const KeyType& k,
      InsertType type, // e.g. InsertType::DOES_NOT_EXIST for a plain insert
      MatchFunc match,
      Node* cur) {
    const bool inserted = impl_.insert(it, k, type, match, cur, cohort_);
    return inserted;
  }

先预插入,判断空间是否足够,是否要rehash:

  /// Pre-insert bookkeeping, run while the segment lock is held by the
  /// caller: grows/rehashes the table when it is at its threshold, looks up
  /// any existing node for `k`, and decides per `type` whether the insert
  /// may proceed. On success the output parameters (chunk_idx, tag_idx,
  /// node, chunks, ccount) feed the actual insert in the caller.
  /// Returns false when the insert must not happen (MATCH predicate failed,
  /// DOES_NOT_EXIST but the key exists, or MATCH/MUST_EXIST-style types with
  /// no existing key). Throws std::bad_alloc if growing would exceed max_size_.
  template <typename MatchFunc, typename... Args>
  bool prepare_insert(
      Iterator& it,
      const KeyType& k,
      InsertType type,
      MatchFunc match,
      hazptr_obj_cohort<Atom>* cohort,
      size_t& chunk_idx,
      size_t& tag_idx,
      Node*& node,
      Chunks*& chunks,
      size_t& ccount,
      const HashPair& hp) {
    ccount = chunk_count_.load(std::memory_order_relaxed);
    chunks = chunks_.load(std::memory_order_relaxed);

    if (size() >= grow_threshold_ && type == InsertType::DOES_NOT_EXIST) {
      if (max_size_ && size() << 1 > max_size_) {
        // Inserting this element would exceed the configured maximum size.
        throw std::bad_alloc();
      }
      rehash_internal(ccount << 1, cohort); // double the chunk count and rehash
      ccount = chunk_count_.load(std::memory_order_relaxed);
      chunks = chunks_.load(std::memory_order_relaxed);
    }

    node = find_internal(k, hp, chunks, ccount, chunk_idx, tag_idx); // look up an existing node for this key

    it.hazptrs_[0].reset(chunks);
    if (node) { // key already present
      it.hazptrs_[1].reset(node);
      it.setNode(node, chunks, ccount, chunk_idx, tag_idx);
      if (type == InsertType::MATCH) { // conditional update: fail unless the current value matches
        if (!match(node->getItem().second)) {
          return false;
        }
      } else if (type == InsertType::DOES_NOT_EXIST) { // insert-only: key exists, so fail
        return false;
      }
    } else {
      if (type != InsertType::DOES_NOT_EXIST && type != InsertType::ANY) {
        it.hazptrs_[0].reset();
        return false;
      }
      // Already checked for rehash on DOES_NOT_EXIST, now check on ANY
      if (size() >= grow_threshold_ && type == InsertType::ANY) {
        if (max_size_ && size() << 1 > max_size_) {
          // Growing would exceed the configured maximum size.
          throw std::bad_alloc();
        }
        rehash_internal(ccount << 1, cohort); // double the chunk count and rehash
        ccount = chunk_count_.load(std::memory_order_relaxed);
        chunks = chunks_.load(std::memory_order_relaxed);
        it.hazptrs_[0].reset(chunks);
      }
    }
    return true;
  }

空间不足时,重新分配两倍空间:

  /// Grow the table to new_chunk_count chunks and move every node to its
  /// new location. Called with the segment lock held. The seqlock
  /// increments bracketing the publish (odd = resize in progress) let
  /// lock-free readers detect and retry around a concurrent resize; the old
  /// chunk array is retired via hazard pointers rather than freed in place.
  void rehash_internal(
      size_t new_chunk_count,
      hazptr_obj_cohort<Atom>* cohort) {
    DCHECK(isPowTwo(new_chunk_count)); // the new chunk count must be a power of two
    auto old_chunk_count = chunk_count_.load(std::memory_order_relaxed);
    if (old_chunk_count >= new_chunk_count) {
      return;
    }
    auto new_chunks = Chunks::create(new_chunk_count, cohort);
    auto old_chunks = chunks_.load(std::memory_order_relaxed);
    grow_threshold_ = new_chunk_count * Chunk::kCapacity * load_factor_; // element cap = chunk count * per-chunk capacity * load factor

    for (size_t i = 0; i < old_chunk_count; i++) {
      Chunk* oldchunk = old_chunks->getChunk(i, old_chunk_count);
      auto occupied = oldchunk->occupiedIter();
      while (occupied.hasNext()) {
        auto idx = occupied.next();
        Node* node = oldchunk->item(idx).load(std::memory_order_relaxed);
        size_t new_chunk_idx;
        size_t new_tag_idx;
        auto h = HashFn()(node->getItem().first);
        auto hp = splitHash(h);
        std::tie(new_chunk_idx, new_tag_idx) =
            findEmptyInsertLocation(new_chunks, new_chunk_count, hp); // recompute the node's hash and find its slot in the new table
        Chunk* newchunk = new_chunks->getChunk(new_chunk_idx, new_chunk_count);
        newchunk->setNodeAndTag(new_tag_idx, node, hp.second);
      }
    }

    seqlock_.fetch_add(1, std::memory_order_release); // bump the sequence number (now odd: resize in progress)
    chunk_count_.store(new_chunk_count, std::memory_order_release); // publish the new chunk count and chunk array
    chunks_.store(new_chunks, std::memory_order_release);
    seqlock_.fetch_add(1, std::memory_order_release); // bump again (even: resize complete)
    if (old_chunks) {
      old_chunks->retire(HazptrTableDeleter(old_chunk_count));
    }
  }

插入元素(加锁):

/// Insert `cur` for key `k` under the segment mutex. prepare_insert handles
/// growth and lookup; a pre-existing node (update case) is replaced in its
/// slot and retired via hazard pointers AFTER the lock is released, so
/// reclamation work never runs inside the critical section.
template <typename MatchFunc, typename... Args>
  bool insert(
      Iterator& it,
      const KeyType& k,
      InsertType type,
      MatchFunc match,
      Node* cur,
      hazptr_obj_cohort<Atom>* cohort) {
    DCHECK(cur != nullptr);
    Node* node;
    Chunks* chunks;
    size_t ccount, chunk_idx, tag_idx;

    auto h = HashFn()(k);
    auto hp = splitHash(h);

    std::unique_lock<Mutex> g(m_); // take the per-segment write lock

    if (!prepare_insert( // grow/rehash if needed and locate any existing node
            it,
            k,
            type,
            match,
            cohort,
            chunk_idx,
            tag_idx,
            node,
            chunks,
            ccount,
            hp)) {
      return false;
    }

    if (!node) { // key absent: find an empty slot, point the iterator at cur, bump the size
      std::tie(chunk_idx, tag_idx) =
          findEmptyInsertLocation(chunks, ccount, hp);
      it.setNode(cur, chunks, ccount, chunk_idx, tag_idx);
      incSize();
    }

    Chunk* chunk = chunks->getChunk(chunk_idx, ccount);
    chunk->setNodeAndTag(tag_idx, cur, hp.second);

    g.unlock();
    // Retire not under lock
    if (node) {
      node->retire();
    }
    return true;
  }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐