Use thread-local SQLAlchemy sessions

There are a few problems with how SQLAlchemy session is currently used.

First, there is global, non-thread-safe (see [1]) session instantiated
in db.py. That makes it hard to ensure session is properly closed when
there is no need to keep them open (e.g. in-between Flask requests).  We
also get stale data when we do queries without invalidating the
long-living session. The fix is to use scoped_session, as described in
[2], and by using new session for each Flask request, which is ensured
by using Session.remove() at the end of each request.

Another problem manifests when MySQL backend is used. During long idle
time, MySQL would close inactive connections. And because SQLAlchemy
does maintain a pool of connections open independently from session,
once we try to use it, we'll see 'MySQL server went away' and crash.
This further made worse by single global session that then gets stuck,
not being able to process any additional transactions.  The fix is to
set SQLAlchemy pool_recycle to one hour, which is less than MySQL
interactive timeout.

In the future, instead of continuing to manage Flask with SQLAlchemy by
hand, migration to Flask-SQLAlchemy extension should be considered, [3].

[1] http://docs.sqlalchemy.org/en/rel_1_0/orm/session_basics.html#is-the-session-thread-safe
[2] http://docs.sqlalchemy.org/en/rel_1_0/orm/contextual.html#unitofwork-contextual
[3] https://pythonhosted.org/Flask-SQLAlchemy/quickstart.html#a-minimal-application

Closes-Bug: #1512476

Change-Id: I78a7f3f0715c5aa71d73f9c496ba3b07208674b9
This commit is contained in:
Mikhail S Medvedev 2015-11-14 18:51:30 -06:00
parent 52315eedd7
commit 68f1910e77
5 changed files with 23 additions and 13 deletions

View File

@ -49,16 +49,16 @@ def _get_ci_info_for_patch_sets(ci, patch_sets):
def get_projects():
return db.session.query(Project).order_by(Project.name).all()
return db.Session().query(Project).order_by(Project.name).all()
def get_ci_servers():
return db.session.query(CiServer).order_by(
return db.Session().query(CiServer).order_by(
desc(CiServer.trusted), CiServer.name).all()
def get_patch_sets(project, since):
return db.session.query(PatchSet).filter(
return db.Session().query(PatchSet).filter(
and_(PatchSet.project == project, PatchSet.created >= since)
).order_by(PatchSet.created.desc()).all()
@ -71,7 +71,7 @@ def get_context():
project = request.args.get('project', DEFAULT_PROJECT)
time = request.args.get('time', DEFAULT_TIME_OPTION)
since = datetime.now() - timedelta(hours=TIME_OPTIONS[time])
project = db.session.query(Project).filter(
project = db.Session().query(Project).filter(
Project.name == project).one()
patch_sets = get_patch_sets(project=project, since=since)
results = OrderedDict()

View File

@ -13,6 +13,7 @@
# under the License.
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
from ciwatch.config import Config
@ -20,11 +21,10 @@ from ciwatch import models
config = Config()
engine = create_engine(config.cfg.database.connection)
Session = sessionmaker()
Session.configure(bind=engine)
engine = create_engine(config.cfg.database.connection, pool_recycle=3600)
session_factory = sessionmaker(bind=engine)
Session = scoped_session(session_factory)
models.Base.metadata.create_all(engine)
session = Session()
def create_projects():
@ -32,10 +32,11 @@ def create_projects():
get_or_create(models.Project,
commit_=False,
name=name)
session.commit()
Session().commit()
def update_or_create_comment(commit_=True, **kwargs):
session = Session()
comment = session.query(models.Comment).filter_by(
ci_server_id=kwargs['ci_server_id'],
patch_set_id=kwargs['patch_set_id']).scalar()
@ -49,6 +50,7 @@ def update_or_create_comment(commit_=True, **kwargs):
def get_or_create(model, commit_=True, **kwargs):
session = Session()
result = session.query(model).filter_by(**kwargs).first()
if not result:
result = model(**kwargs)

View File

@ -121,7 +121,8 @@ def parse_json_event(event, projects):
def add_event_to_db(event, commit_=True):
project = db.session.query(models.Project).filter(
session = db.Session()
project = session.query(models.Project).filter(
models.Project.name == _process_project_name(
event["change"]["project"])).one()
patch_set = db.get_or_create(
@ -155,7 +156,7 @@ def add_event_to_db(event, commit_=True):
ci_server_id=ci_server.id,
patch_set_id=patch_set.id)
if commit_:
db.session.commit()
session.commit()
def main():

View File

@ -33,7 +33,7 @@ def get_data(datafile, projects):
def load_data(data):
for event in data:
add_event_to_db(event, commit_=False)
db.session.commit()
db.Session().commit()
def main():

View File

@ -19,6 +19,7 @@ from werkzeug.exceptions import abort
from ciwatch.api import get_context
from ciwatch.api import get_projects
from ciwatch.cache import cached
from ciwatch import db
from ciwatch.server import app
@ -26,7 +27,11 @@ from ciwatch.server import app
@app.route("/index")
@app.route("/home")
def home():
return render_template('index.html.jinja', projects=get_projects())
try:
return render_template(
'index.html.jinja', projects=get_projects())
finally:
db.Session.remove()
@app.route("/project")
@ -36,3 +41,5 @@ def project():
return render_template("project.html.jinja", **get_context())
except NoResultFound:
abort(404)
finally:
db.Session.remove()