# Copyright 2011 OpenStack, LLC. # Copyright 2012 Hewlett-Packard Development Company, L.P. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import threading import select import json import time from six.moves import queue as Queue from six.moves import urllib import paramiko import logging import pprint import voluptuous as v from zuul.connection import BaseConnection from zuul.model import TriggerEvent class GerritEventConnector(threading.Thread): """Move events from Gerrit to the scheduler.""" log = logging.getLogger("zuul.GerritEventConnector") delay = 10.0 def __init__(self, connection): super(GerritEventConnector, self).__init__() self.daemon = True self.connection = connection self._stopped = False def stop(self): self._stopped = True self.connection.addEvent(None) def _handleEvent(self): ts, data = self.connection.getEvent() if self._stopped: return # Gerrit can produce inconsistent data immediately after an # event, So ensure that we do not deliver the event to Zuul # until at least a certain amount of time has passed. Note # that if we receive several events in succession, we will # only need to delay for the first event. In essence, Zuul # should always be a constant number of seconds behind Gerrit. now = time.time() time.sleep(max((ts + self.delay) - now, 0.0)) event = TriggerEvent() event.type = data.get('type') event.trigger_name = 'gerrit' change = data.get('change') if change: event.project_name = change.get('project') event.branch = change.get('branch') event.change_number = change.get('number') event.change_url = change.get('url') patchset = data.get('patchSet') if patchset: event.patch_number = patchset.get('number') event.refspec = patchset.get('ref') event.approvals = data.get('approvals', []) event.comment = data.get('comment') refupdate = data.get('refUpdate') if refupdate: event.project_name = refupdate.get('project') event.ref = refupdate.get('refName') event.oldrev = refupdate.get('oldRev') event.newrev = refupdate.get('newRev') # Map the event types to a field name holding a Gerrit # account attribute. See Gerrit stream-event documentation # in cmd-stream-events.html accountfield_from_type = { 'patchset-created': 'uploader', 'draft-published': 'uploader', # Gerrit 2.5/2.6 'change-abandoned': 'abandoner', 'change-restored': 'restorer', 'change-merged': 'submitter', 'merge-failed': 'submitter', # Gerrit 2.5/2.6 'comment-added': 'author', 'ref-updated': 'submitter', 'reviewer-added': 'reviewer', # Gerrit 2.5/2.6 } try: event.account = data.get(accountfield_from_type[event.type]) except KeyError: self.log.warning("Received unrecognized event type '%s' from Gerrit.\ Can not get account information." % event.type) event.account = None if (event.change_number and self.connection.sched.getProject(event.project_name)): # Call _getChange for the side effect of updating the # cache. Note that this modifies Change objects outside # the main thread. # NOTE(jhesketh): Ideally we'd just remove the change from the # cache to denote that it needs updating. However the change # object is already used by Item's and hence BuildSet's etc. and # we need to update those objects by reference so that they have # the correct/new information and also avoid hitting gerrit # multiple times. if self.connection.attached_to['source']: self.connection.attached_to['source'][0]._getChange( event.change_number, event.patch_number, refresh=True) # We only need to do this once since the connection maintains # the cache (which is shared between all the sources) # NOTE(jhesketh): We may couple sources and connections again # at which point this becomes more sensible. self.connection.sched.addEvent(event) def run(self): while True: if self._stopped: return try: self._handleEvent() except: self.log.exception("Exception moving Gerrit event:") finally: self.connection.eventDone() class GerritWatcher(threading.Thread): log = logging.getLogger("gerrit.GerritWatcher") poll_timeout = 500 def __init__(self, gerrit_connection, username, hostname, port=29418, keyfile=None, keepalive=60): threading.Thread.__init__(self) self.username = username self.keyfile = keyfile self.hostname = hostname self.port = port self.gerrit_connection = gerrit_connection self.keepalive = keepalive self._stopped = False def _read(self, fd): l = fd.readline() data = json.loads(l) self.log.debug("Received data from Gerrit event stream: \n%s" % pprint.pformat(data)) self.gerrit_connection.addEvent(data) def _listen(self, stdout, stderr): poll = select.poll() poll.register(stdout.channel) while not self._stopped: ret = poll.poll(self.poll_timeout) for (fd, event) in ret: if fd == stdout.channel.fileno(): if event == select.POLLIN: self._read(stdout) else: raise Exception("event on ssh connection") def _run(self): try: client = paramiko.SSHClient() client.load_system_host_keys() client.set_missing_host_key_policy(paramiko.WarningPolicy()) client.connect(self.hostname, username=self.username, port=self.port, key_filename=self.keyfile) transport = client.get_transport() transport.set_keepalive(self.keepalive) stdin, stdout, stderr = client.exec_command("gerrit stream-events") self._listen(stdout, stderr) if not stdout.channel.exit_status_ready(): # The stream-event is still running but we are done polling # on stdout most likely due to being asked to stop. # Try to stop the stream-events command sending Ctrl-C stdin.write("\x03") time.sleep(.2) if not stdout.channel.exit_status_ready(): # we're still not ready to exit, lets force the channel # closed now. stdout.channel.close() ret = stdout.channel.recv_exit_status() self.log.debug("SSH exit status: %s" % ret) client.close() if ret and ret not in [-1, 130]: raise Exception("Gerrit error executing stream-events") except: self.log.exception("Exception on ssh event stream:") time.sleep(5) def run(self): while not self._stopped: self._run() def stop(self): self.log.debug("Stopping watcher") self._stopped = True class GerritConnection(BaseConnection): driver_name = 'gerrit' log = logging.getLogger("connection.gerrit") def __init__(self, connection_name, connection_config): super(GerritConnection, self).__init__(connection_name, connection_config) if 'server' not in self.connection_config: raise Exception('server is required for gerrit connections in ' '%s' % self.connection_name) if 'user' not in self.connection_config: raise Exception('user is required for gerrit connections in ' '%s' % self.connection_name) self.user = self.connection_config.get('user') self.server = self.connection_config.get('server') self.port = int(self.connection_config.get('port', 29418)) self.keyfile = self.connection_config.get('sshkey', None) self.keepalive = int(self.connection_config.get('keepalive', 60)) self.watcher_thread = None self.event_queue = None self.client = None self.baseurl = self.connection_config.get('baseurl', 'https://%s' % self.server) self._change_cache = {} self.gerrit_event_connector = None def getCachedChange(self, key): if key in self._change_cache: return self._change_cache.get(key) return None def updateChangeCache(self, key, value): self._change_cache[key] = value def deleteCachedChange(self, key): if key in self._change_cache: del self._change_cache[key] def maintainCache(self, relevant): # This lets the user supply a list of change objects that are # still in use. Anything in our cache that isn't in the supplied # list should be safe to remove from the cache. remove = [] for key, change in self._change_cache.items(): if change not in relevant: remove.append(key) for key in remove: del self._change_cache[key] def addEvent(self, data): return self.event_queue.put((time.time(), data)) def getEvent(self): return self.event_queue.get() def eventDone(self): self.event_queue.task_done() def review(self, project, change, message, action={}): cmd = 'gerrit review --project %s' % project if message: cmd += ' --message "%s"' % message for key, val in action.items(): if val is True: cmd += ' --%s' % key else: cmd += ' --%s %s' % (key, val) cmd += ' %s' % change out, err = self._ssh(cmd) return err def query(self, query): args = '--all-approvals --comments --commit-message' args += ' --current-patch-set --dependencies --files' args += ' --patch-sets --submit-records' cmd = 'gerrit query --format json %s %s' % ( args, query) out, err = self._ssh(cmd) if not out: return False lines = out.split('\n') if not lines: return False data = json.loads(lines[0]) if not data: return False self.log.debug("Received data from Gerrit query: \n%s" % (pprint.pformat(data))) return data def simpleQuery(self, query): def _query_chunk(query): args = '--commit-message --current-patch-set' cmd = 'gerrit query --format json %s %s' % ( args, query) out, err = self._ssh(cmd) if not out: return False lines = out.split('\n') if not lines: return False # filter out blank lines data = [json.loads(line) for line in lines if line.startswith('{')] # check last entry for more changes more_changes = None if 'moreChanges' in data[-1]: more_changes = data[-1]['moreChanges'] # we have to remove the statistics line del data[-1] if not data: return False, more_changes self.log.debug("Received data from Gerrit query: \n%s" % (pprint.pformat(data))) return data, more_changes # gerrit returns 500 results by default, so implement paging # for large projects like nova alldata = [] chunk, more_changes = _query_chunk(query) while(chunk): alldata.extend(chunk) if more_changes is None: # continue sortKey based (before Gerrit 2.9) resume = "resume_sortkey:'%s'" % chunk[-1]["sortKey"] elif more_changes: # continue moreChanges based (since Gerrit 2.9) resume = "-S %d" % len(alldata) else: # no more changes break chunk, more_changes = _query_chunk("%s %s" % (query, resume)) return alldata def _open(self): client = paramiko.SSHClient() client.load_system_host_keys() client.set_missing_host_key_policy(paramiko.WarningPolicy()) client.connect(self.server, username=self.user, port=self.port, key_filename=self.keyfile) transport = client.get_transport() transport.set_keepalive(self.keepalive) self.client = client def _ssh(self, command, stdin_data=None): if not self.client: self._open() try: self.log.debug("SSH command:\n%s" % command) stdin, stdout, stderr = self.client.exec_command(command) except: self._open() stdin, stdout, stderr = self.client.exec_command(command) if stdin_data: stdin.write(stdin_data) out = stdout.read() self.log.debug("SSH received stdout:\n%s" % out) ret = stdout.channel.recv_exit_status() self.log.debug("SSH exit status: %s" % ret) err = stderr.read() self.log.debug("SSH received stderr:\n%s" % err) if ret: raise Exception("Gerrit error executing %s" % command) return (out, err) def getInfoRefs(self, project): url = "%s/p/%s/info/refs?service=git-upload-pack" % ( self.baseurl, project) try: data = urllib.request.urlopen(url).read() except: self.log.error("Cannot get references from %s" % url) raise # keeps urllib error informations ret = {} read_headers = False read_advertisement = False if data[4] != '#': raise Exception("Gerrit repository does not support " "git-upload-pack") i = 0 while i < len(data): if len(data) - i < 4: raise Exception("Invalid length in info/refs") plen = int(data[i:i + 4], 16) i += 4 # It's the length of the packet, including the 4 bytes of the # length itself, unless it's null, in which case the length is # not included. if plen > 0: plen -= 4 if len(data) - i < plen: raise Exception("Invalid data in info/refs") line = data[i:i + plen] i += plen if not read_headers: if plen == 0: read_headers = True continue if not read_advertisement: read_advertisement = True continue if plen == 0: # The terminating null continue line = line.strip() revision, ref = line.split() ret[ref] = revision return ret def getGitUrl(self, project): url = 'ssh://%s@%s:%s/%s' % (self.user, self.server, self.port, project.name) return url def getGitwebUrl(self, project, sha=None): url = '%s/gitweb?p=%s.git' % (self.baseurl, project) if sha: url += ';a=commitdiff;h=' + sha return url def onLoad(self): self.log.debug("Starting Gerrit Conncetion/Watchers") self._start_watcher_thread() self._start_event_connector() def onStop(self): self.log.debug("Stopping Gerrit Conncetion/Watchers") self._stop_watcher_thread() self._stop_event_connector() def _stop_watcher_thread(self): if self.watcher_thread: self.watcher_thread.stop() self.watcher_thread.join() def _start_watcher_thread(self): self.event_queue = Queue.Queue() self.watcher_thread = GerritWatcher( self, self.user, self.server, self.port, keyfile=self.keyfile, keepalive=self.keepalive) self.watcher_thread.start() def _stop_event_connector(self): if self.gerrit_event_connector: self.gerrit_event_connector.stop() self.gerrit_event_connector.join() def _start_event_connector(self): self.gerrit_event_connector = GerritEventConnector(self) self.gerrit_event_connector.start() def getSchema(): gerrit_connection = v.Any(str, v.Schema({}, extra=True)) return gerrit_connection