108 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			108 lines
		
	
	
		
			3.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""Tests for handling of userdata within cloud init"""
 | 
						|
 | 
						|
import logging
 | 
						|
import StringIO
 | 
						|
 | 
						|
from email.mime.base import MIMEBase
 | 
						|
 | 
						|
from mocker import MockerTestCase
 | 
						|
 | 
						|
import cloudinit
 | 
						|
from cloudinit.DataSource import DataSource
 | 
						|
 | 
						|
 | 
						|
instance_id = "i-testing"
 | 
						|
 | 
						|
 | 
						|
class FakeDataSource(DataSource):
 | 
						|
 | 
						|
    def __init__(self, userdata):
 | 
						|
        DataSource.__init__(self)
 | 
						|
        self.metadata = {'instance-id': instance_id}
 | 
						|
        self.userdata_raw = userdata
 | 
						|
 | 
						|
 | 
						|
class TestConsumeUserData(MockerTestCase):
 | 
						|
 | 
						|
    _log_handler = None
 | 
						|
    _log = None
 | 
						|
    log_file = None
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        self.mock_write = self.mocker.replace("cloudinit.util.write_file",
 | 
						|
            passthrough=False)
 | 
						|
        self.mock_write(self.get_ipath("cloud_config"), "", 0600)
 | 
						|
        self.capture_log()
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        self._log.removeHandler(self._log_handler)
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def get_ipath(name):
 | 
						|
        return "%s/instances/%s%s" % (cloudinit.varlibdir, instance_id,
 | 
						|
            cloudinit.pathmap[name])
 | 
						|
 | 
						|
    def capture_log(self):
 | 
						|
        self.log_file = StringIO.StringIO()
 | 
						|
        self._log_handler = logging.StreamHandler(self.log_file)
 | 
						|
        self._log_handler.setLevel(logging.DEBUG)
 | 
						|
        self._log = logging.getLogger(cloudinit.logger_name)
 | 
						|
        self._log.addHandler(self._log_handler)
 | 
						|
 | 
						|
    def test_unhandled_type_warning(self):
 | 
						|
        """Raw text without magic is ignored but shows warning"""
 | 
						|
        self.mocker.replay()
 | 
						|
        ci = cloudinit.CloudInit()
 | 
						|
        ci.datasource = FakeDataSource("arbitrary text\n")
 | 
						|
        ci.consume_userdata()
 | 
						|
        self.assertEqual(
 | 
						|
            "Unhandled non-multipart userdata starting 'arbitrary text...'\n",
 | 
						|
            self.log_file.getvalue())
 | 
						|
 | 
						|
    def test_mime_text_plain(self):
 | 
						|
        """Mime message of type text/plain is ignored without warning"""
 | 
						|
        self.mocker.replay()
 | 
						|
        ci = cloudinit.CloudInit()
 | 
						|
        message = MIMEBase("text", "plain")
 | 
						|
        message.set_payload("Just text")
 | 
						|
        ci.datasource = FakeDataSource(message.as_string())
 | 
						|
        ci.consume_userdata()
 | 
						|
        self.assertEqual("", self.log_file.getvalue())
 | 
						|
 | 
						|
    def test_shellscript(self):
 | 
						|
        """Raw text starting #!/bin/sh is treated as script"""
 | 
						|
        script = "#!/bin/sh\necho hello\n"
 | 
						|
        outpath = cloudinit.get_ipath_cur("scripts") + "/part-001"
 | 
						|
        self.mock_write(outpath, script, 0700)
 | 
						|
        self.mocker.replay()
 | 
						|
        ci = cloudinit.CloudInit()
 | 
						|
        ci.datasource = FakeDataSource(script)
 | 
						|
        ci.consume_userdata()
 | 
						|
        self.assertEqual("", self.log_file.getvalue())
 | 
						|
 | 
						|
    def test_mime_text_x_shellscript(self):
 | 
						|
        """Mime message of type text/x-shellscript is treated as script"""
 | 
						|
        script = "#!/bin/sh\necho hello\n"
 | 
						|
        outpath = cloudinit.get_ipath_cur("scripts") + "/part-001"
 | 
						|
        self.mock_write(outpath, script, 0700)
 | 
						|
        self.mocker.replay()
 | 
						|
        ci = cloudinit.CloudInit()
 | 
						|
        message = MIMEBase("text", "x-shellscript")
 | 
						|
        message.set_payload(script)
 | 
						|
        ci.datasource = FakeDataSource(message.as_string())
 | 
						|
        ci.consume_userdata()
 | 
						|
        self.assertEqual("", self.log_file.getvalue())
 | 
						|
 | 
						|
    def test_mime_text_plain_shell(self):
 | 
						|
        """Mime type text/plain starting #!/bin/sh is treated as script"""
 | 
						|
        script = "#!/bin/sh\necho hello\n"
 | 
						|
        outpath = cloudinit.get_ipath_cur("scripts") + "/part-001"
 | 
						|
        self.mock_write(outpath, script, 0700)
 | 
						|
        self.mocker.replay()
 | 
						|
        ci = cloudinit.CloudInit()
 | 
						|
        message = MIMEBase("text", "plain")
 | 
						|
        message.set_payload(script)
 | 
						|
        ci.datasource = FakeDataSource(message.as_string())
 | 
						|
        ci.consume_userdata()
 | 
						|
        self.assertEqual("", self.log_file.getvalue())
 |