86ee8b67ca64e8c8c9f88a5439e15e50.png

叶天奇:(技术报告)<tk>:一个可移植的c++事件驱动模拟库 | 负薪集

项目地址,现已更新:https://github.com/yecharlie/tk


撰文 | 叶天奇

简单来说,<tk>是一个基于c++的事件驱动模拟库,其定位类似于<boost/fiber>,<tk>脱胎于c++早期编译器cfront里面的一个古老的多任务模拟库<task>。除了具有可移植外的特性外,<tk>还得以利用现代c++编译器支持的一些机制,诸如模板等,并对<task>的部分设计做有限但必要的调整。本文将通过一个模拟实例来介绍<tk>的基本功能。

模拟任务——情景剧《休闲时光》

背景:又到了周日下午,三个好友小木、小兰和工具人聚在家里喝茶、聊天,平日里都有处理不完的课业,而再过一个月就是考试周,想一想只有现在才是难得的清闲啊。不知不觉,落地窗外的太阳又踱了两步,三个人决定就在房间里看电影,片名叫《爱你,西蒙》,正当工具人调试投影仪的时候,小木像是想到什么跑出了房门,不一会笑嘻嘻地回来,扔下一袋瓜子,手里还顺带拿了个小碟子……现实是很清楚的,工具人负责剥瓜子,小木、小兰则负责消费瓜子(仁)。每一次,工具人会把一到三颗不等的瓜子一并放到碟子中,其数目受电影情节的起伏以及自己当时的心情的影响,当然,一次放的越多花的时间也越长。而小木、小兰每隔两个和三个单位时间会分别取一颗瓜子。小木、小兰是很有教养的孩子,其父母也都是当地的体面人,虽然小木拿的比小兰勤,但如果小木在拿的时候看到小兰也准备要拿,会主动等小兰拿完自己再去拿。现在请用程序还原当时的情景。

问题分析与任务设计

在定义各自的任务前,要明确事件发生的依赖关系,生产瓜子和消费瓜子便是一对有依赖关系的事件,为此可以利用<tk>中的object类和task类的wait方法,object类的接口为:

class object
{
	public:
		virtual ~object();

		void remember(task*);
		void forget(task*);
		void alert();
		virtual int pending();
		virtual void print(int);
		static task* this_task() {return thistask;}
		static int task_error(int, object*);
};

首先定义pending函数,当某任务(task对象)对该object对象调用wait函数时,如果pending返回0任务会休眠;当依赖条件解除后就可以条用alert函数将对应的任务唤醒。这几个函数总是配套使用的。现在可以定义瓜子类(seeds)记录当前碟中剩余瓜子数,首先包括头文件。eeds类继承object类,并定义了put方法(注意到调用了alert函数)、take方法来放置瓜子仁和取瓜子仁:

#include "tk.h"

#include <cstdlib> // rand()
#include <vector>
#include <sstream>
#include <fstream>

struct seeds : public tk::object 
{
	int count;
	int pending() {return count == 0;}
	void put(int n) {if ((count += n) == n) alert();}
	int take() {
		if (!count) return 0;
		else {count--; return 1;}
	}
};

接着就定义生产者类(producer),让producer继承task类并覆盖routine函数,task类的接口像是这个样子:

class task : public sched
{
	protected:
	       task(const char* name=0, std::size_t stacksize=DFSS);
	public:
	       const char* t_name;
	       static std::forward_list<task*> task_chain() {return taskchain;} 

	       void start();
	       virtual ~task();
	       void wait(object*);
	       int waitlist(object* ...);
	       int waitvec(object**);
	       void delay(long);
	       void sleep(object* t = 0);
	       void cancel(int);
	       void setwho(object* o) {t_alert = o;}
	       void print(int);
	       object* who_alerted_me() {return t_alert;}
	private:
	       virtual void routine() = 0;

};

每个任务有待机(IDLE)、运行(RUNNING)、终止(TERMINATED)三种状态。所有正在接受调度的任务都处于运行态,start函数就用于把新建的(一组)任务送去调度。如果一个任务的例行程序(routine函数)执行完毕,它会自动转为终止态,此外还可以调用cancel函数来强制终止任务。处于这两种情况之外的任务则处于待机状态,比如一个休眠(通过sleep函数)了的任务将处于待机状态,一个生产者类的定义可以是:

const int MIN_REQ_SEEDS = 300;

class producer : public tk::task
{
friend class logger;
	private:
		static int allprd;
		int nprd;
		seeds* sp;
		void routine();
	public:
		static std::vector<producer*> pvec;
		producer(const char* n, seeds* ss) : sp(ss), task(n)  {pvec.push_back(this);}
		~producer() {if (pvec.empty()) delete sp;}
};

int producer::allprd = 0;
std::vector<producer*> producer::pvec;

void producer::routine()
{
	while (allprd < MIN_REQ_SEEDS) {
		// assume it takes the producer #interval time  to produce #cnt amount of seeds 
		int interval = std::rand() % 4 + 2; // [2, 5]

		// cnt is propotional with interval
		int cnt;
	        if (interval <= 2)
			cnt = 1;
		else if (interval <= 4)
			cnt = 2;
		else
			cnt = 3;
		delay(interval);

		sp->put(cnt);
		nprd += cnt;
		allprd += cnt;
		std::printf("%3d | %s produces %d seeds at %ldn", sp->count, t_name, cnt, getclock());
	}
}

这里假定每次剥瓜子需要2到5单位时间,能对应产生1到3颗瓜子仁,为了体现生产活动时间消费的概念,使用了 delay函数,它使当前推迟到一定时间后执行,然后系统立即转入下一个任务。只有delay可以拨动<tk>的系统时钟前进。


在定义消费者类(customer)时出现了新的事件依赖关系:如果在某一时刻有多个消费者同时活动,那么活动间隔长的消费者的优先级更高,可以用定义seeds类的做法定义一个礼仪类来表达这种关系,但这里选择直接在customer类中定义依赖关系,之所以能这样做是因为task类也是从object类(间接)继承过来的。customer类定义如下:

class customer : public tk::task
{
friend class logger;
	private:
		int ncus;
		seeds* sp;
		int modest(); 
		void acknowledge() {alert();}
		void routine();
	public:
		static std::vector<customer*> cvec;
		customer(const char* n, int interv, seeds* ss) : task(n), interval(interv),sp(ss) {cvec.push_back(this);}
		~customer() {sp->forget(this);}

		int interval;
};
std::vector<customer*> customer::cvec;


int customer::modest()
{
	customer* cp;

	for (int i = 0; i<cvec.size() && ( cp = cvec[i] ); i++)
		if (cp != this 
			&& cp
			&& cp->rdstate() == tk::sched::RUNNING
			&& cp->rdtime() == rdtime()
			&& interval < cp->interval) {
			// let cp reminder this later
			sleep(cp);
			return 1;
		}
	return 0;
}


void customer::routine()
{
	while (true) {
		do {
			wait(sp);
		} while (modest());

		ncus += sp->take();
		acknowledge();
		std::printf("%3d | %s consumes %d seed at %ldn", sp->count, t_name, 1, getclock());

		delay(interval);
	}
}

在customer类中的routine函数里面,使用了wait函数来等待新瓜子仁的产生,某消费者在等到瓜子仁后,会先谦让(modest函数)一下,在modest函数中,如果此时发现了一个优先级更高的消费者(记为cp),调用sleep函数让自己休眠,并让cp记住自己。此后系统自动转入下一个任务,当cp消费完则后会通过致谢(acknowledge函数)来唤醒自己。rdtime函数返回任务的调度时间,rdstate函数返回任务状态。注意到这次delay的位置放在了最后。


可能还希望有一个日志类(logger)来记录系统状态的变化,便于事后分析。这时可以将logger任务设置为时钟任务。包括在创建的时刻,时钟任务在每个新时刻开始之前都会被系统自动调用。

class logger : public tk::task
{
		std::list<std::string>*  data;
		seeds* sp;
		static std::string title(std::vector<producer*>& pvec, std::vector<customer*>& vec);
		void routine();
	public:
		logger(seeds* ss, int sz) : task("logger", sz), data(new std::list<std::string>), sp(ss){}
		void flush(const char*);
};

std::string logger::title(std::vector<producer*>& pvec, std::vector<customer*>& cvec)
{
	producer* pp;
	customer* cp;
	std::ostringstream oss;
	oss << "Time" << ",";
	for (int i = 0; i < pvec.size() && (pp=pvec[i]); i++) oss << pp->t_name << ",";
	for (int j = 0; j < cvec.size() && (cp=cvec[j]); j++) oss << cp->t_name << ",";
	oss << "Seeds";
	return oss.str();
}

void logger::flush(const char* n) 
{
	if (data) {
		std::ofstream out(n);
		auto it = data->begin();
		for (;it != data->end(); it++)
			out << *it << std::endl;
		delete data;
		data = 0;
	}
}


void logger::routine()
{
	std::vector<producer*>& pvec = producer::pvec;
	std::vector<customer*>& cvec = customer::cvec;
	std::ostringstream oss;
	data->push_back(title(pvec, cvec));

	producer* pp;
	customer* cp;
	while (true) {
		std::ostringstream oss;
		oss << getclock() << ",";
		for (int i = 0; i < pvec.size() && (pp=pvec[i]); i++) oss << pp->nprd << ",";
		for (int j = 0; j < cvec.size() && (cp=cvec[j]); j++) oss << cp->ncus << ",";
		oss << sp->count;
		data->push_back(oss.str());

		sleep();
	}
}

也因此,logger的routine里面调用的是无参sleep函数。getclock返回当前系统时钟。logger的构造函数中多了一个sz参数来指定任务所需的内存空间,默认情况下一个任务分配的空间为10000字节。用时,logger定义了flush函数来把收集到的记录写入硬盘,flush函数需要在所有任务结束后调用。最后写出main函数定义:

void exit() 
{
	// save log
	logger* lg =  ( logger* ) tk::sched::clock_task;
	lg->flush("./test_task_leisure_time.out");

	// the folowing operations are optional

	customer* cp;
	std::vector<customer*>& cvec = customer::cvec;
	while (!cvec.empty()) {
		cp = cvec.back();
		cp->cancel(0); // IDLE->TERMINATED, required before one could delete it
		cvec.pop_back();
		delete cp;
	}

	producer* pp;
	std::vector<producer*>& pvec = producer::pvec;
	while (!pvec.empty()) {
		pp = pvec.back();
		pvec.pop_back();
		delete pp;
	}

	// delete logger task (current task) 
	lg->cancel(0);
	delete lg;
}


int main()
{
	seeds* sd = new seeds();

	// the order in which the lg (along with other tasks) is created doesnot matter.
	// explicitly allocate 15000 bytes for lg by task(name, stack_size). The default stack size for every tak is 10000.
	logger* lg = new logger(sd, 15000); 

	// clock task will be invoked at every new clock tick before any other tasks could execute
	tk::sched::clock_task = lg;

	// invoke logger after all task finished
	// to do clean job, optional
	tk::sched::clock_task_exit = true;

	producer* pd = new producer("Tool", sd);
	customer* cs = new customer("XiaoMu", 2, sd);
	customer* cs2 = new customer("XiaoLan", 3, sd);

	// start all created task in a sequence
	currenttask->start();

	// write to disk & clear objects on exiting 
	tk::sched::exit_fct = exit;
}

首先创建了logger任务,给它显式分配15000字节的空间,同时将其设置为时钟任务(clocktask)。这里还用到配合时钟任务使用的标志位clock_taskexit,如果设置为rue,系统会在没有可调度任务时最后调度一次时钟任务,这里只是为了在最后清理对象的时候能简化代码。即使模拟的任务没有使用时间机制(没有使用delay函数),也可以设置clock_task和clock_task_exit在系统的最初和最后执行特定任务。

在系统启动时会创建一个默认的主函数任务(main),在主函数中可以使用currenttask替代main任务的this指针。可能和想象中的不一样,start函数不是启动自己,而是启动自己创建的所有子任务。调用start函数后,系统会立即转入子任务,并依据创建顺序依次调用子任务,假设在子任务中没有创建新的任务,在一轮调用完毕后会转回父任务(这里是main函数)。回到main函数后,利用exit_fct来设置在所有调度结束后(包括由clock_task_exit设置的时钟任务的调度)的退出函数exit,main任务结束,其占用空间被回收。此时程序不会退出,直到所有调度结束,系统会调用exit函数,保存日志,清理对象,最后结束。

不幸的是,<tk>系统不会帮助用户清理task对象,而且清理工作也比想象中要更麻烦一些。直接删除当前任务会触发调度,这也是设置clock_task_exit的直接原因,此外还需要考虑到结束时各任务的状态,以及结束时customer对象对seeds对象的依赖关系。幸运的是,清理操作是可选的,而且task对象本身占用的空间并不大。

模拟结果与分析

最后模拟结果如下:

  2 | Tool produces 2 seeds at 3
  1 | XiaoLan consumes 1 seed at 3
  0 | XiaoMu consumes 1 seed at 3
  3 | Tool produces 3 seeds at 8
  2 | XiaoLan consumes 1 seed at 8
  1 | XiaoMu consumes 1 seed at 8
  0 | XiaoMu consumes 1 seed at 10
  2 | Tool produces 2 seeds at 12
  1 | XiaoLan consumes 1 seed at 12
  0 | XiaoMu consumes 1 seed at 12
  2 | Tool produces 2 seeds at 15
  1 | XiaoLan consumes 1 seed at 15
  0 | XiaoMu consumes 1 seed at 15
  1 | Tool produces 1 seeds at 17
  0 | XiaoMu consumes 1 seed at 17
  1 | Tool produces 1 seeds at 19
  0 | XiaoLan consumes 1 seed at 19
...
  2 | Tool produces 2 seeds at 520
  1 | XiaoLan consumes 1 seed at 520
  0 | XiaoMu consumes 1 seed at 520
  3 | Tool produces 3 seeds at 525
  2 | XiaoLan consumes 1 seed at 525
  1 | XiaoMu consumes 1 seed at 525
  0 | XiaoMu consumes 1 seed at 527

从部分数据中可以的确总结出规律,当有新瓜子出现时,的确都是小兰先拿,除非是小兰上次任务周期还没有结束,

简单处理日志文件“test_task_leisure_time.out”,可以得到下图结果:

cebfadc250904891980282d87bad48d5.png
模拟程序日志的分析结果,Seeds表示当时剩余的瓜子仁数目,显然生产者速度跟不上消费者(们)的速度

从图中可以看到几个人的生产/消费速率基本稳定,可以估算出小木每取一颗瓜子的平均延迟为1.06个单位时间,小兰为1.09个单位时间。

总结

本文通过一个实例介绍了<tk>的基本功能,包括利用object类实现任务的简单通信,任务间的直接通信,定义任务,利用时钟任务记录日志,利用退出函数做清理工作。通常情况下使用<tk>中预定义的队列模板qtail/qhead进行任务通信会更方便一些,具体参考<task>文献[1]。<tk>继承了<task>大部分接口,少部分不同之处被整理成了表格,请参考附录。

参考文献

[1] Stroustrup, Bjarne & Shopiro, J.. (1985). A set of C++ classes for co-routine style programming.

附录

A <tk>与<task>接口不同之处总结

3c9b514a21f228db2e3c1b81fa5eea53.png
&amp;amp;amp;amp;lt;tk&amp;amp;amp;amp;gt;与&amp;amp;amp;amp;lt;task&amp;amp;amp;amp;gt;接口不同之处总结

B 运行时错误,<tk>有着和<task>相类似的错误处理机制,系统会捕获以下特定的运行时错误:

  1. E_OLINE, "object::delete(): has chain"
  2. E_SCHOBJ, "sched object used directly (not as base)"
  3. E_RESTERM, "sched::insert(): cannot schedule terminated sched"
  4. E_RESRUN, "sched::schedule(): running"
  5. E_NEGTIME, "sched::schedule(): clock<0"
  6. E_SETCLOCK, "sched::setclock(): clock!=0"
  7. E_RESULT, "task::result(): thistask->result()”
  8. E_SCHTIME, "sched::schedule(): runchain corrupted: bad time"
  9. E_TASKDEL, "task::~task(): not terminated"
  10. E_ILLSTART, "task::start(): not by thistask"
  11. E_NSTART, "sched::schedule(): new created task(s) not started"
  12. E_CLINE, "ctrler::insert_after(): has been inserted earlier"
  13. E_LONGJMP, "task::setjmp_limited(): unmatched Jcode"
  14. E_STACKSIZE, "task::eat(): insufficient stack size"
  15. E_STACKALLOC, "task::eat(): cannot allocate stack prpperly"
  16. E_STACKOVERFLOW, "task::restore(): the stack has corrupted."
  17. E_NOSPACE, "task::restore(): no free stack space"
  18. E_QDEL, "queue::~queue(): not empty"
  19. E_PUTFULL, "qtail::put(): full"
  20. E_GETEMPTY, "qhead::get(): empty"
  21. E_BACKFULL,"qhead::putback(): full"
  22. E_TIMERDEL,"timer::~timer(): not terminated"
  23. E_WAIT,"task::wait() | waitvec() | waitlist() : wait for self"
  24. E_CLOCKIDLE,"sched::schedule(): clock_task not idle"
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐