James E. Blair 13c7c8bb7e Parallelize repo creation by org
This runs repo creation across two orgs at the same time.  It doesn't
help to parallelize more than 2 since openstack runs the entire time
in one thread (so the other thread handles all the other orgs).

Parallelizing by org avoids database contention for updating the user
table, since each org is a different user.  However, there's a weird
locking thing going on with the first update to the settings table,
so this does some extra work to serialize actions until we perform
that first update, then switches to parallel.

This is the maximum we can parallelize repo creation at the moment,
and it also maximizes settings updates (the settings updates take less
time than repo creation, so no further optimization helps).

Change-Id: I7f83dcdb4531a547ae5281434d7cda825dd50059
2019-07-16 14:24:44 -07:00

293 lines
11 KiB
Python
Executable File

#!/usr/bin/env python3
#
# Copyright 2019 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import concurrent.futures
import datetime
import time
import requests
import urllib.parse
from ansible.module_utils.basic import AnsibleModule
# URL templates for the external issue trackers configured on each repo.
#
# The *_REPO templates are the tracker landing page for a project; the
# *_FORMAT templates are the per-issue URL pattern.  In the *_FORMAT
# templates the doubled braces are str.format() escapes: after a call to
# .format() they collapse to the literal "{index}" placeholder.  A
# template that is used without being passed through .format() keeps the
# doubled braces verbatim.
SB_REPO = 'https://storyboard.openstack.org/#!/project/{org}/{repo}'
SB_FORMAT = 'https://storyboard.openstack.org/#!/story/{{index}}'
LP_REPO = 'https://bugs.launchpad.net/{repo}'
LP_FORMAT = 'https://bugs.launchpad.net/{repo}/+bug/{{index}}'
class Gitea(object):
    """Create and configure Gitea organizations and repositories.

    Given a list of project dicts (each with a ``project`` key of the
    form ``org/repo``), :meth:`run` makes sure every org and repo exists
    and posts the tracker and default-branch settings for each repo.
    Every action taken is recorded with :meth:`log`; failures raised in
    worker threads are logged and flagged via ``self.failed``.
    """

    # How many times a settings form post is attempted, and how long to
    # pause (in seconds) between attempts.
    SETTINGS_ATTEMPTS = 5
    SETTINGS_RETRY_DELAY = 3

    def __init__(self, url, password, always_update, projects):
        """Store connection details and group the projects by org.

        :param url: Base URL of the Gitea server.
        :param password: Password used for HTTP basic auth as ``root``.
        :param always_update: When true, settings are posted for every
            project, not only for newly created ones.
        :param projects: List of project dicts; each needs a ``project``
            key formatted as ``org/repo``.
        """
        self.url = url
        self.password = password
        self.always_update = always_update
        self.projects = projects
        self.org_projects = self._group_by_org(projects)
        self.orgs = set(self.org_projects)
        self._log = []
        self.session = requests.Session()
        self.failed = False

    @staticmethod
    def _group_by_org(projects):
        """Return a dict mapping org name to its projects, in input order."""
        grouped = {}
        for project in projects:
            org = project['project'].split('/', 1)[0]
            grouped.setdefault(org, []).append(project)
        return grouped

    def log(self, *args):
        """Append a UTC-timestamped line built from the string ``args``."""
        # Same naive-UTC ISO format as utcnow(), without the deprecated
        # call.
        now = datetime.datetime.now(
            datetime.timezone.utc).replace(tzinfo=None).isoformat()
        self._log.append(" ".join((now,) + args))

    def get_log(self):
        """Return all logged lines joined with newlines."""
        return "\n".join(self._log)

    def request(self, method, endpoint, *args, **kwargs):
        """Send an authenticated request and raise on HTTP error status.

        :param method: HTTP method name.
        :param endpoint: Path joined onto the base URL.
        :returns: The ``requests`` response object.
        :raises requests.exceptions.HTTPError: on a 4xx/5xx response.
        """
        # NOTE(review): verify=False disables TLS certificate checking.
        # Presumably the server uses a self-signed certificate -- confirm
        # before using this against a remote host.
        resp = self.session.request(
            method,
            urllib.parse.urljoin(self.url, endpoint),
            *args,
            auth=('root', self.password),
            verify=False,
            **kwargs)
        resp.raise_for_status()
        return resp

    def get(self, endpoint, *args, **kwargs):
        """Issue a GET request; see :meth:`request`."""
        return self.request('GET', endpoint, *args, **kwargs)

    def post(self, endpoint, *args, **kwargs):
        """Issue a POST request; see :meth:`request`."""
        return self.request('POST', endpoint, *args, **kwargs)

    def put(self, endpoint, *args, **kwargs):
        """Issue a PUT request; see :meth:`request`."""
        return self.request('PUT', endpoint, *args, **kwargs)

    def get_gitea_orgs(self):
        """Return the usernames of the orgs listed by the user-orgs API."""
        orgs = self.get("/api/v1/user/orgs").json()
        return [f['username'] for f in orgs]

    def make_gitea_org(self, org):
        """Create the organization ``org`` owned by root and log it."""
        self.post(
            '/api/v1/admin/users/root/orgs',
            json=dict(username=org))
        self.log("Created org:", org)

    def ensure_gitea_teams(self, org):
        """Add the ``gerrit`` user to the org's Owners team if missing.

        :raises IndexError: if the org has no team named ``Owners``.
        """
        team_list = self.get('/api/v1/orgs/{org}/teams'.format(org=org)).json()
        owner_id = [f['id'] for f in team_list if f['name'] == 'Owners'][0]
        org_owners = self.get(
            '/api/v1/teams/{owner_id}/members'.format(owner_id=owner_id))
        if 'gerrit' not in [f['username'] for f in org_owners.json()]:
            self.put('/api/v1/teams/{owner_id}/members/gerrit'.format(
                owner_id=owner_id))
            self.log("Added gerrit to team:", org)

    def get_org_repo_list(self, org):
        """Return the ``full_name`` of each repo the API lists for ``org``."""
        return [x['full_name'] for x in
                self.get('/api/v1/orgs/{org}/repos'.format(org=org)).json()]

    def get_csrf_token(self):
        """Fetch the front page and return the unquoted ``_csrf`` cookie.

        :raises Exception: if the server did not set a ``_csrf`` cookie.
        """
        # The request is made only for its side effect of populating the
        # session's cookie jar.
        self.get('/')
        token = self.session.cookies.get('_csrf')
        if token is None:
            raise Exception("Gitea did not set a _csrf cookie")
        return urllib.parse.unquote(token)

    def make_gitea_project(self, project, csrf_token):
        """Create the repository for ``project`` and log it.

        ``csrf_token`` is accepted for signature compatibility and is not
        used by the API call.
        """
        org, repo = project['project'].split('/', 1)
        # A description that is present but null must not break slicing.
        description = (project.get('description') or '')[:255]
        self.post(
            '/api/v1/org/{org}/repos'.format(org=org),
            json=dict(
                auto_init=True,
                description=description,
                name=repo,
                private=False,
                readme='Default'))
        self.log("Created repo:", project['project'])

    @staticmethod
    def _tracker_urls(project):
        """Return ``(external_tracker_url, tracker_url_format)``.

        Storyboard is used when ``use-storyboard`` is set; otherwise
        Launchpad, keyed on the first entry of ``groups`` if present or
        on the repo name.
        """
        org, repo = project['project'].split('/', 1)
        if project.get('use-storyboard'):
            # .format() collapses the escaped "{{index}}" to "{index}",
            # matching what the Launchpad branches produce.
            return (SB_REPO.format(org=org, repo=repo), SB_FORMAT.format())
        lp_name = project['groups'][0] if project.get('groups') else repo
        return (LP_REPO.format(repo=lp_name), LP_FORMAT.format(repo=lp_name))

    def _post_form_with_retries(self, path, data, failure_message):
        """POST a settings form, retrying on HTTP errors.

        :raises Exception: with ``failure_message`` once every attempt
            has failed; the last HTTP error is chained as the cause.
        """
        last_error = None
        for attempt in range(self.SETTINGS_ATTEMPTS):
            if attempt:
                # Pause between attempts only, not after the final one.
                time.sleep(self.SETTINGS_RETRY_DELAY)
            try:
                # Set allow_redirects to false because gitea returns
                # with a 302 on success, and we don't need to follow
                # that.
                self.post(path, data=data, allow_redirects=False)
                return
            except requests.exceptions.HTTPError as e:
                last_error = e
        raise Exception(failure_message) from last_error

    def update_gitea_project_settings(self, project, csrf_token):
        """Post the external tracker settings for ``project``.

        :raises Exception: if every attempt to post the form fails.
        """
        org, repo = project['project'].split('/', 1)
        external_tracker_url, tracker_url_format = self._tracker_urls(project)
        self._post_form_with_retries(
            '/{org}/{repo}/settings'.format(org=org, repo=repo),
            dict(
                _csrf=csrf_token,
                action='advanced',
                # enable_pulls is not provided, which disables it
                # enable_wiki is not provided, which disables it
                enable_external_wiki=False,
                external_wiki_url='',
                # enable_issues is on so that issue links work
                enable_issues='on',
                enable_external_tracker=True,
                external_tracker_url=external_tracker_url,
                tracker_url_format=tracker_url_format,
                tracker_issue_style='numeric',
            ),
            "Could not update tracker url")
        self.log("Updated tracker url:", external_tracker_url)

    def update_gitea_project_branches(self, project, csrf_token):
        """Post ``master`` as the default branch for ``project``.

        :raises Exception: if every attempt to post the form fails.
        """
        org, repo = project['project'].split('/', 1)
        self._post_form_with_retries(
            '/{org}/{repo}/settings/branches'.format(org=org, repo=repo),
            dict(
                _csrf=csrf_token,
                action='default_branch',
                branch='master',
            ),
            "Could not update branch settings")
        self.log("Set master branch:", project['project'])

    def make_projects(self, projects, gitea_repos, csrf_token,
                      settings_thread_pool, branches_thread_pool, futures):
        """Create missing repos in series and queue their settings updates.

        :param projects: Project dicts to process, in order.
        :param gitea_repos: Container of existing ``org/repo`` names.
        :param csrf_token: Token included in the settings form posts.
        :param settings_thread_pool: Executor for tracker settings posts.
        :param branches_thread_pool: Executor for branch settings posts.
        :param futures: List that the submitted futures are appended to.
        """
        for project in projects:
            create = project['project'] not in gitea_repos
            if create:
                # TODO: use threadpool when we're running with
                # https://github.com/go-gitea/gitea/pull/7493
                self.make_gitea_project(project, csrf_token)
            if create or self.always_update:
                futures.append(settings_thread_pool.submit(
                    self.update_gitea_project_settings,
                    project, csrf_token))
                futures.append(branches_thread_pool.submit(
                    self.update_gitea_project_branches,
                    project, csrf_token))

    def _run_first_settings_update(self, org_task_lists, gitea_repos,
                                   csrf_token, settings_thread_pool,
                                   branches_thread_pool):
        """Process projects one at a time until one updates its settings.

        Projects are popped off the front of the task lists (which are
        mutated in place) so the caller only sees the remaining work.
        Returns as soon as the first settings update has completed, or
        when every list is empty.
        """
        for task_list in org_task_lists:
            while task_list:
                project = task_list.pop(0)
                futures = []
                self.make_projects([project], gitea_repos, csrf_token,
                                   settings_thread_pool,
                                   branches_thread_pool, futures)
                if futures:
                    self.wait_for_futures(futures)
                    return

    def run(self):
        """Ensure all orgs and repos exist and their settings are posted."""
        gitea_orgs = self.get_gitea_orgs()
        # A set keeps the per-project existence check cheap.
        gitea_repos = set()
        for org in self.orgs:
            if org not in gitea_orgs:
                self.make_gitea_org(org)
            self.ensure_gitea_teams(org)
            gitea_repos.update(self.get_org_repo_list(org))
        csrf_token = self.get_csrf_token()
        org_task_lists = list(self.org_projects.values())

        # We can create repos in parallel, as long as all the repos
        # for the same org are in series (due to database contention,
        # until https://github.com/go-gitea/gitea/pull/7493 is
        # merged).  It doesn't help to have more than 2 since
        # openstack is the largest and everything else combined is
        # less than that.
        #
        # The org pool is listed last so that it is shut down first,
        # while the pools its tasks submit to are still accepting work.
        with concurrent.futures.ThreadPoolExecutor() as settings_pool, \
                concurrent.futures.ThreadPoolExecutor() as branches_pool, \
                concurrent.futures.ThreadPoolExecutor(
                    max_workers=2) as org_pool:
            # The very first update to the repo_unit table needs to happen
            # without any other actions in parallel, otherwise a lock will
            # be held for a significant amount of time causing requests to
            # back up (and some to fail).  Work through the project list
            # in series until we find the first that updates the project
            # settings (this will be the first with any significant work).
            self._run_first_settings_update(
                org_task_lists, gitea_repos, csrf_token,
                settings_pool, branches_pool)

            # Once that is done, we can parallelize the rest.  Sort the
            # org task lists by length so that we pack them into our two
            # threads efficiently.
            sorted_task_lists = sorted(org_task_lists, key=len, reverse=True)
            org_futures = []
            update_futures = []
            for projects in sorted_task_lists:
                org_futures.append(org_pool.submit(
                    self.make_projects,
                    projects, gitea_repos, csrf_token, settings_pool,
                    branches_pool, update_futures))
            # Worker threads append to update_futures; it is only
            # complete once every org-level task has finished.
            self.wait_for_futures(org_futures)
            self.wait_for_futures(update_futures)

    def wait_for_futures(self, futures):
        """Wait for each future; log any exception and set ``failed``."""
        for f in futures:
            try:
                f.result()
            except Exception as e:
                self.log(str(e))
                self.failed = True
def ansible_main():
    """Ansible entry point: build a Gitea helper from module args and run it.

    Exits through ``module.fail_json`` when ``run()`` raises or when a
    worker reported a failure, and through ``module.exit_json``
    otherwise.  ``changed`` reflects whether any action was logged.
    """
    module = AnsibleModule(
        argument_spec=dict(
            url=dict(required=True),
            # no_log keeps the secret out of Ansible's logged parameters.
            password=dict(required=True, no_log=True),
            projects=dict(required=True, type='list'),
            always_update=dict(type='bool', default=True),
        )
    )

    p = module.params
    gitea = Gitea(
        url=p.get('url'),
        password=p.get('password'),
        always_update=p.get('always_update'),
        projects=p.get('projects'),
    )
    try:
        gitea.run()
    except Exception as e:
        # Include what was done before the failure to aid debugging.
        module.fail_json(msg=str(e), changed=True, log=gitea.get_log())
    log = gitea.get_log()
    if gitea.failed:
        module.fail_json(msg="Failure during repo creation, see log",
                         changed=bool(log), log=log)
    module.exit_json(changed=bool(log), log=log)


if __name__ == '__main__':
    ansible_main()