Add support for 'connection' concept

This is a large refactor and as small as I could feasibly make it
while keeping the tests working. I'll do the documentation and
touch ups in the next commit to make digesting easier.

Change-Id: Iac5083996a183d1d8a9b6cb8f70836f7c39ee910
This commit is contained in:
Joshua Hesketh 2015-08-11 23:42:08 +10:00
parent 70b13490a3
commit 352264b3c2
48 changed files with 1508 additions and 950 deletions

View File

@ -42,6 +42,8 @@ import statsd
import testtools
from git import GitCommandError
import zuul.connection.gerrit
import zuul.connection.smtp
import zuul.scheduler
import zuul.webapp
import zuul.rpclistener
@ -379,20 +381,20 @@ class FakeChange(object):
self.reported += 1
class FakeGerrit(object):
log = logging.getLogger("zuul.test.FakeGerrit")
class FakeGerritConnection(zuul.connection.gerrit.GerritConnection):
log = logging.getLogger("zuul.test.FakeGerritConnection")
def __init__(self, hostname, username, port=29418, keyfile=None,
changes_dbs={}, queues_dbs={}):
self.hostname = hostname
self.username = username
self.port = port
self.keyfile = keyfile
self.event_queue = queues_dbs.get(hostname, {})
def __init__(self, connection_name, connection_config,
changes_db=None, queues_db=None):
super(FakeGerritConnection, self).__init__(connection_name,
connection_config)
self.event_queue = queues_db
self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
self.change_number = 0
self.changes = changes_dbs.get(hostname, {})
self.changes = changes_db
self.queries = []
self.upstream_root = None
def addFakeChange(self, project, branch, subject, status='NEW'):
self.change_number += 1
@ -402,15 +404,6 @@ class FakeGerrit(object):
self.changes[self.change_number] = c
return c
def addEvent(self, data):
return self.event_queue.put((time.time(), data))
def getEvent(self):
return self.event_queue.get()
def eventDone(self):
self.event_queue.task_done()
def review(self, project, changeid, message, action):
number, ps = changeid.split(',')
change = self.changes[int(number)]
@ -427,11 +420,11 @@ class FakeGerrit(object):
for cat in ['CRVW', 'VRFY', 'APRV']:
if cat in action:
change.addApproval(cat, action[cat], username=self.username)
change.addApproval(cat, action[cat], username=self.user)
if 'label' in action:
parts = action['label'].split('=')
change.addApproval(parts[0], parts[2], username=self.username)
change.addApproval(parts[0], parts[2], username=self.user)
change.messages.append(message)
@ -464,9 +457,12 @@ class FakeGerrit(object):
l = [change.query() for change in self.changes.values()]
return l
def startWatching(self, *args, **kw):
def _start_watcher_thread(self, *args, **kw):
pass
def getGitUrl(self, project):
return os.path.join(self.upstream_root, project.name)
class BuildHistory(object):
def __init__(self, **kw):
@ -500,19 +496,6 @@ class FakeURLOpener(object):
return ret
class FakeGerritSource(zuul.source.gerrit.GerritSource):
name = 'gerrit'
def __init__(self, upstream_root, *args):
super(FakeGerritSource, self).__init__(*args)
self.upstream_root = upstream_root
self.replication_timeout = 1.5
self.replication_retry_interval = 0.5
def getGitUrl(self, project):
return os.path.join(self.upstream_root, project.name)
class FakeStatsd(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
@ -898,7 +881,6 @@ class ZuulTestCase(BaseTestCase):
shutil.rmtree(self.test_root)
os.makedirs(self.test_root)
os.makedirs(self.upstream_root)
os.makedirs(self.git_root)
# Make per test copy of Configuration.
self.setup_config()
@ -942,15 +924,22 @@ class ZuulTestCase(BaseTestCase):
self.worker.addServer('127.0.0.1', self.gearman_server.port)
self.gearman_server.worker = self.worker
self.merge_server = zuul.merger.server.MergeServer(self.config)
self.merge_server.start()
zuul.source.gerrit.GerritSource.replication_timeout = 1.5
zuul.source.gerrit.GerritSource.replication_retry_interval = 0.5
zuul.connection.gerrit.GerritEventConnector.delay = 0.0
self.sched = zuul.scheduler.Scheduler()
self.sched = zuul.scheduler.Scheduler(self.config)
self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
FakeSwiftClientConnection))
self.swift = zuul.lib.swift.Swift(self.config)
# Set up connections and give out the default gerrit for testing
self.configure_connections()
self.sched.registerConnections(self.connections)
self.fake_gerrit = self.connections['gerrit']
self.fake_gerrit.upstream_root = self.upstream_root
def URLOpenerFactory(*args, **kw):
if isinstance(args[0], urllib2.Request):
return old_urlopen(*args, **kw)
@ -960,30 +949,9 @@ class ZuulTestCase(BaseTestCase):
old_urlopen = urllib2.urlopen
urllib2.urlopen = URLOpenerFactory
self.smtp_messages = []
def FakeSMTPFactory(*args, **kw):
args = [self.smtp_messages] + list(args)
return FakeSMTP(*args, **kw)
# Set a changes database so multiple FakeGerrit's can report back to
# a virtual canonical database given by the configured hostname
self.gerrit_queues_dbs = {
self.config.get('gerrit', 'server'): Queue.Queue()
}
self.gerrit_changes_dbs = {
self.config.get('gerrit', 'server'): {}
}
def FakeGerritFactory(*args, **kw):
kw['changes_dbs'] = self.gerrit_changes_dbs
kw['queues_dbs'] = self.gerrit_queues_dbs
return FakeGerrit(*args, **kw)
self.useFixture(fixtures.MonkeyPatch('zuul.lib.gerrit.Gerrit',
FakeGerritFactory))
self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
self.merge_server = zuul.merger.server.MergeServer(self.config,
self.connections)
self.merge_server.start()
self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
self.swift)
@ -993,13 +961,6 @@ class ZuulTestCase(BaseTestCase):
self.sched.setLauncher(self.launcher)
self.sched.setMerger(self.merge_client)
self.register_sources()
self.fake_gerrit = self.gerrit_source.gerrit
self.fake_gerrit.upstream_root = self.upstream_root
self.register_triggers()
self.register_reporters()
self.webapp = zuul.webapp.WebApp(self.sched, port=0)
self.rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
@ -1016,37 +977,67 @@ class ZuulTestCase(BaseTestCase):
self.addCleanup(self.assertFinalState)
self.addCleanup(self.shutdown)
def register_sources(self):
# Register the available sources
self.gerrit_source = FakeGerritSource(
self.upstream_root, self.config, self.sched)
self.gerrit_source.replication_timeout = 1.5
self.gerrit_source.replication_retry_interval = 0.5
def configure_connections(self):
# Register connections from the config
self.smtp_messages = []
self.sched.registerSource(self.gerrit_source)
def FakeSMTPFactory(*args, **kw):
args = [self.smtp_messages] + list(args)
return FakeSMTP(*args, **kw)
def register_triggers(self):
# Register the available triggers
self.gerrit_trigger = zuul.trigger.gerrit.GerritTrigger(
self.fake_gerrit, self.config, self.sched, self.gerrit_source)
self.gerrit_trigger.gerrit_connector.delay = 0.0
self.useFixture(fixtures.MonkeyPatch('smtplib.SMTP', FakeSMTPFactory))
self.sched.registerTrigger(self.gerrit_trigger)
self.timer = zuul.trigger.timer.TimerTrigger(self.config, self.sched)
self.sched.registerTrigger(self.timer)
self.zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(self.config,
self.sched)
self.sched.registerTrigger(self.zuultrigger)
# Set a changes database so multiple FakeGerrit's can report back to
# a virtual canonical database given by the configured hostname
self.gerrit_changes_dbs = {}
self.gerrit_queues_dbs = {}
self.connections = {}
def register_reporters(self):
# Register the available reporters
self.sched.registerReporter(
zuul.reporter.gerrit.GerritReporter(self.fake_gerrit))
self.smtp_reporter = zuul.reporter.smtp.SMTPReporter(
self.config.get('smtp', 'default_from'),
self.config.get('smtp', 'default_to'),
self.config.get('smtp', 'server'))
self.sched.registerReporter(self.smtp_reporter)
for section_name in self.config.sections():
con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$',
section_name, re.I)
if not con_match:
continue
con_name = con_match.group(2)
con_config = dict(self.config.items(section_name))
if 'driver' not in con_config:
raise Exception("No driver specified for connection %s."
% con_name)
con_driver = con_config['driver']
# TODO(jhesketh): load the required class automatically
if con_driver == 'gerrit':
self.gerrit_changes_dbs[con_name] = {}
self.gerrit_queues_dbs[con_name] = Queue.Queue()
self.connections[con_name] = FakeGerritConnection(
con_name, con_config,
changes_db=self.gerrit_changes_dbs[con_name],
queues_db=self.gerrit_queues_dbs[con_name]
)
elif con_driver == 'smtp':
self.connections[con_name] = \
zuul.connection.smtp.SMTPConnection(con_name, con_config)
else:
raise Exception("Unknown driver, %s, for connection %s"
% (con_config['driver'], con_name))
# If the [gerrit] or [smtp] sections still exist, load them in as a
# connection named 'gerrit' or 'smtp' respectfully
if 'gerrit' in self.config.sections():
self.gerrit_changes_dbs['gerrit'] = {}
self.gerrit_queues_dbs['gerrit'] = Queue.Queue()
self.connections['gerrit'] = FakeGerritConnection(
'_legacy_gerrit', dict(self.config.items('gerrit')),
changes_db=self.gerrit_changes_dbs['gerrit'],
queues_db=self.gerrit_queues_dbs['gerrit'])
if 'smtp' in self.config.sections():
self.connections['smtp'] = \
zuul.connection.smtp.SMTPConnection(
'_legacy_smtp', dict(self.config.items('smtp')))
def setup_config(self):
"""Per test config object. Override to set different config."""
@ -1074,8 +1065,6 @@ class ZuulTestCase(BaseTestCase):
self.merge_server.join()
self.merge_client.stop()
self.worker.shutdown()
self.gerrit_trigger.stop()
self.timer.stop()
self.sched.stop()
self.sched.join()
self.statsd.stop()

View File

@ -0,0 +1,18 @@
pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
not_gerrit:
- event: patchset-created
success:
review_gerrit:
verified: 1
failure:
review_gerrit:
verified: -1
projects:
- name: test-org/test
check:
- test-merge
- test-test

View File

@ -2,13 +2,13 @@ pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
review_gerrit:
- event: patchset-created
success:
gerrit:
review_gerrit:
verified: 1
failure:
gerrit:
review_gerrit:
verified: -1
# merge-failure-message needs a string.
merge-failure-message:
@ -17,20 +17,20 @@ pipelines:
manager: DependentPipelineManager
failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
trigger:
gerrit:
review_gerrit:
- event: comment-added
approval:
- approved: 1
success:
gerrit:
review_gerrit:
verified: 2
submit: true
failure:
gerrit:
review_gerrit:
verified: -2
merge-failure:
start:
gerrit:
review_gerrit:
verified: 0
precedence: high

View File

@ -2,7 +2,7 @@ pipelines:
- name: 'check'
manager: IndependentPipelineManager
trigger:
gerrit:
review_gerrit:
- event: patchset-created
ref: /some/ref/path

View File

@ -2,7 +2,7 @@ pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
review_gerrit:
# event is a required item but it is missing.
- approval:
- approved: 1

View File

@ -2,7 +2,7 @@ pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
review_gerrit:
- event: comment-added
# approved is not a valid entry. Should be approval.
approved: 1

View File

@ -17,5 +17,5 @@ pipelines:
- code-review: [-1, -2]
username: core-person
trigger:
gerrit:
review_gerrit:
- event: patchset-created

View File

@ -2,13 +2,13 @@ pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
review_gerrit:
- event: patchset-created
success:
gerrit:
review_gerrit:
verified: 1
failure:
gerrit:
review_gerrit:
verified: -1
jobs:

View File

@ -4,7 +4,7 @@ pipelines:
- name: 'check'
manager: IndependentPipelineManager
trigger:
gerrit:
review_gerrit:
- event: patchset-created
project-templates:

View File

@ -4,7 +4,7 @@ pipelines:
- name: 'check'
manager: IndependentPipelineManager
trigger:
gerrit:
review_gerrit:
- event: patchset-created
project-templates:

View File

@ -0,0 +1,42 @@
[gearman]
server=127.0.0.1
[zuul]
layout_config=layout.yaml
url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}
job_name_in_report=true
[merger]
git_dir=/tmp/zuul-test/git
git_user_email=zuul@example.com
git_user_name=zuul
zuul_url=http://zuul.example.com/p
[swift]
authurl=https://identity.api.example.org/v2.0/
user=username
key=password
tenant_name=" "
default_container=logs
region_name=EXP
logserver_prefix=http://logs.example.org/server.app/
[connection review_gerrit]
driver=gerrit
server=review.example.com
user=jenkins
sshkey=none
[connection other_gerrit]
driver=gerrit
server=review2.example.com
user=jenkins2
sshkey=none
[connection my_smtp]
driver=smtp
server=localhost
port=25
default_from=zuul@example.com
default_to=you@example.com

View File

@ -0,0 +1,18 @@
pipelines:
- name: check
manager: IndependentPipelineManager
source: review_gerrit
trigger:
review_gerrit:
- event: patchset-created
success:
review_gerrit:
verified: 1
failure:
other_gerrit:
verified: -1
projects:
- name: org/project
check:
- project-check

View File

@ -8,7 +8,7 @@ pipelines:
open: True
current-patchset: True
trigger:
gerrit:
review_gerrit:
- event: patchset-created
- event: comment-added
require-approval:
@ -17,16 +17,16 @@ pipelines:
approval:
- workflow: 1
success:
gerrit:
review_gerrit:
verified: 1
failure:
gerrit:
review_gerrit:
verified: -1
- name: post
manager: IndependentPipelineManager
trigger:
gerrit:
review_gerrit:
- event: ref-updated
ref: ^(?!refs/).*$
ignore-deletes: True
@ -46,32 +46,32 @@ pipelines:
approval:
- code-review: [-1, -2]
trigger:
gerrit:
review_gerrit:
- event: comment-added
approval:
- approved: 1
start:
gerrit:
review_gerrit:
verified: 0
success:
gerrit:
review_gerrit:
verified: 2
code-review: 1
submit: true
failure:
gerrit:
review_gerrit:
verified: -2
workinprogress: true
- name: merge-check
manager: IndependentPipelineManager
source: gerrit
source: review_gerrit
ignore-dependencies: true
trigger:
zuul:
- event: project-change-merged
merge-failure:
gerrit:
review_gerrit:
verified: -1
jobs:

View File

@ -3,47 +3,47 @@ pipelines:
manager: IndependentPipelineManager
merge-failure-message: "Could not merge the change. Please rebase..."
trigger:
gerrit:
review_gerrit:
- event: patchset-created
success:
gerrit:
review_gerrit:
verified: 1
failure:
gerrit:
review_gerrit:
verified: -1
- name: post
manager: IndependentPipelineManager
trigger:
gerrit:
review_gerrit:
- event: ref-updated
ref: ^(?!refs/).*$
merge-failure:
gerrit:
review_gerrit:
verified: -1
- name: gate
manager: DependentPipelineManager
failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
trigger:
gerrit:
review_gerrit:
- event: comment-added
approval:
- approved: 1
success:
gerrit:
review_gerrit:
verified: 2
submit: true
failure:
gerrit:
review_gerrit:
verified: -2
merge-failure:
gerrit:
review_gerrit:
verified: -1
smtp:
my_smtp:
to: you@example.com
start:
gerrit:
review_gerrit:
verified: 0
precedence: high

View File

@ -5,7 +5,7 @@ pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
review_gerrit:
- event: comment-added
require-approval:
- username: jenkins
@ -23,10 +23,10 @@ pipelines:
username: jenkins
email: jenkins@example.com
success:
gerrit:
review_gerrit:
verified: 1
failure:
gerrit:
review_gerrit:
verified: -1
projects:

View File

@ -2,13 +2,13 @@ pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
review_gerrit:
- event: patchset-created
success:
gerrit:
review_gerrit:
verified: 1
failure:
gerrit:
review_gerrit:
verified: -1
jobs:

View File

@ -2,7 +2,7 @@ pipelines:
- name: 'check'
manager: IndependentPipelineManager
trigger:
gerrit:
review_gerrit:
- event: patchset-created
project-templates:

View File

@ -0,0 +1,36 @@
[gearman]
server=127.0.0.1
[zuul]
layout_config=layout.yaml
url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}
job_name_in_report=true
[merger]
git_dir=/tmp/zuul-test/git
git_user_email=zuul@example.com
git_user_name=zuul
zuul_url=http://zuul.example.com/p
[swift]
authurl=https://identity.api.example.org/v2.0/
user=username
key=password
tenant_name=" "
default_container=logs
region_name=EXP
logserver_prefix=http://logs.example.org/server.app/
[connection review_gerrit]
driver=gerrit
server=review.example.com
user=jenkins
sshkey=none
[connection my_smtp]
driver=smtp
server=localhost
port=25
default_from=zuul@example.com
default_to=you@example.com

View File

@ -1,11 +1,6 @@
[gearman]
server=127.0.0.1
[gerrit]
server=review.example.com
user=jenkins
sshkey=none
[zuul]
layout_config=layout.yaml
url_pattern=http://logs.example.com/{change.number}/{change.patchset}/{pipeline.name}/{job.name}/{build.number}
@ -17,12 +12,6 @@ git_user_email=zuul@example.com
git_user_name=zuul
zuul_url=http://zuul.example.com/p
[smtp]
server=localhost
port=25
default_from=zuul@example.com
default_to=you@example.com
[swift]
authurl=https://identity.api.example.org/v2.0/
user=username
@ -32,3 +21,16 @@ tenant_name=" "
default_container=logs
region_name=EXP
logserver_prefix=http://logs.example.org/server.app/
[connection gerrit]
driver=gerrit
server=review.example.com
user=jenkins
sshkey=none
[connection smtp]
driver=smtp
server=localhost
port=25
default_from=zuul@example.com
default_to=you@example.com

26
tests/test_connection.py Normal file
View File

@ -0,0 +1,26 @@
# Copyright 2014 Rackspace Australia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import testtools
import zuul.connection.gerrit
class TestGerritConnection(testtools.TestCase):
log = logging.getLogger("zuul.test_connection")
def test_driver_name(self):
self.assertEqual('gerrit',
zuul.connection.gerrit.GerritConnection.driver_name)

View File

@ -21,7 +21,7 @@ except ImportError:
import mock
from tests.base import BaseTestCase
from zuul.lib.gerrit import Gerrit
from zuul.connection.gerrit import GerritConnection
FIXTURE_DIR = os.path.join(os.path.dirname(__file__), 'fixtures/gerrit')
@ -46,9 +46,13 @@ def read_fixtures(files):
class TestGerrit(BaseTestCase):
@mock.patch('zuul.lib.gerrit.Gerrit._ssh')
@mock.patch('zuul.connection.gerrit.GerritConnection._ssh')
def run_query(self, files, expected_patches, _ssh_mock):
gerrit = Gerrit('localhost', 'user')
gerrit_config = {
'user': 'gerrit',
'server': 'localhost',
}
gerrit = GerritConnection('review_gerrit', gerrit_config)
calls, values = read_fixtures(files)
_ssh_mock.side_effect = values

View File

@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import ConfigParser
import os
import re
@ -22,6 +23,7 @@ import voluptuous
import yaml
import zuul.layoutvalidator
import zuul.lib.connections
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
'fixtures')
@ -38,19 +40,31 @@ class TestLayoutValidator(testtools.TestCase):
if not m:
continue
print fn
# Load any .conf file by the same name but .conf extension.
config_file = ("%s.conf" %
os.path.join(FIXTURE_DIR, 'layouts',
fn.split('.yaml')[0]))
if not os.path.isfile(config_file):
config_file = os.path.join(FIXTURE_DIR, 'layouts',
'zuul_default.conf')
config = ConfigParser.ConfigParser()
config.read(config_file)
connections = zuul.lib.connections.configure_connections(config)
layout = os.path.join(FIXTURE_DIR, 'layouts', fn)
data = yaml.load(open(layout))
validator = zuul.layoutvalidator.LayoutValidator()
if m.group(1) == 'good':
try:
validator.validate(data)
validator.validate(data, connections)
except voluptuous.Invalid as e:
raise Exception(
'Unexpected YAML syntax error in %s:\n %s' %
(fn, str(e)))
else:
try:
validator.validate(data)
validator.validate(data, connections)
raise Exception("Expected a YAML syntax error in %s." %
fn)
except voluptuous.Invalid as e:

View File

@ -26,7 +26,7 @@ class TestSMTPReporter(testtools.TestCase):
def test_reporter_abc(self):
# We only need to instantiate a class for this
reporter = zuul.reporter.smtp.SMTPReporter('', '') # noqa
reporter = zuul.reporter.smtp.SMTPReporter({}) # noqa
def test_reporter_name(self):
self.assertEqual('smtp', zuul.reporter.smtp.SMTPReporter.name)

View File

@ -61,7 +61,7 @@ jobs:
""".strip()
data = yaml.load(job_yaml)
config_job = data.get('jobs')[0]
sched = zuul.scheduler.Scheduler()
sched = zuul.scheduler.Scheduler({})
cm = zuul.change_matcher
expected = cm.MatchAny([
cm.MatchAll([
@ -762,9 +762,9 @@ class TestScheduler(ZuulTestCase):
self.fake_gerrit.addEvent(B.addApproval('APRV', 1))
self.waitUntilSettled()
self.log.debug("len %s" % self.gerrit_source._change_cache.keys())
self.log.debug("len %s" % self.fake_gerrit._change_cache.keys())
# there should still be changes in the cache
self.assertNotEqual(len(self.gerrit_source._change_cache.keys()), 0)
self.assertNotEqual(len(self.fake_gerrit._change_cache.keys()), 0)
self.worker.hold_jobs_in_build = False
self.worker.release()
@ -1469,7 +1469,7 @@ class TestScheduler(ZuulTestCase):
"Test that the merger works with large changes after a repack"
# https://bugs.launchpad.net/zuul/+bug/1078946
# This test assumes the repo is already cloned; make sure it is
url = self.sched.sources['gerrit'].getGitUrl(
url = self.fake_gerrit.getGitUrl(
self.sched.layout.projects['org/project1'])
self.merge_server.merger.addProject('org/project1', url)
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
@ -2164,7 +2164,8 @@ class TestScheduler(ZuulTestCase):
def test_test_config(self):
"Test that we can test the config"
self.sched.testConfig(self.config.get('zuul', 'layout_config'))
self.sched.testConfig(self.config.get('zuul', 'layout_config'),
self.connections)
def test_build_description(self):
"Test that build descriptions update"

View File

@ -23,8 +23,7 @@ class TestGerritTrigger(testtools.TestCase):
def test_trigger_abc(self):
# We only need to instantiate a class for this
trigger = zuul.trigger.gerrit.GerritTrigger(None, None, None, # noqa
None)
zuul.trigger.gerrit.GerritTrigger({})
def test_trigger_name(self):
self.assertEqual('gerrit', zuul.trigger.gerrit.GerritTrigger.name)
@ -35,7 +34,7 @@ class TestTimerTrigger(testtools.TestCase):
def test_trigger_abc(self):
# We only need to instantiate a class for this
trigger = zuul.trigger.timer.TimerTrigger(None, None) # noqa
zuul.trigger.timer.TimerTrigger({})
def test_trigger_name(self):
self.assertEqual('timer', zuul.trigger.timer.TimerTrigger.name)
@ -46,7 +45,7 @@ class TestZuulTrigger(testtools.TestCase):
def test_trigger_abc(self):
# We only need to instantiate a class for this
trigger = zuul.trigger.zuultrigger.ZuulTrigger(None, None) # noqa
zuul.trigger.zuultrigger.ZuulTrigger({})
def test_trigger_name(self):
self.assertEqual('zuul', zuul.trigger.zuultrigger.ZuulTrigger.name)

View File

@ -26,7 +26,9 @@ import traceback
yappi = extras.try_import('yappi')
# No zuul imports here because they pull in paramiko which must not be
import zuul.lib.connections
# Do not import modules that will pull in paramiko which must not be
# imported until after the daemonization.
# https://github.com/paramiko/paramiko/issues/59
# Similar situation with gear and statsd.
@ -59,6 +61,7 @@ class ZuulApp(object):
def __init__(self):
self.args = None
self.config = None
self.connections = {}
def _get_version(self):
from zuul.version import version_info as zuul_version_info
@ -86,3 +89,7 @@ class ZuulApp(object):
logging.config.fileConfig(fp)
else:
logging.basicConfig(level=logging.DEBUG)
def configure_connections(self):
self.connections = zuul.lib.connections.configure_connections(
self.config)

View File

@ -58,7 +58,8 @@ class Merger(zuul.cmd.ZuulApp):
self.setup_logging('merger', 'log_config')
self.merger = zuul.merger.server.MergeServer(self.config)
self.merger = zuul.merger.server.MergeServer(self.config,
self.connections)
self.merger.start()
signal.signal(signal.SIGUSR1, self.exit_handler)
@ -76,6 +77,7 @@ def main():
server.parse_arguments()
server.read_config()
server.configure_connections()
if server.config.has_option('zuul', 'state_dir'):
state_dir = os.path.expanduser(server.config.get('zuul', 'state_dir'))

View File

@ -60,9 +60,13 @@ class Server(zuul.cmd.ZuulApp):
def reconfigure_handler(self, signum, frame):
signal.signal(signal.SIGHUP, signal.SIG_IGN)
self.log.debug("Reconfiguration triggered")
self.sched.stopConnections()
self.read_config()
self.setup_logging('zuul', 'log_config')
try:
self.configure_connections()
self.sched.registerConnections(self.connections)
self.sched.reconfigure(self.config)
except Exception:
self.log.exception("Reconfiguration failed:")
@ -85,14 +89,11 @@ class Server(zuul.cmd.ZuulApp):
import zuul.trigger.gerrit
logging.basicConfig(level=logging.DEBUG)
self.sched = zuul.scheduler.Scheduler()
self.sched.registerReporter(None, 'gerrit')
self.sched.registerReporter(None, 'smtp')
self.sched.registerTrigger(None, 'gerrit')
self.sched.registerTrigger(None, 'timer')
self.sched.registerTrigger(None, 'zuul')
self.sched = zuul.scheduler.Scheduler(self.config)
self.configure_connections()
layout = self.sched.testConfig(self.config.get('zuul',
'layout_config'))
'layout_config'),
self.connections)
if not job_list_path:
return False
@ -144,51 +145,6 @@ class Server(zuul.cmd.ZuulApp):
if self.gear_server_pid:
os.kill(self.gear_server_pid, signal.SIGKILL)
def register_sources(self):
# Register the available sources
# See comment at top of file about zuul imports
import zuul.source.gerrit
self.gerrit_source = zuul.source.gerrit.GerritSource(self.config,
self.sched)
self.sched.registerSource(self.gerrit_source)
def register_triggers(self):
# Register the available triggers
# See comment at top of file about zuul imports
import zuul.trigger.gerrit
import zuul.trigger.timer
import zuul.trigger.zuultrigger
self.gerrit_trigger = zuul.trigger.gerrit.GerritTrigger(
self.gerrit, self.config, self.sched, self.gerrit_source)
timer = zuul.trigger.timer.TimerTrigger(self.config, self.sched)
zuultrigger = zuul.trigger.zuultrigger.ZuulTrigger(
self.config, self.sched)
self.sched.registerTrigger(self.gerrit_trigger)
self.sched.registerTrigger(timer)
self.sched.registerTrigger(zuultrigger)
def register_reporters(self):
# Register the available reporters
# See comment at top of file about zuul imports
import zuul.reporter.gerrit
import zuul.reporter.smtp
gerrit_reporter = zuul.reporter.gerrit.GerritReporter(self.gerrit)
smtp_reporter = zuul.reporter.smtp.SMTPReporter(
self.config.get('smtp', 'default_from')
if self.config.has_option('smtp', 'default_from') else 'zuul',
self.config.get('smtp', 'default_to')
if self.config.has_option('smtp', 'default_to') else 'zuul',
self.config.get('smtp', 'server')
if self.config.has_option('smtp', 'server') else 'localhost',
self.config.get('smtp', 'port')
if self.config.has_option('smtp', 'port') else 25
)
self.sched.registerReporter(gerrit_reporter)
self.sched.registerReporter(smtp_reporter)
def main(self):
# See comment at top of file about zuul imports
import zuul.scheduler
@ -206,7 +162,8 @@ class Server(zuul.cmd.ZuulApp):
self.setup_logging('zuul', 'log_config')
self.log = logging.getLogger("zuul.Server")
self.sched = zuul.scheduler.Scheduler()
self.sched = zuul.scheduler.Scheduler(self.config)
# TODO(jhesketh): Move swift into a connection?
self.swift = zuul.lib.swift.Swift(self.config)
gearman = zuul.launcher.gearman.Gearman(self.config, self.sched,
@ -220,17 +177,13 @@ class Server(zuul.cmd.ZuulApp):
webapp = zuul.webapp.WebApp(self.sched, cache_expiry=cache_expiry)
rpc = zuul.rpclistener.RPCListener(self.config, self.sched)
self.configure_connections()
self.sched.setLauncher(gearman)
self.sched.setMerger(merger)
self.register_sources()
# TODO(jhesketh): Use connections instead of grabbing the gerrit lib
# from the source
self.gerrit = self.gerrit_source.gerrit
self.register_triggers()
self.register_reporters()
self.log.info('Starting scheduler')
self.sched.start()
self.sched.registerConnections(self.connections)
self.sched.reconfigure(self.config)
self.sched.resume()
self.log.info('Starting Webapp')

View File

@ -0,0 +1,53 @@
# Copyright 2014 Rackspace Australia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class BaseConnection(object):
"""Base class for connections.
A connection is a shared object that sources, triggers and reporters can
use to speak with a remote API without needing to establish a new
connection each time or without having to authenticate each time.
Multiple instances of the same connection may exist with different
credentials, for example, thus allowing for different pipelines to operate
on different Gerrit installations or post back as a different user etc.
Connections can implement their own public methods. Required connection
methods are validated by the {trigger, source, reporter} they are loaded
into. For example, a trigger will likely require some kind of query method
while a reporter may need a review method."""
def __init__(self, connection_name, connection_config):
# connection_name is the name given to this connection in zuul.ini
# connection_config is a dictionary of config_section from zuul.ini for
# this connection.
# __init__ shouldn't make the actual connection in case this connection
# isn't used in the layout.
self.connection_name = connection_name
self.connection_config = connection_config
def onLoad(self):
pass
def onStop(self):
pass
def registerScheduler(self, sched):
self.sched = sched

465
zuul/connection/gerrit.py Normal file
View File

@ -0,0 +1,465 @@
# Copyright 2011 OpenStack, LLC.
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import select
import json
import time
from six.moves import queue as Queue
import paramiko
import logging
import pprint
import voluptuous as v
import urllib2
from zuul.connection import BaseConnection
from zuul.model import TriggerEvent
class GerritEventConnector(threading.Thread):
"""Move events from Gerrit to the scheduler."""
log = logging.getLogger("zuul.GerritEventConnector")
delay = 5.0
def __init__(self, connection):
super(GerritEventConnector, self).__init__()
self.daemon = True
self.connection = connection
self._stopped = False
def stop(self):
self._stopped = True
self.connection.addEvent(None)
def _handleEvent(self):
ts, data = self.connection.getEvent()
if self._stopped:
self.connection.eventDone()
return
# Gerrit can produce inconsistent data immediately after an
# event, So ensure that we do not deliver the event to Zuul
# until at least a certain amount of time has passed. Note
# that if we receive several events in succession, we will
# only need to delay for the first event. In essence, Zuul
# should always be a constant number of seconds behind Gerrit.
now = time.time()
time.sleep(max((ts + self.delay) - now, 0.0))
event = TriggerEvent()
event.type = data.get('type')
event.trigger_name = 'gerrit'
change = data.get('change')
if change:
event.project_name = change.get('project')
event.branch = change.get('branch')
event.change_number = change.get('number')
event.change_url = change.get('url')
patchset = data.get('patchSet')
if patchset:
event.patch_number = patchset.get('number')
event.refspec = patchset.get('ref')
event.approvals = data.get('approvals', [])
event.comment = data.get('comment')
refupdate = data.get('refUpdate')
if refupdate:
event.project_name = refupdate.get('project')
event.ref = refupdate.get('refName')
event.oldrev = refupdate.get('oldRev')
event.newrev = refupdate.get('newRev')
# Map the event types to a field name holding a Gerrit
# account attribute. See Gerrit stream-event documentation
# in cmd-stream-events.html
accountfield_from_type = {
'patchset-created': 'uploader',
'draft-published': 'uploader', # Gerrit 2.5/2.6
'change-abandoned': 'abandoner',
'change-restored': 'restorer',
'change-merged': 'submitter',
'merge-failed': 'submitter', # Gerrit 2.5/2.6
'comment-added': 'author',
'ref-updated': 'submitter',
'reviewer-added': 'reviewer', # Gerrit 2.5/2.6
}
try:
event.account = data.get(accountfield_from_type[event.type])
except KeyError:
self.log.error("Received unrecognized event type '%s' from Gerrit.\
Can not get account information." % event.type)
event.account = None
if (event.change_number and
self.connection.sched.getProject(event.project_name)):
# Mark the change as needing a refresh in the cache
event._needs_refresh = True
<