Allow operator to generate auth tokens through the CLI
Add the "create-auth-token" subcommand to the zuul CLI; this subcommand allows an operator to create an authentication token for a user with customized authorizations. This requires at least one auth section with a signing key to be specified in Zuul's configuration file. This is meant as a way to provide authorizations "manually" on test deployments, until a proper authorization engine is plugged into Zuul, in a subsequent patch. Change-Id: I039e70cd8d5e502795772af0ea2a336c08316f2c
This commit is contained in:
@@ -17,8 +17,10 @@
|
||||
import argparse
|
||||
import babel.dates
|
||||
import datetime
|
||||
import jwt
|
||||
import logging
|
||||
import prettytable
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
import textwrap
|
||||
@@ -160,6 +162,44 @@ class Client(zuul.cmd.ZuulApp):
|
||||
help='validate the tenant configuration')
|
||||
cmd_conf_check.set_defaults(func=self.validate)
|
||||
|
||||
cmd_create_auth_token = subparsers.add_parser(
|
||||
'create-auth-token',
|
||||
help='create an Authentication Token for the web API',
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
description=textwrap.dedent('''\
|
||||
Create an Authentication Token for the administration web API
|
||||
|
||||
Create a bearer token that can be used to access Zuul's
|
||||
administration web API. This is typically used to delegate
|
||||
privileged actions such as enqueueing and autoholding to
|
||||
third parties, scoped to a single tenant.
|
||||
At least one authenticator must be configured with a secret
|
||||
that can be used to sign the token.'''))
|
||||
cmd_create_auth_token.add_argument(
|
||||
'--auth-config',
|
||||
help=('The authenticator to use. '
|
||||
'Must match an authenticator defined in zuul\'s '
|
||||
'configuration file.'),
|
||||
default='zuul_operator',
|
||||
required=True)
|
||||
cmd_create_auth_token.add_argument(
|
||||
'--tenant',
|
||||
help='tenant name',
|
||||
required=True)
|
||||
cmd_create_auth_token.add_argument(
|
||||
'--user',
|
||||
help=("The user's name. Used for traceability in logs."),
|
||||
default=None,
|
||||
required=True)
|
||||
cmd_create_auth_token.add_argument(
|
||||
'--expires-in',
|
||||
help=('Token validity duration in seconds '
|
||||
'(default: %i)' % 600),
|
||||
type=int,
|
||||
default=600,
|
||||
required=False)
|
||||
cmd_create_auth_token.set_defaults(func=self.create_auth_token)
|
||||
|
||||
return parser
|
||||
|
||||
def parseArguments(self, args=None):
|
||||
@@ -286,6 +326,52 @@ class Client(zuul.cmd.ZuulApp):
|
||||
ref=self.args.ref)
|
||||
return r
|
||||
|
||||
def create_auth_token(self):
|
||||
auth_section = ''
|
||||
for section_name in self.config.sections():
|
||||
if re.match(r'^auth ([\'\"]?)%s(\1)$' % self.args.auth_config,
|
||||
section_name, re.I):
|
||||
auth_section = section_name
|
||||
break
|
||||
if auth_section == '':
|
||||
print('"%s" authenticator configuration not found.'
|
||||
% self.args.auth_config)
|
||||
sys.exit(1)
|
||||
token = {'exp': time.time() + self.args.expires_in,
|
||||
'iss': get_default(self.config, auth_section, 'issuer_id'),
|
||||
'aud': get_default(self.config, auth_section, 'client_id'),
|
||||
'sub': self.args.user,
|
||||
'zuul': {'admin': [self.args.tenant, ]},
|
||||
}
|
||||
driver = get_default(
|
||||
self.config, auth_section, 'driver')
|
||||
if driver == 'HS256':
|
||||
key = get_default(self.config, auth_section, 'secret')
|
||||
elif driver == 'RS256':
|
||||
private_key = get_default(self.config, auth_section, 'private_key')
|
||||
try:
|
||||
with open(private_key, 'r') as pk:
|
||||
key = pk.read()
|
||||
except Exception as e:
|
||||
print('Could not read private key at "%s": %s' % (private_key,
|
||||
e))
|
||||
sys.exit(1)
|
||||
else:
|
||||
print('Unknown or unsupported authenticator driver "%s"' % driver)
|
||||
sys.exit(1)
|
||||
try:
|
||||
auth_token = jwt.encode(token,
|
||||
key=key,
|
||||
algorithm=driver).decode('utf-8')
|
||||
print("Bearer %s" % auth_token)
|
||||
err_code = 0
|
||||
except Exception as e:
|
||||
print("Error when generating Auth Token")
|
||||
print(e)
|
||||
err_code = 1
|
||||
finally:
|
||||
sys.exit(err_code)
|
||||
|
||||
def promote(self):
|
||||
client = zuul.rpcclient.RPCClient(
|
||||
self.server, self.port, self.ssl_key, self.ssl_cert, self.ssl_ca)
|
||||
|
||||
Reference in New Issue
Block a user