Merge "Convert network qos functional tests to JSON"
This commit is contained in:
commit
50099d3c72
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from openstackclient.tests.functional.network.v2 import common
|
||||
@ -20,34 +21,6 @@ from openstackclient.tests.functional.network.v2 import common
|
||||
|
||||
class NetworkQosPolicyTests(common.NetworkTests):
|
||||
"""Functional tests for QoS policy"""
|
||||
HEADERS = ['Name']
|
||||
FIELDS = ['name']
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
common.NetworkTests.setUpClass()
|
||||
if cls.haz_network:
|
||||
cls.NAME = uuid.uuid4().hex
|
||||
|
||||
opts = cls.get_opts(cls.FIELDS)
|
||||
raw_output = cls.openstack(
|
||||
'network qos policy create ' +
|
||||
cls.NAME +
|
||||
opts
|
||||
)
|
||||
cls.assertOutput(cls.NAME + "\n", raw_output)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
try:
|
||||
if cls.haz_network:
|
||||
raw_output = cls.openstack(
|
||||
'network qos policy delete ' +
|
||||
cls.NAME
|
||||
)
|
||||
cls.assertOutput('', raw_output)
|
||||
finally:
|
||||
super(NetworkQosPolicyTests, cls).tearDownClass()
|
||||
|
||||
def setUp(self):
|
||||
super(NetworkQosPolicyTests, self).setUp()
|
||||
@ -55,33 +28,46 @@ class NetworkQosPolicyTests(common.NetworkTests):
|
||||
if not self.haz_network:
|
||||
self.skipTest("No Network service present")
|
||||
|
||||
self.NAME = uuid.uuid4().hex
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos policy create -f json ' +
|
||||
self.NAME
|
||||
))
|
||||
self.addCleanup(self.openstack,
|
||||
'network qos policy delete ' + self.NAME)
|
||||
self.assertEqual(self.NAME, cmd_output['name'])
|
||||
|
||||
def test_qos_rule_create_delete(self):
|
||||
# This is to check the output of qos policy delete
|
||||
policy_name = uuid.uuid4().hex
|
||||
self.openstack('network qos policy create -f json ' + policy_name)
|
||||
raw_output = self.openstack(
|
||||
'network qos policy delete ' + policy_name)
|
||||
self.assertEqual('', raw_output)
|
||||
|
||||
def test_qos_policy_list(self):
|
||||
opts = self.get_opts(self.HEADERS)
|
||||
raw_output = self.openstack('network qos policy list' + opts)
|
||||
self.assertIn(self.NAME, raw_output)
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos policy list -f json'))
|
||||
self.assertIn(self.NAME, [p['Name'] for p in cmd_output])
|
||||
|
||||
def test_qos_policy_show(self):
|
||||
opts = self.get_opts(self.FIELDS)
|
||||
raw_output = self.openstack('network qos policy show ' + self.NAME +
|
||||
opts)
|
||||
self.assertEqual(self.NAME + "\n", raw_output)
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos policy show -f json ' + self.NAME))
|
||||
self.assertEqual(self.NAME, cmd_output['name'])
|
||||
|
||||
def test_qos_policy_set(self):
|
||||
self.openstack('network qos policy set --share ' + self.NAME)
|
||||
opts = self.get_opts(['shared'])
|
||||
raw_output = self.openstack('network qos policy show ' + self.NAME +
|
||||
opts)
|
||||
self.assertEqual("True\n", raw_output)
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos policy show -f json ' + self.NAME))
|
||||
self.assertTrue(cmd_output['shared'])
|
||||
|
||||
def test_qos_policy_default(self):
|
||||
self.openstack('network qos policy set --default ' + self.NAME)
|
||||
opts = self.get_opts(['is_default'])
|
||||
raw_output = self.openstack('network qos policy show ' + self.NAME +
|
||||
opts)
|
||||
self.assertEqual("True\n", raw_output)
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos policy show -f json ' + self.NAME))
|
||||
self.assertTrue(cmd_output['is_default'])
|
||||
|
||||
self.openstack('network qos policy set --no-default ' + self.NAME)
|
||||
opts = self.get_opts(['is_default'])
|
||||
raw_output = self.openstack('network qos policy show ' + self.NAME +
|
||||
opts)
|
||||
self.assertEqual("False\n", raw_output)
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos policy show -f json ' + self.NAME))
|
||||
self.assertFalse(cmd_output['is_default'])
|
||||
|
@ -13,6 +13,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from openstackclient.tests.functional.network.v2 import common
|
||||
@ -20,51 +21,6 @@ from openstackclient.tests.functional.network.v2 import common
|
||||
|
||||
class NetworkQosRuleTestsMinimumBandwidth(common.NetworkTests):
|
||||
"""Functional tests for QoS minimum bandwidth rule"""
|
||||
RULE_ID = None
|
||||
MIN_KBPS = 2800
|
||||
MIN_KBPS_MODIFIED = 7500
|
||||
DIRECTION = '--egress'
|
||||
HEADERS = ['ID']
|
||||
FIELDS = ['id']
|
||||
TYPE = 'minimum-bandwidth'
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
common.NetworkTests.setUpClass()
|
||||
if cls.haz_network:
|
||||
cls.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
|
||||
|
||||
opts = cls.get_opts(cls.FIELDS)
|
||||
cls.openstack(
|
||||
'network qos policy create ' +
|
||||
cls.QOS_POLICY_NAME
|
||||
)
|
||||
cls.RULE_ID = cls.openstack(
|
||||
'network qos rule create ' +
|
||||
'--type ' + cls.TYPE + ' ' +
|
||||
'--min-kbps ' + str(cls.MIN_KBPS) + ' ' +
|
||||
cls.DIRECTION + ' ' +
|
||||
cls.QOS_POLICY_NAME +
|
||||
opts
|
||||
)
|
||||
cls.assertsOutputNotNone(cls.RULE_ID)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
try:
|
||||
if cls.haz_network:
|
||||
raw_output = cls.openstack(
|
||||
'network qos rule delete ' +
|
||||
cls.QOS_POLICY_NAME + ' ' +
|
||||
cls.RULE_ID
|
||||
)
|
||||
cls.openstack(
|
||||
'network qos policy delete ' +
|
||||
cls.QOS_POLICY_NAME
|
||||
)
|
||||
cls.assertOutput('', raw_output)
|
||||
finally:
|
||||
super(NetworkQosRuleTestsMinimumBandwidth, cls).tearDownClass()
|
||||
|
||||
def setUp(self):
|
||||
super(NetworkQosRuleTestsMinimumBandwidth, self).setUp()
|
||||
@ -72,72 +28,67 @@ class NetworkQosRuleTestsMinimumBandwidth(common.NetworkTests):
|
||||
if not self.haz_network:
|
||||
self.skipTest("No Network service present")
|
||||
|
||||
self.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
|
||||
|
||||
self.openstack(
|
||||
'network qos policy create ' +
|
||||
self.QOS_POLICY_NAME
|
||||
)
|
||||
self.addCleanup(self.openstack,
|
||||
'network qos policy delete ' + self.QOS_POLICY_NAME)
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos rule create -f json ' +
|
||||
'--type minimum-bandwidth ' +
|
||||
'--min-kbps 2800 ' +
|
||||
'--egress ' +
|
||||
self.QOS_POLICY_NAME
|
||||
))
|
||||
self.RULE_ID = cmd_output['id']
|
||||
self.addCleanup(self.openstack,
|
||||
'network qos rule delete ' +
|
||||
self.QOS_POLICY_NAME + ' ' +
|
||||
self.RULE_ID)
|
||||
self.assertTrue(self.RULE_ID)
|
||||
|
||||
def test_qos_rule_create_delete(self):
|
||||
# This is to check the output of qos rule delete
|
||||
policy_name = uuid.uuid4().hex
|
||||
self.openstack('network qos policy create -f json ' + policy_name)
|
||||
self.addCleanup(self.openstack,
|
||||
'network qos policy delete ' + policy_name)
|
||||
rule = json.loads(self.openstack(
|
||||
'network qos rule create -f json ' +
|
||||
'--type minimum-bandwidth ' +
|
||||
'--min-kbps 2800 ' +
|
||||
'--egress ' + policy_name
|
||||
))
|
||||
raw_output = self.openstack(
|
||||
'network qos rule delete ' +
|
||||
policy_name + ' ' + rule['id'])
|
||||
self.assertEqual('', raw_output)
|
||||
|
||||
def test_qos_rule_list(self):
|
||||
opts = self.get_opts(self.HEADERS)
|
||||
raw_output = self.openstack('network qos rule list '
|
||||
+ self.QOS_POLICY_NAME + opts)
|
||||
self.assertIn(self.RULE_ID, raw_output)
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos rule list -f json ' + self.QOS_POLICY_NAME))
|
||||
self.assertIn(self.RULE_ID, [rule['ID'] for rule in cmd_output])
|
||||
|
||||
def test_qos_rule_show(self):
|
||||
opts = self.get_opts(self.FIELDS)
|
||||
raw_output = self.openstack('network qos rule show ' +
|
||||
self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
|
||||
opts)
|
||||
self.assertEqual(self.RULE_ID, raw_output)
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos rule show -f json ' +
|
||||
self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
|
||||
self.assertEqual(self.RULE_ID, cmd_output['id'])
|
||||
|
||||
def test_qos_rule_set(self):
|
||||
self.openstack('network qos rule set --min-kbps ' +
|
||||
str(self.MIN_KBPS_MODIFIED) + ' ' +
|
||||
self.openstack('network qos rule set --min-kbps 7500 ' +
|
||||
self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
|
||||
opts = self.get_opts(['min_kbps'])
|
||||
raw_output = self.openstack('network qos rule show ' +
|
||||
self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
|
||||
opts)
|
||||
self.assertEqual(str(self.MIN_KBPS_MODIFIED) + "\n", raw_output)
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos rule show -f json ' +
|
||||
self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
|
||||
self.assertEqual(7500, cmd_output['min_kbps'])
|
||||
|
||||
|
||||
class NetworkQosRuleTestsDSCPMarking(common.NetworkTests):
|
||||
"""Functional tests for QoS DSCP marking rule"""
|
||||
RULE_ID = None
|
||||
QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
|
||||
DSCP_MARK = 8
|
||||
DSCP_MARK_MODIFIED = 32
|
||||
HEADERS = ['ID']
|
||||
FIELDS = ['id']
|
||||
TYPE = 'dscp-marking'
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
common.NetworkTests.setUpClass()
|
||||
if cls.haz_network:
|
||||
opts = cls.get_opts(cls.FIELDS)
|
||||
cls.openstack(
|
||||
'network qos policy create ' +
|
||||
cls.QOS_POLICY_NAME
|
||||
)
|
||||
cls.RULE_ID = cls.openstack(
|
||||
'network qos rule create ' +
|
||||
'--type ' + cls.TYPE + ' ' +
|
||||
'--dscp-mark ' + str(cls.DSCP_MARK) + ' ' +
|
||||
cls.QOS_POLICY_NAME +
|
||||
opts
|
||||
)
|
||||
cls.assertsOutputNotNone(cls.RULE_ID)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
try:
|
||||
if cls.haz_network:
|
||||
raw_output = cls.openstack(
|
||||
'network qos rule delete ' +
|
||||
cls.QOS_POLICY_NAME + ' ' +
|
||||
cls.RULE_ID
|
||||
)
|
||||
cls.openstack(
|
||||
'network qos policy delete ' + cls.QOS_POLICY_NAME)
|
||||
cls.assertOutput('', raw_output)
|
||||
finally:
|
||||
super(NetworkQosRuleTestsDSCPMarking, cls).tearDownClass()
|
||||
|
||||
def setUp(self):
|
||||
super(NetworkQosRuleTestsDSCPMarking, self).setUp()
|
||||
@ -145,78 +96,63 @@ class NetworkQosRuleTestsDSCPMarking(common.NetworkTests):
|
||||
if not self.haz_network:
|
||||
self.skipTest("No Network service present")
|
||||
|
||||
self.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
|
||||
self.openstack(
|
||||
'network qos policy create ' +
|
||||
self.QOS_POLICY_NAME
|
||||
)
|
||||
self.addCleanup(self.openstack,
|
||||
'network qos policy delete ' + self.QOS_POLICY_NAME)
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos rule create -f json ' +
|
||||
'--type dscp-marking ' +
|
||||
'--dscp-mark 8 ' +
|
||||
self.QOS_POLICY_NAME
|
||||
))
|
||||
self.RULE_ID = cmd_output['id']
|
||||
self.addCleanup(self.openstack,
|
||||
'network qos rule delete ' +
|
||||
self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
|
||||
self.assertTrue(self.RULE_ID)
|
||||
|
||||
def test_qos_rule_create_delete(self):
|
||||
# This is to check the output of qos rule delete
|
||||
policy_name = uuid.uuid4().hex
|
||||
self.openstack('network qos policy create -f json ' + policy_name)
|
||||
self.addCleanup(self.openstack,
|
||||
'network qos policy delete ' + policy_name)
|
||||
rule = json.loads(self.openstack(
|
||||
'network qos rule create -f json ' +
|
||||
'--type dscp-marking ' +
|
||||
'--dscp-mark 8 ' + policy_name
|
||||
))
|
||||
raw_output = self.openstack(
|
||||
'network qos rule delete ' +
|
||||
policy_name + ' ' + rule['id'])
|
||||
self.assertEqual('', raw_output)
|
||||
|
||||
def test_qos_rule_list(self):
|
||||
opts = self.get_opts(self.HEADERS)
|
||||
raw_output = self.openstack('network qos rule list '
|
||||
+ self.QOS_POLICY_NAME + opts)
|
||||
self.assertIn(self.RULE_ID, raw_output)
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos rule list -f json ' + self.QOS_POLICY_NAME))
|
||||
self.assertIn(self.RULE_ID, [rule['ID'] for rule in cmd_output])
|
||||
|
||||
def test_qos_rule_show(self):
|
||||
opts = self.get_opts(self.FIELDS)
|
||||
raw_output = self.openstack('network qos rule show ' +
|
||||
self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
|
||||
opts)
|
||||
self.assertEqual(self.RULE_ID, raw_output)
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos rule show -f json ' +
|
||||
self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
|
||||
self.assertEqual(self.RULE_ID, cmd_output['id'])
|
||||
|
||||
def test_qos_rule_set(self):
|
||||
self.openstack('network qos rule set --dscp-mark ' +
|
||||
str(self.DSCP_MARK_MODIFIED) + ' ' +
|
||||
self.openstack('network qos rule set --dscp-mark 32 ' +
|
||||
self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
|
||||
opts = self.get_opts(['dscp_mark'])
|
||||
raw_output = self.openstack('network qos rule show ' +
|
||||
self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
|
||||
opts)
|
||||
self.assertEqual(str(self.DSCP_MARK_MODIFIED) + "\n", raw_output)
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos rule show -f json ' +
|
||||
self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
|
||||
self.assertEqual(32, cmd_output['dscp_mark'])
|
||||
|
||||
|
||||
class NetworkQosRuleTestsBandwidthLimit(common.NetworkTests):
|
||||
"""Functional tests for QoS bandwidth limit rule"""
|
||||
RULE_ID = None
|
||||
QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
|
||||
MAX_KBPS = 10000
|
||||
MAX_KBPS_MODIFIED = 15000
|
||||
MAX_BURST_KBITS = 1400
|
||||
MAX_BURST_KBITS_MODIFIED = 1800
|
||||
RULE_DIRECTION = 'egress'
|
||||
RULE_DIRECTION_MODIFIED = 'ingress'
|
||||
HEADERS = ['ID']
|
||||
FIELDS = ['id']
|
||||
TYPE = 'bandwidth-limit'
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
common.NetworkTests.setUpClass()
|
||||
if cls.haz_network:
|
||||
opts = cls.get_opts(cls.FIELDS)
|
||||
cls.openstack(
|
||||
'network qos policy create ' +
|
||||
cls.QOS_POLICY_NAME
|
||||
)
|
||||
cls.RULE_ID = cls.openstack(
|
||||
'network qos rule create ' +
|
||||
'--type ' + cls.TYPE + ' ' +
|
||||
'--max-kbps ' + str(cls.MAX_KBPS) + ' ' +
|
||||
'--max-burst-kbits ' + str(cls.MAX_BURST_KBITS) + ' ' +
|
||||
'--' + cls.RULE_DIRECTION + ' ' +
|
||||
cls.QOS_POLICY_NAME +
|
||||
opts
|
||||
)
|
||||
cls.assertsOutputNotNone(cls.RULE_ID)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
try:
|
||||
if cls.haz_network:
|
||||
raw_output = cls.openstack(
|
||||
'network qos rule delete ' +
|
||||
cls.QOS_POLICY_NAME + ' ' +
|
||||
cls.RULE_ID
|
||||
)
|
||||
cls.openstack(
|
||||
'network qos policy delete ' + cls.QOS_POLICY_NAME)
|
||||
cls.assertOutput('', raw_output)
|
||||
finally:
|
||||
super(NetworkQosRuleTestsBandwidthLimit, cls).tearDownClass()
|
||||
|
||||
def setUp(self):
|
||||
super(NetworkQosRuleTestsBandwidthLimit, self).setUp()
|
||||
@ -224,30 +160,65 @@ class NetworkQosRuleTestsBandwidthLimit(common.NetworkTests):
|
||||
if not self.haz_network:
|
||||
self.skipTest("No Network service present")
|
||||
|
||||
self.QOS_POLICY_NAME = 'qos_policy_' + uuid.uuid4().hex
|
||||
self.openstack(
|
||||
'network qos policy create ' +
|
||||
self.QOS_POLICY_NAME
|
||||
)
|
||||
self.addCleanup(self.openstack,
|
||||
'network qos policy delete ' + self.QOS_POLICY_NAME)
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos rule create -f json ' +
|
||||
'--type bandwidth-limit ' +
|
||||
'--max-kbps 10000 ' +
|
||||
'--max-burst-kbits 1400 ' +
|
||||
'--egress ' +
|
||||
self.QOS_POLICY_NAME
|
||||
))
|
||||
self.RULE_ID = cmd_output['id']
|
||||
self.addCleanup(self.openstack,
|
||||
'network qos rule delete ' +
|
||||
self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
|
||||
self.assertTrue(self.RULE_ID)
|
||||
|
||||
def test_qos_rule_create_delete(self):
|
||||
# This is to check the output of qos rule delete
|
||||
policy_name = uuid.uuid4().hex
|
||||
self.openstack('network qos policy create -f json ' + policy_name)
|
||||
self.addCleanup(self.openstack,
|
||||
'network qos policy delete ' + policy_name)
|
||||
rule = json.loads(self.openstack(
|
||||
'network qos rule create -f json ' +
|
||||
'--type bandwidth-limit ' +
|
||||
'--max-kbps 10000 ' +
|
||||
'--max-burst-kbits 1400 ' +
|
||||
'--egress ' + policy_name
|
||||
))
|
||||
raw_output = self.openstack(
|
||||
'network qos rule delete ' +
|
||||
policy_name + ' ' + rule['id'])
|
||||
self.assertEqual('', raw_output)
|
||||
|
||||
def test_qos_rule_list(self):
|
||||
opts = self.get_opts(self.HEADERS)
|
||||
raw_output = self.openstack('network qos rule list '
|
||||
+ self.QOS_POLICY_NAME + opts)
|
||||
self.assertIn(self.RULE_ID, raw_output)
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos rule list -f json '
|
||||
+ self.QOS_POLICY_NAME))
|
||||
self.assertIn(self.RULE_ID, [rule['ID'] for rule in cmd_output])
|
||||
|
||||
def test_qos_rule_show(self):
|
||||
opts = self.get_opts(self.FIELDS)
|
||||
raw_output = self.openstack('network qos rule show ' +
|
||||
self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
|
||||
opts)
|
||||
self.assertEqual(self.RULE_ID, raw_output)
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos rule show -f json ' +
|
||||
self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
|
||||
self.assertEqual(self.RULE_ID, cmd_output['id'])
|
||||
|
||||
def test_qos_rule_set(self):
|
||||
self.openstack('network qos rule set --max-kbps ' +
|
||||
str(self.MAX_KBPS_MODIFIED) + ' --max-burst-kbits ' +
|
||||
str(self.MAX_BURST_KBITS_MODIFIED) + ' ' +
|
||||
'--' + self.RULE_DIRECTION_MODIFIED + ' ' +
|
||||
self.openstack('network qos rule set --max-kbps 15000 ' +
|
||||
'--max-burst-kbits 1800 ' +
|
||||
'--ingress ' +
|
||||
self.QOS_POLICY_NAME + ' ' + self.RULE_ID)
|
||||
opts = self.get_opts(['direction', 'max_burst_kbps', 'max_kbps'])
|
||||
raw_output = self.openstack('network qos rule show ' +
|
||||
self.QOS_POLICY_NAME + ' ' + self.RULE_ID +
|
||||
opts)
|
||||
expected = (str(self.RULE_DIRECTION_MODIFIED) + "\n" +
|
||||
str(self.MAX_BURST_KBITS_MODIFIED) + "\n" +
|
||||
str(self.MAX_KBPS_MODIFIED) + "\n")
|
||||
self.assertEqual(expected, raw_output)
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos rule show -f json ' +
|
||||
self.QOS_POLICY_NAME + ' ' + self.RULE_ID))
|
||||
self.assertEqual(15000, cmd_output['max_kbps'])
|
||||
self.assertEqual(1800, cmd_output['max_burst_kbps'])
|
||||
self.assertEqual('ingress', cmd_output['direction'])
|
||||
|
@ -13,6 +13,8 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import json
|
||||
|
||||
from openstackclient.tests.functional.network.v2 import common
|
||||
|
||||
|
||||
@ -29,6 +31,7 @@ class NetworkQosRuleTypeTests(common.NetworkTests):
|
||||
self.skipTest("No Network service present")
|
||||
|
||||
def test_qos_rule_type_list(self):
|
||||
raw_output = self.openstack('network qos rule type list')
|
||||
cmd_output = json.loads(self.openstack(
|
||||
'network qos rule type list -f json'))
|
||||
for rule_type in self.AVAILABLE_RULE_TYPES:
|
||||
self.assertIn(rule_type, raw_output)
|
||||
self.assertIn(rule_type, [x['Type'] for x in cmd_output])
|
||||
|
Loading…
Reference in New Issue
Block a user