[CS144] Lab 1: stitching substrings into a byte stream
- Lab Guide: Checkpoint 1: stitching substrings into a byte stream
- Lab Code: https://github.com/peakcrosser7/sponge/tree/lab1-startercode
3 Putting substrings in sequence
Key points
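- Implement the StreamReassembler class, which reassembles the substrings it receives into an in-order byte stream. The capacity of a StreamReassembler bounds the sum of the bytes in the output ByteStream that have not yet been read and the bytes that have not yet been reassembled.
- Substrings may overlap one another, and overlapping bytes must not be stored more than once.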
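To make the capacity rule concrete, here is a small sketch (the struct and function names are hypothetical, not part of the lab's API) that computes the half-open range of stream indices the reassembler may store, given how many bytes have been read from and written to the output ByteStream. Bytes already written are assembled; anything at or beyond bytes_read + capacity must be discarded.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper: the half-open range [first_unassembled, first_unacceptable)
// of stream indices that a reassembler with the given capacity may store.
struct AcceptWindow {
    std::size_t first_unassembled;   // next index the ByteStream expects
    std::size_t first_unacceptable;  // first index that would exceed the capacity
};

AcceptWindow accept_window(std::size_t bytes_read, std::size_t bytes_written, std::size_t capacity) {
    assert(bytes_read <= bytes_written);
    // The (bytes_written - bytes_read) unread bytes occupy part of the capacity;
    // the remaining slots are available for unassembled bytes.
    return {bytes_written, bytes_read + capacity};
}
```

For example, with capacity 8, 2 bytes read and 5 written, 3 unread bytes sit in the ByteStream, so the window is [5, 10) and holds at most 5 unassembled bytes.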
Before optimization
Approach
Data structure
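The first and most critical decision is the choice of data structure. Out-of-order substrings have to be stored in a buffer, _buffer, and reassembled into an ordered byte stream. My first thought was a hash table mapping each index to its substring data, but substrings may overlap one another, and a hash table keyed by starting index cannot detect such overlaps, so the overlapping parts would end up stored more than once.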
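A quick illustration of the overlap problem, assuming a naive hash table keyed by starting index (the helper name is hypothetical): storing "abc" at index 0 and "bcd" at index 1 keeps both entries, so the stored-byte count comes out as 6 even though only the 4 distinct stream bytes "abcd" were received.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <unordered_map>

// Naive storage: one entry per received substring, keyed by its starting index.
// Summing the entry sizes counts overlapping stream bytes more than once.
std::size_t naive_stored_bytes(const std::unordered_map<std::size_t, std::string> &segments) {
    std::size_t total = 0;
    for (const auto &entry : segments) {
        total += entry.second.size();  // entry.first (the index) is never compared with other entries
    }
    return total;
}
```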
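To store overlapping bytes only once, what is needed is a sequential container with random access (random access because bytes arrive out of order) that works at the granularity of single bytes rather than whole substrings. Moreover, reassembled bytes are written into the ByteStream frequently, which means frequent removals from the front of the container, so I settled on deque.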
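With the container chosen, out-of-order arrival raises one more problem: telling whether a byte slot in the buffer holds data or is still empty. A substring may well contain \0 characters, so the character stored in a slot cannot serve as the marker. I therefore added a second deque, deque<bool> _map, that flags whether each slot holds data.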
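The sketch below (a hypothetical ByteSlots struct, not the author's class) shows why a sentinel character is not enough: after a genuine '\0' byte is stored, the value-based check reports a hole, while the separate flag deque correctly reports the slot as occupied.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// A byte-granular buffer slot must distinguish "empty" from "holds the byte '\0'".
// Using '\0' itself as the empty marker cannot make that distinction,
// so a parallel deque<bool> records occupancy explicitly.
struct ByteSlots {
    std::deque<char> bytes;
    std::deque<bool> filled;

    explicit ByteSlots(std::size_t capacity) : bytes(capacity, '\0'), filled(capacity, false) {}

    void store(std::size_t offset, char c) {
        assert(offset < bytes.size());
        bytes[offset] = c;
        filled[offset] = true;
    }

    // Ambiguous check: treats a stored '\0' as a hole.
    bool looks_filled_by_value(std::size_t offset) const { return bytes[offset] != '\0'; }

    // Reliable check: consults the occupancy flags.
    bool is_filled(std::size_t offset) const { return filled[offset]; }
};
```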
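Reassembly process
The most important part of the lab is reassembly itself, i.e. the implementation of push_substring(). Once the data structures are settled, the process is fairly clear: first store the substring into the buffer _buffer, then write every in-order byte at the head of the buffer into the ByteStream. While storing into the buffer, count the newly stored bytes to maintain the total number of unassembled bytes (the field _unassembled_bytes), and decrease that count again after bytes have been written into the ByteStream.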
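Boundary checks
Because substrings may arrive out of order and more than once, many boundary cases have to be handled; the rule is that any byte outside the range still awaiting reassembly can simply be discarded. Specifically: if a substring's index lies beyond the capacity, the whole substring is discarded; if the whole substring has already been written into the ByteStream, there is nothing to do; likewise, the bytes of a substring that lie beyond the capacity or have already been written are not processed.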
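To enforce the capacity limit, the field _buffer_begin records the stream index of the first unassembled byte, i.e. the index that the front of the buffer corresponds to.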
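The cases above collapse into one clipping step. A minimal sketch, assuming a hypothetical clip_to_window helper and a half-open window [buffer_begin, buffer_end), where buffer_end is the first index beyond the capacity:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Half-open range of stream indices.
struct Range {
    std::size_t begin;
    std::size_t end;
    std::size_t size() const { return end - begin; }
};

// Hypothetical helper: the part of the substring [index, index + size) that still
// needs processing, clipped to [buffer_begin, buffer_end). Empty when nothing is left.
Range clip_to_window(std::size_t index, std::size_t size, std::size_t buffer_begin, std::size_t buffer_end) {
    const std::size_t lo = std::max(index, buffer_begin);       // drop already-assembled bytes
    const std::size_t hi = std::min(index + size, buffer_end);  // drop bytes beyond the capacity
    if (lo >= hi) {
        return {lo, lo};
    }
    return {lo, hi};
}
```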
EOF
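When a substring carries the EOF flag, it is the final part of the byte stream. Once this substring has been fully reassembled and written into the ByteStream, ByteStream::end_input() must be called to close the ByteStream for writing.
Note that a substring may be truncated because it exceeds the capacity, so the EOF flag may only be recorded once the whole EOF-bearing substring fits into the buffer.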
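The EOF rule fits in a one-line predicate. This is a sketch with a hypothetical name; buffer_end is assumed to be the exclusive end of the window, which is why the comparison must be "<=" (the exact boundary behind the second Test #18 failure described later).

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical predicate: may the EOF flag of a substring be recorded?
// Only if its last byte lies inside the window, i.e. the substring was not
// truncated. An empty EOF substring still qualifies.
bool can_record_eof(std::size_t index, std::size_t size, std::size_t buffer_end) {
    return index + size <= buffer_end;  // "<=", not "<": buffer_end is exclusive
}
```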
Code
libsponge/stream_reassembler.hh
#ifndef SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
#define SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
#include "byte_stream.hh"
#include <cstdint>
#include <string>
#include <deque>
//! \brief A class that assembles a series of excerpts from a byte stream (possibly out of order,
//! possibly overlapping) into an in-order byte stream.
class StreamReassembler {
private:
// Your code here -- add private members as necessary.
std::deque<char> _buffer; //!< The buffer to store unassembled bytes
std::deque<bool> _map; //!< The map to identify whether the byte is data
size_t _unassembled_bytes = 0; //!< Total number of unassembled bytes
size_t _buffer_begin = 0; //!< Starting index of the `_buffer`.
bool _eof = false; //!< Flag indicating that the end of bytes has been stored into `_buffer`
ByteStream _output; //!< The reassembled in-order byte stream
size_t _capacity; //!< The maximum number of bytes
public:
//! \brief Construct a `StreamReassembler` that will store up to `capacity` bytes.
//! \note This capacity limits both the bytes that have been reassembled,
//! and those that have not yet been reassembled.
StreamReassembler(const size_t capacity);
//! \brief Receive a substring and write any newly contiguous bytes into the stream.
//!
//! The StreamReassembler will stay within the memory limits of the `capacity`.
//! Bytes that would exceed the capacity are silently discarded.
//!
//! \param data the substring
//! \param index indicates the index (place in sequence) of the first byte in `data`
//! \param eof the last byte of `data` will be the last byte in the entire stream
void push_substring(const std::string &data, const uint64_t index, const bool eof);
//! \name Access the reassembled byte stream
//!@{
const ByteStream &stream_out() const { return _output; }
ByteStream &stream_out() { return _output; }
//!@}
//! The number of bytes in the substrings stored but not yet reassembled
//!
//! \note If the byte at a particular index has been pushed more than once, it
//! should only be counted once for the purpose of this function.
size_t unassembled_bytes() const;
//! \brief Is the internal state empty (other than the output stream)?
//! \returns `true` if no substrings are waiting to be assembled
bool empty() const;
};
#endif // SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
libsponge/stream_reassembler.cc
#include "stream_reassembler.hh"
// Dummy implementation of a stream reassembler.
// For Lab 1, please replace with a real implementation that passes the
// automated checks run by `make check_lab1`.
// You will need to add private members to the class declaration in `stream_reassembler.hh`
template <typename... Targs>
void DUMMY_CODE(Targs &&.../* unused */) {}
using namespace std;
StreamReassembler::StreamReassembler(const size_t capacity)
: _buffer(capacity), _map(capacity), _output(capacity), _capacity(capacity) {}
//! \details This function accepts a substring (aka a segment) of bytes,
//! possibly out-of-order, from the logical stream, and assembles any newly
//! contiguous substrings and writes them into the output stream in order.
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
// the maximum index of byte that `_buffer` can store
const size_t buffer_end = _buffer_begin + _output.remaining_capacity();
// if the whole substring exceed the capacity, it will be silently discarded.
if (index >= buffer_end) {
return;
}
// set the `_eof` flag if the end byte can be stored
if (eof && index + data.size() <= buffer_end) {
_eof = true;
}
// traverse each byte of the substring in `capacity` range for reassembling.
for (size_t buf_idx = max(index, _buffer_begin), data_idx = buf_idx - index;
data_idx < data.size() && buf_idx < buffer_end;
++data_idx, ++buf_idx) {
// not store bytes repeatedly
if (_map[buf_idx - _buffer_begin]) {
continue;
}
_buffer[buf_idx - _buffer_begin] = data[data_idx];
_map[buf_idx - _buffer_begin] = true;
++_unassembled_bytes;
}
string bytes;
// write the assembled bytes to `_output`
while (!_map.empty() && _map.front()) {
bytes.push_back(_buffer.front());
_buffer.pop_front();
_map.pop_front();
// keep the size of `_buffer` and `_map` always `_capacity`
_buffer.push_back('\0');
_map.push_back(false);
}
size_t len = bytes.size();
if (len > 0) {
// adjust `_buffer_begin` and `_unassembled_bytes` if some bytes have been reassembled
_buffer_begin += len;
_unassembled_bytes -= len;
_output.write(bytes);
}
// end the input of `_output` if `_eof` flag has been set and no unassembled bytes
if (_eof && _unassembled_bytes == 0) {
_output.end_input();
}
}
size_t StreamReassembler::unassembled_bytes() const { return _unassembled_bytes; }
bool StreamReassembler::empty() const { return _unassembled_bytes == 0; }
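Points to note in the code:
- The loop in the middle only visits bytes whose indices lie between _buffer_begin and buffer_end, so bytes that were already reassembled and written into _output, and bytes beyond the capacity limit, are skipped.
- Because substrings may be out of order, bytes are stored with deque::operator[]; the subscript is the byte's index minus the buffer's starting index _buffer_begin.
- Because bytes are stored directly by [] subscript, the accessed position must never exceed the size of the deque. Hence, every time a byte is popped from the front and written to _output, an empty element is pushed at the back so that the size of _buffer (and _map) stays constant and later stores remain in range.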
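The interplay between the subscript offset and the pop_front()/push_back() pair can be seen in isolation in this sketch (a hypothetical SlidingWindow class, not the author's StreamReassembler): draining the contiguous prefix moves the window forward while its size stays at capacity, so later subscripts remain valid. It also checks for an empty deque before calling front().

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>

// Hypothetical fixed-capacity window: slot i holds stream index (begin + i).
class SlidingWindow {
  public:
    explicit SlidingWindow(std::size_t capacity) : _bytes(capacity, '\0'), _filled(capacity, false) {}

    void store(std::size_t stream_index, char c) {
        const std::size_t offset = stream_index - _begin;
        assert(stream_index >= _begin && offset < _bytes.size());
        _bytes[offset] = c;
        _filled[offset] = true;
    }

    // Remove and return the contiguous bytes at the front of the window.
    std::string drain() {
        std::string out;
        while (!_filled.empty() && _filled.front()) {
            out.push_back(_bytes.front());
            _bytes.pop_front();
            _filled.pop_front();
            _bytes.push_back('\0');  // keep the size fixed so operator[] stays in range
            _filled.push_back(false);
        }
        _begin += out.size();
        return out;
    }

    std::size_t begin() const { return _begin; }
    std::size_t size() const { return _bytes.size(); }

  private:
    std::deque<char> _bytes;
    std::deque<bool> _filled;
    std::size_t _begin = 0;
};
```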
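Problems encountered
- Test #18: The reassembler was expected to be at EOF, but was not.
Solution: From the test case (data "", index 0, eof 1) and the error message, the ByteStream had never been marked as ended. The cause was that at the start of push_substring() I used the condition index+data.size()<=_buffer_begin to discard substrings that had already been reassembled. This misses the special case data.size()==0: for an empty string the function returned early, so the EOF flag was never set. Since the later reassembly steps perform the necessary checks anyway, the early-return statement should simply be removed.
- Test #18: The reassembler was expected to be at EOF, but was not.
Solution: The error message again pointed to EOF. The condition for setting the _eof flag turned out to be wrong: index+data.size() must be less than or equal to buffer_end, but I had mistakenly written "less than".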
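After optimization
Drawbacks of the previous approach
Although the approach above passes the tests, testing in Lab 4 showed that its actual performance was poor, so I later reworked this part.
The previous approach used two deques, deque<char> _buffer and deque<bool> _map, as its core data structure for holding out-of-order data, relying on deque's random access so that duplicates could be removed conveniently byte by byte. That is also its weakness: storing into _buffer traverses the received string byte by byte, checking each byte for overlap before storing it, and moving data from _buffer to the _output byte stream likewise takes bytes out of _buffer one at a time. Both steps take time linear in the string length, so for long strings a great deal of time goes into byte-by-byte scanning, which made this a major performance bottleneck when receiving data on a TCP connection.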
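Optimization approach
This section focuses on how the optimized method differs from the previous one; the parts they share are not repeated.
Data structure
The core of the optimization is changing the data structure of _buffer from deque<char> to set<Block>. Block is a struct with two fields: data, which holds a received piece of the string, and index, which records the stream index of that piece's first byte.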
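As mentioned above, the problem with my initial hash-table idea was that it is unordered, whereas set keeps its elements sorted (it is typically implemented as a red-black tree). With Block, the data is stored in segments rather than bytes, which naturally makes processing faster. I chose set here, but map works just as well, with index as the key and data as the value.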
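Since map works as well as set here, a minimal sketch of that alternative follows (the function name is hypothetical, and it assumes the stored segments never overlap, which the de-duplication step is responsible for guaranteeing). Because std::map keeps its keys sorted, the only segment that can be written next is always at begin().

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical map-based segment buffer: key = stream index of a segment's first byte,
// value = the segment's bytes. Pops every segment that continues the stream at next_index.
std::string pop_contiguous(std::map<std::size_t, std::string> &segments, std::size_t &next_index) {
    std::string out;
    while (!segments.empty() && segments.begin()->first == next_index) {
        out += segments.begin()->second;
        next_index += segments.begin()->second.size();
        segments.erase(segments.begin());
    }
    return out;
}
```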
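Because data is stored per segment, one extra step compared with the previous approach is removing the overlapping parts. It suffices to traverse the segments in _buffer and, based on the segments already stored, trim the head or tail of the segment being inserted (dropping it entirely if an existing segment already covers it, and erasing any stored segment that it fully covers).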
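The head/tail trimming is a case analysis on half-open intervals. The sketch below is a hypothetical standalone classifier (not part of the lab code) whose branches mirror those of insert_block() in the code further down: compare the new segment [a, b) with one stored segment [c, d) and decide what to do.

```cpp
#include <cassert>
#include <cstddef>

// How a new segment [a, b) relates to a stored segment [c, d) (both non-empty),
// and the action the de-duplication step takes in each case.
enum class Overlap {
    LeftOfStored,   // b <= c: insert the new segment as is
    RightOfStored,  // a >= d: compare with the next stored segment
    CoversStored,   // a < c and b > d: erase the stored segment
    TailOverlaps,   // a < c < b <= d: trim (b - c) bytes off the new segment's tail
    HeadOverlaps,   // c <= a < d < b: trim (d - a) bytes off the new segment's head
    InsideStored,   // c <= a and b <= d: drop the new segment entirely
};

Overlap classify(std::size_t a, std::size_t b, std::size_t c, std::size_t d) {
    if (b <= c) return Overlap::LeftOfStored;
    if (a >= d) return Overlap::RightOfStored;
    if (a < c && b > d) return Overlap::CoversStored;
    if (a < c) return Overlap::TailOverlaps;
    if (b > d) return Overlap::HeadOverlaps;
    return Overlap::InsideStored;
}
```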
BufferPlus
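When working on Lab 4, one optimization I made to the Lab 0 ByteStream was replacing its string storage with BufferList to reduce copying overhead. The same consideration applies here: trimming a string with string::substr() allocates new memory to hold the result, and moving data into _buffer copies strings as well, both of which are expensive. The natural idea is therefore to use Buffer, which shares storage and tracks an offset (see its implementation for details), in place of substr(), turning copies into moves to reduce the overhead.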
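The difference between copying with substr() and trimming a view can be checked directly by comparing data pointers. This is a standalone sketch using std::string_view rather than the lab's Buffer or BufferPlus; unlike BufferPlus, a plain string_view neither owns nor keeps alive the underlying string.

```cpp
#include <cassert>
#include <string>
#include <string_view>

// std::string::substr returns a new string with its own storage.
bool substr_shares_storage(const std::string &s) {
    assert(s.size() >= 2);                // substr(2) throws if pos > size()
    const std::string piece = s.substr(2);
    return piece.data() == s.data() + 2;  // false: the bytes were copied
}

// Trimming a std::string_view only adjusts its start pointer and length.
bool view_shares_storage(const std::string &s) {
    assert(s.size() >= 3);  // remove_prefix(2) + remove_suffix(1) need at least 3 bytes
    std::string_view view(s);
    view.remove_prefix(2);
    view.remove_suffix(1);
    return view.data() == s.data() + 2;   // true: still points into s
}
```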
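The original Buffer is not used here because it only offers remove_prefix(), which discards the front of a string, and cannot discard the tail. Removing overlaps may also require trimming the tail, so I wrote a new BufferPlus class modeled on the original Buffer that can trim a string at both ends.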
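Note that neither BufferPlus nor Buffer actually deletes trimmed characters: offsets simply hide them, and the storage is released only once the whole string has been discarded. Trimming therefore costs only an update of an offset value, with no memory freed along the way, which improves performance.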
Code
libsponge/buffer_plus.hh
#ifndef SPONGE_BUFFER_PLUS_HH
#define SPONGE_BUFFER_PLUS_HH
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <string_view>
#include <utility>
//! \brief A reference-counted read-only string that can discard bytes from both the front and the back
class BufferPlus {
private:
std::shared_ptr<std::string> _storage{};
size_t _starting_offset{};
size_t _ending_offset{};
public:
BufferPlus() = default;
//! \brief Construct by taking ownership of a string
BufferPlus(std::string &&str) noexcept : _storage(std::make_shared<std::string>(std::move(str))) {}
//! \name Expose contents as a std::string_view
//!@{
std::string_view str() const {
if (!_storage) {
return {};
}
return {_storage->data() + _starting_offset, _storage->size() - _starting_offset - _ending_offset};
}
operator std::string_view() const { return str(); }
//!@}
//! \brief Get character at location `n`
uint8_t at(const size_t n) const { return str().at(n); }
//! \brief Size of the string
size_t size() const {
return _storage ? _storage->size() - _starting_offset - _ending_offset : 0;
}
//! \brief Make a copy to a new std::string
std::string copy() const { return std::string(str()); }
//! \brief Discard the first `n` bytes of the string (does not require a copy or move)
//! \note Doesn't free any memory until the whole string has been discarded in all copies of the Buffer.
void remove_prefix(size_t n) {
if(!_storage) {
return;
}
if (n >= str().size()) {
_storage.reset();
return;
}
_starting_offset += n;
}
//! \brief Discard the last `n` bytes of the string (does not require a copy or move)
void remove_suffix(size_t n) {
if(!_storage) {
return;
}
if(n >= str().size()) {
_storage.reset();
return;
}
_ending_offset += n;
}
};
#endif // SPONGE_BUFFER_PLUS_HH
libsponge/stream_reassembler.hh
#ifndef SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
#define SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
#include "buffer_plus.hh"
#include "byte_stream.hh"
#include <cstdint>
#include <set>
#include <string>
//! \brief A class that assembles a series of excerpts from a byte stream (possibly out of order,
//! possibly overlapping) into an in-order byte stream.
class StreamReassembler {
private:
// Your code here -- add private members as necessary.
struct Block {
size_t index; //!< The stream index of the first byte stored in `data`
BufferPlus data; //!< The buffer to store the string
Block(size_t idx, std::string &&str) noexcept : index(idx), data(std::move(str)) {}
bool operator<(const Block &block) const { return index < block.index; }
//! \brief the size of string in block
size_t size() const { return data.size(); }
};
std::set<Block> _buffer{}; //!< The set to store unassembled strings
size_t _unassembled_bytes{0}; //!< Total number of unassembled bytes
bool _eof{false}; //!< Flag indicating that the end of bytes has been stored into `_buffer`
ByteStream _output; //!< The reassembled in-order byte stream
size_t _capacity; //!< The maximum number of bytes
void insert_block(size_t index, std::string &&data);
public:
//! \brief Construct a `StreamReassembler` that will store up to `capacity` bytes.
//! \note This capacity limits both the bytes that have been reassembled,
//! and those that have not yet been reassembled.
StreamReassembler(const size_t capacity);
//! \brief Receive a substring and write any newly contiguous bytes into the stream.
//!
//! The StreamReassembler will stay within the memory limits of the `capacity`.
//! Bytes that would exceed the capacity are silently discarded.
//!
//! \param data the substring
//! \param index indicates the index (place in sequence) of the first byte in `data`
//! \param eof the last byte of `data` will be the last byte in the entire stream
void push_substring(const std::string &data, const uint64_t index, const bool eof);
//! \name Access the reassembled byte stream
//!@{
const ByteStream &stream_out() const { return _output; }
ByteStream &stream_out() { return _output; }
//!@}
//! The number of bytes in the substrings stored but not yet reassembled
//!
//! \note If the byte at a particular index has been pushed more than once, it
//! should only be counted once for the purpose of this function.
size_t unassembled_bytes() const;
//! \brief Is the internal state empty (other than the output stream)?
//! \returns `true` if no substrings are waiting to be assembled
bool empty() const;
};
#endif // SPONGE_LIBSPONGE_STREAM_REASSEMBLER_HH
libsponge/stream_reassembler.cc
#include "stream_reassembler.hh"
// Dummy implementation of a stream reassembler.
// For Lab 1, please replace with a real implementation that passes the
// automated checks run by `make check_lab1`.
// You will need to add private members to the class declaration in `stream_reassembler.hh`
template <typename... Targs>
void DUMMY_CODE(Targs &&.../* unused */) {}
using namespace std;
StreamReassembler::StreamReassembler(const size_t capacity) : _output(capacity), _capacity(capacity) {}
//! \details This function accepts a substring (aka a segment) of bytes,
//! possibly out-of-order, from the logical stream, and assembles any newly
//! contiguous substrings and writes them into the output stream in order.
void StreamReassembler::push_substring(const string &data, const size_t index, const bool eof) {
size_t first_unassembled = _output.bytes_read() + _output.buffer_size();
size_t first_unacceptable = _output.bytes_read() + _capacity;
// the index of data after removing the reassembled bytes
size_t idx = index;
// the length of data after removing the reassembled or unacceptable bytes
size_t len = data.size();
// the eof flag of data after removing
bool eof_flag = eof;
// if the whole data string is reassembled or unacceptable, it will be discarded
if (idx + len < first_unassembled || idx >= first_unacceptable) {
return;
}
// remove the already-reassembled and unacceptable bytes from data
// only update the `idx` and `len`
if (idx < first_unassembled) {
len -= first_unassembled - idx;
idx = first_unassembled;
}
if (idx + len > first_unacceptable) {
len = first_unacceptable - idx;
// the eof must be false when removing the tail of data
eof_flag = false;
}
// update the `_eof` flag, only when the end byte can be stored, will `_eof` be true
_eof = _eof || eof_flag;
// insert the substr of data after removing to `_buffer`
insert_block(idx, data.substr(idx - index, len));
// traverse the beginning of `_buffer` and write the assembled strings to `_output`
while (!_buffer.empty()) {
const Block &b = *_buffer.begin();
// if the block `b` is not the assembled string
if (first_unassembled != b.index) {
break;
}
_output.write(b.data.copy());
_unassembled_bytes -= b.size();
first_unassembled += b.size();
_buffer.erase(_buffer.begin());
}
// end the input of `_output` if `_eof` flag has been set and no unassembled bytes
if (_eof && _unassembled_bytes == 0) {
_output.end_input();
}
}
size_t StreamReassembler::unassembled_bytes() const { return _unassembled_bytes; }
bool StreamReassembler::empty() const { return _unassembled_bytes == 0; }
//! use the data and its index to build a `Block` and insert it to `_buffer`
void StreamReassembler::insert_block(size_t index, string &&data) {
// the size of `_unassembled_bytes` should update
size_t delta_bytes = data.size();
// do nothing if no data
if (delta_bytes == 0) {
return;
}
Block block(index, move(data));
// a flag to record whether to insert
bool hasInsert = false;
// traverse all blocks in `_buffer`
// note: using iterator because some blocks will be erased, use `for-range` is wrong
for (auto it = _buffer.begin(); it != _buffer.end();) {
const Block &b = *it;
// if the whole `block` is to the left of `b`, it can be inserted all
if (block.index + block.size() <= b.index) {
_buffer.insert(block);
hasInsert = true;
break;
}
// if the whole `block` is to the right of `b`, it should compare with next one
if (block.index >= b.index + b.size()) {
++it;
continue;
}
// if `b` is part of the `block`, we should erase `b`
if (block.index < b.index && block.index + block.size() > b.index + b.size()) {
// update `delta_bytes`
delta_bytes -= b.size();
it = _buffer.erase(it);
} else if (block.index < b.index) {
// if the tail of `block` overlaps with `b`, remove its suffix and insert to `_buffer`
size_t delta = block.index + block.size() - b.index;
block.data.remove_suffix(delta);
_buffer.insert(block);
delta_bytes -= delta;
hasInsert = true;
break;
} else if (block.index + block.size() > b.index + b.size()) {
// if the head of `block` overlaps with `b`, remove its prefix and compare with the next block
size_t delta = b.index + b.size() - block.index;
block.data.remove_prefix(delta);
block.index += delta;
delta_bytes -= delta;
++it;
} else {
// if the `block` is part of `b`, it should not be inserted to `_buffer`
delta_bytes = 0;
hasInsert = true;
break;
}
}
// insert `block` to `_buffer` if it has not been inserted after traversing `_buffer`
if (!hasInsert) {
_buffer.insert(block);
}
_unassembled_bytes += delta_bytes;
}
Testing
In the build directory, run make and then make check_lab1.