923 lines
30 KiB
Python
923 lines
30 KiB
Python
#!/usr/bin/env python
|
|
|
|
# Copyright 2018 Red Hat, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
import functools
|
|
import urllib
|
|
from collections import defaultdict
|
|
|
|
import datetime
|
|
|
|
import github3.exceptions
|
|
import re
|
|
import time
|
|
|
|
import graphene
|
|
from requests import HTTPError
|
|
from requests.structures import CaseInsensitiveDict
|
|
|
|
from tests.fake_graphql import FakeGithubQuery
|
|
from zuul.driver.github.githubconnection import utc
|
|
|
|
# Base URL of the fake REST API. The URL builders in this module prepend it
# to the paths they generate and FakeGithubSession.get() strips it again.
FAKE_BASE_URL = 'https://example.com/api/v3/'
|
|
|
|
|
|
class FakeUser(object):
    """Fake user whose attributes are all derived from the login."""

    def __init__(self, login):
        # The display name is simply the login; the email address and the
        # profile URL are synthesized below example.com.
        self.login = self.name = login
        self.email = '%s@example.com' % login
        self.html_url = 'https://example.com/%s' % login
|
|
|
|
|
|
class FakeBranch(object):
    """Fake branch whose protection state is read from its repository."""

    def __init__(self, fake_repo, branch='master', protected=False):
        # ``protected`` is accepted but never stored: the ``protected``
        # property consults the repository's protection rules instead.
        self._fake_repo = fake_repo
        self.name = branch

    @property
    def protected(self):
        """True if the repository has a protection rule for this branch."""
        rules = self._fake_repo._branch_protection_rules
        return self.name in rules

    def as_dict(self):
        """Return the branch as a dict with ``name`` and ``protected``."""
        return dict(name=self.name, protected=self.protected)
|
|
|
|
|
|
class FakeCreator:
    """Carries nothing but the login it was created with."""

    def __init__(self, login):
        self.login = login
|
|
|
|
|
|
class FakeStatus(object):
    """Fake commit status for a single context."""

    def __init__(self, state, url, description, context, user):
        self.state = state
        self.context = context
        self.creator = FakeCreator(user)
        # URL and description are kept private; they are only exposed
        # through as_dict().
        self._url = url
        self._description = description

    def as_dict(self):
        """Serialize the status, including the creator's login."""
        creator = {'login': self.creator.login}
        return {
            'state': self.state,
            'url': self._url,
            'description': self._description,
            'context': self.context,
            'creator': creator,
        }
|
|
|
|
|
|
class FakeApp:
    """Name/slug pair describing an app."""

    def __init__(self, name, slug):
        self.name, self.slug = name, slug
|
|
|
|
|
|
class FakeCheckRun(object):
    """Fake check run holding the fields the tests read back."""

    def __init__(self, id, name, details_url, output, status, conclusion,
                 completed_at, external_id, actions, app):
        self.id = id
        self.name = name
        self.details_url = details_url
        self.output = output
        self.conclusion = conclusion
        self.completed_at = completed_at
        self.external_id = external_id
        # A missing action list is normalized to an empty list.
        self.actions = [] if actions is None else actions
        # The same value is used for both the app's name and its slug.
        self.app = FakeApp(name=app, slug=app)

        # Github automatically sets the status to "completed" if a
        # conclusion is provided.
        self.status = status if conclusion is None else "completed"

    def as_dict(self):
        """Serialize the check run, including a nested ``app`` dict."""
        app = {
            "slug": self.app.slug,
            "name": self.app.name,
        }
        return {
            'id': self.id,
            "name": self.name,
            "status": self.status,
            "output": self.output,
            "details_url": self.details_url,
            "conclusion": self.conclusion,
            "completed_at": self.completed_at,
            "external_id": self.external_id,
            "actions": self.actions,
            "app": app,
        }

    def update(self, conclusion, completed_at, output, details_url,
               external_id, actions):
        """Overwrite the mutable fields and mark the run as completed."""
        self.conclusion = conclusion
        self.completed_at = completed_at
        self.output = output
        self.details_url = details_url
        self.external_id = external_id
        self.actions = actions

        # As we are only calling the update method when a build is
        # completed, we can always set the status to "completed".
        self.status = "completed"
|
|
|
|
|
|
class FakeGHReview(object):
    """Thin wrapper that hands back the review data it was given."""

    def __init__(self, data):
        self.data = data

    def as_dict(self):
        """Return the wrapped data object itself (not a copy)."""
        return self.data
|
|
|
|
|
|
class FakeCombinedStatus(object):
    """Pairs a commit sha with a collection of statuses."""

    def __init__(self, sha, statuses):
        self.sha, self.statuses = sha, statuses
|
|
|
|
|
|
class FakeCommit(object):
    """Fake commit that records statuses and check runs, newest first."""

    def __init__(self, sha):
        self.sha = sha
        self._statuses = []
        self._check_runs = []

    def set_status(self, state, url, description, context, user):
        """Record a new status for this commit."""
        # always insert a status to the front of the list, to represent
        # the last status provided for a commit.
        self._statuses.insert(
            0, FakeStatus(state, url, description, context, user))

    def set_check_run(self, id, name, details_url, output, status, conclusion,
                      completed_at, external_id, actions, app):
        """Record a new check run for this commit and return it."""
        check_run = FakeCheckRun(
            id, name, details_url, output, status, conclusion,
            completed_at, external_id, actions, app)
        # Always insert a check_run to the front of the list to represent
        # the last check_run provided for a commit.
        self._check_runs.insert(0, check_run)
        return check_run

    def get_url(self, path, params=None):
        """Answer the ``statuses`` and ``check-runs`` sub-paths.

        Any other path yields ``None``. ``params`` is accepted but unused.
        """
        if path == 'statuses':
            return FakeResponse([s.as_dict() for s in self._statuses])
        if path == "check-runs":
            serialized = [c.as_dict() for c in self._check_runs]
            return FakeResponse(
                {"total_count": len(serialized), "check_runs": serialized})
        return None

    def statuses(self):
        """Return the list of recorded statuses, newest first."""
        return self._statuses

    def check_runs(self):
        """Return the list of recorded check runs, newest first."""
        return self._check_runs

    def status(self):
        """Return the combined status of the commit.

        The combined status only contains the latest status per context;
        the other information a real combined status carries is not
        needed here.
        """
        newest_by_context = {}
        # The list is ordered newest first, so the first status seen for
        # a context is the latest one.
        for entry in self._statuses:
            newest_by_context.setdefault(entry.context, entry)
        return FakeCombinedStatus(self.sha, newest_by_context.values())
|
|
|
|
|
|
class FakeRepository(object):
    """In-memory fake of a GitHub repository for the tests.

    Holds branches, commits (with their statuses and check runs), branch
    protection rules and a few repository settings, and answers the subset
    of REST paths handled by :meth:`get_url`.
    """

    def __init__(self, name, data):
        """Create a repository with a single ``master`` branch.

        :param name: full project name in ``owner/project`` form.
        :param data: shared data object; its ``pull_requests`` mapping is
            consulted by several methods.
        """
        self._api = FAKE_BASE_URL
        self._branches = [FakeBranch(self)]
        self._commits = {}
        self.data = data
        self.name = name
        self.check_run_counter = 0

        # Simple dictionary to store permission values per feature (e.g.
        # checks, Repository contents, Pull requests, Commit statuses, ...).
        # Could be used to just enable/disable a permission (True, False) or
        # provide more specific values like "read" or "read&write". The
        # mocked functionality in the FakeRepository class should then check
        # for this value and raise an appropriate exception like a
        # production Github would do in case the permission is not
        # sufficient or missing at all.
        self._permissions = {}

        # Branch protection rules, keyed by branch name
        self._branch_protection_rules = defaultdict(FakeBranchProtectionRule)
        self._repodata = {
            'allow_merge_commit': True,
            'allow_squash_merge': True,
            'allow_rebase_merge': True,
            'default_branch': 'master',
        }

        # fail the next commit requests with 404
        self.fail_not_found = 0

    def branches(self, protected=False):
        """Return all branches, or only the protected ones."""
        if protected:
            return [b for b in self._branches if b.protected]
        return self._branches

    def _set_branch_protection(self, branch_name, protected=True,
                               contexts=None, require_review=False):
        """Create/update or (with ``protected=False``) remove a rule."""
        if not protected:
            # pop() with a default is a no-op for branches without a rule
            self._branch_protection_rules.pop(branch_name, None)
            return

        # Indexing the defaultdict creates the rule on first use.
        rule = self._branch_protection_rules[branch_name]
        rule.pattern = branch_name
        rule.required_contexts = contexts or []
        rule.require_reviews = require_review

    def _set_permission(self, key, value):
        """Store a permission value for the given feature key."""
        # NOTE (felix): Currently, this is only used to mock a repo with
        # missing checks API permissions. But we could also use it to test
        # arbitrary permission values like missing write, but only read
        # permissions for a specific functionality.
        self._permissions[key] = value

    def _build_url(self, *args, **kwargs):
        """Build a fake API URL below ``repos/<name>``; kwargs are ignored."""
        return FAKE_BASE_URL + '/'.join(['repos', self.name, *args])

    def _get(self, url, headers=None):
        """Issue a GET through a fresh fake client sharing our data."""
        client = FakeGithubClient(data=self.data)
        return client.session.get(url, headers)

    def _create_branch(self, branch):
        """Append a new branch with the given name."""
        self._branches.append(FakeBranch(self, branch=branch))

    def _delete_branch(self, branch_name):
        """Drop every branch with the given name."""
        self._branches = [b for b in self._branches if b.name != branch_name]

    def _get_or_create_commit(self, sha):
        """Return the commit stored for ``sha``, creating it if missing."""
        commit = self._commits.get(sha)
        if commit is None:
            commit = self._commits[sha] = FakeCommit(sha)
        return commit

    def create_status(self, sha, state, url, description, context,
                      user='zuul'):
        """Record a commit status on ``sha``."""
        # Since we're bypassing github API, which would require a user, we
        # default the user as 'zuul' here.
        commit = self._get_or_create_commit(sha)
        commit.set_status(state, url, description, context, user)

    def create_check_run(self, head_sha, name, details_url=None, output=None,
                         status=None, conclusion=None, completed_at=None,
                         external_id=None, actions=None, app="zuul"):
        """Record a check run on ``head_sha``.

        :raises github3.exceptions.ForbiddenError: if the ``checks``
            permission was explicitly set to ``False``.
        """
        # Raise the appropriate github3 exception in case we don't have
        # permission to access the checks API
        if self._permissions.get("checks") is False:
            # To create a proper github3 exception, we need to mock a
            # response object
            raise github3.exceptions.ForbiddenError(
                FakeResponse("Resource not accessible by integration", 403)
            )

        commit = self._get_or_create_commit(head_sha)
        # Check run ids are the stringified per-repository counter.
        self.check_run_counter += 1
        commit.set_check_run(
            str(self.check_run_counter),
            name,
            details_url,
            output,
            status,
            conclusion,
            completed_at,
            external_id,
            actions,
            app,
        )

    def commit(self, sha):
        """Return the commit for ``sha``, creating it on first access.

        :raises github3.exceptions.NotFoundError: while
            ``fail_not_found`` is positive; each failure decrements it.
        """
        if self.fail_not_found > 0:
            self.fail_not_found -= 1
            # FakeResponse takes (data, status_code); the arguments used to
            # be swapped, leaving the string as the response's status code.
            resp = FakeResponse('Not found', 404)
            raise github3.exceptions.NotFoundError(resp)

        return self._get_or_create_commit(sha)

    def get_url(self, path, params=None):
        """Dispatch a repository-relative path to its handler.

        The first path segment selects the handler; unknown segments
        yield ``None`` and the empty path returns the repository data.
        """
        if '/' in path:
            entity, request = path.split('/', 1)
        else:
            entity, request = path, None

        if entity == 'branches':
            return self.get_url_branches(request, params=params)
        if entity == 'collaborators':
            return self.get_url_collaborators(request)
        if entity == 'commits':
            return self.get_url_commits(request, params=params)
        if entity == '':
            return self.get_url_repo()
        return None

    def get_url_branches(self, path, params=None):
        """Handle ``branches``, ``branches/<b>`` and ``.../protection``."""
        if path is None:
            # request wants a branch list
            return self.get_url_branch_list(params)

        elements = path.split('/')
        if elements[-1] == 'protection':
            # Branch names may contain slashes, so rejoin all but the last.
            return self.get_url_protection('/'.join(elements[:-1]))

        # fall back to treat all elements as branch
        return self.get_url_branch('/'.join(elements))

    def get_url_commits(self, path, params=None):
        """Handle ``commits/<sha>[/<request>]``; 404 for unknown commits."""
        if '/' in path:
            sha, request = path.split('/', 1)
        else:
            sha, request = path, None
        commit = self._commits.get(sha)

        # Commits are created lazy so check if there is a PR with the
        # correct head sha.
        if commit is None:
            has_pr = any(pr.head_sha == sha
                         for pr in self.data.pull_requests.values())
            if has_pr:
                commit = self._commits[sha] = FakeCommit(sha)

        if not commit:
            return FakeResponse({}, 404)

        return commit.get_url(request, params=params)

    def get_url_branch_list(self, params):
        """Return the branch list, restricted to protected branches when
        ``params['protected'] == 1``.

        ``params`` may be ``None`` (no query parameters), in which case
        all branches are listed.
        """
        only_protected = bool(params) and params.get('protected') == 1
        branches = [b.as_dict() for b in self.branches(only_protected)]
        return FakeResponse(branches, 200)

    def get_url_branch(self, branch_name):
        """Return the named branch, or a 404 response if it is unknown."""
        for branch in self._branches:
            if branch.name == branch_name:
                return FakeResponse(branch.as_dict())
        return FakeResponse(None, 404)

    def get_url_collaborators(self, path):
        """Handle ``collaborators/<login>/permission``.

        The permission is derived from the ``admins``/``writers`` lists
        of this project's pull requests; it stays ``None`` when the
        project has no pull request. Other sub-paths yield ``None``.
        """
        login, entity = path.split('/')
        if entity != 'permission':
            return None

        owner, proj = self.name.split('/')
        permission = None
        for pr in self.data.pull_requests.values():
            pr_owner, pr_project = pr.project.split('/')
            if pr_owner != owner or pr_project != proj:
                continue
            if login in pr.admins:
                permission = 'admin'
                break
            if login in pr.writers:
                permission = 'write'
                break
            # Keep scanning: a later pull request may grant more.
            permission = 'read'
        return FakeResponse({'permission': permission})

    def get_url_protection(self, branch):
        """Return the required status contexts of a protected branch."""
        # .get() does not trigger the defaultdict factory.
        rule = self._branch_protection_rules.get(branch)

        if not rule:
            # Note that GitHub returns 404 if branch protection is off so
            # do the same here as well
            return FakeResponse({}, 404)
        data = {
            'required_status_checks': {
                'contexts': rule.required_contexts
            }
        }
        return FakeResponse(data)

    def get_url_repo(self):
        """Return the repository settings."""
        return FakeResponse(self._repodata)

    def pull_requests(self, state=None, sort=None, direction=None):
        """Return this project's pull requests, optionally by state."""
        # sort and direction are unused currently, but present to match
        # real world call signatures.
        return [
            FakePull(pull)
            for pull in self.data.pull_requests.values()
            if pull.project == self.name
            and not (state and pull.state != state)
        ]
|
|
|
|
|
|
class FakeIssue(object):
    """Issue view onto a fake pull request."""

    def __init__(self, fake_pull_request):
        self._fake_pull_request = fake_pull_request

    def pull_request(self):
        """Return a new FakePull wrapping the same fake pull request."""
        return FakePull(self._fake_pull_request)

    @property
    def number(self):
        """Number of the underlying fake pull request."""
        wrapped = self._fake_pull_request
        return wrapped.number
|
|
|
|
|
|
@functools.total_ordering
class FakeFile(object):
    """Fake changed file that compares and sorts by its filename.

    ``previous_filename`` is only set as an attribute when a value is
    passed, so files created without it do not have the attribute at all.
    """

    def __init__(self, filename, previous_filename=None):
        self.filename = filename
        if previous_filename is not None:
            self.previous_filename = previous_filename

    def __eq__(self, other):
        # Anything exposing ``filename`` is comparable. For everything
        # else defer to Python's default handling instead of failing with
        # an AttributeError.
        if not hasattr(other, 'filename'):
            return NotImplemented
        return self.filename == other.filename

    def __lt__(self, other):
        if not hasattr(other, 'filename'):
            return NotImplemented
        return self.filename < other.filename

    # Hashing stays identity based: two files with equal filenames are
    # still distinct dictionary keys / set members.
    __hash__ = object.__hash__
|
|
|
|
|
|
class FakePull(object):
    """Pull request view onto a fake pull request object."""

    def __init__(self, fake_pull_request):
        self._fake_pull_request = fake_pull_request

    def issue(self):
        """Return a FakeIssue wrapping the same fake pull request."""
        return FakeIssue(self._fake_pull_request)

    def files(self):
        """Return the changed files, sorted and capped at 300 entries."""
        # Github lists max. 300 files of a PR in alphabetical order
        ordered = sorted(self._fake_pull_request.files)
        return ordered[:300]

    def reviews(self):
        """Return the review list of the fake pull request."""
        return self._fake_pull_request.reviews

    def create_review(self, body, commit_id, event):
        """Append a review by the fake zuul user and return it.

        ``event`` becomes the review state; ``body`` and ``commit_id``
        are accepted but not recorded.
        """
        reviewer = {
            'login': 'fakezuul',
            'email': 'fakezuul@fake.test',
        }
        review = FakeGHReview({
            'state': event,
            'user': reviewer,
            'submitted_at': time.gmtime(),
        })
        self._fake_pull_request.reviews.append(review)
        return review

    @property
    def head(self):
        """Commit object for the pull request's head sha.

        Resolved through a fresh fake client built on the connection's
        data.
        """
        fake_pr = self._fake_pull_request
        client = FakeGithubClient(data=fake_pr.github.github_data)
        repo = client.repo_from_project(fake_pr.project)
        return repo.commit(fake_pr.head_sha)

    def commits(self):
        """Return a one-element list holding the head commit."""
        # since we don't know all commits of a pr we just return here a
        # list with the head_sha as the only commit
        return [self.head]

    def as_dict(self):
        """Serialize the pull request into a plain dict."""
        pr = self._fake_pull_request
        # Both URLs are built from the same (server, project, number).
        location = (pr.github.server, pr.project, pr.number)
        base = {
            'repo': {
                'full_name': pr.project
            },
            'ref': pr.branch,
            'sha': pr.base_sha,
        }
        head = {
            'sha': pr.head_sha,
            'ref': pr.getPRReference(),
            'repo': {
                'full_name': pr.project
            }
        }
        return {
            'number': pr.number,
            'title': pr.subject,
            'url': 'https://%s/api/v3/%s/pulls/%s' % location,
            'html_url': 'https://%s/%s/pull/%s' % location,
            'updated_at': pr.updated_at,
            'base': base,
            'user': {
                'login': 'octocat'
            },
            'draft': pr.draft,
            'mergeable': pr.mergeable,
            'state': pr.state,
            'head': head,
            'merged': pr.is_merged,
            'body': pr.body,
            'body_text': pr.body_text,
            'changed_files': len(pr.files),
            'labels': [{'name': label} for label in pr.labels],
        }
|
|
|
|
|
|
class FakeIssueSearchResult(object):
    """Search hit exposing the matched issue as ``issue``."""

    def __init__(self, issue):
        self.issue = issue
|
|
|
|
|
|
class FakeResponse(object):
    """Fake HTTP response carrying a payload and a status."""

    def __init__(self, data, status_code=200, status_message='OK'):
        self.data = data
        self.status_code = status_code
        self.status_message = status_message
        self.links = {}

    @property
    def content(self):
        """The raw payload, identical to what json() returns."""
        # Building github3 exceptions requires a Response object with the
        # content attribute set.
        return self.data

    def json(self):
        """Return the payload as-is (no decoding takes place)."""
        return self.data

    def raise_for_status(self):
        """Raise HTTPError for status codes in the 400-599 range.

        A string payload is used as the error detail; otherwise the
        status message is used.
        """
        if not (400 <= self.status_code < 600):
            return
        if isinstance(self.data, str):
            detail = self.data
        else:
            detail = self.status_message
        raise HTTPError(
            '{} {}'.format(self.status_code, detail), response=self)
|
|
|
|
|
|
class FakeGithubSession(object):
    """Fake HTTP session that serves requests from in-memory fake data.

    ``get`` answers repository paths, ``post`` handles GraphQL queries,
    issue comments, installation access tokens and check run creation,
    ``put`` handles pull request merges and ``patch`` handles check run
    updates. Requests that match no handler in ``post``/``put``/``patch``
    get a 404 response.
    """

    def __init__(self, client):
        self.client = client
        self.headers = CaseInsensitiveDict()
        self._base_url = None
        self.schema = graphene.Schema(query=FakeGithubQuery)

        # Imitate hooks dict. This will be unused and ignored in the tests.
        self.hooks = {
            'response': []
        }

    def build_url(self, *args):
        """Join the path segments below the fake base URL."""
        return FAKE_BASE_URL + '/'.join(args)

    def get(self, url, headers=None, params=None):
        """Serve a GET; only ``repos/...`` URLs are understood."""
        request = url
        if request.startswith(FAKE_BASE_URL):
            request = request[len(FAKE_BASE_URL):]

        entity, request = request.split('/', 1)

        if entity == 'repos':
            return self.get_repo(request, params=params)
        # unknown entity to process
        return None

    def post(self, url, data=None, headers=None, params=None, json=None):
        """Serve a POST by dispatching on the payload and the URL."""
        # Handle graphql
        if json and json.get('query'):
            return self._post_graphql(json)

        # Handle creating comments
        match = re.match(r'.+/repos/(.+)/issues/(\d+)/comments$', url)
        if match:
            return self._post_comment(match.group(1), match.group(2), json)

        # Handle access token creation
        if re.match(r'.*/app/installations/.*/access_tokens', url):
            return self._post_access_token(url)

        # Handle check run creation
        match = re.match(r'.*/repos/(.*)/check-runs$', url)
        if match:
            return self._post_check_run(match.group(1), json)

        return FakeResponse(None, 404)

    def _post_graphql(self, json):
        """Execute a GraphQL query against the fake schema."""
        result = self.schema.execute(
            json.get('query'),
            variables=json.get('variables'),
            context=self.client)
        if result.errors:
            # Note that github really returns 200 and an errors field in
            # case of an error.
            return FakeResponse({'errors': result.errors}, 200)
        return FakeResponse({'data': result.data}, 200)

    def _post_comment(self, project, pr_number, json):
        """Record a comment report and add the comment to the PR."""
        project = urllib.parse.unquote(project)
        # The report keeps the number as the string taken from the URL.
        self.client._data.reports.append((project, pr_number, 'comment'))
        pull_request = self.client._data.pull_requests[int(pr_number)]
        pull_request.addComment(json['body'])
        return FakeResponse(None, 200)

    def _post_access_token(self, url):
        """Return a token derived from the installation id in the URL."""
        expiry = (datetime.datetime.now(utc) + datetime.timedelta(
            minutes=60)).replace(microsecond=0).isoformat()
        # .../installations/<install_id>/access_tokens
        install_id = url.split('/')[-2]
        data = {
            'token': 'token-%s' % install_id,
            'expires_at': expiry,
        }
        return FakeResponse(data, 201)

    def _post_check_run(self, full_name, json):
        """Create a check run on the commit named by ``head_sha``."""
        if self.client._data.fail_check_run_creation:
            return FakeResponse('Internal server error', 500)

        org, reponame = full_name.split('/', 1)
        repo = self.client._data.repos.get((org, reponame))
        if repo is None:
            # Unknown repository: answer like the other unmatched requests
            # instead of failing on attribute access below.
            return FakeResponse(None, 404)

        if repo._permissions.get("checks") is False:
            # To create a proper github3 exception, we need to mock a
            # response object
            return FakeResponse(
                "Resource not accessible by integration", 403)

        head_sha = json.get('head_sha')
        commit = repo._commits.get(head_sha, None)
        if commit is None:
            commit = FakeCommit(head_sha)
            repo._commits[head_sha] = commit
        repo.check_run_counter += 1
        check_run = commit.set_check_run(
            str(repo.check_run_counter),
            json['name'],
            json['details_url'],
            json['output'],
            json.get('status'),
            json.get('conclusion'),
            json.get('completed_at'),
            json['external_id'],
            json['actions'],
            json.get('app', 'zuul'),
        )

        return FakeResponse(check_run.as_dict(), 201)

    def put(self, url, data=None, headers=None, params=None, json=None):
        """Serve a PUT; only pull request merges are handled.

        :raises Exception: if the connection has ``merge_failure`` set.
        """
        # Handle pull request merge
        match = re.match(r'.+/repos/(.+)/pulls/(\d+)/merge$', url)
        if not match:
            return FakeResponse(None, 404)

        pr = self.client._data.pull_requests[int(match.group(2))]
        conn = pr.github

        # record that this got reported
        self.client._data.reports.append(
            (pr.project, pr.number, 'merge', json["merge_method"]))
        if conn.merge_failure:
            raise Exception('Unknown merge failure')
        if conn.merge_not_allowed_count > 0:
            conn.merge_not_allowed_count -= 1
            # GitHub returns 405 Method not allowed with more details in
            # the body of the response.
            data = {
                'message': 'Merge not allowed because of fake reason',
            }
            return FakeResponse(data, 405, 'Method not allowed')
        pr.setMerged(json.get("commit_message", ""))
        return FakeResponse({"merged": True}, 200)

    def patch(self, url, data=None, headers=None, params=None, json=None):
        """Serve a PATCH; only check run updates are handled.

        Returns a 404 response when the URL is not a check run URL or
        the repository or check run cannot be found.
        """
        # Handle check run update
        match = re.match(r'.*/repos/(.*)/check-runs/(.*)$', url)
        if not match:
            return FakeResponse(None, 404)

        full_name, check_run_id = match.groups()
        org, reponame = full_name.split('/', 1)
        repo = self.client._data.repos.get((org, reponame))
        if repo is None:
            return FakeResponse(None, 404)

        # Find the specified check run
        check_run = next(
            (
                candidate
                for commit in repo._commits.values()
                for candidate in commit._check_runs
                if candidate.id == check_run_id
            ),
            None,
        )
        if check_run is None:
            return FakeResponse(None, 404)

        check_run.update(json['conclusion'],
                         json['completed_at'],
                         json['output'],
                         json['details_url'],
                         json['external_id'],
                         json['actions'])
        return FakeResponse(check_run.as_dict(), 200)

    def get_repo(self, request, params=None):
        """Resolve ``<org>/<project>[/<rest>]`` and forward ``<rest>``.

        Without a remainder the repository is asked for the empty path.
        """
        parts = request.split('/', 2)
        if len(parts) == 2:
            org, project = parts
            request = ''
        else:
            org, project, request = parts
        project_name = '{}/{}'.format(org, project)

        repo = self.client.repo_from_project(project_name)

        return repo.get_url(request, params=params)

    def mount(self, prefix, adapter):
        """Accept adapter mounting and do nothing."""
        # Don't care in tests
        pass
|
|
|
|
|
|
class FakeBranchProtectionRule:
    """Branch protection rule; starts out without any requirements."""

    def __init__(self):
        # No pattern and no required status contexts until configured.
        self.pattern = None
        self.required_contexts = []
        # Both review requirements are off by default.
        self.require_reviews = self.require_codeowners_review = False
|
|
|
|
|
|
class FakeGithubData(object):
    """Shared state: pull requests, repos, reports and a failure flag."""

    def __init__(self, pull_requests):
        self.pull_requests = pull_requests
        self.repos = {}
        self.reports = []
        self.fail_check_run_creation = False

    def __repr__(self):
        template = ("pull_requests:%s repos:%s reports:%s "
                    "fail_check_run_creation:%s")
        fields = (
            self.pull_requests,
            self.repos,
            self.reports,
            self.fail_check_run_creation,
        )
        return template % fields
|
|
|
|
|
|
class FakeGithubClient(object):
    """Fake GitHub client backed by a shared data object.

    The data object provides the ``repos`` and ``pull_requests`` mappings
    that the lookups and the issue search operate on.
    """

    def __init__(self, session=None, data=None):
        # ``session`` is accepted for signature compatibility only; a fake
        # session bound to this client is always created.
        self._data = data
        self._inst_id = None
        self.session = FakeGithubSession(self)

    def setData(self, data):
        """Replace the shared data object."""
        self._data = data

    def setInstId(self, inst_id):
        """Remember the installation id."""
        self._inst_id = inst_id

    def user(self, login):
        """Return a fake user for the login."""
        return FakeUser(login)

    def repository(self, owner, proj):
        """Return the registered repository or ``None`` if unknown."""
        return self._data.repos.get((owner, proj), None)

    def login(self, token):
        """Accept any token and do nothing."""
        pass

    def repo_from_project(self, project):
        """Look up a repository by its ``owner/project`` name."""
        # This is a convenience method for the tests.
        owner, proj = project.split('/')
        return self.repository(owner, proj)

    def addProject(self, project):
        """Register a repository for ``project.name``."""
        self.addProjectByName(project.name)

    def addProjectByName(self, project_name):
        """Register a fresh fake repository under ``owner/project``."""
        owner, proj = project_name.split('/')
        self._data.repos[(owner, proj)] = FakeRepository(
            project_name, self._data)

    def pull_request(self, owner, project, number):
        """Return a FakePull for the numbered pull request."""
        fake_pr = self._data.pull_requests[int(number)]
        repo = self.repository(owner, project)
        # Ensure a commit for the head_sha exists so this can be resolved
        # in graphql queries.
        repo._commits.setdefault(
            fake_pr.head_sha,
            FakeCommit(fake_pr.head_sha)
        )
        return FakePull(fake_pr)

    @staticmethod
    def _tokenize_search_query(s):
        """Split a Depends-On search query into its unquoted terms.

        The trailing search qualifiers are removed only when present, and
        terms are split on whitespace-delimited ``OR`` so that an "OR"
        inside a term (e.g. in a URL) does not break the term apart.
        """
        # Bit hacky and needs PDA, but our current inputs are constrained
        # enough that this should work.
        suffix = " type:pr is:open in:body"
        if s.endswith(suffix):
            s = s[:-len(suffix)]
        return [term.strip().strip('"')
                for term in re.split(r'\s+OR\s+', s)]

    def search_issues(self, query):
        """Search the fake pull requests.

        A query starting with 40 lowercase alphanumeric characters is
        treated as a commit sha and returns a list; any other query is
        treated as a Depends-On search and returns an iterator.
        """
        if re.match(r'[a-z0-9]{40}', query):
            # Github returns all PR's that contain the sha in their
            # history
            result = []
            for pr in self._data.pull_requests.values():
                # Quick check if head sha matches
                if pr.head_sha == query:
                    result.append(FakeIssueSearchResult(FakeIssue(pr)))
                    continue

                # If head sha doesn't match it still could be in the pr
                # history
                repo = pr._getRepo()
                commits = repo.iter_commits(
                    '%s...%s' % (pr.branch, pr.head_sha))
                if any(commit.hexsha == query for commit in commits):
                    result.append(FakeIssueSearchResult(FakeIssue(pr)))

            return result

        # Non-SHA queries are of the form:
        #
        #   '"Depends-On: <url>" OR "Depends-On: <url>"
        #    OR ... type:pr is:open in:body'
        #
        # For the tests is currently enough to simply check for the
        # existence of the Depends-On strings in the PR body.
        terms = set(self._tokenize_search_query(query))
        results = []
        for pr in self._data.pull_requests.values():
            body = pr.body or ""
            if any(term in body for term in terms):
                results.append(FakeIssueSearchResult(FakeIssue(pr)))

        return iter(results)
|
|
|
|
|
|
class FakeGithubEnterpriseClient(FakeGithubClient):
    """Fake enterprise client that additionally reports a version."""

    # Version string reported by meta().
    version = '2.21.0'

    def __init__(self, url, session=None, verify=True):
        # ``url`` and ``verify`` are accepted but not used.
        super().__init__(session=session)

    def meta(self):
        """Return a dict holding the installed version."""
        return {'installed_version': self.version}
|