首先,要搞懂一点,异步操作本质上也是并发,我们想要在线程级别实现异步并发基本就靠三种方式:

  • 多线程并发
  • 回调函数
  • 协程

今天我们讨论的是回调函数,我们如何通过回调函数来实现异步操作呢?

  1. 非阻塞I/O操作+回调函数实现异步IO
  2. 基于定时器+回调函数实现异步任务调度
  3. 事件队列+回调函数

我们就分别来实现一下他们吧,代码比较长,请耐心阅读。

非阻塞I/O+回调

这种方式允许程序在等待 I/O 操作完成时继续执行其他任务,从而提高并发性和性能。

我们可以使用标准 C++ 库和 C++11 中的线程库来实现一个基于 epoll 的非阻塞 I/O 和回调函数的示例。epoll 是 Linux 提供的高效 I/O 多路复用机制,适用于处理大量并发连接。我们将使用 epoll 来实现异步 I/O,并使用回调函数处理完成的 I/O 操作。

实例代码:使用 epoll 实现非阻塞 I/O 和回调函数

#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <iostream>
#include <cstring>
#include <functional>
#include <unordered_map>
#include <vector>
#include <thread>
#include <chrono>

void set_non_blocking(int fd) {
	int flags = fcntl(fd, F_GETFL, 0);
	if (flage == -1) {
		throw std::runtime_error("fcntl get error");
	}
	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
		throw std::runtime_error("fcntl set error");
	}
}

//事件处理器类
class EventLoop {
public:
	EventLoop() {
		epollFd_ = epoll_create1(0);
		if (epoll_fd == -1) {
			throw std::runtime_error("epoll_creat1 error");
		}
	}

	~EpollLoop() {
		close(epoll_fd);
	}

	//添加文件描述符及其对应的回调函数
	void add_fd(int fd, std::function<void(int)> cb) {
		set_non_blocking(fd);
		epoll_event event;
		event.events = EPOLLIN | EPOLLET;
		event.data.fd = fd;
		if (epoll_ctl(epollFd_, EPOLL_CTL_ADD, fd, &event) == -1) {
			throw std::runtime_error("epoll_ctl add error");
		}
		callbacks_[fd] = cb;
	}

	//运行时间循环
	void run() {
		while (running_) {
			std::vector<epoll_event> events(10);
			int numActives = epoll_wait(epollFd_, events.data(), events.size(), -1);
			if (numActives == -1) {
				throw std::runtime_error("epoll_wait error");
			}
			for (int i = 0; i < n; ++i) {
				int fd = events[i].data.fd;
				if (callbacks_.find(fd) != callbacks_.end()) 
					callbacks[fd](fd);
			}
		}
	}

	//停止事件循环
	void stop() {
		return = false;
	}

private:
	int epollFd_; //epoll实例
	std::unordered_map<int, std::function<void(int)>> callbacks_;
	bool running_ = true;
};


// 示例回调函数,处理标准输入的读取
void handle_stdin(int fd) {
	char buffer[128];
	ssize_t n = read(fd, buffer, sizeof(buffer) - 1);
	if (n > 0) {
		buffer[n] = '\0';
		std::cout << "Read from stdin: " << buffer << std::endl;
	} else if (n == 0) {
		std::cout << "EOF from stdin" << std::endl;
	} else {
	if (errno != EAGAIN && errno != EWOULDBLOCK) {
		std::ceer << "Read error: " <<  strerror(errno) << std::endl;
		}	
	}
}

int main () {
	try {
		EventLoop loop;

		//注册标准输入的文件描述符和回调函数
		loop.add_fd(STDIN_FILENO, handle_stdin);

		//运行事件循环
		std::thread loop_thread([&loop]() {
			loop.run();
		});

		//模拟只线程的其他工作
		std::this_thread::sleep_for(std::chrono::seconds(10));

		//停止事件循环
		loop.stop();
		loop_thread.join();
	} catch (const std::exception &e) {
		std::cerr << "Exception: " << e.what() << std::endl;
		return 1
	}

	return 0;
}

在该示例中,由于是非阻塞IO,所以如果有两个 fd 对应的事件被激活,那么他们会循环执行各自的回调操作,因为我们的EventLoop里面是又一个 while 循环的,如果这两个 fd 如果都是读一个很大很大的文件,那么他们在while循环中会交替执行各自的回调任务,实现异步操作。

基于定时器+回调函数实现异步任务调度

通过定时器和回调函数,可以在单线程环境中实现异步任务调度。在这个示例中,我们实现了一个简单的 Web 服务器定时清理过期会话的功能。类似的技术可以应用于其他需要定时任务调度的场景,如定时备份、日志轮换、定时数据采集等。

1. SessionManager 类

这个类负责管理用户会话,提供添加、删除和清理过期会话的方法。

class SessionManager {
#include <iostream>
#include <unordered_map>
#include <chrono>
#include <functional>
#include <thread>
#include <vector>
#include <algorithm>

class SessionManager {
public:
    void add_session(const std::string& session_id) {
        sessions[session_id] = std::chrono::steady_clock::now();
    }

    void remove_session(const std::string& session_id) {
        sessions.erase(session_id);
    }

    void clean_expired_sessions(std::chrono::seconds timeout) {
        auto now = std::chrono::steady_clock::now();
        for (auto it = sessions.begin(); it != sessions.end(); ) {
            if (now - it->second > timeout) {
                std::cout << "Removing expired session: " << it->first << std::endl;
                it = sessions.erase(it);
            } else {
                ++it;
            }
        }
    }

private:
    std::unordered_map<std::string, std::chrono::steady_clock::time_point> sessions;
};
  • add_session:添加一个新会话,记录会话 ID 和当前时间。
  • remove_session:删除指定的会话。
  • clean_expired_sessions:清理过期会话,检查所有会话,如果会话时间超过指定的超时时间,则删除会话。

2. EventLoop类

这个类管理定时任务,通过维护一个事件列表并运行事件循环,在指定时间间隔内执行任务。

class EventLoop {
public:
    void add_event(std::function<void()> callback, std::chrono::milliseconds interval) {
        auto next_run = std::chrono::steady_clock::now() + interval;
        events.emplace_back(next_run, interval, callback);
    }

    void run() {
        while (running) {
            auto now = std::chrono::steady_clock::now();
            for (auto& event : events) {
                if (now >= event.next_run) {
                    event.callback();
                    event.next_run = now + event.interval;
                }
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 小休眠,减少CPU占用
        }
    }

    void stop() {
        running = false;
    }

private:
    struct Event {
        std::chrono::steady_clock::time_point next_run;
        std::chrono::milliseconds interval;
        std::function<void()> callback;

        Event(std::chrono::steady_clock::time_point nr, std::chrono::milliseconds i, std::function<void()> cb)
            : next_run(nr), interval(i), callback(cb) {}
    };

    std::vector<Event> events;
    bool running = true;
};
  • add_event:添加一个新的定时事件,指定回调函数和时间间隔。
  • run:启动事件循环,循环检查所有事件,如果事件到期则执行回调函数,并更新下一次执行时间。
  • stop:停止事件循环。

3. 回调函数

定义一个回调函数,用于清理过期会话。
如果我们定义两个不同任务的回调函数,那么是不是就可以在某段时间实现多个任务了呢?

void clean_sessions_task(SessionManager& session_manager, std::chrono::seconds timeout) {
    session_manager.clean_expired_sessions(timeout);
}

这个函数调用 SessionManager 的 clean_expired_sessions 方法来清理过期会话。

4. main 函数

int main() {
    SessionManager session_manager;
    EventLoop event_loop;

    // 添加一些示例会话
    session_manager.add_session("session1");
    std::this_thread::sleep_for(std::chrono::seconds(1));
    session_manager.add_session("session2");

    // 每2秒清理一次过期会话(假设过期时间为3秒)
    event_loop.add_event(std::bind(clean_sessions_task, std::ref(session_manager), std::chrono::seconds(3)), std::chrono::seconds(2));

    // 运行事件循环
    std::thread event_loop_thread([&event_loop]() {
        event_loop.run();
    });

    // 模拟服务器运行
    std::this_thread::sleep_for(std::chrono::seconds(10));

    // 停止事件循环
    event_loop.stop();
    event_loop_thread.join();

    return 0;
}
  1. 创建 SessionManager 和 EventLoop 对象。
  2. 添加两个示例会话,分别在 0 秒和 1 秒时添加。
  3. 添加一个定时事件,每 2 秒调用一次 clean_sessions_task 来清理过期会话,过期时间为 3 秒。
  4. 启动一个线程运行事件循环。
  5. 主线程模拟服务器运行 10 秒钟。
  6. 停止事件循环并等待事件循环线程结束。

如果我们添加两个定时事件,并且打开循环,那么不是就在某段时间实现了异步任务调度呢?

基于事件队列+回调函数实现异步操作

这个示例展示了如何使用标准 C++ 库和 C++11 线程库,通过事件队列和回调函数在单线程中实现异步任务调度。具体来说,我们实现了一个简单的系统,可以调度并执行延迟任务。

1.EventLoop 类

这个类管理定时任务,通过维护一个事件列表并运行事件循环,在指定时间间隔内执行任务。

class EventLoop {
public:
    // 添加一个新的定时事件
    void add_event(std::function<void()> callback, std::chrono::milliseconds interval) {
        auto next_run = std::chrono::steady_clock::now() + interval;
        events.emplace_back(next_run, interval, callback);
    }

    // 运行事件循环
    void run() {
        while (running) {
            auto now = std::chrono::steady_clock::now();
            for (auto& event : events) {
                if (now >= event.next_run) {
                    event.callback(); // 执行回调函数
                    event.next_run = now + event.interval; // 更新下一次执行时间
                }
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(10)); // 小休眠,减少CPU占用
        }
    }

    // 停止事件循环
    void stop() {
        running = false;
    }

private:
    struct Event {
        std::chrono::steady_clock::time_point next_run;
        std::chrono::milliseconds interval;
        std::function<void()> callback;

        Event(std::chrono::steady_clock::time_point nr, std::chrono::milliseconds i, std::function<void()> cb)
            : next_run(nr), interval(i), callback(cb) {}
    };

    std::vector<Event> events;
    bool running = true;
};
  • add_event:添加一个新的定时事件,指定回调函数和时间间隔。
    • 参数 callback 是一个回调函数,当事件触发时调用。
    • 参数 interval 是一个时间间隔,表示事件应该在多长时间后执行。
    • next_run 记录了事件的下一次执行时间。
  • run:启动事件循环,循环检查所有事件,如果事件到期则执行回调函数,并更新下一次执行时间。
    • while (running):事件循环在 running 为 true 时持续运行。
    • event.callback():执行回调函数,表示事件触发。
    • event.next_run = now + event.interval:更新事件的下一次执行时间。
    • std::this_thread::sleep_for(std::chrono::milliseconds(10)):通过短暂休眠来减少 CPU 占用。
  • stop:停止事件循环,将 running 设置为 false。

2. 回调函数

定义两个简单的回调函数,用于示例任务。

void say_hello() {
    std::cout << "Hello, World!" << std::endl;
}

void say_goodbye() {
    std::cout << "Goodbye, World!" << std::endl;
}

3.main函数

设置并运行事件循环,添加一些示例任务,并定期执行这些任务。

int main() {
    EventLoop loop;

    // 添加定时事件
    loop.add_event(say_hello, std::chrono::seconds(1)); // 每1秒执行一次
    loop.add_event(say_goodbye, std::chrono::seconds(2)); // 每2秒执行一次

    // 运行事件循环
    loop.run();

    return 0;
}

异步任务的实现

异步任务的实现核心在于 EventLoop 类中的 add_event 和 run 方法:

  • add_event 方法将任务(回调函数)和执行时间(间隔)添加到事件队列中。
  • run 方法启动事件循环,定期检查事件队列,触发到期的任务,并通过回调函数执行这些任务。

具体来说,异步任务的执行发生在以下代码行:

if (now >= event.next_run) {
    event.callback(); // 执行回调函数
    event.next_run = now + event.interval; // 更新下一次执行时间
}
  • event.callback():当当前时间 now 大于等于 event.next_run 时,执行回调函数,表示异步任务触发并执行。

这个机制允许在单线程环境中,通过事件队列和回调函数实现异步任务调度,无需多线程或协程。每个任务在指定的时间间隔后触发,保持事件循环的运行和任务的调度。

事件队列\定时器 + 回调真的能实现异步任务吗

当回调任务是一个非常耗时的操作时,如果在单线程中执行,确实会导致事件循环被阻塞,从而影响其他任务的执行。这违背了异步执行的初衷,即不阻塞程序的其他部分。

为了解决这个问题,主要还是以下方法:

  • 使用线程池
    • 我们可以使用一个线程池来处理耗时的任务,从而避免阻塞事件循环。线程池可以并发执行多个任务,确保事件循环保持响应。
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐