1. Got this test working again using the new code.
2. Adjusted so that instead of capturing all logging, only a certain level is captured (now that we have lots more logging)
This commit is contained in:
parent
1751a121f9
commit
a9933e3def
@ -1,107 +1,144 @@
|
||||
"""Tests for handling of userdata within cloud init"""
|
||||
|
||||
import logging
|
||||
import StringIO
|
||||
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import tempfile
|
||||
|
||||
from email.mime.base import MIMEBase
|
||||
|
||||
from mocker import MockerTestCase
|
||||
|
||||
import cloudinit
|
||||
from cloudinit.DataSource import DataSource
|
||||
from cloudinit import helpers
|
||||
from cloudinit import log
|
||||
from cloudinit import sources
|
||||
from cloudinit import stages
|
||||
from cloudinit import util
|
||||
|
||||
INSTANCE_ID = "i-testing"
|
||||
|
||||
|
||||
instance_id = "i-testing"
|
||||
|
||||
|
||||
class FakeDataSource(DataSource):
|
||||
class FakeDataSource(sources.DataSource):
|
||||
|
||||
def __init__(self, userdata):
|
||||
DataSource.__init__(self)
|
||||
self.metadata = {'instance-id': instance_id}
|
||||
sources.DataSource.__init__(self, {}, None, None)
|
||||
self.metadata = {'instance-id': INSTANCE_ID}
|
||||
self.userdata_raw = userdata
|
||||
|
||||
|
||||
# FIXME: these tests shouldn't be checking log output??
|
||||
# Weirddddd...
|
||||
|
||||
|
||||
class TestConsumeUserData(MockerTestCase):
|
||||
|
||||
_log_handler = None
|
||||
_log = None
|
||||
log_file = None
|
||||
|
||||
def setUp(self):
|
||||
MockerTestCase.setUp(self)
|
||||
# Replace the write so no actual files
|
||||
# get written out...
|
||||
self.mock_write = self.mocker.replace("cloudinit.util.write_file",
|
||||
passthrough=False)
|
||||
self.mock_write(self.get_ipath("cloud_config"), "", 0600)
|
||||
self.capture_log()
|
||||
self._log = None
|
||||
self._log_file = None
|
||||
self._log_handler = None
|
||||
|
||||
def tearDown(self):
|
||||
self._log.removeHandler(self._log_handler)
|
||||
MockerTestCase.tearDown(self)
|
||||
if self._log_handler and self._log:
|
||||
self._log.removeHandler(self._log_handler)
|
||||
|
||||
@staticmethod
|
||||
def get_ipath(name):
|
||||
return "%s/instances/%s%s" % (cloudinit.varlibdir, instance_id,
|
||||
cloudinit.pathmap[name])
|
||||
|
||||
def capture_log(self):
|
||||
self.log_file = StringIO.StringIO()
|
||||
self._log_handler = logging.StreamHandler(self.log_file)
|
||||
self._log_handler.setLevel(logging.DEBUG)
|
||||
self._log = logging.getLogger(cloudinit.logger_name)
|
||||
def capture_log(self, lvl=logging.DEBUG):
|
||||
log_file = StringIO.StringIO()
|
||||
self._log_handler = logging.StreamHandler(log_file)
|
||||
self._log_handler.setLevel(lvl)
|
||||
self._log = log.getLogger()
|
||||
self._log.addHandler(self._log_handler)
|
||||
return log_file
|
||||
|
||||
def test_unhandled_type_warning(self):
|
||||
"""Raw text without magic is ignored but shows warning"""
|
||||
ci = stages.Init()
|
||||
data = "arbitrary text\n"
|
||||
ci.datasource = FakeDataSource(data)
|
||||
|
||||
self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
|
||||
self.mocker.replay()
|
||||
ci = cloudinit.CloudInit()
|
||||
ci.datasource = FakeDataSource("arbitrary text\n")
|
||||
ci.consume_userdata()
|
||||
self.assertEqual(
|
||||
"Unhandled non-multipart userdata starting 'arbitrary text...'\n",
|
||||
self.log_file.getvalue())
|
||||
|
||||
log_file = self.capture_log(logging.WARNING)
|
||||
ci.fetch()
|
||||
ci.consume()
|
||||
self.assertIn(
|
||||
"Unhandled non-multipart (text/x-not-multipart) userdata:",
|
||||
log_file.getvalue())
|
||||
|
||||
def test_mime_text_plain(self):
|
||||
"""Mime message of type text/plain is ignored without warning"""
|
||||
self.mocker.replay()
|
||||
ci = cloudinit.CloudInit()
|
||||
"""Mime message of type text/plain is ignored but shows warning"""
|
||||
ci = stages.Init()
|
||||
message = MIMEBase("text", "plain")
|
||||
message.set_payload("Just text")
|
||||
ci.datasource = FakeDataSource(message.as_string())
|
||||
ci.consume_userdata()
|
||||
self.assertEqual("", self.log_file.getvalue())
|
||||
|
||||
self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
|
||||
self.mocker.replay()
|
||||
|
||||
log_file = self.capture_log(logging.WARNING)
|
||||
ci.fetch()
|
||||
ci.consume()
|
||||
self.assertIn(
|
||||
"Unhandled unknown content-type (text/plain)",
|
||||
log_file.getvalue())
|
||||
|
||||
|
||||
def test_shellscript(self):
|
||||
"""Raw text starting #!/bin/sh is treated as script"""
|
||||
ci = stages.Init()
|
||||
script = "#!/bin/sh\necho hello\n"
|
||||
outpath = cloudinit.get_ipath_cur("scripts") + "/part-001"
|
||||
ci.datasource = FakeDataSource(script)
|
||||
|
||||
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
|
||||
self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
|
||||
self.mock_write(outpath, script, 0700)
|
||||
self.mocker.replay()
|
||||
ci = cloudinit.CloudInit()
|
||||
ci.datasource = FakeDataSource(script)
|
||||
ci.consume_userdata()
|
||||
self.assertEqual("", self.log_file.getvalue())
|
||||
|
||||
log_file = self.capture_log(logging.WARNING)
|
||||
ci.fetch()
|
||||
ci.consume()
|
||||
self.assertEqual("", log_file.getvalue())
|
||||
|
||||
def test_mime_text_x_shellscript(self):
|
||||
"""Mime message of type text/x-shellscript is treated as script"""
|
||||
ci = stages.Init()
|
||||
script = "#!/bin/sh\necho hello\n"
|
||||
outpath = cloudinit.get_ipath_cur("scripts") + "/part-001"
|
||||
self.mock_write(outpath, script, 0700)
|
||||
self.mocker.replay()
|
||||
ci = cloudinit.CloudInit()
|
||||
message = MIMEBase("text", "x-shellscript")
|
||||
message.set_payload(script)
|
||||
ci.datasource = FakeDataSource(message.as_string())
|
||||
ci.consume_userdata()
|
||||
self.assertEqual("", self.log_file.getvalue())
|
||||
|
||||
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
|
||||
self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
|
||||
self.mock_write(outpath, script, 0700)
|
||||
self.mocker.replay()
|
||||
|
||||
log_file = self.capture_log(logging.WARNING)
|
||||
ci.fetch()
|
||||
ci.consume()
|
||||
self.assertEqual("", log_file.getvalue())
|
||||
|
||||
def test_mime_text_plain_shell(self):
|
||||
"""Mime type text/plain starting #!/bin/sh is treated as script"""
|
||||
ci = stages.Init()
|
||||
script = "#!/bin/sh\necho hello\n"
|
||||
outpath = cloudinit.get_ipath_cur("scripts") + "/part-001"
|
||||
self.mock_write(outpath, script, 0700)
|
||||
self.mocker.replay()
|
||||
ci = cloudinit.CloudInit()
|
||||
message = MIMEBase("text", "plain")
|
||||
message.set_payload(script)
|
||||
ci.datasource = FakeDataSource(message.as_string())
|
||||
ci.consume_userdata()
|
||||
self.assertEqual("", self.log_file.getvalue())
|
||||
|
||||
outpath = os.path.join(ci.paths.get_ipath_cur("scripts"), "part-001")
|
||||
self.mock_write(outpath, script, 0700)
|
||||
self.mock_write(ci.paths.get_ipath("cloud_config"), "", 0600)
|
||||
self.mocker.replay()
|
||||
|
||||
log_file = self.capture_log(logging.WARNING)
|
||||
ci.fetch()
|
||||
ci.consume()
|
||||
self.assertEqual("", log_file.getvalue())
|
||||
|
Loading…
Reference in New Issue
Block a user