Merge "Protect prime_installation_map with mutex"

This commit is contained in:
Zuul 2020-03-06 17:20:15 +00:00 committed by Gerrit Code Review
commit 25bdded704
1 changed files with 48 additions and 37 deletions

View File

@ -751,6 +751,7 @@ class GithubConnection(BaseConnection):
driver, connection_name, connection_config)
self._change_cache = {}
self._change_update_lock = {}
self._installation_map_lock = threading.Lock()
self._project_branch_cache_include_unprotected = {}
self._project_branch_cache_exclude_unprotected = {}
self.projects = {}
@ -1009,50 +1010,60 @@ class GithubConnection(BaseConnection):
if not self.app_id:
return
url = '%s/app/installations' % self.base_url
installations = []
headers = self._get_app_auth_headers()
page = 1
while url:
self.log.debug("Fetching installations for GitHub app "
"(page %s)" % page)
page += 1
response = requests.get(url, headers=headers)
response.raise_for_status()
installations.extend(response.json())
if self._installation_map_lock.acquire(blocking=False):
try:
url = '%s/app/installations' % self.base_url
installations = []
headers = self._get_app_auth_headers()
page = 1
while url:
self.log.debug("Fetching installations for GitHub app "
"(page %s)" % page)
page += 1
response = requests.get(url, headers=headers)
response.raise_for_status()
installations.extend(response.json())
# check if we need to do further paged calls
url = response.links.get(
'next', {}).get('url')
# check if we need to do further paged calls
url = response.links.get(
'next', {}).get('url')
headers_per_inst = {}
with concurrent.futures.ThreadPoolExecutor() as executor:
headers_per_inst = {}
with concurrent.futures.ThreadPoolExecutor() as executor:
token_by_inst = {}
for install in installations:
inst_id = install.get('id')
token_by_inst[inst_id] = executor.submit(
self._get_installation_key, project=None, inst_id=inst_id)
token_by_inst = {}
for install in installations:
inst_id = install.get('id')
token_by_inst[inst_id] = executor.submit(
self._get_installation_key, project=None,
inst_id=inst_id)
for inst_id, result in token_by_inst.items():
token = result.result()
headers_per_inst[inst_id] = {
'Accept': PREVIEW_JSON_ACCEPT,
'Authorization': 'token %s' % token
}
for inst_id, result in token_by_inst.items():
token = result.result()
headers_per_inst[inst_id] = {
'Accept': PREVIEW_JSON_ACCEPT,
'Authorization': 'token %s' % token
}
project_names_by_inst = {}
for install in installations:
inst_id = install.get('id')
headers = headers_per_inst[inst_id]
project_names_by_inst = {}
for install in installations:
inst_id = install.get('id')
headers = headers_per_inst[inst_id]
project_names_by_inst[inst_id] = executor.submit(
self._get_repos_of_installation, inst_id, headers)
project_names_by_inst[inst_id] = executor.submit(
self._get_repos_of_installation, inst_id, headers)
for inst_id, result in project_names_by_inst.items():
project_names = result.result()
for project_name in project_names:
self.installation_map[project_name] = inst_id
for inst_id, result in project_names_by_inst.items():
project_names = result.result()
for project_name in project_names:
self.installation_map[project_name] = inst_id
finally:
self._installation_map_lock.release()
else:
self.log.debug(
'Already fetching installations, waiting to finish.')
with self._installation_map_lock:
self.log.debug('Finished waiting for fetching installations')
def get_request_lock(self, installation_id):
return self._request_locks.setdefault(