merge from upstream
This commit is contained in:
		
							
								
								
									
										6
									
								
								Authors
									
									
									
									
									
								
							
							
						
						
									
										6
									
								
								Authors
									
									
									
									
									
								
							| @@ -4,8 +4,8 @@ Anthony Young <sleepsonthefloor@gmail.com> | ||||
| Antony Messerli <ant@openstack.org> | ||||
| Armando Migliaccio <Armando.Migliaccio@eu.citrix.com> | ||||
| Chiradeep Vittal <chiradeep@cloud.com> | ||||
| Chris Behrens <cbehrens@codestud.com> | ||||
| Chmouel Boudjnah <chmouel@chmouel.com> | ||||
| Chris Behrens <cbehrens@codestud.com> | ||||
| Cory Wright <corywright@gmail.com> | ||||
| David Pravec <David.Pravec@danix.org> | ||||
| Dean Troyer <dtroyer@gmail.com> | ||||
| @@ -14,6 +14,7 @@ Ed Leafe <ed@leafe.com> | ||||
| Eldar Nugaev <enugaev@griddynamics.com> | ||||
| Eric Day <eday@oddments.org> | ||||
| Ewan Mellor <ewan.mellor@citrix.com> | ||||
| Hisaharu Ishii <ishii.hisaharu@lab.ntt.co.jp> | ||||
| Hisaki Ohara <hisaki.ohara@intel.com> | ||||
| Ilya Alekseyev <ialekseev@griddynamics.com> | ||||
| Jay Pipes <jaypipes@gmail.com> | ||||
| @@ -26,11 +27,14 @@ Josh Kearney <josh.kearney@rackspace.com> | ||||
| Joshua McKenty <jmckenty@gmail.com> | ||||
| Justin Santa Barbara <justin@fathomdb.com> | ||||
| Ken Pepple <ken.pepple@gmail.com> | ||||
| Koji Iida <iida.koji@lab.ntt.co.jp> | ||||
| Lorin Hochstein <lorin@isi.edu> | ||||
| Matt Dietz <matt.dietz@rackspace.com> | ||||
| Michael Gundlach <michael.gundlach@rackspace.com> | ||||
| Monsyne Dragon <mdragon@rackspace.com> | ||||
| Monty Taylor <mordred@inaugust.com> | ||||
| MORITA Kazutaka <morita.kazutaka@gmail.com> | ||||
| Nachi Ueno <ueno.nachi@lab.ntt.co.jp> <openstack@lab.ntt.co.jp> <nati.ueno@gmail.com> <nova@u4> | ||||
| Paul Voccio <paul@openstack.org> | ||||
| Rick Clark <rick@openstack.org> | ||||
| Rick Harris <rconradharris@gmail.com> | ||||
|   | ||||
| @@ -91,6 +91,7 @@ flags.DECLARE('num_networks', 'nova.network.manager') | ||||
| flags.DECLARE('network_size', 'nova.network.manager') | ||||
| flags.DECLARE('vlan_start', 'nova.network.manager') | ||||
| flags.DECLARE('vpn_start', 'nova.network.manager') | ||||
| flags.DECLARE('fixed_range_v6', 'nova.network.manager') | ||||
|  | ||||
|  | ||||
| class VpnCommands(object): | ||||
| @@ -439,11 +440,12 @@ class NetworkCommands(object): | ||||
|     """Class for managing networks.""" | ||||
|  | ||||
|     def create(self, fixed_range=None, num_networks=None, | ||||
|                network_size=None, vlan_start=None, vpn_start=None): | ||||
|                network_size=None, vlan_start=None, vpn_start=None, | ||||
|                fixed_range_v6=None): | ||||
|         """Creates fixed ips for host by range | ||||
|         arguments: [fixed_range=FLAG], [num_networks=FLAG], | ||||
|                    [network_size=FLAG], [vlan_start=FLAG], | ||||
|                    [vpn_start=FLAG]""" | ||||
|                    [vpn_start=FLAG], [fixed_range_v6=FLAG]""" | ||||
|         if not fixed_range: | ||||
|             fixed_range = FLAGS.fixed_range | ||||
|         if not num_networks: | ||||
| @@ -454,11 +456,13 @@ class NetworkCommands(object): | ||||
|             vlan_start = FLAGS.vlan_start | ||||
|         if not vpn_start: | ||||
|             vpn_start = FLAGS.vpn_start | ||||
|         if not fixed_range_v6: | ||||
|             fixed_range_v6 = FLAGS.fixed_range_v6 | ||||
|         net_manager = utils.import_object(FLAGS.network_manager) | ||||
|         net_manager.create_networks(context.get_admin_context(), | ||||
|                                     fixed_range, int(num_networks), | ||||
|                                     int(network_size), int(vlan_start), | ||||
|                                     int(vpn_start)) | ||||
|                                     int(vpn_start), fixed_range_v6) | ||||
|  | ||||
|  | ||||
| class ServiceCommands(object): | ||||
|   | ||||
| @@ -721,7 +721,7 @@ class AuthManager(object): | ||||
|         if project is None: | ||||
|             project = user.id | ||||
|         pid = Project.safe_id(project) | ||||
|         return self.__generate_rc(user.access, user.secret, pid, use_dmz) | ||||
|         return self.__generate_rc(user, pid, use_dmz) | ||||
|  | ||||
|     @staticmethod | ||||
|     def __generate_rc(user, pid, use_dmz=True, host=None): | ||||
|   | ||||
| @@ -76,6 +76,10 @@ class InvalidInputException(Error): | ||||
|     pass | ||||
|  | ||||
|  | ||||
| class TimeoutException(Error): | ||||
|     pass | ||||
|  | ||||
|  | ||||
| def wrap_exception(f): | ||||
|     def _wrap(*args, **kw): | ||||
|         try: | ||||
|   | ||||
							
								
								
									
										99
									
								
								nova/test.py
									
									
									
									
									
								
							
							
						
						
									
										99
									
								
								nova/test.py
									
									
									
									
									
								
							| @@ -23,14 +23,10 @@ and some black magic for inline callbacks. | ||||
| """ | ||||
|  | ||||
| import datetime | ||||
| import sys | ||||
| import time | ||||
| import unittest | ||||
|  | ||||
| import mox | ||||
| import stubout | ||||
| from twisted.internet import defer | ||||
| from twisted.trial import unittest as trial_unittest | ||||
|  | ||||
| from nova import context | ||||
| from nova import db | ||||
| @@ -74,7 +70,8 @@ class TestCase(unittest.TestCase): | ||||
|                                                           FLAGS.fixed_range, | ||||
|                                                           5, 16, | ||||
|                                                           FLAGS.vlan_start, | ||||
|                                                           FLAGS.vpn_start) | ||||
|                                                           FLAGS.vpn_start, | ||||
|                                                           FLAGS.fixed_range_v6) | ||||
|  | ||||
|         # emulate some of the mox stuff, we can't use the metaclass | ||||
|         # because it screws with our generators | ||||
| @@ -139,95 +136,3 @@ class TestCase(unittest.TestCase): | ||||
|  | ||||
|         _wrapped.func_name = self.originalAttach.func_name | ||||
|         rpc.Consumer.attach_to_eventlet = _wrapped | ||||
|  | ||||
|  | ||||
| class TrialTestCase(trial_unittest.TestCase): | ||||
|     """Test case base class for all unit tests""" | ||||
|     def setUp(self): | ||||
|         """Run before each test method to initialize test environment""" | ||||
|         super(TrialTestCase, self).setUp() | ||||
|         # NOTE(vish): We need a better method for creating fixtures for tests | ||||
|         #             now that we have some required db setup for the system | ||||
|         #             to work properly. | ||||
|         self.start = datetime.datetime.utcnow() | ||||
|         ctxt = context.get_admin_context() | ||||
|         if db.network_count(ctxt) != 5: | ||||
|             network_manager.VlanManager().create_networks(ctxt, | ||||
|                                                           FLAGS.fixed_range, | ||||
|                                                           5, 16, | ||||
|                                                           FLAGS.vlan_start, | ||||
|                                                           FLAGS.vpn_start) | ||||
|  | ||||
|         # emulate some of the mox stuff, we can't use the metaclass | ||||
|         # because it screws with our generators | ||||
|         self.mox = mox.Mox() | ||||
|         self.stubs = stubout.StubOutForTesting() | ||||
|         self.flag_overrides = {} | ||||
|         self.injected = [] | ||||
|         self._original_flags = FLAGS.FlagValuesDict() | ||||
|  | ||||
|     def tearDown(self): | ||||
|         """Runs after each test method to finalize/tear down test | ||||
|         environment.""" | ||||
|         try: | ||||
|             self.mox.UnsetStubs() | ||||
|             self.stubs.UnsetAll() | ||||
|             self.stubs.SmartUnsetAll() | ||||
|             self.mox.VerifyAll() | ||||
|             # NOTE(vish): Clean up any ips associated during the test. | ||||
|             ctxt = context.get_admin_context() | ||||
|             db.fixed_ip_disassociate_all_by_timeout(ctxt, FLAGS.host, | ||||
|                                                     self.start) | ||||
|             db.network_disassociate_all(ctxt) | ||||
|             for x in self.injected: | ||||
|                 try: | ||||
|                     x.stop() | ||||
|                 except AssertionError: | ||||
|                     pass | ||||
|  | ||||
|             if FLAGS.fake_rabbit: | ||||
|                 fakerabbit.reset_all() | ||||
|  | ||||
|             db.security_group_destroy_all(ctxt) | ||||
|             super(TrialTestCase, self).tearDown() | ||||
|         finally: | ||||
|             self.reset_flags() | ||||
|  | ||||
|     def flags(self, **kw): | ||||
|         """Override flag variables for a test""" | ||||
|         for k, v in kw.iteritems(): | ||||
|             if k in self.flag_overrides: | ||||
|                 self.reset_flags() | ||||
|                 raise Exception( | ||||
|                         'trying to override already overriden flag: %s' % k) | ||||
|             self.flag_overrides[k] = getattr(FLAGS, k) | ||||
|             setattr(FLAGS, k, v) | ||||
|  | ||||
|     def reset_flags(self): | ||||
|         """Resets all flag variables for the test.  Runs after each test""" | ||||
|         FLAGS.Reset() | ||||
|         for k, v in self._original_flags.iteritems(): | ||||
|             setattr(FLAGS, k, v) | ||||
|  | ||||
|     def run(self, result=None): | ||||
|         test_method = getattr(self, self._testMethodName) | ||||
|         setattr(self, | ||||
|                 self._testMethodName, | ||||
|                 self._maybeInlineCallbacks(test_method, result)) | ||||
|         rv = super(TrialTestCase, self).run(result) | ||||
|         setattr(self, self._testMethodName, test_method) | ||||
|         return rv | ||||
|  | ||||
|     def _maybeInlineCallbacks(self, func, result): | ||||
|         def _wrapped(): | ||||
|             g = func() | ||||
|             if isinstance(g, defer.Deferred): | ||||
|                 return g | ||||
|             if not hasattr(g, 'send'): | ||||
|                 return defer.succeed(g) | ||||
|  | ||||
|             inlined = defer.inlineCallbacks(func) | ||||
|             d = inlined() | ||||
|             return d | ||||
|         _wrapped.func_name = func.func_name | ||||
|         return _wrapped | ||||
|   | ||||
| @@ -265,6 +265,72 @@ class ApiEc2TestCase(test.TestCase): | ||||
|  | ||||
|         return | ||||
|  | ||||
|     def test_authorize_revoke_security_group_cidr_v6(self): | ||||
|         """ | ||||
|         Test that we can add and remove CIDR based rules | ||||
|         to a security group for IPv6 | ||||
|         """ | ||||
|         self.expect_http() | ||||
|         self.mox.ReplayAll() | ||||
|         user = self.manager.create_user('fake', 'fake', 'fake') | ||||
|         project = self.manager.create_project('fake', 'fake', 'fake') | ||||
|  | ||||
|         # At the moment, you need both of these to actually be netadmin | ||||
|         self.manager.add_role('fake', 'netadmin') | ||||
|         project.add_role('fake', 'netadmin') | ||||
|  | ||||
|         security_group_name = "".join(random.choice("sdiuisudfsdcnpaqwertasd") | ||||
|                                       for x in range(random.randint(4, 8))) | ||||
|  | ||||
|         group = self.ec2.create_security_group(security_group_name, | ||||
|                                                'test group') | ||||
|  | ||||
|         self.expect_http() | ||||
|         self.mox.ReplayAll() | ||||
|         group.connection = self.ec2 | ||||
|  | ||||
|         group.authorize('tcp', 80, 81, '::/0') | ||||
|  | ||||
|         self.expect_http() | ||||
|         self.mox.ReplayAll() | ||||
|  | ||||
|         rv = self.ec2.get_all_security_groups() | ||||
|         # I don't bother checkng that we actually find it here, | ||||
|         # because the create/delete unit test further up should | ||||
|         # be good enough for that. | ||||
|         for group in rv: | ||||
|             if group.name == security_group_name: | ||||
|                 self.assertEquals(len(group.rules), 1) | ||||
|                 self.assertEquals(int(group.rules[0].from_port), 80) | ||||
|                 self.assertEquals(int(group.rules[0].to_port), 81) | ||||
|                 self.assertEquals(len(group.rules[0].grants), 1) | ||||
|                 self.assertEquals(str(group.rules[0].grants[0]), '::/0') | ||||
|  | ||||
|         self.expect_http() | ||||
|         self.mox.ReplayAll() | ||||
|         group.connection = self.ec2 | ||||
|  | ||||
|         group.revoke('tcp', 80, 81, '::/0') | ||||
|  | ||||
|         self.expect_http() | ||||
|         self.mox.ReplayAll() | ||||
|  | ||||
|         self.ec2.delete_security_group(security_group_name) | ||||
|  | ||||
|         self.expect_http() | ||||
|         self.mox.ReplayAll() | ||||
|         group.connection = self.ec2 | ||||
|  | ||||
|         rv = self.ec2.get_all_security_groups() | ||||
|  | ||||
|         self.assertEqual(len(rv), 1) | ||||
|         self.assertEqual(rv[0].name, 'default') | ||||
|  | ||||
|         self.manager.delete_project(project) | ||||
|         self.manager.delete_user(user) | ||||
|  | ||||
|         return | ||||
|  | ||||
|     def test_authorize_revoke_security_group_foreign_group(self): | ||||
|         """ | ||||
|         Test that we can grant and revoke another security group access | ||||
|   | ||||
| @@ -129,10 +129,13 @@ class CloudTestCase(test.TestCase): | ||||
|         vol2 = db.volume_create(self.context, {}) | ||||
|         result = self.cloud.describe_volumes(self.context) | ||||
|         self.assertEqual(len(result['volumeSet']), 2) | ||||
|         volume_id = cloud.id_to_ec2_id(vol2['id'], 'vol-%08x') | ||||
|         result = self.cloud.describe_volumes(self.context, | ||||
|                                              volume_id=[vol2['id']]) | ||||
|                                              volume_id=[volume_id]) | ||||
|         self.assertEqual(len(result['volumeSet']), 1) | ||||
|         self.assertEqual(result['volumeSet'][0]['volumeId'], vol2['id']) | ||||
|         self.assertEqual( | ||||
|                 cloud.ec2_id_to_id(result['volumeSet'][0]['volumeId']), | ||||
|                 vol2['id']) | ||||
|         db.volume_destroy(self.context, vol1['id']) | ||||
|         db.volume_destroy(self.context, vol2['id']) | ||||
|  | ||||
| @@ -391,7 +394,8 @@ class CloudTestCase(test.TestCase): | ||||
|  | ||||
|     def test_update_of_volume_display_fields(self): | ||||
|         vol = db.volume_create(self.context, {}) | ||||
|         self.cloud.update_volume(self.context, vol['id'], | ||||
|         self.cloud.update_volume(self.context, | ||||
|                                  cloud.id_to_ec2_id(vol['id'], 'vol-%08x'), | ||||
|                                  display_name='c00l v0lum3') | ||||
|         vol = db.volume_get(self.context, vol['id']) | ||||
|         self.assertEqual('c00l v0lum3', vol['display_name']) | ||||
| @@ -399,7 +403,8 @@ class CloudTestCase(test.TestCase): | ||||
|  | ||||
|     def test_update_of_volume_wont_update_private_fields(self): | ||||
|         vol = db.volume_create(self.context, {}) | ||||
|         self.cloud.update_volume(self.context, vol['id'], | ||||
|         self.cloud.update_volume(self.context, | ||||
|                                  cloud.id_to_ec2_id(vol['id'], 'vol-%08x'), | ||||
|                                  mountpoint='/not/here') | ||||
|         vol = db.volume_get(self.context, vol['id']) | ||||
|         self.assertEqual(None, vol['mountpoint']) | ||||
|   | ||||
| @@ -155,6 +155,13 @@ class ComputeTestCase(test.TestCase): | ||||
|         self.compute.reboot_instance(self.context, instance_id) | ||||
|         self.compute.terminate_instance(self.context, instance_id) | ||||
|  | ||||
|     def test_set_admin_password(self): | ||||
|         """Ensure instance can have its admin password set""" | ||||
|         instance_id = self._create_instance() | ||||
|         self.compute.run_instance(self.context, instance_id) | ||||
|         self.compute.set_admin_password(self.context, instance_id) | ||||
|         self.compute.terminate_instance(self.context, instance_id) | ||||
|  | ||||
|     def test_snapshot(self): | ||||
|         """Ensure instance can be snapshotted""" | ||||
|         instance_id = self._create_instance() | ||||
|   | ||||
| @@ -9,7 +9,7 @@ def _fake_context(): | ||||
|     return context.RequestContext(1, 1) | ||||
|  | ||||
|  | ||||
| class RootLoggerTestCase(test.TrialTestCase): | ||||
| class RootLoggerTestCase(test.TestCase): | ||||
|     def setUp(self): | ||||
|         super(RootLoggerTestCase, self).setUp() | ||||
|         self.log = log.logging.root | ||||
| @@ -46,7 +46,7 @@ class RootLoggerTestCase(test.TrialTestCase): | ||||
|         self.assert_(True)  # didn't raise exception | ||||
|  | ||||
|  | ||||
| class NovaFormatterTestCase(test.TrialTestCase): | ||||
| class NovaFormatterTestCase(test.TestCase): | ||||
|     def setUp(self): | ||||
|         super(NovaFormatterTestCase, self).setUp() | ||||
|         self.flags(logging_context_format_string="HAS CONTEXT "\ | ||||
| @@ -78,7 +78,7 @@ class NovaFormatterTestCase(test.TrialTestCase): | ||||
|         self.assertEqual("NOCTXT: baz --DBG\n", self.stream.getvalue()) | ||||
|  | ||||
|  | ||||
| class NovaLoggerTestCase(test.TrialTestCase): | ||||
| class NovaLoggerTestCase(test.TestCase): | ||||
|     def setUp(self): | ||||
|         super(NovaLoggerTestCase, self).setUp() | ||||
|         self.flags(default_log_levels=["nova-test=AUDIT"], verbose=False) | ||||
| @@ -96,7 +96,7 @@ class NovaLoggerTestCase(test.TrialTestCase): | ||||
|         self.assertEqual(log.AUDIT, l.level) | ||||
|  | ||||
|  | ||||
| class VerboseLoggerTestCase(test.TrialTestCase): | ||||
| class VerboseLoggerTestCase(test.TestCase): | ||||
|     def setUp(self): | ||||
|         super(VerboseLoggerTestCase, self).setUp() | ||||
|         self.flags(default_log_levels=["nova.test=AUDIT"], verbose=True) | ||||
|   | ||||
| @@ -96,6 +96,28 @@ class NetworkTestCase(test.TestCase): | ||||
|         self.context.project_id = self.projects[project_num].id | ||||
|         self.network.deallocate_fixed_ip(self.context, address) | ||||
|  | ||||
|     def test_private_ipv6(self): | ||||
|         """Make sure ipv6 is OK""" | ||||
|         if FLAGS.use_ipv6: | ||||
|             instance_ref = self._create_instance(0) | ||||
|             address = self._create_address(0, instance_ref['id']) | ||||
|             network_ref = db.project_get_network( | ||||
|                                                  context.get_admin_context(), | ||||
|                                                  self.context.project_id) | ||||
|             address_v6 = db.instance_get_fixed_address_v6( | ||||
|                                                  context.get_admin_context(), | ||||
|                                                  instance_ref['id']) | ||||
|             self.assertEqual(instance_ref['mac_address'], | ||||
|                              utils.to_mac(address_v6)) | ||||
|             instance_ref2 = db.fixed_ip_get_instance_v6( | ||||
|                                                  context.get_admin_context(), | ||||
|                                                  address_v6) | ||||
|             self.assertEqual(instance_ref['id'], instance_ref2['id']) | ||||
|             self.assertEqual(address_v6, | ||||
|                              utils.to_global_ipv6( | ||||
|                                                  network_ref['cidr_v6'], | ||||
|                                                  instance_ref['mac_address'])) | ||||
|  | ||||
|     def test_public_network_association(self): | ||||
|         """Makes sure that we can allocaate a public ip""" | ||||
|         # TODO(vish): better way of adding floating ips | ||||
|   | ||||
| @@ -28,7 +28,7 @@ from nova import test | ||||
| FLAGS = flags.FLAGS | ||||
|  | ||||
|  | ||||
| class TwistdTestCase(test.TrialTestCase): | ||||
| class TwistdTestCase(test.TestCase): | ||||
|     def setUp(self): | ||||
|         super(TwistdTestCase, self).setUp() | ||||
|         self.Options = twistd.WrapTwistedOptions(twistd.TwistdServerOptions) | ||||
|   | ||||
| @@ -31,6 +31,7 @@ from nova.compute import power_state | ||||
| from nova.virt import xenapi_conn | ||||
| from nova.virt.xenapi import fake as xenapi_fake | ||||
| from nova.virt.xenapi import volume_utils | ||||
| from nova.virt.xenapi.vmops import SimpleDH | ||||
| from nova.tests.db import fakes as db_fakes | ||||
| from nova.tests.xenapi import stubs | ||||
|  | ||||
| @@ -262,3 +263,29 @@ class XenAPIVMTestCase(test.TestCase): | ||||
|         instance = db.instance_create(values) | ||||
|         self.conn.spawn(instance) | ||||
|         return instance | ||||
|  | ||||
|  | ||||
| class XenAPIDiffieHellmanTestCase(test.TestCase): | ||||
|     """ | ||||
|     Unit tests for Diffie-Hellman code | ||||
|     """ | ||||
|     def setUp(self): | ||||
|         super(XenAPIDiffieHellmanTestCase, self).setUp() | ||||
|         self.alice = SimpleDH() | ||||
|         self.bob = SimpleDH() | ||||
|  | ||||
|     def test_shared(self): | ||||
|         alice_pub = self.alice.get_public() | ||||
|         bob_pub = self.bob.get_public() | ||||
|         alice_shared = self.alice.compute_shared(bob_pub) | ||||
|         bob_shared = self.bob.compute_shared(alice_pub) | ||||
|         self.assertEquals(alice_shared, bob_shared) | ||||
|  | ||||
|     def test_encryption(self): | ||||
|         msg = "This is a top-secret message" | ||||
|         enc = self.alice.encrypt(msg) | ||||
|         dec = self.bob.decrypt(enc) | ||||
|         self.assertEquals(dec, msg) | ||||
|  | ||||
|     def tearDown(self): | ||||
|         super(XenAPIDiffieHellmanTestCase, self).tearDown() | ||||
|   | ||||
| @@ -31,6 +31,8 @@ import struct | ||||
| import sys | ||||
| import time | ||||
| from xml.sax import saxutils | ||||
| import re | ||||
| import netaddr | ||||
|  | ||||
| from eventlet import event | ||||
| from eventlet import greenthread | ||||
| @@ -201,6 +203,40 @@ def last_octet(address): | ||||
|     return int(address.split(".")[-1]) | ||||
|  | ||||
|  | ||||
| def  get_my_linklocal(interface): | ||||
|     try: | ||||
|         if_str = execute("ip -f inet6 -o addr show %s" % interface) | ||||
|         condition = "\s+inet6\s+([0-9a-f:]+/\d+)\s+scope\s+link" | ||||
|         links = [re.search(condition, x) for x in if_str[0].split('\n')] | ||||
|         address = [w.group(1) for w in links if w is not None] | ||||
|         if address[0] is not None: | ||||
|             return address[0] | ||||
|         else: | ||||
|             return 'fe00::' | ||||
|     except IndexError as ex: | ||||
|         LOG.warn(_("Couldn't get Link Local IP of %s :%s"), interface, ex) | ||||
|     except ProcessExecutionError as ex: | ||||
|         LOG.warn(_("Couldn't get Link Local IP of %s :%s"), interface, ex) | ||||
|     except: | ||||
|         return 'fe00::' | ||||
|  | ||||
|  | ||||
| def to_global_ipv6(prefix, mac): | ||||
|     mac64 = netaddr.EUI(mac).eui64().words | ||||
|     int_addr = int(''.join(['%02x' % i for i in mac64]), 16) | ||||
|     mac64_addr = netaddr.IPAddress(int_addr) | ||||
|     maskIP = netaddr.IPNetwork(prefix).ip | ||||
|     return (mac64_addr ^ netaddr.IPAddress('::0200:0:0:0') | maskIP).format() | ||||
|  | ||||
|  | ||||
| def to_mac(ipv6_address): | ||||
|     address = netaddr.IPAddress(ipv6_address) | ||||
|     mask1 = netaddr.IPAddress("::ffff:ffff:ffff:ffff") | ||||
|     mask2 = netaddr.IPAddress("::0200:0:0:0") | ||||
|     mac64 = netaddr.EUI(int(address & mask1 ^ mask2)).words | ||||
|     return ":".join(["%02x" % i for i in mac64[0:3] + mac64[5:8]]) | ||||
|  | ||||
|  | ||||
| def utcnow(): | ||||
|     """Overridable version of datetime.datetime.utcnow.""" | ||||
|     if utcnow.override_time: | ||||
|   | ||||
		Reference in New Issue
	
	Block a user
	 Andy Smith
					Andy Smith