Added logging. Debugged with ConfigDrive.

Signed-off-by: Pino de Candia <giuseppe.decandia@gmail.com>
This commit is contained in:
Pino de Candia 2017-11-20 21:47:15 +00:00
parent 2eeebc8978
commit 6075058e1d
4 changed files with 69 additions and 18 deletions

View File

@ -3,7 +3,7 @@ import models
from tatu.db.persistence import SQLAlchemySessionManager
def create_app(sa):
api = falcon.API(middleware=[sa])
api = falcon.API(middleware=[models.Logger(), sa])
api.add_route('/authorities', models.Authorities())
api.add_route('/authorities/{auth_id}', models.Authority())
api.add_route('/usercerts', models.UserCerts())

View File

@ -1,5 +1,6 @@
import falcon
import json
import logging
import uuid
from tatu.db import models as db
from Crypto.PublicKey import RSA
@ -31,6 +32,18 @@ def validate(req, resp, resource, params):
raise falcon.HTTPBadRequest('The POST/PUT request is missing a body.')
validate_uuids(req, params)
class Logger(object):
def __init__(self):
self.logger = logging.getLogger('gunicorn.error')
def process_response(self, req, resp, resource, params):
self.logger.debug(
'Request {0} {1} with body {2} produced'
'response with status {3} location {4} and body {5}'.format(
req.method, req.relative_uri,
req.body if hasattr(req, 'body') else 'None',
resp.status, resp.location, resp.body))
class Authorities(object):
@falcon.before(validate)
@ -129,7 +142,7 @@ class HostCert(object):
'host_id': host.host_id,
'fingerprint': host.fingerprint,
'auth_id': host.auth_id,
'key-cert.pub': host.pubkey,
'key-cert.pub': host.cert,
}
resp.body = json.dumps(body)
resp.status = falcon.HTTP_OK

View File

@ -50,7 +50,7 @@ def createUserCert(session, user_id, auth_id, pub):
# Retrieve the authority's private key and generate the certificate
auth = getAuthority(session, auth_id)
if auth is None:
raise falcon.HTTPNotFound()
raise falcon.HTTPNotFound(description='No Authority found with that ID')
fingerprint = sshpubkeys.SSHKey(pub).hash()
certRecord = session.query(UserCert).get([user_id, fingerprint])
if certRecord is not None:
@ -74,7 +74,7 @@ class Token(Base):
token_id = sa.Column(sa.String(36), primary_key=True,
default=generate_uuid)
auth_id = sa.Column(sa.String(36), sa.ForeignKey('authorities.auth_id'))
host_id = sa.Column(sa.String(36))
host_id = sa.Column(sa.String(36), index=True, unique=True)
hostname = sa.Column(sa.String(36))
used = sa.Column(sa.Boolean, default=False)
date_used = sa.Column(sa.DateTime, default=datetime.min)
@ -84,7 +84,15 @@ def createToken(session, host_id, auth_id, hostname):
# Validate the certificate authority
auth = getAuthority(session, auth_id)
if auth is None:
raise falcon.HTTPNotFound()
raise falcon.HTTPNotFound(description='No Authority found with that ID')
#Check whether a token was already created for this host_id
try:
token = session.query(Token).filter(Token.host_id == host_id).one()
if token is not None:
return token
except:
pass
token = Token(host_id=host_id,
auth_id=auth_id,
hostname=hostname)
@ -109,15 +117,26 @@ def getHostCert(session, host_id, fingerprint):
def createHostCert(session, token_id, host_id, pub):
token = session.query(Token).get(token_id)
if token is None:
raise falcon.HTTPNotFound()
if token.used:
raise falcon.HTTPForbidden(description='The presented token was previously used')
raise falcon.HTTPNotFound(description='No Token found with that ID')
if token.host_id != host_id:
raise falcon.HTTPForbidden(description='The token is not valid for this instance ID')
raise falcon.HTTPConflict(description='The token is not valid for this instance ID')
fingerprint = sshpubkeys.SSHKey(pub).hash()
if token.used:
if token.fingerprint_used != fingerprint:
raise falcon.HTTPConflict(description='The token was previously used with a different public key')
# The token was already used for same host and pub key. Return record.
host = session.query(HostCert).get([host_id, fingerprint])
if host is None:
raise falcon.HTTPInternalServerError(
description='The token was used, but no corresponding Host record was found.')
if host.token_id == token_id:
return host
raise falcon.HTTPConflict(description='The presented token was previously used')
auth = getAuthority(session, token.auth_id)
if auth is None:
raise falcon.HTTPNotFound()
fingerprint = sshpubkeys.SSHKey(pub).hash()
raise falcon.HTTPNotFound(description='No Authority found with that ID')
certRecord = session.query(HostCert).get([host_id, fingerprint])
if certRecord is not None:
raise falcon.HTTPConflict('This public key is already signed.')
@ -128,6 +147,7 @@ def createHostCert(session, token_id, host_id, pub):
fingerprint=fingerprint,
auth_id=token.auth_id,
token_id=token_id,
pubkey = pub,
cert=cert,
hostname=token.hostname)
session.add(host)

View File

@ -289,6 +289,21 @@ def test_post_token_and_host(client):
assert location[2] == host_id
assert location[3] == sshpubkeys.SSHKey(host_pub_key).hash()
@pytest.mark.dependency(depends=['test_post_token_and_host'])
def test_post_token_same_host_id(client):
# Posting with the same host ID should return the same token
token = token_request()
response = client.simulate_post(
'/hosttokens',
body=json.dumps(token)
)
assert response.status == falcon.HTTP_CREATED
assert 'location' in response.headers
location_path = response.headers['location'].split('/')
assert location_path[1] == 'hosttokens'
# The token id should be the same as that from the previous test.
assert token_id == location_path[-1]
@pytest.mark.dependency(depends=['test_post_token_and_host'])
def test_get_host(client):
response = client.simulate_get('/hostcerts/' + host_id + '/' + host_fingerprint)
@ -311,7 +326,7 @@ def test_get_host_with_bad_uuid(client):
assert response.status == falcon.HTTP_BAD_REQUEST
def test_post_token_unknown_auth(client):
token = token_request(random_uuid())
token = token_request(auth=random_uuid())
response = client.simulate_post(
'/hosttokens',
body=json.dumps(token)
@ -330,7 +345,7 @@ def test_post_host_with_bogus_token(client):
@pytest.mark.dependency(depends=['test_post_token_and_host'])
def test_post_host_with_wrong_host_id(client):
# Get a new token for the same host_id as the base test.
token = token_request()
token = token_request(host=random_uuid())
response = client.simulate_post(
'/hosttokens',
body=json.dumps(token)
@ -348,11 +363,12 @@ def test_post_host_with_wrong_host_id(client):
'/hostcerts',
body=json.dumps(host)
)
assert response.status == falcon.HTTP_CONFLICT
@pytest.mark.dependency(depends=['test_post_token_and_host'])
def test_post_host_same_public_key_fails(client):
# Use a new token compared to the test this depends on.
# Show that using the same host ID and public key fails.
def test_post_host_different_public_key_fails(client):
# Use the same token compared to the test this depends on.
# Show that using the same host ID and different public key fails.
token = token_request()
response = client.simulate_post(
'/hosttokens',
@ -362,7 +378,9 @@ def test_post_host_same_public_key_fails(client):
assert 'location' in response.headers
location_path = response.headers['location'].split('/')
assert location_path[1] == 'hosttokens'
host = host_request(location_path[-1])
key = RSA.generate(2048)
pub_key = key.publickey().exportKey('OpenSSH')
host = host_request(location_path[-1], pub_key=pub_key)
response = client.simulate_post(
'/hostcerts',
body=json.dumps(host)
@ -380,4 +398,4 @@ def test_post_host_with_used_token(client):
'/hostcerts',
body=json.dumps(host)
)
assert response.status == falcon.HTTP_FORBIDDEN
assert response.status == falcon.HTTP_CONFLICT