Scheduler in HA

1) Create new column "processing" in the delayed_calls_v2 table
2) Scheduler takes only objects that marked as processing=false
3) After processing a call scheduler deletes it.

Implements: blueprint scheduler-ha
Change-Id: I465a9c2d7443c352a1cad35f48f5228b64844ded
This commit is contained in:
Limor Stotland 2015-08-11 07:07:12 +00:00
parent a5fba7ca52
commit 3326affb89
6 changed files with 246 additions and 43 deletions

View File

@ -0,0 +1,36 @@
# Copyright 2015 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""add a Boolean column 'processed' to the table delayed_calls_v2
Revision ID: 006
Revises: 004
Create Date: 2015-08-09 09:44:38.289271
"""
# revision identifiers, used by Alembic.
revision = '006'
down_revision = '005'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.add_column(
'delayed_calls_v2',
sa.Column('processing', sa.Boolean, default=False, nullable=False)
)

View File

@ -325,6 +325,9 @@ def delete_task_executions(**kwargs):
# Delayed calls.
def get_delayed_calls_to_start(time):
return IMPL.get_delayed_calls_to_start(time)
def create_delayed_call(values):
return IMPL.create_delayed_call(values)
@ -334,8 +337,12 @@ def delete_delayed_call(id):
return IMPL.delete_delayed_call(id)
def get_delayed_calls_to_start(time):
return IMPL.get_delayed_calls_to_start(time)
def update_delayed_call(id, values, query_filter=None):
return IMPL.update_delayed_call(id, values, query_filter)
def get_delayed_call(id):
return IMPL.get_delayed_call(id)
# Cron triggers.

View File

@ -18,6 +18,7 @@ import sys
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_db import sqlalchemy as oslo_sqlalchemy
from oslo_log import log as logging
from oslo_utils import timeutils
import sqlalchemy as sa
@ -738,11 +739,49 @@ def get_delayed_calls_to_start(time, session=None):
query = b.model_query(models.DelayedCall)
query = query.filter(models.DelayedCall.execution_time < time)
query = query.filter_by(processing=False)
query = query.order_by(models.DelayedCall.execution_time)
return query.all()
@b.session_aware()
def update_delayed_call(id, values, query_filter=None, session=None):
if query_filter:
try:
specimen = models.DelayedCall(id=id, **query_filter)
delayed_call = b.model_query(
models.DelayedCall).update_on_match(specimen=specimen,
surrogate_key='id',
values=values)
return delayed_call, 1
except oslo_sqlalchemy.update_match.NoRowsMatched as e:
LOG.debug(
"No rows matched for update call [id=%s, values=%s, "
"query_filter=%s,"
"exception=%s]", id, values, query_filter, e
)
return None, 0
else:
delayed_call = get_delayed_call(id=id, session=session)
delayed_call.update(values)
return delayed_call, len(session.dirty)
@b.session_aware()
def get_delayed_call(id, session=None):
delayed_call = _get_delayed_call(id=id, session=session)
if not delayed_call:
raise exc.NotFoundException("Delayed Call not found [id=%s]" % id)
return delayed_call
@b.session_aware()
def get_expired_executions(time, session=None):
query = b.model_query(models.WorkflowExecution)

View File

@ -254,6 +254,7 @@ class DelayedCall(mb.MistralModelBase):
serializers = sa.Column(st.JsonDictType())
auth_context = sa.Column(st.JsonDictType())
execution_time = sa.Column(sa.DateTime, nullable=False)
processing = sa.Column(sa.Boolean, default=False, nullable=False)
class Environment(mb.MistralSecureModelBase):

View File

@ -37,7 +37,6 @@ _schedulers = {}
def schedule_call(factory_method_path, target_method_name,
run_after, serializers=None, **method_args):
"""Add this call specification to DB, and then after run_after
seconds service CallScheduler invokes the target_method.
@ -62,9 +61,10 @@ def schedule_call(factory_method_path, target_method_name,
if serializers:
for arg_name, serializer_path in serializers.items():
if arg_name not in method_args:
raise exc.MistralException("Serializable method argument %s"
" not found in method_args=%s"
% (arg_name, method_args))
raise exc.MistralException(
"Serializable method argument %s"
" not found in method_args=%s"
% (arg_name, method_args))
try:
serializer = importutils.import_class(serializer_path)()
except ImportError as e:
@ -81,7 +81,8 @@ def schedule_call(factory_method_path, target_method_name,
'execution_time': execution_time,
'auth_context': ctx,
'serializers': serializers,
'method_arguments': method_args
'method_arguments': method_args,
'processing': False
}
db_api.create_delayed_call(values)
@ -91,7 +92,8 @@ class CallScheduler(periodic_task.PeriodicTasks):
# TODO(rakhmerov): Think how to make 'spacing' configurable.
@periodic_task.periodic_task(spacing=1, run_immediately=True)
def run_delayed_calls(self, ctx=None):
time_filter = datetime.datetime.now() + datetime.timedelta(seconds=1)
time_filter = datetime.datetime.now() + datetime.timedelta(
seconds=1)
# Wrap delayed calls processing in transaction to
# guarantee that calls will be processed just once.
@ -105,54 +107,77 @@ class CallScheduler(periodic_task.PeriodicTasks):
delayed_calls = []
with db_api.transaction():
for call in db_api.get_delayed_calls_to_start(time_filter):
# Delete this delayed call from DB before the making call in
# order to prevent calling from parallel transaction.
db_api.delete_delayed_call(call.id)
candidate_calls = db_api.get_delayed_calls_to_start(
time_filter
)
calls_to_make = []
LOG.debug('Processing next delayed call: %s', call)
for call in candidate_calls:
# Mark this delayed call has been processed in order to
# prevent calling from parallel transaction.
result, number_of_updated = db_api.update_delayed_call(
id=call.id,
values={'processing': True},
query_filter={"processing": False}
)
context.set_ctx(context.MistralContext(call.auth_context))
# If number_of_updated != 1 other scheduler already
# updated.
if number_of_updated == 1:
calls_to_make.append(result)
if call.factory_method_path:
factory = importutils.import_class(
call.factory_method_path
for call in calls_to_make:
LOG.debug('Processing next delayed call: %s', call)
context.set_ctx(context.MistralContext(call.auth_context))
if call.factory_method_path:
factory = importutils.import_class(
call.factory_method_path
)
target_method = getattr(factory(), call.target_method_name)
else:
target_method = importutils.import_class(
call.target_method_name
)
method_args = copy.copy(call.method_arguments)
if call.serializers:
# Deserialize arguments.
for arg_name, ser_path in call.serializers.items():
serializer = importutils.import_class(ser_path)()
deserialized = serializer.deserialize(
method_args[arg_name]
)
target_method = getattr(factory(), call.target_method_name)
else:
target_method = importutils.import_class(
call.target_method_name
)
method_args[arg_name] = deserialized
method_args = copy.copy(call.method_arguments)
delayed_calls.append((target_method, method_args))
if call.serializers:
# Deserialize arguments.
for arg_name, ser_path in call.serializers.items():
serializer = importutils.import_class(ser_path)()
deserialized = serializer.deserialize(
method_args[arg_name]
)
method_args[arg_name] = deserialized
delayed_calls.append((target_method, method_args))
# TODO(m4dcoder): Troubleshoot deadlocks with PostgreSQL and MySQL.
# The queries in the target method such as
# mistral.engine.task_handler.run_action can deadlock
# with delete_delayed_call. Please keep the scope of the
# transaction short.
for (target_method, method_args) in delayed_calls:
# Transaction is needed here because some of the
# target_method can use the DB
with db_api.transaction():
try:
# Call the method.
target_method(**method_args)
except Exception as e:
LOG.debug(
"Delayed call failed [call=%s, exception=%s]", call, e
LOG.error(
"Delayed call failed [exception=%s]", e
)
with db_api.transaction():
for call in calls_to_make:
try:
# Delete calls that were processed.
db_api.delete_delayed_call(call.id)
except Exception as e:
LOG.error(
"failed to delete call [call=%s, "
"exception=%s]", call, e
)

View File

@ -18,6 +18,7 @@ import eventlet
import mock
from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.services import scheduler
from mistral.tests import base
from mistral.workflow import utils as wf_utils
@ -26,6 +27,8 @@ from mistral.workflow import utils as wf_utils
FACTORY_METHOD_NAME = ('mistral.tests.unit.services.test_scheduler.'
'factory_method')
TARGET_METHOD_NAME = FACTORY_METHOD_NAME
FAILED_TO_SEND_TARGET_NAME = ('mistral.tests.unit.services.test_scheduler.'
'failed_to_send')
DELAY = 1.5
WAIT = DELAY * 3
@ -39,6 +42,16 @@ def factory_method():
)
def failed_to_send():
raise exc.EngineException("Test")
def update_call_failed(id, values):
return None, 0
MOCK_UPDATE_CALL_FAILED = mock.MagicMock(side_effect=update_call_failed)
class SchedulerServiceTest(base.DbTestCase):
def setUp(self):
super(SchedulerServiceTest, self).setUp()
@ -188,3 +201,85 @@ class SchedulerServiceTest(base.DbTestCase):
calls = db_api.get_delayed_calls_to_start(time_filter)
self.assertEqual(0, len(calls))
@mock.patch(TARGET_METHOD_NAME)
def test_scheduler_delete_calls(self, method):
def stop_thread_groups():
[tg.stop() for tg in self.tgs]
self.tgs = [scheduler.setup(), scheduler.setup()]
self.addCleanup(stop_thread_groups)
method_args = {'name': 'task', 'id': '321'}
scheduler.schedule_call(
None,
TARGET_METHOD_NAME,
DELAY,
**method_args
)
time_filter = datetime.datetime.now() + datetime.timedelta(seconds=2)
calls = db_api.get_delayed_calls_to_start(time_filter)
self._assert_single_item(calls, target_method_name=TARGET_METHOD_NAME)
eventlet.sleep(WAIT)
self.assertRaises(exc.NotFoundException,
db_api.get_delayed_call,
calls[0].id
)
@mock.patch(TARGET_METHOD_NAME)
def test_processing_true_does_not_return_in_get_delayed_calls_to_start(
self,
method):
execution_time = (datetime.datetime.now() +
datetime.timedelta(seconds=DELAY))
values = {
'factory_method_path': None,
'target_method_name': TARGET_METHOD_NAME,
'execution_time': execution_time,
'auth_context': None,
'serializers': None,
'method_arguments': None,
'processing': True
}
call = db_api.create_delayed_call(values)
time_filter = datetime.datetime.now() + datetime.timedelta(seconds=10)
calls = db_api.get_delayed_calls_to_start(time_filter)
self.assertEqual(0, len(calls))
db_api.delete_delayed_call(call.id)
@mock.patch.object(db_api, 'update_delayed_call', MOCK_UPDATE_CALL_FAILED)
def test_scheduler_doesnt_handel_calls_the_failed_on_update(self):
def stop_thread_groups():
[tg.stop() for tg in self.tgs]
self.tgs = [scheduler.setup(), scheduler.setup()]
self.addCleanup(stop_thread_groups)
method_args = {'name': 'task', 'id': '321'}
scheduler.schedule_call(
None,
TARGET_METHOD_NAME,
DELAY,
**method_args
)
time_filter = datetime.datetime.now() + datetime.timedelta(seconds=2)
calls = db_api.get_delayed_calls_to_start(time_filter)
eventlet.sleep(WAIT)
# If the scheduler does handel calls that failed on update
# NotFoundException will raise.
db_api.get_delayed_call(calls[0].id)
db_api.delete_delayed_call(calls[0].id)