Add RHEVm and vSphere support as datasource AltCloud

These changes add a new data source to cloud-init to support passing user
data to RHEVm and vSphere.  The user data is passed to RHEVm v3.0 (current
version) using a floppy injection hook and to vSphere via cdrom device.

RHEVm v3.1 will use a method similar to vSphere.  Once available support
for that is also expected.
This commit is contained in:
Scott Moser 2012-08-14 11:40:47 -04:00
commit 2548f7de9c
5 changed files with 802 additions and 0 deletions

View File

@ -1,4 +1,5 @@
0.7.0:
- Added RHEVm and vSphere support as source AltCloud [Joseph VLcek]
- add write-files module (LP: #1012854)
- Add setuptools + cheetah to debian package build dependencies (LP: #1022101)
- Adjust the sysvinit local script to provide 'cloud-init-local' and have

View File

@ -31,6 +31,7 @@ CFG_BUILTIN = {
'datasource_list': [
'NoCloud',
'ConfigDrive',
'AltCloud',
'OVF',
'MAAS',
'Ec2',

View File

@ -0,0 +1,299 @@
# vi: ts=4 expandtab
#
# Copyright (C) 2009-2010 Canonical Ltd.
# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
# Copyright (C) 2012 Yahoo! Inc.
#
# Author: Joe VLcek <JVLcek@RedHat.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3, as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
This file contains code used to gather the user data passed to an
instance on RHEVm and vSphere.
'''
import errno
import os
import os.path
from cloudinit import log as logging
from cloudinit import sources
from cloudinit import util
from cloudinit.util import ProcessExecutionError
LOG = logging.getLogger(__name__)
# Needed file paths
CLOUD_INFO_FILE = '/etc/sysconfig/cloud-info'
# Shell command lists
CMD_DMI_SYSTEM = ['/usr/sbin/dmidecode', '--string', 'system-product-name']
CMD_PROBE_FLOPPY = ['/sbin/modprobe', 'floppy']
CMD_UDEVADM_SETTLE = ['/sbin/udevadm', 'settle', '--quiet', '--timeout=5']
META_DATA_NOT_SUPPORTED = {
'block-device-mapping': {},
'instance-id': 455,
'local-hostname': 'localhost',
'placement': {},
}
def read_user_data_callback(mount_dir):
'''
Description:
This callback will be applied by util.mount_cb() on the mounted
file.
Deltacloud file name contains deltacloud. Those not using
Deltacloud but instead instrumenting the injection, could
drop deltacloud from the file name.
Input:
mount_dir - Mount directory
Returns:
User Data
'''
deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt'
user_data_file = mount_dir + '/user-data.txt'
# First try deltacloud_user_data_file. On failure try user_data_file.
try:
with open(deltacloud_user_data_file, 'r') as user_data_f:
user_data = user_data_f.read().strip()
except:
try:
with open(user_data_file, 'r') as user_data_f:
user_data = user_data_f.read().strip()
except:
util.logexc(LOG, ('Failed accessing user data file.'))
return None
return user_data
class DataSourceAltCloud(sources.DataSource):
def __init__(self, sys_cfg, distro, paths):
sources.DataSource.__init__(self, sys_cfg, distro, paths)
self.seed = None
self.supported_seed_starts = ("/", "file://")
def __str__(self):
mstr = "%s [seed=%s]" % (util.obj_name(self), self.seed)
return mstr
def get_cloud_type(self):
'''
Description:
Get the type for the cloud back end this instance is running on
by examining the string returned by:
dmidecode --string system-product-name
On VMWare/vSphere dmidecode returns: RHEV Hypervisor
On VMWare/vSphere dmidecode returns: VMware Virtual Platform
Input:
None
Returns:
One of the following strings:
'RHEV', 'VSPHERE' or 'UNKNOWN'
'''
cmd = CMD_DMI_SYSTEM
try:
(cmd_out, _err) = util.subp(cmd)
except ProcessExecutionError, _err:
LOG.debug(('Failed command: %s\n%s') % \
(' '.join(cmd), _err.message))
return 'UNKNOWN'
except OSError, _err:
LOG.debug(('Failed command: %s\n%s') % \
(' '.join(cmd), _err.message))
return 'UNKNOWN'
if cmd_out.upper().startswith('RHEV'):
return 'RHEV'
if cmd_out.upper().startswith('VMWARE'):
return 'VSPHERE'
return 'UNKNOWN'
def get_data(self):
'''
Description:
User Data is passed to the launching instance which
is used to perform instance configuration.
Cloud providers expose the user data differently.
It is necessary to determine which cloud provider
the current instance is running on to determine
how to access the user data. Images built with
image factory will contain a CLOUD_INFO_FILE which
contains a string identifying the cloud provider.
Images not built with Imagefactory will try to
determine what the cloud provider is based on system
information.
'''
LOG.debug('Invoked get_data()')
if os.path.exists(CLOUD_INFO_FILE):
try:
cloud_info = open(CLOUD_INFO_FILE)
cloud_type = cloud_info.read().strip().upper()
cloud_info.close()
except:
util.logexc(LOG, 'Unable to access cloud info file.')
return False
else:
cloud_type = self.get_cloud_type()
LOG.debug('cloud_type: ' + str(cloud_type))
if 'RHEV' in cloud_type:
if self.user_data_rhevm():
return True
elif 'VSPHERE' in cloud_type:
if self.user_data_vsphere():
return True
else:
# there was no recognized alternate cloud type
# indicating this handler should not be used.
return False
# No user data found
util.logexc(LOG, ('Failed accessing user data.'))
return False
def user_data_rhevm(self):
'''
RHEVM specific userdata read
If on RHEV-M the user data will be contained on the
floppy device in file <user_data_file>
To access it:
modprobe floppy
Leverage util.mount_cb to:
mkdir <tmp mount dir>
mount /dev/fd0 <tmp mount dir>
The call back passed to util.mount_cb will do:
read <tmp mount dir>/<user_data_file>
'''
return_str = None
# modprobe floppy
try:
cmd = CMD_PROBE_FLOPPY
(cmd_out, _err) = util.subp(cmd)
LOG.debug(('Command: %s\nOutput%s') % (' '.join(cmd), cmd_out))
except ProcessExecutionError, _err:
util.logexc(LOG, (('Failed command: %s\n%s') % \
(' '.join(cmd), _err.message)))
return False
except OSError, _err:
util.logexc(LOG, (('Failed command: %s\n%s') % \
(' '.join(cmd), _err.message)))
return False
floppy_dev = '/dev/fd0'
# udevadm settle for floppy device
try:
cmd = CMD_UDEVADM_SETTLE
cmd.append('--exit-if-exists=' + floppy_dev)
(cmd_out, _err) = util.subp(cmd)
LOG.debug(('Command: %s\nOutput%s') % (' '.join(cmd), cmd_out))
except ProcessExecutionError, _err:
util.logexc(LOG, (('Failed command: %s\n%s') % \
(' '.join(cmd), _err.message)))
return False
except OSError, _err:
util.logexc(LOG, (('Failed command: %s\n%s') % \
(' '.join(cmd), _err.message)))
return False
try:
return_str = util.mount_cb(floppy_dev, read_user_data_callback)
except OSError as err:
if err.errno != errno.ENOENT:
raise
except util.MountFailedError:
util.logexc(LOG, ("Failed to mount %s"
" when looking for user data"), floppy_dev)
self.userdata_raw = return_str
self.metadata = META_DATA_NOT_SUPPORTED
if return_str:
return True
else:
return False
def user_data_vsphere(self):
'''
vSphere specific userdata read
If on vSphere the user data will be contained on the
cdrom device in file <user_data_file>
To access it:
Leverage util.mount_cb to:
mkdir <tmp mount dir>
mount /dev/fd0 <tmp mount dir>
The call back passed to util.mount_cb will do:
read <tmp mount dir>/<user_data_file>
'''
return_str = None
cdrom_list = util.find_devs_with('LABEL=CDROM')
for cdrom_dev in cdrom_list:
try:
return_str = util.mount_cb(cdrom_dev, read_user_data_callback)
if return_str:
break
except OSError as err:
if err.errno != errno.ENOENT:
raise
except util.MountFailedError:
util.logexc(LOG, ("Failed to mount %s"
" when looking for user data"), cdrom_dev)
self.userdata_raw = return_str
self.metadata = META_DATA_NOT_SUPPORTED
if return_str:
return True
else:
return False
# Used to match classes to dependencies
# Source DataSourceAltCloud does not really depend on networking.
# In the future 'dsmode' like behavior can be added to offer user
# the ability to run before networking.
datasources = [
(DataSourceAltCloud, (sources.DEP_FILESYSTEM, sources.DEP_NETWORK)),
]
# Return a list of data sources that match this set of dependencies
def get_datasource_list(depends):
return sources.list_from_depends(depends, datasources)

65
doc/altcloud/README Normal file
View File

@ -0,0 +1,65 @@
Data souce AltCloud will be used to pick up user data on
RHEVm and vSphere.
RHEVm:
======
For REHVm v3.0 the userdata is injected into the VM using floppy
injection via the RHEVm dashboard "Custom Properties". The format
of the Custom Properties entry must be:
"floppyinject=user-data.txt:<base64 encoded data>"
e.g.: To pass a simple bash script
% cat simple_script.bash
#!/bin/bash
echo "Hello Joe!" >> /tmp/JJV_Joe_out.txt
% base64 < simple_script.bash
IyEvYmluL2Jhc2gKZWNobyAiSGVsbG8gSm9lISIgPj4gL3RtcC9KSlZfSm9lX291dC50eHQK
To pass this example script to cloud-init running in a RHEVm v3.0 VM
set the "Custom Properties" when creating the RHEMv v3.0 VM to:
floppyinject=user-data.txt:IyEvYmluL2Jhc2gKZWNobyAiSGVsbG8gSm9lISIgPj4gL3RtcC9KSlZfSm9lX291dC50eHQK
NOTE: The prefix with file name must be: "floppyinject=user-data.txt:"
It is also possible to launch a RHEVm v3.0 VM and pass optional user
data to it using the Delta Cloud.
For more inforation on Delta Cloud see: http://deltacloud.apache.org
vSphere:
========
For VMWare's vSphere the userdata is injected into the VM an ISO
via the cdrom. This can be done using the vSphere dashboard
by connecting an ISO image to the CD/DVD drive.
To pass this example script to cloud-init running in a vSphere VM
set the CD/DVD drive when creating the vSphere VM to point to an
ISO on the data store.
The ISO must contain the user data:
For example, to pass the same simple_script.bash to vSphere:
Create the ISO:
===============
% mkdir my-iso
NOTE: The file name on the ISO must be: "user-data.txt"
% cp simple_scirpt.bash my-iso/user-data.txt
% genisoimage -o user-data.iso -r my-iso
Verify the ISO:
===============
% sudo mkdir /media/vsphere_iso
% sudo mount -o loop JoeV_CI_02.iso /media/vsphere_iso
% cat /media/vsphere_iso/user-data.txt
% sudo umount /media/vsphere_iso
Then, launch the vSphere VM the ISO user-data.iso attached as a CDrom.
It is also possible to launch a vSphere VM and pass optional user
data to it using the Delta Cloud.
For more inforation on Delta Cloud see: http://deltacloud.apache.org

View File

@ -0,0 +1,436 @@
# vi: ts=4 expandtab
#
# Copyright (C) 2009-2010 Canonical Ltd.
# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
# Copyright (C) 2012 Yahoo! Inc.
#
# Author: Joe VLcek <JVLcek@RedHat.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 3, as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
'''
This test file exercises the code in sources DataSourceAltCloud.py
'''
import os
import shutil
import tempfile
from unittest import TestCase
from cloudinit import helpers
# Get the cloudinit.sources.DataSourceAltCloud import items needed.
import cloudinit.sources.DataSourceAltCloud
from cloudinit.sources.DataSourceAltCloud import DataSourceAltCloud
from cloudinit.sources.DataSourceAltCloud import read_user_data_callback
def _write_cloud_info_file(value):
'''
Populate the CLOUD_INFO_FILE which would be populated
with a cloud backend identifier ImageFactory when building
an image with ImageFactory.
'''
cifile = open(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 'w')
cifile.write(value)
cifile.close()
os.chmod(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE, 0664)
def _remove_cloud_info_file():
'''
Remove the test CLOUD_INFO_FILE
'''
os.remove(cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE)
def _write_user_data_files(mount_dir, value):
'''
Populate the deltacloud_user_data_file the user_data_file
which would be populated with user data.
'''
deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt'
user_data_file = mount_dir + '/user-data.txt'
udfile = open(deltacloud_user_data_file, 'w')
udfile.write(value)
udfile.close()
os.chmod(deltacloud_user_data_file, 0664)
udfile = open(user_data_file, 'w')
udfile.write(value)
udfile.close()
os.chmod(user_data_file, 0664)
def _remove_user_data_files(mount_dir,
dc_file=True,
non_dc_file=True):
'''
Remove the test files: deltacloud_user_data_file and
user_data_file
'''
deltacloud_user_data_file = mount_dir + '/deltacloud-user-data.txt'
user_data_file = mount_dir + '/user-data.txt'
# Ignore any failures removeing files that are already gone.
if dc_file:
try:
os.remove(deltacloud_user_data_file)
except OSError:
pass
if non_dc_file:
try:
os.remove(user_data_file)
except OSError:
pass
class TestGetCloudType(TestCase):
'''
Test to exercise method: DataSourceAltCloud.get_cloud_type()
'''
def setUp(self):
''' Set up '''
self.paths = helpers.Paths({ 'cloud_dir': '/tmp' })
def tearDown(self):
# Reset
cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
['dmidecode', '--string', 'system-product-name']
def test_rhev(self):
'''
Test method get_cloud_type() for RHEVm systems.
Forcing dmidecode return to match a RHEVm system: RHEV Hypervisor
'''
cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
['echo', 'RHEV Hypervisor']
dsrc = DataSourceAltCloud({}, None, self.paths)
self.assertEquals('RHEV', \
dsrc.get_cloud_type())
def test_vsphere(self):
'''
Test method get_cloud_type() for vSphere systems.
Forcing dmidecode return to match a vSphere system: RHEV Hypervisor
'''
cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
['echo', 'VMware Virtual Platform']
dsrc = DataSourceAltCloud({}, None, self.paths)
self.assertEquals('VSPHERE', \
dsrc.get_cloud_type())
def test_unknown(self):
'''
Test method get_cloud_type() for unknown systems.
Forcing dmidecode return to match an unrecognized return.
'''
cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
['echo', 'Unrecognized Platform']
dsrc = DataSourceAltCloud({}, None, self.paths)
self.assertEquals('UNKNOWN', \
dsrc.get_cloud_type())
def test_exception1(self):
'''
Test method get_cloud_type() where command dmidecode fails.
'''
cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
['ls', 'bad command']
dsrc = DataSourceAltCloud({}, None, self.paths)
self.assertEquals('UNKNOWN', \
dsrc.get_cloud_type())
def test_exception2(self):
'''
Test method get_cloud_type() where command dmidecode is not available.
'''
cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
['bad command']
dsrc = DataSourceAltCloud({}, None, self.paths)
self.assertEquals('UNKNOWN', \
dsrc.get_cloud_type())
class TestGetDataCloudInfoFile(TestCase):
'''
Test to exercise method: DataSourceAltCloud.get_data()
With a contrived CLOUD_INFO_FILE
'''
def setUp(self):
''' Set up '''
self.paths = helpers.Paths({ 'cloud_dir': '/tmp' })
self.cloud_info_file = tempfile.mkstemp()[1]
cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
self.cloud_info_file
def tearDown(self):
# Reset
# Attempt to remove the temp file ignoring errors
try:
os.remove(self.cloud_info_file)
except OSError:
pass
cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
'/etc/sysconfig/cloud-info'
def test_rhev(self):
'''Success Test module get_data() forcing RHEV '''
_write_cloud_info_file('RHEV')
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_rhevm = lambda : True
self.assertEquals(True, dsrc.get_data())
def test_vsphere(self):
'''Success Test module get_data() forcing VSPHERE '''
_write_cloud_info_file('VSPHERE')
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_vsphere = lambda : True
self.assertEquals(True, dsrc.get_data())
def test_fail_rhev(self):
'''Failure Test module get_data() forcing RHEV '''
_write_cloud_info_file('RHEV')
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_rhevm = lambda : False
self.assertEquals(False, dsrc.get_data())
def test_fail_vsphere(self):
'''Failure Test module get_data() forcing VSPHERE '''
_write_cloud_info_file('VSPHERE')
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_vsphere = lambda : False
self.assertEquals(False, dsrc.get_data())
def test_unrecognized(self):
'''Failure Test module get_data() forcing unrecognized '''
_write_cloud_info_file('unrecognized')
dsrc = DataSourceAltCloud({}, None, self.paths)
self.assertEquals(False, dsrc.get_data())
class TestGetDataNoCloudInfoFile(TestCase):
'''
Test to exercise method: DataSourceAltCloud.get_data()
Without a CLOUD_INFO_FILE
'''
def setUp(self):
''' Set up '''
self.paths = helpers.Paths({ 'cloud_dir': '/tmp' })
cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
'no such file'
def tearDown(self):
# Reset
cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
'/etc/sysconfig/cloud-info'
cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
['dmidecode', '--string', 'system-product-name']
def test_rhev_no_cloud_file(self):
'''Test No cloud info file module get_data() forcing RHEV '''
cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
['echo', 'RHEV Hypervisor']
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_rhevm = lambda : True
self.assertEquals(True, dsrc.get_data())
def test_vsphere_no_cloud_file(self):
'''Test No cloud info file module get_data() forcing VSPHERE '''
cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
['echo', 'VMware Virtual Platform']
dsrc = DataSourceAltCloud({}, None, self.paths)
dsrc.user_data_vsphere = lambda : True
self.assertEquals(True, dsrc.get_data())
def test_failure_no_cloud_file(self):
'''Test No cloud info file module get_data() forcing unrecognized '''
cloudinit.sources.DataSourceAltCloud.CMD_DMI_SYSTEM = \
['echo', 'Unrecognized Platform']
dsrc = DataSourceAltCloud({}, None, self.paths)
self.assertEquals(False, dsrc.get_data())
class TestUserDataRhevm(TestCase):
'''
Test to exercise method: DataSourceAltCloud.user_data_rhevm()
'''
def setUp(self):
''' Set up '''
self.paths = helpers.Paths({ 'cloud_dir': '/tmp' })
self.mount_dir = tempfile.mkdtemp()
_write_user_data_files(self.mount_dir, 'test user data')
def tearDown(self):
# Reset
_remove_user_data_files(self.mount_dir)
# Attempt to remove the temp dir ignoring errors
try:
shutil.rmtree(self.mount_dir)
except OSError:
pass
cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
'/etc/sysconfig/cloud-info'
cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
['/sbin/modprobe', 'floppy']
cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
['/sbin/udevadm', 'settle', '--quiet', '--timeout=5']
def test_mount_cb_fails(self):
'''Test user_data_rhevm() where mount_cb fails'''
cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
['echo', 'modprobe floppy']
dsrc = DataSourceAltCloud({}, None, self.paths)
self.assertEquals(False, dsrc.user_data_rhevm())
def test_modprobe_fails(self):
'''Test user_data_rhevm() where modprobe fails. '''
cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
['ls', 'modprobe floppy']
dsrc = DataSourceAltCloud({}, None, self.paths)
self.assertEquals(False, dsrc.user_data_rhevm())
def test_no_modprobe_cmd(self):
'''Test user_data_rhevm() with no modprobe command. '''
cloudinit.sources.DataSourceAltCloud.CMD_PROBE_FLOPPY = \
['bad command', 'modprobe floppy']
dsrc = DataSourceAltCloud({}, None, self.paths)
self.assertEquals(False, dsrc.user_data_rhevm())
def test_udevadm_fails(self):
'''Test user_data_rhevm() where udevadm fails. '''
cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
['ls', 'udevadm floppy']
dsrc = DataSourceAltCloud({}, None, self.paths)
self.assertEquals(False, dsrc.user_data_rhevm())
def test_no_udevadm_cmd(self):
'''Test user_data_rhevm() with no udevadm command. '''
cloudinit.sources.DataSourceAltCloud.CMD_UDEVADM_SETTLE = \
['bad command', 'udevadm floppy']
dsrc = DataSourceAltCloud({}, None, self.paths)
self.assertEquals(False, dsrc.user_data_rhevm())
class TestUserDataVsphere(TestCase):
'''
Test to exercise method: DataSourceAltCloud.user_data_vsphere()
'''
def setUp(self):
''' Set up '''
self.paths = helpers.Paths({ 'cloud_dir': '/tmp' })
self.mount_dir = tempfile.mkdtemp()
_write_user_data_files(self.mount_dir, 'test user data')
def tearDown(self):
# Reset
_remove_user_data_files(self.mount_dir)
# Attempt to remove the temp dir ignoring errors
try:
shutil.rmtree(self.mount_dir)
except OSError:
pass
cloudinit.sources.DataSourceAltCloud.CLOUD_INFO_FILE = \
'/etc/sysconfig/cloud-info'
def test_user_data_vsphere(self):
'''Test user_data_vsphere() where mount_cb fails'''
cloudinit.sources.DataSourceAltCloud.MEDIA_DIR = self.mount_dir
dsrc = DataSourceAltCloud({}, None, self.paths)
self.assertEquals(False, dsrc.user_data_vsphere())
class TestReadUserDataCallback(TestCase):
'''
Test to exercise method: DataSourceAltCloud.read_user_data_callback()
'''
def setUp(self):
''' Set up '''
self.paths = helpers.Paths({ 'cloud_dir': '/tmp' })
self.mount_dir = tempfile.mkdtemp()
_write_user_data_files(self.mount_dir, 'test user data')
def tearDown(self):
# Reset
_remove_user_data_files(self.mount_dir)
# Attempt to remove the temp dir ignoring errors
try:
shutil.rmtree(self.mount_dir)
except OSError:
pass
def test_callback_both(self):
'''Test read_user_data_callback() with both files'''
self.assertEquals('test user data',
read_user_data_callback(self.mount_dir))
def test_callback_dc(self):
'''Test read_user_data_callback() with only DC file'''
_remove_user_data_files(self.mount_dir,
dc_file=False,
non_dc_file=True)
self.assertEquals('test user data',
read_user_data_callback(self.mount_dir))
def test_callback_non_dc(self):
'''Test read_user_data_callback() with only non-DC file'''
_remove_user_data_files(self.mount_dir,
dc_file=True,
non_dc_file=False)
self.assertEquals('test user data',
read_user_data_callback(self.mount_dir))
def test_callback_none(self):
'''Test read_user_data_callback() no files are found'''
_remove_user_data_files(self.mount_dir)
self.assertEquals(None, read_user_data_callback(self.mount_dir))
# vi: ts=4 expandtab