# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2016 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import configparser
from contextlib import contextmanager
import datetime
import errno
import gc
import hashlib
from io import StringIO
import itertools
import json
import logging
import os
import queue
import random
import re
import requests
import select
import shutil
import socket
import string
import subprocess
import sys
import tempfile
import threading
import traceback
import time
import uuid
import socketserver
import http.server
import git
import gear
import fixtures
import kazoo.client
import kazoo.exceptions
import pymysql
import psycopg2
import psycopg2.extensions
import testtools
import testtools.content
import testtools.content_type
from git.exc import NoSuchPathError
import yaml
import paramiko
import tests.fakegithub
import zuul.driver.gerrit.gerritsource as gerritsource
import zuul.driver.gerrit.gerritconnection as gerritconnection
import zuul.driver.github.githubconnection as githubconnection
import zuul.driver.pagure.pagureconnection as pagureconnection
import zuul.driver.github
import zuul.driver.sql
import zuul.scheduler
import zuul.executor.server
import zuul.executor.client
import zuul.lib.ansible
import zuul.lib.connections
import zuul.merger.client
import zuul.merger.merger
import zuul.merger.server
import zuul.model
import zuul.nodepool
import zuul.rpcclient
import zuul.zk
import zuul.configloader
from zuul.exceptions import MergeFailure
from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
FIXTURE_DIR = os.path.join(os.path.dirname(__file__),
'fixtures')
KEEP_TEMPDIRS = bool(os.environ.get('KEEP_TEMPDIRS', False))
def repack_repo(path):
cmd = ['git', '--git-dir=%s/.git' % path, 'repack', '-afd']
output = subprocess.Popen(cmd, close_fds=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out = output.communicate()
if output.returncode:
raise Exception("git repack returned %d" % output.returncode)
return out
def random_sha1():
return hashlib.sha1(str(random.random()).encode('ascii')).hexdigest()
def iterate_timeout(max_seconds, purpose):
start = time.time()
count = 0
while (time.time() < start + max_seconds):
count += 1
yield count
time.sleep(0.01)
raise Exception("Timeout waiting for %s" % purpose)
def simple_layout(path, driver='gerrit'):
"""Specify a layout file for use by a test method.
:arg str path: The path to the layout file.
:arg str driver: The source driver to use, defaults to gerrit.
Some tests require only a very simple configuration. For those,
    establishing a complete config directory hierarchy is too much
work. In those cases, you can add a simple zuul.yaml file to the
test fixtures directory (in fixtures/layouts/foo.yaml) and use
this decorator to indicate the test method should use that rather
than the tenant config file specified by the test class.
The decorator will cause that layout file to be added to a
config-project called "common-config" and each "project" instance
referenced in the layout file will have a git repo automatically
initialized.
"""
def decorator(test):
test.__simple_layout__ = (path, driver)
return test
return decorator
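# Usage sketch (hypothetical test method; the decorator merely tags the
# function with __simple_layout__ for the test framework to read):
#
#     @simple_layout('layouts/basic.yaml')
#     def test_basic(self):
#         ...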
def never_capture():
"""Never capture logs/output
Due to high volume, log files are normally captured and attached
to the subunit stream only on error. This can make diagnosing
    some problems difficult. Use this decorator on a test to
indicate that logs and output should not be captured.
"""
def decorator(test):
test.__never_capture__ = True
return test
return decorator
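# Usage sketch (hypothetical test; note the decorator factory is called):
#
#     @never_capture()
#     def test_verbose_logging(self):
#         ...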
class FakeAnsibleManager(zuul.lib.ansible.AnsibleManager):
def validate(self):
return True
def copyAnsibleFiles(self):
pass
class GerritChangeReference(git.Reference):
_common_path_default = "refs/changes"
_points_to_commits_only = True
class FakeGerritChange(object):
categories = {'Approved': ('Approved', -1, 1),
'Code-Review': ('Code-Review', -2, 2),
'Verified': ('Verified', -2, 2)}
def __init__(self, gerrit, number, project, branch, subject,
status='NEW', upstream_root=None, files={},
parent=None):
self.gerrit = gerrit
self.source = gerrit
self.reported = 0
self.queried = 0
self.patchsets = []
self.number = number
self.project = project
self.branch = branch
self.subject = subject
self.latest_patchset = 0
self.depends_on_change = None
self.needed_by_changes = []
self.fail_merge = False
self.messages = []
self.comments = []
self.data = {
'branch': branch,
'comments': self.comments,
'commitMessage': subject,
'createdOn': time.time(),
'id': 'I' + random_sha1(),
'lastUpdated': time.time(),
'number': str(number),
'open': status == 'NEW',
'owner': {'email': 'user@example.com',
'name': 'User Name',
'username': 'username'},
'patchSets': self.patchsets,
'project': project,
'status': status,
'subject': subject,
'submitRecords': [],
'url': '%s/%s' % (self.gerrit.baseurl.rstrip('/'), number)}
self.upstream_root = upstream_root
self.addPatchset(files=files, parent=parent)
self.data['submitRecords'] = self.getSubmitRecords()
self.open = status == 'NEW'
def addFakeChangeToRepo(self, msg, files, large, parent):
path = os.path.join(self.upstream_root, self.project)
repo = git.Repo(path)
if parent is None:
parent = 'refs/tags/init'
ref = GerritChangeReference.create(
repo, '1/%s/%s' % (self.number, self.latest_patchset),
parent)
repo.head.reference = ref
zuul.merger.merger.reset_repo_to_head(repo)
repo.git.clean('-x', '-f', '-d')
path = os.path.join(self.upstream_root, self.project)
if not large:
for fn, content in files.items():
fn = os.path.join(path, fn)
if content is None:
os.unlink(fn)
repo.index.remove([fn])
else:
d = os.path.dirname(fn)
if not os.path.exists(d):
os.makedirs(d)
with open(fn, 'w') as f:
f.write(content)
repo.index.add([fn])
else:
for fni in range(100):
fn = os.path.join(path, str(fni))
f = open(fn, 'w')
for ci in range(4096):
f.write(random.choice(string.printable))
f.close()
repo.index.add([fn])
r = repo.index.commit(msg)
repo.head.reference = 'master'
zuul.merger.merger.reset_repo_to_head(repo)
repo.git.clean('-x', '-f', '-d')
repo.heads['master'].checkout()
return r
def addPatchset(self, files=None, large=False, parent=None):
self.latest_patchset += 1
if not files:
fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
data = ("test %s %s %s\n" %
(self.branch, self.number, self.latest_patchset))
files = {fn: data}
msg = self.subject + '-' + str(self.latest_patchset)
c = self.addFakeChangeToRepo(msg, files, large, parent)
ps_files = [{'file': '/COMMIT_MSG',
'type': 'ADDED'},
{'file': 'README',
'type': 'MODIFIED'}]
for f in files:
ps_files.append({'file': f, 'type': 'ADDED'})
d = {'approvals': [],
'createdOn': time.time(),
'files': ps_files,
'number': str(self.latest_patchset),
'ref': 'refs/changes/1/%s/%s' % (self.number,
self.latest_patchset),
'revision': c.hexsha,
'uploader': {'email': 'user@example.com',
'name': 'User name',
'username': 'user'}}
self.data['currentPatchSet'] = d
self.patchsets.append(d)
self.data['submitRecords'] = self.getSubmitRecords()
def addComment(self, filename, line, message, name, email, username,
comment_range=None):
comment = {
'file': filename,
'line': int(line),
'reviewer': {
'name': name,
'email': email,
'username': username,
},
'message': message,
}
if comment_range:
comment['range'] = comment_range
self.comments.append(comment)
def getPatchsetCreatedEvent(self, patchset):
event = {"type": "patchset-created",
"change": {"project": self.project,
"branch": self.branch,
"id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
"number": str(self.number),
"subject": self.subject,
"owner": {"name": "User Name"},
"url": "https://hostname/3"},
"patchSet": self.patchsets[patchset - 1],
"uploader": {"name": "User Name"}}
return event
def getChangeRestoredEvent(self):
event = {"type": "change-restored",
"change": {"project": self.project,
"branch": self.branch,
"id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
"number": str(self.number),
"subject": self.subject,
"owner": {"name": "User Name"},
"url": "https://hostname/3"},
"restorer": {"name": "User Name"},
"patchSet": self.patchsets[-1],
"reason": ""}
return event
def getChangeAbandonedEvent(self):
event = {"type": "change-abandoned",
"change": {"project": self.project,
"branch": self.branch,
"id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
"number": str(self.number),
"subject": self.subject,
"owner": {"name": "User Name"},
"url": "https://hostname/3"},
"abandoner": {"name": "User Name"},
"patchSet": self.patchsets[-1],
"reason": ""}
return event
def getChangeCommentEvent(self, patchset):
event = {"type": "comment-added",
"change": {"project": self.project,
"branch": self.branch,
"id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
"number": str(self.number),
"subject": self.subject,
"owner": {"name": "User Name"},
"url": "https://hostname/3"},
"patchSet": self.patchsets[patchset - 1],
"author": {"name": "User Name"},
"approvals": [{"type": "Code-Review",
"description": "Code-Review",
"value": "0"}],
"comment": "This is a comment"}
return event
def getChangeMergedEvent(self):
event = {"submitter": {"name": "Jenkins",
"username": "jenkins"},
"newRev": "29ed3b5f8f750a225c5be70235230e3a6ccb04d9",
"patchSet": self.patchsets[-1],
"change": self.data,
"type": "change-merged",
"eventCreatedOn": 1487613810}
return event
def getRefUpdatedEvent(self):
path = os.path.join(self.upstream_root, self.project)
repo = git.Repo(path)
oldrev = repo.heads[self.branch].commit.hexsha
event = {
"type": "ref-updated",
"submitter": {
"name": "User Name",
},
"refUpdate": {
"oldRev": oldrev,
"newRev": self.patchsets[-1]['revision'],
"refName": self.branch,
"project": self.project,
}
}
return event
def addApproval(self, category, value, username='reviewer_john',
granted_on=None, message=''):
if not granted_on:
granted_on = time.time()
approval = {
'description': self.categories[category][0],
'type': category,
'value': str(value),
'by': {
'username': username,
'email': username + '@example.com',
},
'grantedOn': int(granted_on)
}
for i, x in enumerate(self.patchsets[-1]['approvals'][:]):
if x['by']['username'] == username and x['type'] == category:
del self.patchsets[-1]['approvals'][i]
self.patchsets[-1]['approvals'].append(approval)
event = {'approvals': [approval],
'author': {'email': 'author@example.com',
'name': 'Patchset Author',
'username': 'author_phil'},
'change': {'branch': self.branch,
'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
'number': str(self.number),
'owner': {'email': 'owner@example.com',
'name': 'Change Owner',
'username': 'owner_jane'},
'project': self.project,
'subject': self.subject,
'topic': 'master',
'url': 'https://hostname/459'},
'comment': message,
'patchSet': self.patchsets[-1],
'type': 'comment-added'}
self.data['submitRecords'] = self.getSubmitRecords()
return json.loads(json.dumps(event))
def getSubmitRecords(self):
status = {}
for cat in self.categories:
status[cat] = 0
for a in self.patchsets[-1]['approvals']:
cur = status[a['type']]
cat_min, cat_max = self.categories[a['type']][1:]
new = int(a['value'])
if new == cat_min:
cur = new
elif abs(new) > abs(cur):
cur = new
status[a['type']] = cur
labels = []
ok = True
for typ, cat in self.categories.items():
cur = status[typ]
cat_min, cat_max = cat[1:]
if cur == cat_min:
value = 'REJECT'
ok = False
elif cur == cat_max:
value = 'OK'
else:
value = 'NEED'
ok = False
labels.append({'label': cat[0], 'status': value})
if ok:
return [{'status': 'OK'}]
return [{'status': 'NOT_READY',
'labels': labels}]
def setDependsOn(self, other, patchset):
self.depends_on_change = other
d = {'id': other.data['id'],
'number': other.data['number'],
'ref': other.patchsets[patchset - 1]['ref']
}
self.data['dependsOn'] = [d]
other.needed_by_changes.append(self)
needed = other.data.get('neededBy', [])
d = {'id': self.data['id'],
'number': self.data['number'],
'ref': self.patchsets[-1]['ref'],
'revision': self.patchsets[-1]['revision']
}
needed.append(d)
other.data['neededBy'] = needed
def query(self):
self.queried += 1
d = self.data.get('dependsOn')
if d:
d = d[0]
if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
d['isCurrentPatchSet'] = True
else:
d['isCurrentPatchSet'] = False
return json.loads(json.dumps(self.data))
def setMerged(self):
if (self.depends_on_change and
self.depends_on_change.data['status'] != 'MERGED'):
return
if self.fail_merge:
return
self.data['status'] = 'MERGED'
self.open = False
path = os.path.join(self.upstream_root, self.project)
repo = git.Repo(path)
repo.heads[self.branch].commit = \
repo.commit(self.patchsets[-1]['revision'])
def setReported(self):
self.reported += 1
class GerritWebServer(object):
def __init__(self, fake_gerrit):
super(GerritWebServer, self).__init__()
self.fake_gerrit = fake_gerrit
def start(self):
fake_gerrit = self.fake_gerrit
class Server(http.server.SimpleHTTPRequestHandler):
log = logging.getLogger("zuul.test.FakeGerritConnection")
review_re = re.compile('/a/changes/(.*?)/revisions/(.*?)/review')
submit_re = re.compile('/a/changes/(.*?)/submit')
def do_POST(self):
path = self.path
self.log.debug("Got POST %s", path)
data = self.rfile.read(int(self.headers['Content-Length']))
data = json.loads(data.decode('utf-8'))
self.log.debug("Got data %s", data)
m = self.review_re.match(path)
if m:
return self.review(m.group(1), m.group(2), data)
m = self.submit_re.match(path)
if m:
return self.submit(m.group(1), data)
self.send_response(500)
self.end_headers()
def _404(self):
self.send_response(404)
self.end_headers()
def _get_change(self, change_id):
project, branch, change = change_id.split('~')
for c in fake_gerrit.changes.values():
if c.data['id'] == change:
return c
def review(self, change_id, revision, data):
change = self._get_change(change_id)
if not change:
return self._404()
message = data['message']
action = data['labels']
comments = data.get('comments', {})
fake_gerrit._test_handle_review(
int(change.data['number']), message, action, comments)
self.send_response(200)
self.end_headers()
def submit(self, change_id, data):
change = self._get_change(change_id)
if not change:
return self._404()
message = None
action = {'submit': True}
fake_gerrit._test_handle_review(
int(change.data['number']), message, action)
self.send_response(200)
self.end_headers()
def log_message(self, fmt, *args):
self.log.debug(fmt, *args)
self.httpd = socketserver.ThreadingTCPServer(('', 0), Server)
self.port = self.httpd.socket.getsockname()[1]
self.thread = threading.Thread(name='GerritWebServer',
target=self.httpd.serve_forever)
self.thread.daemon = True
self.thread.start()
def stop(self):
self.httpd.shutdown()
self.thread.join()
class FakeGerritConnection(gerritconnection.GerritConnection):
"""A Fake Gerrit connection for use in tests.
This subclasses
:py:class:`~zuul.connection.gerrit.GerritConnection` to add the
ability for tests to add changes to the fake Gerrit it represents.
"""
log = logging.getLogger("zuul.test.FakeGerritConnection")
def __init__(self, driver, connection_name, connection_config,
changes_db=None, upstream_root=None):
if connection_config.get('password'):
self.web_server = GerritWebServer(self)
self.web_server.start()
url = 'http://localhost:%s' % self.web_server.port
connection_config['baseurl'] = url
else:
self.web_server = None
super(FakeGerritConnection, self).__init__(driver, connection_name,
connection_config)
self.event_queue = queue.Queue()
self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
self.change_number = 0
self.changes = changes_db
self.queries = []
self.upstream_root = upstream_root
def addFakeChange(self, project, branch, subject, status='NEW',
files=None, parent=None):
"""Add a change to the fake Gerrit."""
self.change_number += 1
c = FakeGerritChange(self, self.change_number, project, branch,
subject, upstream_root=self.upstream_root,
status=status, files=files, parent=parent)
self.changes[self.change_number] = c
return c
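    # Usage sketch (illustrative names; tests usually follow this with an
    # approval and an emitted event to drive the scheduler):
    #
    #     A = fake_gerrit.addFakeChange('org/project', 'master', 'A')
    #     A.addApproval('Code-Review', 2)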
def addFakeTag(self, project, branch, tag):
path = os.path.join(self.upstream_root, project)
repo = git.Repo(path)
commit = repo.heads[branch].commit
newrev = commit.hexsha
ref = 'refs/tags/' + tag
git.Tag.create(repo, tag, commit)
event = {
"type": "ref-updated",
"submitter": {
"name": "User Name",
},
"refUpdate": {
"oldRev": 40 * '0',
"newRev": newrev,
"refName": ref,
"project": project,
}
}
return event
def getFakeBranchCreatedEvent(self, project, branch):
path = os.path.join(self.upstream_root, project)
repo = git.Repo(path)
oldrev = 40 * '0'
event = {
"type": "ref-updated",
"submitter": {
"name": "User Name",
},
"refUpdate": {
"oldRev": oldrev,
"newRev": repo.heads[branch].commit.hexsha,
"refName": 'refs/heads/' + branch,
"project": project,
}
}
return event
def getFakeBranchDeletedEvent(self, project, branch):
oldrev = '4abd38457c2da2a72d4d030219ab180ecdb04bf0'
newrev = 40 * '0'
event = {
"type": "ref-updated",
"submitter": {
"name": "User Name",
},
"refUpdate": {
"oldRev": oldrev,
"newRev": newrev,
"refName": 'refs/heads/' + branch,
"project": project,
}
}
return event
def review(self, change, message, action={}, file_comments={},
zuul_event_id=None):
if self.web_server:
return super(FakeGerritConnection, self).review(
change, message, action, file_comments)
self._test_handle_review(int(change.number), message, action)
def _test_handle_review(self, change_number, message, action,
file_comments=None):
# Handle a review action from a test
change = self.changes[change_number]
        # Add the approval back onto the change (i.e. simulate what
        # gerrit would do).
        # Usually when zuul leaves a review it creates a feedback loop:
        # zuul's review becomes another gerrit event which zuul then
        # picks up. We can't mimic that behaviour here (by adding the
        # approval event to the queue) because it would prevent tests
        # from checking state before the event is processed. If a test
        # needs to see what happens next, it can add its own verified
        # event to the queue.
        # Nevertheless, we can update the change with the new review in
        # gerrit.
for cat in action:
if cat != 'submit':
change.addApproval(cat, action[cat], username=self.user)
if message:
change.messages.append(message)
if file_comments:
for filename, commentlist in file_comments.items():
for comment in commentlist:
change.addComment(filename, comment['line'],
comment['message'], 'Zuul',
'zuul@example.com', self.user,
comment.get('range'))
if 'submit' in action:
change.setMerged()
if message:
change.setReported()
def query(self, number, event=None):
if type(number) == int:
return self.queryChange(number, event=event)
raise Exception("Could not query %s %s" % (type(number, number)))
def queryChange(self, number, event=None):
change = self.changes.get(int(number))
if change:
return change.query()
return {}
def _simpleQuery(self, query):
        # the query can be wrapped in parentheses so strip them if needed
if query.startswith('('):
query = query[1:-1]
if query.startswith('change:'):
# Query a specific changeid
changeid = query[len('change:'):]
l = [change.query() for change in self.changes.values()
if (change.data['id'] == changeid or
change.data['number'] == changeid)]
elif query.startswith('message:'):
# Query the content of a commit message
msg = query[len('message:'):].strip()
# Remove quoting if it is there
if msg.startswith('{') and msg.endswith('}'):
msg = msg[1:-1]
l = [change.query() for change in self.changes.values()
if msg in change.data['commitMessage']]
else:
# Query all open changes
l = [change.query() for change in self.changes.values()]
return l
def simpleQuery(self, query, event=None):
log = get_annotated_logger(self.log, event)
log.debug("simpleQuery: %s", query)
self.queries.append(query)
results = []
if query.startswith('(') and 'OR' in query:
query = query[1:-1]
for q in query.split(' OR '):
for r in self._simpleQuery(q):
if r not in results:
results.append(r)
else:
results = self._simpleQuery(query)
return results
def _start_watcher_thread(self, *args, **kw):
pass
def _uploadPack(self, project):
ret = ('00a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
'multi_ack thin-pack side-band side-band-64k ofs-delta '
'shallow no-progress include-tag multi_ack_detailed no-done\n')
path = os.path.join(self.upstream_root, project.name)
repo = git.Repo(path)
for ref in repo.refs:
if ref.path.endswith('.lock'):
                # don't treat lockfiles as refs
continue
r = ref.object.hexsha + ' ' + ref.path + '\n'
ret += '%04x%s' % (len(r) + 4, r)
ret += '0000'
return ret
def getGitUrl(self, project):
return 'file://' + os.path.join(self.upstream_root, project.name)
class PagureChangeReference(git.Reference):
_common_path_default = "refs/pull"
_points_to_commits_only = True
class FakePagurePullRequest(object):
log = logging.getLogger("zuul.test.FakePagurePullRequest")
def __init__(self, pagure, number, project, branch,
subject, upstream_root, files=[], number_of_commits=1,
initial_comment=None):
self.pagure = pagure
self.source = pagure
self.number = number
self.project = project
self.branch = branch
self.subject = subject
self.upstream_root = upstream_root
self.number_of_commits = 0
self.status = 'Open'
self.initial_comment = initial_comment
self.uuid = uuid.uuid4().hex
self.comments = []
self.flags = []
self.files = {}
self.cached_merge_status = ''
self.threshold_reached = False
self.commit_stop = None
self.commit_start = None
self.threshold_reached = False
self.upstream_root = upstream_root
self.cached_merge_status = 'MERGE'
self.url = "https://%s/%s/pull-request/%s" % (
self.pagure.server, self.project, self.number)
self.is_merged = False
self.pr_ref = self._createPRRef()
self._addCommitInPR(files=files)
self._updateTimeStamp()
def _getPullRequestEvent(self, action):
name = 'pg_pull_request'
data = {
'msg': {
'pullrequest': {
'branch': self.branch,
'comments': self.comments,
'commit_start': self.commit_start,
'commit_stop': self.commit_stop,
'date_created': '0',
'id': self.number,
'project': {
'fullname': self.project,
},
'status': self.status,
'subject': self.subject,
'uid': self.uuid,
}
},
'msg_id': str(uuid.uuid4()),
'timestamp': 1427459070,
'topic': action
}
if action == 'pull-request.flag.added':
data['msg']['flag'] = self.flags[0]
return (name, data)
def getPullRequestOpenedEvent(self):
return self._getPullRequestEvent('pull-request.new')
def getPullRequestUpdatedEvent(self):
self._addCommitInPR()
self.addComment(
"**1 new commit added**\n\n * ``Bump``\n",
True)
return self._getPullRequestEvent('pull-request.comment.added')
def getPullRequestCommentedEvent(self, message):
self.addComment(message)
return self._getPullRequestEvent('pull-request.comment.added')
def getPullRequestStatusSetEvent(self, status):
self.addFlag(
status, "https://url", "Build %s" % status)
return self._getPullRequestEvent('pull-request.flag.added')
def addFlag(self, status, url, comment, username="Pingou"):
flag = {
"username": username,
"comment": comment,
"status": status,
"url": url
}
self.flags.insert(0, flag)
self._updateTimeStamp()
def editInitialComment(self, initial_comment):
self.initial_comment = initial_comment
self._updateTimeStamp()
def addComment(self, message, notification=False, fullname=None):
self.comments.append({
'comment': message,
'notification': notification,
'date_created': str(int(time.time())),
'user': {
'fullname': fullname or 'Pingou'
}}
)
self._updateTimeStamp()
def getPRReference(self):
return '%s/head' % self.number
def _getRepo(self):
repo_path = os.path.join(self.upstream_root, self.project)
return git.Repo(repo_path)
def _createPRRef(self):
repo = self._getRepo()
return PagureChangeReference.create(
repo, self.getPRReference(), 'refs/tags/init')
def addCommit(self, files=[]):
"""Adds a commit on top of the actual PR head."""
self._addCommitInPR(files=files)
self._updateTimeStamp()
def forcePush(self, files=[]):
"""Clears actual commits and add a commit on top of the base."""
self._addCommitInPR(files=files, reset=True)
self._updateTimeStamp()
def _addCommitInPR(self, files=[], reset=False):
repo = self._getRepo()
ref = repo.references[self.getPRReference()]
if reset:
self.number_of_commits = 0
ref.set_object('refs/tags/init')
self.number_of_commits += 1
repo.head.reference = ref
repo.git.clean('-x', '-f', '-d')
if files:
self.files = files
else:
fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
self.files = {fn: "test %s %s\n" % (self.branch, self.number)}
msg = self.subject + '-' + str(self.number_of_commits)
for fn, content in self.files.items():
fn = os.path.join(repo.working_dir, fn)
with open(fn, 'w') as f:
f.write(content)
repo.index.add([fn])
self.commit_stop = repo.index.commit(msg).hexsha
if not self.commit_start:
self.commit_start = self.commit_stop
repo.create_head(self.getPRReference(), self.commit_stop, force=True)
self.pr_ref.set_commit(self.commit_stop)
repo.head.reference = 'master'
repo.git.clean('-x', '-f', '-d')
repo.heads['master'].checkout()
def _updateTimeStamp(self):
self.last_updated = str(int(time.time()))
class FakePagureAPIClient(pagureconnection.PagureAPIClient):
log = logging.getLogger("zuul.test.FakePagureAPIClient")
def __init__(self, baseurl, api_token, project,
token_exp_date=None, pull_requests_db={}):
super(FakePagureAPIClient, self).__init__(
baseurl, api_token, project, token_exp_date)
self.session = None
self.pull_requests = pull_requests_db
def gen_error(self):
return {
'error': 'some error',
'error_code': 'some error code'
}
def _get_pr(self, match):
project, number = match.groups()
pr = self.pull_requests.get(project, {}).get(number)
if not pr:
return self.gen_error()
return pr
def get(self, url):
self.log.debug("Getting resource %s ..." % url)
match = re.match(r'.+/api/0/(.+)/pull-request/(\d+)$', url)
if match:
pr = self._get_pr(match)
return {
'branch': pr.branch,
'subject': pr.subject,
'status': pr.status,
'initial_comment': pr.initial_comment,
'last_updated': pr.last_updated,
'comments': pr.comments,
'commit_stop': pr.commit_stop,
'threshold_reached': pr.threshold_reached,
'cached_merge_status': pr.cached_merge_status
}
match = re.match(r'.+/api/0/(.+)/pull-request/(\d+)/flag$', url)
if match:
pr = self._get_pr(match)
return {'flags': pr.flags}
match = re.match('.+/api/0/(.+)/git/branches$', url)
if match:
# project = match.groups()[0]
return {'branches': ['master']}
match = re.match(r'.+/api/0/(.+)/pull-request/(\d+)/diffstats$', url)
if match:
pr = self._get_pr(match)
return pr.files
def post(self, url, params=None):
self.log.info(
"Posting on resource %s, params (%s) ..." % (url, params))
match = re.match(r'.+/api/0/(.+)/pull-request/(\d+)/merge$', url)
if match:
pr = self._get_pr(match)
pr.status = 'Merged'
pr.is_merged = True
if not params:
return self.gen_error()
match = re.match(r'.+/api/0/(.+)/pull-request/(\d+)/flag$', url)
if match:
pr = self._get_pr(match)
pr.flags.insert(0, params)
match = re.match(r'.+/api/0/(.+)/pull-request/(\d+)/comment$', url)
if match:
pr = self._get_pr(match)
pr.addComment(params['comment'])
class FakePagureConnection(pagureconnection.PagureConnection):
log = logging.getLogger("zuul.test.FakePagureConnection")
def __init__(self, driver, connection_name, connection_config, rpcclient,
changes_db=None, upstream_root=None):
super(FakePagureConnection, self).__init__(driver, connection_name,
connection_config)
self.connection_name = connection_name
self.pr_number = 0
self.pull_requests = changes_db
self.statuses = {}
self.upstream_root = upstream_root
self.reports = []
self.rpcclient = rpcclient
self.cloneurl = self.upstream_root
def _refresh_project_connectors(self, project):
connector = self.connectors.setdefault(
project, {'api_client': None, 'webhook_token': None})
api_token_exp_date = int(time.time()) + 60 * 24 * 3600
connector['api_client'] = FakePagureAPIClient(
self.baseurl, "fake_api_token-%s" % project, project,
token_exp_date=api_token_exp_date,
pull_requests_db=self.pull_requests)
connector['webhook_token'] = "fake_webhook_token-%s" % project
return connector
def emitEvent(self, event, use_zuulweb=False, project=None):
name, payload = event
secret = 'fake_webhook_token-%s' % project
if use_zuulweb:
payload = json.dumps(payload).encode('utf-8')
signature, _ = pagureconnection._sign_request(payload, secret)
headers = {'x-pagure-signature': signature,
'x-pagure-project': project}
return requests.post(
'http://127.0.0.1:%s/api/connection/%s/payload'
% (self.zuul_web_port, self.connection_name),
data=payload, headers=headers)
else:
job = self.rpcclient.submitJob(
'pagure:%s:payload' % self.connection_name,
{'payload': payload})
return json.loads(job.data[0])
def openFakePullRequest(self, project, branch, subject, files=[],
initial_comment=None):
self.pr_number += 1
pull_request = FakePagurePullRequest(
self, self.pr_number, project, branch, subject, self.upstream_root,
files=files, initial_comment=initial_comment)
self.pull_requests.setdefault(
project, {})[str(self.pr_number)] = pull_request
return pull_request
def getGitReceiveEvent(self, project):
name = 'pg_push'
repo_path = os.path.join(self.upstream_root, project)
repo = git.Repo(repo_path)
headsha = repo.head.commit.hexsha
data = {
'msg': {
'project_fullname': project,
'branch': 'master',
'stop_commit': headsha,
},
'msg_id': str(uuid.uuid4()),
'timestamp': 1427459070,
'topic': 'git.receive',
}
return (name, data)
def setZuulWebPort(self, port):
self.zuul_web_port = port
class GithubChangeReference(git.Reference):
_common_path_default = "refs/pull"
_points_to_commits_only = True
class FakeGithubPullRequest(object):
def __init__(self, github, number, project, branch,
subject, upstream_root, files=[], number_of_commits=1,
writers=[], body=None):
"""Creates a new PR with several commits.
Sends an event about opened PR."""
self.github = github
self.source = github
self.number = number
self.project = project
self.branch = branch
self.subject = subject
self.body = body
self.number_of_commits = 0
self.upstream_root = upstream_root
self.files = []
self.comments = []
self.labels = []
self.statuses = {}
self.reviews = []
self.writers = []
self.admins = []
self.updated_at = None
self.head_sha = None
self.is_merged = False
self.merge_message = None
self.state = 'open'
self.url = 'https://%s/%s/pull/%s' % (github.server, project, number)
self._createPRRef()
self._addCommitToRepo(files=files)
self._updateTimeStamp()
def addCommit(self, files=[]):
"""Adds a commit on top of the actual PR head."""
self._addCommitToRepo(files=files)
self._updateTimeStamp()
def forcePush(self, files=[]):
"""Clears actual commits and add a commit on top of the base."""
self._addCommitToRepo(files=files, reset=True)
self._updateTimeStamp()
def getPullRequestOpenedEvent(self):
return self._getPullRequestEvent('opened')
def getPullRequestSynchronizeEvent(self):
return self._getPullRequestEvent('synchronize')
def getPullRequestReopenedEvent(self):
return self._getPullRequestEvent('reopened')
def getPullRequestClosedEvent(self):
return self._getPullRequestEvent('closed')
def getPullRequestEditedEvent(self):
return self._getPullRequestEvent('edited')
def addComment(self, message):
self.comments.append(message)
self._updateTimeStamp()
def getIssueCommentAddedEvent(self, text):
name = 'issue_comment'
data = {
'action': 'created',
'issue': {
'number': self.number
},
'comment': {
'body': text
},
'repository': {
'full_name': self.project
},
'sender': {
'login': 'ghuser'
}
}
return (name, data)
def getCommentAddedEvent(self, text):
name, data = self.getIssueCommentAddedEvent(text)
# A PR comment has an additional 'pull_request' key in the issue data
data['issue']['pull_request'] = {
'url': 'http://%s/api/v3/repos/%s/pull/%s' % (
self.github.server, self.project, self.number)
}
return (name, data)
def getReviewAddedEvent(self, review):
name = 'pull_request_review'
data = {
'action': 'submitted',
'pull_request': {
'number': self.number,
'title': self.subject,
'updated_at': self.updated_at,
'base': {
'ref': self.branch,
'repo': {
'full_name': self.project
}
},
'head': {
'sha': self.head_sha
}
},
'review': {
'state': review
},
'repository': {
'full_name': self.project
},
'sender': {
'login': 'ghuser'
}
}
return (name, data)
def addLabel(self, name):
if name not in self.labels:
self.labels.append(name)
self._updateTimeStamp()
return self._getLabelEvent(name)
def removeLabel(self, name):
if name in self.labels:
self.labels.remove(name)
self._updateTimeStamp()
return self._getUnlabelEvent(name)
def _getLabelEvent(self, label):
name = 'pull_request'
data = {
'action': 'labeled',
'pull_request': {
'number': self.number,
'updated_at': self.updated_at,
'base': {
'ref': self.branch,
'repo': {
'full_name': self.project
}
},
'head': {
'sha': self.head_sha
}
},
'label': {
'name': label
},
'sender': {
'login': 'ghuser'
}
}
return (name, data)
def _getUnlabelEvent(self, label):
name = 'pull_request'
data = {
'action': 'unlabeled',
'pull_request': {
'number': self.number,
'title': self.subject,
'updated_at': self.updated_at,
'base': {
'ref': self.branch,
'repo': {
'full_name': self.project
}
},
'head': {
'sha': self.head_sha,
'repo': {
'full_name': self.project
}
}
},
'label': {
'name': label
},
'sender': {
'login': 'ghuser'
}
}
return (name, data)
def editBody(self, body):
self.body = body
self._updateTimeStamp()
def _getRepo(self):
repo_path = os.path.join(self.upstream_root, self.project)
return git.Repo(repo_path)
def _createPRRef(self):
repo = self._getRepo()
GithubChangeReference.create(
repo, self.getPRReference(), 'refs/tags/init')
def _addCommitToRepo(self, files=[], reset=False):
repo = self._getRepo()
ref = repo.references[self.getPRReference()]
if reset:
self.number_of_commits = 0
ref.set_object('refs/tags/init')
self.number_of_commits += 1
repo.head.reference = ref
zuul.merger.merger.reset_repo_to_head(repo)
repo.git.clean('-x', '-f', '-d')
if files:
self.files = files
else:
fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
self.files = {fn: "test %s %s\n" % (self.branch, self.number)}
msg = self.subject + '-' + str(self.number_of_commits)
for fn, content in self.files.items():
fn = os.path.join(repo.working_dir, fn)
with open(fn, 'w') as f:
f.write(content)
repo.index.add([fn])
self.head_sha = repo.index.commit(msg).hexsha
repo.create_head(self.getPRReference(), self.head_sha, force=True)
        # Create an empty set of statuses for the given sha;
        # each sha on a PR may have a status set on it
self.statuses[self.head_sha] = []
repo.head.reference = 'master'
zuul.merger.merger.reset_repo_to_head(repo)
repo.git.clean('-x', '-f', '-d')
repo.heads['master'].checkout()
def _updateTimeStamp(self):
self.updated_at = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.localtime())
def getPRHeadSha(self):
repo = self._getRepo()
return repo.references[self.getPRReference()].commit.hexsha
def addReview(self, user, state, granted_on=None):
gh_time_format = '%Y-%m-%dT%H:%M:%SZ'
# convert the timestamp to a str format that would be returned
# from github as 'submitted_at' in the API response
if granted_on:
granted_on = datetime.datetime.utcfromtimestamp(granted_on)
submitted_at = time.strftime(
gh_time_format, granted_on.timetuple())
else:
# github timestamps only down to the second, so we need to make
# sure reviews that tests add appear to be added over a period of
# time in the past and not all at once.
if not self.reviews:
# the first review happens 10 mins ago
offset = 600
else:
# subsequent reviews happen 1 minute closer to now
offset = 600 - (len(self.reviews) * 60)
granted_on = datetime.datetime.utcfromtimestamp(
time.time() - offset)
submitted_at = time.strftime(
gh_time_format, granted_on.timetuple())
self.reviews.append(tests.fakegithub.FakeGHReview({
'state': state,
'user': {
'login': user,
'email': user + "@derp.com",
},
'submitted_at': submitted_at,
}))
def getPRReference(self):
return '%s/head' % self.number
def _getPullRequestEvent(self, action):
name = 'pull_request'
data = {
'action': action,
'number': self.number,
'pull_request': {
'number': self.number,
'title': self.subject,
'updated_at': self.updated_at,
'base': {
'ref': self.branch,
'repo': {
'full_name': self.project
}
},
'head': {
'sha': self.head_sha,
'repo': {
'full_name': self.project
}
},
'body': self.body
},
'sender': {
'login': 'ghuser'
},
'labels': [{'name': l} for l in self.labels]
}
return (name, data)
def getCommitStatusEvent(self, context, state='success', user='zuul'):
name = 'status'
data = {
'state': state,
'sha': self.head_sha,
'name': self.project,
'description': 'Test results for %s: %s' % (self.head_sha, state),
'target_url': 'http://zuul/%s' % self.head_sha,
'branches': [],
'context': context,
'sender': {
'login': user
}
}
return (name, data)
def setMerged(self, commit_message):
self.is_merged = True
self.merge_message = commit_message
repo = self._getRepo()
repo.heads[self.branch].commit = repo.commit(self.head_sha)
class FakeGithubConnection(githubconnection.GithubConnection):
log = logging.getLogger("zuul.test.FakeGithubConnection")
def __init__(self, driver, connection_name, connection_config, rpcclient,
changes_db=None, upstream_root=None, git_url_with_auth=False):
super(FakeGithubConnection, self).__init__(driver, connection_name,
connection_config)
self.connection_name = connection_name
self.pr_number = 0
self.pull_requests = changes_db
self.statuses = {}
self.upstream_root = upstream_root
self.merge_failure = False
self.merge_not_allowed_count = 0
self.reports = []
self.github_data = tests.fakegithub.FakeGithubData(changes_db)
self.recorded_clients = []
self.git_url_with_auth = git_url_with_auth
self.rpcclient = rpcclient
self.record_clients = False
def getGithubClient(self,
project=None,
user_id=None,
zuul_event_id=None):
if self.app_id:
inst_id = self.installation_map.get(project)
client = tests.fakegithub.FakeGithubClient(
self.github_data, inst_id=inst_id)
else:
client = tests.fakegithub.FakeGithubClient(self.github_data)
if self.record_clients:
self.recorded_clients.append(client)
return client
def _prime_installation_map(self):
if not self.app_id:
return
# simulate one installation per org
orgs = {}
latest_inst_id = 0
for repo in self.github_data.repos:
inst_id = orgs.get(repo[0])
if not inst_id:
latest_inst_id += 1
inst_id = latest_inst_id
orgs[repo[0]] = inst_id
self.installation_map['/'.join(repo)] = inst_id
def setZuulWebPort(self, port):
self.zuul_web_port = port
def openFakePullRequest(self, project, branch, subject, files=[],
body=None):
self.pr_number += 1
pull_request = FakeGithubPullRequest(
self, self.pr_number, project, branch, subject, self.upstream_root,
files=files, body=body)
self.pull_requests[self.pr_number] = pull_request
return pull_request
def getPushEvent(self, project, ref, old_rev=None, new_rev=None,
added_files=[], removed_files=[], modified_files=[]):
if not old_rev:
old_rev = '0' * 40
if not new_rev:
new_rev = random_sha1()
name = 'push'
data = {
'ref': ref,
'before': old_rev,
'after': new_rev,
'repository': {
'full_name': project
},
'commits': [
{
'added': added_files,
'removed': removed_files,
'modified': modified_files
}
]
}
return (name, data)
def emitEvent(self, event, use_zuulweb=False):
"""Emulates sending the GitHub webhook event to the connection."""
name, data = event
payload = json.dumps(data).encode('utf8')
secret = self.connection_config['webhook_token']
signature = githubconnection._sign_request(payload, secret)
headers = {'x-github-event': name,
'x-hub-signature': signature,
'x-github-delivery': str(uuid.uuid4())}
if use_zuulweb:
return requests.post(
'http://127.0.0.1:%s/api/connection/%s/payload'
% (self.zuul_web_port, self.connection_name),
json=data, headers=headers)
else:
job = self.rpcclient.submitJob(
'github:%s:payload' % self.connection_name,
{'headers': headers, 'body': data})
return json.loads(job.data[0])
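    # Usage sketch (illustrative; pairs with openFakePullRequest above):
    #
    #     pr = fake_github.openFakePullRequest('org/project', 'master', 'A')
    #     fake_github.emitEvent(pr.getPullRequestOpenedEvent())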
def addProject(self, project):
# use the original method here and additionally register it in the
# fake github
super(FakeGithubConnection, self).addProject(project)
self.getGithubClient(project).addProject(project)
def getGitUrl(self, project):
if self.git_url_with_auth:
auth_token = ''.join(
random.choice(string.ascii_lowercase) for x in range(8))
prefix = 'file://x-access-token:%s@' % auth_token
else:
prefix = ''
return prefix + os.path.join(self.upstream_root, str(project))
def real_getGitUrl(self, project):
return super(FakeGithubConnection, self).getGitUrl(project)
def commentPull(self, project, pr_number, message, zuul_event_id=None):
# record that this got reported
self.reports.append((project, pr_number, 'comment'))
pull_request = self.pull_requests[int(pr_number)]
pull_request.addComment(message)
def mergePull(self, project, pr_number, commit_message='', sha=None,
zuul_event_id=None):
# record that this got reported
self.reports.append((project, pr_number, 'merge'))
pull_request = self.pull_requests[int(pr_number)]
if self.merge_failure:
raise Exception('Pull request was not merged')
if self.merge_not_allowed_count > 0:
self.merge_not_allowed_count -= 1
raise MergeFailure('Merge was not successful due to mergeability'
' conflict')
pull_request.setMerged(commit_message)
def setCommitStatus(self, project, sha, state, url='', description='',
context='default', user='zuul', zuul_event_id=None):
# record that this got reported and call original method
self.reports.append((project, sha, 'status', (user, context, state)))
super(FakeGithubConnection, self).setCommitStatus(
project, sha, state,
url=url, description=description, context=context)
def labelPull(self, project, pr_number, label, zuul_event_id=None):
# record that this got reported
self.reports.append((project, pr_number, 'label', label))
pull_request = self.pull_requests[int(pr_number)]
pull_request.addLabel(label)
def unlabelPull(self, project, pr_number, label, zuul_event_id=None):
# record that this got reported
self.reports.append((project, pr_number, 'unlabel', label))
pull_request = self.pull_requests[pr_number]
pull_request.removeLabel(label)
class BuildHistory(object):
def __init__(self, **kw):
self.__dict__.update(kw)
def __repr__(self):
return ("<Completed build, result: %s name: %s uuid: %s "
"changes: %s ref: %s>" %
(self.result, self.name, self.uuid,
self.changes, self.ref))
class FakeStatsd(threading.Thread):
log = logging.getLogger("zuul.test.FakeStatsd")
def __init__(self):
threading.Thread.__init__(self)
self.daemon = True
self.sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
self.sock.bind(('', 0))
self.port = self.sock.getsockname()[1]
self.wake_read, self.wake_write = os.pipe()
self.stats = []
def run(self):
while True:
poll = select.poll()
poll.register(self.sock, select.POLLIN)
poll.register(self.wake_read, select.POLLIN)
ret = poll.poll()
for (fd, event) in ret:
if fd == self.sock.fileno():
data = self.sock.recvfrom(1024)
if not data:
return
self.log.debug("Appending: %s" % data[0])
self.stats.append(data[0])
if fd == self.wake_read:
return
def stop(self):
os.write(self.wake_write, b'1\n')
class FakeBuild(object):
log = logging.getLogger("zuul.test")
def __init__(self, executor_server, job):
self.daemon = True
self.executor_server = executor_server
self.job = job
self.jobdir = None
self.uuid = job.unique
self.parameters = json.loads(job.arguments)
# TODOv3(jeblair): self.node is really "the label of the node
# assigned". We should rename it (self.node_label?) if we
# keep using it like this, or we may end up exposing more of
# the complexity around multi-node jobs here
# (self.nodes[0].label?)
self.node = None
if len(self.parameters.get('nodes')) == 1:
self.node = self.parameters['nodes'][0]['label']
self.unique = self.parameters['zuul']['build']
self.pipeline = self.parameters['zuul']['pipeline']
self.project = self.parameters['zuul']['project']['name']
self.name = self.parameters['job']
self.wait_condition = threading.Condition()
self.waiting = False
self.paused = False
self.aborted = False
self.requeue = False
self.created = time.time()
self.changes = None
items = self.parameters['zuul']['items']
self.changes = ' '.join(['%s,%s' % (x['change'], x['patchset'])
for x in items if 'change' in x])
if 'change' in items[-1]:
self.change = ' '.join((items[-1]['change'],
items[-1]['patchset']))
else:
self.change = None
def __repr__(self):
waiting = ''
if self.waiting:
waiting = ' [waiting]'
return '<FakeBuild %s:%s %s%s>' % (self.pipeline, self.name,
self.changes, waiting)
def release(self):
"""Release this build."""
self.wait_condition.acquire()
self.wait_condition.notify()
self.waiting = False
self.log.debug("Build %s released" % self.unique)
self.wait_condition.release()
def isWaiting(self):
"""Return whether this build is being held.
:returns: Whether the build is being held.
:rtype: bool
"""
self.wait_condition.acquire()
if self.waiting:
ret = True
else:
ret = False
self.wait_condition.release()
return ret
def _wait(self):
self.wait_condition.acquire()
self.waiting = True
self.log.debug("Build %s waiting" % self.unique)
self.wait_condition.wait()
self.wait_condition.release()
def run(self):
self.log.debug('Running build %s' % self.unique)
if self.executor_server.hold_jobs_in_build:
self.log.debug('Holding build %s' % self.unique)
self._wait()
self.log.debug("Build %s continuing" % self.unique)
self.writeReturnData()
result = (RecordingAnsibleJob.RESULT_NORMAL, 0) # Success
if self.shouldFail():
result = (RecordingAnsibleJob.RESULT_NORMAL, 1) # Failure
if self.aborted:
result = (RecordingAnsibleJob.RESULT_ABORTED, None)
if self.requeue:
result = (RecordingAnsibleJob.RESULT_UNREACHABLE, None)
return result
def shouldFail(self):
changes = self.executor_server.fail_tests.get(self.name, [])
for change in changes:
if self.hasChanges(change):
return True
return False
def writeReturnData(self):
changes = self.executor_server.return_data.get(self.name, {})
data = changes.get(self.change)
if data is None:
return
with open(self.jobdir.result_data_file, 'w') as f:
f.write(json.dumps(data))
def hasChanges(self, *changes):
"""Return whether this build has certain changes in its git repos.
:arg FakeChange changes: One or more changes (varargs) that
are expected to be present (in order) in the git repository of
the active project.
:returns: Whether the build has the indicated changes.
:rtype: bool
"""
for change in changes:
hostname = change.source.canonical_hostname
path = os.path.join(self.jobdir.src_root, hostname, change.project)
try:
repo = git.Repo(path)
except NoSuchPathError as e:
self.log.debug('%s' % e)
return False
repo_messages = [c.message.strip() for c in repo.iter_commits()]
commit_message = '%s-1' % change.subject
self.log.debug("Checking if build %s has changes; commit_message "
"%s; repo_messages %s" % (self, commit_message,
repo_messages))
if commit_message not in repo_messages:
self.log.debug(" messages do not match")
return False
self.log.debug(" OK")
return True
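    # Usage sketch (illustrative; A and B are fake changes expected, in
    # order, in the build's workspace):
    #
    #     self.assertTrue(build.hasChanges(A, B))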
def getWorkspaceRepos(self, projects):
"""Return workspace git repo objects for the listed projects
:arg list projects: A list of strings, each the canonical name
of a project.
:returns: A dictionary of {name: repo} for every listed
project.
:rtype: dict
"""
repos = {}
for project in projects:
path = os.path.join(self.jobdir.src_root, project)
repo = git.Repo(path)
repos[project] = repo
return repos
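    # Usage sketch (illustrative canonical project name):
    #
    #     repos = build.getWorkspaceRepos(['review.example.com/org/project'])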
class RecordingAnsibleJob(zuul.executor.server.AnsibleJob):
def doMergeChanges(self, merger, items, repo_state):
# Get a merger in order to update the repos involved in this job.
commit = super(RecordingAnsibleJob, self).doMergeChanges(
merger, items, repo_state)
if not commit: # merge conflict
self.recordResult('MERGER_FAILURE')
return commit
def recordResult(self, result):
build = self.executor_server.job_builds[self.job.unique]
self.executor_server.lock.acquire()
self.executor_server.build_history.append(
BuildHistory(name=build.name, result=result, changes=build.changes,
node=build.node, uuid=build.unique,
ref=build.parameters['zuul']['ref'],
newrev=build.parameters['zuul'].get('newrev'),
parameters=build.parameters, jobdir=build.jobdir,
pipeline=build.parameters['zuul']['pipeline'])
)
self.executor_server.running_builds.remove(build)
del self.executor_server.job_builds[self.job.unique]
self.executor_server.lock.release()
def runPlaybooks(self, args):
build = self.executor_server.job_builds[self.job.unique]
build.jobdir = self.jobdir
result = super(RecordingAnsibleJob, self).runPlaybooks(args)
self.recordResult(result)
return result
def runAnsible(self, cmd, timeout, playbook, ansible_version,
wrapped=True):
build = self.executor_server.job_builds[self.job.unique]
if self.executor_server._run_ansible:
            # Call run on the fake build, omitting the result, so we can
            # also hold real ansible jobs.
if playbook.path:
build.run()
result = super(RecordingAnsibleJob, self).runAnsible(
cmd, timeout, playbook, ansible_version, wrapped)
else:
if playbook.path:
result = build.run()
else:
result = (self.RESULT_NORMAL, 0)
return result
def getHostList(self, args):
self.log.debug("hostlist")
hosts = super(RecordingAnsibleJob, self).getHostList(args)
for host in hosts:
if not host['host_vars'].get('ansible_connection'):
host['host_vars']['ansible_connection'] = 'local'
if not hosts:
hosts.append(dict(
name='localhost',
host_vars=dict(ansible_connection='local'),
host_keys=[]))
return hosts
def pause(self):
build = self.executor_server.job_builds[self.job.unique]
build.paused = True
super().pause()
def resume(self):
build = self.executor_server.job_builds.get(self.job.unique)
if build:
build.paused = False
super().resume()
class RecordingMergeClient(zuul.merger.client.MergeClient):
def __init__(self, config, sched):
super().__init__(config, sched)
self.history = {}
def submitJob(self, name, data, build_set,
precedence=zuul.model.PRECEDENCE_NORMAL, event=None):
self.history.setdefault(name, [])
self.history[name].append((data, build_set))
return super().submitJob(
name, data, build_set, precedence, event=event)
class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
"""An Ansible executor to be used in tests.
:ivar bool hold_jobs_in_build: If true, when jobs are executed
they will report that they have started but then pause until
released before reporting completion. This attribute may be
changed at any time and will take effect for subsequently
executed builds, but previously held builds will still need to
be explicitly released.
"""
_job_class = RecordingAnsibleJob
def __init__(self, *args, **kw):
self._run_ansible = kw.pop('_run_ansible', False)
self._test_root = kw.pop('_test_root', False)
if self._run_ansible:
self._ansible_manager_class = zuul.lib.ansible.AnsibleManager
else:
self._ansible_manager_class = FakeAnsibleManager
super(RecordingExecutorServer, self).__init__(*args, **kw)
self.hold_jobs_in_build = False
self.lock = threading.Lock()
self.running_builds = []
self.build_history = []
self.fail_tests = {}
self.return_data = {}
self.job_builds = {}
def failJob(self, name, change):
"""Instruct the executor to report matching builds as failures.
:arg str name: The name of the job to fail.
:arg Change change: The :py:class:`~tests.base.FakeChange`
instance which should cause the job to fail. This job
will also fail for changes depending on this change.
"""
l = self.fail_tests.get(name, [])
l.append(change)
self.fail_tests[name] = l
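    # Usage sketch (illustrative job name; A is a fake change):
    #
    #     self.executor_server.failJob('project-test1', A)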
def returnData(self, name, change, data):
"""Instruct the executor to return data for this build.
:arg str name: The name of the job to return data.
:arg Change change: The :py:class:`~tests.base.FakeChange`
instance which should cause the job to return data.
:arg dict data: The data to return
"""
# TODO(clarkb) We are incredibly change focused here and in FakeBuild
# above. This makes it very difficult to test non change items with
# return data. We currently rely on the hack that None is used as a
# key for the changes dict, but we should improve that to look up
# refnames or similar.
changes = self.return_data.setdefault(name, {})
if hasattr(change, 'number'):
cid = ' '.join((str(change.number), str(change.latest_patchset)))
else:
# Not actually a change, but a ref update event for tags/etc
# In this case a key of None is used by writeReturnData
cid = None
changes[cid] = data
def release(self, regex=None):
"""Release a held build.
:arg str regex: A regular expression which, if supplied, will
cause only builds with matching names to be released. If
not supplied, all builds will be released.
"""
builds = self.running_builds[:]
self.log.debug("Releasing build %s (%s)" % (regex,
len(self.running_builds)))
for build in builds:
if not regex or re.match(regex, build.name):
self.log.debug("Releasing build %s" %
(build.parameters['zuul']['build']))
build.release()
else:
self.log.debug("Not releasing build %s" %
(build.parameters['zuul']['build']))
self.log.debug("Done releasing builds %s (%s)" %
(regex, len(self.running_builds)))
def executeJob(self, job):
build = FakeBuild(self, job)
job.build = build
self.running_builds.append(build)
self.job_builds[job.unique] = build
args = json.loads(job.arguments)
args['zuul']['_test'] = dict(test_root=self._test_root)
job.arguments = json.dumps(args)
super(RecordingExecutorServer, self).executeJob(job)
def stopJob(self, job):
self.log.debug("handle stop")
parameters = json.loads(job.arguments)
uuid = parameters['uuid']
for build in self.running_builds:
if build.unique == uuid:
build.aborted = True
build.release()
super(RecordingExecutorServer, self).stopJob(job)
def stop(self):
for build in self.running_builds:
build.release()
super(RecordingExecutorServer, self).stop()
class FakeGearmanServer(gear.Server):
"""A Gearman server for use in tests.
:ivar bool hold_jobs_in_queue: If true, submitted jobs will be
added to the queue but will not be distributed to workers
until released. This attribute may be changed at any time and
will take effect for subsequently enqueued jobs, but
previously held jobs will still need to be explicitly
released.
"""
def __init__(self, use_ssl=False):
self.hold_jobs_in_queue = False
self.hold_merge_jobs_in_queue = False
self.jobs_history = []
if use_ssl:
ssl_ca = os.path.join(FIXTURE_DIR, 'gearman/root-ca.pem')
ssl_cert = os.path.join(FIXTURE_DIR, 'gearman/server.pem')
ssl_key = os.path.join(FIXTURE_DIR, 'gearman/server.key')
else:
ssl_ca = None
ssl_cert = None
ssl_key = None
super(FakeGearmanServer, self).__init__(0, ssl_key=ssl_key,
ssl_cert=ssl_cert,
ssl_ca=ssl_ca)
def getJobForConnection(self, connection, peek=False):
for job_queue in [self.high_queue, self.normal_queue, self.low_queue]:
for job in job_queue:
self.jobs_history.append(job)
if not hasattr(job, 'waiting'):
if job.name.startswith(b'executor:execute'):
job.waiting = self.hold_jobs_in_queue
elif job.name.startswith(b'merger:'):
job.waiting = self.hold_merge_jobs_in_queue
else:
job.waiting = False
if job.waiting:
continue
if job.name in connection.functions:
if not peek:
job_queue.remove(job)
connection.related_jobs[job.handle] = job
job.worker_connection = connection
job.running = True
return job
return None
def release(self, regex=None):
"""Release a held job.
:arg str regex: A regular expression which, if supplied, will
cause only jobs with matching names to be released. If
not supplied, all jobs will be released.
"""
released = False
qlen = (len(self.high_queue) + len(self.normal_queue) +
len(self.low_queue))
self.log.debug("releasing queued job %s (%s)" % (regex, qlen))
for job in self.getQueue():
match = False
if job.name.startswith(b'executor:execute'):
parameters = json.loads(job.arguments.decode('utf8'))
if not regex or re.match(regex, parameters.get('job')):
match = True
if job.name.startswith(b'merger:'):
if not regex:
match = True
if match:
self.log.debug("releasing queued job %s" %
job.unique)
job.waiting = False
released = True
else:
self.log.debug("not releasing queued job %s" %
job.unique)
if released:
self.wakeConnections()
qlen = (len(self.high_queue) + len(self.normal_queue) +
len(self.low_queue))
self.log.debug("done releasing queued jobs %s (%s)" % (regex, qlen))
class FakeSMTP(object):
log = logging.getLogger('zuul.FakeSMTP')
def __init__(self, messages, server, port):
self.server = server
self.port = port
self.messages = messages
def sendmail(self, from_email, to_email, msg):
self.log.info("Sending email from %s, to %s, with msg %s" % (
from_email, to_email, msg))
headers = msg.split('\n\n', 1)[0]
body = msg.split('\n\n', 1)[1]
self.messages.append(dict(
from_email=from_email,
to_email=to_email,
msg=msg,
headers=headers,
body=body,
))
return True
def quit(self):
return True
class FakeNodepool(object):
REQUEST_ROOT = '/nodepool/requests'
NODE_ROOT = '/nodepool/nodes'
LAUNCHER_ROOT = '/nodepool/launchers'
log = logging.getLogger("zuul.test.FakeNodepool")
def __init__(self, host, port, chroot):
self.complete_event = threading.Event()
self.host_keys = None
self.client = kazoo.client.KazooClient(
hosts='%s:%s%s' % (host, port, chroot))
self.client.start()
self.registerLauncher()
self._running = True
self.paused = False
self.thread = threading.Thread(target=self.run)
self.thread.daemon = True
self.thread.start()
self.fail_requests = set()
self.remote_ansible = False
self.attributes = None
self.resources = None
self.python_path = '/usr/bin/python2'
def stop(self):
self._running = False
self.thread.join()
self.client.stop()
self.client.close()
def pause(self):
self.complete_event.wait()
self.paused = True
def unpause(self):
self.paused = False
def run(self):
while self._running:
self.complete_event.clear()
try:
self._run()
except Exception:
self.log.exception("Error in fake nodepool:")
self.complete_event.set()
time.sleep(0.1)
def _run(self):
if self.paused:
return
for req in self.getNodeRequests():
self.fulfillRequest(req)
def registerLauncher(self, labels=["label1"], id="FakeLauncher"):
path = os.path.join(self.LAUNCHER_ROOT, id)
data = {'id': id, 'supported_labels': labels}
self.client.create(
path, json.dumps(data).encode('utf8'), makepath=True)
def getNodeRequests(self):
try:
reqids = self.client.get_children(self.REQUEST_ROOT)
except kazoo.exceptions.NoNodeError:
return []
reqs = []
for oid in reqids:
path = self.REQUEST_ROOT + '/' + oid
try:
data, stat = self.client.get(path)
data = json.loads(data.decode('utf8'))
data['_oid'] = oid
reqs.append(data)
except kazoo.exceptions.NoNodeError:
pass
reqs.sort(key=lambda r: (r['_oid'].split('-')[0],
r['relative_priority'],
r['_oid'].split('-')[1]))
return reqs
def getNodes(self):
try:
nodeids = self.client.get_children(self.NODE_ROOT)
except kazoo.exceptions.NoNodeError:
return []
nodes = []
for oid in sorted(nodeids):
path = self.NODE_ROOT + '/' + oid
data, stat = self.client.get(path)
data = json.loads(data.decode('utf8'))
data['_oid'] = oid
try:
lockfiles = self.client.get_children(path + '/lock')
except kazoo.exceptions.NoNodeError:
lockfiles = []
if lockfiles:
data['_lock'] = True
else:
data['_lock'] = False
nodes.append(data)
return nodes
def makeNode(self, request_id, node_type):
now = time.time()
path = '/nodepool/nodes/'
remote_ip = os.environ.get('ZUUL_REMOTE_IPV4', '127.0.0.1')
if self.remote_ansible and not self.host_keys:
self.host_keys = self.keyscan(remote_ip)
host_keys = self.host_keys or ["fake-key1", "fake-key2"]
data = dict(type=node_type,
cloud='test-cloud',
provider='test-provider',
region='test-region',
az='test-az',
attributes=self.attributes,
host_id='test-host-id',
interface_ip=remote_ip,
public_ipv4=remote_ip,
private_ipv4=None,
public_ipv6=None,
python_path=self.python_path,
allocated_to=request_id,
state='ready',
state_time=now,
created_time=now,
updated_time=now,
image_id=None,
host_keys=host_keys,
executor='fake-nodepool',
hold_expiration=None)
if self.resources:
data['resources'] = self.resources
if self.remote_ansible:
data['connection_type'] = 'ssh'
if 'fakeuser' in node_type:
data['username'] = 'fakeuser'
if 'windows' in node_type:
data['connection_type'] = 'winrm'
if 'network' in node_type:
data['connection_type'] = 'network_cli'
if 'kubernetes-namespace' in node_type or 'fedora-pod' in node_type:
data['connection_type'] = 'namespace'
data['connection_port'] = {
'name': 'zuul-ci',
'namespace': 'zuul-ci-abcdefg',
'host': 'localhost',
'skiptls': True,
'token': 'FakeToken',
'ca_crt': 'FakeCA',
'user': 'zuul-worker',
}
if 'fedora-pod' in node_type:
data['connection_type'] = 'kubectl'
data['connection_port']['pod'] = 'fedora-abcdefg'
data = json.dumps(data).encode('utf8')
path = self.client.create(path, data,
makepath=True,
sequence=True)
nodeid = path.split("/")[-1]
return nodeid
def removeNode(self, node):
path = self.NODE_ROOT + '/' + node["_oid"]
self.client.delete(path, recursive=True)
def addFailRequest(self, request):
self.fail_requests.add(request['_oid'])
def fulfillRequest(self, request):
if request['state'] != 'requested':
return
request = request.copy()
oid = request['_oid']
del request['_oid']
if oid in self.fail_requests:
request['state'] = 'failed'
else:
request['state'] = 'fulfilled'
nodes = []
for node in request['node_types']:
nodeid = self.makeNode(oid, node)
nodes.append(nodeid)
request['nodes'] = nodes
request['state_time'] = time.time()
path = self.REQUEST_ROOT + '/' + oid
data = json.dumps(request).encode('utf8')
self.log.debug("Fulfilling node request: %s %s" % (oid, data))
try:
self.client.set(path, data)
except kazoo.exceptions.NoNodeError:
self.log.debug("Node request %s %s disappeared" % (oid, data))
def keyscan(self, ip, port=22, timeout=60):
'''
Scan the IP address for public SSH keys.
Keys are returned formatted as: "<type> <base64_string>"
'''
addrinfo = socket.getaddrinfo(ip, port)[0]
family = addrinfo[0]
sockaddr = addrinfo[4]
keys = []
key = None
for count in iterate_timeout(timeout, "ssh access"):
sock = None
t = None
try:
sock = socket.socket(family, socket.SOCK_STREAM)
sock.settimeout(timeout)
sock.connect(sockaddr)
t = paramiko.transport.Transport(sock)
t.start_client(timeout=timeout)
key = t.get_remote_server_key()
break
except socket.error as e:
if e.errno not in [
errno.ECONNREFUSED, errno.EHOSTUNREACH, None]:
self.log.exception(
'Exception with ssh access to %s:' % ip)
except Exception as e:
self.log.exception("ssh-keyscan failure: %s", e)
finally:
try:
if t:
t.close()
except Exception as e:
self.log.exception('Exception closing paramiko: %s', e)
try:
if sock:
sock.close()
except Exception as e:
self.log.exception('Exception closing socket: %s', e)
# Paramiko, at this time, seems to return only the ssh-rsa key, so
# only the single key is placed into the list.
if key:
keys.append("%s %s" % (key.get_name(), key.get_base64()))
return keys
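    # Usage sketch (illustrative address; returns entries like
    # "ssh-rsa <base64>"):
    #
    #     keys = fake_nodepool.keyscan('127.0.0.1')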
class ChrootedKazooFixture(fixtures.Fixture):
def __init__(self, test_id):
super(ChrootedKazooFixture, self).__init__()
if 'ZOOKEEPER_2181_TCP' in os.environ:
            # prevent any nasty hobbits^H^H^H surprises
if 'NODEPOOL_ZK_HOST' in os.environ:
raise Exception(
'Looks like tox-docker is being used but you have also '
'configured NODEPOOL_ZK_HOST. Either avoid using the '
'docker environment or unset NODEPOOL_ZK_HOST.')
zk_host = 'localhost:' + os.environ['ZOOKEEPER_2181_TCP']
elif 'NODEPOOL_ZK_HOST' in os.environ:
zk_host = os.environ['NODEPOOL_ZK_HOST']
else:
zk_host = 'localhost'
if ':' in zk_host:
host, port = zk_host.split(':')
else:
host = zk_host
port = None
self.zookeeper_host = host
if not port:
self.zookeeper_port = 2181
else:
self.zookeeper_port = int(port)
self.test_id = test_id
def _setUp(self):
# Make sure the test chroot paths do not conflict
random_bits = ''.join(random