Add information about the cluster in magnum event notifications

Magnum is sending notifications like cluster create but has no
details regarding the cluster, like cluster UUID. Notifications
from other OpenStack projects contain full detailed information
(e.g. instance UUID in Nova instance create notification).
Detailed notifications are important for other OpenStack
projects like Searchlight or third party projects that cache
information regarding OpenStack objects or have custom actions
running on notification. Caching systems can efficiently update
one single object (e.g. cluster), while without notifications
they need to periodically retrieve object list, which is
inefficient.

Change-Id: I820fbe0659222ba31baf43ca09d2bbb0030ed61f
Story: #2006297
Task: 36009
This commit is contained in:
Emanuel Andrecut 2019-07-11 16:49:58 +03:00
parent 451358a57c
commit e5eade03dc
6 changed files with 100 additions and 24 deletions

View File

@ -69,7 +69,8 @@ class Handler(object):
cert_manager.generate_certificates_to_cluster(cluster, cert_manager.generate_certificates_to_cluster(cluster,
context=context) context=context)
conductor_utils.notify_about_cluster_operation( conductor_utils.notify_about_cluster_operation(
context, taxonomy.ACTION_CREATE, taxonomy.OUTCOME_PENDING) context, taxonomy.ACTION_CREATE, taxonomy.OUTCOME_PENDING,
cluster)
# Get driver # Get driver
cluster_driver = driver.Driver.get_driver_for_cluster(context, cluster_driver = driver.Driver.get_driver_for_cluster(context,
cluster) cluster)
@ -82,7 +83,8 @@ class Handler(object):
cluster.status_reason = six.text_type(e) cluster.status_reason = six.text_type(e)
cluster.save() cluster.save()
conductor_utils.notify_about_cluster_operation( conductor_utils.notify_about_cluster_operation(
context, taxonomy.ACTION_CREATE, taxonomy.OUTCOME_FAILURE) context, taxonomy.ACTION_CREATE, taxonomy.OUTCOME_FAILURE,
cluster)
if isinstance(e, exc.HTTPBadRequest): if isinstance(e, exc.HTTPBadRequest):
e = exception.InvalidParameterValue(message=six.text_type(e)) e = exception.InvalidParameterValue(message=six.text_type(e))
@ -108,7 +110,8 @@ class Handler(object):
) )
if cluster.status not in allow_update_status: if cluster.status not in allow_update_status:
conductor_utils.notify_about_cluster_operation( conductor_utils.notify_about_cluster_operation(
context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_FAILURE) context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_FAILURE,
cluster)
operation = _('Updating a cluster when status is ' operation = _('Updating a cluster when status is '
'"%s"') % cluster.status '"%s"') % cluster.status
raise exception.NotSupported(operation=operation) raise exception.NotSupported(operation=operation)
@ -132,7 +135,8 @@ class Handler(object):
# Update cluster # Update cluster
try: try:
conductor_utils.notify_about_cluster_operation( conductor_utils.notify_about_cluster_operation(
context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_PENDING) context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_PENDING,
cluster)
worker_ng.node_count = node_count worker_ng.node_count = node_count
worker_ng.save() worker_ng.save()
cluster_driver.update_cluster(context, cluster, manager, rollback) cluster_driver.update_cluster(context, cluster, manager, rollback)
@ -146,7 +150,8 @@ class Handler(object):
worker_ng.node_count = old_node_count worker_ng.node_count = old_node_count
worker_ng.save() worker_ng.save()
conductor_utils.notify_about_cluster_operation( conductor_utils.notify_about_cluster_operation(
context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_FAILURE) context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_FAILURE,
cluster)
if isinstance(e, exc.HTTPBadRequest): if isinstance(e, exc.HTTPBadRequest):
e = exception.InvalidParameterValue(message=six.text_type(e)) e = exception.InvalidParameterValue(message=six.text_type(e))
raise e raise e
@ -165,7 +170,8 @@ class Handler(object):
ct.coe) ct.coe)
try: try:
conductor_utils.notify_about_cluster_operation( conductor_utils.notify_about_cluster_operation(
context, taxonomy.ACTION_DELETE, taxonomy.OUTCOME_PENDING) context, taxonomy.ACTION_DELETE, taxonomy.OUTCOME_PENDING,
cluster)
cluster_driver.delete_cluster(context, cluster) cluster_driver.delete_cluster(context, cluster)
cluster.status = fields.ClusterStatus.DELETE_IN_PROGRESS cluster.status = fields.ClusterStatus.DELETE_IN_PROGRESS
cluster.status_reason = None cluster.status_reason = None
@ -184,15 +190,18 @@ class Handler(object):
LOG.info('The cluster %s has been deleted by others.', LOG.info('The cluster %s has been deleted by others.',
uuid) uuid)
conductor_utils.notify_about_cluster_operation( conductor_utils.notify_about_cluster_operation(
context, taxonomy.ACTION_DELETE, taxonomy.OUTCOME_SUCCESS) context, taxonomy.ACTION_DELETE, taxonomy.OUTCOME_SUCCESS,
cluster)
return None return None
except exc.HTTPConflict: except exc.HTTPConflict:
conductor_utils.notify_about_cluster_operation( conductor_utils.notify_about_cluster_operation(
context, taxonomy.ACTION_DELETE, taxonomy.OUTCOME_FAILURE) context, taxonomy.ACTION_DELETE, taxonomy.OUTCOME_FAILURE,
cluster)
raise exception.OperationInProgress(cluster_name=cluster.name) raise exception.OperationInProgress(cluster_name=cluster.name)
except Exception as unexp: except Exception as unexp:
conductor_utils.notify_about_cluster_operation( conductor_utils.notify_about_cluster_operation(
context, taxonomy.ACTION_DELETE, taxonomy.OUTCOME_FAILURE) context, taxonomy.ACTION_DELETE, taxonomy.OUTCOME_FAILURE,
cluster)
cluster.status = fields.ClusterStatus.DELETE_FAILED cluster.status = fields.ClusterStatus.DELETE_FAILED
cluster.status_reason = six.text_type(unexp) cluster.status_reason = six.text_type(unexp)
cluster.save() cluster.save()
@ -227,7 +236,8 @@ class Handler(object):
) )
if cluster.status not in allow_update_status: if cluster.status not in allow_update_status:
conductor_utils.notify_about_cluster_operation( conductor_utils.notify_about_cluster_operation(
context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_FAILURE) context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_FAILURE,
cluster)
operation = _('Resizing a cluster when status is ' operation = _('Resizing a cluster when status is '
'"%s"') % cluster.status '"%s"') % cluster.status
raise exception.NotSupported(operation=operation) raise exception.NotSupported(operation=operation)
@ -248,7 +258,8 @@ class Handler(object):
nodegroup.node_count = node_count nodegroup.node_count = node_count
nodegroup.save() nodegroup.save()
conductor_utils.notify_about_cluster_operation( conductor_utils.notify_about_cluster_operation(
context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_PENDING) context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_PENDING,
cluster)
cluster_driver.resize_cluster(context, cluster, resize_manager, cluster_driver.resize_cluster(context, cluster, resize_manager,
node_count, nodes_to_remove, node_count, nodes_to_remove,
nodegroup) nodegroup)
@ -261,7 +272,8 @@ class Handler(object):
nodegroup.node_count = old_node_count nodegroup.node_count = old_node_count
nodegroup.save() nodegroup.save()
conductor_utils.notify_about_cluster_operation( conductor_utils.notify_about_cluster_operation(
context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_FAILURE) context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_FAILURE,
cluster)
if isinstance(e, exc.HTTPBadRequest): if isinstance(e, exc.HTTPBadRequest):
e = exception.InvalidParameterValue(message=six.text_type(e)) e = exception.InvalidParameterValue(message=six.text_type(e))
raise e raise e
@ -287,7 +299,8 @@ class Handler(object):
) )
if cluster.status not in allow_update_status: if cluster.status not in allow_update_status:
conductor_utils.notify_about_cluster_operation( conductor_utils.notify_about_cluster_operation(
context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_FAILURE) context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_FAILURE,
cluster)
operation = _('Upgrading a cluster when status is ' operation = _('Upgrading a cluster when status is '
'"%s"') % cluster.status '"%s"') % cluster.status
raise exception.NotSupported(operation=operation) raise exception.NotSupported(operation=operation)
@ -300,7 +313,8 @@ class Handler(object):
# Upgrade cluster # Upgrade cluster
try: try:
conductor_utils.notify_about_cluster_operation( conductor_utils.notify_about_cluster_operation(
context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_PENDING) context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_PENDING,
cluster)
cluster_driver.upgrade_cluster(context, cluster, cluster_template, cluster_driver.upgrade_cluster(context, cluster, cluster_template,
max_batch_size, nodegroup, rollback) max_batch_size, nodegroup, rollback)
cluster.status = fields.ClusterStatus.UPDATE_IN_PROGRESS cluster.status = fields.ClusterStatus.UPDATE_IN_PROGRESS
@ -310,7 +324,8 @@ class Handler(object):
cluster.status_reason = six.text_type(e) cluster.status_reason = six.text_type(e)
cluster.save() cluster.save()
conductor_utils.notify_about_cluster_operation( conductor_utils.notify_about_cluster_operation(
context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_FAILURE) context, taxonomy.ACTION_UPDATE, taxonomy.OUTCOME_FAILURE,
cluster)
if isinstance(e, exc.HTTPBadRequest): if isinstance(e, exc.HTTPBadRequest):
e = exception.InvalidParameterValue(message=six.text_type(e)) e = exception.InvalidParameterValue(message=six.text_type(e))
raise e raise e

View File

@ -13,6 +13,7 @@
# limitations under the License. # limitations under the License.
from oslo_utils import uuidutils from oslo_utils import uuidutils
from pycadf import attachment
from pycadf import cadftaxonomy as taxonomy from pycadf import cadftaxonomy as taxonomy
from pycadf import cadftype from pycadf import cadftype
from pycadf import eventfactory from pycadf import eventfactory
@ -98,19 +99,58 @@ def _get_request_audit_info(context):
return initiator return initiator
def notify_about_cluster_operation(context, action, outcome): def _get_event_target(cluster_obj=None):
if cluster_obj:
target = resource.Resource(
id=cluster_obj.uuid,
name=cluster_obj.name,
typeURI='service/magnum/cluster'
)
target.add_attachment(attach_val=attachment.Attachment(
typeURI='service/magnum/cluster',
content={
'status': cluster_obj.status,
'status_reason': cluster_obj.status_reason,
'project_id': cluster_obj.project_id,
'created_at': cluster_obj.created_at,
'updated_at': cluster_obj.updated_at,
'cluster_template_id': cluster_obj.cluster_template_id,
'keypair': cluster_obj.keypair,
'docker_volume_size:': cluster_obj.docker_volume_size,
'labels': cluster_obj.labels,
'master_flavor_id': cluster_obj.master_flavor_id,
'flavor_id': cluster_obj.flavor_id,
'stack_id': cluster_obj.stack_id,
'health_status': cluster_obj.health_status,
'create_timeout': cluster_obj.create_timeout,
'api_address': cluster_obj.api_address,
'discovery_url': cluster_obj.discovery_url,
'node_addresses': cluster_obj.node_addresses,
'master_addresses': cluster_obj.master_addresses,
'node_count': cluster_obj.node_count,
'master_count': cluster_obj.master_count,
},
name='cluster_data'
))
return target
return resource.Resource(typeURI='service/magnum/cluster')
def notify_about_cluster_operation(context, action, outcome, cluster_obj=None):
"""Send a notification about cluster operation. """Send a notification about cluster operation.
:param action: CADF action being audited :param action: CADF action being audited
:param outcome: CADF outcome :param outcome: CADF outcome
:param cluster_obj: the cluster the notification is related to
""" """
notifier = rpc.get_notifier() notifier = rpc.get_notifier()
event = eventfactory.EventFactory().new_event( event = eventfactory.EventFactory().new_event(
eventType=cadftype.EVENTTYPE_ACTIVITY, eventType=cadftype.EVENTTYPE_ACTIVITY,
outcome=outcome, outcome=outcome,
action=action, action=action,
initiator=_get_request_audit_info(context), initiator=_get_request_audit_info(context),
target=resource.Resource(typeURI='service/magnum/cluster'), target=_get_event_target(cluster_obj=cluster_obj),
observer=resource.Resource(typeURI='service/magnum/cluster')) observer=resource.Resource(typeURI='service/magnum/cluster'))
service = 'magnum' service = 'magnum'
event_type = '%(service)s.cluster.%(action)s' % { event_type = '%(service)s.cluster.%(action)s' % {

View File

@ -76,11 +76,11 @@ class ClusterUpdateJob(object):
if self.cluster.status.endswith("_COMPLETE"): if self.cluster.status.endswith("_COMPLETE"):
conductor_utils.notify_about_cluster_operation( conductor_utils.notify_about_cluster_operation(
self.ctx, self.status_to_event[self.cluster.status], self.ctx, self.status_to_event[self.cluster.status],
taxonomy.OUTCOME_SUCCESS) taxonomy.OUTCOME_SUCCESS, self.cluster)
if self.cluster.status.endswith("_FAILED"): if self.cluster.status.endswith("_FAILED"):
conductor_utils.notify_about_cluster_operation( conductor_utils.notify_about_cluster_operation(
self.ctx, self.status_to_event[self.cluster.status], self.ctx, self.status_to_event[self.cluster.status],
taxonomy.OUTCOME_FAILURE) taxonomy.OUTCOME_FAILURE, self.cluster)
# if we're done with it, delete it # if we're done with it, delete it
if self.cluster.status == objects.fields.ClusterStatus.DELETE_COMPLETE: if self.cluster.status == objects.fields.ClusterStatus.DELETE_COMPLETE:
# delete all the nodegroups that belong to this cluster # delete all the nodegroups that belong to this cluster

View File

@ -112,6 +112,10 @@ def get_test_cluster(**kw):
for attr in ['trustee_username', 'trustee_password', 'trust_id']: for attr in ['trustee_username', 'trustee_password', 'trust_id']:
if attr in kw: if attr in kw:
attrs[attr] = kw[attr] attrs[attr] = kw[attr]
# Required only in PeriodicTestCase, may break other tests
for attr in ['keypair', 'health_status']:
if attr in kw:
attrs[attr] = kw[attr]
return attrs return attrs

View File

@ -65,31 +65,36 @@ class PeriodicTestCase(base.TestCase):
uuid = uuidutils.generate_uuid() uuid = uuidutils.generate_uuid()
trust_attrs.update({'id': 1, 'stack_id': '11', 'uuid': uuid, trust_attrs.update({'id': 1, 'stack_id': '11', 'uuid': uuid,
'status': cluster_status.CREATE_IN_PROGRESS, 'status': cluster_status.CREATE_IN_PROGRESS,
'status_reason': 'no change'}) 'status_reason': 'no change',
'keypair': 'keipair1', 'health_status': None})
cluster1 = utils.get_test_cluster(**trust_attrs) cluster1 = utils.get_test_cluster(**trust_attrs)
ngs1 = utils.get_nodegroups_for_cluster() ngs1 = utils.get_nodegroups_for_cluster()
uuid = uuidutils.generate_uuid() uuid = uuidutils.generate_uuid()
trust_attrs.update({'id': 2, 'stack_id': '22', 'uuid': uuid, trust_attrs.update({'id': 2, 'stack_id': '22', 'uuid': uuid,
'status': cluster_status.DELETE_IN_PROGRESS, 'status': cluster_status.DELETE_IN_PROGRESS,
'status_reason': 'no change'}) 'status_reason': 'no change',
'keypair': 'keipair1', 'health_status': None})
cluster2 = utils.get_test_cluster(**trust_attrs) cluster2 = utils.get_test_cluster(**trust_attrs)
ngs2 = utils.get_nodegroups_for_cluster() ngs2 = utils.get_nodegroups_for_cluster()
uuid = uuidutils.generate_uuid() uuid = uuidutils.generate_uuid()
trust_attrs.update({'id': 3, 'stack_id': '33', 'uuid': uuid, trust_attrs.update({'id': 3, 'stack_id': '33', 'uuid': uuid,
'status': cluster_status.UPDATE_IN_PROGRESS, 'status': cluster_status.UPDATE_IN_PROGRESS,
'status_reason': 'no change'}) 'status_reason': 'no change',
'keypair': 'keipair1', 'health_status': None})
cluster3 = utils.get_test_cluster(**trust_attrs) cluster3 = utils.get_test_cluster(**trust_attrs)
ngs3 = utils.get_nodegroups_for_cluster() ngs3 = utils.get_nodegroups_for_cluster()
uuid = uuidutils.generate_uuid() uuid = uuidutils.generate_uuid()
trust_attrs.update({'id': 4, 'stack_id': '44', 'uuid': uuid, trust_attrs.update({'id': 4, 'stack_id': '44', 'uuid': uuid,
'status': cluster_status.DELETE_IN_PROGRESS, 'status': cluster_status.DELETE_IN_PROGRESS,
'status_reason': 'no change'}) 'status_reason': 'no change',
'keypair': 'keipair1', 'health_status': None})
cluster4 = utils.get_test_cluster(**trust_attrs) cluster4 = utils.get_test_cluster(**trust_attrs)
ngs4 = utils.get_nodegroups_for_cluster() ngs4 = utils.get_nodegroups_for_cluster()
uuid = uuidutils.generate_uuid() uuid = uuidutils.generate_uuid()
trust_attrs.update({'id': 5, 'stack_id': '55', 'uuid': uuid, trust_attrs.update({'id': 5, 'stack_id': '55', 'uuid': uuid,
'status': cluster_status.ROLLBACK_IN_PROGRESS, 'status': cluster_status.ROLLBACK_IN_PROGRESS,
'status_reason': 'no change'}) 'status_reason': 'no change',
'keypair': 'keipair1', 'health_status': None})
cluster5 = utils.get_test_cluster(**trust_attrs) cluster5 = utils.get_test_cluster(**trust_attrs)
ngs5 = utils.get_nodegroups_for_cluster() ngs5 = utils.get_nodegroups_for_cluster()

View File

@ -0,0 +1,12 @@
---
features:
- |
Add information about the cluster in magnum event notifications.
Previously the CADF notification's target ID was randomly generated and
no other relevant info about the cluster was sent. Cluster details are
now included in the notifications. This is useful for other OpenStack
projects like Searchlight or third party projects that cache information
regarding OpenStack objects or have custom actions running on
notification. Caching systems can now efficiently update one single
object (e.g. cluster), while without notifications they need to
periodically retrieve object list, which is inefficient.