Implement rpc interfaces of OperationEngine

Include two interfaces that are create_scheduled_operation and
delete_scheduled_operation

Change-Id: Ib2542c8913325d8df7a6534a8fce0c39bd22c22a
Closes-Bug: #1541718
This commit is contained in:
zengchen 2016-03-31 11:56:42 +08:00
parent efa5f318d3
commit a50e1521d7
16 changed files with 585 additions and 49 deletions

View File

@ -365,6 +365,9 @@ def _trigger_get(context, id, session=None):
def trigger_create(context, values):
if not values.get('id'):
values['id'] = str(uuid.uuid4())
trigger_ref = models.Trigger()
trigger_ref.update(values)
trigger_ref.save(get_session())
@ -415,6 +418,9 @@ def _scheduled_operation_get(context, id, columns_to_join=[], session=None):
def scheduled_operation_create(context, values):
if not values.get('id'):
values['id'] = str(uuid.uuid4())
operation_ref = models.ScheduledOperation()
operation_ref.update(values)
operation_ref.save(get_session())

View File

@ -217,6 +217,10 @@ class InvalidOperationObject(Invalid):
message = _("The operation %(operation_id)s is invalid")
class DeleteTriggerNotAllowed(NotAuthorized):
message = _("Can not delete trigger %(trigger_id)s")
class ClassNotFound(NotFound):
message = _("Class %(class_name)s could not be found: %(exception)s")

View File

@ -74,7 +74,7 @@ class Manager(base.Base, PeriodicTasks):
"""Tasks to be run at a periodic interval."""
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
def init_host(self):
def init_host(self, **kwargs):
"""Handle initialization if this is a standalone service.
A hook point for services to execute tasks before the services are made
@ -84,6 +84,13 @@ class Manager(base.Base, PeriodicTasks):
"""
pass
def cleanup_host(self):
"""Hook to do cleanup work when the service shuts down.
Child classes should override this method.
"""
pass
def init_host_with_rpc(self):
"""A hook for service to do jobs after RPC is ready.

View File

@ -12,11 +12,13 @@
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_versionedobjects import fields
from smaug import db
from smaug import exception
from smaug.i18n import _
from smaug import objects
from smaug.objects import base
CONF = cfg.CONF
@ -36,28 +38,45 @@ class ScheduledOperation(base.SmaugPersistentObject, base.SmaugObject,
'operation_type': fields.StringField(),
'project_id': fields.StringField(),
'trigger_id': fields.UUIDField(),
'operation_definition': fields.StringField(),
'operation_definition': fields.DictOfStringsField(),
'trigger': fields.DictOfStringsField(),
'trigger': fields.ObjectField("Trigger")
}
INSTANCE_OPTIONAL_JOINED_FIELDS = ['trigger']
@staticmethod
def _from_db_object(context, op, db_op, expected_attrs=[]):
for name, field in op.fields.items():
if name in op.INSTANCE_OPTIONAL_JOINED_FIELDS:
continue
special_fields = set(['operation_definition'] +
op.INSTANCE_OPTIONAL_JOINED_FIELDS)
normal_fields = set(op.fields) - special_fields
for name in normal_fields:
op[name] = db_op.get(name)
op_definition = db_op['operation_definition']
if op_definition:
op['operation_definition'] = jsonutils.loads(op_definition)
if 'trigger' in expected_attrs:
op['trigger'] = db_op['trigger']
if db_op.get('trigger', None) is None:
op.trigger = None
else:
if not op.obj_attr_is_set('trigger'):
op.trigger = objects.Trigger(context)
op.trigger._from_db_object(context, op.trigger,
db_op['trigger'])
op._context = context
op.obj_reset_changes()
return op
@staticmethod
def _convert_operation_definition_to_db_format(updates):
op_definition = updates.pop('operation_definition', None)
if op_definition is not None:
updates['operation_definition'] = jsonutils.dumps(op_definition)
@base.remotable_classmethod
def get_by_id(cls, context, id, expected_attrs=[]):
columns_to_join = [col for col in expected_attrs
@ -74,6 +93,7 @@ class ScheduledOperation(base.SmaugPersistentObject, base.SmaugObject,
reason=_('already created'))
updates = self.smaug_obj_get_changes()
self._convert_operation_definition_to_db_format(updates)
db_op = db.scheduled_operation_create(self._context, updates)
self._from_db_object(self._context, self, db_op)
@ -99,13 +119,14 @@ class ScheduledOperationList(base.ObjectListBase, base.SmaugObject):
fields = {
'objects': fields.ListOfObjectsField('ScheduledOperation'),
}
child_versions = {
'1.0': '1.0'
}
@base.remotable_classmethod
def get_by_filters(cls, context, filters,
sort_key='created_at', sort_dir='desc', limit=None,
marker=None, expected_attrs=None, use_slave=False,
sort_keys=None, sort_dirs=None):
pass
def get_by_filters(cls, context, filters, limit=None,
marker=None, sort_keys=None, sort_dirs=None):
db_operation_list = db.scheduled_operation_get_all_by_filters_sort(
context, filters, limit=limit, marker=marker,
sort_keys=sort_keys, sort_dirs=sort_dirs)
return base.obj_make_list(context, cls(context), ScheduledOperation,
db_operation_list)

View File

@ -12,6 +12,7 @@
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from oslo_versionedobjects import fields
from smaug import db
@ -35,18 +36,31 @@ class Trigger(base.SmaugPersistentObject, base.SmaugObject,
'name': fields.StringField(),
'project_id': fields.StringField(),
'type': fields.StringField(),
'properties': fields.StringField(),
'properties': fields.DictOfStringsField(),
}
@staticmethod
def _from_db_object(context, trigger, db_trigger):
for name, field in trigger.fields.items():
special_fields = set(['properties'])
normal_fields = set(trigger.fields) - special_fields
for name in normal_fields:
trigger[name] = db_trigger.get(name)
properties = db_trigger['properties']
if properties:
trigger['properties'] = jsonutils.loads(properties)
trigger._context = context
trigger.obj_reset_changes()
return trigger
@staticmethod
def _convert_properties_to_db_format(updates):
properties = updates.pop('properties', None)
if properties is not None:
updates['properties'] = jsonutils.dumps(properties)
@base.remotable_classmethod
def get_by_id(cls, context, id):
db_trigger = db.trigger_get(context, id)
@ -60,6 +74,7 @@ class Trigger(base.SmaugPersistentObject, base.SmaugObject,
reason=_('already created'))
updates = self.smaug_obj_get_changes()
self._convert_properties_to_db_format(updates)
db_trigger = db.trigger_create(self._context, updates)
self._from_db_object(self._context, self, db_trigger)
@ -67,6 +82,7 @@ class Trigger(base.SmaugPersistentObject, base.SmaugObject,
def save(self):
updates = self.smaug_obj_get_changes()
if updates and self.id:
self._convert_properties_to_db_format(updates)
db.trigger_update(self._context, self.id, updates)
self.obj_reset_changes()
@ -83,13 +99,14 @@ class TriggerList(base.ObjectListBase, base.SmaugObject):
fields = {
'objects': fields.ListOfObjectsField('Trigger'),
}
child_versions = {
'1.0': '1.0'
}
@base.remotable_classmethod
def get_by_filters(cls, context, filters,
sort_key='created_at', sort_dir='desc', limit=None,
marker=None, expected_attrs=None, use_slave=False,
sort_keys=None, sort_dirs=None):
pass
def get_by_filters(cls, context, filters, limit=None,
marker=None, sort_keys=None, sort_dirs=None):
db_trigger_list = db.trigger_get_all_by_filters_sort(
context, filters, limit=limit, marker=marker,
sort_keys=sort_keys, sort_dirs=sort_dirs)
return base.obj_make_list(context, cls(context), Trigger,
db_trigger_list)

View File

@ -33,6 +33,19 @@ class API(base.Base):
OperationEngineAPI()
super(API, self).__init__(db_driver)
def create_scheduled_operation(self, context, request_spec):
self.operationengine_rpcapi.create_scheduled_operation(context,
request_spec)
def create_scheduled_operation(self, context, operation_id, trigger_id):
self.operationengine_rpcapi.create_scheduled_operation(
context, operation_id, trigger_id)
def delete_scheduled_operation(self, context, operation_id, trigger_id):
self.operationengine_rpcapi.delete_scheduled_operation(
context, operation_id, trigger_id)
def create_trigger(self, context, trigger):
self.operationengine_rpcapi.create_trigger(context, trigger)
def delete_trigger(self, context, trigger_id):
self.operationengine_rpcapi.delete_trigger(context, trigger_id)
def update_trigger(self, context, trigger):
self.operationengine_rpcapi.update_trigger(context, trigger)

View File

@ -0,0 +1,52 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Executor which receives operations and run them.
"""
from abc import ABCMeta
from abc import abstractmethod
import six
@six.add_metaclass(ABCMeta)
class BaseExecutor(object):
@abstractmethod
def execute_operation(self, operation_id, triggered_time,
expect_start_time=None, window_time=None, **kwargs):
"""Execute an operation.
:param operation_id: ID of operation
:param triggered_time: time when the operation is triggered
:param expect_start_time: expect time when to run the operation
:param window_time: time how long to wait to runn the operation after
expect_start_time
"""
pass
@abstractmethod
def cancel_operation(self, operation_id):
"""Cancel the execution of operation.
There is no effective for the operations which are running, but
for operations which are in waiting, they will not be executed.
:param operation_id: ID of operation
"""
pass
@abstractmethod
def shutdown(self):
"""Shutdown the executor"""
pass

View File

@ -0,0 +1,127 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Manage all triggers.
"""
from smaug import exception
from smaug.i18n import _
from smaug.operationengine.engine import triggers as all_triggers
class TriggerManager(object):
"""Manage all trigger classes which are defined at triggers dir."""
def __init__(self):
super(TriggerManager, self).__init__()
all_cls = all_triggers.all_triggers()
self._trigger_cls_map = {cls.TRIGGER_TYPE:
cls for cls in all_cls}
# self._trigger_obj_map = {
# trigger_id: trigger,
# }
self._trigger_obj_map = {}
# TODO(zengchen) create executor
self._executor = None
def shutdown(self):
for trigger_id, trigger in self._trigger_obj_map.items():
trigger.shutdown()
self._trigger_obj_map.clear()
self._trigger_cls_map.clear()
if self._executor:
self._executor.shutdown()
self._executor = None
def check_trigger_definition(self, trigger_type, trigger_definition):
"""Check trigger definition
:param trigger_type: Type of trigger
:param trigger_definition: Definition of trigger
"""
trigger_cls = self._get_trigger_class(trigger_type)
trigger_cls.check_trigger_definition(trigger_definition)
def add_trigger(self, trigger_id, trigger_type, trigger_property):
if trigger_id in self._trigger_obj_map:
msg = (_("Trigger id %s is exist") % trigger_id)
raise exception.InvalidInput(msg)
trigger_cls = self._get_trigger_class(trigger_type)
trigger = trigger_cls(trigger_id, trigger_property, self._executor)
self._trigger_obj_map[trigger_id] = trigger
def remove_trigger(self, trigger_id):
trigger = self._trigger_obj_map.get(trigger_id, None)
if not trigger:
raise exception.TriggerNotFound(id=trigger_id)
if trigger.has_operations():
raise exception.DeleteTriggerNotAllowed(trigger_id=trigger_id)
trigger.shoutdown()
del self._trigger_obj_map[trigger_id]
def update_trigger(self, trigger_id, trigger_property):
trigger = self._trigger_obj_map.get(trigger_id, None)
if not trigger:
raise exception.TriggerNotFound(id=trigger_id)
trigger.update_trigger_property(trigger_property)
def register_operation(self, trigger_id, operation_id, **kwargs):
"""Register operation definition.
:param trigger_id: The ID of the trigger which
the operation is registered to
:param operation_id: ID of the operation
:param kwargs: Any parameters
:raise InvalidInput if the trigger_type is invalid or
other exceptionis register_operation of trigger raises
"""
trigger = self._trigger_obj_map.get(trigger_id, None)
if not trigger:
raise exception.TriggerNotFound(id=trigger_id)
trigger.register_operation(operation_id, **kwargs)
def unregister_operation(self, trigger_id, operation_id, **kwargs):
"""Unregister operation.
:param trigger_id: The ID of the trigger which
the operation is registered to
:param operation_id: ID of the operation
:raise InvalidInput if the trigger_type is invalid or
other exceptionis unregister_operation of trigger raises
"""
trigger = self._trigger_obj_map.get(trigger_id, None)
if not trigger:
raise exception.TriggerNotFound(id=trigger_id)
trigger.unregister_operation(operation_id, **kwargs)
self._executor.cancel_operation(operation_id)
def _get_trigger_class(self, trigger_type):
cls = self._trigger_cls_map.get(trigger_type, None)
if not cls:
msg = (_("Invalid trigger type:%s") % trigger_type)
raise exception.InvalidInput(msg)
return cls

View File

@ -0,0 +1,65 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from abc import ABCMeta
from abc import abstractmethod
import six
from smaug import loadables
@six.add_metaclass(ABCMeta)
class BaseTrigger(object):
"""Trigger base class that all Triggers should inherit from"""
TRIGGER_TYPE = ""
def __init__(self, trigger_id, executor):
super(BaseTrigger, self).__init__()
self._id = trigger_id
self._operation_ids = set()
self._executor = executor
@abstractmethod
def shutdown(self):
pass
@abstractmethod
def register_operation(self, operation_id, **kwargs):
pass
@abstractmethod
def unregister_operation(self, operation_id, **kwargs):
pass
@abstractmethod
def update_trigger_property(self, trigger_property):
pass
@classmethod
def check_trigger_definition(cls, trigger_definition):
pass
def has_operations(self):
return (len(self._operation_ids) != 0)
class TriggerHandler(loadables.BaseLoader):
def __init__(self):
super(TriggerHandler, self).__init__(BaseTrigger)
def all_triggers():
"""Get all trigger classes."""
return TriggerHandler().get_all_classes()

View File

@ -18,8 +18,12 @@ from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging as messaging
from smaug import exception
from smaug import manager
from smaug.services.protection import api as protection_api
from smaug import objects
from smaug.operationengine.engine import trigger_manager
from smaug.operationengine import scheduled_operation_state
CONF = cfg.CONF
@ -36,8 +40,64 @@ class OperationEngineManager(manager.Manager):
def __init__(self, service_name=None,
*args, **kwargs):
super(OperationEngineManager, self).__init__(*args, **kwargs)
self.protection_api = protection_api.API()
self._service_id = None
self._trigger_manager = None
def create_scheduled_operation(self, context, request_spec=None):
LOG.debug("Received a rpc call from a api service."
"request_spec:%s", request_spec)
def init_host(self, **kwargs):
self._trigger_manager = trigger_manager.TriggerManager()
self._service_id = kwargs.get("service_id")
def cleanup_host(self):
self._trigger_manager.shutdown()
@messaging.expected_exceptions(exception.TriggerNotFound,
exception.InvalidInput,
exception.InvalidOperationObject)
def create_scheduled_operation(self, context, operation_id, trigger_id):
LOG.debug("Create scheduled operation.")
# register operation
self._trigger_manager.register_operation(trigger_id, operation_id)
# create ScheduledOperationState record
state_info = {
"operation_id": operation_id,
"service_id": self._service_id,
"state": scheduled_operation_state.REGISTERED
}
operation_state = objects.ScheduledOperationState(
context, **state_info)
try:
operation_state.create()
except Exception:
self._trigger_manager.unregister_operation(
trigger_id, operation_id)
raise
@messaging.expected_exceptions(exception.ScheduledOperationStateNotFound,
exception.TriggerNotFound,
exception.InvalidInput)
def delete_scheduled_operation(self, context, operation_id, trigger_id):
LOG.debug("Delete scheduled operation.")
operation_state = objects.ScheduledOperationState.\
get_by_operation_id(context, operation_id)
if scheduled_operation_state.DELETED != operation_state.state:
operation_state.state = scheduled_operation_state.DELETED
operation_state.save()
self._trigger_manager.unregister_operation(trigger_id, operation_id)
@messaging.expected_exceptions(exception.InvalidInput)
def create_trigger(self, context, trigger):
self._trigger_manager.add_trigger(trigger.id, trigger.type,
trigger.properties)
@messaging.expected_exceptions(exception.TriggerNotFound,
exception.DeleteTriggerNotAllowed)
def delete_trigger(self, context, trigger_id):
self._trigger_manager.remove_trigger(trigger_id)
@messaging.expected_exceptions(exception.TriggerNotFound)
def update_trigger(self, context, trigger):
self._trigger_manager.update_trigger(trigger.id, trigger.properties)

View File

@ -1,5 +1,3 @@
# Copyright 2012, Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -18,7 +16,6 @@ Client side of the OperationEngine manager RPC API.
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_serialization import jsonutils
from smaug import rpc
@ -42,10 +39,24 @@ class OperationEngineAPI(object):
version=self.RPC_API_VERSION)
self.client = rpc.get_client(target, version_cap=None)
def create_scheduled_operation(self, ctxt, request_spec=None):
request_spec_p = jsonutils.to_primitive(request_spec)
def create_scheduled_operation(self, ctxt, operation_id, trigger_id):
cctxt = self.client.prepare(version='1.0')
return cctxt.cast(
ctxt,
'create_scheduled_operation',
request_spec=request_spec_p)
return cctxt.call(ctxt, 'create_scheduled_operation',
operation_id=operation_id, trigger_id=trigger_id)
def delete_scheduled_operation(self, ctxt, operation_id, trigger_id):
cctxt = self.client.prepare(version='1.0')
return cctxt.call(ctxt, 'delete_scheduled_operation',
operation_id=operation_id, trigger_id=trigger_id)
def create_trigger(self, ctxt, trigger):
cctxt = self.client.prepare(version='1.0')
return cctxt.call(ctxt, 'create_trigger', trigger=trigger)
def delete_trigger(self, ctxt, trigger_id):
cctxt = self.client.prepare(version='1.0')
return cctxt.call(ctxt, 'delete_trigger', trigger_id=trigger_id)
def update_trigger(self, ctxt, trigger):
cctxt = self.client.prepare(version='1.0')
return cctxt.call(ctxt, 'update_trigger', trigger=trigger)

View File

@ -0,0 +1,25 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Scheduled operation state
"""
INIT = 'init'
REGISTERED = 'registered'
TRIGGERED = 'triggered'
RUNNING = 'running'
DELETED = 'deleted'

View File

@ -102,7 +102,6 @@ class Service(service.Service):
LOG.info(_LI('Starting %(topic)s node (version %(version_string)s)'),
{'topic': self.topic, 'version_string': version_string})
self.model_disconnected = False
self.manager.init_host()
ctxt = context.get_admin_context()
try:
service_ref = db.service_get_by_args(ctxt,
@ -112,6 +111,8 @@ class Service(service.Service):
except exception.NotFound:
self._create_service_ref(ctxt)
self.manager.init_host(service_id=self.service_id)
LOG.debug("Creating RPC server for service %s", self.topic)
target = messaging.Target(topic=self.topic, server=self.host)

View File

@ -11,6 +11,7 @@
# under the License.
import mock
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from smaug import objects
@ -39,32 +40,48 @@ class TestScheduledOperation(test_objects.BaseObjectsTestCase):
@mock.patch('smaug.db.scheduled_operation_get')
def test_get_by_id(self, operation_get):
db_op = Fake_Operation
db_op = Fake_Operation.copy()
operation_get.return_value = db_op
op = self.Operation_Class.get_by_id(self.context, Operation_ID)
db_op['operation_definition'] = jsonutils.loads(
db_op['operation_definition'])
self._compare(self, db_op, op)
operation_get.assert_called_once_with(self.context, Operation_ID, [])
@mock.patch('smaug.db.scheduled_operation_get')
def test_get_join_trigger(self, operation_get):
db_op = Fake_Operation.copy()
db_op['trigger'] = {'type': 'time'}
db_op['trigger'] = {
'created_at': NOW,
'deleted_at': None,
'updated_at': NOW,
'deleted': False,
'id': '123',
'name': 'daily',
'project_id': '123',
'type': 'time',
'properties': '{}'
}
operation_get.return_value = db_op
op = self.Operation_Class.get_by_id(self.context,
Operation_ID, ['trigger'])
self._compare(self, db_op, op)
db_op['operation_definition'] = jsonutils.loads(
db_op['operation_definition'])
self.assertEqual(db_op['trigger']['type'], op.trigger.type)
operation_get.assert_called_once_with(self.context,
Operation_ID, ['trigger'])
@mock.patch('smaug.db.scheduled_operation_create')
def test_create(self, operation_create):
db_op = Fake_Operation
db_op = Fake_Operation.copy()
operation_create.return_value = db_op
op = self.Operation_Class(context=self.context)
op.create()
db_op['operation_definition'] = jsonutils.loads(
db_op['operation_definition'])
self._compare(self, db_op, op)
operation_create.assert_called_once_with(self.context, {})

View File

@ -11,6 +11,7 @@
# under the License.
import mock
from oslo_serialization import jsonutils
from oslo_utils import timeutils
from smaug import objects
@ -38,20 +39,22 @@ class TestTrigger(test_objects.BaseObjectsTestCase):
@mock.patch('smaug.db.trigger_get')
def test_get_by_id(self, trigger_get):
db_trigger = Fake_Trigger
db_trigger = Fake_Trigger.copy()
trigger_get.return_value = db_trigger
trigger = self.Trigger_Class.get_by_id(self.context, Trigger_ID)
db_trigger['properties'] = jsonutils.loads(db_trigger['properties'])
self._compare(self, db_trigger, trigger)
trigger_get.assert_called_once_with(self.context, Trigger_ID)
@mock.patch('smaug.db.trigger_create')
def test_create(self, trigger_create):
db_trigger = Fake_Trigger
db_trigger = Fake_Trigger.copy()
trigger_create.return_value = db_trigger
trigger = self.Trigger_Class(context=self.context)
trigger.create()
db_trigger['properties'] = jsonutils.loads(db_trigger['properties'])
self._compare(self, db_trigger, trigger)
trigger_create.assert_called_once_with(self.context, {})

View File

@ -0,0 +1,107 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_messaging.rpc import dispatcher as rpc_dispatcher
from smaug import context
from smaug import objects
from smaug.operationengine import manager as service_manager
from smaug.operationengine import scheduled_operation_state
from smaug.tests import base
class FakeTriggerManager(object):
def register_operation(self, trigger_id, operation_id, **kwargs):
pass
def unregister_operation(self, trigger_id, operation_id, **kwargs):
pass
class OperationEngineManagerTestCase(base.TestCase):
"""Test cases for OperationEngineManager class."""
def setUp(self):
super(OperationEngineManagerTestCase, self).setUp()
self.manager = service_manager.OperationEngineManager()
self.manager._service_id = 0
self.manager._trigger_manager = FakeTriggerManager()
self.ctxt = context.get_admin_context()
self._trigger = self._create_one_trigger()
self._operation = self._create_scheduled_operation(self._trigger.id)
def test_create_operation(self):
operation_id = "1234"
self.manager.create_scheduled_operation(
self.ctxt, operation_id, self._trigger.id)
state_obj = objects.ScheduledOperationState.get_by_operation_id(
self.ctxt, operation_id)
self.assertTrue(state_obj is not None)
def test_delete_operation_get_state_failed(self):
self.assertRaises(rpc_dispatcher.ExpectedException,
self.manager.delete_scheduled_operation,
self.ctxt, self._operation.id, 1)
def test_delete_operation(self):
state = self._create_operation_state(self._operation.id)
self.manager.delete_scheduled_operation(
self.ctxt, self._operation.id, 1)
state = objects.ScheduledOperationState.get_by_operation_id(
self.ctxt, self._operation.id)
self.assertEqual(scheduled_operation_state.DELETED, state.state)
def _create_one_trigger(self):
trigger_info = {
'project_id': "123",
"name": "123",
"type": "time",
"properties": {
"format": "crontab",
"pattern": "* * * * *"
},
}
trigger = objects.Trigger(self.ctxt, **trigger_info)
trigger.create()
return trigger
def _create_scheduled_operation(self, trigger_id):
operation_info = {
"name": "123",
"operation_type": "protect",
"project_id": "123",
"trigger_id": trigger_id,
"operation_definition": {
"plan_id": ""
},
}
operation = objects.ScheduledOperation(self.ctxt, **operation_info)
operation.create()
return operation
def _create_operation_state(self, operation_id):
state_info = {
"operation_id": operation_id,
"service_id": self.manager._service_id,
"state": scheduled_operation_state.REGISTERED
}
operation_state = objects.ScheduledOperationState(context,
**state_info)
operation_state.create()
return operation_state