diff --git a/glance/cmd/agent_notification.py b/glance/cmd/agent_notification.py new file mode 100644 index 00000000..6d643f1d --- /dev/null +++ b/glance/cmd/agent_notification.py @@ -0,0 +1,30 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from glance import listener +from glance.openstack.common import service as os_service +from glance import service + + +def main(): + service.prepare_service() + launcher = os_service.ProcessLauncher() + launcher.launch_service( + listener.ListenerService(), + workers=service.get_workers('listener')) + launcher.wait() + +if __name__ == "__main__": + main() diff --git a/glance/listener.py b/glance/listener.py new file mode 100644 index 00000000..a9b790ee --- /dev/null +++ b/glance/listener.py @@ -0,0 +1,90 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo.config import cfg +from oslo import messaging +from oslo_log import log as logging +import stevedore + +from glance import i18n +from glance.openstack.common import service as os_service + +LOG = logging.getLogger(__name__) +_ = i18n._ +_LE = i18n._LE + + +class NotificationEndpoint(object): + + def __init__(self): + self.plugins = get_plugins() + self.notification_target_map = dict() + for plugin in self.plugins: + try: + event_list = plugin.obj.get_notification_supported_events() + for event in event_list: + self.notification_target_map[event.lower()] = plugin.obj + except Exception as e: + LOG.error(_LE("Failed to retrieve supported notification" + " events from search plugins " + "%(ext)s: %(e)s") % + {'ext': plugin.name, 'e': e}) + + def info(self, ctxt, publisher_id, event_type, payload, metadata): + event_type_l = event_type.lower() + if event_type_l in self.notification_target_map: + plugin = self.notification_target_map[event_type_l] + handler = plugin.get_notification_handler() + handler.process( + ctxt, + publisher_id, + event_type, + payload, + metadata) + + +class ListenerService(os_service.Service): + def __init__(self, *args, **kwargs): + super(ListenerService, self).__init__(*args, **kwargs) + self.listeners = [] + + def start(self): + super(ListenerService, self).start() + transport = messaging.get_transport(cfg.CONF) + targets = [ + messaging.Target(topic="notifications", exchange="glance") + ] + endpoints = [ + NotificationEndpoint() + ] + listener = messaging.get_notification_listener( + transport, + targets, + endpoints) + listener.start() + self.listeners.append(listener) + + def stop(self): + for listener in self.listeners: + listener.stop() + listener.wait() + super(ListenerService, self).stop() + + +def get_plugins(): + namespace = 'glance.search.index_backend' + ext_manager = stevedore.extension.ExtensionManager( + namespace, invoke_on_load=True) + return ext_manager.extensions diff --git a/glance/search/plugins/base.py b/glance/search/plugins/base.py index b2ec6c4f..ac7dcc1d 100644 --- a/glance/search/plugins/base.py +++ b/glance/search/plugins/base.py @@ -117,3 +117,24 @@ class IndexBase(object): def get_mapping(self): """Get an index mapping.""" return {} + + def get_notification_handler(self): + """Get the notification handler which implements NotificationBase.""" + return None + + def get_notification_supported_events(self): + """Get the list of suppported event types.""" + return [] + + +@six.add_metaclass(abc.ABCMeta) +class NotificationBase(object): + + def __init__(self, engine, index_name, document_type): + self.engine = engine + self.index_name = index_name + self.document_type = document_type + + @abc.abstractmethod + def process(self, ctxt, publisher_id, event_type, payload, metadata): + """Process the incoming notification message.""" diff --git a/glance/search/plugins/images.py b/glance/search/plugins/images.py index a46fc187..6b0d6821 100644 --- a/glance/search/plugins/images.py +++ b/glance/search/plugins/images.py @@ -22,6 +22,7 @@ from glance.common import property_utils import glance.db from glance.db.sqlalchemy import models from glance.search.plugins import base +from glance.search.plugins import images_notification_handler class ImageIndex(base.IndexBase): @@ -150,3 +151,13 @@ class ImageIndex(base.IndexBase): document[image_property.name] = image_property.value return document + + def get_notification_handler(self): + return images_notification_handler.ImageHandler( + self.engine, + self.get_index_name(), + self.get_document_type() + ) + + def get_notification_supported_events(self): + return ['image.create', 'image.update', 'image.delete'] diff --git a/glance/search/plugins/images_notification_handler.py b/glance/search/plugins/images_notification_handler.py new file mode 100644 index 00000000..d5346db0 --- /dev/null +++ b/glance/search/plugins/images_notification_handler.py @@ -0,0 +1,83 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_log import log as logging +import oslo_messaging + +from glance.common import utils +from glance.search.plugins import base + +LOG = logging.getLogger(__name__) + + +class ImageHandler(base.NotificationBase): + + def __init__(self, *args, **kwargs): + super(ImageHandler, self).__init__(*args, **kwargs) + self.image_delete_keys = ['deleted_at', 'deleted', + 'is_public', 'properties'] + + def process(self, ctxt, publisher_id, event_type, payload, metadata): + try: + actions = { + "image.create": self.create, + "image.update": self.update, + "image.delete": self.delete + } + actions[event_type](payload) + return oslo_messaging.NotificationResult.HANDLED + except Exception as e: + LOG.error(utils.exception_to_str(e)) + + def create(self, payload): + id = payload['id'] + payload = self.format_image(payload) + self.engine.create( + index=self.index_name, + doc_type=self.document_type, + body=payload, + id=id + ) + + def update(self, payload): + id = payload['id'] + payload = self.format_image(payload) + doc = {"doc": payload} + self.engine.update( + index=self.index_name, + doc_type=self.document_type, + body=doc, + id=id + ) + + def delete(self, payload): + id = payload['id'] + self.engine.delete( + index=self.index_name, + doc_type=self.document_type, + id=id + ) + + def format_image(self, payload): + visibility = 'public' if payload['is_public'] else 'private' + payload['visibility'] = visibility + + payload.update(payload.get('properties', '{}')) + + for key in payload.keys(): + if key in self.image_delete_keys: + del payload[key] + + return payload diff --git a/glance/search/plugins/metadefs.py b/glance/search/plugins/metadefs.py index 3ff5d86e..0b631432 100644 --- a/glance/search/plugins/metadefs.py +++ b/glance/search/plugins/metadefs.py @@ -20,6 +20,7 @@ import six import glance.db from glance.db.sqlalchemy import models_metadef as models from glance.search.plugins import base +from glance.search.plugins import metadefs_notification_handler class MetadefIndex(base.IndexBase): @@ -228,3 +229,31 @@ class MetadefIndex(base.IndexBase): return { 'name': tag.name } + + def get_notification_handler(self): + return metadefs_notification_handler.MetadefHandler( + self.engine, + self.get_index_name(), + self.get_document_type() + ) + + def get_notification_supported_events(self): + return [ + "metadef_namespace.create", + "metadef_namespace.update", + "metadef_namespace.delete", + "metadef_object.create", + "metadef_object.update", + "metadef_object.delete", + "metadef_property.create", + "metadef_property.update", + "metadef_property.delete", + "metadef_tag.create", + "metadef_tag.update", + "metadef_tag.delete", + "metadef_resource_type.create", + "metadef_resource_type.delete", + "metadef_namespace.delete_properties", + "metadef_namespace.delete_objects", + "metadef_namespace.delete_tags" + ] diff --git a/glance/search/plugins/metadefs_notification_handler.py b/glance/search/plugins/metadefs_notification_handler.py new file mode 100644 index 00000000..18b16872 --- /dev/null +++ b/glance/search/plugins/metadefs_notification_handler.py @@ -0,0 +1,251 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import six + +from oslo_log import log as logging +import oslo_messaging + +from glance.common import utils +from glance.search.plugins import base + +LOG = logging.getLogger(__name__) + + +class MetadefHandler(base.NotificationBase): + + def __init__(self, *args, **kwargs): + super(MetadefHandler, self).__init__(*args, **kwargs) + self.namespace_delete_keys = ['deleted_at', 'deleted', 'created_at', + 'updated_at', 'namespace_old'] + self.property_delete_keys = ['deleted', 'deleted_at', + 'name_old', 'namespace', 'name'] + + def process(self, ctxt, publisher_id, event_type, payload, metadata): + try: + actions = { + "metadef_namespace.create": self.create_ns, + "metadef_namespace.update": self.update_ns, + "metadef_namespace.delete": self.delete_ns, + "metadef_object.create": self.create_obj, + "metadef_object.update": self.update_obj, + "metadef_object.delete": self.delete_obj, + "metadef_property.create": self.create_prop, + "metadef_property.update": self.update_prop, + "metadef_property.delete": self.delete_prop, + "metadef_resource_type.create": self.create_rs, + "metadef_resource_type.delete": self.delete_rs, + "metadef_tag.create": self.create_tag, + "metadef_tag.update": self.update_tag, + "metadef_tag.delete": self.delete_tag, + "metadef_namespace.delete_properties": self.delete_props, + "metadef_namespace.delete_objects": self.delete_objects, + "metadef_namespace.delete_tags": self.delete_tags + } + actions[event_type](payload) + return oslo_messaging.NotificationResult.HANDLED + except Exception as e: + LOG.error(utils.exception_to_str(e)) + + def run_create(self, id, payload): + self.engine.create( + index=self.index_name, + doc_type=self.document_type, + body=payload, + id=id + ) + + def run_update(self, id, payload, script=False): + if script: + self.engine.update( + index=self.index_name, + doc_type=self.document_type, + body=payload, + id=id) + else: + doc = {"doc": payload} + self.engine.update( + index=self.index_name, + doc_type=self.document_type, + body=doc, + id=id) + + def run_delete(self, id): + self.engine.delete( + index=self.index_name, + doc_type=self.document_type, + id=id + ) + + def create_ns(self, payload): + id = payload['namespace'] + self.run_create(id, self.format_namespace(payload)) + + def update_ns(self, payload): + id = payload['namespace_old'] + self.run_update(id, self.format_namespace(payload)) + + def delete_ns(self, payload): + id = payload['namespace'] + self.run_delete(id) + + def create_obj(self, payload): + id = payload['namespace'] + object = self.format_object(payload) + self.create_entity(id, "objects", object) + + def update_obj(self, payload): + id = payload['namespace'] + object = self.format_object(payload) + self.update_entity(id, "objects", object, + payload['name_old'], "name") + + def delete_obj(self, payload): + id = payload['namespace'] + self.delete_entity(id, "objects", payload['name'], "name") + + def create_prop(self, payload): + id = payload['namespace'] + property = self.format_property(payload) + self.create_entity(id, "properties", property) + + def update_prop(self, payload): + id = payload['namespace'] + property = self.format_property(payload) + self.update_entity(id, "properties", property, + payload['name_old'], "property") + + def delete_prop(self, payload): + id = payload['namespace'] + self.delete_entity(id, "properties", payload['name'], "property") + + def create_rs(self, payload): + id = payload['namespace'] + resource_type = dict() + resource_type['name'] = payload['name'] + if payload['prefix']: + resource_type['prefix'] = payload['prefix'] + if payload['properties_target']: + resource_type['properties_target'] = payload['properties_target'] + + self.create_entity(id, "resource_types", resource_type) + + def delete_rs(self, payload): + id = payload['namespace'] + self.delete_entity(id, "resource_types", payload['name'], "name") + + def create_tag(self, payload): + id = payload['namespace'] + tag = dict() + tag['name'] = payload['name'] + + self.create_entity(id, "tags", tag) + + def update_tag(self, payload): + id = payload['namespace'] + tag = dict() + tag['name'] = payload['name'] + + self.update_entity(id, "tags", tag, payload['name_old'], "name") + + def delete_tag(self, payload): + id = payload['namespace'] + self.delete_entity(id, "tags", payload['name'], "name") + + def delete_props(self, payload): + self.delete_field(payload, "properties") + + def delete_objects(self, payload): + self.delete_field(payload, "objects") + + def delete_tags(self, payload): + self.delete_field(payload, "tags") + + def create_entity(self, id, entity, entity_data): + script = ("if (ctx._source.containsKey('%(entity)s'))" + "{ctx._source.%(entity)s += entity_item }" + "else {ctx._source.%(entity)s=entity_list};" % + {"entity": entity}) + + params = { + "entity_item": entity_data, + "entity_list": [entity_data] + } + payload = {"script": script, "params": params} + self.run_update(id, payload=payload, script=True) + + def update_entity(self, id, entity, entity_data, entity_id, field_name): + entity_id = entity_id.lower() + script = ("obj=null; for(entity_item :ctx._source.%(entity)s)" + "{if(entity_item['%(field_name)s'].toLowerCase() " + " == entity_id ) obj=entity_item;};" + "if(obj!=null)ctx._source.%(entity)s.remove(obj);" + "if (ctx._source.containsKey('%(entity)s'))" + "{ctx._source.%(entity)s += entity_item; }" + "else {ctx._source.%(entity)s=entity_list;}" % + {"entity": entity, "field_name": field_name}) + params = { + "entity_item": entity_data, + "entity_list": [entity_data], + "entity_id": entity_id + } + payload = {"script": script, "params": params} + self.run_update(id, payload=payload, script=True) + + def delete_entity(self, id, entity, entity_id, field_name): + entity_id = entity_id.lower() + script = ("obj=null; for(entity_item :ctx._source.%(entity)s)" + "{if(entity_item['%(field_name)s'].toLowerCase() " + " == entity_id ) obj=entity_item;};" + "if(obj!=null)ctx._source.%(entity)s.remove(obj);" % + {"entity": entity, "field_name": field_name}) + params = { + "entity_id": entity_id + } + payload = {"script": script, "params": params} + self.run_update(id, payload=payload, script=True) + + def delete_field(self, payload, field): + id = payload['namespace'] + script = ("if (ctx._source.containsKey('%(field)s'))" + "{ctx._source.remove('%(field)s')}") % {"field": field} + payload = {"script": script} + self.run_update(id, payload=payload, script=True) + + def format_namespace(self, payload): + for key in self.namespace_delete_keys: + if key in payload.keys(): + del payload[key] + return payload + + def format_object(self, payload): + formatted_object = dict() + formatted_object['name'] = payload['name'] + formatted_object['description'] = payload['description'] + if payload['required']: + formatted_object['required'] = payload['required'] + formatted_object['properties'] = [] + for property in payload['properties']: + formatted_property = self.format_property(property) + formatted_object['properties'].append(formatted_property) + return formatted_object + + def format_property(self, payload): + prop_data = dict() + prop_data['property'] = payload['name'] + for key, value in six.iteritems(payload): + if key not in self.property_delete_keys and value: + prop_data[key] = value + return prop_data diff --git a/glance/service.py b/glance/service.py new file mode 100644 index 00000000..12c62237 --- /dev/null +++ b/glance/service.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python +# +# Copyright 2012-2014 eNovance +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import socket +import sys + +from oslo.config import cfg +from oslo import i18n +import oslo.messaging +from oslo_log import log + +CONF = cfg.CONF + +OPTS = [ + cfg.StrOpt('host', + default=socket.gethostname(), + help='Name of this node, which must be valid in an AMQP ' + 'key. Can be an opaque identifier. For ZeroMQ only, must ' + 'be a valid host name, FQDN, or IP address.'), + cfg.IntOpt('listener_workers', + default=1, + help='Number of workers for notification service. A single ' + 'notification agent is enabled by default.'), + cfg.IntOpt('http_timeout', + default=600, + help='Timeout seconds for HTTP requests. Set it to None to ' + 'disable timeout.'), +] +CONF.register_opts(OPTS) + +CLI_OPTS = [ + cfg.StrOpt('os-username', + deprecated_group="DEFAULT", + default=os.environ.get('OS_USERNAME', 'glance'), + help='User name to use for OpenStack service access.'), + cfg.StrOpt('os-password', + deprecated_group="DEFAULT", + secret=True, + default=os.environ.get('OS_PASSWORD', 'admin'), + help='Password to use for OpenStack service access.'), + cfg.StrOpt('os-tenant-id', + deprecated_group="DEFAULT", + default=os.environ.get('OS_TENANT_ID', ''), + help='Tenant ID to use for OpenStack service access.'), + cfg.StrOpt('os-tenant-name', + deprecated_group="DEFAULT", + default=os.environ.get('OS_TENANT_NAME', 'admin'), + help='Tenant name to use for OpenStack service access.'), + cfg.StrOpt('os-cacert', + default=os.environ.get('OS_CACERT'), + help='Certificate chain for SSL validation.'), + cfg.StrOpt('os-auth-url', + deprecated_group="DEFAULT", + default=os.environ.get('OS_AUTH_URL', + 'http://localhost:5000/v2.0'), + help='Auth URL to use for OpenStack service access.'), + cfg.StrOpt('os-region-name', + deprecated_group="DEFAULT", + default=os.environ.get('OS_REGION_NAME'), + help='Region name to use for OpenStack service endpoints.'), + cfg.StrOpt('os-endpoint-type', + default=os.environ.get('OS_ENDPOINT_TYPE', 'publicURL'), + help='Type of endpoint in Identity service catalog to use for ' + 'communication with OpenStack services.'), + cfg.BoolOpt('insecure', + default=False, + help='Disables X.509 certificate validation when an ' + 'SSL connection to Identity Service is established.'), +] +CONF.register_cli_opts(CLI_OPTS, group="service_credentials") + +LOG = log.getLogger(__name__) +_DEFAULT_LOG_LEVELS = ['keystonemiddleware=WARN', 'stevedore=WARN'] + + +class WorkerException(Exception): + """Exception for errors relating to service workers.""" + + +def get_workers(name): + return 1 + + +def prepare_service(argv=None): + i18n.enable_lazy() + log.set_defaults(_DEFAULT_LOG_LEVELS) + log.register_options(CONF) + if argv is None: + argv = sys.argv + CONF(argv[1:], project='glance-search') + log.setup(cfg.CONF, 'glance-search') + oslo.messaging.set_transport_defaults('glance')