Catalog Index Service - Index Update

Provides a listener service for notifications.info topic
and handlers which update the index.

Partially Implements: blueprint catalog-index-service
* Listener and Handler for Glance Images
* Listener and Handler for Glance Metadata Definitions

Change-Id: Ia4dc7e7ae93209d368858d6c78b8d9b2490c43a1
This commit is contained in:
Lakshmi N Sampath 2015-03-04 22:46:16 -08:00
parent 9911f962a4
commit c45e680277
8 changed files with 622 additions and 0 deletions

View File

@ -0,0 +1,30 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from glance import listener
from glance.openstack.common import service as os_service
from glance import service
def main():
service.prepare_service()
launcher = os_service.ProcessLauncher()
launcher.launch_service(
listener.ListenerService(),
workers=service.get_workers('listener'))
launcher.wait()
if __name__ == "__main__":
main()

90
glance/listener.py Normal file
View File

@ -0,0 +1,90 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo.config import cfg
from oslo import messaging
from oslo_log import log as logging
import stevedore
from glance import i18n
from glance.openstack.common import service as os_service
LOG = logging.getLogger(__name__)
_ = i18n._
_LE = i18n._LE
class NotificationEndpoint(object):
def __init__(self):
self.plugins = get_plugins()
self.notification_target_map = dict()
for plugin in self.plugins:
try:
event_list = plugin.obj.get_notification_supported_events()
for event in event_list:
self.notification_target_map[event.lower()] = plugin.obj
except Exception as e:
LOG.error(_LE("Failed to retrieve supported notification"
" events from search plugins "
"%(ext)s: %(e)s") %
{'ext': plugin.name, 'e': e})
def info(self, ctxt, publisher_id, event_type, payload, metadata):
event_type_l = event_type.lower()
if event_type_l in self.notification_target_map:
plugin = self.notification_target_map[event_type_l]
handler = plugin.get_notification_handler()
handler.process(
ctxt,
publisher_id,
event_type,
payload,
metadata)
class ListenerService(os_service.Service):
def __init__(self, *args, **kwargs):
super(ListenerService, self).__init__(*args, **kwargs)
self.listeners = []
def start(self):
super(ListenerService, self).start()
transport = messaging.get_transport(cfg.CONF)
targets = [
messaging.Target(topic="notifications", exchange="glance")
]
endpoints = [
NotificationEndpoint()
]
listener = messaging.get_notification_listener(
transport,
targets,
endpoints)
listener.start()
self.listeners.append(listener)
def stop(self):
for listener in self.listeners:
listener.stop()
listener.wait()
super(ListenerService, self).stop()
def get_plugins():
namespace = 'glance.search.index_backend'
ext_manager = stevedore.extension.ExtensionManager(
namespace, invoke_on_load=True)
return ext_manager.extensions

View File

@ -117,3 +117,24 @@ class IndexBase(object):
def get_mapping(self):
"""Get an index mapping."""
return {}
def get_notification_handler(self):
"""Get the notification handler which implements NotificationBase."""
return None
def get_notification_supported_events(self):
"""Get the list of suppported event types."""
return []
@six.add_metaclass(abc.ABCMeta)
class NotificationBase(object):
def __init__(self, engine, index_name, document_type):
self.engine = engine
self.index_name = index_name
self.document_type = document_type
@abc.abstractmethod
def process(self, ctxt, publisher_id, event_type, payload, metadata):
"""Process the incoming notification message."""

View File

@ -22,6 +22,7 @@ from glance.common import property_utils
import glance.db
from glance.db.sqlalchemy import models
from glance.search.plugins import base
from glance.search.plugins import images_notification_handler
class ImageIndex(base.IndexBase):
@ -150,3 +151,13 @@ class ImageIndex(base.IndexBase):
document[image_property.name] = image_property.value
return document
def get_notification_handler(self):
return images_notification_handler.ImageHandler(
self.engine,
self.get_index_name(),
self.get_document_type()
)
def get_notification_supported_events(self):
return ['image.create', 'image.update', 'image.delete']

View File

@ -0,0 +1,83 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
import oslo_messaging
from glance.common import utils
from glance.search.plugins import base
LOG = logging.getLogger(__name__)
class ImageHandler(base.NotificationBase):
def __init__(self, *args, **kwargs):
super(ImageHandler, self).__init__(*args, **kwargs)
self.image_delete_keys = ['deleted_at', 'deleted',
'is_public', 'properties']
def process(self, ctxt, publisher_id, event_type, payload, metadata):
try:
actions = {
"image.create": self.create,
"image.update": self.update,
"image.delete": self.delete
}
actions[event_type](payload)
return oslo_messaging.NotificationResult.HANDLED
except Exception as e:
LOG.error(utils.exception_to_str(e))
def create(self, payload):
id = payload['id']
payload = self.format_image(payload)
self.engine.create(
index=self.index_name,
doc_type=self.document_type,
body=payload,
id=id
)
def update(self, payload):
id = payload['id']
payload = self.format_image(payload)
doc = {"doc": payload}
self.engine.update(
index=self.index_name,
doc_type=self.document_type,
body=doc,
id=id
)
def delete(self, payload):
id = payload['id']
self.engine.delete(
index=self.index_name,
doc_type=self.document_type,
id=id
)
def format_image(self, payload):
visibility = 'public' if payload['is_public'] else 'private'
payload['visibility'] = visibility
payload.update(payload.get('properties', '{}'))
for key in payload.keys():
if key in self.image_delete_keys:
del payload[key]
return payload

View File

@ -20,6 +20,7 @@ import six
import glance.db
from glance.db.sqlalchemy import models_metadef as models
from glance.search.plugins import base
from glance.search.plugins import metadefs_notification_handler
class MetadefIndex(base.IndexBase):
@ -228,3 +229,31 @@ class MetadefIndex(base.IndexBase):
return {
'name': tag.name
}
def get_notification_handler(self):
return metadefs_notification_handler.MetadefHandler(
self.engine,
self.get_index_name(),
self.get_document_type()
)
def get_notification_supported_events(self):
return [
"metadef_namespace.create",
"metadef_namespace.update",
"metadef_namespace.delete",
"metadef_object.create",
"metadef_object.update",
"metadef_object.delete",
"metadef_property.create",
"metadef_property.update",
"metadef_property.delete",
"metadef_tag.create",
"metadef_tag.update",
"metadef_tag.delete",
"metadef_resource_type.create",
"metadef_resource_type.delete",
"metadef_namespace.delete_properties",
"metadef_namespace.delete_objects",
"metadef_namespace.delete_tags"
]

View File

@ -0,0 +1,251 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import six
from oslo_log import log as logging
import oslo_messaging
from glance.common import utils
from glance.search.plugins import base
LOG = logging.getLogger(__name__)
class MetadefHandler(base.NotificationBase):
def __init__(self, *args, **kwargs):
super(MetadefHandler, self).__init__(*args, **kwargs)
self.namespace_delete_keys = ['deleted_at', 'deleted', 'created_at',
'updated_at', 'namespace_old']
self.property_delete_keys = ['deleted', 'deleted_at',
'name_old', 'namespace', 'name']
def process(self, ctxt, publisher_id, event_type, payload, metadata):
try:
actions = {
"metadef_namespace.create": self.create_ns,
"metadef_namespace.update": self.update_ns,
"metadef_namespace.delete": self.delete_ns,
"metadef_object.create": self.create_obj,
"metadef_object.update": self.update_obj,
"metadef_object.delete": self.delete_obj,
"metadef_property.create": self.create_prop,
"metadef_property.update": self.update_prop,
"metadef_property.delete": self.delete_prop,
"metadef_resource_type.create": self.create_rs,
"metadef_resource_type.delete": self.delete_rs,
"metadef_tag.create": self.create_tag,
"metadef_tag.update": self.update_tag,
"metadef_tag.delete": self.delete_tag,
"metadef_namespace.delete_properties": self.delete_props,
"metadef_namespace.delete_objects": self.delete_objects,
"metadef_namespace.delete_tags": self.delete_tags
}
actions[event_type](payload)
return oslo_messaging.NotificationResult.HANDLED
except Exception as e:
LOG.error(utils.exception_to_str(e))
def run_create(self, id, payload):
self.engine.create(
index=self.index_name,
doc_type=self.document_type,
body=payload,
id=id
)
def run_update(self, id, payload, script=False):
if script:
self.engine.update(
index=self.index_name,
doc_type=self.document_type,
body=payload,
id=id)
else:
doc = {"doc": payload}
self.engine.update(
index=self.index_name,
doc_type=self.document_type,
body=doc,
id=id)
def run_delete(self, id):
self.engine.delete(
index=self.index_name,
doc_type=self.document_type,
id=id
)
def create_ns(self, payload):
id = payload['namespace']
self.run_create(id, self.format_namespace(payload))
def update_ns(self, payload):
id = payload['namespace_old']
self.run_update(id, self.format_namespace(payload))
def delete_ns(self, payload):
id = payload['namespace']
self.run_delete(id)
def create_obj(self, payload):
id = payload['namespace']
object = self.format_object(payload)
self.create_entity(id, "objects", object)
def update_obj(self, payload):
id = payload['namespace']
object = self.format_object(payload)
self.update_entity(id, "objects", object,
payload['name_old'], "name")
def delete_obj(self, payload):
id = payload['namespace']
self.delete_entity(id, "objects", payload['name'], "name")
def create_prop(self, payload):
id = payload['namespace']
property = self.format_property(payload)
self.create_entity(id, "properties", property)
def update_prop(self, payload):
id = payload['namespace']
property = self.format_property(payload)
self.update_entity(id, "properties", property,
payload['name_old'], "property")
def delete_prop(self, payload):
id = payload['namespace']
self.delete_entity(id, "properties", payload['name'], "property")
def create_rs(self, payload):
id = payload['namespace']
resource_type = dict()
resource_type['name'] = payload['name']
if payload['prefix']:
resource_type['prefix'] = payload['prefix']
if payload['properties_target']:
resource_type['properties_target'] = payload['properties_target']
self.create_entity(id, "resource_types", resource_type)
def delete_rs(self, payload):
id = payload['namespace']
self.delete_entity(id, "resource_types", payload['name'], "name")
def create_tag(self, payload):
id = payload['namespace']
tag = dict()
tag['name'] = payload['name']
self.create_entity(id, "tags", tag)
def update_tag(self, payload):
id = payload['namespace']
tag = dict()
tag['name'] = payload['name']
self.update_entity(id, "tags", tag, payload['name_old'], "name")
def delete_tag(self, payload):
id = payload['namespace']
self.delete_entity(id, "tags", payload['name'], "name")
def delete_props(self, payload):
self.delete_field(payload, "properties")
def delete_objects(self, payload):
self.delete_field(payload, "objects")
def delete_tags(self, payload):
self.delete_field(payload, "tags")
def create_entity(self, id, entity, entity_data):
script = ("if (ctx._source.containsKey('%(entity)s'))"
"{ctx._source.%(entity)s += entity_item }"
"else {ctx._source.%(entity)s=entity_list};" %
{"entity": entity})
params = {
"entity_item": entity_data,
"entity_list": [entity_data]
}
payload = {"script": script, "params": params}
self.run_update(id, payload=payload, script=True)
def update_entity(self, id, entity, entity_data, entity_id, field_name):
entity_id = entity_id.lower()
script = ("obj=null; for(entity_item :ctx._source.%(entity)s)"
"{if(entity_item['%(field_name)s'].toLowerCase() "
" == entity_id ) obj=entity_item;};"
"if(obj!=null)ctx._source.%(entity)s.remove(obj);"
"if (ctx._source.containsKey('%(entity)s'))"
"{ctx._source.%(entity)s += entity_item; }"
"else {ctx._source.%(entity)s=entity_list;}" %
{"entity": entity, "field_name": field_name})
params = {
"entity_item": entity_data,
"entity_list": [entity_data],
"entity_id": entity_id
}
payload = {"script": script, "params": params}
self.run_update(id, payload=payload, script=True)
def delete_entity(self, id, entity, entity_id, field_name):
entity_id = entity_id.lower()
script = ("obj=null; for(entity_item :ctx._source.%(entity)s)"
"{if(entity_item['%(field_name)s'].toLowerCase() "
" == entity_id ) obj=entity_item;};"
"if(obj!=null)ctx._source.%(entity)s.remove(obj);" %
{"entity": entity, "field_name": field_name})
params = {
"entity_id": entity_id
}
payload = {"script": script, "params": params}
self.run_update(id, payload=payload, script=True)
def delete_field(self, payload, field):
id = payload['namespace']
script = ("if (ctx._source.containsKey('%(field)s'))"
"{ctx._source.remove('%(field)s')}") % {"field": field}
payload = {"script": script}
self.run_update(id, payload=payload, script=True)
def format_namespace(self, payload):
for key in self.namespace_delete_keys:
if key in payload.keys():
del payload[key]
return payload
def format_object(self, payload):
formatted_object = dict()
formatted_object['name'] = payload['name']
formatted_object['description'] = payload['description']
if payload['required']:
formatted_object['required'] = payload['required']
formatted_object['properties'] = []
for property in payload['properties']:
formatted_property = self.format_property(property)
formatted_object['properties'].append(formatted_property)
return formatted_object
def format_property(self, payload):
prop_data = dict()
prop_data['property'] = payload['name']
for key, value in six.iteritems(payload):
if key not in self.property_delete_keys and value:
prop_data[key] = value
return prop_data

107
glance/service.py Normal file
View File

@ -0,0 +1,107 @@
#!/usr/bin/env python
#
# Copyright 2012-2014 eNovance <licensing@enovance.com>
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import socket
import sys
from oslo.config import cfg
from oslo import i18n
import oslo.messaging
from oslo_log import log
CONF = cfg.CONF
OPTS = [
cfg.StrOpt('host',
default=socket.gethostname(),
help='Name of this node, which must be valid in an AMQP '
'key. Can be an opaque identifier. For ZeroMQ only, must '
'be a valid host name, FQDN, or IP address.'),
cfg.IntOpt('listener_workers',
default=1,
help='Number of workers for notification service. A single '
'notification agent is enabled by default.'),
cfg.IntOpt('http_timeout',
default=600,
help='Timeout seconds for HTTP requests. Set it to None to '
'disable timeout.'),
]
CONF.register_opts(OPTS)
CLI_OPTS = [
cfg.StrOpt('os-username',
deprecated_group="DEFAULT",
default=os.environ.get('OS_USERNAME', 'glance'),
help='User name to use for OpenStack service access.'),
cfg.StrOpt('os-password',
deprecated_group="DEFAULT",
secret=True,
default=os.environ.get('OS_PASSWORD', 'admin'),
help='Password to use for OpenStack service access.'),
cfg.StrOpt('os-tenant-id',
deprecated_group="DEFAULT",
default=os.environ.get('OS_TENANT_ID', ''),
help='Tenant ID to use for OpenStack service access.'),
cfg.StrOpt('os-tenant-name',
deprecated_group="DEFAULT",
default=os.environ.get('OS_TENANT_NAME', 'admin'),
help='Tenant name to use for OpenStack service access.'),
cfg.StrOpt('os-cacert',
default=os.environ.get('OS_CACERT'),
help='Certificate chain for SSL validation.'),
cfg.StrOpt('os-auth-url',
deprecated_group="DEFAULT",
default=os.environ.get('OS_AUTH_URL',
'http://localhost:5000/v2.0'),
help='Auth URL to use for OpenStack service access.'),
cfg.StrOpt('os-region-name',
deprecated_group="DEFAULT",
default=os.environ.get('OS_REGION_NAME'),
help='Region name to use for OpenStack service endpoints.'),
cfg.StrOpt('os-endpoint-type',
default=os.environ.get('OS_ENDPOINT_TYPE', 'publicURL'),
help='Type of endpoint in Identity service catalog to use for '
'communication with OpenStack services.'),
cfg.BoolOpt('insecure',
default=False,
help='Disables X.509 certificate validation when an '
'SSL connection to Identity Service is established.'),
]
CONF.register_cli_opts(CLI_OPTS, group="service_credentials")
LOG = log.getLogger(__name__)
_DEFAULT_LOG_LEVELS = ['keystonemiddleware=WARN', 'stevedore=WARN']
class WorkerException(Exception):
"""Exception for errors relating to service workers."""
def get_workers(name):
return 1
def prepare_service(argv=None):
i18n.enable_lazy()
log.set_defaults(_DEFAULT_LOG_LEVELS)
log.register_options(CONF)
if argv is None:
argv = sys.argv
CONF(argv[1:], project='glance-search')
log.setup(cfg.CONF, 'glance-search')
oslo.messaging.set_transport_defaults('glance')