# Copyright 2016 Canonical Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import mock import pcmk import unittest from distutils.version import StrictVersion CRM_CONFIGURE_SHOW_XML = ''' ''' # noqa CRM_CONFIGURE_SHOW_XML_MAINT_MODE_TRUE = ''' ''' # noqa class TestPcmk(unittest.TestCase): @mock.patch('commands.getstatusoutput') def test_crm_res_running_true(self, getstatusoutput): getstatusoutput.return_value = (0, ("resource res_nova_consoleauth is " "running on: juju-xxx-machine-6")) self.assertTrue(pcmk.crm_res_running('res_nova_consoleauth')) @mock.patch('commands.getstatusoutput') def test_crm_res_running_stopped(self, getstatusoutput): getstatusoutput.return_value = (0, ("resource res_nova_consoleauth is " "NOT running")) self.assertFalse(pcmk.crm_res_running('res_nova_consoleauth')) @mock.patch('commands.getstatusoutput') def test_crm_res_running_undefined(self, getstatusoutput): getstatusoutput.return_value = (1, "foobar") self.assertFalse(pcmk.crm_res_running('res_nova_consoleauth')) @mock.patch('socket.gethostname') @mock.patch('commands.getstatusoutput') def test_wait_for_pcmk(self, getstatusoutput, gethostname): # Pacemaker is down gethostname.return_value = 'hanode-1' getstatusoutput.return_value = (1, 'Not the hostname') with self.assertRaises(pcmk.ServicesNotUp): pcmk.wait_for_pcmk(retries=2, sleep=0) # Pacemaker is up gethostname.return_value = 'hanode-1' getstatusoutput.return_value = (0, 'Hosname: hanode-1') self.assertTrue(pcmk.wait_for_pcmk(retries=2, sleep=0)) @mock.patch('subprocess.check_output') def test_crm_version(self, mock_check_output): # xenial mock_check_output.return_value = "crm 2.2.0\n" ret = pcmk.crm_version() self.assertEqual(StrictVersion('2.2.0'), ret) mock_check_output.assert_called_with(['crm', '--version'], universal_newlines=True) # trusty mock_check_output.mock_reset() mock_check_output.return_value = ("1.2.5 (Build f2f315daf6a5fd7ddea8e5" "64cd289aa04218427d)\n") ret = pcmk.crm_version() self.assertEqual(StrictVersion('1.2.5'), ret) mock_check_output.assert_called_with(['crm', '--version'], universal_newlines=True) @mock.patch('subprocess.check_output') @mock.patch.object(pcmk, 'crm_version') def test_get_property(self, mock_crm_version, mock_check_output): mock_crm_version.return_value = StrictVersion('2.2.0') # xenial mock_check_output.return_value = 'false\n' self.assertEqual('false\n', pcmk.get_property('maintenance-mode')) mock_check_output.assert_called_with(['crm', 'configure', 'show-property', 'maintenance-mode'], universal_newlines=True) mock_crm_version.return_value = StrictVersion('2.4.0') mock_check_output.reset_mock() self.assertEqual('false\n', pcmk.get_property('maintenance-mode')) mock_check_output.assert_called_with(['crm', 'configure', 'get-property', 'maintenance-mode'], universal_newlines=True) @mock.patch('subprocess.check_output') @mock.patch.object(pcmk, 'crm_version') def test_get_property_from_xml(self, mock_crm_version, mock_check_output): mock_crm_version.return_value = StrictVersion('1.2.5') # trusty mock_check_output.return_value = CRM_CONFIGURE_SHOW_XML self.assertRaises(pcmk.PropertyNotFound, pcmk.get_property, 'maintenance-mode') mock_check_output.assert_called_with(['crm', 'configure', 'show', 'xml'], universal_newlines=True) mock_check_output.reset_mock() mock_check_output.return_value = CRM_CONFIGURE_SHOW_XML_MAINT_MODE_TRUE self.assertEqual('true', pcmk.get_property('maintenance-mode')) mock_check_output.assert_called_with(['crm', 'configure', 'show', 'xml'], universal_newlines=True) @mock.patch('subprocess.check_output') def test_set_property(self, mock_check_output): pcmk.set_property('maintenance-mode', 'false') mock_check_output.assert_called_with(['crm', 'configure', 'property', 'maintenance-mode=false'], universal_newlines=True)