boartty/gertty/sync.py

462 lines
20 KiB
Python

# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import logging
import math
import os
import threading
import urlparse
import json
import time
import Queue
import datetime
import dateutil.parser
try:
import ordereddict
except:
pass
import requests
# Task priorities.  Sync builds its MultiQueue with these keys in this order,
# and MultiQueue.get() serves earlier-listed keys first, so a lower number
# means the task is run sooner.
HIGH_PRIORITY = 0
NORMAL_PRIORITY = 1
LOW_PRIORITY = 2
class MultiQueue(object):
    """A blocking, thread-safe queue with strict priority ordering.

    The ``priorities`` sequence given to the constructor fixes both the set
    of valid priority keys and their order: get() always serves the
    earliest-listed non-empty sub-queue, FIFO within a single priority.
    """

    def __init__(self, priorities):
        try:
            self.queues = collections.OrderedDict()
        except AttributeError:
            # Python < 2.7 has no collections.OrderedDict; fall back to the
            # optional ``ordereddict`` backport imported at module level.
            self.queues = ordereddict.OrderedDict()
        for priority in priorities:
            self.queues[priority] = collections.deque()
        self.condition = threading.Condition()

    def qsize(self):
        """Return the total number of queued items across all priorities."""
        return sum(len(pending) for pending in self.queues.values())

    def put(self, item, priority):
        """Append *item* to the *priority* sub-queue and wake one waiter.

        Raises KeyError if *priority* was not given to the constructor.
        """
        with self.condition:
            self.queues[priority].append(item)
            self.condition.notify()

    def get(self):
        """Remove and return the highest-priority item, blocking while empty."""
        with self.condition:
            while True:
                for pending in self.queues.values():
                    if pending:
                        return pending.popleft()
                self.condition.wait()
class Task(object):
    """Base class for one unit of sync work.

    Subclasses implement ``run(sync)``.  Sync._run reports the outcome via
    complete(), which releases any thread blocked in wait().
    """

    def __init__(self, priority=NORMAL_PRIORITY):
        self.priority = priority
        # None until complete() is called, then the success flag passed to it.
        self.succeeded = None
        self.event = threading.Event()
        self.log = logging.getLogger('gertty.sync')

    def complete(self, success):
        """Record the outcome and wake every waiter."""
        self.succeeded = success
        self.event.set()

    def wait(self):
        """Block until complete() has been called."""
        self.event.wait()
class SyncProjectListTask(Task):
    """Reconcile the local project list with the projects Gerrit reports.

    Local projects missing from the server are deleted.  Server projects
    missing locally are created with their description (empty if none).
    Projects present on both sides are left untouched.
    """

    def __repr__(self):
        return '<SyncProjectListTask>'

    def run(self, sync):
        app = sync.app
        # Do the network round trip before opening the DB session so the
        # session is not held open while waiting on the server.  The ``d``
        # option asks Gerrit to include project descriptions.
        remote = sync.get('projects/?d')
        remote_keys = set(remote.keys())
        with app.db.getSession() as session:
            local = {}
            for p in session.getProjects():
                local[p.name] = p
            local_keys = set(local.keys())
            for name in local_keys - remote_keys:
                session.delete(local[name])
            for name in remote_keys - local_keys:
                p = remote[name]
                session.createProject(name, description=p.get('description', ''))
class SyncSubscribedProjectsTask(Task):
    """Queue one SyncProjectTask per subscribed project.

    The child tasks inherit this task's priority.
    """

    def __repr__(self):
        return '<SyncSubscribedProjectsTask>'

    def run(self, sync):
        with sync.app.db.getSession() as session:
            subscribed = session.getProjects(subscribed=True)
            for project in subscribed:
                child = SyncProjectTask(project.key, self.priority)
                sync.submitTask(child)
class SyncProjectTask(Task):
    """Query one project's recently updated changes and queue their sync.

    A SyncChangeTask (at this task's priority) is queued for every change
    that is either already stored locally or not in a closed state.
    """

    # Changes in these states are only synced if they are already local.
    _closed_statuses = ['MERGED', 'ABANDONED']

    def __init__(self, project_key, priority=NORMAL_PRIORITY):
        super(SyncProjectTask, self).__init__(priority)
        self.project_key = project_key

    def __repr__(self):
        return '<SyncProjectTask %s>' % (self.project_key,)

    def run(self, sync):
        with sync.app.db.getSession() as session:
            project = session.getProject(self.project_key)
            query = 'project:%s' % project.name
            if project.updated:
                # Limit the query to changes updated since project.updated,
                # rounded up to whole seconds.  NOTE(review): assumes
                # project.updated is a naive UTC datetime, since it is
                # subtracted from utcnow().
                elapsed = datetime.datetime.utcnow() - project.updated
                age_seconds = int(math.ceil(elapsed.total_seconds()))
                query += ' -age:%ss' % (age_seconds,)
            changes = sync.get('changes/?q=%s' % query)
            self.log.debug('Query: %s ' % (query,))
            # Gerrit lists changes newest first.  Walk them oldest first:
            # otherwise an interrupted run would already have synced the
            # newest change, and the next sync would not catch up the older
            # ones.  For now only open changes, or changes already in the
            # DB, are synced; optionally every change ever could be.
            for c in reversed(changes):
                known = session.getChangeByID(c['id'])
                if known or c['status'] not in self._closed_statuses:
                    sync.submitTask(SyncChangeTask(c['id'], self.priority))
                    self.log.debug("Change %s update %s" % (c['id'], c['updated']))
class SyncChangeTask(Task):
    """Fetch one change from Gerrit and bring the local copy up to date.

    Syncs the change's metadata, revisions (recording a git fetch for each
    new one), inline comments, messages, approvals, labels and permitted
    labels inside one DB session.  The recorded git fetches run after the
    session has been closed.
    """

    def __init__(self, change_id, priority=NORMAL_PRIORITY):
        super(SyncChangeTask, self).__init__(priority)
        self.change_id = change_id

    def __repr__(self):
        return '<SyncChangeTask %s>' % (self.change_id,)

    def run(self, sync):
        app = sync.app
        remote_change = sync.get('changes/%s?o=DETAILED_LABELS&o=ALL_REVISIONS&o=ALL_COMMITS&o=MESSAGES&o=DETAILED_ACCOUNTS' % self.change_id)
        with app.db.getSession() as session:
            change = session.getChangeByID(self.change_id)
            if not change:
                project = session.getProjectByName(remote_change['project'])
                created = dateutil.parser.parse(remote_change['created'])
                updated = dateutil.parser.parse(remote_change['updated'])
                change = project.createChange(remote_change['id'], remote_change['_number'],
                                              remote_change['branch'], remote_change['change_id'],
                                              remote_change['owner']['name'],
                                              remote_change['subject'], created,
                                              updated, remote_change['status'],
                                              topic=remote_change.get('topic'))
            change.status = remote_change['status']
            change.subject = remote_change['subject']
            change.updated = dateutil.parser.parse(remote_change['updated'])
            change.topic = remote_change.get('topic')
            repo = app.getRepo(change.project.name)
            new_revision, fetches = self._syncRevisions(sync, session, change, remote_change)
            new_message = self._syncMessages(app, session, change, remote_change)
            user_voted = self._syncApprovalsAndLabels(app, session, change, remote_change)
            self._syncPermittedLabels(session, change, remote_change)
            # Only consider changing the reviewed state if we don't have a
            # vote; otherwise new activity re-marks the change as unreviewed.
            if not user_voted and (new_revision or new_message):
                change.reviewed = False
        for (public_url, auth_url, ref) in fetches:
            # Log the credential-free URL: auth_url embeds the password.
            self.log.debug("git fetch %s %s" % (public_url, ref))
            repo.fetch(auth_url, ref)

    @staticmethod
    def _authenticatedURL(url, username, password):
        """Return *url* with ``username:password@`` inserted into its netloc.

        Both credentials are percent-encoded so characters such as '@', ':'
        or '/' (e.g. an e-mail style username) cannot corrupt the URL.
        """
        try:
            from urllib import quote  # Python 2
        except ImportError:
            from urllib.parse import quote  # Python 3

        def encode(value):
            if not isinstance(value, (str, bytes)):
                try:
                    # Python 2 unicode: quote() needs UTF-8 bytes.
                    value = value.encode('utf-8')
                except AttributeError:
                    # Any other value: format it as '%s' interpolation did.
                    value = str(value)
            return quote(value, safe='')

        parts = list(urlparse.urlsplit(url))
        parts[1] = '%s:%s@%s' % (encode(username), encode(password), parts[1])
        return urlparse.urlunsplit(parts)

    def _syncRevisions(self, sync, session, change, remote_change):
        """Create missing revisions and sync each revision's inline comments.

        Returns ``(new_revision, fetches)``.  ``new_revision`` is True if any
        revision was created.  ``fetches`` is a list of
        ``(public_url, authenticated_url, ref)`` tuples for the new revisions.
        """
        config = sync.app.config
        fetches = []
        new_revision = False
        for remote_commit, remote_revision in remote_change.get('revisions', {}).items():
            revision = session.getRevisionByCommit(remote_commit)
            if not revision:
                # TODO: handle multiple parents
                public_url = config.url + change.project.name
                fetch_info = remote_revision['fetch']
                if 'anonymous http' in fetch_info:
                    ref = fetch_info['anonymous http']['ref']
                else:
                    ref = fetch_info['http']['ref']
                auth_url = self._authenticatedURL(public_url, config.username,
                                                  config.password)
                fetches.append((public_url, auth_url, ref))
                revision = change.createRevision(remote_revision['_number'],
                                                 remote_revision['commit']['message'], remote_commit,
                                                 remote_revision['commit']['parents'][0]['commit'])
                new_revision = True
            self._syncComments(sync, session, revision)
        return new_revision, fetches

    def _syncComments(self, sync, session, revision):
        """Store any of *revision*'s inline comments not yet in the DB."""
        remote_comments = sync.get('changes/%s/revisions/%s/comments' % (self.change_id, revision.commit))
        for remote_file, file_comments in remote_comments.items():
            for remote_comment in file_comments:
                if session.getCommentByID(remote_comment['id']):
                    continue
                # Normalize updated -> created
                created = dateutil.parser.parse(remote_comment['updated'])
                parent = remote_comment.get('side', '') == 'PARENT'
                revision.createComment(remote_comment['id'],
                                       remote_comment.get('in_reply_to'),
                                       created, remote_comment['author']['name'],
                                       remote_file, parent, remote_comment.get('line'),
                                       remote_comment['message'])

    def _syncMessages(self, app, session, change, remote_change):
        """Store any change messages not yet in the DB.

        Returns True if one of the stored messages was written by someone
        other than the configured user.  Messages without an author are
        stored as 'Gerrit Code Review' and do not count.
        """
        new_message = False
        for remote_message in remote_change.get('messages', []):
            if session.getMessageByID(remote_message['id']):
                continue
            revision = session.getRevisionByNumber(change, remote_message['_revision_number'])
            # Normalize date -> created
            created = dateutil.parser.parse(remote_message['date'])
            if 'author' in remote_message:
                author = remote_message['author']
                author_name = author['name']
                # 'username' can be absent for some accounts; use .get() as
                # the approval code does.
                if author.get('username') != app.config.username:
                    new_message = True
            else:
                author_name = 'Gerrit Code Review'
            revision.createMessage(remote_message['id'], created,
                                   author_name,
                                   remote_message['message'])
        return new_message

    def _syncApprovalsAndLabels(self, app, session, change, remote_change):
        """Make the change's local approvals and labels match the server's.

        Approvals are keyed by ``category~name`` and labels by
        ``category~value~description``.  Returns True if the configured user
        holds a non-zero vote on any label.
        """
        remote_approval_entries = {}
        remote_label_entries = {}
        user_voted = False
        for remote_label_name, remote_label_dict in remote_change.get('labels', {}).items():
            for remote_approval in remote_label_dict.get('all', []):
                if remote_approval.get('value') is None:
                    continue
                remote_approval['category'] = remote_label_name
                key = '%s~%s' % (remote_approval['category'], remote_approval['name'])
                remote_approval_entries[key] = remote_approval
                if (remote_approval.get('username', None) == app.config.username and
                        int(remote_approval['value']) != 0):
                    user_voted = True
            for label_value, label_description in remote_label_dict.get('values', {}).items():
                # e.g. label_value '+1', label_description 'LGTM'
                label = dict(value=label_value,
                             description=label_description,
                             category=remote_label_name)
                key = '%s~%s~%s' % (label['category'], label['value'], label['description'])
                remote_label_entries[key] = label
        remote_approval_keys = set(remote_approval_entries.keys())
        remote_label_keys = set(remote_label_entries.keys())
        local_approvals = {}
        for approval in change.approvals:
            local_approvals['%s~%s' % (approval.category, approval.name)] = approval
        local_labels = {}
        for label in change.labels:
            local_labels['%s~%s~%s' % (label.category, label.value, label.description)] = label
        local_approval_keys = set(local_approvals.keys())
        local_label_keys = set(local_labels.keys())
        for key in local_approval_keys - remote_approval_keys:
            session.delete(local_approvals[key])
        for key in local_label_keys - remote_label_keys:
            session.delete(local_labels[key])
        for key in remote_approval_keys - local_approval_keys:
            remote_approval = remote_approval_entries[key]
            change.createApproval(remote_approval['name'],
                                  remote_approval['category'],
                                  remote_approval['value'])
        for key in remote_label_keys - local_label_keys:
            remote_label = remote_label_entries[key]
            change.createLabel(remote_label['category'],
                               remote_label['value'],
                               remote_label['description'])
        for key in remote_approval_keys & local_approval_keys:
            local_approvals[key].value = remote_approval_entries[key]['value']
        return user_voted

    def _syncPermittedLabels(self, session, change, remote_change):
        """Make the change's local permitted labels match the server's.

        Entries are keyed by ``category~value``.
        """
        remote_permitted_entries = {}
        for remote_label_name, remote_label_values in remote_change.get('permitted_labels', {}).items():
            for remote_label_value in remote_label_values:
                key = '%s~%s' % (remote_label_name, remote_label_value)
                remote_permitted_entries[key] = dict(category=remote_label_name,
                                                     value=remote_label_value)
        local_permitted = {}
        for permitted in change.permitted_labels:
            local_permitted['%s~%s' % (permitted.category, permitted.value)] = permitted
        remote_permitted_keys = set(remote_permitted_entries.keys())
        local_permitted_keys = set(local_permitted.keys())
        for key in local_permitted_keys - remote_permitted_keys:
            session.delete(local_permitted[key])
        for key in remote_permitted_keys - local_permitted_keys:
            remote_permitted = remote_permitted_entries[key]
            change.createPermittedLabel(remote_permitted['category'],
                                        remote_permitted['value'])
class UploadReviewsTask(Task):
    """Queue one UploadReviewTask per locally pending review message.

    The child tasks inherit this task's priority.
    """

    def __repr__(self):
        return '<UploadReviewsTask>'

    def run(self, sync):
        with sync.app.db.getSession() as session:
            pending = session.getPendingMessages()
            for pending_message in pending:
                child = UploadReviewTask(pending_message.key, self.priority)
                sync.submitTask(child)
class UploadReviewTask(Task):
    """Upload one locally drafted review, then queue a resync of its change.

    The review consists of the message, any pending votes (only when the
    message is on the change's last revision) and the revision's pending
    inline comments.  Those pending rows are deleted in the same DB session
    that performs the POST.
    """

    def __init__(self, message_key, priority=NORMAL_PRIORITY):
        super(UploadReviewTask, self).__init__(priority)
        self.message_key = message_key

    def __repr__(self):
        return '<UploadReviewTask %s>' % (self.message_key,)

    def run(self, sync):
        app = sync.app
        with app.db.getSession() as session:
            message = session.getMessage(self.message_key)
            revision = message.revision
            change = revision.change
            # Read the id while the session is open; it is needed after the
            # session closes to queue the follow-up sync.
            change_id = change.id
            # NOTE(review): assumes change.revisions is ordered oldest to
            # newest, so [-1] is the current patchset.
            current_revision = change.revisions[-1]
            data = dict(message=message.message,
                        strict_labels=False)
            if revision == current_revision:
                data['labels'] = {}
                for approval in change.pending_approvals:
                    data['labels'][approval.category] = approval.value
                    session.delete(approval)
            if revision.pending_comments:
                data['comments'] = {}
                for comment in revision.pending_comments:
                    # setdefault keeps all of a file's comments together even
                    # if pending_comments is not sorted by file.  The previous
                    # "new list on file change" logic replaced (and so lost)
                    # comments already deleted locally.
                    comment_list = data['comments'].setdefault(comment.file, [])
                    d = dict(line=comment.line,
                             message=comment.message)
                    if comment.parent:
                        d['side'] = 'PARENT'
                    comment_list.append(d)
                    session.delete(comment)
            session.delete(message)
            # NOTE(review): the deletions above precede the POST.  Whether
            # they are rolled back if the POST raises depends on
            # getSession(), which is not visible here.
            sync.post('changes/%s/revisions/%s/review' % (change_id, revision.commit),
                      data)
        sync.submitTask(SyncChangeTask(change_id, self.priority))
class Sync(object):
    """Synchronizes the local database with a Gerrit server.

    Work is expressed as Task objects placed on a priority MultiQueue and
    executed one at a time by run().  A daemon thread also resyncs all
    subscribed projects every 60 seconds.
    """

    def __init__(self, app):
        self.offline = False
        self.app = app
        self.log = logging.getLogger('gertty.sync')
        self.queue = MultiQueue([HIGH_PRIORITY, NORMAL_PRIORITY, LOW_PRIORITY])
        # Initial work: refresh the project list, sync subscribed projects
        # and upload any pending local reviews.
        self.submitTask(SyncProjectListTask(HIGH_PRIORITY))
        self.submitTask(SyncSubscribedProjectsTask(HIGH_PRIORITY))
        self.submitTask(UploadReviewsTask(HIGH_PRIORITY))
        self.periodic_thread = threading.Thread(target=self.periodicSync)
        self.periodic_thread.daemon = True
        self.periodic_thread.start()

    def periodicSync(self):
        """Daemon loop: every 60 seconds, sync all subscribed projects.

        Exceptions are logged and the loop keeps going.
        """
        while True:
            try:
                time.sleep(60)
                self.syncSubscribedProjects()
            except Exception:
                self.log.exception('Exception in periodicSync')

    def submitTask(self, task):
        """Queue *task* at its priority, unless currently offline.

        A task dropped while offline is completed as failed.  Without this,
        a thread blocked in task.wait() (e.g. syncSubscribedProjects on the
        periodic thread) would wait forever for a task that never runs.
        """
        if self.offline:
            self.log.debug('Offline; dropping task %s' % (task,))
            task.complete(False)
            return
        self.queue.put(task, task.priority)

    def run(self, pipe):
        """Execute tasks forever, retrying any task that hit a connection error.

        *pipe* is a file descriptor that receives 'refresh\\n' after each
        attempt (presumably read by the UI to trigger a redraw).
        """
        task = None
        while True:
            task = self._run(pipe, task)

    def _run(self, pipe, task=None):
        """Run *task* (or the next queued task) once.

        Returns the task if it must be retried because the server was
        unreachable, otherwise None.
        """
        if not task:
            task = self.queue.get()
        self.log.debug('Run: %s' % (task,))
        try:
            task.run(self)
            task.complete(True)
        except requests.ConnectionError as e:
            self.log.warning("Offline due to: %s" % (e,))
            if not self.offline:
                # Queued while still marked online (so submitTask accepts
                # them).  They run after the retried task finally succeeds.
                self.submitTask(SyncSubscribedProjectsTask(HIGH_PRIORITY))
                self.submitTask(UploadReviewsTask(HIGH_PRIORITY))
            self.offline = True
            self.app.status.update(offline=True)
            os.write(pipe, 'refresh\n')
            # Back off before run() retries the same task.
            time.sleep(30)
            return task
        except Exception:
            task.complete(False)
            self.log.exception('Exception running task %s' % (task,))
            self.app.status.update(error=True)
        self.offline = False
        self.app.status.update(offline=False)
        os.write(pipe, 'refresh\n')
        return None

    def url(self, path):
        """Return the authenticated ('a/') REST URL for *path*."""
        return self.app.config.url + 'a/' + path

    def _auth(self):
        """Build the HTTP digest credentials sent with every API request."""
        return requests.auth.HTTPDigestAuth(self.app.config.username,
                                            self.app.config.password)

    def get(self, path):
        """GET *path* and return the decoded JSON body.

        Raises requests.HTTPError for an error status instead of failing
        later while parsing an error page as JSON.
        """
        url = self.url(path)
        self.log.debug('GET: %s' % (url,))
        r = requests.get(url,
                         verify=self.app.config.verify_ssl,
                         auth=self._auth(),
                         headers={'Accept': 'application/json',
                                  'Accept-Encoding': 'gzip'})
        self.log.debug('Received: %s' % (r.text,))
        r.raise_for_status()
        # Skip Gerrit's ")]}'" anti-XSSI prefix before decoding.
        return json.loads(r.text[4:])

    def post(self, path, data):
        """POST *data* as JSON to *path*.

        Raises requests.HTTPError if the server rejects the request, so
        callers such as UploadReviewTask do not silently lose the data.
        """
        url = self.url(path)
        self.log.debug('POST: %s' % (url,))
        self.log.debug('data: %s' % (data,))
        r = requests.post(url, data=json.dumps(data).encode('utf8'),
                          verify=self.app.config.verify_ssl,
                          auth=self._auth(),
                          headers={'Content-Type': 'application/json;charset=UTF-8'})
        self.log.debug('Received: %s' % (r.text,))
        r.raise_for_status()

    def syncSubscribedProjects(self):
        """Sync every subscribed project at LOW_PRIORITY, one at a time.

        Blocks until each SyncProjectTask completes, or is dropped because
        the sync is offline.
        """
        keys = []
        with self.app.db.getSession() as session:
            for p in session.getProjects(subscribed=True):
                keys.append(p.key)
        for key in keys:
            t = SyncProjectTask(key, LOW_PRIORITY)
            self.submitTask(t)
            t.wait()