Refactored github tools to prefer token-based auth

The github workflow still asks for username, password,
and 2FA token if need be, but creates a repo token and
uses that for further exchanges to save on user input
fatigue.

Closes-Bug: #1442764

Change-Id: I530a965dc5d5bad080c49b5cf0d38f3f7aee201a
This commit is contained in:
Ed Cranford
2015-04-06 17:16:53 -05:00
parent e55c453056
commit ac71757082
3 changed files with 153 additions and 121 deletions

View File

@@ -25,24 +25,27 @@ import string
import httplib2
class GitHubException(Exception):
def __init__(self, status_code, message):
self.status_code = status_code
self.message = message
def __str__(self):
return "GitHub Exception: %s: %s" % (self.status_code, self.message)
class GitHubAuth(object):
_username = None
_password = None
_onetime_password = None
_token = None
user_org_name = None
full_repo_name = None
git_url = None
_github_auth_url = 'https://api.github.com/authorizations'
_github_repo_hook_url = 'https://api.github.com/repos/%s/hooks'
_github_user_key_url = 'https://api.github.com/user/keys'
_github_repo_regex = r'github\.com[:/](.+?)/(.+?)($|/$|\.git$|\.git/$)'
_auth_url = 'https://api.github.com/authorizations'
def __init__(self, git_url, username=None, password=None):
def __init__(self, git_url, username=None, password=None, repo_token=None):
self.git_url = git_url
user_org_name, repo = '', ''
github_regex = r'github\.com[:/](.+?)/(.+?)($|/$|\.git$|\.git/$)'
repo_pat = re.compile(github_regex)
repo_pat = re.compile(self._github_repo_regex)
match = repo_pat.search(self.git_url)
if match:
user_org_name, repo = match.group(1), match.group(2)
@@ -53,11 +56,10 @@ class GitHubAuth(object):
self.full_repo_name = '/'.join([user_org_name, repo])
self._repo_token = repo_token
self._username = username
self._password = password
# If either is None, ask for them now.
self.username, self.password
self._otp_required = False
@property
def username(self):
@@ -75,72 +77,94 @@ class GitHubAuth(object):
@property
def onetime_password(self):
if self._onetime_password is None:
self._onetime_password = getpass.getpass("2FA Token: ")
return self._onetime_password
# This is prompted for every time it's needed.
print("Two-Factor Authentication required.")
return getpass.getpass("2FA Token: ")
@property
def token(self):
if self._token is None:
self._get_repo_token()
return self._token
def repo_token(self):
if self._repo_token is None:
self.create_repo_token()
return self._repo_token
def _auth_header(self, use_otp=False):
authstring = '%s:%s' % (self.username, self.password)
auth = ''
try:
auth = base64.encodestring(authstring)
except TypeError:
# Python 3
auth = base64.encodestring(bytes(authstring, 'utf-8'))
@property
def auth_header(self):
header = {
'Authorization': 'Basic %s' % auth,
'Content-Type': 'application/json',
}
if use_otp:
# The token on its own should suffice
if self._repo_token:
header['Authorization'] = 'token %s' % self._repo_token
return header
# This will prompt the user if either name or pass is missing.
authstring = '%s:%s' % (self.username, self.password)
basic_auth = ''
try:
basic_auth = base64.encodestring(authstring)
except TypeError:
# Python 3
basic_auth = base64.encodestring(bytes(authstring, 'utf-8'))
basic_auth = basic_auth.decode('utf-8')
basic_auth = basic_auth.strip()
header['Authorization'] = 'Basic %s' % basic_auth
# This will prompt for the OTP.
if self._otp_required:
header['x-github-otp'] = self.onetime_password
return header
def _get_repo_token(self):
note = ''.join(random.sample(string.lowercase, 5))
auth_info = {
'scopes': 'repo',
'note': 'Solum-status-%s' % note,
}
resp, content = httplib2.Http().request(
self._auth_url,
'POST',
headers=self._auth_header(),
body=json.dumps(auth_info))
if resp.get('status') in ['200', '201']:
response_body = json.loads(content)
self._token = response_body.get('token')
else:
print("Error getting repo token.")
def _send_authed_request(self, url, body_dict):
body_text = json.dumps(body_dict)
resp, content = httplib2.Http().request(
url,
'POST',
headers=self._auth_header(),
body=json.dumps(body_dict))
url, 'POST',
headers=self.auth_header,
body=body_text)
if resp.get('status') in ['401']:
if resp.get('x-github-otp', '').startswith('required'):
print("Two-Factor Authentication required.")
self._otp_required = True
resp, content = httplib2.Http().request(
url,
'POST',
headers=self._auth_header(use_otp=True),
body=json.dumps(body_dict))
url, 'POST',
headers=self.auth_header,
body=body_text)
return resp, content
def create_repo_token(self):
print("Creating repo token")
note = ''.join(random.sample(string.lowercase, 5))
auth_info = {
'scopes': ['repo', 'write:public_key', 'write:repo_hook'],
'note': 'Solum-status-%s' % note,
}
resp, content = self._send_authed_request(
self._github_auth_url,
auth_info)
status_code = int(resp.get('status', '500'))
response_body = json.loads(content)
if status_code in [200, 201]:
self._repo_token = response_body.get('token')
elif status_code >= 400 and status_code < 600:
message = response_body.get('message',
'No error message provided.')
raise GitHubException(status_code, message)
def create_webhook(self, trigger_uri, workflow=None):
hook_url = ('https://api.github.com/repos/%s/hooks' %
self.full_repo_name)
hook_url = self._github_repo_hook_url % self.full_repo_name
if workflow is not None:
# workflow is a list of strings, likely
# ['unittest', 'build', 'deploy'].
# They're joined with + and appended to the
# trigger_uri here.
wf_query = "?workflow=%s" % '+'.join(workflow)
trigger_uri += wf_query
hook_info = {
'name': 'web',
'events': ['pull_request', 'commit_comment'],
@@ -153,18 +177,13 @@ class GitHubAuth(object):
if resp.get('status') not in ['200', '201']:
print("Error creating webhook.")
def add_ssh_key(self, public_key=None, is_private=False):
def add_ssh_key(self, public_key=None):
if not public_key:
print("No public key to upload.")
return
if is_private:
print("Uploading public key to user account.")
else:
print("Uploading public key to repository.")
api_url = ('https://api.github.com/repos/%s/keys' %
self.full_repo_name)
if is_private:
api_url = 'https://api.github.com/user/keys'
print("Uploading public key to user account.")
api_url = self._github_user_key_url
key_info = {
'title': 'devops@Solum',
'key': public_key,

View File

@@ -707,11 +707,15 @@ Available commands:
action='store_true',
dest='setup_trigger',
help="Set up app trigger on git repo")
trigger_help = ("Which of stages build, unittest, deploy to trigger "
"from git. For example: "
"--trigger-workflow=unittest+build+deploy. "
"Implies --setup-trigger.")
self.parser.add_argument('--trigger-workflow',
default='',
dest='workflow',
help="Which of stages build, unittest, "
"deploy to trigger from git")
help=trigger_help)
args = self.parser.parse_args()
@@ -872,6 +876,24 @@ Available commands:
args.param_file)
raise exc.CommandError(message=message)
repo_token = plan_definition['artifacts'][0].get('repo_token')
gha = github.GitHubAuth(git_url, repo_token=repo_token)
# Set up a repo token for the user if it hasn't already been created.
repo_token = repo_token or gha.repo_token
plan_definition['artifacts'][0]['repo_token'] = repo_token
# If there's a public key defined in the plan, upload it.
content = plan_definition['artifacts'][0].get('content')
if content:
public_key = content.get('public_key', '')
private_repo = content.get('private', False)
if private_repo and public_key:
try:
gha.add_ssh_key(public_key=public_key)
except github.GitHubException as ghe:
raise exc.CommandError(message=str(ghe))
plan = self.client.plans.create(yamlutils.dump(plan_definition))
fields = ['uuid', 'name', 'description', 'uri', 'artifacts',
'trigger_uri']
@@ -880,18 +902,16 @@ Available commands:
self._print_dict(plan, fields, wrap=72)
self._show_public_keys(artifacts)
if artifacts and args.setup_trigger:
gha = github.GitHubAuth(git_url)
content = artifacts[0].content
public_key = content.get('public_key', '')
private_repo = content.get('private', False)
gha.add_ssh_key(public_key=public_key, is_private=private_repo)
if args.setup_trigger or args.workflow:
trigger_uri = vars(plan).get('trigger_uri', '')
if trigger_uri:
workflow = None
if args.workflow:
workflow = args.workflow.replace('+', ' ').split(' ')
gha.create_webhook(trigger_uri, workflow=workflow)
try:
gha.create_webhook(trigger_uri, workflow=workflow)
except github.GitHubException as ghe:
raise exc.CommandError(message=str(ghe))
def deploy(self):
"""Deploy an application, building any applicable artifacts first."""

View File

@@ -23,44 +23,57 @@ from solumclient.tests import base
class TestGitHubAuth(base.TestCase):
fake_repo = "http://github.com/fakeuser/fakerepo.git"
fake_trigger = "http://example.com/trigger/1"
fake_username = 'fakeuser'
fake_password = 'fakepassword'
fake_token = 'faketoken'
def test_invalid_repo(self):
self.assertRaises(ValueError,
github.GitHubAuth,
"http://example.com")
def test_token_fetched_on_request(self):
def test_auth_header_username_password(self):
gha = github.GitHubAuth(self.fake_repo,
username=self.fake_username,
password=self.fake_password)
gha._get_repo_token = mock.Mock()
gha.token
gha._get_repo_token.assert_called_once()
# base64.b64encode('fakeuser:fakepassword') yields 'ZmFrZX...'
expected_auth_header = {
'Content-Type': 'application/json',
'Authorization': 'Basic ZmFrZXVzZXI6ZmFrZXBhc3N3b3Jk',
}
self.assertEqual(expected_auth_header, gha.auth_header)
def test_token_fetched_only_once(self):
@mock.patch('getpass.getpass')
def test_auth_header_username_password_2fa(self, fake_getpass):
gha = github.GitHubAuth(self.fake_repo,
username=self.fake_username,
password=self.fake_password)
gha._otp_required = True
fake_getpass.return_value = 'fakeonetime'
expected_auth_header = {
'Content-Type': 'application/json',
'Authorization': 'Basic ZmFrZXVzZXI6ZmFrZXBhc3N3b3Jk',
'x-github-otp': 'fakeonetime',
}
self.assertEqual(expected_auth_header, gha.auth_header)
def update_token(some_gha):
some_gha._token = 'foo'
gha._get_repo_token = mock.Mock(side_effect=update_token(gha))
for i in range(5):
gha.token
gha._get_repo_token.assert_called_once()
self.assertEqual(gha.token, 'foo')
def test_auth_header_repo_token(self):
gha = github.GitHubAuth(self.fake_repo,
repo_token=self.fake_token)
expected_auth_header = {
'Content-Type': 'application/json',
'Authorization': 'token %s' % self.fake_token,
}
self.assertEqual(expected_auth_header, gha.auth_header)
@mock.patch('httplib2.Http.request')
def test_create_webhook(self, fake_request):
gha = github.GitHubAuth(self.fake_repo,
username=self.fake_username,
password=self.fake_password)
repo_token=self.fake_token)
fake_request.return_value = ({'status': '200'},
'{"token": "foo"}')
fake_trigger_url = 'http://example.com'
gha.create_webhook(fake_trigger_url)
'{"token": "%s"}' % self.fake_token)
gha.create_webhook(self.fake_trigger)
fake_request.assert_called_once_with(
'https://api.github.com/repos/fakeuser/fakerepo/hooks',
'POST',
@@ -68,7 +81,7 @@ class TestGitHubAuth(base.TestCase):
body=mock.ANY)
expected_body = {
"config": {
"url": fake_trigger_url,
"url": self.fake_trigger,
"content_type": "json"},
"name": "web",
"events": ["pull_request", "commit_comment"]}
@@ -82,8 +95,7 @@ class TestGitHubAuth(base.TestCase):
password=self.fake_password)
fake_request.return_value = ({'status': '200'},
'{"token": "foo"}')
fake_trigger_url = 'http://example.com'
gha.create_webhook(fake_trigger_url, workflow=['unittest'])
gha.create_webhook(self.fake_trigger, workflow=['unittest'])
fake_request.assert_called_once_with(
'https://api.github.com/repos/fakeuser/fakerepo/hooks',
'POST',
@@ -91,7 +103,7 @@ class TestGitHubAuth(base.TestCase):
body=mock.ANY)
expected_body = {
"config": {
"url": fake_trigger_url + "?workflow=unittest",
"url": self.fake_trigger + "?workflow=unittest",
"content_type": "json"},
"name": "web",
"events": ["pull_request", "commit_comment"]}
@@ -105,8 +117,7 @@ class TestGitHubAuth(base.TestCase):
password=self.fake_password)
fake_request.return_value = ({'status': '200'},
'{"token": "foo"}')
fake_trigger_url = 'http://example.com'
gha.create_webhook(fake_trigger_url, workflow=['unittest', 'build'])
gha.create_webhook(self.fake_trigger, workflow=['unittest', 'build'])
fake_request.assert_called_once_with(
'https://api.github.com/repos/fakeuser/fakerepo/hooks',
'POST',
@@ -114,7 +125,7 @@ class TestGitHubAuth(base.TestCase):
body=mock.ANY)
expected_body = {
"config": {
"url": fake_trigger_url + "?workflow=unittest+build",
"url": self.fake_trigger + "?workflow=unittest+build",
"content_type": "json"},
"name": "web",
"events": ["pull_request", "commit_comment"]}
@@ -122,7 +133,7 @@ class TestGitHubAuth(base.TestCase):
self.assertEqual(expected_body, actual_body)
@mock.patch('httplib2.Http.request')
def test_add_ssh_key_public(self, fake_request):
def test_add_ssh_key(self, fake_request):
gha = github.GitHubAuth(self.fake_repo,
username=self.fake_username,
password=self.fake_password)
@@ -130,24 +141,6 @@ class TestGitHubAuth(base.TestCase):
'{"token": "foo"}')
fake_pub_key = 'foo'
gha.add_ssh_key(public_key=fake_pub_key)
fake_request.assert_called_once_with(
'https://api.github.com/repos/fakeuser/fakerepo/keys',
'POST',
headers=mock.ANY,
body=mock.ANY)
expected_body = {"key": "foo", "title": "devops@Solum"}
actual_body = json.loads(fake_request.call_args[1]['body'])
self.assertEqual(expected_body, actual_body)
@mock.patch('httplib2.Http.request')
def test_add_ssh_key_private(self, fake_request):
gha = github.GitHubAuth(self.fake_repo,
username=self.fake_username,
password=self.fake_password)
fake_request.return_value = ({'status': '200'},
'{"token": "foo"}')
fake_pub_key = 'foo'
gha.add_ssh_key(public_key=fake_pub_key, is_private=True)
fake_request.assert_called_once_with(
'https://api.github.com/user/keys',
'POST',