Send swift upload instructions to workers

Have zuul send signed credentials as part of the job for workers to
consume and upload assets to a defined location.

Zuul currently doesn't care about logs however this change will
suggest a LOG_PATH to builders as a unqiue destination prefix
allowing zuul to know preemptively the destination.
The workers are still required to send a URL of the final location.

Change-Id: I042cdd2dd2407f381cafcabc5c6b83d9b9a9eb00
This commit is contained in:
Joshua Hesketh 2014-01-22 11:40:52 +11:00
parent 6e81141f58
commit 36c3fa5749
15 changed files with 555 additions and 14 deletions

View File

@ -11,6 +11,8 @@
.. _`Turbo-Hipster Documentation`:
http://turbo-hipster.rtfd.org/
.. _FormPost: http://docs.openstack.org/developer/swift/misc.html#module-swift.common.middleware.formpost
.. _launchers:
Launchers
@ -117,6 +119,34 @@ And finally a reference being altered::
Your jobs can check whether the parameters are ``000000`` to act
differently on each kind of event.
Swift parameters
~~~~~~~~~~~~~~~~
If swift information has been configured for the job zuul will also
provide signed credentials for the builder to upload results and
assets into containers using the `FormPost`_ middleware.
Each zuul container/instruction set will contain each of the following
parameters where $NAME is the ``name`` defined in the layout.
*SWIFT_$NAME_URL*
The swift destination URL. This will be the entire URL including
the AUTH, container and path prefix (folder).
*SWIFT_$NAME_HMAC_BODY*
The information signed in the HMAC body. The body is as follows::
PATH TO OBJECT PREFIX (excluding domain)
BLANK LINE (zuul implements no form redirect)
MAX FILE SIZE
MAX FILE COUNT
SIGNATURE EXPIRY
*SWIFT_$NAME_SIGNATURE*
The HMAC body signed with the configured key.
*SWIFT_$NAME_LOGSERVER_PREFIX*
The URL to prepend to the object path when returning the results
from a build.
Gearman
-------

View File

@ -168,6 +168,59 @@ smtp
This can be overridden by individual pipelines.
``default_to=you@example.com``
.. _swift:
swift
"""""
To send (optional) swift upload instructions this section must be
present. Multiple destinations can be defined in the :ref:`jobs`
section of the layout.
**authurl**
The (keystone) Auth URL for swift
``For example, https://identity.api.rackspacecloud.com/v2.0/``
Any of the `swiftclient connection parameters`_ can also be defined
here by the same name. Including the os_options by their key name (
``for example tenant_id``)
.. _swiftclient connection parameters: http://docs.openstack.org/developer/python-swiftclient/swiftclient.html#module-swiftclient.client
**X-Account-Meta-Temp-Url-Key** (optional)
This is the key used to sign the HMAC message. zuul will send the
key to swift for you so you only need to define it here. If you do
not set a key zuul will generate one automatically.
**region_name** (optional)
The region name holding the swift container
``For example, SYD``
Each destination defined by the :ref:`jobs` will have the following
default values that it may overwrite.
**default_container** (optional)
Container name to place the log into
``For example, logs``
**default_expiry** (optional)
How long the signed destination should be available for
``default: 7200 (2hrs)``
**default_max_file_size** (optional)
The maximum size of an individual file
``default: 104857600 (100MB)``
**default_max_file_count** (optional)
The maximum number of separate files to allow
``default: 10``
**default_logserver_prefix**
Provide a URL to the CDN or logserver app so that a worker knows
what URL to return. The worker should return the logserver_prefix
url and the object path.
``For example: http://logs.example.org/server.app?obj=``
layout.yaml
~~~~~~~~~~~
@ -567,6 +620,9 @@ That kind of pipeline is nice to run regression or performance tests.
the ``ref-updated`` event which does include the commit sha1 (but lacks the
Gerrit change number).
.. _jobs:
Jobs
""""
@ -649,6 +705,39 @@ each job as it builds a list from the project specification.
be used to specify on what node (or class of node) the job should be
run.
**swift**
If :ref:`swift` is configured then each job can define a destination
container for the builder to place logs and/or assets into. Multiple
containers can be listed for each job by providing a unique ``name``.
*name*
Set an identifying name for the container. This is used in the
parameter key sent to the builder. For example if it ``logs`` then
one of the parameters sent will be ``SWIFT_logs_CONTAINER``
(case-sensitive).
Each of the defaults defined in :ref:`swift` can be overwritten as:
*container* (optional)
Container name to place the log into
``For example, logs``
*expiry* (optional)
How long the signed destination should be available for
*max_file_size** (optional)
The maximum size of an individual file
*max_file_count* (optional)
The maximum number of separate files to allow
*logserver_prefix*
Provide a URL to the CDN or logserver app so that a worker knows
what URL to return.
``For example: http://logs.example.org/server.app?obj=``
The worker should return the logserver_prefix url and the object
path as the URL in the results data packet.
Here is an example of setting the failure message for jobs that check
whether a change merges cleanly::

View File

@ -23,6 +23,15 @@ git_dir=/var/lib/zuul/git
;git_user_name=zuul
zuul_url=http://zuul.example.com/p
[swift]
authurl=https://identity.api.example.org/v2.0/
user=username
key=password
default_container=logs
region_name=EXP
logserver_prefix=http://logs.example.org/server.app/
[smtp]
server=localhost
port=25

View File

@ -14,3 +14,5 @@ statsd>=1.0.0,<3.0
voluptuous>=0.7
gear>=0.5.4,<1.0.0
apscheduler>=2.1.1,<3.0
python-swiftclient>=1.6
python-keystoneclient>=0.4.2

59
tests/fixtures/layout-swift.yaml vendored Normal file
View File

@ -0,0 +1,59 @@
pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
verified: 1
failure:
gerrit:
verified: -1
- name: post
manager: IndependentPipelineManager
trigger:
gerrit:
- event: ref-updated
ref: ^(?!refs/).*$
- name: gate
manager: DependentPipelineManager
failure-message: Build failed. For information on how to proceed, see http://wiki.example.org/Test_Failures
trigger:
gerrit:
- event: comment-added
approval:
- approved: 1
success:
gerrit:
verified: 2
submit: true
failure:
gerrit:
verified: -2
start:
gerrit:
verified: 0
precedence: high
jobs:
- name: ^.*$
swift:
- name: logs
- name: ^.*-merge$
swift:
- name: logs
container: merge_logs
failure-message: Unable to merge change
- name: test-test
swift:
- name: MOSTLY
container: stash
projects:
- name: org/project
gate:
- test-merge
- test-test

29
tests/fixtures/layouts/bad_swift.yaml vendored Normal file
View File

@ -0,0 +1,29 @@
pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
verified: 1
failure:
gerrit:
verified: -1
jobs:
- name: ^.*$
swift:
- name: logs
- name: ^.*-merge$
swift:
container: merge_assets
failure-message: Unable to merge change
- name: test-test
swift:
projects:
- name: test-org/test
check:
- test-merge
- test-test

32
tests/fixtures/layouts/good_swift.yaml vendored Normal file
View File

@ -0,0 +1,32 @@
pipelines:
- name: check
manager: IndependentPipelineManager
trigger:
gerrit:
- event: patchset-created
success:
gerrit:
verified: 1
failure:
gerrit:
verified: -1
jobs:
- name: ^.*$
swift:
- name: logs
- name: ^.*-merge$
swift:
- name: assets
container: merge_assets
failure-message: Unable to merge change
- name: test-test
swift:
- name: mostly
container: stash
projects:
- name: test-org/test
check:
- test-merge
- test-test

View File

@ -22,3 +22,13 @@ server=localhost
port=25
default_from=zuul@example.com
default_to=you@example.com
[swift]
authurl=https://identity.api.example.org/v2.0/
user=username
key=password
tenant_name=" "
default_container=logs
region_name=EXP
logserver_prefix=http://logs.example.org/server.app/

View File

@ -30,6 +30,7 @@ import shutil
import socket
import string
import subprocess
import swiftclient
import threading
import time
import urllib
@ -47,6 +48,7 @@ import zuul.webapp
import zuul.rpclistener
import zuul.rpcclient
import zuul.launcher.gearman
import zuul.lib.swift
import zuul.merger.server
import zuul.merger.client
import zuul.reporter.gerrit
@ -743,6 +745,18 @@ class FakeSMTP(object):
return True
class FakeSwiftClientConnection(swiftclient.client.Connection):
def post_account(self, headers):
# Do nothing
pass
def get_auth(self):
# Returns endpoint and (unused) auth token
endpoint = os.path.join('https://storage.example.org', 'V1',
'AUTH_account')
return endpoint, ''
class TestScheduler(testtools.TestCase):
log = logging.getLogger("zuul.test")
@ -823,12 +837,18 @@ class TestScheduler(testtools.TestCase):
self.sched = zuul.scheduler.Scheduler()
self.useFixture(fixtures.MonkeyPatch('swiftclient.client.Connection',
FakeSwiftClientConnection))
self.swift = zuul.lib.swift.Swift(self.config)
def URLOpenerFactory(*args, **kw):
args = [self.fake_gerrit] + list(args)
return FakeURLOpener(self.upstream_root, *args, **kw)
urllib2.urlopen = URLOpenerFactory
self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched)
self.launcher = zuul.launcher.gearman.Gearman(self.config, self.sched,
self.swift)
self.merge_client = zuul.merger.client.MergeClient(
self.config, self.sched)
@ -3810,3 +3830,48 @@ For CI problems and help debugging, contact ci@example.org"""
self.assertEqual(1, len(self.smtp_messages))
self.assertEqual('The merge failed! For more information...',
self.smtp_messages[0]['body'])
def test_swift_instructions(self):
"Test that the correct swift instructions are sent to the workers"
self.config.set('zuul', 'layout_config',
'tests/fixtures/layout-swift.yaml')
self.sched.reconfigure(self.config)
self.registerJobs()
self.worker.hold_jobs_in_build = True
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
A.addApproval('CRVW', 2)
self.fake_gerrit.addEvent(A.addApproval('APRV', 1))
self.waitUntilSettled()
self.assertEqual(
"https://storage.example.org/V1/AUTH_account/merge_logs/1/1/1/"
"gate/test-merge/",
self.builds[0].parameters['SWIFT_logs_URL'][:-32])
self.assertEqual(5,
len(self.builds[0].parameters['SWIFT_logs_HMAC_BODY'].
split('\n')))
self.assertIn('SWIFT_logs_SIGNATURE', self.builds[0].parameters)
self.assertEqual(
"https://storage.example.org/V1/AUTH_account/logs/1/1/1/"
"gate/test-test/",
self.builds[1].parameters['SWIFT_logs_URL'][:-32])
self.assertEqual(5,
len(self.builds[1].parameters['SWIFT_logs_HMAC_BODY'].
split('\n')))
self.assertIn('SWIFT_logs_SIGNATURE', self.builds[1].parameters)
self.assertEqual(
"https://storage.example.org/V1/AUTH_account/stash/1/1/1/"
"gate/test-test/",
self.builds[1].parameters['SWIFT_MOSTLY_URL'][:-32])
self.assertEqual(5,
len(self.builds[1].
parameters['SWIFT_MOSTLY_HMAC_BODY'].split('\n')))
self.assertIn('SWIFT_MOSTLY_SIGNATURE', self.builds[1].parameters)
self.worker.hold_jobs_in_build = False
self.worker.release()
self.waitUntilSettled()

View File

@ -180,6 +180,7 @@ class Server(object):
import zuul.scheduler
import zuul.launcher.gearman
import zuul.merger.client
import zuul.lib.swift
import zuul.reporter.gerrit
import zuul.reporter.smtp
import zuul.trigger.gerrit
@ -195,8 +196,10 @@ class Server(object):
self.log = logging.getLogger("zuul.Server")
self.sched = zuul.scheduler.Scheduler()
self.swift = zuul.lib.swift.Swift(self.config)
gearman = zuul.launcher.gearman.Gearman(self.config, self.sched)
gearman = zuul.launcher.gearman.Gearman(self.config, self.sched,
self.swift)
merger = zuul.merger.client.MergeClient(self.config, self.sched)
gerrit = zuul.trigger.gerrit.Gerrit(self.config, self.sched)
timer = zuul.trigger.timer.Timer(self.config, self.sched)

View File

@ -16,6 +16,7 @@ import gear
import inspect
import json
import logging
import os
import time
import threading
from uuid import uuid4
@ -151,8 +152,10 @@ class Gearman(object):
log = logging.getLogger("zuul.Gearman")
negative_function_cache_ttl = 5
def __init__(self, config, sched):
def __init__(self, config, sched, swift):
self.config = config
self.sched = sched
self.swift = swift
self.builds = {}
self.meta_jobs = {} # A list of meta-jobs like stop or describe
@ -215,6 +218,50 @@ class Gearman(object):
self.log.debug("Function %s is not registered" % name)
return False
def updateBuildParams(self, job, item, params):
"""Allow the job to modify and add build parameters"""
# NOTE(jhesketh): The params need to stay in a key=value data pair
# as workers cannot necessarily handle lists.
if job.swift and self.swift.connection:
for name, s in job.swift.items():
swift_instructions = {}
s_config = {}
s_config.update((k, v.format(item=item, job=job,
change=item.change))
for k, v in s.items())
(swift_instructions['URL'],
swift_instructions['HMAC_BODY'],
swift_instructions['SIGNATURE']) = \
self.swift.generate_form_post_middleware_params(
params['LOG_PATH'], **s_config)
if 'logserver_prefix' in s_config:
swift_instructions['LOGSERVER_PREFIX'] = \
s_config['logserver_prefix']
elif self.config.has_option('swift',
'default_logserver_prefix'):
swift_instructions['LOGSERVER_PREFIX'] = \
s_config['logserver_prefix']
# Create a set of zuul instructions for each instruction-set
# given in the form of NAME_PARAMETER=VALUE
for key, value in swift_instructions.items():
params['_'.join(['SWIFT', name, key])] = value
if callable(job.parameter_function):
pargs = inspect.getargspec(job.parameter_function)
if len(pargs.args) == 2:
job.parameter_function(item, params)
else:
job.parameter_function(item, job, params)
self.log.debug("Custom parameter function used for job %s, "
"change: %s, params: %s" % (job, item.change,
params))
def launch(self, job, item, pipeline, dependent_items=[]):
self.log.info("Launch job %s for change %s with dependent changes %s" %
(job, item.change,
@ -252,6 +299,16 @@ class Gearman(object):
params['ZUUL_REF'] = item.change.ref
params['ZUUL_COMMIT'] = item.change.newrev
# The destination_path is a unqiue path for this build request
# and generally where the logs are expected to be placed
destination_path = os.path.join(item.change.getBasePath(),
pipeline.name, job.name, uuid)
params['BASE_LOG_PATH'] = item.change.getBasePath()
params['LOG_PATH'] = destination_path
# Allow the job to update the params
self.updateBuildParams(job, item, params)
# This is what we should be heading toward for parameters:
# required:
@ -273,16 +330,6 @@ class Gearman(object):
# ZUUL_OLDREV
# ZUUL_NEWREV
if callable(job.parameter_function):
pargs = inspect.getargspec(job.parameter_function)
if len(pargs.args) == 2:
job.parameter_function(item, params)
else:
job.parameter_function(item, job, params)
self.log.debug("Custom parameter function used for job %s, "
"change: %s, params: %s" % (job, item.change,
params))
if 'ZUUL_NODE' in params:
name = "build:%s:%s" % (job.name, params['ZUUL_NODE'])
else:

View File

@ -99,6 +99,14 @@ class LayoutSchema(object):
project_template = {v.Required('name'): str}
project_templates = [project_template]
swift = {v.Required('name'): str,
'container': str,
'expiry': int,
'max_file_size': int,
'max_file_count': int,
'logserver_prefix': int,
}
job = {v.Required('name'): str,
'failure-message': str,
'success-message': str,
@ -109,6 +117,7 @@ class LayoutSchema(object):
'parameter-function': str,
'branch': toList(str),
'files': toList(str),
'swift': toList(swift),
}
jobs = [job]

140
zuul/lib/swift.py Normal file
View File

@ -0,0 +1,140 @@
# Copyright 2014 Rackspace Australia
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import hmac
from hashlib import sha1
from time import time
import os
import random
import string
import swiftclient
import urlparse
class Swift(object):
def __init__(self, config):
self.config = config
self.connection = False
if self.config.has_option('swift', 'X-Account-Meta-Temp-Url-Key'):
self.secure_key = self.config.get('swift',
'X-Account-Meta-Temp-Url-Key')
else:
self.secure_key = ''.join(
random.choice(string.ascii_uppercase + string.digits)
for x in range(20)
)
self.connect()
def connect(self):
if self.config.has_section('swift'):
# required
authurl = self.config.get('swift', 'authurl')
user = (self.config.get('swift', 'user')
if self.config.has_option('swift', 'user') else None)
key = (self.config.get('swift', 'key')
if self.config.has_option('swift', 'key') else None)
retries = (self.config.get('swift', 'retries')
if self.config.has_option('swift', 'retries') else 5)
preauthurl = (self.config.get('swift', 'preauthurl')
if self.config.has_option('swift', 'preauthurl')
else None)
preauthtoken = (self.config.get('swift', 'preauthtoken')
if self.config.has_option('swift', 'preauthtoken')
else None)
snet = (self.config.get('swift', 'snet')
if self.config.has_option('swift', 'snet') else False)
starting_backoff = (self.config.get('swift', 'starting_backoff')
if self.config.has_option('swift',
'starting_backoff')
else 1)
max_backoff = (self.config.get('swift', 'max_backoff')
if self.config.has_option('swift', 'max_backoff')
else 64)
tenant_name = (self.config.get('swift', 'tenant_name')
if self.config.has_option('swift', 'tenant_name')
else None)
auth_version = (self.config.get('swift', 'auth_version')
if self.config.has_option('swift', 'auth_version')
else 2.0)
cacert = (self.config.get('swift', 'cacert')
if self.config.has_option('swift', 'cacert') else None)
insecure = (self.config.get('swift', 'insecure')
if self.config.has_option('swift', 'insecure')
else False)
ssl_compression = (self.config.get('swift', 'ssl_compression')
if self.config.has_option('swift',
'ssl_compression')
else True)
available_os_options = ['tenant_id', 'auth_token', 'service_type',
'endpoint_type', 'tenant_name',
'object_storage_url', 'region_name']
os_options = {}
for os_option in available_os_options:
if self.config.has_option('swift', os_option):
os_options[os_option] = self.config.get('swift', os_option)
self.connection = swiftclient.client.Connection(
authurl=authurl, user=user, key=key, retries=retries,
preauthurl=preauthurl, preauthtoken=preauthtoken, snet=snet,
starting_backoff=starting_backoff, max_backoff=max_backoff,
tenant_name=tenant_name, os_options=os_options,
auth_version=auth_version, cacert=cacert, insecure=insecure,
ssl_compression=ssl_compression)
# Tell swift of our key
headers = {}
headers['X-Account-Meta-Temp-Url-Key'] = self.secure_key
self.connection.post_account(headers)
self.storage_url, self.auth_token = self.connection.get_auth()
def generate_form_post_middleware_params(self, destination_prefix='',
**kwargs):
"""Generate the FormPost middleware params for the given settings"""
# Define the available settings and their defaults
settings = {
'container': '',
'expiry': 7200,
'max_file_size': 104857600,
'max_file_count': 10,
'file_path_prefix': ''
}
for key, default in settings.iteritems():
if key in kwargs:
settings[key] = kwargs[key]
elif self.config.has_option('swift', 'default_' + key):
settings[key] = self.config.get('swift', 'default_' + key)
expires = int(time() + settings['expiry'])
redirect = ''
url = os.path.join(self.storage_url, settings['container'],
settings['file_path_prefix'],
destination_prefix)
u = urlparse.urlparse(url)
hmac_body = '%s\n%s\n%s\n%s\n%s' % (u.path, redirect,
settings['max_file_size'],
settings['max_file_count'],
expires)
signature = hmac.new(self.secure_key, hmac_body, sha1).hexdigest()
return url, hmac_body, signature

View File

@ -537,6 +537,7 @@ class Job(object):
self._branches = []
self.files = []
self._files = []
self.swift = {}
def __str__(self):
return self.name
@ -561,6 +562,8 @@ class Job(object):
if other.files:
self.files = other.files[:]
self._files = other._files[:]
if other.swift:
self.swift.update(other.swift)
self.hold_following_changes = other.hold_following_changes
self.voting = other.voting
@ -763,6 +766,16 @@ class Changeish(object):
def __init__(self, project):
self.project = project
def getBasePath(self):
base_path = ''
if hasattr(self, 'refspec'):
base_path = "%s/%s/%s" % (
self.number[-2:], self.number, self.patchset)
elif hasattr(self, 'ref'):
base_path = "%s/%s" % (self.newrev[:2], self.newrev)
return base_path
def equals(self, other):
raise NotImplementedError()

View File

@ -347,6 +347,10 @@ class Scheduler(threading.Thread):
if files:
job._files = files
job.files = [re.compile(x) for x in files]
swift = toList(config_job.get('swift'))
if swift:
for s in swift:
job.swift[s['name']] = s
def add_jobs(job_tree, config_jobs):
for job in config_jobs: