121 lines
4.3 KiB
Python
121 lines
4.3 KiB
Python
# Copyright 2011 OpenStack, LLC.
|
|
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
|
# Copyright 2013 Acme Gating, LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import datetime
|
|
import logging
|
|
import math
|
|
import threading
|
|
import time
|
|
|
|
from zuul.zk.event_queues import EventReceiverElection
|
|
|
|
|
|
class GerritChecksPoller(threading.Thread):
    """Poll Gerrit over HTTP rather than relying on stream-events.

    The thread takes part in an ``EventReceiverElection`` named
    ``"poller"`` and only polls while that election reports it is still
    valid.  Every cycle it queries each watched checker for pending
    checks and, when the connection's ``event_source`` equals
    ``EVENT_SOURCE_NONE``, also queries for recently merged changes.
    Each result is turned into an event dict and passed to
    ``connection.addEvent()``.
    """

    log = logging.getLogger("zuul.GerritConnection.poller")
    # Minimum number of seconds between the start of two poll cycles.
    poll_interval = 30

    def __init__(self, connection):
        """Create the poller for *connection*.

        :param connection: object providing ``sched.zk_client`` and
            ``connection_name`` (read here) plus the query/event methods
            used by the polling methods.
        """
        super().__init__()
        self.connection = connection
        # 0 means "never polled yet"; afterwards a naive UTC datetime.
        self.last_merged_poll = 0
        self.poller_election = EventReceiverElection(
            connection.sched.zk_client, connection.connection_name, "poller")
        self._stopped = False
        self._stop_event = threading.Event()
        # Initialised here so _poll() is usable even if it is invoked
        # before _run() had a chance to set it.
        self._last_start = time.time()

    def _makePendingCheckEvent(self, change, uuid, check):
        """Build a simulated ``pending-check`` event.

        :param change: pending-checks entry; its ``patch_set`` dict must
            contain ``repository``, ``change_number`` and ``patch_set_id``.
        :param uuid: checker UUID stored under the ``uuid`` key.
        :param check: the pending check itself; accepted for interface
            compatibility but not used when building the event.
        :returns: the event dict.
        """
        patch_set = change['patch_set']
        return {
            'type': 'pending-check',
            'uuid': uuid,
            'change': {
                'project': patch_set['repository'],
                'number': patch_set['change_number'],
            },
            'patchSet': {
                'number': patch_set['patch_set_id'],
            },
        }

    def _makeChangeMergedEvent(self, change):
        """Make a simulated change-merged event

        Mostly for the benefit of scheduler reconfiguration.

        :param change: change dict containing ``project``, ``_number``,
            ``current_revision`` and a ``revisions`` mapping that holds
            the current revision.
        :returns: the event dict.
        """
        rev = change['revisions'][change['current_revision']]
        return {
            'type': 'change-merged',
            'change': {
                'project': change['project'],
                'number': change['_number'],
            },
            'patchSet': {
                'number': rev['_number'],
            },
        }

    def _poll_checkers(self):
        """Emit a pending-check event for every NOT_STARTED check."""
        for checker in self.connection.watched_checkers:
            # NOTE(review): the checker id is interpolated into the URL
            # without quoting; presumably ids are URL-safe -- confirm.
            changes = self.connection.get(
                'plugins/checks/checks.pending/?'
                'query=checker:%s+(state:NOT_STARTED)' % checker)
            for change in changes:
                for uuid, check in change['pending_checks'].items():
                    event = self._makePendingCheckEvent(
                        change, uuid, check)
                    self.connection.addEvent(event)

    def _poll_merged_changes(self):
        """Emit a change-merged event for changes merged since last poll.

        The query window is the time elapsed since the previous poll
        plus a few seconds of slack; on the very first poll the window
        is ``0s``.
        """
        # Naive UTC timestamp, same value the deprecated
        # datetime.utcnow() produced, so stored values stay compatible.
        now = datetime.datetime.now(
            datetime.timezone.utc).replace(tzinfo=None)
        age = self.last_merged_poll
        if age:
            # Allow an extra 4 seconds for request time
            age = int(math.ceil((now - age).total_seconds())) + 4
        changes = self.connection.simpleQueryHTTP(
            "status:merged -age:%ss" % (age,))
        self.last_merged_poll = now
        for change in changes:
            event = self._makeChangeMergedEvent(change)
            self.connection.addEvent(event)

    def _poll(self):
        """Wait for the next poll slot, then run one poll cycle.

        Returns without polling if the poller was stopped or the
        election is no longer valid once the wait is over.
        """
        next_start = self._last_start + self.poll_interval
        # Waiting on the event (not sleeping) lets stop() interrupt us.
        self._stop_event.wait(max(next_start - time.time(), 0))
        if self._stopped or not self.poller_election.is_still_valid():
            return
        self._last_start = time.time()
        self._poll_checkers()
        if self.connection.event_source == self.connection.EVENT_SOURCE_NONE:
            self._poll_merged_changes()

    def _run(self):
        """Poll repeatedly until stopped or the election turns invalid."""
        self._last_start = time.time()
        while not self._stopped and self.poller_election.is_still_valid():
            # during tests, a sub-class _poll method is used to send
            # notifications
            self._poll()

    def run(self):
        """Thread entry point: run the election loop until stopped.

        Any exception raised by the election or a poll cycle is logged
        and followed by a short back-off before retrying.
        """
        while not self._stopped:
            try:
                self.poller_election.run(self._run)
            except Exception:
                self.log.exception("Exception on Gerrit poll with %s:",
                                   self.connection.connection_name)
                # Back off for up to one second; unlike time.sleep()
                # this returns immediately when stop() sets the event.
                self._stop_event.wait(1)

    def stop(self):
        """Ask the poller to stop and cancel its election."""
        self.log.debug("Stopping watcher")
        self._stopped = True
        self._stop_event.set()
        self.poller_election.cancel()
|