Browse Source

Retiring Project

http://lists.openstack.org/pipermail/openstack-sigs/2018-August/000481.html

Depends-On: 90ca23f2ef5bf2cfdaf63552a7d8d8be325a03e6

Change-Id: I9ebc8cfcbb8906e9c4e1fd9e91205fe364bdc3c9
changes/22/597022/2
lhinds 4 years ago
parent
commit
93aacb43e6
  1. 7
      .coveragerc
  2. 29
      .gitignore
  3. 7
      .testr.conf
  4. 0
      CA/.empty
  5. 17
      CONTRIBUTING.rst
  6. 6
      Dockerfile
  7. 364
      README.rst
  8. 0
      anchor/X509/__init__.py
  9. 284
      anchor/X509/certificate.py
  10. 31
      anchor/X509/errors.py
  11. 523
      anchor/X509/extension.py
  12. 172
      anchor/X509/name.py
  13. 175
      anchor/X509/signature.py
  14. 244
      anchor/X509/signing_request.py
  15. 180
      anchor/X509/utils.py
  16. 0
      anchor/__init__.py
  17. 237
      anchor/app.py
  18. 0
      anchor/asn1/__init__.py
  19. 1505
      anchor/asn1/rfc3280.py
  20. 344
      anchor/asn1/rfc3281.py
  21. 663
      anchor/asn1/rfc3852.py
  22. 349
      anchor/asn1/rfc4211.py
  23. 1502
      anchor/asn1/rfc5280.py
  24. 666
      anchor/asn1/rfc5652.py
  25. 576
      anchor/asn1/rfc6402.py
  26. 132
      anchor/audit/__init__.py
  27. 44
      anchor/auth/__init__.py
  28. 55
      anchor/auth/keystone.py
  29. 75
      anchor/auth/ldap.py
  30. 32
      anchor/auth/results.py
  31. 69
      anchor/auth/static.py
  32. 201
      anchor/certificate_ops.py
  33. 80
      anchor/cmc.py
  34. 48
      anchor/config.py
  35. 106
      anchor/controllers/__init__.py
  36. 2
      anchor/errors.py
  37. 44
      anchor/fixups.py
  38. 135
      anchor/jsonloader.py
  39. 91
      anchor/signers/__init__.py
  40. 74
      anchor/signers/cryptography_io.py
  41. 120
      anchor/signers/pkcs11.py
  42. 86
      anchor/util.py
  43. 84
      anchor/validation.py
  44. 0
      anchor/validators/__init__.py
  45. 313
      anchor/validators/custom.py
  46. 16
      anchor/validators/errors.py
  47. 42
      anchor/validators/internal.py
  48. 111
      anchor/validators/standards.py
  49. 137
      anchor/validators/utils.py
  50. 10
      asn/autogenerated.txt
  51. 628
      asn/rfc3280.e.asn
  52. 347
      asn/rfc3280.i.asn
  53. 51
      asn/rfc3852.e.asn
  54. 333
      asn/rfc3852.i.asn
  55. 284
      asn/rfc4211.asn
  56. 425
      asn/rfc6402.asn
  57. 43
      bin/anchor
  58. 6
      bin/anchor_debug
  59. 67
      bin/container_bootstrap.py
  60. 0
      certs/.empty
  61. 36
      config.json
  62. 48
      doc/source/api.rst
  63. 27
      doc/source/audit.rst
  64. 258
      doc/source/conf.py
  65. 240
      doc/source/configuration.rst
  66. 129
      doc/source/ephemeralPKI.rst
  67. 42
      doc/source/extensions.rst
  68. 19
      doc/source/fixups.rst
  69. 29
      doc/source/index.rst
  70. 117
      doc/source/signing_backends.rst
  71. 184
      doc/source/validators.rst
  72. 17
      requirements.txt
  73. 161
      run_tests.sh
  74. 67
      setup.cfg
  75. 29
      setup.py
  76. 18
      test-requirements.txt
  77. 15
      tests/CA/root-ca-unwrapped.key
  78. 58
      tests/CA/root-ca.crt
  79. 0
      tests/X509/__init__.py
  80. 270
      tests/X509/test_extension.py
  81. 34
      tests/X509/test_utils.py
  82. 296
      tests/X509/test_x509_certificate.py
  83. 199
      tests/X509/test_x509_csr.py
  84. 132
      tests/X509/test_x509_name.py
  85. 102
      tests/__init__.py
  86. 0
      tests/auth/__init__.py
  87. 143
      tests/auth/test_keystone.py
  88. 117
      tests/auth/test_ldap.py
  89. 70
      tests/auth/test_static.py
  90. 0
      tests/controllers/__init__.py
  91. 256
      tests/controllers/test_app.py
  92. 0
      tests/fixups/__init__.py
  93. 109
      tests/fixups/test_fixup_ensure_alternative_names_present.py
  94. 84
      tests/fixups/test_fixup_functionality.py
  95. 148
      tests/test_certificate_ops.py
  96. 98
      tests/test_config.py
  97. 149
      tests/test_functional.py
  98. 265
      tests/test_signing_backend.py
  99. 0
      tests/validators/__init__.py
  100. 82
      tests/validators/test_base_validation_functions.py
  101. Some files were not shown because too many files have changed in this diff Show More

7
.coveragerc

@ -1,7 +0,0 @@
[run]
branch = True
[report]
exclude_lines =
pragma: no cover
raise NotImplementedError

29
.gitignore vendored

@ -1,29 +0,0 @@
*.pyc
temp-*.crt
config.cfg
.venv
*.sw[op]
certs/*.crt
dist/*
build/*
.tox/*
.DS_Store
*.egg
*.egg-info
.testrepository
build/*
cover/*
.cover
.coverage
doc/build
.eggs/
pep8.txt
AUTHORS
ChangeLog
# mentioned in README for test/bootstrap
anchor-test.example.com.key
anchor-test.example.com.csr
CA/serial
CA/*.key
CA/*.crt

7
.testr.conf

@ -1,7 +0,0 @@
[DEFAULT]
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \
OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \
${PYTHON:-python} -m subunit.run discover -t ./ ./tests $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list

17
CONTRIBUTING.rst

@ -1,17 +0,0 @@
This project is part of the OpenStack / Stackforge family. If you would like to
contribute to the development, you must follow the steps in this page:
http://docs.openstack.org/infra/manual/developers.html
Once those steps have been completed, changes to OpenStack should be submitted
for review via the Gerrit tool, following the workflow documented at:
http://docs.openstack.org/infra/manual/developers.html#development-workflow
(in short - install git-review package, then submit changes via `git review`)
Pull requests submitted through GitHub will be ignored.
Bugs should be filed on Launchpad, not GitHub:
https://bugs.launchpad.net/anchor

6
Dockerfile

@ -1,6 +0,0 @@
FROM python:2.7
RUN pip install pecan
ADD . /code
WORKDIR /code
RUN pip install -e .
ENTRYPOINT ["python","bin/container_bootstrap.py"]

364
README.rst

@ -1,358 +1,10 @@
Anchor
======
This project is no longer maintained.
.. image:: https://img.shields.io/pypi/v/anchor.svg
:target: https://pypi.python.org/pypi/anchor/
:alt: Latest Version
The contents of this repository are still available in the Git
source code management system. To see the contents of this
repository before it reached its end of life, please check out the
previous commit with "git checkout HEAD^1".
.. image:: https://img.shields.io/pypi/pyversions/anchor.svg
:target: https://pypi.python.org/pypi/anchor/
:alt: Python Versions
.. image:: https://img.shields.io/pypi/format/anchor.svg
:target: https://pypi.python.org/pypi/anchor/
:alt: Format
.. image:: https://img.shields.io/badge/license-Apache%202-blue.svg
:target: https://git.openstack.org/cgit/openstack/anchor/plain/LICENSE
:alt: License
Anchor is an ephemeral PKI service that, based on certain conditions,
automates the verification of CSRs and signs certificates for clients.
The validity period can be set in the config file with hour resolution.
Ideas behind Anchor
===================
A critical capability within PKI is to revoke a certificate - to ensure
that it is no longer trusted by any peer. Unfortunately research has
demonstrated that the two typical methods of revocation (Certificate
Revocation Lists and Online Certificate Status Protocol) both have
failings that make them unreliable, especially when attempting to
leverage PKI outside of web-browser software.
Through the use of short-lifetime certificates Anchor introduces the
concept of "passive revocation". By issuing certificates with lifetimes
measured in hours, revocation can be achieved by simply not re-issuing
certificates to clients.
The benefits of using Anchor instead of manual long-term certificates
are:
* quick certificate revoking / rotation
* always tested certificate update mechanism (used daily)
* easy integration with certmonger for service restarting
* certificates are signed only when validation is passed
* signing certificates follows consistent process
Installation
============
In order to install Anchor from source, the following system
dependencies need to be present:
* python 2.7
* python (dev files)
* libffi (dev)
* libssl (dev)
When everything is in place, Anchor can be installed in one of three
ways: a local development instance in a python virtual environment, a local
production instance or a test instance in a docker container.
For a development instance with virtualenv, run:
virtualenv .venv && source .venv/bin/activate && pip install .
For installing in production, either install a perpared system package,
or install globally in the system:
python setup.py install
Running the service
===================
In order to run the service, it needs to be started via the `pecan`
application server. The only extra parameter is a config file:
pecan serve anchor/config.py
For development, an additional `--reload` parameter may be used. It will
cause the service to reload every time a source file is changed, however
it requires installing an additional `watchdog` python module.
In the default configuration, Anchor will wait for web requests on port
5016 on local network interface. This can be adjusted in the `config.py`
file.
Preparing a test environment
============================
In order to test Anchor with the default configuration, the following
can be done to create a test CA. The test certificate can be then used
to sign the new certificates.
openssl req -out CA/root-ca.crt -keyout CA/root-ca-unwrapped.key \
-newkey rsa:4096 -subj "/CN=Anchor Test CA" -nodes -x509 -days 365 \
-sha256
chmod 0400 CA/root-ca-unwrapped.key
Next, a new certificate request may be generated:
openssl req -out anchor-test.example.com.csr -nodes \
-keyout anchor-test.example.com.key -newkey rsa:2048 \
-subj "/CN=anchor-test.example.com" -sha256
That reqest can be submitted using curl (while `pecan serve config.py`
is running):
curl http://0.0.0.0:5016/v1/sign/default -F user='myusername' \
-F secret='simplepassword' -F encoding=pem \
-F 'csr=<anchor-test.example.com.csr'
This will result in the signed request being created in the `certs`
directory.
Docker test environment
=======================
We have published a docker image for anchor at
https://hub.docker.com/r/openstacksecurity/anchor/ These instructions expect
the reader to have a working Docker install already. Docker should *not* be
used to serve Anchor in any production environments.
The behaviour of the Anchor container is controlled through docker volumes. To
run a plain version of Anchor, with a default configuration and a dynamically
generated private key simply invoke the container without any volumes. Note
that Anchor exposes port 5016:
docker run -p 5016:5016 openstacksecurity/anchor
The recommended way to use the anchor container is to use a pre-compiled private
key and certificate. You can read more about generating these (if you do not
already have them) in this readme.
Once a key and certificate have been created, they can be provided to Anchor
using docker volumes. In this example we've stored the sensitive data in
/var/keys (note, docker must be able to access the folder where you have stored
your keys). When the container starts it looks for a mounted volume in '/key'
and files called root-ca-unwrapped.key and root-ca.crt that it will use.
docker run -p 5016:5016 -v /var/keys:/key anchor
Anchor is highly configurable, you can read more about Anchor configuration in
the documentation here:
http://docs.openstack.org/developer/anchor/configuration.html the method for
exposing configuration to Anchor is very similar as for keys, simply provide
docker with the folder the config.json is within and create a volume called
/config In the below example, Anchor will start with a custom configuration but
as no key was provided it will generate one on the fly.
docker run -p 5016:5016 -v /var/config:/config anchor
Obviously it's possible to run Anchor with a custom configuration and a custom
key/certificate by running the following (note in this case we've used -d to
detach the container from our terminal)
docker run -d -p 5016:5016 -v /var/config:/config -v /var/keys:/key anchor
If you prefer to use locally built containers or want to modify the container
build you can do that, we provide a simple Dockerfile to make the process
easier.
Assuming you are already in the anchor directory, build a container
called 'anchor' that runs the anchor service, with any local changes
that have been made in the repo:
docker build -t anchor .
To start the service in the container and serve Anchor on port 5016:
docker run -p 5016:5016 anchor
When Anchor is running in a container, certificate requests will not pass
validation unless the docker network is added as a source_cidr in the Anchor
configuration and then passed into the container. Find the network by starting
the container, inspecting the docker network and finding the anchor container:
docker run -p 5016:5016 --name=anchor anchor
docker network inspect bridge
Under the 'containers' section, find the 'anchor' container and find the
IPv4Address. For example:
"Containers": {
"6998a....5f4a57": {
"Name": "anchor",
"MacAddress": "02:42:ac:11:00:03",
"IPv4Address": "172.17.0.3/16",
Add this network as a source_cidr to the config.json, and pass it to the
docker container as described above.
Running Anchor in production
============================
Anchor shouldn't be exposed directly to the network. It's running via an
application server (Pecan) and doesn't have all the features you'd
normally expect from a http proxy - for example dealing well with
deliberately slow connections, or using multiple workers. Anchor can
however be run in production using a better frontend.
To run Anchor using uwsgi you can use the following command:
uwsgi --http-socket :5016 --venv path/to/venv --pecan config.py -p 4
In case a more complex scripted configuration is needed, for example to
handle custom headers, rate limiting, or source filtering a complete
HTTP proxy like Nginx may be needed. This is however out of scope for
Anchor project. You can read more about production deployment in
`Pecan documentation <http://pecan.readthedocs.org/en/latest/deployment.html>`_.
Additionally, using an AppArmor profile for Anchor is a good idea to
prevent exploits relying on one of the native libraries used by Anchor
(for example OpenSSL). This can be done with sample profiles which you
can find in the `tools/apparmor.anchor_*` files. The used file needs to
be reviewed and updated with the right paths depending on the deployment
location.
Validators
==========
One of the main features of Anchor are the validators which make sure
that all requests match a given set of rules. They're configured in
`config.json` and the sample configuration includes a few of them.
Each validator takes a dictionary of options which provide the specific
matching conditions.
Currently available validators are:
* `common_name` ensures CN matches one of names in `allowed_domains` or
ranges in `allowed_networks`
* `alternative_names` ensures alternative names match one of the names
in `allowed_domains`
* `alternative_names_ip` ensures alternative names match one of the
names in `allowed_domains` or IP ranges in `allowed_networks`
* `blacklist_names` ensures CN and alternative names do not contain any
of the configured `domains`
* `server_group` ensures the group the requester is contained within
`group_prefixes`
* `extensions` ensures only `allowed_extensions` are present in the
request
* `key_usage` ensures only `allowed_usage` is requested for the
certificate
* `ca_status` ensures the request does/doesn't require the CA flag
* `source_cidrs` ensures the request comes from one of the ranges in
`cidrs`
A configuration entry for a validator might look like one from the
sample config:
"key_usage": {
"allowed_usage": [
"Digital Signature",
"Key Encipherment",
"Non Repudiation"
]
}
Authentication
==============
Anchor can use one of the following authentication modules: static,
keystone, ldap.
Static: Username and password are present in `config.json`. This mode
should be used only for development and testing.
"auth": {
"static": {
"secret": "simplepassword",
"user": "myusername"
}
}
Keystone: Username is ignored, but password is a token valid in the
configured keystone location.
"auth": {
"keystone": {
"url": "https://keystone.example.com"
}
}
LDAP: Username and password are used to bind to an LDAP user in a
configured domain. User's groups for the `server_group` filter are
retrieved from attribute `memberOf` in search for
`(sAMAccountName=username@domain)`. The search is done in the configured
base.
"auth": {
"ldap": {
"host": "ldap.example.com",
"base": "ou=Users,dc=example,dc=com",
"domain": "example.com"
"port": 636,
"ssl": true
}
}
Signing backends
================
Anchor allows the use of configurable signing backend. Currently it provides two
implementation: one based on cryptography.io ("anchor"), the other using PKCS#11
libraries ("pkcs11"). The first one is used in the sample config. Other backends
may have extra dependencies: pkcs11 requires the PyKCS11 module, not required by
anchor by default.
The resulting certificate is stored locally if the `output_path` is set
to any string. This does not depend on the configured backend.
Backends can specify their own options - please refer to the backend
documentation for the specific list. The default backend takes the
following options:
* `cert_path`: path where local CA certificate can be found
* `key_path`: path to the key for that certificate
* `signing_hash`: which hash method to use when producing signatures
* `valid_hours`: number of hours the signed certificates are valid for
Sample configuration for the default backend:
"ca": {
"backend": "anchor"
"cert_path": "CA/root-ca.crt",
"key_path": "CA/root-ca-unwrapped.key",
"output_path": "certs",
"signing_hash": "sha256",
"valid_hours": 24
}
Other backends may be created too. For more information, please refer to the
documentation.
Fixups
======
Anchor can modify the submitted CSRs in order to enforce some rules,
remove deprecated elements, or just add information. Submitted CSR may
be modified or entirely redone. Fixup are loaded from "anchor.fixups"
namespace and can take parameters just like validators.
Reporting bugs and contributing
===============================
For bug reporting and contributing, please check the CONTRIBUTING.rst
file.
For any further questions, please email
openstack-dev@lists.openstack.org or join #openstack-dev on
Freenode.

0
anchor/X509/__init__.py

284
anchor/X509/certificate.py

@ -1,284 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import base64
import binascii
import io
from cryptography.hazmat import backends as cio_backends
from cryptography.hazmat.primitives import hashes
from pyasn1.codec.ber import encoder as ber_encoder
from pyasn1.codec.der import decoder
from pyasn1.codec.der import encoder
from pyasn1.type import univ as asn1_univ
from pyasn1_modules import pem
from anchor.asn1 import rfc5280
from anchor.X509 import errors
from anchor.X509 import extension
from anchor.X509 import name
from anchor.X509 import signature
from anchor.X509 import utils
SIGNING_ALGORITHMS = {
('RSA', 'SHA224'): asn1_univ.ObjectIdentifier('1.2.840.113549.1.1.14'),
('RSA', 'SHA256'): asn1_univ.ObjectIdentifier('1.2.840.113549.1.1.11'),
('RSA', 'SHA384'): asn1_univ.ObjectIdentifier('1.2.840.113549.1.1.12'),
('RSA', 'SHA512'): asn1_univ.ObjectIdentifier('1.2.840.113549.1.1.13'),
('DSA', 'SHA224'): asn1_univ.ObjectIdentifier('2.16.840.1.101.3.4.3.1'),
('DSA', 'SHA256'): asn1_univ.ObjectIdentifier('2.16.840.1.101.3.4.3.2'),
}
SIGNING_ALGORITHMS_INV = dict((v, k) for k, v in SIGNING_ALGORITHMS.items())
class X509CertificateError(errors.X509Error):
"""Specific error for X509 certificate operations."""
pass
class X509Certificate(signature.SignatureMixin):
"""X509 certificate class."""
def __init__(self, certificate=None):
if certificate is None:
self._cert = rfc5280.Certificate()
self._cert['tbsCertificate'] = rfc5280.TBSCertificate()
else:
self._cert = certificate
@staticmethod
def from_open_file(f):
try:
der_content = pem.readPemFromFile(f)
certificate = decoder.decode(der_content,
asn1Spec=rfc5280.Certificate())[0]
return X509Certificate(certificate)
except Exception:
raise X509CertificateError("Could not read X509 certificate from "
"PEM data.")
@staticmethod
def from_buffer(data):
"""Build this X509 object from a data buffer in memory.
:param data: A data buffer
"""
return X509Certificate.from_open_file(io.StringIO(data))
@staticmethod
def from_file(path):
"""Build this X509 certificate object from a data file on disk.
:param path: A data buffer
"""
with open(path, 'r') as f:
return X509Certificate.from_open_file(f)
def as_pem(self):
"""Serialise this X509 certificate object as PEM string."""
header = '-----BEGIN CERTIFICATE-----'
footer = '-----END CERTIFICATE-----'
der_cert = encoder.encode(self._cert)
b64_encoder = (base64.encodestring if str is bytes else
base64.encodebytes)
b64_cert = b64_encoder(der_cert).decode('ascii')
return "%s\n%s%s\n" % (header, b64_cert, footer)
def set_version(self, v):
"""Set the version of this X509 certificate object.
:param v: The version
"""
self._cert['tbsCertificate']['version'] = v
def get_version(self):
"""Get the version of this X509 certificate object."""
return self._cert['tbsCertificate']['version']
def get_validity(self):
if self._cert['tbsCertificate']['validity'] is None:
self._cert['tbsCertificate']['validity'] = None
return self._cert['tbsCertificate']['validity']
def set_not_before(self, t):
"""Set the 'not before' date field.
:param t: time in seconds since the epoch
"""
asn1_time = utils.timestamp_to_asn1_time(t)
validity = self.get_validity()
validity['notBefore'] = asn1_time
def get_not_before(self):
"""Get the 'not before' date field as seconds since the epoch."""
validity = self.get_validity()
not_before = validity['notBefore']
return utils.asn1_time_to_timestamp(not_before)
def set_not_after(self, t):
"""Set the 'not after' date field.
:param t: time in seconds since the epoch
"""
asn1_time = utils.timestamp_to_asn1_time(t)
validity = self.get_validity()
validity['notAfter'] = asn1_time
def get_not_after(self):
"""Get the 'not after' date field as seconds since the epoch."""
validity = self.get_validity()
not_after = validity['notAfter']
return utils.asn1_time_to_timestamp(not_after)
def set_pubkey(self, pkey):
"""Set the public key field.
:param pkey: The public key, rfc5280.SubjectPublicKeyInfo description
"""
self._cert['tbsCertificate']['subjectPublicKeyInfo'] = pkey
def get_subject(self):
"""Get the subject name field value.
:return: An X509Name object instance
"""
val = self._cert['tbsCertificate']['subject'][0]
return name.X509Name(val)
def set_subject(self, subject):
"""Set the subject name filed value.
:param subject: An X509Name object instance
"""
val = subject._name_obj
if self._cert['tbsCertificate']['subject'] is None:
self._cert['tbsCertificate']['subject'] = rfc5280.Name()
self._cert['tbsCertificate']['subject'][0] = val
def set_issuer(self, issuer):
"""Set the issuer name field value.
:param issuer: An X509Name object instance
"""
val = issuer._name_obj
if self._cert['tbsCertificate']['issuer'] is None:
self._cert['tbsCertificate']['issuer'] = rfc5280.Name()
self._cert['tbsCertificate']['issuer'][0] = val
def get_issuer(self):
"""Get the issuer name field value.
:return: An X509Name object instance
"""
val = self._cert['tbsCertificate']['issuer'][0]
return name.X509Name(val)
def set_serial_number(self, serial):
"""Set the serial number
The serial number is a 32 bit integer value that should be unique to
each certificate issued by a given certificate authority.
:param serial: The serial number, 32 bit integer
"""
self._cert['tbsCertificate']['serialNumber'] = serial
def get_serial_number(self,):
return self._cert['tbsCertificate']['serialNumber']
def _get_extensions(self):
if self._cert['tbsCertificate']['extensions'] is None:
# this actually initialises the extensions tag rather than
# assign None
self._cert['tbsCertificate']['extensions'] = None
return self._cert['tbsCertificate']['extensions']
def get_extensions(self, ext_type=None):
extensions = self._get_extensions()
return [extension.construct_extension(e) for e in extensions
if ext_type is None or e['extnID'] == ext_type._oid]
def add_extension(self, ext, index):
"""Add an X509 V3 Certificate extension.
:param ext: An X509Extension instance
:param index: The index of the extension
"""
if not isinstance(ext, extension.X509Extension):
raise errors.X509Error("ext needs to be a pyasn1 extension")
extensions = self._get_extensions()
extensions[index] = ext.as_asn1()
def _get_bytes_to_sign(self):
return encoder.encode(self._cert['tbsCertificate'])
def _embed_signature_algorithm(self, algo_id):
self._cert['tbsCertificate']['signature'] = algo_id
def _embed_signature(self, algo_id, signature):
self._cert['signature'] = "'%s'H" % (
str(binascii.hexlify(signature).decode('ascii')),)
self._cert['signatureAlgorithm'] = algo_id
def _get_signature(self):
return utils.bin_to_bytes(self._cert['signature'])
def _get_signing_algorithm(self):
tbs_signature = self._cert['tbsCertificate']['signature']
cert_signature = self._cert['signatureAlgorithm']
if tbs_signature != cert_signature:
raise errors.X509Error("algorithms mismatch")
return tbs_signature['algorithm']
def as_der(self):
"""Return this X509 certificate as DER encoded data."""
return encoder.encode(self._cert)
def get_fingerprint(self, md='sha256'):
"""Get the fingerprint of this X509 certificate.
:param md: The message digest algorithm used to compute the fingerprint
:return: The fingerprint encoded as a hex string
"""
hash_class = utils.get_hash_class(md)
if hash_class is None:
raise errors.X509Error(
"Unknown hash %s" % (md,))
hasher = hashes.Hash(hash_class(),
backend=cio_backends.default_backend())
hasher.update(self.as_der())
return binascii.hexlify(hasher.finalize()).upper().decode('ascii')
def get_key_id(self):
"""Construct a key identifier from public key.
Return the hash useful for keyIdentifier field, constructed as
described in RFC5280 section 4.2.1.2, method 1. The result is
SHA1(subjectPublicKey).
"""
key_info = self._cert['tbsCertificate']['subjectPublicKeyInfo']
public_key = key_info['subjectPublicKey']
# get the actual bit string value, without the length and tags
value = ber_encoder.BitStringEncoder().encodeValue(
None, public_key, True, None)[0][1:]
digest = hashes.Hash(hashes.SHA1(),
backend=cio_backends.default_backend())
digest.update(value)
return digest.finalize()

31
anchor/X509/errors.py

@ -1,31 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# not needed right now, just to be consistent and future-proof
from __future__ import absolute_import
class X509Error(Exception):
"""Base exception for X509 errors."""
def __init__(self, what):
super(X509Error, self).__init__(what)
class ASN1TimeError(Exception):
"""Base exception for ASN1-time related errors."""
pass
class ASN1StringError(X509Error):
"""Base exception for ASN1-string related errors."""
pass

523
anchor/X509/extension.py

@ -1,523 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import functools
import netaddr
from pyasn1.codec.der import decoder
from pyasn1.codec.der import encoder
from pyasn1.type import constraint as asn1_constraint
from pyasn1.type import namedtype as asn1_namedtype
from pyasn1.type import tag as asn1_tag
from pyasn1.type import univ as asn1_univ
from anchor.asn1 import rfc5280
from anchor.X509 import errors
from anchor.X509 import utils
# missing extended use ids from rfc5280
id_kp_OCSPSigning = asn1_univ.ObjectIdentifier(rfc5280.id_kp.asTuple() + (9,))
anyExtendedKeyUsage = asn1_univ.ObjectIdentifier(
rfc5280.id_ce_extKeyUsage.asTuple() + (0,))
# names matching openssl
EXT_KEY_USAGE_NAMES = {
rfc5280.id_kp_serverAuth: "TLS Web Server Authentication",
rfc5280.id_kp_clientAuth: "TLS Web Client Authentication",
rfc5280.id_kp_codeSigning: "Code Signing",
rfc5280.id_kp_emailProtection: "E-mail Protection",
rfc5280.id_kp_timeStamping: "Time Stamping",
id_kp_OCSPSigning: "OCSP Signing",
anyExtendedKeyUsage: "Any Extended Key Usage",
}
EXT_KEY_USAGE_NAMES_INV = dict((v, k) for k, v in EXT_KEY_USAGE_NAMES.items())
EXT_KEY_USAGE_SHORT_NAMES = {
rfc5280.id_kp_serverAuth: "serverAuth",
rfc5280.id_kp_clientAuth: "clientAuth",
rfc5280.id_kp_codeSigning: "codeSigning",
rfc5280.id_kp_emailProtection: "emailProtection",
rfc5280.id_kp_timeStamping: "timeStamping",
id_kp_OCSPSigning: "ocspSigning",
anyExtendedKeyUsage: "anyExtendedKeyUsage",
}
EXT_KEY_USAGE_SHORT_NAMES_INV = dict((v, k) for k, v in
EXT_KEY_USAGE_SHORT_NAMES.items())
EXTENSION_NAMES = {
rfc5280.id_ce_policyConstraints: 'policyConstraints',
rfc5280.id_ce_basicConstraints: 'basicConstraints',
rfc5280.id_ce_subjectDirectoryAttributes: 'subjectDirectoryAttributes',
rfc5280.id_ce_deltaCRLIndicator: 'deltaCRLIndicator',
rfc5280.id_ce_cRLDistributionPoints: 'cRLDistributionPoints',
rfc5280.id_ce_issuingDistributionPoint: 'issuingDistributionPoint',
rfc5280.id_ce_nameConstraints: 'nameConstraints',
rfc5280.id_ce_certificatePolicies: 'certificatePolicies',
rfc5280.id_ce_policyMappings: 'policyMappings',
rfc5280.id_ce_privateKeyUsagePeriod: 'privateKeyUsagePeriod',
rfc5280.id_ce_keyUsage: 'keyUsage',
rfc5280.id_ce_authorityKeyIdentifier: 'authorityKeyIdentifier',
rfc5280.id_ce_subjectKeyIdentifier: 'subjectKeyIdentifier',
rfc5280.id_ce_certificateIssuer: 'certificateIssuer',
rfc5280.id_ce_subjectAltName: 'subjectAltName',
rfc5280.id_ce_issuerAltName: 'issuerAltName',
}
LONG_KEY_USAGE_NAMES = {
"Digital Signature": "digitalSignature",
"Non Repudiation": "nonRepudiation",
"Key Encipherment": "keyEncipherment",
"Data Encipherment": "dataEncipherment",
"Key Agreement": "keyAgreement",
"Certificate Sign": "keyCertSign",
"CRL Sign": "cRLSign",
"Encipher Only": "encipherOnly",
"Decipher Only": "decipherOnly",
}
def uses_ext_value(f):
"""Wrapper allowing reading of extension value.
Because the value is normally saved in a (double) serialised way, it's
not easily accessible to the member methods. This is made easier by
unpacking the extension value into an extra argument.
"""
@functools.wraps(f)
def ext_value_filled(self, *args, **kwargs):
kwargs['ext_value'] = self._get_value()
return f(self, *args, **kwargs)
return ext_value_filled
def modifies_ext_value(f):
"""Wrapper allowing modification of extension value.
Because the value is normally saved in a (double) serialised way, it's
not easily accessible to the member methods. This is made easier by
unpacking the extension value into an extra argument.
New value needs to be returned from the method.
"""
@functools.wraps(f)
def ext_value_filled(self, *args, **kwargs):
value = self._get_value()
kwargs['ext_value'] = value
# since some elements like NamedValue are pure value types, there is
# no interface to modify them and new versions have to be returned
value = f(self, *args, **kwargs)
self._set_value(value)
return ext_value_filled
class BasicConstraints(asn1_univ.Sequence):
"""Custom BasicConstraint implementation until pyasn1_modules is fixes."""
componentType = asn1_namedtype.NamedTypes(
asn1_namedtype.DefaultedNamedType('cA', asn1_univ.Boolean(False)),
asn1_namedtype.OptionalNamedType(
'pathLenConstraint',
asn1_univ.Integer().subtype(
subtypeSpec=asn1_constraint.ValueRangeConstraint(0, 64)))
)
class NameConstraints(asn1_univ.Sequence):
"""Custom NameConstraints implementation until pyasn1_modules is fixed."""
componentType = asn1_namedtype.NamedTypes(
asn1_namedtype.OptionalNamedType(
'permittedSubtrees',
rfc5280.GeneralSubtrees().subtype(
implicitTag=asn1_tag.Tag(asn1_tag.tagClassContext,
asn1_tag.tagFormatConstructed, 0))),
asn1_namedtype.OptionalNamedType(
'excludedSubtrees',
rfc5280.GeneralSubtrees().subtype(
implicitTag=asn1_tag.Tag(asn1_tag.tagClassContext,
asn1_tag.tagFormatConstructed, 1)))
)
class X509Extension(object):
"""Abstraction for the pyasn1 Extension structures.
The object should normally be constructed using `construct_extension`,
which will choose the right extension type based on the id.
Each extension has an immutable oid and a spec of the internal value
representation.
Unknown extension types can be still represented by the
X509Extension object and copied/serialised without understanding the
value details. The value will not be displayed properly in the logs
in the case.
"""
_oid = None
spec = None
"""An X509 V3 Certificate extension."""
def __init__(self, ext=None):
if ext is None:
if self.spec is None:
raise errors.X509Error("cannot create generic extension")
self._ext = rfc5280.Extension()
self._ext['extnID'] = self._oid
self._set_value(self._get_default_value())
else:
if not isinstance(ext, rfc5280.Extension):
raise errors.X509Error("extension has incorrect type")
self._ext = ext
@classmethod
def _get_default_value(cls):
# if there are any non-optional fields, this needs to be defined in
# the class
return cls.spec()
def __str__(self):
return "%s: %s" % (self.get_name(), self.get_value_as_str())
def get_value_as_str(self):
return "<unknown>"
def get_oid(self):
return self._ext['extnID']
def get_name(self):
"""Get the extension name as a python string."""
oid = self.get_oid()
return EXTENSION_NAMES.get(oid, oid)
def get_critical(self):
return self._ext['critical']
def set_critical(self, critical):
self._ext['critical'] = critical
def _get_value(self):
return decoder.decode(self._ext['extnValue'].asOctets(),
asn1Spec=self.spec())[0]
def _set_value(self, value):
if not isinstance(value, self.spec):
raise errors.X509Error("extension value has incorrect type")
self._ext['extnValue'] = encoder.encode(value)
def as_der(self):
return encoder.encode(self._ext)
def as_asn1(self):
return self._ext
class X509ExtensionBasicConstraints(X509Extension):
spec = BasicConstraints
_oid = rfc5280.id_ce_basicConstraints
@uses_ext_value
def get_ca(self, ext_value=None):
return bool(ext_value['cA'])
@modifies_ext_value
def set_ca(self, ca, ext_value=None):
ext_value['cA'] = ca
return ext_value
@uses_ext_value
def get_path_len_constraint(self, ext_value=None):
return ext_value['pathLenConstraint']
@modifies_ext_value
def set_path_len_constraint(self, length, ext_value=None):
ext_value['pathLenConstraint'] = length
return ext_value
def __str__(self):
return "basicConstraints: CA: %s, pathLen: %s" % (
str(self.get_ca()).upper(), self.get_path_len_constraint())
class X509ExtensionKeyUsage(X509Extension):
spec = rfc5280.KeyUsage
_oid = rfc5280.id_ce_keyUsage
fields = dict(spec.namedValues.namedValues)
inv_fields = dict((v, k) for k, v in spec.namedValues.namedValues)
@classmethod
def _get_default_value(cls):
# if there are any non-optional fields, this needs to be defined in
# the class
return cls.spec("''B")
@uses_ext_value
def get_usage(self, usage, ext_value=None):
usage = LONG_KEY_USAGE_NAMES.get(usage, usage)
pos = self.fields[usage]
if pos >= len(ext_value):
return False
return bool(ext_value[pos])
@uses_ext_value
def get_all_usages(self, ext_value=None):
return [self.inv_fields[i] for i, enabled in enumerate(ext_value)
if enabled]
@modifies_ext_value
def set_usage(self, usage, state, ext_value=None):
usage = LONG_KEY_USAGE_NAMES.get(usage, usage)
pos = self.fields[usage]
values = [x for x in ext_value]
if state:
while pos >= len(values):
values.append(0)
values[pos] = 1
else:
if pos < len(values):
values[pos] = 0
bits = ''.join(str(x) for x in values)
return self.spec("'%s'B" % bits)
def __str__(self):
return "keyUsage: " + ", ".join(self.get_all_usages())
class X509ExtensionSubjectAltName(X509Extension):
spec = rfc5280.SubjectAltName
_oid = rfc5280.id_ce_subjectAltName
@uses_ext_value
def get_dns_ids(self, ext_value=None):
dns_ids = []
for name in ext_value:
if name.getName() != 'dNSName':
continue
component = name.getComponent()
dns_id = component.asOctets().decode(component.encoding)
dns_ids.append(dns_id)
return dns_ids
@uses_ext_value
def get_ips(self, ext_value=None):
ips = []
for name in ext_value:
if name.getName() != 'iPAddress':
continue
ips.append(utils.asn1_to_netaddr(name.getComponent()))
return ips
@uses_ext_value
def has_unknown_entries(self, ext_value=None):
for name in ext_value:
if name.getName() not in ('dNSName', 'iPAddress'):
return True
return False
@modifies_ext_value
def add_dns_id(self, dns_id, validate=True, ext_value=None):
new_pos = len(ext_value)
ext_value[new_pos] = None
ext_value[new_pos]['dNSName'] = dns_id
return ext_value
@modifies_ext_value
def add_ip(self, ip, ext_value=None):
if not isinstance(ip, netaddr.IPAddress):
raise errors.X509Error("not a real ip address provided")
new_pos = len(ext_value)
ext_value[new_pos] = None
ext_value[new_pos]['iPAddress'] = utils.netaddr_to_asn1(ip)
return ext_value
@uses_ext_value
def __str__(self, ext_value=None):
entries = ["DNS:%s" % (x,) for x in self.get_dns_ids()]
entries += ["IP:%s" % (x,) for x in self.get_ips()]
return "subjectAltName: " + ", ".join(entries)
class X509ExtensionNameConstraints(X509Extension):
spec = NameConstraints
_oid = rfc5280.id_ce_nameConstraints
def _get_permitted(self, ext_value):
return ext_value['permittedSubtrees'] or []
def _get_excluded(self, ext_value):
return ext_value['excludedSubtrees'] or []
@uses_ext_value
def get_permitted_length(self, ext_value=None):
return len(self._get_permitted(ext_value))
@uses_ext_value
def get_permitted_name(self, n, ext_value=None):
name = self._get_permitted(ext_value)[n]['base']
return (name.getName(), name.getComponent())
@uses_ext_value
def get_permitted_range(self, n, ext_value=None):
entry = self._get_permitted(ext_value)[n]
return (entry['minimum'], entry['maximum'])
@uses_ext_value
def get_excluded_length(self, ext_value=None):
return len(self._get_excluded(ext_value))
@uses_ext_value
def get_excluded_name(self, n, ext_value=None):
name = self._get_excluded(ext_value)[n]['base']
return (name.getName(), name.getComponent())
@uses_ext_value
def get_excluded_range(self, n, ext_value=None):
entry = self._get_excluded(ext_value)[n]
return (entry['minimum'], entry['maximum'])
def _add_to_tree(self, ext_value, tree_name, position, name_type, name):
if ext_value[tree_name] is None:
ext_value[tree_name] = None
ext_value[tree_name][position] = None
ext_value[tree_name][position]['base'] = None
ext_value[tree_name][position]['base'][name_type] = name
ext_value[tree_name][position]['minimum'] = 0
# maximum should be missing (RFC5280/4.2.1.10)
@modifies_ext_value
def add_permitted(self, name_type, name, ext_value=None):
last = self.get_permitted_length()
self._add_to_tree(ext_value, 'permittedSubtrees', last,
name_type, name)
return ext_value
@modifies_ext_value
def add_excluded(self, name_type, name, ext_value=None):
last = self.get_excluded_length()
self._add_to_tree(ext_value, 'excludedSubtrees', last, name_type, name)
return ext_value
class X509ExtensionExtendedKeyUsage(X509Extension):
spec = rfc5280.ExtKeyUsageSyntax
_oid = rfc5280.id_ce_extKeyUsage
_valid = list(EXT_KEY_USAGE_NAMES.keys())
@uses_ext_value
def get_all_usages(self, ext_value=None):
return [usage for usage in ext_value]
@uses_ext_value
def get_usage(self, usage, ext_value=None):
if usage not in self._valid:
raise ValueError("usage not valid")
return (usage in ext_value)
@modifies_ext_value
def set_usage(self, usage, state, ext_value=None):
if usage not in self._valid:
raise ValueError("usage not valid")
if state:
if usage not in ext_value:
ext_value[len(ext_value)] = usage
else:
if usage in ext_value:
old = [x for x in ext_value if x != usage]
ext_value.clear()
for i, x in enumerate(old):
ext_value[i] = x
return ext_value
@uses_ext_value
def __str__(self, ext_value=None):
usages = [EXT_KEY_USAGE_NAMES.get(u) for u in ext_value]
return "extKeyUsage: " + ", ".join(usages)
class X509ExtensionAuthorityKeyId(X509Extension):
spec = rfc5280.AuthorityKeyIdentifier
_oid = rfc5280.id_ce_authorityKeyIdentifier
@uses_ext_value
def get_key_id(self, ext_value=None):
ki = ext_value['keyIdentifier']
if ki:
return ki.asOctets()
else:
return None
@uses_ext_value
def get_serial(self, ext_value=None):
return ext_value['authorityCertSerialNumber']
@modifies_ext_value
def set_key_id(self, key, ext_value=None):
# new extension, pyasn1 cannot remove values
new_ext = self.spec()
new_ext['keyIdentifier'] = key
return new_ext
@modifies_ext_value
def set_serial(self, serial, ext_value=None):
# new extension, pyasn1 cannot remove values
new_ext = self.spec()
new_ext['authorityCertSerialNumber'] = int(serial)
return new_ext
class X509ExtensionSubjectKeyId(X509Extension):
spec = rfc5280.SubjectKeyIdentifier
_oid = rfc5280.id_ce_subjectKeyIdentifier
@classmethod
def _get_default_value(cls):
return cls.spec(b"")
@uses_ext_value
def get_key_id(self, ext_value=None):
return ext_value.asOctets()
@modifies_ext_value
def set_key_id(self, key, ext_value=None):
return self.spec(key)
EXTENSION_CLASSES = {
rfc5280.id_ce_basicConstraints: X509ExtensionBasicConstraints,
rfc5280.id_ce_keyUsage: X509ExtensionKeyUsage,
rfc5280.id_ce_extKeyUsage: X509ExtensionExtendedKeyUsage,
rfc5280.id_ce_subjectAltName: X509ExtensionSubjectAltName,
rfc5280.id_ce_nameConstraints: X509ExtensionNameConstraints,
rfc5280.id_ce_authorityKeyIdentifier: X509ExtensionAuthorityKeyId,
rfc5280.id_ce_subjectKeyIdentifier: X509ExtensionSubjectKeyId,
}
def construct_extension(ext):
"""Construct an extension object of the right type.
While X509Extension can provide basic access to the extension elements,
it cannot parse details of extensions. This function detects which type
should be used based on the extension id.
If the type is unknown, generic X509Extension is used instead.
"""
if not isinstance(ext, rfc5280.Extension):
raise errors.X509Error("extension has incorrect type")
ext_class = EXTENSION_CLASSES.get(ext['extnID'], X509Extension)
return ext_class(ext)

172
anchor/X509/name.py

@ -1,172 +0,0 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
from pyasn1.codec.der import decoder
from pyasn1.codec.der import encoder
from pyasn1.type import error as asn1_error
from pyasn1.type import univ as asn1_univ
from anchor.asn1 import rfc5280
from anchor.X509 import errors
OID_commonName = rfc5280.id_at_commonName
OID_localityName = rfc5280.id_at_localityName
OID_stateOrProvinceName = rfc5280.id_at_stateOrProvinceName
OID_organizationName = rfc5280.id_at_organizationName
OID_organizationalUnitName = rfc5280.id_at_organizationalUnitName
OID_countryName = rfc5280.id_at_countryName
OID_pkcs9_emailAddress = rfc5280.id_emailAddress
OID_surname = rfc5280.id_at_surname
OID_givenName = rfc5280.id_at_givenName
name_oids = {
rfc5280.id_at_name: rfc5280.X520name,
rfc5280.id_at_surname: rfc5280.X520name,
rfc5280.id_at_givenName: rfc5280.X520name,
rfc5280.id_at_initials: rfc5280.X520name,
rfc5280.id_at_generationQualifier: rfc5280.X520name,
rfc5280.id_at_commonName: rfc5280.X520CommonName,
rfc5280.id_at_localityName: rfc5280.X520LocalityName,
rfc5280.id_at_stateOrProvinceName: rfc5280.X520StateOrProvinceName,
rfc5280.id_at_organizationName: rfc5280.X520OrganizationName,
rfc5280.id_at_organizationalUnitName: rfc5280.X520OrganizationalUnitName,
rfc5280.id_at_title: rfc5280.X520Title,
rfc5280.id_at_dnQualifier: rfc5280.X520dnQualifier,
rfc5280.id_at_countryName: rfc5280.X520countryName,
rfc5280.id_emailAddress: rfc5280.EmailAddress,
}
code_names = {
rfc5280.id_at_commonName: "CN",
rfc5280.id_at_localityName: "L",
rfc5280.id_at_stateOrProvinceName: "ST",
rfc5280.id_at_organizationName: "O",
rfc5280.id_at_organizationalUnitName: "OU",
rfc5280.id_at_countryName: "C",
rfc5280.id_at_givenName: "GN",
rfc5280.id_at_surname: "SN",
rfc5280.id_emailAddress: "emailAddress",
}
short_names = {
rfc5280.id_at_commonName: "commonName",
rfc5280.id_at_localityName: "localityName",
rfc5280.id_at_stateOrProvinceName: "stateOrProvinceName",
rfc5280.id_at_organizationName: "organizationName",
rfc5280.id_at_organizationalUnitName: "organizationalUnitName",
rfc5280.id_at_countryName: "countryName",
rfc5280.id_at_givenName: "givenName",
rfc5280.id_at_surname: "surname",
rfc5280.id_emailAddress: "emailAddress",
}
class X509Name(object):
"""An X509 Name object."""
class Entry():
"""An X509 Name sub-entry object."""
def __init__(self, obj):
self._obj = obj
def __str__(self):
return "%s: %s" % (self.get_name(), self.get_value())
def get_oid(self):
return self._obj[0]['type']
def get_name(self):
"""Get the name of this entry.
:return: entry name as a python string
"""
oid = self.get_oid()
return short_names.get(oid, str(oid))
def get_code(self):
"""Get the name of this entry.
:return: entry name as a python string
"""