Add OSC plugin for the service management API

Depends-On: I3e9a62327653ea1dc9b5807f50f250c739c1566d
Change-Id: I49825e80935b357a06b0074c024390cd2ed3a9b9
Implements: blueprint karbor-service-management
This commit is contained in:
Jiao Pengju
2017-10-26 11:05:34 +08:00
parent 50927b5c38
commit 87058d3e24
3 changed files with 203 additions and 0 deletions

View File

@@ -0,0 +1,99 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Data protection V1 os-services action implementations"""
from osc_lib.command import command
from osc_lib import utils as osc_utils
from oslo_log import log as logging
from karborclient.i18n import _
class ListServices(command.Lister):
_description = _("List services.")
log = logging.getLogger(__name__ + ".ListServices")
def get_parser(self, prog_name):
parser = super(ListServices, self).get_parser(prog_name)
parser.add_argument(
'--host',
metavar='<host>',
help=_('Filter results by host'),
)
parser.add_argument(
'--binary',
metavar='<binary>',
help=_('Filter results by binary'),
)
return parser
def take_action(self, parsed_args):
self.log.debug("take_action(%s)", parsed_args)
data_protection_client = self.app.client_manager.data_protection
data = data_protection_client.services.list(
host=parsed_args.host,
binary=parsed_args.binary
)
column_headers = ["Id", "Binary", "Host", "Status", "State",
"Updated_at", "Disabled Reason"]
return (column_headers,
(osc_utils.get_item_properties(
s, column_headers
) for s in data))
class EnableService(command.ShowOne):
_description = _('Enable service')
def get_parser(self, prog_name):
parser = super(EnableService, self).get_parser(prog_name)
parser.add_argument(
'service_id',
metavar='<service_id>',
help=_('The ID of the service.')
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.data_protection
service = client.services.enable(parsed_args.service_id)
return zip(*sorted(service._info.items()))
class DisableService(command.ShowOne):
_description = _('Disable service')
def get_parser(self, prog_name):
parser = super(DisableService, self).get_parser(prog_name)
parser.add_argument(
'service_id',
metavar='<service_id>',
help=_('The ID of the service.'),
)
parser.add_argument(
'--reason',
metavar='<reason>',
help=_('Reason for disabling the service.')
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.data_protection
if parsed_args.reason:
service = client.services.disable_log_reason(
parsed_args.service_id, parsed_args.reason)
else:
service = client.services.disable(parsed_args.service_id)
return zip(*sorted(service._info.items()))

View File

@@ -0,0 +1,101 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from karborclient.osc.v1 import services as osc_services
from karborclient.tests.unit.osc.v1 import fakes
from karborclient.v1 import services
SERVICE_INFO = {
"status": "enabled",
"binary": "karbor-operationengine",
"state": "up",
"updated_at": "2017-10-25T07:06:58.000000",
"host": "fake_host",
"disabled_reason": None,
"id": 1
}
class TestServices(fakes.TestDataProtection):
def setUp(self):
super(TestServices, self).setUp()
self.services_mock = self.app.client_manager.data_protection.services
self.services_mock.reset_mock()
class TestListServices(TestServices):
def setUp(self):
super(TestListServices, self).setUp()
self.services_mock.list.return_value = [
services.Service(None, copy.deepcopy(SERVICE_INFO))]
self.cmd = osc_services.ListServices(self.app, None)
def test_services_list(self):
arg_list = ['--host', 'fake_host']
verify_list = [('host', 'fake_host')]
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
columns, data = self.cmd.take_action(parsed_args)
expected_columns = (["Id", "Binary", "Host", "Status", "State",
"Updated_at", "Disabled Reason"])
self.assertEqual(expected_columns, columns)
expected_data = [(1,
"karbor-operationengine",
"fake_host",
"enabled",
"up",
"2017-10-25T07:06:58.000000",
None
)]
self.assertEqual(expected_data, list(data))
class TestEnableService(TestServices):
def setUp(self):
super(TestEnableService, self).setUp()
self.services_mock.enable.return_value = services.Service(
None, copy.deepcopy(SERVICE_INFO))
self.cmd = osc_services.EnableService(self.app, None)
def test_enable_service(self):
arg_list = ['1']
verify_list = [('service_id', '1')]
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
self.cmd.take_action(parsed_args)
self.services_mock.enable.assert_called_once_with('1')
class TestDisableService(TestServices):
def setUp(self):
super(TestDisableService, self).setUp()
self.services_mock.disable.return_value = services.Service(
None, copy.deepcopy(SERVICE_INFO))
self.services_mock.disable_log_reason.return_value = services.Service(
None, copy.deepcopy(SERVICE_INFO))
self.cmd = osc_services.DisableService(self.app, None)
def test_disable_service(self):
arg_list = ['1']
verify_list = [('service_id', '1')]
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
self.cmd.take_action(parsed_args)
self.services_mock.disable.assert_called_once_with('1')
def test_disable_service_with_reason(self):
arg_list = ['1', '--reason', 'fake_reason']
verify_list = [('service_id', '1'), ('reason', 'fake_reason')]
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
self.cmd.take_action(parsed_args)
self.services_mock.disable_log_reason.assert_called_once_with(
'1', 'fake_reason')

View File

@@ -66,6 +66,9 @@ openstack.data_protection.v1 =
data_protection_verification_list = karborclient.osc.v1.verifications:ListVerifications
data_protection_verification_show = karborclient.osc.v1.verifications:ShowVerification
data_protection_verification_create = karborclient.osc.v1.verifications:CreateVerification
data_protection_service_list = karborclient.osc.v1.services:ListServices
data_protection_service_enable = karborclient.osc.v1.services:EnableService
data_protection_service_disable = karborclient.osc.v1.services:DisableService
[compile_catalog]
directory = karborclient/locale