Release DB session thread lock earlier in syncs

For some of the sync tasks, the DB session context manager was holding
the Database object's thread lock for longer periods of time than was
necessary. This patch changes the sessionmaker in the Database object to
not expire objects on commit and not autoflush the session. This allows
the objects returned by a DB query to be used without running into an
UnboundExecutionError when trying to access attributes on the SQLAlchemy
models returned from a query after the session.commit() has been called.

We then modify some of the sync tasks to grab records and release the
thread lock on the DB session and then do their processing work instead
of holding the lock for the duration of the processing work.

Storyboard story: 2000086

Change-Id: Iae6d789409b88f4e3cb35ac9e4f839f8e4d79a97
This commit is contained in:
Jay Pipes 2014-12-17 16:53:21 -05:00 committed by James E. Blair
parent cd9c164c6e
commit 77f980b70e
2 changed files with 51 additions and 34 deletions

View File

@ -488,7 +488,14 @@ class Database(object):
self.engine = create_engine(self.app.config.dburi)
#metadata.create_all(self.engine)
self.migrate()
self.session_factory = sessionmaker(bind=self.engine)
# If we want the objects returned from query() to be usable
# outside of the session, we need to expunge them from the session,
# and since the DatabaseSession always calls commit() on the session
# when the context manager exits, we need to inform the session to
# expire objects when it does so.
self.session_factory = sessionmaker(bind=self.engine,
expire_on_commit=False,
autoflush=False)
self.session = scoped_session(self.session_factory)
self.lock = threading.Lock()
@ -581,6 +588,14 @@ class DatabaseSession(object):
except sqlalchemy.orm.exc.NoResultFound:
return None
def getChangeIDs(self, ids):
# Returns a set of IDs that exist in the local database matching
# the set of supplied IDs. This is used when sync'ing the changesets
# locally with the remote changes.
if not ids:
return set([])
return set([r[0] for r in self.session().query(Change.id).filter(Change.id.in_(ids)).all()])
def getChangeByChangeID(self, change_id):
try:
return self.session().query(Change).filter_by(change_id=change_id).one()

View File

@ -183,8 +183,9 @@ class SyncSubscribedProjectBranchesTask(Task):
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
for p in session.getProjects(subscribed=True):
sync.submitTask(SyncProjectBranchesTask(p.name, self.priority))
projects = session.getProjects(subscribed=True)
for p in projects:
sync.submitTask(SyncProjectBranchesTask(p.name, self.priority))
class SyncProjectBranchesTask(Task):
branch_re = re.compile(r'refs/heads/(.*)')
@ -227,10 +228,10 @@ class SyncSubscribedProjectsTask(Task):
app = sync.app
with app.db.getSession() as session:
keys = [p.key for p in session.getProjects(subscribed=True)]
for i in range(0, len(keys), 10):
t = SyncProjectTask(keys[i:i+10], self.priority)
self.tasks.append(t)
sync.submitTask(t)
for i in range(0, len(keys), 10):
t = SyncProjectTask(keys[i:i+10], self.priority)
self.tasks.append(t)
sync.submitTask(t)
class SyncProjectTask(Task):
_closed_statuses = ['MERGED', 'ABANDONED']
@ -276,14 +277,17 @@ class SyncProjectTask(Task):
if batch and '_more_changes' in batch[-1]:
sortkey = '&N=%s' % (batch[-1]['_sortkey'],)
done = False
change_ids = [c['id'] for c in changes]
with app.db.getSession() as session:
for c in changes:
# For now, just sync open changes or changes already
# in the db optionally we could sync all changes ever
change = session.getChangeByID(c['id'])
if change or (c['status'] not in self._closed_statuses):
sync.submitTask(SyncChangeTask(c['id'], priority=self.priority))
self.log.debug("Change %s update %s" % (c['id'], c['updated']))
# Winnow the list of IDs to only the ones in the local DB.
change_ids = session.getChangeIDs(change_ids)
for c in changes:
# For now, just sync open changes or changes already
# in the db optionally we could sync all changes ever
if c['id'] in change_ids or (c['status'] not in self._closed_statuses):
sync.submitTask(SyncChangeTask(c['id'], priority=self.priority))
self.log.debug("Change %s update %s" % (c['id'], c['updated']))
for key in self.project_keys:
sync.submitTask(SetProjectUpdatedTask(key, now, priority=self.priority))
@ -316,10 +320,9 @@ class SyncChangeByCommitTask(Task):
query = 'commit:%s' % self.commit
changes = sync.get('changes/?q=%s' % query)
self.log.debug('Query: %s ' % (query,))
with app.db.getSession() as session:
for c in changes:
sync.submitTask(SyncChangeTask(c['id'], priority=self.priority))
self.log.debug("Sync change %s for its commit %s" % (c['id'], self.commit))
for c in changes:
sync.submitTask(SyncChangeTask(c['id'], priority=self.priority))
self.log.debug("Sync change %s for its commit %s" % (c['id'], self.commit))
class SyncChangeByNumberTask(Task):
def __init__(self, number, priority=NORMAL_PRIORITY):
@ -335,12 +338,11 @@ class SyncChangeByNumberTask(Task):
query = '%s' % self.number
changes = sync.get('changes/?q=%s' % query)
self.log.debug('Query: %s ' % (query,))
with app.db.getSession() as session:
for c in changes:
task = SyncChangeTask(c['id'], priority=self.priority)
self.tasks.append(task)
sync.submitTask(task)
self.log.debug("Sync change %s because it is number %s" % (c['id'], self.number))
for c in changes:
task = SyncChangeTask(c['id'], priority=self.priority)
self.tasks.append(task)
sync.submitTask(task)
self.log.debug("Sync change %s because it is number %s" % (c['id'], self.number))
class SyncChangeTask(Task):
def __init__(self, change_id, force_fetch=False, priority=NORMAL_PRIORITY):
@ -584,17 +586,17 @@ class CheckReposTask(Task):
def run(self, sync):
app = sync.app
to_check = []
with app.db.getSession() as session:
for project in session.getProjects(subscribed=True):
try:
repo = app.getRepo(project.name)
if repo.newly_cloned or app.fetch_missing_refs:
to_check.append(project.key)
except Exception:
self.log.exception("Exception checking repo %s" % (project.name,))
for key in to_check:
sync.submitTask(CheckRevisionsTask(key, priority=LOW_PRIORITY))
projects = session.getProjects(subscribed=True)
for project in projects:
try:
repo = app.getRepo(project.name)
if repo.newly_cloned or app.fetch_missing_refs:
sync.submitTask(CheckRevisionsTask(key,
priority=LOW_PRIORITY))
except Exception:
self.log.exception("Exception checking repo %s" %
(project.name,))
class CheckRevisionsTask(Task):
def __init__(self, project_key, priority=NORMAL_PRIORITY):