Support all the variants for writing files with cloudconfig

The patch fixes a small bug in cloudconfig._process_content, where the
chaining of encoders (gzip and base64 for instance) weren't treated
correctly. It also adds support for processing both lists of dicts and
dicts in the write-files plugin.
Also, we added tests for using !!binary| marker for yaml binary
streams, as well as a couple of other tests.

Change-Id: I9edcdcd93af4e058a0d1f8c9cdaa1b2aa9e9618d
Closes-Bug: #1409270
This commit is contained in:
Claudiu Popa 2014-12-28 17:48:29 +02:00
parent c4297a029f
commit b34f3b4b5d
4 changed files with 79 additions and 14 deletions

View File

@ -17,6 +17,8 @@ import gzip
import io
import os
import six
from cloudbaseinit import exception
from cloudbaseinit.openstack.common import log as logging
from cloudbaseinit.plugins.windows.userdataplugins.cloudconfigplugins import (
@ -56,7 +58,11 @@ def _convert_permissions(permissions):
def _process_content(content, encoding):
"""Decode the content taking into consideration the encoding."""
result = str(content)
result = content
if six.PY3 and not isinstance(result, six.binary_type):
# At this point, content will be string, which is wrong for Python 3.
result = result.encode()
steps = _decode_steps(encoding)
if not steps:
LOG.error("Unknown encoding, doing nothing.")
@ -64,7 +70,7 @@ def _process_content(content, encoding):
for mime_type in _decode_steps(encoding):
if mime_type == GZIP_MIME:
bufferio = io.BytesIO(content)
bufferio = io.BytesIO(result)
with gzip.GzipFile(fileobj=bufferio, mode='rb') as file_handle:
try:
result = file_handle.read()

View File

@ -308,13 +308,23 @@ class TestCloudConfig(unittest.TestCase):
cls.userdata = pkgutil.get_data('cloudbaseinit.tests.resources',
'cloud_config_userdata').decode()
def create_tempfiles(self, number):
for _ in range(number):
tmp = _create_tempfile()
self.addCleanup(os.remove, tmp)
yield tmp
def test_cloud_config_multipart(self):
tmp = _create_tempfile()
self.addCleanup(os.remove, tmp)
b64, b64_binary, gz, gz_binary = list(self.create_tempfiles(4))
service = FakeService(self.userdata.format(b64=tmp))
service = FakeService(self.userdata.format(b64=b64,
b64_binary=b64_binary,
gzip=gz,
gzip_binary=gz_binary))
self.plugin.execute(service, {})
self.assertTrue(os.path.exists(tmp))
with open(tmp) as stream:
self.assertEqual('42', stream.read())
for path in (b64, b64_binary, gz, gz_binary):
self.assertTrue(os.path.exists(path),
"Path {} should exist.".format(path))
with open(path) as stream:
self.assertEqual('42', stream.read())

View File

@ -77,8 +77,10 @@ class WriteFilesPluginTests(unittest.TestCase):
response = write_files._convert_permissions(mock.sentinel.invalid)
self.assertEqual(write_files.DEFAULT_PERMISSIONS, response)
def test_write_file(self):
tmp = self._get_tempfile()
def test_write_file_list(self):
expected_logging = [
"Plugin 'invalid' is currently not supported",
]
code = textwrap.dedent("""
write_files:
- encoding: b64
@ -87,7 +89,23 @@ class WriteFilesPluginTests(unittest.TestCase):
permissions: '0o466'
invalid:
- stuff: 1
""".format(tmp))
""")
self._test_write_file(code, expected_logging)
def test_write_file_dict(self):
code = textwrap.dedent("""
write_files:
encoding: b64
content: NDI=
path: {}
permissions: '0o466'
""")
self._test_write_file(code)
def _test_write_file(self, code, expected_logging=None):
tmp = self._get_tempfile()
code = code.format(tmp)
with testutils.LogSnatcher('cloudbaseinit.plugins.windows.'
'userdataplugins.cloudconfig') as snatcher:
self.plugin.process_non_multipart(code)
@ -97,8 +115,8 @@ class WriteFilesPluginTests(unittest.TestCase):
with open(tmp) as stream:
self.assertEqual('42', stream.read())
self.assertEqual(["Plugin 'invalid' is currently not supported"],
snatcher.output)
if expected_logging is not None:
self.assertEqual(expected_logging, snatcher.output)
# Test that the proper permissions were set. On Windows,
# only the read bit is processed, the rest are ignored.
@ -143,3 +161,24 @@ class WriteFilesPluginTests(unittest.TestCase):
"Processing plugin write_files failed"))
self.assertTrue(snatcher.output[0].endswith("ValueError"))
self.assertFalse(os.path.exists('random_cloudbaseinit_test'))
def test_unknown_encoding(self):
tmp = self._get_tempfile()
code = textwrap.dedent("""
write_files:
- content: NDI=
path: {}
permissions: '0o466'
""".format(tmp))
with testutils.LogSnatcher('cloudbaseinit.plugins.windows.'
'userdataplugins.cloudconfigplugins.'
'write_files') as snatcher:
self.plugin.process_non_multipart(code)
self.assertTrue(os.path.exists(tmp),
"Expected path does not exist.")
with open(tmp) as stream:
self.assertEqual('NDI=', stream.read())
self.assertEqual(["Unknown encoding, doing nothing."],
snatcher.output)

View File

@ -11,4 +11,14 @@ write_files:
- encoding: b64
content: NDI=
path: {b64}
permissions: '0644'
permissions: '0644'
- content: !!binary |
NDI=
path: {b64_binary}
- path: {gzip}
encoding: gzip+b64
content: H4sIAGUfoFQC/zMxAgCIsCQyAgAAAA==
- path: {gzip_binary}
encoding: gzip
content: !!binary |
H4sIAGUfoFQC/zMxAgCIsCQyAgAAAA==