Lab 1: stitching substrings into a byte stream

3 Putting substrings in sequence

要点

  • 实现 StreamReassembler 类, 用于将接收到的子串重组成顺序的字节流.
  • StreamReassembler 的容量 capacity 既包括未被读取的字节流 ByteStream 中的字节数, 也包括未被重排的字节流的最大数目.
    在这里插入图片描述
  • 子串可能相互重叠, 且不能重复对其进行存储.

优化前

思路
数据结构

首先也是较为关键的是数据结构的选择. 需要对乱序的子串存储到缓冲区 _buffer, 并重组成有序的字节流. 笔者首先想到的是使用哈希表存储 index 和子串 data 的映射, 但存在的问题在于子串之间可能是相互重叠的, 而哈希表的方式便无法判断, 进而会造成重复存储重叠部分.
而为解决重叠不重复存的问题, 就需要考虑可随机访问的顺序容器(由于乱序因此要求能够随机访问), 且以字节为单位而非整个子串. 此外由于会频繁的将重排后的子串写入 ByteStream 中, 即频繁对容器头部进行删除操作, 因此最终选择使用 deque.
在选定了容器后还要考虑乱序带来的一个问题, 即区分缓冲区的一个字节是数据还是为空. 很显然在子串中不能保证一定没有 \0 字符, 因此直接利用在原字符位置无法直接判别, 因此笔者又使用了一个 deque<bool> _map 用于标识每个字节是否数据.

重组过程

实验的最关键部分是子串重组过程, 即 push_substring() 函数的实现. 在确定了数据结构之后, 重组过程实际上就相对比较清晰了. 先将子串存入缓冲区 _buffer 中, 然后将缓冲区头部所有重排好的子串写入 ByteStream 中. 在写入缓冲区时, 需要记录字节数, 用于统计未重排的字节总数, 即对应字段 _unassembled_bytes, 同时在写入 ByteStream 后也需要进行调整.

临界判断

由于子串可乱序可重复, 因此有较多临界情况需要判断, 即对于不在未重组范围的字节, 可以直接丢弃. 具体而言, 这包括子串 index 超出 capacity, 整个子串丢弃; 整个子串已被写入 ByteStream 中, 也无需再处理; 同样对于部分超出 capacity 或部分已经写入的字节, 仍无需处理.
而为了保证 capacity 的限制, 需要字段 _buffer_begin 来记录缓冲区未排序的字节的起始索引.

EOF

在子串标有 EOF 标识时, 表明该子串时字节流的结尾部分. 需要在这部分子串都被重排写入 ByteStream 后, 调用 ByteStream::end_input() 方法来停止 ByteStream 的写入.
这里需要注意的是, 由于子串可能由于超出容量被截短, 因此只有保证带 EOF 的整个子串都能被存入缓冲区后才能记录下 EOF 标识.

代码
libsponge/stream_reassembler.hh
#ifndef SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
#define SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH

#include "byte_stream.hh"

#include <cstdint>
#include <string>
#include <deque>

//! \brief A class that assembles a series of excerpts from a byte stream (possibly out of order,
//! possibly overlapping) into an in-order byte stream.
class StreamReassembler {
  private:
    // Your code here -- add private members as necessary.
    std::deque<char> _buffer;       //!< The buffer to store unassembled bytes
    std::deque<bool> _map;          //!< The map to identify whether the byte is data
    size_t _unassembled_bytes = 0;  //!< Total number of unassembled bytes
    size_t _buffer_begin = 0;       //!< Starting index of the `_buffer`.
    bool _eof = false;              //!< Flag indicating that the end of bytes has been stored into `_buffer`

    ByteStream _output;  //!< The reassembled in-order byte stream
    size_t _capacity;    //!< The maximum number of bytes

  public:
    //! \brief Construct a `StreamReassembler` that will store up to `capacity` bytes.
    //! \note This capacity limits both the bytes that have been reassembled,
    //! and those that have not yet been reassembled.
    StreamReassembler(const size_t capacity);

    //! \brief Receive a substring and write any newly contiguous bytes into the stream.
    //!
    //! The StreamReassembler will stay within the memory limits of the `capacity`.
    //! Bytes that would exceed the capacity are silently discarded.
    //!
    //! \param data the substring
    //! \param index indicates the index (place in sequence) of the first byte in `data`
    //! \param eof the last byte of `data` will be the last byte in the entire stream
    void push_substring(const std::string &data, const uint64_t index, const bool eof);

    //! \name Access the reassembled byte stream
    //!@{
    const ByteStream &stream_out() const { return _output; }
    ByteStream &stream_out() { return _output; }
    //!@}

    //! The number of bytes in the substrings stored but not yet reassembled
    //!
    //! \note If the byte at a particular index has been pushed more than once, it
    //! should only be counted once for the purpose of this function.
    size_t unassembled_bytes() const;

    //! \brief Is the internal state empty (other than the output stream)?
    //! \returns `true` if no substrings are waiting to be assembled
    bool empty() const;
};

#endif  // SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
libsponge/stream_reassembler.cc
#include "stream_reassembler.hh"

// Dummy implementation of a stream reassembler.

// For Lab 1, please replace with a real implementation that passes the
// automated checks run by `make check_lab1`.

// You will need to add private members to the class declaration in `stream_reassembler.hh`

template <typename... Targs>
void DUMMY_CODE(Targs &&.../* unused */) {}

using namespace std;

StreamReassembler::StreamReassembler(const size_t capacity)
    : _buffer(capacity), _map(capacity), _output(capacity), _capacity(capacity) {}

//! \details This function accepts a substring (aka a segment) of bytes,
//! possibly out-of-order, from the logical stream, and assembles any newly
//! contiguous substrings and writes them into the output stream in order.
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
    // the maximum index of byte that `_buffer` can store
    const size_t buffer_end = _buffer_begin + _output.remaining_capacity();
    // if the whole substring exceed the capacity, it will be silently discarded.
    if (index >= buffer_end) {
        return;
    }
    // set the `_eof` flag if the end byte can be stored
    if (eof && index + data.size() <= buffer_end) {
        _eof = true;
    }
    // traverse each byte of the substring in `capacity` range for reassembling.
    for (size_t buf_idx = max(index, _buffer_begin), data_idx = buf_idx - index;
         data_idx < data.size() && buf_idx < buffer_end;
         ++data_idx, ++buf_idx) {
        // not store bytes repeatedly
        if (_map[buf_idx - _buffer_begin]) {
            continue;
        }
        _buffer[buf_idx - _buffer_begin] = data[data_idx];
        _map[buf_idx - _buffer_begin] = true;
        ++_unassembled_bytes;
    }
    string bytes;
    // write the assembled bytes to `_output`
    while (_map.front()) {
        bytes.push_back(_buffer.front());
        _buffer.pop_front();
        _map.pop_front();
        // keep the size of `_buffer` and `_map` always `_capacity`
        _buffer.push_back('\0');
        _map.push_back(false);
    }
    size_t len = bytes.size();
    if (len > 0) {
        // adjust `_buffer_begin` and `_unassembled_bytes` if some bytes have been reassembled 
        _buffer_begin += len;
        _unassembled_bytes -= len;
        _output.write(bytes);
    }
    // end the input of `_output` if `_eof` flag has been set and no unassembled bytes
    if (_eof && _unassembled_bytes == 0) {
        _output.end_input();
    }
}

size_t StreamReassembler::unassembled_bytes() const { return _unassembled_bytes; }

bool StreamReassembler::empty() const { return _unassembled_bytes == 0; }

代码需要注意的是:

  1. 中间遍历的子串的字节范围被限定在 _buffer_beginbuffer_end 之间, 以确保已重排写入 _output 的字节以及超出 capacity 上限的字节不予处理.
  2. 由于子串可能是乱序的, 因此在存储时使用的是 deque::operator [] 下标操作去存, 下标索引为字节的索引 index 相对缓冲区起始索引 _buffer_begin 的差值.
  3. 由于存储是直接使用 [] 下标存, 这就需要保证访问的位置不能超过 deque 的大小, 因此在出队写入 _output 的同时要入队元素以保持 _buffer 的大小不变, 防止后续存字节时出现问题.
遇到问题
  • Test #18: The reassembler was expected to be at EOF, but was not.
    在这里插入图片描述
    解决: 根据提供的测试用例 data "", index 0, eof 1以及报错信息, 可以看到是由于 ByteStream 没有设置 EOF. 原因是笔者在 push_substring() 方法开始使用 index+data.size()<=_buffer_begin 对于已被重排过的子串直接过滤掉, 但这存在 data.size()==0 即空串的特殊情况, 此时若返回的话便不会设置 EOF 标识. 而考虑到后续重排时有检查, 因此应该直接去掉该语句.
    在这里插入图片描述
  • Test #18: The reassembler was expected to be at EOF, but was not.
    在这里插入图片描述
    解决: 根据报错信息可以看到同样与 EOF 有关. 最终发现是 _eof 标识设置的判断条件有误, index+data.size() 需要小于等于 buffer_end, 笔者误设置为了"小于".
    在这里插入图片描述

优化后

先前方法的缺点

上述方法虽然能够通过测试, 但是在 Lab4 经过测试后显示其实际性能并不优秀, 于是笔者后来有对此部分进行了修改.
先前方法使用了两个 deque<char> 的队列作为核心数据结构, 用于存放乱序的字符串, 目的在于利用 deque 本身的随机访问, 这样可以以字节为单位较为方便的去重. 但这实际上也是一个问题, 即在放入 _buffer 时, 会逐字节遍历整个接收的字符串, 同时判断 _buffer 中是否重叠并放入, 同理在_buffer 转移至 _output 输出字节流时, 同样是逐字节的从 _buffer 中取出. 这两个过程是与字符串长度成线性关系的时间复杂度. 换句话说, 若字符串长度很长, 则会浪费大量时间在逐字节的扫描中. 因此也成为了 TCP 连接接收的一大性能瓶颈.

优化思路

这里重点介绍该优化方法相较之前方法的不同, 共同的部分则不再赘述.

数据结构

优化的核心在于将 _buffer 的数据结构由 deque<char> 改为 set<Block>. 首先介绍 Block, 其为一个结构体, 有两个字段: 一个用于记录接收到的部分字符串 data, 另一个则记录该部分字符串的索引 index.
在上文中提到过, 笔者开始想到的哈希表的问题在于无需性, 而 set 底层基于红黑树, 数据结构是有序的. 而利用 Block 实际上就是以片段为单位存储字符串, 而非字节, 这样在处理时自然更快速. 这里笔者选择了 set, 实际上选择 map 也是可以的, 这样就是将 index 作为键名 data 作为键值.
而由于是以片段为单位的, 相较于先前的方法, 需要额外的一步操作则是去除重复部分, 这里只需要遍历 _buffer 的每个片段, 根据已存储的片段来去除当前需要插入片段的头部或者尾部即可.

BufferPlus

在做到 Lab4 时, 对 Lab0 中 ByteStream 优化的一点即将 string 数据结构改为 BufferList, 以减少复制的开销. 这里实际上是相同的考虑, 在裁剪字符串时即调用 string::substr() 方法, 即从新开辟一块内存来存储字符串, 且在向 _buffer 中转移时也均为字符串的复制, 有较大的开销. 因此容易想到使用 Buffer 通过共享内存和偏移量(具体可以见其实现)来替代 substr(), 将复制改为移动以减少开销.
这里未使用原本的 Buffer 的原因在于其只有 remove_prefix() 方法去除字符串的首部, 而不能去除尾部. 而自然在字符串去除重复部分时也可能需要在尾部去除, 因此笔者参照原本的 Buffer 写了一个新的 BufferPlus 类, 可以同时在首尾去除字符串.
注意, 在 BufferPlusBuffer 中的首尾去除字符串并不是真正的去除, 而是通过一个偏移指针来隐藏去除过的字符, 只有当整个字符串都去除时才会直接清除整个内存, 这样可以减少内存释放的开销, 只需要进行偏移值的改变, 从而提高性能.

代码
libsponge/buffer_plus.hh
#ifndef SPONGE_BUFFER_PLUS_HH
#define SPONGE_BUFFER_PLUS_HH

#include <memory>
#include <string>

//! \brief A reference-counted read-only string that can discard bytes from the front
class BufferPlus {
  private:
    std::shared_ptr<std::string> _storage{};
    size_t _starting_offset{};
    size_t _ending_offset{};

  public:
    BufferPlus() = default;

    //! \brief Construct by taking ownership of a string
    BufferPlus(std::string &&str) noexcept : _storage(std::make_shared<std::string>(std::move(str))) {}

    //! \name Expose contents as a std::string_view
    //!@{
    std::string_view str() const {
        if (!_storage) {
            return {};
        }
        return {_storage->data() + _starting_offset, _storage->size() - _starting_offset - _ending_offset};
    }

    operator std::string_view() const { return str(); }
    //!@}

    //! \brief Get character at location `n`
    uint8_t at(const size_t n) const { return str().at(n); }

    //! \brief Size of the string
    size_t size() const {
        return _storage ? _storage->size() - _starting_offset - _ending_offset : 0;
    }

    //! \brief Make a copy to a new std::string
    std::string copy() const { return std::string(str()); }

    //! \brief Discard the first `n` bytes of the string (does not require a copy or move)
    //! \note Doesn't free any memory until the whole string has been discarded in all copies of the Buffer.
    void remove_prefix(size_t n) {
        if(!_storage) {
            return;
        }
        if (n >= str().size()) {
            _storage.reset();
            return;
        }
        _starting_offset += n;
    }

    //! \brief Discard the last `n` bytes of the string (does not require a copy or move)
    void remove_suffix(size_t n) {
        if(!_storage) {
            return;
        }
        if(n >= str().size()) {
            _storage.reset();
            return;
        }
        _ending_offset += n;
    }
};

#endif  // SPONGE_BUFFER_PLUS_HH
libsponge/stream_reassembler.hh
#ifndef SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
#define SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH

#include "buffer_plus.hh"
#include "byte_stream.hh"

#include <cstdint>
#include <set>
#include <string>

//! \brief A class that assembles a series of excerpts from a byte stream (possibly out of order,
//! possibly overlapping) into an in-order byte stream.
class StreamReassembler {
  private:
    // Your code here -- add private members as necessary.
    struct Block {
        size_t index;     //!< The index of the data in BufferPlus
        BufferPlus data;  //!< The buffer to store the string

        Block(size_t idx, std::string &&str) noexcept : index(idx), data(std::move(str)) {}

        bool operator<(const Block &block) const { return index < block.index; }

        //! \brief the size of string in block
        size_t size() const { return data.size(); }
    };

    std::set<Block> _buffer{};     //!< The set to store unassembled strings
    size_t _unassembled_bytes{0};  //!< Total number of unassembled bytes
    bool _eof{false};              //!< Flag indicating that the end of bytes has been stored into `_buffer`
    ByteStream _output;            //!< The reassembled in-order byte stream
    size_t _capacity;              //!< The maximum number of bytes

    void insert_block(size_t index, std::string &&data);

  public:
    //! \brief Construct a `StreamReassembler` that will store up to `capacity` bytes.
    //! \note This capacity limits both the bytes that have been reassembled,
    //! and those that have not yet been reassembled.
    StreamReassembler(const size_t capacity);

    //! \brief Receive a substring and write any newly contiguous bytes into the stream.
    //!
    //! The StreamReassembler will stay within the memory limits of the `capacity`.
    //! Bytes that would exceed the capacity are silently discarded.
    //!
    //! \param data the substring
    //! \param index indicates the index (place in sequence) of the first byte in `data`
    //! \param eof the last byte of `data` will be the last byte in the entire stream
    void push_substring(const std::string &data, const uint64_t index, const bool eof);

    //! \name Access the reassembled byte stream
    //!@{
    const ByteStream &stream_out() const { return _output; }
    ByteStream &stream_out() { return _output; }
    //!@}

    //! The number of bytes in the substrings stored but not yet reassembled
    //!
    //! \note If the byte at a particular index has been pushed more than once, it
    //! should only be counted once for the purpose of this function.
    size_t unassembled_bytes() const;

    //! \brief Is the internal state empty (other than the output stream)?
    //! \returns `true` if no substrings are waiting to be assembled
    bool empty() const;
};

#endif  // SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
libsponge/stream_reassembler.cc
#include "stream_reassembler.hh"

// Dummy implementation of a stream reassembler.

// For Lab 1, please replace with a real implementation that passes the
// automated checks run by `make check_lab1`.

// You will need to add private members to the class declaration in `stream_reassembler.hh`

template <typename... Targs>
void DUMMY_CODE(Targs &&.../* unused */) {}

using namespace std;

StreamReassembler::StreamReassembler(const size_t capacity) : _output(capacity), _capacity(capacity) {}

//! \details This function accepts a substring (aka a segment) of bytes,
//! possibly out-of-order, from the logical stream, and assembles any newly
//! contiguous substrings and writes them into the output stream in order.
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
    size_t first_unassembled = _output.bytes_read() + _output.buffer_size();
    size_t first_unacceptable = _output.bytes_read() + _capacity;
    // the index of data after removing the reassembled bytes
    size_t idx = index;
    // the length of data after removing the reassembled or unacceptable bytes
    size_t len = data.size();
    // the eof flag of data after removing
    bool eof_flag = eof;
    // if the whole data string is reassembled or unacceptable, it will be discarded
    if (idx + len < first_unassembled || idx >= first_unacceptable) {
        return;
    }
    // remove the unassembled and unacceptable bytes in data
    // only update the `idx` and `len`
    if (idx < first_unassembled) {
        len -= first_unassembled - idx;
        idx = first_unassembled;
    }
    if (idx + len > first_unacceptable) {
        len = first_unacceptable - idx;
        // the eof must be false when removing the tail of data
        eof_flag = false;
    }
    // update the `_eof` flag, only when the end byte can be stored, will `_eof` be true
    _eof = _eof || eof_flag;

    // insert the substr of data after removing to `_buffer`
    insert_block(idx, data.substr(idx - index, len));

    // traverse the beginning of `_buffer` and write the assembled strings to `_output`
    while (!_buffer.empty()) {
        const Block &b = *_buffer.begin();
        // if the block `b` is not the assembled string
        if (first_unassembled != b.index) {
            break;
        }
        _output.write(b.data.copy());
        _unassembled_bytes -= b.size();
        first_unassembled += b.size();
        _buffer.erase(_buffer.begin());
    }
    // end the input of `_output` if `_eof` flag has been set and no unassembled bytes
    if (_eof && _unassembled_bytes == 0) {
        _output.end_input();
    }
}

size_t StreamReassembler::unassembled_bytes() const { return _unassembled_bytes; }

bool StreamReassembler::empty() const { return _unassembled_bytes == 0; }

//! use the data and its index to build a `Block` and insert it to `_buffer`
void StreamReassembler::insert_block(size_t index, string &&data) {
    // the size of `_unassembled_bytes` should update
    size_t delta_bytes = data.size();
    // do nothing if no data
    if (delta_bytes == 0) {
        return;
    }
    Block block(index, move(data));
    // a flag to record whether to insert
    bool hasInsert = false;
    // traverse all blocks in `_buffer`
    // note: using iterator because some blocks will be erased, use `for-range` is wrong
    for (auto it = _buffer.begin(); it != _buffer.end();) {
        const Block &b = *it;
        // if the whole `block` is to the left of `b`, it can be inserted all
        if (block.index + block.size() <= b.index) {
            _buffer.insert(block);
            hasInsert = true;
            break;
        }
        // if the whole `block` is to the right of `b`, it should compare with next one
        if (block.index >= b.index + b.size()) {
            ++it;
            continue;
        }
        // if `b` is part of the `block`, we should erase `b`
        if (block.index < b.index && block.index + block.size() > b.index + b.size()) {
            // update `delta_bytes`
            delta_bytes -= b.size();
            it = _buffer.erase(it);
        } else if (block.index < b.index) {
            // if the tail of `block` overlaps with `b`, remove its suffix and insert to `_buffer`
            size_t delta = block.index + block.size() - b.index;
            block.data.remove_suffix(delta);
            _buffer.insert(block);
            delta_bytes -= delta;
            hasInsert = true;
            break;
        } else if (block.index + block.size() > b.index + b.size()) {
            // if the head of `block` overlaps with `b`, remove its prefix and compare with the next block
            size_t delta = b.index + b.size() - block.index;
            block.data.remove_prefix(delta);
            block.index += delta;
            delta_bytes -= delta;
            ++it;
        } else {
            // if the `block` is part of `b`, it should not be inserted to `_buffer`
            delta_bytes = 0;
            hasInsert = true;
            break;
        }
    }
    // insert `block` to `_buffer` if it has not been inserted after traversing `_buffer`
    if (!hasInsert) {
        _buffer.insert(block);
    }
    _unassembled_bytes += delta_bytes;
}

测试

build 目录下执行 make 后执行 make check_lab1:
在这里插入图片描述

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐