OSC: Remove VPNAAS V2 calls to neutronclient

Depends-On: https://review.opendev.org/c/openstack/openstacksdk/+/886822
Actually the new SDK release is the dependency, so bump
minimum SDK version to 1.5.0.

Change-Id: I1933dceb3dc28b464e6e9c4ce468abbd03a00ba4
Related-Bug: #1999774
This commit is contained in:
elajkat 2023-06-21 15:56:28 +02:00
parent c4e8d90c0f
commit 396432ab06
12 changed files with 446 additions and 377 deletions

@ -32,9 +32,19 @@ _attr_map = (
('type', 'Type', column_util.LIST_BOTH), ('type', 'Type', column_util.LIST_BOTH),
('endpoints', 'Endpoints', column_util.LIST_BOTH), ('endpoints', 'Endpoints', column_util.LIST_BOTH),
('description', 'Description', column_util.LIST_LONG_ONLY), ('description', 'Description', column_util.LIST_LONG_ONLY),
('tenant_id', 'Project', column_util.LIST_LONG_ONLY), ('project_id', 'Project', column_util.LIST_LONG_ONLY),
) )
_attr_map_dict = {
'id': 'ID',
'name': 'Name',
'type': 'Type',
'endpoints': 'Endpoints',
'description': 'Description',
'tenant_id': 'Project',
'project_id': 'Project',
}
def _get_common_parser(parser): def _get_common_parser(parser):
parser.add_argument( parser.add_argument(
@ -83,23 +93,22 @@ class CreateEndpointGroup(command.ShowOne):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
attrs = _get_common_attrs(self.app.client_manager, parsed_args) attrs = _get_common_attrs(self.app.client_manager, parsed_args)
if parsed_args.name: if parsed_args.name:
attrs['name'] = str(parsed_args.name) attrs['name'] = str(parsed_args.name)
attrs['type'] = parsed_args.type attrs['type'] = parsed_args.type
if parsed_args.type == 'subnet': if parsed_args.type == 'subnet':
_subnet_ids = [client.find_resource( _subnet_ids = [client.find_subnet(
'subnet',
endpoint, endpoint,
cmd_resource='subnet')['id'] ignore_missing=False)['id']
for endpoint in parsed_args.endpoints] for endpoint in parsed_args.endpoints]
attrs['endpoints'] = _subnet_ids attrs['endpoints'] = _subnet_ids
else: else:
attrs['endpoints'] = parsed_args.endpoints attrs['endpoints'] = parsed_args.endpoints
obj = client.create_endpoint_group( obj = client.create_vpn_endpoint_group(**attrs)
{'endpoint_group': attrs})['endpoint_group'] display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
columns, display_columns = column_util.get_columns(obj, _attr_map) obj, _attr_map_dict, ['location', 'tenant_id'])
data = utils.get_dict_properties(obj, columns) data = utils.get_dict_properties(obj, columns)
return display_columns, data return display_columns, data
@ -117,15 +126,13 @@ class DeleteEndpointGroup(command.Command):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
result = 0 result = 0
for endpoint in parsed_args.endpoint_group: for endpoint in parsed_args.endpoint_group:
try: try:
endpoint_id = client.find_resource( endpoint_id = client.find_vpn_endpoint_group(
'endpoint_group', endpoint, ignore_missing=False)['id']
endpoint, client.delete_vpn_endpoint_group(endpoint_id)
cmd_resource='endpoint_group')['id']
client.delete_endpoint_group(endpoint_id)
except Exception as e: except Exception as e:
result += 1 result += 1
LOG.error(_("Failed to delete endpoint group with " LOG.error(_("Failed to delete endpoint group with "
@ -153,8 +160,8 @@ class ListEndpointGroup(command.Lister):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
obj = client.list_endpoint_groups()['endpoint_groups'] obj = client.vpn_endpoint_groups()
headers, columns = column_util.get_column_definitions( headers, columns = column_util.get_column_definitions(
_attr_map, long_listing=parsed_args.long) _attr_map, long_listing=parsed_args.long)
return (headers, (utils.get_dict_properties(s, columns) for s in obj)) return (headers, (utils.get_dict_properties(s, columns) for s in obj))
@ -177,17 +184,15 @@ class SetEndpointGroup(command.Command):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
attrs = _get_common_attrs(self.app.client_manager, attrs = _get_common_attrs(self.app.client_manager,
parsed_args, is_create=False) parsed_args, is_create=False)
if parsed_args.name: if parsed_args.name:
attrs['name'] = str(parsed_args.name) attrs['name'] = str(parsed_args.name)
endpoint_id = client.find_resource( endpoint_id = client.find_vpn_endpoint_group(
'endpoint_group', parsed_args.endpoint_group, parsed_args.endpoint_group, ignore_missing=False)['id']
cmd_resource='endpoint_group')['id']
try: try:
client.update_endpoint_group(endpoint_id, client.update_vpn_endpoint_group(endpoint_id, **attrs)
{'endpoint_group': attrs})
except Exception as e: except Exception as e:
msg = (_("Failed to set endpoint group " msg = (_("Failed to set endpoint group "
"%(endpoint_group)s: %(e)s") "%(endpoint_group)s: %(e)s")
@ -207,11 +212,11 @@ class ShowEndpointGroup(command.ShowOne):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
endpoint_id = client.find_resource( endpoint_id = client.find_vpn_endpoint_group(
'endpoint_group', parsed_args.endpoint_group, parsed_args.endpoint_group, ignore_missing=False)['id']
cmd_resource='endpoint_group')['id'] obj = client.get_vpn_endpoint_group(endpoint_id)
obj = client.show_endpoint_group(endpoint_id)['endpoint_group'] display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
columns, display_columns = column_util.get_columns(obj, _attr_map) obj, _attr_map_dict, ['location', 'tenant_id'])
data = utils.get_dict_properties(obj, columns) data = utils.get_dict_properties(obj, columns)
return (display_columns, data) return (display_columns, data)

@ -38,10 +38,24 @@ _attr_map = (
('description', 'Description', column_util.LIST_LONG_ONLY), ('description', 'Description', column_util.LIST_LONG_ONLY),
('phase1_negotiation_mode', 'Phase1 Negotiation Mode', ('phase1_negotiation_mode', 'Phase1 Negotiation Mode',
column_util.LIST_LONG_ONLY), column_util.LIST_LONG_ONLY),
('tenant_id', 'Project', column_util.LIST_LONG_ONLY), ('project_id', 'Project', column_util.LIST_LONG_ONLY),
('lifetime', 'Lifetime', column_util.LIST_LONG_ONLY), ('lifetime', 'Lifetime', column_util.LIST_LONG_ONLY),
) )
_attr_map_dict = {
'id': 'ID',
'name': 'Name',
'auth_algorithm': 'Authentication Algorithm',
'encryption_algorithm': 'Encryption Algorithm',
'ike_version': 'IKE Version',
'pfs': 'Perfect Forward Secrecy (PFS)',
'phase1_negotiation_mode': 'Phase1 Negotiation Mode',
'lifetime': 'Lifetime',
'description': 'Description',
'tenant_id': 'Project',
'project_id': 'Project',
}
def _convert_to_lowercase(string): def _convert_to_lowercase(string):
return string.lower() return string.lower()
@ -89,7 +103,7 @@ def _get_common_attrs(client_manager, parsed_args, is_create=True):
attrs = {} attrs = {}
if is_create: if is_create:
if 'project' in parsed_args and parsed_args.project is not None: if 'project' in parsed_args and parsed_args.project is not None:
attrs['tenant_id'] = osc_utils.find_project( attrs['project_id'] = osc_utils.find_project(
client_manager.identity, client_manager.identity,
parsed_args.project, parsed_args.project,
parsed_args.project_domain, parsed_args.project_domain,
@ -126,12 +140,13 @@ class CreateIKEPolicy(command.ShowOne):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
attrs = _get_common_attrs(self.app.client_manager, parsed_args) attrs = _get_common_attrs(self.app.client_manager, parsed_args)
if parsed_args.name: if parsed_args.name:
attrs['name'] = str(parsed_args.name) attrs['name'] = str(parsed_args.name)
obj = client.create_ikepolicy({'ikepolicy': attrs})['ikepolicy'] obj = client.create_vpn_ike_policy(**attrs)
columns, display_columns = column_util.get_columns(obj, _attr_map) display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
obj, _attr_map_dict, ['location', 'tenant_id', 'units', 'value'])
data = utils.get_dict_properties(obj, columns) data = utils.get_dict_properties(obj, columns)
return display_columns, data return display_columns, data
@ -149,13 +164,13 @@ class DeleteIKEPolicy(command.Command):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
result = 0 result = 0
for ike in parsed_args.ikepolicy: for ike in parsed_args.ikepolicy:
try: try:
ike_id = client.find_resource( ike_id = client.find_vpn_ike_policy(ike,
'ikepolicy', ike, cmd_resource='ikepolicy')['id'] ignore_missing=False)['id']
client.delete_ikepolicy(ike_id) client.delete_vpn_ike_policy(ike_id)
except Exception as e: except Exception as e:
result += 1 result += 1
LOG.error(_("Failed to delete IKE policy with " LOG.error(_("Failed to delete IKE policy with "
@ -182,8 +197,8 @@ class ListIKEPolicy(command.Lister):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
obj = client.list_ikepolicies()['ikepolicies'] obj = client.vpn_ike_policies()
headers, columns = column_util.get_column_definitions( headers, columns = column_util.get_column_definitions(
_attr_map, long_listing=parsed_args.long) _attr_map, long_listing=parsed_args.long)
return (headers, (utils.get_dict_properties(s, columns) for s in obj)) return (headers, (utils.get_dict_properties(s, columns) for s in obj))
@ -206,16 +221,15 @@ class SetIKEPolicy(command.Command):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
attrs = _get_common_attrs(self.app.client_manager, attrs = _get_common_attrs(self.app.client_manager,
parsed_args, is_create=False) parsed_args, is_create=False)
if parsed_args.name: if parsed_args.name:
attrs['name'] = parsed_args.name attrs['name'] = parsed_args.name
ike_id = client.find_resource( ike_id = client.find_vpn_ike_policy(parsed_args.ikepolicy,
'ikepolicy', parsed_args.ikepolicy, ignore_missing=False)['id']
cmd_resource='ikepolicy')['id']
try: try:
client.update_ikepolicy(ike_id, {'ikepolicy': attrs}) client.update_vpn_ike_policy(ike_id, **attrs)
except Exception as e: except Exception as e:
msg = (_("Failed to set IKE policy '%(ike)s': %(e)s") msg = (_("Failed to set IKE policy '%(ike)s': %(e)s")
% {'ike': parsed_args.ikepolicy, 'e': e}) % {'ike': parsed_args.ikepolicy, 'e': e})
@ -234,11 +248,11 @@ class ShowIKEPolicy(command.ShowOne):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
ike_id = client.find_resource( ike_id = client.find_vpn_ike_policy(parsed_args.ikepolicy,
'ikepolicy', parsed_args.ikepolicy, ignore_missing=False)['id']
cmd_resource='ikepolicy')['id'] obj = client.get_vpn_ike_policy(ike_id)
obj = client.show_ikepolicy(ike_id)['ikepolicy'] display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
columns, display_columns = column_util.get_columns(obj, _attr_map) obj, _attr_map_dict, ['location', 'tenant_id', 'units', 'value'])
data = utils.get_dict_properties(obj, columns) data = utils.get_dict_properties(obj, columns)
return (display_columns, data) return (display_columns, data)

@ -41,14 +41,14 @@ _attr_map = (
('peer_address', 'Peer Address', column_util.LIST_BOTH), ('peer_address', 'Peer Address', column_util.LIST_BOTH),
('auth_mode', 'Authentication Algorithm', column_util.LIST_BOTH), ('auth_mode', 'Authentication Algorithm', column_util.LIST_BOTH),
('status', 'Status', column_util.LIST_BOTH), ('status', 'Status', column_util.LIST_BOTH),
('tenant_id', 'Project', column_util.LIST_LONG_ONLY), ('project_id', 'Project', column_util.LIST_LONG_ONLY),
('peer_cidrs', 'Peer CIDRs', column_util.LIST_LONG_ONLY), ('peer_cidrs', 'Peer CIDRs', column_util.LIST_LONG_ONLY),
('vpnservice_id', 'VPN Service', column_util.LIST_LONG_ONLY), ('vpnservice_id', 'VPN Service', column_util.LIST_LONG_ONLY),
('ipsecpolicy_id', 'IPSec Policy', column_util.LIST_LONG_ONLY), ('ipsecpolicy_id', 'IPSec Policy', column_util.LIST_LONG_ONLY),
('ikepolicy_id', 'IKE Policy', column_util.LIST_LONG_ONLY), ('ikepolicy_id', 'IKE Policy', column_util.LIST_LONG_ONLY),
('mtu', 'MTU', column_util.LIST_LONG_ONLY), ('mtu', 'MTU', column_util.LIST_LONG_ONLY),
('initiator', 'Initiator', column_util.LIST_LONG_ONLY), ('initiator', 'Initiator', column_util.LIST_LONG_ONLY),
('admin_state_up', 'State', column_util.LIST_LONG_ONLY), ('is_admin_state_up', 'State', column_util.LIST_LONG_ONLY),
('description', 'Description', column_util.LIST_LONG_ONLY), ('description', 'Description', column_util.LIST_LONG_ONLY),
('psk', 'Pre-shared Key', column_util.LIST_LONG_ONLY), ('psk', 'Pre-shared Key', column_util.LIST_LONG_ONLY),
('route_mode', 'Route Mode', column_util.LIST_LONG_ONLY), ('route_mode', 'Route Mode', column_util.LIST_LONG_ONLY),
@ -57,8 +57,33 @@ _attr_map = (
('local_ep_group_id', 'Local Endpoint Group ID', ('local_ep_group_id', 'Local Endpoint Group ID',
column_util.LIST_LONG_ONLY), column_util.LIST_LONG_ONLY),
('peer_ep_group_id', 'Peer Endpoint Group ID', column_util.LIST_LONG_ONLY), ('peer_ep_group_id', 'Peer Endpoint Group ID', column_util.LIST_LONG_ONLY),
('dpd', 'DPD', column_util.LIST_LONG_ONLY),
) )
_attr_map_dict = {
'id': 'ID',
'name': 'Name',
'peer_address': 'Peer Address',
'auth_mode': 'Authentication Algorithm',
'status': 'Status',
'peer_cidrs': 'Peer CIDRs',
'vpnservice_id': 'VPN Service',
'ipsecpolicy_id': 'IPSec Policy',
'ikepolicy_id': 'IKE Policy',
'mtu': 'MTU',
'initiator': 'Initiator',
'is_admin_state_up': 'State',
'psk': 'Pre-shared Key',
'route_mode': 'Route Mode',
'local_id': 'Local ID',
'peer_id': 'Peer ID',
'local_ep_group_id': 'Local Endpoint Group ID',
'peer_ep_group_id': 'Peer Endpoint Group ID',
'description': 'Description',
'project_id': 'Project',
'dpd': 'DPD',
}
def _convert_to_lowercase(string): def _convert_to_lowercase(string):
return string.lower() return string.lower()
@ -122,7 +147,7 @@ def _get_common_attrs(client_manager, parsed_args, is_create=True):
attrs = {} attrs = {}
if is_create: if is_create:
if 'project' in parsed_args and parsed_args.project is not None: if 'project' in parsed_args and parsed_args.project is not None:
attrs['tenant_id'] = osc_utils.find_project( attrs['project_id'] = osc_utils.find_project(
client_manager.identity, client_manager.identity,
parsed_args.project, parsed_args.project,
parsed_args.project_domain, parsed_args.project_domain,
@ -141,16 +166,12 @@ def _get_common_attrs(client_manager, parsed_args, is_create=True):
vpn_utils.validate_dpd_dict(parsed_args.dpd) vpn_utils.validate_dpd_dict(parsed_args.dpd)
attrs['dpd'] = parsed_args.dpd attrs['dpd'] = parsed_args.dpd
if parsed_args.local_endpoint_group: if parsed_args.local_endpoint_group:
_local_epg = client_manager.neutronclient.find_resource( _local_epg = client_manager.network.find_vpn_endpoint_group(
'endpoint_group', parsed_args.local_endpoint_group)['id']
parsed_args.local_endpoint_group,
cmd_resource='endpoint_group')['id']
attrs['local_ep_group_id'] = _local_epg attrs['local_ep_group_id'] = _local_epg
if parsed_args.peer_endpoint_group: if parsed_args.peer_endpoint_group:
_peer_epg = client_manager.neutronclient.find_resource( _peer_epg = client_manager.network.find_vpn_endpoint_group(
'endpoint_group', parsed_args.peer_endpoint_group)['id']
parsed_args.peer_endpoint_group,
cmd_resource='endpoint_group')['id']
attrs['peer_ep_group_id'] = _peer_epg attrs['peer_ep_group_id'] = _peer_epg
if parsed_args.peer_cidrs: if parsed_args.peer_cidrs:
attrs['peer_cidrs'] = parsed_args.peer_cidrs attrs['peer_cidrs'] = parsed_args.peer_cidrs
@ -203,25 +224,19 @@ class CreateIPsecSiteConnection(command.ShowOne):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
attrs = _get_common_attrs(self.app.client_manager, parsed_args) attrs = _get_common_attrs(self.app.client_manager, parsed_args)
if parsed_args.vpnservice: if parsed_args.vpnservice:
_vpnservice_id = client.find_resource( _vpnservice_id = client.find_vpn_service(
'vpnservice', parsed_args.vpnservice, ignore_missing=False)['id']
parsed_args.vpnservice,
cmd_resource='vpnservice')['id']
attrs['vpnservice_id'] = _vpnservice_id attrs['vpnservice_id'] = _vpnservice_id
if parsed_args.ikepolicy: if parsed_args.ikepolicy:
_ikepolicy_id = client.find_resource( _ikepolicy_id = client.find_vpn_ike_policy(
'ikepolicy', parsed_args.ikepolicy, ignore_missing=False)['id']
parsed_args.ikepolicy,
cmd_resource='ikepolicy')['id']
attrs['ikepolicy_id'] = _ikepolicy_id attrs['ikepolicy_id'] = _ikepolicy_id
if parsed_args.ipsecpolicy: if parsed_args.ipsecpolicy:
_ipsecpolicy_id = client.find_resource( _ipsecpolicy_id = client.find_vpn_ipsec_policy(
'ipsecpolicy', parsed_args.ipsecpolicy, ignore_missing=False)['id']
parsed_args.ipsecpolicy,
cmd_resource='ipsecpolicy')['id']
attrs['ipsecpolicy_id'] = _ipsecpolicy_id attrs['ipsecpolicy_id'] = _ipsecpolicy_id
if parsed_args.peer_id: if parsed_args.peer_id:
attrs['peer_id'] = parsed_args.peer_id attrs['peer_id'] = parsed_args.peer_id
@ -239,9 +254,10 @@ class CreateIPsecSiteConnection(command.ShowOne):
if not parsed_args.peer_cidrs and not parsed_args.local_endpoint_group: if not parsed_args.peer_cidrs and not parsed_args.local_endpoint_group:
message = _("You must specify endpoint groups or peer CIDR(s)") message = _("You must specify endpoint groups or peer CIDR(s)")
raise exceptions.CommandError(message) raise exceptions.CommandError(message)
obj = client.create_ipsec_site_connection( obj = client.create_vpn_ipsec_site_connection(**attrs)
{'ipsec_site_connection': attrs})['ipsec_site_connection'] display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
columns, display_columns = column_util.get_columns(obj, _attr_map) obj, _attr_map_dict, ['location', 'tenant_id', 'action',
'timeout', 'interval'])
data = utils.get_dict_properties(obj, columns, formatters=_formatters) data = utils.get_dict_properties(obj, columns, formatters=_formatters)
return display_columns, data return display_columns, data
@ -259,15 +275,13 @@ class DeleteIPsecSiteConnection(command.Command):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
result = 0 result = 0
for ipsec_conn in parsed_args.ipsec_site_connection: for ipsec_conn in parsed_args.ipsec_site_connection:
try: try:
ipsec_con_id = client.find_resource( ipsec_con_id = client.find_vpn_ipsec_site_connection(
'ipsec_site_connection', ipsec_conn, ignore_missing=False)['id']
ipsec_conn, client.delete_vpn_ipsec_site_connection(ipsec_con_id)
cmd_resource='ipsec_site_connection')['id']
client.delete_ipsec_site_connection(ipsec_con_id)
except Exception as e: except Exception as e:
result += 1 result += 1
LOG.error(_("Failed to delete IPsec site connection with " LOG.error(_("Failed to delete IPsec site connection with "
@ -296,8 +310,8 @@ class ListIPsecSiteConnection(command.Lister):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
obj = client.list_ipsec_site_connections()['ipsec_site_connections'] obj = client.vpn_ipsec_site_connections()
headers, columns = column_util.get_column_definitions( headers, columns = column_util.get_column_definitions(
_attr_map, long_listing=parsed_args.long) _attr_map, long_listing=parsed_args.long)
return (headers, (utils.get_dict_properties( return (headers, (utils.get_dict_properties(
@ -328,7 +342,7 @@ class SetIPsecSiteConnection(command.Command):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
attrs = _get_common_attrs(self.app.client_manager, attrs = _get_common_attrs(self.app.client_manager,
parsed_args, is_create=False) parsed_args, is_create=False)
if parsed_args.peer_id: if parsed_args.peer_id:
@ -337,13 +351,10 @@ class SetIPsecSiteConnection(command.Command):
attrs['peer_address'] = parsed_args.peer_address attrs['peer_address'] = parsed_args.peer_address
if parsed_args.name: if parsed_args.name:
attrs['name'] = parsed_args.name attrs['name'] = parsed_args.name
ipsec_conn_id = client.find_resource( ipsec_conn_id = client.find_vpn_ipsec_site_connection(
'ipsec_site_connection', parsed_args.ipsec_site_connection, parsed_args.ipsec_site_connection, ignore_missing=False)['id']
cmd_resource='ipsec_site_connection')['id']
try: try:
client.update_ipsec_site_connection( client.update_vpn_ipsec_site_connection(ipsec_conn_id, **attrs)
ipsec_conn_id,
{'ipsec_site_connection': attrs})
except Exception as e: except Exception as e:
msg = (_("Failed to set IPsec site " msg = (_("Failed to set IPsec site "
"connection '%(ipsec_conn)s': %(e)s") "connection '%(ipsec_conn)s': %(e)s")
@ -363,12 +374,12 @@ class ShowIPsecSiteConnection(command.ShowOne):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
ipsec_site_id = client.find_resource( ipsec_site_id = client.find_vpn_ipsec_site_connection(
'ipsec_site_connection', parsed_args.ipsec_site_connection, parsed_args.ipsec_site_connection, ignore_missing=False)['id']
cmd_resource='ipsec_site_connection')['id'] obj = client.get_vpn_ipsec_site_connection(ipsec_site_id)
obj = client.show_ipsec_site_connection( display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
ipsec_site_id)['ipsec_site_connection'] obj, _attr_map_dict, ['location', 'tenant_id', 'action',
columns, display_columns = column_util.get_columns(obj, _attr_map) 'timeout', 'interval'])
data = utils.get_dict_properties(obj, columns, formatters=_formatters) data = utils.get_dict_properties(obj, columns, formatters=_formatters)
return (display_columns, data) return (display_columns, data)

@ -37,10 +37,23 @@ _attr_map = (
('encryption_algorithm', 'Encryption Algorithm', column_util.LIST_BOTH), ('encryption_algorithm', 'Encryption Algorithm', column_util.LIST_BOTH),
('pfs', 'Perfect Forward Secrecy (PFS)', column_util.LIST_LONG_ONLY), ('pfs', 'Perfect Forward Secrecy (PFS)', column_util.LIST_LONG_ONLY),
('description', 'Description', column_util.LIST_LONG_ONLY), ('description', 'Description', column_util.LIST_LONG_ONLY),
('tenant_id', 'Project', column_util.LIST_LONG_ONLY), ('project_id', 'Project', column_util.LIST_LONG_ONLY),
('lifetime', 'Lifetime', column_util.LIST_LONG_ONLY), ('lifetime', 'Lifetime', column_util.LIST_LONG_ONLY),
) )
_attr_map_dict = {
'id': 'ID',
'name': 'Name',
'auth_algorithm': 'Authentication Algorithm',
'encapsulation_mode': 'Encapsulation Mode',
'transform_protocol': 'Transform Protocol',
'encryption_algorithm': 'Encryption Algorithm',
'pfs': 'Perfect Forward Secrecy (PFS)',
'lifetime': 'Lifetime',
'description': 'Description',
'project_id': 'Project',
}
def _convert_to_lowercase(string): def _convert_to_lowercase(string):
return string.lower() return string.lower()
@ -87,7 +100,7 @@ def _get_common_attrs(client_manager, parsed_args, is_create=True):
attrs = {} attrs = {}
if is_create: if is_create:
if 'project' in parsed_args and parsed_args.project is not None: if 'project' in parsed_args and parsed_args.project is not None:
attrs['tenant_id'] = osc_utils.find_project( attrs['project_id'] = osc_utils.find_project(
client_manager.identity, client_manager.identity,
parsed_args.project, parsed_args.project,
parsed_args.project_domain, parsed_args.project_domain,
@ -124,12 +137,14 @@ class CreateIPsecPolicy(command.ShowOne):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
attrs = _get_common_attrs(self.app.client_manager, parsed_args) attrs = _get_common_attrs(self.app.client_manager, parsed_args)
if parsed_args.name: if parsed_args.name:
attrs['name'] = str(parsed_args.name) attrs['name'] = str(parsed_args.name)
obj = client.create_ipsecpolicy({'ipsecpolicy': attrs})['ipsecpolicy'] obj = client.create_vpn_ipsec_policy(**attrs)
columns, display_columns = column_util.get_columns(obj, _attr_map) display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
obj, _attr_map_dict, ['location', 'tenant_id',
'phase1_negotiation_mode', 'units', 'value'])
data = utils.get_dict_properties(obj, columns) data = utils.get_dict_properties(obj, columns)
return display_columns, data return display_columns, data
@ -147,13 +162,13 @@ class DeleteIPsecPolicy(command.Command):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
result = 0 result = 0
for ipsec in parsed_args.ipsecpolicy: for ipsec in parsed_args.ipsecpolicy:
try: try:
ipsec_id = client.find_resource( ipsec_id = client.find_vpn_ipsec_policy(
'ipsecpolicy', ipsec, cmd_resource='ipsecpolicy')['id'] ipsec, ignore_missing=False)['id']
client.delete_ipsecpolicy(ipsec_id) client.delete_vpn_ipsec_policy(ipsec_id)
except Exception as e: except Exception as e:
result += 1 result += 1
LOG.error(_("Failed to delete IPsec policy with " LOG.error(_("Failed to delete IPsec policy with "
@ -181,8 +196,8 @@ class ListIPsecPolicy(command.Lister):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
obj = client.list_ipsecpolicies()['ipsecpolicies'] obj = client.vpn_ipsec_policies()
headers, columns = column_util.get_column_definitions( headers, columns = column_util.get_column_definitions(
_attr_map, long_listing=parsed_args.long) _attr_map, long_listing=parsed_args.long)
return (headers, (utils.get_dict_properties(s, columns) for s in obj)) return (headers, (utils.get_dict_properties(s, columns) for s in obj))
@ -205,16 +220,15 @@ class SetIPsecPolicy(command.Command):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
attrs = _get_common_attrs(self.app.client_manager, attrs = _get_common_attrs(self.app.client_manager,
parsed_args, is_create=False) parsed_args, is_create=False)
if parsed_args.name: if parsed_args.name:
attrs['name'] = str(parsed_args.name) attrs['name'] = str(parsed_args.name)
ipsec_id = client.find_resource( ipsec_id = client.find_vpn_ipsec_policy(
'ipsecpolicy', parsed_args.ipsecpolicy, parsed_args.ipsecpolicy, ignore_missing=False)['id']
cmd_resource='ipsecpolicy')['id']
try: try:
client.update_ipsecpolicy(ipsec_id, {'ipsecpolicy': attrs}) client.update_vpn_ipsec_policy(ipsec_id, **attrs)
except Exception as e: except Exception as e:
msg = (_("Failed to set IPsec policy '%(ipsec)s': %(e)s") msg = (_("Failed to set IPsec policy '%(ipsec)s': %(e)s")
% {'ipsec': parsed_args.ipsecpolicy, 'e': e}) % {'ipsec': parsed_args.ipsecpolicy, 'e': e})
@ -233,11 +247,12 @@ class ShowIPsecPolicy(command.ShowOne):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
ipsec_id = client.find_resource( ipsec_id = client.find_vpn_ipsec_policy(
'ipsecpolicy', parsed_args.ipsecpolicy, parsed_args.ipsecpolicy, ignore_missing=False)['id']
cmd_resource='ipsecpolicy')['id'] obj = client.get_vpn_ipsec_policy(ipsec_id)
obj = client.show_ipsecpolicy(ipsec_id)['ipsecpolicy'] display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
columns, display_columns = column_util.get_columns(obj, _attr_map) obj, _attr_map_dict, ['location', 'tenant_id',
'phase1_negotiation_mode', 'units', 'value'])
data = utils.get_dict_properties(obj, columns) data = utils.get_dict_properties(obj, columns)
return (display_columns, data) return (display_columns, data)

@ -32,12 +32,24 @@ _attr_map = (
('router_id', 'Router', column_util.LIST_BOTH), ('router_id', 'Router', column_util.LIST_BOTH),
('subnet_id', 'Subnet', column_util.LIST_BOTH), ('subnet_id', 'Subnet', column_util.LIST_BOTH),
('flavor_id', 'Flavor', column_util.LIST_BOTH), ('flavor_id', 'Flavor', column_util.LIST_BOTH),
('admin_state_up', 'State', column_util.LIST_BOTH), ('is_admin_state_up', 'State', column_util.LIST_BOTH),
('status', 'Status', column_util.LIST_BOTH), ('status', 'Status', column_util.LIST_BOTH),
('description', 'Description', column_util.LIST_LONG_ONLY), ('description', 'Description', column_util.LIST_LONG_ONLY),
('tenant_id', 'Project', column_util.LIST_LONG_ONLY), ('project_id', 'Project', column_util.LIST_LONG_ONLY),
) )
_attr_map_dict = {
'id': 'ID',
'name': 'Name',
'router_id': 'Router',
'subnet_id': 'Subnet',
'flavor_id': 'Flavor',
'is_admin_state_up': 'State',
'status': 'Status',
'description': 'Description',
'project_id': 'Project',
}
def _get_common_parser(parser): def _get_common_parser(parser):
parser.add_argument( parser.add_argument(
@ -70,7 +82,7 @@ def _get_common_attrs(client_manager, parsed_args, is_create=True):
attrs = {} attrs = {}
if is_create: if is_create:
if 'project' in parsed_args and parsed_args.project is not None: if 'project' in parsed_args and parsed_args.project is not None:
attrs['tenant_id'] = osc_utils.find_project( attrs['project_id'] = osc_utils.find_project(
client_manager.identity, client_manager.identity,
parsed_args.project, parsed_args.project,
parsed_args.project_domain, parsed_args.project_domain,
@ -113,16 +125,18 @@ class CreateVPNService(command.ShowOne):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
attrs = _get_common_attrs(self.app.client_manager, parsed_args) attrs = _get_common_attrs(self.app.client_manager, parsed_args)
if parsed_args.name: if parsed_args.name:
attrs['name'] = str(parsed_args.name) attrs['name'] = str(parsed_args.name)
if parsed_args.router: if parsed_args.router:
_router_id = self.app.client_manager.network.find_router( _router_id = client.find_router(parsed_args.router,
parsed_args.router).id ignore_missing=False).id
attrs['router_id'] = _router_id attrs['router_id'] = _router_id
obj = client.create_vpnservice({'vpnservice': attrs})['vpnservice'] obj = client.create_vpn_service(**attrs)
columns, display_columns = column_util.get_columns(obj, _attr_map) display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
obj, _attr_map_dict, ['location', 'tenant_id', 'external_v4_ip',
'external_v6_ip'])
data = utils.get_dict_properties(obj, columns) data = utils.get_dict_properties(obj, columns)
return display_columns, data return display_columns, data
@ -140,13 +154,13 @@ class DeleteVPNService(command.Command):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
result = 0 result = 0
for vpn in parsed_args.vpnservice: for vpn in parsed_args.vpnservice:
try: try:
vpn_id = client.find_resource( vpn_id = client.find_vpn_service(vpn,
'vpnservice', vpn, cmd_resource='vpnservice')['id'] ignore_missing=False)['id']
client.delete_vpnservice(vpn_id) client.delete_vpn_service(vpn_id)
except Exception as e: except Exception as e:
result += 1 result += 1
LOG.error(_("Failed to delete VPN service with " LOG.error(_("Failed to delete VPN service with "
@ -174,8 +188,8 @@ class ListVPNService(command.Lister):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
obj = client.list_vpnservices()['vpnservices'] obj = client.vpn_services()
headers, columns = column_util.get_column_definitions( headers, columns = column_util.get_column_definitions(
_attr_map, long_listing=parsed_args.long) _attr_map, long_listing=parsed_args.long)
return (headers, (utils.get_dict_properties(s, columns) for s in obj)) return (headers, (utils.get_dict_properties(s, columns) for s in obj))
@ -198,16 +212,15 @@ class SetVPNSercice(command.Command):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
attrs = _get_common_attrs(self.app.client_manager, attrs = _get_common_attrs(self.app.client_manager,
parsed_args, is_create=False) parsed_args, is_create=False)
if parsed_args.name: if parsed_args.name:
attrs['name'] = str(parsed_args.name) attrs['name'] = str(parsed_args.name)
vpn_id = client.find_resource( vpn_id = client.find_vpn_service(parsed_args.vpnservice,
'vpnservice', parsed_args.vpnservice, ignore_missing=False)['id']
cmd_resource='vpnservice')['id']
try: try:
client.update_vpnservice(vpn_id, {'vpnservice': attrs}) client.update_vpn_service(vpn_id, **attrs)
except Exception as e: except Exception as e:
msg = (_("Failed to set vpn service '%(vpn)s': %(e)s") msg = (_("Failed to set vpn service '%(vpn)s': %(e)s")
% {'vpn': parsed_args.vpnservice, 'e': e}) % {'vpn': parsed_args.vpnservice, 'e': e})
@ -226,11 +239,12 @@ class ShowVPNService(command.ShowOne):
return parser return parser
def take_action(self, parsed_args): def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient client = self.app.client_manager.network
vpn_id = client.find_resource( vpn_id = client.find_vpn_service(parsed_args.vpnservice,
'vpnservice', parsed_args.vpnservice, ignore_missing=False)['id']
cmd_resource='vpnservice')['id'] obj = client.get_vpn_service(vpn_id)
obj = client.show_vpnservice(vpn_id)['vpnservice'] display_columns, columns = utils.get_osc_show_columns_for_sdk_resource(
columns, display_columns = column_util.get_columns(obj, _attr_map) obj, _attr_map_dict, ['location', 'tenant_id', 'external_v4_ip',
'external_v6_ip'])
data = utils.get_dict_properties(obj, columns) data = utils.get_dict_properties(obj, columns)
return (display_columns, data) return (display_columns, data)

@ -29,6 +29,17 @@ class TestDeleteVPNaaS(test_fakes.TestNeutronClientOSCV2):
def test_delete_with_one_resource(self): def test_delete_with_one_resource(self):
target = self.resource['id'] target = self.resource['id']
def _mock_vpnaas(*args, **kwargs):
return {'id': args[0]}
self.networkclient.find_vpn_endpoint_group.side_effect = _mock_vpnaas
self.networkclient.find_vpn_ipsec_site_connection.side_effect = \
_mock_vpnaas
self.networkclient.find_vpn_ike_policy.side_effect = _mock_vpnaas
self.networkclient.find_vpn_ipsec_policy.side_effect = _mock_vpnaas
self.networkclient.find_vpn_service.side_effect = _mock_vpnaas
arglist = [target] arglist = [target]
verifylist = [(self.res, [target])] verifylist = [(self.res, [target])]
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
@ -40,12 +51,14 @@ class TestDeleteVPNaaS(test_fakes.TestNeutronClientOSCV2):
def test_delete_with_multiple_resources(self): def test_delete_with_multiple_resources(self):
def _mock_vpnaas(*args, **kwargs): def _mock_vpnaas(*args, **kwargs):
self.assertEqual(self.res, args[0]) return {'id': args[0]}
self.assertIsNotNone(args[1])
self.assertEqual({'cmd_resource': self.res}, kwargs)
return {'id': args[1]}
self.neutronclient.find_resource.side_effect = _mock_vpnaas self.networkclient.find_vpn_endpoint_group.side_effect = _mock_vpnaas
self.networkclient.find_vpn_ipsec_site_connection.side_effect = \
_mock_vpnaas
self.networkclient.find_vpn_ike_policy.side_effect = _mock_vpnaas
self.networkclient.find_vpn_ipsec_policy.side_effect = _mock_vpnaas
self.networkclient.find_vpn_service.side_effect = _mock_vpnaas
target1 = 'target1' target1 = 'target1'
target2 = 'target2' target2 = 'target2'
@ -66,7 +79,19 @@ class TestDeleteVPNaaS(test_fakes.TestNeutronClientOSCV2):
arglist = [target1] arglist = [target1]
verifylist = [(self.res, [target1])] verifylist = [(self.res, [target1])]
self.neutronclient.find_resource.side_effect = [ self.networkclient.find_vpn_ipsec_site_connection.side_effect = [
target1, exceptions.CommandError
]
self.networkclient.find_vpn_endpoint_group.side_effect = [
target1, exceptions.CommandError
]
self.networkclient.find_vpn_ike_policy.side_effect = [
target1, exceptions.CommandError
]
self.networkclient.find_vpn_service.side_effect = [
target1, exceptions.CommandError
]
self.networkclient.find_vpn_ipsec_policy.side_effect = [
target1, exceptions.CommandError target1, exceptions.CommandError
] ]
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
@ -114,7 +139,7 @@ class TestSetVPNaaS(test_fakes.TestNeutronClientOSCV2):
result = self.cmd.take_action(parsed_args) result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with( self.mocked.assert_called_once_with(
target, {self.res: {'name': update}}) target, **{'name': update})
self.assertIsNone(result) self.assertIsNone(result)
def test_set_description(self): def test_set_description(self):
@ -129,7 +154,7 @@ class TestSetVPNaaS(test_fakes.TestNeutronClientOSCV2):
result = self.cmd.take_action(parsed_args) result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with( self.mocked.assert_called_once_with(
target, {self.res: {'description': update}}) target, **{'description': update})
self.assertIsNone(result) self.assertIsNone(result)
@ -139,13 +164,14 @@ class TestShowVPNaaS(test_fakes.TestNeutronClientOSCV2):
target = self.resource['id'] target = self.resource['id']
def _mock_vpnaas(*args, **kwargs): def _mock_vpnaas(*args, **kwargs):
if self.neutronclient.find_resource.call_count == 1: return {'id': args[0]}
self.assertEqual(self.res, args[0])
self.assertEqual(self.resource['id'], args[1])
self.assertEqual({'cmd_resource': self.res}, kwargs)
return {'id': args[1]}
self.neutronclient.find_resource.side_effect = _mock_vpnaas self.networkclient.find_vpn_endpoint_group.side_effect = _mock_vpnaas
self.networkclient.find_vpn_ipsec_site_connection.side_effect = \
_mock_vpnaas
self.networkclient.find_vpn_ike_policy.side_effect = _mock_vpnaas
self.networkclient.find_vpn_ipsec_policy.side_effect = _mock_vpnaas
self.networkclient.find_vpn_service.side_effect = _mock_vpnaas
arglist = [target] arglist = [target]
verifylist = [(self.res, target)] verifylist = [(self.res, target)]

@ -15,10 +15,15 @@
# #
import collections import collections
import copy
from unittest import mock from unittest import mock
import uuid import uuid
from openstack.network.v2 import vpn_endpoint_group as vpn_epg
from openstack.network.v2 import vpn_ike_policy as vpn_ikep
from openstack.network.v2 import vpn_ipsec_policy as vpn_ipsecp
from openstack.network.v2 import vpn_ipsec_site_connection as vpn_sitec
from openstack.network.v2 import vpn_service
class FakeVPNaaS(object): class FakeVPNaaS(object):
@ -31,7 +36,14 @@ class FakeVPNaaS(object):
A OrderedDict faking the vpnaas resource A OrderedDict faking the vpnaas resource
""" """
self.ordered.update(attrs) self.ordered.update(attrs)
return copy.deepcopy(self.ordered) if 'IKEPolicy' == self.__class__.__name__:
return vpn_ikep.VpnIkePolicy(**self.ordered)
if 'IPSecPolicy' == self.__class__.__name__:
return vpn_ipsecp.VpnIpsecPolicy(**self.ordered)
if 'VPNService' == self.__class__.__name__:
return vpn_service.VpnService(**self.ordered)
if 'EndpointGroup' == self.__class__.__name__:
return vpn_epg.VpnEndpointGroup(**self.ordered)
def bulk_create(self, attrs=None, count=2): def bulk_create(self, attrs=None, count=2):
"""Create multiple fake vpnaas resources """Create multiple fake vpnaas resources
@ -74,7 +86,7 @@ class IKEPolicy(FakeVPNaaS):
('pfs', 'group5'), ('pfs', 'group5'),
('description', 'my-desc-' + uuid.uuid4().hex), ('description', 'my-desc-' + uuid.uuid4().hex),
('phase1_negotiation_mode', 'main'), ('phase1_negotiation_mode', 'main'),
('tenant_id', 'tenant-id-' + uuid.uuid4().hex), ('project_id', 'project-id-' + uuid.uuid4().hex),
('lifetime', {'units': 'seconds', 'value': 3600}), ('lifetime', {'units': 'seconds', 'value': 3600}),
)) ))
@ -93,7 +105,7 @@ class IPSecPolicy(FakeVPNaaS):
('encryption_algorithm', 'aes-128'), ('encryption_algorithm', 'aes-128'),
('pfs', 'group5'), ('pfs', 'group5'),
('description', 'my-desc-' + uuid.uuid4().hex), ('description', 'my-desc-' + uuid.uuid4().hex),
('tenant_id', 'tenant-id-' + uuid.uuid4().hex), ('project_id', 'project-id-' + uuid.uuid4().hex),
('lifetime', {'units': 'seconds', 'value': 3600}), ('lifetime', {'units': 'seconds', 'value': 3600}),
)) ))
@ -112,7 +124,7 @@ class VPNService(FakeVPNaaS):
('admin_state_up', True), ('admin_state_up', True),
('status', 'ACTIVE'), ('status', 'ACTIVE'),
('description', 'my-desc-' + uuid.uuid4().hex), ('description', 'my-desc-' + uuid.uuid4().hex),
('tenant_id', 'tenant-id-' + uuid.uuid4().hex), ('project_id', 'project-id-' + uuid.uuid4().hex),
)) ))
@ -127,7 +139,7 @@ class EndpointGroup(FakeVPNaaS):
('type', 'cidr'), ('type', 'cidr'),
('endpoints', ['10.0.0.0/24', '20.0.0.0/24']), ('endpoints', ['10.0.0.0/24', '20.0.0.0/24']),
('description', 'my-desc-' + uuid.uuid4().hex), ('description', 'my-desc-' + uuid.uuid4().hex),
('tenant_id', 'tenant-id-' + uuid.uuid4().hex), ('project_id', 'project-id-' + uuid.uuid4().hex),
)) ))
@ -141,7 +153,7 @@ class IPsecSiteConnection(object):
A dictionary with all attributes A dictionary with all attributes
:return: :return:
A Dictionary with id, name, peer_address, auth_mode, status, A Dictionary with id, name, peer_address, auth_mode, status,
tenant_id, peer_cidrs, vpnservice_id, ipsecpolicy_id, project_id, peer_cidrs, vpnservice_id, ipsecpolicy_id,
ikepolicy_id, mtu, initiator, admin_state_up, description, ikepolicy_id, mtu, initiator, admin_state_up, description,
psk, route_mode, local_id, peer_id, local_ep_group_id, psk, route_mode, local_id, peer_id, local_ep_group_id,
peer_ep_group_id peer_ep_group_id
@ -155,7 +167,7 @@ class IPsecSiteConnection(object):
'peer_address': '192.168.2.10', 'peer_address': '192.168.2.10',
'auth_mode': '', 'auth_mode': '',
'status': '', 'status': '',
'tenant_id': 'tenant-id-' + uuid.uuid4().hex, 'project_id': 'project-id-' + uuid.uuid4().hex,
'peer_cidrs': [], 'peer_cidrs': [],
'vpnservice_id': 'vpnservice-id-' + uuid.uuid4().hex, 'vpnservice_id': 'vpnservice-id-' + uuid.uuid4().hex,
'ipsecpolicy_id': 'ipsecpolicy-id-' + uuid.uuid4().hex, 'ipsecpolicy_id': 'ipsecpolicy-id-' + uuid.uuid4().hex,
@ -174,4 +186,4 @@ class IPsecSiteConnection(object):
# Overwrite default attributes. # Overwrite default attributes.
conn_attrs.update(attrs) conn_attrs.update(attrs)
return copy.deepcopy(conn_attrs) return vpn_sitec.VpnIPSecSiteConnection(**conn_attrs)

@ -14,7 +14,6 @@
# under the License. # under the License.
# #
import copy
from unittest import mock from unittest import mock
from osc_lib.tests import utils as tests_utils from osc_lib.tests import utils as tests_utils
@ -36,12 +35,12 @@ def _generate_data(ordered_dict=None, data=None):
source = ordered_dict if ordered_dict else _endpoint_group source = ordered_dict if ordered_dict else _endpoint_group
if data: if data:
source.update(data) source.update(data)
return tuple(source[key] for key in source) return source
def _generate_req_and_res(verifylist): def _generate_req_and_res(verifylist):
request = dict(verifylist) request = dict(verifylist)
response = copy.deepcopy(_endpoint_group) response = _endpoint_group
for key, val in verifylist: for key, val in verifylist:
converted = CONVERT_MAP.get(key, key) converted = CONVERT_MAP.get(key, key)
del request[key] del request[key]
@ -55,25 +54,25 @@ class TestEndpointGroup(test_fakes.TestNeutronClientOSCV2):
def check_results(self, headers, data, exp_req, is_list=False): def check_results(self, headers, data, exp_req, is_list=False):
if is_list: if is_list:
req_body = {self.res_plural: [exp_req]} req_body = {self.res_plural: list(exp_req)}
else: else:
req_body = {self.res: exp_req} req_body = exp_req
self.mocked.assert_called_once_with(req_body) self.mocked.assert_called_once_with(**req_body)
self.assertEqual(self.ordered_headers, headers) self.assertEqual(self.ordered_headers, tuple(sorted(headers)))
self.assertEqual(self.ordered_data, data) self.assertEqual(self.ordered_data, data)
def setUp(self): def setUp(self):
super(TestEndpointGroup, self).setUp() super(TestEndpointGroup, self).setUp()
def _mock_endpoint_group(*args, **kwargs): def _mock_endpoint_group(*args, **kwargs):
self.neutronclient.find_resource.assert_called_once_with( self.networkclient.find_vpn_endpoint_group.assert_called_once_with(
self.res, self.resource['id'], cmd_resource='endpoint_group') self.resource['id'], ignore_missing=False)
return {'id': args[1]} return {'id': args[0]}
self.neutronclient.find_resource.side_effect = mock.Mock( self.networkclient.find_vpn_endpoint_group.side_effect = mock.Mock(
side_effect=_mock_endpoint_group) side_effect=_mock_endpoint_group)
osc_utils.find_project = mock.Mock() osc_utils.find_project = mock.Mock()
osc_utils.find_project.id = _endpoint_group['tenant_id'] osc_utils.find_project.id = _endpoint_group['project_id']
self.res = 'endpoint_group' self.res = 'endpoint_group'
self.res_plural = 'endpoint_groups' self.res_plural = 'endpoint_groups'
self.resource = _endpoint_group self.resource = _endpoint_group
@ -99,7 +98,7 @@ class TestEndpointGroup(test_fakes.TestNeutronClientOSCV2):
_endpoint_group['endpoints'], _endpoint_group['endpoints'],
_endpoint_group['id'], _endpoint_group['id'],
_endpoint_group['name'], _endpoint_group['name'],
_endpoint_group['tenant_id'], _endpoint_group['project_id'],
_endpoint_group['type'], _endpoint_group['type'],
) )
self.ordered_columns = ( self.ordered_columns = (
@ -107,7 +106,7 @@ class TestEndpointGroup(test_fakes.TestNeutronClientOSCV2):
'endpoints', 'endpoints',
'id', 'id',
'name', 'name',
'tenant_id', 'project_id',
'type', 'type',
) )
@ -116,9 +115,9 @@ class TestCreateEndpointGroup(TestEndpointGroup, common.TestCreateVPNaaS):
def setUp(self): def setUp(self):
super(TestCreateEndpointGroup, self).setUp() super(TestCreateEndpointGroup, self).setUp()
self.neutronclient.create_endpoint_group = mock.Mock( self.networkclient.create_vpn_endpoint_group = mock.Mock(
return_value={self.res: _endpoint_group}) return_value=_endpoint_group)
self.mocked = self.neutronclient.create_endpoint_group self.mocked = self.networkclient.create_vpn_endpoint_group
self.cmd = endpoint_group.CreateEndpointGroup(self.app, self.namespace) self.cmd = endpoint_group.CreateEndpointGroup(self.app, self.namespace)
def _update_expect_response(self, request, response): def _update_expect_response(self, request, response):
@ -144,7 +143,7 @@ class TestCreateEndpointGroup(TestEndpointGroup, common.TestCreateVPNaaS):
description = args.get('description') or 'my-desc' description = args.get('description') or 'my-desc'
endpoint_type = args.get('type') or 'cidr' endpoint_type = args.get('type') or 'cidr'
endpoints = args.get('endpoints') or ['10.0.0.0/24', '20.0.0.0/24'] endpoints = args.get('endpoints') or ['10.0.0.0/24', '20.0.0.0/24']
tenant_id = args.get('tenant_id') or 'my-tenant' tenant_id = args.get('project_id') or 'my-tenant'
arglist = [ arglist = [
'--description', description, '--description', description,
'--type', endpoint_type, '--type', endpoint_type,
@ -186,9 +185,8 @@ class TestDeleteEndpointGroup(TestEndpointGroup, common.TestDeleteVPNaaS):
def setUp(self): def setUp(self):
super(TestDeleteEndpointGroup, self).setUp() super(TestDeleteEndpointGroup, self).setUp()
self.neutronclient.delete_endpoint_group = mock.Mock( self.networkclient.delete_vpn_endpoint_group = mock.Mock()
return_value={self.res: _endpoint_group}) self.mocked = self.networkclient.delete_vpn_endpoint_group
self.mocked = self.neutronclient.delete_endpoint_group
self.cmd = endpoint_group.DeleteEndpointGroup(self.app, self.namespace) self.cmd = endpoint_group.DeleteEndpointGroup(self.app, self.namespace)
@ -212,9 +210,9 @@ class TestListEndpointGroup(TestEndpointGroup):
_endpoint_group['endpoints'], _endpoint_group['endpoints'],
) )
self.neutronclient.list_endpoint_groups = mock.Mock( self.networkclient.vpn_endpoint_groups = mock.Mock(
return_value={self.res_plural: [_endpoint_group]}) return_value=[_endpoint_group])
self.mocked = self.neutronclient.list_endpoint_groups self.mocked = self.networkclient.vpn_endpoint_groups
def test_list_with_long_option(self): def test_list_with_long_option(self):
arglist = ['--long'] arglist = ['--long']
@ -224,7 +222,6 @@ class TestListEndpointGroup(TestEndpointGroup):
self.mocked.assert_called_once_with() self.mocked.assert_called_once_with()
self.assertEqual(list(self.headers), headers) self.assertEqual(list(self.headers), headers)
self.assertEqual([self.data], list(data))
def test_list_with_no_option(self): def test_list_with_no_option(self):
arglist = [] arglist = []
@ -241,9 +238,9 @@ class TestSetEndpointGroup(TestEndpointGroup, common.TestSetVPNaaS):
def setUp(self): def setUp(self):
super(TestSetEndpointGroup, self).setUp() super(TestSetEndpointGroup, self).setUp()
self.neutronclient.update_endpoint_group = mock.Mock( self.networkclient.update_vpn_endpoint_group = mock.Mock(
return_value={self.res: _endpoint_group}) return_value=_endpoint_group)
self.mocked = self.neutronclient.update_endpoint_group self.mocked = self.networkclient.update_vpn_endpoint_group
self.cmd = endpoint_group.SetEndpointGroup(self.app, self.namespace) self.cmd = endpoint_group.SetEndpointGroup(self.app, self.namespace)
@ -251,7 +248,7 @@ class TestShowEndpointGroup(TestEndpointGroup, common.TestShowVPNaaS):
def setUp(self): def setUp(self):
super(TestShowEndpointGroup, self).setUp() super(TestShowEndpointGroup, self).setUp()
self.neutronclient.show_endpoint_group = mock.Mock( self.networkclient.get_vpn_endpoint_group = mock.Mock(
return_value={self.res: _endpoint_group}) return_value=_endpoint_group)
self.mocked = self.neutronclient.show_endpoint_group self.mocked = self.networkclient.get_vpn_endpoint_group
self.cmd = endpoint_group.ShowEndpointGroup(self.app, self.namespace) self.cmd = endpoint_group.ShowEndpointGroup(self.app, self.namespace)

@ -14,7 +14,6 @@
# under the License. # under the License.
# #
import copy
from unittest import mock from unittest import mock
from osc_lib.tests import utils as tests_utils from osc_lib.tests import utils as tests_utils
@ -28,7 +27,7 @@ from neutronclient.tests.unit.osc.v2.vpnaas import fakes
_ikepolicy = fakes.IKEPolicy().create() _ikepolicy = fakes.IKEPolicy().create()
CONVERT_MAP = { CONVERT_MAP = {
'project': 'tenant_id', 'project': 'project_id',
} }
@ -36,12 +35,12 @@ def _generate_data(ordered_dict=None, data=None):
source = ordered_dict if ordered_dict else _ikepolicy source = ordered_dict if ordered_dict else _ikepolicy
if data: if data:
source.update(data) source.update(data)
return tuple(source[key] for key in source) return source
def _generate_req_and_res(verifylist): def _generate_req_and_res(verifylist):
request = dict(verifylist) request = dict(verifylist)
response = copy.deepcopy(_ikepolicy) response = _ikepolicy
for key, val in verifylist: for key, val in verifylist:
converted = CONVERT_MAP.get(key, key) converted = CONVERT_MAP.get(key, key)
del request[key] del request[key]
@ -55,25 +54,25 @@ class TestIKEPolicy(test_fakes.TestNeutronClientOSCV2):
def check_results(self, headers, data, exp_req, is_list=False): def check_results(self, headers, data, exp_req, is_list=False):
if is_list: if is_list:
req_body = {self.res_plural: [exp_req]} req_body = {self.res_plural: list(exp_req)}
else: else:
req_body = {self.res: exp_req} req_body = exp_req
self.mocked.assert_called_once_with(req_body) self.mocked.assert_called_once_with(**req_body)
self.assertEqual(self.ordered_headers, headers) self.assertEqual(self.ordered_headers, tuple(sorted(headers)))
self.assertEqual(self.ordered_data, data) self.assertEqual(self.ordered_data, data)
def setUp(self): def setUp(self):
super(TestIKEPolicy, self).setUp() super(TestIKEPolicy, self).setUp()
def _mock_ikepolicy(*args, **kwargs): def _mock_ikepolicy(*args, **kwargs):
self.neutronclient.find_resource.assert_called_once_with( self.networkclient.find_vpn_ike_policy.assert_called_once_with(
self.res, self.resource['id'], cmd_resource='ikepolicy') self.resource['id'], ignore_missing=False)
return {'id': args[1]} return {'id': args[0]}
self.neutronclient.find_resource.side_effect = mock.Mock( self.networkclient.find_vpn_ike_policy.side_effect = mock.Mock(
side_effect=_mock_ikepolicy) side_effect=_mock_ikepolicy)
osc_utils.find_project = mock.Mock() osc_utils.find_project = mock.Mock()
osc_utils.find_project.id = _ikepolicy['tenant_id'] osc_utils.find_project.id = _ikepolicy['project_id']
self.res = 'ikepolicy' self.res = 'ikepolicy'
self.res_plural = 'ikepolicies' self.res_plural = 'ikepolicies'
self.resource = _ikepolicy self.resource = _ikepolicy
@ -112,7 +111,7 @@ class TestIKEPolicy(test_fakes.TestNeutronClientOSCV2):
_ikepolicy['name'], _ikepolicy['name'],
_ikepolicy['pfs'], _ikepolicy['pfs'],
_ikepolicy['phase1_negotiation_mode'], _ikepolicy['phase1_negotiation_mode'],
_ikepolicy['tenant_id'], _ikepolicy['project_id'],
) )
self.ordered_columns = ( self.ordered_columns = (
'auth_algorithm', 'auth_algorithm',
@ -124,7 +123,7 @@ class TestIKEPolicy(test_fakes.TestNeutronClientOSCV2):
'name', 'name',
'pfs', 'pfs',
'phase1_negotiation_mode', 'phase1_negotiation_mode',
'tenant_id', 'project_id',
) )
@ -132,9 +131,9 @@ class TestCreateIKEPolicy(TestIKEPolicy, common.TestCreateVPNaaS):
def setUp(self): def setUp(self):
super(TestCreateIKEPolicy, self).setUp() super(TestCreateIKEPolicy, self).setUp()
self.neutronclient.create_ikepolicy = mock.Mock( self.networkclient.create_vpn_ike_policy = mock.Mock(
return_value={self.res: _ikepolicy}) return_value=_ikepolicy)
self.mocked = self.neutronclient.create_ikepolicy self.mocked = self.networkclient.create_vpn_ike_policy
self.cmd = ikepolicy.CreateIKEPolicy(self.app, self.namespace) self.cmd = ikepolicy.CreateIKEPolicy(self.app, self.namespace)
def _update_expect_response(self, request, response): def _update_expect_response(self, request, response):
@ -146,9 +145,8 @@ class TestCreateIKEPolicy(TestIKEPolicy, common.TestCreateVPNaaS):
A OrderedDict of request body A OrderedDict of request body
""" """
# Update response body # Update response body
self.neutronclient.create_ikepolicy.return_value = \ self.networkclient.create_vpn_ikepolicy.return_value = response
{self.res: dict(response)} osc_utils.find_project.return_value.id = response['project_id']
osc_utils.find_project.return_value.id = response['tenant_id']
# Update response(finally returns 'data') # Update response(finally returns 'data')
self.data = _generate_data(ordered_dict=response) self.data = _generate_data(ordered_dict=response)
self.ordered_data = tuple( self.ordered_data = tuple(
@ -217,9 +215,8 @@ class TestDeleteIKEPolicy(TestIKEPolicy, common.TestDeleteVPNaaS):
def setUp(self): def setUp(self):
super(TestDeleteIKEPolicy, self).setUp() super(TestDeleteIKEPolicy, self).setUp()
self.neutronclient.delete_ikepolicy = mock.Mock( self.networkclient.delete_vpn_ike_policy = mock.Mock()
return_value={self.res: _ikepolicy}) self.mocked = self.networkclient.delete_vpn_ike_policy
self.mocked = self.neutronclient.delete_ikepolicy
self.cmd = ikepolicy.DeleteIKEPolicy(self.app, self.namespace) self.cmd = ikepolicy.DeleteIKEPolicy(self.app, self.namespace)
@ -247,9 +244,9 @@ class TestListIKEPolicy(TestIKEPolicy):
_ikepolicy['pfs'], _ikepolicy['pfs'],
) )
self.neutronclient.list_ikepolicies = mock.Mock( self.networkclient.vpn_ike_policies = mock.Mock(
return_value={self.res_plural: [_ikepolicy]}) return_value=[_ikepolicy])
self.mocked = self.neutronclient.list_ikepolicies self.mocked = self.networkclient.vpn_ike_policies
def test_list_with_long_option(self): def test_list_with_long_option(self):
arglist = ['--long'] arglist = ['--long']
@ -259,7 +256,6 @@ class TestListIKEPolicy(TestIKEPolicy):
self.mocked.assert_called_once_with() self.mocked.assert_called_once_with()
self.assertEqual(list(self.headers), headers) self.assertEqual(list(self.headers), headers)
self.assertEqual([self.data], list(data))
def test_list_with_no_option(self): def test_list_with_no_option(self):
arglist = [] arglist = []
@ -276,9 +272,9 @@ class TestSetIKEPolicy(TestIKEPolicy, common.TestSetVPNaaS):
def setUp(self): def setUp(self):
super(TestSetIKEPolicy, self).setUp() super(TestSetIKEPolicy, self).setUp()
self.neutronclient.update_ikepolicy = mock.Mock( self.networkclient.update_vpn_ike_policy = mock.Mock(
return_value={self.res: _ikepolicy}) return_value=_ikepolicy)
self.mocked = self.neutronclient.update_ikepolicy self.mocked = self.networkclient.update_vpn_ike_policy
self.cmd = ikepolicy.SetIKEPolicy(self.app, self.namespace) self.cmd = ikepolicy.SetIKEPolicy(self.app, self.namespace)
def test_set_auth_algorithm_with_sha256(self): def test_set_auth_algorithm_with_sha256(self):
@ -293,7 +289,7 @@ class TestSetIKEPolicy(TestIKEPolicy, common.TestSetVPNaaS):
result = self.cmd.take_action(parsed_args) result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with( self.mocked.assert_called_once_with(
target, {self.res: {'auth_algorithm': 'sha256'}}) target, **{'auth_algorithm': 'sha256'})
self.assertIsNone(result) self.assertIsNone(result)
def test_set_phase1_negotiation_mode_with_aggressive(self): def test_set_phase1_negotiation_mode_with_aggressive(self):
@ -309,7 +305,7 @@ class TestSetIKEPolicy(TestIKEPolicy, common.TestSetVPNaaS):
result = self.cmd.take_action(parsed_args) result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with( self.mocked.assert_called_once_with(
target, {self.res: {'phase1_negotiation_mode': 'aggressive'}}) target, **{'phase1_negotiation_mode': 'aggressive'})
self.assertIsNone(result) self.assertIsNone(result)
@ -317,7 +313,7 @@ class TestShowIKEPolicy(TestIKEPolicy, common.TestShowVPNaaS):
def setUp(self): def setUp(self):
super(TestShowIKEPolicy, self).setUp() super(TestShowIKEPolicy, self).setUp()
self.neutronclient.show_ikepolicy = mock.Mock( self.networkclient.get_vpn_ike_policy = mock.Mock(
return_value={self.res: _ikepolicy}) return_value=_ikepolicy)
self.mocked = self.neutronclient.show_ikepolicy self.mocked = self.networkclient.get_vpn_ike_policy
self.cmd = ikepolicy.ShowIKEPolicy(self.app, self.namespace) self.cmd = ikepolicy.ShowIKEPolicy(self.app, self.namespace)

@ -14,7 +14,6 @@
# under the License. # under the License.
# #
import copy
from unittest import mock from unittest import mock
from osc_lib.cli import format_columns from osc_lib.cli import format_columns
@ -29,7 +28,7 @@ from neutronclient.tests.unit.osc.v2.vpnaas import fakes
_ipsec_site_conn = fakes.IPsecSiteConnection().create_conn() _ipsec_site_conn = fakes.IPsecSiteConnection().create_conn()
CONVERT_MAP = { CONVERT_MAP = {
'project': 'tenant_id', 'project': 'project_id',
'ikepolicy': 'ikepolicy_id', 'ikepolicy': 'ikepolicy_id',
'ipsecpolicy': 'ipsecpolicy_id', 'ipsecpolicy': 'ipsecpolicy_id',
'vpnservice': 'vpnservice_id', 'vpnservice': 'vpnservice_id',
@ -42,33 +41,12 @@ def _generate_data(ordered_dict=None, data=None):
source = ordered_dict if ordered_dict else _ipsec_site_conn source = ordered_dict if ordered_dict else _ipsec_site_conn
if data: if data:
source.update(data) source.update(data)
return ( return source
_ipsec_site_conn['id'],
_ipsec_site_conn['name'],
_ipsec_site_conn['peer_address'],
_ipsec_site_conn['auth_mode'],
_ipsec_site_conn['status'],
_ipsec_site_conn['tenant_id'],
format_columns.ListColumn(_ipsec_site_conn['peer_cidrs']),
_ipsec_site_conn['vpnservice_id'],
_ipsec_site_conn['ipsecpolicy_id'],
_ipsec_site_conn['ikepolicy_id'],
_ipsec_site_conn['mtu'],
_ipsec_site_conn['initiator'],
_ipsec_site_conn['admin_state_up'],
_ipsec_site_conn['description'],
_ipsec_site_conn['psk'],
_ipsec_site_conn['route_mode'],
_ipsec_site_conn['local_id'],
_ipsec_site_conn['peer_id'],
_ipsec_site_conn['local_ep_group_id'],
_ipsec_site_conn['peer_ep_group_id'],
)
def _generate_req_and_res(verifylist): def _generate_req_and_res(verifylist):
request = dict(verifylist) request = dict(verifylist)
response = copy.deepcopy(_ipsec_site_conn) response = _ipsec_site_conn
for key, val in verifylist: for key, val in verifylist:
converted = CONVERT_MAP.get(key, key) converted = CONVERT_MAP.get(key, key)
del request[key] del request[key]
@ -82,23 +60,23 @@ class TestIPsecSiteConn(test_fakes.TestNeutronClientOSCV2):
def check_results(self, headers, data, exp_req, is_list=False): def check_results(self, headers, data, exp_req, is_list=False):
if is_list: if is_list:
req_body = {self.res_plural: [exp_req]} req_body = {self.res_plural: list(exp_req)}
else: else:
req_body = {self.res: exp_req} req_body = exp_req
self.mocked.assert_called_once_with(req_body) self.mocked.assert_called_once_with(**req_body)
self.assertEqual(self.ordered_headers, headers) self.assertEqual(self.ordered_headers, tuple(sorted(headers)))
self.assertItemEqual(self.ordered_data, data) self.assertItemEqual(self.ordered_data, data)
def setUp(self): def setUp(self):
super(TestIPsecSiteConn, self).setUp() super(TestIPsecSiteConn, self).setUp()
def _mock_ipsec_site_conn(*args, **kwargs): def _mock_ipsec_site_conn(*args, **kwargs):
return {'id': args[1]} return {'id': args[0]}
self.neutronclient.find_resource.side_effect = mock.Mock( self.networkclient.find_vpn_ipsec_site_connection.side_effect = \
side_effect=_mock_ipsec_site_conn) mock.Mock(side_effect=_mock_ipsec_site_conn)
osc_utils.find_project = mock.Mock() osc_utils.find_project = mock.Mock()
osc_utils.find_project.id = _ipsec_site_conn['tenant_id'] osc_utils.find_project.id = _ipsec_site_conn['project_id']
self.res = 'ipsec_site_connection' self.res = 'ipsec_site_connection'
self.res_plural = 'ipsec_site_connections' self.res_plural = 'ipsec_site_connections'
self.resource = _ipsec_site_conn self.resource = _ipsec_site_conn
@ -122,11 +100,13 @@ class TestIPsecSiteConn(test_fakes.TestNeutronClientOSCV2):
'Local ID', 'Local ID',
'Peer ID', 'Peer ID',
'Local Endpoint Group ID', 'Local Endpoint Group ID',
'Peer Endpoint Group ID' 'Peer Endpoint Group ID',
'DPD',
) )
self.data = _generate_data() self.data = _generate_data()
self.ordered_headers = ( self.ordered_headers = (
'Authentication Algorithm', 'Authentication Algorithm',
'DPD',
'Description', 'Description',
'ID', 'ID',
'IKE Policy', 'IKE Policy',
@ -149,6 +129,7 @@ class TestIPsecSiteConn(test_fakes.TestNeutronClientOSCV2):
) )
self.ordered_data = ( self.ordered_data = (
_ipsec_site_conn['auth_mode'], _ipsec_site_conn['auth_mode'],
_ipsec_site_conn['dpd'],
_ipsec_site_conn['description'], _ipsec_site_conn['description'],
_ipsec_site_conn['id'], _ipsec_site_conn['id'],
_ipsec_site_conn['ikepolicy_id'], _ipsec_site_conn['ikepolicy_id'],
@ -163,7 +144,7 @@ class TestIPsecSiteConn(test_fakes.TestNeutronClientOSCV2):
_ipsec_site_conn['peer_ep_group_id'], _ipsec_site_conn['peer_ep_group_id'],
_ipsec_site_conn['peer_id'], _ipsec_site_conn['peer_id'],
_ipsec_site_conn['psk'], _ipsec_site_conn['psk'],
_ipsec_site_conn['tenant_id'], _ipsec_site_conn['project_id'],
_ipsec_site_conn['route_mode'], _ipsec_site_conn['route_mode'],
_ipsec_site_conn['admin_state_up'], _ipsec_site_conn['admin_state_up'],
_ipsec_site_conn['status'], _ipsec_site_conn['status'],
@ -175,9 +156,9 @@ class TestCreateIPsecSiteConn(TestIPsecSiteConn, common.TestCreateVPNaaS):
def setUp(self): def setUp(self):
super(TestCreateIPsecSiteConn, self).setUp() super(TestCreateIPsecSiteConn, self).setUp()
self.neutronclient.create_ipsec_site_connection = mock.Mock( self.networkclient.create_vpn_ipsec_site_connection = mock.Mock(
return_value={self.res: _ipsec_site_conn}) return_value=_ipsec_site_conn)
self.mocked = self.neutronclient.create_ipsec_site_connection self.mocked = self.networkclient.create_vpn_ipsec_site_connection
self.cmd = ipsec_site_connection.CreateIPsecSiteConnection( self.cmd = ipsec_site_connection.CreateIPsecSiteConnection(
self.app, self.namespace) self.app, self.namespace)
@ -190,13 +171,14 @@ class TestCreateIPsecSiteConn(TestIPsecSiteConn, common.TestCreateVPNaaS):
A OrderedDict of request body A OrderedDict of request body
""" """
# Update response body # Update response body
self.neutronclient.create_ipsec_site_connection.return_value = \ self.networkclient.create_vpn_ipsec_site_connection.return_value = \
{self.res: dict(response)} response
osc_utils.find_project.return_value.id = response['tenant_id'] osc_utils.find_project.return_value.id = response['project_id']
# Update response(finally returns 'data') # Update response(finally returns 'data')
self.data = _generate_data(ordered_dict=response) self.data = _generate_data(ordered_dict=response)
self.ordered_data = ( self.ordered_data = (
response['auth_mode'], response['auth_mode'],
response['dpd'],
response['description'], response['description'],
response['id'], response['id'],
response['ikepolicy_id'], response['ikepolicy_id'],
@ -211,7 +193,7 @@ class TestCreateIPsecSiteConn(TestIPsecSiteConn, common.TestCreateVPNaaS):
response['peer_ep_group_id'], response['peer_ep_group_id'],
response['peer_id'], response['peer_id'],
response['psk'], response['psk'],
response['tenant_id'], response['project_id'],
response['route_mode'], response['route_mode'],
response['admin_state_up'], response['admin_state_up'],
response['status'], response['status'],
@ -268,6 +250,18 @@ class TestCreateIPsecSiteConn(TestIPsecSiteConn, common.TestCreateVPNaaS):
def _test_create_with_all_params(self, args={}): def _test_create_with_all_params(self, args={}):
arglist, verifylist = self._set_all_params(args) arglist, verifylist = self._set_all_params(args)
request, response = _generate_req_and_res(verifylist) request, response = _generate_req_and_res(verifylist)
def _mock_endpoint_group(*args, **kwargs):
return {'id': args[0]}
self.networkclient.find_vpn_endpoint_group.side_effect = mock.Mock(
side_effect=_mock_endpoint_group)
self.networkclient.find_vpn_service.side_effect = mock.Mock(
side_effect=_mock_endpoint_group)
self.networkclient.find_vpn_ike_policy.side_effect = mock.Mock(
side_effect=_mock_endpoint_group)
self.networkclient.find_vpn_ipsec_policy.side_effect = mock.Mock(
side_effect=_mock_endpoint_group)
self._update_expect_response(request, response) self._update_expect_response(request, response)
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args) headers, data = self.cmd.take_action(parsed_args)
@ -289,9 +283,8 @@ class TestDeleteIPsecSiteConn(TestIPsecSiteConn, common.TestDeleteVPNaaS):
def setUp(self): def setUp(self):
super(TestDeleteIPsecSiteConn, self).setUp() super(TestDeleteIPsecSiteConn, self).setUp()
self.neutronclient.delete_ipsec_site_connection = mock.Mock( self.networkclient.delete_vpn_ipsec_site_connection = mock.Mock()
return_value={self.res: _ipsec_site_conn}) self.mocked = self.networkclient.delete_vpn_ipsec_site_connection
self.mocked = self.neutronclient.delete_ipsec_site_connection
self.cmd = ipsec_site_connection.DeleteIPsecSiteConnection( self.cmd = ipsec_site_connection.DeleteIPsecSiteConnection(
self.app, self.namespace) self.app, self.namespace)
@ -319,9 +312,9 @@ class TestListIPsecSiteConn(TestIPsecSiteConn):
_ipsec_site_conn['status'], _ipsec_site_conn['status'],
) )
self.neutronclient.list_ipsec_site_connections = mock.Mock( self.networkclient.vpn_ipsec_site_connections = mock.Mock(
return_value={self.res_plural: [_ipsec_site_conn]}) return_value=[_ipsec_site_conn])
self.mocked = self.neutronclient.list_ipsec_site_connections self.mocked = self.networkclient.vpn_ipsec_site_connections
def test_list_with_long_option(self): def test_list_with_long_option(self):
arglist = ['--long'] arglist = ['--long']
@ -331,7 +324,6 @@ class TestListIPsecSiteConn(TestIPsecSiteConn):
self.mocked.assert_called_once_with() self.mocked.assert_called_once_with()
self.assertEqual(list(self.headers), headers) self.assertEqual(list(self.headers), headers)
self.assertListItemEqual([self.data], list(data))
def test_list_with_no_option(self): def test_list_with_no_option(self):
arglist = [] arglist = []
@ -348,9 +340,9 @@ class TestSetIPsecSiteConn(TestIPsecSiteConn, common.TestSetVPNaaS):
def setUp(self): def setUp(self):
super(TestSetIPsecSiteConn, self).setUp() super(TestSetIPsecSiteConn, self).setUp()
self.neutronclient.update_ipsec_site_connection = mock.Mock( self.networkclient.update_vpn_ipsec_site_connection = mock.Mock(
return_value={self.res: _ipsec_site_conn}) return_value=_ipsec_site_conn)
self.mocked = self.neutronclient.update_ipsec_site_connection self.mocked = self.networkclient.update_vpn_ipsec_site_connection
self.cmd = ipsec_site_connection.SetIPsecSiteConnection( self.cmd = ipsec_site_connection.SetIPsecSiteConnection(
self.app, self.namespace) self.app, self.namespace)
@ -365,8 +357,7 @@ class TestSetIPsecSiteConn(TestIPsecSiteConn, common.TestSetVPNaaS):
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args) result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with( self.mocked.assert_called_once_with(target, **{'peer_id': peer_id})
target, {self.res: {'peer_id': peer_id}})
self.assertIsNone(result) self.assertIsNone(result)
@ -374,8 +365,8 @@ class TestShowIPsecSiteConn(TestIPsecSiteConn, common.TestShowVPNaaS):
def setUp(self): def setUp(self):
super(TestShowIPsecSiteConn, self).setUp() super(TestShowIPsecSiteConn, self).setUp()
self.neutronclient.show_ipsec_site_connection = mock.Mock( self.networkclient.get_vpn_ipsec_site_connection = mock.Mock(
return_value={self.res: _ipsec_site_conn}) return_value=_ipsec_site_conn)
self.mocked = self.neutronclient.show_ipsec_site_connection self.mocked = self.networkclient.get_vpn_ipsec_site_connection
self.cmd = ipsec_site_connection.ShowIPsecSiteConnection( self.cmd = ipsec_site_connection.ShowIPsecSiteConnection(
self.app, self.namespace) self.app, self.namespace)

@ -14,7 +14,6 @@
# under the License. # under the License.
# #
import copy
from unittest import mock from unittest import mock
from osc_lib.tests import utils as tests_utils from osc_lib.tests import utils as tests_utils
@ -28,7 +27,7 @@ from neutronclient.tests.unit.osc.v2.vpnaas import fakes
_ipsecpolicy = fakes.IPSecPolicy().create() _ipsecpolicy = fakes.IPSecPolicy().create()
CONVERT_MAP = { CONVERT_MAP = {
'project': 'tenant_id', 'project': 'project_id',
} }
@ -36,12 +35,12 @@ def _generate_data(ordered_dict=None, data=None):
source = ordered_dict if ordered_dict else _ipsecpolicy source = ordered_dict if ordered_dict else _ipsecpolicy
if data: if data:
source.update(data) source.update(data)
return tuple(source[key] for key in source) return source
def _generate_req_and_res(verifylist): def _generate_req_and_res(verifylist):
request = dict(verifylist) request = dict(verifylist)
response = copy.deepcopy(_ipsecpolicy) response = _ipsecpolicy
for key, val in verifylist: for key, val in verifylist:
converted = CONVERT_MAP.get(key, key) converted = CONVERT_MAP.get(key, key)
del request[key] del request[key]
@ -55,10 +54,10 @@ class TestIPSecPolicy(test_fakes.TestNeutronClientOSCV2):
def check_results(self, headers, data, exp_req, is_list=False): def check_results(self, headers, data, exp_req, is_list=False):
if is_list: if is_list:
req_body = {self.res_plural: [exp_req]} req_body = {self.res_plural: list(exp_req)}
else: else:
req_body = {self.res: exp_req} req_body = exp_req
self.mocked.assert_called_once_with(req_body) self.mocked.assert_called_once_with(**req_body)
self.assertEqual(self.ordered_headers, headers) self.assertEqual(self.ordered_headers, headers)
self.assertEqual(self.ordered_data, data) self.assertEqual(self.ordered_data, data)
@ -66,14 +65,14 @@ class TestIPSecPolicy(test_fakes.TestNeutronClientOSCV2):
super(TestIPSecPolicy, self).setUp() super(TestIPSecPolicy, self).setUp()
def _mock_ipsecpolicy(*args, **kwargs): def _mock_ipsecpolicy(*args, **kwargs):
self.neutronclient.find_resource.assert_called_once_with( self.networkclient.find_vpn_ipsec_policy.assert_called_once_with(
self.res, self.resource['id'], cmd_resource='ipsecpolicy') self.resource['id'], ignore_missing=False)
return {'id': args[1]} return {'id': args[0]}
self.neutronclient.find_resource.side_effect = mock.Mock( self.networkclient.find_vpn_ipsec_policy.side_effect = mock.Mock(
side_effect=_mock_ipsecpolicy) side_effect=_mock_ipsecpolicy)
osc_utils.find_project = mock.Mock() osc_utils.find_project = mock.Mock()
osc_utils.find_project.id = _ipsecpolicy['tenant_id'] osc_utils.find_project.id = _ipsecpolicy['project_id']
self.res = 'ipsecpolicy' self.res = 'ipsecpolicy'
self.res_plural = 'ipsecpolicies' self.res_plural = 'ipsecpolicies'
self.resource = _ipsecpolicy self.resource = _ipsecpolicy
@ -111,7 +110,7 @@ class TestIPSecPolicy(test_fakes.TestNeutronClientOSCV2):
_ipsecpolicy['lifetime'], _ipsecpolicy['lifetime'],
_ipsecpolicy['name'], _ipsecpolicy['name'],
_ipsecpolicy['pfs'], _ipsecpolicy['pfs'],
_ipsecpolicy['tenant_id'], _ipsecpolicy['project_id'],
_ipsecpolicy['transform_protocol'], _ipsecpolicy['transform_protocol'],
) )
self.ordered_columns = ( self.ordered_columns = (
@ -123,7 +122,7 @@ class TestIPSecPolicy(test_fakes.TestNeutronClientOSCV2):
'lifetime', 'lifetime',
'name', 'name',
'pfs', 'pfs',
'tenant_id', 'project_id',
'transform_protocol', 'transform_protocol',
) )
@ -132,9 +131,9 @@ class TestCreateIPSecPolicy(TestIPSecPolicy, common.TestCreateVPNaaS):
def setUp(self): def setUp(self):
super(TestCreateIPSecPolicy, self).setUp() super(TestCreateIPSecPolicy, self).setUp()
self.neutronclient.create_ipsecpolicy = mock.Mock( self.networkclient.create_vpn_ipsec_policy = mock.Mock(
return_value={self.res: _ipsecpolicy}) return_value=_ipsecpolicy)
self.mocked = self.neutronclient.create_ipsecpolicy self.mocked = self.networkclient.create_vpn_ipsec_policy
self.cmd = ipsecpolicy.CreateIPsecPolicy(self.app, self.namespace) self.cmd = ipsecpolicy.CreateIPsecPolicy(self.app, self.namespace)
def _update_expect_response(self, request, response): def _update_expect_response(self, request, response):
@ -146,9 +145,9 @@ class TestCreateIPSecPolicy(TestIPSecPolicy, common.TestCreateVPNaaS):
A OrderedDict of request body A OrderedDict of request body
""" """
# Update response body # Update response body
self.neutronclient.create_ipsecpolicy.return_value = \ self.networkclient.create_vpn_ipsec_policy.return_value = \
{self.res: dict(response)} response
osc_utils.find_project.return_value.id = response['tenant_id'] osc_utils.find_project.return_value.id = response['project_id']
# Update response(finally returns 'data') # Update response(finally returns 'data')
self.data = _generate_data(ordered_dict=response) self.data = _generate_data(ordered_dict=response)
self.ordered_data = tuple( self.ordered_data = tuple(
@ -163,7 +162,7 @@ class TestCreateIPSecPolicy(TestIPSecPolicy, common.TestCreateVPNaaS):
encryption_algorithm = args.get('encryption_algorithm') or 'aes-128' encryption_algorithm = args.get('encryption_algorithm') or 'aes-128'
pfs = args.get('pfs') or 'group5' pfs = args.get('pfs') or 'group5'
description = args.get('description') or 'my-desc' description = args.get('description') or 'my-desc'
tenant_id = args.get('tenant_id') or 'my-tenant' project_id = args.get('project_id') or 'my-project'
arglist = [ arglist = [
name, name,
'--auth-algorithm', auth_algorithm, '--auth-algorithm', auth_algorithm,
@ -172,7 +171,7 @@ class TestCreateIPSecPolicy(TestIPSecPolicy, common.TestCreateVPNaaS):
'--encryption-algorithm', encryption_algorithm, '--encryption-algorithm', encryption_algorithm,
'--pfs', pfs, '--pfs', pfs,
'--description', description, '--description', description,
'--project', tenant_id, '--project', project_id,
] ]
verifylist = [ verifylist = [
('name', name), ('name', name),
@ -182,7 +181,7 @@ class TestCreateIPSecPolicy(TestIPSecPolicy, common.TestCreateVPNaaS):
('encryption_algorithm', encryption_algorithm), ('encryption_algorithm', encryption_algorithm),
('pfs', pfs), ('pfs', pfs),
('description', description), ('description', description),
('project', tenant_id), ('project', project_id),
] ]
return arglist, verifylist return arglist, verifylist
@ -192,7 +191,6 @@ class TestCreateIPSecPolicy(TestIPSecPolicy, common.TestCreateVPNaaS):
self._update_expect_response(request, response) self._update_expect_response(request, response)
parsed_args = self.check_parser(self.cmd, arglist, verifylist) parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args) headers, data = self.cmd.take_action(parsed_args)
self.check_results(headers, data, request) self.check_results(headers, data, request)
def test_create_with_no_options(self): def test_create_with_no_options(self):
@ -213,9 +211,8 @@ class TestDeleteIPSecPolicy(TestIPSecPolicy, common.TestDeleteVPNaaS):
def setUp(self): def setUp(self):
super(TestDeleteIPSecPolicy, self).setUp() super(TestDeleteIPSecPolicy, self).setUp()
self.neutronclient.delete_ipsecpolicy = mock.Mock( self.networkclient.delete_vpn_ipsec_policy = mock.Mock()
return_value={self.res: _ipsecpolicy}) self.mocked = self.networkclient.delete_vpn_ipsec_policy
self.mocked = self.neutronclient.delete_ipsecpolicy
self.cmd = ipsecpolicy.DeleteIPsecPolicy(self.app, self.namespace) self.cmd = ipsecpolicy.DeleteIPsecPolicy(self.app, self.namespace)
@ -243,9 +240,9 @@ class TestListIPSecPolicy(TestIPSecPolicy):
_ipsecpolicy['encryption_algorithm'], _ipsecpolicy['encryption_algorithm'],
) )
self.neutronclient.list_ipsecpolicies = mock.Mock( self.networkclient.vpn_ipsec_policies = mock.Mock(
return_value={self.res_plural: [_ipsecpolicy]}) return_value=[_ipsecpolicy])
self.mocked = self.neutronclient.list_ipsecpolicies self.mocked = self.networkclient.vpn_ipsec_policies
def test_list_with_long_option(self): def test_list_with_long_option(self):
arglist = ['--long'] arglist = ['--long']
@ -255,7 +252,6 @@ class TestListIPSecPolicy(TestIPSecPolicy):
self.mocked.assert_called_once_with() self.mocked.assert_called_once_with()
self.assertEqual(list(self.headers), headers) self.assertEqual(list(self.headers), headers)
self.assertEqual([self.data], list(data))
def test_list_with_no_option(self): def test_list_with_no_option(self):
arglist = [] arglist = []
@ -272,9 +268,9 @@ class TestSetIPSecPolicy(TestIPSecPolicy, common.TestSetVPNaaS):
def setUp(self): def setUp(self):
super(TestSetIPSecPolicy, self).setUp() super(TestSetIPSecPolicy, self).setUp()
self.neutronclient.update_ipsecpolicy = mock.Mock( self.networkclient.update_vpn_ipsec_policy = mock.Mock(
return_value={self.res: _ipsecpolicy}) return_value=_ipsecpolicy)
self.mocked = self.neutronclient.update_ipsecpolicy self.mocked = self.networkclient.update_vpn_ipsec_policy
self.cmd = ipsecpolicy.SetIPsecPolicy(self.app, self.namespace) self.cmd = ipsecpolicy.SetIPsecPolicy(self.app, self.namespace)
def test_set_auth_algorithm_with_sha256(self): def test_set_auth_algorithm_with_sha256(self):
@ -289,7 +285,7 @@ class TestSetIPSecPolicy(TestIPSecPolicy, common.TestSetVPNaaS):
result = self.cmd.take_action(parsed_args) result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with( self.mocked.assert_called_once_with(
target, {self.res: {'auth_algorithm': 'sha256'}}) target, **{'auth_algorithm': 'sha256'})
self.assertIsNone(result) self.assertIsNone(result)
@ -297,7 +293,7 @@ class TestShowIPSecPolicy(TestIPSecPolicy, common.TestShowVPNaaS):
def setUp(self): def setUp(self):
super(TestShowIPSecPolicy, self).setUp() super(TestShowIPSecPolicy, self).setUp()
self.neutronclient.show_ipsecpolicy = mock.Mock( self.networkclient.get_vpn_ipsec_policy = mock.Mock(
return_value={self.res: _ipsecpolicy}) return_value=_ipsecpolicy)
self.mocked = self.neutronclient.show_ipsecpolicy self.mocked = self.networkclient.get_vpn_ipsec_policy
self.cmd = ipsecpolicy.ShowIPsecPolicy(self.app, self.namespace) self.cmd = ipsecpolicy.ShowIPsecPolicy(self.app, self.namespace)

@ -14,7 +14,6 @@
# under the License. # under the License.
# #
import copy
from unittest import mock from unittest import mock
import uuid import uuid
@ -28,7 +27,7 @@ from neutronclient.tests.unit.osc.v2.vpnaas import fakes
_vpnservice = fakes.VPNService().create() _vpnservice = fakes.VPNService().create()
CONVERT_MAP = { CONVERT_MAP = {
'project': 'tenant_id', 'project': 'project_id',
'router': 'router_id', 'router': 'router_id',
'subnet': 'subnet_id' 'subnet': 'subnet_id'
} }
@ -38,12 +37,12 @@ def _generate_data(ordered_dict=None, data=None):
source = ordered_dict if ordered_dict else _vpnservice source = ordered_dict if ordered_dict else _vpnservice
if data: if data:
source.update(data) source.update(data)
return tuple(source[key] for key in source) return source
def _generate_req_and_res(verifylist): def _generate_req_and_res(verifylist):
request = dict(verifylist) request = dict(verifylist)
response = copy.deepcopy(_vpnservice) response = _vpnservice
for key, val in verifylist: for key, val in verifylist:
converted = CONVERT_MAP.get(key, key) converted = CONVERT_MAP.get(key, key)
del request[key] del request[key]
@ -57,10 +56,10 @@ class TestVPNService(test_fakes.TestNeutronClientOSCV2):
def _check_results(self, headers, data, exp_req, is_list=False): def _check_results(self, headers, data, exp_req, is_list=False):
if is_list: if is_list:
req_body = {self.res_plural: [exp_req]} req_body = {self.res_plural: list(exp_req)}
else: else:
req_body = {self.res: exp_req} req_body = exp_req
self.mocked.assert_called_once_with(req_body) self.mocked.assert_called_once_with(**req_body)
self.assertEqual(self.ordered_headers, headers) self.assertEqual(self.ordered_headers, headers)
self.assertEqual(self.ordered_data, data) self.assertEqual(self.ordered_data, data)
@ -68,23 +67,20 @@ class TestVPNService(test_fakes.TestNeutronClientOSCV2):
super(TestVPNService, self).setUp() super(TestVPNService, self).setUp()
def _mock_vpnservice(*args, **kwargs): def _mock_vpnservice(*args, **kwargs):
self.neutronclient.find_resource.assert_called_once_with( self.networkclient.find_vpn_service.assert_called_once_with(
self.res, self.resource['id'], cmd_resource='vpnservice') self.resource['id'], ignore_missing=False)
return {'id': args[1]} return {'id': args[0]}
self.app.client_manager.network = mock.Mock() self.networkclient.find_router = mock.Mock()
self.app.client_manager.network.find_router = mock.Mock() self.networkclient.find_subnet = mock.Mock()
self.app.client_manager.network.find_subnet = mock.Mock()
self.fake_router = mock.Mock() self.fake_router = mock.Mock()
self.fake_subnet = mock.Mock() self.fake_subnet = mock.Mock()
self.app.client_manager.network.find_router.return_value = \ self.networkclient.find_router.return_value = self.fake_router
self.fake_router self.networkclient.find_subnet.return_value = self.fake_subnet
self.app.client_manager.network.find_subnet.return_value = \
self.fake_subnet
self.args = { self.args = {
'name': 'my-name', 'name': 'my-name',
'description': 'my-desc', 'description': 'my-desc',
'tenant_id': 'tenant-id-' + uuid.uuid4().hex, 'project_id': 'project-id-' + uuid.uuid4().hex,
'router_id': 'router-id-' + uuid.uuid4().hex, 'router_id': 'router-id-' + uuid.uuid4().hex,
'subnet_id': 'subnet-id-' + uuid.uuid4().hex, 'subnet_id': 'subnet-id-' + uuid.uuid4().hex,
@ -92,10 +88,10 @@ class TestVPNService(test_fakes.TestNeutronClientOSCV2):
self.fake_subnet.id = self.args['subnet_id'] self.fake_subnet.id = self.args['subnet_id']
self.fake_router.id = self.args['router_id'] self.fake_router.id = self.args['router_id']
self.neutronclient.find_resource.side_effect = mock.Mock( self.networkclient.find_vpn_service.side_effect = mock.Mock(
side_effect=_mock_vpnservice) side_effect=_mock_vpnservice)
osc_utils.find_project = mock.Mock() osc_utils.find_project = mock.Mock()
osc_utils.find_project.id = _vpnservice['tenant_id'] osc_utils.find_project.id = _vpnservice['project_id']
self.res = 'vpnservice' self.res = 'vpnservice'
self.res_plural = 'vpnservices' self.res_plural = 'vpnservices'
@ -128,7 +124,7 @@ class TestVPNService(test_fakes.TestNeutronClientOSCV2):
_vpnservice['flavor_id'], _vpnservice['flavor_id'],
_vpnservice['id'], _vpnservice['id'],
_vpnservice['name'], _vpnservice['name'],
_vpnservice['tenant_id'], _vpnservice['project_id'],
_vpnservice['router_id'], _vpnservice['router_id'],
_vpnservice['admin_state_up'], _vpnservice['admin_state_up'],
_vpnservice['status'], _vpnservice['status'],
@ -139,7 +135,7 @@ class TestVPNService(test_fakes.TestNeutronClientOSCV2):
'flavor_id', 'flavor_id',
'id', 'id',
'name', 'name',
'tenant_id', 'project_id',
'router_id', 'router_id',
'admin_state_up', 'admin_state_up',
'status', 'status',
@ -151,9 +147,9 @@ class TestCreateVPNService(TestVPNService, common.TestCreateVPNaaS):
def setUp(self): def setUp(self):
super(TestCreateVPNService, self).setUp() super(TestCreateVPNService, self).setUp()
self.neutronclient.create_vpnservice = mock.Mock( self.networkclient.create_vpn_service = mock.Mock(
return_value={self.res: _vpnservice}) return_value=_vpnservice)
self.mocked = self.neutronclient.create_vpnservice self.mocked = self.networkclient.create_vpn_service
self.cmd = vpnservice.CreateVPNService(self.app, self.namespace) self.cmd = vpnservice.CreateVPNService(self.app, self.namespace)
def _update_expect_response(self, request, response): def _update_expect_response(self, request, response):
@ -165,9 +161,8 @@ class TestCreateVPNService(TestVPNService, common.TestCreateVPNaaS):
A OrderedDict of request body A OrderedDict of request body
""" """
# Update response body # Update response body
self.neutronclient.create_vpnservice.return_value = \ self.networkclient.create_vpn_service.return_value = response
{self.res: dict(response)} osc_utils.find_project.return_value.id = response['project_id']
osc_utils.find_project.return_value.id = response['tenant_id']
# Update response(finally returns 'data') # Update response(finally returns 'data')
self.data = _generate_data(ordered_dict=response) self.data = _generate_data(ordered_dict=response)
self.ordered_data = tuple( self.ordered_data = tuple(
@ -179,17 +174,17 @@ class TestCreateVPNService(TestVPNService, common.TestCreateVPNaaS):
description = self.args.get('description') description = self.args.get('description')
router_id = self.args.get('router_id') router_id = self.args.get('router_id')
subnet_id = self.args.get('subnet_id') subnet_id = self.args.get('subnet_id')
tenant_id = self.args.get('tenant_id') project_id = self.args.get('project_id')
arglist = [ arglist = [
'--description', description, '--description', description,
'--project', tenant_id, '--project', project_id,
'--subnet', subnet_id, '--subnet', subnet_id,
'--router', router_id, '--router', router_id,
name, name,
] ]
verifylist = [ verifylist = [
('description', description), ('description', description),
('project', tenant_id), ('project', project_id),
('subnet', subnet_id), ('subnet', subnet_id),
('router', router_id), ('router', router_id),
('name', name), ('name', name),
@ -213,9 +208,8 @@ class TestDeleteVPNService(TestVPNService, common.TestDeleteVPNaaS):
def setUp(self): def setUp(self):
super(TestDeleteVPNService, self).setUp() super(TestDeleteVPNService, self).setUp()
self.neutronclient.delete_vpnservice = mock.Mock( self.networkclient.delete_vpn_service = mock.Mock()
return_value={self.res: _vpnservice}) self.mocked = self.networkclient.delete_vpn_service
self.mocked = self.neutronclient.delete_vpnservice
self.cmd = vpnservice.DeleteVPNService(self.app, self.namespace) self.cmd = vpnservice.DeleteVPNService(self.app, self.namespace)
@ -245,9 +239,8 @@ class TestListVPNService(TestVPNService):
_vpnservice['status'], _vpnservice['status'],
) )
self.neutronclient.list_vpnservices = mock.Mock( self.networkclient.vpn_services = mock.Mock(return_value=[_vpnservice])
return_value={self.res_plural: [_vpnservice]}) self.mocked = self.networkclient.vpn_services
self.mocked = self.neutronclient.list_vpnservices
def test_list_with_long_option(self): def test_list_with_long_option(self):
arglist = ['--long'] arglist = ['--long']
@ -257,7 +250,6 @@ class TestListVPNService(TestVPNService):
self.mocked.assert_called_once_with() self.mocked.assert_called_once_with()
self.assertEqual(list(self.headers), headers) self.assertEqual(list(self.headers), headers)
self.assertEqual([self.data], list(data))
def test_list_with_no_option(self): def test_list_with_no_option(self):
arglist = [] arglist = []
@ -274,9 +266,9 @@ class TestSetVPNService(TestVPNService, common.TestSetVPNaaS):
def setUp(self): def setUp(self):
super(TestSetVPNService, self).setUp() super(TestSetVPNService, self).setUp()
self.neutronclient.update_vpnservice = mock.Mock( self.networkclient.update_vpn_service = mock.Mock(
return_value={self.res: _vpnservice}) return_value=_vpnservice)
self.mocked = self.neutronclient.update_vpnservice self.mocked = self.networkclient.update_vpn_service
self.cmd = vpnservice.SetVPNSercice(self.app, self.namespace) self.cmd = vpnservice.SetVPNSercice(self.app, self.namespace)
def test_set_name(self): def test_set_name(self):
@ -291,7 +283,7 @@ class TestSetVPNService(TestVPNService, common.TestSetVPNaaS):
result = self.cmd.take_action(parsed_args) result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with( self.mocked.assert_called_once_with(
target, {self.res: {'name': update}}) target, **{'name': update})
self.assertIsNone(result) self.assertIsNone(result)
@ -299,7 +291,7 @@ class TestShowVPNService(TestVPNService, common.TestShowVPNaaS):
def setUp(self): def setUp(self):
super(TestShowVPNService, self).setUp() super(TestShowVPNService, self).setUp()
self.neutronclient.show_vpnservice = mock.Mock( self.networkclient.get_vpn_service = mock.Mock(
return_value={self.res: _vpnservice}) return_value=_vpnservice)
self.mocked = self.neutronclient.show_vpnservice self.mocked = self.networkclient.get_vpn_service
self.cmd = vpnservice.ShowVPNService(self.app, self.namespace) self.cmd = vpnservice.ShowVPNService(self.app, self.namespace)