# Script to query Gerrit users by email address to debug accounts with email
# address conflicts. The idea here is we'll identify which users are active
# and need proper manipulation to correct and which are inactive and can
# be retired.
#
# The input list of emails can be generated by a gerrit config consistency
# check against external ids.
#
# This script should also identify when accounts are inactive according to
# Gerrit and not just by our "have they pushed or reviewed code in the last
# year" metric. Accounts that are already inactive can be safely retired too.

# This script builds and operates on a datastructure that looks like this.
# john.doe@example.com:
#   1234:
#     active: True
#     recently_used: True
#     recent_change: '2021-01-23 17:31:25.000000000'
#     recent_review: None
#   5678:
#     active: False
#     recently_used: False
#     recent_change: None
#     recent_review: '2019-03-05 12:15:34.000000000'
#   active:
#     - 1234
#   inactive:
#     - 5678
#   recently_used:
#     - 1234
#   nonrecently_used:
#     - 5678

import datetime
import json
import getpass

import requests
import yaml

TIME_FORMAT = '%Y-%m-%d %H:%M:%S.%f'
# Gerrit reports timestamps in UTC, so "now" must be UTC as well. The
# timezone is dropped again because the parsed Gerrit timestamps are naive.
TODAY = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
DELTAT = datetime.timedelta(days=365)
SINCET = TODAY - DELTAT

# Gerrit prepends this magic line to every JSON response body.
GERRIT_JSON_PREFIX = ")]}'\n"
# Seconds to wait on any single HTTP request before giving up, so that one
# stalled connection cannot hang the whole audit.
REQUEST_TIMEOUT = 60


def query_gerrit(loc, query, auth=None):
    """Run a GET against the Gerrit REST API and return the decoded JSON.

    loc is the endpoint path relative to the server root (for example
    'accounts' or 'changes'), query is a dict of URL parameters and auth is
    an optional (username, password) tuple.

    Raises json.JSONDecodeError if the response body is not JSON once the
    Gerrit prefix has been stripped, and requests.exceptions.Timeout if the
    server does not answer within REQUEST_TIMEOUT seconds.
    """
    # Need to do this authenticated and as admin. Start with first pass just
    # normal user, then switch to admin and rerun.
    if auth:
        loc = 'a/' + loc
    r = requests.get('https://review.opendev.org/%s/' % loc,
                     params=query, auth=auth, timeout=REQUEST_TIMEOUT)
    # Strip off the gerrit json prefix
    return json.loads(r.text[len(GERRIT_JSON_PREFIX):])


def _query_gerrit_or_empty(loc, auth):
    """Query loc, returning [] if an unauthenticated response is not JSON.

    With auth supplied a decode failure is unexpected and is re-raised.
    """
    try:
        return query_gerrit(loc, {}, auth)
    except json.JSONDecodeError:
        if auth:
            raise
        # This handles lack of auth error above
        return []


def get_account_detail(account_id, auth=None):
    """Return the decoded 'accounts/<id>/detail' response for account_id."""
    # Need to do this authenticated and as admin. We do this without auth
    # for quicker debugging cycles, but proper data should be generated with
    # auth.
    return query_gerrit('accounts/%s/detail' % account_id, {}, auth)


def get_account_sshkeys(account_id, auth=None):
    """Return the account's ssh keys, or [] when queried without auth."""
    # Need to do this authenticated and as admin. We do this without auth
    # for quicker debugging cycles, but proper data should be generated with
    # auth.
    return _query_gerrit_or_empty('accounts/%s/sshkeys' % account_id, auth)


def get_account_externalids(account_id, auth=None):
    """Return the account's external ids, or [] when queried without auth."""
    # Need to do this authenticated and as admin. We do this without auth
    # for quicker debugging cycles, but proper data should be generated with
    # auth.
    return _query_gerrit_or_empty('accounts/%s/external.ids' % account_id,
                                  auth)


def recently_used(timestamp):
    """Return True if the Gerrit timestamp string is less than a year old."""
    # Gerrit apparently gives us nanoseconds which we can't parse.
    timestamp = timestamp[:-3]
    activity = datetime.datetime.strptime(timestamp, TIME_FORMAT)
    # We decide the account was recently used if it has reviewed or
    # pushed code within the last year.
    return TODAY - activity < DELTAT


def read_email_list():
    """Read email_list.txt and return a dict mapping each email to {}.

    One email address is expected per line; surrounding whitespace is
    stripped and blank lines are ignored.
    """
    users = {}
    with open('email_list.txt') as f:
        for line in f:
            email = line.strip()
            # A blank line would otherwise become an empty-string email and
            # trigger a meaningless 'email: is:active' query later on.
            if email:
                users[email] = {}
    return users


def check_recent_changes(account_id, account_info, auth):
    """Record the account's newest owned and newest reviewed change.

    Sets account_info['recent_change'] and account_info['recent_review'] to
    the 'updated' timestamp of the first result of the matching query, or to
    None when the query returns nothing. Sets account_info['recently_used']
    to True if either timestamp is less than a year old.
    """
    # Gerrit appears to do a reverse sort giving you the newest results
    # first. Since we only care about the most recent activity we set
    # n = 1 here.
    # NOTE: the queries deliberately carry no 'after:<SINCET date>' filter.
    # The reports treat a None timestamp as "no activity at all", which a
    # date filter would also produce for accounts that are merely old.
    for field, operator in (('recent_change', 'owner'),
                            ('recent_review', 'reviewedby')):
        query = {'q': '%s:%s' % (operator, account_id), 'n': 1}
        results = query_gerrit('changes', query, auth)
        if results:
            account_info[field] = results[0]['updated']
            if recently_used(account_info[field]):
                account_info['recently_used'] = True
        else:
            account_info[field] = None


def gather_user_info(account_id, user, auth):
    """Fill in user[account_id] and file the account by recent usage.

    user is the per-email dict; user[account_id] must already hold the
    default record. Afterwards account_id is appended to either
    user['recently_used'] or user['nonrecently_used'].
    """
    info = user[account_id]

    detail = get_account_detail(account_id, auth)
    if 'registered_on' in detail:
        info['registered_on'] = detail['registered_on']
    if 'username' in detail:
        info['username'] = detail['username']

    if get_account_sshkeys(account_id, auth):
        info['sshkeys'] = True

    for eid in get_account_externalids(account_id, auth):
        identity = eid['identity']
        # We only care about login.ubuntu urls now
        if 'login.ubuntu' in identity:
            r = requests.head(identity, timeout=REQUEST_TIMEOUT)
            if r.status_code == 200:
                # If there is an openid and it is valid we add it
                # to the list of valid openids
                info['openids'].append(identity)
            else:
                info['invalid_openids'].append(identity)

    check_recent_changes(account_id, info, auth)
    if info['recently_used']:
        user['recently_used'].append(account_id)
    else:
        user['nonrecently_used'].append(account_id)


def _new_account_record(active):
    """Return the default per-account record with the given active flag."""
    return {'recently_used': False,
            'active': active,
            'username': None,
            'sshkeys': None,
            'openids': [],
            'invalid_openids': []}


def get_user_activity(users, auth=None):
    """Populate users in place with per-account details for every email.

    Each email always gets the 'active', 'inactive', 'recently_used' and
    'nonrecently_used' lists. Emails matching fewer than two accounts are
    reported and otherwise left with those lists empty.
    """
    for email in users:
        user = users[email]
        user['active'] = []
        user['inactive'] = []
        user['recently_used'] = []
        user['nonrecently_used'] = []

        active_query = {'q': 'email:%s is:active' % email}
        active_j = query_gerrit('accounts', active_query, auth)
        inactive_query = {'q': 'email:%s is:inactive' % email}
        inactive_j = query_gerrit('accounts', inactive_query, auth)
        if len(active_j) + len(inactive_j) < 2:
            # Using an admin account to query this info seems to address
            # this problem, but we'll leave this here as a double check.
            print("Email %s only has one account" % email)
            continue

        for accounts, state, is_active in ((active_j, 'active', True),
                                           (inactive_j, 'inactive', False)):
            for account in accounts:
                account_id = str(account['_account_id'])
                user[account_id] = _new_account_record(is_active)
                user[state].append(account_id)
                gather_user_info(account_id, user, auth)


def _account_row(email, left, right):
    """Format one report row: 'email left,ids|right,ids'."""
    return email + ' ' + ','.join(left) + '|' + ','.join(right)


def _split_by_fields(user, account_ids, fields):
    """Split account_ids into (with, without) lists, preserving order.

    An account lands in 'without' only when every one of fields is falsy in
    its record; otherwise it lands in 'with'.
    """
    with_any = []
    without_any = []
    for account_id in account_ids:
        info = user[account_id]
        if any(info[field] for field in fields):
            with_any.append(account_id)
        else:
            without_any.append(account_id)
    return with_any, without_any


def print_reports(users):
    """Print the plain-text audit reports for users to stdout."""
    # TODO there are probably better ways to present this data.
    print()
    print('Users with inactive accounts. We may just be able to retire these.'
          '\nThen remove their external ids.')
    print('Email active accounts|inactive accounts')
    for email, user in users.items():
        if user['inactive']:
            print(_account_row(email, user['active'], user['inactive']))

    print()
    print('Users without username, ssh keys, valid openid, '
          'and no changes or reviews')
    print('Email accounts with creds or activity|'
          'accounts without creds or activity')
    for email, user in users.items():
        all_accounts = user['recently_used'] + user['nonrecently_used']
        with_creds, without_creds = _split_by_fields(
            user, all_accounts,
            ('username', 'sshkeys', 'recent_change', 'recent_review',
             'openids'))
        if without_creds:
            print(_account_row(email, with_creds, without_creds))

    print()
    print('Users without username, sshkeys and zero changes pushed or reviews')
    print('Email accounts with usage|accounts without usage')
    for email, user in users.items():
        all_accounts = user['recently_used'] + user['nonrecently_used']
        with_usage, without_usage = _split_by_fields(
            user, all_accounts,
            ('username', 'sshkeys', 'recent_change', 'recent_review'))
        if without_usage:
            print(_account_row(email, with_usage, without_usage))

    print()
    print('Non recently used Users without username or ssh keys')
    print('Email accounts with creds|accounts without creds')
    for email, user in users.items():
        if not user['recently_used'] and user['nonrecently_used']:
            with_creds, without_creds = _split_by_fields(
                user, user['nonrecently_used'], ('username', 'sshkeys'))
            # Only report emails where at least one account lacks creds.
            if without_creds:
                print(_account_row(email, with_creds, without_creds))

    print()
    print('Non recently used Users')
    print('Email non recent accounts')
    for email, user in users.items():
        if not user['recently_used'] and user['nonrecently_used']:
            print(email + ' ' + ','.join(user['nonrecently_used']))

    print()
    print('Recently used Users')
    print('Email recent accounts|nonrecent accounts')
    for email, user in users.items():
        if user['recently_used']:
            print(_account_row(email, user['recently_used'],
                               user['nonrecently_used']))

    print()
    print('Emails that need further investigation')
    for email, user in users.items():
        if not user['recently_used'] and not user['nonrecently_used']:
            print(email)


def main():
    """Prompt for credentials, run the audit, dump YAML, print reports."""
    query_user = input('Username: ')
    query_pass = getpass.getpass('Password: ')
    # Fall back to unauthenticated queries if either credential is empty.
    auth = (query_user, query_pass) if query_user and query_pass else None

    users = read_email_list()
    get_user_activity(users, auth=auth)

    with open('audit-results.yaml', 'w') as f:
        yaml.dump(users, default_flow_style=False, explicit_start=True,
                  indent=4, stream=f)

    print_reports(users)


if __name__ == '__main__':
    main()