governance/tools/validate_atcs.py
Mohammed Naser 346d3d2f06 linters: enforce ATC membership after January 2020
In the past, we did not have any enforcement for extra ATCs in
terms of foundation membership.  This has resulted in entries
being added which are not foundation members that cannot be
ATCs.

This patch adds a check to ignore our old records (until they
expire out in January 2020) and enforce it on any newly added
ATCs.

Change-Id: Id1f6d5b0bffb41c952e140372a3594669359fbdb
2019-08-12 10:46:42 -04:00

145 lines
4.4 KiB
Python
Executable File

#!/usr/bin/env python3
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
from datetime import datetime
import json
import logging
import requests
import yaml
LOG = logging.getLogger()
# The OpenStack foundation member directory lookup API endpoint
MEMBER_LOOKUP_URL = 'https://openstackid-resources.openstack.org/'
def lookup_member(email):
"A requests wrapper to querying the OSF member directory API"
# URL pattern for querying foundation members by E-mail address
LOG.debug('looking up %s', email)
raw = requester(
MEMBER_LOOKUP_URL + '/api/public/v1/members',
params={
'filter[]': [
'group_slug==foundation-members',
'email==' + email,
],
'expand': 'all_affiliations',
},
headers={'Accept': 'application/json'},
)
decoded = decode_json(raw)
try:
return decoded['data'][0]
except (KeyError, IndexError):
return None
def requester(url, params={}, headers={}):
"""A requests wrapper to consistently retry HTTPS queries
:param url: The URL to get.
:type url: str
:param params: Additional parameters to provide.
:type params: dict(str, str)
:param headers: Additional headers to set.
:type params: dict(str, str)
"""
# Try up to 3 times
retry = requests.Session()
retry.mount("https://", requests.adapters.HTTPAdapter(max_retries=3))
return retry.get(url=url, params=params, headers=headers)
def decode_json(raw):
"""Trap JSON decoding failures and provide more detailed errors
Remove ')]}' XSS prefix from data if it is present, then decode it
as JSON and return the results.
:param raw: Response text from API
:type raw: str
"""
# Gerrit's REST API prepends a JSON-breaker to avoid XSS vulnerabilities
if raw.text.startswith(")]}'"):
trimmed = raw.text[4:]
else:
trimmed = raw.text
# Try to decode and bail with much detail if it fails
try:
decoded = json.loads(trimmed)
except Exception:
LOG.error(
'\nrequest returned %s error to query:\n\n %s\n'
'\nwith detail:\n\n %s\n',
raw, raw.url, trimmed)
raise
return decoded
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
'-p', '--projects',
default='./reference/projects.yaml',
help='projects.yaml file path (%(default)s)',
)
args = parser.parse_args()
# Quiet the urllib3 module output coming out of requests.
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.basicConfig()
with open(args.projects, 'r', encoding='utf-8') as f:
projects = yaml.safe_load(f.read())
exit_code = 0
for project_name, project_data in sorted(projects.items()):
atcs = project_data.get('extra-atcs', [])
for atc in atcs:
try:
member = lookup_member(atc['email'])
except Exception as e:
print('ERROR: {}: Could not validate ATC {}: {}'.format(
project_name, atc, e))
else:
if not member:
# NOTE(mnaser): The ATC membership checks were not
# enforced for all ATCs expiring before
# January 2020. This should be removed
# after January 2020.
expires = datetime.strptime(atc['expires-in'], "%B %Y")
if expires <= datetime(2020, 1, 1):
msg = 'Skipping {} from validation'.format(atc)
else:
msg = 'Unable to find membership: {}'.format(atc)
exit_code = 1
print('{}: {}'.format(project_name, msg))
return exit_code
if __name__ == '__main__':
import sys
sys.exit(main())