Do not enqueue duplicate tasks

If a change requires a parent change to be synced, it might enqueue
a sync change task for each patchset with the same parent change.
This might result in many times the number of change syncs as
should be required.  Instead, if a task is already enqueued in
a given queue (for the same priority), do not enqueue that task.

The 'has' method on the queue is no longer necessary, so is removed.

Change-Id: I6ebd67b78f99142a401766bed645ae6968c63bf8
This commit is contained in:
James E. Blair 2015-04-11 11:17:56 -04:00
parent ad5a6f68c1
commit a5eb452f2b
1 changed files with 143 additions and 17 deletions

View File

@ -65,7 +65,8 @@ class MultiQueue(object):
def put(self, item, priority):
self.condition.acquire()
try:
self.queues[priority].append(item)
if item not in self.queues[priority]:
self.queues[priority].append(item)
self.condition.notify()
finally:
self.condition.release()
@ -84,16 +85,6 @@ class MultiQueue(object):
finally:
self.condition.release()
def has(self, item):
self.condition.acquire()
try:
for queue in self.queues.values():
for qitem in queue:
if isinstance(qitem, item):
return True
finally:
self.condition.release()
return False
class UpdateEvent(object):
def updateRelatedChanges(self, session, change):
@ -158,10 +149,18 @@ class Task(object):
self.event.wait(timeout)
return self.succeeded
def __eq__(self, other):
raise NotImplementedError()
class SyncOwnAccountTask(Task):
def __repr__(self):
return '<SyncOwnAccountTask>'
def __eq__(self, other):
if other.__class__ == self.__class__:
return True
return False
def run(self, sync):
app = sync.app
remote = sync.get('accounts/self')
@ -176,6 +175,11 @@ class SyncProjectListTask(Task):
def __repr__(self):
return '<SyncProjectListTask>'
def __eq__(self, other):
if other.__class__ == self.__class__:
return True
return False
def run(self, sync):
app = sync.app
remote = sync.get('projects/?d')
@ -200,6 +204,11 @@ class SyncSubscribedProjectBranchesTask(Task):
def __repr__(self):
return '<SyncSubscribedProjectBranchesTask>'
def __eq__(self, other):
if other.__class__ == self.__class__:
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
@ -217,6 +226,12 @@ class SyncProjectBranchesTask(Task):
def __repr__(self):
return '<SyncProjectBranchesTask %s>' % (self.project_name,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.project_name == self.project_name):
return True
return False
def run(self, sync):
app = sync.app
remote = sync.get('projects/%s/branches/' % urllib.quote_plus(self.project_name))
@ -244,6 +259,11 @@ class SyncSubscribedProjectsTask(Task):
def __repr__(self):
return '<SyncSubscribedProjectsTask>'
def __eq__(self, other):
if (other.__class__ == self.__class__):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
@ -269,6 +289,12 @@ class SyncProjectTask(Task):
def __repr__(self):
return '<SyncProjectTask %s>' % (self.project_keys,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.project_keys == self.project_keys):
return True
return False
def run(self, sync):
app = sync.app
now = datetime.datetime.utcnow()
@ -323,6 +349,13 @@ class SetProjectUpdatedTask(Task):
def __repr__(self):
return '<SetProjectUpdatedTask %s %s>' % (self.project_key, self.updated)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.project_key == self.project_key and
other.updated == self.updated):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
@ -338,6 +371,13 @@ class SyncQueriedChangesTask(Task):
def __repr__(self):
return '<SyncQueriedChangesTask %s>' % self.query_name
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.query_name == self.query_name and
other.query == self.query):
return True
return False
def run(self, sync):
app = sync.app
now = datetime.datetime.utcnow()
@ -387,6 +427,13 @@ class SetSyncQueryUpdatedTask(Task):
def __repr__(self):
return '<SetSyncQueryUpdatedTask %s %s>' % (self.query_name, self.updated)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.query_name == self.query_name and
other.updated == self.updated):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
@ -401,6 +448,12 @@ class SyncChangeByCommitTask(Task):
def __repr__(self):
return '<SyncChangeByCommitTask %s>' % (self.commit,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.commit == self.commit):
return True
return False
def run(self, sync):
query = 'commit:%s' % self.commit
changes = sync.get('changes/?q=%s' % query)
@ -417,6 +470,12 @@ class SyncChangeByNumberTask(Task):
def __repr__(self):
return '<SyncChangeByNumberTask %s>' % (self.number,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.number == self.number):
return True
return False
def run(self, sync):
query = '%s' % self.number
changes = sync.get('changes/?q=%s' % query)
@ -436,6 +495,13 @@ class SyncChangeTask(Task):
def __repr__(self):
return '<SyncChangeTask %s>' % (self.change_id,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.change_id == self.change_id and
other.force_fetch == self.force_fetch):
return True
return False
def run(self, sync):
start_time = time.time()
app = sync.app
@ -726,6 +792,11 @@ class CheckReposTask(Task):
def __repr__(self):
return '<CheckReposTask>'
def __eq__(self, other):
if (other.__class__ == self.__class__):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
@ -752,6 +823,12 @@ class CheckRevisionsTask(Task):
def __repr__(self):
return '<CheckRevisionsTask %s>' % (self.project_key,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.project_key == self.project_key):
return True
return False
def run(self, sync):
app = sync.app
to_sync = set()
@ -777,6 +854,11 @@ class UploadReviewsTask(Task):
def __repr__(self):
return '<UploadReviewsTask>'
def __eq__(self, other):
if (other.__class__ == self.__class__):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
@ -803,6 +885,12 @@ class SetTopicTask(Task):
def __repr__(self):
return '<SetTopicTask %s>' % (self.change_key,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.change_key == self.change_key):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
@ -822,6 +910,12 @@ class RebaseChangeTask(Task):
def __repr__(self):
return '<RebaseChangeTask %s>' % (self.change_key,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.change_key == self.change_key):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
@ -839,6 +933,12 @@ class ChangeStarredTask(Task):
def __repr__(self):
return '<ChangeStarredTask %s>' % (self.change_key,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.change_key == self.change_key):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
@ -860,6 +960,12 @@ class ChangeStatusTask(Task):
def __repr__(self):
return '<ChangeStatusTask %s>' % (self.change_key,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.change_key == self.change_key):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
@ -889,6 +995,12 @@ class SendCherryPickTask(Task):
def __repr__(self):
return '<SendCherryPickTask %s>' % (self.cp_key,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.cp_key == self.cp_key):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
@ -911,6 +1023,12 @@ class ChangeCommitMessageTask(Task):
def __repr__(self):
return '<ChangeCommitMessageTask %s>' % (self.revision_key,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.revision_key == self.revision_key):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
@ -932,6 +1050,12 @@ class UploadReviewTask(Task):
def __repr__(self):
return '<UploadReviewTask %s>' % (self.message_key,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.message_key == self.message_key):
return True
return False
def run(self, sync):
app = sync.app
@ -1034,7 +1158,10 @@ class Sync(object):
def submitTask(self, task):
if not self.offline:
self.queue.put(task, task.priority)
if not self.queue.put(task, task.priority):
task.complete(False)
else:
task.complete(False)
def run(self, pipe):
task = None
@ -1051,8 +1178,7 @@ class Sync(object):
except requests.ConnectionError, e:
self.log.warning("Offline due to: %s" % (e,))
if not self.offline:
if not self.queue.has(UploadReviewsTask):
self.submitTask(UploadReviewsTask(HIGH_PRIORITY))
self.submitTask(UploadReviewsTask(HIGH_PRIORITY))
self.offline = True
self.app.status.update(offline=True, refresh=False)
os.write(pipe, 'refresh\n')
@ -1135,6 +1261,6 @@ class Sync(object):
def syncSubscribedProjects(self):
task = SyncSubscribedProjectsTask(LOW_PRIORITY)
self.submitTask(task)
task.wait()
for subtask in task.tasks:
subtask.wait()
if task.wait():
for subtask in task.tasks:
subtask.wait()