SQLAlchemy源码阅读-上篇

简介: SQLAlchemy是Python SQL工具箱和ORM框架,它为应用程序开发人员提供了全面而灵活的SQL功能。它提供了一整套企业级持久化方案,旨在高效,高性能地访问数据库,并符合简单的Pythonic哲学。项目代码量比较大,接近200个文件,7万行代码, 我们一起来挑战一下。作者:游戏不存在链接:https://juejin.cn/post/6951945198322581518来源:稀土掘金著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

SQLAlchemy是Python SQL工具箱和ORM框架,它为应用程序开发人员提供了全面而灵活的SQL功能。它提供了一整套企业级持久化方案,旨在高效,高性能地访问数据库,并符合简单的Pythonic哲学。项目代码量比较大,接近200个文件,7万行代码, 我们一起来挑战一下。由于篇幅原因,分成上下两篇,上篇包括如下内容:


  • SQLAlchemy项目结构
  • 使用SQLAlchemy操作sqlite数据库
  • Engine代码分析
  • SQLiteDialect代码分析
  • Connection&&Pool代码分析
  • execute-SQL语句
  • Result分析
  • 小结
  • 小技巧


SQLAlchemy项目结构



源码使用的版本是 1.3.0, 对应的commitID是 740bb50c2,和参考链接中官方文档1.3版本一致。项目目录大概包括:


目录 描述
connectors 连接
dialects 方言
engine 引擎
event 事件
ext 扩展功能
orm orm
pool 连接池
sql sql处理
util 工具类


SQLAlchemy的架构图如下:


image.png


整体分成3层,从上到下分别是ORM,core和DBAPI,其中core,又分成左右两个区域。我们先学习其中的引擎,连接池dialects(仅sqlite)和DBAPI部分,也就是架构图的右半侧。其中DBAPI(sqlite相关)是在python-core-library中提供。


用SQLAlchemy操作sqlite数据库



先从使用DBAPI操作sqlite的API开始:


import sqlite3
con = sqlite3.connect('example.db')
cur = con.cursor()
# Create table
cur.execute('''CREATE TABLE stocks
               (date text, trans text, symbol text, qty real, price real)''')
# Insert a row of data
cur.execute("INSERT INTO stocks VALUES ('2006-01-05','BUY','RHAT',100,35.14)")
# Save (commit) the changes
con.commit()
# Do this instead
t = ('RHAT',)
cur.execute('SELECT * FROM stocks WHERE symbol=?', t)
print(cur.fetchone())
# We can also close the connection if we are done with it.
# Just be sure any chang
con.close()


操作sqlite数据库主要包括了下面几个步骤:


  • connect数据库获得连接con
  • 从连接中获取操作游标cur
  • 使用cur执行sql语句(statement)
  • 向连接con提交commit事务
  • 使用cur的fetchone/fecthmany/fetchall方法获取数据
  • 完成数据获取后使用close方法关闭连接con


对比一下使用sqlalchemy进行sqlite操作:


from sqlalchemy import create_engine
eng = create_engine("sqlite:///:memory:", echo=True)
conn = eng.connect()
conn.execute("create table x (a integer, b integer)")
conn.execute("insert into x (a, b) values (1, 1)")
conn.execute("insert into x (a, b) values (2, 2)")
result = conn.execute("select x.a, x.b from x")
assert result.keys() == ["a", "b"]
result = conn.execute('''
    select x.a, x.b from x where a=1
    union
    select x.a, x.b from x where a=2
''')
assert result.keys() == ["a", "b"]


可以看到使用sqlalchemy后操作变的简单,把cursor,commit,fetch和close等操作隐藏到engine内部,简化成3步:


  • 使用create_engine函数创建引擎eng
  • 使用引擎的connect方法创建连接conn
  • 使用conn执行SQL语句并获取返回的执行结果


Engine代码分析



跟随create_engine的API,可以看到这里使用策略模式去创建不同的engine实现:


# engine/__init__.py
from . import strategies
default_strategy = "plain"  # 默认
def create_engine(*args, **kwargs):
    strategy = kwargs.pop("strategy", default_strategy)
    strategy = strategies.strategies[strategy]
    return strategy.create(*args, **kwargs)


默认的engine策略:


# engine/strategies.py
strategies = {}
class EngineStrategy(object):
    def __init__(self):
        strategies[self.name] = self
class DefaultEngineStrategy(EngineStrategy):
    def create(self, name_or_url, **kwargs):
        ...
class PlainEngineStrategy(DefaultEngineStrategy):
    name = "plain"
    engine_cls = base.Engine  # 引擎类
PlainEngineStrategy()


重点就在策略的create方法了, 去掉数据准备和异常处理后核心代码如下:


def create(self, name_or_url, **kwargs):
    ...
    # get dialect class
    u = url.make_url(name_or_url)
    entrypoint = u._get_entrypoint()
    dialect_cls = entrypoint.get_dialect_cls(u)
    # create dialect
    dialect = dialect_cls(**dialect_args)
    # pool
    poolclass = dialect_cls.get_pool_class(u)
    pool = poolclass(creator, **pool_args)
    # engine
    engineclass = self.engine_cls
    engine = engineclass(pool, dialect, u, **engine_args)
    ...
    return engine


create函数可以理解为engine的创建模版,主要是下面3个步骤:


  • 根据url获取到数据库方言,适配不同数据库sqlite/mysql/postgresql...
  • 获取不同方言的连接池实现
  • 创建engine,持有dialect和pool


Engine的构造函数和connect方法如下:


class Engine(Connectable, log.Identified):
    _connection_cls = Connection
    def __init__(
        self,
        pool,
        dialect,
        url,
        logging_name=None,
        echo=None,
        proxy=None,
        execution_options=None,
    ):
        self.pool = pool
        self.url = url
        self.dialect = dialect
        self.engine = self
        ...
    def connect(self, **kwargs):
        return self._connection_cls(self, **kwargs)


engine主要功能就是管理和持有connection,pool和dialect,对外提供API。


SQLiteDialect代码分析



dialect是根据url自动识别,使用PluginLoader进行动态加载:


class PluginLoader(object):
    def __init__(self, group, auto_fn=None):
        self.group = group
        self.impls = {}
        self.auto_fn = auto_fn
    def load(self, name):
        # import一次 
        if name in self.impls:
            return self.impls[name]()
        if self.auto_fn:
            loader = self.auto_fn(name)
            if loader:
                self.impls[name] = loader
                return loader()
        ...


sqlite-dialect使用下面的 __import__ 动态加载模块:


def _auto_fn(name):
    if "." in name:
        dialect, driver = name.split(".")
    else:
        dialect = name
        driver = "base"
    if dialect in _translates:
        translated = _translates[dialect]
        dialect = translated
    try:
        # 动态加载模块
        module = __import__("sqlalchemy.dialects.%s" % (dialect,)).dialects
    except ImportError:
        return None
    module = getattr(module, dialect)
    if hasattr(module, driver):
        module = getattr(module, driver)
        return lambda: module.dialect
    else:
        return None
registry = util.PluginLoader("sqlalchemy.dialects", auto_fn=_auto_fn)


不同方言实现需要提供一个dialect对象,在sqlite中是这样的:


## sqlalchemy/dialects/sqlite/__init__.py
base.dialect = dialect = pysqlite.dialect
## sqlalchemy/dialects/sqlite/pysqlite.py
class SQLiteDialect_pysqlite(SQLiteDialect):
    pass
dialect = SQLiteDialect_pysqlite


SQLiteDialect功能相简单,一是决定POOL_CLASS的类型: memory实现使用的是SingletonThreadPool;db文件使用NullPool,下面分析Pool时候会用到。


class SQLiteDialect_pysqlite(SQLiteDialect):
    @classmethod
    def get_pool_class(cls, url):
        if url.database and url.database != ":memory:":
            return pool.NullPool
        else:
            return pool.SingletonThreadPool


二是提供包装DBAPI得到的connect:


class DefaultDialect(interfaces.Dialect):
    ...
    def connect(self, *cargs, **cparams):
        return self.dbapi.connect(*cargs, **cparams)
class SQLiteDialect_pysqlite(SQLiteDialect):
    ...
    @classmethod
    def dbapi(cls):
        try:
            from pysqlite2 import dbapi2 as sqlite
        except ImportError:
            try:
                from sqlite3 import dbapi2 as sqlite  # try 2.5+ stdlib name.
            except ImportError as e:
                raise e
        return sqlite
    def connect(self, *cargs, **cparams):
        passphrase = cparams.pop("passphrase", "")
        pragmas = dict((key, cparams.pop(key, None)) for key in self.pragmas)
        conn = super(SQLiteDialect_pysqlcipher, self).connect(
            *cargs, **cparams
        )
        conn.execute('pragma key="%s"' % passphrase)
        for prag, value in pragmas.items():
            if value is not None:
                conn.execute('pragma %s="%s"' % (prag, value))
        return conn


connect在SQLiteDialect_pysqlite类和父类DefaultDialect之间反复横跳,核心功能就是下面2句代码:


from sqlite3 import dbapi2 as sqlite
sqlite.connect(*cargs, **cparams)


Connect和Pool代码分析



Connection构造函数如下:


class Connection(Connectable):
    def __init__(
        self,
        engine,
        connection=None,
        close_with_result=False,
        _branch_from=None,
        _execution_options=None,
        _dispatch=None,
        _has_events=None,
    ):
        self.engine = engine
        self.dialect = engine.dialect
        self.__connection =  engine.raw_connection()
        ...


connection主要使用engine.raw_connection创建了一个DBAPI连接


class Engine(Connectable, log.Identified):
    def raw_connection(self, _connection=None):
        return self._wrap_pool_connect(
            self.pool.unique_connection, _connection
        )
    def _wrap_pool_connect(self, fn, connection):
        dialect = self.dialect
        try:
            return fn()
        except dialect.dbapi.Error as e:
            ...


pool.unique_connection负责创建数据库连接,这里的实现过程比较复杂,个人觉得也挺绕的,涉及Pool,ConnectionFairy和ConnectionRecord三个类。我们一点一点的跟踪:


class SingletonThreadPool(Pool):
    def __init__(self, creator, pool_size=5, **kw):
        Pool.__init__(self, creator, **kw)
        self._conn = threading.local()
        self._all_conns = set()
        self.size = pool_size
    def unique_connection(self):
        return _ConnectionFairy._checkout(self)
    def _do_get(self):
        c = _ConnectionRecord(self)
        self._conn.current = weakref.ref(c)
        if len(self._all_conns) >= self.size:
            self._cleanup()
        self._all_conns.add(c)
        return c


SingletonThreadPool主要在_do_get的实现,创建一个ConnectionRecor对象,然后将其加入到自己管理的集合中后再返回,标准的池操作了。 如何通过unique_connection方法去触发_do_get方法并得到实际的db-connect


class _ConnectionFairy(object):
    def __init__(self, dbapi_connection, connection_record, echo):
        self.connection = dbapi_connection
        self._connection_record = connection_record
    @classmethod
    def _checkout(cls, pool, threadconns=None, fairy=None):
        if not fairy:
            fairy = _ConnectionRecord.checkout(pool)
            fairy._pool = pool
            fairy._counter = 0
        return fairy
...
class _ConnectionRecord(object):
    def __init__(self, pool, connect=True):
        self.__pool = pool
    @classmethod
    def checkout(cls, pool):
        rec = pool._do_get()
        try:
            dbapi_connection = rec.get_connection()
        except Exception as err:
            ...
        fairy = _ConnectionFairy(dbapi_connection, rec, echo)
        rec.fairy_ref = weakref.ref(
            fairy,
            lambda ref: _finalize_fairy
            and _finalize_fairy(None, rec, pool, ref, echo),
        )
        ...
        return fairy
    def get_connection(self):
        pool = self.__pool
        connection = pool.creator(self)
        self.connection = connection
        return connection
...
class DefaultEngineStrategy(EngineStrategy):
    def create(self, name_or_url, **kwargs):
        def connect(connection_record=None):
            # dbapai-connection
            return dialect.connect(*cargs, **cparams)
        creator = pop_kwarg("creator", connect)
        pool = poolclass(creator, **pool_args)
        ...


整个过程大概是这样的:


  1. ConnectionFairy.checkout调用ConnectionRecord.checkout方法
  2. ConnectionRecord再回调SingletonThreadPool的_do_get方法创建rec对象
  3. rec对象继续调用SingletonThreadPool的creator方法
  4. creator方法使用dialect.connect获取数据库连接dbapi_connection
  5. 使用rec和dbapi_connection再创建fairy对象
  6. 返回fairy对象


除了执行过程在来回穿插外,还因为ConnectionFairy和ConnectionRecord是循环依赖的:


class _ConnectionRecord(object):
    fairy_ref = None
...
class _ConnectionFairy(object):
    def __init__(self, dbapi_connection, connection_record, echo):
        self._connection_record = connection_record


循环依赖的安全建立主要使用weakref,想学习的可以翻看之前的博文


execute-SQL语句



知道connection如何创建后,继续看connection使用execute方法执行sql语句:


def execute(self, object_, *multiparams, **params):
    if isinstance(object_, util.string_types[0]):
        return self._execute_text(object_, multiparams, params)
    ...
def _execute_text(self, statement, multiparams, params):
        """Execute a string SQL statement."""
        dialect = self.dialect
        parameters = _distill_params(multiparams, params)
        ret = self._execute_context(
            dialect,
            dialect.execution_ctx_cls._init_statement,
            statement,
            parameters,
            statement,
            parameters,
        )
        return ret
def _execute_context(
        self, dialect, constructor, statement, parameters, *args
    ):
    conn = self.__connection
    ...
    context = constructor(dialect, self, conn, *args)
    ...
    cursor, statement, parameters = (
            context.cursor,
            context.statement,
            context.parameters,
        )
    ...
    self.dialect.do_execute(
                        cursor, statement, parameters, context
                    )
    ...
    result = context._setup_crud_result_proxy()
    return result


execute还有一些其它分支,可以适用ORM等场景,本篇只介绍纯文本的sql


函数层层穿透后,主要包括下面三段代码:


  • 利用dialect创建context上下文
  • 使用dialect执行sql语句(文本)
  • 使用context获取执行的结果并返回


dialect涉及的上下文context创建和sql执行:


class DefaultDialect(interfaces.Dialect):
    def do_execute(self, cursor, statement, parameters, context=None):
        cursor.execute(statement, parameters)
DefaultDialect.execution_ctx_cls = DefaultExecutionContext


可以看到执行语句就是使用cursor对象,和前面直接操作sqlite一致。每条sql执行的上下文context是下面方式构建的:


class DefaultExecutionContext(interfaces.ExecutionContext):
    @classmethod
    def _init_statement(
        cls, dialect, connection, dbapi_connection, statement, parameters
    ):
        self = cls.__new__(cls)
        self.root_connection = connection
        self._dbapi_connection = dbapi_connection
        self.dialect = connection.dialect
        ...
        self.parameters = [{}]
        ...
        self.statement = self.unicode_statement = statement
        self.cursor = self.create_cursor()
        return self
    def create_cursor(self):
        return self._dbapi_connection.cursor()


Result分析



sql执行的结果,在context._setup_crud_result_proxy中返回ResultProxy对象。 ResultProxy是一个可以迭代的对象,可以使用fetchone获取单条记录:


class ResultProxy(object):
    def __iter__(self):
        while True:
            row = self.fetchone()
            if row is None:
                return
            else:
                yield row
    def __next__(self):
        row = self.fetchone()
        if row is None:
            raise StopIteration()
        else:
            return row
    def fetchone(self):
        try:
            row = self._fetchone_impl()
            if row is not None:
                return self.process_rows([row])[0]
    def _fetchone_impl(self):
        try:
            return self.cursor.fetchone()
        except AttributeError:
            return self._non_result(None)


对获取的记录还可以使用process_rows进行数据封装,这个以后再介绍。


小结



我们完整的追逐了使用sqlalchemy执行sql语句的过程,可以简单小结如下:


  • 使用url语法查找并动态加载数据库方言
  • 创建引擎对象,管理方言,方言的连接池,提供SQL的API
  • 使用引擎对象获取到数据库链接connect,获取后的链接使用pool管理
  • 执行SQL语句并获取执行结果


下面的类图介绍的更详细, 完整展示了engine/pool/connection/dialect的关系:


image.png


小技巧



deprecated是一个废弃API装饰器, 主要给一些不再支持/推荐的API加上使用警告和更替的方法:


def deprecated(version, message=None, add_deprecation_to_docstring=True):
    if add_deprecation_to_docstring:
        header = ".. deprecated:: %s %s" % (version, (message or ""))
    else:
        header = None
    if message is None:
        message = "Call to deprecated function %(func)s"
    def decorate(fn):
        return _decorate_with_warning(
            fn,
            exc.SADeprecationWarning,
            message % dict(func=fn.__name__),
            header,
        )
    return decorate


比如Connectable.contextual_connect的API这样使用:


class Connectable(object):
    @util.deprecated(
        "1.3",
        "The :meth:`.Engine.contextual_connect` and "
        ":meth:`.Connection.contextual_connect` methods are deprecated.  This "
        "method is an artifact of the threadlocal engine strategy which is "
        "also to be deprecated.   For explicit connections from an "
        ":class:`.Engine`, use the :meth:`.Engine.connect` method.",
    )
    def contextual_connect(self, *arg, **kw):
        ...


这对库/框架的开发者非常有用,API的变动可以这种方式通知使用者,进行平滑的升级替换。


参考链接




目录
相关文章
|
4月前
|
数据采集 消息中间件 并行计算
Python多线程与多进程性能对比:从原理到实战的深度解析
在Python编程中,多线程与多进程是提升并发性能的关键手段。本文通过实验数据、代码示例和通俗比喻,深入解析两者在不同任务类型下的性能表现,帮助开发者科学选择并发策略,优化程序效率。
327 1
|
5月前
|
数据采集 监控 调度
干货分享“用 多线程 爬取数据”:单线程 + 协程的效率反超 3 倍,这才是 Python 异步的正确打开方式
在 Python 爬虫中,多线程因 GIL 和切换开销效率低下,而协程通过用户态调度实现高并发,大幅提升爬取效率。本文详解协程原理、实战对比多线程性能,并提供最佳实践,助你掌握异步爬虫核心技术。
|
消息中间件 测试技术 领域建模
DDD - 一文读懂DDD领域驱动设计
DDD - 一文读懂DDD领域驱动设计
45563 6
|
Web App开发 Java
使用java操作浏览器的工具selenium-java和webdriver下载地址
【10月更文挑战第12天】Selenium-java依赖包用于自动化Web测试,版本为3.141.59。ChromeDriver和EdgeDriver分别用于控制Chrome和Edge浏览器,需确保版本与浏览器匹配。示例代码展示了如何使用Selenium-java模拟登录CSDN,包括设置驱动路径、添加Cookies和获取页面源码。
915 6
|
存储 运维 NoSQL
如何撰写好的技术方案设计-真实案例干货分享
如何撰写好的技术方案设计-真实案例干货分享
2326 0
|
SQL 数据库 Python
SqlAlchemy 2.0 中文文档(十一)(3)
SqlAlchemy 2.0 中文文档(十一)
221 11
|
Docker 容器
docker 设置国内镜像源
docker 设置国内镜像源
91470 1
|
SQL API 数据库
Python中的SQLAlchemy框架:深度解析与实战应用
【4月更文挑战第13天】在Python的众多ORM(对象关系映射)框架中,SQLAlchemy以其功能强大、灵活性和易扩展性脱颖而出,成为许多开发者首选的数据库操作工具。本文将深入探讨SQLAlchemy的核心概念、功能特点以及实战应用,帮助读者更好地理解和使用这一框架。
|
存储 程序员 Python
Python函数定义与调用详解
Python中的函数是可重用代码块,用于接收参数、执行操作并可能返回输出。通过`def`定义函数,如`def greet(name): print(f"Hello, {name}!")`。函数可接受任意数量的参数,包括默认值。调用函数时提供参数,如`greet("Alice")`。可变参数通过星号(*)和双星号(**)实现。函数有助于代码模块化、理解和维护。掌握函数是Python编程基础。