Implement new API format

Signing requests are expected to arrive at

/v1/sign/<registration_authority>

now. Virtual registration authority is a new concept which right now includes
everything the original configuration included. That means for example each
registration authority available within Anchor deployment can configure its own
CA, auth, and validators. Clients request a specific registration authority via
the URL. This does not mean they can just choose who signs the CSR - they still
need to pass all checks. Only the guesswork of "which validation set applies to
them" is gone because of this change.

The previous concept of validator sets is gone. Each registration authority
configures its own validators and all of them need to pass.

Previous endpoint /sign will not work anymore. It's incompatible with
the new design.

The configuration file changes in the following way:
1. Registration authorities need to be defined in the main config.
2. Validator sets are not available anymore.
3. CA and auth settings at the top level need to be named. They can be referred
   to in the registration authority block.
4. Old names are removed. Any use of "auth", "ca", or "validators" at the top
   level will result in an error and explanation regarding the upgrade path.

Further documentation and a sample config with the new layout can be found in
the docs/configuration.rst file.

Closes-bug: 1463752
Change-Id: I5a949e0c79a2d56eadadf5ece62bb8b8eea89e78
This commit is contained in:
Stanisław Pitucha 2015-06-23 18:01:49 +10:00
parent 9cb201f60b
commit f1ed12e2cf
17 changed files with 727 additions and 222 deletions

View File

@ -60,60 +60,116 @@ def _check_file_exists(path):
def validate_config(conf):
logger = logging.getLogger("anchor")
for old_name in ['auth', 'ca', 'validators']:
if old_name in conf.config:
raise ConfigValidationException("The config seems to be for an "
"old version of Anchor. Please "
"check documentation.")
if not hasattr(conf, "auth") or not conf.auth:
raise ConfigValidationException("No authentication configured")
if not conf.config.get('registration_authority'):
raise ConfigValidationException("No registration authorities present")
if not hasattr(conf, "ca") or not conf.ca:
raise ConfigValidationException("No ca configuration present")
if not conf.config.get('signing_ca'):
raise ConfigValidationException("No signing CA configurations present")
if not conf.config.get('authentication'):
raise ConfigValidationException("No authentication methods present")
for name in conf.registration_authority.keys():
logger.info("Checking config for registration authority: %s", name)
validate_registration_authority_config(name, conf)
for name in conf.signing_ca.keys():
logger.info("Checking config for signing ca: %s", name)
validate_signing_ca_config(name, conf)
for name in conf.authentication.keys():
logger.info("Checking config for authentication method: %s", name)
validate_authentication_config(name, conf)
def validate_authentication_config(name, conf):
auth_conf = conf.authentication[name]
default_user = "myusername"
default_secret = "simplepassword"
if not auth_conf.get('backend'):
raise ConfigValidationException(
"Authentication method %s doesn't define backend" % name)
if auth_conf['backend'] not in ('static', 'keystone', 'ldap'):
raise ConfigValidationException(
"Authentication backend % unknown" % (auth_conf['backend'],))
# Check for anchor being run with default user/secret
if auth_conf['backend'] == 'static':
if auth_conf['user'] == default_user:
logger.warning("default user for static auth in use")
if auth_conf['secret'] == default_secret:
logger.warning("default secret for static auth in use")
def validate_signing_ca_config(name, conf):
ca_conf = conf.signing_ca[name]
# mandatory CA settings
ca_config_requirements = ["cert_path", "key_path", "output_path",
"signing_hash", "valid_hours"]
for requirement in ca_config_requirements:
if requirement not in conf.ca.keys():
raise ConfigValidationException("CA config missing: %s" %
requirement)
if requirement not in ca_conf.keys():
raise ConfigValidationException(
"CA config missing: %s (for signing CA %s)" % (requirement,
name))
# all are specified, check the CA certificate and key are readable with
# sane permissions
_check_file_exists(conf.ca['cert_path'])
_check_file_exists(conf.ca['key_path'])
_check_file_exists(ca_conf['cert_path'])
_check_file_exists(ca_conf['key_path'])
_check_file_permissions(conf.ca['key_path'])
_check_file_permissions(ca_conf['key_path'])
if not hasattr(conf, "validators"):
raise ConfigValidationException("No validators configured")
logger.info("Found {} validator sets.".format(len(conf.validators)))
for name, validator_set in conf.validators.items():
logger.info("Checking validator set <{}> ....".format(name))
if len(validator_set) == 0:
def validate_registration_authority_config(ra_name, conf):
ra_conf = conf.registration_authority[ra_name]
auth_name = ra_conf.get('authentication')
if not auth_name:
raise ConfigValidationException(
"No authentication configured for registration authority: %s" %
ra_name)
if not conf.authentication.get(auth_name):
raise ConfigValidationException(
"Authentication method %s configured for registration authority "
"%s doesn't exist" % (auth_name, ra_name))
ca_name = ra_conf.get('signing_ca')
if not ca_name:
raise ConfigValidationException(
"No signing CA configuration present for registration authority: "
"%s" % ra_name)
if not conf.signing_ca.get(ca_name):
raise ConfigValidationException(
"Signing CA %s configured for registration authority %s doesn't "
"exist" % (ca_name, ra_name))
if not ra_conf.get("validators"):
raise ConfigValidationException(
"No validators configured for registration authority: %s" %
ra_name)
ra_validators = ra_conf['validators']
for step in ra_validators.keys():
if not hasattr(validators, step):
raise ConfigValidationException(
"Validator set <{}> is empty".format(name))
"Unknown validator <{}> found (for registration "
"authority {})".format(step, ra_name))
for step in validator_set.keys():
if not hasattr(validators, step):
raise ConfigValidationException(
"Validator set <{}> contains an "
"unknown validator <{}>".format(name, step))
config_check_domains(validator_set)
logger.info("Validator set OK")
def check_default_auth(conf):
default_user = "myusername"
default_secret = "simplepassword"
# Check for anchor being run with default user/secret
if 'static' in conf.auth.keys():
if conf.auth['static']['user'] == default_user:
logger.warning("default user for static auth in use")
if conf.auth['static']['secret'] == default_secret:
logger.warning("default secret for static auth in use")
config_check_domains(ra_validators)
logger.info("Validators OK for registration authority: %s", ra_name)
def load_config():
@ -134,7 +190,7 @@ def load_config():
sys_config_path = os.path.join(os.sep, 'etc', 'anchor', 'config.json')
if 'auth' not in jsonloader.conf.config:
if 'registration_authority' not in jsonloader.conf.config:
config_path = ""
if config_name in os.environ:
config_path = os.environ[config_name]
@ -163,6 +219,4 @@ def setup_app(config):
**app_conf
)
check_default_auth(jsonloader.conf)
return paste.translogger.TransLogger(app, setup_console_handler=False)

View File

@ -21,22 +21,24 @@ from anchor.auth import static # noqa
from anchor import jsonloader
def validate(user, secret):
def validate(ra_name, user, secret):
"""Top-level authN entry point.
This will return an AuthDetails object or abort. This will only
check that a single auth method. That method will either succeed
or fail.
:param ra_name: name of the registration authority
:param user: user provided user name
:param secret: user provided secret (password or token)
:return: AuthDetails if authenticated or aborts
"""
for name in jsonloader.conf.auth.keys():
module = globals()[name]
res = module.login(user, secret)
if res:
return res
auth_conf = jsonloader.authentication_for_registration_authority(ra_name)
backend_name = auth_conf['backend']
module = globals()[backend_name]
res = module.login(ra_name, user, secret)
if res:
return res
# we should only get here if a module failed to abort
pecan.abort(401, "authentication failure")

View File

@ -23,7 +23,7 @@ from anchor import util
logger = logging.getLogger(__name__)
def login(user, secret):
def login(ra_name, user, secret):
"""Validates a user supplied user/password against an expected value.
The expected value is pulled from the pecan config. Note that this
@ -35,18 +35,21 @@ def login(user, secret):
leaked. It may also be possible to use a timing attack to see
which input failed validation. See comments below for details.
:param ra_name: name of the registration authority
:param user: The user supplied username (unicode or string)
:param secret: The user supplied password (unicode or string)
:return: None on failure or an AuthDetails object on success
"""
auth_conf = jsonloader.authentication_for_registration_authority(ra_name)
# convert input to strings
user = str(user)
secret = str(secret)
# expected values
try:
expected_user = str(jsonloader.conf.auth['static']['user'])
expected_secret = str(jsonloader.conf.auth['static']['secret'])
expected_user = str(auth_conf['user'])
expected_secret = str(auth_conf['secret'])
except (KeyError, TypeError):
logger.warning("auth conf missing static user or secret")
return None

View File

@ -85,60 +85,60 @@ def _run_validator(name, body, args):
return False
def validate_csr(auth_result, csr, request):
def validate_csr(ra_name, auth_result, csr, request):
"""Validates various aspects of the CSR based on the loaded config.
The arguments of this method are passed to the underlying validate
methods. Therefore, some may be optional, depending on which
validation routines are specified in the configuration.
:param ra_name: name of the registration authority
:param auth_result: AuthDetails value from auth.validate
:param csr: CSR value from certificate_ops.parse_csr
:param request: pecan request object associated with this action
"""
# TODO(tkelsey): make this more robust
ra_conf = jsonloader.config_for_registration_authority(ra_name)
args = {'auth_result': auth_result,
'csr': csr,
'conf': jsonloader.conf,
'conf': ra_conf,
'request': request}
# It is ok if the config doesn't have any validators listed
# so we set the initial state to valid.
valid = True
try:
for name, vset in jsonloader.conf.validators.items():
logger.debug("validate_csr: checking with set {}".format(name))
for vname, validator in vset.items():
valid = _run_validator(vname, validator, args)
if not valid:
break # early out at the first error
for vname, validator in ra_conf['validators'].items():
valid = _run_validator(vname, validator, args)
if not valid:
break
except Exception as e:
logger.exception("Error running validator <%s> - %s", vname, e)
pecan.abort(500, "Internal Validation Error running validator "
"'{}' in set '{}'".format(vname, name))
"'{}' for registration authority "
"'{}'".format(vname, ra_name))
# something failed, return a 400 to the client
if not valid:
pecan.abort(400, "CSR failed validation")
def sign(csr):
def sign(ra_name, csr):
"""Generate an X.509 certificate and sign it.
:param ra_name: name of the registration authority
:param csr: X509 certificate signing request
"""
ca_conf = jsonloader.signing_ca_for_registration_authority(ra_name)
try:
ca = certificate.X509Certificate.from_file(
jsonloader.conf.ca["cert_path"])
ca_conf['cert_path'])
except Exception as e:
logger.exception("Cannot load the signing CA: %s", e)
pecan.abort(500, "certificate signing error")
try:
key = utils.get_private_key_from_file(jsonloader.conf.ca['key_path'])
key = utils.get_private_key_from_file(ca_conf['key_path'])
except Exception as e:
logger.exception("Cannot load the signing CA key: %s", e)
pecan.abort(500, "certificate signing error")
@ -147,7 +147,7 @@ def sign(csr):
new_cert.set_version(2)
start_time = int(time.time())
end_time = start_time + (jsonloader.conf.ca['valid_hours'] * 60 * 60)
end_time = start_time + (ca_conf['valid_hours'] * 60 * 60)
new_cert.set_not_before(start_time)
new_cert.set_not_after(end_time)
@ -167,12 +167,12 @@ def sign(csr):
logger.info("Signing certificate for <%s> with serial <%s>",
csr.get_subject(), serial)
new_cert.sign(key, jsonloader.conf.ca['signing_hash'])
new_cert.sign(key, ca_conf['signing_hash'])
path = os.path.join(
jsonloader.conf.ca['output_path'],
ca_conf['output_path'],
'%s.crt' % new_cert.get_fingerprint(
jsonloader.conf.ca['signing_hash']))
ca_conf['signing_hash']))
logger.info("Saving certificate to: %s", path)

View File

@ -18,6 +18,7 @@ from pecan import rest
from anchor import auth
from anchor import certificate_ops
from anchor import jsonloader
logger = logging.getLogger(__name__)
@ -31,22 +32,40 @@ class RobotsController(rest.RestController):
return "User-agent: *\nDisallow: /\n"
class SignController(rest.RestController):
"""Handles POST requests to /sign."""
class SignInstanceController(rest.RestController):
"""Handles POST requests to /v1/sign/ra_name."""
def __init__(self, ra_name):
self.ra_name = ra_name
@pecan.expose(content_type="text/plain")
def post(self):
auth_result = auth.validate(pecan.request.POST.get('user'),
pecan.request.POST.get('secret'))
ra_name = self.ra_name
logger.debug("processing signing request in registration authority %s",
ra_name)
auth_result = auth.validate(ra_name,
pecan.request.POST.get('user'),
pecan.request.POST.get('secret'))
csr = certificate_ops.parse_csr(pecan.request.POST.get('csr'),
pecan.request.POST.get('encoding'))
certificate_ops.validate_csr(ra_name, auth_result, csr, pecan.request)
certificate_ops.validate_csr(auth_result, csr, pecan.request)
return certificate_ops.sign(ra_name, csr)
return certificate_ops.sign(csr)
class SignController(rest.RestController):
@pecan.expose()
def _lookup(self, ra_name, *remaining):
if ra_name in jsonloader.registration_authority_names():
return SignInstanceController(ra_name), remaining
pecan.abort(404)
class V1Controller(rest.RestController):
sign = SignController()
class RootController(object):
robots = RobotsController()
sign = SignController()
v1 = V1Controller()

View File

@ -36,19 +36,21 @@ class AnchorConf():
self._logger = logger
self._config = {}
def load_file_data(self, config_file):
'''Load a config from a file.'''
def _load_json_file(self, config_file):
try:
with open(config_file, 'r') as f:
self._config = json.load(f)
return json.load(f)
except IOError:
logger.error("could not open config file: %s" % config_file)
raise
except Exception:
except ValueError:
logger.error("error parsing config file: %s" % config_file)
raise
def load_file_data(self, config_file):
'''Load a config from a file.'''
self._config = self._load_json_file(config_file)
def load_str_data(self, data):
'''Load a config from string data.'''
self._config = json.loads(data)
@ -70,3 +72,33 @@ class AnchorConf():
conf = AnchorConf(logger)
def config_for_registration_authority(ra_name):
"""Get configuration for a given name."""
return conf.registration_authority[ra_name]
def authentication_for_registration_authority(ra_name):
"""Get authentication config for a given name.
This is only supposed to be called after config validation. All the right
elements are expected to be in place.
"""
auth_name = conf.registration_authority[ra_name]['authentication']
return conf.authentication[auth_name]
def signing_ca_for_registration_authority(ra_name):
"""Get signing ca config for a given name.
This is only supposed to be called after config validation. All the right
elements are expected to be in place.
"""
ca_name = conf.registration_authority[ra_name]['signing_ca']
return conf.signing_ca[ca_name]
def registration_authority_names():
"""List the names of supported registration authorities."""
return conf.registration_authority.keys()

View File

@ -5,7 +5,6 @@
"user": "myusername"
}
},
"ca": {
"cert_path": "CA/root-ca.crt",
"key_path": "CA/root-ca-unwrapped.key",
@ -13,53 +12,54 @@
"signing_hash": "sha256",
"valid_hours": 24
},
"validators": {
"default" : {
"common_name" : {
"allowed_domains": [
".example.com"
"instances": {
"default": {
"validators": {
"common_name" : {
"allowed_domains": [
".example.com"
]
},
"alternative_names": {
"allowed_domains": [
".example.com"
]
},
"server_group": {
"group_prefixes": {
"bk": "Bock_Team",
"cs": "CS_Team",
"gl": "Glance_Team",
"mb": "MB_Team",
"nv": "Nova_Team",
"ops": "SysEng_Team",
"qu": "Neutron_Team",
"sw": "Swift_Team"
}
},
"extensions": {
"allowed_extensions": [
"keyUsage",
"subjectAltName",
"basicConstraints",
"subjectKeyIdentifier"
]
},
"key_usage": {
"allowed_usage": [
"Digital Signature",
"Key Encipherment",
"Non Repudiation"
]
},
"ca_status": {
"ca_requested": false
},
"source_cidrs": {
"cidrs": [
"127.0.0.0/8"
]
},
"alternative_names": {
"allowed_domains": [
".example.com"
]
},
"server_group": {
"group_prefixes": {
"bk": "Bock_Team",
"cs": "CS_Team",
"gl": "Glance_Team",
"mb": "MB_Team",
"nv": "Nova_Team",
"ops": "SysEng_Team",
"qu": "Neutron_Team",
"sw": "Swift_Team"
}
},
"extensions": {
"allowed_extensions": [
"keyUsage",
"subjectAltName",
"basicConstraints",
"subjectKeyIdentifier"
]
},
"key_usage": {
"allowed_usage": [
"Digital Signature",
"Key Encipherment",
"Non Repudiation"
]
},
"ca_status": {
"ca_requested": false
},
"source_cidrs": {
"cidrs": [
"127.0.0.0/8"
]
}
}
}

28
docs/api.rst Normal file
View File

@ -0,0 +1,28 @@
API version 1
=============
The following endpoints are available in version 1 of the API.
/robots.txt (GET)
-----------------
Prevents attempts to index the service.
/v1/sign/<registration_authority> (POST)
----------------------------------------
Requests signing of the CSR provided in the POST parameters. The request is
processed by the selected virtual registration authority.
Request parameters
~~~~~~~~~~~~~~~~~~
* ``user``: username used in authentication (optional)
* ``secret``: secret used in authentication
* ``encoding``: request encoding - currently supported: "pem"
* ``csr``: the text of the submitted CSR
Result
~~~~~~
Signed certificate

197
docs/configuration.rst Normal file
View File

@ -0,0 +1,197 @@
Configuration files
===================
Anchor is configured using two files: ``config.py`` and ``config.json``. The
first one defines the Python and webservice related values. You can change the
listening iterface address and port there, as well as logging details to suit
your deployment. The second configuration defines the service behaviour at
runtime.
There are three main sections at the moment: ``authentication`` for
authentication parameters, ``signing_ca`` for defining signing authorities, and
``registration_authority`` for listing virtual registration authorities which
can be selected by client requests.
The main ``config.json`` structure looks like this:
.. code:: json
{
"authentication": { ... },
"signing_ca": { ... },
"registration_authority": { ... }
}
Each block apart from ``registration_authority`` defines a number of mapping
from labels to definitions. Those labels can then be used in the
``registration_authority`` block to refer to settings defined earlier.
Authentication
--------------
The authentication block can define any number of authentication blocks, each
using one specific authentication backend.
Currently available authentication methods are: ``static``, ``keystone``, and
``ldap``.
Static
~~~~~~
Username and password are present in ``config.json``. This mode should be used
only for development and testing.
.. code:: json
{
"authentication": {
"method_1": {
"backend": "static",
"secret": "simplepassword",
"user": "myusername"
}
}
}
Keystone
~~~~~~~~
Username is ignored, but password is a token valid in the configured keystone
location.
.. code:: json
{
"authentication": {
"method_2": {
"backend": "keystone",
"url": "https://keystone.example.com"
}
}
}
LDAP
~~~~
Username and password are used to bind to an LDAP user in a configured domain.
User's groups for the ``server_group`` filter are retrieved from attribute
``memberOf`` in search for ``(sAMAccountName=username@domain)``. The search is done
in the configured base.
.. code:: json
{
"authentication": {
"method_3": {
"backend": "ldap",
"host": "ldap.example.com",
"base": "ou=Users,dc=example,dc=com",
"domain": "example.com",
"port": 636,
"ssl": true
}
}
}
Signing authority
-----------------
The ``signing_ca`` section defines any number of signing authorities which can
be referenced later on. Currently there's only one, default implementation
which uses local files. An example configuration looks like this.
.. code:: json
{
"signing_ca": {
"local": {
"cert_path": "CA/root-ca.crt",
"key_path": "CA/root-ca-unwrapped.key",
"output_path": "certs",
"signing_hash": "sha256",
"valid_hours": 24
}
}
}
Parameters ``cert_path`` and ``key_path`` define the location of respectively
the CA certificate and its private key. The location where the local copies of
issued certificates is held is defiend by ``output_path``. The ``signing_hash``
defines the hash used to sign the results. The validity of issued certificates
(in hours) is set by ``valid_hours``.
Virtual registration authority
------------------------------
The registration authority section puts together previously described elements
and the list of validators applied to each request.
.. code:: json
{
"registration_authority": {
"default": {
"authentication": "method_1",
"signing_ca": "local",
"validators": {
"ca_status": {
"ca_requested": false
},
"source_cidrs": {
"cidrs": [ "127.0.0.0/8" ]
}
}
}
}
}
In the example above, CSRs sent to registration authority ``default`` will be
authenticated using previously defined block ``method_1``, will be validated
against two validators (``ca_status`` and ``source_cidrs``) and if they pass,
the CSR will be signed by the previously defined signing ca called ``local``.
Each validator has its own set of parameters described separately in the
:doc:`validators section </validators>`.
Example configuration
---------------------
.. code:: json
{
"authentication": {
"method_1": {
"backend": "static",
"secret": "simplepassword",
"user": "myusername"
}
},
"signing_ca": {
"local": {
"cert_path": "CA/root-ca.crt",
"key_path": "CA/root-ca-unwrapped.key",
"output_path": "certs",
"signing_hash": "sha256",
"valid_hours": 24
}
},
"registration_authority": {
"default": {
"authentication": "method_1",
"signing_ca": "local",
"validators": {
"ca_status": {
"ca_requested": false
},
"source_cidrs": {
"cidrs": [ "127.0.0.0/8" ]
}
}
}
}
}

View File

@ -10,6 +10,9 @@ Contents:
.. toctree::
:maxdepth: 2
configuration
api
validators
Indices and tables
==================

4
docs/validators.rst Normal file
View File

@ -0,0 +1,4 @@
Validators
==========
TODO

View File

@ -34,29 +34,37 @@ class DefaultConfigMixin(object):
def setUp(self):
self.sample_conf_auth = {
"static": {
"default_auth": {
"backend": "static",
"user": "myusername",
"secret": "simplepassword"
}
}
self.sample_conf_ca = {
"cert_path": "tests/CA/root-ca.crt",
"key_path": "tests/CA/root-ca-unwrapped.key",
"output_path": "certs",
"signing_hash": "sha256",
"valid_hours": 24
"default_ca": {
"cert_path": "tests/CA/root-ca.crt",
"key_path": "tests/CA/root-ca-unwrapped.key",
"output_path": "certs",
"signing_hash": "sha256",
"valid_hours": 24
}
}
self.sample_conf_validators = {
"steps": {
"common_name": {
"allowed_domains": [".test.com"]
}
"common_name": {
"allowed_domains": [".test.com"]
}
}
self.sample_conf_ra = {
"default_ra": {
"authentication": "default_auth",
"signing_ca": "default_ca",
"validators": self.sample_conf_validators
}
}
self.sample_conf = {
"auth": self.sample_conf_auth,
"ca": self.sample_conf_ca,
"validators": self.sample_conf_validators,
"authentication": self.sample_conf_auth,
"signing_ca": self.sample_conf_ca,
"registration_authority": self.sample_conf_ra,
}
super(DefaultConfigMixin, self).setUp()

View File

@ -35,39 +35,33 @@ class AuthStaticTests(tests.DefaultConfigMixin, unittest.TestCase):
def test_validate_static(self):
"""Test all static user/pass authentication paths."""
config = "anchor.jsonloader.conf._config"
self.sample_conf_auth['static'] = {'secret': 'simplepassword',
'user': 'myusername'}
self.sample_conf_auth['default_auth'] = {
"backend": "static",
"user": "myusername",
"secret": "simplepassword"
}
data = self.sample_conf
with mock.patch.dict(config, data):
valid_user = data['auth']['static']['user']
valid_pass = data['auth']['static']['secret']
valid_user = self.sample_conf_auth['default_auth']['user']
valid_pass = self.sample_conf_auth['default_auth']['secret']
expected = results.AuthDetails(username=valid_user, groups=[])
self.assertEqual(auth.validate(valid_user, valid_pass), expected)
self.assertEqual(auth.validate('default_ra', valid_user,
valid_pass), expected)
with self.assertRaises(http_status.HTTPUnauthorized):
auth.validate(valid_user, 'badpass')
auth.validate('default_ra', valid_user, 'badpass')
with self.assertRaises(http_status.HTTPUnauthorized):
auth.validate('baduser', valid_pass)
auth.validate('default_ra', 'baduser', valid_pass)
with self.assertRaises(http_status.HTTPUnauthorized):
auth.validate('baduser', 'badpass')
auth.validate('default_ra', 'baduser', 'badpass')
def test_validate_static_malformed1(self):
"""Test static user/pass authentication with malformed config."""
config = "anchor.jsonloader.conf._config"
self.sample_conf_auth['static'] = {}
self.sample_conf_auth['default_auth'] = {'backend': 'static'}
data = self.sample_conf
with mock.patch.dict(config, data):
with self.assertRaises(http_status.HTTPUnauthorized):
auth.validate('baduser', 'badpass')
def test_validate_static_malformed2(self):
"""Test static user/pass authentication with malformed config."""
config = "anchor.jsonloader.conf._config"
self.sample_conf['auth'] = {}
data = self.sample_conf
with mock.patch.dict(config, data):
with self.assertRaises(http_status.HTTPUnauthorized):
auth.validate('baduser', 'badpass')
auth.validate('default_ra', 'baduser', 'badpass')

View File

@ -41,8 +41,11 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase):
@mock.patch('anchor.app._check_file_exists')
@mock.patch('anchor.app._check_file_permissions')
def test_config_check_domains_good(self, a, b):
steps = self.sample_conf_validators['steps']
steps['common_name']['allowed_domains'] = ['.test.com']
self.sample_conf_ra['default_ra']['validators'] = {
"common_name": {
"allowed_domains": [".test.com"]
}
}
config = json.dumps(self.sample_conf)
jsonloader.conf.load_str_data(config)
@ -53,8 +56,11 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase):
@mock.patch('anchor.app._check_file_exists')
@mock.patch('anchor.app._check_file_permissions')
def test_config_check_domains_bad(self, a, b):
steps = self.sample_conf_validators['steps']
steps['common_name']['allowed_domains'] = ['error.test.com']
self.sample_conf_ra['default_ra']['validators'] = {
"common_name": {
"allowed_domains": ["error.test.com"]
}
}
config = json.dumps(self.sample_conf)
jsonloader.conf.load_str_data(config)
@ -77,24 +83,74 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase):
self.assertRaises(app.ConfigValidationException,
app._check_file_permissions, "/mock/path")
def test_validate_config_no_auth(self):
del self.sample_conf['auth']
def test_validate_old_config(self):
config = json.dumps({
"ca": {},
"auth": {},
"validators": {},
})
jsonloader.conf.load_str_data(config)
self.assertRaisesRegexp(app.ConfigValidationException,
"old version of Anchor",
app.validate_config, jsonloader.conf)
@mock.patch('anchor.app._check_file_permissions')
def test_validate_config_no_registration_authorities(self,
mock_check_perm):
del self.sample_conf['registration_authority']
config = json.dumps(self.sample_conf)
jsonloader.conf.load_str_data(config)
self.assertRaisesRegexp(app.ConfigValidationException,
"No authentication configured",
"No registration authorities present",
app.validate_config, jsonloader.conf)
@mock.patch('anchor.app._check_file_permissions')
def test_validate_config_no_auth(self, mock_check_perm):
del self.sample_conf['authentication']
config = json.dumps(self.sample_conf)
jsonloader.conf.load_str_data(config)
self.assertRaisesRegexp(app.ConfigValidationException,
"No authentication methods present",
app.validate_config, jsonloader.conf)
@mock.patch('anchor.app._check_file_permissions')
def test_validate_config_no_auth_backend(self, mock_check_perm):
del self.sample_conf_auth['default_auth']['backend']
config = json.dumps(self.sample_conf)
jsonloader.conf.load_str_data(config)
self.assertRaisesRegexp(app.ConfigValidationException,
"Authentication method .* doesn't define "
"backend",
app.validate_config, jsonloader.conf)
@mock.patch('anchor.app._check_file_permissions')
def test_validate_config_no_ra_auth(self, mock_check_perm):
del self.sample_conf_ra['default_ra']['authentication']
config = json.dumps(self.sample_conf)
jsonloader.conf.load_str_data(config)
self.assertRaisesRegexp(app.ConfigValidationException,
"No authentication .* for .* default_ra",
app.validate_config, jsonloader.conf)
def test_validate_config_no_ca(self):
del self.sample_conf['ca']
del self.sample_conf['signing_ca']
config = json.dumps(self.sample_conf)
jsonloader.conf.load_str_data(config)
self.assertRaisesRegexp(app.ConfigValidationException,
"No ca configuration present",
"No signing CA configurations present",
app.validate_config, jsonloader.conf)
def test_validate_config_ca_config_reqs(self):
@mock.patch('anchor.app._check_file_permissions')
def test_validate_config_no_ra_ca(self, mock_check_perm):
del self.sample_conf_ra['default_ra']['signing_ca']
config = json.dumps(self.sample_conf)
jsonloader.conf.load_str_data(config)
self.assertRaisesRegexp(app.ConfigValidationException,
"No signing CA .* for .* default_ra",
app.validate_config, jsonloader.conf)
@mock.patch('anchor.app._check_file_permissions')
def test_validate_config_ca_config_reqs(self, mock_check_perm):
ca_config_requirements = ["cert_path", "key_path", "output_path",
"signing_hash", "valid_hours"]
@ -118,11 +174,13 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase):
"could not read file: tests/CA/root-ca.crt",
app.validate_config, jsonloader.conf)
@mock.patch('anchor.app._check_file_permissions')
@mock.patch('os.path.isfile')
@mock.patch('os.access')
@mock.patch('os.stat')
def test_validate_config_no_validators(self, stat, access, isfile):
del self.sample_conf['validators']
def test_validate_config_no_validators(self, stat, access, isfile,
mock_check_perm):
self.sample_conf_ra['default_ra']['validators'] = {}
config = json.dumps(self.sample_conf)
jsonloader.conf.load_str_data(config)
isfile.return_value = True
@ -132,45 +190,35 @@ class TestApp(tests.DefaultConfigMixin, unittest.TestCase):
"No validators configured",
app.validate_config, jsonloader.conf)
@mock.patch('anchor.app._check_file_permissions')
@mock.patch('os.path.isfile')
@mock.patch('os.access')
@mock.patch('os.stat')
def test_validate_config_no_validator_steps(self, stat, access, isfile):
del self.sample_conf_validators['steps']
self.sample_conf_validators['no_steps'] = {}
def test_validate_config_unknown_validator(self, stat, access, isfile,
mock_check_perm):
self.sample_conf_validators['unknown_validator'] = {}
config = json.dumps(self.sample_conf)
jsonloader.conf.load_str_data(config)
isfile.return_value = True
access.return_value = True
stat.return_value.st_mode = self.expected_key_permissions
self.assertRaisesRegexp(app.ConfigValidationException,
"Validator set <no_steps> is empty",
app.validate_config, jsonloader.conf)
with self.assertRaises(app.ConfigValidationException,
msg="Unknown validator <unknown_validator> "
"found (for registration authority "
"default)"):
app.validate_config(jsonloader.conf)
@mock.patch('anchor.app._check_file_permissions')
@mock.patch('os.path.isfile')
@mock.patch('os.access')
@mock.patch('os.stat')
def test_validate_config_unknown_validator(self, stat, access, isfile):
self.sample_conf_validators['steps']['unknown_validator'] = {}
config = json.dumps(self.sample_conf)
jsonloader.conf.load_str_data(config)
isfile.return_value = True
access.return_value = True
stat.return_value.st_mode = self.expected_key_permissions
self.assertRaisesRegexp(app.ConfigValidationException,
"Validator set <steps> contains an "
"unknown validator <unknown_validator>",
app.validate_config, jsonloader.conf)
@mock.patch('os.path.isfile')
@mock.patch('os.access')
@mock.patch('os.stat')
def test_validate_config_good(self, stat, access, isfile):
def test_validate_config_good(self, stat, access, isfile, mock_check_perm):
config = json.dumps(self.sample_conf)
jsonloader.conf.load_str_data(config)
isfile.return_value = True
access.return_value = True
stat.return_value.st_mode = self.expected_key_permissions
app.validate_config(jsonloader.conf)
@mock.patch('anchor.jsonloader.conf.load_file_data')
def test_config_paths_env(self, conf):

View File

@ -104,59 +104,63 @@ class CertificateOpsTests(tests.DefaultConfigMixin, unittest.TestCase):
"""Test basic success path for validate_csr."""
csr_obj = certificate_ops.parse_csr(self.csr, 'pem')
config = "anchor.jsonloader.conf._config"
self.sample_conf_validators['steps'] = {'extensions': {
self.sample_conf_ra['default_ra']['validators'] = {'extensions': {
'allowed_extensions': []}}
data = self.sample_conf
with mock.patch.dict(config, data):
certificate_ops.validate_csr(None, csr_obj, None)
certificate_ops.validate_csr('default_ra', None, csr_obj, None)
def test_validate_csr_bypass(self):
"""Test empty validator set for validate_csr."""
csr_obj = certificate_ops.parse_csr(self.csr, 'pem')
config = "anchor.jsonloader.conf._config"
self.sample_conf['validators'] = {}
self.sample_conf_ra['default_ra']['validators'] = {}
data = self.sample_conf
with mock.patch.dict(config, data):
# this should work, it allows people to bypass validation
certificate_ops.validate_csr(None, csr_obj, None)
certificate_ops.validate_csr('default_ra', None, csr_obj, None)
def test_validate_csr_fail(self):
"""Test failure path for validate_csr."""
csr_obj = certificate_ops.parse_csr(self.csr, 'pem')
config = "anchor.jsonloader.conf._config"
self.sample_conf_validators['steps'] = {'common_name': {
'allowed_domains': ['.testing.com']}}
self.sample_conf_ra['default_ra']['validators'] = {
'common_name': {
'allowed_domains': ['.testing.com']
}
}
data = self.sample_conf
with mock.patch.dict(config, data):
with self.assertRaises(http_status.HTTPException) as cm:
certificate_ops.validate_csr(None, csr_obj, None)
certificate_ops.validate_csr('default_ra', None, csr_obj, None)
self.assertEqual(cm.exception.code, 400)
def test_ca_cert_read_failure(self):
"""Test CA certificate read failure."""
csr_obj = certificate_ops.parse_csr(self.csr, 'pem')
config = "anchor.jsonloader.conf._config"
self.sample_conf_ca['cert_path'] = '/xxx/not/a/valid/path'
self.sample_conf_ca['key_path'] = 'tests/CA/root-ca-unwrapped.key'
ca_conf = self.sample_conf_ca['default_ca']
ca_conf['cert_path'] = '/xxx/not/a/valid/path'
ca_conf['key_path'] = 'tests/CA/root-ca-unwrapped.key'
data = self.sample_conf
with mock.patch.dict(config, data):
with self.assertRaises(http_status.HTTPException) as cm:
certificate_ops.sign(csr_obj)
certificate_ops.sign('default_ra', csr_obj)
self.assertEqual(cm.exception.code, 500)
def test_ca_key_read_failure(self):
"""Test CA key read failure."""
csr_obj = certificate_ops.parse_csr(self.csr, 'pem')
config = "anchor.jsonloader.conf._config"
self.sample_conf_ca['cert_path'] = 'tests/CA/root-ca.crt'
self.sample_conf_ca['key_path'] = '/xxx/not/a/valid/path'
self.sample_conf_ca['default_ca']['cert_path'] = 'tests/CA/root-ca.crt'
self.sample_conf_ca['default_ca']['key_path'] = '/xxx/not/a/valid/path'
data = self.sample_conf
with mock.patch.dict(config, data):
with self.assertRaises(http_status.HTTPException) as cm:
certificate_ops.sign(csr_obj)
certificate_ops.sign('default_ra', csr_obj)
self.assertEqual(cm.exception.code, 500)

98
tests/test_config.py Normal file
View File

@ -0,0 +1,98 @@
# -*- coding:utf-8 -*-
#
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from anchor import jsonloader
import json
import logging
import sys
import unittest
import mock
import tests
logger = logging.getLogger(__name__)
# find the class representing an open file; it depends on the python version
# it's used later for mocking
if sys.version_info[0] < 3:
file_class = file # noqa
else:
import _io
file_class = _io.TextIOWrapper
class TestConfig(tests.DefaultConfigMixin, unittest.TestCase):
def test_wrong_key(self):
"""Wrong config key should raise the right error."""
jsonloader.conf = jsonloader.AnchorConf(logger)
with self.assertRaises(AttributeError):
jsonloader.conf.abcdef
def test_load_file(self):
"""Test loading of a correct configuration."""
jsonloader.conf = jsonloader.AnchorConf(logger)
open_name = 'anchor.jsonloader.open'
with mock.patch(open_name, create=True) as mock_open:
mock_open.return_value = mock.MagicMock(spec=file_class)
m_file = mock_open.return_value.__enter__.return_value
m_file.read.return_value = json.dumps(self.sample_conf)
jsonloader.conf.load_file_data('/tmp/impossible_path')
self.assertEqual(
(jsonloader.conf.registration_authority['default_ra']
['authentication']),
'default_auth')
self.assertEqual(
jsonloader.conf.signing_ca['default_ca']['valid_hours'],
24)
def test_load_file_cant_open(self):
"""Test failures when opening files."""
jsonloader.conf = jsonloader.AnchorConf(logger)
open_name = 'anchor.jsonloader.open'
with mock.patch(open_name, create=True) as mock_open:
mock_open.return_value = mock.MagicMock(spec=file_class)
mock_open.side_effect = IOError("can't open file")
with self.assertRaises(IOError):
jsonloader.conf.load_file_data('/tmp/impossible_path')
def test_load_file_cant_parse(self):
"""Test failues when parsing json format."""
jsonloader.conf = jsonloader.AnchorConf(logger)
open_name = 'anchor.jsonloader.open'
with mock.patch(open_name, create=True) as mock_open:
mock_open.return_value = mock.MagicMock(spec=file_class)
m_file = mock_open.return_value.__enter__.return_value
m_file.read.return_value = "{{{{ bad json"
with self.assertRaises(ValueError):
jsonloader.conf.load_file_data('/tmp/impossible_path')
def test_registration_authority_names(self):
"""Instances should be listed once config is loaded."""
jsonloader.conf = jsonloader.AnchorConf(logger)
jsonloader.conf.load_str_data(json.dumps(self.sample_conf))
self.assertEqual(list(jsonloader.registration_authority_names()),
['default_ra'])

View File

@ -76,12 +76,13 @@ class TestFunctional(tests.DefaultConfigMixin, unittest.TestCase):
# Load config from json test config
jsonloader.conf.load_str_data(json.dumps(self.sample_conf))
self.conf = getattr(jsonloader.conf, "_config")
self.conf["ca"]["output_path"] = tempfile.mkdtemp()
self.conf = jsonloader.conf._config
ca_conf = self.conf["signing_ca"]["default_ca"]
ca_conf["output_path"] = tempfile.mkdtemp()
# Set CA file permissions
os.chmod(self.conf["ca"]["cert_path"], stat.S_IRUSR | stat.S_IFREG)
os.chmod(self.conf["ca"]["key_path"], stat.S_IRUSR | stat.S_IFREG)
os.chmod(ca_conf["cert_path"], stat.S_IRUSR | stat.S_IFREG)
os.chmod(ca_conf["key_path"], stat.S_IRUSR | stat.S_IFREG)
app_conf = {"app": copy.deepcopy(config.app),
"logging": copy.deepcopy(config.logging)}
@ -92,7 +93,7 @@ class TestFunctional(tests.DefaultConfigMixin, unittest.TestCase):
self.app.reset()
def test_check_unauthorised(self):
resp = self.app.post('/sign', expect_errors=True)
resp = self.app.post('/v1/sign/default_ra', expect_errors=True)
self.assertEqual(401, resp.status_int)
def test_robots(self):
@ -105,16 +106,25 @@ class TestFunctional(tests.DefaultConfigMixin, unittest.TestCase):
'secret': 'simplepassword',
'encoding': 'pem'}
resp = self.app.post('/sign', data, expect_errors=True)
resp = self.app.post('/v1/sign/default_ra', data, expect_errors=True)
self.assertEqual(400, resp.status_int)
def test_check_unknown_instance(self):
data = {'user': 'myusername',
'secret': 'simplepassword',
'encoding': 'pem',
'csr': TestFunctional.csr_good}
resp = self.app.post('/v1/sign/unknown', data, expect_errors=True)
self.assertEqual(404, resp.status_int)
def test_check_bad_csr(self):
data = {'user': 'myusername',
'secret': 'simplepassword',
'encoding': 'pem',
'csr': TestFunctional.csr_bad}
resp = self.app.post('/sign', data, expect_errors=True)
resp = self.app.post('/v1/sign/default_ra', data, expect_errors=True)
self.assertEqual(400, resp.status_int)
def test_check_good_csr(self):
@ -123,7 +133,7 @@ class TestFunctional(tests.DefaultConfigMixin, unittest.TestCase):
'encoding': 'pem',
'csr': TestFunctional.csr_good}
resp = self.app.post('/sign', data, expect_errors=False)
resp = self.app.post('/v1/sign/default_ra', data, expect_errors=False)
self.assertEqual(200, resp.status_int)
cert = X509_cert.X509Certificate.from_buffer(resp.text)
@ -147,10 +157,11 @@ class TestFunctional(tests.DefaultConfigMixin, unittest.TestCase):
raise Exception("BOOM")
validators.broken_validator = derp
jsonloader.conf.validators["steps"]["broken_validator"] = {}
ra = jsonloader.conf.registration_authority['default_ra']
ra['validators']["broken_validator"] = {}
resp = self.app.post('/sign', data, expect_errors=True)
resp = self.app.post('/v1/sign/default_ra', data, expect_errors=True)
self.assertEqual(500, resp.status_int)
self.assertTrue(("Internal Validation Error running "
"validator 'broken_validator' "
"in set 'steps'") in str(resp))
self.assertTrue(("Internal Validation Error running validator "
"'broken_validator' for registration authority "
"'default_ra'") in str(resp))