Add support cnf auto heal and scale

Support container based VNF AutoHeal and AutoScale operation with
External Monitoring Tools.

Add the Fault Management interfaces and CLI to support AutoHeal.
Add the Performance Management interfaces and CLI to support
AutoScale. The Fault Management and Performance Management
interfaces are based on ETSI NFV-SOL 002 v3.3.1 and ETSI NFV-SOL
003 v3.3.1, which are Version "2.0.0" API of Tacker. Add the
Prometheus Plugin that has a interface between tacker and the
External Monitoring Tool.

Implements: blueprint support-auto-lcm
Change-Id: I7023a72b73f17f3746c74d311919497c7c4e8b3f
This commit is contained in:
Koji Shimizu 2022-08-24 08:54:28 +00:00 committed by Ayumu Ueha
parent 3da27cc89a
commit 64d7a87670
21 changed files with 2756 additions and 0 deletions

View File

@ -130,3 +130,16 @@ openstack.tackerclient.v2 =
vnflcm_subsc_list = tackerclient.osc.v1.vnflcm.vnflcm_subsc:ListLccnSubscription
vnflcm_subsc_show = tackerclient.osc.v1.vnflcm.vnflcm_subsc:ShowLccnSubscription
vnflcm_versions = tackerclient.osc.common.vnflcm.vnflcm_versions:VnfLcmVersions
vnfpm_job_create = tackerclient.osc.v2.vnfpm.vnfpm_job:CreateVnfPmJob
vnfpm_job_list = tackerclient.osc.v2.vnfpm.vnfpm_job:ListVnfPmJob
vnfpm_job_show = tackerclient.osc.v2.vnfpm.vnfpm_job:ShowVnfPmJob
vnfpm_job_update = tackerclient.osc.v2.vnfpm.vnfpm_job:UpdateVnfPmJob
vnfpm_job_delete = tackerclient.osc.v2.vnfpm.vnfpm_job:DeleteVnfPmJob
vnfpm_report_show = tackerclient.osc.v2.vnfpm.vnfpm_report:ShowVnfPmReport
vnffm_alarm_list = tackerclient.osc.v2.vnffm.vnffm_alarm:ListVnfFmAlarm
vnffm_alarm_show = tackerclient.osc.v2.vnffm.vnffm_alarm:ShowVnfFmAlarm
vnffm_alarm_update = tackerclient.osc.v2.vnffm.vnffm_alarm:UpdateVnfFmAlarm
vnffm_sub_create = tackerclient.osc.v2.vnffm.vnffm_sub:CreateVnfFmSub
vnffm_sub_list = tackerclient.osc.v2.vnffm.vnffm_sub:ListVnfFmSub
vnffm_sub_show = tackerclient.osc.v2.vnffm.vnffm_sub:ShowVnfFmSub
vnffm_sub_delete = tackerclient.osc.v2.vnffm.vnffm_sub:DeleteVnfFmSub

View File

@ -207,6 +207,10 @@ class InvalidInput(TackerClientException):
message = _("Invalid input: %(reason)s")
class EmptyInput(TackerClientException):
message = _("Empty input: %(reason)s")
class UnsupportedCommandVersion(TackerClientException):
message = _("This command is not supported in version %(version)s")

View File

View File

View File

@ -0,0 +1,62 @@
{
"filter": {
"vnfInstanceSubscriptionFilter": {
"vnfdIds": [
"dummy-vnfdId-1"
],
"vnfProductsFromProviders": [
{
"vnfProvider": "dummy-vnfProvider-1",
"vnfProducts": [
{
"vnfProductName": "dummy-vnfProductName-1-1",
"versions": [
{
"vnfSoftwareVersion": 1.0,
"vnfdVersions": [1.0, 2.0]
}
]
}
]
}
],
"vnfInstanceIds": [
"dummy-vnfInstanceId-1"
],
"vnfInstanceNames": [
"dummy-vnfInstanceName-1"
]
},
"notificationTypes": [
"AlarmNotification"
],
"faultyResourceTypes": [
"COMPUTE"
],
"perceivedSeverities": [
"WARNING"
],
"eventTypes": [
"EQUIPMENT_ALARM"
],
"probableCauses": [
"The server cannot be connected."
]
},
"callbackUri": "/nfvo/notify/alarm",
"authentication": {
"authType": [
"BASIC",
"OAUTH2_CLIENT_CREDENTIALS"
],
"paramsBasic": {
"userName": "nfvo",
"password": "nfvopwd"
},
"paramsOauth2ClientCredentials": {
"clientId": "auth_user_name",
"clientPassword": "auth_password",
"tokenEndpoint": "token_endpoint"
}
}
}

View File

@ -0,0 +1,177 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from osc_lib.command import command
from osc_lib import utils
from tackerclient.i18n import _
from tackerclient.osc import sdk_utils
from tackerclient.osc import utils as tacker_osc_utils
LOG = logging.getLogger(__name__)
_ATTR_MAP = (
('id', 'ID', tacker_osc_utils.LIST_BOTH),
('managedObjectId', 'Managed Object Id', tacker_osc_utils.LIST_BOTH),
('ackState', 'Ack State', tacker_osc_utils.LIST_BOTH),
('eventType', 'Event Type', tacker_osc_utils.LIST_BOTH),
('perceivedSeverity', 'Perceived Severity', tacker_osc_utils.LIST_BOTH),
('probableCause', 'Probable Cause', tacker_osc_utils.LIST_BOTH)
)
_FORMATTERS = {
'vnfcInstanceIds': tacker_osc_utils.FormatComplexDataColumn,
'rootCauseFaultyResource': tacker_osc_utils.FormatComplexDataColumn,
'correlatedAlarmIds': tacker_osc_utils.FormatComplexDataColumn,
'faultDetails': tacker_osc_utils.FormatComplexDataColumn,
'_links': tacker_osc_utils.FormatComplexDataColumn
}
_MIXED_CASE_FIELDS = (
'managedObjectId', 'rootCauseFaultyResource', 'vnfcInstanceIds',
'alarmRaisedTime', 'alarmChangedTime', 'alarmClearedTime',
'alarmAcknowledgedTime', 'ackState', 'perceivedSeverity', 'eventTime',
'eventType', 'faultType', 'probableCause', 'isRootCause',
'correlatedAlarmIds', 'faultDetails'
)
_VNF_FM_ALARM_ID = 'vnf_fm_alarm_id'
def _get_columns(vnffm_alarm_obj, action=None):
if action == 'update':
column_map = {
'ackState': 'Ack State'
}
else:
column_map = {
'id': 'ID',
'managedObjectId': 'Managed Object Id',
'ackState': 'Ack State',
'perceivedSeverity': 'Perceived Severity',
'eventType': 'Event Type',
'probableCause': 'Probable Cause'
}
if action == 'show':
column_map.update({
'vnfcInstanceIds': 'Vnfc Instance Ids',
'rootCauseFaultyResource': 'Root Cause Faulty Resource',
'alarmRaisedTime': 'Alarm Raised Time',
'alarmChangedTime': 'Alarm Changed Time',
'alarmClearedTime': 'Alarm Cleared Time',
'alarmAcknowledgedTime': 'Alarm Acknowledged Time',
'eventTime': 'Event Time',
'faultType': 'Fault Type',
'isRootCause': 'Is Root Cause',
'correlatedAlarmIds': 'Correlated Alarm Ids',
'faultDetails': 'Fault Details',
'_links': 'Links'
})
return sdk_utils.get_osc_show_columns_for_sdk_resource(
vnffm_alarm_obj, column_map)
class ListVnfFmAlarm(command.Lister):
_description = _("List VNF FM alarms")
def get_parser(self, prog_name):
LOG.debug('get_parser(%s)', prog_name)
parser = super(ListVnfFmAlarm, self).get_parser(prog_name)
parser.add_argument(
"--filter",
metavar="<filter>",
help=_("Attribute-based-filtering parameters"),
)
return parser
def take_action(self, parsed_args):
_params = {}
if parsed_args.filter:
_params['filter'] = parsed_args.filter
client = self.app.client_manager.tackerclient
data = client.list_vnf_fm_alarms(**_params)
headers, columns = tacker_osc_utils.get_column_definitions(
_ATTR_MAP, long_listing=True)
return (headers,
(utils.get_dict_properties(
s, columns, formatters=_FORMATTERS,
mixed_case_fields=_MIXED_CASE_FIELDS,
) for s in data['vnf_fm_alarms']))
class ShowVnfFmAlarm(command.ShowOne):
_description = _("Display VNF FM alarm details")
def get_parser(self, prog_name):
parser = super(ShowVnfFmAlarm, self).get_parser(prog_name)
parser.add_argument(
_VNF_FM_ALARM_ID,
metavar="<vnf-fm-alarm-id>",
help=_("VNF FM alarm ID to display"))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.tackerclient
obj = client.show_vnf_fm_alarm(parsed_args.vnf_fm_alarm_id)
display_columns, columns = _get_columns(obj, action='show')
data = utils.get_item_properties(
sdk_utils.DictModel(obj), columns,
mixed_case_fields=_MIXED_CASE_FIELDS,
formatters=_FORMATTERS)
return (display_columns, data)
class UpdateVnfFmAlarm(command.ShowOne):
_description = _("Update information about an individual VNF FM alarm")
def get_parser(self, prog_name):
LOG.debug('get_parser(%s)', prog_name)
parser = super(UpdateVnfFmAlarm, self).get_parser(prog_name)
parser.add_argument(
_VNF_FM_ALARM_ID,
metavar="<vnf-fm-alarm-id>",
help=_("VNF FM alarm ID to update.")
)
update_require_parameters = parser.add_argument_group(
"require arguments"
)
update_require_parameters.add_argument(
"--ack-state",
metavar="<ack-state>",
choices=['ACKNOWLEDGED', 'UNACKNOWLEDGED'],
help=_("Ask state can be 'ACKNOWLEDGED' or 'UNACKNOWLEDGED'."))
return parser
def args2body(self, parsed_args):
body = {'ackState': parsed_args.ack_state}
return body
def take_action(self, parsed_args):
client = self.app.client_manager.tackerclient
updated_values = client.update_vnf_fm_alarm(
parsed_args.vnf_fm_alarm_id, self.args2body(parsed_args))
display_columns, columns = _get_columns(
updated_values, action='update')
data = utils.get_item_properties(
sdk_utils.DictModel(updated_values), columns,
mixed_case_fields=_MIXED_CASE_FIELDS,
formatters=_FORMATTERS)
return (display_columns, data)

View File

@ -0,0 +1,197 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import os
from osc_lib.command import command
from osc_lib import utils
from tackerclient.common import exceptions
from tackerclient.i18n import _
from tackerclient.osc import sdk_utils
from tackerclient.osc import utils as tacker_osc_utils
LOG = logging.getLogger(__name__)
_ATTR_MAP = (
('id', 'ID', tacker_osc_utils.LIST_BOTH),
('callbackUri', 'Callback Uri', tacker_osc_utils.LIST_BOTH)
)
_FORMATTERS = {
'filter': tacker_osc_utils.FormatComplexDataColumn,
'_links': tacker_osc_utils.FormatComplexDataColumn
}
_MIXED_CASE_FIELDS = (
'callbackUri'
)
_VNF_FM_SUB_ID = 'vnf_fm_sub_id'
def _get_columns(vnffm_sub_obj):
column_map = {
'id': 'ID',
'filter': 'Filter',
'callbackUri': 'Callback Uri',
'_links': 'Links'
}
return sdk_utils.get_osc_show_columns_for_sdk_resource(
vnffm_sub_obj, column_map)
def jsonfile2body(file_path):
if file_path is None:
msg = _("File %s does not exist")
reason = msg % file_path
raise exceptions.InvalidInput(reason=reason)
if os.access(file_path, os.R_OK) is False:
msg = _("User does not have read privileges to it")
raise exceptions.InvalidInput(reason=msg)
try:
with open(file_path) as f:
body = json.load(f)
except (IOError, ValueError) as ex:
msg = _("Failed to load parameter file. Error: %s")
reason = msg % ex
raise exceptions.InvalidInput(reason=reason)
if not body:
reason = _('The parameter file is empty')
raise exceptions.EmptyInput(reason=reason)
return body
class CreateVnfFmSub(command.ShowOne):
_description = _("Create a new VNF FM subscription")
def get_parser(self, prog_name):
parser = super(CreateVnfFmSub, self).get_parser(prog_name)
parser.add_argument(
'request_file',
metavar="<param-file>",
help=_('Specify create VNF FM subscription request '
'parameters in a json file.'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.tackerclient
vnf_fm_sub = client.create_vnf_fm_sub(
jsonfile2body(parsed_args.request_file))
display_columns, columns = _get_columns(vnf_fm_sub)
data = utils.get_item_properties(
sdk_utils.DictModel(vnf_fm_sub), columns,
formatters=_FORMATTERS, mixed_case_fields=_MIXED_CASE_FIELDS)
return (display_columns, data)
class ListVnfFmSub(command.Lister):
_description = _("List VNF FM subs")
def get_parser(self, prog_name):
LOG.debug('get_parser(%s)', prog_name)
parser = super(ListVnfFmSub, self).get_parser(prog_name)
parser.add_argument(
"--filter",
metavar="<filter>",
help=_("Attribute-based-filtering parameters"),
)
return parser
def take_action(self, parsed_args):
_params = {}
if parsed_args.filter:
_params['filter'] = parsed_args.filter
client = self.app.client_manager.tackerclient
data = client.list_vnf_fm_subs(**_params)
headers, columns = tacker_osc_utils.get_column_definitions(
_ATTR_MAP, long_listing=True)
return (headers,
(utils.get_dict_properties(
s, columns, formatters=_FORMATTERS,
mixed_case_fields=_MIXED_CASE_FIELDS,
) for s in data['vnf_fm_subs']))
class ShowVnfFmSub(command.ShowOne):
_description = _("Display VNF FM subscription details")
def get_parser(self, prog_name):
parser = super(ShowVnfFmSub, self).get_parser(prog_name)
parser.add_argument(
_VNF_FM_SUB_ID,
metavar="<vnf-fm-sub-id>",
help=_("VNF FM subscription ID to display"))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.tackerclient
obj = client.show_vnf_fm_sub(parsed_args.vnf_fm_sub_id)
display_columns, columns = _get_columns(obj)
data = utils.get_item_properties(
sdk_utils.DictModel(obj), columns,
mixed_case_fields=_MIXED_CASE_FIELDS,
formatters=_FORMATTERS)
return (display_columns, data)
class DeleteVnfFmSub(command.Command):
_description = _("Delete VNF FM subscription(s)")
def get_parser(self, prog_name):
parser = super(DeleteVnfFmSub, self).get_parser(prog_name)
parser.add_argument(
_VNF_FM_SUB_ID,
metavar="<vnf-fm-sub-id>",
nargs="+",
help=_("VNF FM subscription ID(s) to delete"))
return parser
def take_action(self, parsed_args):
error_count = 0
client = self.app.client_manager.tackerclient
vnf_fm_sub_ids = parsed_args.vnf_fm_sub_id
for sub_id in vnf_fm_sub_ids:
try:
client.delete_vnf_fm_sub(sub_id)
except Exception as e:
error_count += 1
LOG.error(_("Failed to delete VNF FM subscription with "
"ID '%(sub_id)s': %(e)s"),
{'sub_id': sub_id, 'e': e})
total = len(vnf_fm_sub_ids)
if error_count > 0:
msg = (_("Failed to delete %(error_count)s of %(total)s "
"VNF FM subscriptions.") % {'error_count': error_count,
'total': total})
raise exceptions.CommandError(message=msg)
if total > 1:
print(_('All specified VNF FM subscriptions are deleted '
'successfully'))
else:
print(_("VNF FM subscription '%s' deleted "
"successfully") % vnf_fm_sub_ids[0])

View File

View File

@ -0,0 +1,36 @@
{
"objectType": "VNFC",
"objectInstanceIds": [
"object-instance-id-1"
],
"subObjectInstanceIds": [
"sub-object-instance-id-2"
],
"criteria": {
"performanceMetric": [
"VCpuUsageMeanVnf.object-instance-id-1"
],
"performanceMetricGroup": [
"VirtualisedComputeResource"
],
"collectionPeriod": "500",
"reportingPeriod": "1000",
"reportingBoundary": "2022/07/25 10:43:55"
},
"callbackUri": "/nfvo/notify/job",
"authentication": {
"authType": [
"BASIC",
"OAUTH2_CLIENT_CREDENTIALS"
],
"paramsBasic": {
"userName": "nfvo",
"password": "nfvopwd"
},
"paramsOauth2ClientCredentials": {
"clientId": "auth_user_name",
"clientPassword": "auth_password",
"tokenEndpoint": "token_endpoint"
}
}
}

View File

@ -0,0 +1,18 @@
{
"callbackUri": "/nfvo/notify/job",
"authentication": {
"authType": [
"BASIC",
"OAUTH2_CLIENT_CREDENTIALS"
],
"paramsBasic": {
"userName": "nfvo",
"password": "nfvopwd"
},
"paramsOauth2ClientCredentials": {
"clientId": "auth_user_name",
"clientPassword": "auth_password",
"tokenEndpoint": "token_endpoint"
}
}
}

View File

@ -0,0 +1,334 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import os
from functools import reduce
from osc_lib.command import command
from osc_lib import utils
from tackerclient.common import exceptions
from tackerclient.i18n import _
from tackerclient.osc import sdk_utils
from tackerclient.osc import utils as tacker_osc_utils
LOG = logging.getLogger(__name__)
_FORMATTERS = {
'objectInstanceIds': tacker_osc_utils.FormatComplexDataColumn,
'subObjectInstanceIds': tacker_osc_utils.FormatComplexDataColumn,
'criteria': tacker_osc_utils.FormatComplexDataColumn,
'reports': tacker_osc_utils.FormatComplexDataColumn,
'_links': tacker_osc_utils.FormatComplexDataColumn
}
_FORMATTERS_UPDATE = {
'authentication': tacker_osc_utils.FormatComplexDataColumn
}
_MIXED_CASE_FIELDS = (
'objectType', 'objectInstanceIds', 'subObjectInstanceIds', 'callbackUri'
)
_MIXED_CASE_FIELDS_UPDATE = (
'callbackUri'
)
_VNF_PM_JOB_ID = 'vnf_pm_job_id'
def _get_columns(vnfpm_job_obj, action=None):
if action == 'update':
column_map = {
'callbackUri': 'Callback Uri',
'authentication': 'Authentication'
}
else:
column_map = {
'id': 'ID',
'objectType': 'Object Type',
'objectInstanceIds': 'Object Instance Ids',
'subObjectInstanceIds': 'Sub Object Instance Ids',
'criteria': 'Criteria',
'callbackUri': 'Callback Uri',
'reports': 'Reports',
'_links': 'Links'
}
if action == 'show':
column_map.update(
{'reports': 'Reports'}
)
return sdk_utils.get_osc_show_columns_for_sdk_resource(
vnfpm_job_obj, column_map)
def jsonfile2body(file_path):
if file_path is None:
msg = _("File %s does not exist")
reason = msg % file_path
raise exceptions.InvalidInput(reason=reason)
if os.access(file_path, os.R_OK) is False:
msg = _("User does not have read privileges to it")
raise exceptions.InvalidInput(reason=msg)
try:
with open(file_path) as f:
body = json.load(f)
except (IOError, ValueError) as ex:
msg = _("Failed to load parameter file. Error: %s")
reason = msg % ex
raise exceptions.InvalidInput(reason=reason)
if not body:
reason = _('The parameter file is empty')
raise exceptions.EmptyInput(reason=reason)
return body
class CreateVnfPmJob(command.ShowOne):
_description = _("Create a new VNF PM job")
def get_parser(self, prog_name):
parser = super(CreateVnfPmJob, self).get_parser(prog_name)
parser.add_argument(
'request_file',
metavar="<param-file>",
help=_('Specify create VNF PM job request '
'parameters in a json file.'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.tackerclient
vnf_pm_job = client.create_vnf_pm_job(
jsonfile2body(parsed_args.request_file))
display_columns, columns = _get_columns(vnf_pm_job)
data = utils.get_item_properties(
sdk_utils.DictModel(vnf_pm_job), columns,
formatters=_FORMATTERS, mixed_case_fields=_MIXED_CASE_FIELDS)
return (display_columns, data)
class ListVnfPmJob(command.Lister):
_description = _("List VNF PM jobs")
def get_parser(self, prog_name):
LOG.debug('get_parser(%s)', prog_name)
parser = super(ListVnfPmJob, self).get_parser(prog_name)
parser.add_argument(
"--filter",
metavar="<filter>",
help=_("Attribute-based-filtering parameters"),
)
fields_exclusive_group = parser.add_mutually_exclusive_group(
required=False)
fields_exclusive_group.add_argument(
"--all_fields",
action="store_true",
default=False,
help=_("Include all complex attributes in the response"),
)
fields_exclusive_group.add_argument(
"--fields",
metavar="fields",
help=_("Complex attributes to be included into the response"),
)
fields_exclusive_group.add_argument(
"--exclude_fields",
metavar="exclude-fields",
help=_("Complex attributes to be excluded from the response"),
)
parser.add_argument(
"--exclude_default",
action="store_true",
default=False,
help=_("Indicates to exclude all complex attributes"
" from the response. This argument can be used alone or"
" with --fields and --filter. For all other combinations"
" tacker server will throw bad request error"),
)
return parser
def case_modify(self, field):
return reduce(
lambda x, y: x + (' ' if y.isupper() else '') + y, field).title()
def get_attributes(self, extra_fields=None, all_fields=False,
exclude_fields=None, exclude_default=False):
fields = ['id', 'objectType', '_links']
complex_fields = [
'objectInstanceIds',
'subObjectInstanceIds',
'criteria',
'reports']
simple_fields = ['callbackUri']
if extra_fields:
fields.extend(extra_fields)
if exclude_fields:
fields.extend([field for field in complex_fields
if field not in exclude_fields])
if all_fields:
fields.extend(complex_fields)
fields.extend(simple_fields)
if exclude_default:
fields.extend(simple_fields)
attrs = []
for field in fields:
if field == '_links':
attrs.extend([(field, 'Links', tacker_osc_utils.LIST_BOTH)])
else:
attrs.extend([(field, self.case_modify(field),
tacker_osc_utils.LIST_BOTH)])
return tuple(attrs)
def take_action(self, parsed_args):
_params = {}
extra_fields = []
exclude_fields = []
all_fields = False
exclude_default = False
if parsed_args.filter:
_params['filter'] = parsed_args.filter
if parsed_args.fields:
_params['fields'] = parsed_args.fields
fields = parsed_args.fields.split(',')
for field in fields:
extra_fields.append(field.split('/')[0])
if parsed_args.exclude_fields:
_params['exclude_fields'] = parsed_args.exclude_fields
fields = parsed_args.exclude_fields.split(',')
exclude_fields.extend(fields)
if parsed_args.exclude_default:
_params['exclude_default'] = None
exclude_default = True
if parsed_args.all_fields:
_params['all_fields'] = None
all_fields = True
client = self.app.client_manager.tackerclient
data = client.list_vnf_pm_jobs(**_params)
headers, columns = tacker_osc_utils.get_column_definitions(
self.get_attributes(extra_fields, all_fields, exclude_fields,
exclude_default), long_listing=True)
return (headers,
(utils.get_dict_properties(
s, columns, formatters=_FORMATTERS,
mixed_case_fields=_MIXED_CASE_FIELDS,
) for s in data['vnf_pm_jobs']))
class ShowVnfPmJob(command.ShowOne):
_description = _("Display VNF PM job details")
def get_parser(self, prog_name):
parser = super(ShowVnfPmJob, self).get_parser(prog_name)
parser.add_argument(
_VNF_PM_JOB_ID,
metavar="<vnf-pm-job-id>",
help=_("VNF PM job ID to display"))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.tackerclient
obj = client.show_vnf_pm_job(parsed_args.vnf_pm_job_id)
display_columns, columns = _get_columns(obj, action='show')
data = utils.get_item_properties(
sdk_utils.DictModel(obj), columns,
mixed_case_fields=_MIXED_CASE_FIELDS,
formatters=_FORMATTERS)
return (display_columns, data)
class UpdateVnfPmJob(command.ShowOne):
_description = _("Update information about an individual VNF PM job")
def get_parser(self, prog_name):
LOG.debug('get_parser(%s)', prog_name)
parser = super(UpdateVnfPmJob, self).get_parser(prog_name)
parser.add_argument(
_VNF_PM_JOB_ID,
metavar="<vnf-pm-job-id>",
help=_("VNF PM job ID to update.")
)
parser.add_argument(
'request_file',
metavar="<param-file>",
help=_('Specify update PM job request '
'parameters in a json file.'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.tackerclient
updated_values = client.update_vnf_pm_job(
parsed_args.vnf_pm_job_id,
jsonfile2body(parsed_args.request_file))
display_columns, columns = _get_columns(updated_values, 'update')
data = utils.get_item_properties(
sdk_utils.DictModel(updated_values),
columns, formatters=_FORMATTERS_UPDATE,
mixed_case_fields=_MIXED_CASE_FIELDS_UPDATE)
return (display_columns, data)
class DeleteVnfPmJob(command.Command):
_description = _("Delete VNF PM job")
def get_parser(self, prog_name):
parser = super(DeleteVnfPmJob, self).get_parser(prog_name)
parser.add_argument(
_VNF_PM_JOB_ID,
metavar="<vnf-pm-job-id>",
nargs="+",
help=_("VNF PM job ID(s) to delete"))
return parser
def take_action(self, parsed_args):
error_count = 0
client = self.app.client_manager.tackerclient
vnf_pm_job_ids = parsed_args.vnf_pm_job_id
for job_id in vnf_pm_job_ids:
try:
client.delete_vnf_pm_job(job_id)
except Exception as e:
error_count += 1
LOG.error(_("Failed to delete VNF PM job with "
"ID '%(job_id)s': %(e)s"),
{'job_id': job_id, 'e': e})
total = len(vnf_pm_job_ids)
if error_count > 0:
msg = (_("Failed to delete %(error_count)s of %(total)s "
"VNF PM jobs.") % {'error_count': error_count,
'total': total})
raise exceptions.CommandError(message=msg)
if total > 1:
print(_('All specified VNF PM jobs are deleted '
'successfully'))
else:
print(_("VNF PM job '%s' deleted "
"successfully") % vnf_pm_job_ids[0])

View File

@ -0,0 +1,67 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from osc_lib.command import command
from osc_lib import utils
from tackerclient.i18n import _
from tackerclient.osc import sdk_utils
from tackerclient.osc import utils as tacker_osc_utils
LOG = logging.getLogger(__name__)
_FORMATTERS = {
'entries': tacker_osc_utils.FormatComplexDataColumn
}
_VNF_PM_JOB_ID = 'vnf_pm_job_id'
_VNF_PM_REPORT_ID = 'vnf_pm_report_id'
def _get_columns(vnfpm_report_obj):
column_map = {
'entries': 'Entries'
}
return sdk_utils.get_osc_show_columns_for_sdk_resource(
vnfpm_report_obj, column_map)
class ShowVnfPmReport(command.ShowOne):
_description = _("Display VNF PM report details")
def get_parser(self, prog_name):
parser = super(ShowVnfPmReport, self).get_parser(prog_name)
parser.add_argument(
_VNF_PM_JOB_ID,
metavar="<vnf-pm-job-id>",
help=_("VNF PM job id where the VNF PM report is located"))
parser.add_argument(
_VNF_PM_REPORT_ID,
metavar="<vnf-pm-report-id>",
help=_("VNF PM report ID to display"))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.tackerclient
obj = client.show_vnf_pm_report(
parsed_args.vnf_pm_job_id, parsed_args.vnf_pm_report_id)
display_columns, columns = _get_columns(obj)
data = utils.get_item_properties(
sdk_utils.DictModel(obj),
columns, formatters=_FORMATTERS,
mixed_case_fields=None)
return (display_columns, data)

View File

@ -0,0 +1,307 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import os
from oslo_utils.fixture import uuidsentinel
from unittest import mock
from tackerclient.common import exceptions
from tackerclient.osc import utils as tacker_osc_utils
from tackerclient.osc.v2.vnffm import vnffm_alarm
from tackerclient.tests.unit.osc import base
from tackerclient.tests.unit.osc.v1.fixture_data import client
from tackerclient.tests.unit.osc.v2 import vnffm_alarm_fakes
class TestVnfFmAlarm(base.FixturedTestCase):
client_fixture_class = client.ClientFixture
def setUp(self):
super(TestVnfFmAlarm, self).setUp()
self.url = client.TACKER_URL
self.header = {'content-type': 'application/json'}
self.app = mock.Mock()
self.app_args = mock.Mock()
self.client_manager = self.cs
self.app.client_manager.tackerclient = self.client_manager
def _get_columns_vnffm_alarm(action=None):
if action == 'update':
columns = ['Ack State']
else:
columns = ['ID', 'Managed Object Id', 'Ack State',
'Perceived Severity', 'Event Type', 'Probable Cause']
if action == 'show':
columns.extend([
'Vnfc Instance Ids', 'Root Cause Faulty Resource',
'Alarm Raised Time', 'Alarm Changed Time',
'Alarm Cleared Time', 'Alarm Acknowledged Time',
'Event Time', 'Fault Type', 'Is Root Cause',
'Correlated Alarm Ids', 'Fault Details', 'Links'
])
return columns
class TestListVnfFmAlarm(TestVnfFmAlarm):
def setUp(self):
super(TestListVnfFmAlarm, self).setUp()
self.list_vnf_fm_alarms = vnffm_alarm.ListVnfFmAlarm(
self.app, self.app_args, cmd_name='vnffm alarm list')
def test_take_action(self):
vnffm_alarms_obj = vnffm_alarm_fakes.create_vnf_fm_alarms(
count=3)
parsed_args = self.check_parser(self.list_vnf_fm_alarms, [], [])
self.requests_mock.register_uri(
'GET', os.path.join(self.url, 'vnffm/v1/alarms'),
json=vnffm_alarms_obj, headers=self.header)
actual_columns, data = self.list_vnf_fm_alarms.take_action(parsed_args)
_, columns = tacker_osc_utils.get_column_definitions(
vnffm_alarm._ATTR_MAP, long_listing=True)
expected_data = []
for vnffm_alarm_obj_idx in vnffm_alarms_obj:
expected_data.append(vnffm_alarm_fakes.get_vnffm_alarm_data(
vnffm_alarm_obj_idx, columns=columns))
self.assertCountEqual(_get_columns_vnffm_alarm(action='list'),
actual_columns)
self.assertCountEqual(expected_data, list(data))
def test_take_action_with_filter(self):
vnffm_alarms_obj = vnffm_alarm_fakes.create_vnf_fm_alarms(
count=3)
parsed_args = self.check_parser(
self.list_vnf_fm_alarms,
["--filter", '(eq,perceivedSeverity,WARNING)'],
[('filter', '(eq,perceivedSeverity,WARNING)')])
self.requests_mock.register_uri(
'GET', os.path.join(
self.url,
'vnffm/v1/alarms?filter=(eq,perceivedSeverity,WARNING)'),
json=vnffm_alarms_obj, headers=self.header)
actual_columns, data = self.list_vnf_fm_alarms.take_action(parsed_args)
_, columns = tacker_osc_utils.get_column_definitions(
vnffm_alarm._ATTR_MAP, long_listing=True)
expected_data = []
for vnffm_alarm_obj_idx in vnffm_alarms_obj:
expected_data.append(vnffm_alarm_fakes.get_vnffm_alarm_data(
vnffm_alarm_obj_idx, columns=columns))
self.assertCountEqual(_get_columns_vnffm_alarm(action='list'),
actual_columns)
self.assertListItemsEqual(expected_data, list(data))
def test_take_action_with_incorrect_filter(self):
parsed_args = self.check_parser(
self.list_vnf_fm_alarms,
["--filter", '(perceivedSeverity)'],
[('filter', '(perceivedSeverity)')])
url = os.path.join(
self.url, 'vnffm/v1/alarms?filter=(perceivedSeverity)')
self.requests_mock.register_uri(
'GET', url, headers=self.header, status_code=400, json={})
self.assertRaises(exceptions.TackerClientException,
self.list_vnf_fm_alarms.take_action,
parsed_args)
def test_take_action_internal_server_error(self):
parsed_args = self.check_parser(
self.list_vnf_fm_alarms,
["--filter", '(eq,perceivedSeverity,WARNING)'],
[('filter', '(eq,perceivedSeverity,WARNING)')])
url = os.path.join(
self.url, 'vnffm/v1/alarms?filter=(eq,perceivedSeverity,WARNING)')
self.requests_mock.register_uri(
'GET', url, headers=self.header, status_code=500, json={})
self.assertRaises(exceptions.TackerClientException,
self.list_vnf_fm_alarms.take_action,
parsed_args)
class TestShowVnfFmAlarm(TestVnfFmAlarm):
def setUp(self):
super(TestShowVnfFmAlarm, self).setUp()
self.show_vnf_fm_alarm = vnffm_alarm.ShowVnfFmAlarm(
self.app, self.app_args, cmd_name='vnffm alarm show')
def test_take_action(self):
"""Test of take_action()"""
vnffm_alarm_obj = vnffm_alarm_fakes.vnf_fm_alarm_response()
arglist = [vnffm_alarm_obj['id']]
verifylist = [('vnf_fm_alarm_id', vnffm_alarm_obj['id'])]
# command param
parsed_args = self.check_parser(
self.show_vnf_fm_alarm, arglist, verifylist)
url = os.path.join(
self.url, 'vnffm/v1/alarms', vnffm_alarm_obj['id'])
self.requests_mock.register_uri(
'GET', url, headers=self.header, json=vnffm_alarm_obj)
columns, _ = (self.show_vnf_fm_alarm.take_action(parsed_args))
self.assertCountEqual(_get_columns_vnffm_alarm(action='show'),
columns)
def test_take_action_vnf_lcm_op_occ_id_not_found(self):
"""Test if vnf-lcm-op-occ-id does not find."""
arglist = [uuidsentinel.vnf_fm_alarm_id]
verifylist = [('vnf_fm_alarm_id', uuidsentinel.vnf_fm_alarm_id)]
# command param
parsed_args = self.check_parser(
self.show_vnf_fm_alarm, arglist, verifylist)
url = os.path.join(
self.url, 'vnffm/v1/alarms', uuidsentinel.vnf_fm_alarm_id)
self.requests_mock.register_uri(
'GET', url, headers=self.header, status_code=404, json={})
self.assertRaises(exceptions.TackerClientException,
self.show_vnf_fm_alarm.take_action,
parsed_args)
def test_take_action_internal_server_error(self):
"""Test for internal server error."""
arglist = [uuidsentinel.vnf_fm_alarm_id]
verifylist = [('vnf_fm_alarm_id', uuidsentinel.vnf_fm_alarm_id)]
# command param
parsed_args = self.check_parser(
self.show_vnf_fm_alarm, arglist, verifylist)
url = os.path.join(
self.url, 'vnffm/v1/alarms', uuidsentinel.vnf_fm_alarm_id)
self.requests_mock.register_uri(
'GET', url, headers=self.header, status_code=500, json={})
self.assertRaises(exceptions.TackerClientException,
self.show_vnf_fm_alarm.take_action,
parsed_args)
@ddt.ddt
class TestUpdateVnfFmAlarm(TestVnfFmAlarm):
def setUp(self):
super(TestUpdateVnfFmAlarm, self).setUp()
self.update_vnf_fm_alarm = vnffm_alarm.UpdateVnfFmAlarm(
self.app, self.app_args, cmd_name='vnffm alarm update')
@ddt.data('ACKNOWLEDGED', 'UNACKNOWLEDGED')
def test_take_action(self, ack_state):
"""Test of take_action()"""
vnffm_alarm_obj = vnffm_alarm_fakes.vnf_fm_alarm_response(
None, 'update')
arg_list = ['--ack-state', ack_state, uuidsentinel.vnf_fm_alarm_id]
verify_list = [('ack_state', ack_state),
('vnf_fm_alarm_id', uuidsentinel.vnf_fm_alarm_id)]
# command param
parsed_args = self.check_parser(
self.update_vnf_fm_alarm, arg_list, verify_list)
url = os.path.join(
self.url, 'vnffm/v1/alarms', uuidsentinel.vnf_fm_alarm_id)
self.requests_mock.register_uri(
'PATCH', url, headers=self.header, json=vnffm_alarm_obj)
actual_columns, data = (
self.update_vnf_fm_alarm.take_action(parsed_args))
expected_columns = _get_columns_vnffm_alarm(action='update')
self.assertCountEqual(expected_columns, actual_columns)
_, columns = vnffm_alarm._get_columns(
vnffm_alarm_obj, action='update')
expected_data = vnffm_alarm_fakes.get_vnffm_alarm_data(
vnffm_alarm_obj, columns=columns)
self.assertEqual(expected_data, data)
@ddt.data('ACKNOWLEDGED')
def test_take_action_vnf_lcm_op_occ_id_not_found(self, ack_state):
"""Test if vnf-lcm-op-occ-id does not find"""
arg_list = ['--ack-state', ack_state, uuidsentinel.vnf_fm_alarm_id]
verify_list = [('ack_state', ack_state),
('vnf_fm_alarm_id', uuidsentinel.vnf_fm_alarm_id)]
# command param
parsed_args = self.check_parser(
self.update_vnf_fm_alarm, arg_list, verify_list)
url = os.path.join(
self.url, 'vnffm/v1/alarms', uuidsentinel.vnf_fm_alarm_id)
self.requests_mock.register_uri(
'PATCH', url, headers=self.header, status_code=404, json={})
self.assertRaises(exceptions.TackerClientException,
self.update_vnf_fm_alarm.take_action,
parsed_args)
@ddt.data('UNACKNOWLEDGED')
def test_take_action_vnf_lcm_op_occ_state_is_conflict(self, ack_state):
"""Test if vnf-lcm-op-occ state is conflict"""
arg_list = ['--ack-state', ack_state, uuidsentinel.vnf_fm_alarm_id]
verify_list = [('ack_state', ack_state),
('vnf_fm_alarm_id', uuidsentinel.vnf_fm_alarm_id)]
# command param
parsed_args = self.check_parser(
self.update_vnf_fm_alarm, arg_list, verify_list)
url = os.path.join(
self.url, 'vnffm/v1/alarms', uuidsentinel.vnf_fm_alarm_id)
self.requests_mock.register_uri(
'PATCH', url, headers=self.header, status_code=409, json={})
self.assertRaises(exceptions.TackerClientException,
self.update_vnf_fm_alarm.take_action,
parsed_args)

View File

@ -0,0 +1,329 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import os
import sys
from io import StringIO
from oslo_utils.fixture import uuidsentinel
from unittest import mock
from tackerclient.common import exceptions
from tackerclient.osc import utils as tacker_osc_utils
from tackerclient.osc.v2.vnffm import vnffm_sub
from tackerclient.tests.unit.osc import base
from tackerclient.tests.unit.osc.v1.fixture_data import client
from tackerclient.tests.unit.osc.v2 import vnffm_sub_fakes
class TestVnfFmSub(base.FixturedTestCase):
client_fixture_class = client.ClientFixture
def setUp(self):
super(TestVnfFmSub, self).setUp()
self.url = client.TACKER_URL
self.header = {'content-type': 'application/json'}
self.app = mock.Mock()
self.app_args = mock.Mock()
self.client_manager = self.cs
self.app.client_manager.tackerclient = self.client_manager
def _get_columns_vnffm_sub(action=None):
columns = ['ID', 'Callback Uri']
if action == 'show' or action == 'create':
columns.extend(['Filter', 'Links'])
return columns
@ddt.ddt
class TestCreateVnfFmSub(TestVnfFmSub):
def setUp(self):
super(TestCreateVnfFmSub, self).setUp()
self.create_vnf_fm_sub = vnffm_sub.CreateVnfFmSub(
self.app, self.app_args, cmd_name='vnffm sub create')
def test_create_no_args(self):
self.assertRaises(base.ParserException, self.check_parser,
self.create_vnf_fm_sub, [], [])
@ddt.unpack
def test_take_action(self):
param_file = ("./tackerclient/osc/v2/vnffm/samples/"
"create_vnf_fm_subscription_param_sample.json")
arg_list = [param_file]
verify_list = [('request_file', param_file)]
parsed_args = self.check_parser(self.create_vnf_fm_sub, arg_list,
verify_list)
json = vnffm_sub_fakes.vnf_fm_sub_response()
self.requests_mock.register_uri(
'POST', os.path.join(self.url, 'vnffm/v1/subscriptions'),
json=json, headers=self.header)
actual_columns, data = (
self.create_vnf_fm_sub.take_action(parsed_args))
_, attributes = vnffm_sub._get_columns(json)
self.assertCountEqual(_get_columns_vnffm_sub("create"),
actual_columns)
self.assertListItemsEqual(vnffm_sub_fakes.get_vnffm_sub_data(
json, columns=attributes), data)
class TestListVnfFmSub(TestVnfFmSub):
def setUp(self):
super(TestListVnfFmSub, self).setUp()
self.list_vnffm_sub = vnffm_sub.ListVnfFmSub(
self.app, self.app_args, cmd_name='vnffm sub list')
def test_take_action(self):
vnffm_subs_obj = vnffm_sub_fakes.create_vnf_fm_subs(
count=3)
parsed_args = self.check_parser(self.list_vnffm_sub, [], [])
self.requests_mock.register_uri(
'GET', os.path.join(self.url, 'vnffm/v1/subscriptions'),
json=vnffm_subs_obj, headers=self.header)
actual_columns, data = self.list_vnffm_sub.take_action(parsed_args)
_, columns = tacker_osc_utils.get_column_definitions(
vnffm_sub._ATTR_MAP, long_listing=True)
expected_data = []
for vnffm_sub_obj_idx in vnffm_subs_obj:
expected_data.append(vnffm_sub_fakes.get_vnffm_sub_data(
vnffm_sub_obj_idx, columns=columns))
self.assertCountEqual(_get_columns_vnffm_sub(action='list'),
actual_columns)
self.assertCountEqual(expected_data, list(data))
def test_take_action_with_filter(self):
vnffm_subs_obj = vnffm_sub_fakes.create_vnf_fm_subs(
count=3)
parsed_args = self.check_parser(
self.list_vnffm_sub,
["--filter", '(eq,callbackUri,/nfvo/notify/alarm)'],
[('filter', '(eq,callbackUri,/nfvo/notify/alarm)')])
self.requests_mock.register_uri(
'GET', os.path.join(
self.url,
'vnffm/v1/subscriptions?'
'filter=(eq,callbackUri,/nfvo/notify/alarm)'),
json=vnffm_subs_obj, headers=self.header)
actual_columns, data = self.list_vnffm_sub.take_action(parsed_args)
_, columns = tacker_osc_utils.get_column_definitions(
vnffm_sub._ATTR_MAP, long_listing=True)
expected_data = []
for vnffm_sub_obj_idx in vnffm_subs_obj:
expected_data.append(vnffm_sub_fakes.get_vnffm_sub_data(
vnffm_sub_obj_idx, columns=columns))
self.assertCountEqual(_get_columns_vnffm_sub(action='list'),
actual_columns)
self.assertListItemsEqual(expected_data, list(data))
def test_take_action_with_incorrect_filter(self):
parsed_args = self.check_parser(
self.list_vnffm_sub,
["--filter", '(callbackUri)'],
[('filter', '(callbackUri)')])
url = os.path.join(
self.url,
'vnffm/v1/subscriptions?filter=(callbackUri)')
self.requests_mock.register_uri(
'POST', url, headers=self.header, status_code=400, json={})
self.assertRaises(exceptions.TackerClientException,
self.list_vnffm_sub.take_action,
parsed_args)
def test_take_action_internal_server_error(self):
parsed_args = self.check_parser(
self.list_vnffm_sub,
["--filter", '(eq,callbackUri,/nfvo/notify/alarm)'],
[('filter', '(eq,callbackUri,/nfvo/notify/alarm)')])
url = os.path.join(
self.url,
'vnffm/v1/subscriptions?'
'filter=(eq,callbackUri,/nfvo/notify/alarm)')
self.requests_mock.register_uri(
'POST', url, headers=self.header, status_code=500, json={})
self.assertRaises(exceptions.TackerClientException,
self.list_vnffm_sub.take_action,
parsed_args)
class TestShowVnfFmSub(TestVnfFmSub):
def setUp(self):
super(TestShowVnfFmSub, self).setUp()
self.show_vnf_fm_subs = vnffm_sub.ShowVnfFmSub(
self.app, self.app_args, cmd_name='vnffm sub show')
def test_take_action(self):
"""Test of take_action()"""
vnffm_sub_obj = vnffm_sub_fakes.vnf_fm_sub_response()
arg_list = [vnffm_sub_obj['id']]
verify_list = [('vnf_fm_sub_id', vnffm_sub_obj['id'])]
# command param
parsed_args = self.check_parser(
self.show_vnf_fm_subs, arg_list, verify_list)
url = os.path.join(
self.url,
'vnffm/v1/subscriptions',
vnffm_sub_obj['id'])
self.requests_mock.register_uri(
'GET', url, headers=self.header, json=vnffm_sub_obj)
columns, _ = (self.show_vnf_fm_subs.take_action(parsed_args))
self.assertCountEqual(_get_columns_vnffm_sub('show'),
columns)
def test_take_action_vnf_fm_sub_id_not_found(self):
"""Test if vnf-lcm-op-occ-id does not find."""
arg_list = [uuidsentinel.vnf_fm_sub_id]
verify_list = [('vnf_fm_sub_id', uuidsentinel.vnf_fm_sub_id)]
# command param
parsed_args = self.check_parser(
self.show_vnf_fm_subs, arg_list, verify_list)
url = os.path.join(
self.url,
'vnffm/v1/subscriptions',
uuidsentinel.vnf_fm_sub_id)
self.requests_mock.register_uri(
'GET', url, headers=self.header, status_code=404, json={})
self.assertRaises(exceptions.TackerClientException,
self.show_vnf_fm_subs.take_action,
parsed_args)
def test_take_action_internal_server_error(self):
"""Test for internal server error."""
arg_list = [uuidsentinel.vnf_fm_sub_id]
verify_list = [('vnf_fm_sub_id', uuidsentinel.vnf_fm_sub_id)]
# command param
parsed_args = self.check_parser(
self.show_vnf_fm_subs, arg_list, verify_list)
url = os.path.join(
self.url,
'vnffm/v1/subscriptions',
uuidsentinel.vnf_fm_sub_id)
self.requests_mock.register_uri(
'GET', url, headers=self.header, status_code=500, json={})
self.assertRaises(exceptions.TackerClientException,
self.show_vnf_fm_subs.take_action,
parsed_args)
class TestDeleteVnfFmSub(TestVnfFmSub):
def setUp(self):
super(TestDeleteVnfFmSub, self).setUp()
self.delete_vnf_fm_sub = vnffm_sub.DeleteVnfFmSub(
self.app, self.app_args, cmd_name='vnffm sub delete')
# Vnf Fm subscription to delete
self.vnf_fm_subs = vnffm_sub_fakes.create_vnf_fm_subs(count=3)
def _mock_request_url_for_delete(self, index):
url = os.path.join(self.url, 'vnffm/v1/subscriptions',
self.vnf_fm_subs[index]['id'])
self.requests_mock.register_uri('DELETE', url,
headers=self.header, json={})
def test_delete_one_vnf_fm_sub(self):
arg_list = [self.vnf_fm_subs[0]['id']]
verify_list = [('vnf_fm_sub_id',
[self.vnf_fm_subs[0]['id']])]
parsed_args = self.check_parser(self.delete_vnf_fm_sub, arg_list,
verify_list)
self._mock_request_url_for_delete(0)
sys.stdout = buffer = StringIO()
result = self.delete_vnf_fm_sub.take_action(parsed_args)
self.assertIsNone(result)
self.assertEqual(
(f"VNF FM subscription '{self.vnf_fm_subs[0]['id']}' "
f"deleted successfully"), buffer.getvalue().strip())
def test_delete_multiple_vnf_fm_sub(self):
arg_list = []
for obj in self.vnf_fm_subs:
arg_list.append(obj['id'])
verify_list = [('vnf_fm_sub_id', arg_list)]
parsed_args = self.check_parser(self.delete_vnf_fm_sub, arg_list,
verify_list)
for i in range(0, 3):
self._mock_request_url_for_delete(i)
sys.stdout = buffer = StringIO()
result = self.delete_vnf_fm_sub.take_action(parsed_args)
self.assertIsNone(result)
self.assertEqual('All specified VNF FM subscriptions are deleted '
'successfully', buffer.getvalue().strip())
def test_delete_multiple_vnf_fm_sub_exception(self):
arg_list = [
self.vnf_fm_subs[0]['id'],
'xxxx-yyyy-zzzz',
self.vnf_fm_subs[1]['id'],
]
verify_list = [('vnf_fm_sub_id', arg_list)]
parsed_args = self.check_parser(self.delete_vnf_fm_sub,
arg_list, verify_list)
self._mock_request_url_for_delete(0)
url = os.path.join(self.url, 'vnffm/v1/subscriptions',
'xxxx-yyyy-zzzz')
self.requests_mock.register_uri(
'GET', url, exc=exceptions.ConnectionFailed)
self._mock_request_url_for_delete(1)
exception = self.assertRaises(exceptions.CommandError,
self.delete_vnf_fm_sub.take_action,
parsed_args)
self.assertEqual('Failed to delete 1 of 3 VNF FM subscriptions.',
exception.message)

View File

@ -0,0 +1,476 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
import os
import sys
from io import StringIO
from oslo_utils.fixture import uuidsentinel
from unittest import mock
from tackerclient.common import exceptions
from tackerclient.osc import utils as tacker_osc_utils
from tackerclient.osc.v2.vnfpm import vnfpm_job
from tackerclient.tests.unit.osc import base
from tackerclient.tests.unit.osc.v1.fixture_data import client
from tackerclient.tests.unit.osc.v2 import vnfpm_job_fakes
class TestVnfPmJob(base.FixturedTestCase):
client_fixture_class = client.ClientFixture
def setUp(self):
super(TestVnfPmJob, self).setUp()
self.url = client.TACKER_URL
self.header = {'content-type': 'application/json'}
self.app = mock.Mock()
self.app_args = mock.Mock()
self.client_manager = self.cs
self.app.client_manager.tackerclient = self.client_manager
def _get_columns_vnfpm_job(action=None):
if action == 'update':
columns = ['Callback Uri', 'Authentication']
else:
columns = ['ID', 'Object Type', 'Object Instance Ids',
'Sub Object Instance Ids', 'Criteria', 'Callback Uri',
'Reports', 'Links']
if action == 'list':
columns = [
ele for ele in columns if ele not in [
'Criteria', 'Sub Object Instance Ids', 'Reports', 'Links'
]
]
return columns
@ddt.ddt
class TestCreateVnfPmJob(TestVnfPmJob):
def setUp(self):
super(TestCreateVnfPmJob, self).setUp()
self.create_vnf_pm_job = vnfpm_job.CreateVnfPmJob(
self.app, self.app_args, cmd_name='vnfpm job create')
def test_create_no_args(self):
self.assertRaises(base.ParserException, self.check_parser,
self.create_vnf_pm_job, [], [])
@ddt.unpack
def test_take_action(self):
param_file = ("./tackerclient/osc/v2/vnfpm/samples/"
"create_vnf_pm_job_param_sample.json")
arg_list = [param_file]
verify_list = [('request_file', param_file)]
parsed_args = self.check_parser(self.create_vnf_pm_job, arg_list,
verify_list)
json = vnfpm_job_fakes.vnf_pm_job_response()
self.requests_mock.register_uri(
'POST', os.path.join(self.url, 'vnfpm/v2/pm_jobs'),
json=json, headers=self.header)
actual_columns, data = (
self.create_vnf_pm_job.take_action(parsed_args))
self.assertCountEqual(_get_columns_vnfpm_job(),
actual_columns)
_, attributes = vnfpm_job._get_columns(json)
expected_data = vnfpm_job_fakes.get_vnfpm_job_data(
json, columns=attributes)
self.assertListItemsEqual(expected_data, data)
@ddt.ddt
class TestListVnfPmJob(TestVnfPmJob):
def setUp(self):
super(TestListVnfPmJob, self).setUp()
self.list_vnf_pm_jobs = vnfpm_job.ListVnfPmJob(
self.app, self.app_args, cmd_name='vnfpm job list')
self._vnf_pm_jobs = self._get_vnf_pm_jobs()
def _get_vnf_pm_jobs(self):
return vnfpm_job_fakes.create_vnf_pm_jobs(count=3)
def get_list_columns(self, all_fields=False, exclude_fields=None,
extra_fields=None, exclude_default=False):
columns = ['Id', 'Object Type', 'Links']
complex_columns = [
'Object Instance Ids',
'Sub Object Instance Ids',
'Criteria',
'Reports'
]
simple_columns = ['Callback Uri']
if extra_fields:
columns.extend(extra_fields)
if exclude_fields:
columns.extend([field for field in complex_columns
if field not in exclude_fields])
if all_fields:
columns.extend(complex_columns)
columns.extend(simple_columns)
if exclude_default:
columns.extend(simple_columns)
return columns
def _get_mock_response_for_list_vnf_pm_jobs(
self, filter_attribute, json=None):
self.requests_mock.register_uri(
'GET', self.url + '/vnfpm/v2/pm_jobs?' + filter_attribute,
json=json if json else self._get_vnf_pm_jobs(),
headers=self.header)
def test_take_action_default_fields(self):
parsed_args = self.check_parser(self.list_vnf_pm_jobs, [], [])
self.requests_mock.register_uri(
'GET', self.url + '/vnfpm/v2/pm_jobs',
json=self._vnf_pm_jobs, headers=self.header)
actual_columns, data = self.list_vnf_pm_jobs.take_action(parsed_args)
expected_data = []
_, columns = tacker_osc_utils.get_column_definitions(
self.list_vnf_pm_jobs.get_attributes(), long_listing=True)
for vnf_pm_job_obj in self._vnf_pm_jobs['vnf_pm_jobs']:
expected_data.append(vnfpm_job_fakes.get_vnfpm_job_data(
vnf_pm_job_obj, columns=columns))
self.assertCountEqual(self.get_list_columns(), actual_columns)
self.assertListItemsEqual(expected_data, list(data))
@ddt.data('all_fields', 'exclude_default')
def test_take_action(self, arg):
parsed_args = self.check_parser(
self.list_vnf_pm_jobs,
["--" + arg, "--filter", '(eq,objectType,VNFC)'],
[(arg, True), ('filter', '(eq,objectType,VNFC)')])
vnf_pm_jobs = self._get_vnf_pm_jobs()
self._get_mock_response_for_list_vnf_pm_jobs(
'filter=(eq,objectType,VNFC)&' + arg, json=vnf_pm_jobs)
actual_columns, data = self.list_vnf_pm_jobs.take_action(parsed_args)
expected_data = []
kwargs = {arg: True}
_, columns = tacker_osc_utils.get_column_definitions(
self.list_vnf_pm_jobs.get_attributes(**kwargs), long_listing=True)
for vnf_pm_job_obj in vnf_pm_jobs['vnf_pm_jobs']:
expected_data.append(vnfpm_job_fakes.get_vnfpm_job_data(
vnf_pm_job_obj, columns=columns))
self.assertCountEqual(self.get_list_columns(**kwargs), actual_columns)
self.assertListItemsEqual(expected_data, list(data))
def test_take_action_with_exclude_fields(self):
parsed_args = self.check_parser(
self.list_vnf_pm_jobs,
["--exclude_fields", 'objectInstanceIds,criteria',
"--filter", '(eq,objectType,VNFC)'],
[('exclude_fields', 'objectInstanceIds,criteria'),
('filter', '(eq,objectType,VNFC)')])
vnf_pm_jobs = self._get_vnf_pm_jobs()
updated_vnf_pm_jobs = {'vnf_pm_jobs': []}
for vnf_pm_job_obj in vnf_pm_jobs['vnf_pm_jobs']:
vnf_pm_job_obj.pop('objectInstanceIds')
vnf_pm_job_obj.pop('criteria')
updated_vnf_pm_jobs['vnf_pm_jobs'].append(vnf_pm_job_obj)
self._get_mock_response_for_list_vnf_pm_jobs(
'filter=(eq,objectType,VNFC)&'
'exclude_fields=objectInstanceIds,criteria',
json=updated_vnf_pm_jobs)
actual_columns, data = self.list_vnf_pm_jobs.take_action(parsed_args)
expected_data = []
_, columns = tacker_osc_utils.get_column_definitions(
self.list_vnf_pm_jobs.get_attributes(
exclude_fields=['objectInstanceIds', 'criteria']),
long_listing=True)
for updated_vnf_pm_obj in updated_vnf_pm_jobs['vnf_pm_jobs']:
expected_data.append(vnfpm_job_fakes.get_vnfpm_job_data(
updated_vnf_pm_obj, columns=columns))
expected_columns = self.get_list_columns(
exclude_fields=['Object Instance Ids', 'Criteria'])
self.assertCountEqual(expected_columns, actual_columns)
self.assertListItemsEqual(expected_data, list(data))
@ddt.data((['--all_fields', '--fields', 'objectInstanceIds'],
[('all_fields', True), ('fields', 'objectInstanceIds')]),
(['--all_fields', '--exclude_fields', 'criteria'],
[('all_fields', True), ('exclude_fields', 'criteria')]),
(['--fields', 'objectInstanceIds',
'--exclude_fields', 'criteria'],
[('fields', 'objectInstanceIds'),
('exclude_fields', 'criteria')]))
@ddt.unpack
def test_take_action_with_invalid_combination(self, arglist, verifylist):
self.assertRaises(base.ParserException, self.check_parser,
self.list_vnf_pm_jobs, arglist, verifylist)
def test_take_action_with_valid_combination(self):
parsed_args = self.check_parser(
self.list_vnf_pm_jobs,
[
"--fields", 'subObjectInstanceIds,criteria',
"--exclude_default"
],
[
('fields', 'subObjectInstanceIds,criteria'),
('exclude_default', True)
])
vnf_pm_jobs = self._get_vnf_pm_jobs()
updated_vnf_pm_jobs = {'vnf_pm_jobs': []}
for vnf_pm_job_obj in vnf_pm_jobs['vnf_pm_jobs']:
# vnf_pm_job_obj.pop('userDefinedData')
updated_vnf_pm_jobs['vnf_pm_jobs'].append(vnf_pm_job_obj)
self._get_mock_response_for_list_vnf_pm_jobs(
'exclude_default&fields=subObjectInstanceIds,criteria',
json=updated_vnf_pm_jobs)
actual_columns, data = self.list_vnf_pm_jobs.take_action(parsed_args)
expected_data = []
_, columns = tacker_osc_utils.get_column_definitions(
self.list_vnf_pm_jobs.get_attributes(
extra_fields=['subObjectInstanceIds', 'criteria'],
exclude_default=True),
long_listing=True)
for updated_vnf_pm_job_obj in updated_vnf_pm_jobs['vnf_pm_jobs']:
expected_data.append(vnfpm_job_fakes.get_vnfpm_job_data(
updated_vnf_pm_job_obj, columns=columns))
expected_columns = self.get_list_columns(
extra_fields=['Sub Object Instance Ids', 'Criteria'],
exclude_default=True)
self.assertCountEqual(expected_columns, actual_columns)
self.assertListItemsEqual(expected_data, list(data))
class TestShowVnfPmJob(TestVnfPmJob):
def setUp(self):
super(TestShowVnfPmJob, self).setUp()
self.show_vnf_pm_jobs = vnfpm_job.ShowVnfPmJob(
self.app, self.app_args, cmd_name='vnfpm job show')
def test_take_action(self):
"""Test of take_action()"""
vnfpm_job_obj = vnfpm_job_fakes.vnf_pm_job_response()
arg_list = [vnfpm_job_obj['id']]
verify_list = [('vnf_pm_job_id', vnfpm_job_obj['id'])]
# command param
parsed_args = self.check_parser(
self.show_vnf_pm_jobs, arg_list, verify_list)
url = os.path.join(
self.url, 'vnfpm/v2/pm_jobs', vnfpm_job_obj['id'])
self.requests_mock.register_uri(
'GET', url, headers=self.header, json=vnfpm_job_obj)
columns, data = (self.show_vnf_pm_jobs.take_action(parsed_args))
self.assertCountEqual(_get_columns_vnfpm_job('show'), columns)
_, attributes = vnfpm_job._get_columns(vnfpm_job_obj)
self.assertListItemsEqual(
vnfpm_job_fakes.get_vnfpm_job_data(
vnfpm_job_obj, columns=attributes), data)
def test_take_action_vnf_pm_job_id_not_found(self):
"""Test if vnf-pm-job-id does not find."""
arg_list = [uuidsentinel.vnf_pm_job_id]
verify_list = [('vnf_pm_job_id', uuidsentinel.vnf_pm_job_id)]
# command param
parsed_args = self.check_parser(
self.show_vnf_pm_jobs, arg_list, verify_list)
url = os.path.join(
self.url, 'vnfpm/v2/pm_jobs', uuidsentinel.vnf_pm_job_id)
self.requests_mock.register_uri(
'GET', url, headers=self.header, status_code=404, json={})
self.assertRaises(exceptions.TackerClientException,
self.show_vnf_pm_jobs.take_action,
parsed_args)
def test_take_action_internal_server_error(self):
"""Test for internal server error."""
arg_list = [uuidsentinel.vnf_pm_job_id]
verify_list = [('vnf_pm_job_id', uuidsentinel.vnf_pm_job_id)]
# command param
parsed_args = self.check_parser(
self.show_vnf_pm_jobs, arg_list, verify_list)
url = os.path.join(
self.url, 'vnfpm/v2/pm_jobs', uuidsentinel.vnf_pm_job_id)
self.requests_mock.register_uri(
'GET', url, headers=self.header, status_code=500, json={})
self.assertRaises(exceptions.TackerClientException,
self.show_vnf_pm_jobs.take_action,
parsed_args)
@ddt.ddt
class TestUpdateVnfPmJob(TestVnfPmJob):
def setUp(self):
super(TestUpdateVnfPmJob, self).setUp()
self.update_vnf_pm_job = vnfpm_job.UpdateVnfPmJob(
self.app, self.app_args, cmd_name='vnfpm job update')
def test_take_action(self):
"""Test of take_action()"""
param_file = ("./tackerclient/osc/v2/vnfpm/samples/"
"update_vnf_pm_job_param_sample.json")
arg_list = [uuidsentinel.vnf_pm_job_id, param_file]
verify_list = [
('vnf_pm_job_id', uuidsentinel.vnf_pm_job_id),
('request_file', param_file)
]
vnfpm_job_obj = vnfpm_job_fakes.vnf_pm_job_response(
None, 'update')
# command param
parsed_args = self.check_parser(
self.update_vnf_pm_job, arg_list, verify_list)
url = os.path.join(
self.url, 'vnfpm/v2/pm_jobs', uuidsentinel.vnf_pm_job_id)
self.requests_mock.register_uri(
'PATCH', url, headers=self.header, json=vnfpm_job_obj)
actual_columns, data = (
self.update_vnf_pm_job.take_action(parsed_args))
expected_columns = _get_columns_vnfpm_job(action='update')
self.assertCountEqual(expected_columns, actual_columns)
_, columns = vnfpm_job._get_columns(
vnfpm_job_obj, action='update')
expected_data = vnfpm_job_fakes.get_vnfpm_job_data(
vnfpm_job_obj, columns=columns)
self.assertEqual(expected_data, data)
def test_take_action_vnf_pm_job_id_not_found(self):
"""Test if vnf-pm-job-id does not find"""
param_file = ("./tackerclient/osc/v2/vnfpm/samples/"
"update_vnf_pm_job_param_sample.json")
arg_list = [uuidsentinel.vnf_pm_job_id, param_file]
verify_list = [
('vnf_pm_job_id', uuidsentinel.vnf_pm_job_id),
('request_file', param_file)
]
# command param
parsed_args = self.check_parser(
self.update_vnf_pm_job, arg_list, verify_list)
url = os.path.join(
self.url, 'vnfpm/v2/pm_jobs', uuidsentinel.vnf_pm_job_id)
self.requests_mock.register_uri(
'PATCH', url, headers=self.header, status_code=404, json={})
self.assertRaises(exceptions.TackerClientException,
self.update_vnf_pm_job.take_action,
parsed_args)
class TestDeleteVnfPmJob(TestVnfPmJob):
def setUp(self):
super(TestDeleteVnfPmJob, self).setUp()
self.delete_vnf_pm_job = vnfpm_job.DeleteVnfPmJob(
self.app, self.app_args, cmd_name='vnfpm job delete')
# Vnf Fm job to delete
self.vnf_pm_jobs = vnfpm_job_fakes.create_vnf_pm_jobs(
count=3)['vnf_pm_jobs']
def _mock_request_url_for_delete(self, index):
url = os.path.join(self.url, 'vnfpm/v2/pm_jobs',
self.vnf_pm_jobs[index]['id'])
self.requests_mock.register_uri('DELETE', url,
headers=self.header, json={})
def test_delete_one_vnf_pm_job(self):
arg_list = [self.vnf_pm_jobs[0]['id']]
verify_list = [('vnf_pm_job_id',
[self.vnf_pm_jobs[0]['id']])]
parsed_args = self.check_parser(self.delete_vnf_pm_job, arg_list,
verify_list)
self._mock_request_url_for_delete(0)
sys.stdout = buffer = StringIO()
result = self.delete_vnf_pm_job.take_action(parsed_args)
self.assertIsNone(result)
self.assertEqual(
(f"VNF PM job '{self.vnf_pm_jobs[0]['id']}' "
f"deleted successfully"), buffer.getvalue().strip())
def test_delete_multiple_vnf_pm_job(self):
arg_list = []
for obj in self.vnf_pm_jobs:
arg_list.append(obj['id'])
verify_list = [('vnf_pm_job_id', arg_list)]
parsed_args = self.check_parser(self.delete_vnf_pm_job, arg_list,
verify_list)
for i in range(0, 3):
self._mock_request_url_for_delete(i)
sys.stdout = buffer = StringIO()
result = self.delete_vnf_pm_job.take_action(parsed_args)
self.assertIsNone(result)
self.assertEqual('All specified VNF PM jobs are deleted '
'successfully', buffer.getvalue().strip())
def test_delete_multiple_vnf_pm_job_exception(self):
arg_list = [
self.vnf_pm_jobs[0]['id'],
'xxxx-yyyy-zzzz',
self.vnf_pm_jobs[1]['id'],
]
verify_list = [('vnf_pm_job_id', arg_list)]
parsed_args = self.check_parser(self.delete_vnf_pm_job,
arg_list, verify_list)
self._mock_request_url_for_delete(0)
url = os.path.join(self.url, 'vnfpm/v2/jobs',
'xxxx-yyyy-zzzz')
self.requests_mock.register_uri(
'GET', url, exc=exceptions.ConnectionFailed)
self._mock_request_url_for_delete(1)
exception = self.assertRaises(exceptions.CommandError,
self.delete_vnf_pm_job.take_action,
parsed_args)
self.assertEqual('Failed to delete 1 of 3 VNF PM jobs.',
exception.message)

View File

@ -0,0 +1,131 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from oslo_utils.fixture import uuidsentinel
from unittest import mock
from tackerclient.common import exceptions
from tackerclient.osc.v2.vnfpm import vnfpm_report
from tackerclient.tests.unit.osc import base
from tackerclient.tests.unit.osc.v1.fixture_data import client
from tackerclient.tests.unit.osc.v2 import vnfpm_report_fakes
class TestVnfPmReport(base.FixturedTestCase):
client_fixture_class = client.ClientFixture
def setUp(self):
super(TestVnfPmReport, self).setUp()
self.url = client.TACKER_URL
self.header = {'content-type': 'application/json'}
self.app = mock.Mock()
self.app_args = mock.Mock()
self.client_manager = self.cs
self.app.client_manager.tackerclient = self.client_manager
def _get_columns_vnfpm_report():
columns = ['Entries']
return columns
class TestShowVnfPmReport(TestVnfPmReport):
def setUp(self):
super(TestShowVnfPmReport, self).setUp()
self.show_vnf_pm_reports = vnfpm_report.ShowVnfPmReport(
self.app, self.app_args, cmd_name='vnfpm report show')
def test_take_action(self):
"""Test of take_action()"""
vnfpm_report_obj = vnfpm_report_fakes.vnf_pm_report_response()
vnf_pm_job_id = uuidsentinel.vnf_pm_job_id
vnf_pm_report_id = uuidsentinel.vnfpm_report_obj
arg_list = [vnf_pm_job_id, vnf_pm_report_id]
verify_list = [
('vnf_pm_job_id', vnf_pm_job_id),
('vnf_pm_report_id', vnf_pm_report_id)
]
# command param
parsed_args = self.check_parser(
self.show_vnf_pm_reports, arg_list, verify_list)
url = os.path.join(
self.url, 'vnfpm/v2/pm_jobs', vnf_pm_job_id,
'reports', vnf_pm_report_id)
self.requests_mock.register_uri(
'GET', url, headers=self.header, json=vnfpm_report_obj)
columns, data = (self.show_vnf_pm_reports.take_action(parsed_args))
self.assertCountEqual(_get_columns_vnfpm_report(), columns)
_, attributes = vnfpm_report._get_columns(vnfpm_report_obj)
expected_data = vnfpm_report_fakes.get_vnfpm_report_data(
vnfpm_report_obj, columns=attributes)
print(f'123, {expected_data}')
print(f'456, {data}')
self.assertListItemsEqual(expected_data, data)
def test_take_action_vnf_pm_report_id_not_found(self):
"""Test if vnf-pm-report-id does not find."""
vnf_pm_job_id = uuidsentinel.vnf_pm_job_id
vnf_pm_report_id = uuidsentinel.vnf_pm_report_id
arg_list = [vnf_pm_job_id, vnf_pm_report_id]
verify_list = [
('vnf_pm_job_id', vnf_pm_job_id),
('vnf_pm_report_id', vnf_pm_report_id)
]
# command param
parsed_args = self.check_parser(
self.show_vnf_pm_reports, arg_list, verify_list)
url = os.path.join(
self.url, 'vnfpm/v2/pm_jobs', vnf_pm_job_id,
'reports', vnf_pm_report_id)
self.requests_mock.register_uri(
'GET', url, headers=self.header, status_code=404, json={})
self.assertRaises(exceptions.TackerClientException,
self.show_vnf_pm_reports.take_action,
parsed_args)
def test_take_action_internal_server_error(self):
"""Test for internal server error."""
vnf_pm_job_id = uuidsentinel.vnf_pm_job_id
vnf_pm_report_id = uuidsentinel.vnf_pm_report_id
arg_list = [vnf_pm_job_id, vnf_pm_report_id]
verify_list = [
('vnf_pm_job_id', vnf_pm_job_id),
('vnf_pm_report_id', vnf_pm_report_id)
]
# command param
parsed_args = self.check_parser(
self.show_vnf_pm_reports, arg_list, verify_list)
url = os.path.join(
self.url, 'vnfpm/v2/pm_jobs', vnf_pm_job_id,
'reports', vnf_pm_report_id)
self.requests_mock.register_uri(
'GET', url, headers=self.header, status_code=500, json={})
self.assertRaises(exceptions.TackerClientException,
self.show_vnf_pm_reports.take_action,
parsed_args)

View File

@ -0,0 +1,127 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import uuidutils
from tackerclient.osc import utils as tacker_osc_utils
def create_vnf_fm_alarms(count=2):
"""Create multiple fake vnf packages.
:param int count:
The number of vnf_fm_alarms to fake
:return:
A list of fake vnf fm alarms dictionary
"""
vnf_fm_alarms = []
for i in range(0, count):
unique_id = uuidutils.generate_uuid()
vnf_fm_alarms.append(vnf_fm_alarm_response(attrs={'id': unique_id}))
return vnf_fm_alarms
def vnf_fm_alarm_response(attrs=None, action=None):
"""Create a fake vnf fm alarm.
:param Dictionary attrs:
A dictionary with all attributes
:return:
A FakeVnfFmAlarm dict
"""
if action == 'update':
fake_vnf_fm_alarm = {
"ackState": "UNACKNOWLEDGED"
}
return fake_vnf_fm_alarm
attrs = attrs or {}
# Set default attributes.
fake_vnf_fm_alarm = {
"id": "78a39661-60a8-4824-b989-88c1b0c3534a",
"managedObjectId": "c61314d0-f583-4ab3-a457-46426bce02d3",
"vnfcInstanceIds": "0e5f3086-4e79-47ed-a694-54c29155fa26",
"rootCauseFaultyResource": {
"faultyResource": {
"vimConnectionId": "0d57e928-86a4-4445-a4bd-1634edae73f3",
"resourceId": "4e6ccbe1-38ec-4b1b-a278-64de09ba01b3",
"vimLevelResourceType": "OS::Nova::Server"
},
"faultyResourceType": "COMPUTE"
},
"alarmRaisedTime": "2021-09-03 10:21:03",
"alarmChangedTime": "2021-09-04 10:21:03",
"alarmClearedTime": "2021-09-05 10:21:03",
"alarmAcknowledgedTime": "2021-09-06 10:21:03",
"ackState": "UNACKNOWLEDGED",
"perceivedSeverity": "WARNING",
"eventTime": "2021-09-07 10:06:03",
"eventType": "EQUIPMENT_ALARM",
"faultType": "Fault Type",
"probableCause": "The server cannot be connected.",
"isRootCause": False,
"correlatedAlarmIds": [
"c88b624e-e997-4b17-b674-10ca2bab62e0",
"c16d41fd-12e2-49a6-bb17-72faf702353f"
],
"faultDetails": [
"Fault",
"Details"
],
"_links": {
"self": {
"href": "/vnffm/v1/alarms/"
"78a39661-60a8-4824-b989-88c1b0c3534a"
},
"objectInstance": {
"href": "/vnflcm/v1/vnf_instances/"
"0e5f3086-4e79-47ed-a694-54c29155fa26"
}
}
}
# Overwrite default attributes.
fake_vnf_fm_alarm.update(attrs)
return fake_vnf_fm_alarm
def get_vnffm_alarm_data(vnf_fm_alarm, columns=None):
"""Get the vnffm alarm.
:return:
A tuple object sorted based on the name of the columns.
"""
complex_attributes = [
'vnfcInstanceIds',
'rootCauseFaultyResource',
'correlatedAlarmIds',
'faultDetails',
'_links'
]
for attribute in complex_attributes:
if vnf_fm_alarm.get(attribute):
vnf_fm_alarm.update(
{attribute: tacker_osc_utils.FormatComplexDataColumn(
vnf_fm_alarm[attribute])})
# return the list of data as per column order
if columns:
return tuple([vnf_fm_alarm[key] for key in columns])
return tuple([vnf_fm_alarm[key] for key in sorted(
vnf_fm_alarm.keys())])

View File

@ -0,0 +1,127 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import uuidutils
from tackerclient.osc import utils as tacker_osc_utils
def create_vnf_fm_subs(count=2):
"""Create multiple fake vnf packages.
:param int count:
The number of vnf_fm_subs to fake
:return:
A list of fake vnf fm subs dictionary
"""
vnf_fm_subs = []
for i in range(0, count):
unique_id = uuidutils.generate_uuid()
vnf_fm_subs.append(vnf_fm_sub_response(attrs={'id': unique_id}))
return vnf_fm_subs
def vnf_fm_sub_response(attrs=None):
"""Create a fake vnf fm sub.
:param Dictionary attrs:
A dictionary with all attributes
:return:
A FakeVnfFmAlarm dict
"""
attrs = attrs or {}
# Set default attributes.
fake_vnf_fm_sub = {
"id": "78a39661-60a8-4824-b989-88c1b0c3534a",
"filter": {
"vnfInstanceSubscriptionFilter": {
"vnfdIds": [
"dummy-vnfdId-1"
],
"vnfProductsFromProviders": [
{
"vnfProvider": "dummy-vnfProvider-1",
"vnfProducts": [
{
"vnfProductName": "dummy-vnfProductName-1-1",
"versions": [
{
"vnfSoftwareVersion": 1.0,
"vnfdVersions": [1.0, 2.0]
}
]
}
]
}
],
"vnfInstanceIds": [
"dummy-vnfInstanceId-1"
],
"vnfInstanceNames": [
"dummy-vnfInstanceName-1"
]
},
"notificationTypes": [
"AlarmNotification"
],
"faultyResourceTypes": [
"COMPUTE"
],
"perceivedSeverities": [
"WARNING"
],
"eventTypes": [
"EQUIPMENT_ALARM"
],
"probableCauses": [
"The server cannot be connected."
]
},
"callbackUri": "/nfvo/notify/alarm",
"_links": {
"self": {
"href": "/vnffm/v1/subscriptions/"
"78a39661-60a8-4824-b989-88c1b0c3534a"
}
}
}
# Overwrite default attributes.
fake_vnf_fm_sub.update(attrs)
return fake_vnf_fm_sub
def get_vnffm_sub_data(vnf_fm_sub, columns=None):
"""Get the vnffm sub.
:return:
A tuple object sorted based on the name of the columns.
"""
complex_attributes = ['filter', '_links']
for attribute in complex_attributes:
if vnf_fm_sub.get(attribute):
vnf_fm_sub.update(
{attribute: tacker_osc_utils.FormatComplexDataColumn(
vnf_fm_sub[attribute])})
# return the list of data as per column order
if columns:
return tuple([vnf_fm_sub[key] for key in columns])
return tuple([vnf_fm_sub[key] for key in sorted(
vnf_fm_sub.keys())])

View File

@ -0,0 +1,134 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import uuidutils
from tackerclient.osc import utils as tacker_osc_utils
def create_vnf_pm_jobs(count=2):
"""Create multiple fake vnf pm jobs.
:param int count:
The number of vnf_pm_jobs to fake
:return:
A list of fake vnf pm jobs dictionary
"""
vnf_pm_jobs = []
for i in range(0, count):
unique_id = uuidutils.generate_uuid()
vnf_pm_jobs.append(vnf_pm_job_response(attrs={'id': unique_id}))
return {'vnf_pm_jobs': vnf_pm_jobs}
def vnf_pm_job_response(attrs=None, action=None):
"""Create a fake vnf pm job.
:param Dictionary attrs:
A dictionary with all attributes
:return:
A pm job dict
"""
if action == 'update':
fake_vnf_pm_job = {
"callbackUri": "/nfvo/notify/job",
"authentication": {
"authType": [
"BASIC",
"OAUTH2_CLIENT_CREDENTIALS"
],
"paramsBasic": {
"userName": "nfvo",
"password": "nfvopwd"
},
"paramsOauth2ClientCredentials": {
"clientId": "auth_user_name",
"clientPassword": "auth_password",
"tokenEndpoint": "token_endpoint"
}
}
}
return fake_vnf_pm_job
attrs = attrs or {}
# Set default attributes.
fake_vnf_pm_job = {
"id": "2bb72d78-b1d9-48fe-8c64-332654ffeb5d",
"objectType": "VNFC",
"objectInstanceIds": [
"object-instance-id-1"
],
"subObjectInstanceIds": [
"sub-object-instance-id-2"
],
"criteria": {
"performanceMetric": [
"VCpuUsageMeanVnf.object-instance-id-1"
],
"performanceMetricGroup": [
"VirtualisedComputeResource"
],
"collectionPeriod": 500,
"reportingPeriod": 1000,
"reportingBoundary": "2022/07/25 10:43:55"
},
"callbackUri": "/nfvo/notify/job",
"reports": [{
"href": "/vnfpm/v2/pm_jobs/2bb72d78-b1d9-48fe-8c64-332654ffeb5d/"
"reports/09d46aed-3ec2-45d9-bfa2-add431e069b3",
"readyTime": "2022/07/25 10:43:55",
"expiryTime": "2022/07/25 10:43:55",
"fileSize": 9999
}],
"_links": {
"self": {
"href": "/vnfpm/v2/pm_jobs/"
"78a39661-60a8-4824-b989-88c1b0c3534a"
},
"objects": [{
"href": "/vnflcm/v1/vnf_instances/"
"0e5f3086-4e79-47ed-a694-54c29155fa26"
}]
}
}
# Overwrite default attributes.
fake_vnf_pm_job.update(attrs)
return fake_vnf_pm_job
def get_vnfpm_job_data(vnf_pm_job, columns=None):
"""Get the vnfpm job.
:return:
A tuple object sorted based on the name of the columns.
"""
complex_attributes = [
'objectInstanceIds', 'subObjectInstanceIds',
'criteria', 'reports', '_links',
'authentication'
]
for attribute in complex_attributes:
if vnf_pm_job.get(attribute):
vnf_pm_job.update(
{attribute: tacker_osc_utils.FormatComplexDataColumn(
vnf_pm_job[attribute])})
# return the list of data as per column order
if columns is None:
columns = sorted(vnf_pm_job.keys())
return tuple([vnf_pm_job[key] for key in columns])

View File

@ -0,0 +1,73 @@
# Copyright (C) 2022 Fujitsu
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tackerclient.osc import utils as tacker_osc_utils
def vnf_pm_report_response(attrs=None):
"""Create a fake vnf pm report.
:param Dictionary attrs:
A dictionary with all attributes
:return:
A pm report dict
"""
attrs = attrs or {}
# Set default attributes.
fake_vnf_pm_report = {
"entries": [
{
"objectType": "VNFC",
"objectInstanceId": "2bb72d78-b1d9-48fe-8c64-332654ffeb5d",
"subObjectInstanceId": "09d46aed-3ec2-45d9-bfa2-add431e069b3",
"performanceMetric":
"VCpuUsagePeakVnf.2bb72d78-b1d9-48fe-8c64-332654ffeb5d,",
"performanceValues": [
{
"timeStamp": "2022/07/27 08:58:58",
"value": "1.88",
"context": {
"key": "value"
}
}
]
}
]
}
# Overwrite default attributes.
fake_vnf_pm_report.update(attrs)
return fake_vnf_pm_report
def get_vnfpm_report_data(vnf_pm_report, columns=None):
"""Get the vnfpm report.
:return:
A tuple object sorted based on the name of the columns.
"""
attribute = 'entries'
if vnf_pm_report.get(attribute):
vnf_pm_report.update(
{attribute: tacker_osc_utils.FormatComplexDataColumn(
vnf_pm_report[attribute])})
# return the list of data as per column order
if columns is None:
columns = sorted(vnf_pm_report.keys())
return tuple([vnf_pm_report[key] for key in columns])

View File

@ -1049,6 +1049,101 @@ class VnfLCMClient(ClientBase):
return self.get(path, headers={'Version': '2.0.0'})
class VnfFMClient(ClientBase):
headers = {'Version': '1.3.0'}
vnf_fm_alarms_path = '/vnffm/v1/alarms'
vnf_fm_alarm_path = '/vnffm/v1/alarms/%s'
vnf_fm_subs_path = '/vnffm/v1/subscriptions'
vnf_fm_sub_path = '/vnffm/v1/subscriptions/%s'
def build_action(self, action):
return action
@APIParamsCall
def list_vnf_fm_alarms(self, retrieve_all=True, **_params):
vnf_fm_alarms = self.list(
"vnf_fm_alarms", self.vnf_fm_alarms_path, retrieve_all,
headers=self.headers, **_params)
return vnf_fm_alarms
@APIParamsCall
def show_vnf_fm_alarm(self, vnf_fm_alarm_id):
return self.get(
self.vnf_fm_alarm_path % vnf_fm_alarm_id, headers=self.headers)
@APIParamsCall
def update_vnf_fm_alarm(self, vnf_fm_alarm_id, body):
return self.patch(
self.vnf_fm_alarm_path % vnf_fm_alarm_id, body=body,
headers=self.headers)
@APIParamsCall
def create_vnf_fm_sub(self, body):
return self.post(
self.vnf_fm_subs_path, body=body, headers=self.headers)
@APIParamsCall
def list_vnf_fm_subs(self, retrieve_all=True, **_params):
vnf_fm_subs = self.list("vnf_fm_subs", self.vnf_fm_subs_path,
retrieve_all, headers=self.headers, **_params)
return vnf_fm_subs
@APIParamsCall
def show_vnf_fm_sub(self, vnf_fm_sub_id):
return self.get(
self.vnf_fm_sub_path % vnf_fm_sub_id, headers=self.headers)
@APIParamsCall
def delete_vnf_fm_sub(self, vnf_fm_sub_id):
return self.delete(
self.vnf_fm_sub_path % vnf_fm_sub_id, headers=self.headers)
class VnfPMClient(ClientBase):
headers = {'Version': '2.1.0'}
vnf_pm_jobs_path = '/vnfpm/v2/pm_jobs'
vnf_pm_job_path = '/vnfpm/v2/pm_jobs/%s'
vnf_pm_reports_path = '/vnfpm/v2/pm_jobs/%(job_id)s/reports/%(report_id)s'
def build_action(self, action):
return action
@APIParamsCall
def create_vnf_pm_job(self, body):
return self.post(
self.vnf_pm_jobs_path, body=body, headers=self.headers)
@APIParamsCall
def list_vnf_pm_jobs(self, retrieve_all=True, **_params):
vnf_pm_jobs = self.list(
"vnf_pm_jobs", self.vnf_pm_jobs_path, retrieve_all,
headers=self.headers, **_params)
return vnf_pm_jobs
@APIParamsCall
def show_vnf_pm_job(self, vnf_pm_job_id):
return self.get(
self.vnf_pm_job_path % vnf_pm_job_id, headers=self.headers)
@APIParamsCall
def update_vnf_pm_job(self, vnf_pm_job_id, body):
return self.patch(
self.vnf_pm_job_path % vnf_pm_job_id, body=body,
headers=self.headers)
@APIParamsCall
def delete_vnf_pm_job(self, vnf_pm_job_id):
return self.delete(
self.vnf_pm_job_path % vnf_pm_job_id, headers=self.headers)
@APIParamsCall
def show_vnf_pm_report(self, vnf_pm_job_id, vnf_pm_report_id):
return self.get(
self.vnf_pm_reports_path % {
'job_id': vnf_pm_job_id, 'report_id': vnf_pm_report_id
}, headers=self.headers)
class Client(object):
"""Unified interface to interact with multiple applications of tacker service.
@ -1071,6 +1166,8 @@ class Client(object):
def __init__(self, **kwargs):
api_version = kwargs.pop('api_version', '1')
self.vnf_lcm_client = VnfLCMClient(api_version, **kwargs)
self.vnf_fm_client = VnfFMClient(**kwargs)
self.vnf_pm_client = VnfPMClient(**kwargs)
self.vnf_package_client = VnfPackageClient(**kwargs)
self.legacy_client = LegacyClient(**kwargs)
@ -1383,3 +1480,50 @@ class Client(object):
def show_vnf_lcm_versions(self, major_version):
return self.vnf_lcm_client.show_vnf_lcm_versions(major_version)
# VnfFMClient methods.
def list_vnf_fm_alarms(self, retrieve_all=True, **_params):
return self.vnf_fm_client.list_vnf_fm_alarms(
retrieve_all=retrieve_all, **_params)
def show_vnf_fm_alarm(self, vnf_fm_alarm_id):
return self.vnf_fm_client.show_vnf_fm_alarm(vnf_fm_alarm_id)
def update_vnf_fm_alarm(self, vnf_fm_alarm_id, body):
return self.vnf_fm_client.update_vnf_fm_alarm(vnf_fm_alarm_id, body)
def create_vnf_fm_sub(self, body):
return self.vnf_fm_client.create_vnf_fm_sub(body)
def list_vnf_fm_subs(self, retrieve_all=True, **_params):
return self.vnf_fm_client.list_vnf_fm_subs(
retrieve_all=retrieve_all, **_params)
def show_vnf_fm_sub(self, vnf_fm_sub_id):
return self.vnf_fm_client.show_vnf_fm_sub(vnf_fm_sub_id)
def delete_vnf_fm_sub(self, vnf_fm_sub_id):
return self.vnf_fm_client.delete_vnf_fm_sub(vnf_fm_sub_id)
# VnfPMClient methods.
def create_vnf_pm_job(self, body):
return self.vnf_pm_client.create_vnf_pm_job(body)
def list_vnf_pm_jobs(self, retrieve_all=True, **_params):
return self.vnf_pm_client.list_vnf_pm_jobs(
retrieve_all=retrieve_all, **_params)
def show_vnf_pm_job(self, vnf_pm_job_id):
return self.vnf_pm_client.show_vnf_pm_job(vnf_pm_job_id)
def update_vnf_pm_job(self, vnf_pm_job_id, body):
return self.vnf_pm_client.update_vnf_pm_job(vnf_pm_job_id, body)
def delete_vnf_pm_job(self, vnf_pm_job_id):
return self.vnf_pm_client.delete_vnf_pm_job(vnf_pm_job_id)
def show_vnf_pm_report(self, vnf_pm_job_id, vnf_pm_report_id):
return self.vnf_pm_client.show_vnf_pm_report(
vnf_pm_job_id, vnf_pm_report_id)