Adding unit test for taskflow service
- modified cue.taskflow.service to accept persistence and jobboard as an argument - added a functional test to post a job to jobboard, launch a cue.taskflow.service instance, and have the job be consumed - added unit tests to bring test coverage to 100% Change-Id: I44bf335bc7b7ef7756e90184db34817b288ec5b6
This commit is contained in:
parent
ab72de78ce
commit
aa30eba97f
@ -12,8 +12,7 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import contextlib
|
||||
import logging as std_logging
|
||||
|
||||
import signal
|
||||
import threading
|
||||
import time
|
||||
@ -22,7 +21,6 @@ from oslo.config import cfg
|
||||
from oslo_log import log as logging
|
||||
from taskflow.conductors import single_threaded
|
||||
|
||||
from cue.common import policy
|
||||
import cue.taskflow.client as tf_client
|
||||
import cue.version as version
|
||||
|
||||
@ -43,16 +41,19 @@ class ConductorService(object):
|
||||
This wrapper is compatible with both single process and multi-process
|
||||
launchers.
|
||||
"""
|
||||
def __init__(self, host=None, jobboard_name=None, jobboard_conf=None,
|
||||
persistence_conf=None, engine_conf=None, wait_timeout=None,
|
||||
*args, **kwargs):
|
||||
def __init__(self, host=None, jobboard=None, jobboard_name=None,
|
||||
jobboard_conf=None, persistence=None, persistence_conf=None,
|
||||
engine_conf=None, wait_timeout=None, *args, **kwargs):
|
||||
"""Constructor for ConductorService
|
||||
|
||||
:param host: Name to be used to identify the host running the conductor
|
||||
:param jobboard: Jobboard instance to be used by conductor service
|
||||
:param jobboard_name: Name of the jobboard
|
||||
:param jobboard_conf: Configuration parameters for the jobboard
|
||||
backend. This configuration is passed forward to
|
||||
:meth:`cue.taskflow.client.Client.jobboard`.
|
||||
:param persistence: Persistence instance to be used by conductor
|
||||
service
|
||||
:param persistence_conf: Configuration parameters for the persistence
|
||||
backend. This configuration is passed forward
|
||||
to
|
||||
@ -75,8 +76,10 @@ class ConductorService(object):
|
||||
|
||||
self._host = host
|
||||
|
||||
self._jobboard = jobboard
|
||||
self._jobboard_name = jobboard_name
|
||||
self._jobboard_conf = jobboard_conf
|
||||
self._persistence = persistence
|
||||
self._persistence_conf = persistence_conf
|
||||
self._engine_conf = engine_conf
|
||||
self._wait_timeout = wait_timeout
|
||||
@ -87,16 +90,19 @@ class ConductorService(object):
|
||||
self._signal_list = None
|
||||
|
||||
@classmethod
|
||||
def create(cls, host=None, jobboard_name=None, jobboard_conf=None,
|
||||
persistence_conf=None, engine_conf=None, wait_timeout=1,
|
||||
*args, **kwargs):
|
||||
def create(cls, host=None, jobboard=None, jobboard_name=None,
|
||||
jobboard_conf=None, persistence=None, persistence_conf=None,
|
||||
engine_conf=None, wait_timeout=1, *args, **kwargs):
|
||||
"""Factory method for creating a ConductorService instance
|
||||
|
||||
:param host: Name to be used to identify the host running the conductor
|
||||
:param jobboard: Jobboard instance to be used by conductor service
|
||||
:param jobboard_name: Name of the jobboard
|
||||
:param jobboard_conf: Configuration parameters for the jobboard
|
||||
backend. This configuration is passed forward to
|
||||
:meth:`cue.taskflow.client.Client.jobboard`.
|
||||
:param persistence: Persistence instance to be used by conductor
|
||||
service
|
||||
:param persistence_conf: Configuration parameters for the persistence
|
||||
backend. This configuration is passed forward
|
||||
to
|
||||
@ -117,29 +123,31 @@ class ConductorService(object):
|
||||
engine_conf = engine_conf or {}
|
||||
engine_conf.setdefault('engine', CONF.taskflow.engine_type)
|
||||
|
||||
return cls(host, jobboard_name, jobboard_conf, persistence_conf,
|
||||
engine_conf, wait_timeout, *args, **kwargs)
|
||||
return cls(host, jobboard, jobboard_name, jobboard_conf, persistence,
|
||||
persistence_conf, engine_conf, wait_timeout, *args,
|
||||
**kwargs)
|
||||
|
||||
def start(self):
|
||||
"""Interface to start the ConductorService."""
|
||||
CONF.log_opt_values(LOG, std_logging.INFO)
|
||||
|
||||
policy.init()
|
||||
|
||||
version_string = version.version_info.version_string()
|
||||
LOG.debug("Starting runner %s on board %s",
|
||||
version_string, self._jobboard_name)
|
||||
|
||||
with contextlib.closing(
|
||||
tf_client.create_persistence(conf=self._persistence_conf)
|
||||
) as persistence:
|
||||
with contextlib.closing(
|
||||
tf_client.create_jobboard(
|
||||
persistence = self._persistence
|
||||
jobboard = self._jobboard
|
||||
try:
|
||||
# Create persistence and/or jobboard if they weren't passed in
|
||||
if persistence is None:
|
||||
persistence = tf_client.create_persistence(
|
||||
conf=self._persistence_conf)
|
||||
|
||||
if jobboard is None:
|
||||
jobboard = tf_client.create_jobboard(
|
||||
board_name=self._jobboard_name,
|
||||
conf=self._jobboard_conf,
|
||||
persistence=persistence,
|
||||
)
|
||||
) as jobboard:
|
||||
|
||||
self._conductor = single_threaded.SingleThreadedConductor(
|
||||
name=self._host,
|
||||
jobboard=jobboard,
|
||||
@ -155,17 +163,23 @@ class ConductorService(object):
|
||||
else:
|
||||
self._conductor.run()
|
||||
|
||||
finally:
|
||||
# Close persistence and jobboard if they were created by us
|
||||
if self._persistence is None:
|
||||
persistence.close()
|
||||
|
||||
if self._jobboard is None:
|
||||
jobboard.close()
|
||||
|
||||
self._shutdown_event.set()
|
||||
|
||||
def stop(self):
|
||||
"""Interface to stop the ConductorService."""
|
||||
self._shutdown = True
|
||||
self._conductor.stop()
|
||||
|
||||
def wait(self):
|
||||
"""Interface to wait for ConductorService to complete."""
|
||||
self._shutdown_event.wait()
|
||||
super(ConductorService, self).wait()
|
||||
|
||||
def handle_signals(self, signals=None, handler=None):
|
||||
"""Set signal handlers
|
||||
|
113
cue/tests/functional/taskflow/test_service.py
Normal file
113
cue/tests/functional/taskflow/test_service.py
Normal file
@ -0,0 +1,113 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import threading
|
||||
import time
|
||||
|
||||
from oslo.utils import uuidutils
|
||||
import taskflow.patterns.linear_flow as linear_flow
|
||||
import taskflow.task
|
||||
import zake.fake_client as zake_client
|
||||
|
||||
import cue.taskflow.client as tf_client
|
||||
import cue.taskflow.service as tf_service
|
||||
from cue.tests.functional import base
|
||||
|
||||
|
||||
class TimesTwo(taskflow.task.Task):
|
||||
def execute(self, test_arg):
|
||||
return (test_arg * 2)
|
||||
|
||||
|
||||
def create_flow():
|
||||
return linear_flow.Flow('test flow').add(
|
||||
TimesTwo(),
|
||||
)
|
||||
|
||||
|
||||
class TaskflowServiceTest(base.FunctionalTestCase):
|
||||
def setUp(self):
|
||||
super(TaskflowServiceTest, self).setUp()
|
||||
_zk_client = zake_client.FakeClient()
|
||||
self.persistence = tf_client.create_persistence(client=_zk_client)
|
||||
self.jobboard = tf_client.create_jobboard("service_test",
|
||||
persistence=self.persistence,
|
||||
client=_zk_client)
|
||||
|
||||
self.tf_client = tf_client.Client("service_test",
|
||||
persistence=self.persistence,
|
||||
jobboard=self.jobboard)
|
||||
|
||||
self.tf_service = tf_service.ConductorService.create(
|
||||
host="service_test",
|
||||
jobboard=self.tf_client.jobboard,
|
||||
persistence=self.tf_client.persistence,
|
||||
)
|
||||
|
||||
def tearDown(self):
|
||||
self.persistence.close()
|
||||
self.jobboard.close()
|
||||
|
||||
super(TaskflowServiceTest, self).tearDown()
|
||||
|
||||
def test_consume_background(self):
|
||||
job_args = {
|
||||
'test_arg': 5
|
||||
}
|
||||
tx_uuid = uuidutils.generate_uuid()
|
||||
|
||||
pre_count = self.tf_client.jobboard.job_count
|
||||
job = self.tf_client.post(create_flow, job_args, tx_uuid=tx_uuid)
|
||||
post_count = self.tf_client.jobboard.job_count
|
||||
expected = pre_count + 1
|
||||
|
||||
self.assertEqual(expected, post_count,
|
||||
"expected %d jobs in the jobboard after a post, "
|
||||
"got %d" % (expected, post_count))
|
||||
|
||||
job_list = self.tf_client.joblist()
|
||||
self.assertEqual(expected, len(job_list),
|
||||
"expected %d jobs in the joblist, "
|
||||
"got %d" % (expected, post_count))
|
||||
posted_job = {}
|
||||
for j in job_list:
|
||||
if j.uuid == job.uuid:
|
||||
posted_job = j
|
||||
|
||||
self.assertDictEqual(posted_job.__dict__, job.__dict__,
|
||||
"Job in jobboard differs from job returned by "
|
||||
"Client.post method")
|
||||
|
||||
pre_count = self.tf_client.jobboard.job_count
|
||||
self.assertGreater(pre_count, 0, "Job count is expected to be greater "
|
||||
"than 0 !(%d > 0)" % pre_count)
|
||||
|
||||
t = threading.Thread(target=self.tf_service.start)
|
||||
t.start()
|
||||
time.sleep(1)
|
||||
post_count = self.tf_client.jobboard.job_count
|
||||
expected = 0
|
||||
|
||||
self.assertGreater(pre_count, post_count, "Job count before starting "
|
||||
"the taskflow service is expected to be greater "
|
||||
"than after starting the service !(%d > %d)"
|
||||
% (pre_count, post_count))
|
||||
|
||||
self.assertEqual(expected, post_count,
|
||||
"expected %d jobs in the jobboard after a claim, "
|
||||
"got %d" % (expected, post_count))
|
||||
|
||||
self.tf_service.stop()
|
||||
self.tf_service.wait()
|
0
cue/tests/unit/taskflow/__init__.py
Normal file
0
cue/tests/unit/taskflow/__init__.py
Normal file
128
cue/tests/unit/taskflow/test_service.py
Normal file
128
cue/tests/unit/taskflow/test_service.py
Normal file
@ -0,0 +1,128 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import signal
|
||||
import threading
|
||||
|
||||
import mock
|
||||
|
||||
import cue.taskflow.service as tf_service
|
||||
from cue.tests.unit import base
|
||||
|
||||
|
||||
class TaskflowServiceTest(base.UnitTestCase):
|
||||
def setUp(self):
|
||||
super(TaskflowServiceTest, self).setUp()
|
||||
|
||||
def tearDown(self):
|
||||
super(TaskflowServiceTest, self).tearDown()
|
||||
|
||||
@mock.patch('cue.taskflow.client.create_persistence')
|
||||
@mock.patch('cue.taskflow.client.create_jobboard')
|
||||
@mock.patch('signal.pause')
|
||||
@mock.patch('threading.Thread.start')
|
||||
def test_service_thread_launch(self, mock_thread_start, mock_signal_pause,
|
||||
mock_create_jobboard,
|
||||
mock_create_persistence):
|
||||
assert threading.Thread.start is mock_thread_start
|
||||
assert signal.pause is mock_signal_pause
|
||||
service = tf_service.ConductorService.create("service_test")
|
||||
service.start()
|
||||
self.assertTrue(mock_thread_start.called,
|
||||
"Service thread was not started")
|
||||
self.assertTrue(mock_signal_pause.called,
|
||||
"signal.pause was not called")
|
||||
service.stop()
|
||||
|
||||
@mock.patch('cue.taskflow.client.create_persistence')
|
||||
@mock.patch('cue.taskflow.client.create_jobboard')
|
||||
@mock.patch('signal.pause')
|
||||
@mock.patch('threading.Thread.start')
|
||||
def test_create_jobboard_connection(self, mock_thread_start,
|
||||
mock_signal_pause,
|
||||
mock_create_jobboard,
|
||||
mock_create_persistence):
|
||||
assert threading.Thread.start is mock_thread_start
|
||||
assert signal.pause is mock_signal_pause
|
||||
|
||||
class Closable(object):
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
mock_create_jobboard.return_value = Closable()
|
||||
mock_create_persistence.return_value = Closable()
|
||||
|
||||
service = tf_service.ConductorService.create("service_test")
|
||||
service.start()
|
||||
self.assertTrue(mock_create_persistence.called,
|
||||
"Persistence was not created")
|
||||
self.assertTrue(mock_create_jobboard.called,
|
||||
"Jobboard was not created")
|
||||
service.stop()
|
||||
|
||||
@mock.patch('cue.taskflow.client.create_persistence')
|
||||
@mock.patch('cue.taskflow.client.create_jobboard')
|
||||
@mock.patch('signal.signal')
|
||||
def test_set_sighandler(self, mock_signal_signal, mock_create_jobboard,
|
||||
mock_create_persistence):
|
||||
assert signal.signal is mock_signal_signal
|
||||
|
||||
# Set signal handler
|
||||
service = tf_service.ConductorService.create("service_test")
|
||||
service.handle_signals()
|
||||
self.assertTrue(mock_signal_signal.called,
|
||||
"Signal handler not called")
|
||||
|
||||
@mock.patch('cue.taskflow.client.create_persistence')
|
||||
@mock.patch('cue.taskflow.client.create_jobboard')
|
||||
@mock.patch('signal.signal')
|
||||
def test_change_sighandler(self, mock_signal_signal, mock_create_jobboard,
|
||||
mock_create_persistence):
|
||||
assert signal.signal is mock_signal_signal
|
||||
|
||||
# Set signal handler
|
||||
service = tf_service.ConductorService.create("service_test")
|
||||
service.handle_signals()
|
||||
self.assertTrue(mock_signal_signal.called,
|
||||
"Signal handler not called")
|
||||
mock_signal_signal.called = False
|
||||
|
||||
# Change signal handler
|
||||
service.handle_signals(signals=[signal.SIGQUIT])
|
||||
self.assertTrue(mock_signal_signal.called,
|
||||
"Signal handler not called")
|
||||
|
||||
@mock.patch('cue.taskflow.client.create_persistence')
|
||||
@mock.patch('cue.taskflow.client.create_jobboard')
|
||||
@mock.patch('signal.signal')
|
||||
@mock.patch('signal.pause')
|
||||
@mock.patch('threading.Thread.start')
|
||||
def test_trigger_sighandler(self, mock_thread_start, mock_signal_pause,
|
||||
mock_signal_signal, mock_create_jobboard,
|
||||
mock_create_persistence):
|
||||
assert signal.signal is mock_signal_signal
|
||||
|
||||
# Call sig handler
|
||||
service = tf_service.ConductorService.create("service_test")
|
||||
service.start()
|
||||
service.sighandler(signal.SIGQUIT, None)
|
||||
self.assertTrue(mock_signal_signal.called,
|
||||
"Signal handler not called")
|
||||
service.stop()
|
||||
|
||||
def test_unsupported_engine(self):
|
||||
engine_conf = {'engine': 'worker'}
|
||||
self.assertRaises(ValueError, tf_service.ConductorService,
|
||||
engine_conf=engine_conf)
|
Loading…
Reference in New Issue
Block a user