Add amphora delete command

This patch adds the "openstack loadbalancer amphora delete" command.

Depends-On: https://review.opendev.org/718293
Change-Id: I415d2438ddfe994631ef02637c216f4579dd81a1
This commit is contained in:
Michael Johnson 2020-04-07 22:56:08 -07:00
parent 7408e1dc50
commit 4ca156a5fc
8 changed files with 99 additions and 2 deletions

View File

@ -764,6 +764,20 @@ class OctaviaAPI(api.BaseAPI):
return response return response
@correct_return_codes
def amphora_delete(self, amphora_id):
"""Delete an amphora
:param string amphora_id:
The ID of the amphora to delete
:return:
Response Code from the API
"""
url = const.BASE_SINGLE_AMPHORA_URL.format(uuid=amphora_id)
response = self._delete(url)
return response
@correct_return_codes @correct_return_codes
def amphora_failover(self, amphora_id): def amphora_failover(self, amphora_id):
"""Force failover an amphorae """Force failover an amphorae

View File

@ -251,3 +251,34 @@ class ShowAmphoraStats(command.ShowOne):
return (rows, (utils.get_dict_properties( return (rows, (utils.get_dict_properties(
total_stats, rows, formatters={}))) total_stats, rows, formatters={})))
class DeleteAmphora(command.Command):
"""Delete a amphora"""
def get_parser(self, prog_name):
parser = super().get_parser(prog_name)
parser.add_argument(
'amphora_id',
metavar='<amphora-id>',
help='UUID of the amphora to delete.',
)
parser.add_argument(
'--wait',
action='store_true',
help='Wait for action to complete',
)
return parser
def take_action(self, parsed_args):
self.app.client_manager.load_balancer.amphora_delete(
amphora_id=parsed_args.amphora_id)
if parsed_args.wait:
v2_utils.wait_for_delete(
status_f=self.app.client_manager.load_balancer.amphora_show,
res_id=parsed_args.amphora_id, status_field=const.STATUS
)

View File

@ -357,6 +357,7 @@ AVAILABILITYZONEPROFILE_COLUMNS = (
) )
PROVISIONING_STATUS = 'provisioning_status' PROVISIONING_STATUS = 'provisioning_status'
STATUS = 'status'
# TCP/UDP port min/max # TCP/UDP port min/max
MIN_PORT_NUMBER = 1 MIN_PORT_NUMBER = 1

View File

@ -616,7 +616,8 @@ def wait_for_active(status_f, res_id):
message="The resource did not successfully reach ACTIVE status.") message="The resource did not successfully reach ACTIVE status.")
def wait_for_delete(status_f, res_id): def wait_for_delete(status_f, res_id,
status_field=constants.PROVISIONING_STATUS):
class Getter(object): class Getter(object):
@staticmethod @staticmethod
def get(id): def get(id):
@ -626,7 +627,7 @@ def wait_for_delete(status_f, res_id):
success = utils.wait_for_delete( success = utils.wait_for_delete(
manager=Getter, manager=Getter,
res_id=res_id, res_id=res_id,
status_field=constants.PROVISIONING_STATUS, status_field=status_field,
sleep_time=3 sleep_time=3
) )
if not success: if not success:

View File

@ -969,6 +969,15 @@ class TestLoadBalancer(TestOctaviaClient):
self.api.amphora_configure, self.api.amphora_configure,
FAKE_AMP) FAKE_AMP)
def test_delete_amphora(self):
self.requests_mock.register_uri(
'DELETE',
FAKE_OCTAVIA_URL + 'amphorae/' + FAKE_AMP,
status_code=200
)
ret = self.api.amphora_delete(FAKE_AMP)
self.assertEqual(200, ret.status_code)
def test_stats_show_amphora(self): def test_stats_show_amphora(self):
self.requests_mock.register_uri( self.requests_mock.register_uri(
'GET', 'GET',

View File

@ -283,3 +283,39 @@ class TestAmphoraStatsShow(TestAmphora):
column_idx = columns.index('bytes_in') column_idx = columns.index('bytes_in')
bytes_in = self.stats[listener_id] bytes_in = self.stats[listener_id]
self.assertEqual(data[column_idx], bytes_in) self.assertEqual(data[column_idx], bytes_in)
class TestAmphoraDelete(TestAmphora):
def setUp(self):
super(TestAmphoraDelete, self).setUp()
self.cmd = amphora.DeleteAmphora(self.app, None)
def test_amphora_delete(self):
arglist = [self._amp.id]
verifylist = [
('amphora_id', self._amp.id)
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
self.api_mock.amphora_delete.assert_called_with(
amphora_id=self._amp.id)
@mock.patch('osc_lib.utils.wait_for_delete')
def test_amphora_delete_wait(self, mock_wait):
arglist = [self._amp.id, '--wait']
verifylist = [
('amphora_id', self._amp.id),
('wait', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.cmd.take_action(parsed_args)
self.api_mock.amphora_delete.assert_called_with(
amphora_id=self._amp.id)
mock_wait.assert_called_once_with(
manager=mock.ANY,
res_id=self._amp.id,
sleep_time=mock.ANY,
status_field='status')

View File

@ -0,0 +1,4 @@
---
features:
- |
Added the amphora delete command.

View File

@ -84,6 +84,7 @@ openstack.load_balancer.v2 =
loadbalancer_amphora_list = octaviaclient.osc.v2.amphora:ListAmphora loadbalancer_amphora_list = octaviaclient.osc.v2.amphora:ListAmphora
loadbalancer_amphora_show = octaviaclient.osc.v2.amphora:ShowAmphora loadbalancer_amphora_show = octaviaclient.osc.v2.amphora:ShowAmphora
loadbalancer_amphora_configure = octaviaclient.osc.v2.amphora:ConfigureAmphora loadbalancer_amphora_configure = octaviaclient.osc.v2.amphora:ConfigureAmphora
loadbalancer_amphora_delete = octaviaclient.osc.v2.amphora:DeleteAmphora
loadbalancer_amphora_failover = octaviaclient.osc.v2.amphora:FailoverAmphora loadbalancer_amphora_failover = octaviaclient.osc.v2.amphora:FailoverAmphora
loadbalancer_amphora_stats_show = octaviaclient.osc.v2.amphora:ShowAmphoraStats loadbalancer_amphora_stats_show = octaviaclient.osc.v2.amphora:ShowAmphoraStats
loadbalancer_provider_list = octaviaclient.osc.v2.provider:ListProvider loadbalancer_provider_list = octaviaclient.osc.v2.provider:ListProvider