Test cleanup, make driver return dictionaries and construct objects in manager

This commit is contained in:
Vishvananda Ishaya
2010-07-20 09:15:36 -05:00
parent 930f3a180b
commit a599d9b04c
5 changed files with 187 additions and 160 deletions

View File

@@ -28,7 +28,6 @@ import logging
from nova import exception
from nova import flags
from nova.auth import manager
try:
import ldap
@@ -322,7 +321,7 @@ class LdapDriver(object):
keys = self.get_key_pairs(uid)
if keys != None:
for key in keys:
self.delete_key_pair(uid, key.name)
self.delete_key_pair(uid, key['name'])
def __role_to_dn(self, role, project_id=None):
"""Convert role to corresponding dn"""
@@ -438,38 +437,38 @@ class LdapDriver(object):
"""Convert ldap attributes to User object"""
if attr == None:
return None
return manager.User(
id = attr['uid'][0],
name = attr['cn'][0],
access = attr['accessKey'][0],
secret = attr['secretKey'][0],
admin = (attr['isAdmin'][0] == 'TRUE')
)
return {
'id': attr['uid'][0],
'name': attr['cn'][0],
'access': attr['accessKey'][0],
'secret': attr['secretKey'][0],
'admin': (attr['isAdmin'][0] == 'TRUE')
}
def __to_key_pair(self, owner, attr):
"""Convert ldap attributes to KeyPair object"""
if attr == None:
return None
return manager.KeyPair(
id = attr['cn'][0],
name = attr['cn'][0],
owner_id = owner,
public_key = attr['sshPublicKey'][0],
fingerprint = attr['keyFingerprint'][0],
)
return {
'id': attr['cn'][0],
'name': attr['cn'][0],
'owner_id': owner,
'public_key': attr['sshPublicKey'][0],
'fingerprint': attr['keyFingerprint'][0],
}
def __to_project(self, attr):
"""Convert ldap attributes to Project object"""
if attr == None:
return None
member_dns = attr.get('member', [])
return manager.Project(
id = attr['cn'][0],
name = attr['cn'][0],
project_manager_id = self.__dn_to_uid(attr['projectManager'][0]),
description = attr.get('description', [None])[0],
member_ids = [self.__dn_to_uid(x) for x in member_dns]
)
return {
'id': attr['cn'][0],
'name': attr['cn'][0],
'project_manager_id': self.__dn_to_uid(attr['projectManager'][0]),
'description': attr.get('description', [None])[0],
'member_ids': [self.__dn_to_uid(x) for x in member_dns]
}
def __dn_to_uid(self, dn):
"""Convert user dn to uid"""

View File

@@ -33,9 +33,9 @@ from nova import datastore
from nova import exception
from nova import flags
from nova import objectstore # for flags
from nova import signer
from nova import utils
from nova.auth import ldapdriver
from nova.auth import signer
FLAGS = flags.FLAGS
# NOTE(vish): a user with one of these roles will be a superuser and
@@ -187,6 +187,14 @@ class Project(AuthBase):
def project_manager(self):
return AuthManager().get_user(self.project_manager_id)
@property
def vpn_ip(self):
return AuthManager().get_project_vpn_ip(self)
@property
def vpn_port(self):
return AuthManager().get_project_vpn_port(self)
def has_manager(self, user):
return AuthManager().is_project_manager(user, self)
@@ -314,16 +322,6 @@ class AuthManager(object):
def __init__(self, *args, **kwargs):
self.driver_class = kwargs.get('driver_class', ldapdriver.LdapDriver)
if FLAGS.fake_tests:
try:
self.create_user('fake', 'fake', 'fake')
except: pass
try:
self.create_user('user', 'user', 'user')
except: pass
try:
self.create_user('admin', 'admin', 'admin', True)
except: pass
def authenticate(self, access, signature, params, verb='GET',
server_string='127.0.0.1:8773', path='/',
@@ -508,6 +506,21 @@ class AuthManager(object):
with self.driver_class() as drv:
drv.remove_role(User.safe_id(user), role, Project.safe_id(project))
def get_project(self, pid):
"""Get project object by id"""
with self.driver_class() as drv:
project_dict = drv.get_project(pid)
if project_dict:
return Project(**project_dict)
def get_projects(self):
"""Retrieves list of all projects"""
with self.driver_class() as drv:
project_list = drv.get_projects()
if not project_list:
return []
return [Project(**project_dict) for project_dict in project_list]
def create_project(self, name, manager_user,
description=None, member_users=None):
"""Create a project
@@ -532,26 +545,14 @@ class AuthManager(object):
"""
if member_users:
member_users = [User.safe_id(u) for u in member_users]
# NOTE(vish): try to associate a vpn ip and port first because
# if it throws an exception, we save having to
# create and destroy a project
Vpn.create(name)
with self.driver_class() as drv:
return drv.create_project(name,
User.safe_id(manager_user),
description,
member_users)
def get_projects(self):
"""Retrieves list of all projects"""
with self.driver_class() as drv:
return drv.get_projects()
def get_project(self, pid):
"""Get project object by id"""
with self.driver_class() as drv:
return drv.get_project(pid)
project_dict = drv.create_project(name,
User.safe_id(manager_user),
description,
member_users)
if project_dict:
Vpn.create(project_dict['id'])
return Project(**project_dict)
def add_to_project(self, user, project):
"""Add user to project"""
@@ -577,6 +578,12 @@ class AuthManager(object):
return drv.remove_from_project(User.safe_id(user),
Project.safe_id(project))
def get_project_vpn_ip(self, project):
return Vpn(Project.safe_id(project)).ip
def get_project_vpn_port(self, project):
return Vpn(Project.safe_id(project)).port
def delete_project(self, project):
"""Deletes a project"""
with self.driver_class() as drv:
@@ -585,17 +592,24 @@ class AuthManager(object):
def get_user(self, uid):
"""Retrieves a user by id"""
with self.driver_class() as drv:
return drv.get_user(uid)
user_dict = drv.get_user(uid)
if user_dict:
return User(**user_dict)
def get_user_from_access_key(self, access_key):
"""Retrieves a user by access key"""
with self.driver_class() as drv:
return drv.get_user_from_access_key(access_key)
user_dict = drv.get_user_from_access_key(access_key)
if user_dict:
return User(**user_dict)
def get_users(self):
"""Retrieves a list of all users"""
with self.driver_class() as drv:
return drv.get_users()
user_list = drv.get_users()
if not user_list:
return []
return [User(**user_dict) for user_dict in user_list]
def create_user(self, name, access=None, secret=None, admin=False):
"""Creates a user
@@ -622,7 +636,9 @@ class AuthManager(object):
if access == None: access = str(uuid.uuid4())
if secret == None: secret = str(uuid.uuid4())
with self.driver_class() as drv:
return drv.create_user(name, access, secret, admin)
user_dict = drv.create_user(name, access, secret, admin)
if user_dict:
return User(**user_dict)
def delete_user(self, user):
"""Deletes a user"""
@@ -660,18 +676,27 @@ class AuthManager(object):
def create_key_pair(self, user, key_name, public_key, fingerprint):
"""Creates a key pair for user"""
with self.driver_class() as drv:
return drv.create_key_pair(User.safe_id(user), key_name,
public_key, fingerprint)
kp_dict = drv.create_key_pair(User.safe_id(user),
key_name,
public_key,
fingerprint)
if kp_dict:
return KeyPair(**kp_dict)
def get_key_pair(self, user, key_name):
"""Retrieves a key pair for user"""
with self.driver_class() as drv:
return drv.get_key_pair(User.safe_id(user), key_name)
kp_dict = drv.get_key_pair(User.safe_id(user), key_name)
if kp_dict:
return KeyPair(**kp_dict)
def get_key_pairs(self, user):
"""Retrieves all key pairs for user"""
with self.driver_class() as drv:
return drv.get_key_pairs(User.safe_id(user))
kp_list = drv.get_key_pairs(User.safe_id(user))
if not kp_list:
return []
return [KeyPair(**kp_dict) for kp_dict in kp_list]
def delete_key_pair(self, user, key_name):
"""Deletes a key pair for user"""
@@ -686,7 +711,7 @@ class AuthManager(object):
project = user.id
pid = Project.safe_id(project)
rc = self.__generate_rc(user.access, user.secret, pid)
private_key, signed_cert = self.__generate_x509_cert(user.id, pid)
private_key, signed_cert = self._generate_x509_cert(user.id, pid)
vpn = Vpn(pid)
configfile = open(FLAGS.vpn_client_template,"r")
@@ -726,7 +751,7 @@ class AuthManager(object):
}
return rc
def __generate_x509_cert(self, uid, pid):
def _generate_x509_cert(self, uid, pid):
"""Generate x509 cert for user"""
(private_key, csr) = crypto.generate_x509_cert(
self.__cert_subject(uid))

View File

@@ -150,7 +150,7 @@ class ApiEc2TestCase(test.BaseTestCase):
def setUp(self):
super(ApiEc2TestCase, self).setUp()
self.users = manager.AuthManager()
self.manager = manager.AuthManager()
self.cloud = cloud.CloudController()
self.host = '127.0.0.1'
@@ -175,25 +175,22 @@ class ApiEc2TestCase(test.BaseTestCase):
def test_describe_instances(self):
self.expect_http()
self.mox.ReplayAll()
try:
self.users.create_user('fake', 'fake', 'fake')
except Exception, _err:
pass # User may already exist
user = self.manager.create_user('fake', 'fake', 'fake')
project = self.manager.create_project('fake', 'fake', 'fake')
self.assertEqual(self.ec2.get_all_instances(), [])
self.users.delete_user('fake')
self.manager.delete_project(project)
self.manager.delete_user(user)
def test_get_all_key_pairs(self):
self.expect_http()
self.mox.ReplayAll()
keyname = "".join(random.choice("sdiuisudfsdcnpaqwertasd") for x in range(random.randint(4, 8)))
try:
self.users.create_user('fake', 'fake', 'fake')
except Exception, _err:
pass # User may already exist
self.users.generate_key_pair('fake', keyname)
user = self.manager.create_user('fake', 'fake', 'fake')
project = self.manager.create_project('fake', 'fake', 'fake')
self.manager.generate_key_pair(user.id, keyname)
rv = self.ec2.get_all_key_pairs()
self.assertTrue(filter(lambda k: k.name == keyname, rv))
self.users.delete_user('fake')
self.manager.delete_project(project)
self.manager.delete_user(user)

View File

@@ -37,29 +37,29 @@ class AuthTestCase(test.BaseTestCase):
super(AuthTestCase, self).setUp()
self.flags(fake_libvirt=True,
fake_storage=True)
self.users = manager.AuthManager()
self.manager = manager.AuthManager()
def test_001_can_create_users(self):
self.users.create_user('test1', 'access', 'secret')
self.users.create_user('test2')
self.manager.create_user('test1', 'access', 'secret')
self.manager.create_user('test2')
def test_002_can_get_user(self):
user = self.users.get_user('test1')
user = self.manager.get_user('test1')
def test_003_can_retreive_properties(self):
user = self.users.get_user('test1')
user = self.manager.get_user('test1')
self.assertEqual('test1', user.id)
self.assertEqual('access', user.access)
self.assertEqual('secret', user.secret)
def test_004_signature_is_valid(self):
#self.assertTrue(self.users.authenticate( **boto.generate_url ... ? ? ? ))
#self.assertTrue(self.manager.authenticate( **boto.generate_url ... ? ? ? ))
pass
#raise NotImplementedError
def test_005_can_get_credentials(self):
return
credentials = self.users.get_user('test1').get_credentials()
credentials = self.manager.get_user('test1').get_credentials()
self.assertEqual(credentials,
'export EC2_ACCESS_KEY="access"\n' +
'export EC2_SECRET_KEY="secret"\n' +
@@ -68,14 +68,14 @@ class AuthTestCase(test.BaseTestCase):
'export EC2_USER_ID="test1"\n')
def test_006_test_key_storage(self):
user = self.users.get_user('test1')
user = self.manager.get_user('test1')
user.create_key_pair('public', 'key', 'fingerprint')
key = user.get_key_pair('public')
self.assertEqual('key', key.public_key)
self.assertEqual('fingerprint', key.fingerprint)
def test_007_test_key_generation(self):
user = self.users.get_user('test1')
user = self.manager.get_user('test1')
private_key, fingerprint = user.generate_key_pair('public2')
key = RSA.load_key_string(private_key, callback=lambda: None)
bio = BIO.MemoryBuffer()
@@ -87,71 +87,71 @@ class AuthTestCase(test.BaseTestCase):
converted.split(" ")[1].strip())
def test_008_can_list_key_pairs(self):
keys = self.users.get_user('test1').get_key_pairs()
keys = self.manager.get_user('test1').get_key_pairs()
self.assertTrue(filter(lambda k: k.name == 'public', keys))
self.assertTrue(filter(lambda k: k.name == 'public2', keys))
def test_009_can_delete_key_pair(self):
self.users.get_user('test1').delete_key_pair('public')
keys = self.users.get_user('test1').get_key_pairs()
self.manager.get_user('test1').delete_key_pair('public')
keys = self.manager.get_user('test1').get_key_pairs()
self.assertFalse(filter(lambda k: k.name == 'public', keys))
def test_010_can_list_users(self):
users = self.users.get_users()
users = self.manager.get_users()
logging.warn(users)
self.assertTrue(filter(lambda u: u.id == 'test1', users))
def test_101_can_add_user_role(self):
self.assertFalse(self.users.has_role('test1', 'itsec'))
self.users.add_role('test1', 'itsec')
self.assertTrue(self.users.has_role('test1', 'itsec'))
self.assertFalse(self.manager.has_role('test1', 'itsec'))
self.manager.add_role('test1', 'itsec')
self.assertTrue(self.manager.has_role('test1', 'itsec'))
def test_199_can_remove_user_role(self):
self.assertTrue(self.users.has_role('test1', 'itsec'))
self.users.remove_role('test1', 'itsec')
self.assertFalse(self.users.has_role('test1', 'itsec'))
self.assertTrue(self.manager.has_role('test1', 'itsec'))
self.manager.remove_role('test1', 'itsec')
self.assertFalse(self.manager.has_role('test1', 'itsec'))
def test_201_can_create_project(self):
project = self.users.create_project('testproj', 'test1', 'A test project', ['test1'])
self.assertTrue(filter(lambda p: p.name == 'testproj', self.users.get_projects()))
project = self.manager.create_project('testproj', 'test1', 'A test project', ['test1'])
self.assertTrue(filter(lambda p: p.name == 'testproj', self.manager.get_projects()))
self.assertEqual(project.name, 'testproj')
self.assertEqual(project.description, 'A test project')
self.assertEqual(project.project_manager_id, 'test1')
self.assertTrue(project.has_member('test1'))
def test_202_user1_is_project_member(self):
self.assertTrue(self.users.get_user('test1').is_project_member('testproj'))
self.assertTrue(self.manager.get_user('test1').is_project_member('testproj'))
def test_203_user2_is_not_project_member(self):
self.assertFalse(self.users.get_user('test2').is_project_member('testproj'))
self.assertFalse(self.manager.get_user('test2').is_project_member('testproj'))
def test_204_user1_is_project_manager(self):
self.assertTrue(self.users.get_user('test1').is_project_manager('testproj'))
self.assertTrue(self.manager.get_user('test1').is_project_manager('testproj'))
def test_205_user2_is_not_project_manager(self):
self.assertFalse(self.users.get_user('test2').is_project_manager('testproj'))
self.assertFalse(self.manager.get_user('test2').is_project_manager('testproj'))
def test_206_can_add_user_to_project(self):
self.users.add_to_project('test2', 'testproj')
self.assertTrue(self.users.get_project('testproj').has_member('test2'))
self.manager.add_to_project('test2', 'testproj')
self.assertTrue(self.manager.get_project('testproj').has_member('test2'))
def test_208_can_remove_user_from_project(self):
self.users.remove_from_project('test2', 'testproj')
self.assertFalse(self.users.get_project('testproj').has_member('test2'))
self.manager.remove_from_project('test2', 'testproj')
self.assertFalse(self.manager.get_project('testproj').has_member('test2'))
def test_209_can_generate_x509(self):
# MUST HAVE RUN CLOUD SETUP BY NOW
self.cloud = cloud.CloudController()
self.cloud.setup()
private_key, signed_cert_string = self.users.get_project('testproj').generate_x509_cert('test1')
logging.debug(signed_cert_string)
_key, cert_str = self.manager._generate_x509_cert('test1', 'testproj')
logging.debug(cert_str)
# Need to verify that it's signed by the right intermediate CA
full_chain = crypto.fetch_ca(project_id='testproj', chain=True)
int_cert = crypto.fetch_ca(project_id='testproj', chain=False)
cloud_cert = crypto.fetch_ca()
logging.debug("CA chain:\n\n =====\n%s\n\n=====" % full_chain)
signed_cert = X509.load_cert_string(signed_cert_string)
signed_cert = X509.load_cert_string(cert_str)
chain_cert = X509.load_cert_string(full_chain)
int_cert = X509.load_cert_string(int_cert)
cloud_cert = X509.load_cert_string(cloud_cert)
@@ -164,42 +164,45 @@ class AuthTestCase(test.BaseTestCase):
self.assertFalse(signed_cert.verify(cloud_cert.get_pubkey()))
def test_210_can_add_project_role(self):
project = self.users.get_project('testproj')
project = self.manager.get_project('testproj')
self.assertFalse(project.has_role('test1', 'sysadmin'))
self.users.add_role('test1', 'sysadmin')
self.manager.add_role('test1', 'sysadmin')
self.assertFalse(project.has_role('test1', 'sysadmin'))
project.add_role('test1', 'sysadmin')
self.assertTrue(project.has_role('test1', 'sysadmin'))
def test_211_can_remove_project_role(self):
project = self.users.get_project('testproj')
project = self.manager.get_project('testproj')
self.assertTrue(project.has_role('test1', 'sysadmin'))
project.remove_role('test1', 'sysadmin')
self.assertFalse(project.has_role('test1', 'sysadmin'))
self.users.remove_role('test1', 'sysadmin')
self.manager.remove_role('test1', 'sysadmin')
self.assertFalse(project.has_role('test1', 'sysadmin'))
def test_212_vpn_ip_and_port_looks_valid(self):
project = self.users.get_project('testproj')
project = self.manager.get_project('testproj')
self.assert_(project.vpn_ip)
self.assert_(project.vpn_port >= FLAGS.vpn_start_port)
self.assert_(project.vpn_port <= FLAGS.vpn_end_port)
def test_213_too_many_vpns(self):
for i in xrange(users.Vpn.num_ports_for_ip(FLAGS.vpn_ip)):
users.Vpn.create("vpnuser%s" % i)
self.assertRaises(users.NoMorePorts, users.Vpn.create, "boom")
vpns = []
for i in xrange(manager.Vpn.num_ports_for_ip(FLAGS.vpn_ip)):
vpns.append(manager.Vpn.create("vpnuser%s" % i))
self.assertRaises(manager.NoMorePorts, manager.Vpn.create, "boom")
for vpn in vpns:
vpn.destroy()
def test_299_can_delete_project(self):
self.users.delete_project('testproj')
self.assertFalse(filter(lambda p: p.name == 'testproj', self.users.get_projects()))
self.manager.delete_project('testproj')
self.assertFalse(filter(lambda p: p.name == 'testproj', self.manager.get_projects()))
def test_999_can_delete_users(self):
self.users.delete_user('test1')
users = self.users.get_users()
self.manager.delete_user('test1')
users = self.manager.get_users()
self.assertFalse(filter(lambda u: u.id == 'test1', users))
self.users.delete_user('test2')
self.assertEqual(self.users.get_user('test2'), None)
self.manager.delete_user('test2')
self.assertEqual(self.manager.get_user('test2'), None)
if __name__ == "__main__":

View File

@@ -39,59 +39,61 @@ class NetworkTestCase(test.TrialTestCase):
logging.getLogger().setLevel(logging.DEBUG)
self.manager = manager.AuthManager()
self.dnsmasq = FakeDNSMasq()
try:
self.manager.create_user('netuser', 'netuser', 'netuser')
except: pass
self.user = self.manager.create_user('netuser', 'netuser', 'netuser')
self.projects = []
self.projects.append(self.manager.create_project('netuser',
'netuser',
'netuser'))
for i in range(0, 6):
name = 'project%s' % i
if not self.manager.get_project(name):
self.manager.create_project(name, 'netuser', name)
self.projects.append(self.manager.create_project(name,
'netuser',
name))
self.network = network.PublicNetworkController()
def tearDown(self):
super(NetworkTestCase, self).tearDown()
for i in range(0, 6):
name = 'project%s' % i
self.manager.delete_project(name)
self.manager.delete_user('netuser')
for project in self.projects:
self.manager.delete_project(project)
self.manager.delete_user(self.user)
def test_public_network_allocation(self):
pubnet = IPy.IP(flags.FLAGS.public_range)
address = self.network.allocate_ip("netuser", "project0", "public")
address = self.network.allocate_ip(self.user.id, self.projects[0].id, "public")
self.assertTrue(IPy.IP(address) in pubnet)
self.assertTrue(IPy.IP(address) in self.network.network)
def test_allocate_deallocate_ip(self):
address = network.allocate_ip(
"netuser", "project0", utils.generate_mac())
self.user.id, self.projects[0].id, utils.generate_mac())
logging.debug("Was allocated %s" % (address))
net = network.get_project_network("project0", "default")
self.assertEqual(True, is_in_project(address, "project0"))
net = network.get_project_network(self.projects[0].id, "default")
self.assertEqual(True, is_in_project(address, self.projects[0].id))
mac = utils.generate_mac()
hostname = "test-host"
self.dnsmasq.issue_ip(mac, address, hostname, net.bridge_name)
rv = network.deallocate_ip(address)
# Doesn't go away until it's dhcp released
self.assertEqual(True, is_in_project(address, "project0"))
self.assertEqual(True, is_in_project(address, self.projects[0].id))
self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name)
self.assertEqual(False, is_in_project(address, "project0"))
self.assertEqual(False, is_in_project(address, self.projects[0].id))
def test_range_allocation(self):
mac = utils.generate_mac()
secondmac = utils.generate_mac()
hostname = "test-host"
address = network.allocate_ip(
"netuser", "project0", mac)
self.user.id, self.projects[0].id, mac)
secondaddress = network.allocate_ip(
"netuser", "project1", secondmac)
net = network.get_project_network("project0", "default")
secondnet = network.get_project_network("project1", "default")
self.user, "project1", secondmac)
net = network.get_project_network(self.projects[0].id, "default")
secondnet = network.get_project_network(self.projects[1].id, "default")
self.assertEqual(True, is_in_project(address, "project0"))
self.assertEqual(True, is_in_project(secondaddress, "project1"))
self.assertEqual(False, is_in_project(address, "project1"))
self.assertEqual(True, is_in_project(address, self.projects[0].id))
self.assertEqual(True, is_in_project(secondaddress, self.projects[1].id))
self.assertEqual(False, is_in_project(address, self.projects[1].id))
# Addresses are allocated before they're issued
self.dnsmasq.issue_ip(mac, address, hostname, net.bridge_name)
@@ -100,34 +102,34 @@ class NetworkTestCase(test.TrialTestCase):
rv = network.deallocate_ip(address)
self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name)
self.assertEqual(False, is_in_project(address, "project0"))
self.assertEqual(False, is_in_project(address, self.projects[0].id))
# First address release shouldn't affect the second
self.assertEqual(True, is_in_project(secondaddress, "project1"))
self.assertEqual(True, is_in_project(secondaddress, self.projects[1].id))
rv = network.deallocate_ip(secondaddress)
self.dnsmasq.release_ip(secondmac, secondaddress,
hostname, secondnet.bridge_name)
self.assertEqual(False, is_in_project(secondaddress, "project1"))
self.assertEqual(False, is_in_project(secondaddress, self.projects[1].id))
def test_subnet_edge(self):
secondaddress = network.allocate_ip("netuser", "project0",
secondaddress = network.allocate_ip(self.user.id, self.projects[0].id,
utils.generate_mac())
hostname = "toomany-hosts"
for project in range(1,5):
project_id = "project%s" % (project)
for i in range(1,5):
project_id = self.projects[i].id
mac = utils.generate_mac()
mac2 = utils.generate_mac()
mac3 = utils.generate_mac()
address = network.allocate_ip(
"netuser", project_id, mac)
self.user, project_id, mac)
address2 = network.allocate_ip(
"netuser", project_id, mac2)
self.user, project_id, mac2)
address3 = network.allocate_ip(
"netuser", project_id, mac3)
self.assertEqual(False, is_in_project(address, "project0"))
self.assertEqual(False, is_in_project(address2, "project0"))
self.assertEqual(False, is_in_project(address3, "project0"))
self.user, project_id, mac3)
self.assertEqual(False, is_in_project(address, self.projects[0].id))
self.assertEqual(False, is_in_project(address2, self.projects[0].id))
self.assertEqual(False, is_in_project(address3, self.projects[0].id))
rv = network.deallocate_ip(address)
rv = network.deallocate_ip(address2)
rv = network.deallocate_ip(address3)
@@ -135,7 +137,7 @@ class NetworkTestCase(test.TrialTestCase):
self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name)
self.dnsmasq.release_ip(mac2, address2, hostname, net.bridge_name)
self.dnsmasq.release_ip(mac3, address3, hostname, net.bridge_name)
net = network.get_project_network("project0", "default")
net = network.get_project_network(self.projects[0].id, "default")
rv = network.deallocate_ip(secondaddress)
self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name)
@@ -150,22 +152,23 @@ class NetworkTestCase(test.TrialTestCase):
Network size is 32, there are 5 addresses reserved for VPN.
So we should get 23 usable addresses
"""
net = network.get_project_network("project0", "default")
net = network.get_project_network(self.projects[0].id, "default")
hostname = "toomany-hosts"
macs = {}
addresses = {}
for i in range(0, 22):
macs[i] = utils.generate_mac()
addresses[i] = network.allocate_ip("netuser", "project0", macs[i])
addresses[i] = network.allocate_ip(self.user.id, self.projects[0].id, macs[i])
self.dnsmasq.issue_ip(macs[i], addresses[i], hostname, net.bridge_name)
self.assertRaises(NoMoreAddresses, network.allocate_ip, "netuser", "project0", utils.generate_mac())
self.assertRaises(NoMoreAddresses, network.allocate_ip, self.user.id, self.projects[0].id, utils.generate_mac())
for i in range(0, 22):
rv = network.deallocate_ip(addresses[i])
self.dnsmasq.release_ip(macs[i], addresses[i], hostname, net.bridge_name)
def is_in_project(address, project_id):
print address, list(network.get_project_network(project_id).list_addresses())
return address in network.get_project_network(project_id).list_addresses()
def _get_project_addresses(project_id):