The Gatekeeper, or a project gating system
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

458 lines
14 KiB

#!/usr/bin/env python
# Copyright 2018 Red Hat, Inc.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import github3.exceptions
import re
class FakeUser(object):
    """Minimal stand-in for a GitHub user object, identified by its login."""

    def __init__(self, login):
        self.login = login
        # NOTE(review): the original line was mangled into a chained
        # assignment with the attribute names lost. `name` and `email` are
        # reconstructed targets for the two remaining literals -- confirm
        # against the upstream file (the e-mail literal appears to have been
        # scrubbed to an empty string).
        self.name = "Github User"
        self.email = ""
class FakeBranch(object):
    """Fake of a repository branch carrying a name and a protection flag."""

    def __init__(self, branch='master', protected=False):
        # The assignment target had been stripped from the source line;
        # other code in this file compares against a branch's ``name``.
        self.name = branch
        self.protected = protected

    def as_dict(self):
        """Return the branch as a dict shaped like a branch-list API entry."""
        # NOTE(review): the 'name' key was not visible in the mangled source;
        # it is restored because a branch entry without its name cannot be
        # identified by consumers of the branch list -- confirm upstream.
        return {
            'name': self.name,
            'protected': self.protected,
        }
class FakeStatus(object):
    """Fake of a single commit status.

    ``state`` and ``context`` are public attributes; the url, description
    and creating user are kept private and only exposed through
    :meth:`as_dict`.
    """

    def __init__(self, state, url, description, context, user):
        self.state = state
        self.context = context
        self._url = url
        self._description = description
        self._user = user

    def as_dict(self):
        """Return the status as a dict, with the user nested under 'creator'."""
        return {
            'state': self.state,
            'url': self._url,
            'description': self._description,
            'context': self.context,
            'creator': {
                'login': self._user,
            },
        }
class FakeCombinedStatus(object):
    """Container pairing a commit sha with a collection of its statuses."""

    def __init__(self, sha, statuses):
        self.sha = sha
        self.statuses = statuses
class FakeCommit(object):
    """Fake of a commit that records the statuses reported against it."""

    def __init__(self, sha):
        # Newest status first; see set_status().
        self._statuses = []
        self.sha = sha

    def set_status(self, state, url, description, context, user):
        """Record a new status for this commit."""
        status = FakeStatus(
            state, url, description, context, user)
        # always insert a status to the front of the list, to represent
        # the last status provided for a commit.
        self._statuses.insert(0, status)

    def statuses(self):
        """Return every recorded status, most recent first."""
        return self._statuses

    def status(self):
        """Return the combined status of this commit.

        The combined status contains only the latest status for each
        context, wrapped in a FakeCombinedStatus together with the sha.
        """
        latest_statuses_by_context = {}
        # _statuses is ordered newest-first, so the first status seen for a
        # context is the latest one; older entries are skipped.
        for status in self._statuses:
            if status.context not in latest_statuses_by_context:
                latest_statuses_by_context[status.context] = status
        # Materialise the dict view so callers get a plain, indexable list.
        combined_statuses = list(latest_statuses_by_context.values())
        return FakeCombinedStatus(self.sha, combined_statuses)
class FakeRepository(object):
    """In-memory fake of a GitHub repository.

    Holds a list of branches and a sha -> commit mapping, and answers a
    small set of REST-like paths (branches, branch protection, collaborator
    permission) through :meth:`get_url`.

    NOTE(review): in the source this block was recovered from, indentation
    and several tokens (``self.name``, ``self.data``, ``branch.name``) and
    short statements (``else:``, ``continue``, ``break``) were missing. They
    have been restored from context; confirm against the upstream file.
    """

    def __init__(self, name, data):
        self._api = FAKE_BASE_URL
        self._branches = [FakeBranch()]
        self._commits = {}
        self.data = data
        self.name = name

        # fail the next commit requests with 404
        self.fail_not_found = 0

    def branches(self, protected=False):
        """Return all branches, or only the protected ones if requested."""
        if protected:
            # simulate there is no protected branch
            return [b for b in self._branches if b.protected]
        return self._branches

    def _set_branch_protection(self, branch_name, protected):
        """Set the protection flag on the branch called ``branch_name``."""
        for branch in self._branches:
            if branch.name == branch_name:
                branch.protected = protected
                return

    def _build_url(self, *args, **kwargs):
        """Build a fake API url below this repository from path segments."""
        path_args = ['repos', self.name]
        # NOTE(review): appending ``args`` is reconstructed; without it the
        # accepted positional segments would be silently ignored.
        path_args.extend(args)
        fakepath = '/'.join(path_args)
        return FAKE_BASE_URL + fakepath

    def _get(self, url, headers=None):
        """Fetch ``url`` through a fake client session backed by our data."""
        client = FakeGithubClient(self.data)
        return client.session.get(url, headers)

    def _create_branch(self, branch):
        """Add an unprotected branch named ``branch``."""
        # NOTE(review): the body was missing from the source; reconstructed
        # from the method name and the FakeBranch constructor.
        self._branches.append(FakeBranch(branch=branch))

    def _delete_branch(self, branch_name):
        """Remove every branch called ``branch_name``."""
        self._branches = [b for b in self._branches if b.name != branch_name]

    def create_status(self, sha, state, url, description, context,
                      user='zuul'):
        """Attach a status to the commit ``sha``, creating it if unknown."""
        # Since we're bypassing github API, which would require a user, we
        # default the user as 'zuul' here.
        commit = self._commits.get(sha, None)
        if commit is None:
            commit = FakeCommit(sha)
            self._commits[sha] = commit
        commit.set_status(state, url, description, context, user)

    def commit(self, sha):
        """Return the commit for ``sha``, creating it on first access.

        Raises:
            github3.exceptions.NotFoundError: while ``fail_not_found`` is
                positive; each such call decrements the counter.
        """
        if self.fail_not_found > 0:
            self.fail_not_found -= 1

            # Minimal response object handed to the github3 exception.
            class Response:
                status_code = 0
                message = ''

                def json(self):
                    return {
                        'message': self.message
                    }

            resp = Response()
            resp.status_code = 404
            resp.message = 'Not Found'

            raise github3.exceptions.NotFoundError(resp)

        commit = self._commits.get(sha, None)
        if commit is None:
            commit = FakeCommit(sha)
            self._commits[sha] = commit
        return commit

    def get_url(self, path, params=None):
        """Dispatch a repository-relative ``path`` to its handler.

        Returns the handler's result, or None for an unknown entity.
        """
        if '/' in path:
            entity, request = path.split('/', 1)
        else:
            entity = path
            request = None

        if entity == 'branches':
            return self.get_url_branches(request, params=params)
        if entity == 'collaborators':
            return self.get_url_collaborators(request)
        return None

    def get_url_branches(self, path, params=None):
        """Handle 'branches', 'branches/<name>' and '.../protection'."""
        if path is None:
            # request wants a branch list
            return self.get_url_branch_list(params)

        elements = path.split('/')

        entity = elements[-1]
        if entity == 'protection':
            branch = '/'.join(elements[0:-1])
            return self.get_url_protection(branch)

        # fall back to treat all elements as branch
        branch = '/'.join(elements)
        return self.get_url_branch(branch)

    def get_url_branch_list(self, params):
        """Return a 200 response listing branches as dicts.

        Only protected branches are listed when ``params['protected']`` is 1.
        A missing ``params`` is treated as "no filter".
        """
        exclude_unprotected = bool(params) and params.get('protected') == 1
        branches = [x.as_dict() for x in self.branches(exclude_unprotected)]

        return FakeResponse(branches, 200)

    def get_url_branch(self, branch_name):
        """Return the named branch as a response, or a 404 response."""
        for branch in self._branches:
            if branch.name == branch_name:
                return FakeResponse(branch.as_dict())
        return FakeResponse(None, 404)

    def get_url_collaborators(self, path):
        """Handle '<login>/permission'; returns None for other entities.

        The permission is derived from the ``admins`` / ``writers`` of the
        fake pull requests that belong to this repository.
        """
        login, entity = path.split('/')

        if entity != 'permission':
            return None

        owner, proj = self.name.split('/')
        permission = None
        for pr in self.data.pull_requests.values():
            pr_owner, pr_project = pr.project.split('/')
            if (pr_owner == owner and proj == pr_project):
                # NOTE(review): the ``break``/``else`` lines were missing in
                # the source; restored so that admin/write, once found, is
                # not downgraded by a later pull request.
                if login in pr.admins:
                    permission = 'admin'
                    break
                elif login in pr.writers:
                    permission = 'write'
                    break
                else:
                    permission = 'read'
        data = {
            'permission': permission,
        }
        return FakeResponse(data)

    def get_url_protection(self, branch):
        """Return the required status contexts of ``branch``, or a 404."""
        contexts = self.data.required_contexts.get((self.name, branch), [])
        if not contexts:
            # Note that GitHub returns 404 if branch protection is off so do
            # the same here as well
            return FakeResponse({}, 404)
        data = {
            'required_status_checks': {
                'contexts': contexts
            }
        }
        return FakeResponse(data)

    def pull_requests(self, state=None, sort=None, direction=None):
        """Return this repository's pull requests, optionally by ``state``."""
        # sort and direction are unused currently, but present to match
        # real world call signatures.
        pulls = []
        for pull in self.data.pull_requests.values():
            if pull.project != self.name:
                continue
            if state and pull.state != state:
                continue
            pulls.append(FakePull(pull))
        return pulls
class FakeLabel(object):
    """Fake of an issue label, holding only its name."""

    def __init__(self, name):
        # The assignment target had been stripped from the source line.
        self.name = name
class FakeIssue(object):
    """Issue view of a fake pull request."""

    def __init__(self, fake_pull_request):
        self._fake_pull_request = fake_pull_request

    def pull_request(self):
        """Return the pull-request view of the wrapped object."""
        return FakePull(self._fake_pull_request)

    def labels(self):
        """Return a FakeLabel for each label of the wrapped pull request."""
        return [FakeLabel(label)
                for label in self._fake_pull_request.labels]
class FakeFile(object):
    """Fake of a changed file in a pull request, holding only its filename."""

    def __init__(self, filename):
        self.filename = filename
class FakePull(object):
    """Pull-request view of a fake pull request object."""

    def __init__(self, fake_pull_request):
        self._fake_pull_request = fake_pull_request

    def issue(self):
        """Return the issue view of the wrapped pull request."""
        return FakeIssue(self._fake_pull_request)

    def files(self):
        """Return the changed files as FakeFile objects."""
        # Github lists max. 300 files of a PR in alphabetical order
        return [FakeFile(fn)
                for fn in sorted(self._fake_pull_request.files)][:300]

    def reviews(self):
        """Return the reviews stored on the wrapped pull request."""
        # NOTE(review): the body was missing from the source; reconstructed
        # to mirror the other accessors -- confirm upstream.
        return self._fake_pull_request.reviews

    # NOTE(review): restored as a property because commits() below uses
    # ``self.head`` as a value, not as a call -- confirm upstream.
    @property
    def head(self):
        """Return the head commit, looked up through a fake client."""
        client = FakeGithubClient(self._fake_pull_request.github.github_data)
        repo = client.repo_from_project(self._fake_pull_request.project)
        return repo.commit(self._fake_pull_request.head_sha)

    def commits(self):
        """Return the commits of this pull request."""
        # since we don't know all commits of a pr we just return here a list
        # with the head_sha as the only commit
        return [self.head]

    def as_dict(self):
        """Return the pull request as a dict shaped like an API payload."""
        pr = self._fake_pull_request
        connection = pr.github
        data = {
            'number': pr.number,
            'title': pr.subject,
            'url': 'https://%s/%s/pull/%s' % (
                connection.server, pr.project, pr.number
            ),
            'updated_at': pr.updated_at,
            'base': {
                'repo': {
                    'full_name': pr.project
                },
                'ref': pr.branch,
            },
            'mergeable': True,
            'state': pr.state,
            'head': {
                'sha': pr.head_sha,
                'ref': pr.getPRReference(),
                'repo': {
                    'full_name': pr.project
                }
            },
            'merged': pr.is_merged,
            'body': pr.body,
            'changed_files': len(pr.files),
        }
        return data
class FakeIssueSearchResult(object):
    """Single issue-search hit, exposing the matched issue as ``issue``."""

    def __init__(self, issue):
        self.issue = issue
class FakeResponse(object):
    """Fake of an HTTP response carrying a payload and a status code."""

    def __init__(self, data, status_code=200):
        self.status_code = status_code
        # The ``self.data`` target had been stripped from the source line.
        self.data = data
        self.links = {}

    def json(self):
        """Return the payload this response was created with."""
        # NOTE(review): the body was missing from the source; returning the
        # stored payload is the reconstruction -- confirm upstream.
        return self.data
class FakeGithubSession(object):
    """Fake HTTP session that routes fake API urls to fake repositories."""

    def __init__(self, data):
        self._data = data

    def build_url(self, *args):
        """Join the path segments and prefix them with the fake base url."""
        fakepath = '/'.join(args)
        return FAKE_BASE_URL + fakepath

    def get(self, url, headers=None, params=None):
        """Answer a GET for ``url``; returns None for unknown entities.

        ``headers`` is accepted but not used.
        """
        request = url
        if request.startswith(FAKE_BASE_URL):
            request = request[len(FAKE_BASE_URL):]

        entity, request = request.split('/', 1)

        if entity == 'repos':
            return self.get_repo(request, params=params)

        # unknown entity to process
        return None

    def get_repo(self, request, params=None):
        """Resolve '<org>/<project>/<rest>' and delegate to the repository."""
        org, project, request = request.split('/', 2)
        project_name = '{}/{}'.format(org, project)

        client = FakeGithubClient(self._data)
        repo = client.repo_from_project(project_name)

        return repo.get_url(request, params=params)
class FakeGithubData(object):
    """Shared state behind the fake client, session and repositories."""

    def __init__(self, pull_requests):
        self.pull_requests = pull_requests
        # Filled by FakeGithubClient.addProject*; keyed by (owner, project).
        self.repos = {}
        self.required_contexts = {}
class FakeGithubClient(object):
    """Fake of the github3 client, backed by a shared data object."""

    def __init__(self, data, inst_id=None):
        self._data = data
        self._inst_id = inst_id
        self.session = FakeGithubSession(data)

    def user(self, login):
        """Return a fake user for ``login``."""
        return FakeUser(login)

    def repository(self, owner, proj):
        """Return the registered repository, or None if it is unknown."""
        return self._data.repos.get((owner, proj), None)

    def repo_from_project(self, project):
        """Look up a repository by its 'owner/project' name."""
        # This is a convenience method for the tests.
        owner, proj = project.split('/')
        return self.repository(owner, proj)

    def addProject(self, project):
        """Register a repository for a project object with a ``name``."""
        # NOTE(review): ``project.name`` had been stripped from both lines in
        # the source; restored by analogy with addProjectByName().
        owner, proj = project.name.split('/')
        self._data.repos[(owner, proj)] = FakeRepository(
            project.name, self._data)

    def addProjectByName(self, project_name):
        """Register a repository for an 'owner/project' name."""
        owner, proj = project_name.split('/')
        self._data.repos[(owner, proj)] = FakeRepository(
            project_name, self._data)

    def pull_request(self, owner, project, number):
        """Return the pull request with the given number.

        ``owner`` and ``project`` are accepted but not used for the lookup.
        """
        fake_pr = self._data.pull_requests[int(number)]
        return FakePull(fake_pr)

    def search_issues(self, query):
        """Return search results for pull requests matching ``query``.

        The query and each pull request body are split into word tokens; a
        pull request matches when it shares at least one token with the
        query. Pull requests without a body never match.
        """
        def tokenize(s):
            return re.findall(r'[\w]+', s)

        # Was ``set('type', 'is', 'in')``, which raises TypeError because
        # set() accepts a single iterable.
        ignored_keys = ('type', 'is', 'in')

        parts = tokenize(query)
        terms = set()
        results = []
        for part in parts:
            # NOTE: tokenize() only yields word characters, so a part never
            # contains ':' and this qualifier filter does not trigger today.
            kv = part.split(':', 1)
            if len(kv) == 2:
                if kv[0] in ignored_keys:
                    # We only perform one search now and these aren't
                    # important; we can honor these terms later if
                    # necessary.
                    continue
            terms.add(part)

        for pr in self._data.pull_requests.values():
            if not pr.body:
                body = set()
            else:
                body = set(tokenize(pr.body))
            if terms.intersection(body):
                issue = FakeIssue(pr)
                results.append(FakeIssueSearchResult(issue))

        return results