记一次Sqlalchemy Session问题
source link: https://mp.weixin.qq.com/s/bpHnBVsMwNK62Xa9AKFXeg
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
记一次Sqlalchemy Session问题
囧囧
囧囧,网易游戏高级开发工程师,专业搬砖工程师,目前主要负责网易游戏的CDN服务。对系统和代码实现原理和性能优化特感兴趣。
在一次更新中,我们优化了线上查询大数据量数据库记录的接口,将原来的串行查询,改成了多线程并发查询。上线后发现,日志偶尔会出现
Instance XXX is not bound to a Session; attribute refresh operation cannot proceed
因在线下回归测试的过程中,并没有出现类似的问题。所以初步怀疑是在多并发访问的时候才会出现这种场景。因此,在线下构建复现环境。
测试代码如下:
import gevent
from gevent import monkey
import threading
monkey.patch_all()
with app.test_client() as c:
def _exec(_id):
req_data = {
...
}
rv = c.post('/api/v1/A', json=req_data)
print(rv.get_json())
assert rv.status_code == 200
ts = []
for i in range(2):
ts.append(gevent.spawn(_exec, i))
for t in ts:
t.join()
代码省略了一些业务数据,一些无关的import
和Mock
代码。
测试代码模拟的是线上Gunicorn(gevent mode) => Flask API
这种进线程模型来进行复现.
接口A
的概要逻辑如下:
1 db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
2 @blueprint.route("/A", methods=["POST"])
3 def A(*args, **kwargs):
4 params = request.get_json()
5 valid(params)
6 b = create_orm_obj_b(params)
7 db_session.add(b)
8 multiple_threads_query_database()
9 try:
10 db_session.commit()
11 except Exception as ex:
12 db_session.rollback()
13 return 500
14 return 200
变更主要改动的地方就在于新增了第8行逻辑。
按照上面的测试脚本,开启两个协程并发请求接口。遗憾并不能复现。
紧接着猜想是否是协程数不够多,或许将协程的并发度逐步调大,然后看看结果。当并发度开到3的时候,果然,问题就复现了
将上面测试代码改成没有 gevent 模式下的纯线程试试,不管并发度如何,错误都没有出现
因此,可以怀疑的是gevent
协程库的问题
http://sqlalche.me/e/13/bhk3 报错信息的后面,官方解析了报错原因是: 操作的ORM对象(如上诉的b
)已经不和当前的session
关联了, 后续代码存在使用懒加载的形式来加载对象属性。
所以问题的关键点就在于: b
对象在哪一步和当前的session
对象失去关联了??
sqlalchemy.session.add()
这个是将ORM对象和Session
进行关联,与之相反的是sqlalchemy.session.expunge()
操作。
但是,很明确地知道,业务代码中没有任何一处的地方显示调用了expunge
。
其实,深入研究一下add
的代码。会发现,对象是由Session实例对象的_new
属性中。而_new
属性会在flush
方法最后调用pop
方法来删除ORM对象。
所以,严重怀疑地是Session
在调用commit()
方法的时候,让b
对象和session
的联系脱节了。(关于flush
和commit
两个操作的关系,可以看看这个链接:SQLAlchemy: What's the difference between flush() and commit()? , 大概就是commit内部也对调用flush操作)
打开DEBUG
日志看看Sqlalchemy
执行SQL日志。
果然,在一次事物中,执行了两次插入b
对象的操作。
这就可以得出初步的结论:
两次接口调用中,使用了相同的Session对象。A1调用在最后commit的时候将A2中的对象b2也一起提交了,最后当b2在使用属性的时候,就会发现b2不在Session的管理中(都flush出去了)
上图,在print(b1.id)
的时候,系统就会出错。
虽然找到了Object和Session取消关联的原因,但是归根结底,是因为两个协程公用的一个Session。正确的逻辑应该是每一个协程都应该维护自己的对象关系。
再来看看Sesseion
的初始化方法
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
class scoped_session(object):
def __init__(self, session_factory, scopefunc=None):
self.session_factory = session_factory
if scopefunc:
self.registry = ScopedRegistry(session_factory, scopefunc)
else:
self.registry = ThreadLocalRegistry(session_factory)
默认情况下,Session使用Thread Local Storage(TLS)
来使每个线程都持有一个独立的Session
, 这就解析为什么在多线程环境下运行测试代码,是没有问题的。
而gunicorn
的gevent模式
,在启动阶段,会调用gevent.monkey.patch_all()
方法来覆写TLS => gevent.local
。
我们再来看Session
和gunicorn
的执行顺序
def main():
db_session = scoped_session(sessionmaker(autocommit=False, autoflush=False, bind=engine))
run_with_gunicorn(flask_app)
我们可以发现,Session
里面的TLS初始化是threading.local.local
对象,然后gunicorn才运行gevent.monkey.patch_all()
在scoped_session
调用前,先PATCH一下gevent
的代码,结果问题彻底解决了。
在使用Gevent + Sqlalchemy
的时候,需要优先执行gevent.monkey.patch_all()
, 以防在并发逻辑下出现各种奇怪的问题。
同时,尽量避免把Session
定义成全局变量来使用。
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK