Sync own changes regardless of subscription

Currently if you submit a change to a project to which you are not
subscribed, it will not automatically be updated.  This change adds
a sync task that behaves like a project sync but instead of querying
a particular project, queries the set of changes owned by the user
but not in subscribed projects.

A new db table is added to store the last updated time for this query,
and is constructed in such a way as to support future similar sync
queries.

Change-Id: I2d51e0e68cc4f6457edaf2c8f8c0e5f6c0b87aed
This commit is contained in:
James E. Blair 2015-03-26 16:14:32 -07:00
parent 35f5af52e7
commit caac2d18fe
3 changed files with 118 additions and 5 deletions

View File

@ -0,0 +1,26 @@
"""add query sync table
Revision ID: 1bb187bcd401
Revises: 3cc7e3753dc3
Create Date: 2015-03-26 07:32:33.584657
"""
# revision identifiers, used by Alembic.
revision = '1bb187bcd401'
down_revision = '3cc7e3753dc3'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table('sync_query',
sa.Column('key', sa.Integer(), nullable=False),
sa.Column('name', sa.String(255), index=True, unique=True, nullable=False),
sa.Column('updated', sa.DateTime, index=True),
sa.PrimaryKeyConstraint('key')
)
def downgrade():
pass

View File

@ -145,6 +145,12 @@ pending_cherry_pick_table = Table(
Column('branch', String(255), nullable=False), Column('branch', String(255), nullable=False),
Column('message', Text, nullable=False), Column('message', Text, nullable=False),
) )
sync_query_table = Table(
'sync_query', metadata,
Column('key', Integer, primary_key=True),
Column('name', String(255), index=True, unique=True, nullable=False),
Column('updated', DateTime, index=True),
)
class Account(object): class Account(object):
def __init__(self, id, name=None, username=None, email=None): def __init__(self, id, name=None, username=None, email=None):
@ -419,6 +425,10 @@ class PendingCherryPick(object):
self.branch = branch self.branch = branch
self.message = message self.message = message
class SyncQuery(object):
def __init__(self, name):
self.name = name
mapper(Account, account_table) mapper(Account, account_table)
mapper(Project, project_table, properties=dict( mapper(Project, project_table, properties=dict(
branches=relationship(Branch, backref='project', branches=relationship(Branch, backref='project',
@ -482,6 +492,7 @@ mapper(PermittedLabel, permitted_label_table)
mapper(Approval, approval_table, properties=dict( mapper(Approval, approval_table, properties=dict(
reviewer=relationship(Account))) reviewer=relationship(Account)))
mapper(PendingCherryPick, pending_cherry_pick_table) mapper(PendingCherryPick, pending_cherry_pick_table)
mapper(SyncQuery, sync_query_table)
class Database(object): class Database(object):
def __init__(self, app): def __init__(self, app):
@ -578,6 +589,12 @@ class DatabaseSession(object):
except sqlalchemy.orm.exc.NoResultFound: except sqlalchemy.orm.exc.NoResultFound:
return None return None
def getSyncQueryByName(self, name):
try:
return self.session().query(SyncQuery).filter_by(name=name).one()
except sqlalchemy.orm.exc.NoResultFound:
return self.createSyncQuery(name)
def getChange(self, key): def getChange(self, key):
try: try:
return self.session().query(Change).filter_by(key=key).one() return self.session().query(Change).filter_by(key=key).one()
@ -738,3 +755,9 @@ class DatabaseSession(object):
self.session().add(a) self.session().add(a)
self.session().flush() self.session().flush()
return a return a
def createSyncQuery(self, *args, **kw):
o = SyncQuery(*args, **kw)
self.session().add(o)
self.session().flush()
return o

View File

@ -42,6 +42,9 @@ LOW_PRIORITY=2
TIMEOUT=30 TIMEOUT=30
CLOSED_STATUSES = ['MERGED', 'ABANDONED']
class MultiQueue(object): class MultiQueue(object):
def __init__(self, priorities): def __init__(self, priorities):
try: try:
@ -237,10 +240,11 @@ class SyncSubscribedProjectsTask(Task):
t = SyncProjectTask(keys[i:i+10], self.priority) t = SyncProjectTask(keys[i:i+10], self.priority)
self.tasks.append(t) self.tasks.append(t)
sync.submitTask(t) sync.submitTask(t)
t = SyncOwnChangesTask(self.priority)
self.tasks.append(t)
sync.submitTask(t)
class SyncProjectTask(Task): class SyncProjectTask(Task):
_closed_statuses = ['MERGED', 'ABANDONED']
def __init__(self, project_keys, priority=NORMAL_PRIORITY): def __init__(self, project_keys, priority=NORMAL_PRIORITY):
super(SyncProjectTask, self).__init__(priority) super(SyncProjectTask, self).__init__(priority)
if type(project_keys) == int: if type(project_keys) == int:
@ -290,7 +294,7 @@ class SyncProjectTask(Task):
for c in changes: for c in changes:
# For now, just sync open changes or changes already # For now, just sync open changes or changes already
# in the db optionally we could sync all changes ever # in the db optionally we could sync all changes ever
if c['id'] in change_ids or (c['status'] not in self._closed_statuses): if c['id'] in change_ids or (c['status'] not in CLOSED_STATUSES):
sync.submitTask(SyncChangeTask(c['id'], priority=self.priority)) sync.submitTask(SyncChangeTask(c['id'], priority=self.priority))
for key in self.project_keys: for key in self.project_keys:
sync.submitTask(SetProjectUpdatedTask(key, now, priority=self.priority)) sync.submitTask(SetProjectUpdatedTask(key, now, priority=self.priority))
@ -310,6 +314,67 @@ class SetProjectUpdatedTask(Task):
project = session.getProject(self.project_key) project = session.getProject(self.project_key)
project.updated = self.updated project.updated = self.updated
class SyncOwnChangesTask(Task):
query_name = 'owner'
def __repr__(self):
return '<SyncOwnChangesTask>'
def run(self, sync):
app = sync.app
now = datetime.datetime.utcnow()
with app.db.getSession() as session:
sync_query = session.getSyncQueryByName(self.query_name)
query = 'q=is:owner'
if sync_query.updated:
# Allow 4 seconds for request time, etc.
query += ' -age:%ss' % (int(math.ceil((now-sync_query.updated).total_seconds())) + 4,)
else:
query += ' status:open'
for project in session.getProjects(subscribed=True):
query += ' -project:%s' % project.name
changes = []
sortkey = ''
done = False
while not done:
# We don't actually want to limit to 500, but that's the server-side default, and
# if we don't specify this, we won't get a _more_changes flag.
q = 'changes/?n=500%s&%s' % (sortkey, query)
self.log.debug('Query: %s ' % (q,))
batch = sync.get(q)
done = True
if batch:
changes += batch
if '_more_changes' in batch[-1]:
sortkey = '&N=%s' % (batch[-1]['_sortkey'],)
done = False
change_ids = [c['id'] for c in changes]
with app.db.getSession() as session:
# Winnow the list of IDs to only the ones in the local DB.
change_ids = session.getChangeIDs(change_ids)
for c in changes:
# For now, just sync open changes or changes already
# in the db optionally we could sync all changes ever
if c['id'] in change_ids or (c['status'] not in CLOSED_STATUSES):
sync.submitTask(SyncChangeTask(c['id'], priority=self.priority))
sync.submitTask(SetSyncQueryUpdatedTask(self.query_name, now, priority=self.priority))
class SetSyncQueryUpdatedTask(Task):
def __init__(self, query_name, updated, priority=NORMAL_PRIORITY):
super(SetSyncQueryUpdatedTask, self).__init__(priority)
self.query_name = query_name
self.updated = updated
def __repr__(self):
return '<SetSyncQueryUpdatedTask %s %s>' % (self.query_name, self.updated)
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
sync_query = session.getSyncQueryByName(self.query_name)
sync_query.updated = self.updated
class SyncChangeByCommitTask(Task): class SyncChangeByCommitTask(Task):
def __init__(self, commit, priority=NORMAL_PRIORITY): def __init__(self, commit, priority=NORMAL_PRIORITY):
super(SyncChangeByCommitTask, self).__init__(priority) super(SyncChangeByCommitTask, self).__init__(priority)
@ -431,8 +496,7 @@ class SyncChangeTask(Task):
# TODO: handle multiple parents # TODO: handle multiple parents
if revision.parent not in parent_commits: if revision.parent not in parent_commits:
parent_revision = session.getRevisionByCommit(revision.parent) parent_revision = session.getRevisionByCommit(revision.parent)
# TODO: use a singleton list of closed states if not parent_revision and change.status not in CLOSED_STATUSES:
if not parent_revision and change.status not in ['MERGED', 'ABANDONED']:
sync.submitTask(SyncChangeByCommitTask(revision.parent, self.priority)) sync.submitTask(SyncChangeByCommitTask(revision.parent, self.priority))
self.log.debug("Change %s revision %s needs parent commit %s synced" % self.log.debug("Change %s revision %s needs parent commit %s synced" %
(change.id, remote_revision['_number'], revision.parent)) (change.id, remote_revision['_number'], revision.parent))