zuul/tests/fakegitlab.py

288 lines
11 KiB
Python

# Copyright 2016 Red Hat, Inc.
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import defaultdict
import http.server
import json
import logging
import re
import socketserver
import threading
import urllib.parse
import time
from git.util import IterableList
class GitlabWebServer(object):
def __init__(self, merge_requests):
super(GitlabWebServer, self).__init__()
self.merge_requests = merge_requests
self.fake_repos = defaultdict(lambda: IterableList('name'))
# A dictionary so we can mutate it
self.options = dict(
community_edition=False,
delayed_complete_mr=0,
uncomplete_mr=False)
self.stats = {"get_mr": 0}
def start(self):
merge_requests = self.merge_requests
fake_repos = self.fake_repos
options = self.options
stats = self.stats
class Server(http.server.SimpleHTTPRequestHandler):
log = logging.getLogger("zuul.test.GitlabWebServer")
branches_re = re.compile(r'.+/projects/(?P<project>.+)/'
r'repository/branches\\?.*$')
branch_re = re.compile(r'.+/projects/(?P<project>.+)/'
r'repository/branches/(?P<branch>.+)$')
mr_re = re.compile(r'.+/projects/(?P<project>.+)/'
r'merge_requests/(?P<mr>\d+)$')
mr_approvals_re = re.compile(
r'.+/projects/(?P<project>.+)/'
r'merge_requests/(?P<mr>\d+)/approvals$')
mr_notes_re = re.compile(
r'.+/projects/(?P<project>.+)/'
r'merge_requests/(?P<mr>\d+)/notes$')
mr_approve_re = re.compile(
r'.+/projects/(?P<project>.+)/'
r'merge_requests/(?P<mr>\d+)/approve$')
mr_unapprove_re = re.compile(
r'.+/projects/(?P<project>.+)/'
r'merge_requests/(?P<mr>\d+)/unapprove$')
mr_merge_re = re.compile(r'.+/projects/(?P<project>.+)/'
r'merge_requests/(?P<mr>\d+)/merge$')
mr_update_re = re.compile(r'.+/projects/(?P<project>.+)/'
r'merge_requests/(?P<mr>\d+)$')
def _get_mr(self, project, number):
project = urllib.parse.unquote(project)
mr = merge_requests.get(project, {}).get(number)
if not mr:
# Find out what gitlab does in this case
raise NotImplementedError()
return mr
def do_GET(self):
path = self.path
self.log.debug("Got GET %s", path)
m = self.mr_re.match(path)
if m:
return self.get_mr(**m.groupdict())
m = self.mr_approvals_re.match(path)
if m:
return self.get_mr_approvals(**m.groupdict())
m = self.branch_re.match(path)
if m:
return self.get_branch(**m.groupdict())
m = self.branches_re.match(path)
if m:
return self.get_branches(path, **m.groupdict())
self.send_response(500)
self.end_headers()
def do_POST(self):
path = self.path
self.log.debug("Got POST %s", path)
data = self.rfile.read(int(self.headers['Content-Length']))
if (self.headers['Content-Type'] ==
'application/x-www-form-urlencoded'):
data = urllib.parse.parse_qs(data.decode('utf-8'))
self.log.debug("Got data %s", data)
m = self.mr_notes_re.match(path)
if m:
return self.post_mr_notes(data, **m.groupdict())
m = self.mr_approve_re.match(path)
if m:
return self.post_mr_approve(data, **m.groupdict())
m = self.mr_unapprove_re.match(path)
if m:
return self.post_mr_unapprove(data, **m.groupdict())
self.send_response(500)
self.end_headers()
def do_PUT(self):
path = self.path
self.log.debug("Got PUT %s", path)
data = self.rfile.read(int(self.headers['Content-Length']))
if (self.headers['Content-Type'] ==
'application/x-www-form-urlencoded'):
data = urllib.parse.parse_qs(data.decode('utf-8'))
self.log.debug("Got data %s", data)
m = self.mr_merge_re.match(path)
if m:
return self.put_mr_merge(data, **m.groupdict())
m = self.mr_update_re.match(path)
if m:
return self.put_mr_update(data, **m.groupdict())
self.send_response(500)
self.end_headers()
def send_data(self, data, code=200):
data = json.dumps(data).encode('utf-8')
self.send_response(code)
self.send_header('Content-Type', 'application/json')
self.send_header('Content-Length', len(data))
self.end_headers()
self.wfile.write(data)
def get_mr(self, project, mr):
stats["get_mr"] += 1
mr = self._get_mr(project, mr)
data = {
'target_branch': mr.branch,
'title': mr.subject,
'state': mr.state,
'description': mr.description,
'author': {
'name': 'Administrator',
'username': 'admin'
},
'updated_at':
mr.updated_at.strftime('%Y-%m-%dT%H:%M:%S.%fZ'),
'sha': mr.sha,
'labels': mr.labels,
'blocking_discussions_resolved':
mr.blocking_discussions_resolved,
'merged_at': mr.merged_at.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
if mr.merged_at else mr.merged_at,
'merge_status': mr.merge_status,
}
if options['delayed_complete_mr'] and \
time.monotonic() < options['delayed_complete_mr']:
diff_refs = None
elif options['uncomplete_mr']:
diff_refs = None
else:
diff_refs = {
'base_sha': mr.base_sha,
'head_sha': mr.sha,
'start_sha': 'c380d3acebd181f13629a25d2e2acca46ffe1e00'
}
data['diff_refs'] = diff_refs
self.send_data(data)
def get_mr_approvals(self, project, mr):
mr = self._get_mr(project, mr)
if not options['community_edition']:
self.send_data({
'approvals_left': 0 if mr.approved else 1,
})
else:
self.send_data({
'approved': mr.approved,
})
def get_branch(self, project, branch):
project = urllib.parse.unquote(project)
branch = urllib.parse.unquote(branch)
owner, name = project.split('/')
if branch in fake_repos[(owner, name)]:
protected = fake_repos[(owner, name)][branch].protected
self.send_data({'protected': protected})
else:
return self.send_data({}, code=404)
def get_branches(self, url, project):
project = urllib.parse.unquote(project).split('/')
req = urllib.parse.urlparse(url)
query = urllib.parse.parse_qs(req.query)
per_page = int(query["per_page"][0])
page = int(query["page"][0])
repo = fake_repos[tuple(project)]
first_entry = (page - 1) * per_page
last_entry = min(len(repo), (page) * per_page)
if first_entry >= len(repo):
branches = []
else:
branches = [{'name': repo[i].name,
'protected': repo[i].protected}
for i in range(first_entry, last_entry)]
self.send_data(branches)
def post_mr_notes(self, data, project, mr):
mr = self._get_mr(project, mr)
mr.addNote(data['body'][0])
self.send_data({})
def post_mr_approve(self, data, project, mr):
assert 'sha' in data
mr = self._get_mr(project, mr)
if data['sha'][0] != mr.sha:
return self.send_data(
{'message': 'SHA does not match HEAD of source '
'branch: <new_sha>'}, code=409)
if mr.approved:
return self.send_data(
{'message': '401 Unauthorized'}, code=401)
mr.approved = True
self.send_data({}, code=201)
def post_mr_unapprove(self, data, project, mr):
mr = self._get_mr(project, mr)
if not mr.approved:
return self.send_data(
{'message': "404 Not Found"}, code=404)
mr.approved = False
self.send_data({}, code=201)
def put_mr_merge(self, data, project, mr):
mr = self._get_mr(project, mr)
squash = None
if data and isinstance(data, dict):
squash = data.get('squash')
mr.mergeMergeRequest(squash)
self.send_data({'state': 'merged'})
def put_mr_update(self, data, project, mr):
mr = self._get_mr(project, mr)
labels = set(mr.labels)
add_labels = data.get('add_labels', [''])[0].split(',')
remove_labels = data.get('remove_labels', [''])[0].split(',')
labels = labels - set(remove_labels)
labels = labels | set(add_labels)
mr.labels = list(labels)
self.send_data({})
def log_message(self, fmt, *args):
self.log.debug(fmt, *args)
self.httpd = socketserver.ThreadingTCPServer(('', 0), Server)
self.port = self.httpd.socket.getsockname()[1]
self.thread = threading.Thread(name='GitlabWebServer',
target=self.httpd.serve_forever)
self.thread.daemon = True
self.thread.start()
def stop(self):
self.httpd.shutdown()
self.thread.join()
self.httpd.server_close()