15

记一次Sqlalchemy Session问题

 4 years ago
source link: https://mp.weixin.qq.com/s/bpHnBVsMwNK62Xa9AKFXeg
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

记一次Sqlalchemy Session问题

原创 囧囧 网易游戏运维平台 8月8日



囧囧



囧囧,网易游戏高级开发工程师,专业搬砖工程师,目前主要负责网易游戏的CDN服务。对系统和代码实现原理和性能优化特感兴趣。



在一次更新中,我们优化了线上查询大数据量数据库记录的接口,将原来的串行查询,改成了多线程并发查询。上线后发现,日志偶尔会出现

Instance XXX is not bound to a Session; attribute refresh operation cannot proceed

因在线下回归测试的过程中,并没有出现类似的问题。所以初步怀疑是在多并发访问的时候才会出现这种场景。因此,在线下构建复现环境。

测试代码如下:

import gevent
from gevent import monkey

import threading

monkey.patch_all()

with app.test_client() as c:
    def _exec(_id):
        req_data = {
            ...
        }
        rv = c.post('/api/v1/A', json=req_data)
        print(rv.get_json())
        assert rv.status_code == 200

ts = []
    for i in range(2):
        ts.append(gevent.spawn(_exec, i))

for t in ts:
        t.join()

代码省略了一些业务数据,一些无关的importMock代码。

测试代码模拟的是线上Gunicorn(gevent mode) => Flask API这种进线程模型来进行复现.

接口A的概要逻辑如下:

1  db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
2  @blueprint.route("/A", methods=["POST"])
3  def A(*args, **kwargs):
4      params = request.get_json()    
5      valid(params)
6      b = create_orm_obj_b(params)
7      db_session.add(b)
8      multiple_threads_query_database()
9      try:
10         db_session.commit()
11     except Exception as ex:
12         db_session.rollback()
13         return 500    
14     return 200

变更主要改动的地方就在于新增了第8行逻辑。

按照上面的测试脚本,开启两个协程并发请求接口。遗憾并不能复现。

紧接着猜想是否是协程数不够多,或许将协程的并发度逐步调大,然后看看结果。当并发度开到3的时候,果然,问题就复现了

640?wx_fmt=png

将上面测试代码改成没有 gevent 模式下的纯线程试试,不管并发度如何,错误都没有出现

因此,可以怀疑的是gevent协程库的问题

http://sqlalche.me/e/13/bhk3 报错信息的后面,官方解析了报错原因是: 操作的ORM对象(如上诉的b)已经不和当前的session关联了, 后续代码存在使用懒加载的形式来加载对象属性。

所以问题的关键点就在于: b对象在哪一步和当前的session对象失去关联了??

640?wx_fmt=jpeg

sqlalchemy.session.add()这个是将ORM对象和Session进行关联,与之相反的是sqlalchemy.session.expunge()操作。

但是,很明确地知道,业务代码中没有任何一处的地方显示调用了expunge

其实,深入研究一下add的代码。会发现,对象是由Session实例对象的_new属性中。而_new属性会在flush方法最后调用pop方法来删除ORM对象。

640?wx_fmt=jpeg
640?wx_fmt=jpeg

所以,严重怀疑地是Session在调用commit()方法的时候,让b对象和session的联系脱节了。(关于flushcommit两个操作的关系,可以看看这个链接:SQLAlchemy: What's the difference between flush() and commit()? , 大概就是commit内部也对调用flush操作)

打开DEBUG日志看看Sqlalchemy执行SQL日志。

640?wx_fmt=jpeg

果然,在一次事物中,执行了两次插入b对象的操作。

这就可以得出初步的结论:

两次接口调用中,使用了相同的Session对象。A1调用在最后commit的时候将A2中的对象b2也一起提交了,最后当b2在使用属性的时候,就会发现b2不在Session的管理中(都flush出去了)

640?wx_fmt=jpeg

上图,在print(b1.id)的时候,系统就会出错。

虽然找到了Object和Session取消关联的原因,但是归根结底,是因为两个协程公用的一个Session。正确的逻辑应该是每一个协程都应该维护自己的对象关系。

再来看看Sesseion的初始化方法

db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))

class scoped_session(object):

def __init__(self, session_factory, scopefunc=None):

self.session_factory = session_factory

if scopefunc:
            self.registry = ScopedRegistry(session_factory, scopefunc)
        else:
            self.registry = ThreadLocalRegistry(session_factory)

默认情况下,Session使用Thread Local Storage(TLS)来使每个线程都持有一个独立的Session, 这就解析为什么在多线程环境下运行测试代码,是没有问题的。

gunicorngevent模式,在启动阶段,会调用gevent.monkey.patch_all()方法来覆写TLS => gevent.local

我们再来看Sessiongunicorn的执行顺序

def main():

db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))

run_with_gunicorn(flask_app)

我们可以发现,Session里面的TLS初始化是threading.local.local对象,然后gunicorn才运行gevent.monkey.patch_all()

scoped_session调用前,先PATCH一下gevent的代码,结果问题彻底解决了。

在使用Gevent + Sqlalchemy的时候,需要优先执行gevent.monkey.patch_all(), 以防在并发逻辑下出现各种奇怪的问题。

同时,尽量避免把Session定义成全局变量来使用。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK