From ca37d25e7c48c22a205354e9e1bb1667fb3c45c0 Mon Sep 17 00:00:00 2001 From: zengchen Date: Fri, 19 Feb 2016 18:21:23 +0800 Subject: [PATCH] Implement executor of OperationEngine Change-Id: I82c0690b60f0e9d61c4b759107b96928ad149122 Closes-Bug: #1547392 --- setup.cfg | 3 +- smaug/common/constants.py | 11 ++ smaug/db/api.py | 18 +++ smaug/db/sqlalchemy/api.py | 31 ++++ .../migrate_repo/versions/001_smaug_init.py | 1 + smaug/db/sqlalchemy/models.py | 1 + smaug/objects/scheduled_operation_log.py | 6 + smaug/objects/scheduled_operation_state.py | 1 + .../engine/executors/__init__.py | 0 .../operationengine/engine/executors/base.py | 2 +- .../executors/scheduled_operation_executor.py | 140 ++++++++++++++++ .../engine/executors/thread_pool_executor.py | 95 +++++++++++ .../operationengine/engine/trigger_manager.py | 7 +- .../operationengine/operation_manager.py | 8 +- .../operationengine/operations/__init__.py | 22 ++- .../operations/protect_operation.py | 8 +- .../scheduled_operation_state.py | 25 --- smaug/tests/unit/db/test_models.py | 34 +++- .../objects/test_scheduled_operation_state.py | 2 +- .../engine/executors/__init__.py | 0 .../executors/test_thread_pool_executor.py | 152 ++++++++++++++++++ 21 files changed, 528 insertions(+), 39 deletions(-) create mode 100644 smaug/services/operationengine/engine/executors/__init__.py create mode 100644 smaug/services/operationengine/engine/executors/scheduled_operation_executor.py create mode 100644 smaug/services/operationengine/engine/executors/thread_pool_executor.py delete mode 100644 smaug/services/operationengine/scheduled_operation_state.py create mode 100644 smaug/tests/unit/operationengine/engine/executors/__init__.py create mode 100644 smaug/tests/unit/operationengine/engine/executors/test_thread_pool_executor.py diff --git a/setup.cfg b/setup.cfg index 6831c9d7..023486e8 100644 --- a/setup.cfg +++ b/setup.cfg @@ -44,11 +44,12 @@ smaug.protections = smaug-volume-protection-plugin = smaug.services.protection.plugins.cinder_backup_plugin:CinderBackupPlugin smaug.provider = provider-registry = smaug.services.protection.provider:ProviderRegistry - smaug.protectables = project = smaug.services.protection.protectable_plugins.project:ProjectProtectablePlugin server = smaug.services.protection.protectable_plugins.server:ServerProtectablePlugin volume = smaug.services.protection.protectable_plugins.volume:VolumeProtectablePlugin +smaug.operationengine.engine.executor = + executor = smaug.operationengine.engine.executors.thread_pool_executor:ThreadPoolExecutor [build_sphinx] source-dir = doc/source diff --git a/smaug/common/constants.py b/smaug/common/constants.py index 7ca9ce61..21fc9185 100644 --- a/smaug/common/constants.py +++ b/smaug/common/constants.py @@ -44,3 +44,14 @@ RESOURCE_STATUS_AVAILABLE = 'available' RESOURCE_STATUS_DELETING = 'deleting' RESOURCE_STATUS_DELETED = 'deleted' RESOURCE_STATUS_UNDEFINED = 'undefined' + +# scheduled operation state +OPERATION_STATE_INIT = 'init' +OPERATION_STATE_REGISTERED = 'registered' +OPERATION_STATE_TRIGGERED = 'triggered' +OPERATION_STATE_RUNNING = 'running' +OPERATION_STATE_DELETED = 'deleted' + +# scheduled operation run type +OPERATION_RUN_TYPE_EXECUTE = 'execute' +OPERATION_RUN_TYPE_RESUME = 'resume' diff --git a/smaug/db/api.py b/smaug/db/api.py index 9af5e9d8..5abc6b07 100644 --- a/smaug/db/api.py +++ b/smaug/db/api.py @@ -392,6 +392,24 @@ def scheduled_operation_log_delete(context, log_id): return IMPL.scheduled_operation_log_delete(context, log_id) +def scheduled_operation_log_delete_oldest(context, operation_id, + retained_num, excepted_states=[]): + """Delete the oldest scheduled operation logs from the database. + + :param context: The security context + :param operation_id: ID of the scheduled operation + :param retained_num: The number of retained logs + :param excepted_states: If the state of log is in excepted_states, + it will not be deleted. + """ + return IMPL.scheduled_operation_log_delete_oldest(context, operation_id, + retained_num, + excepted_states) + + +################### + + def plan_get(context, plan_id): """Get a plan or raise if it does not exist.""" return IMPL.plan_get(context, plan_id) diff --git a/smaug/db/sqlalchemy/api.py b/smaug/db/sqlalchemy/api.py index eb04dc06..910a22dc 100644 --- a/smaug/db/sqlalchemy/api.py +++ b/smaug/db/sqlalchemy/api.py @@ -31,6 +31,7 @@ from oslo_utils import timeutils from sqlalchemy import or_ from sqlalchemy.orm import joinedload from sqlalchemy.orm import RelationshipProperty +from sqlalchemy.sql import expression from sqlalchemy.sql.expression import literal_column from sqlalchemy.sql import func @@ -646,6 +647,36 @@ def scheduled_operation_log_delete(context, log_id): session.flush() +def scheduled_operation_log_delete_oldest(context, operation_id, + retained_num, excepted_states): + table = models.ScheduledOperationLog + session = get_session() + with session.begin(): + result = model_query(context, table, session=session).filter_by( + operation_id=operation_id).order_by( + expression.desc(table.created_at)).limit(retained_num).all() + + if not result or len(result) < retained_num: + return + oldest_create_time = result[-1]['created_at'] + + if excepted_states and isinstance(excepted_states, list): + filters = expression.and_( + table.operation_id == operation_id, + table.created_at < oldest_create_time, + table.state.notin_(excepted_states)) + else: + filters = expression.and_( + table.operation_id == operation_id, + table.created_at < oldest_create_time) + + model_query(context, table, session=session).filter( + filters).delete(synchronize_session=False) + + +################### + + def _resource_refs(resource_list, meta_class): resource_refs = [] if resource_list: diff --git a/smaug/db/sqlalchemy/migrate_repo/versions/001_smaug_init.py b/smaug/db/sqlalchemy/migrate_repo/versions/001_smaug_init.py index 7848d3ca..962915c9 100644 --- a/smaug/db/sqlalchemy/migrate_repo/versions/001_smaug_init.py +++ b/smaug/db/sqlalchemy/migrate_repo/versions/001_smaug_init.py @@ -126,6 +126,7 @@ def define_tables(meta): Column('service_id', Integer, ForeignKey('services.id'), nullable=False), Column('state', String(length=32), nullable=False), + Column('end_time_for_run', DateTime), mysql_engine='InnoDB' ) diff --git a/smaug/db/sqlalchemy/models.py b/smaug/db/sqlalchemy/models.py index 32ca5775..4089576b 100644 --- a/smaug/db/sqlalchemy/models.py +++ b/smaug/db/sqlalchemy/models.py @@ -109,6 +109,7 @@ class ScheduledOperationState(BASE, SmaugBase): nullable=False) service_id = Column(Integer, ForeignKey('services.id'), nullable=False) state = Column(String(32), nullable=False) + end_time_for_run = Column(DateTime) operation = orm.relationship( ScheduledOperation, diff --git a/smaug/objects/scheduled_operation_log.py b/smaug/objects/scheduled_operation_log.py index 644be50c..b3807414 100644 --- a/smaug/objects/scheduled_operation_log.py +++ b/smaug/objects/scheduled_operation_log.py @@ -77,3 +77,9 @@ class ScheduledOperationLog(base.SmaugPersistentObject, base.SmaugObject, def destroy(self): if self.id is not None: db.scheduled_operation_log_delete(self._context, self.id) + + @base.remotable_classmethod + def destroy_oldest(cls, context, operation_id, + retained_num, excepted_states=[]): + db.scheduled_operation_log_delete_oldest( + context, operation_id, retained_num, excepted_states) diff --git a/smaug/objects/scheduled_operation_state.py b/smaug/objects/scheduled_operation_state.py index 1fb7ae08..983763c5 100644 --- a/smaug/objects/scheduled_operation_state.py +++ b/smaug/objects/scheduled_operation_state.py @@ -36,6 +36,7 @@ class ScheduledOperationState(base.SmaugPersistentObject, base.SmaugObject, 'operation_id': fields.UUIDField(), 'service_id': fields.IntegerField(), 'state': fields.StringField(), + 'end_time_for_run': fields.DateTimeField(nullable=True), 'operation': fields.ObjectField("ScheduledOperation") } diff --git a/smaug/services/operationengine/engine/executors/__init__.py b/smaug/services/operationengine/engine/executors/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/smaug/services/operationengine/engine/executors/base.py b/smaug/services/operationengine/engine/executors/base.py index 7e079245..a357ef90 100644 --- a/smaug/services/operationengine/engine/executors/base.py +++ b/smaug/services/operationengine/engine/executors/base.py @@ -24,7 +24,7 @@ class BaseExecutor(object): @abstractmethod def execute_operation(self, operation_id, triggered_time, - expect_start_time=None, window_time=None, **kwargs): + expect_start_time, window_time, **kwargs): """Execute an operation. :param operation_id: ID of operation diff --git a/smaug/services/operationengine/engine/executors/scheduled_operation_executor.py b/smaug/services/operationengine/engine/executors/scheduled_operation_executor.py new file mode 100644 index 00000000..a472598d --- /dev/null +++ b/smaug/services/operationengine/engine/executors/scheduled_operation_executor.py @@ -0,0 +1,140 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import six + +from abc import abstractmethod +from datetime import datetime +from datetime import timedelta +from oslo_config import cfg +from oslo_log import log as logging +from oslo_utils import timeutils + +from smaug.common import constants +from smaug import context +from smaug.i18n import _LE, _LW +from smaug import objects +from smaug.services.operationengine.engine.executors import base +from smaug.services.operationengine import operation_manager + +CONF = cfg.CONF + +LOG = logging.getLogger(__name__) + + +class ScheduledOperationExecutor(base.BaseExecutor): + + _CHECK_ITEMS = { + 'is_waiting': 'is_waiting', + 'is_canceled': 'is_canceled' + } + + def __init__(self): + super(ScheduledOperationExecutor, self).__init__() + self._operation_manager = operation_manager.OperationManager() + + def execute_operation(self, operation_id, triggered_time, + expect_start_time, window_time, **kwargs): + + if self._check_operation(operation_id, self._CHECK_ITEMS.values()): + LOG.warn(_LW("Execute operation(%s), it can't be executed"), + operation_id) + return + + end_time_for_run = expect_start_time + timedelta(seconds=window_time) + ret = self._update_operation_state( + operation_id, + {'state': constants.OPERATION_STATE_TRIGGERED, + 'end_time_for_run': end_time_for_run}) + if not ret: + return + + param = { + 'operation_id': operation_id, + 'triggered_time': triggered_time, + 'expect_start_time': expect_start_time, + 'window_time': window_time, + 'run_type': constants.OPERATION_RUN_TYPE_EXECUTE + } + self._execute_operation(operation_id, self._run_operation, param) + + def resume_operation(self, operation_id, **kwargs): + end_time = kwargs.get('end_time_for_run') + now = datetime.utcnow() + if not isinstance(end_time, datetime) or now > end_time: + return + + window = int(timeutils.delta_seconds(now, end_time)) + param = { + 'operation_id': operation_id, + 'triggered_time': now, + 'expect_start_time': now, + 'window_time': window, + 'run_type': constants.OPERATION_RUN_TYPE_RESUME + } + self._execute_operation(operation_id, self._run_operation, param) + + def _run_operation(self, operation_id, param): + + self._update_operation_state( + operation_id, + {'state': constants.OPERATION_STATE_RUNNING}) + + try: + check_item = [self._CHECK_ITEMS['is_canceled']] + if self._check_operation(operation_id, check_item): + return + + try: + operation = objects.ScheduledOperation.get_by_id( + context.get_admin_context(), operation_id) + except Exception: + LOG.exception(_LE("Run operation(%s), get operation failed"), + operation_id) + return + + try: + self._operation_manager.run_operation( + operation.operation_type, operation.project_id, + operation.operation_definition, + param=param) + except Exception: + LOG.exception(_LE("Run operation(%s) failed"), operation_id) + + finally: + self._update_operation_state( + operation_id, + {'state': constants.OPERATION_STATE_REGISTERED}) + + def _update_operation_state(self, operation_id, updates): + + ctxt = context.get_admin_context() + try: + state_ref = objects.ScheduledOperationState.get_by_operation_id( + ctxt, operation_id) + for item, value in six.iteritems(updates): + setattr(state_ref, item, value) + state_ref.save() + except Exception: + LOG.exception(_LE("Execute operation(%s), update state failed"), + operation_id) + return False + return True + + @abstractmethod + def _execute_operation(self, operation_id, funtion, param): + pass + + @abstractmethod + def _check_operation(self, operation_id, check_items): + """Check whether the item in check_items happens""" + pass diff --git a/smaug/services/operationengine/engine/executors/thread_pool_executor.py b/smaug/services/operationengine/engine/executors/thread_pool_executor.py new file mode 100644 index 00000000..cd3f9827 --- /dev/null +++ b/smaug/services/operationengine/engine/executors/thread_pool_executor.py @@ -0,0 +1,95 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from collections import defaultdict +from concurrent import futures +from oslo_config import cfg +from oslo_log import log as logging +from threading import RLock + +from smaug.i18n import _LE +from smaug.services.operationengine.engine.executors import\ + scheduled_operation_executor as base_executor + +executor_opts = [ + cfg.IntOpt('thread_count', + default=10, + help='The count of thread which executor will start') + ] + +CONF = cfg.CONF +CONF.register_opts(executor_opts) + +LOG = logging.getLogger(__name__) + + +class ThreadPoolExecutor(base_executor.ScheduledOperationExecutor): + + def __init__(self, thread_count=None): + super(ThreadPoolExecutor, self).__init__() + + if thread_count is None: + thread_count = CONF.thread_count + + self._pool = futures.ThreadPoolExecutor(thread_count) + self._operation_to_run = defaultdict(int) + self._operation_to_cancel = set() + self._lock = RLock() + + self._check_functions = { + self._CHECK_ITEMS['is_waiting']: lambda op_id: ( + op_id in self._operation_to_run), + + self._CHECK_ITEMS['is_canceled']: lambda op_id: ( + op_id in self._operation_to_cancel), + } + + def shutdown(self, wait=True): + self._pool.shutdown(wait) + self._operation_to_run.clear() + self._operation_to_cancel.clear() + + def cancel_operation(self, operation_id): + with self._lock: + if operation_id in self._operation_to_run: + self._operation_to_cancel.add(operation_id) + + def _check_operation(self, operation_id, check_items): + with self._lock: + return any(self._check_functions[item](operation_id) + for item in check_items) + + def _execute_operation(self, operation_id, function, param): + + def callback(f): + self._finish_operation(operation_id) + + with self._lock: + self._operation_to_run[operation_id] += 1 + + try: + f = self._pool.submit(function, operation_id, param) + f.add_done_callback(callback) + + except Exception: + self._operation_to_run[operation_id] -= 1 + LOG.exception(_LE("Submit operation(%(o_id)s) failed."), + operation_id) + + def _finish_operation(self, operation_id): + with self._lock: + self._operation_to_run[operation_id] -= 1 + if 0 == self._operation_to_run[operation_id]: + del self._operation_to_run[operation_id] + + if operation_id in self._operation_to_cancel: + self._operation_to_cancel.remove(operation_id) diff --git a/smaug/services/operationengine/engine/trigger_manager.py b/smaug/services/operationengine/engine/trigger_manager.py index abfaa072..f7f5e7a8 100644 --- a/smaug/services/operationengine/engine/trigger_manager.py +++ b/smaug/services/operationengine/engine/trigger_manager.py @@ -13,6 +13,7 @@ """ Manage all triggers. """ +from stevedore import driver as import_driver from smaug import exception from smaug.i18n import _ @@ -34,8 +35,10 @@ class TriggerManager(object): # } self._trigger_obj_map = {} - # TODO(zengchen) create executor - self._executor = None + executor_cls = import_driver.DriverManager( + 'smaug.operationengine.engine.executor', + 'executor').driver + self._executor = executor_cls() def shutdown(self): diff --git a/smaug/services/operationengine/operation_manager.py b/smaug/services/operationengine/operation_manager.py index 18e33867..eba3fac0 100644 --- a/smaug/services/operationengine/operation_manager.py +++ b/smaug/services/operationengine/operation_manager.py @@ -56,9 +56,9 @@ class OperationManager(object): cls = self._get_operation_cls(operation_type) cls.check_operation_definition(operation_definition) - def execute_operation(self, operation_type, project_id, - operation_definition): - """Execute operation. + def run_operation(self, operation_type, project_id, + operation_definition, **kwargs): + """Run operation. :param operation_type: the type of operation :param project_id: the id of tenant @@ -66,4 +66,4 @@ class OperationManager(object): :raise InvalidInput if the operation_type is invalid. """ cls = self._get_operation_cls(operation_type) - cls.execute(project_id, operation_definition) + cls.run_operation(project_id, operation_definition, **kwargs) diff --git a/smaug/services/operationengine/operations/__init__.py b/smaug/services/operationengine/operations/__init__.py index 799ed7fc..33f561e7 100644 --- a/smaug/services/operationengine/operations/__init__.py +++ b/smaug/services/operationengine/operations/__init__.py @@ -17,6 +17,7 @@ Operation classes from abc import ABCMeta import six +from smaug.common import constants from smaug import loadables @@ -34,11 +35,30 @@ class Operation(object): pass @classmethod - def execute(self, project_id, operation_definition): + def run_operation(cls, project_id, operation_definition, **kwargs): + param = kwargs.get('param') + if constants.OPERATION_RUN_TYPE_EXECUTE == param['run_type']: + cls._execute(project_id, operation_definition, param) + else: + cls._resume(project_id, operation_definition, param) + + @classmethod + def _execute(self, project_id, operation_definition, param): """Execute operation. :param project_id: the id of tenant :param operation_definition: the definition of operation + :param param: dict, other parameters + """ + pass + + @classmethod + def _resume(self, project_id, operation_definition, param): + """Resume operation. + + :param project_id: the id of tenant + :param operation_definition: the definition of operation + :param param: dict, other parameters """ pass diff --git a/smaug/services/operationengine/operations/protect_operation.py b/smaug/services/operationengine/operations/protect_operation.py index 6e7d8960..380400e0 100644 --- a/smaug/services/operationengine/operations/protect_operation.py +++ b/smaug/services/operationengine/operations/protect_operation.py @@ -27,7 +27,13 @@ class ProtectOperation(operations.Operation): raise exception.InvalidOperationDefinition(reason=reason) @classmethod - def execute(self, project_id, operation_definition): + def _execute(self, project_id, operation_definition, param): + # plan_id = operation_definition.get("plan_id") + # TODO(chenzeng): invoke create checkpoint interface + pass + + @classmethod + def _resume(self, project_id, operation_definition, param): # plan_id = operation_definition.get("plan_id") # TODO(chenzeng): invoke create checkpoint interface pass diff --git a/smaug/services/operationengine/scheduled_operation_state.py b/smaug/services/operationengine/scheduled_operation_state.py deleted file mode 100644 index f9071cb4..00000000 --- a/smaug/services/operationengine/scheduled_operation_state.py +++ /dev/null @@ -1,25 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -Scheduled operation state -""" - -INIT = 'init' - -REGISTERED = 'registered' - -TRIGGERED = 'triggered' - -RUNNING = 'running' - -DELETED = 'deleted' diff --git a/smaug/tests/unit/db/test_models.py b/smaug/tests/unit/db/test_models.py index b3b7adfa..e3fd0387 100644 --- a/smaug/tests/unit/db/test_models.py +++ b/smaug/tests/unit/db/test_models.py @@ -12,6 +12,8 @@ """Tests for Models Database.""" +from datetime import datetime +from datetime import timedelta from oslo_config import cfg from oslo_utils import uuidutils import six @@ -292,11 +294,14 @@ class ScheduledOperationLogTestCase(base.TestCase): def setUp(self): super(ScheduledOperationLogTestCase, self).setUp() self.ctxt = context.get_admin_context() + self.operation_id = '0354ca9ddcd046b693340d78759fd274' - def _create_scheduled_operation_log(self): + def _create_scheduled_operation_log(self, state='in_progress', + created_at=datetime.now()): values = { - 'operation_id': '0354ca9ddcd046b693340d78759fd274', - 'state': 'in_progress', + 'operation_id': self.operation_id, + 'state': state, + 'created_at': created_at } return db.scheduled_operation_log_create(self.ctxt, values) @@ -320,6 +325,29 @@ class ScheduledOperationLogTestCase(base.TestCase): db.scheduled_operation_log_delete, self.ctxt, 100) + def test_scheduled_operation_log_delete_oldest(self): + log_ids = [] + states = ['success', 'in_progress', 'success', 'success'] + for i in range(4): + t = datetime.now() + timedelta(hours=i) + log = self._create_scheduled_operation_log( + states[i], t) + log_ids.append(log['id']) + + db.scheduled_operation_log_delete_oldest( + self.ctxt, self.operation_id, 3) + self.assertRaises(exception.ScheduledOperationLogNotFound, + db.scheduled_operation_log_get, + self.ctxt, log_ids[0]) + + db.scheduled_operation_log_delete_oldest( + self.ctxt, self.operation_id, 1, ['in_progress']) + log_ref = db.scheduled_operation_log_get(self.ctxt, log_ids[1]) + self.assertEqual('in_progress', log_ref['state']) + self.assertRaises(exception.ScheduledOperationLogNotFound, + db.scheduled_operation_log_get, + self.ctxt, log_ids[2]) + def test_scheduled_operation_log_update(self): log_ref = self._create_scheduled_operation_log() log_id = log_ref['id'] diff --git a/smaug/tests/unit/objects/test_scheduled_operation_state.py b/smaug/tests/unit/objects/test_scheduled_operation_state.py index 05f57cfb..6d05c747 100644 --- a/smaug/tests/unit/objects/test_scheduled_operation_state.py +++ b/smaug/tests/unit/objects/test_scheduled_operation_state.py @@ -136,6 +136,6 @@ class TestScheduledOperationStateList(test_objects.BaseObjectsTestCase): 'service_id': service_id, 'state': 'triggered', } - state = objects.ScheduledOperationState(self, **state_info) + state = objects.ScheduledOperationState(self.context, **state_info) state.create() return state diff --git a/smaug/tests/unit/operationengine/engine/executors/__init__.py b/smaug/tests/unit/operationengine/engine/executors/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/smaug/tests/unit/operationengine/engine/executors/test_thread_pool_executor.py b/smaug/tests/unit/operationengine/engine/executors/test_thread_pool_executor.py new file mode 100644 index 00000000..05cd07fc --- /dev/null +++ b/smaug/tests/unit/operationengine/engine/executors/test_thread_pool_executor.py @@ -0,0 +1,152 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from datetime import datetime +from datetime import timedelta +import mock +import time + +from smaug.common import constants +from smaug import context +from smaug import objects +from smaug.services.operationengine.engine.executors import\ + thread_pool_executor +from smaug.tests import base + +NOW = datetime.utcnow() +Log_ID = 0 +Operation_ID = "123" + +Fake_Log = { + 'created_at': NOW, + 'deleted_at': None, + 'updated_at': NOW, + 'deleted': False, + 'id': Log_ID, + 'operation_id': Operation_ID, + 'expect_start_time': NOW, + 'triggered_time': NOW, + 'actual_start_time': NOW, + 'end_time': NOW, + 'state': 'in_progress', + 'extend_info': '', +} + +Fake_Operation = { + 'created_at': NOW, + 'deleted_at': None, + 'updated_at': NOW, + 'deleted': False, + 'id': Operation_ID, + 'name': 'protect vm', + 'operation_type': 'protect', + 'project_id': '123', + 'trigger_id': '0354ca9ddcd046b693340d78759fd275', + 'operation_definition': '{}' +} + + +class ThreadPoolExecutorTestCase(base.TestCase): + + def setUp(self): + super(ThreadPoolExecutorTestCase, self).setUp() + + self._executor = thread_pool_executor.ThreadPoolExecutor() + self.context = context.get_admin_context() + + def tearDown(self): + super(ThreadPoolExecutorTestCase, self).tearDown() + self._executor.shutdown() + + @mock.patch('smaug.services.operationengine.operations.' + 'protect_operation.ProtectOperation._execute') + def test_execute_operation(self, execute): + operation = self._create_operation() + self._create_operation_state(operation.id, 0) + + now = datetime.utcnow() + window_time = 30 + + self._executor.execute_operation(operation.id, now, now, window_time) + + time.sleep(1) + + self.assertEqual(0, len(self._executor._operation_to_run)) + + param = { + 'operation_id': operation.id, + 'triggered_time': now, + 'expect_start_time': now, + 'window_time': window_time, + 'run_type': constants.OPERATION_RUN_TYPE_EXECUTE + } + execute.assert_called_once_with( + operation.project_id, operation.operation_definition, param) + + state = objects.ScheduledOperationState.get_by_operation_id( + self.context, operation.id) + self.assertTrue(state.end_time_for_run is not None) + self.assertTrue(constants.OPERATION_STATE_REGISTERED == state.state) + + @mock.patch('smaug.services.operationengine.operations.' + 'protect_operation.ProtectOperation._resume') + def test_resume_operation(self, resume): + operation = self._create_operation() + self._create_operation_state(operation.id, 0) + + now = datetime.utcnow() + window_time = 30 + + self._executor.resume_operation(operation.id, end_time_for_run=( + now + timedelta(seconds=window_time))) + + time.sleep(1) + + self.assertEqual(0, len(self._executor._operation_to_run)) + + self.assertTrue(resume.called) + + state = objects.ScheduledOperationState.get_by_operation_id( + self.context, operation.id) + self.assertTrue(constants.OPERATION_STATE_REGISTERED == state.state) + + def test_cancel_operation(self): + operation_id = '123' + + self._executor.cancel_operation(operation_id) + self.assertEqual(0, len(self._executor._operation_to_cancel)) + + self._executor._operation_to_run[operation_id] = 0 + self._executor.cancel_operation(Operation_ID) + self.assertEqual(1, len(self._executor._operation_to_cancel)) + + def _create_operation(self, trigger_id='123'): + operation_info = { + 'name': 'protect vm', + 'operation_type': 'protect', + 'project_id': '123', + 'trigger_id': trigger_id, + 'operation_definition': {} + } + operation = objects.ScheduledOperation(self.context, **operation_info) + operation.create() + return operation + + def _create_operation_state(self, operation_id, service_id): + state_info = { + 'operation_id': operation_id, + 'service_id': service_id, + 'state': constants.OPERATION_STATE_INIT, + } + state = objects.ScheduledOperationState(self.context, **state_info) + state.create() + return state