Network proxy methods

Change-Id: I15c9dea06d3cdcd54d1a1cce5defb01d3a701f31
This commit is contained in:
Terry Howe 2014-11-01 21:03:48 -04:00
parent 48384dd506
commit b2732b9089
2 changed files with 610 additions and 0 deletions
openstack
network/v2
tests/network/v2

View File

@ -10,8 +10,319 @@
# License for the specific language governing permissions and limitations
# under the License.
from openstack.network.v2 import extension
from openstack.network.v2 import floatingip
from openstack.network.v2 import health_monitor
from openstack.network.v2 import listener
from openstack.network.v2 import load_balancer
from openstack.network.v2 import metering_label
from openstack.network.v2 import metering_label_rule
from openstack.network.v2 import network
from openstack.network.v2 import pool
from openstack.network.v2 import pool_member
from openstack.network.v2 import port
from openstack.network.v2 import quota
from openstack.network.v2 import router
from openstack.network.v2 import security_group
from openstack.network.v2 import security_group_rule
from openstack.network.v2 import subnet
class Proxy(object):
def __init__(self, session):
self.session = session
def find_extension(self, name_or_id):
return extension.Extension.find(self.session, name_or_id)
def list_extension(self):
return extension.Extension.list(self.session)
def create_ip(self, **data):
return floatingip.FloatingIP(data).create(self.session)
def delete_ip(self, **data):
floatingip.FloatingIP(**data).delete(self.session)
def find_available_ip(self):
return floatingip.FloatingIP.find_available(self.session)
def find_ip(self, name_or_id):
return floatingip.FloatingIP.find(self.session, name_or_id)
def get_ip(self, **data):
return floatingip.FloatingIP(**data).get(self.session)
def list_ips(self, **params):
return floatingip.FloatingIP.list(self.session, **params)
def update_ip(self, **data):
return floatingip.FloatingIP(**data).update(self.session)
def create_health_monitor(self, **data):
return health_monitor.HealthMonitor(data).create(self.session)
def delete_health_monitor(self, **data):
health_monitor.HealthMonitor(data).delete(self.session)
def find_health_monitor(self, name_or_id):
return health_monitor.HealthMonitor.find(self.session, name_or_id)
def get_health_monitor(self, **data):
return health_monitor.HealthMonitor(data).get(self.session)
def list_health_monitor(self):
return health_monitor.HealthMonitor.list(self.session)
def update_health_monitor(self, **data):
return health_monitor.HealthMonitor(data).update(self.session)
def create_listener(self, **data):
return listener.Listener(data).create(self.session)
def delete_listener(self, **data):
listener.Listener(data).delete(self.session)
def find_listener(self, name_or_id):
return listener.Listener.find(self.session, name_or_id)
def get_listener(self, **data):
return listener.Listener(data).get(self.session)
def list_listener(self):
return listener.Listener.list(self.session)
def update_listener(self, **data):
return listener.Listener(data).update(self.session)
def create_load_balancer(self, **data):
return load_balancer.LoadBalancer(data).create(self.session)
def delete_load_balancer(self, **data):
load_balancer.LoadBalancer(data).delete(self.session)
def find_load_balancer(self, name_or_id):
return load_balancer.LoadBalancer.find(self.session, name_or_id)
def get_load_balancer(self, **data):
return load_balancer.LoadBalancer(data).get(self.session)
def list_load_balancer(self):
return load_balancer.LoadBalancer.list(self.session)
def update_load_balancer(self, **data):
return load_balancer.LoadBalancer(data).update(self.session)
def create_metering_label(self, **data):
return metering_label.MeteringLabel(data).create(self.session)
def delete_metering_label(self, **data):
metering_label.MeteringLabel(data).delete(self.session)
def find_metering_label(self, name_or_id):
return metering_label.MeteringLabel.find(self.session, name_or_id)
def get_metering_label(self, **data):
return metering_label.MeteringLabel(data).get(self.session)
def list_metering_label(self):
return metering_label.MeteringLabel.list(self.session)
def update_metering_label(self, **data):
return metering_label.MeteringLabel(data).update(self.session)
def create_metering_label_rule(self, **data):
return metering_label_rule.MeteringLabelRule(data).create(self.session)
def delete_metering_label_rule(self, **data):
metering_label_rule.MeteringLabelRule(data).delete(self.session)
def find_metering_label_rule(self, name_or_id):
return metering_label_rule.MeteringLabelRule.find(self.session,
name_or_id)
def get_metering_label_rule(self, **data):
return metering_label_rule.MeteringLabelRule(data).get(self.session)
def list_metering_label_rule(self):
return metering_label_rule.MeteringLabelRule.list(self.session)
def update_metering_label_rule(self, **data):
return metering_label_rule.MeteringLabelRule(data).update(self.session)
def create_network(self, **data):
return network.Network(data).create(self.session)
def delete_network(self, **data):
network.Network(data).delete(self.session)
def find_network(self, name_or_id):
return network.Network.find(self.session, name_or_id)
def get_network(self, **data):
return network.Network(data).get(self.session)
def list_networks(self, **params):
return network.Network.list(self.session, **params)
def update_network(self, **data):
return network.Network(data).update(self.session)
def create_pool(self, **data):
return pool.Pool(data).create(self.session)
def delete_pool(self, **data):
pool.Pool(data).delete(self.session)
def find_pool(self, name_or_id):
return pool.Pool.find(self.session, name_or_id)
def get_pool(self, **data):
return pool.Pool(data).get(self.session)
def list_pool(self):
return pool.Pool.list(self.session)
def update_pool(self, **data):
return pool.Pool(data).update(self.session)
def create_pool_member(self, **data):
return pool_member.PoolMember(data).create(self.session)
def delete_pool_member(self, **data):
pool_member.PoolMember(data).delete(self.session)
def find_pool_member(self, name_or_id):
return pool_member.PoolMember.find(self.session, name_or_id)
def get_pool_member(self, **data):
return pool_member.PoolMember(data).get(self.session)
def list_pool_member(self):
return pool_member.PoolMember.list(self.session)
def update_pool_member(self, **data):
return pool_member.PoolMember(data).update(self.session)
def create_port(self, **data):
return port.Port(data).create(self.session)
def delete_port(self, **data):
return port.Port(data).delete(self.session)
def find_port(self, name_or_id):
return port.Port.find(self.session, name_or_id)
def get_port(self, **data):
return port.Port(data).get(self.session)
def list_ports(self, **params):
return port.Port.list(self.session, **params)
def update_port(self, **data):
return port.Port(data).update(self.session)
def add_ip_to_port(self, port, ip):
ip['port_id'] = port.id
return ip.update(self.session)
def remove_ip_from_port(self, ip):
ip['port_id'] = None
return ip.update(self.session)
def get_subnet_ports(self, subnet_id):
result = []
ports = self.list_ports()
for puerta in ports:
for fixed_ip in puerta.fixed_ips:
if fixed_ip['subnet_id'] == subnet_id:
result.append(puerta)
return result
def list_quota(self):
return quota.Quota.list(self.session)
def create_router(self, **data):
return router.Router(data).create(self.session)
def delete_router(self, **data):
return router.Router(**data).delete(self.session)
def find_router(self, name_or_id):
return router.Router.find(self.session, name_or_id)
def get_router(self, **data):
return router.Router(**data).get(self.session)
def list_routers(self, **params):
return router.Router.list(self.session, **params)
def update_router(self, **data):
return router.Router(**data).update(self.session)
def router_add_interface(self, router, subnet_id):
router.add_interface(self.session, subnet_id)
def router_remove_interface(self, router, subnet_id):
router.remove_interface(self.session, subnet_id)
def create_security_group(self, **data):
return security_group.SecurityGroup(data).create(self.session)
def delete_security_group(self, **data):
return security_group.SecurityGroup(**data).delete(self.session)
def find_security_group(self, name_or_id):
return security_group.SecurityGroup.find(self.session, name_or_id)
def get_security_group(self, **data):
return security_group.SecurityGroup(**data).get(self.session)
def list_security_groups(self, **params):
return security_group.SecurityGroup.list(self.session, **params)
def update_security_group(self, **data):
return security_group.SecurityGroup(**data).update(self.session)
def create_security_group_rule(self, **data):
obj = security_group_rule.SecurityGroupRule(data)
return obj.create(self.session)
def delete_security_group_rule(self, **data):
obj = security_group_rule.SecurityGroupRule(**data)
return obj.delete(self.session)
def find_security_group_rule(self, name_or_id):
return security_group_rule.SecurityGroupRule.find(self.session,
name_or_id)
def get_security_group_rule(self, **data):
obj = security_group_rule.SecurityGroupRule(**data)
return obj.get(self.session)
def list_security_group_rules(self, **params):
return security_group_rule.SecurityGroupRule.list(self.session,
**params)
def update_security_group_rule(self, **data):
obj = security_group_rule.SecurityGroupRule(**data)
return obj.update(self.session)
def create_subnet(self, **data):
return subnet.Subnet(data).create(self.session)
def delete_subnet(self, **data):
return subnet.Subnet(**data).delete(self.session)
def find_subnet(self, name_or_id):
return subnet.Subnet.find(self.session, name_or_id)
def get_subnet(self, **data):
return subnet.Subnet(**data).get(self.session)
def list_subnets(self, **params):
return subnet.Subnet.list(self.session, **params)
def update_subnet(self, **data):
return subnet.Subnet(**data).update(self.session)

View File

@ -0,0 +1,299 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.network.v2 import _proxy
from openstack.tests import test_proxy_base
class TestNetworkProxy(test_proxy_base.TestProxyBase):
def setUp(self):
super(TestNetworkProxy, self).setUp()
self.proxy = _proxy.Proxy(self.session)
def test_extension(self):
self.verify_find('openstack.network.v2.extension.Extension.find',
self.proxy.find_extension)
self.verify_list('openstack.network.v2.extension.Extension.list',
self.proxy.list_extension)
def test_ip(self):
self.verify_create('openstack.network.v2.floatingip.FloatingIP.create',
self.proxy.create_ip)
self.verify_delete('openstack.network.v2.floatingip.FloatingIP.delete',
self.proxy.delete_ip)
self.verify_find('openstack.network.v2.floatingip.FloatingIP.find',
self.proxy.find_ip)
self.verify_get('openstack.network.v2.floatingip.FloatingIP.get',
self.proxy.get_ip)
self.verify_list('openstack.network.v2.floatingip.FloatingIP.list',
self.proxy.list_ips)
self.verify_update('openstack.network.v2.floatingip.FloatingIP.update',
self.proxy.update_ip)
def test_health_monitor(self):
self.verify_create(
'openstack.network.v2.health_monitor.HealthMonitor.create',
self.proxy.create_health_monitor
)
self.verify_delete(
'openstack.network.v2.health_monitor.HealthMonitor.delete',
self.proxy.delete_health_monitor
)
self.verify_find(
'openstack.network.v2.health_monitor.HealthMonitor.find',
self.proxy.find_health_monitor
)
self.verify_get(
'openstack.network.v2.health_monitor.HealthMonitor.get',
self.proxy.get_health_monitor
)
self.verify_list(
'openstack.network.v2.health_monitor.HealthMonitor.list',
self.proxy.list_health_monitor
)
self.verify_update(
'openstack.network.v2.health_monitor.HealthMonitor.update',
self.proxy.update_health_monitor
)
def test_listener(self):
self.verify_create('openstack.network.v2.listener.Listener.create',
self.proxy.create_listener)
self.verify_delete('openstack.network.v2.listener.Listener.delete',
self.proxy.delete_listener)
self.verify_find('openstack.network.v2.listener.Listener.find',
self.proxy.find_listener)
self.verify_get('openstack.network.v2.listener.Listener.get',
self.proxy.get_listener)
self.verify_list('openstack.network.v2.listener.Listener.list',
self.proxy.list_listener)
self.verify_update('openstack.network.v2.listener.Listener.update',
self.proxy.update_listener)
def test_load_balancer(self):
self.verify_create(
'openstack.network.v2.load_balancer.LoadBalancer.create',
self.proxy.create_load_balancer
)
self.verify_delete(
'openstack.network.v2.load_balancer.LoadBalancer.delete',
self.proxy.delete_load_balancer
)
self.verify_find(
'openstack.network.v2.load_balancer.LoadBalancer.find',
self.proxy.find_load_balancer
)
self.verify_get('openstack.network.v2.load_balancer.LoadBalancer.get',
self.proxy.get_load_balancer)
self.verify_list(
'openstack.network.v2.load_balancer.LoadBalancer.list',
self.proxy.list_load_balancer
)
self.verify_update(
'openstack.network.v2.load_balancer.LoadBalancer.update',
self.proxy.update_load_balancer
)
def test_metering_label(self):
self.verify_create(
'openstack.network.v2.metering_label.MeteringLabel.create',
self.proxy.create_metering_label
)
self.verify_delete(
'openstack.network.v2.metering_label.MeteringLabel.delete',
self.proxy.delete_metering_label
)
self.verify_find(
'openstack.network.v2.metering_label.MeteringLabel.find',
self.proxy.find_metering_label
)
self.verify_get(
'openstack.network.v2.metering_label.MeteringLabel.get',
self.proxy.get_metering_label
)
self.verify_list(
'openstack.network.v2.metering_label.MeteringLabel.list',
self.proxy.list_metering_label
)
self.verify_update(
'openstack.network.v2.metering_label.MeteringLabel.update',
self.proxy.update_metering_label
)
def test_metering_label_rule(self):
self.verify_create(
('openstack.network.v2.' +
'metering_label_rule.MeteringLabelRule.create'),
self.proxy.create_metering_label_rule
)
self.verify_delete(
('openstack.network.v2.' +
'metering_label_rule.MeteringLabelRule.delete'),
self.proxy.delete_metering_label_rule
)
self.verify_find(
'openstack.network.v2.metering_label_rule.MeteringLabelRule.find',
self.proxy.find_metering_label_rule
)
self.verify_get(
'openstack.network.v2.metering_label_rule.MeteringLabelRule.get',
self.proxy.get_metering_label_rule
)
self.verify_list(
'openstack.network.v2.metering_label_rule.MeteringLabelRule.list',
self.proxy.list_metering_label_rule
)
self.verify_update(
('openstack.network.v2.' +
'metering_label_rule.MeteringLabelRule.update'),
self.proxy.update_metering_label_rule
)
def test_network(self):
self.verify_create('openstack.network.v2.network.Network.create',
self.proxy.create_network)
self.verify_delete('openstack.network.v2.network.Network.delete',
self.proxy.delete_network)
self.verify_find('openstack.network.v2.network.Network.find',
self.proxy.find_network)
self.verify_get('openstack.network.v2.network.Network.get',
self.proxy.get_network)
self.verify_list('openstack.network.v2.network.Network.list',
self.proxy.list_networks)
self.verify_update('openstack.network.v2.network.Network.update',
self.proxy.update_network)
def test_pool(self):
self.verify_create('openstack.network.v2.pool.Pool.create',
self.proxy.create_pool)
self.verify_delete('openstack.network.v2.pool.Pool.delete',
self.proxy.delete_pool)
self.verify_find('openstack.network.v2.pool.Pool.find',
self.proxy.find_pool)
self.verify_get('openstack.network.v2.pool.Pool.get',
self.proxy.get_pool)
self.verify_list('openstack.network.v2.pool.Pool.list',
self.proxy.list_pool)
self.verify_update('openstack.network.v2.pool.Pool.update',
self.proxy.update_pool)
def test_pool_member(self):
self.verify_create(
'openstack.network.v2.pool_member.PoolMember.create',
self.proxy.create_pool_member
)
self.verify_delete(
'openstack.network.v2.pool_member.PoolMember.delete',
self.proxy.delete_pool_member
)
self.verify_find('openstack.network.v2.pool_member.PoolMember.find',
self.proxy.find_pool_member)
self.verify_get('openstack.network.v2.pool_member.PoolMember.get',
self.proxy.get_pool_member)
self.verify_list('openstack.network.v2.pool_member.PoolMember.list',
self.proxy.list_pool_member)
self.verify_update(
'openstack.network.v2.pool_member.PoolMember.update',
self.proxy.update_pool_member
)
def test_port(self):
self.verify_create('openstack.network.v2.port.Port.create',
self.proxy.create_port)
self.verify_delete('openstack.network.v2.port.Port.delete',
self.proxy.delete_port)
self.verify_find('openstack.network.v2.port.Port.find',
self.proxy.find_port)
self.verify_get('openstack.network.v2.port.Port.get',
self.proxy.get_port)
self.verify_list('openstack.network.v2.port.Port.list',
self.proxy.list_ports)
self.verify_update('openstack.network.v2.port.Port.update',
self.proxy.update_port)
def test_quota(self):
self.verify_list('openstack.network.v2.quota.Quota.list',
self.proxy.list_quota)
def test_router(self):
self.verify_create('openstack.network.v2.router.Router.create',
self.proxy.create_router)
self.verify_delete('openstack.network.v2.router.Router.delete',
self.proxy.delete_router)
self.verify_find('openstack.network.v2.router.Router.find',
self.proxy.find_router)
self.verify_get('openstack.network.v2.router.Router.get',
self.proxy.get_router)
self.verify_list('openstack.network.v2.router.Router.list',
self.proxy.list_routers)
self.verify_update('openstack.network.v2.router.Router.update',
self.proxy.update_router)
def test_security_group(self):
self.verify_create(
'openstack.network.v2.security_group.SecurityGroup.create',
self.proxy.create_security_group)
self.verify_delete(
'openstack.network.v2.security_group.SecurityGroup.delete',
self.proxy.delete_security_group)
self.verify_find(
'openstack.network.v2.security_group.SecurityGroup.find',
self.proxy.find_security_group)
self.verify_get(
'openstack.network.v2.security_group.SecurityGroup.get',
self.proxy.get_security_group)
self.verify_list(
'openstack.network.v2.security_group.SecurityGroup.list',
self.proxy.list_security_groups)
self.verify_update(
'openstack.network.v2.security_group.SecurityGroup.update',
self.proxy.update_security_group)
def test_security_group_rule(self):
self.verify_create(
('openstack.network.v2.' +
'security_group_rule.SecurityGroupRule.create'),
self.proxy.create_security_group_rule)
self.verify_delete(
('openstack.network.v2.' +
'security_group_rule.SecurityGroupRule.delete'),
self.proxy.delete_security_group_rule)
self.verify_find(
('openstack.network.v2.' +
'security_group_rule.SecurityGroupRule.find'),
self.proxy.find_security_group_rule)
self.verify_get(
('openstack.network.v2.' +
'security_group_rule.SecurityGroupRule.get'),
self.proxy.get_security_group_rule)
self.verify_list(
('openstack.network.v2.' +
'security_group_rule.SecurityGroupRule.list'),
self.proxy.list_security_group_rules)
self.verify_update(
('openstack.network.v2.' +
'security_group_rule.SecurityGroupRule.update'),
self.proxy.update_security_group_rule)
def test_subnet(self):
self.verify_create('openstack.network.v2.subnet.Subnet.create',
self.proxy.create_subnet)
self.verify_delete('openstack.network.v2.subnet.Subnet.delete',
self.proxy.delete_subnet)
self.verify_find('openstack.network.v2.subnet.Subnet.find',
self.proxy.find_subnet)
self.verify_get('openstack.network.v2.subnet.Subnet.get',
self.proxy.get_subnet)
self.verify_list('openstack.network.v2.subnet.Subnet.list',
self.proxy.list_subnets)
self.verify_update('openstack.network.v2.subnet.Subnet.update',
self.proxy.update_subnet)