Files
python-cloudkittyclient/cloudkittyclient/tests/functional/v1/test_hashmap.py
Pedro Henrique f8da9dab78 Add support to rating rules with start and end
It was introduced the concept of start and end periods in
Cloudkitty rating rules. Therefore to make Cloudkitty CLI
compatible with the Cloudkitty REST API, we need to add
those new available attributes in the CLI as well.

Change-Id: I0cd9b61fa81232d235c959da551a7840465fae88
Signed-off-by: Pedro Henrique <phpm13@gmail.com>
2025-09-02 12:22:05 -03:00

367 lines
15 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2018 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from datetime import datetime
from datetime import timedelta
from cloudkittyclient.tests.functional import base
class CkHashmapTest(base.BaseFunctionalTest):
def __init__(self, *args, **kwargs):
super(CkHashmapTest, self).__init__(*args, **kwargs)
self.runner = self.cloudkitty
def setUp(self):
super(CkHashmapTest, self).setUp()
self._fields = list()
self._services = list()
self._mappings = list()
self._groups = list()
self._thresholds = list()
def tearDown(self):
super(CkHashmapTest, self).tearDown()
for field in self._fields:
try:
self.runner(
'hashmap field delete', params=field, has_output=False)
except RuntimeError:
pass
for service in self._services:
try:
self.runner(
'hashmap service delete', params=service, has_output=False)
except RuntimeError:
pass
for group in self._groups:
try:
self.runner(
'hashmap group delete', params=group, has_output=False)
except RuntimeError:
pass
for mapping in self._mappings:
try:
self.runner(
'hashmap mapping delete', params=mapping, has_output=False)
except RuntimeError:
pass
for threshold in self._thresholds:
try:
self.runner('hashmap threshold delete',
params=threshold, has_output=False)
except RuntimeError:
pass
def test_list_mapping_types(self):
resp = self.runner('hashmap mapping-types list')
found_types = [elem['Mapping types'] for elem in resp]
self.assertIn('flat', found_types)
self.assertIn('rate', found_types)
def test_create_get_delete_service(self):
# Create service
resp = self.runner('hashmap service create', params='testservice')[0]
self.assertEqual(resp['Name'], 'testservice')
service_id = resp['Service ID']
self._services.append(service_id)
# Check that resp is the same with service get and list
resp_with_sid = self.runner(
'hashmap service get', params=service_id)
resp_without_sid = self.runner('hashmap service list')
self.assertEqual(resp_with_sid, resp_without_sid)
self.assertEqual(len(resp_with_sid), 1)
# Check that deletion works
self.runner('hashmap service delete',
params=resp['Service ID'],
has_output=False)
resp = self.runner('hashmap service list')
self.assertEqual(len(resp), 0)
def test_group_get_create_delete(self):
# Create group
resp = self.runner('hashmap group create', params='testgroup')[0]
self.assertEqual(resp['Name'], 'testgroup')
group_id = resp['Group ID']
self._groups.append(group_id)
resp = self.runner('hashmap group list')
self.assertEqual(len(resp), 1)
# Check that deletion works
self.runner('hashmap group delete',
params=group_id, has_output=False)
resp = self.runner('hashmap group list')
self.assertEqual(len(resp), 0)
def test_create_get_delete_field(self):
# Create service
resp = self.runner('hashmap service create', params='testservice')[0]
service_id = resp['Service ID']
self._services.append(service_id)
# Create field
resp = self.runner('hashmap field create',
params='{} testfield'.format(service_id))[0]
self.assertEqual(resp['Name'], 'testfield')
self.assertEqual(resp['Service ID'], service_id)
field_id = resp['Field ID']
self._fields.append(field_id)
# Check that resp is the same with field get and list
resp_with_fid = self.runner('hashmap field get', params=field_id)
resp_with_sid = self.runner('hashmap field list', params=service_id)
self.assertEqual(resp_with_fid, resp_with_sid)
self.assertEqual(len(resp_with_fid), 1)
# Check that deletion works
self.runner(
'hashmap field delete', params=field_id, has_output=False)
# resp = self.runner(
# 'hashmap field list', params='-s {}'.format(service_id))
resp = self.runner(
'hashmap field list', params=service_id)
self.assertEqual(len(resp), 0)
def test_create_get_update_delete_mapping_service(self):
future_date = datetime.now() + timedelta(days=1)
date_iso = future_date.isoformat()
resp = self.runner('hashmap service create', params='testservice')[0]
service_id = resp['Service ID']
self._services.append(service_id)
# Create mapping
resp = self.runner('hashmap mapping create',
params=f'-s {service_id} 12 --start {date_iso}')[0]
mapping_id = resp['Mapping ID']
self._mappings.append(mapping_id)
self.assertEqual(resp['Service ID'], service_id)
self.assertEqual(float(resp['Cost']), float(12))
# Get mapping
resp_with_sid = self.runner(
'hashmap mapping list', params='-s {}'.format(service_id))[0]
resp_with_mid = self.runner(
'hashmap mapping get', params=mapping_id)[0]
self.assertEqual(resp_with_sid, resp_with_mid)
self.assertEqual(resp_with_sid['Mapping ID'], mapping_id)
self.assertEqual(resp_with_sid['Service ID'], service_id)
self.assertEqual(float(resp_with_sid['Cost']), float(12))
# Update mapping
resp = self.runner('hashmap mapping update',
params='--cost 10 {}'.format(mapping_id))[0]
self.assertEqual(float(resp['Cost']), float(10))
# Check that deletion works
self.runner(
'hashmap mapping delete', params=mapping_id, has_output=False)
resp = self.runner(
'hashmap mapping list', params='-s {}'.format(service_id))
self.assertEqual(len(resp), 0)
self.runner(
'hashmap service delete', params=service_id, has_output=False)
def test_create_get_update_delete_mapping_field(self):
future_date = datetime.now() + timedelta(days=1)
date_iso = future_date.isoformat()
resp = self.runner('hashmap service create', params='testservice')[0]
service_id = resp['Service ID']
self._services.append(service_id)
resp = self.runner('hashmap field create',
params='{} testfield'.format(service_id))[0]
field_id = resp['Field ID']
self._fields.append(field_id)
# Create mapping
resp = self.runner(
'hashmap mapping create',
params=f'--field-id {field_id} 12 --value '
f'testvalue --start {date_iso}')[0]
mapping_id = resp['Mapping ID']
self._mappings.append(service_id)
self.assertEqual(resp['Field ID'], field_id)
self.assertEqual(float(resp['Cost']), float(12))
self.assertEqual(resp['Value'], 'testvalue')
# Get mapping
resp = self.runner(
'hashmap mapping get', params=mapping_id)[0]
self.assertEqual(resp['Mapping ID'], mapping_id)
self.assertEqual(float(resp['Cost']), float(12))
# Update mapping
resp = self.runner('hashmap mapping update',
params='--cost 10 {}'.format(mapping_id))[0]
self.assertEqual(float(resp['Cost']), float(10))
def test_create_get_update_delete_mapping_field_started(self):
resp = self.runner('hashmap service create',
params='testservice_date_started')[0]
service_id = resp['Service ID']
self._services.append(service_id)
resp = self.runner(
'hashmap field create',
params='{} testfield_date_started'.format(service_id))[0]
field_id = resp['Field ID']
self._fields.append(field_id)
# Create mapping
resp = self.runner(
'hashmap mapping create',
params=f'--field-id {field_id} 12 --value '
f'testvalue')[0]
mapping_id = resp['Mapping ID']
self._mappings.append(service_id)
self.assertEqual(resp['Field ID'], field_id)
self.assertEqual(float(resp['Cost']), float(12))
self.assertEqual(resp['Value'], 'testvalue')
# Get mapping
resp = self.runner(
'hashmap mapping get', params=mapping_id)[0]
self.assertEqual(resp['Mapping ID'], mapping_id)
self.assertEqual(float(resp['Cost']), float(12))
# Should not be able to update a rule that is running (start < now)
try:
self.runner('hashmap mapping update',
params='--cost 10 {}'.format(mapping_id))[0]
except RuntimeError as e:
expected_error = ("You are allowed to update only the attribute "
"[end] as this rule is already running as it "
"started on ")
self.assertIn(expected_error, str(e))
def test_group_mappings_get(self):
# Service and group
resp = self.runner('hashmap service create', params='testservice')[0]
service_id = resp['Service ID']
self._services.append(service_id)
resp = self.runner('hashmap group create', params='testgroup')[0]
group_id = resp['Group ID']
self._groups.append(group_id)
# Create service mapping bleonging to testgroup
resp = self.runner(
'hashmap mapping create',
params='-s {} -g {} 12'.format(service_id, group_id))[0]
mapping_id = resp['Mapping ID']
self._mappings.append(mapping_id)
resp = self.runner('hashmap group mappings get', params=group_id)[0]
self.assertEqual(resp['Group ID'], group_id)
self.assertEqual(float(resp['Cost']), float(12))
def test_create_get_update_delete_threshold_service(self):
resp = self.runner('hashmap service create', params='testservice')[0]
service_id = resp['Service ID']
self._services.append(service_id)
# Create threshold
resp = self.runner('hashmap threshold create',
params='-s {} 12 0.9'.format(service_id))[0]
threshold_id = resp['Threshold ID']
self._thresholds.append(threshold_id)
self.assertEqual(resp['Service ID'], service_id)
self.assertEqual(float(resp['Level']), float(12))
self.assertEqual(float(resp['Cost']), float(0.9))
# Get threshold
resp_with_sid = self.runner(
'hashmap threshold list', params='-s {}'.format(service_id))[0]
resp_with_tid = self.runner(
'hashmap threshold get', params=threshold_id)[0]
self.assertEqual(resp_with_sid, resp_with_tid)
self.assertEqual(resp_with_sid['Threshold ID'], threshold_id)
self.assertEqual(resp_with_sid['Service ID'], service_id)
self.assertEqual(float(resp_with_sid['Level']), float(12))
self.assertEqual(float(resp_with_sid['Cost']), float(0.9))
# Update threshold
resp = self.runner('hashmap threshold update',
params='--cost 10 {}'.format(threshold_id))[0]
self.assertEqual(float(resp['Cost']), float(10))
# Check that deletion works
self.runner(
'hashmap threshold delete', params=threshold_id, has_output=False)
resp = self.runner(
'hashmap threshold list', params='-s {}'.format(service_id))
self.assertEqual(len(resp), 0)
def test_create_get_update_delete_threshold_field(self):
resp = self.runner('hashmap service create', params='testservice')[0]
service_id = resp['Service ID']
self._services.append(service_id)
resp = self.runner('hashmap field create',
params='{} testfield'.format(service_id))[0]
field_id = resp['Field ID']
self._fields.append(field_id)
# Create threshold
resp = self.runner(
'hashmap threshold create',
params='--field-id {} 12 0.9'.format(field_id))[0]
threshold_id = resp['Threshold ID']
self._thresholds.append(service_id)
self.assertEqual(resp['Field ID'], field_id)
self.assertEqual(float(resp['Level']), float(12))
self.assertEqual(float(resp['Cost']), float(0.9))
# Get threshold
resp = self.runner('hashmap threshold get', params=threshold_id)[0]
self.assertEqual(resp['Threshold ID'], threshold_id)
self.assertEqual(float(resp['Level']), float(12))
self.assertEqual(float(resp['Cost']), float(0.9))
# Update threshold
resp = self.runner('hashmap threshold update',
params='--cost 10 {}'.format(threshold_id))[0]
self.assertEqual(float(resp['Cost']), float(10))
def test_group_thresholds_get(self):
# Service and group
resp = self.runner('hashmap service create', params='testservice')[0]
service_id = resp['Service ID']
self._services.append(service_id)
resp = self.runner('hashmap group create', params='testgroup')[0]
group_id = resp['Group ID']
self._groups.append(group_id)
# Create service threshold bleonging to testgroup
resp = self.runner(
'hashmap threshold create',
params='-s {} -g {} 12 0.9'.format(service_id, group_id))[0]
threshold_id = resp['Threshold ID']
self._thresholds.append(threshold_id)
resp = self.runner('hashmap group thresholds get', params=group_id)[0]
self.assertEqual(resp['Group ID'], group_id)
self.assertEqual(float(resp['Level']), float(12))
self.assertEqual(float(resp['Cost']), float(0.9))
class OSCHashmapTest(CkHashmapTest):
def __init__(self, *args, **kwargs):
super(OSCHashmapTest, self).__init__(*args, **kwargs)
self.runner = self.openstack