Add notifications to Sahara

Added following notifications to Sahara:
 * Notification about creating cluster
 * Notification about deleting cluster
 * Notification about updating cluster status

Added unit tests for messaging and notify functions
Docs updated

Partially implement: blueprint ceilometer-integration

Change-Id: I7add0e477ab16f189c533857ad6a158ee41242de
This commit is contained in:
Vitaly Gridnev 2014-07-23 16:50:03 +04:00
parent 9b9c9127b4
commit 3a520be3a3
12 changed files with 400 additions and 8 deletions

View File

@ -101,15 +101,24 @@ Ubuntu 12.04 system.
# Enable Sahara
ENABLED_SERVICES+=,sahara
4. Start DevStack:
4. Also Sahara can send notifications to Ceilometer by default at DevStack, if Ceilometer is enabled.
As example, if you want to enable Ceilometer you can add following lines to file ``localrc``:
.. sourcecode:: bash
enable_service ceilometer-acompute ceilometer-acentral ceilometer-anotification ceilometer-collector
enable_service ceilometer-alarm-evaluator,ceilometer-alarm-notifier
enable_service ceilometer-api
5. Start DevStack:
.. sourcecode:: console
$ ./stack.sh
5. Once previous step is finished Devstack will print Horizon URL. Navigate to this URL and login with login "admin" and password from localrc.
6. Once previous step is finished Devstack will print Horizon URL. Navigate to this URL and login with login "admin" and password from localrc.
6. Now we need to modify security rules. It will allow to connect to VMs directly from your host. Navigate to Project -> Compute -> Access & Security tab and edit default Security Group rules by clicking on Manage Rules button. Here add following four rules by cliking the Add Rules button on the right hand side top corner:
7. Now we need to modify security rules. It will allow to connect to VMs directly from your host. Navigate to Project -> Compute -> Access & Security tab and edit default Security Group rules by clicking on Manage Rules button. Here add following four rules by cliking the Add Rules button on the right hand side top corner:
+-----------+------------+-------------+------------+------------------+
| Direction | Ether Type | IP Protocol | Port Range | Remote |
@ -124,7 +133,7 @@ Ubuntu 12.04 system.
+-----------+------------+-------------+------------+------------------+
7. Congratulations! You have OpenStack running in your VM and ready to launch VMs inside that VM :)
8. Congratulations! You have OpenStack running in your VM and ready to launch VMs inside that VM :)
Managing Sahara in DevStack

View File

@ -50,3 +50,65 @@ level for troubleshooting, there are two parameters in the config:
``verbose`` and ``debug``. If the former is set to true, Sahara will start
to write logs of INFO level and above. If ``debug`` is set to true,
Sahara will write all the logs, including the DEBUG ones.
Sahara notifications configuration
==================================
Sahara can send notifications to Ceilometer, if it's enabled.
If you want to enable notifications you should switch to ``[DEFAULT]``
section and set:
.. sourcecode:: cfg
enable_notifications = true
notification_driver = messaging
..
The current default for Sahara is to use the backend that utilizes RabbitMQ
as the message broker. You should configure your backend. It's recommended to use
Rabbit or Qpid.
If you are using Rabbit as a backend, then you should set:
.. sourcecode:: cfg
rpc_backend = rabbit
..
And after that you should specify following options:
``rabbit_host``, ``rabbit_port``, ``rabbit_userid``,
``rabbit_password``, ``rabbit_virtual_host`` and ``rabbit_hosts``.
As example you can see default values of these options:
.. sourcecode:: cfg
rabbit_host=localhost
rabbit_port=5672
rabbit_hosts=$rabbit_host:$rabbit_port
rabbit_userid=guest
rabbit_password=guest
rabbit_virtual_host=/
..
If you are using Qpid as backend, then you should set:
.. sourcecode:: cfg
rpc_backend = qpid
..
And after that you should specify following options:
``qpid_hostname``, ``qpid_port``, ``qpid_username``,
``qpid_password`` and ``qpid_hosts``.
As example you can see default values of these options:
.. sourcecode:: cfg
qpid_hostname=localhost
qpid_port=5672
qpid_hosts=$qpid_hostname:$qpid_port
qpid_username=
qpid_password=
..

View File

@ -438,6 +438,21 @@
#swift_topology_file=etc/sahara/swift.topology
#
# Options defined in sahara.utils.notification.sender
#
# Notification level for outgoing notifications (string value)
#notification_level=INFO
# Notification publisher_id for outgoing notifications (string
# value)
#notification_publisher_id=<None>
# Enables sending notifications to Ceilometer (boolean value)
#enable_notifications=false
#
# Options defined in sahara.utils.openstack.keystone
#

View File

@ -38,6 +38,7 @@ from sahara.service import ops as service_ops
from sahara.service import periodic
from sahara.utils import api as api_utils
from sahara.utils import remote
from sahara.utils import rpc as messaging
LOG = log.getLogger(__name__)
@ -74,6 +75,8 @@ def setup_common(possible_topdir, service_name):
LOG.info(_LI('Starting Sahara %s'), service_name)
messaging.setup()
if service_name != 'all-in-one':
LOG.warn(
_LW("Distributed mode is in the alpha state, it's recommended to "

View File

@ -24,6 +24,7 @@ from sahara.openstack.common import log as logging
from sahara.plugins import base as plugin_base
from sahara.plugins import provisioning
from sahara.utils import general as g
from sahara.utils.notification import sender
from sahara.utils.openstack import nova
@ -93,6 +94,8 @@ def scale_cluster(id, data):
def create_cluster(values):
ctx = context.ctx()
cluster = conductor.cluster_create(ctx, values)
sender.notify(ctx, cluster.id, cluster.name, "New",
"create")
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
# validating cluster
@ -110,12 +113,15 @@ def create_cluster(values):
def terminate_cluster(id):
g.change_cluster_status(id, "Deleting")
OPS.terminate_cluster(id)
cluster = g.change_cluster_status(id, "Deleting")
OPS.terminate_cluster(id)
sender.notify(context.ctx(), cluster.id, cluster.name, cluster.status,
"delete")
# ClusterTemplate ops
def get_cluster_templates():
return conductor.cluster_template_get_all(context.ctx())

View File

@ -0,0 +1,63 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from sahara import context
from sahara.tests.unit import base
from sahara.utils.notification import sender
from sahara.utils import rpc as messaging
class NotificationTest(base.SaharaTestCase):
def setUp(self):
super(NotificationTest, self).setUp()
def _make_sample(self):
ctx = context.ctx()
self.ctx = ctx
self.cluster_id = 'someId'
self.cluster_name = 'someName'
self.cluster_status = 'someStatus'
sender.notify(ctx, self.cluster_id, self.cluster_name,
self.cluster_status, "update")
self.create_mock('update')
def create_mock(self, action):
self.expected = mock.call(self.ctx,
'sahara.cluster.%s' % action,
{'cluster_id': self.cluster_id,
'cluster_name': self.cluster_name,
'cluster_status': self.cluster_status,
'project_id': self.ctx.tenant_id,
'user_id': self.ctx.user_id})
@mock.patch('oslo.messaging.notify.notifier.Notifier.info')
def test_update_cluster(self, mock_notify):
self.override_config("enable_notifications", True)
messaging.setup("fake://", optional=True)
self._make_sample()
self.assertEqual([self.expected],
mock_notify.call_args_list)
if messaging.TRANSPORT:
messaging.TRANSPORT.cleanup()
messaging.TRANSPORT = messaging.NOTIFIER = None

View File

@ -0,0 +1,78 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from sahara import main
from sahara.tests.unit import base
from sahara.utils import rpc as messaging
_ALIASES = {
'sahara.openstack.common.rpc.impl_kombu': 'rabbit',
'sahara.openstack.common.rpc.impl_qpid': 'qpid',
'sahara.openstack.common.rpc.impl_zmq': 'zmq',
}
class TestMessagingSetup(base.SaharaTestCase):
def setUp(self):
super(TestMessagingSetup, self).setUp()
self.override_config('enable_notifications', True)
def _install(self):
messaging.setup('fake://', optional=True)
self.assertNotEqual(None, messaging.TRANSPORT)
self.assertNotEqual(None, messaging.NOTIFIER)
def _remove_install(self):
if messaging.TRANSPORT:
messaging.TRANSPORT.cleanup()
messaging.TRANSPORT = messaging.NOTIFIER = None
@mock.patch('oslo.messaging.set_transport_defaults')
def test_set_defaults(self, mock_transport):
self._install()
expected = [
mock.call('sahara')
]
self.assertEqual(expected, mock_transport.call_args_list)
self._remove_install()
@mock.patch('oslo.messaging.get_transport')
def test_get_transport(self, mock_transport):
self._install()
expected = [
mock.call(main.CONF, 'fake://', aliases=_ALIASES)
]
self.assertEqual(expected, mock_transport.call_args_list)
self._remove_install()
@mock.patch('oslo.messaging.Notifier')
def test_notifier(self, mock_init):
self._install()
serializer = messaging.SERIALIZER
expected = [
mock.call(messaging.TRANSPORT, serializer=serializer)
]
self.assertEqual(expected, mock_init.call_args_list)
self._remove_install()

View File

@ -19,6 +19,7 @@ from sahara import conductor as c
from sahara import context
from sahara.i18n import _LI
from sahara.openstack.common import log as logging
from sahara.utils.notification import sender
conductor = c.API
LOG = logging.getLogger(__name__)
@ -69,8 +70,13 @@ def change_cluster_status(cluster, status, status_description=None):
update_dict["status_description"] = status_description
cluster = conductor.cluster_update(context.ctx(), cluster, update_dict)
LOG.info(_LI("Cluster status has been changed: id=%(id)s, New status="
"%(status)s"), {'id': cluster.id, 'status': cluster.status})
sender.notify(context.ctx(), cluster.id, cluster.name, cluster.status,
"update")
return cluster

View File

View File

@ -0,0 +1,85 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from sahara.openstack.common import log as logging
from sahara.utils import rpc as messaging
LOG = logging.getLogger(__name__)
SERVICE = 'sahara'
EVENT_TEMPLATE = "sahara.cluster.%s"
notifier_opts = [
cfg.StrOpt('notification_level',
default='INFO',
help='Notification level for outgoing notifications'),
cfg.StrOpt('notification_publisher_id',
help='Notification publisher_id for outgoing notifications'),
cfg.BoolOpt('enable_notifications',
default=False,
help='Enables sending notifications to Ceilometer')
]
CONF = cfg.CONF
CONF.register_opts(notifier_opts)
def _get_publisher():
publisher_id = CONF.notification_publisher_id
if publisher_id is None:
publisher_id = SERVICE
return publisher_id
def _notify(context, event_type, level, body):
client = messaging.get_notifier(_get_publisher())
method = getattr(client, level.lower())
method(context, event_type, body)
def _body(
cluster_id,
cluster_name,
cluster_status,
tenant_id,
user_id):
result = {
'cluster_id': cluster_id,
'cluster_name': cluster_name,
'cluster_status': cluster_status,
'project_id': tenant_id,
'user_id': user_id,
}
return result
def notify(context, cluster_id, cluster_name, cluster_status, ev_type):
"""Sends notification about creating/updating/deleting cluster."""
if not cfg.CONF.enable_notifications:
return
LOG.debug("Notification about cluster (id=%(id)s, name=%(name)s, "
"type=%(type)s, status = %(status)s) is going to be sent",
{'id': cluster_id, 'name': cluster_name, 'type': ev_type,
'status': cluster_status})
level = CONF.notification_level
_notify(context, EVENT_TEMPLATE % ev_type, level,
_body(cluster_id, cluster_name, cluster_status, context.tenant_id,
context.user_id))

View File

@ -19,12 +19,48 @@ from oslo import messaging
from sahara import context
from sahara.i18n import _LE
from sahara.i18n import _LI
from sahara.openstack.common import jsonutils
from sahara.openstack.common import log as logging
TRANSPORT = None
NOTIFIER = None
SERIALIZER = None
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
_ALIASES = {
'sahara.openstack.common.rpc.impl_kombu': 'rabbit',
'sahara.openstack.common.rpc.impl_qpid': 'qpid',
'sahara.openstack.common.rpc.impl_zmq': 'zmq',
}
class ContextSerializer(messaging.Serializer):
def __init__(self, base):
self._base = base
def serialize_entity(self, ctxt, entity):
return self._base.serialize_entity(ctxt, entity)
def deserialize_entity(self, ctxt, entity):
return self._base.deserialize_entity(ctxt, entity)
@staticmethod
def serialize_context(ctxt):
return ctxt.to_dict()
@staticmethod
def deserialize_context(ctxt):
pass
class JsonPayloadSerializer(messaging.NoOpSerializer):
@classmethod
def serialize_entity(cls, context, entity):
return jsonutils.to_primitive(entity, convert_instances=True)
class RPCClient(object):
def __init__(self, target):
@ -48,8 +84,7 @@ class RPCServer(object):
target=target,
transport=messaging.get_transport(cfg.CONF),
endpoints=[ContextEndpointHandler(self)],
executor='eventlet'
)
executor='eventlet')
def start(self):
self.__server.start()
@ -76,3 +111,33 @@ class ContextEndpointHandler(object):
LOG.error(_LE("No %(method)s method found implemented in "
"%(class)s class"),
{'method': name, 'class': self.__endpoint})
def setup(url=None, optional=False):
"""Initialise the oslo.messaging layer."""
global TRANSPORT, NOTIFIER, SERIALIZER
if not cfg.CONF.enable_notifications:
LOG.info(_LI("Notifications disabled"))
return
LOG.info(_LI("Notifications enabled"))
messaging.set_transport_defaults('sahara')
SERIALIZER = ContextSerializer(JsonPayloadSerializer())
try:
TRANSPORT = messaging.get_transport(cfg.CONF, url,
aliases=_ALIASES)
except messaging.InvalidTransportURL as e:
TRANSPORT = None
if not optional or e.url:
raise
if TRANSPORT:
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=SERIALIZER)
def get_notifier(publisher_id):
"""Return a configured oslo.messaging notifier."""
return NOTIFIER.prepare(publisher_id=publisher_id)