MelonBlog

给pymysql实现一个简单的连接池功能

前言

pymysql库是一个非常好用的python mysql库,但是它没提供连接池功能,可能是作者考虑到python因为gil的存在,不太可能用到一些高并发场景里。

我在使用python写项目的过程中遇到了一些需要频繁查询的场景,会频繁发起mysql连接,所以我在想,如果有一个连接池可以降低频繁创建socket连接的开销,应该能够提升程序的性能,于是我就自己动手实现了一个简单的连接池,并且运行稳定,目前还没遇到问题。

实现

包装pymysql的Connection

通过一个DelegateConnection类来替代原Connection的功能。

在使用Connection类的过程中,主要会用到begincommitrollbackcursorclose等函数,这里的close函数的逻辑做了一些修改,没有去直接调用原Connection的close功能,因为连接需要还原到连接池里。所有的sql操作都是从开启一个新的事物为起点,所以在begin函数里, 为了防止连接长时间因为闲置而断连,所以这里加了一个重连功能。occupy函数是我新增的一个用于修改连接状态的函数,表示连接已经被占用。

class DelegateConnection:
    def __init__(self, connection: PyMySQLConnection, pool: ConnectionPool, create_time=time.time()):
        self.connection: PyMySQLConnection = connection
        self.pool: ConnectionPool = pool
        self.status = 'idle'
        self.create_time = create_time
    def close(self):
        self.pool.release(self)
    def cursor(self):
        return self.connection.cursor()
    def occupy(self):
        self.status = 'occupied'
    def commit(self):
        self.connection.commit()
    def rollback(self):
        self.connection.rollback()
    def begin(self):
        self.connection.ping(reconnect=True)
        self.connection.begin()

连接池

连接池的核心功能就是管理连接,我在这里使用了一个list字段来存储创建的连接,初始化连接池的时候会根据配置来创建数量为最小连接数的连接,通过连接池获取连接时,会根据list字段中是否存在闲置连接来决定是否需要创建一个新的连接,但是不能在连接数已经满了的情况下创建连接,所以这里加了一个timeout处理,如果连接池已经满了,那么就循环等待一段时间,直到有连接使用完毕,处于闲置状态。如果等待一段时间都没有拿到闲置的连接,就直接抛出异常结束等待。

回收连接是连接池的核心功能之一,release函数通过修改连接的状态来达到回收连接的目的,这里多加了一个逻辑就是当连接数量大于最小连接数的时候,会直接关闭此连接并且移除连接池。

class ConnectionPool:
    def __init__(self, conf):
        self.conf = conf
        self._connections: list[DelegateConnection] = []
        self._min_connections = conf['pool']['min_connections']
        self._max_connections = conf['pool']['max_connections']
        for _ in range(self._min_connections):
            mysql_connection: PyMySQLConnection = pymysql.connect(host=conf['host'],
                                                                  user=conf['user'],
                                                                  password=conf['password'],
                                                                  database=conf['database'],
                                                                  cursorclass=DictCursor)
            self._connections.append(DelegateConnection(mysql_connection, self))
    def new_connection(self):
        return pymysql.connect(host=self.conf['host'],
                               user=self.conf['user'],
                               password=self.conf['password'],
                               database=self.conf['database'],
                               cursorclass=DictCursor)
    def get_connection(self, timeout=None):
        start_time = time.time()
        connection = self.occupy()
        if connection is not None:
            return connection
        if len(self._connections) < self._max_connections:
            mysql_connection = self.new_connection()
            connection = DelegateConnection(mysql_connection, self)
            connection.occupy()
            self._connections.append(connection)
            return connection
        if timeout is not None:
            raise Exception('No available connection')
        while True:
            time.sleep(0.1)
            if timeout is not None and time.time() - start_time > timeout:
                raise Exception('No available connection')
            connection = self.occupy()
            if connection is not None:
                return connection
    def occupy(self):
        for connection in self._connections:
            if connection.status == 'idle':
                connection.occupy()
                return connection
        return None
    def release(self, delegate_connection):
        if len(self._connections) > self._min_connections:
            delegate_connection.connection.close()
            self._connections.remove(delegate_connection)
        else:
            delegate_connection.status = 'idle'

aiomysql

以上就是一个简单的连接池的实现。实际上github上有一个优秀库:aiomysql,aiomysql在pymysql的基础上增加了连接池和异步功能,支持异步操作之后,真的非常提升程序的性能,例如如果你一个函数里需要查询对张表的数据,如果串连操作和异步操作最后性能差距可能高达好几倍。


aiomysql仓库地址:

https://github.com/aio-libs/aiomysql