From aa30eba97f4a72d904b50087b3a8e2690d9a1b76 Mon Sep 17 00:00:00 2001 From: Min Pae Date: Tue, 14 Apr 2015 15:47:58 -0700 Subject: [PATCH] Adding unit test for taskflow service - modified cue.taskflow.service to accept persistence and jobboard as an argument - added a functional test to post a job to jobboard, launch a cue.taskflow.service instance, and have the job be consumed - added unit tests to bring test coverage to 100% Change-Id: I44bf335bc7b7ef7756e90184db34817b288ec5b6 --- cue/taskflow/service.py | 94 +++++++------ cue/tests/functional/taskflow/test_service.py | 113 ++++++++++++++++ cue/tests/unit/taskflow/__init__.py | 0 cue/tests/unit/taskflow/test_service.py | 128 ++++++++++++++++++ 4 files changed, 295 insertions(+), 40 deletions(-) create mode 100644 cue/tests/functional/taskflow/test_service.py create mode 100644 cue/tests/unit/taskflow/__init__.py create mode 100644 cue/tests/unit/taskflow/test_service.py diff --git a/cue/taskflow/service.py b/cue/taskflow/service.py index 1c99a9b8..73ffe599 100644 --- a/cue/taskflow/service.py +++ b/cue/taskflow/service.py @@ -12,8 +12,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -import contextlib -import logging as std_logging + import signal import threading import time @@ -22,7 +21,6 @@ from oslo.config import cfg from oslo_log import log as logging from taskflow.conductors import single_threaded -from cue.common import policy import cue.taskflow.client as tf_client import cue.version as version @@ -43,16 +41,19 @@ class ConductorService(object): This wrapper is compatible with both single process and multi-process launchers. """ - def __init__(self, host=None, jobboard_name=None, jobboard_conf=None, - persistence_conf=None, engine_conf=None, wait_timeout=None, - *args, **kwargs): + def __init__(self, host=None, jobboard=None, jobboard_name=None, + jobboard_conf=None, persistence=None, persistence_conf=None, + engine_conf=None, wait_timeout=None, *args, **kwargs): """Constructor for ConductorService :param host: Name to be used to identify the host running the conductor + :param jobboard: Jobboard instance to be used by conductor service :param jobboard_name: Name of the jobboard :param jobboard_conf: Configuration parameters for the jobboard backend. This configuration is passed forward to :meth:`cue.taskflow.client.Client.jobboard`. + :param persistence: Persistence instance to be used by conductor + service :param persistence_conf: Configuration parameters for the persistence backend. This configuration is passed forward to @@ -75,8 +76,10 @@ class ConductorService(object): self._host = host + self._jobboard = jobboard self._jobboard_name = jobboard_name self._jobboard_conf = jobboard_conf + self._persistence = persistence self._persistence_conf = persistence_conf self._engine_conf = engine_conf self._wait_timeout = wait_timeout @@ -87,16 +90,19 @@ class ConductorService(object): self._signal_list = None @classmethod - def create(cls, host=None, jobboard_name=None, jobboard_conf=None, - persistence_conf=None, engine_conf=None, wait_timeout=1, - *args, **kwargs): + def create(cls, host=None, jobboard=None, jobboard_name=None, + jobboard_conf=None, persistence=None, persistence_conf=None, + engine_conf=None, wait_timeout=1, *args, **kwargs): """Factory method for creating a ConductorService instance :param host: Name to be used to identify the host running the conductor + :param jobboard: Jobboard instance to be used by conductor service :param jobboard_name: Name of the jobboard :param jobboard_conf: Configuration parameters for the jobboard backend. This configuration is passed forward to :meth:`cue.taskflow.client.Client.jobboard`. + :param persistence: Persistence instance to be used by conductor + service :param persistence_conf: Configuration parameters for the persistence backend. This configuration is passed forward to @@ -117,55 +123,63 @@ class ConductorService(object): engine_conf = engine_conf or {} engine_conf.setdefault('engine', CONF.taskflow.engine_type) - return cls(host, jobboard_name, jobboard_conf, persistence_conf, - engine_conf, wait_timeout, *args, **kwargs) + return cls(host, jobboard, jobboard_name, jobboard_conf, persistence, + persistence_conf, engine_conf, wait_timeout, *args, + **kwargs) def start(self): """Interface to start the ConductorService.""" - CONF.log_opt_values(LOG, std_logging.INFO) - - policy.init() - version_string = version.version_info.version_string() LOG.debug("Starting runner %s on board %s", version_string, self._jobboard_name) - with contextlib.closing( - tf_client.create_persistence(conf=self._persistence_conf) - ) as persistence: - with contextlib.closing( - tf_client.create_jobboard( - board_name=self._jobboard_name, - conf=self._jobboard_conf, - persistence=persistence, - ) - ) as jobboard: - self._conductor = single_threaded.SingleThreadedConductor( - name=self._host, - jobboard=jobboard, - persistence=persistence, - engine=self._engine_conf['engine'], - wait_timeout=self._wait_timeout) + persistence = self._persistence + jobboard = self._jobboard + try: + # Create persistence and/or jobboard if they weren't passed in + if persistence is None: + persistence = tf_client.create_persistence( + conf=self._persistence_conf) - time.sleep(0.5) - if threading.current_thread().name == 'MainThread': - t = threading.Thread(target=self._conductor.run) - t.start() - signal.pause() - else: - self._conductor.run() + if jobboard is None: + jobboard = tf_client.create_jobboard( + board_name=self._jobboard_name, + conf=self._jobboard_conf, + persistence=persistence, + ) + + self._conductor = single_threaded.SingleThreadedConductor( + name=self._host, + jobboard=jobboard, + persistence=persistence, + engine=self._engine_conf['engine'], + wait_timeout=self._wait_timeout) + + time.sleep(0.5) + if threading.current_thread().name == 'MainThread': + t = threading.Thread(target=self._conductor.run) + t.start() + signal.pause() + else: + self._conductor.run() + + finally: + # Close persistence and jobboard if they were created by us + if self._persistence is None: + persistence.close() + + if self._jobboard is None: + jobboard.close() self._shutdown_event.set() def stop(self): """Interface to stop the ConductorService.""" - self._shutdown = True self._conductor.stop() def wait(self): """Interface to wait for ConductorService to complete.""" self._shutdown_event.wait() - super(ConductorService, self).wait() def handle_signals(self, signals=None, handler=None): """Set signal handlers diff --git a/cue/tests/functional/taskflow/test_service.py b/cue/tests/functional/taskflow/test_service.py new file mode 100644 index 00000000..0e8dc470 --- /dev/null +++ b/cue/tests/functional/taskflow/test_service.py @@ -0,0 +1,113 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import threading +import time + +from oslo.utils import uuidutils +import taskflow.patterns.linear_flow as linear_flow +import taskflow.task +import zake.fake_client as zake_client + +import cue.taskflow.client as tf_client +import cue.taskflow.service as tf_service +from cue.tests.functional import base + + +class TimesTwo(taskflow.task.Task): + def execute(self, test_arg): + return (test_arg * 2) + + +def create_flow(): + return linear_flow.Flow('test flow').add( + TimesTwo(), + ) + + +class TaskflowServiceTest(base.FunctionalTestCase): + def setUp(self): + super(TaskflowServiceTest, self).setUp() + _zk_client = zake_client.FakeClient() + self.persistence = tf_client.create_persistence(client=_zk_client) + self.jobboard = tf_client.create_jobboard("service_test", + persistence=self.persistence, + client=_zk_client) + + self.tf_client = tf_client.Client("service_test", + persistence=self.persistence, + jobboard=self.jobboard) + + self.tf_service = tf_service.ConductorService.create( + host="service_test", + jobboard=self.tf_client.jobboard, + persistence=self.tf_client.persistence, + ) + + def tearDown(self): + self.persistence.close() + self.jobboard.close() + + super(TaskflowServiceTest, self).tearDown() + + def test_consume_background(self): + job_args = { + 'test_arg': 5 + } + tx_uuid = uuidutils.generate_uuid() + + pre_count = self.tf_client.jobboard.job_count + job = self.tf_client.post(create_flow, job_args, tx_uuid=tx_uuid) + post_count = self.tf_client.jobboard.job_count + expected = pre_count + 1 + + self.assertEqual(expected, post_count, + "expected %d jobs in the jobboard after a post, " + "got %d" % (expected, post_count)) + + job_list = self.tf_client.joblist() + self.assertEqual(expected, len(job_list), + "expected %d jobs in the joblist, " + "got %d" % (expected, post_count)) + posted_job = {} + for j in job_list: + if j.uuid == job.uuid: + posted_job = j + + self.assertDictEqual(posted_job.__dict__, job.__dict__, + "Job in jobboard differs from job returned by " + "Client.post method") + + pre_count = self.tf_client.jobboard.job_count + self.assertGreater(pre_count, 0, "Job count is expected to be greater " + "than 0 !(%d > 0)" % pre_count) + + t = threading.Thread(target=self.tf_service.start) + t.start() + time.sleep(1) + post_count = self.tf_client.jobboard.job_count + expected = 0 + + self.assertGreater(pre_count, post_count, "Job count before starting " + "the taskflow service is expected to be greater " + "than after starting the service !(%d > %d)" + % (pre_count, post_count)) + + self.assertEqual(expected, post_count, + "expected %d jobs in the jobboard after a claim, " + "got %d" % (expected, post_count)) + + self.tf_service.stop() + self.tf_service.wait() diff --git a/cue/tests/unit/taskflow/__init__.py b/cue/tests/unit/taskflow/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cue/tests/unit/taskflow/test_service.py b/cue/tests/unit/taskflow/test_service.py new file mode 100644 index 00000000..44d0f3f7 --- /dev/null +++ b/cue/tests/unit/taskflow/test_service.py @@ -0,0 +1,128 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import signal +import threading + +import mock + +import cue.taskflow.service as tf_service +from cue.tests.unit import base + + +class TaskflowServiceTest(base.UnitTestCase): + def setUp(self): + super(TaskflowServiceTest, self).setUp() + + def tearDown(self): + super(TaskflowServiceTest, self).tearDown() + + @mock.patch('cue.taskflow.client.create_persistence') + @mock.patch('cue.taskflow.client.create_jobboard') + @mock.patch('signal.pause') + @mock.patch('threading.Thread.start') + def test_service_thread_launch(self, mock_thread_start, mock_signal_pause, + mock_create_jobboard, + mock_create_persistence): + assert threading.Thread.start is mock_thread_start + assert signal.pause is mock_signal_pause + service = tf_service.ConductorService.create("service_test") + service.start() + self.assertTrue(mock_thread_start.called, + "Service thread was not started") + self.assertTrue(mock_signal_pause.called, + "signal.pause was not called") + service.stop() + + @mock.patch('cue.taskflow.client.create_persistence') + @mock.patch('cue.taskflow.client.create_jobboard') + @mock.patch('signal.pause') + @mock.patch('threading.Thread.start') + def test_create_jobboard_connection(self, mock_thread_start, + mock_signal_pause, + mock_create_jobboard, + mock_create_persistence): + assert threading.Thread.start is mock_thread_start + assert signal.pause is mock_signal_pause + + class Closable(object): + def close(self): + pass + + mock_create_jobboard.return_value = Closable() + mock_create_persistence.return_value = Closable() + + service = tf_service.ConductorService.create("service_test") + service.start() + self.assertTrue(mock_create_persistence.called, + "Persistence was not created") + self.assertTrue(mock_create_jobboard.called, + "Jobboard was not created") + service.stop() + + @mock.patch('cue.taskflow.client.create_persistence') + @mock.patch('cue.taskflow.client.create_jobboard') + @mock.patch('signal.signal') + def test_set_sighandler(self, mock_signal_signal, mock_create_jobboard, + mock_create_persistence): + assert signal.signal is mock_signal_signal + + # Set signal handler + service = tf_service.ConductorService.create("service_test") + service.handle_signals() + self.assertTrue(mock_signal_signal.called, + "Signal handler not called") + + @mock.patch('cue.taskflow.client.create_persistence') + @mock.patch('cue.taskflow.client.create_jobboard') + @mock.patch('signal.signal') + def test_change_sighandler(self, mock_signal_signal, mock_create_jobboard, + mock_create_persistence): + assert signal.signal is mock_signal_signal + + # Set signal handler + service = tf_service.ConductorService.create("service_test") + service.handle_signals() + self.assertTrue(mock_signal_signal.called, + "Signal handler not called") + mock_signal_signal.called = False + + # Change signal handler + service.handle_signals(signals=[signal.SIGQUIT]) + self.assertTrue(mock_signal_signal.called, + "Signal handler not called") + + @mock.patch('cue.taskflow.client.create_persistence') + @mock.patch('cue.taskflow.client.create_jobboard') + @mock.patch('signal.signal') + @mock.patch('signal.pause') + @mock.patch('threading.Thread.start') + def test_trigger_sighandler(self, mock_thread_start, mock_signal_pause, + mock_signal_signal, mock_create_jobboard, + mock_create_persistence): + assert signal.signal is mock_signal_signal + + # Call sig handler + service = tf_service.ConductorService.create("service_test") + service.start() + service.sighandler(signal.SIGQUIT, None) + self.assertTrue(mock_signal_signal.called, + "Signal handler not called") + service.stop() + + def test_unsupported_engine(self): + engine_conf = {'engine': 'worker'} + self.assertRaises(ValueError, tf_service.ConductorService, + engine_conf=engine_conf) \ No newline at end of file