gertty/gertty/sync.py

1600 lines
66 KiB
Python

# Copyright 2014 OpenStack Foundation
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import errno
import logging
import math
import os
import re
import threading
import json
import time
import datetime
import warnings
import dateutil.parser
try:
import ordereddict
except:
pass
import requests
import requests.utils
import six
from six.moves import queue
from six.moves.urllib import parse as urlparse
import gertty.version
from gertty import gitrepo
from gertty.auth import FormAuth
HIGH_PRIORITY=0
NORMAL_PRIORITY=1
LOW_PRIORITY=2
TIMEOUT=30
CLOSED_STATUSES = ['MERGED', 'ABANDONED']
class OfflineError(Exception):
pass
class MultiQueue(object):
def __init__(self, priorities):
try:
self.queues = collections.OrderedDict()
except AttributeError:
self.queues = ordereddict.OrderedDict()
for key in priorities:
self.queues[key] = collections.deque()
self.condition = threading.Condition()
self.incomplete = []
def qsize(self):
count = 0
self.condition.acquire()
try:
for queue in self.queues.values():
count += len(queue)
return count + len(self.incomplete)
finally:
self.condition.release()
def put(self, item, priority):
added = False
self.condition.acquire()
try:
if item not in self.queues[priority]:
self.queues[priority].append(item)
added = True
self.condition.notify()
finally:
self.condition.release()
return added
def get(self):
self.condition.acquire()
try:
while True:
for queue in self.queues.values():
try:
ret = queue.popleft()
self.incomplete.append(ret)
return ret
except IndexError:
pass
self.condition.wait()
finally:
self.condition.release()
def find(self, klass, priority):
results = []
self.condition.acquire()
try:
for item in self.queues[priority]:
if isinstance(item, klass):
results.append(item)
finally:
self.condition.release()
return results
def complete(self, item):
self.condition.acquire()
try:
if item in self.incomplete:
self.incomplete.remove(item)
finally:
self.condition.release()
class UpdateEvent(object):
def updateRelatedChanges(self, session, change):
related_change_keys = set()
related_change_keys.add(change.key)
for revision in change.revisions:
parent = session.getRevisionByCommit(revision.parent)
if parent:
related_change_keys.add(parent.change.key)
for child in session.getRevisionsByParent(revision.commit):
related_change_keys.add(child.change.key)
self.related_change_keys = related_change_keys
class ProjectAddedEvent(UpdateEvent):
def __repr__(self):
return '<ProjectAddedEvent project_key:%s>' % (
self.project_key,)
def __init__(self, project):
self.project_key = project.key
class ChangeAddedEvent(UpdateEvent):
def __repr__(self):
return '<ChangeAddedEvent project_key:%s change_key:%s>' % (
self.project_key, self.change_key)
def __init__(self, change):
self.project_key = change.project.key
self.change_key = change.key
self.related_change_keys = set()
self.review_flag_changed = True
self.status_changed = True
self.held_changed = False
class ChangeUpdatedEvent(UpdateEvent):
def __repr__(self):
return '<ChangeUpdatedEvent project_key:%s change_key:%s review_flag_changed:%s status_changed:%s>' % (
self.project_key, self.change_key, self.review_flag_changed, self.status_changed)
def __init__(self, change):
self.project_key = change.project.key
self.change_key = change.key
self.related_change_keys = set()
self.review_flag_changed = False
self.status_changed = False
self.held_changed = False
class Task(object):
def __init__(self, priority=NORMAL_PRIORITY):
self.log = logging.getLogger('gertty.sync')
self.priority = priority
self.succeeded = None
self.event = threading.Event()
self.tasks = []
self.results = []
def complete(self, success):
self.succeeded = success
self.event.set()
def wait(self, timeout=None):
self.event.wait(timeout)
return self.succeeded
def __eq__(self, other):
raise NotImplementedError()
class SyncOwnAccountTask(Task):
def __repr__(self):
return '<SyncOwnAccountTask>'
def __eq__(self, other):
if other.__class__ == self.__class__:
return True
return False
def run(self, sync):
app = sync.app
remote = sync.get('accounts/self')
sync.account_id = remote['_account_id']
with app.db.getSession() as session:
session.getAccountByID(remote['_account_id'],
remote.get('name'),
remote.get('username'),
remote.get('email'))
class GetVersionTask(Task):
def __repr__(self):
return '<GetVersionTask>'
def __eq__(self, other):
if other.__class__ == self.__class__:
return True
return False
def run(self, sync):
version = sync.get('config/server/version')
sync.setRemoteVersion(version)
class SyncProjectListTask(Task):
def __repr__(self):
return '<SyncProjectListTask>'
def __eq__(self, other):
if other.__class__ == self.__class__:
return True
return False
def run(self, sync):
app = sync.app
remote = sync.get('projects/?d')
remote_keys = set(remote.keys())
with app.db.getSession() as session:
local = {}
for p in session.getProjects():
local[p.name] = p
local_keys = set(local.keys())
for name in local_keys-remote_keys:
self.log.info("Deleted project %s", name)
local[name].delete()
for name in remote_keys-local_keys:
p = remote[name]
project = session.createProject(name,
description=p.get('description', ''))
self.log.info("Created project %s", project.name)
self.results.append(ProjectAddedEvent(project))
class SyncSubscribedProjectBranchesTask(Task):
def __repr__(self):
return '<SyncSubscribedProjectBranchesTask>'
def __eq__(self, other):
if other.__class__ == self.__class__:
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
projects = session.getProjects(subscribed=True)
for p in projects:
sync.submitTask(SyncProjectBranchesTask(p.name, self.priority))
class SyncProjectBranchesTask(Task):
branch_re = re.compile(r'refs/heads/(.*)')
def __init__(self, project_name, priority=NORMAL_PRIORITY):
super(SyncProjectBranchesTask, self).__init__(priority)
self.project_name = project_name
def __repr__(self):
return '<SyncProjectBranchesTask %s>' % (self.project_name,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.project_name == self.project_name):
return True
return False
def run(self, sync):
app = sync.app
remote = sync.get('projects/%s/branches/' % urlparse.quote_plus(self.project_name))
remote_branches = set()
for x in remote:
m = self.branch_re.match(x['ref'])
if m:
remote_branches.add(m.group(1))
with app.db.getSession() as session:
local = {}
project = session.getProjectByName(self.project_name)
for branch in project.branches:
local[branch.name] = branch
local_branches = set(local.keys())
for name in local_branches-remote_branches:
session.delete(local[name])
self.log.info("Deleted branch %s from project %s in local DB.", name, project.name)
for name in remote_branches-local_branches:
project.createBranch(name)
self.log.info("Added branch %s to project %s in local DB.", name, project.name)
class SyncSubscribedProjectsTask(Task):
def __repr__(self):
return '<SyncSubscribedProjectsTask>'
def __eq__(self, other):
if (other.__class__ == self.__class__):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
keys = [p.key for p in session.getProjects(subscribed=True)]
for i in range(0, len(keys), 10):
t = SyncProjectTask(keys[i:i+10], self.priority)
self.tasks.append(t)
sync.submitTask(t)
t = SyncQueriedChangesTask('owner', 'is:owner', self.priority)
self.tasks.append(t)
sync.submitTask(t)
t = SyncQueriedChangesTask('starred', 'is:starred', self.priority)
self.tasks.append(t)
sync.submitTask(t)
class SyncProjectTask(Task):
def __init__(self, project_keys, priority=NORMAL_PRIORITY):
super(SyncProjectTask, self).__init__(priority)
if type(project_keys) == int:
project_keys = [project_keys]
self.project_keys = project_keys
def __repr__(self):
return '<SyncProjectTask %s>' % (self.project_keys,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.project_keys == self.project_keys):
return True
return False
def run(self, sync):
app = sync.app
now = datetime.datetime.utcnow()
queries = []
with app.db.getSession() as session:
for project_key in self.project_keys:
project = session.getProject(project_key)
query = 'q=project:%s' % project.name
if project.updated:
# Allow 4 seconds for request time, etc.
query += ' -age:%ss' % (int(math.ceil((now-project.updated).total_seconds())) + 4,)
else:
query += ' status:open'
queries.append(query)
changes = sync.query(queries)
change_ids = [c['id'] for c in changes]
with app.db.getSession() as session:
# Winnow the list of IDs to only the ones in the local DB.
change_ids = session.getChangeIDs(change_ids)
for c in changes:
# For now, just sync open changes or changes already
# in the db optionally we could sync all changes ever
if c['id'] in change_ids or (c['status'] not in CLOSED_STATUSES):
sync.submitTask(SyncChangeTask(c['id'], priority=self.priority))
for key in self.project_keys:
sync.submitTask(SetProjectUpdatedTask(key, now, priority=self.priority))
class SetProjectUpdatedTask(Task):
def __init__(self, project_key, updated, priority=NORMAL_PRIORITY):
super(SetProjectUpdatedTask, self).__init__(priority)
self.project_key = project_key
self.updated = updated
def __repr__(self):
return '<SetProjectUpdatedTask %s %s>' % (self.project_key, self.updated)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.project_key == self.project_key and
other.updated == self.updated):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
project = session.getProject(self.project_key)
project.updated = self.updated
class SyncQueriedChangesTask(Task):
def __init__(self, query_name, query, priority=NORMAL_PRIORITY):
super(SyncQueriedChangesTask, self).__init__(priority)
self.query_name = query_name
self.query = query
def __repr__(self):
return '<SyncQueriedChangesTask %s>' % self.query_name
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.query_name == self.query_name and
other.query == self.query):
return True
return False
def run(self, sync):
app = sync.app
now = datetime.datetime.utcnow()
with app.db.getSession() as session:
sync_query = session.getSyncQueryByName(self.query_name)
query = 'q=%s' % self.query
if sync_query.updated:
# Allow 4 seconds for request time, etc.
query += ' -age:%ss' % (int(math.ceil((now-sync_query.updated).total_seconds())) + 4,)
else:
query += ' status:open'
for project in session.getProjects(subscribed=True):
query += ' -project:%s' % project.name
changes = []
sortkey = ''
done = False
offset = 0
while not done:
# We don't actually want to limit to 500, but that's the server-side default, and
# if we don't specify this, we won't get a _more_changes flag.
q = 'changes/?n=500%s&%s' % (sortkey, query)
self.log.debug('Query: %s ' % (q,))
batch = sync.get(q)
done = True
if batch:
changes += batch
if '_more_changes' in batch[-1]:
done = False
if '_sortkey' in batch[-1]:
sortkey = '&N=%s' % (batch[-1]['_sortkey'],)
else:
offset += len(batch)
sortkey = '&start=%s' % (offset,)
change_ids = [c['id'] for c in changes]
with app.db.getSession() as session:
# Winnow the list of IDs to only the ones in the local DB.
change_ids = session.getChangeIDs(change_ids)
for c in changes:
# For now, just sync open changes or changes already
# in the db optionally we could sync all changes ever
if c['id'] in change_ids or (c['status'] not in CLOSED_STATUSES):
sync.submitTask(SyncChangeTask(c['id'], priority=self.priority))
sync.submitTask(SetSyncQueryUpdatedTask(self.query_name, now, priority=self.priority))
class SetSyncQueryUpdatedTask(Task):
def __init__(self, query_name, updated, priority=NORMAL_PRIORITY):
super(SetSyncQueryUpdatedTask, self).__init__(priority)
self.query_name = query_name
self.updated = updated
def __repr__(self):
return '<SetSyncQueryUpdatedTask %s %s>' % (self.query_name, self.updated)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.query_name == self.query_name and
other.updated == self.updated):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
sync_query = session.getSyncQueryByName(self.query_name)
sync_query.updated = self.updated
class SyncChangesByCommitsTask(Task):
def __init__(self, commits, priority=NORMAL_PRIORITY):
super(SyncChangesByCommitsTask, self).__init__(priority)
self.commits = commits
def __repr__(self):
return '<SyncChangesByCommitsTask %s>' % (self.commits,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.commits == self.commits):
return True
return False
def run(self, sync):
query = ' OR '.join(['commit:%s' % x for x in self.commits])
changes = sync.get('changes/?q=%s' % query)
self.log.debug('Query: %s ' % (query,))
for c in changes:
sync.submitTask(SyncChangeTask(c['id'], priority=self.priority))
self.log.debug("Sync change %s for its commit" % (c['id'],))
def addCommit(self, commit):
if commit in self.commits:
return True
# 100 should be under the URL length limit
if len(self.commits) >= 100:
return False
self.commits.append(commit)
return True
class SyncChangeByNumberTask(Task):
def __init__(self, number, priority=NORMAL_PRIORITY):
super(SyncChangeByNumberTask, self).__init__(priority)
self.number = number
def __repr__(self):
return '<SyncChangeByNumberTask %s>' % (self.number,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.number == self.number):
return True
return False
def run(self, sync):
query = '%s' % self.number
changes = sync.get('changes/?q=%s' % query)
self.log.debug('Query: %s ' % (query,))
for c in changes:
task = SyncChangeTask(c['id'], priority=self.priority)
self.tasks.append(task)
sync.submitTask(task)
self.log.debug("Sync change %s because it is number %s" % (c['id'], self.number))
class SyncOutdatedChangesTask(Task):
def __init__(self, priority=NORMAL_PRIORITY):
super(SyncOutdatedChangesTask, self).__init__(priority)
def __eq__(self, other):
if other.__class__ == self.__class__:
return True
return False
def __repr__(self):
return '<SyncOutdatedChangesTask>'
def run(self, sync):
with sync.app.db.getSession() as session:
for change in session.getOutdated():
self.log.debug("Sync outdated change %s" % (change.id,))
sync.submitTask(SyncChangeTask(change.id, priority=self.priority))
class SyncChangeTask(Task):
def __init__(self, change_id, force_fetch=False, priority=NORMAL_PRIORITY):
super(SyncChangeTask, self).__init__(priority)
self.change_id = change_id
self.force_fetch = force_fetch
def __repr__(self):
return '<SyncChangeTask %s>' % (self.change_id,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.change_id == self.change_id and
other.force_fetch == self.force_fetch):
return True
return False
def run(self, sync):
start_time = time.time()
try:
self._syncChange(sync)
end_time = time.time()
total_time = end_time - start_time
self.log.info("Synced change %s in %0.5f seconds.", self.change_id, total_time)
except Exception:
try:
self.log.error("Marking change %s outdated" % (self.change_id,))
with sync.app.db.getSession() as session:
change = session.getChangeByID(self.change_id)
if change:
change.outdated = True
except Exception:
self.log.exception("Error while marking change %s as outdated" % (self.change_id,))
raise
def _syncChange(self, sync):
app = sync.app
remote_change = sync.get('changes/%s?o=DETAILED_LABELS&o=ALL_REVISIONS&o=ALL_COMMITS&o=MESSAGES&o=DETAILED_ACCOUNTS&o=CURRENT_ACTIONS&o=ALL_FILES' % self.change_id)
# Perform subqueries this task will need outside of the db session
for remote_commit, remote_revision in remote_change.get('revisions', {}).items():
remote_comments_data = sync.get('changes/%s/revisions/%s/comments' % (self.change_id, remote_commit))
remote_revision['_gertty_remote_comments_data'] = remote_comments_data
try:
remote_conflicts = sync.query(['q=status:open+is:mergeable+conflicts:%s' %
remote_change['_number']])
except Exception:
self.log.exception("Unable to sync conflicts for change %s" % self.change_id)
warnings.warn("Unable to sync conflicts for change %s" % self.change_id)
remote_conflicts = []
fetches = collections.defaultdict(list)
parent_commits = set()
with app.db.getSession() as session:
change = session.getChangeByID(self.change_id)
account = session.getAccountByID(remote_change['owner']['_account_id'],
name=remote_change['owner'].get('name'),
username=remote_change['owner'].get('username'),
email=remote_change['owner'].get('email'))
if not change:
project = session.getProjectByName(remote_change['project'])
if not project:
self.log.debug("Project %s unknown while syncing change" % (
remote_change['project'],))
remote_project = sync.get('projects/%s' %
(urlparse.quote_plus(remote_change['project']),))
if remote_project:
project = session.createProject(
remote_project['name'],
description=remote_project.get('description', ''))
self.log.info("Created project %s", project.name)
self.results.append(ProjectAddedEvent(project))
sync.submitTask(SyncProjectBranchesTask(project.name, self.priority))
created = dateutil.parser.parse(remote_change['created'])
updated = dateutil.parser.parse(remote_change['updated'])
change = project.createChange(remote_change['id'], account, remote_change['_number'],
remote_change['branch'], remote_change['change_id'],
remote_change['subject'], created,
updated, remote_change['status'],
topic=remote_change.get('topic'))
self.log.info("Created new change %s in local DB.", change.id)
result = ChangeAddedEvent(change)
else:
result = ChangeUpdatedEvent(change)
app.project_cache.clear(change.project)
self.results.append(result)
change.owner = account
if change.status != remote_change['status']:
change.status = remote_change['status']
result.status_changed = True
if remote_change.get('starred'):
change.starred = True
else:
change.starred = False
change.subject = remote_change['subject']
change.updated = dateutil.parser.parse(remote_change['updated'])
change.topic = remote_change.get('topic')
unseen_conflicts = [x.id for x in change.conflicts]
for remote_conflict in remote_conflicts:
conflict_id = remote_conflict['id']
conflict = session.getChangeByID(conflict_id)
if not conflict:
self.log.info("Need to sync conflicting change %s for change %s.",
conflict_id, change.number)
sync.submitTask(SyncChangeTask(conflict_id, priority=self.priority))
else:
if conflict not in change.conflicts:
self.log.info("Added conflict %s for change %s in local DB.",
conflict.number, change.number)
change.addConflict(conflict)
self.results.append(ChangeUpdatedEvent(conflict))
if conflict_id in unseen_conflicts:
unseen_conflicts.remove(conflict_id)
for conflict_id in unseen_conflicts:
conflict = session.getChangeByID(conflict_id)
self.log.info("Deleted conflict %s for change %s in local DB.",
conflict.number, change.number)
change.delConflict(conflict)
self.results.append(ChangeUpdatedEvent(conflict))
repo = gitrepo.get_repo(change.project.name, app.config)
new_revision = False
for remote_commit, remote_revision in remote_change.get('revisions', {}).items():
revision = session.getRevisionByCommit(remote_commit)
# TODO: handle multiple parents
url = sync.app.config.git_url + change.project.name
if 'anonymous http' in remote_revision['fetch']:
ref = remote_revision['fetch']['anonymous http']['ref']
url = remote_revision['fetch']['anonymous http']['url']
auth = False
elif 'http' in remote_revision['fetch']:
auth = True
ref = remote_revision['fetch']['http']['ref']
url = list(urlparse.urlsplit(sync.app.config.url + change.project.name))
url[1] = '%s:%s@%s' % (
urlparse.quote_plus(sync.app.config.username),
urlparse.quote_plus(sync.app.config.password), url[1])
url = urlparse.urlunsplit(url)
elif 'ssh' in remote_revision['fetch']:
ref = remote_revision['fetch']['ssh']['ref']
url = remote_revision['fetch']['ssh']['url']
auth = False
elif 'git' in remote_revision['fetch']:
ref = remote_revision['fetch']['git']['ref']
url = remote_revision['fetch']['git']['url']
auth = False
else:
if len(remote_revision['fetch']):
errMessage = "No supported fetch method found. Server offers: %s" % (
', '.join(remote_revision['fetch'].keys()))
else:
errMessage = "The server is missing the download-commands plugin."
raise Exception(errMessage)
if (not revision) or self.force_fetch:
fetches[url].append('+%(ref)s:%(ref)s' % dict(ref=ref))
if not revision:
revision = change.createRevision(remote_revision['_number'],
remote_revision['commit']['message'], remote_commit,
remote_revision['commit']['parents'][0]['commit'],
auth, ref)
self.log.info("Created new revision %s for change %s revision %s in local DB.",
revision.key, self.change_id, remote_revision['_number'])
new_revision = True
revision.message = remote_revision['commit']['message']
actions = remote_revision.get('actions', {})
revision.can_submit = 'submit' in actions
# TODO: handle multiple parents
if revision.parent not in parent_commits:
parent_revision = session.getRevisionByCommit(revision.parent)
if not parent_revision and change.status not in CLOSED_STATUSES:
sync._syncChangeByCommit(revision.parent, self.priority)
self.log.debug("Change %s revision %s needs parent commit %s synced" %
(change.id, remote_revision['_number'], revision.parent))
parent_commits.add(revision.parent)
result.updateRelatedChanges(session, change)
f = revision.getFile('/COMMIT_MSG')
if f is None:
f = revision.createFile('/COMMIT_MSG', None,
None, None, None)
for remote_path, remote_file in remote_revision['files'].items():
f = revision.getFile(remote_path)
if f is None:
if remote_file.get('binary'):
inserted = deleted = None
else:
inserted = remote_file.get('lines_inserted', 0)
deleted = remote_file.get('lines_deleted', 0)
f = revision.createFile(remote_path, remote_file.get('status', 'M'),
remote_file.get('old_path'),
inserted, deleted)
remote_comments_data = remote_revision['_gertty_remote_comments_data']
for remote_file, remote_comments in remote_comments_data.items():
for remote_comment in remote_comments:
account = session.getAccountByID(remote_comment['author']['_account_id'],
name=remote_comment['author'].get('name'),
username=remote_comment['author'].get('username'),
email=remote_comment['author'].get('email'))
comment = session.getCommentByID(remote_comment['id'])
if not comment:
# Normalize updated -> created
created = dateutil.parser.parse(remote_comment['updated'])
parent = False
if remote_comment.get('side', '') == 'PARENT':
parent = True
fileobj = revision.getFile(remote_file)
if fileobj is None:
fileobj = revision.createFile(remote_file, 'M')
comment = fileobj.createComment(remote_comment['id'], account,
remote_comment.get('in_reply_to'),
created,
parent, remote_comment.get('line'),
remote_comment['message'])
self.log.info("Created new comment %s for revision %s in local DB.",
comment.key, revision.key)
else:
if comment.author != account:
comment.author = account
new_message = False
for remote_message in remote_change.get('messages', []):
if 'author' in remote_message:
account = session.getAccountByID(remote_message['author']['_account_id'],
name=remote_message['author'].get('name'),
username=remote_message['author'].get('username'),
email=remote_message['author'].get('email'))
if account.username != app.config.username:
new_message = True
else:
account = session.getSystemAccount()
message = session.getMessageByID(remote_message['id'])
if not message:
revision = session.getRevisionByNumber(change, remote_message.get('_revision_number', 1))
if revision:
# Normalize date -> created
created = dateutil.parser.parse(remote_message['date'])
message = revision.createMessage(remote_message['id'], account, created,
remote_message['message'])
self.log.info("Created new review message %s for revision %s in local DB.", message.key, revision.key)
else:
self.log.info("Unable to create new review message for revision %s because it is not in local DB (draft?).", remote_message.get('_revision_number'))
else:
if message.author != account:
message.author = account
remote_approval_entries = {}
remote_label_entries = {}
user_voted = False
for remote_label_name, remote_label_dict in remote_change.get('labels', {}).items():
for remote_approval in remote_label_dict.get('all', []):
if remote_approval.get('value') is None:
continue
remote_approval['category'] = remote_label_name
key = '%s~%s' % (remote_approval['category'], remote_approval['_account_id'])
remote_approval_entries[key] = remote_approval
if remote_approval['_account_id'] == sync.account_id and int(remote_approval['value']) != 0:
user_voted = True
for key, value in remote_label_dict.get('values', {}).items():
# +1: "LGTM"
label = dict(value=key,
description=value,
category=remote_label_name)
key = '%s~%s~%s' % (label['category'], label['value'], label['description'])
remote_label_entries[key] = label
remote_approval_keys = set(remote_approval_entries.keys())
remote_label_keys = set(remote_label_entries.keys())
local_approvals = {}
local_labels = {}
user_votes = {}
for approval in change.approvals:
if approval.draft and not new_revision:
# If we have a new revision, we need to delete
# draft local approvals because they can no longer
# be uploaded. Otherwise, keep them because we
# may be about to upload a review. Ignoring an
# approval here means it will not be deleted.
# Also keep track of these approvals so we can
# determine whether we should hold the change
# later.
user_votes[approval.category] = approval.value
# Count draft votes as having voted for the
# purposes of deciding whether to clear the
# reviewed flag later.
user_voted = True
continue
key = '%s~%s' % (approval.category, approval.reviewer.id)
if key in local_approvals:
# Delete duplicate approvals.
session.delete(approval)
else:
local_approvals[key] = approval
local_approval_keys = set(local_approvals.keys())
for label in change.labels:
key = '%s~%s~%s' % (label.category, label.value, label.description)
local_labels[key] = label
local_label_keys = set(local_labels.keys())
for key in local_approval_keys-remote_approval_keys:
session.delete(local_approvals[key])
for key in local_label_keys-remote_label_keys:
session.delete(local_labels[key])
for key in remote_approval_keys-local_approval_keys:
remote_approval = remote_approval_entries[key]
account = session.getAccountByID(remote_approval['_account_id'],
name=remote_approval.get('name'),
username=remote_approval.get('username'),
email=remote_approval.get('email'))
change.createApproval(account,
remote_approval['category'],
remote_approval['value'])
self.log.info("Created approval for change %s in local DB.", change.id)
user_value = user_votes.get(remote_approval['category'], 0)
if user_value > 0 and remote_approval['value'] < 0:
# Someone left a negative vote after the local
# user created a draft positive vote. Hold the
# change so that it doesn't look like the local
# user is ignoring negative feedback.
if not change.held:
change.held = True
result.held_changed = True
self.log.info("Setting change %s to held due to negative review after positive", change.id)
for key in remote_label_keys-local_label_keys:
remote_label = remote_label_entries[key]
change.createLabel(remote_label['category'],
remote_label['value'],
remote_label['description'])
for key in remote_approval_keys.intersection(local_approval_keys):
local_approval = local_approvals[key]
remote_approval = remote_approval_entries[key]
local_approval.value = remote_approval['value']
# For the side effect of updating account info:
account = session.getAccountByID(remote_approval['_account_id'],
name=remote_approval.get('name'),
username=remote_approval.get('username'),
email=remote_approval.get('email'))
remote_permitted_entries = {}
for remote_label_name, remote_label_values in remote_change.get('permitted_labels', {}).items():
for remote_label_value in remote_label_values:
remote_label = dict(category=remote_label_name,
value=remote_label_value)
key = '%s~%s' % (remote_label['category'], remote_label['value'])
remote_permitted_entries[key] = remote_label
remote_permitted_keys = set(remote_permitted_entries.keys())
local_permitted = {}
for permitted in change.permitted_labels:
key = '%s~%s' % (permitted.category, permitted.value)
local_permitted[key] = permitted
local_permitted_keys = set(local_permitted.keys())
for key in local_permitted_keys-remote_permitted_keys:
session.delete(local_permitted[key])
for key in remote_permitted_keys-local_permitted_keys:
remote_permitted = remote_permitted_entries[key]
change.createPermittedLabel(remote_permitted['category'],
remote_permitted['value'])
if not user_voted:
# Only consider changing the reviewed state if we don't have a vote
if new_revision or new_message:
if change.reviewed:
change.reviewed = False
result.review_flag_changed = True
app.project_cache.clear(change.project)
change.outdated = False
for url, refs in fetches.items():
self.log.debug("Fetching from %s with refs %s", url, refs)
try:
repo.fetch(url, refs)
except Exception:
# Backwards compat with GitPython before the multi-ref fetch
# patch.
# (https://github.com/gitpython-developers/GitPython/pull/170)
for ref in refs:
self.log.debug("git fetch %s %s" % (url, ref))
repo.fetch(url, ref)
class CheckReposTask(Task):
# on startup, check all projects
# for any subscribed project withot a local repo or if
# --fetch-missing-refs is supplied, check all local changes for
# missing refs, and sync the associated changes
def __repr__(self):
return '<CheckReposTask>'
def __eq__(self, other):
if (other.__class__ == self.__class__):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
projects = session.getProjects(subscribed=True)
for project in projects:
try:
missing = False
try:
repo = gitrepo.get_repo(project.name, app.config)
except gitrepo.GitCloneError:
missing = True
if missing or app.fetch_missing_refs:
sync.submitTask(
CheckRevisionsTask(project.key,
force_fetch=app.fetch_missing_refs,
priority=LOW_PRIORITY)
)
except Exception:
self.log.exception("Exception checking repo %s" %
(project.name,))
class CheckRevisionsTask(Task):
def __init__(self, project_key, force_fetch=False,
priority=NORMAL_PRIORITY):
super(CheckRevisionsTask, self).__init__(priority)
self.project_key = project_key
self.force_fetch = force_fetch
def __repr__(self):
return '<CheckRevisionsTask %s>' % (self.project_key,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.project_key == self.project_key):
return True
return False
def run(self, sync):
app = sync.app
to_sync = set()
with app.db.getSession() as session:
project = session.getProject(self.project_key)
repo = None
try:
repo = gitrepo.get_repo(project.name, app.config)
except gitrepo.GitCloneError:
pass
for change in project.open_changes:
if repo:
for revision in change.revisions:
if repo.checkCommits([revision.parent, revision.commit]):
to_sync.add(change.id)
else:
to_sync.add(change.id)
for change_id in to_sync:
sync.submitTask(SyncChangeTask(change_id,
force_fetch=self.force_fetch,
priority=self.priority))
class UploadReviewsTask(Task):
def __repr__(self):
return '<UploadReviewsTask>'
def __eq__(self, other):
if (other.__class__ == self.__class__):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
for c in session.getPendingTopics():
sync.submitTask(SetTopicTask(c.key, self.priority))
for c in session.getPendingRebases():
sync.submitTask(RebaseChangeTask(c.key, self.priority))
for c in session.getPendingStatusChanges():
sync.submitTask(ChangeStatusTask(c.key, self.priority))
for c in session.getPendingStarred():
sync.submitTask(ChangeStarredTask(c.key, self.priority))
for c in session.getPendingCherryPicks():
sync.submitTask(SendCherryPickTask(c.key, self.priority))
for r in session.getPendingCommitMessages():
sync.submitTask(ChangeCommitMessageTask(r.key, self.priority))
for m in session.getPendingMessages():
sync.submitTask(UploadReviewTask(m.key, self.priority))
class SetTopicTask(Task):
def __init__(self, change_key, priority=NORMAL_PRIORITY):
super(SetTopicTask, self).__init__(priority)
self.change_key = change_key
def __repr__(self):
return '<SetTopicTask %s>' % (self.change_key,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.change_key == self.change_key):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
change = session.getChange(self.change_key)
data = dict(topic=change.topic)
change.pending_topic = False
# Inside db session for rollback
sync.put('changes/%s/topic' % (change.id,),
data)
sync.submitTask(SyncChangeTask(change.id, priority=self.priority))
class RebaseChangeTask(Task):
def __init__(self, change_key, priority=NORMAL_PRIORITY):
super(RebaseChangeTask, self).__init__(priority)
self.change_key = change_key
def __repr__(self):
return '<RebaseChangeTask %s>' % (self.change_key,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.change_key == self.change_key):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
change = session.getChange(self.change_key)
change.pending_rebase = False
# Inside db session for rollback
sync.post('changes/%s/rebase' % (change.id,), {})
sync.submitTask(SyncChangeTask(change.id, priority=self.priority))
class ChangeStarredTask(Task):
def __init__(self, change_key, priority=NORMAL_PRIORITY):
super(ChangeStarredTask, self).__init__(priority)
self.change_key = change_key
def __repr__(self):
return '<ChangeStarredTask %s>' % (self.change_key,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.change_key == self.change_key):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
change = session.getChange(self.change_key)
if change.starred:
sync.put('accounts/self/starred.changes/%s' % (change.id,),
data={})
else:
sync.delete('accounts/self/starred.changes/%s' % (change.id,),
data={})
change.pending_starred = False
sync.submitTask(SyncChangeTask(change.id, priority=self.priority))
class ChangeStatusTask(Task):
def __init__(self, change_key, priority=NORMAL_PRIORITY):
super(ChangeStatusTask, self).__init__(priority)
self.change_key = change_key
def __repr__(self):
return '<ChangeStatusTask %s>' % (self.change_key,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.change_key == self.change_key):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
change = session.getChange(self.change_key)
if change.pending_status_message:
data = dict(message=change.pending_status_message)
else:
data = {}
change.pending_status = False
change.pending_status_message = None
# Inside db session for rollback
if change.status == 'ABANDONED':
sync.post('changes/%s/abandon' % (change.id,),
data)
elif change.status == 'NEW':
sync.post('changes/%s/restore' % (change.id,),
data)
elif change.status == 'SUBMITTED':
sync.post('changes/%s/submit' % (change.id,), {})
sync.submitTask(SyncChangeTask(change.id, priority=self.priority))
class SendCherryPickTask(Task):
def __init__(self, cp_key, priority=NORMAL_PRIORITY):
super(SendCherryPickTask, self).__init__(priority)
self.cp_key = cp_key
def __repr__(self):
return '<SendCherryPickTask %s>' % (self.cp_key,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.cp_key == self.cp_key):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
cp = session.getPendingCherryPick(self.cp_key)
data = dict(message=cp.message,
destination=cp.branch)
session.delete(cp)
# Inside db session for rollback
ret = sync.post('changes/%s/revisions/%s/cherrypick' %
(cp.revision.change.id, cp.revision.commit),
data)
if ret and 'id' in ret:
sync.submitTask(SyncChangeTask(ret['id'], priority=self.priority))
class ChangeCommitMessageTask(Task):
def __init__(self, revision_key, priority=NORMAL_PRIORITY):
super(ChangeCommitMessageTask, self).__init__(priority)
self.revision_key = revision_key
def __repr__(self):
return '<ChangeCommitMessageTask %s>' % (self.revision_key,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.revision_key == self.revision_key):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
revision = session.getRevision(self.revision_key)
revision.pending_message = False
data = dict(message=revision.message)
# Inside db session for rollback
if sync.version < (2,11,0):
sync.post('changes/%s/revisions/%s/message' %
(revision.change.id, revision.commit),
data)
else:
edit = sync.get('changes/%s/edit' % revision.change.id)
if edit is not None:
raise Exception("Edit already in progress on change %s" %
(revision.change.number,))
sync.put('changes/%s/edit:message' % (revision.change.id,), data)
sync.post('changes/%s/edit:publish' % (revision.change.id,), {})
change_id = revision.change.id
sync.submitTask(SyncChangeTask(change_id, priority=self.priority))
class UploadReviewTask(Task):
def __init__(self, message_key, priority=NORMAL_PRIORITY):
super(UploadReviewTask, self).__init__(priority)
self.message_key = message_key
def __repr__(self):
return '<UploadReviewTask %s>' % (self.message_key,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.message_key == self.message_key):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
message = session.getMessage(self.message_key)
if message is None:
self.log.debug("Message %s has already been uploaded" % (
self.message_key))
return
change = message.revision.change
if not change.held:
self.log.debug("Syncing %s to find out if it should be held" % (change.id,))
t = SyncChangeTask(change.id)
t.run(sync)
self.results += t.results
submit = False
change_id = None
with app.db.getSession() as session:
message = session.getMessage(self.message_key)
revision = message.revision
change = message.revision.change
if change.held:
self.log.debug("Not uploading review to %s because it is held" %
(change.id,))
return
change_id = change.id
current_revision = change.revisions[-1]
if change.pending_status and change.status == 'SUBMITTED':
submit = True
data = dict(message=message.message,
strict_labels=False)
if revision == current_revision:
data['labels'] = {}
for approval in change.draft_approvals:
data['labels'][approval.category] = approval.value
session.delete(approval)
comments = {}
for file in revision.files:
if file.draft_comments:
comment_list = []
for comment in file.draft_comments:
d = dict(line=comment.line,
message=comment.message)
if comment.parent:
d['side'] = 'PARENT'
comment_list.append(d)
session.delete(comment)
comments[file.path] = comment_list
if comments:
data['comments'] = comments
session.delete(message)
# Inside db session for rollback
sync.post('changes/%s/revisions/%s/review' % (change.id, revision.commit),
data)
if submit:
# In another db session in case submit fails after posting
# the message succeeds
with app.db.getSession() as session:
change = session.getChangeByID(change_id)
change.pending_status = False
change.pending_status_message = None
sync.post('changes/%s/submit' % (change_id,), {})
sync.submitTask(SyncChangeTask(change_id, priority=self.priority))
class PruneDatabaseTask(Task):
def __init__(self, age, priority=NORMAL_PRIORITY):
super(PruneDatabaseTask, self).__init__(priority)
self.age = age
def __repr__(self):
return '<PruneDatabaseTask %s>' % (self.age,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.age == self.age):
return True
return False
def run(self, sync):
if not self.age:
return
app = sync.app
with app.db.getSession() as session:
for change in session.getChanges('status:closed age:%s' % self.age):
t = PruneChangeTask(change.key, priority=self.priority)
self.tasks.append(t)
sync.submitTask(t)
t = VacuumDatabaseTask(priority=self.priority)
self.tasks.append(t)
sync.submitTask(t)
class PruneChangeTask(Task):
def __init__(self, key, priority=NORMAL_PRIORITY):
super(PruneChangeTask, self).__init__(priority)
self.key = key
def __repr__(self):
return '<PruneChangeTask %s>' % (self.key,)
def __eq__(self, other):
if (other.__class__ == self.__class__ and
other.key == self.key):
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
change = session.getChange(self.key)
if not change:
return
repo = gitrepo.get_repo(change.project.name, app.config)
self.log.info("Pruning %s change %s status:%s updated:%s" % (
change.project.name, change.number, change.status, change.updated))
change_ref = None
for revision in change.revisions:
if change_ref is None:
change_ref = '/'.join(revision.fetch_ref.split('/')[:-1])
self.log.info("Deleting %s ref %s" % (
change.project.name, revision.fetch_ref))
repo.deleteRef(revision.fetch_ref)
self.log.info("Deleting %s ref %s" % (
change.project.name, change_ref))
try:
repo.deleteRef(change_ref)
except OSError as e:
if e.errno not in [errno.EISDIR, errno.EPERM]:
raise
session.delete(change)
class VacuumDatabaseTask(Task):
def __init__(self, priority=NORMAL_PRIORITY):
super(VacuumDatabaseTask, self).__init__(priority)
def __repr__(self):
return '<VacuumDatabaseTask>'
def __eq__(self, other):
if other.__class__ == self.__class__:
return True
return False
def run(self, sync):
app = sync.app
with app.db.getSession() as session:
session.vacuum()
class Sync(object):
def __init__(self, app, disable_background_sync):
self.user_agent = 'Gertty/%s %s' % (gertty.version.version_info.release_string(),
requests.utils.default_user_agent())
self.version = (0, 0, 0)
self.offline = False
self.account_id = None
self.app = app
self.log = logging.getLogger('gertty.sync')
self.queue = MultiQueue([HIGH_PRIORITY, NORMAL_PRIORITY, LOW_PRIORITY])
self.result_queue = queue.Queue()
self.session = requests.Session()
if self.app.config.auth_type == 'basic':
authclass = requests.auth.HTTPBasicAuth
elif self.app.config.auth_type == 'form':
authclass = FormAuth
else:
authclass = requests.auth.HTTPDigestAuth
self.auth = authclass(
self.app.config.username, self.app.config.password)
self.submitTask(GetVersionTask(HIGH_PRIORITY))
self.submitTask(SyncOwnAccountTask(HIGH_PRIORITY))
if not disable_background_sync:
self.submitTask(CheckReposTask(HIGH_PRIORITY))
self.submitTask(UploadReviewsTask(HIGH_PRIORITY))
self.submitTask(SyncProjectListTask(HIGH_PRIORITY))
self.submitTask(SyncSubscribedProjectsTask(NORMAL_PRIORITY))
self.submitTask(SyncSubscribedProjectBranchesTask(LOW_PRIORITY))
self.submitTask(SyncOutdatedChangesTask(LOW_PRIORITY))
self.submitTask(PruneDatabaseTask(self.app.config.expire_age, LOW_PRIORITY))
self.periodic_thread = threading.Thread(target=self.periodicSync)
self.periodic_thread.daemon = True
self.periodic_thread.start()
def periodicSync(self):
hourly = time.time()
while True:
try:
time.sleep(60)
self.syncSubscribedProjects()
now = time.time()
if now-hourly > 3600:
hourly = now
self.pruneDatabase()
self.syncOutdatedChanges()
except Exception:
self.log.exception('Exception in periodicSync')
def submitTask(self, task):
if not self.offline:
if not self.queue.put(task, task.priority):
task.complete(False)
else:
task.complete(False)
def run(self, pipe):
task = None
while True:
task = self._run(pipe, task)
def _run(self, pipe, task=None):
if not task:
task = self.queue.get()
self.log.debug('Run: %s' % (task,))
try:
task.run(self)
task.complete(True)
self.queue.complete(task)
except (requests.ConnectionError, OfflineError,
requests.exceptions.ChunkedEncodingError,
requests.exceptions.ReadTimeout
) as e:
self.log.warning("Offline due to: %s" % (e,))
if not self.offline:
self.submitTask(GetVersionTask(HIGH_PRIORITY))
self.submitTask(UploadReviewsTask(HIGH_PRIORITY))
self.offline = True
self.app.status.update(offline=True, refresh=False)
os.write(pipe, six.b('refresh\n'))
time.sleep(30)
return task
except Exception:
task.complete(False)
self.queue.complete(task)
self.log.exception('Exception running task %s' % (task,))
self.app.status.update(error=True, refresh=False)
self.offline = False
self.app.status.update(offline=False, refresh=False)
for r in task.results:
self.result_queue.put(r)
os.write(pipe, six.b('refresh\n'))
return None
def url(self, path):
return self.app.config.url + 'a/' + path
def checkResponse(self, response):
self.log.debug('HTTP status code: %d', response.status_code)
if response.status_code == 503:
raise OfflineError("Received 503 status code")
def get(self, path):
url = self.url(path)
self.log.debug('GET: %s' % (url,))
r = self.session.get(url,
verify=self.app.config.verify_ssl,
auth=self.auth, timeout=TIMEOUT,
headers = {'Accept': 'application/json',
'Accept-Encoding': 'gzip',
'User-Agent': self.user_agent})
self.checkResponse(r)
if r.status_code == 200:
ret = json.loads(r.text[4:])
if len(ret):
self.log.debug('200 OK, Received: %s' % (ret,))
else:
self.log.debug('200 OK, No body.')
return ret
def post(self, path, data):
url = self.url(path)
self.log.debug('POST: %s' % (url,))
self.log.debug('data: %s' % (data,))
r = self.session.post(url, data=json.dumps(data).encode('utf8'),
verify=self.app.config.verify_ssl,
auth=self.auth, timeout=TIMEOUT,
headers = {'Content-Type': 'application/json;charset=UTF-8',
'User-Agent': self.user_agent})
self.checkResponse(r)
self.log.debug('Received: %s' % (r.text,))
ret = None
if r.status_code > 400:
raise Exception("POST to %s failed with http code %s (%s)",
path, r.status_code, r.text)
if r.text and len(r.text)>4:
try:
ret = json.loads(r.text[4:])
except Exception:
self.log.exception("Unable to parse result %s from post to %s" %
(r.text, url))
raise
return ret
def put(self, path, data):
url = self.url(path)
self.log.debug('PUT: %s' % (url,))
self.log.debug('data: %s' % (data,))
r = self.session.put(url, data=json.dumps(data).encode('utf8'),
verify=self.app.config.verify_ssl,
auth=self.auth, timeout=TIMEOUT,
headers = {'Content-Type': 'application/json;charset=UTF-8',
'User-Agent': self.user_agent})
self.checkResponse(r)
self.log.debug('Received: %s' % (r.text,))
def delete(self, path, data):
url = self.url(path)
self.log.debug('DELETE: %s' % (url,))
self.log.debug('data: %s' % (data,))
r = self.session.delete(url, data=json.dumps(data).encode('utf8'),
verify=self.app.config.verify_ssl,
auth=self.auth, timeout=TIMEOUT,
headers = {'Content-Type': 'application/json;charset=UTF-8',
'User-Agent': self.user_agent})
self.checkResponse(r)
self.log.debug('Received: %s' % (r.text,))
def syncSubscribedProjects(self):
task = SyncSubscribedProjectsTask(LOW_PRIORITY)
self.submitTask(task)
if task.wait():
for subtask in task.tasks:
subtask.wait()
def pruneDatabase(self):
task = PruneDatabaseTask(self.app.config.expire_age, LOW_PRIORITY)
self.submitTask(task)
if task.wait():
for subtask in task.tasks:
subtask.wait()
def syncOutdatedChanges(self):
task = SyncOutdatedChangesTask(LOW_PRIORITY)
self.submitTask(task)
if task.wait():
for subtask in task.tasks:
subtask.wait()
def _syncChangeByCommit(self, commit, priority):
# Accumulate sync change by commit tasks because they often
# come in batches. This method assumes it is being called
# from within the run queue already and therefore does not
# need to worry about locking the queue.
task = None
for task in self.queue.find(SyncChangesByCommitsTask, priority):
if task.addCommit(commit):
return
task = SyncChangesByCommitsTask([commit], priority)
self.submitTask(task)
def setRemoteVersion(self, version):
base = version.split('-')[0]
parts = base.split('.')
major = minor = micro = 0
if len(parts) > 0:
major = int(parts[0])
if len(parts) > 1:
minor = int(parts[1])
if len(parts) > 2:
micro = int(parts[2])
self.version = (major, minor, micro)
self.log.info("Remote version is: %s (parsed as %s)" % (version, self.version))
def query(self, queries):
changes = []
sortkey = ''
done = False
offset = 0
while not done:
query = '&'.join(queries)
# We don't actually want to limit to 500, but that's the server-side default, and
# if we don't specify this, we won't get a _more_changes flag.
q = 'changes/?n=500%s&%s' % (sortkey, query)
self.log.debug('Query: %s' % (q,))
responses = self.get(q)
if len(queries) == 1:
responses = [responses]
done = True
for batch in responses:
changes += batch
if batch and '_more_changes' in batch[-1]:
done = False
if '_sortkey' in batch[-1]:
sortkey = '&N=%s' % (batch[-1]['_sortkey'],)
else:
offset += len(batch)
sortkey = '&start=%s' % (offset,)
return changes