Implement executor of OperationEngine

Change-Id: I82c0690b60f0e9d61c4b759107b96928ad149122
Closes-Bug: #1547392
This commit is contained in:
zengchen 2016-02-19 18:21:23 +08:00 committed by Saggi Mizrahi
parent 09162ed50c
commit ca37d25e7c
21 changed files with 528 additions and 39 deletions

View File

@ -44,11 +44,12 @@ smaug.protections =
smaug-volume-protection-plugin = smaug.services.protection.plugins.cinder_backup_plugin:CinderBackupPlugin
smaug.provider =
provider-registry = smaug.services.protection.provider:ProviderRegistry
smaug.protectables =
project = smaug.services.protection.protectable_plugins.project:ProjectProtectablePlugin
server = smaug.services.protection.protectable_plugins.server:ServerProtectablePlugin
volume = smaug.services.protection.protectable_plugins.volume:VolumeProtectablePlugin
smaug.operationengine.engine.executor =
executor = smaug.operationengine.engine.executors.thread_pool_executor:ThreadPoolExecutor
[build_sphinx]
source-dir = doc/source

View File

@ -44,3 +44,14 @@ RESOURCE_STATUS_AVAILABLE = 'available'
RESOURCE_STATUS_DELETING = 'deleting'
RESOURCE_STATUS_DELETED = 'deleted'
RESOURCE_STATUS_UNDEFINED = 'undefined'
# scheduled operation state
OPERATION_STATE_INIT = 'init'
OPERATION_STATE_REGISTERED = 'registered'
OPERATION_STATE_TRIGGERED = 'triggered'
OPERATION_STATE_RUNNING = 'running'
OPERATION_STATE_DELETED = 'deleted'
# scheduled operation run type
OPERATION_RUN_TYPE_EXECUTE = 'execute'
OPERATION_RUN_TYPE_RESUME = 'resume'

View File

@ -392,6 +392,24 @@ def scheduled_operation_log_delete(context, log_id):
return IMPL.scheduled_operation_log_delete(context, log_id)
def scheduled_operation_log_delete_oldest(context, operation_id,
retained_num, excepted_states=[]):
"""Delete the oldest scheduled operation logs from the database.
:param context: The security context
:param operation_id: ID of the scheduled operation
:param retained_num: The number of retained logs
:param excepted_states: If the state of log is in excepted_states,
it will not be deleted.
"""
return IMPL.scheduled_operation_log_delete_oldest(context, operation_id,
retained_num,
excepted_states)
###################
def plan_get(context, plan_id):
"""Get a plan or raise if it does not exist."""
return IMPL.plan_get(context, plan_id)

View File

@ -31,6 +31,7 @@ from oslo_utils import timeutils
from sqlalchemy import or_
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import RelationshipProperty
from sqlalchemy.sql import expression
from sqlalchemy.sql.expression import literal_column
from sqlalchemy.sql import func
@ -646,6 +647,36 @@ def scheduled_operation_log_delete(context, log_id):
session.flush()
def scheduled_operation_log_delete_oldest(context, operation_id,
retained_num, excepted_states):
table = models.ScheduledOperationLog
session = get_session()
with session.begin():
result = model_query(context, table, session=session).filter_by(
operation_id=operation_id).order_by(
expression.desc(table.created_at)).limit(retained_num).all()
if not result or len(result) < retained_num:
return
oldest_create_time = result[-1]['created_at']
if excepted_states and isinstance(excepted_states, list):
filters = expression.and_(
table.operation_id == operation_id,
table.created_at < oldest_create_time,
table.state.notin_(excepted_states))
else:
filters = expression.and_(
table.operation_id == operation_id,
table.created_at < oldest_create_time)
model_query(context, table, session=session).filter(
filters).delete(synchronize_session=False)
###################
def _resource_refs(resource_list, meta_class):
resource_refs = []
if resource_list:

View File

@ -126,6 +126,7 @@ def define_tables(meta):
Column('service_id', Integer, ForeignKey('services.id'),
nullable=False),
Column('state', String(length=32), nullable=False),
Column('end_time_for_run', DateTime),
mysql_engine='InnoDB'
)

View File

@ -109,6 +109,7 @@ class ScheduledOperationState(BASE, SmaugBase):
nullable=False)
service_id = Column(Integer, ForeignKey('services.id'), nullable=False)
state = Column(String(32), nullable=False)
end_time_for_run = Column(DateTime)
operation = orm.relationship(
ScheduledOperation,

View File

@ -77,3 +77,9 @@ class ScheduledOperationLog(base.SmaugPersistentObject, base.SmaugObject,
def destroy(self):
if self.id is not None:
db.scheduled_operation_log_delete(self._context, self.id)
@base.remotable_classmethod
def destroy_oldest(cls, context, operation_id,
retained_num, excepted_states=[]):
db.scheduled_operation_log_delete_oldest(
context, operation_id, retained_num, excepted_states)

View File

@ -36,6 +36,7 @@ class ScheduledOperationState(base.SmaugPersistentObject, base.SmaugObject,
'operation_id': fields.UUIDField(),
'service_id': fields.IntegerField(),
'state': fields.StringField(),
'end_time_for_run': fields.DateTimeField(nullable=True),
'operation': fields.ObjectField("ScheduledOperation")
}

View File

@ -24,7 +24,7 @@ class BaseExecutor(object):
@abstractmethod
def execute_operation(self, operation_id, triggered_time,
expect_start_time=None, window_time=None, **kwargs):
expect_start_time, window_time, **kwargs):
"""Execute an operation.
:param operation_id: ID of operation

View File

@ -0,0 +1,140 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
from abc import abstractmethod
from datetime import datetime
from datetime import timedelta
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
from smaug.common import constants
from smaug import context
from smaug.i18n import _LE, _LW
from smaug import objects
from smaug.services.operationengine.engine.executors import base
from smaug.services.operationengine import operation_manager
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class ScheduledOperationExecutor(base.BaseExecutor):
_CHECK_ITEMS = {
'is_waiting': 'is_waiting',
'is_canceled': 'is_canceled'
}
def __init__(self):
super(ScheduledOperationExecutor, self).__init__()
self._operation_manager = operation_manager.OperationManager()
def execute_operation(self, operation_id, triggered_time,
expect_start_time, window_time, **kwargs):
if self._check_operation(operation_id, self._CHECK_ITEMS.values()):
LOG.warn(_LW("Execute operation(%s), it can't be executed"),
operation_id)
return
end_time_for_run = expect_start_time + timedelta(seconds=window_time)
ret = self._update_operation_state(
operation_id,
{'state': constants.OPERATION_STATE_TRIGGERED,
'end_time_for_run': end_time_for_run})
if not ret:
return
param = {
'operation_id': operation_id,
'triggered_time': triggered_time,
'expect_start_time': expect_start_time,
'window_time': window_time,
'run_type': constants.OPERATION_RUN_TYPE_EXECUTE
}
self._execute_operation(operation_id, self._run_operation, param)
def resume_operation(self, operation_id, **kwargs):
end_time = kwargs.get('end_time_for_run')
now = datetime.utcnow()
if not isinstance(end_time, datetime) or now > end_time:
return
window = int(timeutils.delta_seconds(now, end_time))
param = {
'operation_id': operation_id,
'triggered_time': now,
'expect_start_time': now,
'window_time': window,
'run_type': constants.OPERATION_RUN_TYPE_RESUME
}
self._execute_operation(operation_id, self._run_operation, param)
def _run_operation(self, operation_id, param):
self._update_operation_state(
operation_id,
{'state': constants.OPERATION_STATE_RUNNING})
try:
check_item = [self._CHECK_ITEMS['is_canceled']]
if self._check_operation(operation_id, check_item):
return
try:
operation = objects.ScheduledOperation.get_by_id(
context.get_admin_context(), operation_id)
except Exception:
LOG.exception(_LE("Run operation(%s), get operation failed"),
operation_id)
return
try:
self._operation_manager.run_operation(
operation.operation_type, operation.project_id,
operation.operation_definition,
param=param)
except Exception:
LOG.exception(_LE("Run operation(%s) failed"), operation_id)
finally:
self._update_operation_state(
operation_id,
{'state': constants.OPERATION_STATE_REGISTERED})
def _update_operation_state(self, operation_id, updates):
ctxt = context.get_admin_context()
try:
state_ref = objects.ScheduledOperationState.get_by_operation_id(
ctxt, operation_id)
for item, value in six.iteritems(updates):
setattr(state_ref, item, value)
state_ref.save()
except Exception:
LOG.exception(_LE("Execute operation(%s), update state failed"),
operation_id)
return False
return True
@abstractmethod
def _execute_operation(self, operation_id, funtion, param):
pass
@abstractmethod
def _check_operation(self, operation_id, check_items):
"""Check whether the item in check_items happens"""
pass

View File

@ -0,0 +1,95 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from collections import defaultdict
from concurrent import futures
from oslo_config import cfg
from oslo_log import log as logging
from threading import RLock
from smaug.i18n import _LE
from smaug.services.operationengine.engine.executors import\
scheduled_operation_executor as base_executor
executor_opts = [
cfg.IntOpt('thread_count',
default=10,
help='The count of thread which executor will start')
]
CONF = cfg.CONF
CONF.register_opts(executor_opts)
LOG = logging.getLogger(__name__)
class ThreadPoolExecutor(base_executor.ScheduledOperationExecutor):
def __init__(self, thread_count=None):
super(ThreadPoolExecutor, self).__init__()
if thread_count is None:
thread_count = CONF.thread_count
self._pool = futures.ThreadPoolExecutor(thread_count)
self._operation_to_run = defaultdict(int)
self._operation_to_cancel = set()
self._lock = RLock()
self._check_functions = {
self._CHECK_ITEMS['is_waiting']: lambda op_id: (
op_id in self._operation_to_run),
self._CHECK_ITEMS['is_canceled']: lambda op_id: (
op_id in self._operation_to_cancel),
}
def shutdown(self, wait=True):
self._pool.shutdown(wait)
self._operation_to_run.clear()
self._operation_to_cancel.clear()
def cancel_operation(self, operation_id):
with self._lock:
if operation_id in self._operation_to_run:
self._operation_to_cancel.add(operation_id)
def _check_operation(self, operation_id, check_items):
with self._lock:
return any(self._check_functions[item](operation_id)
for item in check_items)
def _execute_operation(self, operation_id, function, param):
def callback(f):
self._finish_operation(operation_id)
with self._lock:
self._operation_to_run[operation_id] += 1
try:
f = self._pool.submit(function, operation_id, param)
f.add_done_callback(callback)
except Exception:
self._operation_to_run[operation_id] -= 1
LOG.exception(_LE("Submit operation(%(o_id)s) failed."),
operation_id)
def _finish_operation(self, operation_id):
with self._lock:
self._operation_to_run[operation_id] -= 1
if 0 == self._operation_to_run[operation_id]:
del self._operation_to_run[operation_id]
if operation_id in self._operation_to_cancel:
self._operation_to_cancel.remove(operation_id)

View File

@ -13,6 +13,7 @@
"""
Manage all triggers.
"""
from stevedore import driver as import_driver
from smaug import exception
from smaug.i18n import _
@ -34,8 +35,10 @@ class TriggerManager(object):
# }
self._trigger_obj_map = {}
# TODO(zengchen) create executor
self._executor = None
executor_cls = import_driver.DriverManager(
'smaug.operationengine.engine.executor',
'executor').driver
self._executor = executor_cls()
def shutdown(self):

View File

@ -56,9 +56,9 @@ class OperationManager(object):
cls = self._get_operation_cls(operation_type)
cls.check_operation_definition(operation_definition)
def execute_operation(self, operation_type, project_id,
operation_definition):
"""Execute operation.
def run_operation(self, operation_type, project_id,
operation_definition, **kwargs):
"""Run operation.
:param operation_type: the type of operation
:param project_id: the id of tenant
@ -66,4 +66,4 @@ class OperationManager(object):
:raise InvalidInput if the operation_type is invalid.
"""
cls = self._get_operation_cls(operation_type)
cls.execute(project_id, operation_definition)
cls.run_operation(project_id, operation_definition, **kwargs)

View File

@ -17,6 +17,7 @@ Operation classes
from abc import ABCMeta
import six
from smaug.common import constants
from smaug import loadables
@ -34,11 +35,30 @@ class Operation(object):
pass
@classmethod
def execute(self, project_id, operation_definition):
def run_operation(cls, project_id, operation_definition, **kwargs):
param = kwargs.get('param')
if constants.OPERATION_RUN_TYPE_EXECUTE == param['run_type']:
cls._execute(project_id, operation_definition, param)
else:
cls._resume(project_id, operation_definition, param)
@classmethod
def _execute(self, project_id, operation_definition, param):
"""Execute operation.
:param project_id: the id of tenant
:param operation_definition: the definition of operation
:param param: dict, other parameters
"""
pass
@classmethod
def _resume(self, project_id, operation_definition, param):
"""Resume operation.
:param project_id: the id of tenant
:param operation_definition: the definition of operation
:param param: dict, other parameters
"""
pass

View File

@ -27,7 +27,13 @@ class ProtectOperation(operations.Operation):
raise exception.InvalidOperationDefinition(reason=reason)
@classmethod
def execute(self, project_id, operation_definition):
def _execute(self, project_id, operation_definition, param):
# plan_id = operation_definition.get("plan_id")
# TODO(chenzeng): invoke create checkpoint interface
pass
@classmethod
def _resume(self, project_id, operation_definition, param):
# plan_id = operation_definition.get("plan_id")
# TODO(chenzeng): invoke create checkpoint interface
pass

View File

@ -1,25 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Scheduled operation state
"""
INIT = 'init'
REGISTERED = 'registered'
TRIGGERED = 'triggered'
RUNNING = 'running'
DELETED = 'deleted'

View File

@ -12,6 +12,8 @@
"""Tests for Models Database."""
from datetime import datetime
from datetime import timedelta
from oslo_config import cfg
from oslo_utils import uuidutils
import six
@ -292,11 +294,14 @@ class ScheduledOperationLogTestCase(base.TestCase):
def setUp(self):
super(ScheduledOperationLogTestCase, self).setUp()
self.ctxt = context.get_admin_context()
self.operation_id = '0354ca9ddcd046b693340d78759fd274'
def _create_scheduled_operation_log(self):
def _create_scheduled_operation_log(self, state='in_progress',
created_at=datetime.now()):
values = {
'operation_id': '0354ca9ddcd046b693340d78759fd274',
'state': 'in_progress',
'operation_id': self.operation_id,
'state': state,
'created_at': created_at
}
return db.scheduled_operation_log_create(self.ctxt, values)
@ -320,6 +325,29 @@ class ScheduledOperationLogTestCase(base.TestCase):
db.scheduled_operation_log_delete,
self.ctxt, 100)
def test_scheduled_operation_log_delete_oldest(self):
log_ids = []
states = ['success', 'in_progress', 'success', 'success']
for i in range(4):
t = datetime.now() + timedelta(hours=i)
log = self._create_scheduled_operation_log(
states[i], t)
log_ids.append(log['id'])
db.scheduled_operation_log_delete_oldest(
self.ctxt, self.operation_id, 3)
self.assertRaises(exception.ScheduledOperationLogNotFound,
db.scheduled_operation_log_get,
self.ctxt, log_ids[0])
db.scheduled_operation_log_delete_oldest(
self.ctxt, self.operation_id, 1, ['in_progress'])
log_ref = db.scheduled_operation_log_get(self.ctxt, log_ids[1])
self.assertEqual('in_progress', log_ref['state'])
self.assertRaises(exception.ScheduledOperationLogNotFound,
db.scheduled_operation_log_get,
self.ctxt, log_ids[2])
def test_scheduled_operation_log_update(self):
log_ref = self._create_scheduled_operation_log()
log_id = log_ref['id']

View File

@ -136,6 +136,6 @@ class TestScheduledOperationStateList(test_objects.BaseObjectsTestCase):
'service_id': service_id,
'state': 'triggered',
}
state = objects.ScheduledOperationState(self, **state_info)
state = objects.ScheduledOperationState(self.context, **state_info)
state.create()
return state

View File

@ -0,0 +1,152 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from datetime import timedelta
import mock
import time
from smaug.common import constants
from smaug import context
from smaug import objects
from smaug.services.operationengine.engine.executors import\
thread_pool_executor
from smaug.tests import base
NOW = datetime.utcnow()
Log_ID = 0
Operation_ID = "123"
Fake_Log = {
'created_at': NOW,
'deleted_at': None,
'updated_at': NOW,
'deleted': False,
'id': Log_ID,
'operation_id': Operation_ID,
'expect_start_time': NOW,
'triggered_time': NOW,
'actual_start_time': NOW,
'end_time': NOW,
'state': 'in_progress',
'extend_info': '',
}
Fake_Operation = {
'created_at': NOW,
'deleted_at': None,
'updated_at': NOW,
'deleted': False,
'id': Operation_ID,
'name': 'protect vm',
'operation_type': 'protect',
'project_id': '123',
'trigger_id': '0354ca9ddcd046b693340d78759fd275',
'operation_definition': '{}'
}
class ThreadPoolExecutorTestCase(base.TestCase):
def setUp(self):
super(ThreadPoolExecutorTestCase, self).setUp()
self._executor = thread_pool_executor.ThreadPoolExecutor()
self.context = context.get_admin_context()
def tearDown(self):
super(ThreadPoolExecutorTestCase, self).tearDown()
self._executor.shutdown()
@mock.patch('smaug.services.operationengine.operations.'
'protect_operation.ProtectOperation._execute')
def test_execute_operation(self, execute):
operation = self._create_operation()
self._create_operation_state(operation.id, 0)
now = datetime.utcnow()
window_time = 30
self._executor.execute_operation(operation.id, now, now, window_time)
time.sleep(1)
self.assertEqual(0, len(self._executor._operation_to_run))
param = {
'operation_id': operation.id,
'triggered_time': now,
'expect_start_time': now,
'window_time': window_time,
'run_type': constants.OPERATION_RUN_TYPE_EXECUTE
}
execute.assert_called_once_with(
operation.project_id, operation.operation_definition, param)
state = objects.ScheduledOperationState.get_by_operation_id(
self.context, operation.id)
self.assertTrue(state.end_time_for_run is not None)
self.assertTrue(constants.OPERATION_STATE_REGISTERED == state.state)
@mock.patch('smaug.services.operationengine.operations.'
'protect_operation.ProtectOperation._resume')
def test_resume_operation(self, resume):
operation = self._create_operation()
self._create_operation_state(operation.id, 0)
now = datetime.utcnow()
window_time = 30
self._executor.resume_operation(operation.id, end_time_for_run=(
now + timedelta(seconds=window_time)))
time.sleep(1)
self.assertEqual(0, len(self._executor._operation_to_run))
self.assertTrue(resume.called)
state = objects.ScheduledOperationState.get_by_operation_id(
self.context, operation.id)
self.assertTrue(constants.OPERATION_STATE_REGISTERED == state.state)
def test_cancel_operation(self):
operation_id = '123'
self._executor.cancel_operation(operation_id)
self.assertEqual(0, len(self._executor._operation_to_cancel))
self._executor._operation_to_run[operation_id] = 0
self._executor.cancel_operation(Operation_ID)
self.assertEqual(1, len(self._executor._operation_to_cancel))
def _create_operation(self, trigger_id='123'):
operation_info = {
'name': 'protect vm',
'operation_type': 'protect',
'project_id': '123',
'trigger_id': trigger_id,
'operation_definition': {}
}
operation = objects.ScheduledOperation(self.context, **operation_info)
operation.create()
return operation
def _create_operation_state(self, operation_id, service_id):
state_info = {
'operation_id': operation_id,
'service_id': service_id,
'state': constants.OPERATION_STATE_INIT,
}
state = objects.ScheduledOperationState(self.context, **state_info)
state.create()
return state