# Copyright 2014 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import collections
import logging
import math
import os
import threading
import urllib
import urlparse
import json
import time
import Queue
import datetime

import dateutil.parser
import ordereddict
import requests

HIGH_PRIORITY = 0
NORMAL_PRIORITY = 1
LOW_PRIORITY = 2


def _quoteCredential(value):
    """Percent-encode *value* for use as a URL username or password.

    Non-string values are formatted with str(), matching the previous
    '%s' interpolation.
    """
    if isinstance(value, unicode):
        # urllib.quote() only handles byte strings reliably on Python 2.
        value = value.encode('utf8')
    return urllib.quote(str(value), safe='')


def _addCredentials(url, username, password):
    """Return *url* with *username*:*password* embedded in its netloc.

    Both credentials are percent-encoded so that characters such as
    '/', '@', '#' or '%' cannot corrupt the resulting URL.
    """
    parts = list(urlparse.urlsplit(url))
    parts[1] = '%s:%s@%s' % (_quoteCredential(username),
                             _quoteCredential(password),
                             parts[1])
    return urlparse.urlunsplit(parts)


class MultiQueue(object):
    """A thread-safe FIFO queue with several priority levels.

    get() always serves the first non-empty queue, in the order the
    priorities were passed to the constructor.
    """

    def __init__(self, priorities):
        """Create one empty deque per entry of *priorities*, in order."""
        try:
            self.queues = collections.OrderedDict()
        except AttributeError:
            # collections.OrderedDict does not exist before Python 2.7.
            self.queues = ordereddict.OrderedDict()
        for key in priorities:
            self.queues[key] = collections.deque()
        self.condition = threading.Condition()

    def qsize(self):
        """Return the total number of queued items across all priorities."""
        return sum(len(queue) for queue in self.queues.values())

    def put(self, item, priority):
        """Append *item* at *priority* and wake one waiting consumer."""
        with self.condition:
            self.queues[priority].append(item)
            self.condition.notify()

    def get(self):
        """Remove and return the oldest item of the highest priority.

        Blocks until an item is available.
        """
        with self.condition:
            while True:
                for queue in self.queues.values():
                    if queue:
                        return queue.popleft()
                self.condition.wait()


class Task(object):
    """Base class for a unit of work executed by the Sync worker.

    Subclasses implement run(sync).  complete() records the outcome and
    releases any thread blocked in wait().
    """

    def __init__(self, priority=NORMAL_PRIORITY):
        self.log = logging.getLogger('gertty.sync')
        self.priority = priority
        # None until complete() is called, then True or False.
        self.succeeded = None
        self.event = threading.Event()

    def complete(self, success):
        """Record whether the task succeeded and wake all waiters."""
        self.succeeded = success
        self.event.set()

    def wait(self):
        """Block until complete() has been called."""
        self.event.wait()


class SyncProjectListTask(Task):
    """Mirror the server's project list into the local database."""

    def __repr__(self):
        return '<SyncProjectListTask>'

    def run(self, sync):
        app = sync.app
        with app.db.getSession() as session:
            # 'd' asks Gerrit to include project descriptions.
            remote = sync.get('projects/?d')
            remote_keys = set(remote.keys())
            local = dict((p.name, p) for p in session.getProjects())
            local_keys = set(local.keys())
            for name in local_keys - remote_keys:
                session.delete(local[name])
            for name in remote_keys - local_keys:
                p = remote[name]
                session.createProject(name,
                                      description=p.get('description', ''))


class SyncSubscribedProjectsTask(Task):
    """Queue a SyncProjectTask for every subscribed project."""

    def __repr__(self):
        return '<SyncSubscribedProjectsTask>'

    def run(self, sync):
        app = sync.app
        with app.db.getSession() as session:
            for p in session.getProjects(subscribed=True):
                sync.submitTask(SyncProjectTask(p.key, self.priority))


class SyncProjectTask(Task):
    """Query one project's recently updated changes and queue a
    SyncChangeTask for each change worth tracking locally."""

    _closed_statuses = ['MERGED', 'ABANDONED']

    def __init__(self, project_key, priority=NORMAL_PRIORITY):
        super(SyncProjectTask, self).__init__(priority)
        self.project_key = project_key

    def __repr__(self):
        return '<SyncProjectTask %s>' % (self.project_key,)

    def run(self, sync):
        app = sync.app
        with app.db.getSession() as session:
            project = session.getProject(self.project_key)
            query = 'project:%s' % project.name
            if project.updated:
                # Restrict the query to changes touched within the time
                # elapsed since project.updated, rounded up to whole
                # seconds.  Assumes project.updated is a naive UTC
                # datetime, since it is subtracted from utcnow().
                elapsed = datetime.datetime.utcnow() - project.updated
                query += ' -age:%ss' % (int(math.ceil(elapsed.total_seconds())),)
            changes = sync.get('changes/?q=%s' % query)
            self.log.debug('Query: %s ' % (query,))
            for c in reversed(changes):
                # The list we get is newest to oldest; if we are
                # interrupted, we will have already synced the newest
                # change and a subsequent sync will not catch up the
                # old ones.  So reverse the list before we process it
                # so that the updated time is accurate.
                # For now, just sync open changes or changes already
                # in the db; optionally we could sync all changes ever.
                change = session.getChangeByID(c['id'])
                if change or (c['status'] not in self._closed_statuses):
                    sync.submitTask(SyncChangeTask(c['id'], self.priority))
                    self.log.debug("Change %s update %s" % (c['id'], c['updated']))


class SyncChangeTask(Task):
    """Fetch one change from Gerrit and reconcile the local copy with it.

    Updates change metadata, revisions, inline comments, messages,
    approvals, labels and permitted labels, then git-fetches the refs of
    any newly seen revisions.
    """

    def __init__(self, change_id, priority=NORMAL_PRIORITY):
        super(SyncChangeTask, self).__init__(priority)
        self.change_id = change_id

    def __repr__(self):
        return '<SyncChangeTask %s>' % (self.change_id,)

    def run(self, sync):
        app = sync.app
        remote_change = sync.get('changes/%s?o=DETAILED_LABELS&o=ALL_REVISIONS&o=ALL_COMMITS&o=MESSAGES&o=DETAILED_ACCOUNTS' % self.change_id)
        with app.db.getSession() as session:
            change = session.getChangeByID(self.change_id)
            if not change:
                project = session.getProjectByName(remote_change['project'])
                created = dateutil.parser.parse(remote_change['created'])
                updated = dateutil.parser.parse(remote_change['updated'])
                change = project.createChange(remote_change['id'], remote_change['_number'],
                                              remote_change['branch'], remote_change['change_id'],
                                              remote_change['owner']['name'],
                                              remote_change['subject'], created,
                                              updated, remote_change['status'],
                                              topic=remote_change.get('topic'))
            change.status = remote_change['status']
            change.subject = remote_change['subject']
            change.updated = dateutil.parser.parse(remote_change['updated'])
            change.topic = remote_change.get('topic')
            repo = app.getRepo(change.project.name)
            new_revision, fetches = self._syncRevisions(sync, session, change,
                                                        remote_change)
            new_message = self._syncMessages(app, session, change, remote_change)
            user_voted = self._syncApprovalsAndLabels(app, session, change,
                                                      remote_change)
            self._syncPermittedLabels(session, change, remote_change)
            if not user_voted:
                # Only consider changing the reviewed state if we don't have a vote
                if new_revision or new_message:
                    change.reviewed = False
        # The git fetches run after the database session block has closed.
        for url, ref, display_url in fetches:
            # display_url has no credentials; url embeds the password and
            # must never be logged.
            self.log.debug("git fetch %s %s" % (display_url, ref))
            repo.fetch(url, ref)

    def _syncRevisions(self, sync, session, change, remote_change):
        """Create missing revisions of *change* and sync their comments.

        Returns (new_revision, fetches).  new_revision is True if any
        revision was created.  fetches lists (url, ref, display_url)
        tuples for git fetches still to be run, where url carries the
        configured credentials and display_url does not.
        """
        config = sync.app.config
        new_revision = False
        fetches = []
        for remote_commit, remote_revision in remote_change.get('revisions', {}).items():
            revision = session.getRevisionByCommit(remote_commit)
            if not revision:
                # TODO: handle multiple parents
                display_url = config.url + change.project.name
                if 'anonymous http' in remote_revision['fetch']:
                    ref = remote_revision['fetch']['anonymous http']['ref']
                else:
                    ref = remote_revision['fetch']['http']['ref']
                url = _addCredentials(display_url, config.username, config.password)
                fetches.append((url, ref, display_url))
                revision = change.createRevision(remote_revision['_number'],
                                                 remote_revision['commit']['message'],
                                                 remote_commit,
                                                 remote_revision['commit']['parents'][0]['commit'])
                new_revision = True
            # Comments are fetched for every revision, not only new ones,
            # so new comments on existing revisions are picked up too.
            remote_comments = sync.get('changes/%s/revisions/%s/comments' %
                                       (self.change_id, revision.commit))
            for remote_file, file_comments in remote_comments.items():
                for remote_comment in file_comments:
                    if session.getCommentByID(remote_comment['id']):
                        continue
                    # Normalize updated -> created
                    created = dateutil.parser.parse(remote_comment['updated'])
                    parent = remote_comment.get('side', '') == 'PARENT'
                    revision.createComment(remote_comment['id'],
                                           remote_comment.get('in_reply_to'),
                                           created, remote_comment['author']['name'],
                                           remote_file, parent,
                                           remote_comment.get('line'),
                                           remote_comment['message'])
        return new_revision, fetches

    def _syncMessages(self, app, session, change, remote_change):
        """Create local copies of change messages not yet in the database.

        Returns True if any newly stored message has an author whose
        username differs from the configured username.
        """
        new_message = False
        for remote_message in remote_change.get('messages', []):
            if session.getMessageByID(remote_message['id']):
                continue
            revision = session.getRevisionByNumber(change, remote_message['_revision_number'])
            # Normalize date -> created
            created = dateutil.parser.parse(remote_message['date'])
            if 'author' in remote_message:
                author = remote_message['author']
                author_name = author['name']
                # 'username' may be absent from the author record; use
                # .get so such messages don't abort the whole sync.
                if author.get('username') != app.config.username:
                    new_message = True
            else:
                # Messages without an author are attributed to Gerrit.
                author_name = 'Gerrit Code Review'
            revision.createMessage(remote_message['id'], created,
                                   author_name, remote_message['message'])
        return new_message

    def _syncApprovalsAndLabels(self, app, session, change, remote_change):
        """Reconcile the change's approvals (votes) and label values.

        Local rows missing remotely are deleted, remote entries missing
        locally are created, and approvals present on both sides take the
        remote value.  Returns True if the configured user has a non-zero
        vote.
        """
        remote_approval_entries = {}
        remote_label_entries = {}
        user_voted = False
        for remote_label_name, remote_label_dict in remote_change.get('labels', {}).items():
            for remote_approval in remote_label_dict.get('all', []):
                if remote_approval.get('value') is None:
                    continue
                remote_approval['category'] = remote_label_name
                key = '%s~%s' % (remote_approval['category'], remote_approval['name'])
                remote_approval_entries[key] = remote_approval
                if (remote_approval.get('username', None) == app.config.username and
                        int(remote_approval['value']) != 0):
                    user_voted = True
            for value, description in remote_label_dict.get('values', {}).items():
                # e.g. value '+1', description 'LGTM'
                label = dict(value=value, description=description,
                             category=remote_label_name)
                key = '%s~%s~%s' % (label['category'], label['value'], label['description'])
                remote_label_entries[key] = label

        local_approvals = {}
        for approval in change.approvals:
            local_approvals['%s~%s' % (approval.category, approval.name)] = approval
        local_labels = {}
        for label in change.labels:
            key = '%s~%s~%s' % (label.category, label.value, label.description)
            local_labels[key] = label

        remote_approval_keys = set(remote_approval_entries)
        remote_label_keys = set(remote_label_entries)
        local_approval_keys = set(local_approvals)
        local_label_keys = set(local_labels)

        for key in local_approval_keys - remote_approval_keys:
            session.delete(local_approvals[key])
        for key in local_label_keys - remote_label_keys:
            session.delete(local_labels[key])
        for key in remote_approval_keys - local_approval_keys:
            remote_approval = remote_approval_entries[key]
            change.createApproval(remote_approval['name'],
                                  remote_approval['category'],
                                  remote_approval['value'])
        for key in remote_label_keys - local_label_keys:
            remote_label = remote_label_entries[key]
            change.createLabel(remote_label['category'],
                               remote_label['value'],
                               remote_label['description'])
        # Votes present on both sides may have changed value.
        for key in remote_approval_keys & local_approval_keys:
            local_approvals[key].value = remote_approval_entries[key]['value']
        return user_voted

    def _syncPermittedLabels(self, session, change, remote_change):
        """Reconcile the change's permitted (category, value) label pairs
        with Gerrit's ``permitted_labels``."""
        remote_permitted_entries = {}
        for remote_label_name, remote_label_values in remote_change.get('permitted_labels', {}).items():
            for remote_label_value in remote_label_values:
                remote_label = dict(category=remote_label_name,
                                    value=remote_label_value)
                key = '%s~%s' % (remote_label['category'], remote_label['value'])
                remote_permitted_entries[key] = remote_label
        local_permitted = {}
        for permitted in change.permitted_labels:
            local_permitted['%s~%s' % (permitted.category, permitted.value)] = permitted

        remote_permitted_keys = set(remote_permitted_entries)
        local_permitted_keys = set(local_permitted)
        for key in local_permitted_keys - remote_permitted_keys:
            session.delete(local_permitted[key])
        for key in remote_permitted_keys - local_permitted_keys:
            remote_permitted = remote_permitted_entries[key]
            change.createPermittedLabel(remote_permitted['category'],
                                        remote_permitted['value'])


class UploadReviewsTask(Task):
    """Queue an UploadReviewTask for every locally pending message."""

    def __repr__(self):
        return '<UploadReviewsTask>'

    def run(self, sync):
        app = sync.app
        with app.db.getSession() as session:
            for m in session.getPendingMessages():
                sync.submitTask(UploadReviewTask(m.key, self.priority))


class UploadReviewTask(Task):
    """Post one pending review to Gerrit, then re-sync its change.

    The review carries the message, the pending inline comments of its
    revision and, when the revision is the change's last one, the pending
    votes.  The uploaded local rows are deleted.
    """

    def __init__(self, message_key, priority=NORMAL_PRIORITY):
        super(UploadReviewTask, self).__init__(priority)
        self.message_key = message_key

    def __repr__(self):
        return '<UploadReviewTask %s>' % (self.message_key,)

    def run(self, sync):
        app = sync.app
        with app.db.getSession() as session:
            message = session.getMessage(self.message_key)
            revision = message.revision
            change = message.revision.change
            # Assumes change.revisions is ordered oldest to newest.
            current_revision = change.revisions[-1]
            data = dict(message=message.message,
                        strict_labels=False)
            if revision == current_revision:
                data['labels'] = {}
                for approval in change.pending_approvals:
                    data['labels'][approval.category] = approval.value
                    session.delete(approval)
            if revision.pending_comments:
                data['comments'] = {}
                for comment in revision.pending_comments:
                    d = dict(line=comment.line,
                             message=comment.message)
                    if comment.parent:
                        d['side'] = 'PARENT'
                    # setdefault keeps every comment for a file even when
                    # pending_comments is not grouped by file.
                    data['comments'].setdefault(comment.file, []).append(d)
                    session.delete(comment)
            session.delete(message)
            sync.post('changes/%s/revisions/%s/review' % (change.id, revision.commit),
                      data)
            sync.submitTask(SyncChangeTask(change.id, self.priority))


class Sync(object):
    """Background synchronizer between the local database and Gerrit.

    Tasks are queued by priority and executed one at a time by run().  A
    daemon thread re-syncs subscribed projects every 60 seconds.
    """

    def __init__(self, app):
        self.offline = False
        self.app = app
        self.log = logging.getLogger('gertty.sync')
        self.queue = MultiQueue([HIGH_PRIORITY, NORMAL_PRIORITY, LOW_PRIORITY])
        self.submitTask(SyncProjectListTask(HIGH_PRIORITY))
        self.submitTask(SyncSubscribedProjectsTask(HIGH_PRIORITY))
        self.submitTask(UploadReviewsTask(HIGH_PRIORITY))
        self.periodic_thread = threading.Thread(target=self.periodicSync)
        # periodicSync never returns; as a daemon thread it does not keep
        # the process alive once the main thread exits.
        self.periodic_thread.daemon = True
        self.periodic_thread.start()

    def periodicSync(self):
        """Loop forever: every 60 seconds re-sync subscribed projects."""
        while True:
            try:
                time.sleep(60)
                self.syncSubscribedProjects()
            except Exception:
                self.log.exception('Exception in periodicSync')

    def submitTask(self, task):
        """Queue *task* at its priority unless currently offline.

        While offline the task is dropped and completed as failed, so a
        caller blocked in task.wait() is released instead of hanging
        forever.  Returns True if the task was queued.
        """
        if self.offline:
            task.complete(False)
            return False
        self.queue.put(task, task.priority)
        return True

    def run(self, pipe):
        """Process tasks forever, retrying a task that hit a connection error."""
        task = None
        while True:
            task = self._run(pipe, task)

    def _run(self, pipe, task=None):
        """Run *task*, or the next queued task if *task* is None.

        Writes 'refresh\\n' to the file descriptor *pipe* after each attempt
        (presumably to wake the UI).  Returns the task if it must be
        retried because of a connection error (after sleeping 30 seconds),
        otherwise None.
        """
        if task is None:
            task = self.queue.get()
        self.log.debug('Run: %s' % (task,))
        try:
            task.run(self)
            task.complete(True)
        except requests.ConnectionError as e:
            self.log.warning("Offline due to: %s" % (e,))
            if not self.offline:
                # Queued before self.offline is set, so they are accepted
                # and will run once the connection is back.
                self.submitTask(SyncSubscribedProjectsTask(HIGH_PRIORITY))
                self.submitTask(UploadReviewsTask(HIGH_PRIORITY))
            self.offline = True
            self.app.status.update(offline=True)
            os.write(pipe, 'refresh\n')
            time.sleep(30)
            return task
        except Exception:
            task.complete(False)
            self.log.exception('Exception running task %s' % (task,))
            self.app.status.update(error=True)
        self.offline = False
        self.app.status.update(offline=False)
        os.write(pipe, 'refresh\n')
        return None

    def url(self, path):
        """Return the authenticated REST URL ('a/' prefix) for *path*."""
        return self.app.config.url + 'a/' + path

    def _auth(self):
        """Return the HTTP digest auth object for the configured user."""
        return requests.auth.HTTPDigestAuth(self.app.config.username,
                                            self.app.config.password)

    def get(self, path):
        """GET *path* from the REST API and return the decoded JSON."""
        url = self.url(path)
        self.log.debug('GET: %s' % (url,))
        r = requests.get(url,
                         verify=self.app.config.verify_ssl,
                         auth=self._auth(),
                         headers={'Accept': 'application/json',
                                  'Accept-Encoding': 'gzip'})
        self.log.debug('Received: %s' % (r.text,))
        # Skip the 4-character anti-XSSI prefix Gerrit puts before JSON.
        ret = json.loads(r.text[4:])
        return ret

    def post(self, path, data):
        """POST *data* as UTF-8 JSON to *path*; the response is only logged."""
        url = self.url(path)
        self.log.debug('POST: %s' % (url,))
        self.log.debug('data: %s' % (data,))
        r = requests.post(url, data=json.dumps(data).encode('utf8'),
                          verify=self.app.config.verify_ssl,
                          auth=self._auth(),
                          headers={'Content-Type': 'application/json;charset=UTF-8'})
        self.log.debug('Received: %s' % (r.text,))

    def syncSubscribedProjects(self):
        """Sync each subscribed project at LOW_PRIORITY, waiting for each.

        Project keys are collected first so the session is closed before
        blocking on the tasks.
        """
        with self.app.db.getSession() as session:
            keys = [p.key for p in session.getProjects(subscribed=True)]
        for key in keys:
            t = SyncProjectTask(key, LOW_PRIORITY)
            self.submitTask(t)
            # Returns promptly even when offline: submitTask completes
            # dropped tasks.
            t.wait()