Monitoring for monasca-log-api
Following commit adds monitoring for monasca-log-api with metrics: * monasca.log.in_logs - amount of logs that API has received * monasca.log.in_logs_rejected - size of received logs in bytes * monasca.log.processing_time_ms - log-api time to process received logs * monasca.log.out_logs - amount of logs pubslished to kafka * monasca.log.out_logs_lost - amount of logs that were lost (critical errors) * monasca.log.publish_time_ms - time log-api needed to pubslish logs * monasca.log.in_bulks_rejected - amount of rejected bulk requests * monasca.log.out_logs_truncated_bytes - amount of truncated bytes from messages Change-Id: Ib4165fe128e87b356415da8423f536d393c89f01
This commit is contained in:
parent
264ddd3c3f
commit
c3bb9b9eb4
|
@ -77,6 +77,7 @@ Requests flow through the following architectural layers from top to bottom:
|
|||
|
||||
* API Specification: [/docs/monasca-log-api-spec.md](/docs/monasca-log-api-spec.md).
|
||||
* Kafka communication: [/docs/monasca-log-api-kafka.md](/docs/monasca-log-api-kafka.md).
|
||||
* API Monitoring: [/docs/monasca-log-api-metrics.md](/docs/monasca-log-api-metrics.md).
|
||||
|
||||
## Python monasca-log-api implementation
|
||||
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
# Monitoring for monasca-log-api
|
||||
|
||||
**monasca-log-api** can be monitored by examining following metrics
|
||||
|
||||
| Name | Meaning | Dimensions |
|
||||
|-------------------------------------------|-------------------------|---------------|
|
||||
| monasca.log.in_logs | Amount of received logs | version |
|
||||
| monasca.log.in_logs_rejected | Amount of rejected logs (see below for details) | version |
|
||||
| monasca.log.in_bulks_rejected | Amount of rejected bulks (see below for details) | version |
|
||||
| monasca.log.in_logs_bytes | Size received logs (a.k.a. *Content-Length)* in bytes | version |
|
||||
| monasca.log.out_logs | Amount of logs published to kafka | |
|
||||
| monasca.log.out_logs_lost | Amount of logs lost during publish phase | |
|
||||
| monasca.log.out_logs_truncated_bytes | Amount of truncated bytes, removed from message | |
|
||||
| monasca.log.publish_time_ms | Time Log-Api needed to publish all logs to kafka | |
|
||||
| monasca.log.processing_time_ms | Time Log-Api needed to process received logs. | version |
|
||||
|
||||
Additionally each metric contains following dimensions:
|
||||
- **component** - monasca-log-api
|
||||
- **service** - monitoring
|
||||
|
||||
## Metrics explained
|
||||
|
||||
### monasca.log.in_logs
|
||||
|
||||
Metric sent with amount of logs that were received by **Log-API**
|
||||
and successfully passed initial validation. For **v2.0** this
|
||||
metric will be always send with value one, for **v3.0** this metric's values
|
||||
are equivalent to amount of element in bulk request.
|
||||
|
||||
### monasca.log.in_logs_rejected <a name="monasca_log_logs_rejected">
|
||||
|
||||
Logs can be rejected because of:
|
||||
|
||||
* checking content-type
|
||||
* checking content-length
|
||||
* reading payload
|
||||
* retrieving logs from payload
|
||||
* validating global dimensions (if set)(only valid for v3.0)
|
||||
|
||||
### monasca.log.in_bulks_rejected (only v3.0)
|
||||
|
||||
In **v2.0** bulk request is equivalent to single request (i.e. single-element bulk).
|
||||
However in **v3.0** rejecting logs is done in two phases.
|
||||
|
||||
*Phase 1* is when there is no way to determine actual amount of logs
|
||||
that were sent by client (see [monasca.log.in_logs_rejected](#monasca_log_logs_rejected))
|
||||
If any of these steps was impossible to be executed entire bulk is
|
||||
considered lost and thus all logs within.
|
||||
|
||||
If *Phase 1* passes, *Phase 2* is executed. At this point every
|
||||
piece of data is available, however still some logs can be rejected,
|
||||
because of:
|
||||
|
||||
* lack of certain fields (i.e. message)
|
||||
* invalid local dimensions (if set)
|
||||
|
||||
In *Phase 2* metric [monasca.log.in_logs_rejected](#monasca_log_logs_rejected)
|
||||
is produced.
|
||||
|
||||
### monasca.log.in_logs_bytes
|
||||
|
||||
Metric allows to track to size of requests API receives.
|
||||
In **v3.0** To simplify implementation it is equivalent to **Content-Length** value.
|
||||
However amount of global dimensions and other metadata when compared
|
||||
to size of logs is negligible.
|
||||
|
||||
### monasca.log.out_logs
|
||||
|
||||
Amount of logs successfully published to kafka queue.
|
||||
|
||||
### monasca.log.out_logs_lost
|
||||
|
||||
Amount of logs that were not sent to kafka and **Log-API** was unable
|
||||
to recover from error situation
|
||||
|
||||
### monasca.log.out_logs_truncated_bytes
|
||||
|
||||
Metric is sent with the amount of bytes that log's message is shorten
|
||||
by if necessary. To read more about truncation see [here](/docs/monasca-log-api-kafka.md).
|
||||
If truncation did not happen, which should be normal situation for most
|
||||
of the time, metric is updated with value **0**.
|
||||
|
||||
### monasca.log.publish_time_ms
|
||||
|
||||
Time that was needed to send all the logs into all the topics.
|
||||
*monasca.log.processing_time_ms* includes value of that metric
|
||||
within. It exists to see how much does publishing take in entire
|
||||
processing.
|
||||
|
||||
### monasca.log.processing_time_ms
|
||||
|
||||
Total amount of time logs spent inside **Log-API**. Metric does not
|
||||
include time needed to communicate with Keystone to authenticate request.
|
||||
As far as possible it is meant to track **Log-API** itself
|
|
@ -0,0 +1,18 @@
|
|||
monasca_log_api.monitoring package
|
||||
==================================
|
||||
|
||||
Submodules
|
||||
----------
|
||||
|
||||
monasca_log_api.monitoring.client module
|
||||
----------------------------------------
|
||||
|
||||
.. automodule:: monasca_log_api.monitoring.client
|
||||
:members:
|
||||
:undoc-members:
|
||||
:show-inheritance:
|
||||
|
||||
.. automodule:: monasca_log_api.monitoring.metrics
|
||||
:members:
|
||||
:undoc-members:
|
||||
:show-inheritance:
|
|
@ -11,6 +11,7 @@ Subpackages
|
|||
monasca_log_api.reference.v3
|
||||
monasca_log_api.reference.common
|
||||
monasca_log_api.middleware
|
||||
monasca_log_api.monitoring
|
||||
monasca_log_api.healthcheck
|
||||
|
||||
Submodules
|
||||
|
|
|
@ -12,6 +12,11 @@ logs_v3 = monasca_log_api.reference.v3.logs:Logs
|
|||
versions = monasca_log_api.reference.versions:Versions
|
||||
healthchecks = monasca_log_api.reference.healthchecks:HealthChecks
|
||||
|
||||
[monitoring]
|
||||
statsd_host = 127.0.0.1
|
||||
statsd_port = 8125
|
||||
statsd_buffer = 50
|
||||
|
||||
[service]
|
||||
max_log_size = 1048576
|
||||
region = 'region-one'
|
||||
|
|
|
@ -16,6 +16,9 @@
|
|||
import falcon
|
||||
from oslo_log import log
|
||||
|
||||
from monasca_log_api.monitoring import client
|
||||
from monasca_log_api.monitoring import metrics
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
MONITORING_DELEGATE_ROLE = 'monitoring-delegate'
|
||||
|
@ -31,6 +34,28 @@ class LogsApi(object):
|
|||
"""
|
||||
def __init__(self):
|
||||
super(LogsApi, self).__init__()
|
||||
self._statsd = client.get_client()
|
||||
|
||||
# create_common counters, gauges etc.
|
||||
self._metrics_dimensions = dimensions = {'version': self.version}
|
||||
|
||||
self._logs_in_counter = self._statsd.get_counter(
|
||||
name=metrics.LOGS_RECEIVED_METRIC,
|
||||
dimensions=dimensions
|
||||
)
|
||||
self._logs_size_gauge = self._statsd.get_gauge(
|
||||
name=metrics.LOGS_RECEIVED_BYTE_SIZE_METRICS,
|
||||
dimensions=dimensions
|
||||
)
|
||||
self._logs_rejected_counter = self._statsd.get_counter(
|
||||
name=metrics.LOGS_REJECTED_METRIC,
|
||||
dimensions=dimensions
|
||||
)
|
||||
self._logs_processing_time = self._statsd.get_timer(
|
||||
name=metrics.LOGS_PROCESSING_TIME_METRIC,
|
||||
dimensions=dimensions
|
||||
)
|
||||
|
||||
LOG.info('Initializing LogsApi %s!' % self.version)
|
||||
|
||||
def on_post(self, req, res):
|
||||
|
|
|
@ -0,0 +1,103 @@
|
|||
# Copyright 2016 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import monascastatsd
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
_DEFAULT_HOST = '127.0.0.1'
|
||||
_DEFAULT_PORT = 8125
|
||||
_DEFAULT_BUFFER_SIZE = 50
|
||||
_DEFAULT_DIMENSIONS = {
|
||||
'service': 'monitoring',
|
||||
'component': 'monasca-log-api'
|
||||
}
|
||||
_CLIENT_NAME = 'monasca'
|
||||
|
||||
monitoring_opts = [
|
||||
cfg.IPOpt('statsd_host',
|
||||
default=_DEFAULT_HOST,
|
||||
help=('IP address of statsd server, default to %s'
|
||||
% _DEFAULT_HOST)),
|
||||
cfg.PortOpt('statsd_port',
|
||||
default=_DEFAULT_PORT,
|
||||
help='Port of statsd server, default to %d' % _DEFAULT_PORT),
|
||||
cfg.IntOpt('statsd_buffer',
|
||||
default=_DEFAULT_BUFFER_SIZE,
|
||||
required=True,
|
||||
help=('Maximum number of metric to buffer before sending, '
|
||||
'default to %d' % _DEFAULT_BUFFER_SIZE)),
|
||||
cfg.DictOpt('dimensions')
|
||||
]
|
||||
|
||||
monitoring_group = cfg.OptGroup(name='monitoring', title='monitoring')
|
||||
|
||||
cfg.CONF.register_group(monitoring_group)
|
||||
cfg.CONF.register_opts(monitoring_opts, monitoring_group)
|
||||
|
||||
|
||||
def get_client(dimensions=None):
|
||||
"""Creates statsd client
|
||||
|
||||
Creates monasca-statsd client using configuration from
|
||||
config file and supplied dimensions.
|
||||
|
||||
Configuration is composed out of ::
|
||||
|
||||
[monitoring]
|
||||
statsd_host = 192.168.10.4
|
||||
statsd_port = 8125
|
||||
statsd_buffer = 50
|
||||
|
||||
Dimensions are appended to following dictionary ::
|
||||
|
||||
{
|
||||
'service': 'monitoring',
|
||||
'component': 'monasca-log-api'
|
||||
}
|
||||
|
||||
Note:
|
||||
Passed dimensions do not override those specified in
|
||||
dictionary above
|
||||
|
||||
:param dict dimensions: Optional dimensions
|
||||
:return: statsd client
|
||||
:rtype: monascastatsd.Client
|
||||
"""
|
||||
dims = _DEFAULT_DIMENSIONS.copy()
|
||||
if dimensions:
|
||||
for key, val in dimensions.items():
|
||||
if key not in _DEFAULT_DIMENSIONS:
|
||||
dims[key] = val
|
||||
else:
|
||||
LOG.warn('Cannot override fixed dimension %s=%s', key,
|
||||
_DEFAULT_DIMENSIONS[key])
|
||||
|
||||
connection = monascastatsd.Connection(
|
||||
host=CONF.monitoring.statsd_host,
|
||||
port=CONF.monitoring.statsd_port,
|
||||
max_buffer_size=CONF.monitoring.statsd_buffer
|
||||
)
|
||||
client = monascastatsd.Client(name=_CLIENT_NAME,
|
||||
connection=connection,
|
||||
dimensions=dims)
|
||||
|
||||
LOG.debug('Created statsd client %s[%s] = %s:%d', _CLIENT_NAME, dims,
|
||||
CONF.monitoring.statsd_host, CONF.monitoring.statsd_port)
|
||||
|
||||
return client
|
|
@ -0,0 +1,47 @@
|
|||
# Copyright 2016 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
LOGS_RECEIVED_METRIC = 'log.in_logs'
|
||||
"""Metrics sent with amount of logs (not requests) API receives"""
|
||||
|
||||
LOGS_REJECTED_METRIC = 'log.in_logs_rejected'
|
||||
"""Metric sent with amount of logs that were rejected
|
||||
(i.e. invalid dimension)"""
|
||||
|
||||
LOGS_BULKS_REJECTED_METRIC = 'log.in_bulks_rejected'
|
||||
"""Metric sent with amount of bulk packages that were rejected due
|
||||
to early stage validation (content-length, content-type).
|
||||
Only valid for v3.0.
|
||||
"""
|
||||
|
||||
LOGS_RECEIVED_BYTE_SIZE_METRICS = 'log.in_logs_bytes'
|
||||
"""Metric sent with size of payloads(a.k.a. Content-Length)
|
||||
(in bytes) API receives"""
|
||||
|
||||
LOGS_PROCESSING_TIME_METRIC = 'log.processing_time_ms'
|
||||
"""Metric sent with time that log-api needed to process each received log.
|
||||
Metric does not include time needed to authorize requests."""
|
||||
|
||||
LOGS_PUBLISHED_METRIC = 'log.out_logs'
|
||||
"""Metric sent with amount of logs published to kafka"""
|
||||
|
||||
LOGS_PUBLISHED_LOST_METRIC = 'log.out_logs_lost'
|
||||
"""Metric sent with amount of logs that were lost due to critical error in
|
||||
publish phase."""
|
||||
|
||||
LOGS_PUBLISH_TIME_METRIC = 'log.publish_time_ms'
|
||||
"""Metric sent with time that publishing took"""
|
||||
|
||||
LOGS_TRUNCATED_METRIC = 'log.out_logs_truncated_bytes'
|
||||
"""Metric sent with amount of truncated bytes from log message"""
|
|
@ -13,6 +13,7 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import falcon
|
||||
import time
|
||||
|
||||
from monasca_common.kafka import producer
|
||||
|
@ -20,6 +21,9 @@ from monasca_common.rest import utils as rest_utils
|
|||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
from monasca_log_api.monitoring import client
|
||||
from monasca_log_api.monitoring import metrics
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
@ -83,6 +87,23 @@ class LogPublisher(object):
|
|||
self._kafka_publisher = producer.KafkaProducer(
|
||||
url=CONF.log_publisher.kafka_url
|
||||
)
|
||||
|
||||
self._statsd = client.get_client()
|
||||
|
||||
# setup counter, gauges etc
|
||||
self._logs_published_counter = self._statsd.get_counter(
|
||||
metrics.LOGS_PUBLISHED_METRIC
|
||||
)
|
||||
self._publish_time_ms = self._statsd.get_timer(
|
||||
metrics.LOGS_PUBLISH_TIME_METRIC
|
||||
)
|
||||
self._logs_lost_counter = self._statsd.get_counter(
|
||||
metrics.LOGS_PUBLISHED_LOST_METRIC
|
||||
)
|
||||
self._logs_truncated_gauge = self._statsd.get_gauge(
|
||||
metrics.LOGS_TRUNCATED_METRIC
|
||||
)
|
||||
|
||||
LOG.info('Initializing LogPublisher <%s>', self)
|
||||
|
||||
@staticmethod
|
||||
|
@ -143,6 +164,11 @@ class LogPublisher(object):
|
|||
if size_diff > 1:
|
||||
truncate_by = size_diff + _TRUNCATED_PROPERTY_SIZE
|
||||
|
||||
self._logs_truncated_gauge.send(
|
||||
name=None,
|
||||
value=truncate_by
|
||||
)
|
||||
|
||||
LOG.warn(('Detected message that exceeds %d bytes,'
|
||||
'message will be truncated by %d bytes'),
|
||||
max_size,
|
||||
|
@ -157,6 +183,8 @@ class LogPublisher(object):
|
|||
# will just transform message once again without truncation
|
||||
return rest_utils.as_json(envelope)
|
||||
|
||||
self._logs_truncated_gauge.send(name=None, value=0)
|
||||
|
||||
return msg_str
|
||||
|
||||
def send_message(self, messages):
|
||||
|
@ -192,15 +220,16 @@ class LogPublisher(object):
|
|||
msg = self._truncate(message)
|
||||
send_messages.append(msg)
|
||||
|
||||
for topic in self._topics:
|
||||
self._kafka_publisher.publish(topic, send_messages)
|
||||
with self._publish_time_ms.time(name=None):
|
||||
self._publish(send_messages)
|
||||
|
||||
sent_counter = to_sent_counter
|
||||
sent_counter = len(send_messages)
|
||||
except Exception as ex:
|
||||
LOG.error('Failure in publishing messages to kafka')
|
||||
LOG.exception(ex)
|
||||
raise ex
|
||||
finally:
|
||||
self._logs_published_counter.increment(value=sent_counter)
|
||||
if sent_counter == to_sent_counter:
|
||||
LOG.info('Successfully published all [%d] messages',
|
||||
sent_counter)
|
||||
|
@ -209,3 +238,23 @@ class LogPublisher(object):
|
|||
error_str = ('Failed to sent all messages, %d '
|
||||
'messages out of %d have not been published')
|
||||
LOG.error(error_str, failed_to_send, to_sent_counter)
|
||||
|
||||
self._logs_lost_counter.increment(
|
||||
value=failed_to_send
|
||||
)
|
||||
|
||||
def _publish(self, messages):
|
||||
to_sent = len(messages)
|
||||
|
||||
LOG.debug('Publishing %d messages', to_sent)
|
||||
|
||||
try:
|
||||
for topic in self._topics:
|
||||
self._kafka_publisher.publish(
|
||||
topic,
|
||||
messages
|
||||
)
|
||||
LOG.debug('Sent %d messages to topic %s', to_sent, topic)
|
||||
except Exception as ex:
|
||||
raise falcon.HTTPServiceUnavailable('Service unavailable',
|
||||
ex.message, 60)
|
||||
|
|
|
@ -39,33 +39,45 @@ class Logs(logs_api.LogsApi):
|
|||
|
||||
@falcon.deprecated(_DEPRECATED_INFO)
|
||||
def on_post(self, req, res):
|
||||
validation.validate_cross_tenant(
|
||||
tenant_id=req.get_header(*headers.X_TENANT_ID),
|
||||
cross_tenant_id=req.get_param('tenant_id'),
|
||||
roles=req.get_header(*headers.X_ROLES)
|
||||
)
|
||||
validation.validate_payload_size(req)
|
||||
validation.validate_content_type(req, Logs.SUPPORTED_CONTENT_TYPES)
|
||||
with self._logs_processing_time.time(name=None):
|
||||
try:
|
||||
validation.validate_payload_size(req)
|
||||
validation.validate_content_type(req,
|
||||
Logs.SUPPORTED_CONTENT_TYPES)
|
||||
validation.validate_cross_tenant(
|
||||
tenant_id=req.get_header(*headers.X_TENANT_ID),
|
||||
cross_tenant_id=req.get_param('tenant_id'),
|
||||
roles=req.get_header(*headers.X_ROLES)
|
||||
)
|
||||
|
||||
cross_tenant_id = req.get_param('tenant_id')
|
||||
tenant_id = req.get_header(*headers.X_TENANT_ID)
|
||||
cross_tenant_id = req.get_param('tenant_id')
|
||||
tenant_id = req.get_header(*headers.X_TENANT_ID)
|
||||
|
||||
log = self.get_log(request=req)
|
||||
envelope = self.get_envelope(
|
||||
log=log,
|
||||
tenant_id=tenant_id if tenant_id else cross_tenant_id
|
||||
)
|
||||
log = self.get_log(request=req)
|
||||
envelope = self.get_envelope(
|
||||
log=log,
|
||||
tenant_id=tenant_id if tenant_id else cross_tenant_id
|
||||
)
|
||||
|
||||
self._kafka_publisher.send_message(envelope)
|
||||
self._logs_size_gauge.send(name=None,
|
||||
value=int(req.content_length))
|
||||
self._logs_in_counter.increment()
|
||||
except Exception:
|
||||
# any validation that failed means
|
||||
# log is invalid and rejected
|
||||
self._logs_rejected_counter.increment()
|
||||
raise
|
||||
|
||||
res.status = falcon.HTTP_204
|
||||
res.add_link(
|
||||
target=str(_get_v3_link(req)),
|
||||
rel='current', # [RFC5005]
|
||||
title='V3 Logs',
|
||||
type_hint='application/json'
|
||||
)
|
||||
res.append_header('DEPRECATED', 'true')
|
||||
self._kafka_publisher.send_message(envelope)
|
||||
|
||||
res.status = falcon.HTTP_204
|
||||
res.add_link(
|
||||
target=str(_get_v3_link(req)),
|
||||
rel='current', # [RFC5005]
|
||||
title='V3 Logs',
|
||||
type_hint='application/json'
|
||||
)
|
||||
res.append_header('DEPRECATED', 'true')
|
||||
|
||||
def get_envelope(self, log, tenant_id):
|
||||
return self._log_creator.new_log_envelope(
|
||||
|
|
|
@ -20,6 +20,7 @@ from oslo_log import log
|
|||
from monasca_log_api.api import exceptions
|
||||
from monasca_log_api.api import headers
|
||||
from monasca_log_api.api import logs_api
|
||||
from monasca_log_api.monitoring import metrics
|
||||
from monasca_log_api.reference.common import log_publisher
|
||||
from monasca_log_api.reference.common import model
|
||||
from monasca_log_api.reference.common import validation
|
||||
|
@ -38,48 +39,80 @@ class Logs(logs_api.LogsApi):
|
|||
super(Logs, self).__init__()
|
||||
self._log_publisher = log_publisher.LogPublisher()
|
||||
|
||||
def on_post(self, req, res):
|
||||
validation.validate_payload_size(req)
|
||||
validation.validate_content_type(req, Logs.SUPPORTED_CONTENT_TYPES)
|
||||
|
||||
cross_tenant_id = req.get_param('tenant_id')
|
||||
tenant_id = req.get_header(*headers.X_TENANT_ID)
|
||||
validation.validate_cross_tenant(
|
||||
tenant_id=tenant_id,
|
||||
cross_tenant_id=cross_tenant_id,
|
||||
roles=req.get_header(*headers.X_ROLES)
|
||||
self._bulks_rejected_counter = self._statsd.get_counter(
|
||||
name=metrics.LOGS_BULKS_REJECTED_METRIC,
|
||||
dimensions=self._metrics_dimensions
|
||||
)
|
||||
|
||||
request_body = helpers.read_json_msg_body(req)
|
||||
log_list = self._get_logs(request_body)
|
||||
global_dimensions = self._get_global_dimensions(request_body)
|
||||
|
||||
envelopes = []
|
||||
for log_element in log_list:
|
||||
def on_post(self, req, res):
|
||||
with self._logs_processing_time.time(name=None):
|
||||
try:
|
||||
LOG.trace('Processing log %s', log_element)
|
||||
validation.validate_payload_size(req)
|
||||
validation.validate_content_type(req,
|
||||
Logs.SUPPORTED_CONTENT_TYPES)
|
||||
|
||||
validation.validate_log_message(log_element)
|
||||
cross_tenant_id = req.get_param('tenant_id')
|
||||
tenant_id = req.get_header(*headers.X_TENANT_ID)
|
||||
|
||||
dimensions = self._get_dimensions(log_element,
|
||||
global_dimensions)
|
||||
envelope = self._create_log_envelope(tenant_id,
|
||||
cross_tenant_id,
|
||||
dimensions,
|
||||
log_element)
|
||||
envelopes.append(envelope)
|
||||
validation.validate_cross_tenant(
|
||||
tenant_id=tenant_id,
|
||||
cross_tenant_id=cross_tenant_id,
|
||||
roles=req.get_header(*headers.X_ROLES)
|
||||
)
|
||||
|
||||
LOG.trace('Log %s processed into envelope %s',
|
||||
log_element,
|
||||
envelope)
|
||||
request_body = helpers.read_json_msg_body(req)
|
||||
|
||||
log_list = self._get_logs(request_body)
|
||||
global_dimensions = self._get_global_dimensions(request_body)
|
||||
|
||||
except Exception as ex:
|
||||
LOG.error('Entire bulk package has been rejected')
|
||||
LOG.exception(ex)
|
||||
|
||||
self._bulks_rejected_counter.increment(value=1)
|
||||
|
||||
raise ex
|
||||
|
||||
self._bulks_rejected_counter.increment(value=0)
|
||||
self._logs_size_gauge.send(name=None,
|
||||
value=int(req.content_length))
|
||||
|
||||
envelopes = []
|
||||
try:
|
||||
for log_element in log_list:
|
||||
LOG.trace('Processing log %s', log_element)
|
||||
|
||||
validation.validate_log_message(log_element)
|
||||
|
||||
dimensions = self._get_dimensions(log_element,
|
||||
global_dimensions)
|
||||
envelope = self._create_log_envelope(tenant_id,
|
||||
cross_tenant_id,
|
||||
dimensions,
|
||||
log_element)
|
||||
envelopes.append(envelope)
|
||||
|
||||
LOG.trace('Log %s processed into envelope %s',
|
||||
log_element,
|
||||
envelope)
|
||||
except Exception as ex:
|
||||
LOG.error('Failed to process log %s', log_element)
|
||||
LOG.exception(ex)
|
||||
res.status = getattr(ex, 'status', falcon.HTTP_500)
|
||||
return
|
||||
finally:
|
||||
rejected_logs = len(envelopes) - len(log_list)
|
||||
# if entire bulk is rejected because of single error
|
||||
# that means only one counter must be called
|
||||
if rejected_logs < 0:
|
||||
self._logs_rejected_counter.increment(value=len(log_list))
|
||||
else:
|
||||
self._logs_in_counter.increment(value=len(log_list))
|
||||
|
||||
self._send_logs(envelopes)
|
||||
res.status = falcon.HTTP_204
|
||||
# at this point only possible metrics regard
|
||||
# publishing phase
|
||||
self._send_logs(envelopes)
|
||||
res.status = falcon.HTTP_204
|
||||
|
||||
def _get_dimensions(self, log_element, global_dims):
|
||||
"""Get the dimensions in the log element."""
|
||||
|
|
|
@ -188,6 +188,9 @@ class TestSendMessage(testing.TestBase):
|
|||
[json_msg])
|
||||
|
||||
|
||||
@mock.patch(
|
||||
'monasca_log_api.reference.common.log_publisher.producer'
|
||||
'.KafkaProducer')
|
||||
class TestTruncation(testing.TestBase):
|
||||
EXTRA_CHARS_SIZE = len(bytearray(ujson.dumps({
|
||||
'log': {
|
||||
|
@ -203,24 +206,15 @@ class TestTruncation(testing.TestBase):
|
|||
super(TestTruncation, self).setUp()
|
||||
self._conf = base.mock_config(self)
|
||||
|
||||
@mock.patch(
|
||||
'monasca_log_api.reference.common.log_publisher.producer'
|
||||
'.KafkaProducer')
|
||||
def test_should_not_truncate_message_if_size_is_smaller(self, _):
|
||||
diff_size = random.randint(1, 100)
|
||||
self._run_truncate_test(log_size_factor=-diff_size,
|
||||
truncate_by=0)
|
||||
|
||||
@mock.patch(
|
||||
'monasca_log_api.reference.common.log_publisher.producer'
|
||||
'.KafkaProducer')
|
||||
def test_should_not_truncate_message_if_size_equal_to_max(self, _):
|
||||
self._run_truncate_test(log_size_factor=0,
|
||||
truncate_by=0)
|
||||
|
||||
@mock.patch(
|
||||
'monasca_log_api.reference.common.log_publisher.producer'
|
||||
'.KafkaProducer')
|
||||
def test_should_truncate_too_big_message(self, _):
|
||||
diff_size = random.randint(1, 100)
|
||||
max_size = 1000
|
||||
|
@ -258,6 +252,7 @@ class TestTruncation(testing.TestBase):
|
|||
}
|
||||
|
||||
instance = log_publisher.LogPublisher()
|
||||
instance._logs_truncated_gauge.send = meter = mock.Mock()
|
||||
|
||||
envelope_copy = copy.deepcopy(envelope)
|
||||
json_envelope = instance._truncate(envelope_copy)
|
||||
|
@ -275,3 +270,5 @@ class TestTruncation(testing.TestBase):
|
|||
parsed_log_message)
|
||||
|
||||
self.assertEqual(expected_log_message_size, parsed_log_message_len)
|
||||
self.assertEqual(1, meter.call_count)
|
||||
self.assertEqual(truncate_by, meter.mock_calls[0][2]['value'])
|
||||
|
|
|
@ -74,7 +74,8 @@ class TestLogs(testing.TestBase):
|
|||
method='POST',
|
||||
query_string='tenant_id=1',
|
||||
headers={
|
||||
'Content-Type': 'application/json'
|
||||
'Content-Type': 'application/json',
|
||||
'Content-Length': '0'
|
||||
}
|
||||
)
|
||||
self.assertEqual(falcon.HTTP_403, self.srmock.status)
|
||||
|
|
|
@ -12,16 +12,188 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
import random
|
||||
import string
|
||||
import unittest
|
||||
|
||||
from falcon import testing
|
||||
import mock
|
||||
import ujson as json
|
||||
|
||||
from monasca_log_api.api import exceptions as log_api_exceptions
|
||||
from monasca_log_api.api import headers
|
||||
from monasca_log_api.api import logs_api
|
||||
from monasca_log_api.reference.v3 import logs
|
||||
|
||||
|
||||
class TestLogsVersion(unittest.TestCase):
|
||||
ENDPOINT = '/logs'
|
||||
TENANT_ID = 'bob'
|
||||
|
||||
|
||||
def _init_resource(test):
|
||||
resource = logs.Logs()
|
||||
test.api.add_route(ENDPOINT, resource)
|
||||
return resource
|
||||
|
||||
|
||||
def _generate_unique_message(size):
|
||||
letters = string.ascii_lowercase
|
||||
|
||||
def rand(amount, space=True):
|
||||
space = ' ' if space else ''
|
||||
return ''.join((random.choice(letters + space) for _ in range(amount)))
|
||||
|
||||
return rand(size)
|
||||
|
||||
|
||||
def _generate_v3_payload(log_count):
|
||||
v3_logs = [{
|
||||
'message': _generate_unique_message(100),
|
||||
'dimensions': {
|
||||
'hostname': 'host_%d' % it,
|
||||
'component': 'component_%d' % it,
|
||||
'service': 'service_%d' % it
|
||||
}
|
||||
} for it in xrange(log_count)]
|
||||
v3_body = {
|
||||
'dimensions': {
|
||||
'origin': __name__
|
||||
},
|
||||
'logs': v3_logs
|
||||
}
|
||||
|
||||
return v3_body, v3_logs
|
||||
|
||||
|
||||
class TestLogsVersion(unittest.TestCase):
|
||||
@mock.patch('monasca_log_api.reference.v3.logs.log_publisher'
|
||||
'.LogPublisher')
|
||||
def test_should_return_v3_as_version(self, _):
|
||||
logs_resource = logs.Logs()
|
||||
self.assertEqual('v3.0', logs_resource.version)
|
||||
|
||||
|
||||
class TestLogsMonitoring(testing.TestBase):
|
||||
|
||||
@mock.patch('monasca_log_api.reference.common.log_publisher.LogPublisher')
|
||||
def test_monitor_bulk_rejected(self, _):
|
||||
resource = _init_resource(self)
|
||||
|
||||
resource._logs_in_counter = in_counter = mock.Mock()
|
||||
resource._logs_rejected_counter = rejected_counter = mock.Mock()
|
||||
resource._bulks_rejected_counter = bulk_counter = mock.Mock()
|
||||
resource._logs_size_gauge = size_gauge = mock.Mock()
|
||||
|
||||
resource._get_logs = mock.Mock(
|
||||
side_effect=log_api_exceptions.HTTPUnprocessableEntity(''))
|
||||
|
||||
log_count = 1
|
||||
v3_body, _ = _generate_v3_payload(log_count)
|
||||
payload = json.dumps(v3_body)
|
||||
content_length = len(payload)
|
||||
|
||||
self.simulate_request(
|
||||
ENDPOINT,
|
||||
method='POST',
|
||||
headers={
|
||||
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
|
||||
headers.X_TENANT_ID.name: TENANT_ID,
|
||||
'Content-Type': 'application/json',
|
||||
'Content-Length': str(content_length)
|
||||
},
|
||||
body=payload
|
||||
)
|
||||
|
||||
self.assertEqual(1, bulk_counter.increment.call_count)
|
||||
self.assertEqual(0, in_counter.increment.call_count)
|
||||
self.assertEqual(0, rejected_counter.increment.call_count)
|
||||
self.assertEqual(0, size_gauge.send.call_count)
|
||||
|
||||
@mock.patch('monasca_log_api.reference.common.log_publisher.LogPublisher')
|
||||
def test_monitor_not_all_logs_ok(self, _):
|
||||
resource = _init_resource(self)
|
||||
|
||||
resource._logs_in_counter = in_counter = mock.Mock()
|
||||
resource._logs_rejected_counter = rejected_counter = mock.Mock()
|
||||
resource._bulks_rejected_counter = bulk_counter = mock.Mock()
|
||||
resource._logs_size_gauge = size_gauge = mock.Mock()
|
||||
|
||||
log_count = 5
|
||||
reject_logs = 1
|
||||
v3_body, _ = _generate_v3_payload(log_count)
|
||||
payload = json.dumps(v3_body)
|
||||
content_length = len(payload)
|
||||
|
||||
side_effects = [{} for __ in xrange(log_count - reject_logs)]
|
||||
side_effects.append(log_api_exceptions.HTTPUnprocessableEntity(''))
|
||||
|
||||
resource._get_dimensions = mock.Mock(side_effect=side_effects)
|
||||
|
||||
self.simulate_request(
|
||||
ENDPOINT,
|
||||
method='POST',
|
||||
headers={
|
||||
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
|
||||
headers.X_TENANT_ID.name: TENANT_ID,
|
||||
'Content-Type': 'application/json',
|
||||
'Content-Length': str(content_length)
|
||||
},
|
||||
body=payload
|
||||
)
|
||||
|
||||
self.assertEqual(1, bulk_counter.increment.call_count)
|
||||
self.assertEqual(0,
|
||||
bulk_counter.increment.mock_calls[0][2]['value'])
|
||||
|
||||
self.assertEqual(0, in_counter.increment.call_count)
|
||||
|
||||
self.assertEqual(1, rejected_counter.increment.call_count)
|
||||
self.assertEqual(log_count,
|
||||
rejected_counter.increment.mock_calls[0][2]['value'])
|
||||
|
||||
self.assertEqual(1, size_gauge.send.call_count)
|
||||
self.assertEqual(content_length,
|
||||
size_gauge.send.mock_calls[0][2]['value'])
|
||||
|
||||
@mock.patch('monasca_log_api.reference.common.log_publisher.LogPublisher')
|
||||
def test_monitor_all_logs_ok(self, _):
|
||||
resource = _init_resource(self)
|
||||
|
||||
resource._logs_in_counter = in_counter = mock.Mock()
|
||||
resource._logs_rejected_counter = rejected_counter = mock.Mock()
|
||||
resource._bulks_rejected_counter = bulk_counter = mock.Mock()
|
||||
resource._logs_size_gauge = size_gauge = mock.Mock()
|
||||
|
||||
resource._send_logs = mock.Mock()
|
||||
|
||||
log_count = 10
|
||||
|
||||
v3_body, _ = _generate_v3_payload(log_count)
|
||||
|
||||
payload = json.dumps(v3_body)
|
||||
content_length = len(payload)
|
||||
self.simulate_request(
|
||||
ENDPOINT,
|
||||
method='POST',
|
||||
headers={
|
||||
headers.X_ROLES.name: logs_api.MONITORING_DELEGATE_ROLE,
|
||||
headers.X_TENANT_ID.name: TENANT_ID,
|
||||
'Content-Type': 'application/json',
|
||||
'Content-Length': str(content_length)
|
||||
},
|
||||
body=payload
|
||||
)
|
||||
|
||||
self.assertEqual(1, bulk_counter.increment.call_count)
|
||||
self.assertEqual(0,
|
||||
bulk_counter.increment.mock_calls[0][2]['value'])
|
||||
|
||||
self.assertEqual(1, in_counter.increment.call_count)
|
||||
self.assertEqual(log_count,
|
||||
in_counter.increment.mock_calls[0][2]['value'])
|
||||
|
||||
self.assertEqual(0, rejected_counter.increment.call_count)
|
||||
|
||||
self.assertEqual(1, size_gauge.send.call_count)
|
||||
self.assertEqual(content_length,
|
||||
size_gauge.send.mock_calls[0][2]['value'])
|
||||
|
|
|
@ -0,0 +1,51 @@
|
|||
# Copyright 2016 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
import unittest
|
||||
|
||||
from monasca_log_api.monitoring import client
|
||||
|
||||
|
||||
class TestMonitoring(unittest.TestCase):
|
||||
@mock.patch('monasca_log_api.monitoring.client.monascastatsd')
|
||||
def test_should_use_default_dimensions_if_none_specified(self,
|
||||
monascastatsd):
|
||||
client.get_client()
|
||||
|
||||
statsd_client = monascastatsd.Client
|
||||
|
||||
expected_dimensions = client._DEFAULT_DIMENSIONS
|
||||
actual_dimensions = statsd_client.call_args[1]['dimensions']
|
||||
|
||||
self.assertEqual(1, statsd_client.call_count)
|
||||
self.assertEqual(expected_dimensions, actual_dimensions)
|
||||
|
||||
@mock.patch('monasca_log_api.monitoring.client.monascastatsd')
|
||||
def test_should_not_override_fixed_dimensions(self,
|
||||
monascastatsd):
|
||||
dims = {
|
||||
'service': 'foo',
|
||||
'component': 'bar'
|
||||
}
|
||||
|
||||
client.get_client(dims)
|
||||
|
||||
statsd_client = monascastatsd.Client
|
||||
|
||||
expected_dimensions = client._DEFAULT_DIMENSIONS
|
||||
actual_dimensions = statsd_client.call_args[1]['dimensions']
|
||||
|
||||
self.assertEqual(1, statsd_client.call_count)
|
||||
self.assertEqual(expected_dimensions, actual_dimensions)
|
|
@ -12,3 +12,4 @@ pbr>=1.6.0
|
|||
simport
|
||||
monasca-common>=0.0.6
|
||||
eventlet>=0.17.4,!=0.18.0
|
||||
monasca-statsd
|
||||
|
|
Loading…
Reference in New Issue