add test_utils.py

Change-Id: Ic101a648627c69aab04177474b690b966cb4aae2
This commit is contained in:
lu-meihong 2019-06-17 20:44:03 -07:00
parent 232ac73a79
commit 98ae755d78

View File

@ -0,0 +1,93 @@
# (c) Copyright 2019 ZTE Corporation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import shutil
import sys
import tempfile
import unittest
import mock
from mock import patch
from freezer.scheduler import utils
job_list = [{"job_id": "test"}]
class TestUtils(unittest.TestCase):
def setUp(self):
self.client = mock.Mock()
self.client.clients.create = mock.Mock(return_value='test')
self.client.jobs.list = mock.Mock(return_value=job_list)
self.client.client_id = "test"
def test_do_register(self):
ret = utils.do_register(self.client, args=None)
self.assertEqual(0, ret)
def test_find_config_files(self):
temp = tempfile.NamedTemporaryFile('wb', delete=True,
suffix='.conf')
ret = utils.find_config_files(temp.name)
self.assertEqual([temp.name], ret)
temp.close()
self.assertFalse(os.path.exists(temp.name))
@unittest.skipIf(sys.version_info.major == 3,
'Not supported on python v 3.x')
def test_find_config_files_path(self):
temp = tempfile.NamedTemporaryFile('wb', delete=True,
suffix='.conf')
temp_path = os.path.dirname(temp.name)
ret = utils.find_config_files(temp_path)
self.assertEqual([temp.name], ret)
temp.close()
self.assertFalse(os.path.exists(temp.name))
@unittest.skipIf(sys.version_info.major == 3,
'Not supported on python v 3.x')
def test_get_jobs_from_disk(self):
temp = tempfile.mkdtemp()
file = '/'.join([temp, "test.conf"])
data = '{"job_id": "test"}'
with open(file, 'wb') as f:
f.write(data)
ret = utils.get_jobs_from_disk(temp)
self.assertEqual(job_list, ret)
shutil.rmtree(temp)
self.assertFalse(os.path.exists(file))
def test_save_jobs_to_disk(self):
job_doc_list = job_list
tmpdir = tempfile.mkdtemp()
utils.save_jobs_to_disk(job_doc_list, tmpdir)
file = '/'.join([tmpdir, "job_test.conf"])
self.assertTrue(os.path.exists(file))
shutil.rmtree(tmpdir)
def test_get_active_jobs_from_api(self):
ret = utils.get_active_jobs_from_api(self.client)
self.assertEqual(job_list, ret)
@patch('psutil.Process')
def test_terminate_subprocess(self, mock_process_constructor):
mock_pro = mock_process_constructor.return_value
seffect = mock.Mock(
side_effect=Exception('Process 35 does not exists anymore'))
mock_pro.raiseError.side_effect = seffect
with self.assertRaises(Exception) as cm: # noqa
utils.terminate_subprocess(35, "test")
the_exception = cm.exception
self.assertIn('does not exists anymore',
str(the_exception))