Understanding Flask in One Read: How gunicorn Starts a Flask Application

1. How gunicorn starts a Flask application

Having seen how Flask starts in development mode, how it listens for requests, and how it handles a request once received, we now turn to gunicorn, the most common choice in production: how it starts, how it listens for requests, how it guarantees that each request is handled by exactly one worker rather than several, and how a worker dispatches requests into the Flask application. Along the way, a short example shows how to initialize a ROS node for every worker so that the exposed interfaces can communicate over ROS services.

1.1 The gunicorn startup process

gunicorn's entry point is the run method of the gunicorn.app.base:Application class. This article focuses on starting gunicorn through its API and does not cover the command-line route. Below, we define a custom Flask application and extend gunicorn's startup by subclassing gunicorn.app.base:BaseApplication, so that instead of using the command line, we launch the application through gunicorn the same way we would run an ordinary Python script. Here is a simple example:

import logging
import os

import rospy
from flask import Flask, Response
from gunicorn.app.base import BaseApplication
from gunicorn.workers.sync import SyncWorker

logger = logging.getLogger(__name__)


class CustomWorker(SyncWorker):
    worker_count = 0  # counts instantiations, used to number the worker nodes

    def __init__(self, *args, **kwargs):
        super(CustomWorker, self).__init__(*args, **kwargs)
        self.__class__.worker_count += 1

    @classmethod
    def get_worker_count(cls):
        return cls.worker_count

    def init_process(self):
        if not rospy.core.is_initialized():  # has this worker already initialized a node?
            node_name = f'dispatch_interface_services_worker{self.get_worker_count()}'  # name the node after the worker number
            rospy.init_node(node_name)
            logger.info(f"ROS Node {node_name} initialized in worker {os.getpid()}")
        super(CustomWorker, self).init_process()


class RobotApp(Flask):
    def __init__(self, import_name):
        super(RobotApp, self).__init__(import_name)
        self.setup_blueprints()

    @staticmethod
    def check():
        return Response(None)

    def setup_blueprints(self):
        """Register blueprints, routes, and views here."""
        ......
        


class Application(BaseApplication):
    def __init__(self, service_name, options=None):
        """
        options example:
        options = {
            'bind': "127.0.0.1:8888",  # bind address and port
            'workers': 4,  # number of workers
            'accesslog': "-",  # log to stdout; stops gunicorn from taking over log output
            'worker_class': 'CustomWorker',  # custom worker that initializes the ROS node; give the full import path from the working directory to this class
            'timeout': 300
        }
        """
        self.options = options or {}
        super(Application, self).__init__()
        self.application = None
        self.logger = logger  # if the project already uses another logging framework and you don't want gunicorn to take over logging, assign that framework's logger here

    def load_config(self):
        config = {key: value for key, value in self.options.items()
                  if key in self.cfg.settings and value is not None}
        for key, value in config.items():
            self.cfg.set(key.lower(), value)

    def load(self):
        return RobotApp(__name__)
    
if __name__ == '__main__':
    app = Application("dispatch_interface_services",
                      options={"bind": "0.0.0.0:5000",
                               "workers": 1,
                               "worker_class": CustomWorker})  # passing the class object avoids spelling out its import path
    app.run()

The example above shows how to develop against the gunicorn API, and how a custom worker lets you attach your own logic to each worker's startup; here that logic initializes a ROS node with a fixed name per worker. The Flask application itself needs no changes at all; only the way gunicorn is launched becomes more flexible and convenient. Next, let's see how gunicorn itself starts.

In the code above, the custom Application class inherits from gunicorn.app.base.BaseApplication and is started simply by calling its run method, so let's begin there and walk through the startup steps. The following is the concrete implementation of BaseApplication's run method:

class BaseApplication(object):
    """
    An application interface for configuring and loading
    the various necessities for any given web framework.
    """
    ......

    def run(self):
        try:
            Arbiter(self).run()
        except RuntimeError as e:
            print("\nError: %s\n" % e, file=sys.stderr)
            sys.stderr.flush()
            sys.exit(1)
            
class Arbiter(object):
    """
    Arbiter maintain the workers processes alive. It launches or
    kills them if needed. It also manages application reloading
    via SIGHUP/USR2.
    """

    ......

    def run(self):
        "Main master loop."
        self.start()  # initialize the configuration, create the relevant file descriptors, register signal handlers, create the listening sockets, etc.
        util._setproctitle("master [%s]" % self.proc_name)

        try:
            self.manage_workers()  # reconcile the worker count with the configuration: spawn if short, kill if over

            while True:  # infinite loop: watch for registered signal events and handle them
                self.maybe_promote_master()

                sig = self.SIG_QUEUE.pop(0) if self.SIG_QUEUE else None
                if sig is None:
                    self.sleep()
                    self.murder_workers()
                    self.manage_workers()
                    continue

                if sig not in self.SIG_NAMES:
                    self.log.info("Ignoring unknown signal: %s", sig)
                    continue

                signame = self.SIG_NAMES.get(sig)
                handler = getattr(self, "handle_%s" % signame, None)
                if not handler:
                    self.log.error("Unhandled signal: %s", signame)
                    continue
                self.log.info("Handling signal: %s", signame)
                handler()
                self.wakeup()
        except (StopIteration, KeyboardInterrupt):
            self.halt()
        except HaltServer as inst:
            self.halt(reason=inst.reason, exit_status=inst.exit_status)
        except SystemExit:
            raise
        except Exception:
            self.log.info("Unhandled exception in main loop",
                          exc_info=True)
            self.stop(False)
            if self.pidfile is not None:
                self.pidfile.unlink()
            sys.exit(-1)

This run method starts the request-handling workers and manages them through the following steps:

  • Call start to initialize the configuration, create the relevant file descriptors, register signal handlers, and create the listening sockets
  • Reconcile the number of workers with the configured count: spawn workers if there are too few, kill them if there are too many
  • Enter an infinite loop that watches for registered signal events and handles them
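The signal-handling pattern in that loop can be sketched in miniature: the signal handler does nothing but enqueue, and the main loop pops one signal at a time and dispatches it to a handle_<name> method. This is a simplified illustration, not gunicorn's actual code:

```python
# Minimal sketch of the master's signal-queue pattern: handlers only enqueue;
# the loop pops and dispatches to handle_<name> methods.
import signal


class MiniArbiter:
    SIG_NAMES = {signal.SIGTERM: "term", signal.SIGHUP: "hup"}

    def __init__(self):
        self.sig_queue = []
        self.handled = []

    def signal(self, sig, frame=None):
        # Signal handlers must stay tiny: just enqueue and return.
        if len(self.sig_queue) < 5:
            self.sig_queue.append(sig)

    def run_once(self):
        # One iteration of the master loop: pop one signal and dispatch it.
        sig = self.sig_queue.pop(0) if self.sig_queue else None
        if sig is None:
            return  # real loop would sleep(), murder_workers(), manage_workers()
        name = self.SIG_NAMES.get(sig)
        handler = getattr(self, "handle_%s" % name, None)
        if handler:
            handler()

    def handle_term(self):
        self.handled.append("term")

    def handle_hup(self):
        self.handled.append("hup")


arbiter = MiniArbiter()
arbiter.signal(signal.SIGHUP)
arbiter.signal(signal.SIGTERM)
arbiter.run_once()
arbiter.run_once()
print(arbiter.handled)  # ['hup', 'term']
```

Keeping the handler down to an append is what makes the pattern async-signal-safe in practice: all real work happens in the loop, outside handler context.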

Next, let's look at how a new worker is created and managed:

class Arbiter(object):
    """
    Arbiter maintain the workers processes alive. It launches or
    kills them if needed. It also manages application reloading
    via SIGHUP/USR2.
    """
    ......
    
    def manage_workers(self):
        """\
        Maintain the number of workers by spawning or killing
        as required.
        """
        if len(self.WORKERS) < self.num_workers:  # fewer workers than configured?
            self.spawn_workers()  # spawn new workers

        workers = self.WORKERS.items()
        workers = sorted(workers, key=lambda w: w[1].age)
        while len(workers) > self.num_workers:
            (pid, _) = workers.pop(0)
            self.kill_worker(pid, signal.SIGTERM)  # kill surplus workers, oldest first

        active_worker_count = len(workers)
        if self._last_logged_active_worker_count != active_worker_count:
            self._last_logged_active_worker_count = active_worker_count
            self.log.debug("{0} workers".format(active_worker_count),
                           extra={"metric": "gunicorn.workers",
                                  "value": active_worker_count,
                                  "mtype": "gauge"})

    def spawn_worker(self):
        self.worker_age += 1
        worker = self.worker_class(self.worker_age, self.pid, self.LISTENERS,
                                   self.app, self.timeout / 2.0,
                                   self.cfg, self.log)  # instantiate the configured worker class
        self.cfg.pre_fork(self, worker)
        pid = os.fork()
        if pid != 0:
            worker.pid = pid
            self.WORKERS[pid] = worker
            return pid

        # Do not inherit the temporary files of other workers
        for sibling in self.WORKERS.values():
            sibling.tmp.close()

        # Process Child
        worker.pid = os.getpid()
        try:
            util._setproctitle("worker [%s]" % self.proc_name)
            self.log.info("Booting worker with pid: %s", worker.pid)
            self.cfg.post_fork(self, worker)
            worker.init_process()  # the worker's init hook; my CustomWorker initializes its ROS node here, and other per-worker logic can be added the same way
            sys.exit(0)
        except SystemExit:
            raise
        except AppImportError as e:
            self.log.debug("Exception while loading the application",
                           exc_info=True)
            print("%s" % e, file=sys.stderr)
            sys.stderr.flush()
            sys.exit(self.APP_LOAD_ERROR)
        except Exception:
            self.log.exception("Exception in worker process")
            if not worker.booted:
                sys.exit(self.WORKER_BOOT_ERROR)
            sys.exit(-1)
        finally:
            self.log.info("Worker exiting (pid: %s)", worker.pid)
            try:
                worker.tmp.close()
                self.cfg.worker_exit(self, worker)
            except Exception:
                self.log.warning("Exception during worker exit:\n%s",
                                 traceback.format_exc())
    
    def spawn_workers(self):
        """\
        Spawn new workers as needed.

        This is where a worker process leaves the main loop
        of the master process.
        """

        for _ in range(self.num_workers - len(self.WORKERS)):  # loop until the configured number of workers exists
            self.spawn_worker()
            time.sleep(0.1 * random.random())

These steps initialize the workers, load the relevant configuration, and create the configured number of workers:

  • Check the current worker count; if it is below the configured number, spawn new workers
  • Loop until the configured number of workers exists
  • Instantiate the configured worker class
  • Call the worker's init_process method; here my CustomWorker initializes its ROS node, and other per-worker logic can be added the same way
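The fork-based spawning above can be reduced to a few lines (a simplified sketch, not gunicorn's code): os.fork() returns the child's pid in the parent, which records it and returns to its loop, and returns 0 in the child, which runs its per-worker init and never comes back to the master.

```python
# Minimal sketch of the pre-fork spawn pattern used by spawn_worker().
import os


def spawn_worker(workers):
    pid = os.fork()
    if pid != 0:
        # Parent (master): register the child and go back to the main loop.
        workers[pid] = {"age": len(workers) + 1}
        return pid
    # Child (worker): per-worker init would run here (e.g. rospy.init_node(...)),
    # followed by the worker's main loop; it never returns to the master.
    os._exit(0)


workers = {}
pid = spawn_worker(workers)
_, status = os.waitpid(pid, 0)
print(len(workers), os.WEXITSTATUS(status))  # 1 0
```

Note the child exits with os._exit rather than sys.exit so no inherited cleanup handlers or buffered output from the master run twice.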

1.2 The request-handling process

During the worker creation described above, each worker is handed the file descriptors to listen on and its ROS node information, and starts listening for requests; at that point worker startup is complete. Next, let's look at how socket descriptors are assigned to a worker, how listening begins, and how it is guaranteed that a given request is handled by exactly one worker. Worker initialization follows a simple inheritance chain: the custom worker defined earlier extends gunicorn.workers.sync:SyncWorker, and the full chain is:

CustomWorker -> gunicorn.workers.sync.SyncWorker -> gunicorn.workers.base.Worker

The init_process method called earlier is implemented both in the custom CustomWorker and in gunicorn.workers.base.Worker; CustomWorker only adds its own logic and ultimately calls the parent implementation, so we can go straight to the version in gunicorn.workers.base.Worker. Here is part of the source:

class Worker(object):

    ......

    def init_process(self):
        """\
        If you override this method in a subclass, the last statement
        in the function should be to call this method with
        super().init_process() so that the ``run()`` loop is initiated.
        """

        # set environment' variables
        if self.cfg.env:
            for k, v in self.cfg.env.items():
                os.environ[k] = v

        util.set_owner_process(self.cfg.uid, self.cfg.gid,
                               initgroups=self.cfg.initgroups)  # set the worker process's owning user and group from the config

        # Reseed the random number generator
        util.seed()

        # For waking ourselves up
        self.PIPE = os.pipe()  # create a one-way pipe (used to wake the worker up)
        for p in self.PIPE:
            util.set_non_blocking(p)
            util.close_on_exec(p)

        # Prevent fd inheritance
        for s in self.sockets:
            util.close_on_exec(s)
        util.close_on_exec(self.tmp.fileno())

        self.wait_fds = self.sockets + [self.PIPE[0]]  # the descriptors to wait on: the listening sockets plus the pipe's read end

        self.log.close_on_exec()

        self.init_signals()

        # start the reloader
        if self.cfg.reload:
            def changed(fname):
                self.log.info("Worker reloading: %s modified", fname)
                self.alive = False
                os.write(self.PIPE[1], b"1")
                self.cfg.worker_int(self)
                time.sleep(0.1)
                sys.exit(0)

            reloader_cls = reloader_engines[self.cfg.reload_engine]
            self.reloader = reloader_cls(extra_files=self.cfg.reload_extra_files,
                                         callback=changed)

        self.load_wsgi()  # load the Flask application instance via the app's load method and assign it to self.wsgi; the details are omitted here, see the source if interested
        if self.reloader:
            self.reloader.start()

        self.cfg.post_worker_init(self)

        # Enter main run loop
        self.booted = True
        self.run()  # start listening for requests

This method first sets the worker's owning user and group from the configuration, then creates a one-way pipe and assembles the list of descriptors to wait on: that pipe plus the previously created network sockets. It then loads the Flask application instance via the app's load method, assigns it to self.wsgi, and finally starts listening for requests. Next, let's see how gunicorn listens for requests and handles one when it arrives.
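The pipe created "for waking ourselves up" is the classic self-pipe trick, and it can be demonstrated on its own: the worker waits on its sockets plus the pipe's read end, and writing a single byte to the write end makes the wait return immediately. A minimal sketch, independent of gunicorn:

```python
# Minimal sketch of the self-pipe wake-up trick set up in init_process():
# select() watches the pipe's read end alongside the listening sockets,
# so writing one byte to the write end interrupts the wait.
import os
import select

r, w = os.pipe()
os.set_blocking(r, False)  # mirrors util.set_non_blocking(p)

# Nothing written yet: the wait times out and returns no ready descriptors.
ready, _, _ = select.select([r], [], [], 0.1)
assert ready == []

# A single byte on the write end wakes the waiter immediately.
os.write(w, b"1")
ready, _, _ = select.select([r], [], [], 0.1)
print(r in ready)  # True
```

This is how, for example, the reloader's callback (which writes b"1" to self.PIPE[1]) can break the worker out of its wait loop.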

class SyncWorker(base.Worker):

    ......

    def run_for_multiple(self, timeout):
        while self.alive:
            self.notify()

            try:
                ready = self.wait(timeout)
            except StopWaiting:
                return

            if ready is not None:
                for listener in ready:
                    if listener == self.PIPE[0]:
                        continue

                    try:
                        self.accept(listener)  # accept() is an OS-level call; the kernel guarantees each incoming connection is delivered to exactly one of the workers blocked on it, so a request is received and handled by only one worker
                    except EnvironmentError as e:
                        if e.errno not in (errno.EAGAIN, errno.ECONNABORTED,
                                           errno.EWOULDBLOCK):
                            raise

            if not self.is_parent_alive():
                return

    def run(self):
        # if no timeout is given the worker will never wait and will
        # use the CPU for nothing. This minimal timeout prevent it.
        timeout = self.timeout or 0.5

        # self.socket appears to lose its blocking status after
        # we fork in the arbiter. Reset it here.
        for s in self.sockets:
            s.setblocking(0)

        if len(self.sockets) > 1:  # a single bind address yields one socket, but binding to something like 0.0.0.0:8888 listens on that port on every interface and can yield several
            self.run_for_multiple(timeout)
        else:
            self.run_for_one(timeout)
            
     ......

This method is straightforward: it chooses between two listening loops based on how many sockets there are. With a single bind address there is only one socket, but binding to something like 0.0.0.0:8888 listens on that port on every interface of the host and can yield several sockets, all of which must then be watched. Requests are fetched through the OS-provided accept interface, which guarantees that each incoming connection is received and handled by exactly one worker.
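That "exactly one worker per connection" guarantee can be seen in isolation: forked workers all inherit the same listening socket, and the kernel hands each incoming connection to exactly one of the accept() calls blocked on it. A small demonstration (illustrative only, not gunicorn code):

```python
# Forked workers share one listening socket; the kernel delivers each
# connection to exactly one accept() call.
import os
import socket

listener = socket.socket()
listener.bind(("127.0.0.1", 0))
listener.listen(8)
port = listener.getsockname()[1]

pids = []
for i in range(2):
    pid = os.fork()
    if pid == 0:
        # Worker: accept exactly one connection and report which worker got it.
        conn, _ = listener.accept()
        conn.sendall(b"worker-%d" % i)
        conn.close()
        os._exit(0)
    pids.append(pid)

# Client side: two connections, each served by a single, distinct worker.
got = set()
for _ in range(2):
    c = socket.create_connection(("127.0.0.1", port))
    got.add(c.recv(32))
    c.close()
for pid in pids:
    os.waitpid(pid, 0)
print(sorted(got))
```

Because each sketched worker accepts only once, both connections being answered by distinct workers shows that no connection was delivered twice.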

class SyncWorker(base.Worker):

    def accept(self, listener):
        client, addr = listener.accept()  # accept the client connection, obtaining its file descriptor and the client's address
        client.setblocking(1)
        util.close_on_exec(client)
        self.handle(listener, client, addr)
     
    ......

    def handle(self, listener, client, addr):
        req = None
        try:
            if self.cfg.is_ssl:
                client = ssl.wrap_socket(client, server_side=True,
                                         **self.cfg.ssl_options)

            parser = http.RequestParser(self.cfg, client, addr)  # select the request parser class
            req = next(parser)  # parse the request via the Request class referenced by the parser's mesg_class attribute
            self.handle_request(listener, req, client, addr)  # start handling the request
        except http.errors.NoMoreData as e:
            self.log.debug("Ignored premature client disconnection. %s", e)
        except StopIteration as e:
            self.log.debug("Closing connection. %s", e)
        except ssl.SSLError as e:
            if e.args[0] == ssl.SSL_ERROR_EOF:
                self.log.debug("ssl connection closed")
                client.close()
            else:
                self.log.debug("Error processing SSL request.")
                self.handle_error(req, client, addr, e)
        except EnvironmentError as e:
            if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN):
                self.log.exception("Socket error processing request.")
            else:
                if e.errno == errno.ECONNRESET:
                    self.log.debug("Ignoring connection reset")
                elif e.errno == errno.ENOTCONN:
                    self.log.debug("Ignoring socket not connected")
                else:
                    self.log.debug("Ignoring EPIPE")
        except Exception as e:
            self.handle_error(req, client, addr, e)
        finally:
            util.close(client)

The following steps obtain the request and begin handling it:

  • Accept the client connection, obtaining its file descriptor and the client's address
  • Select the request parser class
  • Parse the request via the Request class referenced by the parser's mesg_class attribute
  • Start handling the request

At this point the request data has been read and parsed into Python objects; what remains is handling the request.

class SyncWorker(base.Worker):

    ......

    def handle_request(self, listener, req, client, addr):
        environ = {}
        resp = None
        try:
            self.cfg.pre_request(self, req)  # run the custom pre-request hook, e.g. logging, adding a record, or attaching an attribute to the request
            request_start = datetime.now()
            resp, environ = wsgi.create(req, client, addr,
                                        listener.getsockname(), self.cfg)
            # Force the connection closed until someone shows
            # a buffering proxy that supports Keep-Alive to
            # the backend.
            resp.force_close()
            self.nr += 1
            if self.nr >= self.max_requests:
                self.log.info("Autorestarting worker after current request.")
                self.alive = False
            respiter = self.wsgi(environ, resp.start_response)  # self.wsgi is the Flask application instance loaded earlier; calling it invokes the instance's __call__ method to handle the request
            try:
                if isinstance(respiter, environ['wsgi.file_wrapper']):
                    resp.write_file(respiter)
                else:
                    for item in respiter:
                        resp.write(item)
                resp.close()
                request_time = datetime.now() - request_start
                self.log.access(resp, req, environ, request_time)
            finally:
                if hasattr(respiter, "close"):
                    respiter.close()
        except EnvironmentError:
            # pass to next try-except level
            util.reraise(*sys.exc_info())
        except Exception:
            if resp and resp.headers_sent:
                # If the requests have already been sent, we should close the
                # connection to indicate the error.
                self.log.exception("Error handling request")
                try:
                    client.shutdown(socket.SHUT_RDWR)
                    client.close()
                except EnvironmentError:
                    pass
                raise StopIteration()
            raise
        finally:
            try:
                self.cfg.post_request(self, req, environ, resp)  # run the custom post-request hook, e.g. removing an attribute or releasing a resource
            except Exception:
                self.log.exception("Exception in post_request hook")

This method first runs the pre-request hook, then calls the Flask application's __call__ method to handle the request; that part works just as described in the earlier article, so it is not repeated here. After the request completes, the post-request hook runs. That covers how gunicorn starts a Flask project, how it listens for requests, and how it dispatches them.
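The pre_request and post_request hooks seen above are gunicorn server hooks that you can supply as callables, for instance in the options dict of the Application class from this article. The signatures below match gunicorn's documented hooks; the stub worker and request objects are illustrative stand-ins, not gunicorn's real classes:

```python
# Sketch of gunicorn's pre_request/post_request server hooks. The hook
# signatures are gunicorn's; the SimpleNamespace stubs below only simulate
# how the worker calls them.
import types

calls = []


def pre_request(worker, req):
    # Runs in the worker just before the request is handled.
    calls.append(("pre", req.method, req.path))


def post_request(worker, req, environ, resp):
    # Runs after the response is written, even when handling failed.
    calls.append(("post", req.method, req.path))


# These could be passed alongside the other options, e.g.:
# options = {"bind": "0.0.0.0:5000", "workers": 1,
#            "pre_request": pre_request, "post_request": post_request}

# Simulate one request cycle with illustrative stand-ins:
worker = types.SimpleNamespace()
req = types.SimpleNamespace(method="GET", path="/check")
pre_request(worker, req)
post_request(worker, req, {}, None)
print(calls)  # [('pre', 'GET', '/check'), ('post', 'GET', '/check')]
```

Since post_request is invoked from a finally block, it is a reasonable place to release per-request resources regardless of whether handling succeeded.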
