Implement cloud-config runcmd

If the userdata is of type cloud-config, the runcmd entry can contain
multiple entries with commands that will be executed, in the order
of their definition.

The commands can be given as a string or as an array of strings, the
first item being the binary to be executed and the rest being the
parameters of that binary.

The commands will be aggregated and written into one single shell file,
in the order of their definition.
On Windows, the file will be executed by the native Windows
shell cmd.exe.

Example userdata file:

 - 'dir C:\\'
 - ['echo', '1']


Change-Id: Ie307e08f8c4108c7bf9108543cc90b6a7fa2e7ae
This commit is contained in:
Adrian Vladu 2019-12-02 16:05:35 +02:00
parent 630345dd38
commit ce74218315
6 changed files with 197 additions and 0 deletions

View File

@ -46,3 +46,5 @@ VOL_ACT_AVMA = "AVMA"

View File

@ -209,3 +209,10 @@ class BaseOSUtils(object):
def take_path_ownership(self, path, username=None):
raise NotImplementedError()
def get_default_script_exec_header(self):
"""File header where the cloud-config runcmd will be aggregated.
Example: `#!/bin/bash` for bash or `rem cmd` for cmd.
raise NotImplementedError()

View File

@ -36,6 +36,7 @@ import win32security
import win32service
import winerror
from cloudbaseinit import constant
from cloudbaseinit import exception
from cloudbaseinit.osutils import base
from cloudbaseinit.utils import classloader
@ -1742,3 +1743,6 @@ class WindowsUtils(base.BaseOSUtils):
ls = info['FileVersionLS']
return (win32api.HIWORD(ms), win32api.LOWORD(ms),
win32api.HIWORD(ls), win32api.LOWORD(ls))
def get_default_script_exec_header(self):
return constant.SCRIPT_HEADER_CMD

View File

@ -30,6 +30,8 @@ PLUGINS = {
'ntp': 'cloudbaseinit.plugins.common.userdataplugins.'
'runcmd': 'cloudbaseinit.plugins.common.userdataplugins.'

View File

@ -0,0 +1,86 @@
# Copyright 2019 Cloudbase Solutions Srl
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import six
from oslo_log import log as oslo_logging
from cloudbaseinit import exception
from cloudbaseinit.osutils import factory
from cloudbaseinit.plugins.common import execcmd
from cloudbaseinit.plugins.common.userdataplugins.cloudconfigplugins import (
from cloudbaseinit.plugins.common import userdatautils
LOG = oslo_logging.getLogger(__name__)
class RunCmdPlugin(base.BaseCloudConfigPlugin):
"""Aggregate and execute cloud-config runcmd entries in a shell.
The runcmd entries can be a string or an array of strings.
The prefered shell is given by the OS platform.
Example for Windows, where cmd.exe is the prefered shell:
- ['dir', 'C:\']
- 'dir C:\'
def _unify_scripts(commands, env_header):
script_content = env_header + os.linesep
entries = 0
for command in commands:
if isinstance(command, six.string_types):
script_content += command
elif isinstance(command, (list, tuple)):
subcommand_content = []
for subcommand in command:
subcommand_content.append("%s" % subcommand)
script_content += ' '.join(subcommand_content)
raise exception.CloudbaseInitException(
"Unrecognized type '%r' in cmd content" % type(command))
script_content += os.linesep
entries += 1"Found %d cloud-config runcmd entries." % entries)
return script_content
def process(self, data):
if not data:'No cloud-config runcmd entries found.')
return"Running cloud-config runcmd entries.")
osutils = factory.get_os_utils()
env_header = osutils.get_default_script_exec_header()
ret_val = userdatautils.execute_user_data_script(
self._unify_scripts(data, env_header).encode())
_, reboot = execcmd.get_plugin_return_value(ret_val)
return reboot
except Exception as ex:
LOG.warning("An error occurred during runcmd execution: '%s'"
% ex)
return False

View File

@ -0,0 +1,96 @@
# Copyright 2019 Cloudbase Solutions Srl
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import unittest
import unittest.mock as mock
except ImportError:
import mock
from oslo_config import cfg
from cloudbaseinit import exception
from cloudbaseinit.plugins.common.userdataplugins.cloudconfigplugins import (
from cloudbaseinit.tests import testutils
class RunCmdPluginTest(unittest.TestCase):
def setUp(self):
self._runcmd_plugin = runcmd.RunCmdPlugin()
def test_unify_scripts(self):
run_commands = ['echo 1', 'echo 2']
fake_hader = 'fake_header'
result = self._runcmd_plugin._unify_scripts(run_commands, fake_hader)
ln_sep = os.linesep
expected_result = 'fake_header%secho 1%secho 2%s' % (ln_sep, ln_sep,
self.assertEqual(result, expected_result)
def test_unify_scripts_fail(self):
run_commands = [{'cmd': 'fake_cmd'}]
with self.assertRaises(exception.CloudbaseInitException) as cm:
self._runcmd_plugin._unify_scripts(run_commands, 'fake_header')
expected = ("Unrecognized type '%s' in cmd content"
% type(run_commands[0]))
self.assertEqual(expected, str(cm.exception))
def test_process_basic_data(self, mock_os_utils, mock_userdata):
run_commands = ['echo 1', 'echo 2', ['echo', '1'], 'exit 1003']
mock_userdata.return_value = 1003
mock_utils = mock.MagicMock()
mock_utils.get_default_script_exec_header.return_value = 'fake_header'
mock_os_utils.return_value = mock_utils
expected_logging = [
"Running cloud-config runcmd entries.",
"Found 4 cloud-config runcmd entries.",
with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
'runcmd') as snatcher:
result_process = self._runcmd_plugin.process(run_commands)
self.assertEqual(expected_logging, snatcher.output)
self.assertEqual(result_process, True)
def test_process_wrong_cmd_type(self, mock_os_utils):
run_commands = [{'cmd': 'fake_cmd'}]
expected_logging = [
"Running cloud-config runcmd entries.",
"An error occurred during runcmd execution: 'Unrecognized type "
"'%s' in cmd content'" % type(run_commands[0])
with testutils.LogSnatcher('cloudbaseinit.plugins.common.'
'runcmd') as snatcher:
result_process = self._runcmd_plugin.process(run_commands)
self.assertEqual(expected_logging, snatcher.output)
self.assertEqual(result_process, False)