diff --git a/gertty/alembic/versions/1bb187bcd401_add_query_sync_table.py b/gertty/alembic/versions/1bb187bcd401_add_query_sync_table.py new file mode 100644 index 0000000..13c76c0 --- /dev/null +++ b/gertty/alembic/versions/1bb187bcd401_add_query_sync_table.py @@ -0,0 +1,26 @@ +"""add query sync table + +Revision ID: 1bb187bcd401 +Revises: 3cc7e3753dc3 +Create Date: 2015-03-26 07:32:33.584657 + +""" + +# revision identifiers, used by Alembic. +revision = '1bb187bcd401' +down_revision = '3cc7e3753dc3' + +from alembic import op +import sqlalchemy as sa + + +def upgrade(): + op.create_table('sync_query', + sa.Column('key', sa.Integer(), nullable=False), + sa.Column('name', sa.String(255), index=True, unique=True, nullable=False), + sa.Column('updated', sa.DateTime, index=True), + sa.PrimaryKeyConstraint('key') + ) + +def downgrade(): + pass diff --git a/gertty/db.py b/gertty/db.py index 716cc77..2625027 100644 --- a/gertty/db.py +++ b/gertty/db.py @@ -145,6 +145,12 @@ pending_cherry_pick_table = Table( Column('branch', String(255), nullable=False), Column('message', Text, nullable=False), ) +sync_query_table = Table( + 'sync_query', metadata, + Column('key', Integer, primary_key=True), + Column('name', String(255), index=True, unique=True, nullable=False), + Column('updated', DateTime, index=True), + ) class Account(object): def __init__(self, id, name=None, username=None, email=None): @@ -419,6 +425,10 @@ class PendingCherryPick(object): self.branch = branch self.message = message +class SyncQuery(object): + def __init__(self, name): + self.name = name + mapper(Account, account_table) mapper(Project, project_table, properties=dict( branches=relationship(Branch, backref='project', @@ -482,6 +492,7 @@ mapper(PermittedLabel, permitted_label_table) mapper(Approval, approval_table, properties=dict( reviewer=relationship(Account))) mapper(PendingCherryPick, pending_cherry_pick_table) +mapper(SyncQuery, sync_query_table) class Database(object): def __init__(self, app): @@ -578,6 +589,12 @@ class DatabaseSession(object): except sqlalchemy.orm.exc.NoResultFound: return None + def getSyncQueryByName(self, name): + try: + return self.session().query(SyncQuery).filter_by(name=name).one() + except sqlalchemy.orm.exc.NoResultFound: + return self.createSyncQuery(name) + def getChange(self, key): try: return self.session().query(Change).filter_by(key=key).one() @@ -738,3 +755,9 @@ class DatabaseSession(object): self.session().add(a) self.session().flush() return a + + def createSyncQuery(self, *args, **kw): + o = SyncQuery(*args, **kw) + self.session().add(o) + self.session().flush() + return o diff --git a/gertty/sync.py b/gertty/sync.py index da0ab88..6a287fe 100644 --- a/gertty/sync.py +++ b/gertty/sync.py @@ -42,6 +42,9 @@ LOW_PRIORITY=2 TIMEOUT=30 +CLOSED_STATUSES = ['MERGED', 'ABANDONED'] + + class MultiQueue(object): def __init__(self, priorities): try: @@ -237,10 +240,11 @@ class SyncSubscribedProjectsTask(Task): t = SyncProjectTask(keys[i:i+10], self.priority) self.tasks.append(t) sync.submitTask(t) + t = SyncOwnChangesTask(self.priority) + self.tasks.append(t) + sync.submitTask(t) class SyncProjectTask(Task): - _closed_statuses = ['MERGED', 'ABANDONED'] - def __init__(self, project_keys, priority=NORMAL_PRIORITY): super(SyncProjectTask, self).__init__(priority) if type(project_keys) == int: @@ -290,7 +294,7 @@ class SyncProjectTask(Task): for c in changes: # For now, just sync open changes or changes already # in the db optionally we could sync all changes ever - if c['id'] in change_ids or (c['status'] not in self._closed_statuses): + if c['id'] in change_ids or (c['status'] not in CLOSED_STATUSES): sync.submitTask(SyncChangeTask(c['id'], priority=self.priority)) for key in self.project_keys: sync.submitTask(SetProjectUpdatedTask(key, now, priority=self.priority)) @@ -310,6 +314,67 @@ class SetProjectUpdatedTask(Task): project = session.getProject(self.project_key) project.updated = self.updated +class SyncOwnChangesTask(Task): + query_name = 'owner' + + def __repr__(self): + return '' + + def run(self, sync): + app = sync.app + now = datetime.datetime.utcnow() + with app.db.getSession() as session: + sync_query = session.getSyncQueryByName(self.query_name) + query = 'q=is:owner' + if sync_query.updated: + # Allow 4 seconds for request time, etc. + query += ' -age:%ss' % (int(math.ceil((now-sync_query.updated).total_seconds())) + 4,) + else: + query += ' status:open' + for project in session.getProjects(subscribed=True): + query += ' -project:%s' % project.name + changes = [] + sortkey = '' + done = False + while not done: + # We don't actually want to limit to 500, but that's the server-side default, and + # if we don't specify this, we won't get a _more_changes flag. + q = 'changes/?n=500%s&%s' % (sortkey, query) + self.log.debug('Query: %s ' % (q,)) + batch = sync.get(q) + done = True + if batch: + changes += batch + if '_more_changes' in batch[-1]: + sortkey = '&N=%s' % (batch[-1]['_sortkey'],) + done = False + change_ids = [c['id'] for c in changes] + with app.db.getSession() as session: + # Winnow the list of IDs to only the ones in the local DB. + change_ids = session.getChangeIDs(change_ids) + + for c in changes: + # For now, just sync open changes or changes already + # in the db optionally we could sync all changes ever + if c['id'] in change_ids or (c['status'] not in CLOSED_STATUSES): + sync.submitTask(SyncChangeTask(c['id'], priority=self.priority)) + sync.submitTask(SetSyncQueryUpdatedTask(self.query_name, now, priority=self.priority)) + +class SetSyncQueryUpdatedTask(Task): + def __init__(self, query_name, updated, priority=NORMAL_PRIORITY): + super(SetSyncQueryUpdatedTask, self).__init__(priority) + self.query_name = query_name + self.updated = updated + + def __repr__(self): + return '' % (self.query_name, self.updated) + + def run(self, sync): + app = sync.app + with app.db.getSession() as session: + sync_query = session.getSyncQueryByName(self.query_name) + sync_query.updated = self.updated + class SyncChangeByCommitTask(Task): def __init__(self, commit, priority=NORMAL_PRIORITY): super(SyncChangeByCommitTask, self).__init__(priority) @@ -431,8 +496,7 @@ class SyncChangeTask(Task): # TODO: handle multiple parents if revision.parent not in parent_commits: parent_revision = session.getRevisionByCommit(revision.parent) - # TODO: use a singleton list of closed states - if not parent_revision and change.status not in ['MERGED', 'ABANDONED']: + if not parent_revision and change.status not in CLOSED_STATUSES: sync.submitTask(SyncChangeByCommitTask(revision.parent, self.priority)) self.log.debug("Change %s revision %s needs parent commit %s synced" % (change.id, remote_revision['_number'], revision.parent))