
Error 2006: MySQL server has gone away in peewee

Background

Anyone using the peewee ORM will sooner or later run into Error 2006: MySQL server has gone away. Lately I have been hitting it more often than usual, so I dug into the details and wrote them up.

The official explanation

The peewee documentation addresses this situation explicitly:

This particular error can occur when MySQL kills an idle database connection. This typically happens with web apps that do not explicitly manage database connections. What happens is your application starts, a connection is opened to handle the first query that executes, and, since that connection is never closed, it remains open, waiting for more queries.

To fix this, make sure you are explicitly connecting to the database when you need to execute queries, and close your connection when you are done. In a web-application, this typically means you will open a connection when a request comes in, and close the connection when you return a response.

In other words, if a MySQL connection sits idle for too long, the MySQL server kills it (the default wait_timeout is 8 hours), while the peewee client's view of the connection is unchanged, so the next request that reuses that connection fails.

To fix this, you have to open and close connections explicitly; for a web application that means adding request hooks.

uwsgi fork

A colleague ran into a variant of this, worth a quick mention. Our uwsgi deployment uses fork mode: a master web process starts first and then forks out the worker processes. We wrapped our I/O client instances in a home-grown fork_safe helper, so that each process, when calling methods on a client, uses a connection belonging to its own pid.

However, some code was wrapped in a transaction decorator like @db.atomic(), which stores the peewee db instance on the object the decorator creates, and that happens while the Python modules are being imported, i.e. before uwsgi forks. As a result, in the forked workers the db instance held inside the db.atomic() object differed from the db instance the process was actually using, causing errors.

The fix is to use uwsgidecorators.postfork: after uwsgi forks, reopen the peewee db.

Solutions

Add hooks

If your traffic is light, just dutifully open a connection before each request and close it afterwards. With Flask, for example:

from flask import Flask
from peewee import *

database = SqliteDatabase('my_app.db')
app = Flask(__name__)

# This hook ensures that a connection is opened to handle any queries
# generated by the request.
@app.before_request
def _db_connect():
    database.connect()

# This hook ensures that the connection is closed when we've finished
# processing the request.
@app.teardown_request
def _db_close(exc):
    if not database.is_closed():
        database.close()

Without hooks

If traffic is somewhat heavier and you want to reuse connections, you can catch this exception and simply open a new connection. Older peewee versions shipped a RetryOperationalError class for exactly this; the author removed it in 3.0 for being too hacky, but there is still a workaround:

from peewee import MySQLDatabase
from peewee import OperationalError
from peewee import __exception_wrapper__


class RetryOperationalError(object):

    def execute_sql(self, sql, params=None, commit=True):
        try:
            cursor = super(RetryOperationalError, self).execute_sql(
                sql, params, commit)
        except OperationalError:
            if not self.is_closed():
                self.close()
            with __exception_wrapper__:
                cursor = self.cursor(commit)
                cursor.execute(sql, params or ())
                if commit and not self.in_transaction():
                    self.commit()
        return cursor


class MyRetryDB(RetryOperationalError, MySQLDatabase):
    pass

This is the snippet that circulates online: you initialize your database with this MyRetryDB class, which effectively replaces the execute_sql method.

But this code has a problem: if the failing call happens inside a transaction wrapper such as @db.atomic(), it will raise. Look at the implementation of close:

def close(self):
    with self._lock:
        if self.deferred:
            raise InterfaceError('Error, database must be initialized '
                                    'before opening a connection.')
        if self.in_transaction():
            raise OperationalError('Attempting to close database while '
                                    'transaction is open.')
        is_open = not self._state.closed
        try:
            if is_open:
                with __exception_wrapper__:
                    self._close(self._state.conn)
        finally:
            self._state.reset()
        return is_open

Inside a transaction the connection cannot simply be closed; close raises, so the dead MySQL connection never gets replaced and all subsequent requests keep failing. So we have to check whether we are inside a transaction: if so, close the underlying MySQL connection ourselves, clean up the connection state, and then raise, letting the caller retry. Since there is no live MySQL connection at that point, there is no rollback to worry about.

from peewee import MySQLDatabase, OperationalError, __exception_wrapper__


class RetryOperationalError(object):

    def execute_sql(self, sql, params=None, commit=True):
        try:
            cursor = super(RetryOperationalError, self).execute_sql(
                sql, params, commit)
        except OperationalError:
            if not self.is_closed():
                if self.in_transaction():
                    is_open = not self._state.closed
                    try:
                        if is_open:
                            with __exception_wrapper__:
                                self._close(self._state.conn)
                    finally:
                        self._state.closed = True
                        self._state.conn = None

                    raise OperationalError('request in transaction while mysql has gone away')
                else:
                    self.close()
            with __exception_wrapper__:
                cursor = self.cursor()
                cursor.execute(sql, params or ())
                if commit and not self.in_transaction():
                    self.commit()
        return cursor


class MyRetryDB(RetryOperationalError, MySQLDatabase):
    pass

Note that the finally block here is implemented as

self._state.closed = True
self._state.conn = None

We cannot simply call self._state.reset() the way close does, because the transaction and db context bookkeeping (the ctx and transactions lists) is cleaned up by the context managers themselves on __exit__; if we clear it for them here, their later cleanup will raise.

Connection pooling

Personally I am not fond of the approach above 👆: requests that are inside a transaction still get one exception thrown at them, and it amounts to hand-rolling connection keep-alive inside RetryOperationalError. Since the whole point is to reuse connections, why not just use peewee's connection pool?

The pool module contains a number of Database classes that provide connection pooling for PostgreSQL, MySQL and SQLite databases. The pool works by overriding the methods on the Database class that open and close connections to the backend. The pool can specify a timeout after which connections are recycled, as well as an upper bound on the number of open connections.

In a single-threaded application, only one connection will be created. It will be continually recycled until either it exceeds the stale timeout or is closed explicitly (using .manual_close()).

By default, all your application needs to do is ensure that connections are closed when you are finished with them, and they will be returned to the pool. For web applications, this typically means that at the beginning of a request, you will open a connection, and when you return a response, you will close the connection.

In a single-threaded application only one connection is created, and it is reused until it exceeds the stale timeout or is closed manually. Note the docs also say the application must explicitly close connections for them to be returned to the pool. As for why the close must be explicit, this is the explanation given:

Connections will not be closed exactly when they exceed their stale_timeout. Instead, stale connections are only closed when a new connection is requested.

A stale connection is not closed at the moment it expires; it is only closed when a new connection is requested. Let's look at how connections are handed out:

def _connect(self):
    while True:
        try:
            # Remove the oldest connection from the heap.
            ts, conn = heapq.heappop(self._connections)
            key = self.conn_key(conn)
        except IndexError:
            ts = conn = None
            logger.debug('No connection available in pool.')
            break
        else:
            if self._is_closed(conn):
                # This connection was closed, but since it was not stale
                # it got added back to the queue of available conns. We
                # then closed it and marked it as explicitly closed, so
                # it's safe to throw it away now.
                # (Because Database.close() calls Database._close()).
                logger.debug('Connection %s was closed.', key)
                ts = conn = None
            elif self._stale_timeout and self._is_stale(ts):
                # If we are attempting to check out a stale connection,
                # then close it. We don't need to mark it in the "closed"
                # set, because it is not in the list of available conns
                # anymore.
                logger.debug('Connection %s was stale, closing.', key)
                self._close(conn, True)
                ts = conn = None
            else:
                break

    if conn is None:
        if self._max_connections and (
                len(self._in_use) >= self._max_connections):
            raise MaxConnectionsExceeded('Exceeded maximum connections.')
        conn = super(PooledDatabase, self)._connect()
        ts = time.time() - random.random() / 1000
        key = self.conn_key(conn)
        logger.debug('Created new connection %s.', key)

    self._in_use[key] = PoolConnection(ts, conn, time.time())
    return conn

class PooledMySQLDatabase(PooledDatabase, MySQLDatabase):
    def _is_closed(self, conn):
        try:
            conn.ping(False)
        except:
            return True
        else:
            return False

Every time a connection is checked out, the pool verifies that the instance has not gone stale; for MySQL the check is a ping. Whether the MySQL server proactively closed the connection or the connection exceeded the timeout we configured ourselves, a new connection is established.

Why closing connections explicitly matters so much

In the while True loop of _connect above 👆, every checkout tries to take a connection from the _connections pool: if one is available its staleness is checked, otherwise a new one is created. Now look at what closing a connection does:

def _close(self, conn, close_conn=False):
    key = self.conn_key(conn)
    if close_conn:
        super(PooledDatabase, self)._close(conn)
    elif key in self._in_use:
        pool_conn = self._in_use.pop(key)
        if self._stale_timeout and self._is_stale(pool_conn.timestamp):
            logger.debug('Closing stale connection %s.', key)
            super(PooledDatabase, self)._close(conn)
        elif self._can_reuse(conn):
            logger.debug('Returning %s to pool.', key)
            heapq.heappush(self._connections, (pool_conn.timestamp, conn))
        else:
            logger.debug('Closed %s.', key)

This _close method does two things: return the connection to the pool, or actually close the backend connection. If you never close connections explicitly, nothing is ever returned to the pool: every connection stays in _in_use, each new checkout finds the pool empty and opens a brand-new connection, and once max_connections is reached the pool raises MaxConnectionsExceeded.

peewee is a very nice ORM; the code is short and quite readable. While chasing this issue I also discovered that nested transactions are implemented with savepoints (I had always assumed that an inner transaction would simply be made a no-op if one was already open).
