monasca-log-api python

- single log message with rest api
- parsing / validation for data
- configuration
- bootstrapping
- tox
- unit tests

Change-Id: I7386b3500ee9097383a573bf915da55ce2ff881f
This commit is contained in:
Tomasz Trębski 2015-09-09 23:21:30 +02:00 committed by Tomasz Trębski
parent c798c2cc54
commit e921fd506c
38 changed files with 2393 additions and 0 deletions

1
.gitignore vendored
View File

@ -7,6 +7,7 @@ cover
.coverage
*.egg
*.egg-info
.eggs/
.testrepository
.tox
ChangeLog

View File

@ -76,3 +76,68 @@ Requests flow through the following architectural layers from top to bottom:
## Documentation
* API Specification: [/docs/monasca-log-api-spec.md](/docs/monasca-log-api-spec.md).
## Python monasca-log-api implementation
To install the python api implementation, git clone the source and run the
following command::
```sh
sudo python setup.py install
```
If it installs successfully, you will need to make changes to the following
two files to reflect your system settings, especially where kafka server is
located::
```sh
/etc/monasca/log-api.conf
/etc/monasca/log-api.ini
```
Once the configurations are modified to match your environment, you can start
up the server by following the following instructions.
To start the server, run the following command:
Running the server in foreground mode
```sh
gunicorn -k eventlet --worker-connections=2000 --backlog=1000
--paste /etc/monasca/log-api.ini
```
Running the server as daemons
```sh
gunicorn -k eventlet --worker-connections=2000 --backlog=1000
--paste /etc/monasca/log-api.ini -D
```
To check if the code follows python coding style, run the following command
from the root directory of this project
```sh
tox -e pep8
```
To run all the unit test cases, run the following command from the root
directory of this project
```sh
tox -e py27 (or -e py26, -e py33)
```
# License
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,42 @@
[DEFAULT]
# logging, make sure that the user under whom the server runs has permission
# to write to the directory.
log_file = monasca-log-api.log
log_dir = .
debug=True
# Dispatchers to be loaded to serve restful APIs
[dispatcher]
logs = monasca_log_api.v1.reference.logs:Logs
versions = monasca_log_api.v1.reference.versions:Versions
[dispatcher]
driver = v1_reference
[service]
region = 'pl'
[kafka]
client_id = 'monasca-log-api'
timeout = 60
host = 'localhost:8900'
[kafka_producer]
batch_send_every_n = 10
async = True
ack_timeout = 1000
req_acks = 'Local'
[log_publisher]
topics = 'logs'
[keystone_authtoken]
identity_uri = http://192.168.10.5:35357
auth_uri = http://192.168.10.5:5000
admin_password = admin
admin_user = admin
admin_tenant_name = admin
cafile =
certfile =
keyfile =
insecure = false

View File

@ -0,0 +1,21 @@
[DEFAULT]
name = monasca_log_api
[pipeline:main]
pipeline = auth keystonecontext api
[app:api]
paste.app_factory = monasca_log_api.server:launch
[filter:auth]
paste.filter_factory = keystonemiddleware.auth_token:filter_factory
[filter:keystonecontext]
paste.filter_factory = monasca_log_api.middleware.keystone_context_filter:filter_factory
[server:main]
use = egg:gunicorn#main
host = 127.0.0.1
port = 8082
workers = 1
proc_name = monasca_log_api

2
monasca_log_api/.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
monasca-log-api.log
venv/

View File

View File

View File

@ -0,0 +1,36 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
HTTP_422 = '422 Unprocessable Entity'
class HTTPUnprocessableEntity(falcon.OptionalRepresentation, falcon.HTTPError):
"""HTTPUnprocessableEntity http error
HTTPError that comes with '422 Unprocessable Entity' status
Args:
message(str) - meaningful description of what caused an error
"""
def __init__(self, message, **kwargs):
falcon.HTTPError.__init__(self,
HTTP_422,
'unprocessable_entity',
message,
**kwargs
)

View File

@ -0,0 +1,23 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
LogApiHeader = collections.namedtuple('LogApiHeader', ['name', 'is_required'])
X_TENANT_ID = LogApiHeader(name='X-Tenant-Id', is_required=False)
X_ROLES = LogApiHeader(name='X-Roles', is_required=False)
X_APPLICATION_TYPE = LogApiHeader(name='X-Application-Type', is_required=False)
X_DIMENSIONS = LogApiHeader(name='X_Dimensions', is_required=False)

View File

@ -0,0 +1,30 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from oslo_log import log
LOG = log.getLogger(__name__)
MONITORING_DELEGATE_ROLE = 'monitoring-delegate'
class LogsApi(object):
def __init__(self):
super(LogsApi, self).__init__()
LOG.info('Initializing LogsApi!')
def on_post(self, req, res):
res.status = falcon.HTTP_501

View File

@ -0,0 +1,28 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from oslo_log import log
LOG = log.getLogger(__name__)
class VersionsAPI(object):
def __init__(self):
super(VersionsAPI, self).__init__()
LOG.info('Initializing VersionsAPI!')
def on_get(self, req, res, version_id):
res.status = falcon.HTTP_501

View File

@ -0,0 +1,16 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
ENCODING = 'utf8'

View File

View File

@ -0,0 +1,83 @@
# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""RequestContext: context for requests that persist through monasca."""
import uuid
from oslo_log import log
from oslo_utils import timeutils
LOG = log.getLogger(__name__)
class RequestContext(object):
"""Security context and request information.
Represents the user taking a given action within the system.
"""
def __init__(self, user_id, project_id, domain_id=None, domain_name=None,
roles=None, timestamp=None, request_id=None,
auth_token=None, user_name=None, project_name=None,
service_catalog=None, user_auth_plugin=None, **kwargs):
"""Creates the Keystone Context. Supports additional parameters:
:param user_auth_plugin:
The auth plugin for the current request's authentication data.
:param kwargs:
Extra arguments that might be present
"""
if kwargs:
LOG.warning(
'Arguments dropped when creating context: %s') % str(kwargs)
self._roles = roles or []
self.timestamp = timeutils.utcnow()
if not request_id:
request_id = self.generate_request_id()
self._request_id = request_id
self._auth_token = auth_token
self._service_catalog = service_catalog
self._domain_id = domain_id
self._domain_name = domain_name
self._user_id = user_id
self._user_name = user_name
self._project_id = project_id
self._project_name = project_name
self._user_auth_plugin = user_auth_plugin
def to_dict(self):
return {'user_id': self._user_id,
'project_id': self._project_id,
'domain_id': self._domain_id,
'domain_name': self._domain_name,
'roles': self._roles,
'timestamp': timeutils.strtime(self._timestamp),
'request_id': self._request_id,
'auth_token': self._auth_token,
'user_name': self._user_name,
'service_catalog': self._service_catalog,
'project_name': self._project_name,
'user': self._user}
def generate_request_id(self):
return b'req-' + str(uuid.uuid4()).encode('ascii')

View File

@ -0,0 +1,109 @@
# Copyright (c) 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from oslo_log import log
from oslo_middleware import request_id
from oslo_serialization import jsonutils
from monasca_log_api.middleware import context
LOG = log.getLogger(__name__)
def filter_factory(global_conf, **local_conf):
def validator_filter(app):
return KeystoneContextFilter(app, local_conf)
return validator_filter
class KeystoneContextFilter(object):
"""Make a request context from keystone headers."""
def __init__(self, app, conf):
self._app = app
self._conf = conf
def __call__(self, env, start_response):
LOG.debug("Creating Keystone Context Object.")
user_id = env.get('HTTP_X_USER_ID', env.get('HTTP_X_USER'))
if user_id is None:
msg = "Neither X_USER_ID nor X_USER found in request"
LOG.error(msg)
raise falcon.HTTPUnauthorized(title='Forbidden', description=msg)
roles = self._get_roles(env)
project_id = env.get('HTTP_X_PROJECT_ID')
project_name = env.get('HTTP_X_PROJECT_NAME')
domain_id = env.get('HTTP_X_DOMAIN_ID')
domain_name = env.get('HTTP_X_DOMAIN_NAME')
user_name = env.get('HTTP_X_USER_NAME')
req_id = env.get(request_id.ENV_REQUEST_ID)
# Get the auth token
auth_token = env.get('HTTP_X_AUTH_TOKEN',
env.get('HTTP_X_STORAGE_TOKEN'))
service_catalog = None
if env.get('HTTP_X_SERVICE_CATALOG') is not None:
try:
catalog_header = env.get('HTTP_X_SERVICE_CATALOG')
service_catalog = jsonutils.loads(catalog_header)
except ValueError:
msg = "Invalid service catalog json."
LOG.error(msg)
raise falcon.HTTPInternalServerError(msg)
# NOTE(jamielennox): This is a full auth plugin set by auth_token
# middleware in newer versions.
user_auth_plugin = env.get('keystone.token_auth')
# Build a context
ctx = context.RequestContext(user_id,
project_id,
user_name=user_name,
project_name=project_name,
domain_id=domain_id,
domain_name=domain_name,
roles=roles,
auth_token=auth_token,
service_catalog=service_catalog,
request_id=req_id,
user_auth_plugin=user_auth_plugin)
env['monasca.context'] = ctx
LOG.debug("Keystone Context successfully created.")
return self._app(env, start_response)
def _get_roles(self, env):
"""Get the list of roles."""
if 'HTTP_X_ROLES' in env:
roles = env.get('HTTP_X_ROLES', '')
else:
# Fallback to deprecated role header:
roles = env.get('HTTP_X_ROLE', '')
if roles:
LOG.warning(
'Sourcing roles from deprecated X-Role HTTP header')
return [r.strip() for r in roles.split(',')]

View File

View File

@ -0,0 +1,50 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class ExceptionWrapper(Exception):
"""Wrapper around exception with custom message.
ExceptionWrapper provides a way to keep old
exceptions but providing to describe the context of error
occurrence.
"""
def __init__(self, message, caught=None):
self._message = message
self._caught = caught
@property
def caught(self):
return self._caught
@property
def message(self):
return self._message
def __str__(self):
return '%s <message=%s, caught=%s>' % (
self.__class__.__name__,
repr(self.message),
repr(self.caught)
)
class PublisherInitException(ExceptionWrapper):
pass
class MessageQueueException(ExceptionWrapper):
pass

View File

@ -0,0 +1,220 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from kafka import client
from kafka import common
from kafka import conn
from kafka import producer
from kafka import protocol
from oslo_config import cfg
from oslo_log import log
import simport
from monasca_log_api.publisher import exceptions
from monasca_log_api.publisher import publisher
LOG = log.getLogger(__name__)
CONF = cfg.CONF
ACK_MAP = {
'None': producer.KeyedProducer.ACK_NOT_REQUIRED,
'Local': producer.KeyedProducer.ACK_AFTER_LOCAL_WRITE,
'Server': producer.KeyedProducer.ACK_AFTER_CLUSTER_COMMIT
}
kafka_opts = [
cfg.StrOpt('client_id',
default=None,
help='Client Id',
required=True),
cfg.IntOpt('timeout',
default=conn.DEFAULT_SOCKET_TIMEOUT_SECONDS,
help='Socket timeout'),
cfg.StrOpt('host',
default=None,
help='List of hosts, comma delimited',
required=True)
]
kafka_group = cfg.OptGroup(name='kafka', title='kafka')
kafka_producer_opts = [
cfg.IntOpt('batch_send_every_n',
default=None,
help='Send every n items'),
cfg.IntOpt('batch_send_every_t',
default=None,
help='Send every n seconds'),
cfg.BoolOpt('async',
default=False,
help='Async communication'),
cfg.StrOpt('reg_acks',
default='None',
help='Acknowledge options'),
cfg.StrOpt('partitioner',
default=None,
help='Partitioner algorithm')
]
kafka_producer_group = cfg.OptGroup(name='kafka_producer',
title='kafka_producer')
CONF.register_group(kafka_group)
CONF.register_opts(kafka_opts, kafka_group)
CONF.register_group(kafka_producer_group)
CONF.register_opts(kafka_producer_opts, kafka_producer_group)
class KafkaPublisher(publisher.Publisher):
def __init__(self, max_retry=3, wait_time=None):
self._producer_conf = None
self._client_conf = None
self._producer = None
self._client = None
self._max_retry = max_retry
self._wait_time = wait_time
LOG.info('Initializing KafkaPublisher <%s>' % self)
def _get_client_conf(self):
if self._client_conf:
return self._client_conf
client_conf = {
'hosts': CONF.kafka.host,
'client_id': CONF.kafka.client_id,
'timeout': CONF.kafka.timeout
}
self._client_conf = client_conf
return client_conf
def _get_producer_conf(self):
if self._producer_conf:
return self._producer_conf
batch_send_every_n = CONF.kafka_producer.batch_send_every_n
batch_send_every_t = CONF.kafka_producer.batch_send_every_t
partitioner = CONF.kafka_producer.partitioner
producer_conf = {
'codec': protocol.CODEC_GZIP,
'batch_send': batch_send_every_n or batch_send_every_t,
'async': CONF.kafka_producer.async,
'reg_acks': ACK_MAP[CONF.kafka_producer.reg_acks]
}
if batch_send_every_t:
producer_conf.update(
{'batch_send_every_t': batch_send_every_t})
if batch_send_every_n:
producer_conf.update(
{'batch_send_every_n': batch_send_every_n})
if partitioner:
partitioner = simport.load(partitioner)
producer_conf.update({'partitioner': partitioner})
self._producer_conf = producer_conf
return producer_conf
def _init_client(self):
if self._client:
self._client.close()
self._producer = None
self._client = None
client_opts = self._get_client_conf()
for i in range(self._max_retry):
kafka_host = client_opts['hosts']
try:
self._client = client.KafkaClient(
client_opts['hosts'],
client_opts['client_id'],
client_opts['timeout']
)
if self._wait_time:
time.sleep(self._wait_time)
break
except common.KafkaUnavailableError as ex:
LOG.error('Server is down at <host="%s">' % kafka_host)
err = ex
except common.LeaderNotAvailableError as ex:
LOG.error('No leader at <host="%s">.' % kafka_host)
err = ex
except Exception as ex:
LOG.error('Initialization failed at <host="%s">.' % kafka_host)
err = ex
if err:
raise err
def _init_producer(self):
try:
if not self._client:
self._init_client()
producer_opts = self._get_producer_conf()
producer_opts.update({'client': self._client})
self._producer = producer.KeyedProducer(*producer_opts)
if self._wait_time:
time.sleep(self._wait_time)
except Exception as ex:
self._producer = None
LOG.exception(ex.message, exc_info=1)
raise exceptions.PublisherInitException(
message='KeyedProducer can not be created at <host="%s>"'
% self._client_conf['host'],
caught=ex)
def send_message(self, topic, key, message):
if not message or not key or not topic:
return
try:
if not self._producer:
self._init_producer()
return self._producer.send(topic, key, message)
except (common.KafkaUnavailableError,
common.LeaderNotAvailableError) as ex:
self._client = None
LOG.error(ex.message, exc_info=1)
raise exceptions.MessageQueueException(
message='Failed to post message to kafka',
caught=ex
)
except Exception as ex:
LOG.error(ex.message, exc_info=1)
raise exceptions.MessageQueueException(
message='Unknown error while sending message to kafka',
caught=ex
)
# TODO(question) How to ensure that connection will be closed when program
# stops ?
def close(self):
if self._client:
self._producer = None
self._client.close()
def __repr__(self):
return 'KafkaPublisher <host=%s>' % (
self._client_conf['hosts'] if self._client_conf else None
)

View File

@ -0,0 +1,29 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Publisher(object):
@abc.abstractmethod
def send_message(self, *args, **kwargs):
return
@abc.abstractmethod
def close(self):
return

88
monasca_log_api/server.py Normal file
View File

@ -0,0 +1,88 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from wsgiref import simple_server
import falcon
from oslo_config import cfg
from oslo_log import log
import paste.deploy
import simport
LOG = log.getLogger(__name__)
CONF = cfg.CONF
dispatcher_opts = [
cfg.StrOpt('versions',
default=None,
help='Versions'),
cfg.StrOpt('logs',
default=None,
help='Logs')
]
dispatcher_group = cfg.OptGroup(name='dispatcher', title='dispatcher')
CONF.register_group(dispatcher_group)
CONF.register_opts(dispatcher_opts, dispatcher_group)
def launch(conf, config_file='/etc/monasca/log-api-config.conf'):
if conf and 'config_file' in conf:
config_file = conf.get('config_file')
log.register_options(CONF)
log.set_defaults()
CONF(args=[],
project='monasca_log_api',
default_config_files=[config_file])
log.setup(CONF, 'monasca_log_api')
app = falcon.API()
load_versions_resource(app)
load_logs_resource(app)
LOG.debug('Dispatcher drivers have been added to the routes!')
return app
def load_logs_resource(app):
logs = simport.load(CONF.dispatcher.logs)()
app.add_route('/v1.0/logs/single', logs)
def load_versions_resource(app):
versions = simport.load(CONF.dispatcher.versions)()
app.add_route("/", versions)
app.add_route("/{version_id}", versions)
if __name__ == '__main__':
base_path = '%s/..' % os.getcwd()
global_conf = {'config_file': (
'%s/%s' % (base_path, '/etc/monasca/log-api-config.conf'))}
wsgi_app = (
paste.deploy.loadapp(
'config:etc/monasca/log-api-config.ini',
relative_to=base_path,
global_conf=global_conf
)
)
httpd = simple_server.make_server('127.0.0.1', 8080, wsgi_app)
httpd.serve_forever()

View File

View File

@ -0,0 +1,20 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import fixture
def mock_config(test):
return test.useFixture(fixture.Config())

View File

@ -0,0 +1,254 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from falcon import testing
from kafka import common
import mock
from monasca_log_api.publisher import exceptions
from monasca_log_api.publisher import kafka_publisher as publisher
from monasca_log_api.tests import base as base_test
TOPIC = 'test'
class MockKafkaClient(object):
pass
class TestConfiguration(testing.TestBase):
def setUp(self):
self.conf = base_test.mock_config(self)
self.instance = publisher.KafkaPublisher()
super(TestConfiguration, self).setUp()
def test_should_not_have_client_conf_at_begin(self):
self.assertIsNone(self.instance._client_conf)
def test_should_not_have_producer_conf_at_begin(self):
self.assertIsNone(self.instance._producer_conf)
def test_should_init_producer_conf(self):
self.instance._get_producer_conf()
self.assertIsNotNone(self.instance._producer_conf)
def test_should_init_client_conf(self):
self.instance._get_client_conf()
self.assertIsNotNone(self.instance._client_conf)
class TestInitialization(testing.TestBase):
def setUp(self):
self.conf = base_test.mock_config(self)
super(TestInitialization, self).setUp()
def test_should_have_init_client_not_set(self):
instance = publisher.KafkaPublisher()
self.assertIsNone(instance._client)
def test_should_have_init_producer_not_set(self):
instance = publisher.KafkaPublisher()
self.assertIsNone(instance._producer)
@mock.patch('monasca_log_api.publisher.kafka_publisher.client.KafkaClient',
side_effect=[common.KafkaUnavailableError])
def test_client_should_fail_kafka_unavailable(self, kafka_client):
instance = publisher.KafkaPublisher()
self.assertRaises(
common.KafkaUnavailableError,
instance._init_client
)
@mock.patch('monasca_log_api.publisher.kafka_publisher.client.KafkaClient',
side_effect=[common.LeaderNotAvailableError])
def test_client_should_fail_leader_unavailable(self, kafka_client):
instance = publisher.KafkaPublisher()
self.assertRaises(
common.LeaderNotAvailableError,
instance._init_client
)
@mock.patch('monasca_log_api.publisher.kafka_publisher.client.KafkaClient',
side_effect=[ValueError])
def test_client_should_fail_other_error(self, kafka_client):
instance = publisher.KafkaPublisher()
self.assertRaises(
ValueError,
instance._init_client
)
@mock.patch('monasca_log_api.publisher.kafka_publisher.client.KafkaClient',
autospec=True)
def test_client_should_initialize(self, kafka_client):
client_id = 'mock_client'
timeout = 3600
hosts = 'localhost:666'
self.conf.config(
client_id=client_id,
timeout=timeout,
host=hosts,
group='kafka'
)
instance = publisher.KafkaPublisher()
instance._init_client()
self.assertIsNotNone(instance._client)
kafka_client.assert_called_with(hosts, client_id, timeout)
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
side_effect=[ValueError])
def test_producer_should_fail_any_error(self, producer):
instance = publisher.KafkaPublisher()
instance._client = MockKafkaClient()
instance._client_conf = {
'host': 'localhost'
}
self.assertRaises(
exceptions.PublisherInitException,
instance._init_producer
)
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
autospec=True)
def test_producer_should_initialize(self, producer):
instance = publisher.KafkaPublisher()
client = MockKafkaClient()
instance._client = client
instance._get_producer_conf = mock.Mock(return_value={})
instance._init_producer()
self.assertIsNotNone(instance._producer)
self.assertTrue(producer.called)
class TestLogic(unittest.TestCase):
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
autospec=True)
def test_should_not_call_producer_for_empty_key(self, producer):
producer.send.return_value = None
instance = publisher.KafkaPublisher()
instance._producer = producer
instance.send_message(TOPIC, None, 'msg')
self.assertFalse(producer.send.called)
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
autospec=True)
def test_should_not_call_producer_for_empty_message(self, producer):
producer.send.return_value = None
instance = publisher.KafkaPublisher()
instance._producer = producer
instance.send_message(TOPIC, 'key', None)
self.assertFalse(producer.send.called)
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
autospec=True)
def test_should_not_call_producer_for_empty_topic(self, producer):
producer.send.return_value = None
instance = publisher.KafkaPublisher()
instance._producer = producer
instance.send_message(None, 'key', 'msg')
self.assertFalse(producer.send.called)
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
autospec=True)
def test_should_fail_kafka_not_available(self, producer):
producer.send.side_effect = [common.KafkaUnavailableError]
instance = publisher.KafkaPublisher()
instance._producer = producer
instance._client = mock.Mock('client')
with self.assertRaises(exceptions.MessageQueueException) as context:
instance.send_message('a', 'b', 'c')
self.assertEqual('Failed to post message to kafka',
context.exception.message)
self.assertIsInstance(context.exception.caught,
common.KafkaUnavailableError)
self.assertIsNone(instance._client)
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
autospec=True)
def test_should_fail_leader_not_available(self, producer):
producer.send.side_effect = [common.LeaderNotAvailableError]
instance = publisher.KafkaPublisher()
instance._producer = producer
instance._client = mock.Mock('client')
with self.assertRaises(exceptions.MessageQueueException) as context:
instance.send_message('a', 'b', 'c')
self.assertEqual('Failed to post message to kafka',
context.exception.message)
self.assertIsInstance(context.exception.caught,
common.LeaderNotAvailableError)
self.assertIsNone(instance._client)
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
autospec=True)
def test_should_fail_any_error(self, producer):
producer.send.side_effect = [Exception]
instance = publisher.KafkaPublisher()
instance._producer = producer
with self.assertRaises(exceptions.MessageQueueException) as context:
instance.send_message('a', 'b', 'c')
self.assertEqual('Unknown error while sending message to kafka',
context.exception.message)
self.assertIsInstance(context.exception.caught,
Exception)
@mock.patch('monasca_log_api.publisher.kafka_publisher'
'.producer.KeyedProducer',
autospec=True)
def test_should_send_message(self, producer):
producer.send.return_value = None
instance = publisher.KafkaPublisher()
instance._producer = producer
msg = 'msg'
key = 'key'
instance.send_message(TOPIC, key, msg)
producer.send.assert_called_once_with(TOPIC, key, msg)

View File

@ -0,0 +1,180 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from falcon import testing
import mock
import simplejson
from monasca_log_api.publisher import exceptions
from monasca_log_api.tests import base
from monasca_log_api.v1.common import log_publisher
from monasca_log_api.v1.common import service
class TestBuildKey(unittest.TestCase):
def test_should_return_empty_for_none_params(self):
self.assertFalse(log_publisher.LogPublisher._build_key(None, None))
def test_should_return_tenant_id_for_tenant_id_defined_1(self):
tenant_id = 'monasca'
self.assertEqual(
tenant_id,
log_publisher.LogPublisher._build_key(tenant_id, None)
)
def test_should_return_tenant_id_for_tenant_id_defined_2(self):
tenant_id = 'monasca'
self.assertEqual(tenant_id,
log_publisher.LogPublisher._build_key(tenant_id, {}))
def test_should_return_ok_key_1(self):
# Evaluates if key matches value for defined tenant_id and
# application_type
tenant_id = 'monasca'
application_type = 'monasca-log-api'
log_object = {
'application_type': application_type
}
expected_key = tenant_id + application_type
self.assertEqual(expected_key,
log_publisher.LogPublisher._build_key(tenant_id,
log_object))
def test_should_return_ok_key_2(self):
# Evaluates if key matches value for defined tenant_id and
# application_type and single dimension
tenant_id = 'monasca'
application_type = 'monasca-log-api'
dimension = service.Dimension('cpu_time', 50)
log_object = {
'application_type': application_type,
'dimensions': [dimension]
}
expected_key = tenant_id + application_type + dimension.name + str(
dimension.value)
self.assertEqual(expected_key,
log_publisher.LogPublisher._build_key(tenant_id,
log_object))
def test_should_return_ok_key_3(self):
# Evaluates if key matches value for defined tenant_id and
# application_type and two dimensions dimensions given unsorted
tenant_id = 'monasca'
application_type = 'monasca-log-api'
dimension_1 = service.Dimension('disk_usage', 50)
dimension_2 = service.Dimension('cpu_time', 50)
log_object = {
'application_type': application_type,
'dimensions': [dimension_1, dimension_2]
}
expected_key = ''.join([tenant_id, application_type, dimension_2.name,
str(dimension_2.value), dimension_1.name,
str(dimension_1.value)])
self.assertEqual(expected_key,
log_publisher.LogPublisher._build_key(tenant_id,
log_object))
class TestSendMessage(testing.TestBase):
def setUp(self):
self.conf = base.mock_config(self)
return super(TestSendMessage, self).setUp()
def test_should_not_send_empty_message(self):
instance = log_publisher.LogPublisher()
instance._kafka_publisher.send_message = mock.Mock()
instance.send_message({})
self.assertFalse(instance._kafka_publisher.send_message.called)
def test_should_raise_exception(self):
instance = log_publisher.LogPublisher()
instance._kafka_publisher.send_message = mock.Mock(
side_effect=[exceptions.MessageQueueException(1, 1)]
)
msg = {
'log': {
'message': 1
},
'meta': {
'tenantId': 1
}
}
self.assertRaises(exceptions.MessageQueueException,
instance.send_message, msg)
def test_should_send_message(self):
instance = log_publisher.LogPublisher()
instance._kafka_publisher.send_message = mock.Mock(name='send_message',
return_value={})
instance._build_key = mock.Mock(name='_build_key',
return_value='some_key')
msg = {
'log': {
'message': 1,
'application_type': 'monasca_log_api',
'dimensions': [service.Dimension('disk_usage', 50),
service.Dimension('cpu_time', 50)]
},
'meta': {
'tenantId': 1
}
}
instance.send_message(msg)
instance._kafka_publisher.send_message.assert_called_once_with(
self.conf.conf.log_publisher.topics[0],
'some_key',
simplejson.dumps(msg))
def test_should_send_message_multiple_topics(self):
topics = ['logs', 'analyzer', 'tester']
self.conf.config(topics=topics, group='log_publisher')
instance = log_publisher.LogPublisher()
instance._kafka_publisher.send_message = mock.Mock(name='send_message',
return_value={})
instance._build_key = mock.Mock(name='_build_key',
return_value='some_key')
msg = {
'log': {
'message': 1,
'application_type': 'monasca_log_api',
'dimensions': [service.Dimension('disk_usage', 50),
service.Dimension('cpu_time', 50)]
},
'meta': {
'tenantId': 1
}
}
json_msg = simplejson.dumps(msg)
instance.send_message(msg)
self.assertEqual(len(topics),
instance._kafka_publisher.send_message.call_count)
for topic in topics:
instance._kafka_publisher.send_message.assert_any_call(
topic,
'some_key',
json_msg)

View File

@ -0,0 +1,154 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from falcon import testing
import mock
from monasca_log_api.api import exceptions as log_api_exceptions
from monasca_log_api.api import headers
from monasca_log_api.api import logs_api
from monasca_log_api.tests import base
from monasca_log_api.v1.reference import logs
class TestLogs(testing.TestBase):
def before(self):
self.conf = base.mock_config(self)
self.logs_resource = logs.Logs()
self.api.add_route(
'/logs/single',
self.logs_resource
)
def test_should_fail_not_delegate_ok_cross_tenant_id(self):
self.simulate_request(
'/logs/single',
method='POST',
query_string='tenant_id=1',
headers={
'Content-Type': 'application/json'
}
)
self.assertEqual(falcon.HTTP_403, self.srmock.status)
@mock.patch('monasca_log_api.v1.common.service.LogCreator')
@mock.patch('monasca_log_api.publisher.kafka_publisher.KafkaPublisher')
def test_should_pass_empty_cross_tenant_id_wrong_role(self,
log_creator,
kafka_publisher):
log_creator.configure_mock(**{'new_log.return_value': None,
'new_log_envelope.return_value': None})
kafka_publisher.configure_mock(**{'send_message.return_value': None})
self.logs_resource._log_creator = log_creator
self.logs_resource._kafka_publisher = kafka_publisher
self.simulate_request(
'/logs/single',
method='POST',
headers={
headers.X_ROLES.name: 'some_role',
headers.X_DIMENSIONS.name: 'a:1',
'Content-Type': 'application/json'
}
)
self.assertEqual(falcon.HTTP_204, self.srmock.status)
self.assertEqual(1, kafka_publisher.send_message.call_count)
self.assertEqual(1, log_creator.new_log.call_count)
self.assertEqual(1, log_creator.new_log_envelope.call_count)
@mock.patch('monasca_log_api.v1.common.service.LogCreator')
@mock.patch('monasca_log_api.publisher.kafka_publisher.KafkaPublisher')
def test_should_pass_empty_cross_tenant_id_ok_role(self,
log_creator,
kafka_publisher):
log_creator.configure_mock(**{'new_log.return_value': None,
'new_log_envelope.return_value': None})
kafka_publisher.configure_mock(**{'send_message.return_value': None})
self.logs_resource._log_creator = log_creator
self.logs_resource._kafka_publisher = kafka_publisher
self.simulate_request(
'/logs/single',
method='POST',
headers={
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
headers.X_DIMENSIONS.name: 'a:1',
'Content-Type': 'application/json'
}
)
self.assertEqual(falcon.HTTP_204, self.srmock.status)
self.assertEqual(1, kafka_publisher.send_message.call_count)
self.assertEqual(1, log_creator.new_log.call_count)
self.assertEqual(1, log_creator.new_log_envelope.call_count)
@mock.patch('monasca_log_api.v1.common.service.LogCreator')
@mock.patch('monasca_log_api.publisher.kafka_publisher.KafkaPublisher')
def test_should_pass_delegate_cross_tenant_id_ok_role(self,
log_creator,
kafka_publisher):
log_creator.configure_mock(**{'new_log.return_value': None,
'new_log_envelope.return_value': None})
kafka_publisher.configure_mock(**{'send_message.return_value': None})
self.logs_resource._log_creator = log_creator
self.logs_resource._kafka_publisher = kafka_publisher
self.simulate_request(
'/logs/single',
method='POST',
query_string='tenant_id=1',
headers={
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
headers.X_DIMENSIONS.name: 'a:1',
'Content-Type': 'application/json'
}
)
self.assertEqual(falcon.HTTP_204, self.srmock.status)
self.assertEqual(1, kafka_publisher.send_message.call_count)
self.assertEqual(1, log_creator.new_log.call_count)
self.assertEqual(1, log_creator.new_log_envelope.call_count)
def test_should_fail_empty_dimensions_delegate(self):
with mock.patch.object(self.logs_resource._log_creator,
'_read_payload',
return_value=True):
self.simulate_request(
'/logs/single',
method='POST',
headers={
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
headers.X_DIMENSIONS.name: '',
'Content-Type': 'application/json'
}
)
self.assertEqual(log_api_exceptions.HTTP_422, self.srmock.status)
def test_should_fail_for_invalid_content_type(self):
self.simulate_request(
'/logs/single',
method='POST',
headers={
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
headers.X_DIMENSIONS.name: '',
'Content-Type': 'video/3gpp'
}
)
self.assertEqual(falcon.HTTP_406, self.srmock.status)

View File

@ -0,0 +1,336 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import unittest
import falcon
from falcon import testing
import mock
import simplejson
from monasca_log_api.api import exceptions
from monasca_log_api.api import logs_api
from monasca_log_api.v1.common import service as common_service
class IsDelegate(unittest.TestCase):
def test_is_delegate_ok_role(self):
roles = logs_api.MONITORING_DELEGATE_ROLE
self.assertTrue(common_service.is_delegate(roles))
def test_is_delegate_ok_role_in_roles(self):
roles = logs_api.MONITORING_DELEGATE_ROLE + ',a_role,b_role'
self.assertTrue(common_service.is_delegate(roles))
def test_is_delegate_not_ok_role(self):
roles = 'a_role,b_role'
self.assertFalse(common_service.is_delegate(roles))
class ParseDimensions(unittest.TestCase):
def test_should_fail_for_empty_dimensions(self):
self.assertRaises(exceptions.HTTPUnprocessableEntity,
common_service.parse_dimensions, '')
self.assertRaises(exceptions.HTTPUnprocessableEntity,
common_service.parse_dimensions, None)
def test_should_fail_for_empty_dim_in_dimensions(self):
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
common_service.parse_dimensions(',')
self.assertEqual(context.exception.description,
'Dimension cannot be empty')
def test_should_fail_for_invalid_dim_in_dimensions(self):
invalid_dim = 'a'
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
common_service.parse_dimensions(invalid_dim)
self.assertEqual(context.exception.description,
'%s is not a valid dimension' % invalid_dim)
def test_should_pass_for_valid_dimensions(self):
dimensions = 'a:1,b:2'
expected = [('a', '1'), ('b', '2')]
self.assertListEqual(expected,
common_service.parse_dimensions(dimensions))
class ParseApplicationType(unittest.TestCase):
def test_should_return_none_for_none(self):
self.assertIsNone(common_service.parse_application_type(None))
def test_should_return_none_for_empty(self):
self.assertIsNone(common_service.parse_application_type(''))
def test_should_return_none_for_whitespace_filled(self):
self.assertIsNone(common_service.parse_application_type(' '))
def test_should_return_value_for_ok_value(self):
app_type = 'monasca'
self.assertEqual(app_type,
common_service.parse_application_type(app_type))
def test_should_return_value_for_ok_value_with_spaces(self):
app_type = ' monasca '
expected = 'monasca'
self.assertEqual(expected,
common_service.parse_application_type(app_type))
class ApplicationTypeValidations(unittest.TestCase):
def test_should_pass_for_empty_app_type(self):
common_service.Validations.validate_application_type()
common_service.Validations.validate_application_type('')
def test_should_fail_for_invalid_length(self):
r_app_type = testing.rand_string(300, 600)
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
common_service.Validations.validate_application_type(r_app_type)
length = common_service.APPLICATION_TYPE_CONSTRAINTS['MAX_LENGTH']
msg = ('Application type {type} must be '
'{length} characters or less'.format(type=r_app_type,
length=length))
self.assertEqual(context.exception.description, msg)
def test_should_fail_for_invalid_content(self):
r_app_type = '%#$@!'
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
common_service.Validations.validate_application_type(r_app_type)
msg = ('Application type %s may only contain: "a-z A-Z 0-9 _ - ."' %
r_app_type)
self.assertEqual(context.exception.description, msg)
def test_should_pass_for_ok_app_type(self):
r_app_type = 'monasca'
common_service.Validations.validate_application_type(r_app_type)
class DimensionsValidations(unittest.TestCase):
@unittest.expectedFailure
def test_should_fail_for_none_dimensions(self):
common_service.Validations.validate_dimensions(None)
@unittest.expectedFailure
def test_should_fail_pass_for_non_iterable_dimensions_str(self):
common_service.Validations.validate_dimensions('')
@unittest.expectedFailure
def test_should_fail_pass_for_non_iterable_dimensions_number(self):
common_service.Validations.validate_dimensions(1)
def test_should_pass_for_empty_dimensions_array(self):
common_service.Validations.validate_dimensions([])
def test_should_fail_too_empty_name(self):
dimensions = [('', 1)]
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
common_service.Validations.validate_dimensions(dimensions)
msg = 'Dimension name cannot be empty'
self.assertEqual(context.exception.description, msg)
def test_should_fail_too_long_name(self):
name = testing.rand_string(256, 260)
dimensions = [(name, 1)]
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
common_service.Validations.validate_dimensions(dimensions)
msg = 'Dimension name %s must be 255 characters or less' % name
self.assertEqual(context.exception.description, msg)
def test_should_fail_underscore_at_begin(self):
name = '_aDim'
dimensions = [(name, 1)]
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
common_service.Validations.validate_dimensions(dimensions)
msg = 'Dimension name %s cannot start with underscore (_)' % name
self.assertEqual(context.exception.description, msg)
def test_should_fail_invalid_chars(self):
name = '<>'
dimensions = [(name, 1)]
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
common_service.Validations.validate_dimensions(dimensions)
invalid_chars = '> < = { } ( ) \' " , ; &'
msg = 'Dimension name %s may not contain: %s' % (name, invalid_chars)
self.assertEqual(context.exception.description, msg)
def test_should_fail_ok_name_empty_value(self):
name = 'monasca'
dimensions = [(name, '')]
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
common_service.Validations.validate_dimensions(dimensions)
msg = 'Dimension value cannot be empty'
self.assertEqual(context.exception.description, msg)
def test_should_fail_ok_name_too_long_value(self):
name = 'monasca'
value = testing.rand_string(256, 300)
dimensions = [(name, value)]
with self.assertRaises(exceptions.HTTPUnprocessableEntity) as context:
common_service.Validations.validate_dimensions(dimensions)
msg = 'Dimension value %s must be 255 characters or less' % value
self.assertEqual(context.exception.description, msg)
def test_should_pass_ok_name_ok_value_empty_service(self):
name = 'monasca'
value = '1'
dimensions = [(name, value)]
common_service.Validations.validate_dimensions(dimensions)
def test_should_pass_ok_name_ok_value_service_SERVICE_DIMENSIONS_as_name(
self):
name = 'some_name'
value = '1'
dimensions = [(name, value)]
common_service.Validations.validate_dimensions(dimensions)
class LogsCreatorPayload(unittest.TestCase):
def setUp(self):
self.instance = common_service.LogCreator()
@mock.patch('io.IOBase')
def test_should_fail_for_unreadable_payload(self, payload):
payload.configure_mock(**{'readable.return_value': False})
self.assertRaises(falcon.HTTPInternalServerError,
self.instance._read_payload, payload, 'a')
@mock.patch('io.IOBase')
def test_should_read_text_for_plain_text(self, payload):
msg = u'Hello World'
payload.configure_mock(
**{'readable.return_value': True, 'read.return_value': msg})
self.assertEqual(msg,
self.instance._read_payload(payload, 'text/plain'))
@mock.patch('io.IOBase')
def test_should_read_json_for_application_json(self, payload):
msg = u'{"path":"/var/log/messages","message":"This is message"}'
payload.configure_mock(
**{'readable.return_value': True, 'read.return_value': msg})
json_msg = simplejson.loads(msg, encoding='utf-8')
self.assertEqual(json_msg,
self.instance._read_payload(payload,
'application/json'))
@mock.patch('io.IOBase')
def test_should_fail_read_text_for_application_json(self, payload):
with self.assertRaises(falcon.HTTPBadRequest) as context:
msg = u'Hello World'
payload.configure_mock(
**{'readable.return_value': True, 'read.return_value': msg})
self.instance._read_payload(payload,
'application/json')
self.assertEqual(context.exception.title,
'Failed to read body as json')
class LogsCreatorNewLog(unittest.TestCase):
def setUp(self):
self.instance = common_service.LogCreator()
@mock.patch('io.IOBase')
def test_should_create_log_from_json(self, payload):
msg = u'Hello World'
path = u'/var/log/messages'
json_msg = u'{"path":"%s","message":"%s"}' % (path, msg)
app_type = 'monasca'
dimensions = 'cpu_time:30'
payload.configure_mock(
**{'readable.return_value': True, 'read.return_value': json_msg})
expected_log = {
'message': msg,
'application_type': app_type,
'dimensions': [('cpu_time', '30')],
'path': path
}
self.assertEqual(expected_log, self.instance.new_log(
application_type=app_type,
dimensions=dimensions,
payload=payload
))
@mock.patch('io.IOBase')
def test_should_create_log_from_text(self, payload):
msg = u'Hello World'
app_type = 'monasca'
dimensions = 'cpu_time:30'
payload.configure_mock(
**{'readable.return_value': True, 'read.return_value': msg})
expected_log = {
'message': msg,
'application_type': app_type,
'dimensions': [('cpu_time', '30')]
}
self.assertEqual(expected_log, self.instance.new_log(
application_type=app_type,
dimensions=dimensions,
payload=payload,
content_type='text/plain'
))
class LogCreatorNewEnvelope(unittest.TestCase):
def setUp(self):
self.instance = common_service.LogCreator()
def test_should_create_envelope(self):
msg = u'Hello World'
path = u'/var/log/messages'
app_type = 'monasca'
expected_log = {
'message': msg,
'application_type': app_type,
'dimensions': [('cpu_time', '30')],
'path': path
}
tenant_id = 'a_tenant'
none = None
meta = {'tenantId': tenant_id, 'region': none}
expected_envelope = {
'log': expected_log,
'creation_time': datetime.datetime.utcnow(),
'meta': meta
}
with mock.patch.object(self.instance, '_create_meta_info',
return_value=meta):
actual_envelope = self.instance.new_log_envelope(expected_log,
tenant_id)
self.assertEqual(expected_envelope.get('log'),
actual_envelope.get('log'))
self.assertEqual(expected_envelope.get('meta'),
actual_envelope.get('meta'))

View File

View File

View File

@ -0,0 +1,100 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from oslo_log import log
import simplejson
from monasca_log_api.publisher import kafka_publisher
LOG = log.getLogger(__name__)
CONF = cfg.CONF
log_publisher_opts = [
cfg.MultiStrOpt('topics',
default=['logs'],
help='Target topic in kafka')
]
log_publisher_group = cfg.OptGroup(name='log_publisher', title='log_publisher')
cfg.CONF.register_group(log_publisher_group)
cfg.CONF.register_opts(log_publisher_opts, log_publisher_group)
class LogPublisher(object):
def __init__(self):
self._topics = CONF.log_publisher.topics
self._kafka_publisher = kafka_publisher.KafkaPublisher()
LOG.info('Initializing LogPublisher <%s>', self)
@staticmethod
def _build_key(tenant_it, obj):
"""Message key builder
Builds message key using tenant_id and following details of
log message:
- application_type
- dimensions
:param tenant_it: tenant id
:param obj: log instance
:return: key
"""
def comparator(a, b):
"""Comparator for dimensions
:param a: dimension_a, tuple with properties name,value
:param b: dimension_b, tuple with properties name,value
:return: sorting result
"""
if a.name == b.name:
return (a.value > b.value) - (a.value < b.value)
return (a.name > b.name) - (a.name < b.name)
str_list = []
if tenant_it:
str_list.append(str(tenant_it))
if obj:
if 'application_type' in obj and obj['application_type']:
str_list += obj['application_type']
if 'dimensions' in obj and obj['dimensions']:
dimensions = sorted(obj['dimensions'], cmp=comparator)
for name, value in dimensions:
str_list += name
str_list += str(value)
return ''.join(str_list)
def send_message(self, message):
if not message:
return
key = self._build_key(message['meta']['tenantId'], message['log'])
msg = simplejson.dumps(message,
sort_keys=False,
ensure_ascii=False).encode('utf8')
LOG.debug('Build key [%s] for message' % key)
LOG.debug('Sending message {topics=%s,key=%s,message=%s}' %
(self._topics, key, msg))
try:
for topic in self._topics:
self._kafka_publisher.send_message(topic, key, msg)
except Exception as ex:
LOG.error(ex.message)
raise ex

View File

@ -0,0 +1,250 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import datetime
import re
import falcon
from oslo_config import cfg
from oslo_log import log
import simplejson
from monasca_log_api.api import exceptions
from monasca_log_api.api import logs_api
LOG = log.getLogger(__name__)
CONF = cfg.CONF
service_opts = [
cfg.StrOpt('region',
default=None,
help='Region')
]
service_group = cfg.OptGroup(name='service', title='service')
CONF.register_group(service_group)
CONF.register_opts(service_opts, service_group)
APPLICATION_TYPE_CONSTRAINTS = {
'MAX_LENGTH': 255,
'PATTERN': re.compile('^[a-zA-Z0-9_\\.\\-]+$')
}
DIMENSION_NAME_CONSTRAINTS = {
'MAX_LENGTH': 255,
'PATTERN': re.compile('[^><={}(), \'";&]+$')
}
DIMENSION_VALUE_CONSTRAINTS = {
'MAX_LENGTH': 255
}
Dimension = collections.namedtuple('Dimensions', ['name', 'value'])
class Validations(object):
@staticmethod
def validate_application_type(application_type=None):
def validate_length():
if (len(application_type) >
APPLICATION_TYPE_CONSTRAINTS['MAX_LENGTH']):
msg = ('Application type {type} must be '
'{length} characters or less')
raise exceptions.HTTPUnprocessableEntity(
msg.format(
type=application_type,
length=APPLICATION_TYPE_CONSTRAINTS[
'MAX_LENGTH']
)
)
def validate_match():
if (not APPLICATION_TYPE_CONSTRAINTS['PATTERN']
.match(application_type)):
raise exceptions.HTTPUnprocessableEntity(
'Application type %s may only contain: "a-z A-Z 0-9 _ - ."'
% application_type
)
if application_type:
validate_length()
validate_match()
@staticmethod
def validate_dimensions(dimensions):
def validate_name(name):
if not name:
raise exceptions.HTTPUnprocessableEntity(
'Dimension name cannot be empty'
)
if len(name) > DIMENSION_NAME_CONSTRAINTS['MAX_LENGTH']:
raise exceptions.HTTPUnprocessableEntity(
'Dimension name %s must be 255 characters or less' %
name
)
if name[0] == '_':
raise exceptions.HTTPUnprocessableEntity(
'Dimension name %s cannot start with underscore (_)' %
name
)
if not DIMENSION_NAME_CONSTRAINTS['PATTERN'].match(name):
raise exceptions.HTTPUnprocessableEntity(
'Dimension name %s may not contain: %s' %
(name, '> < = { } ( ) \' " , ; &')
)
def validate_value(value):
if not value:
raise exceptions.HTTPUnprocessableEntity(
'Dimension value cannot be empty'
)
if len(value) > DIMENSION_VALUE_CONSTRAINTS['MAX_LENGTH']:
raise exceptions.HTTPUnprocessableEntity(
'Dimension value %s must be 255 characters or less' %
value
)
if (isinstance(dimensions, (list, tuple)) and not
isinstance(dimensions, basestring)):
for dim_name, dim_value in dimensions:
validate_name(dim_name)
validate_value(dim_value)
else:
raise exceptions.HTTPUnprocessableEntity(
'Dimensions %s must be a collections' % dimensions)
class LogCreator(object):
def __init__(self):
self._log = log.getLogger('service.LogCreator')
self._log.info('Initializing LogCreator')
# noinspection PyMethodMayBeStatic
def _create_meta_info(self, tenant_id):
return {
'tenantId': tenant_id,
'region': cfg.CONF.get('region')
}
def _read_payload(self, payload, content_type):
if not payload.readable():
self._log.error('Payload cannot be read, stream not readable')
raise falcon.HTTPInternalServerError(
title='Invalid message',
description='Couldn\'t read the message'
)
try:
content = payload.read()
except Exception as ex:
raise falcon.HTTPBadRequest(title='Failed to read body',
description=ex.message)
if content and content_type == 'application/json':
try:
content = simplejson.loads(content, encoding='utf-8')
except Exception as ex:
raise falcon.HTTPBadRequest(title='Failed to read body as '
'json',
description=ex.message)
return content
def new_log(self,
application_type,
dimensions,
payload,
content_type='application/json',
validate=True):
payload = self._read_payload(payload, content_type)
if not payload:
return None
# normalize_yet_again
application_type = parse_application_type(application_type)
dimensions = parse_dimensions(dimensions)
self._log.debug(
'application_type=%s,dimensions=%s' % (
application_type, dimensions)
)
if validate:
self._log.debug('Validation enabled, proceeding with validation')
Validations.validate_application_type(application_type)
Validations.validate_dimensions(dimensions)
log_object = {}
if content_type == 'application/json':
log_object.update(payload)
else:
log_object.update({'message': payload})
log_object.update({
'application_type': application_type,
'dimensions': dimensions
})
return log_object
def new_log_envelope(self, log_object, tenant_id):
return {
'log': log_object,
'creation_time': datetime.datetime.utcnow(),
'meta': self._create_meta_info(tenant_id)
}
def is_delegate(roles):
if roles:
roles = roles.split(',')
return logs_api.MONITORING_DELEGATE_ROLE in roles
pass
return False
def parse_application_type(app_type):
if app_type:
app_type = app_type.strip()
return app_type if app_type else None
def parse_dimensions(dimensions):
if not dimensions:
raise exceptions.HTTPUnprocessableEntity('Dimension are required')
new_dimensions = []
dimensions = map(str.strip, dimensions.split(','))
for dim in dimensions:
if not dim:
raise exceptions.HTTPUnprocessableEntity(
'Dimension cannot be empty')
elif ':' not in dim:
raise exceptions.HTTPUnprocessableEntity(
'%s is not a valid dimension' % dim)
dim = dim.split(':')
name = str(dim[0].strip()) if dim[0] else None
value = str(dim[1].strip()) if dim[1] else None
if name and value:
new_dimensions.append(Dimension(name, value))
return new_dimensions

View File

View File

@ -0,0 +1,81 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from monasca_log_api.api import headers
from monasca_log_api.api import logs_api
from monasca_log_api.v1.common import log_publisher
from monasca_log_api.v1.common import service
# TODO(idea) perhaps add it as pipeline call right before API, seems generic
def _before_logs_post(req, res, payload, params):
cross_tenant_id = req.get_param('tenant_id')
tenant_id = req.get_header(*headers.X_TENANT_ID)
if not service.is_delegate(req.get_header(*headers.X_ROLES)):
if cross_tenant_id:
raise falcon.HTTPForbidden(
'Permission denied',
'Projects %s cannot POST cross tenant metrics' % tenant_id
)
class Logs(logs_api.LogsApi):
"""Logs Api V1."""
_supported_c_types = {'application/json', 'text/plain'}
def __init__(self):
self._log_creator = service.LogCreator()
self._kafka_publisher = log_publisher.LogPublisher()
super(Logs, self).__init__()
@falcon.before(_before_logs_post)
def on_post(self, req, res):
content_type = req.content_type
if content_type not in self._supported_c_types:
raise falcon.HTTPNotAcceptable(
description='Only %s are accepted as logs representations'
% self._supported_c_types)
cross_tenant_id = req.get_param('tenant_id')
tenant_id = req.get_header(*headers.X_TENANT_ID)
log = self.get_log(request=req)
envelope = self.get_envelope(
log=log,
tenant_id=tenant_id if tenant_id else cross_tenant_id
)
self._kafka_publisher.send_message(envelope)
res.status = falcon.HTTP_204
def get_envelope(self, log, tenant_id):
return self._log_creator.new_log_envelope(
log_object=log,
tenant_id=tenant_id
)
def get_log(self, request):
return self._log_creator.new_log(
application_type=request.get_header(*headers.X_APPLICATION_TYPE),
dimensions=request.get_header(*headers.X_DIMENSIONS),
payload=request.stream,
content_type=request.content_type
)

View File

@ -0,0 +1,87 @@
# Copyright 2015 kornicameister@gmail.com
# Copyright 2015 FUJITSU LIMITED
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import falcon
from oslo_log import log
import simplejson
from monasca_log_api.api import versions_api
from monasca_log_api import constants
LOG = log.getLogger(__name__)
VERSIONS = {
'v1.0': {
'id': 'v1.0',
'links': [
{
'rel': 'self',
'href': ''
},
{
'rel': 'links',
'href': '/logs'
}
],
'status': 'CURRENT',
'updated': "2013-03-06T00:00:00Z"
}
}
class Versions(versions_api.VersionsAPI):
def __init__(self):
super(Versions, self).__init__()
@staticmethod
def handle_none_version_id(req, res, result):
for version in VERSIONS:
VERSIONS[version]['links'][0]['href'] = (
req.uri.decode('utf8') + version)
result['elements'].append(VERSIONS[version])
res.body = simplejson.dumps(result)
res.status = falcon.HTTP_200
@staticmethod
def handle_version_id(req, res, version_id):
if version_id in VERSIONS:
VERSIONS[version_id]['links'][0]['href'] = (
req.uri.decode(constants.ENCODING)
)
for version in VERSIONS:
VERSIONS[version]['links'][0]['href'] = (
req.uri.decode('utf8')
)
VERSIONS[version_id]['links'][1]['href'] = (
req.uri.decode('utf8') +
VERSIONS[version_id]['links'][1]['href']
)
res.body = simplejson.dumps(VERSIONS[version_id])
res.status = falcon.HTTP_200
else:
res.body = 'Invalid Version ID'
res.status = falcon.HTTP_400
def on_get(self, req, res, version_id=None):
result = {
'links': [{
'rel': 'self',
'href': req.uri.decode(constants.ENCODING)
}],
'elements': []
}
if version_id is None:
self.handle_none_version_id(req, res, result)
else:
self.handle_version_id(req, res, version_id)

15
requirements.txt Normal file
View File

@ -0,0 +1,15 @@
paste
falcon==0.3.0
gunicorn>=19.2.0
keystonemiddleware
oslo.config>=1.2.1
oslo.middleware
oslo.log
oslo.serialization
oslo.utils
pastedeploy>=1.3.3
pbr>=1.6.0,<2.0
six>=1.9.0
kafka-python>=0.9.3,<0.9.4
simplejson>=3.8.0
simport

34
setup.cfg Normal file
View File

@ -0,0 +1,34 @@
[metadata]
name = monasca-log-api
summary = OpenStack Monitoring as a Service
description-file =
README.md
author = kornicameister
author-email = kornicameister@gmail.com
home-page = https://launchpad.net/monasca
classifier =
Environment :: OpenStack
Intended Audience :: Information Technology
Intended Audience :: System Administrators
License :: OSI Approved :: Apache Software License
Operating System :: POSIX :: Linux
Programming Language :: Python
Programming Language :: Python :: 2
Programming Language :: Python :: 2.7
[files]
packages =
monasca_log_api
data_files =
/etc/monasca =
etc/monasca/log-api-config.conf
etc/monasca/log-api-config.ini
[entry_points]
console_scripts =
monasca-log-api = monasca_log_api.server:launch
[pbr]
warnerrors = True

8
setup.py Normal file
View File

@ -0,0 +1,8 @@
from setuptools import setup
__author__ = 'kornicameister@gmail.com'
setup(
setup_requires=['pbr'],
pbr=True,
)

7
test-requirements.txt Normal file
View File

@ -0,0 +1,7 @@
fixtures==1.3.1
flake8==2.4.1
coverage>=4.0
hacking>=0.9.6,<0.10
mock
nose

24
tox.ini Normal file
View File

@ -0,0 +1,24 @@
[tox]
envlist = py27,py3,pep8
[testenv]
usedevelop = True
install_command = pip install -U {opts} {packages}
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = nosetests
[testenv:pep8]
commands = flake8 monasca_log_api
[testenv:cover]
setenv =
NOSE_WITH_COVERAGE=1
NOSE_COVER_PACKAGE=monasca_log_api
[flake8]
exclude = .git,.tox,dist,docs,*.egg,build
show-source = True
[hacking]