Port xenapi test_xenapi to Python 3

* Use base64 module from oslo.serialization to convert things
  as needed
* Replace filter() with a list-comprehension to get a list on
  Python 3

Partially-Implements: blueprint goal-python35

Change-Id: Ibe7e212cea5e6dd32b9b92a814c3ad8222ed2bc6
This commit is contained in:
ChangBo Guo(gcb) 2016-11-23 13:12:46 +08:00
parent 0ae8cbf83e
commit a54c1bc784
3 changed files with 25 additions and 21 deletions

View File

@ -1106,10 +1106,10 @@ iface eth0 inet6 static
self.stubs.Set(crypto, 'ssh_encrypt_text', fake_encrypt_text)
expected_data = ('\n# The following ssh key was injected by '
'Nova\nssh-rsa fake_keydata\n')
expected_data = (b'\n# The following ssh key was injected by '
b'Nova\nssh-rsa fake_keydata\n')
injected_files = [('/root/.ssh/authorized_keys', expected_data)]
injected_files = [(b'/root/.ssh/authorized_keys', expected_data)]
self._test_spawn(IMAGE_VHD, None, None,
os_type="linux", architecture="x86-64",
key_data='ssh-rsa fake_keydata')
@ -1136,10 +1136,10 @@ iface eth0 inet6 static
self.stubs.Set(crypto, 'ssh_encrypt_text', fake_encrypt_text)
expected_data = ('\n# The following ssh key was injected by '
'Nova\nssh-dsa fake_keydata\n')
expected_data = (b'\n# The following ssh key was injected by '
b'Nova\nssh-dsa fake_keydata\n')
injected_files = [('/root/.ssh/authorized_keys', expected_data)]
injected_files = [(b'/root/.ssh/authorized_keys', expected_data)]
self._test_spawn(IMAGE_VHD, None, None,
os_type="linux", architecture="x86-64",
key_data='ssh-dsa fake_keydata')
@ -1159,7 +1159,7 @@ iface eth0 inet6 static
self.stubs.Set(stubs.FakeSessionForVMTests,
'_plugin_agent_inject_file', fake_inject_file)
injected_files = [('/tmp/foo', 'foobar')]
injected_files = [(b'/tmp/foo', b'foobar')]
self._test_spawn(IMAGE_VHD, None, None,
os_type="linux", architecture="x86-64",
injected_files=injected_files)
@ -2756,17 +2756,20 @@ class XenAPIDom0IptablesFirewallTestCase(stubs.XenAPITestBase):
regex = re.compile('\[0\:0\] -A .* -j ACCEPT -p icmp'
' -s 192.168.11.0/24')
self.assertGreater(len(filter(regex.match, self._out_rules)), 0,
match_rules = [rule for rule in self._out_rules if regex.match(rule)]
self.assertGreater(len(match_rules), 0,
"ICMP acceptance rule wasn't added")
regex = re.compile('\[0\:0\] -A .* -j ACCEPT -p icmp -m icmp'
' --icmp-type 8 -s 192.168.11.0/24')
self.assertGreater(len(filter(regex.match, self._out_rules)), 0,
match_rules = [rule for rule in self._out_rules if regex.match(rule)]
self.assertGreater(len(match_rules), 0,
"ICMP Echo Request acceptance rule wasn't added")
regex = re.compile('\[0\:0\] -A .* -j ACCEPT -p tcp --dport 80:81'
' -s 192.168.10.0/24')
self.assertGreater(len(filter(regex.match, self._out_rules)), 0,
match_rules = [rule for rule in self._out_rules if regex.match(rule)]
self.assertGreater(len(match_rules), 0,
"TCP port 80/81 acceptance rule wasn't added")
def test_static_filters(self):
@ -2810,7 +2813,9 @@ class XenAPIDom0IptablesFirewallTestCase(stubs.XenAPITestBase):
continue
regex = re.compile('\[0\:0\] -A .* -j ACCEPT -p tcp'
' --dport 80:81 -s %s' % ip['address'])
self.assertGreater(len(filter(regex.match, self._out_rules)), 0,
match_rules = [rule for rule in self._out_rules
if regex.match(rule)]
self.assertGreater(len(match_rules), 0,
"TCP port 80/81 acceptance rule wasn't added")
db.instance_destroy(admin_ctxt, instance_ref['uuid'])
@ -2880,7 +2885,8 @@ class XenAPIDom0IptablesFirewallTestCase(stubs.XenAPITestBase):
self.fw.refresh_security_group_rules(secgroup)
regex = re.compile('\[0\:0\] -A .* -j ACCEPT -p udp --dport 200:299'
' -s 192.168.99.0/24')
self.assertGreater(len(filter(regex.match, self._out_rules)), 0,
match_rules = [rule for rule in self._out_rules if regex.match(rule)]
self.assertGreater(len(match_rules), 0,
"Rules were not updated properly. "
"The rule for UDP acceptance is missing")

View File

@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import base64
import binascii
from distutils import version
import os
@ -21,7 +20,9 @@ import sys
import time
from oslo_log import log as logging
from oslo_serialization import base64
from oslo_serialization import jsonutils
from oslo_utils import encodeutils
from oslo_utils import strutils
from oslo_utils import uuidutils
@ -250,7 +251,7 @@ class XenAPIBasedAgent(object):
ctxt = context.get_admin_context()
enc = crypto.ssh_encrypt_text(sshkey, new_pass)
self.instance.system_metadata.update(
password.convert_password(ctxt, base64.b64encode(enc)))
password.convert_password(ctxt, base64.encode_as_text(enc)))
self.instance.save()
def set_admin_password(self, new_pass):
@ -316,8 +317,8 @@ class XenAPIBasedAgent(object):
LOG.debug('Injecting file path: %r', path, instance=self.instance)
# Files/paths must be base64-encoded for transmission to agent
b64_path = base64.b64encode(path.encode('utf-8'))
b64_contents = base64.b64encode(contents.encode('utf-8'))
b64_path = base64.encode_as_bytes(path)
b64_contents = base64.encode_as_bytes(contents)
args = {'b64_path': b64_path, 'b64_contents': b64_contents}
return self._call_agent('inject_file', args)
@ -422,7 +423,8 @@ class SimpleDH(object):
'pass:%s' % self._shared, '-nosalt']
if decrypt:
cmd.append('-d')
out, err = utils.execute(*cmd, process_input=text)
out, err = utils.execute(*cmd,
process_input=encodeutils.safe_encode(text))
if err:
raise RuntimeError(_('OpenSSL error: %s') % err)
return out

View File

@ -55,10 +55,6 @@ nova.tests.unit.virt.xenapi.client.test_session.CallPluginTestCase
nova.tests.unit.virt.xenapi.test_vm_utils.ResizeFunctionTestCase
nova.tests.unit.virt.xenapi.test_vm_utils.ScanSrTestCase
nova.tests.unit.virt.xenapi.test_vm_utils.UnplugVbdTestCase
nova.tests.unit.virt.xenapi.test_xenapi.HypervisorPoolTestCase
nova.tests.unit.virt.xenapi.test_xenapi.XenAPIDiffieHellmanTestCase
nova.tests.unit.virt.xenapi.test_xenapi.XenAPIDom0IptablesFirewallTestCase
nova.tests.unit.virt.xenapi.test_xenapi.XenAPIVMTestCase
##########################################################################
# NOTE(dims): The following tests randomly fail in the gate. Please be