diff --git a/gertty/alembic/versions/7ef7dfa2ca3a_add_change_outdated.py b/gertty/alembic/versions/7ef7dfa2ca3a_add_change_outdated.py new file mode 100644 index 0000000..dd99722 --- /dev/null +++ b/gertty/alembic/versions/7ef7dfa2ca3a_add_change_outdated.py @@ -0,0 +1,37 @@ +"""add change.outdated + +Revision ID: 7ef7dfa2ca3a +Revises: 37a702b7f58e +Create Date: 2016-08-09 08:59:04.441926 + +""" + +# revision identifiers, used by Alembic. +revision = '7ef7dfa2ca3a' +down_revision = '37a702b7f58e' + +import warnings + +from alembic import op +import sqlalchemy as sa + +from gertty.dbsupport import sqlite_alter_columns + + +def upgrade(): + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + op.add_column('change', sa.Column('outdated', sa.Boolean())) + + connection = op.get_bind() + change = sa.sql.table('change', + sa.sql.column('outdated', sa.Boolean())) + connection.execute(change.update().values({'outdated':False})) + + sqlite_alter_columns('change', [ + sa.Column('outdated', sa.Boolean(), index=True, nullable=False), + ]) + + +def downgrade(): + pass diff --git a/gertty/db.py b/gertty/db.py index 1a5b131..b96abe7 100644 --- a/gertty/db.py +++ b/gertty/db.py @@ -82,6 +82,7 @@ change_table = Table( Column('pending_status', Boolean, index=True, nullable=False), Column('pending_status_message', Text), Column('last_seen', DateTime, index=True), + Column('outdated', Boolean, index=True, nullable=False), ) change_conflict_table = Table( 'change_conflict', metadata, @@ -257,7 +258,7 @@ class Change(object): hidden=False, reviewed=False, starred=False, held=False, pending_rebase=False, pending_topic=False, pending_starred=False, pending_status=False, - pending_status_message=None): + pending_status_message=None, outdated=False): self.project_key = project.key self.account_key = owner.key self.id = id @@ -278,6 +279,7 @@ class Change(object): self.pending_starred = pending_starred self.pending_status = pending_status self.pending_status_message = pending_status_message + self.outdated = outdated def getCategories(self): categories = set([label.category for label in self.labels]) @@ -933,6 +935,9 @@ class DatabaseSession(object): def getHeld(self): return self.session().query(Change).filter_by(held=True).all() + def getOutdated(self): + return self.session().query(Change).filter_by(outdated=True).all() + def getPendingMessages(self): return self.session().query(Message).filter_by(pending=True).all() diff --git a/gertty/sync.py b/gertty/sync.py index de93550..ccd396e 100644 --- a/gertty/sync.py +++ b/gertty/sync.py @@ -528,6 +528,24 @@ class SyncChangeByNumberTask(Task): sync.submitTask(task) self.log.debug("Sync change %s because it is number %s" % (c['id'], self.number)) +class SyncOutdatedChangesTask(Task): + def __init__(self, priority=NORMAL_PRIORITY): + super(SyncOutdatedChangesTask, self).__init__(priority) + + def __eq__(self, other): + if other.__class__ == self.__class__: + return True + return False + + def __repr__(self): + return '' + + def run(self, sync): + with sync.app.db.getSession() as session: + for change in session.getOutdated(): + self.log.debug("Sync outdated change %s" % (change.id,)) + sync.submitTask(SyncChangeTask(change.id, priority=self.priority)) + class SyncChangeTask(Task): def __init__(self, change_id, force_fetch=False, priority=NORMAL_PRIORITY): super(SyncChangeTask, self).__init__(priority) @@ -546,6 +564,22 @@ class SyncChangeTask(Task): def run(self, sync): start_time = time.time() + try: + self._syncChange(sync) + end_time = time.time() + total_time = end_time - start_time + self.log.info("Synced change %s in %0.5f seconds.", self.change_id, total_time) + except Exception: + try: + self.log.error("Marking change %s outdated" % (self.change_id,)) + with sync.app.db.getSession() as session: + change = session.getChangeByID(self.change_id) + change.outdated = True + except Exception: + self.log.exception("Error while marking change %s as outdated" % (self.change_id,)) + raise + + def _syncChange(self, sync): app = sync.app remote_change = sync.get('changes/%s?o=DETAILED_LABELS&o=ALL_REVISIONS&o=ALL_COMMITS&o=MESSAGES&o=DETAILED_ACCOUNTS&o=CURRENT_ACTIONS&o=ALL_FILES' % self.change_id) # Perform subqueries this task will need outside of the db session @@ -870,6 +904,7 @@ class SyncChangeTask(Task): change.reviewed = False result.review_flag_changed = True app.project_cache.clear(change.project) + change.outdated = False for url, refs in fetches.items(): self.log.debug("Fetching from %s with refs %s", url, refs) try: @@ -881,9 +916,6 @@ class SyncChangeTask(Task): for ref in refs: self.log.debug("git fetch %s %s" % (url, ref)) repo.fetch(url, ref) - end_time = time.time() - total_time = end_time - start_time - self.log.info("Synced change %s in %0.5f seconds.", self.change_id, total_time) class CheckReposTask(Task): # on startup, check all projects @@ -1340,6 +1372,7 @@ class Sync(object): self.submitTask(SyncProjectListTask(HIGH_PRIORITY)) self.submitTask(SyncSubscribedProjectsTask(NORMAL_PRIORITY)) self.submitTask(SyncSubscribedProjectBranchesTask(LOW_PRIORITY)) + self.submitTask(SyncOutdatedChangesTask(LOW_PRIORITY)) self.submitTask(PruneDatabaseTask(self.app.config.expire_age, LOW_PRIORITY)) self.periodic_thread = threading.Thread(target=self.periodicSync) self.periodic_thread.daemon = True @@ -1355,6 +1388,7 @@ class Sync(object): if now-hourly > 3600: hourly = now self.pruneDatabase() + self.syncOutdatedChanges() except Exception: self.log.exception('Exception in periodicSync') @@ -1484,6 +1518,13 @@ class Sync(object): for subtask in task.tasks: subtask.wait() + def syncOutdatedChanges(self): + task = SyncOutdatedChangesTask(LOW_PRIORITY) + self.submitTask(task) + if task.wait(): + for subtask in task.tasks: + subtask.wait() + def _syncChangeByCommit(self, commit, priority): # Accumulate sync change by commit tasks because they often # come in batches. This method assumes it is being called