Add config option for statsd
Add new section to config file with configuration for statsd host and port. Add modification in notification engines. According to the monasca-agent change: https://review.openstack.org/#/c/381417 Change-Id: I6e99bae5e50f3670ae8b4a0c7ac5af3c747bcd68
This commit is contained in:
parent
02e52aef27
commit
722ecaa7a5
@ -62,7 +62,8 @@ sent out multiple times. To minimize this risk a number of techniques are used:
|
||||
Yaml config file by default is in '/etc/monasca/notification.yaml', a sample is in this project.
|
||||
|
||||
## Monitoring
|
||||
statsd is incorporated into the daemon and will send all stats to localhost on udp port 8125.
|
||||
statsd is incorporated into the daemon and will send all stats to statsd server launched by monasca-agent.
|
||||
Default host and port points at **localhost:8125**.
|
||||
|
||||
- Counters
|
||||
- ConsumedFromKafka
|
||||
|
@ -1,5 +1,6 @@
|
||||
# (C) Copyright 2016 Hewlett Packard Enterprise Development LP
|
||||
#
|
||||
# Copyright 2016 FUJITSU LIMITED
|
||||
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
@ -13,6 +14,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
import monascastatsd
|
||||
|
||||
from monasca_common.simport import simport
|
||||
|
||||
@ -21,6 +23,9 @@ from monasca_notification.notification import Notification
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
NOTIFICATION_DIMENSIONS = {'service': 'monitoring',
|
||||
'component': 'monasca-notification'}
|
||||
|
||||
|
||||
def get_db_repo(config):
|
||||
if 'database' in config and 'repo_driver' in config['database']:
|
||||
@ -69,3 +74,17 @@ def grab_stored_notification_method(db_repo, notification_id):
|
||||
stored_notification = db_repo.get_notification(notification_id)
|
||||
|
||||
return stored_notification
|
||||
|
||||
|
||||
def get_statsd_client(config, dimensions=None):
|
||||
local_dims = dimensions.copy() if dimensions else {}
|
||||
local_dims.update(NOTIFICATION_DIMENSIONS)
|
||||
if 'statsd' in config:
|
||||
client = monascastatsd.Client(name='monasca',
|
||||
host=config['statsd'].get('host', 'localhost'),
|
||||
port=config['statsd'].get('port', 8125),
|
||||
dimensions=local_dims)
|
||||
else:
|
||||
client = monascastatsd.Client(name='monasca',
|
||||
dimensions=local_dims)
|
||||
return client
|
||||
|
@ -14,13 +14,12 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import monascastatsd
|
||||
import time
|
||||
|
||||
from monasca_common.kafka import consumer
|
||||
from monasca_common.kafka import producer
|
||||
from monasca_notification.common.utils import get_statsd_client
|
||||
from processors.alarm_processor import AlarmProcessor
|
||||
from processors.base import BaseProcessor
|
||||
from processors.notification_processor import NotificationProcessor
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@ -31,8 +30,8 @@ class NotificationEngine(object):
|
||||
self._topics = {}
|
||||
self._topics['notification_topic'] = config['kafka']['notification_topic']
|
||||
self._topics['retry_topic'] = config['kafka']['notification_retry_topic']
|
||||
self._statsd = monascastatsd.Client(name='monasca',
|
||||
dimensions=BaseProcessor.dimensions)
|
||||
|
||||
self._statsd = get_statsd_client(config)
|
||||
self._consumer = consumer.KafkaConsumer(
|
||||
config['kafka']['url'],
|
||||
config['zookeeper']['url'],
|
||||
|
@ -15,7 +15,6 @@
|
||||
|
||||
import json
|
||||
import logging
|
||||
import monascastatsd
|
||||
import time
|
||||
|
||||
from monasca_common.kafka import consumer
|
||||
@ -23,7 +22,7 @@ from monasca_common.kafka import producer
|
||||
from monasca_notification.common.repositories import exceptions
|
||||
from monasca_notification.common.utils import construct_notification_object
|
||||
from monasca_notification.common.utils import get_db_repo
|
||||
from processors import base
|
||||
from monasca_notification.common.utils import get_statsd_client
|
||||
from processors import notification_processor
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@ -33,9 +32,7 @@ class PeriodicEngine(object):
|
||||
def __init__(self, config, period):
|
||||
self._topic_name = config['kafka']['periodic'][period]
|
||||
|
||||
self._statsd = monascastatsd.Client(
|
||||
name='monasca',
|
||||
dimensions=base.BaseProcessor.dimensions)
|
||||
self._statsd = get_statsd_client(config)
|
||||
|
||||
zookeeper_path = config['zookeeper']['periodic_path'][period]
|
||||
self._consumer = consumer.KafkaConsumer(config['kafka']['url'],
|
||||
|
@ -15,25 +15,22 @@
|
||||
|
||||
import json
|
||||
import logging
|
||||
import monascastatsd
|
||||
import time
|
||||
|
||||
from monasca_notification.common.repositories import exceptions as exc
|
||||
from monasca_notification.common.utils import get_db_repo
|
||||
from monasca_notification.common.utils import get_statsd_client
|
||||
from monasca_notification import notification
|
||||
from monasca_notification import notification_exceptions
|
||||
from monasca_notification.processors import base
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AlarmProcessor(base.BaseProcessor):
|
||||
class AlarmProcessor(object):
|
||||
def __init__(self, alarm_ttl, config):
|
||||
self._alarm_ttl = alarm_ttl
|
||||
self._statsd = monascastatsd.Client(
|
||||
name='monasca',
|
||||
dimensions=base.BaseProcessor.dimensions)
|
||||
self._statsd = get_statsd_client(config)
|
||||
self._db_repo = get_db_repo(config)
|
||||
|
||||
@staticmethod
|
||||
|
@ -1,19 +0,0 @@
|
||||
# (C) Copyright 2014-2015 Hewlett Packard Enterprise Development Company LP
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
class BaseProcessor(object):
|
||||
|
||||
dimensions = {'service': 'monitoring', 'component': 'monasca-notification'}
|
@ -14,19 +14,18 @@
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import monascastatsd
|
||||
|
||||
from monasca_notification.common.utils import get_db_repo
|
||||
from monasca_notification.processors import base
|
||||
from monasca_notification.common.utils import get_statsd_client
|
||||
from monasca_notification.types import notifiers
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NotificationProcessor(base.BaseProcessor):
|
||||
class NotificationProcessor(object):
|
||||
|
||||
def __init__(self, config):
|
||||
self.statsd = monascastatsd.Client(name='monasca', dimensions=base.BaseProcessor.dimensions)
|
||||
self.statsd = get_statsd_client(config)
|
||||
notifiers.init(self.statsd)
|
||||
notifiers.load_plugins(config['notification_types'])
|
||||
notifiers.config(config['notification_types'])
|
||||
|
@ -15,14 +15,13 @@
|
||||
|
||||
import json
|
||||
import logging
|
||||
import monascastatsd
|
||||
import time
|
||||
|
||||
from monasca_common.kafka import consumer
|
||||
from monasca_common.kafka import producer
|
||||
from monasca_notification.common.utils import construct_notification_object
|
||||
from monasca_notification.common.utils import get_db_repo
|
||||
from processors import base
|
||||
from monasca_notification.common.utils import get_statsd_client
|
||||
from processors import notification_processor
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@ -37,9 +36,7 @@ class RetryEngine(object):
|
||||
self._topics['notification_topic'] = config['kafka']['notification_topic']
|
||||
self._topics['retry_topic'] = config['kafka']['notification_retry_topic']
|
||||
|
||||
self._statsd = monascastatsd.Client(
|
||||
name='monasca',
|
||||
dimensions=base.BaseProcessor.dimensions)
|
||||
self._statsd = get_statsd_client(config)
|
||||
|
||||
self._consumer = consumer.KafkaConsumer(
|
||||
config['kafka']['url'],
|
||||
|
@ -115,3 +115,6 @@ logging: # Used in logging.dictConfig
|
||||
handlers:
|
||||
- console
|
||||
level: DEBUG
|
||||
statsd:
|
||||
host: 'localhost'
|
||||
port: 8125
|
||||
|
@ -55,14 +55,16 @@ class TestAlarmProcessor(unittest.TestCase):
|
||||
mock_mysql.cursor.return_value = mock_mysql
|
||||
mock_mysql.__iter__.return_value = sql_response
|
||||
|
||||
mysql_config = {'mysql': {'ssl': None,
|
||||
'host': 'mysql_host',
|
||||
'port': 'mysql_port',
|
||||
'user': 'mysql_user',
|
||||
'db': 'dbname',
|
||||
'passwd': 'mysql_passwd'}}
|
||||
config = {'mysql': {'ssl': None,
|
||||
'host': 'mysql_host',
|
||||
'port': 'mysql_port',
|
||||
'user': 'mysql_user',
|
||||
'db': 'dbname',
|
||||
'passwd': 'mysql_passwd'},
|
||||
'statsd': {'host': 'localhost',
|
||||
'port': 8125}}
|
||||
|
||||
processor = alarm_processor.AlarmProcessor(600, mysql_config)
|
||||
processor = alarm_processor.AlarmProcessor(600, config)
|
||||
|
||||
return processor.to_notification(alarm)
|
||||
|
||||
|
@ -54,6 +54,9 @@ class TestNotificationProcessor(unittest.TestCase):
|
||||
'db': 'dbname',
|
||||
'passwd': 'mysql_passwd'}
|
||||
|
||||
self.statsd_config = {'host': 'localhost',
|
||||
'port': 8125}
|
||||
|
||||
def tearDown(self):
|
||||
pass
|
||||
|
||||
@ -61,7 +64,7 @@ class TestNotificationProcessor(unittest.TestCase):
|
||||
# Test helper functions
|
||||
# ------------------------------------------------------------------------
|
||||
@mock.patch('pymysql.connect')
|
||||
@mock.patch('monasca_notification.processors.notification_processor.monascastatsd')
|
||||
@mock.patch('monasca_notification.common.utils.monascastatsd')
|
||||
@mock.patch('monasca_notification.types.notifiers.email_notifier.smtplib')
|
||||
@mock.patch('monasca_notification.processors.notification_processor.notifiers.log')
|
||||
def _start_processor(self, notifications, mock_log, mock_smtp, mock_statsd, mock_pymsql):
|
||||
@ -76,6 +79,7 @@ class TestNotificationProcessor(unittest.TestCase):
|
||||
config = {}
|
||||
config["email"] = self.email_config
|
||||
config["mysql"] = self.mysql_config
|
||||
config["statsd"] = self.statsd_config
|
||||
config["notification_types"] = {}
|
||||
|
||||
processor = (notification_processor.NotificationProcessor(config))
|
||||
|
53
tests/test_utils.py
Normal file
53
tests/test_utils.py
Normal file
@ -0,0 +1,53 @@
|
||||
# Copyright 2016 FUJITSU LIMITED
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from mock import patch
|
||||
import unittest
|
||||
|
||||
from monasca_notification.common import utils
|
||||
|
||||
|
||||
class TestStatsdConnection(unittest.TestCase):
|
||||
extra_dimensions = {'foo': 'bar'}
|
||||
base_name = 'monasca'
|
||||
|
||||
def test_statsd_default_connection(self):
|
||||
config = {}
|
||||
with patch(
|
||||
'monasca_notification.common.utils.monascastatsd.Client') as c:
|
||||
utils.get_statsd_client(config)
|
||||
c.assert_called_once_with(dimensions=utils.NOTIFICATION_DIMENSIONS,
|
||||
name=self.base_name)
|
||||
|
||||
def test_statsd_config_connection(self):
|
||||
port_number = 9999
|
||||
hostname = 'www.example.org'
|
||||
config = {'statsd': {'host': hostname, 'port': port_number}}
|
||||
with patch(
|
||||
'monasca_notification.common.utils.monascastatsd.Client') as c:
|
||||
utils.get_statsd_client(config)
|
||||
c.assert_called_once_with(dimensions=utils.NOTIFICATION_DIMENSIONS,
|
||||
name=self.base_name,
|
||||
port=port_number,
|
||||
host=hostname)
|
||||
|
||||
def test_statsd_update_dimmensions(self):
|
||||
config = {}
|
||||
expected_dimensions = utils.NOTIFICATION_DIMENSIONS.copy()
|
||||
expected_dimensions.update(self.extra_dimensions)
|
||||
with patch(
|
||||
'monasca_notification.common.utils.monascastatsd.Client') as c:
|
||||
utils.get_statsd_client(config, dimensions=self.extra_dimensions)
|
||||
c.assert_called_once_with(dimensions=expected_dimensions,
|
||||
name=self.base_name)
|
Loading…
x
Reference in New Issue
Block a user