diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_policy.py b/openstackclient/tests/functional/network/v2/test_network_qos_policy.py index 53c15ecf69..27c1b6aa8c 100644 --- a/openstackclient/tests/functional/network/v2/test_network_qos_policy.py +++ b/openstackclient/tests/functional/network/v2/test_network_qos_policy.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import json import uuid from openstackclient.tests.functional.network.v2 import common @@ -20,34 +21,6 @@ from openstackclient.tests.functional.network.v2 import common class NetworkQosPolicyTests(common.NetworkTests): """Functional tests for QoS policy""" - HEADERS = ['Name'] - FIELDS = ['name'] - - @classmethod - def setUpClass(cls): - common.NetworkTests.setUpClass() - if cls.haz_network: - cls.NAME = uuid.uuid4().hex - - opts = cls.get_opts(cls.FIELDS) - raw_output = cls.openstack( - 'network qos policy create ' + - cls.NAME + - opts - ) - cls.assertOutput(cls.NAME + "\n", raw_output) - - @classmethod - def tearDownClass(cls): - try: - if cls.haz_network: - raw_output = cls.openstack( - 'network qos policy delete ' + - cls.NAME - ) - cls.assertOutput('', raw_output) - finally: - super(NetworkQosPolicyTests, cls).tearDownClass() def setUp(self): super(NetworkQosPolicyTests, self).setUp() @@ -55,33 +28,46 @@ class NetworkQosPolicyTests(common.NetworkTests): if not self.haz_network: self.skipTest("No Network service present") + self.NAME = uuid.uuid4().hex + cmd_output = json.loads(self.openstack( + 'network qos policy create -f json ' + + self.NAME + )) + self.addCleanup(self.openstack, + 'network qos policy delete ' + self.NAME) + self.assertEqual(self.NAME, cmd_output['name']) + + def test_qos_rule_create_delete(self): + # This is to check the output of qos policy delete + policy_name = uuid.uuid4().hex + self.openstack('network qos policy create -f json ' + policy_name) + raw_output = self.openstack( + 'network qos policy delete ' + policy_name) + self.assertEqual('', raw_output) + def test_qos_policy_list(self): - opts = self.get_opts(self.HEADERS) - raw_output = self.openstack('network qos policy list' + opts) - self.assertIn(self.NAME, raw_output) + cmd_output = json.loads(self.openstack( + 'network qos policy list -f json')) + self.assertIn(self.NAME, [p['Name'] for p in cmd_output]) def test_qos_policy_show(self): - opts = self.get_opts(self.FIELDS) - raw_output = self.openstack('network qos policy show ' + self.NAME + - opts) - self.assertEqual(self.NAME + "\n", raw_output) + cmd_output = json.loads(self.openstack( + 'network qos policy show -f json ' + self.NAME)) + self.assertEqual(self.NAME, cmd_output['name']) def test_qos_policy_set(self): self.openstack('network qos policy set --share ' + self.NAME) - opts = self.get_opts(['shared']) - raw_output = self.openstack('network qos policy show ' + self.NAME + - opts) - self.assertEqual("True\n", raw_output) + cmd_output = json.loads(self.openstack( + 'network qos policy show -f json ' + self.NAME)) + self.assertTrue(cmd_output['shared']) def test_qos_policy_default(self): self.openstack('network qos policy set --default ' + self.NAME) - opts = self.get_opts(['is_default']) - raw_output = self.openstack('network qos policy show ' + self.NAME + - opts) - self.assertEqual("True\n", raw_output) + cmd_output = json.loads(self.openstack( + 'network qos policy show -f json ' + self.NAME)) + self.assertTrue(cmd_output['is_default']) self.openstack('network qos policy set --no-default ' + self.NAME) - opts = self.get_opts(['is_default']) - raw_output = self.openstack('network qos policy show ' + self.NAME + - opts) - self.assertEqual("False\n", raw_output) + cmd_output = json.loads(self.openstack( + 'network qos policy show -f json ' + self.NAME)) + self.assertFalse(cmd_output['is_default']) diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_rule.py b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py index 8b34422fb1..770abe94fc 100644 --- a/openstackclient/tests/functional/network/v2/test_network_qos_rule.py +++ b/openstackclient/tests/functional/network/v2/test_network_qos_rule.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import json import uuid from openstackclient.tests.functional.network.v2 import common @@ -20,51 +21,6 @@ from openstackclient.tests.functional.network.v2 import common class NetworkQosRuleTestsMinimumBandwidth(common.NetworkTests): """Functional tests for QoS minimum bandwidth rule""" - RULE_ID = None - MIN_KBPS = 2800 - MIN_KBPS_MODIFIED = 7500 - DIRECTION = '--egress' - HEADERS = ['ID'] - FIELDS = ['id'] - TYPE = 'minimum-bandwidth' - - @classmethod - def setUpClass(cls): - common.NetworkTests.setUpClass() - if cls.haz_network: - cls.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex - - opts = cls.get_opts(cls.FIELDS) - cls.openstack( - 'network qos policy create ' + - cls.QOS_POLICY_NAME - ) - cls.RULE_ID = cls.openstack( - 'network qos rule create ' + - '--type ' + cls.TYPE + ' ' + - '--min-kbps ' + str(cls.MIN_KBPS) + ' ' + - cls.DIRECTION + ' ' + - cls.QOS_POLICY_NAME + - opts - ) - cls.assertsOutputNotNone(cls.RULE_ID) - - @classmethod - def tearDownClass(cls): - try: - if cls.haz_network: - raw_output = cls.openstack( - 'network qos rule delete ' + - cls.QOS_POLICY_NAME + ' ' + - cls.RULE_ID - ) - cls.openstack( - 'network qos policy delete ' + - cls.QOS_POLICY_NAME - ) - cls.assertOutput('', raw_output) - finally: - super(NetworkQosRuleTestsMinimumBandwidth, cls).tearDownClass() def setUp(self): super(NetworkQosRuleTestsMinimumBandwidth, self).setUp() @@ -72,72 +28,67 @@ class NetworkQosRuleTestsMinimumBandwidth(common.NetworkTests): if not self.haz_network: self.skipTest("No Network service present") + self.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex + + self.openstack( + 'network qos policy create ' + + self.QOS_POLICY_NAME + ) + self.addCleanup(self.openstack, + 'network qos policy delete ' + self.QOS_POLICY_NAME) + cmd_output = json.loads(self.openstack( + 'network qos rule create -f json ' + + '--type minimum-bandwidth ' + + '--min-kbps 2800 ' + + '--egress ' + + self.QOS_POLICY_NAME + )) + self.RULE_ID = cmd_output['id'] + self.addCleanup(self.openstack, + 'network qos rule delete ' + + self.QOS_POLICY_NAME + ' ' + + self.RULE_ID) + self.assertTrue(self.RULE_ID) + + def test_qos_rule_create_delete(self): + # This is to check the output of qos rule delete + policy_name = uuid.uuid4().hex + self.openstack('network qos policy create -f json ' + policy_name) + self.addCleanup(self.openstack, + 'network qos policy delete ' + policy_name) + rule = json.loads(self.openstack( + 'network qos rule create -f json ' + + '--type minimum-bandwidth ' + + '--min-kbps 2800 ' + + '--egress ' + policy_name + )) + raw_output = self.openstack( + 'network qos rule delete ' + + policy_name + ' ' + rule['id']) + self.assertEqual('', raw_output) + def test_qos_rule_list(self): - opts = self.get_opts(self.HEADERS) - raw_output = self.openstack('network qos rule list ' - + self.QOS_POLICY_NAME + opts) - self.assertIn(self.RULE_ID, raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule list -f json ' + self.QOS_POLICY_NAME)) + self.assertIn(self.RULE_ID, [rule['ID'] for rule in cmd_output]) def test_qos_rule_show(self): - opts = self.get_opts(self.FIELDS) - raw_output = self.openstack('network qos rule show ' + - self.QOS_POLICY_NAME + ' ' + self.RULE_ID + - opts) - self.assertEqual(self.RULE_ID, raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule show -f json ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID)) + self.assertEqual(self.RULE_ID, cmd_output['id']) def test_qos_rule_set(self): - self.openstack('network qos rule set --min-kbps ' + - str(self.MIN_KBPS_MODIFIED) + ' ' + + self.openstack('network qos rule set --min-kbps 7500 ' + self.QOS_POLICY_NAME + ' ' + self.RULE_ID) - opts = self.get_opts(['min_kbps']) - raw_output = self.openstack('network qos rule show ' + - self.QOS_POLICY_NAME + ' ' + self.RULE_ID + - opts) - self.assertEqual(str(self.MIN_KBPS_MODIFIED) + "\n", raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule show -f json ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID)) + self.assertEqual(7500, cmd_output['min_kbps']) class NetworkQosRuleTestsDSCPMarking(common.NetworkTests): """Functional tests for QoS DSCP marking rule""" - RULE_ID = None - QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex - DSCP_MARK = 8 - DSCP_MARK_MODIFIED = 32 - HEADERS = ['ID'] - FIELDS = ['id'] - TYPE = 'dscp-marking' - - @classmethod - def setUpClass(cls): - common.NetworkTests.setUpClass() - if cls.haz_network: - opts = cls.get_opts(cls.FIELDS) - cls.openstack( - 'network qos policy create ' + - cls.QOS_POLICY_NAME - ) - cls.RULE_ID = cls.openstack( - 'network qos rule create ' + - '--type ' + cls.TYPE + ' ' + - '--dscp-mark ' + str(cls.DSCP_MARK) + ' ' + - cls.QOS_POLICY_NAME + - opts - ) - cls.assertsOutputNotNone(cls.RULE_ID) - - @classmethod - def tearDownClass(cls): - try: - if cls.haz_network: - raw_output = cls.openstack( - 'network qos rule delete ' + - cls.QOS_POLICY_NAME + ' ' + - cls.RULE_ID - ) - cls.openstack( - 'network qos policy delete ' + cls.QOS_POLICY_NAME) - cls.assertOutput('', raw_output) - finally: - super(NetworkQosRuleTestsDSCPMarking, cls).tearDownClass() def setUp(self): super(NetworkQosRuleTestsDSCPMarking, self).setUp() @@ -145,78 +96,63 @@ class NetworkQosRuleTestsDSCPMarking(common.NetworkTests): if not self.haz_network: self.skipTest("No Network service present") + self.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex + self.openstack( + 'network qos policy create ' + + self.QOS_POLICY_NAME + ) + self.addCleanup(self.openstack, + 'network qos policy delete ' + self.QOS_POLICY_NAME) + cmd_output = json.loads(self.openstack( + 'network qos rule create -f json ' + + '--type dscp-marking ' + + '--dscp-mark 8 ' + + self.QOS_POLICY_NAME + )) + self.RULE_ID = cmd_output['id'] + self.addCleanup(self.openstack, + 'network qos rule delete ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID) + self.assertTrue(self.RULE_ID) + + def test_qos_rule_create_delete(self): + # This is to check the output of qos rule delete + policy_name = uuid.uuid4().hex + self.openstack('network qos policy create -f json ' + policy_name) + self.addCleanup(self.openstack, + 'network qos policy delete ' + policy_name) + rule = json.loads(self.openstack( + 'network qos rule create -f json ' + + '--type dscp-marking ' + + '--dscp-mark 8 ' + policy_name + )) + raw_output = self.openstack( + 'network qos rule delete ' + + policy_name + ' ' + rule['id']) + self.assertEqual('', raw_output) + def test_qos_rule_list(self): - opts = self.get_opts(self.HEADERS) - raw_output = self.openstack('network qos rule list ' - + self.QOS_POLICY_NAME + opts) - self.assertIn(self.RULE_ID, raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule list -f json ' + self.QOS_POLICY_NAME)) + self.assertIn(self.RULE_ID, [rule['ID'] for rule in cmd_output]) def test_qos_rule_show(self): - opts = self.get_opts(self.FIELDS) - raw_output = self.openstack('network qos rule show ' + - self.QOS_POLICY_NAME + ' ' + self.RULE_ID + - opts) - self.assertEqual(self.RULE_ID, raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule show -f json ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID)) + self.assertEqual(self.RULE_ID, cmd_output['id']) def test_qos_rule_set(self): - self.openstack('network qos rule set --dscp-mark ' + - str(self.DSCP_MARK_MODIFIED) + ' ' + + self.openstack('network qos rule set --dscp-mark 32 ' + self.QOS_POLICY_NAME + ' ' + self.RULE_ID) - opts = self.get_opts(['dscp_mark']) - raw_output = self.openstack('network qos rule show ' + - self.QOS_POLICY_NAME + ' ' + self.RULE_ID + - opts) - self.assertEqual(str(self.DSCP_MARK_MODIFIED) + "\n", raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule show -f json ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID)) + self.assertEqual(32, cmd_output['dscp_mark']) class NetworkQosRuleTestsBandwidthLimit(common.NetworkTests): """Functional tests for QoS bandwidth limit rule""" - RULE_ID = None - QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex - MAX_KBPS = 10000 - MAX_KBPS_MODIFIED = 15000 - MAX_BURST_KBITS = 1400 - MAX_BURST_KBITS_MODIFIED = 1800 - RULE_DIRECTION = 'egress' - RULE_DIRECTION_MODIFIED = 'ingress' - HEADERS = ['ID'] - FIELDS = ['id'] - TYPE = 'bandwidth-limit' - - @classmethod - def setUpClass(cls): - common.NetworkTests.setUpClass() - if cls.haz_network: - opts = cls.get_opts(cls.FIELDS) - cls.openstack( - 'network qos policy create ' + - cls.QOS_POLICY_NAME - ) - cls.RULE_ID = cls.openstack( - 'network qos rule create ' + - '--type ' + cls.TYPE + ' ' + - '--max-kbps ' + str(cls.MAX_KBPS) + ' ' + - '--max-burst-kbits ' + str(cls.MAX_BURST_KBITS) + ' ' + - '--' + cls.RULE_DIRECTION + ' ' + - cls.QOS_POLICY_NAME + - opts - ) - cls.assertsOutputNotNone(cls.RULE_ID) - - @classmethod - def tearDownClass(cls): - try: - if cls.haz_network: - raw_output = cls.openstack( - 'network qos rule delete ' + - cls.QOS_POLICY_NAME + ' ' + - cls.RULE_ID - ) - cls.openstack( - 'network qos policy delete ' + cls.QOS_POLICY_NAME) - cls.assertOutput('', raw_output) - finally: - super(NetworkQosRuleTestsBandwidthLimit, cls).tearDownClass() def setUp(self): super(NetworkQosRuleTestsBandwidthLimit, self).setUp() @@ -224,30 +160,65 @@ class NetworkQosRuleTestsBandwidthLimit(common.NetworkTests): if not self.haz_network: self.skipTest("No Network service present") + self.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex + self.openstack( + 'network qos policy create ' + + self.QOS_POLICY_NAME + ) + self.addCleanup(self.openstack, + 'network qos policy delete ' + self.QOS_POLICY_NAME) + cmd_output = json.loads(self.openstack( + 'network qos rule create -f json ' + + '--type bandwidth-limit ' + + '--max-kbps 10000 ' + + '--max-burst-kbits 1400 ' + + '--egress ' + + self.QOS_POLICY_NAME + )) + self.RULE_ID = cmd_output['id'] + self.addCleanup(self.openstack, + 'network qos rule delete ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID) + self.assertTrue(self.RULE_ID) + + def test_qos_rule_create_delete(self): + # This is to check the output of qos rule delete + policy_name = uuid.uuid4().hex + self.openstack('network qos policy create -f json ' + policy_name) + self.addCleanup(self.openstack, + 'network qos policy delete ' + policy_name) + rule = json.loads(self.openstack( + 'network qos rule create -f json ' + + '--type bandwidth-limit ' + + '--max-kbps 10000 ' + + '--max-burst-kbits 1400 ' + + '--egress ' + policy_name + )) + raw_output = self.openstack( + 'network qos rule delete ' + + policy_name + ' ' + rule['id']) + self.assertEqual('', raw_output) + def test_qos_rule_list(self): - opts = self.get_opts(self.HEADERS) - raw_output = self.openstack('network qos rule list ' - + self.QOS_POLICY_NAME + opts) - self.assertIn(self.RULE_ID, raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule list -f json ' + + self.QOS_POLICY_NAME)) + self.assertIn(self.RULE_ID, [rule['ID'] for rule in cmd_output]) def test_qos_rule_show(self): - opts = self.get_opts(self.FIELDS) - raw_output = self.openstack('network qos rule show ' + - self.QOS_POLICY_NAME + ' ' + self.RULE_ID + - opts) - self.assertEqual(self.RULE_ID, raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule show -f json ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID)) + self.assertEqual(self.RULE_ID, cmd_output['id']) def test_qos_rule_set(self): - self.openstack('network qos rule set --max-kbps ' + - str(self.MAX_KBPS_MODIFIED) + ' --max-burst-kbits ' + - str(self.MAX_BURST_KBITS_MODIFIED) + ' ' + - '--' + self.RULE_DIRECTION_MODIFIED + ' ' + + self.openstack('network qos rule set --max-kbps 15000 ' + + '--max-burst-kbits 1800 ' + + '--ingress ' + self.QOS_POLICY_NAME + ' ' + self.RULE_ID) - opts = self.get_opts(['direction', 'max_burst_kbps', 'max_kbps']) - raw_output = self.openstack('network qos rule show ' + - self.QOS_POLICY_NAME + ' ' + self.RULE_ID + - opts) - expected = (str(self.RULE_DIRECTION_MODIFIED) + "\n" + - str(self.MAX_BURST_KBITS_MODIFIED) + "\n" + - str(self.MAX_KBPS_MODIFIED) + "\n") - self.assertEqual(expected, raw_output) + cmd_output = json.loads(self.openstack( + 'network qos rule show -f json ' + + self.QOS_POLICY_NAME + ' ' + self.RULE_ID)) + self.assertEqual(15000, cmd_output['max_kbps']) + self.assertEqual(1800, cmd_output['max_burst_kbps']) + self.assertEqual('ingress', cmd_output['direction']) diff --git a/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py b/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py index d76129367c..a6ee3e1006 100644 --- a/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py +++ b/openstackclient/tests/functional/network/v2/test_network_qos_rule_type.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +import json + from openstackclient.tests.functional.network.v2 import common @@ -29,6 +31,7 @@ class NetworkQosRuleTypeTests(common.NetworkTests): self.skipTest("No Network service present") def test_qos_rule_type_list(self): - raw_output = self.openstack('network qos rule type list') + cmd_output = json.loads(self.openstack( + 'network qos rule type list -f json')) for rule_type in self.AVAILABLE_RULE_TYPES: - self.assertIn(rule_type, raw_output) + self.assertIn(rule_type, [x['Type'] for x in cmd_output])