cloudbase-init/cloudbaseinit/tests/metadata/services/test_opennebulaservice.py

349 lines
11 KiB
Python

# Copyright 2014 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
import textwrap
import unittest
try:
import unittest.mock as mock
except ImportError:
import mock
from cloudbaseinit.metadata.services import base
from cloudbaseinit.metadata.services import opennebulaservice
from cloudbaseinit.models import network as network_model
from cloudbaseinit.tests import testutils
# Fixture values describing a single NIC. They are substituted into the fake
# context text below, and the same constants are used to build the expected
# NetworkDetails in _get_nic_details().
MAC = "54:EE:75:19:F4:61" # output must be upper
ADDRESS = "192.168.122.101"
NETMASK = "255.255.255.0"
# BROADCAST is never written into the context text; it only appears in the
# expected NIC details.
BROADCAST = "192.168.122.255"
GATEWAY = "192.168.122.1"
# Space-separated list; the expected details use DNSNS.split(" ").
DNSNS = "8.8.8.8 8.8.4.4"
# Public key placed in the context as SSH_PUBLIC_KEY (adjacent string
# literals are concatenated into one value).
PUBLIC_KEY = ("ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAACAQDJitRvac/fr1jWrZw"
"j6mgDxlrBN2xAtKExtm5cPkexQUuxTma61ZijP/aWiQg9Q93baSwsBi"
"IPM0SO1ro0szv84cC9GmSHWVOnCVWGY3nojplqL5VfV9NDLlmSceFc5"
"cLpUTMnoUiXt8QXfDm50gh/5vGgJJXuMz1BKwfJH232ajM5r9xUfKDZ"
"jzhTVooPlWoJJmn6xJDOJG7cjszZpv2N+Xzq7GRo6fa7ygTASOnES5t"
"vbcqM8432P6Bg7Hkr2bOjQF11RyJofFcOvECKfbX4jQ9JGzbocNnepw"
"2YlV08UYa/8aoFgzyo/FiR6cc/jQupbFIe92xBSNiMEioeZ26nTac6C"
"oRQXEKrb95Ntg7ysYUqjKQFWJdx6AW7hlE8mMjA6nRqvswXsp1atNdU"
"DylyVxlvUHo9rEHEs3GKjkO4tr8KKR0N+oWVAO8S2RfSaD/wFcTokW8"
"DeLz2Fnc04pyqOnCjdG7b7HqQVUupuxJNc3EUxZEjbUYiDi22MWF0Oa"
"vM7e0xZHMOsdhUPUUnBWngETuOTVSo26bRfzOcUzjwyv2n5PS9rvzYz"
"ooXIqcK4BdJ8TLh4OQZwV862PjiafxxWC1L90Tou+BkMTFvwoiWDGMc"
"ckPkjvg6p9E2viSFgaKMq2S6EjbzsHG/9BilLBDHLOcbhUU6E76dqGk"
"4jl0ZzQ== jfontan@zooloo")
HOST_NAME = "ws2012r2"
# Multi-line user data placed in the context as USER_DATA.
# NOTE(review): the permissions line uses shell-style '\'' quoting; inside
# this Python literal each \' is simply a single quote -- confirm that the
# resulting ''0644'' text is what the test intends.
USER_DATA = """#cloud-config
bootcmd:
- ifdown -a
runcmd:
- curl http://10.0.1.1:8999/I_am_alive
write_files:
- encoding: b64
content: RG9lcyBpdCB3b3JrPwo=
owner: root:root
path: /etc/test_file
permissions: '\''0644'\''
packages:
- ruby2.0"""
# Fake context text made of NAME='value' shell-style assignments, filled in
# with the fixture values above. It describes one NIC (ETH0).
CONTEXT = """
DISK_ID='1'
ETH0_DNS='{dnsns}'
ETH0_GATEWAY='{gateway}'
ETH0_IP='{address}'
ETH0_MASK='{netmask}'
ETH0_MAC='{mac}'
ETH0_SEARCH_DOMAIN='example.org'
NETWORK='YES'
SET_HOSTNAME='{host_name}'
SSH_PUBLIC_KEY='{public_key}'
TARGET='hda'
USER_DATA='{user_data}'
""".format(
dnsns=DNSNS,
gateway=GATEWAY,
address=ADDRESS,
netmask=NETMASK,
mac=MAC.lower(), # warning: mac is in lowercase
host_name=HOST_NAME,
public_key=PUBLIC_KEY,
user_data=USER_DATA
)
# Two-NIC variant: ETH1_* assignments prepended to the already formatted
# CONTEXT, so only the ETH1_* placeholders are filled in by this format()
# call. ETH1 reuses the same address values as ETH0.
CONTEXT2 = ("""
ETH1_DNS='{dnsns}'
ETH1_GATEWAY='{gateway}'
ETH1_IP='{address}'
ETH1_MASK='{netmask}'
ETH1_MAC='{mac}'
""" + CONTEXT).format(
dnsns=DNSNS,
gateway=GATEWAY,
address=ADDRESS,
netmask=NETMASK,
mac=MAC.lower()
)
# Stand-in for the builtin open() whose file handle yields the encoded
# single-NIC context; it is installed via mock.patch on TestOpenNebulaService.
OPEN = mock.mock_open(read_data=CONTEXT.encode())
def _get_nic_details(iid=0):
    """Build the NetworkDetails a test expects for the NIC numbered ``iid``.

    Only the interface name depends on ``iid`` (it is produced from
    ``opennebulaservice.IF_FORMAT``); every other field comes from the
    module-level fixture constants.

    :param iid: index of the network interface (0 for ETH0, 1 for ETH1...).
    :returns: a ``network_model.NetworkDetails`` instance.
    """
    nic_name = opennebulaservice.IF_FORMAT.format(iid=iid)
    dns_servers = DNSNS.split(" ")
    # The positional order below is dictated by NetworkDetails.
    # NOTE(review): the three None slots are presumably the fields this
    # service does not provide (e.g. IPv6 data) -- confirm against
    # network_model.NetworkDetails.
    return network_model.NetworkDetails(
        nic_name,
        MAC,
        ADDRESS,
        None,
        NETMASK,
        None,
        BROADCAST,
        GATEWAY,
        None,
        dns_servers,
    )
class _TestOpenNebulaService(unittest.TestCase):
    """Shared base class that provides the service under test."""

    def setUp(self):
        """Store a fresh OpenNebulaService on ``self._service``."""
        service = opennebulaservice.OpenNebulaService()
        self._service = service
@mock.patch("six.moves.builtins.open", new=OPEN)
class TestOpenNebulaService(_TestOpenNebulaService):
    """Tests run against a service whose context has not been loaded yet.

    The class decorator swaps the builtin ``open`` for the module-level
    ``OPEN`` mock while each ``test*`` method runs.
    """

    @classmethod
    def setUpClass(cls):
        """Make the mocked file handle return the encoded fake context."""
        handle = OPEN.return_value
        handle.read.return_value = CONTEXT.encode()

    def _test_parse_shell_variables(self, crlf=False, comment=False):
        """Parse a sample of shell assignments and compare with expectations.

        :param crlf: when true, every ``\\n`` of the input is turned into
            ``\\r\\n`` (and the expected byte values are adjusted alike).
        :param comment: when true, a trailing comment line is appended to
            the input.
        """
        # The literal is kept flush-left; dedent() finds no common margin
        # to strip, so the parser receives the text exactly as written.
        raw = textwrap.dedent("""
VAR1='1'
var2='abcdef'
VAR_VAR3='aaa.bbb.123.ccc'
# suddenly, a comment
VaR4='aaa
bbb
x -- c
d: e
'
ivar=10
""")
        if comment:
            raw += "# A simple comment\n"
        if crlf:
            raw = raw.replace("\n", "\r\n")
        parsed = self._service._parse_shell_variables(raw.encode())

        # Quoted values are expected as bytes, the unquoted number as int.
        expected = {
            "VAR1": b"1",
            "var2": b"abcdef",
            "VAR_VAR3": b"aaa.bbb.123.ccc",
            "VaR4": b"aaa\nbbb\nx -- c\nd: e\n",
            "ivar": 10
        }
        if crlf:
            # Line breaks inside quoted values must survive as CRLF.
            expected = dict(
                (name,
                 value.replace(b"\n", b"\r\n")
                 if isinstance(value, bytes) else value)
                for name, value in expected.items()
            )
        self.assertEqual(expected, parsed)

    def test_parse_shell_variables(self):
        """Exercise the parser with LF, CRLF and trailing-comment input."""
        scenarios = (
            {"crlf": False, "comment": False},  # 1. no CRLF, no comment
            {"crlf": True, "comment": False},   # 2. CRLF, no comment
            {"crlf": False, "comment": True},   # 3. no CRLF, comment
        )
        for scenario in scenarios:
            self._test_parse_shell_variables(**scenario)

    def test_calculate_netmask(self):
        """A netmask is derived from an address and its gateway."""
        expected = "255.255.0.0"
        computed = self._service._calculate_netmask(
            "192.168.0.10", "192.168.1.1")
        self.assertEqual(expected, computed)

    def test_compute_broadcast(self):
        """A broadcast address is derived from an address and a netmask."""
        expected = "192.168.255.255"
        computed = self._service._compute_broadcast(
            "192.168.0.10", "255.255.0.0")
        self.assertEqual(expected, computed)

    @mock.patch("cloudbaseinit.metadata.services"
                ".opennebulaservice.os.path")
    @mock.patch("cloudbaseinit.metadata.services"
                ".opennebulaservice.osutils_factory")
    def _test_load(self, mock_osutils_factory, mock_os_path, level=0):
        """Run ``load()`` against mocks prepared for a given scenario.

        :param level: how far the mocked environment lets ``load()`` get:
            1 -- no CD-ROM drives are reported;
            2 -- one drive is reported, but the context file is missing;
            3 -- one drive is reported and the context file exists.
            With the default 0 nothing is verified after the call.
        """
        drive = "mount_point"
        label = "fake_label"
        context_path = "fake_path"
        drive_present = level > 1

        # Prepare the mocks according to the requested scenario.
        osutils = mock.MagicMock()
        mock_osutils_factory.get_os_utils.return_value = osutils
        osutils.get_cdrom_drives.return_value = (
            [drive] if drive_present else [])
        if drive_present:
            osutils.get_volume_label.return_value = label
            mock_os_path.join.return_value = context_path
            # The context file is only "found" from level 3 upwards.
            mock_os_path.isfile.return_value = level > 2

        # Call the method under test.
        with testutils.LogSnatcher('cloudbaseinit.metadata.services.'
                                   'opennebulaservice'):
            loaded = self._service.load()

        # Verify how the collaborators were used.
        if level > 0:
            mock_osutils_factory.get_os_utils.assert_called_once_with()
            osutils.get_cdrom_drives.assert_called_once_with()
        if drive_present:
            osutils.get_volume_label.assert_called_once_with(drive)
            mock_os_path.join.assert_called_once_with(
                "mount_point", opennebulaservice.CONTEXT_FILE)
            mock_os_path.isfile.assert_called_once_with("fake_path")

        # Verify the outcome and the state left on the service.
        if level == 3:
            self.assertTrue(loaded)
            self.assertEqual(context_path, self._service._context_path)
        elif level in (1, 2):
            self.assertFalse(loaded)

    def test_load_no_drives(self):
        """``load()`` is falsy when no CD-ROM drive is reported."""
        self._test_load(level=1)

    def test_load_no_relevant_drive(self):
        """``load()`` is falsy when the drive holds no context file."""
        self._test_load(level=2)

    def test_load_relevant_drive(self):
        """``load()`` is truthy and records the context path."""
        self._test_load(level=3)

    def test_parse_context(self):
        """Parsing fails without a context path and succeeds with one."""
        service = self._service
        with self.assertRaises(base.NotExistingMetadataException):
            service._parse_context()

        service._context_path = "path"
        service._parse_context()
        # ``open`` is the patched builtin here, i.e. the OPEN mock.
        open.assert_called_with("path", "rb")
        self.assertTrue(service._dict_content)

    def test_get_data(self):
        """Unknown names raise; a known name yields its context value."""
        service = self._service
        service._context_path = "path"
        service._parse_context()
        with self.assertRaises(base.NotExistingMetadataException):
            service._get_data("smt")

        address_key = opennebulaservice.ADDRESS[0].format(iid=0)
        value = service._get_data(address_key).decode()
        self.assertEqual(ADDRESS, value)
class TestLoadedOpenNebulaService(_TestOpenNebulaService):
    """Tests run against a service pre-populated with a parsed context."""

    def setUp(self):
        """Create the service, then feed it the default fake context."""
        super(TestLoadedOpenNebulaService, self).setUp()
        self.load_context()

    def load_context(self, context=CONTEXT):
        """Install ``context`` on the service as raw and parsed content.

        :param context: context text; it is encoded before being stored
            and parsed with the service's own shell-variable parser.
        """
        service = self._service
        service._raw_content = context.encode()
        service._dict_content = service._parse_shell_variables(
            service._raw_content)

    def test_get_cache_data(self):
        """Lookup fails when no name is known, succeeds once one is."""
        candidates = ["smt"]
        with self.assertRaises(base.NotExistingMetadataException):
            self._service._get_cache_data(candidates)

        candidates.append(opennebulaservice.ADDRESS[0].format(iid=0))
        value = self._service._get_cache_data(candidates).decode()
        self.assertEqual(ADDRESS, value)

    def test_get_instance_id(self):
        """The instance id is the module-level INSTANCE_ID constant."""
        instance_id = self._service.get_instance_id()
        self.assertEqual(opennebulaservice.INSTANCE_ID, instance_id)

    def test_get_host_name(self):
        """The host name is the one given in the context."""
        host_name = self._service.get_host_name()
        self.assertEqual(HOST_NAME, host_name)

    def test_get_user_data(self):
        """User data comes back as the encoded fixture text."""
        user_data = self._service.get_user_data()
        self.assertEqual(USER_DATA.encode(), user_data)

    def test_get_public_keys(self):
        """The single public key of the context is returned in a list."""
        public_keys = self._service.get_public_keys()
        self.assertEqual([PUBLIC_KEY], public_keys)

    def _test_get_network_details(self, netmask=True):
        """Compare the reported network details with the expected NIC.

        :param netmask: when false, the ETH0_MASK assignment is removed
            from the context first; the expected details stay the same.
        """
        if not netmask:
            without_mask = re.sub(
                r"ETH0_MASK='(\d+\.){3}\d+'", "", CONTEXT)
            self.load_context(context=without_mask)

        expected = [_get_nic_details()]
        self.assertEqual(expected, self._service.get_network_details())

    def test_get_network_details(self):
        """Details are reported when the context provides a netmask."""
        self._test_get_network_details(netmask=True)

    def test_get_network_details_predict(self):
        """Details are reported even without a netmask in the context."""
        self._test_get_network_details(netmask=False)

    def test_multiple_nics(self):
        """Both NICs of the two-interface context are reported in order."""
        self.load_context(context=CONTEXT2)
        expected = [_get_nic_details(iid=index) for index in (0, 1)]
        self.assertEqual(expected, self._service.get_network_details())

    @mock.patch("cloudbaseinit.metadata.services"
                ".opennebulaservice.OpenNebulaService._get_cache_data")
    def test_get_network_details_exception(self, mock_get_cache):
        """Missing metadata results in an empty list of details."""
        # One and the same mock object plays both the MAC and the address.
        shared_value = mock.MagicMock()
        shared_value.upper.return_value = None
        shared_value.side_effect = None
        missing = base.NotExistingMetadataException
        mock_get_cache.side_effect = [
            shared_value, shared_value, missing, missing]

        details = self._service.get_network_details()
        self.assertEqual(details, [])