deb-mistral/mistral/services/scheduler.py
Winson Chan 55540e116e Replace copy.copy with copy.deepcopy
The function copy.copy only copies reference and so if the object has dict
or nested dicts, these instances can be modified by code that has reference
to the original dict.

Change-Id: Ib2680f09b5a5d45fd7180e95ab5fedef05664c5b
Closes-Bug: #1503851
2015-10-07 19:55:31 +00:00

205 lines
6.9 KiB
Python

# Copyright 2014 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import datetime
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import periodic_task
from oslo_service import threadgroup
from oslo_utils import importutils
from mistral import context
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
# {scheduler_instance: thread_group}
_schedulers = {}
def schedule_call(factory_method_path, target_method_name,
run_after, serializers=None, **method_args):
"""Add this call specification to DB, and then after run_after
seconds service CallScheduler invokes the target_method.
:param factory_method_path: Full python-specific path to
factory method for target object construction.
:param target_method_name: Name of target object method which
will be invoked.
:param run_after: Value in seconds.
param serializers: map of argument names and their serializer class paths.
Use when an argument is an object of specific type, and needs to be
serialized. Example:
{ "result": "mistral.utils.serializer.ResultSerializer"}
Serializer for the object type must implement serializer interface
in mistral/utils/serializer.py
:param method_args: Target method keyword arguments.
"""
ctx = context.ctx().to_dict() if context.has_ctx() else {}
execution_time = (datetime.datetime.now() +
datetime.timedelta(seconds=run_after))
if serializers:
for arg_name, serializer_path in serializers.items():
if arg_name not in method_args:
raise exc.MistralException(
"Serializable method argument %s"
" not found in method_args=%s"
% (arg_name, method_args))
try:
serializer = importutils.import_class(serializer_path)()
except ImportError as e:
raise ImportError("Cannot import class %s: %s"
% (serializer_path, e))
method_args[arg_name] = serializer.serialize(
method_args[arg_name]
)
values = {
'factory_method_path': factory_method_path,
'target_method_name': target_method_name,
'execution_time': execution_time,
'auth_context': ctx,
'serializers': serializers,
'method_arguments': method_args,
'processing': False
}
db_api.create_delayed_call(values)
class CallScheduler(periodic_task.PeriodicTasks):
# TODO(rakhmerov): Think how to make 'spacing' configurable.
@periodic_task.periodic_task(spacing=1, run_immediately=True)
def run_delayed_calls(self, ctx=None):
time_filter = datetime.datetime.now() + datetime.timedelta(
seconds=1)
# Wrap delayed calls processing in transaction to
# guarantee that calls will be processed just once.
# Do delete query to DB first to force hanging up all
# parallel transactions.
# It should work on isolation level 'READ-COMMITTED',
# 'REPEATABLE-READ' and above.
#
# 'REPEATABLE-READ' is by default in MySQL and
# 'READ-COMMITTED is by default in PostgreSQL.
delayed_calls = []
with db_api.transaction():
candidate_calls = db_api.get_delayed_calls_to_start(
time_filter
)
calls_to_make = []
for call in candidate_calls:
# Mark this delayed call has been processed in order to
# prevent calling from parallel transaction.
result, number_of_updated = db_api.update_delayed_call(
id=call.id,
values={'processing': True},
query_filter={"processing": False}
)
# If number_of_updated != 1 other scheduler already
# updated.
if number_of_updated == 1:
calls_to_make.append(result)
for call in calls_to_make:
LOG.debug('Processing next delayed call: %s', call)
context.set_ctx(context.MistralContext(call.auth_context))
if call.factory_method_path:
factory = importutils.import_class(
call.factory_method_path
)
target_method = getattr(factory(), call.target_method_name)
else:
target_method = importutils.import_class(
call.target_method_name
)
method_args = copy.deepcopy(call.method_arguments)
if call.serializers:
# Deserialize arguments.
for arg_name, ser_path in call.serializers.items():
serializer = importutils.import_class(ser_path)()
deserialized = serializer.deserialize(
method_args[arg_name]
)
method_args[arg_name] = deserialized
delayed_calls.append((target_method, method_args))
for (target_method, method_args) in delayed_calls:
# Transaction is needed here because some of the
# target_method can use the DB
with db_api.transaction():
try:
# Call the method.
target_method(**method_args)
except Exception as e:
LOG.error(
"Delayed call failed [exception=%s]", e
)
with db_api.transaction():
for call in calls_to_make:
try:
# Delete calls that were processed.
db_api.delete_delayed_call(call.id)
except Exception as e:
LOG.error(
"failed to delete call [call=%s, "
"exception=%s]", call, e
)
def setup():
tg = threadgroup.ThreadGroup()
scheduler = CallScheduler(CONF)
tg.add_dynamic_timer(
scheduler.run_periodic_tasks,
initial_delay=None,
periodic_interval_max=1,
context=None
)
_schedulers[scheduler] = tg
return tg
def stop_all_schedulers():
for scheduler, tg in _schedulers.items():
tg.stop()
del _schedulers[scheduler]