From a599d9b04c908a46e09265e0637ce647f317f2a9 Mon Sep 17 00:00:00 2001 From: Vishvananda Ishaya Date: Tue, 20 Jul 2010 09:15:36 -0500 Subject: [PATCH] Test cleanup, make driver return dictionaries and construct objects in manager --- nova/auth/ldapdriver.py | 45 +++++++------- nova/auth/manager.py | 105 ++++++++++++++++++++------------- nova/tests/api_unittest.py | 23 ++++---- nova/tests/auth_unittest.py | 95 ++++++++++++++--------------- nova/tests/network_unittest.py | 79 +++++++++++++------------ 5 files changed, 187 insertions(+), 160 deletions(-) diff --git a/nova/auth/ldapdriver.py b/nova/auth/ldapdriver.py index 89c4defd..d330ae72 100644 --- a/nova/auth/ldapdriver.py +++ b/nova/auth/ldapdriver.py @@ -28,7 +28,6 @@ import logging from nova import exception from nova import flags -from nova.auth import manager try: import ldap @@ -322,7 +321,7 @@ class LdapDriver(object): keys = self.get_key_pairs(uid) if keys != None: for key in keys: - self.delete_key_pair(uid, key.name) + self.delete_key_pair(uid, key['name']) def __role_to_dn(self, role, project_id=None): """Convert role to corresponding dn""" @@ -438,38 +437,38 @@ class LdapDriver(object): """Convert ldap attributes to User object""" if attr == None: return None - return manager.User( - id = attr['uid'][0], - name = attr['cn'][0], - access = attr['accessKey'][0], - secret = attr['secretKey'][0], - admin = (attr['isAdmin'][0] == 'TRUE') - ) + return { + 'id': attr['uid'][0], + 'name': attr['cn'][0], + 'access': attr['accessKey'][0], + 'secret': attr['secretKey'][0], + 'admin': (attr['isAdmin'][0] == 'TRUE') + } def __to_key_pair(self, owner, attr): """Convert ldap attributes to KeyPair object""" if attr == None: return None - return manager.KeyPair( - id = attr['cn'][0], - name = attr['cn'][0], - owner_id = owner, - public_key = attr['sshPublicKey'][0], - fingerprint = attr['keyFingerprint'][0], - ) + return { + 'id': attr['cn'][0], + 'name': attr['cn'][0], + 'owner_id': owner, + 'public_key': attr['sshPublicKey'][0], + 'fingerprint': attr['keyFingerprint'][0], + } def __to_project(self, attr): """Convert ldap attributes to Project object""" if attr == None: return None member_dns = attr.get('member', []) - return manager.Project( - id = attr['cn'][0], - name = attr['cn'][0], - project_manager_id = self.__dn_to_uid(attr['projectManager'][0]), - description = attr.get('description', [None])[0], - member_ids = [self.__dn_to_uid(x) for x in member_dns] - ) + return { + 'id': attr['cn'][0], + 'name': attr['cn'][0], + 'project_manager_id': self.__dn_to_uid(attr['projectManager'][0]), + 'description': attr.get('description', [None])[0], + 'member_ids': [self.__dn_to_uid(x) for x in member_dns] + } def __dn_to_uid(self, dn): """Convert user dn to uid""" diff --git a/nova/auth/manager.py b/nova/auth/manager.py index 87cfd9a9..2facffe5 100644 --- a/nova/auth/manager.py +++ b/nova/auth/manager.py @@ -33,9 +33,9 @@ from nova import datastore from nova import exception from nova import flags from nova import objectstore # for flags -from nova import signer from nova import utils from nova.auth import ldapdriver +from nova.auth import signer FLAGS = flags.FLAGS # NOTE(vish): a user with one of these roles will be a superuser and @@ -187,6 +187,14 @@ class Project(AuthBase): def project_manager(self): return AuthManager().get_user(self.project_manager_id) + @property + def vpn_ip(self): + return AuthManager().get_project_vpn_ip(self) + + @property + def vpn_port(self): + return AuthManager().get_project_vpn_port(self) + def has_manager(self, user): return AuthManager().is_project_manager(user, self) @@ -314,16 +322,6 @@ class AuthManager(object): def __init__(self, *args, **kwargs): self.driver_class = kwargs.get('driver_class', ldapdriver.LdapDriver) - if FLAGS.fake_tests: - try: - self.create_user('fake', 'fake', 'fake') - except: pass - try: - self.create_user('user', 'user', 'user') - except: pass - try: - self.create_user('admin', 'admin', 'admin', True) - except: pass def authenticate(self, access, signature, params, verb='GET', server_string='127.0.0.1:8773', path='/', @@ -508,6 +506,21 @@ class AuthManager(object): with self.driver_class() as drv: drv.remove_role(User.safe_id(user), role, Project.safe_id(project)) + def get_project(self, pid): + """Get project object by id""" + with self.driver_class() as drv: + project_dict = drv.get_project(pid) + if project_dict: + return Project(**project_dict) + + def get_projects(self): + """Retrieves list of all projects""" + with self.driver_class() as drv: + project_list = drv.get_projects() + if not project_list: + return [] + return [Project(**project_dict) for project_dict in project_list] + def create_project(self, name, manager_user, description=None, member_users=None): """Create a project @@ -532,26 +545,14 @@ class AuthManager(object): """ if member_users: member_users = [User.safe_id(u) for u in member_users] - # NOTE(vish): try to associate a vpn ip and port first because - # if it throws an exception, we save having to - # create and destroy a project - Vpn.create(name) with self.driver_class() as drv: - return drv.create_project(name, - User.safe_id(manager_user), - description, - member_users) - - def get_projects(self): - """Retrieves list of all projects""" - with self.driver_class() as drv: - return drv.get_projects() - - - def get_project(self, pid): - """Get project object by id""" - with self.driver_class() as drv: - return drv.get_project(pid) + project_dict = drv.create_project(name, + User.safe_id(manager_user), + description, + member_users) + if project_dict: + Vpn.create(project_dict['id']) + return Project(**project_dict) def add_to_project(self, user, project): """Add user to project""" @@ -577,6 +578,12 @@ class AuthManager(object): return drv.remove_from_project(User.safe_id(user), Project.safe_id(project)) + def get_project_vpn_ip(self, project): + return Vpn(Project.safe_id(project)).ip + + def get_project_vpn_port(self, project): + return Vpn(Project.safe_id(project)).port + def delete_project(self, project): """Deletes a project""" with self.driver_class() as drv: @@ -585,17 +592,24 @@ class AuthManager(object): def get_user(self, uid): """Retrieves a user by id""" with self.driver_class() as drv: - return drv.get_user(uid) + user_dict = drv.get_user(uid) + if user_dict: + return User(**user_dict) def get_user_from_access_key(self, access_key): """Retrieves a user by access key""" with self.driver_class() as drv: - return drv.get_user_from_access_key(access_key) + user_dict = drv.get_user_from_access_key(access_key) + if user_dict: + return User(**user_dict) def get_users(self): """Retrieves a list of all users""" with self.driver_class() as drv: - return drv.get_users() + user_list = drv.get_users() + if not user_list: + return [] + return [User(**user_dict) for user_dict in user_list] def create_user(self, name, access=None, secret=None, admin=False): """Creates a user @@ -622,7 +636,9 @@ class AuthManager(object): if access == None: access = str(uuid.uuid4()) if secret == None: secret = str(uuid.uuid4()) with self.driver_class() as drv: - return drv.create_user(name, access, secret, admin) + user_dict = drv.create_user(name, access, secret, admin) + if user_dict: + return User(**user_dict) def delete_user(self, user): """Deletes a user""" @@ -660,18 +676,27 @@ class AuthManager(object): def create_key_pair(self, user, key_name, public_key, fingerprint): """Creates a key pair for user""" with self.driver_class() as drv: - return drv.create_key_pair(User.safe_id(user), key_name, - public_key, fingerprint) + kp_dict = drv.create_key_pair(User.safe_id(user), + key_name, + public_key, + fingerprint) + if kp_dict: + return KeyPair(**kp_dict) def get_key_pair(self, user, key_name): """Retrieves a key pair for user""" with self.driver_class() as drv: - return drv.get_key_pair(User.safe_id(user), key_name) + kp_dict = drv.get_key_pair(User.safe_id(user), key_name) + if kp_dict: + return KeyPair(**kp_dict) def get_key_pairs(self, user): """Retrieves all key pairs for user""" with self.driver_class() as drv: - return drv.get_key_pairs(User.safe_id(user)) + kp_list = drv.get_key_pairs(User.safe_id(user)) + if not kp_list: + return [] + return [KeyPair(**kp_dict) for kp_dict in kp_list] def delete_key_pair(self, user, key_name): """Deletes a key pair for user""" @@ -686,7 +711,7 @@ class AuthManager(object): project = user.id pid = Project.safe_id(project) rc = self.__generate_rc(user.access, user.secret, pid) - private_key, signed_cert = self.__generate_x509_cert(user.id, pid) + private_key, signed_cert = self._generate_x509_cert(user.id, pid) vpn = Vpn(pid) configfile = open(FLAGS.vpn_client_template,"r") @@ -726,7 +751,7 @@ class AuthManager(object): } return rc - def __generate_x509_cert(self, uid, pid): + def _generate_x509_cert(self, uid, pid): """Generate x509 cert for user""" (private_key, csr) = crypto.generate_x509_cert( self.__cert_subject(uid)) diff --git a/nova/tests/api_unittest.py b/nova/tests/api_unittest.py index 5c26192b..4477a1fe 100644 --- a/nova/tests/api_unittest.py +++ b/nova/tests/api_unittest.py @@ -150,7 +150,7 @@ class ApiEc2TestCase(test.BaseTestCase): def setUp(self): super(ApiEc2TestCase, self).setUp() - self.users = manager.AuthManager() + self.manager = manager.AuthManager() self.cloud = cloud.CloudController() self.host = '127.0.0.1' @@ -175,25 +175,22 @@ class ApiEc2TestCase(test.BaseTestCase): def test_describe_instances(self): self.expect_http() self.mox.ReplayAll() - try: - self.users.create_user('fake', 'fake', 'fake') - except Exception, _err: - pass # User may already exist + user = self.manager.create_user('fake', 'fake', 'fake') + project = self.manager.create_project('fake', 'fake', 'fake') self.assertEqual(self.ec2.get_all_instances(), []) - self.users.delete_user('fake') + self.manager.delete_project(project) + self.manager.delete_user(user) def test_get_all_key_pairs(self): self.expect_http() self.mox.ReplayAll() keyname = "".join(random.choice("sdiuisudfsdcnpaqwertasd") for x in range(random.randint(4, 8))) - try: - self.users.create_user('fake', 'fake', 'fake') - except Exception, _err: - pass # User may already exist - self.users.generate_key_pair('fake', keyname) + user = self.manager.create_user('fake', 'fake', 'fake') + project = self.manager.create_project('fake', 'fake', 'fake') + self.manager.generate_key_pair(user.id, keyname) rv = self.ec2.get_all_key_pairs() self.assertTrue(filter(lambda k: k.name == keyname, rv)) - self.users.delete_user('fake') - + self.manager.delete_project(project) + self.manager.delete_user(user) diff --git a/nova/tests/auth_unittest.py b/nova/tests/auth_unittest.py index 000f6bf1..0cd377b7 100644 --- a/nova/tests/auth_unittest.py +++ b/nova/tests/auth_unittest.py @@ -37,29 +37,29 @@ class AuthTestCase(test.BaseTestCase): super(AuthTestCase, self).setUp() self.flags(fake_libvirt=True, fake_storage=True) - self.users = manager.AuthManager() + self.manager = manager.AuthManager() def test_001_can_create_users(self): - self.users.create_user('test1', 'access', 'secret') - self.users.create_user('test2') + self.manager.create_user('test1', 'access', 'secret') + self.manager.create_user('test2') def test_002_can_get_user(self): - user = self.users.get_user('test1') + user = self.manager.get_user('test1') def test_003_can_retreive_properties(self): - user = self.users.get_user('test1') + user = self.manager.get_user('test1') self.assertEqual('test1', user.id) self.assertEqual('access', user.access) self.assertEqual('secret', user.secret) def test_004_signature_is_valid(self): - #self.assertTrue(self.users.authenticate( **boto.generate_url ... ? ? ? )) + #self.assertTrue(self.manager.authenticate( **boto.generate_url ... ? ? ? )) pass #raise NotImplementedError def test_005_can_get_credentials(self): return - credentials = self.users.get_user('test1').get_credentials() + credentials = self.manager.get_user('test1').get_credentials() self.assertEqual(credentials, 'export EC2_ACCESS_KEY="access"\n' + 'export EC2_SECRET_KEY="secret"\n' + @@ -68,14 +68,14 @@ class AuthTestCase(test.BaseTestCase): 'export EC2_USER_ID="test1"\n') def test_006_test_key_storage(self): - user = self.users.get_user('test1') + user = self.manager.get_user('test1') user.create_key_pair('public', 'key', 'fingerprint') key = user.get_key_pair('public') self.assertEqual('key', key.public_key) self.assertEqual('fingerprint', key.fingerprint) def test_007_test_key_generation(self): - user = self.users.get_user('test1') + user = self.manager.get_user('test1') private_key, fingerprint = user.generate_key_pair('public2') key = RSA.load_key_string(private_key, callback=lambda: None) bio = BIO.MemoryBuffer() @@ -87,71 +87,71 @@ class AuthTestCase(test.BaseTestCase): converted.split(" ")[1].strip()) def test_008_can_list_key_pairs(self): - keys = self.users.get_user('test1').get_key_pairs() + keys = self.manager.get_user('test1').get_key_pairs() self.assertTrue(filter(lambda k: k.name == 'public', keys)) self.assertTrue(filter(lambda k: k.name == 'public2', keys)) def test_009_can_delete_key_pair(self): - self.users.get_user('test1').delete_key_pair('public') - keys = self.users.get_user('test1').get_key_pairs() + self.manager.get_user('test1').delete_key_pair('public') + keys = self.manager.get_user('test1').get_key_pairs() self.assertFalse(filter(lambda k: k.name == 'public', keys)) def test_010_can_list_users(self): - users = self.users.get_users() + users = self.manager.get_users() logging.warn(users) self.assertTrue(filter(lambda u: u.id == 'test1', users)) def test_101_can_add_user_role(self): - self.assertFalse(self.users.has_role('test1', 'itsec')) - self.users.add_role('test1', 'itsec') - self.assertTrue(self.users.has_role('test1', 'itsec')) + self.assertFalse(self.manager.has_role('test1', 'itsec')) + self.manager.add_role('test1', 'itsec') + self.assertTrue(self.manager.has_role('test1', 'itsec')) def test_199_can_remove_user_role(self): - self.assertTrue(self.users.has_role('test1', 'itsec')) - self.users.remove_role('test1', 'itsec') - self.assertFalse(self.users.has_role('test1', 'itsec')) + self.assertTrue(self.manager.has_role('test1', 'itsec')) + self.manager.remove_role('test1', 'itsec') + self.assertFalse(self.manager.has_role('test1', 'itsec')) def test_201_can_create_project(self): - project = self.users.create_project('testproj', 'test1', 'A test project', ['test1']) - self.assertTrue(filter(lambda p: p.name == 'testproj', self.users.get_projects())) + project = self.manager.create_project('testproj', 'test1', 'A test project', ['test1']) + self.assertTrue(filter(lambda p: p.name == 'testproj', self.manager.get_projects())) self.assertEqual(project.name, 'testproj') self.assertEqual(project.description, 'A test project') self.assertEqual(project.project_manager_id, 'test1') self.assertTrue(project.has_member('test1')) def test_202_user1_is_project_member(self): - self.assertTrue(self.users.get_user('test1').is_project_member('testproj')) + self.assertTrue(self.manager.get_user('test1').is_project_member('testproj')) def test_203_user2_is_not_project_member(self): - self.assertFalse(self.users.get_user('test2').is_project_member('testproj')) + self.assertFalse(self.manager.get_user('test2').is_project_member('testproj')) def test_204_user1_is_project_manager(self): - self.assertTrue(self.users.get_user('test1').is_project_manager('testproj')) + self.assertTrue(self.manager.get_user('test1').is_project_manager('testproj')) def test_205_user2_is_not_project_manager(self): - self.assertFalse(self.users.get_user('test2').is_project_manager('testproj')) + self.assertFalse(self.manager.get_user('test2').is_project_manager('testproj')) def test_206_can_add_user_to_project(self): - self.users.add_to_project('test2', 'testproj') - self.assertTrue(self.users.get_project('testproj').has_member('test2')) + self.manager.add_to_project('test2', 'testproj') + self.assertTrue(self.manager.get_project('testproj').has_member('test2')) def test_208_can_remove_user_from_project(self): - self.users.remove_from_project('test2', 'testproj') - self.assertFalse(self.users.get_project('testproj').has_member('test2')) + self.manager.remove_from_project('test2', 'testproj') + self.assertFalse(self.manager.get_project('testproj').has_member('test2')) def test_209_can_generate_x509(self): # MUST HAVE RUN CLOUD SETUP BY NOW self.cloud = cloud.CloudController() self.cloud.setup() - private_key, signed_cert_string = self.users.get_project('testproj').generate_x509_cert('test1') - logging.debug(signed_cert_string) + _key, cert_str = self.manager._generate_x509_cert('test1', 'testproj') + logging.debug(cert_str) # Need to verify that it's signed by the right intermediate CA full_chain = crypto.fetch_ca(project_id='testproj', chain=True) int_cert = crypto.fetch_ca(project_id='testproj', chain=False) cloud_cert = crypto.fetch_ca() logging.debug("CA chain:\n\n =====\n%s\n\n=====" % full_chain) - signed_cert = X509.load_cert_string(signed_cert_string) + signed_cert = X509.load_cert_string(cert_str) chain_cert = X509.load_cert_string(full_chain) int_cert = X509.load_cert_string(int_cert) cloud_cert = X509.load_cert_string(cloud_cert) @@ -164,42 +164,45 @@ class AuthTestCase(test.BaseTestCase): self.assertFalse(signed_cert.verify(cloud_cert.get_pubkey())) def test_210_can_add_project_role(self): - project = self.users.get_project('testproj') + project = self.manager.get_project('testproj') self.assertFalse(project.has_role('test1', 'sysadmin')) - self.users.add_role('test1', 'sysadmin') + self.manager.add_role('test1', 'sysadmin') self.assertFalse(project.has_role('test1', 'sysadmin')) project.add_role('test1', 'sysadmin') self.assertTrue(project.has_role('test1', 'sysadmin')) def test_211_can_remove_project_role(self): - project = self.users.get_project('testproj') + project = self.manager.get_project('testproj') self.assertTrue(project.has_role('test1', 'sysadmin')) project.remove_role('test1', 'sysadmin') self.assertFalse(project.has_role('test1', 'sysadmin')) - self.users.remove_role('test1', 'sysadmin') + self.manager.remove_role('test1', 'sysadmin') self.assertFalse(project.has_role('test1', 'sysadmin')) def test_212_vpn_ip_and_port_looks_valid(self): - project = self.users.get_project('testproj') + project = self.manager.get_project('testproj') self.assert_(project.vpn_ip) self.assert_(project.vpn_port >= FLAGS.vpn_start_port) self.assert_(project.vpn_port <= FLAGS.vpn_end_port) def test_213_too_many_vpns(self): - for i in xrange(users.Vpn.num_ports_for_ip(FLAGS.vpn_ip)): - users.Vpn.create("vpnuser%s" % i) - self.assertRaises(users.NoMorePorts, users.Vpn.create, "boom") + vpns = [] + for i in xrange(manager.Vpn.num_ports_for_ip(FLAGS.vpn_ip)): + vpns.append(manager.Vpn.create("vpnuser%s" % i)) + self.assertRaises(manager.NoMorePorts, manager.Vpn.create, "boom") + for vpn in vpns: + vpn.destroy() def test_299_can_delete_project(self): - self.users.delete_project('testproj') - self.assertFalse(filter(lambda p: p.name == 'testproj', self.users.get_projects())) + self.manager.delete_project('testproj') + self.assertFalse(filter(lambda p: p.name == 'testproj', self.manager.get_projects())) def test_999_can_delete_users(self): - self.users.delete_user('test1') - users = self.users.get_users() + self.manager.delete_user('test1') + users = self.manager.get_users() self.assertFalse(filter(lambda u: u.id == 'test1', users)) - self.users.delete_user('test2') - self.assertEqual(self.users.get_user('test2'), None) + self.manager.delete_user('test2') + self.assertEqual(self.manager.get_user('test2'), None) if __name__ == "__main__": diff --git a/nova/tests/network_unittest.py b/nova/tests/network_unittest.py index 68cd488b..0e1b5506 100644 --- a/nova/tests/network_unittest.py +++ b/nova/tests/network_unittest.py @@ -39,59 +39,61 @@ class NetworkTestCase(test.TrialTestCase): logging.getLogger().setLevel(logging.DEBUG) self.manager = manager.AuthManager() self.dnsmasq = FakeDNSMasq() - try: - self.manager.create_user('netuser', 'netuser', 'netuser') - except: pass + self.user = self.manager.create_user('netuser', 'netuser', 'netuser') + self.projects = [] + self.projects.append(self.manager.create_project('netuser', + 'netuser', + 'netuser')) for i in range(0, 6): name = 'project%s' % i - if not self.manager.get_project(name): - self.manager.create_project(name, 'netuser', name) + self.projects.append(self.manager.create_project(name, + 'netuser', + name)) self.network = network.PublicNetworkController() def tearDown(self): super(NetworkTestCase, self).tearDown() - for i in range(0, 6): - name = 'project%s' % i - self.manager.delete_project(name) - self.manager.delete_user('netuser') + for project in self.projects: + self.manager.delete_project(project) + self.manager.delete_user(self.user) def test_public_network_allocation(self): pubnet = IPy.IP(flags.FLAGS.public_range) - address = self.network.allocate_ip("netuser", "project0", "public") + address = self.network.allocate_ip(self.user.id, self.projects[0].id, "public") self.assertTrue(IPy.IP(address) in pubnet) self.assertTrue(IPy.IP(address) in self.network.network) def test_allocate_deallocate_ip(self): address = network.allocate_ip( - "netuser", "project0", utils.generate_mac()) + self.user.id, self.projects[0].id, utils.generate_mac()) logging.debug("Was allocated %s" % (address)) - net = network.get_project_network("project0", "default") - self.assertEqual(True, is_in_project(address, "project0")) + net = network.get_project_network(self.projects[0].id, "default") + self.assertEqual(True, is_in_project(address, self.projects[0].id)) mac = utils.generate_mac() hostname = "test-host" self.dnsmasq.issue_ip(mac, address, hostname, net.bridge_name) rv = network.deallocate_ip(address) # Doesn't go away until it's dhcp released - self.assertEqual(True, is_in_project(address, "project0")) + self.assertEqual(True, is_in_project(address, self.projects[0].id)) self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name) - self.assertEqual(False, is_in_project(address, "project0")) + self.assertEqual(False, is_in_project(address, self.projects[0].id)) def test_range_allocation(self): mac = utils.generate_mac() secondmac = utils.generate_mac() hostname = "test-host" address = network.allocate_ip( - "netuser", "project0", mac) + self.user.id, self.projects[0].id, mac) secondaddress = network.allocate_ip( - "netuser", "project1", secondmac) - net = network.get_project_network("project0", "default") - secondnet = network.get_project_network("project1", "default") + self.user, "project1", secondmac) + net = network.get_project_network(self.projects[0].id, "default") + secondnet = network.get_project_network(self.projects[1].id, "default") - self.assertEqual(True, is_in_project(address, "project0")) - self.assertEqual(True, is_in_project(secondaddress, "project1")) - self.assertEqual(False, is_in_project(address, "project1")) + self.assertEqual(True, is_in_project(address, self.projects[0].id)) + self.assertEqual(True, is_in_project(secondaddress, self.projects[1].id)) + self.assertEqual(False, is_in_project(address, self.projects[1].id)) # Addresses are allocated before they're issued self.dnsmasq.issue_ip(mac, address, hostname, net.bridge_name) @@ -100,34 +102,34 @@ class NetworkTestCase(test.TrialTestCase): rv = network.deallocate_ip(address) self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name) - self.assertEqual(False, is_in_project(address, "project0")) + self.assertEqual(False, is_in_project(address, self.projects[0].id)) # First address release shouldn't affect the second - self.assertEqual(True, is_in_project(secondaddress, "project1")) + self.assertEqual(True, is_in_project(secondaddress, self.projects[1].id)) rv = network.deallocate_ip(secondaddress) self.dnsmasq.release_ip(secondmac, secondaddress, hostname, secondnet.bridge_name) - self.assertEqual(False, is_in_project(secondaddress, "project1")) + self.assertEqual(False, is_in_project(secondaddress, self.projects[1].id)) def test_subnet_edge(self): - secondaddress = network.allocate_ip("netuser", "project0", + secondaddress = network.allocate_ip(self.user.id, self.projects[0].id, utils.generate_mac()) hostname = "toomany-hosts" - for project in range(1,5): - project_id = "project%s" % (project) + for i in range(1,5): + project_id = self.projects[i].id mac = utils.generate_mac() mac2 = utils.generate_mac() mac3 = utils.generate_mac() address = network.allocate_ip( - "netuser", project_id, mac) + self.user, project_id, mac) address2 = network.allocate_ip( - "netuser", project_id, mac2) + self.user, project_id, mac2) address3 = network.allocate_ip( - "netuser", project_id, mac3) - self.assertEqual(False, is_in_project(address, "project0")) - self.assertEqual(False, is_in_project(address2, "project0")) - self.assertEqual(False, is_in_project(address3, "project0")) + self.user, project_id, mac3) + self.assertEqual(False, is_in_project(address, self.projects[0].id)) + self.assertEqual(False, is_in_project(address2, self.projects[0].id)) + self.assertEqual(False, is_in_project(address3, self.projects[0].id)) rv = network.deallocate_ip(address) rv = network.deallocate_ip(address2) rv = network.deallocate_ip(address3) @@ -135,7 +137,7 @@ class NetworkTestCase(test.TrialTestCase): self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name) self.dnsmasq.release_ip(mac2, address2, hostname, net.bridge_name) self.dnsmasq.release_ip(mac3, address3, hostname, net.bridge_name) - net = network.get_project_network("project0", "default") + net = network.get_project_network(self.projects[0].id, "default") rv = network.deallocate_ip(secondaddress) self.dnsmasq.release_ip(mac, address, hostname, net.bridge_name) @@ -150,22 +152,23 @@ class NetworkTestCase(test.TrialTestCase): Network size is 32, there are 5 addresses reserved for VPN. So we should get 23 usable addresses """ - net = network.get_project_network("project0", "default") + net = network.get_project_network(self.projects[0].id, "default") hostname = "toomany-hosts" macs = {} addresses = {} for i in range(0, 22): macs[i] = utils.generate_mac() - addresses[i] = network.allocate_ip("netuser", "project0", macs[i]) + addresses[i] = network.allocate_ip(self.user.id, self.projects[0].id, macs[i]) self.dnsmasq.issue_ip(macs[i], addresses[i], hostname, net.bridge_name) - self.assertRaises(NoMoreAddresses, network.allocate_ip, "netuser", "project0", utils.generate_mac()) + self.assertRaises(NoMoreAddresses, network.allocate_ip, self.user.id, self.projects[0].id, utils.generate_mac()) for i in range(0, 22): rv = network.deallocate_ip(addresses[i]) self.dnsmasq.release_ip(macs[i], addresses[i], hostname, net.bridge_name) def is_in_project(address, project_id): + print address, list(network.get_project_network(project_id).list_addresses()) return address in network.get_project_network(project_id).list_addresses() def _get_project_addresses(project_id):