Share a fake pull request database across connections
Because connections can be recreated, ensure that the fake pull request database (really a dictionary) is shared across instances of connections and fake github classes. Also, move the fake github3 classes to their own file -- they were getting larger and unruly. Change-Id: I471c1487039c8b25a0bab95d918f31b92b9cd32b
This commit is contained in:
214
tests/fakegithub.py
Normal file
214
tests/fakegithub.py
Normal file
@@ -0,0 +1,214 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2018 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import re
|
||||
|
||||
|
||||
class FakeUser(object):
|
||||
def __init__(self, login):
|
||||
self.login = login
|
||||
self.name = "Github User"
|
||||
self.email = "github.user@example.com"
|
||||
|
||||
|
||||
class FakeBranch(object):
|
||||
def __init__(self, branch='master'):
|
||||
self.name = branch
|
||||
|
||||
|
||||
class FakeStatus(object):
|
||||
def __init__(self, state, url, description, context, user):
|
||||
self._state = state
|
||||
self._url = url
|
||||
self._description = description
|
||||
self._context = context
|
||||
self._user = user
|
||||
|
||||
def as_dict(self):
|
||||
return {
|
||||
'state': self._state,
|
||||
'url': self._url,
|
||||
'description': self._description,
|
||||
'context': self._context,
|
||||
'creator': {
|
||||
'login': self._user
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class FakeCommit(object):
|
||||
def __init__(self):
|
||||
self._statuses = []
|
||||
|
||||
def set_status(self, state, url, description, context, user):
|
||||
status = FakeStatus(
|
||||
state, url, description, context, user)
|
||||
# always insert a status to the front of the list, to represent
|
||||
# the last status provided for a commit.
|
||||
self._statuses.insert(0, status)
|
||||
|
||||
def statuses(self):
|
||||
return self._statuses
|
||||
|
||||
|
||||
class FakeRepository(object):
|
||||
def __init__(self):
|
||||
self._branches = [FakeBranch()]
|
||||
self._commits = {}
|
||||
|
||||
def branches(self, protected=False):
|
||||
if protected:
|
||||
# simulate there is no protected branch
|
||||
return []
|
||||
return self._branches
|
||||
|
||||
def create_status(self, sha, state, url, description, context,
|
||||
user='zuul'):
|
||||
# Since we're bypassing github API, which would require a user, we
|
||||
# default the user as 'zuul' here.
|
||||
commit = self._commits.get(sha, None)
|
||||
if commit is None:
|
||||
commit = FakeCommit()
|
||||
self._commits[sha] = commit
|
||||
commit.set_status(state, url, description, context, user)
|
||||
|
||||
def commit(self, sha):
|
||||
commit = self._commits.get(sha, None)
|
||||
if commit is None:
|
||||
commit = FakeCommit()
|
||||
self._commits[sha] = commit
|
||||
return commit
|
||||
|
||||
|
||||
class FakeLabel(object):
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
|
||||
|
||||
class FakeIssue(object):
|
||||
def __init__(self, fake_pull_request):
|
||||
self._fake_pull_request = fake_pull_request
|
||||
|
||||
def pull_request(self):
|
||||
return FakePull(self._fake_pull_request)
|
||||
|
||||
def labels(self):
|
||||
return [FakeLabel(l)
|
||||
for l in self._fake_pull_request.labels]
|
||||
|
||||
|
||||
class FakeFile(object):
|
||||
def __init__(self, filename):
|
||||
self.filename = filename
|
||||
|
||||
|
||||
class FakePull(object):
|
||||
def __init__(self, fake_pull_request):
|
||||
self._fake_pull_request = fake_pull_request
|
||||
|
||||
def issue(self):
|
||||
return FakeIssue(self._fake_pull_request)
|
||||
|
||||
def files(self):
|
||||
return [FakeFile(fn)
|
||||
for fn in self._fake_pull_request.files]
|
||||
|
||||
def as_dict(self):
|
||||
pr = self._fake_pull_request
|
||||
connection = pr.github
|
||||
data = {
|
||||
'number': pr.number,
|
||||
'title': pr.subject,
|
||||
'url': 'https://%s/%s/pull/%s' % (
|
||||
connection.server, pr.project, pr.number
|
||||
),
|
||||
'updated_at': pr.updated_at,
|
||||
'base': {
|
||||
'repo': {
|
||||
'full_name': pr.project
|
||||
},
|
||||
'ref': pr.branch,
|
||||
},
|
||||
'mergeable': True,
|
||||
'state': pr.state,
|
||||
'head': {
|
||||
'sha': pr.head_sha,
|
||||
'repo': {
|
||||
'full_name': pr.project
|
||||
}
|
||||
},
|
||||
'merged': pr.is_merged,
|
||||
'body': pr.body
|
||||
}
|
||||
return data
|
||||
|
||||
|
||||
class FakeIssueSearchResult(object):
|
||||
def __init__(self, issue):
|
||||
self.issue = issue
|
||||
|
||||
|
||||
class FakeGithub(object):
|
||||
def __init__(self, pull_requests):
|
||||
self._pull_requests = pull_requests
|
||||
self._repos = {}
|
||||
|
||||
def user(self, login):
|
||||
return FakeUser(login)
|
||||
|
||||
def repository(self, owner, proj):
|
||||
return self._repos.get((owner, proj), None)
|
||||
|
||||
def repo_from_project(self, project):
|
||||
# This is a convenience method for the tests.
|
||||
owner, proj = project.split('/')
|
||||
return self.repository(owner, proj)
|
||||
|
||||
def addProject(self, project):
|
||||
owner, proj = project.name.split('/')
|
||||
self._repos[(owner, proj)] = FakeRepository()
|
||||
|
||||
def pull_request(self, owner, project, number):
|
||||
fake_pr = self._pull_requests[number]
|
||||
return FakePull(fake_pr)
|
||||
|
||||
def search_issues(self, query):
|
||||
def tokenize(s):
|
||||
return re.findall(r'[\w]+', s)
|
||||
|
||||
parts = tokenize(query)
|
||||
terms = set()
|
||||
results = []
|
||||
for part in parts:
|
||||
kv = part.split(':', 1)
|
||||
if len(kv) == 2:
|
||||
if kv[0] in set('type', 'is', 'in'):
|
||||
# We only perform one search now and these aren't
|
||||
# important; we can honor these terms later if
|
||||
# necessary.
|
||||
continue
|
||||
terms.add(part)
|
||||
|
||||
for pr in self._pull_requests.values():
|
||||
if not pr.body:
|
||||
body = set()
|
||||
else:
|
||||
body = set(tokenize(pr.body))
|
||||
if terms.intersection(body):
|
||||
issue = FakeIssue(pr)
|
||||
results.append(FakeIssueSearchResult(issue))
|
||||
|
||||
return results
|
||||
Reference in New Issue
Block a user