Abstract out functionality into plugins
Abstract handlers into plugins and make handlers more generic Same with Storage and Backends as well Also now you just subclass the BaseAddressHandler and run self._create and self._delete in process_noticiation for Notification handling. This change will allow us to configure options on a pr handler base using the __plugin_type__ + __plugin_name__ of each plugin to add sections in the configs easily like: [handler:nova_fixed] domain = test.com [storage:sql] connection_debug = 100 [backend:bind9] someopt = x Also change the default record format of handlers to use octet data bug #1078935 Change-Id: Ic2ddf5a113dd1a306fce0513da2bfdbda991f647
This commit is contained in:
parent
84dadfdbb1
commit
89a101b266
@ -15,3 +15,11 @@ debug = False
|
||||
allowed_rpc_exception_modules = moniker.exceptions, moniker.openstack.common.exception
|
||||
logging_context_format_string = %(asctime)s %(levelname)s %(name)s [%(request_id)s %(user)s %(tenant)s] %(instance)s %(message)s
|
||||
default_log_levels = amqplib=WARN, sqlalchemy=WARN, boto=WARN, suds=INFO, keystone=INFO, eventlet.wsgi.server=WARN, stevedore=WARN
|
||||
|
||||
# Ability to configure each backend individually
|
||||
#[backend:bind9]
|
||||
#rndc-path = /usr/sbin/rndc
|
||||
#rndc-host = 127.0.0.1
|
||||
#rndc-port = 953
|
||||
#rndc-config-file = /etc/rndc.conf
|
||||
#rndc-key-file = /etc/rndc.key
|
||||
|
@ -8,16 +8,32 @@ debug = False
|
||||
# Top-level directory for maintaining moniker's state
|
||||
#state_path = /var/lib/moniker
|
||||
|
||||
# Database connection string.
|
||||
# Database connection string - to configure options for a given implementation
|
||||
# like sqlalchemy or other see below
|
||||
#database_connection = sqlite:///$state_path/moniker.sqlite
|
||||
|
||||
# Driver used for issuing notifications
|
||||
#notification_driver=moniker.openstack.common.notifier.rabbit_notifier
|
||||
|
||||
# List of notification handlers to enable
|
||||
#enabled_notification_handlers = nova
|
||||
|
||||
# There has to be a better way to set these defaults
|
||||
allowed_rpc_exception_modules = moniker.exceptions, moniker.openstack.common.exception
|
||||
logging_context_format_string = %(asctime)s %(levelname)s %(name)s [%(request_id)s %(user)s %(tenant)s] %(instance)s %(message)s
|
||||
default_log_levels = amqplib=WARN, sqlalchemy=WARN, boto=WARN, suds=INFO, keystone=INFO, eventlet.wsgi.server=WARN, stevedore=WARN
|
||||
|
||||
# Driver used for issuing notifications
|
||||
#notification_driver=moniker.openstack.common.notifier.rabbit_notifier
|
||||
|
||||
# List of notification handlers to enable, configuration of these needs to
|
||||
# correspond to a [handler:my_driver] section below or else in the config
|
||||
#enabled_notification_handlers = nova_fixed
|
||||
|
||||
# Sections for *SQL storages
|
||||
#[storage:sqlalchemy]
|
||||
#connection_debug = 100
|
||||
#connection_trace = False
|
||||
#sqlite_synchronous = True
|
||||
#idle_timeout = 3600
|
||||
#max_retries = 10
|
||||
#retry_interval = 10
|
||||
|
||||
#[handler:nova_fixed]
|
||||
#domain_id = <random uuid>
|
||||
#notification_topics = monitor
|
||||
#control_exchange = 'nova'
|
||||
|
@ -23,8 +23,7 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
class Service(rpc_service.Service):
|
||||
def __init__(self, *args, **kwargs):
|
||||
manager = backend.get_backend()
|
||||
manager.register_opts(cfg.CONF)
|
||||
manager = backend.get_backend(cfg.CONF)
|
||||
|
||||
kwargs.update(
|
||||
host=cfg.CONF.host,
|
||||
|
@ -27,7 +27,7 @@ cfg.CONF.register_opts([
|
||||
])
|
||||
|
||||
|
||||
def get_backend():
|
||||
mgr = driver.DriverManager(BACKEND_NAMESPACE, cfg.CONF.backend_driver,
|
||||
invoke_on_load=True)
|
||||
return mgr.driver
|
||||
def get_backend(conf):
|
||||
mgr = driver.DriverManager(BACKEND_NAMESPACE, cfg.CONF.backend_driver)
|
||||
mgr.driver.register_opts(conf)
|
||||
return mgr.driver()
|
||||
|
@ -15,27 +15,15 @@
|
||||
# under the License.
|
||||
import abc
|
||||
from moniker.openstack.common import log as logging
|
||||
from moniker.plugin import Plugin
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Backend(object):
|
||||
class Backend(Plugin):
|
||||
""" Base class for backend implementations """
|
||||
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
@staticmethod
|
||||
def register_opts(conf):
|
||||
""" Register configuration options """
|
||||
|
||||
def start(self):
|
||||
""" Hook for any necessary startup code """
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
""" Hook for any necessary shutdown code """
|
||||
pass
|
||||
__plugin_type__ = 'backend'
|
||||
|
||||
@abc.abstractmethod
|
||||
def create_domain(self, context, domain):
|
||||
|
@ -26,8 +26,12 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Bind9Backend(base.Backend):
|
||||
def register_opts(self, conf):
|
||||
conf.register_opts([
|
||||
__plugin_name__ = 'bind9'
|
||||
|
||||
@classmethod
|
||||
def get_opts(cls):
|
||||
opts = super(Bind9Backend, cls).get_opts()
|
||||
opts.extend([
|
||||
cfg.StrOpt('rndc-path', default='/usr/sbin/rndc',
|
||||
help='RNDC Path'),
|
||||
cfg.StrOpt('rndc-host', default='127.0.0.1', help='RNDC Host'),
|
||||
@ -36,6 +40,7 @@ class Bind9Backend(base.Backend):
|
||||
help='RNDC Config File'),
|
||||
cfg.StrOpt('rndc-key-file', default=None, help='RNDC Key File'),
|
||||
])
|
||||
return opts
|
||||
|
||||
def start(self):
|
||||
super(Bind9Backend, self).start()
|
||||
@ -131,16 +136,16 @@ class Bind9Backend(base.Backend):
|
||||
|
||||
rndc_call = [
|
||||
'sudo',
|
||||
cfg.CONF.rndc_path,
|
||||
'-s', cfg.CONF.rndc_host,
|
||||
'-p', str(cfg.CONF.rndc_port),
|
||||
self.config.rndc_path,
|
||||
'-s', self.config.rndc_host,
|
||||
'-p', str(self.config.rndc_port),
|
||||
]
|
||||
|
||||
if cfg.CONF.rndc_config_file:
|
||||
rndc_call.extend(['-c', cfg.CONF.rndc_config_file])
|
||||
if self.config.rndc_config_file:
|
||||
rndc_call.extend(['-c', self.config.rndc_config_file])
|
||||
|
||||
if cfg.CONF.rndc_key_file:
|
||||
rndc_call.extend(['-k', cfg.CONF.rndc_key_file])
|
||||
if self.config.rndc_key_file:
|
||||
rndc_call.extend(['-k', self.config.rndc_key_file])
|
||||
|
||||
rndc_op = 'reconfig' if new_domain_flag else 'reload'
|
||||
rndc_call.extend([rndc_op])
|
||||
|
@ -20,6 +20,8 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FakeBackend(base.Backend):
|
||||
__plugin_name__ = 'fake'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(FakeBackend, self).__init__(*args, **kwargs)
|
||||
|
||||
|
@ -62,7 +62,6 @@ class Service(rpc_service.Service):
|
||||
def _load_extension(ext):
|
||||
handler_cls = ext.plugin
|
||||
handler_cls.register_opts(cfg.CONF)
|
||||
|
||||
return handler_cls(central_service=self)
|
||||
|
||||
try:
|
||||
|
@ -1,6 +1,7 @@
|
||||
# Copyright 2012 Managed I.T.
|
||||
#
|
||||
# Author: Kiall Mac Innes <kiall@managedit.ie>
|
||||
# Author: Endre Karlson <endre.karlson@bouvet.no>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -14,25 +15,42 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import abc
|
||||
from moniker.openstack.common import cfg
|
||||
from moniker.openstack.common import log as logging
|
||||
from moniker.context import MonikerContext
|
||||
from moniker.plugin import Plugin
|
||||
from moniker import exceptions
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Handler(object):
|
||||
""" Base class for notification handlers """
|
||||
def get_ip_data(addr_dict):
|
||||
ip = addr_dict['address']
|
||||
version = addr_dict['version']
|
||||
|
||||
__metaclass__ = abc.ABCMeta
|
||||
data = {
|
||||
'ip_version': version
|
||||
}
|
||||
|
||||
# TODO: Add v6 support
|
||||
if version == 4:
|
||||
data['ip_address'] = ip.replace('.', '-')
|
||||
ip_data = ip.split(".")
|
||||
for i in [0, 1, 2, 3]:
|
||||
data["octet%s" % i] = ip_data[i]
|
||||
return data
|
||||
|
||||
|
||||
class Handler(Plugin):
|
||||
""" Base class for notification handlers """
|
||||
__plugin_type__ = 'handler'
|
||||
|
||||
def __init__(self, central_service):
|
||||
super(Handler, self).__init__()
|
||||
LOG.debug('Loaded handler: %s' % __name__)
|
||||
self.central_service = central_service
|
||||
|
||||
@staticmethod
|
||||
def register_opts(conf):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_exchange_topics(self):
|
||||
"""
|
||||
@ -49,3 +67,78 @@ class Handler(object):
|
||||
@abc.abstractmethod
|
||||
def process_notification(self, event_type, payload):
|
||||
""" Processes a given notification """
|
||||
|
||||
@classmethod
|
||||
def get_opts(cls):
|
||||
return [cfg.StrOpt('domain_id', default=None)]
|
||||
|
||||
def get_domain(self, domain_id):
|
||||
"""
|
||||
Return the domain for this context
|
||||
"""
|
||||
context = MonikerContext.get_admin_context()
|
||||
return self.central_service.get_domain(context, domain_id)
|
||||
|
||||
|
||||
class BaseAddressHandler(Handler):
|
||||
default_format = '%(octet0)s-%(octet1)s-%(octet2)s-%(octet3)s.%(domain)s'
|
||||
|
||||
def _create(self, addresses, extra, managed=True,
|
||||
resource_type=None, resource_id=None):
|
||||
"""
|
||||
Create a a record from addresses
|
||||
|
||||
:param addresses: Address objects like
|
||||
{'version': 4, 'ip': '10.0.0.1'}
|
||||
:param extra: Extra data to use when formatting the record
|
||||
:param managed: Is it a managed resource
|
||||
:param resource_type: The managed resource type
|
||||
:param resource_id: The managed resource ID
|
||||
"""
|
||||
domain = self.get_domain(self.config.domain_id)
|
||||
|
||||
data = extra.copy()
|
||||
data['domain'] = domain['name']
|
||||
|
||||
context = MonikerContext.get_admin_context()
|
||||
|
||||
for addr in addresses:
|
||||
record_data = data.copy()
|
||||
record_data.update(get_ip_data(addr))
|
||||
|
||||
record_name = self.default_format % record_data
|
||||
record_values = {
|
||||
'type': 'A' if addr['version'] == 4 else 'AAAA',
|
||||
'name': record_name,
|
||||
'data': addr['address']}
|
||||
if managed:
|
||||
record_values.update({
|
||||
'managed_resource': managed,
|
||||
'managed_resource_type': resource_type,
|
||||
'managed_resource_id': resource_id})
|
||||
self.central_service.create_record(context, domain['id'],
|
||||
record_values)
|
||||
|
||||
def _delete(self, managed=True, resource_id=None, resource_type='instance',
|
||||
criterion={}):
|
||||
"""
|
||||
Handle a generic delete of a fixed ip within a domain
|
||||
|
||||
:param criterion: Criterion to search and destroy records
|
||||
"""
|
||||
context = MonikerContext.get_admin_context()
|
||||
|
||||
if managed:
|
||||
criterion.update({
|
||||
'managed_resource': managed,
|
||||
'managed_resource_id': resource_id,
|
||||
'managed_resource_type': resource_type
|
||||
})
|
||||
|
||||
records = self.central_service.get_records(context,
|
||||
self.config.domain_id,
|
||||
criterion)
|
||||
for record in records:
|
||||
LOG.debug('Deleting record %s' % record['id'])
|
||||
self.central_service.delete_record(context, self.config.domain_id,
|
||||
record['id'])
|
||||
|
@ -15,39 +15,28 @@
|
||||
# under the License.
|
||||
from moniker.openstack.common import cfg
|
||||
from moniker.openstack.common import log as logging
|
||||
from moniker import exceptions
|
||||
from moniker.context import MonikerContext
|
||||
from moniker.notification_handler.base import Handler
|
||||
from moniker.notification_handler.base import BaseAddressHandler
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NovaHandler(Handler):
|
||||
""" Hanlder for Nova's notifications """
|
||||
class NovaFixedHandler(BaseAddressHandler):
|
||||
__plugin_name__ = 'nova_fixed'
|
||||
""" Handler for Nova's notifications """
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(NovaHandler, self).__init__(*args, **kwargs)
|
||||
|
||||
self.fixed_ip_domain = cfg.CONF.nova_fixed_ip_domain
|
||||
|
||||
if not self.fixed_ip_domain:
|
||||
msg = ('nova_fixed_ip_domain must be configured to use the nova '
|
||||
'handler')
|
||||
raise exceptions.ConfigurationError(msg)
|
||||
|
||||
@staticmethod
|
||||
def register_opts(conf):
|
||||
conf.register_opts([
|
||||
cfg.StrOpt('nova-fixed-ip-domain', default=None),
|
||||
cfg.IntOpt('nova-control-exchange', default='nova'),
|
||||
cfg.ListOpt('nova-notification-topics', default=['monitor'])
|
||||
])
|
||||
@classmethod
|
||||
def get_opts(cls):
|
||||
opts = super(NovaFixedHandler, cls).get_opts()
|
||||
opts.extend([
|
||||
cfg.ListOpt('notification-topics', default=['monitor']),
|
||||
cfg.StrOpt('control-exchange', default='nova')])
|
||||
return opts
|
||||
|
||||
def get_exchange_topics(self):
|
||||
exchange = cfg.CONF.nova_control_exchange
|
||||
exchange = self.config.control_exchange
|
||||
|
||||
topics = [topic + ".info"
|
||||
for topic in cfg.CONF.nova_notification_topics]
|
||||
for topic in self.config.notification_topics]
|
||||
|
||||
return (exchange, topics)
|
||||
|
||||
@ -55,82 +44,18 @@ class NovaHandler(Handler):
|
||||
return [
|
||||
'compute.instance.create.end',
|
||||
'compute.instance.delete.start',
|
||||
# 'compute.instance.rebuild.start', # Needed?
|
||||
# 'compute.instance.rebuild.end', # Needed?
|
||||
# 'compute.instance.exists', # Needed?
|
||||
# 'network.floating_ip.allocate', # Needed?
|
||||
# 'network.floating_ip.deallocate', # Needed?
|
||||
'network.floating_ip.associate',
|
||||
'network.floating_ip.disassociate',
|
||||
]
|
||||
|
||||
def process_notification(self, event_type, payload):
|
||||
LOG.debug('NovaHandler recieved notification - %s' % event_type)
|
||||
LOG.debug('NovaFixedHandler recieved notification - %s' % event_type)
|
||||
|
||||
if event_type == 'compute.instance.create.end':
|
||||
return self.handle_instance_create(payload)
|
||||
self._create(payload['fixed_ips'], payload,
|
||||
resource_id=payload['instance_id'],
|
||||
resource_type='instance')
|
||||
|
||||
elif event_type == 'compute.instance.delete.start':
|
||||
return self.handle_instance_delete(payload)
|
||||
|
||||
elif event_type == 'network.floating_ip.associate':
|
||||
return self.handle_floating_ip_associate(payload)
|
||||
|
||||
elif event_type == 'network.floating_ip.disassociate':
|
||||
return self.handle_floating_ip_disassociate(payload)
|
||||
|
||||
self._delete(resource_id=payload['instance_id'],
|
||||
resource_type='instance')
|
||||
else:
|
||||
raise ValueError('NovaHandler recieved an invalid event type')
|
||||
|
||||
def handle_instance_create(self, payload):
|
||||
context = MonikerContext.get_admin_context()
|
||||
|
||||
# Fetch the FixedIP Domain
|
||||
fixed_ip_domain = self.central_service.get_domain(context,
|
||||
self.fixed_ip_domain)
|
||||
|
||||
# For each fixed ip, create an associated record.
|
||||
for fixed_ip in payload['fixed_ips']:
|
||||
record_name = '%(instance_id)s.%(tenant_id)s.%(domain)s' % dict(
|
||||
instance_id=payload['instance_id'],
|
||||
tenant_id=payload['tenant_id'],
|
||||
domain=fixed_ip_domain['name'])
|
||||
|
||||
record_values = {
|
||||
'type': 'A' if fixed_ip['version'] == 4 else 'AAAA',
|
||||
'name': record_name,
|
||||
'data': fixed_ip['address'],
|
||||
|
||||
'managed_resource': True,
|
||||
'managed_resource_type': u'instance',
|
||||
'managed_resource_id': payload['instance_id'],
|
||||
}
|
||||
|
||||
self.central_service.create_record(context, self.fixed_ip_domain,
|
||||
record_values)
|
||||
|
||||
def handle_instance_delete(self, payload):
|
||||
context = MonikerContext.get_admin_context()
|
||||
|
||||
# Fetch the instances managed records
|
||||
criterion = {
|
||||
'managed_resource': True,
|
||||
'managed_resource_type': u'instance',
|
||||
'managed_resource_id': payload['instance_id']
|
||||
}
|
||||
|
||||
records = self.central_service.get_records(context,
|
||||
self.fixed_ip_domain,
|
||||
criterion)
|
||||
# Delete the matching records
|
||||
for record in records:
|
||||
LOG.debug('Deleting record %s' % record['id'])
|
||||
|
||||
self.central_service.delete_record(context, self.fixed_ip_domain,
|
||||
record['id'])
|
||||
|
||||
def handle_floating_ip_associate(self, payload):
|
||||
pass
|
||||
|
||||
def handle_floating_ip_disassociate(self, payload):
|
||||
pass
|
||||
raise ValueError('NovaFixedHandler recieved an invalid event type')
|
||||
|
114
moniker/plugin.py
Normal file
114
moniker/plugin.py
Normal file
@ -0,0 +1,114 @@
|
||||
# Copyright 2012 Bouvet ASA
|
||||
#
|
||||
# Author: Endre Karlson <endre.karlson@bouvet.no>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import abc
|
||||
|
||||
from moniker.openstack.common import cfg
|
||||
from moniker.openstack.common import log as logging
|
||||
from moniker import exceptions
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Plugin(object):
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
__plugin_name__ = None
|
||||
__plugin_type__ = None
|
||||
|
||||
def __init__(self):
|
||||
self.name = self.get_canonical_name()
|
||||
self.config = cfg.CONF[self.name]
|
||||
LOG.debug("Loaded plugin %s", self.name)
|
||||
|
||||
def is_enabled(self):
|
||||
"""
|
||||
Is this Plugin enabled?
|
||||
|
||||
:retval: Boolean
|
||||
"""
|
||||
return True
|
||||
|
||||
@classmethod
|
||||
def get_canonical_name(cls):
|
||||
"""
|
||||
Return the plugin name
|
||||
"""
|
||||
type_ = cls.get_plugin_type()
|
||||
name = cls.get_plugin_name()
|
||||
return "%s:%s" % (type_, name)
|
||||
|
||||
@classmethod
|
||||
def get_plugin_name(cls):
|
||||
return cls.__plugin_name__
|
||||
|
||||
@classmethod
|
||||
def get_plugin_type(cls):
|
||||
return cls.__plugin_type__
|
||||
|
||||
@classmethod
|
||||
def register_group_opts(cls, conf, group_name=None, opts=None):
|
||||
"""
|
||||
Register a set of Options underneath a new Group or Section
|
||||
if you will.
|
||||
|
||||
:param conf: Configuration object
|
||||
:param group_name: Optional group name to register this under
|
||||
Default: ClassName to class_name
|
||||
:param opts: The options to register.
|
||||
"""
|
||||
group_name = group_name or cls.get_canonical_name()
|
||||
if not group_name:
|
||||
raise RuntimeError("Missing name")
|
||||
|
||||
# NOTE(zykes): Always register the group if not the init fails...
|
||||
group = cfg.OptGroup(
|
||||
name=group_name,
|
||||
title="Configuration for %s" % group_name)
|
||||
conf.register_group(group)
|
||||
if opts:
|
||||
conf.register_opts(opts, group=group)
|
||||
else:
|
||||
LOG.debug("No options for %s, skipping registration", group_name)
|
||||
|
||||
@classmethod
|
||||
def register_opts(cls, conf):
|
||||
"""
|
||||
Register the options for this Plugin using the options from
|
||||
cls.get_opts() as a default
|
||||
|
||||
:param conf: Configration object
|
||||
"""
|
||||
opts = cls.get_opts()
|
||||
cls.register_group_opts(conf, opts=opts)
|
||||
|
||||
@classmethod
|
||||
def get_opts(cls):
|
||||
"""
|
||||
Return a list of options for this plugin to be registered underneath
|
||||
it's section
|
||||
"""
|
||||
return []
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Start this plugin
|
||||
"""
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
Stop this plugin from doing anything
|
||||
"""
|
@ -29,11 +29,6 @@ cfg.CONF.register_opts([
|
||||
])
|
||||
|
||||
|
||||
def register_opts(conf):
|
||||
engine = get_engine(conf)
|
||||
engine.register_opts(conf)
|
||||
|
||||
|
||||
def get_engine_name(string):
|
||||
"""
|
||||
Return the engine name from either a non-dialected or dialected string
|
||||
@ -45,26 +40,25 @@ def get_engine(conf):
|
||||
scheme = urlparse(conf.database_connection).scheme
|
||||
engine_name = get_engine_name(scheme)
|
||||
LOG.debug('Looking for %r engine in %r', engine_name, DRIVER_NAMESPACE)
|
||||
mgr = driver.DriverManager(
|
||||
DRIVER_NAMESPACE,
|
||||
engine_name,
|
||||
invoke_on_load=True)
|
||||
return mgr.driver
|
||||
mgr = driver.DriverManager(DRIVER_NAMESPACE, engine_name)
|
||||
mgr.driver.register_opts(conf)
|
||||
return mgr.driver()
|
||||
|
||||
|
||||
def get_connection(conf):
|
||||
engine = get_engine(conf)
|
||||
engine.register_opts(conf)
|
||||
return engine.get_connection(conf)
|
||||
|
||||
|
||||
def setup_schema():
|
||||
""" Create the DB - Used for testing purposes """
|
||||
LOG.debug("Setting up Schema")
|
||||
connection = get_connection(cfg.CONF)
|
||||
connection.setup_schema()
|
||||
|
||||
|
||||
def teardown_schema():
|
||||
""" Reset the DB to default - Used for testing purposes """
|
||||
LOG.debug("Tearing down Schema")
|
||||
connection = get_connection(cfg.CONF)
|
||||
connection.teardown_schema()
|
||||
|
@ -14,18 +14,13 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import abc
|
||||
from moniker.plugin import Plugin
|
||||
|
||||
|
||||
class StorageEngine(object):
|
||||
class StorageEngine(Plugin):
|
||||
""" Base class for storage engines """
|
||||
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
||||
@abc.abstractmethod
|
||||
def register_opts(self, conf):
|
||||
"""
|
||||
Register any configuration options used by this engine.
|
||||
"""
|
||||
__plugin_type__ = 'storage'
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_connection(self, conf):
|
||||
|
@ -14,6 +14,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from sqlalchemy.orm import exc
|
||||
from moniker.openstack.common import cfg
|
||||
from moniker.openstack.common import log as logging
|
||||
from moniker import exceptions
|
||||
from moniker.storage import base
|
||||
@ -23,12 +24,32 @@ from moniker.storage.impl_sqlalchemy.session import get_session
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
SQL_OPTS = [
|
||||
cfg.IntOpt('connection_debug', default=0,
|
||||
help='Verbosity of SQL debugging information. 0=None,'
|
||||
' 100=Everything'),
|
||||
cfg.BoolOpt('connection_trace', default=False,
|
||||
help='Add python stack traces to SQL as comment strings'),
|
||||
cfg.BoolOpt('sqlite_synchronous', default=True,
|
||||
help='If passed, use synchronous mode for sqlite'),
|
||||
cfg.IntOpt('idle_timeout', default=3600,
|
||||
help='timeout before idle sql connections are reaped'),
|
||||
cfg.IntOpt('max_retries', default=10,
|
||||
help='maximum db connection retries during startup. '
|
||||
'(setting -1 implies an infinite retry count)'),
|
||||
cfg.IntOpt('retry_interval', default=10,
|
||||
help='interval between retries of opening a sql connection')
|
||||
]
|
||||
|
||||
|
||||
class SQLAlchemyStorage(base.StorageEngine):
|
||||
OPTIONS = []
|
||||
__plugin_name__ = 'sqlalchemy'
|
||||
|
||||
def register_opts(self, conf):
|
||||
conf.register_opts(self.OPTIONS)
|
||||
@classmethod
|
||||
def get_opts(cls):
|
||||
opts = super(SQLAlchemyStorage, cls).get_opts()
|
||||
opts.extend(SQL_OPTS)
|
||||
return opts
|
||||
|
||||
def get_connection(self, conf):
|
||||
return Connection(conf)
|
||||
|
@ -32,25 +32,6 @@ LOG = logging.getLogger(__name__)
|
||||
_MAKER = None
|
||||
_ENGINE = None
|
||||
|
||||
sql_opts = [
|
||||
cfg.IntOpt('sql_connection_debug', default=0,
|
||||
help='Verbosity of SQL debugging information. 0=None,'
|
||||
' 100=Everything'),
|
||||
cfg.BoolOpt('sql_connection_trace', default=False,
|
||||
help='Add python stack traces to SQL as comment strings'),
|
||||
cfg.BoolOpt('sqlite_synchronous', default=True,
|
||||
help='If passed, use synchronous mode for sqlite'),
|
||||
cfg.IntOpt('sql_idle_timeout', default=3600,
|
||||
help='timeout before idle sql connections are reaped'),
|
||||
cfg.IntOpt('sql_max_retries', default=10,
|
||||
help='maximum db connection retries during startup. '
|
||||
'(setting -1 implies an infinite retry count)'),
|
||||
cfg.IntOpt('sql_retry_interval', default=10,
|
||||
help='interval between retries of opening a sql connection')
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(sql_opts)
|
||||
|
||||
|
||||
def get_session(autocommit=True, expire_on_commit=False, autoflush=True):
|
||||
"""Return a SQLAlchemy session."""
|
||||
@ -115,15 +96,15 @@ def get_engine():
|
||||
cfg.CONF.database_connection)
|
||||
|
||||
engine_args = {
|
||||
"pool_recycle": cfg.CONF.sql_idle_timeout,
|
||||
"pool_recycle": cfg.CONF['storage:sqlalchemy'].idle_timeout,
|
||||
"echo": False,
|
||||
'convert_unicode': True,
|
||||
}
|
||||
|
||||
# Map our SQL debug level to SQLAlchemy's options
|
||||
if cfg.CONF.sql_connection_debug >= 100:
|
||||
if cfg.CONF['storage:sqlalchemy'].connection_debug >= 100:
|
||||
engine_args['echo'] = 'debug'
|
||||
elif cfg.CONF.sql_connection_debug >= 50:
|
||||
elif cfg.CONF['storage:sqlalchemy'].connection_debug >= 50:
|
||||
engine_args['echo'] = True
|
||||
|
||||
if "sqlite" in connection_dict.drivername:
|
||||
@ -139,12 +120,12 @@ def get_engine():
|
||||
if 'mysql' in connection_dict.drivername:
|
||||
sqlalchemy.event.listen(_ENGINE, 'checkout', ping_listener)
|
||||
elif "sqlite" in connection_dict.drivername:
|
||||
if not cfg.CONF.sqlite_synchronous:
|
||||
if not cfg.CONF['storage:sqlalchemy'].sqlite_synchronous:
|
||||
sqlalchemy.event.listen(_ENGINE, 'connect',
|
||||
synchronous_switch_listener)
|
||||
sqlalchemy.event.listen(_ENGINE, 'connect', add_regexp_listener)
|
||||
|
||||
if (cfg.CONF.sql_connection_trace and
|
||||
if (cfg.CONF['storage:sqlalchemy'].connection_trace and
|
||||
_ENGINE.dialect.dbapi.__name__ == 'MySQLdb'):
|
||||
import MySQLdb.cursors
|
||||
_do_query = debug_mysql_do_query()
|
||||
@ -156,7 +137,7 @@ def get_engine():
|
||||
if not is_db_connection_error(e.args[0]):
|
||||
raise
|
||||
|
||||
remaining = cfg.CONF.sql_max_retries
|
||||
remaining = cfg.CONF['storage:sqlalchemy'].max_retries
|
||||
if remaining == -1:
|
||||
remaining = 'infinite'
|
||||
while True:
|
||||
@ -164,7 +145,7 @@ def get_engine():
|
||||
LOG.warn(msg % remaining)
|
||||
if remaining != 'infinite':
|
||||
remaining -= 1
|
||||
time.sleep(cfg.CONF.sql_retry_interval)
|
||||
time.sleep(cfg.CONF['storage:sqlalchemy'].retry_interval)
|
||||
try:
|
||||
_ENGINE.connect()
|
||||
break
|
||||
|
@ -38,7 +38,6 @@ class TestCase(unittest2.TestCase):
|
||||
storage.teardown_schema()
|
||||
cfg.CONF.reset()
|
||||
self.mox.UnsetStubs()
|
||||
|
||||
super(TestCase, self).tearDown()
|
||||
|
||||
def config(self, **kwargs):
|
||||
|
@ -13,6 +13,7 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from moniker.openstack.common import cfg
|
||||
from moniker.openstack.common import log as logging
|
||||
from moniker.tests import TestCase
|
||||
from moniker import backend
|
||||
@ -24,7 +25,7 @@ class BackendDriverTestCase(TestCase):
|
||||
__test__ = False
|
||||
|
||||
def get_backend_driver(self):
|
||||
return backend.get_backend()
|
||||
return backend.get_backend(cfg.CONF)
|
||||
|
||||
def setUp(self):
|
||||
super(BackendDriverTestCase, self).setUp()
|
||||
|
@ -35,16 +35,17 @@ class NovaNotificationHandlerTestCase(NotificationHandlerTestCase):
|
||||
values = {'name': 'exampe.com', 'email': 'info@example.com'}
|
||||
|
||||
domain = self.central_service.create_domain(self.admin_context, values)
|
||||
self.fixed_ip_domain = domain['id']
|
||||
self.domain_id = domain['id']
|
||||
|
||||
# Register handler specific config options
|
||||
nova.NovaHandler.register_opts(cfg.CONF)
|
||||
nova.NovaFixedHandler.register_opts(cfg.CONF)
|
||||
|
||||
# Override default config values
|
||||
self.config(nova_fixed_ip_domain=self.fixed_ip_domain)
|
||||
self.config(domain_id=self.domain_id, group='handler:nova_fixed')
|
||||
|
||||
# Initialize the handler
|
||||
self.handler = nova.NovaHandler(central_service=self.central_service)
|
||||
self.handler = nova.NovaFixedHandler(
|
||||
central_service=self.central_service)
|
||||
|
||||
def test_instance_create_end(self):
|
||||
event_type = 'compute.instance.create.end'
|
||||
@ -54,7 +55,7 @@ class NovaNotificationHandlerTestCase(NotificationHandlerTestCase):
|
||||
|
||||
# Ensure we start with 0 records
|
||||
records = self.central_service.get_records(self.admin_context,
|
||||
self.fixed_ip_domain)
|
||||
self.domain_id)
|
||||
|
||||
self.assertEqual(0, len(records))
|
||||
|
||||
@ -62,7 +63,7 @@ class NovaNotificationHandlerTestCase(NotificationHandlerTestCase):
|
||||
|
||||
# Ensure we now have exactly 1 record
|
||||
records = self.central_service.get_records(self.admin_context,
|
||||
self.fixed_ip_domain)
|
||||
self.domain_id)
|
||||
|
||||
self.assertEqual(len(records), 1)
|
||||
|
||||
@ -82,7 +83,7 @@ class NovaNotificationHandlerTestCase(NotificationHandlerTestCase):
|
||||
|
||||
# Ensure we start with at least 1 record
|
||||
records = self.central_service.get_records(self.admin_context,
|
||||
self.fixed_ip_domain)
|
||||
self.domain_id)
|
||||
|
||||
self.assertGreaterEqual(len(records), 1)
|
||||
|
||||
@ -90,7 +91,7 @@ class NovaNotificationHandlerTestCase(NotificationHandlerTestCase):
|
||||
|
||||
# Ensure we now have exactly 0 records
|
||||
records = self.central_service.get_records(self.admin_context,
|
||||
self.fixed_ip_domain)
|
||||
self.domain_id)
|
||||
|
||||
self.assertEqual(0, len(records))
|
||||
|
||||
|
2
setup.py
2
setup.py
@ -61,7 +61,7 @@ setup(
|
||||
sqlite = moniker.storage.impl_sqlalchemy:SQLAlchemyStorage
|
||||
|
||||
[moniker.notification.handler]
|
||||
nova = moniker.notification_handler.nova:NovaHandler
|
||||
nova_fixed = moniker.notification_handler.nova:NovaFixedHandler
|
||||
|
||||
[moniker.backend]
|
||||
bind9 = moniker.backend.impl_bind9:Bind9Backend
|
||||
|
Loading…
x
Reference in New Issue
Block a user