# Copyright 2016 Red Hat, Inc.
# Copyright 2021 Acme Gating, LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from collections import defaultdict
import http.server
import json
import logging
import re
import socketserver
import threading
import urllib.parse
import time

from git.util import IterableList


class GitlabWebServer(object):
    """Small threaded HTTP server that fakes parts of the GitLab REST API.

    The server answers the merge request and branch endpoints implemented
    by the nested request handler created in :meth:`start`.  All state is
    held in plain attributes so that a test can inspect and mutate it while
    the server is running:

    * ``merge_requests``: mapping of project path to a mapping of merge
      request number (the string captured from the request URL) to a fake
      merge request object.
    * ``fake_repos``: mapping of a tuple of project path components to a
      container of branch objects exposing ``name`` and ``protected``.
    * ``options``: behaviour switches read on every request.
    * ``stats``: request counters (currently only ``get_mr``).
    """

    def __init__(self, merge_requests):
        """Store the shared fake state; no socket is opened yet.

        :param merge_requests: mapping of project path to a mapping of
            merge request number to fake merge request object.
        """
        super().__init__()
        self.merge_requests = merge_requests
        self.fake_repos = defaultdict(lambda: IterableList('name'))
        # A dictionary so we can mutate it
        self.options = dict(
            # Selects which payload shape the approvals endpoint returns.
            community_edition=False,
            # While time.monotonic() is below this value, merge requests
            # are served without diff_refs.  0 disables the delay.
            delayed_complete_mr=0,
            # When true, merge requests are always served without diff_refs.
            uncomplete_mr=False)
        self.stats = {"get_mr": 0}

    def start(self):
        """Bind to an ephemeral port and serve requests in a daemon thread.

        Sets ``self.httpd``, ``self.port`` and ``self.thread``.
        """
        # Local aliases so the handler class below can close over the
        # shared state (the handler's ``self`` is the request handler, not
        # this object).
        merge_requests = self.merge_requests
        fake_repos = self.fake_repos
        options = self.options
        stats = self.stats

        class Server(http.server.SimpleHTTPRequestHandler):
            """Request handler implementing the faked endpoints."""

            log = logging.getLogger("zuul.test.GitlabWebServer")

            # Branch listing, optionally followed by a query string.
            branches_re = re.compile(r'.+/projects/(?P<project>.+)/'
                                     r'repository/branches(?:\?.*)?$')
            branch_re = re.compile(r'.+/projects/(?P<project>.+)/'
                                   r'repository/branches/(?P<branch>.+)$')
            mr_re = re.compile(r'.+/projects/(?P<project>.+)/'
                               r'merge_requests/(?P<mr>\d+)$')
            mr_approvals_re = re.compile(
                r'.+/projects/(?P<project>.+)/'
                r'merge_requests/(?P<mr>\d+)/approvals$')

            mr_notes_re = re.compile(
                r'.+/projects/(?P<project>.+)/'
                r'merge_requests/(?P<mr>\d+)/notes$')
            mr_approve_re = re.compile(
                r'.+/projects/(?P<project>.+)/'
                r'merge_requests/(?P<mr>\d+)/approve$')
            mr_unapprove_re = re.compile(
                r'.+/projects/(?P<project>.+)/'
                r'merge_requests/(?P<mr>\d+)/unapprove$')

            mr_merge_re = re.compile(r'.+/projects/(?P<project>.+)/'
                                     r'merge_requests/(?P<mr>\d+)/merge$')
            mr_update_re = re.compile(r'.+/projects/(?P<project>.+)/'
                                      r'merge_requests/(?P<mr>\d+)$')

            def _get_mr(self, project, number):
                """Return the fake merge request for a URL-quoted project.

                :raises NotImplementedError: if no such merge request is
                    registered; the fake does not model that response.
                """
                project = urllib.parse.unquote(project)
                mr = merge_requests.get(project, {}).get(number)
                if not mr:
                    # Find out what gitlab does in this case
                    raise NotImplementedError()
                return mr

            def _read_body(self):
                """Read the request body and decode form data if present.

                Returns a dict of lists (as produced by ``parse_qs``) for
                form-encoded requests, otherwise the raw bytes.
                """
                # A missing Content-Length is treated as an empty body
                # instead of failing on int(None).
                length = int(self.headers.get('Content-Length') or 0)
                data = self.rfile.read(length)
                # get_content_type() ignores parameters such as charset,
                # which an exact header comparison would trip over.
                if (self.headers.get_content_type() ==
                        'application/x-www-form-urlencoded'):
                    data = urllib.parse.parse_qs(data.decode('utf-8'))
                self.log.debug("Got data %s", data)
                return data

            @staticmethod
            def _split_labels(values):
                """Turn a ``parse_qs`` value list into a set of labels.

                Only the first value is considered, and empty entries are
                dropped so that an absent parameter contributes nothing.
                """
                if not values:
                    return set()
                return {label for label in values[0].split(',') if label}

            def do_GET(self):
                """Dispatch GET requests; unknown paths get a 500."""
                path = self.path
                self.log.debug("Got GET %s", path)

                m = self.mr_re.match(path)
                if m:
                    return self.get_mr(**m.groupdict())
                m = self.mr_approvals_re.match(path)
                if m:
                    return self.get_mr_approvals(**m.groupdict())
                # The single-branch route must be tried before the listing.
                m = self.branch_re.match(path)
                if m:
                    return self.get_branch(**m.groupdict())
                m = self.branches_re.match(path)
                if m:
                    return self.get_branches(path, **m.groupdict())
                self.send_response(500)
                self.end_headers()

            def do_POST(self):
                """Dispatch POST requests; unknown paths get a 500."""
                path = self.path
                self.log.debug("Got POST %s", path)

                data = self._read_body()

                m = self.mr_notes_re.match(path)
                if m:
                    return self.post_mr_notes(data, **m.groupdict())
                m = self.mr_approve_re.match(path)
                if m:
                    return self.post_mr_approve(data, **m.groupdict())
                m = self.mr_unapprove_re.match(path)
                if m:
                    return self.post_mr_unapprove(data, **m.groupdict())
                self.send_response(500)
                self.end_headers()

            def do_PUT(self):
                """Dispatch PUT requests; unknown paths get a 500."""
                path = self.path
                self.log.debug("Got PUT %s", path)

                data = self._read_body()

                m = self.mr_merge_re.match(path)
                if m:
                    return self.put_mr_merge(data, **m.groupdict())
                m = self.mr_update_re.match(path)
                if m:
                    return self.put_mr_update(data, **m.groupdict())
                self.send_response(500)
                self.end_headers()

            def send_data(self, data, code=200):
                """Send ``data`` as a JSON response with status ``code``."""
                data = json.dumps(data).encode('utf-8')
                self.send_response(code)
                self.send_header('Content-Type', 'application/json')
                self.send_header('Content-Length', str(len(data)))
                self.end_headers()
                self.wfile.write(data)

            def get_mr(self, project, mr):
                """Serve a merge request and count the call in ``stats``."""
                stats["get_mr"] += 1
                mr = self._get_mr(project, mr)
                timestamp_format = '%Y-%m-%dT%H:%M:%S.%fZ'
                data = {
                    'target_branch': mr.branch,
                    'title': mr.subject,
                    'state': mr.state,
                    'description': mr.description,
                    'author': {
                        'name': 'Administrator',
                        'username': 'admin'
                    },
                    'updated_at': mr.updated_at.strftime(timestamp_format),
                    'sha': mr.sha,
                    'labels': mr.labels,
                    'blocking_discussions_resolved':
                        mr.blocking_discussions_resolved,
                    # A falsy merged_at is passed through unchanged.
                    'merged_at': mr.merged_at.strftime(timestamp_format)
                    if mr.merged_at else mr.merged_at,
                    'merge_status': mr.merge_status,
                }
                # diff_refs is withheld while the configured delay has not
                # elapsed, or permanently when uncomplete_mr is set.
                delay_until = options['delayed_complete_mr']
                still_delayed = bool(
                    delay_until and time.monotonic() < delay_until)
                if still_delayed or options['uncomplete_mr']:
                    diff_refs = None
                else:
                    diff_refs = {
                        'base_sha': mr.base_sha,
                        'head_sha': mr.sha,
                        'start_sha': 'c380d3acebd181f13629a25d2e2acca46ffe1e00'
                    }
                data['diff_refs'] = diff_refs
                self.send_data(data)

            def get_mr_approvals(self, project, mr):
                """Serve approval state in the shape chosen by ``options``."""
                mr = self._get_mr(project, mr)
                if not options['community_edition']:
                    self.send_data({
                        'approvals_left': 0 if mr.approved else 1,
                    })
                else:
                    self.send_data({
                        'approved': mr.approved,
                    })

            def get_branch(self, project, branch):
                """Serve the ``protected`` flag of a branch, or a 404."""
                # Key on every path component, like get_branches does, so
                # nested group paths do not fail a two-way unpack.
                key = tuple(urllib.parse.unquote(project).split('/'))
                branch = urllib.parse.unquote(branch)
                repo = fake_repos[key]
                if branch in repo:
                    self.send_data({'protected': repo[branch].protected})
                else:
                    return self.send_data({}, code=404)

            def get_branches(self, url, project):
                """Serve one page of the branch list of a project.

                ``page`` and ``per_page`` are read from the query string of
                ``url``; when absent they default to 1 and 20.
                """
                project = urllib.parse.unquote(project).split('/')
                req = urllib.parse.urlparse(url)
                query = urllib.parse.parse_qs(req.query)
                per_page = int(query.get("per_page", ["20"])[0])
                page = int(query.get("page", ["1"])[0])

                repo = fake_repos[tuple(project)]
                first_entry = (page - 1) * per_page
                last_entry = min(len(repo), (page) * per_page)

                if first_entry >= len(repo):
                    branches = []
                else:
                    # Index access on purpose: the repo container is not
                    # assumed to support slicing.
                    branches = [{'name': repo[i].name,
                                 'protected': repo[i].protected}
                                for i in range(first_entry, last_entry)]
                self.send_data(branches)

            def post_mr_notes(self, data, project, mr):
                """Attach the first ``body`` value as a note on the MR."""
                mr = self._get_mr(project, mr)
                mr.addNote(data['body'][0])
                self.send_data({})

            def post_mr_approve(self, data, project, mr):
                """Approve the MR if the supplied sha matches its head.

                Responds 409 on a sha mismatch and 401 when the MR is
                already approved.
                """
                # Test expectation: the client must always send a sha.
                assert 'sha' in data
                mr = self._get_mr(project, mr)
                if data['sha'][0] != mr.sha:
                    return self.send_data(
                        {'message': 'SHA does not match HEAD of source '
                         'branch: <new_sha>'}, code=409)
                if mr.approved:
                    return self.send_data(
                        {'message': '401 Unauthorized'}, code=401)
                mr.approved = True
                self.send_data({}, code=201)

            def post_mr_unapprove(self, data, project, mr):
                """Remove the approval; responds 404 if there is none."""
                mr = self._get_mr(project, mr)
                if not mr.approved:
                    return self.send_data(
                        {'message': "404 Not Found"}, code=404)
                mr.approved = False
                self.send_data({}, code=201)

            def put_mr_merge(self, data, project, mr):
                """Merge the MR, forwarding the ``squash`` form value."""
                mr = self._get_mr(project, mr)
                squash = None
                if data and isinstance(data, dict):
                    squash = data.get('squash')
                mr.mergeMergeRequest(squash)
                self.send_data({'state': 'merged'})

            def put_mr_update(self, data, project, mr):
                """Apply ``add_labels`` / ``remove_labels`` to the MR."""
                mr = self._get_mr(project, mr)
                # A body that was not form-encoded carries no label changes.
                if not isinstance(data, dict):
                    data = {}
                add_labels = self._split_labels(data.get('add_labels'))
                remove_labels = self._split_labels(data.get('remove_labels'))
                labels = (set(mr.labels) - remove_labels) | add_labels
                # Sorted so the stored list has a deterministic order.
                mr.labels = sorted(labels)
                self.send_data({})

            def log_message(self, fmt, *args):
                """Route the base class access log to the debug logger."""
                self.log.debug(fmt, *args)

        self.httpd = socketserver.ThreadingTCPServer(('', 0), Server)
        self.port = self.httpd.socket.getsockname()[1]
        self.thread = threading.Thread(name='GitlabWebServer',
                                       target=self.httpd.serve_forever)
        self.thread.daemon = True
        self.thread.start()

    def stop(self):
        """Stop serving, wait for the server thread and close the socket."""
        self.httpd.shutdown()
        self.thread.join()
        self.httpd.server_close()