Merge "Adding unit test for taskflow service"

This commit is contained in:
Jenkins 2015-05-06 17:54:34 +00:00 committed by Gerrit Code Review
commit 619f474230
4 changed files with 295 additions and 40 deletions

View File

@ -12,8 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import logging as std_logging
import signal
import threading
import time
@ -22,7 +21,6 @@ from oslo.config import cfg
from oslo_log import log as logging
from taskflow.conductors import single_threaded
from cue.common import policy
import cue.taskflow.client as tf_client
import cue.version as version
@ -43,16 +41,19 @@ class ConductorService(object):
This wrapper is compatible with both single process and multi-process
launchers.
"""
def __init__(self, host=None, jobboard_name=None, jobboard_conf=None,
persistence_conf=None, engine_conf=None, wait_timeout=None,
*args, **kwargs):
def __init__(self, host=None, jobboard=None, jobboard_name=None,
jobboard_conf=None, persistence=None, persistence_conf=None,
engine_conf=None, wait_timeout=None, *args, **kwargs):
"""Constructor for ConductorService
:param host: Name to be used to identify the host running the conductor
:param jobboard: Jobboard instance to be used by conductor service
:param jobboard_name: Name of the jobboard
:param jobboard_conf: Configuration parameters for the jobboard
backend. This configuration is passed forward to
:meth:`cue.taskflow.client.Client.jobboard`.
:param persistence: Persistence instance to be used by conductor
service
:param persistence_conf: Configuration parameters for the persistence
backend. This configuration is passed forward
to
@ -75,8 +76,10 @@ class ConductorService(object):
self._host = host
self._jobboard = jobboard
self._jobboard_name = jobboard_name
self._jobboard_conf = jobboard_conf
self._persistence = persistence
self._persistence_conf = persistence_conf
self._engine_conf = engine_conf
self._wait_timeout = wait_timeout
@ -87,16 +90,19 @@ class ConductorService(object):
self._signal_list = None
@classmethod
def create(cls, host=None, jobboard_name=None, jobboard_conf=None,
persistence_conf=None, engine_conf=None, wait_timeout=1,
*args, **kwargs):
def create(cls, host=None, jobboard=None, jobboard_name=None,
jobboard_conf=None, persistence=None, persistence_conf=None,
engine_conf=None, wait_timeout=1, *args, **kwargs):
"""Factory method for creating a ConductorService instance
:param host: Name to be used to identify the host running the conductor
:param jobboard: Jobboard instance to be used by conductor service
:param jobboard_name: Name of the jobboard
:param jobboard_conf: Configuration parameters for the jobboard
backend. This configuration is passed forward to
:meth:`cue.taskflow.client.Client.jobboard`.
:param persistence: Persistence instance to be used by conductor
service
:param persistence_conf: Configuration parameters for the persistence
backend. This configuration is passed forward
to
@ -117,55 +123,63 @@ class ConductorService(object):
engine_conf = engine_conf or {}
engine_conf.setdefault('engine', CONF.taskflow.engine_type)
return cls(host, jobboard_name, jobboard_conf, persistence_conf,
engine_conf, wait_timeout, *args, **kwargs)
return cls(host, jobboard, jobboard_name, jobboard_conf, persistence,
persistence_conf, engine_conf, wait_timeout, *args,
**kwargs)
def start(self):
"""Interface to start the ConductorService."""
CONF.log_opt_values(LOG, std_logging.INFO)
policy.init()
version_string = version.version_info.version_string()
LOG.debug("Starting runner %s on board %s",
version_string, self._jobboard_name)
with contextlib.closing(
tf_client.create_persistence(conf=self._persistence_conf)
) as persistence:
with contextlib.closing(
tf_client.create_jobboard(
board_name=self._jobboard_name,
conf=self._jobboard_conf,
persistence=persistence,
)
) as jobboard:
self._conductor = single_threaded.SingleThreadedConductor(
name=self._host,
jobboard=jobboard,
persistence=persistence,
engine=self._engine_conf['engine'],
wait_timeout=self._wait_timeout)
persistence = self._persistence
jobboard = self._jobboard
try:
# Create persistence and/or jobboard if they weren't passed in
if persistence is None:
persistence = tf_client.create_persistence(
conf=self._persistence_conf)
time.sleep(0.5)
if threading.current_thread().name == 'MainThread':
t = threading.Thread(target=self._conductor.run)
t.start()
signal.pause()
else:
self._conductor.run()
if jobboard is None:
jobboard = tf_client.create_jobboard(
board_name=self._jobboard_name,
conf=self._jobboard_conf,
persistence=persistence,
)
self._conductor = single_threaded.SingleThreadedConductor(
name=self._host,
jobboard=jobboard,
persistence=persistence,
engine=self._engine_conf['engine'],
wait_timeout=self._wait_timeout)
time.sleep(0.5)
if threading.current_thread().name == 'MainThread':
t = threading.Thread(target=self._conductor.run)
t.start()
signal.pause()
else:
self._conductor.run()
finally:
# Close persistence and jobboard if they were created by us
if self._persistence is None:
persistence.close()
if self._jobboard is None:
jobboard.close()
self._shutdown_event.set()
def stop(self):
"""Interface to stop the ConductorService."""
self._shutdown = True
self._conductor.stop()
def wait(self):
"""Interface to wait for ConductorService to complete."""
self._shutdown_event.wait()
super(ConductorService, self).wait()
def handle_signals(self, signals=None, handler=None):
"""Set signal handlers

View File

@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import threading
import time
from oslo.utils import uuidutils
import taskflow.patterns.linear_flow as linear_flow
import taskflow.task
import zake.fake_client as zake_client
import cue.taskflow.client as tf_client
import cue.taskflow.service as tf_service
from cue.tests.functional import base
class TimesTwo(taskflow.task.Task):
def execute(self, test_arg):
return (test_arg * 2)
def create_flow():
return linear_flow.Flow('test flow').add(
TimesTwo(),
)
class TaskflowServiceTest(base.FunctionalTestCase):
def setUp(self):
super(TaskflowServiceTest, self).setUp()
_zk_client = zake_client.FakeClient()
self.persistence = tf_client.create_persistence(client=_zk_client)
self.jobboard = tf_client.create_jobboard("service_test",
persistence=self.persistence,
client=_zk_client)
self.tf_client = tf_client.Client("service_test",
persistence=self.persistence,
jobboard=self.jobboard)
self.tf_service = tf_service.ConductorService.create(
host="service_test",
jobboard=self.tf_client.jobboard,
persistence=self.tf_client.persistence,
)
def tearDown(self):
self.persistence.close()
self.jobboard.close()
super(TaskflowServiceTest, self).tearDown()
def test_consume_background(self):
job_args = {
'test_arg': 5
}
tx_uuid = uuidutils.generate_uuid()
pre_count = self.tf_client.jobboard.job_count
job = self.tf_client.post(create_flow, job_args, tx_uuid=tx_uuid)
post_count = self.tf_client.jobboard.job_count
expected = pre_count + 1
self.assertEqual(expected, post_count,
"expected %d jobs in the jobboard after a post, "
"got %d" % (expected, post_count))
job_list = self.tf_client.joblist()
self.assertEqual(expected, len(job_list),
"expected %d jobs in the joblist, "
"got %d" % (expected, post_count))
posted_job = {}
for j in job_list:
if j.uuid == job.uuid:
posted_job = j
self.assertDictEqual(posted_job.__dict__, job.__dict__,
"Job in jobboard differs from job returned by "
"Client.post method")
pre_count = self.tf_client.jobboard.job_count
self.assertGreater(pre_count, 0, "Job count is expected to be greater "
"than 0 !(%d > 0)" % pre_count)
t = threading.Thread(target=self.tf_service.start)
t.start()
time.sleep(1)
post_count = self.tf_client.jobboard.job_count
expected = 0
self.assertGreater(pre_count, post_count, "Job count before starting "
"the taskflow service is expected to be greater "
"than after starting the service !(%d > %d)"
% (pre_count, post_count))
self.assertEqual(expected, post_count,
"expected %d jobs in the jobboard after a claim, "
"got %d" % (expected, post_count))
self.tf_service.stop()
self.tf_service.wait()

View File

View File

@ -0,0 +1,128 @@
# -*- coding: utf-8 -*-
# Copyright 2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import signal
import threading
import mock
import cue.taskflow.service as tf_service
from cue.tests.unit import base
class TaskflowServiceTest(base.UnitTestCase):
def setUp(self):
super(TaskflowServiceTest, self).setUp()
def tearDown(self):
super(TaskflowServiceTest, self).tearDown()
@mock.patch('cue.taskflow.client.create_persistence')
@mock.patch('cue.taskflow.client.create_jobboard')
@mock.patch('signal.pause')
@mock.patch('threading.Thread.start')
def test_service_thread_launch(self, mock_thread_start, mock_signal_pause,
mock_create_jobboard,
mock_create_persistence):
assert threading.Thread.start is mock_thread_start
assert signal.pause is mock_signal_pause
service = tf_service.ConductorService.create("service_test")
service.start()
self.assertTrue(mock_thread_start.called,
"Service thread was not started")
self.assertTrue(mock_signal_pause.called,
"signal.pause was not called")
service.stop()
@mock.patch('cue.taskflow.client.create_persistence')
@mock.patch('cue.taskflow.client.create_jobboard')
@mock.patch('signal.pause')
@mock.patch('threading.Thread.start')
def test_create_jobboard_connection(self, mock_thread_start,
mock_signal_pause,
mock_create_jobboard,
mock_create_persistence):
assert threading.Thread.start is mock_thread_start
assert signal.pause is mock_signal_pause
class Closable(object):
def close(self):
pass
mock_create_jobboard.return_value = Closable()
mock_create_persistence.return_value = Closable()
service = tf_service.ConductorService.create("service_test")
service.start()
self.assertTrue(mock_create_persistence.called,
"Persistence was not created")
self.assertTrue(mock_create_jobboard.called,
"Jobboard was not created")
service.stop()
@mock.patch('cue.taskflow.client.create_persistence')
@mock.patch('cue.taskflow.client.create_jobboard')
@mock.patch('signal.signal')
def test_set_sighandler(self, mock_signal_signal, mock_create_jobboard,
mock_create_persistence):
assert signal.signal is mock_signal_signal
# Set signal handler
service = tf_service.ConductorService.create("service_test")
service.handle_signals()
self.assertTrue(mock_signal_signal.called,
"Signal handler not called")
@mock.patch('cue.taskflow.client.create_persistence')
@mock.patch('cue.taskflow.client.create_jobboard')
@mock.patch('signal.signal')
def test_change_sighandler(self, mock_signal_signal, mock_create_jobboard,
mock_create_persistence):
assert signal.signal is mock_signal_signal
# Set signal handler
service = tf_service.ConductorService.create("service_test")
service.handle_signals()
self.assertTrue(mock_signal_signal.called,
"Signal handler not called")
mock_signal_signal.called = False
# Change signal handler
service.handle_signals(signals=[signal.SIGQUIT])
self.assertTrue(mock_signal_signal.called,
"Signal handler not called")
@mock.patch('cue.taskflow.client.create_persistence')
@mock.patch('cue.taskflow.client.create_jobboard')
@mock.patch('signal.signal')
@mock.patch('signal.pause')
@mock.patch('threading.Thread.start')
def test_trigger_sighandler(self, mock_thread_start, mock_signal_pause,
mock_signal_signal, mock_create_jobboard,
mock_create_persistence):
assert signal.signal is mock_signal_signal
# Call sig handler
service = tf_service.ConductorService.create("service_test")
service.start()
service.sighandler(signal.SIGQUIT, None)
self.assertTrue(mock_signal_signal.called,
"Signal handler not called")
service.stop()
def test_unsupported_engine(self):
engine_conf = {'engine': 'worker'}
self.assertRaises(ValueError, tf_service.ConductorService,
engine_conf=engine_conf)