The Gatekeeper, or a project gating system
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

5990 lines
219 KiB

# Copyright 2012 Hewlett-Packard Development Company, L.P.
# Copyright 2016 Red Hat, Inc.
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import configparser
from collections import OrderedDict
from configparser import ConfigParser
from contextlib import contextmanager
import copy
import datetime
import errno
import gc
import hashlib
from io import StringIO
import itertools
import json
import logging
import os
import random
import re
from collections import defaultdict, namedtuple
from queue import Queue
from typing import Callable, Optional, Any, Iterable, Generator, List, Dict
import requests
import select
import shutil
import socket
import string
import subprocess
import sys
import tempfile
import threading
import traceback
import time
import uuid
import socketserver
import http.server
import urllib.parse
import git
import gear
import fixtures
import kazoo.client
import kazoo.exceptions
import pymysql
import psycopg2
import psycopg2.extensions
import testtools
import testtools.content
import testtools.content_type
from git.exc import NoSuchPathError
from git.util import IterableList
import yaml
import paramiko
import prometheus_client.exposition
import sqlalchemy
from zuul.driver.sql.sqlconnection import DatabaseSession
from zuul import model
from zuul.model import (
BuildRequest, Change, MergeRequest, PRECEDENCE_NORMAL, WebInfo
)
from zuul.rpcclient import RPCClient
from zuul.driver.zuul import ZuulDriver
from zuul.driver.git import GitDriver
from zuul.driver.smtp import SMTPDriver
from zuul.driver.github import GithubDriver
from zuul.driver.timer import TimerDriver
from zuul.driver.sql import SQLDriver, sqlconnection, sqlreporter
from zuul.driver.bubblewrap import BubblewrapDriver
from zuul.driver.nullwrap import NullwrapDriver
from zuul.driver.mqtt import MQTTDriver
from zuul.driver.pagure import PagureDriver
from zuul.driver.gitlab import GitlabDriver
from zuul.driver.gerrit import GerritDriver
from zuul.driver.github.githubconnection import GithubClientManager
from zuul.driver.elasticsearch import ElasticsearchDriver
from zuul.lib.collections import DefaultKeyDict
from zuul.lib.connections import ConnectionRegistry
from zuul.zk import ZooKeeperClient
from zuul.zk.event_queues import ConnectionEventQueue
from zuul.zk.executor import ExecutorApi
from zuul.zk.merger import MergerApi
from psutil import Popen
import zuul.driver.gerrit.gerritsource as gerritsource
import zuul.driver.gerrit.gerritconnection as gerritconnection
import zuul.driver.git.gitwatcher as gitwatcher
import zuul.driver.github.githubconnection as githubconnection
import zuul.driver.pagure.pagureconnection as pagureconnection
import zuul.driver.gitlab.gitlabconnection as gitlabconnection
import zuul.driver.github
import zuul.driver.elasticsearch.connection as elconnection
import zuul.driver.sql
import zuul.scheduler
import zuul.executor.server
import zuul.executor.client
import zuul.lib.ansible
import zuul.lib.connections
import zuul.lib.auth
import zuul.lib.keystorage
import zuul.merger.client
import zuul.merger.merger
import zuul.merger.server
import zuul.nodepool
import zuul.rpcclient
import zuul.configloader
from zuul.lib.config import get_default
from zuul.lib.logutil import get_annotated_logger
import tests.fakegithub
# Directory named 'fixtures' located next to this file.
FIXTURE_DIR = os.path.join(os.path.dirname(__file__), 'fixtures')
# True when the KEEP_TEMPDIRS environment variable is set to any non-empty
# string (bool() of a non-empty string is True, so even "0" enables it).
KEEP_TEMPDIRS = bool(os.environ.get('KEEP_TEMPDIRS', False))
def repack_repo(path):
    """Run ``git repack -afd`` on the repository checked out at *path*.

    :arg str path: Working tree whose ``.git`` directory is repacked.

    :returns: The ``(stdout, stderr)`` tuple produced by the git process.
    :raises Exception: If git exits with a non-zero return code.
    """
    git_dir_option = '--git-dir=%s/.git' % path
    proc = subprocess.Popen(
        ['git', git_dir_option, 'repack', '-afd'],
        close_fds=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    streams = proc.communicate()
    exit_code = proc.returncode
    if exit_code:
        raise Exception("git repack returned %d" % exit_code)
    return streams
def random_sha1():
    """Return the hex SHA-1 digest of the string form of a random float."""
    seed_text = str(random.random())
    digest = hashlib.sha1(seed_text.encode('ascii'))
    return digest.hexdigest()
def iterate_timeout(max_seconds, purpose):
    """Yield an attempt counter until *max_seconds* have elapsed.

    The generator yields 1, 2, 3, ... and sleeps 10ms after each value
    is consumed.  Once the time budget is used up it raises instead of
    yielding again.

    :arg max_seconds: How long to keep yielding, in seconds.
    :arg str purpose: Text included in the timeout error message.

    :raises Exception: When the time budget is exhausted.
    """
    started = time.time()
    attempt = 0
    while True:
        if not (time.time() < started + max_seconds):
            raise Exception("Timeout waiting for %s" % purpose)
        attempt += 1
        yield attempt
        time.sleep(0.01)
def simple_layout(path, driver='gerrit'):
    """Specify a layout file for use by a test method.

    :arg str path: The path to the layout file.
    :arg str driver: The source driver to use, defaults to gerrit.

    Some tests require only a very simple configuration.  For those,
    establishing a complete config directory hierarchy is too much
    work.  In those cases, you can add a simple zuul.yaml file to the
    test fixtures directory (in fixtures/layouts/foo.yaml) and use
    this decorator to indicate the test method should use that rather
    than the tenant config file specified by the test class.

    The decorator will cause that layout file to be added to a
    config-project called "common-config" and each "project" instance
    referenced in the layout file will have a git repo automatically
    initialized.
    """
    layout_spec = (path, driver)

    def decorator(test):
        # Only tag the test; the tag is a (path, driver) tuple.
        setattr(test, '__simple_layout__', layout_spec)
        return test

    return decorator
def never_capture():
    """Never capture logs/output

    Due to high volume, log files are normally captured and attached
    to the subunit stream only on error.  This can make diagnosing
    some problems difficult.  Use this decorator on a test to
    indicate that logs and output should not be captured.
    """
    def decorator(test):
        # Only tag the test with a boolean marker.
        setattr(test, '__never_capture__', True)
        return test

    return decorator
def registerProjects(source_name, client, config):
    """Register the projects a tenant config lists for a source.

    Loads the tenant config file named by the ``tenant_config`` option of
    the ``scheduler`` section (relative to FIXTURE_DIR) and calls
    ``client.addProjectByName`` for every config-project and
    untrusted-project that any tenant lists under *source_name*.

    :arg str source_name: Key to look up in each tenant's ``source`` dict.
    :arg client: Object providing ``addProjectByName(name)``.
    :arg config: Object providing ``get(section, option)``.
    """
    path = config.get('scheduler', 'tenant_config')
    with open(os.path.join(FIXTURE_DIR, path)) as f:
        tenant_config = yaml.safe_load(f.read())

    for tenant in tenant_config:
        sources = tenant['tenant']['source']
        conf = sources.get(source_name)
        if not conf:
            # This tenant does not use the source, but a later tenant
            # may; returning here would skip those tenants entirely.
            continue

        # Copy so that extending does not modify the loaded config.
        projects = list(conf.get('config-projects', []))
        projects.extend(conf.get('untrusted-projects', []))

        for project in projects:
            if isinstance(project, dict):
                # This can be a dict with the project as the only key
                client.addProjectByName(next(iter(project)))
            else:
                client.addProjectByName(project)
class StatException(Exception):
    """Used by assertReportedStat."""
class GerritDriverMock(GerritDriver):
    """Gerrit driver that hands out FakeGerritConnection objects."""

    def __init__(self, registry, changes: Dict[str, Dict[str, Change]],
                 upstream_root: str, additional_event_queues, poller_events,
                 add_cleanup: Callable[[Callable[[], None]], None]):
        super().__init__()
        self.add_cleanup = add_cleanup
        self.poller_events = poller_events
        self.additional_event_queues = additional_event_queues
        self.upstream_root = upstream_root
        self.changes = changes
        self.registry = registry

    def getConnection(self, name, config):
        """Create a fake connection and expose it as registry.fake_<name>.

        The change database is shared per server, and the poller events
        are shared per connection name via ``setdefault``.
        """
        changes_db = self.changes.setdefault(config['server'], {})
        events = self.poller_events
        poll_event = events.setdefault(name, threading.Event())
        ref_event = events.setdefault(name + '-ref', threading.Event())

        connection = FakeGerritConnection(
            self, name, config,
            changes_db=changes_db,
            upstream_root=self.upstream_root,
            poller_event=poll_event,
            ref_watcher_event=ref_event)

        # Make sure a started web server is stopped during cleanup.
        if connection.web_server:
            self.add_cleanup(connection.web_server.stop)

        setattr(self.registry, 'fake_' + name, connection)
        return connection
class GithubDriverMock(GithubDriver):
    """Github driver that hands out FakeGithubConnection objects."""

    def __init__(self, registry, changes: Dict[str, Dict[str, Change]],
                 config: ConfigParser, upstream_root: str,
                 additional_event_queues, rpcclient: RPCClient,
                 git_url_with_auth: bool):
        super().__init__()
        self.git_url_with_auth = git_url_with_auth
        self.rpcclient = rpcclient
        self.additional_event_queues = additional_event_queues
        self.upstream_root = upstream_root
        self.config = config
        self.changes = changes
        self.registry = registry

    def getConnection(self, name, config):
        """Create a fake connection and register the tenant projects.

        The connection is exposed as registry.fake_<name>, and the
        projects from the tenant config are added to its github client.
        """
        server_name = config.get('server', 'github.com')
        changes_db = self.changes.setdefault(server_name, {})

        connection = FakeGithubConnection(
            self, name, config, self.rpcclient,
            changes_db=changes_db,
            upstream_root=self.upstream_root,
            git_url_with_auth=self.git_url_with_auth)
        setattr(self.registry, 'fake_' + name, connection)

        github_client = connection.getGithubClient(None)
        registerProjects(connection.source.name, github_client, self.config)
        return connection
class PagureDriverMock(PagureDriver):
    """Pagure driver that hands out FakePagureConnection objects."""

    def __init__(self, registry, changes: Dict[str, Dict[str, Change]],
                 upstream_root: str, additional_event_queues,
                 rpcclient: RPCClient):
        super().__init__()
        self.rpcclient = rpcclient
        self.additional_event_queues = additional_event_queues
        self.upstream_root = upstream_root
        self.changes = changes
        self.registry = registry

    def getConnection(self, name, config):
        """Create a fake connection and expose it as registry.fake_<name>."""
        server_name = config.get('server', 'pagure.io')
        changes_db = self.changes.setdefault(server_name, {})

        connection = FakePagureConnection(
            self, name, config, self.rpcclient,
            changes_db=changes_db,
            upstream_root=self.upstream_root)
        setattr(self.registry, 'fake_' + name, connection)
        return connection
class GitlabDriverMock(GitlabDriver):
    """Gitlab driver that hands out FakeGitlabConnection objects."""

    def __init__(self, registry, changes: Dict[str, Dict[str, Change]],
                 config: ConfigParser, upstream_root: str,
                 additional_event_queues, rpcclient: RPCClient):
        super().__init__()
        self.rpcclient = rpcclient
        self.additional_event_queues = additional_event_queues
        self.upstream_root = upstream_root
        self.config = config
        self.changes = changes
        self.registry = registry

    def getConnection(self, name, config):
        """Create a fake connection and register the tenant projects.

        The connection is exposed as registry.fake_<name>, and the
        projects from the tenant config are added to its ``gl_client``.
        """
        server_name = config.get('server', 'gitlab.com')
        changes_db = self.changes.setdefault(server_name, {})

        connection = FakeGitlabConnection(
            self, name, config, self.rpcclient,
            changes_db=changes_db,
            upstream_root=self.upstream_root)
        setattr(self.registry, 'fake_' + name, connection)

        registerProjects(
            connection.source.name, connection.gl_client, self.config)
        return connection
class SQLDriverMock(SQLDriver):
    """SQL driver that returns the fake SQL connection and reporter."""

    def getConnection(self, name, config):
        """Return a FakeSqlConnection for this driver."""
        fake_connection = FakeSqlConnection(self, name, config)
        return fake_connection

    def getReporter(self, connection, pipeline, config=None):
        """Return a FakeSqlReporter; *pipeline* is not passed on."""
        fake_reporter = FakeSqlReporter(self, connection, config)
        return fake_reporter
class TestConnectionRegistry(ConnectionRegistry):
    """Connection registry pre-loaded with real and mock drivers.

    The Gerrit, Github, Pagure and Gitlab drivers are replaced by their
    mock counterparts.  The SQL driver is replaced by SQLDriverMock only
    when *fake_sql* is true.
    """

    def __init__(self, changes: Dict[str, Dict[str, Change]],
                 config: ConfigParser, additional_event_queues,
                 upstream_root: str, rpcclient: RPCClient, poller_events,
                 git_url_with_auth: bool,
                 add_cleanup: Callable[[Callable[[], None]], None],
                 fake_sql: bool):
        """Register all drivers.

        :arg changes: Per-server change databases shared by the mock
            drivers.
        :arg config: Configuration handed to the Github and Gitlab mocks.
        :arg additional_event_queues: Passed through to the mock drivers.
        :arg upstream_root: Passed through to the mock drivers.
        :arg rpcclient: Passed to the Github, Pagure and Gitlab mocks.
        :arg poller_events: Passed to the Gerrit mock.
        :arg git_url_with_auth: Passed to the Github mock.
        :arg add_cleanup: Passed to the Gerrit mock.
        :arg fake_sql: Register SQLDriverMock instead of SQLDriver.
        """
        self.connections = OrderedDict()
        self.drivers = {}

        self.registerDriver(ZuulDriver())
        self.registerDriver(GerritDriverMock(
            self, changes, upstream_root, additional_event_queues,
            poller_events, add_cleanup))
        self.registerDriver(GitDriver())
        self.registerDriver(GithubDriverMock(
            self, changes, config, upstream_root, additional_event_queues,
            rpcclient, git_url_with_auth))
        self.registerDriver(SMTPDriver())
        self.registerDriver(TimerDriver())
        # Honour the fake_sql flag; it was previously accepted but ignored,
        # so the real SQL driver was registered unconditionally.
        if fake_sql:
            self.registerDriver(SQLDriverMock())
        else:
            self.registerDriver(SQLDriver())
        self.registerDriver(BubblewrapDriver())
        self.registerDriver(NullwrapDriver())
        self.registerDriver(MQTTDriver())
        self.registerDriver(PagureDriverMock(
            self, changes, upstream_root, additional_event_queues, rpcclient))
        self.registerDriver(GitlabDriverMock(
            self, changes, config, upstream_root, additional_event_queues,
            rpcclient))
        self.registerDriver(ElasticsearchDriver())
class FakeAnsibleManager(zuul.lib.ansible.AnsibleManager):
    """Ansible manager whose validation and file copying are no-ops."""

    def validate(self):
        """Always report success."""
        return True

    def copyAnsibleFiles(self):
        """Do nothing."""
        return None
class GerritChangeReference(git.Reference):
    """Git reference type rooted at ``refs/changes``."""

    # Settings consumed by the git.Reference base class.
    _points_to_commits_only = True
    _common_path_default = "refs/changes"
class FakeGerritChange(object):
    """A fake Gerrit change backed by commits in an upstream git repo.

    The change keeps its Gerrit "ssh query" style representation in
    ``self.data`` and its patchsets in ``self.patchsets``.  It can build
    the stream events Gerrit would emit and the structures returned by
    the REST API.
    """

    # Label name -> (description, minimum value, maximum value).
    categories = {'Approved': ('Approved', -1, 1),
                  'Code-Review': ('Code-Review', -2, 2),
                  'Verified': ('Verified', -2, 2)}

    def __init__(self, gerrit, number, project, branch, subject,
                 status='NEW', upstream_root=None, files=None,
                 parent=None, merge_parents=None, merge_files=None):
        """Create the change and its first patchset.

        :arg gerrit: The fake connection; its ``baseurl`` builds the URL.
        :arg number: The change number.
        :arg str project: Project name (also the repo path under
            *upstream_root*).
        :arg str branch: Target branch.
        :arg str subject: Change subject; also used as commit message.
        :arg str status: Gerrit status; the change is open iff 'NEW'.
        :arg str upstream_root: Directory containing the upstream repos.
        :arg dict files: Mapping of file name to content for the first
            patchset; a default file is generated when empty or None.
        :arg parent: Ref or commit the first patchset is based on.
        :arg merge_parents: If set, create a merge patchset from these
            parents instead of a regular patchset.
        :arg merge_files: File names reported for the merge patchset.
        """
        self.gerrit = gerrit
        self.source = gerrit
        self.reported = 0
        self.queried = 0
        self.patchsets = []
        self.number = number
        self.project = project
        self.branch = branch
        self.subject = subject
        self.latest_patchset = 0
        self.depends_on_change = None
        self.depends_on_patchset = None
        self.needed_by_changes = []
        self.fail_merge = False
        self.messages = []
        self.comments = []
        self.checks = {}
        self.checks_history = []
        self.data = {
            'branch': branch,
            'comments': self.comments,
            'commitMessage': subject,
            'createdOn': time.time(),
            'id': 'I' + random_sha1(),
            'lastUpdated': time.time(),
            'number': str(number),
            'open': status == 'NEW',
            'owner': {'email': 'user@example.com',
                      'name': 'User Name',
                      'username': 'username'},
            'patchSets': self.patchsets,
            'project': project,
            'status': status,
            'subject': subject,
            'submitRecords': [],
            'url': '%s/%s' % (self.gerrit.baseurl.rstrip('/'), number)}

        self.upstream_root = upstream_root
        if merge_parents:
            self.addMergePatchset(parents=merge_parents,
                                  merge_files=merge_files)
            self.data['parents'] = merge_parents
        else:
            self.addPatchset(files=files, parent=parent)
            if parent:
                self.data['parents'] = [parent]
        self.data['submitRecords'] = self.getSubmitRecords()
        self.open = status == 'NEW'

    def addFakeChangeToRepo(self, msg, files, large, parent):
        """Commit a patchset to the upstream repo and return the commit.

        A ``refs/changes`` ref for the current patchset is created on top
        of *parent* (``refs/tags/init`` when None).  When *large* is
        false, *files* maps names to content, with None meaning "delete
        the file".  When *large* is true, *files* is ignored and 100
        files of 4096 random printable characters are written.  The
        working tree is returned to ``master`` afterwards.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        if parent is None:
            parent = 'refs/tags/init'
        ref = GerritChangeReference.create(
            repo, '%s/%s/%s' % (str(self.number).zfill(2)[-2:],
                                self.number,
                                self.latest_patchset),
            parent)
        repo.head.reference = ref
        repo.head.reset(working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        if not large:
            for fn, content in files.items():
                fn = os.path.join(path, fn)
                if content is None:
                    os.unlink(fn)
                    repo.index.remove([fn])
                else:
                    d = os.path.dirname(fn)
                    if not os.path.exists(d):
                        os.makedirs(d)
                    with open(fn, 'w') as f:
                        f.write(content)
                    repo.index.add([fn])
        else:
            for fni in range(100):
                fn = os.path.join(path, str(fni))
                # Use a context manager so the handle is closed even if
                # a write fails.
                with open(fn, 'w') as f:
                    for ci in range(4096):
                        f.write(random.choice(string.printable))
                repo.index.add([fn])

        r = repo.index.commit(msg)
        repo.head.reference = 'master'
        repo.head.reset(working_tree=True)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addFakeMergeCommitChangeToRepo(self, msg, parents):
        """Commit a merge patchset to the upstream repo; return the commit.

        The change ref is created on ``parents[0]`` and the tree of
        ``parents[1]`` is merged into the index before committing.  The
        working tree is returned to ``master`` afterwards.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        ref = GerritChangeReference.create(
            repo, '%s/%s/%s' % (str(self.number).zfill(2)[-2:],
                                self.number,
                                self.latest_patchset),
            parents[0])
        repo.head.reference = ref
        repo.head.reset(working_tree=True)
        repo.git.clean('-x', '-f', '-d')

        repo.index.merge_tree(parents[1])

        r = repo.index.commit(msg)
        repo.head.reference = 'master'
        repo.head.reset(working_tree=True)
        repo.git.clean('-x', '-f', '-d')
        repo.heads['master'].checkout()
        return r

    def addPatchset(self, files=None, large=False, parent=None):
        """Add a new regular patchset and make it the current one.

        :arg dict files: File name -> content; a single generated file is
            used when empty or None.
        :arg bool large: Passed to addFakeChangeToRepo.
        :arg parent: Passed to addFakeChangeToRepo.
        """
        self.latest_patchset += 1
        if not files:
            fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
            data = ("test %s %s %s\n" %
                    (self.branch, self.number, self.latest_patchset))
            files = {fn: data}
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.addFakeChangeToRepo(msg, files, large, parent)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': 'README',
                     'type': 'MODIFIED'}]
        for f in files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/%s/%s/%s' % (str(self.number).zfill(2)[-2:],
                                               self.number,
                                               self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def addMergePatchset(self, parents, merge_files=None):
        """Add a new merge patchset and make it the current one.

        :arg parents: Two refs/commits; see
            addFakeMergeCommitChangeToRepo.
        :arg merge_files: File names to report as ADDED in the patchset.
        """
        self.latest_patchset += 1
        if not merge_files:
            merge_files = []
        msg = self.subject + '-' + str(self.latest_patchset)
        c = self.addFakeMergeCommitChangeToRepo(msg, parents)
        ps_files = [{'file': '/COMMIT_MSG',
                     'type': 'ADDED'},
                    {'file': '/MERGE_LIST',
                     'type': 'ADDED'}]
        for f in merge_files:
            ps_files.append({'file': f, 'type': 'ADDED'})
        d = {'approvals': [],
             'createdOn': time.time(),
             'files': ps_files,
             'number': str(self.latest_patchset),
             'ref': 'refs/changes/%s/%s/%s' % (str(self.number).zfill(2)[-2:],
                                               self.number,
                                               self.latest_patchset),
             'revision': c.hexsha,
             'uploader': {'email': 'user@example.com',
                          'name': 'User name',
                          'username': 'user'}}
        self.data['currentPatchSet'] = d
        self.patchsets.append(d)
        self.data['submitRecords'] = self.getSubmitRecords()

    def setCheck(self, checker, reset=False, **kw):
        """Create or update the check record for *checker*.

        With *reset* the record is replaced by a fresh NOT_STARTED one.
        For each known key the value is taken from *kw*, then from the
        existing record, then from a default; keys resolving to None are
        removed.  A deep copy of all checks is appended to
        ``checks_history`` after every call.
        """
        if reset:
            self.checks[checker] = {'state': 'NOT_STARTED',
                                    'created': str(datetime.datetime.now())}
        chk = self.checks.setdefault(checker, {})
        chk['updated'] = str(datetime.datetime.now())
        for (key, default) in [
                ('state', None),
                ('repository', self.project),
                ('change_number', self.number),
                ('patch_set_id', self.latest_patchset),
                ('checker_uuid', checker),
                ('message', None),
                ('url', None),
                ('started', None),
                ('finished', None),
        ]:
            val = kw.get(key, chk.get(key, default))
            if val is not None:
                chk[key] = val
            elif key in chk:
                del chk[key]
        self.checks_history.append(copy.deepcopy(self.checks))

    def addComment(self, filename, line, message, name, email, username,
                   comment_range=None):
        """Append an inline comment to the change's comment list."""
        comment = {
            'file': filename,
            'line': int(line),
            'reviewer': {
                'name': name,
                'email': email,
                'username': username,
            },
            'message': message,
        }
        if comment_range:
            comment['range'] = comment_range
        self.comments.append(comment)

    def getPatchsetCreatedEvent(self, patchset):
        """Return a patchset-created event for the 1-based *patchset*."""
        event = {"type": "patchset-created",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "uploader": {"name": "User Name"}}
        return event

    def getChangeRestoredEvent(self):
        """Return a change-restored event for the latest patchset."""
        event = {"type": "change-restored",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "restorer": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeAbandonedEvent(self):
        """Return a change-abandoned event for the latest patchset."""
        event = {"type": "change-abandoned",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "abandoner": {"name": "User Name"},
                 "patchSet": self.patchsets[-1],
                 "reason": ""}
        return event

    def getChangeCommentEvent(self, patchset, comment=None,
                              patchsetcomment=None):
        """Return a comment-added event for the 1-based *patchset*.

        :arg str comment: Change-level comment text; a default text is
            used when neither comment argument is given.
        :arg str patchsetcomment: If set, included as a
            ``/PATCHSET_LEVEL`` patchset comment.
        """
        if comment is None and patchsetcomment is None:
            comment = "Patch Set %d:\n\nThis is a comment" % patchset
        elif comment:
            comment = "Patch Set %d:\n\n%s" % (patchset, comment)
        else:  # patchsetcomment is not None
            comment = "Patch Set %d:\n\n(1 comment)" % patchset

        commentevent = {"comment": comment}
        if patchsetcomment:
            commentevent.update(
                {'patchSetComments':
                    {"/PATCHSET_LEVEL": [{"message": patchsetcomment}]}
                 }
            )

        event = {"type": "comment-added",
                 "change": {"project": self.project,
                            "branch": self.branch,
                            "id": "I5459869c07352a31bfb1e7a8cac379cabfcb25af",
                            "number": str(self.number),
                            "subject": self.subject,
                            "owner": {"name": "User Name"},
                            "url": "https://hostname/3"},
                 "patchSet": self.patchsets[patchset - 1],
                 "author": {"name": "User Name"},
                 "approvals": [{"type": "Code-Review",
                                "description": "Code-Review",
                                "value": "0"}]}
        event.update(commentevent)
        return event

    def getChangeMergedEvent(self):
        """Return a change-merged event for the latest patchset."""
        event = {"submitter": {"name": "Jenkins",
                               "username": "jenkins"},
                 "newRev": "29ed3b5f8f750a225c5be70235230e3a6ccb04d9",
                 "patchSet": self.patchsets[-1],
                 "change": self.data,
                 "type": "change-merged",
                 "eventCreatedOn": 1487613810}
        return event

    def getRefUpdatedEvent(self):
        """Return a ref-updated event for the change's branch.

        ``oldRev`` is the branch head currently in the upstream repo and
        ``newRev`` is the revision of the latest patchset.
        """
        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)
        oldrev = repo.heads[self.branch].commit.hexsha

        event = {
            "type": "ref-updated",
            "submitter": {
                "name": "User Name",
            },
            "refUpdate": {
                "oldRev": oldrev,
                "newRev": self.patchsets[-1]['revision'],
                "refName": self.branch,
                "project": self.project,
            }
        }
        return event

    def addApproval(self, category, value, username='reviewer_john',
                    granted_on=None, message='', tag=None):
        """Record an approval on the latest patchset; return the event.

        Any existing approval by *username* in *category* is replaced.

        :arg str category: A key of ``categories``.
        :arg value: The vote; stored as a string.
        :arg granted_on: Timestamp; defaults to the current time.
        :arg str message: Comment text for the returned event.
        :arg tag: Stored under ``__tag`` (not available in the ssh api).
        :returns: A comment-added event, round-tripped through JSON.
        """
        if not granted_on:
            granted_on = time.time()
        approval = {
            'description': self.categories[category][0],
            'type': category,
            'value': str(value),
            'by': {
                'username': username,
                'email': username + '@example.com',
            },
            'grantedOn': int(granted_on),
            '__tag': tag,  # Not available in ssh api
        }
        # Drop every previous vote by this user in this category.  The
        # list is filtered in place because the patchset dict is shared
        # with self.data; deleting by index while enumerating a copy
        # breaks as soon as more than one entry matches.
        approvals = self.patchsets[-1]['approvals']
        approvals[:] = [
            x for x in approvals
            if not (x['by']['username'] == username and
                    x['type'] == category)
        ]
        approvals.append(approval)
        event = {'approvals': [approval],
                 'author': {'email': 'author@example.com',
                            'name': 'Patchset Author',
                            'username': 'author_phil'},
                 'change': {'branch': self.branch,
                            'id': 'Iaa69c46accf97d0598111724a38250ae76a22c87',
                            'number': str(self.number),
                            'owner': {'email': 'owner@example.com',
                                      'name': 'Change Owner',
                                      'username': 'owner_jane'},
                            'project': self.project,
                            'subject': self.subject,
                            'topic': 'master',
                            'url': 'https://hostname/459'},
                 'comment': message,
                 'patchSet': self.patchsets[-1],
                 'type': 'comment-added'}
        self.data['submitRecords'] = self.getSubmitRecords()
        return json.loads(json.dumps(event))

    def setWorkInProgress(self, wip):
        """Set or clear the work-in-progress flag in ``self.data``."""
        # Gerrit only includes 'wip' in the data returned via ssh if
        # the value is true.
        if wip:
            self.data['wip'] = True
        elif 'wip' in self.data:
            del self.data['wip']

    def getSubmitRecords(self):
        """Compute the submit records from the latest patchset's votes.

        Returns ``[{'status': 'OK'}]`` when every category is at its
        maximum value, otherwise a NOT_READY record listing a status
        (REJECT, OK or NEED) per label.
        """
        status = {}
        for cat in self.categories:
            status[cat] = 0

        for a in self.patchsets[-1]['approvals']:
            cur = status[a['type']]
            cat_min, cat_max = self.categories[a['type']][1:]
            new = int(a['value'])
            # A minimum vote always wins; otherwise the vote with the
            # largest magnitude wins.
            if new == cat_min:
                cur = new
            elif abs(new) > abs(cur):
                cur = new
            status[a['type']] = cur

        labels = []
        ok = True
        for typ, cat in self.categories.items():
            cur = status[typ]
            cat_min, cat_max = cat[1:]
            if cur == cat_min:
                value = 'REJECT'
                ok = False
            elif cur == cat_max:
                value = 'OK'
            else:
                value = 'NEED'
                ok = False
            labels.append({'label': cat[0], 'status': value})
        if ok:
            return [{'status': 'OK'}]
        return [{'status': 'NOT_READY',
                 'labels': labels}]

    def setDependsOn(self, other, patchset):
        """Make this change depend on the 1-based *patchset* of *other*.

        Updates ``dependsOn`` on this change and ``neededBy`` /
        ``needed_by_changes`` on *other*.
        """
        self.depends_on_change = other
        self.depends_on_patchset = patchset
        d = {'id': other.data['id'],
             'number': other.data['number'],
             'ref': other.patchsets[patchset - 1]['ref']
             }
        self.data['dependsOn'] = [d]

        other.needed_by_changes.append((self, len(self.patchsets)))
        needed = other.data.get('neededBy', [])
        d = {'id': self.data['id'],
             'number': self.data['number'],
             'ref': self.patchsets[-1]['ref'],
             'revision': self.patchsets[-1]['revision']
             }
        needed.append(d)
        other.data['neededBy'] = needed

    def query(self):
        """Return a JSON round-tripped copy of the ssh-style change data.

        Also counts the query and refreshes ``isCurrentPatchSet`` on the
        first ``dependsOn`` entry, if any.
        """
        self.queried += 1
        d = self.data.get('dependsOn')
        if d:
            d = d[0]
            if (self.depends_on_change.patchsets[-1]['ref'] == d['ref']):
                d['isCurrentPatchSet'] = True
            else:
                d['isCurrentPatchSet'] = False
        return json.loads(json.dumps(self.data))

    def queryHTTP(self):
        """Return the change in the shape of a REST API change record.

        Only the latest patchset is included under ``revisions``.  The
        result is round-tripped through JSON.
        """
        self.queried += 1
        labels = {}
        for cat in self.categories:
            labels[cat] = {}
        for app in self.patchsets[-1]['approvals']:
            label = labels[app['type']]
            _, label_min, label_max = self.categories[app['type']]
            val = int(app['value'])
            label_all = label.setdefault('all', [])
            approval = {
                "value": val,
                "username": app['by']['username'],
                "email": app['by']['email'],
                "date": str(datetime.datetime.fromtimestamp(app['grantedOn'])),
            }
            if app.get('__tag') is not None:
                approval['tag'] = app['__tag']
            label_all.append(approval)
            if val == label_min:
                label['blocking'] = True
                if 'rejected' not in label:
                    label['rejected'] = app['by']
            if val == label_max:
                if 'approved' not in label:
                    label['approved'] = app['by']
        revisions = {}
        rev = self.patchsets[-1]
        num = len(self.patchsets)
        files = {}
        for f in rev['files']:
            if f['file'] == '/COMMIT_MSG':
                continue
            files[f['file']] = {"status": f['type'][0]}  # ADDED -> A
        parent = '0000000000000000000000000000000000000000'
        if self.depends_on_change:
            parent = self.depends_on_change.patchsets[
                self.depends_on_patchset - 1]['revision']
        revisions[rev['revision']] = {
            "kind": "REWORK",
            "_number": num,
            "created": rev['createdOn'],
            "uploader": rev['uploader'],
            "ref": rev['ref'],
            "commit": {
                "subject": self.subject,
                "message": self.data['commitMessage'],
                "parents": [{
                    "commit": parent,
                }]
            },
            "files": files
        }
        data = {
            "id": self.project + '~' + self.branch + '~' + self.data['id'],
            "project": self.project,
            "branch": self.branch,
            "hashtags": [],
            "change_id": self.data['id'],
            "subject": self.subject,
            "status": self.data['status'],
            "created": self.data['createdOn'],
            "updated": self.data['lastUpdated'],
            "_number": self.number,
            "owner": self.data['owner'],
            "labels": labels,
            "current_revision": self.patchsets[-1]['revision'],
            "revisions": revisions,
            "requirements": [],
            # Gerrit's field is spelled 'work_in_progress'; the previous
            # triple-s spelling meant the flag was never seen by readers.
            "work_in_progress": self.data.get('wip', False)
        }
        if 'parents' in self.data:
            data['parents'] = self.data['parents']
        return json.loads(json.dumps(data))

    def queryRevisionHTTP(self, revision):
        """Return the related-changes structure for *revision*.

        :returns: ``{"changes": [...]}`` listing the dependency and the
            changes needing this one, or None for an unknown revision.
        """
        for ps in self.patchsets:
            if ps['revision'] == revision:
                break
        else:
            return None
        changes = []
        if self.depends_on_change:
            changes.append({
                "commit": {
                    "commit": self.depends_on_change.patchsets[
                        self.depends_on_patchset - 1]['revision'],
                },
                "_change_number": self.depends_on_change.number,
                "_revision_number": self.depends_on_patchset
            })
        for (needed_by_change, needed_by_patchset) in self.needed_by_changes:
            changes.append({
                "commit": {
                    "commit": needed_by_change.patchsets[
                        needed_by_patchset - 1]['revision'],
                },
                "_change_number": needed_by_change.number,
                "_revision_number": needed_by_patchset,
            })
        return {"changes": changes}

    def queryFilesHTTP(self, revision):
        """Return file name -> status for *revision*, or None if unknown."""
        for rev in self.patchsets:
            if rev['revision'] == revision:
                break
        else:
            return None

        files = {}
        for f in rev['files']:
            files[f['file']] = {"status": f['type'][0]}  # ADDED -> A
        return files

    def setMerged(self):
        """Mark the change merged and merge it into the upstream branch.

        Does nothing when the change it depends on is not yet MERGED or
        when ``fail_merge`` is set.
        """
        if (self.depends_on_change and
                self.depends_on_change.data['status'] != 'MERGED'):
            return
        if self.fail_merge:
            return
        self.data['status'] = 'MERGED'
        self.open = False

        path = os.path.join(self.upstream_root, self.project)
        repo = git.Repo(path)

        repo.head.reference = self.branch
        repo.head.reset(working_tree=True)
        repo.git.merge('-s', 'resolve', self.patchsets[-1]['ref'])
        repo.heads[self.branch].commit = repo.head.commit

    def setReported(self):
        """Count one more report against this change."""
        self.reported += 1
class PrometheusServer(object):
    """Serve the default prometheus registry over HTTP in a thread."""

    def start(self):
        """Start serving on an ephemeral port, recorded in ``self.port``."""
        exposition = prometheus_client.exposition
        wsgi_app = prometheus_client.make_wsgi_app(prometheus_client.REGISTRY)
        self.httpd = exposition.make_server(
            "0.0.0.0",
            0,
            wsgi_app,
            exposition.ThreadingWSGIServer,
            handler_class=exposition._SilentHandler)
        # Port 0 lets the OS choose; read back the port actually bound.
        bound_address = self.httpd.socket.getsockname()
        self.port = bound_address[1]
        self.thread = threading.Thread(target=self.httpd.serve_forever)
        self.thread.daemon = True
        self.thread.start()

    def stop(self):
        """Shut the server down, wait for its thread, close the socket."""
        server = self.httpd
        server.shutdown()
        self.thread.join()
        server.socket.close()
class GerritWebServer(object):
def __init__(self, fake_gerrit):
super(GerritWebServer, self).__init__()
self.fake_gerrit = fake_gerrit
def start(self):
fake_gerrit = self.fake_gerrit
class Server(http.server.SimpleHTTPRequestHandler):
log = logging.getLogger("zuul.test.FakeGerritConnection")
review_re = re.compile('/a/changes/(.*?)/revisions/(.*?)/review')
submit_re = re.compile('/a/changes/(.*?)/submit')
pending_checks_re = re.compile(
r'/a/plugins/checks/checks\.pending/\?'
r'query=checker:(.*?)\+\(state:(.*?)\)')
update_checks_re = re.compile(
r'/a/changes/(.*)/revisions/(.*?)/checks/(.*)')
list_checkers_re = re.compile('/a/plugins/checks/checkers/')
change_re = re.compile(r'/a/changes/(.*)\?o=.*')
related_re = re.compile(r'/a/changes/(.*)/revisions/(.*)/related')
files_re = re.compile(r'/a/changes/(.*)/revisions/(.*)/files'
r'\?parent=1')
change_search_re = re.compile(r'/a/changes/\?n=500.*&q=(.*)')
version_re = re.compile(r'/a/config/server/version')
def do_POST(self):
path = self.path
self.log.debug("Got POST %s", path)
data = self.rfile.read(int(self.headers['Content-Length']))
data = json.loads(data.decode('utf-8'))
self.log.debug("Got data %s", data)
m = self.review_re.match(path)
if m:
return self.review(m.group(1), m.group(2), data)
m = self.submit_re.match(path)
if m:
return self.submit(m.group(1), data)
m = self.update_checks_re.match(path)
if m:
return self.update_checks(
m.group(1), m.group(2), m.group(3), data)
self.send_response(500)
self.end_headers()
def do_GET(self):
path = self.path
self.log.debug("Got GET %s", path)
m = self.change_re.match(path)
if m:
return self.get_change(m.group(1))
m = self.related_re.match(path)
if m:
return self.get_related(m.group(1), m.group(2))
m = self.files_re.match(path)
if m:
return self.get_files(m.group(1), m.group(2))
m = self.change_search_re.match(path)
if m:
return self.get_changes(m.group(1))
m = self.pending_checks_re.match(path)
if m:
return self.get_pending_checks(m.group(1), m.group(2))
m = self.list_checkers_re.match(path)
if m:
return self.list_checkers()
m = self.version_re.match(path)
if m:
return self.version()
self.send_response(500)
self.end_headers()
def _404(self):
self.send_response(404)
self.end_headers()
def _get_change(self, change_id):
change_id = urllib.parse.unquote(change_id)
project, branch, change = change_id.split('~')
for c in fake_gerrit.changes.values():
if (c.data['id'] == change and
c.data['branch'] == branch and
c.data['project'] == project):
return c
def review(self, change_id, revision, data):
change = self._get_change(change_id)
if not change:
return self._404()
message = data['message']
labels = data.get('labels', {})
comments = data.get('robot_comments', data.get('comments', {}))
tag = data.get('tag', None)
fake_gerrit._test_handle_review(
int(change.data['number']), message, False, labels,
comments, tag=tag)
self.send_response(200)
self.end_headers()
def submit(self, change_id, data):
change = self._get_change(change_id)
if not change:
return self._404()
message = None
labels = {}
fake_gerrit._test_handle_review(
int(change.data['number']), message, True, labels)
self.send_response(200)
self.end_headers()
def update_checks(self, change_id, revision, checker, data):
self.log.debug("Update checks %s %s %s",
change_id, revision, checker)
change = self._get_change(change_id)
if not change:
return self._404()
change.setCheck(checker, **data)
self.send_response(200)
# TODO: return the real data structure, but zuul
# ignores this now.
self.end_headers()
def get_pending_checks(self, checker, state):
self.log.debug("Get pending checks %s %s", checker, state)
ret = []
for c in fake_gerrit.changes.values():
if checker not in c.checks:
continue
patchset_pending_checks = {}
if c.checks[checker]['state'] == state:
patchset_pending_checks[checker] = {
'state': c.checks[checker]['state'],
}
if patchset_pending_checks:
ret.append({
'patch_set': {
'repository': c.project,
'change_number': c.number,
'patch_set_id': c.latest_patchset,
},
'pending_checks': patchset_pending_checks,
})
self.send_data(ret)
def list_checkers(self):
self.log.debug("Get checkers")
self.send_data(fake_gerrit.fake_checkers)
def get_change(self, number):
change = fake_gerrit.changes.get(int(number))
if not change:
return self._404()
self.send_data(change.queryHTTP())
self.end_headers()
def get_related(self, number, revision):
change = fake_gerrit.changes.get(int(number))
if not change:
return self._404()
data = change.queryRevisionHTTP(revision)
if data is None:
return self._404()
self.send_data(data)
self.end_headers()
def get_files(self, number, revision):
change = fake_gerrit.changes.get(int(number))
if not change:
return self._404()
data = change.queryFilesHTTP(revision)
if data is None:
return self._404()
self.send_data(data)
self.end_headers()
def get_changes(self, query):
    """Run a change query against the fake Gerrit and send the results.

    The URL-quoted ``query`` is decoded and recorded in the fake
    Gerrit's ``queries`` list.  A parenthesised query containing ``OR``
    is split on ``' OR '`` and the de-duplicated union of the
    sub-query results is returned; any other query is evaluated as is.
    """
    self.log.debug("simpleQueryHTTP: %s", query)
    query = urllib.parse.unquote(query)
    fake_gerrit.queries.append(query)
    results = []
    if query.startswith('(') and 'OR' in query:
        # Strip the surrounding parentheses before splitting.
        query = query[1:-1]
        for q in query.split(' OR '):
            for r in fake_gerrit._simpleQuery(q, http=True):
                if r not in results:
                    results.append(r)
    else:
        results = fake_gerrit._simpleQuery(query, http=True)
    # send_data() already terminates the header block and writes the
    # body; a second end_headers() would emit a stray blank line after
    # the payload.
    self.send_data(results)
def version(self):
    """Send the fake Gerrit version string."""
    # send_data() already terminates the header block and writes the
    # body; a second end_headers() would emit a stray blank line after
    # the payload.
    self.send_data('3.0.0-some-stuff')
def send_data(self, data):
    """Send ``data`` as a complete 200 JSON response.

    The JSON document is UTF-8 encoded and preceded by a ``)]}'`` guard
    line.  Status line, content headers, the end of the header block
    and the body are all written by this method.
    """
    payload = b")]}'\n" + json.dumps(data).encode('utf-8')
    self.send_response(200)
    headers = (
        ('Content-Type', 'application/json'),
        ('Content-Length', len(payload)),
    )
    for keyword, value in headers:
        self.send_header(keyword, value)
    self.end_headers()
    self.wfile.write(payload)
def log_message(self, fmt, *args):
    """Forward a message to this handler's logger at debug level."""
    logger = self.log
    logger.debug(fmt, *args)
self.httpd = socketserver.ThreadingTCPServer(('', 0), Server)
self.port = self.httpd.socket.getsockname()[1]
self.thread = threading.Thread(name='GerritWebServer',
target=self.httpd.serve_forever)
self.thread.daemon = True
self.thread.start()
def stop(self):
    """Shut the HTTP server down and wait for its thread to finish."""
    server = self.httpd
    server.shutdown()
    worker = self.thread
    worker.join()
class FakeGerritPoller(gerritconnection.GerritPoller):
    """A Fake Gerrit poller for use in tests.

    This subclasses
    :py:class:`~zuul.connection.gerrit.GerritPoller` and signals the
    connection's poller event after every poll.
    """

    poll_interval = 1

    def _poll(self, *args, **kw):
        """Run the parent poll, then flag that a poll has completed."""
        result = super()._poll(*args, **kw)
        # Tests wait on this event to confirm that the poller has run
        # after they changed something.
        self.connection._poller_event.set()
        return result
class FakeGerritRefWatcher(gitwatcher.GitWatcher):
    """A Fake Gerrit ref watcher.

    This subclasses
    :py:class:`~zuul.connection.git.GitWatcher`, points it at the
    connection's upstream root and signals the connection's ref watcher
    event after every poll.
    """

    def __init__(self, *args, **kw):
        super().__init__(*args, **kw)
        self.baseurl = self.connection.upstream_root
        self.poll_delay = 1

    def _poll(self, *args, **kw):
        """Run the parent poll, then flag that a poll has completed."""
        result = super()._poll(*args, **kw)
        # Tests wait on this event to confirm that the watcher has run
        # after they changed something.
        self.connection._ref_watcher_event.set()
        return result
class FakeElasticsearchConnection(elconnection.ElasticsearchConnection):
    """Elasticsearch connection stand-in that records submitted documents.

    The parent initializer is deliberately not called; only the
    attributes assigned here are set up.
    """

    log = logging.getLogger("zuul.test.FakeElasticsearchConnection")

    def __init__(self, driver, connection_name, connection_config):
        self.driver = driver
        self.connection_name = connection_name
        # Both recording attributes start out as None so that reading
        # them before the first add_docs() call does not raise
        # AttributeError.
        self.source_it = None
        self.index = None

    def add_docs(self, source_it, index):
        """Remember the most recently submitted documents and index."""
        self.source_it = source_it
        self.index = index
class FakeGerritConnection(gerritconnection.GerritConnection):
"""A Fake Gerrit connection for use in tests.
This subclasses
:py:class:`~zuul.connection.gerrit.GerritConnection` to add the
ability for tests to add changes to the fake Gerrit it represents.
"""
log = logging.getLogger("zuul.test.FakeGerritConnection")
_poller_class = FakeGerritPoller
_ref_watcher_class = FakeGerritRefWatcher
def __init__(self, driver, connection_name, connection_config,
             changes_db=None, upstream_root=None, poller_event=None,
             ref_watcher_event=None):
    """Set up the fake connection and, if needed, its HTTP server.

    When ``connection_config`` carries a ``password`` a
    ``GerritWebServer`` is started and ``connection_config['baseurl']``
    is pointed at it before the parent initializer runs; otherwise
    ``web_server`` is ``None``.
    """
    wants_http = connection_config.get('password')
    if not wants_http:
        self.web_server = None
    else:
        self.web_server = GerritWebServer(self)
        self.web_server.start()
        url = 'http://localhost:%s' % self.web_server.port
        connection_config['baseurl'] = url

    super(FakeGerritConnection, self).__init__(driver, connection_name,
                                               connection_config)

    self.fixture_dir = os.path.join(FIXTURE_DIR, 'gerrit')
    self.upstream_root = upstream_root
    # State manipulated and inspected by tests.
    self.change_number = 0
    self.changes = changes_db
    self.queries = []
    self.fake_checkers = []
    # Events set by the fake poller / ref watcher after each poll.
    self._poller_event = poller_event
    self._ref_watcher_event = ref_watcher_event
def addFakeChecker(self, **kw):
    """Register a checker, described by its keyword arguments."""
    checker = kw
    self.fake_checkers.append(checker)
def addFakeChange(self, project, branch, subject, status='NEW',
                  files=None, parent=None, merge_parents=None,
                  merge_files=None):
    """Add a change to the fake Gerrit.

    The change receives the next change number, is stored in
    ``self.changes`` under that number and is returned.
    """
    self.change_number += 1
    number = self.change_number
    change = FakeGerritChange(
        self, number, project, branch, subject,
        upstream_root=self.upstream_root,
        status=status,
        files=files,
        parent=parent,
        merge_parents=merge_parents,
        merge_files=merge_files)
    self.changes[number] = change
    return change
def addFakeTag(self, project, branch, tag):
    """Tag the head of ``branch`` and return the ref-updated event.

    The tag is created in the project's repository below
    ``upstream_root``; the returned event describes the creation of
    ``refs/tags/<tag>`` (old revision all zeros).
    """
    repo = git.Repo(os.path.join(self.upstream_root, project))
    commit = repo.heads[branch].commit
    git.Tag.create(repo, tag, commit)

    ref_update = {
        "oldRev": 40 * '0',
        "newRev": commit.hexsha,
        "refName": 'refs/tags/' + tag,
        "project": project,
    }
    return {
        "type": "ref-updated",
        "submitter": {
            "name": "User Name",
        },
        "refUpdate": ref_update,
    }
def getFakeBranchCreatedEvent(self, project, branch):
    """Return a ref-updated event describing the creation of ``branch``.

    The new revision is the current head commit of the branch in the
    project's repository below ``upstream_root``; the old revision is
    all zeros.
    """
    repo = git.Repo(os.path.join(self.upstream_root, project))
    ref_update = {
        "oldRev": 40 * '0',
        "newRev": repo.heads[branch].commit.hexsha,
        "refName": 'refs/heads/' + branch,
        "project": project,
    }
    return {
        "type": "ref-updated",
        "submitter": {
            "name": "User Name",
        },
        "refUpdate": ref_update,
    }
def getFakeBranchDeletedEvent(self, project, branch):
    """Return a ref-updated event describing the deletion of ``branch``.

    The old revision is a fixed placeholder SHA and the new revision is
    all zeros; no repository is consulted.
    """
    ref_update = {
        "oldRev": '4abd38457c2da2a72d4d030219ab180ecdb04bf0',
        "newRev": 40 * '0',
        "refName": 'refs/heads/' + branch,
        "project": project,
    }
    return {
        "type": "ref-updated",
        "submitter": {
            "name": "User Name",
        },
        "refUpdate": ref_update,
    }
def review(self, item, message, submit, labels, checks_api, file_comments,
           zuul_event_id=None):
    """Report a review for ``item``.

    With a fake web server running, the parent implementation is used.
    Without one, the review is applied directly to the fake change;
    ``checks_api`` and ``file_comments`` are not forwarded on that path.
    """
    if not self.web_server:
        number = int(item.change.number)
        self._test_handle_review(number, message, submit, labels)
        return None
    return super(FakeGerritConnection, self).review(
        item, message, submit, labels, checks_api, file_comments,
        zuul_event_id)
def _test_handle_review(self, change_number, message, submit, labels,
                        file_comments=None, tag=None):
    """Apply a review to the fake change ``change_number``.

    Labels become approvals by ``self.user``, the message is appended
    to the change's messages, file comments are attached, and the
    change is marked merged (``submit``) and/or reported (``message``).
    """
    # Handle a review action from a test
    change = self.changes[change_number]

    # Add the approval back onto the change (ie simulate what gerrit would
    # do).
    # Usually when zuul leaves a review it'll create a feedback loop where
    # zuul's review enters another gerrit event (which is then picked up by
    # zuul). However, we can't mimic this behaviour (by adding this
    # approval event into the queue) as it stops jobs from checking what
    # happens before this event is triggered. If a job needs to see what
    # happens they can add their own verified event into the queue.
    # Nevertheless, we can update change with the new review in gerrit.
    for category in labels:
        value = labels[category]
        change.addApproval(category, value, username=self.user, tag=tag)

    if message:
        change.messages.append(message)

    if file_comments:
        for filename, commentlist in file_comments.items():
            for comment in commentlist:
                change.addComment(
                    filename,
                    comment['line'],
                    comment['message'],
                    'Zuul',
                    'zuul@example.com',
                    self.user,
                    comment.get('range'))

    if submit:
        change.setMerged()
    if message:
        change.setReported()
def queryChangeSSH(self, number, event=None):
    """Return the query data of change ``number``, or ``{}`` if unknown."""
    self.log.debug("Query change SSH: %s", number)
    change = self.changes.get(int(number))
    if not change:
        return {}
    return change.query()
def _simpleQuery(self, query, http=False):
    """Evaluate a single query term against the fake changes.

    Supported forms are ``change:<id or number>``, ``message:<text>``
    (optionally wrapped in ``{}``) and ``status:...`` with an optional
    ``-age:<n><unit>`` part that limits results to changes updated
    within the last ``n`` (the trailing unit character is dropped and
    ``n`` is treated as seconds).  Anything else returns every change.
    Results are rendered with ``queryHTTP()`` when ``http`` is true and
    with ``query()`` otherwise.
    """
    def render(change):
        if http:
            return change.queryHTTP()
        return change.query()

    # the query can be in parenthesis so strip them if needed
    if query.startswith('('):
        query = query[1:-1]

    if query.startswith('change:'):
        # Query a specific changeid
        changeid = query[len('change:'):]
        return [render(change) for change in self.changes.values()
                if (change.data['id'] == changeid or
                    change.data['number'] == changeid)]

    if query.startswith('message:'):
        # Query the content of a commit message
        msg = query[len('message:'):].strip()
        # Remove quoting if it is there
        if msg.startswith('{') and msg.endswith('}'):
            msg = msg[1:-1]
        return [render(change) for change in self.changes.values()
                if msg in change.data['commitMessage']]

    if query.startswith("status:"):
        cut_off_time = 0
        for part in query.split(" "):
            if not part.startswith("-age"):
                continue
            age = part.partition(":")[2]
            cut_off_time = (
                datetime.datetime.now().timestamp() - float(age[:-1])
            )
        return [
            render(change) for change in self.changes.values()
            if change.data["lastUpdated"] >= cut_off_time
        ]

    # Query all open changes
    return [render(change) for change in self.changes.values()]
def simpleQuerySSH(self, query, event=None):
    """Run ``query`` against the fake changes and return the results.

    The query is recorded in ``self.queries``.  A parenthesised query
    containing ``OR`` is split on ``' OR '`` and the de-duplicated
    union of the sub-query results is returned.
    """
    log = get_annotated_logger(self.log, event)
    log.debug("simpleQuerySSH: %s", query)
    self.queries.append(query)

    is_compound = query.startswith('(') and 'OR' in query
    if not is_compound:
        return self._simpleQuery(query)

    results = []
    for term in query[1:-1].split(' OR '):
        for record in self._simpleQuery(term):
            if record not in results:
                results.append(record)
    return results
def _start_watcher_thread(self, *args, **kw):
    """Do nothing.

    NOTE(review): presumably replaces the parent's watcher start-up so
    that no watcher thread runs in tests -- confirm against the base
    class.
    """
    return None
def _uploadPack(self, project):
    """Build an upload-pack style ref advertisement for ``project``.

    After a fixed capability line, every ref of the project's upstream
    repository (lock files excluded) is emitted as a line prefixed by
    its four-digit hex length; ``0000`` terminates the listing.
    """
    chunks = [
        '00a31270149696713ba7e06f1beb760f20d359c4abed HEAD\x00'
        'multi_ack thin-pack side-band side-band-64k ofs-delta '
        'shallow no-progress include-tag multi_ack_detailed no-done\n'
    ]
    repo = git.Repo(os.path.join(self.upstream_root, project.name))
    for ref in repo.refs:
        if ref.path.endswith('.lock'):
            # don't treat lockfiles as ref
            continue
        line = ref.object.hexsha + ' ' + ref.path + '\n'
        # The length prefix counts its own four characters as well.
        chunks.append('%04x%s' % (len(line) + 4, line))
    chunks.append('0000')
    return ''.join(chunks)
def getGitUrl(self, project):
    """Return a ``file://`` URL for the project's upstream repository."""
    repo_path = os.path.join(self.upstream_root, project.name)
    return 'file://' + repo_path
class PagureChangeReference(git.Reference):
    # Git reference type used for the pull-request refs of the fake
    # Pagure; presumably the refs it creates live below refs/pull --
    # confirm against GitPython's Reference implementation.
    _common_path_default = "refs/pull"
    _points_to_commits_only = True
class FakePagurePullRequest(object):
log = logging.getLogger("zuul.test.FakePagurePullRequest")
def __init__(self, pagure, number, project, branch,
subject, upstream_root, files={}, number_of_commits=1,
initial_comment=None):
self.pagure = pagure
self.source = pagure
self.number = number
self.project = project
self.branch = branch
self.subject = subject
self.upstream_root = upstream_root
self.number_of_commits = 0
self.status = 'Open'
self.initial_comment = initial_comment
self.uuid = uuid.uuid4().hex
self.comments = []
self.flags = []
self.files = {}
self.tags = []
self.cached_merge_status = ''
self.threshold_reached = False
self.commit_stop = None
self.commit_start = None
self.threshold_reached = False
self.upstream_root = upstream_root
self.cached_merge_status = 'MERGE'
self.url = "https://%s/%s/pull-request/%s" % (
self.pagure.server, self.project, self.number)
self.is_merged = False
self.pr_ref = self._createPRRef()
self._addCommitInPR(files=files)
self._updateTimeStamp()
def _getPullRequestEvent(self, action, pull_data_field='pullrequest'):
name = 'pg_pull_request'
data = {
'msg': {
pull_data_field: {
'branch': self.branch,
'comments': self.comments,
'commit_start': self.commit_start,
'commit_stop': self.commit_stop,
'date_created': '0',
'tags': self.tags,
'initial_comment': self.initial_comment,
'id': self.number,
'project': {
'fullname': self.project,
},
'status': self.status,
'subject': self.subject,
'uid': self.uuid,
}
},
'msg_id': str(uuid.uuid4()),
'timestamp': 1427459070,
'topic': action
}
if action == 'pull-request.flag.added':
data['msg']['flag'] = self.flags[0]
if action == 'pull-request.tag.added':
data['msg']['tags'] = self.tags
return (name, data)
def getPullRequestOpenedEvent(self):
    """Return the event announcing a newly opened pull request."""
    topic = 'pull-request.new'
    return self._getPullRequestEvent(topic)
def getPullRequestClosedEvent(self, merged=True):
    """Close the pull request and return the matching event.

    The status becomes ``'Merged'`` when ``merged`` is true and
    ``'Closed'`` otherwise; ``is_merged`` is updated accordingly.
    """
    self.is_merged = True if merged else False
    self.status = 'Merged' if merged else 'Closed'
    return self._getPullRequestEvent('pull-request.closed')
def getPullRequestUpdatedEvent(self):
    """Add a commit plus its notification comment and return the event."""
    self._addCommitInPR()
    notice = "**1 new commit added**\n\n * ``Bump``\n"
    self.addComment(notice, True)
    return self._getPullRequestEvent('pull-request.comment.added')
def getPullRequestCommentedEvent(self, message):
    """Add ``message`` as a comment and return the comment-added event."""
    self.addComment(message)
    topic = 'pull-request.comment.added'
    return self._getPullRequestEvent(topic)
def getPullRequestInitialCommentEvent(self, message):
    """Replace the initial comment and return the edit event."""
    self.initial_comment = message
    self._updateTimeStamp()
    topic = 'pull-request.initial_comment.edited'
    return self._getPullRequestEvent(topic)
def getPullRequestTagAddedEvent(self, tags, reset=True):
    """Tag the pull request and return the tag-added event.

    With ``reset`` the existing tags are dropped first; otherwise
    ``tags`` are merged into them without duplicates.  A notification
    comment listing ``tags`` is added as well.
    """
    if reset:
        self.tags = []
    merged = set(self.tags)
    merged.update(set(tags))
    self.tags = list(merged)

    notice = ("**Metadata Update from @pingou**:\n- " +
              "Pull-request tagged with: %s" % ', '.join(tags))
    self.addComment(notice, True)
    self._updateTimeStamp()
    return self._getPullRequestEvent(
        'pull-request.tag.added', pull_data_field='pull_request')
def getPullRequestStatusSetEvent(self, status, username="zuul"):
    """Flag the pull request with ``status`` and return the flag event."""
    comment = "Build %s" % status
    self.addFlag(status, "https://url", comment, username)
    return self._getPullRequestEvent('pull-request.flag.added')
def insertFlag(self, flag):
    """Put ``flag`` at the front of the flag list.

    An existing flag with the same ``uid`` is removed first (the last
    one, should several share the uid).
    """
    same_uid = [position for position, known in enumerate(self.flags)
                if known['uid'] == flag['uid']]
    if same_uid:
        del self.flags[same_uid[-1]]
    self.flags.insert(0, flag)
def addFlag(self, status, url, comment, username="zuul"):
    """Create a flag for ``username`` and insert it at the front.

    The flag uid is derived from the user name, the pull request number
    and the project, truncated to 32 characters, so a later flag by the
    same user replaces the earlier one.
    """
    uid = ("%s-%s-%s" % (username, self.number, self.project))[:32]
    self.insertFlag({
        "username": "Zuul CI",
        "user": {
            "name": username
        },
        "uid": uid,
        "comment": comment,
        "status": status,
        "url": url
    })
    self._updateTimeStamp()
def editInitialComment(self, initial_comment):
    """Replace the initial comment and refresh the update time stamp."""
    setattr(self, 'initial_comment', initial_comment)
    self._updateTimeStamp()
def addComment(self, message, notification=False, fullname=None):
    """Append a comment and refresh the update time stamp.

    The comment is dated with the current time in whole seconds and
    attributed to ``fullname``, or to ``'Pingou'`` when none is given.
    """
    author = fullname or 'Pingou'
    created = str(int(time.time()))
    comment = {
        'comment': message,
        'notification': notification,
        'date_created': created,
        'user': {
            'fullname': author
        },
    }
    self.comments.append(comment)
    self._updateTimeStamp()
def getPRReference(self):
    """Return the ref name of this pull request, ``<number>/head``."""
    return '{}/head'.format(self.number)
def _getRepo(self):
    """Open the project's repository below ``upstream_root``."""
    return git.Repo(os.path.join(self.upstream_root, self.project))
def _createPRRef(self):
    """Create the pull request ref, initially at ``refs/tags/init``."""
    ref_name = self.getPRReference()
    return PagureChangeReference.create(
        self._getRepo(), ref_name, 'refs/tags/init')
def addCommit(self, files=None):
    """Adds a commit on top of the actual PR head.

    :param files: optional mapping of file name to content for the
        commit; when omitted or empty a default file is generated.
    """
    # A fresh dict per call instead of a shared mutable default.
    if files is None:
        files = {}
    self._addCommitInPR(files=files)
    self._updateTimeStamp()
def forcePush(self, files=None):
    """Clears actual commits and add a commit on top of the base.

    :param files: optional mapping of file name to content for the
        commit; when omitted or empty a default file is generated.
    """
    # A fresh dict per call instead of a shared mutable default.
    if files is None:
        files = {}
    self._addCommitInPR(files=files, reset=True)
    self._updateTimeStamp()
def _addCommitInPR(self, files={}, reset=False):
    """Commit ``files`` onto the pull request ref.

    With ``reset`` the ref is first moved back to ``refs/tags/init``
    and the commit counter restarts.  When ``files`` is empty a single
    default file named after the branch and PR number is written.
    ``commit_stop`` becomes the new commit and ``commit_start`` is set
    on the first commit only.  The repository is left with ``master``
    checked out.
    """
    # The empty default dict is falsy, so it is never stored on the
    # instance or mutated.
    repo = self._getRepo()
    ref = repo.references[self.getPRReference()]
    if reset:
        self.number_of_commits = 0
        ref.set_object('refs/tags/init')
    self.number_of_commits += 1
    # Work on the PR ref with a clean working tree.
    repo.head.reference = ref
    repo.git.clean('-x', '-f', '-d')
    if files:
        self.files = files
    else:
        fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
        self.files = {fn: "test %s %s\n" % (self.branch, self.number)}
    msg = self.subject + '-' + str(self.number_of_commits)
    for fn, content in self.files.items():
        fn = os.path.join(repo.working_dir, fn)
        with open(fn, 'w') as f:
            f.write(content)
        repo.index.add([fn])
    self.commit_stop = repo.index.commit(msg).hexsha
    if not self.commit_start:
        self.commit_start = self.commit_stop
    repo.create_head(self.getPRReference(), self.commit_stop, force=True)
    self.pr_ref.set_commit(self.commit_stop)
    # Return the repository to master with a clean working tree.
    repo.head.reference = 'master'
    repo.git.clean('-x', '-f', '-d')
    repo.heads['master'].checkout()
def _updateTimeStamp(self):
self.last_updated = str(int(time.time()))
class FakePagureAPIClient(pagureconnection.PagureAPIClient):
log = logging.getLogger("zuul.test.FakePagureAPIClient")
def __init__(self, baseurl, api_token, project,
             pull_requests_db=None):
    """Create a fake API client backed by ``pull_requests_db``.

    :param pull_requests_db: mapping of project name to a mapping of
        pull request number (string) to fake pull request.  A new empty
        mapping is used when omitted.
    """
    super(FakePagureAPIClient, self).__init__(
        baseurl, api_token, project)
    self.session = None
    # Compare against None (not truthiness) so that an empty database
    # passed by the caller stays shared with the caller, while the
    # default no longer is one dict shared by every instance.
    if pull_requests_db is None:
        pull_requests_db = {}
    self.pull_requests = pull_requests_db
    self.return_post_error = None
def gen_error(self, verb, custom_only=False):
    """Return an error response tuple, or ``None``.

    For POST requests with ``return_post_error`` set, a 401 response
    carrying that error is returned.  Otherwise a generic 503 response
    is returned unless ``custom_only`` is true, in which case the
    result is ``None``.
    """
    if verb == 'POST' and self.return_post_error:
        return {
            'error': self.return_post_error['error'],
            'error_code': self.return_post_error['error_code']
        }, 401, "", 'POST'
        # NOTE(review): as placed here this reset is unreachable after
        # the return above, so a configured POST error is returned on
        # every POST.  Presumably a one-shot error was intended --
        # confirm the intended placement of this statement.
        self.return_post_error = None
    if not custom_only:
        return {
            'error': 'some error',
            'error_code': 'some error code'
        }, 503, "", verb
def _get_pr(self, match):
    """Look up the pull request captured by ``match``.

    ``match`` must capture the project name and the pull request
    number.  The result of ``gen_error("GET")`` is returned when no
    such pull request exists.
    """
    project, number = match.groups()
    known = self.pull_requests.get(project, {})
    pr = known.get(number)
    if pr:
        return pr
    return self.gen_error("GET")
def get(self, url):
    """Serve a GET request of the fake Pagure API.

    Handles pull request details, pull request flags, the branch list
    (always ``['master']``) and pull request diffstats.  Returns a
    ``(body, 200, "", "GET")`` tuple, or ``None`` when the URL matches
    none of these resources.
    """
    self.log.debug("Getting resource %s ..." % url)

    def ok(body):
        return body, 200, "", "GET"

    pr_match = re.match(r'.+/api/0/(.+)/pull-request/(\d+)$', url)
    if pr_match:
        pr = self._get_pr(pr_match)
        return ok({
            'branch': pr.branch,
            'subject': pr.subject,
            'status': pr.status,
            'initial_comment': pr.initial_comment,
            'last_updated': pr.last_updated,
            'comments': pr.comments,
            'commit_stop': pr.commit_stop,
            'threshold_reached': pr.threshold_reached,
            'cached_merge_status': pr.cached_merge_status,
            'tags': pr.tags,
        })

    flag_match = re.match(r'.+/api/0/(.+)/pull-request/(\d+)/flag$', url)
    if flag_match:
        pr = self._get_pr(flag_match)
        return ok({'flags': pr.flags})

    if re.match('.+/api/0/(.+)/git/branches$', url):
        # The project captured by the pattern is not used.
        return ok({'branches': ['master']})

    diff_match = re.match(
        r'.+/api/0/(.+)/pull-request/(\d+)/diffstats$', url)
    if diff_match:
        pr = self._get_pr(diff_match)
        return ok(pr.files)
    return None
def post(self, url, params=None):
    """Serve a POST request of the fake Pagure API.

    A configured POST error takes precedence over everything else.
    Merge and whoami requests need no parameters; flag and comment
    requests do, and a generic error is returned when ``params`` is
    empty.  Successful requests return ``({}, 200, "", "POST")``
    (whoami returns the user name instead of ``{}``).
    """
    self.log.info(
        "Posting on resource %s, params (%s) ..." % (url, params))

    # Will only match if return_post_error is set
    custom_error = self.gen_error("POST", custom_only=True)
    if custom_error:
        return custom_error

    merge = re.match(r'.+/api/0/(.+)/pull-request/(\d+)/merge$', url)
    if merge:
        pr = self._get_pr(merge)
        pr.status = 'Merged'
        pr.is_merged = True
        return {}, 200, "", "POST"

    if re.match(r'.+/api/0/-/whoami$', url):
        return {"username": "zuul"}, 200, "", "POST"

    if not params:
        return self.gen_error("POST")

    flag = re.match(r'.+/api/0/(.+)/pull-request/(\d+)/flag$', url)
    if flag:
        pr = self._get_pr(flag)
        params['user'] = {"name": "zuul"}
        pr.insertFlag(params)

    comment = re.match(r'.+/api/0/(.+)/pull-request/(\d+)/comment$', url)
    if comment:
        pr = self._get_pr(comment)
        pr.addComment(params['comment'])

    return {}, 200, "", "POST"
class FakePagureConnection(pagureconnection.PagureConnection):
log = logging.getLogger("zuul.test.FakePagureConnection")
def __init__(self, driver, connection_name, connection_config, rpcclient,
             changes_db=None, upstream_root=None):
    """Set up the fake Pagure connection.

    ``changes_db`` is the shared pull request database and
    ``upstream_root`` the directory holding the upstream repositories,
    which also serves as the clone URL.
    """
    super(FakePagureConnection, self).__init__(driver, connection_name,
                                               connection_config)
    self.connection_name = connection_name
    self.rpcclient = rpcclient
    self.upstream_root = upstream_root
    self.cloneurl = self.upstream_root
    # State manipulated and inspected by tests.
    self.pr_number = 0
    self.pull_requests = changes_db
    self.statuses = {}
    self.reports = []
def get_project_api_client(self, project):
    """Return a fake API client for ``project``.

    The client shares this connection's pull request database.  If the
    connection has no user name yet, it is set via
    ``set_my_username`` using the new client.
    """
    api_client = FakePagureAPIClient(
        self.baseurl, None, project,
        pull_requests_db=self.pull_requests)
    if self.username:
        return api_client
    self.set_my_username(api_client)
    return api_client
def get_project_webhook_token(self, project):
    """Return the fake webhook token of ``project``."""
    token = 'fake_webhook_token-%s' % project
    return token
def emitEvent(self, event, use_zuulweb=False, project=None,
              wrong_token=False):
    """Deliver ``event`` (a ``(name, payload)`` tuple) to Zuul.

    By default the payload is put on the connection's event queue and
    the queued item is returned.  With ``use_zuulweb`` the payload is
    signed and POSTed to the zuul-web payload endpoint instead, and the
    HTTP response is returned; ``wrong_token`` signs with an empty
    secret rather than the project's webhook token.
    """
    name, payload = event
    if not use_zuulweb:
        data = {'payload': payload}
        self.event_queue.put(data)
        return data

    if wrong_token:
        secret = ''
    else:
        secret = 'fake_webhook_token-%s' % project
    payload = json.dumps(payload).encode('utf-8')
    signature, _ = pagureconnection._sign_request(payload, secret)
    headers = {'x-pagure-signature': signature,
               'x-pagure-project': project}
    endpoint = ('http://127.0.0.1:%s/api/connection/%s/payload'
                % (self.zuul_web_port, self.connection_name))
    return requests.post(endpoint, data=payload, headers=headers)
def openFakePullRequest(self, project, branch, subject, files=None,
                        initial_comment=None):
    """Open a fake pull request and register it in the database.

    :param files: optional mapping of file name to content for the
        initial commit; when omitted a default file is generated.
    :returns: the new fake pull request, stored under its project and
        its number (as a string).
    """
    # A fresh mapping per call instead of a shared mutable (list)
    # default; the pull request expects a mapping.
    if files is None:
        files = {}
    self.pr_number += 1
    pull_request = FakePagurePullRequest(
        self, self.pr_number, project, branch, subject, self.upstream_root,
        files=files, initial_comment=initial_comment)
    self.pull_requests.setdefault(
        project, {})[str(self.pr_number)] = pull_request
    return pull_request
def getGitReceiveEvent(self, project):
    """Return a ``git.receive`` push event for ``project``.

    The end commit is the current head of the project's upstream
    repository; the branch is always ``master``.
    """
    repo = git.Repo(os.path.join(self.upstream_root, project))
    message = {
        'project_fullname': project,
        'branch': 'master',
        'end_commit': repo.head.commit.hexsha,
        'old_commit': '1' * 40,
    }
    payload = {
        'msg': message,
        'msg_id': str(uuid.uuid4()),
        'timestamp': 1427459070,
        'topic': 'git.receive',
    }
    return ('pg_push', payload)
def getGitTagCreatedEvent(self, project, tag, rev):
    """Return a ``git.tag.creation`` event for ``tag`` at ``rev``."""
    message = {
        'project_fullname': project,
        'tag': tag,
        'rev': rev
    }
    payload = {
        'msg': message,
        'msg_id': str(uuid.uuid4()),
        'timestamp': 1427459070,
        'topic': 'git.tag.creation',
    }
    return ('pg_push', payload)
def getGitBranchEvent(self, project, branch, type, rev):
    """Return a ``git.branch.<type>`` event for ``branch`` at ``rev``."""
    message = {
        'project_fullname': project,
        'branch': branch,
        'rev': rev,
    }
    payload = {
        'msg': message,
        'msg_id': str(uuid.uuid4()),
        'timestamp': 1427459070,
        'topic': 'git.branch.%s' % type,
    }
    return ('pg_push', payload)
def setZuulWebPort(self, port):
    """Remember the zuul-web port used when emitting events over HTTP."""
    setattr(self, 'zuul_web_port', port)
class FakeGitlabConnection(gitlabconnection.GitlabConnection):
log = logging.getLogger("zuul.test.FakeGitlabConnection")
def __init__(self, driver, connection_name, connection_config, rpcclient,
             changes_db=None, upstream_root=None):
    """Set up the fake GitLab connection.

    ``changes_db`` is the merge request database, shared with the fake
    API client created here; ``upstream_root`` is the directory holding
    the upstream repositories.
    """
    super(FakeGitlabConnection, self).__init__(driver, connection_name,
                                               connection_config)
    self.merge_requests = changes_db
    fake_client = FakeGitlabAPIClient(
        self.baseurl, self.api_token, merge_requests_db=changes_db)
    self.gl_client = fake_client
    self.rpcclient = rpcclient
    self.upstream_root = upstream_root
    self.mr_number = 0
def addProject(self, project):
    """Register ``project`` with the connection and the fake client."""
    parent = super(FakeGitlabConnection, self)
    parent.addProject(project)
    self.gl_client.addProject(project)
def protectBranch(self, owner, project, branch, protected=True):
    """(Re-)register ``branch`` with the given protection state.

    Any existing entry for the branch in the fake repository is removed
    before the new one is appended.
    """
    key = (owner, project)
    if branch in self.gl_client.fake_repos[key]:
        del self.gl_client.fake_repos[key][branch]
    replacement = FakeBranch(branch, protected=protected)
    self.gl_client.fake_repos[key].append(replacement)
def deleteBranch(self, owner, project, branch):
    """Remove ``branch`` from the fake repository if it is present."""
    key = (owner, project)
    if branch not in self.gl_client.fake_repos[key]:
        return
    del self.gl_client.fake_repos[key][branch]
def getGitUrl(self, project):
    """Return a ``file://`` URL for the project's upstream repository."""
    repo_path = os.path.join(self.upstream_root, project.name)
    return 'file://' + repo_path
def real_getGitUrl(self, project):
    """Return the Git URL computed by the parent connection class."""
    parent = super(FakeGitlabConnection, self)
    return parent.getGitUrl(project)
def openFakeMergeRequest(self, project,
                         branch, title, description='', files=None):
    """Open a fake merge request and register it in the database.

    :param files: optional mapping of file name to content for the
        initial commit; when omitted a default file is generated.
    :returns: the new fake merge request, stored under its project and
        its number (as a string).
    """
    # A fresh mapping per call instead of a shared mutable (list)
    # default; the merge request iterates it as a mapping.
    if files is None:
        files = {}
    self.mr_number += 1
    merge_request = FakeGitlabMergeRequest(
        self, self.mr_number, project, branch, title, self.upstream_root,
        files=files, description=description)
    self.merge_requests.setdefault(
        project, {})[str(self.mr_number)] = merge_request
    return merge_request
def emitEvent(self, event, use_zuulweb=False, project=None):
    """Deliver ``event`` (a ``(name, payload)`` tuple) to Zuul.

    By default the payload is put on the connection's event queue and
    the queued item is returned.  With ``use_zuulweb`` the payload is
    POSTed to the zuul-web payload endpoint with the connection's
    webhook token, and the HTTP response is returned.
    """
    name, payload = event
    if not use_zuulweb:
        data = {'payload': payload}
        self.event_queue.put(data)
        return data

    payload = json.dumps(payload).encode('utf-8')
    headers = {'x-gitlab-token': self.webhook_token}
    endpoint = ('http://127.0.0.1:%s/api/connection/%s/payload'
                % (self.zuul_web_port, self.connection_name))
    return requests.post(endpoint, data=payload, headers=headers)
def setZuulWebPort(self, port):
    """Remember the zuul-web port used when emitting events over HTTP."""
    setattr(self, 'zuul_web_port', port)
def getPushEvent(
        self, project, before=None, after=None,
        branch='refs/heads/master'):
    """Return a push event for ``project``.

    ``after`` defaults to the current head commit of the project's
    upstream repository and ``before`` to forty ``'1'`` characters.
    """
    if not after:
        repo = git.Repo(os.path.join(self.upstream_root, project))
        after = repo.head.commit.hexsha
    payload = {
        'object_kind': 'push',
        'before': before or '1' * 40,
        'after': after,
        'ref': branch,
        'project': {
            'path_with_namespace': project
        },
    }
    return ('gl_push', payload)
def getGitTagEvent(self, project, tag, sha):
    """Return a tag push event creating ``tag`` at ``sha``."""
    payload = {
        'object_kind': 'tag_push',
        'before': '0' * 40,
        'after': sha,
        'ref': 'refs/tags/%s' % tag,
        'project': {
            'path_with_namespace': project
        },
    }
    return ('gl_push', payload)
@contextmanager
def enable_community_edition(self):
    """Make the fake client behave as community edition within a block.

    The flag is switched on for the duration of the ``with`` block and
    switched off again afterwards, also when the block raises, so a
    failing test cannot leave the fake client in community mode.
    """
    self.gl_client.community_edition = True
    try:
        yield
    finally:
        self.gl_client.community_edition = False
# Record describing one branch of a fake GitLab repository: its name and
# whether it is protected.
FakeBranch = namedtuple('Branch', ('name', 'protected'))
class FakeGitlabAPIClient(gitlabconnection.GitlabAPIClient):
log = logging.getLogger("zuul.test.FakeGitlabAPIClient")
def __init__(self, baseurl, api_token, merge_requests_db=None):
    """Create a fake API client backed by ``merge_requests_db``.

    :param merge_requests_db: mapping of project name to a mapping of
        merge request number (string) to fake merge request.  A new
        empty mapping is used when omitted.
    """
    super(FakeGitlabAPIClient, self).__init__(baseurl, api_token)
    # Compare against None (not truthiness) so that an empty database
    # passed by the caller stays shared with the caller, while the
    # default no longer is one dict shared by every instance.
    if merge_requests_db is None:
        merge_requests_db = {}
    self.merge_requests = merge_requests_db
    # Branch lists keyed by (owner, project); entries can be looked up
    # by their ``name`` attribute.
    self.fake_repos = defaultdict(lambda: IterableList('name'))
    self.community_edition = False
def gen_error(self, verb):
    """Return a generic 503 error response tuple for ``verb``."""
    body = {
        'message': 'some error',
    }
    return body, 503, "", verb
def _get_mr(self, match):
    """Look up the merge request captured by ``match``.

    ``match`` must capture the URL-quoted project name and the merge
    request number.  The result of ``gen_error("GET")`` is returned
    when no such merge request exists.
    """
    quoted_project, number = match.groups()
    project = urllib.parse.unquote(quoted_project)
    known = self.merge_requests.get(project, {})
    mr = known.get(number)
    if mr:
        return mr
    return self.gen_error("GET")
def get(self, url, zuul_event_id=None):
    """Serve a GET request of the fake GitLab API.

    Handles merge request details, the paginated branch list, merge
    request approvals and single branch lookups, in that order.
    Returns a ``(body, status, "", "GET")`` tuple, or ``None`` when the
    URL matches none of these resources.
    """
    log = get_annotated_logger(self.log, zuul_event_id)
    log.debug("Getting resource %s ..." % url)

    def ok(body):
        return body, 200, "", "GET"

    # Merge request details
    mr_match = re.match(r'.+/projects/(.+)/merge_requests/(\d+)$', url)
    if mr_match:
        mr = self._get_mr(mr_match)
        return ok({
            'target_branch': mr.branch,
            'title': mr.subject,
            'state': mr.state,
            'description': mr.description,
            'author': {
                'name': 'Administrator',
                'username': 'admin'
            },
            'updated_at': mr.updated_at.strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
            'sha': mr.sha,
            'labels': mr.labels,
            'merged_at': mr.merged_at,
            'diff_refs': {
                'base_sha': 'c380d3acebd181f13629a25d2e2acca46ffe1e00',
                'head_sha': '2be7ddb704c7b6b83732fdd5b9f09d5a397b5f8f',
                'start_sha': 'c380d3acebd181f13629a25d2e2acca46ffe1e00'
            },
            'merge_status': mr.merge_status,
        })

    # Paginated branch list
    list_match = re.match(
        r'.+/projects/(.+)/repository/branches\?.*$', url)
    if list_match:
        project = urllib.parse.unquote(list_match.group(1)).split('/')
        query = urllib.parse.parse_qs(urllib.parse.urlparse(url).query)
        per_page = int(query["per_page"][0])
        page = int(query["page"][0])

        repo = self.fake_repos[tuple(project)]
        first_entry = (page - 1) * per_page
        last_entry = min(len(repo), page * per_page)
        branches = []
        if first_entry < len(repo):
            for position in range(first_entry, last_entry):
                entry = repo[position]
                branches.append({'name': entry.name,
                                 'protected': entry.protected})
        return ok(branches)

    # Merge request approvals
    approvals_match = re.match(
        r'.+/projects/(.+)/merge_requests/(\d+)/approvals$', url)
    if approvals_match:
        mr = self._get_mr(approvals_match)
        if self.community_edition:
            return ok({
                'approved': mr.approved,
            })
        return ok({
            'approvals_left': 0 if mr.approved else 1,
        })

    # Single branch
    branch_match = re.match(
        r'.+/projects/(.+)/repository/branches/(.+)$', url)
    if branch_match:
        project, branch = branch_match.groups()
        project = urllib.parse.unquote(project)
        branch = urllib.parse.unquote(branch)
        owner, name = project.split('/')
        if branch not in self.fake_repos[(owner, name)]:
            return {}, 404, "", "GET"
        protected = self.fake_repos[(owner, name)][branch].protected
        return ok({'protected': protected})
    return None
def post(self, url, params=None, zuul_event_id=None):
    """Serve a POST request of the fake GitLab API.

    Handles merge request notes, approvals and unapprovals.  Approving
    with a ``sha`` that differs from the merge request's returns a 409
    response; everything else returns ``({}, 200, "", "POST")``.
    """
    self.log.info(
        "Posting on resource %s, params (%s) ..." % (url, params))

    notes = re.match(r'.+/projects/(.+)/merge_requests/(\d+)/notes$', url)
    if notes:
        self._get_mr(notes).addNote(params['body'])

    approve = re.match(
        r'.+/projects/(.+)/merge_requests/(\d+)/approve$', url)
    if approve:
        assert 'sha' in params
        mr = self._get_mr(approve)
        if params['sha'] != mr.sha:
            conflict = {'message': 'SHA does not match HEAD of source '
                        'branch: <new_sha>'}
            return conflict, 409, "", "POST"
        mr.approved = True

    unapprove = re.match(
        r'.+/projects/(.+)/merge_requests/(\d+)/unapprove$', url)
    if unapprove:
        mr = self._get_mr(unapprove)
        mr.approved = False

    return {}, 200, "", "POST"
def put(self, url, params=None, zuul_event_id=None):
    """Serve a PUT request of the fake GitLab API.

    A merge URL merges the addressed merge request; every request is
    answered with ``({}, 200, "", "PUT")``.
    """
    self.log.info(
        "Put on resource %s, params (%s) ..." % (url, params))
    merge = re.match(r'.+/projects/(.+)/merge_requests/(\d+)/merge$', url)
    if merge:
        self._get_mr(merge).mergeMergeRequest()
    return {}, 200, "", "PUT"
def addProject(self, project):
    """Register ``project`` (an object with a ``name``) with the client."""
    project_name = project.name
    self.addProjectByName(project_name)
def addProjectByName(self, project_name):
    """Ensure the fake repository ``owner/project`` has a master branch.

    The branch is added unprotected, and only when the repository does
    not list a ``master`` branch yet.
    """
    owner, proj = project_name.split('/')
    repo = self.fake_repos[(owner, proj)]
    if 'master' in repo:
        return
    repo.append(FakeBranch('master', False))
class GitlabChangeReference(git.Reference):
    # Git reference type used for the merge-request refs of the fake
    # GitLab; presumably the refs it creates live below
    # refs/merge-requests -- confirm against GitPython's Reference
    # implementation.
    _common_path_default = "refs/merge-requests"
    _points_to_commits_only = True
class FakeGitlabMergeRequest(object):
log = logging.getLogger("zuul.test.FakeGitlabMergeRequest")
def __init__(self, gitlab, number, project, branch,
             subject, upstream_root, files=None, description=''):
    """Create a fake merge request with one initial commit.

    :param gitlab: the fake GitLab connection; its ``server`` attribute
        is used to build the merge request URL.
    :param files: optional mapping of file name to content for the
        initial commit; when empty a default file is generated by
        ``_addCommitInMR``.
    """
    # A fresh mapping per call instead of a shared mutable (list)
    # default; the commit helper iterates it as a mapping.
    if files is None:
        files = {}
    self.gitlab = gitlab
    self.source = gitlab
    self.number = number
    self.project = project
    self.branch = branch
    self.subject = subject
    self.description = description
    self.upstream_root = upstream_root
    self.number_of_commits = 0
    self.created_at = datetime.datetime.now(datetime.timezone.utc)
    self.updated_at = self.created_at
    self.merged_at = None
    self.sha = None
    self.state = 'opened'
    self.is_merged = False
    self.merge_status = 'can_be_merged'
    self.labels = []
    self.notes = []
    self.url = "https://%s/%s/merge_requests/%s" % (
        self.gitlab.server, self.project, self.number)
    self.approved = False
    # The ref must exist before the first commit can be added to it.
    self.mr_ref = self._createMRRef()
    self._addCommitInMR(files=files)
def _getRepo(self):
    """Open the project's repository below ``upstream_root``."""
    return git.Repo(os.path.join(self.upstream_root, self.project))
def _createMRRef(self):
    """Create the merge request ref, initially at ``refs/tags/init``."""
    ref_name = self.getMRReference()
    return GitlabChangeReference.create(
        self._getRepo(), ref_name, 'refs/tags/init')
def getMRReference(self):
    """Return the ref name of this merge request, ``<number>/head``."""
    return '{}/head'.format(self.number)
def addNote(self, body):
    """Append a note with ``body``, stamped with the current UTC time."""
    note = {
        "body": body,
        "created_at": datetime.datetime.now(datetime.timezone.utc),
    }
    self.notes.append(note)
def addCommit(self, files=None):
    """Add a commit to the merge request and refresh its time stamp.

    :param files: optional mapping of file name to content for the
        commit; when omitted or empty a default file is generated.
    """
    # A fresh mapping per call instead of a shared mutable (list)
    # default.
    if files is None:
        files = {}
    self._addCommitInMR(files=files)
    self._updateTimeStamp()
def closeMergeRequest(self):
    """Mark the merge request closed and refresh its time stamp."""
    setattr(self, 'state', 'closed')
    self._updateTimeStamp()
def mergeMergeRequest(self):
    """Mark the merge request merged.

    The merge time is the update time stamp refreshed by this call.
    """
    self.state, self.is_merged = 'merged', True
    self._updateTimeStamp()
    self.merged_at = self.updated_at
def reopenMergeRequest(self):
    """Mark the merge request opened again and clear its merge time."""
    setattr(self, 'state', 'opened')
    self._updateTimeStamp()
    self.merged_at = None
def _addCommitInMR(self, files=[], reset=False):
repo = self._getRepo()
ref = repo.references[self.getMRReference()]
if reset:
self.number_of_commits = 0
ref.set_object('refs/tags/init')
self.number_of_commits += 1
repo.head.reference = ref
repo.git.clean('-x', '-f', '-d')
if files:
self.files = files
else:
fn = '%s-%s' % (self.branch.replace('/', '_'), self.number)
self.files = {fn: "test %s %s\n" % (self.branch, self.number)}
msg = self.subject + '-' + str(self.number_of_commits)
for fn, content in self.files.items():
fn = os.path.join(repo.working_dir, fn)
with open(fn, 'w') as f:
f.write(content)
repo.index.add([fn])
self.sha = repo.index.commit(msg).hexsha
repo.create_head(self.getMRReference(), self.sha, force=True)
self.mr_ref.set_commit(self.sha)
repo.head.reference = 'master'
repo.git.clean('-x', '-f', '-d'