5c2557dbaf
Now that we no longer support py27, we can use the standard library unittest.mock module instead of the third party mock lib. Change-Id: Ib6e4e7f89a9990cfb42afa209878812340109ecf Signed-off-by: Sean McGinnis <sean.mcginnis@gmail.com>
1418 lines
54 KiB
Python
1418 lines
54 KiB
Python
#
|
|
# Copyright 2013 Hewlett-Packard Development Company, L.P.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import json
|
|
import os
|
|
import tempfile
|
|
from unittest import mock
|
|
|
|
import boto.cloudformation as cfn
|
|
import fixtures
|
|
import testtools
|
|
import testtools.matchers as ttm
|
|
|
|
from heat_cfntools.cfntools import cfn_helper
|
|
|
|
|
|
def popen_root_calls(calls, shell=False):
|
|
kwargs = {'env': None, 'cwd': None, 'stderr': -1, 'stdout': -1,
|
|
'shell': shell}
|
|
return [
|
|
mock.call(call, **kwargs)
|
|
for call in calls
|
|
]
|
|
|
|
|
|
class FakePOpen(object):
|
|
def __init__(self, stdout='', stderr='', returncode=0):
|
|
self.returncode = returncode
|
|
self.stdout = stdout
|
|
self.stderr = stderr
|
|
|
|
def communicate(self):
|
|
return (self.stdout, self.stderr)
|
|
|
|
def wait(self):
|
|
pass
|
|
|
|
|
|
@mock.patch.object(cfn_helper.pwd, 'getpwnam')
|
|
@mock.patch.object(cfn_helper.os, 'seteuid')
|
|
@mock.patch.object(cfn_helper.os, 'geteuid')
|
|
class TestCommandRunner(testtools.TestCase):
|
|
|
|
def test_command_runner(self, mock_geteuid, mock_seteuid, mock_getpwnam):
|
|
def returns(*args, **kwargs):
|
|
if args[0][0] == '/bin/command1':
|
|
return FakePOpen('All good')
|
|
elif args[0][0] == '/bin/command2':
|
|
return FakePOpen('Doing something', 'error', -1)
|
|
else:
|
|
raise Exception('This should never happen')
|
|
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.side_effect = returns
|
|
cmd2 = cfn_helper.CommandRunner(['/bin/command2'])
|
|
cmd1 = cfn_helper.CommandRunner(['/bin/command1'],
|
|
nextcommand=cmd2)
|
|
cmd1.run('root')
|
|
self.assertEqual(
|
|
'CommandRunner:\n\tcommand: [\'/bin/command1\']\n\tstdout: '
|
|
'All good',
|
|
str(cmd1))
|
|
self.assertEqual(
|
|
'CommandRunner:\n\tcommand: [\'/bin/command2\']\n\tstatus: '
|
|
'-1\n\tstdout: Doing something\n\tstderr: error',
|
|
str(cmd2))
|
|
calls = popen_root_calls([['/bin/command1'], ['/bin/command2']])
|
|
mock_popen.assert_has_calls(calls)
|
|
|
|
def test_privileges_are_lowered_for_non_root_user(self, mock_geteuid,
|
|
mock_seteuid,
|
|
mock_getpwnam):
|
|
pw_entry = mock.Mock()
|
|
pw_entry.pw_uid = 1001
|
|
mock_getpwnam.return_value = pw_entry
|
|
mock_geteuid.return_value = 0
|
|
calls = [mock.call(1001), mock.call(0)]
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
command = ['/bin/command', '--option=value', 'arg1', 'arg2']
|
|
cmd = cfn_helper.CommandRunner(command)
|
|
cmd.run(user='nonroot')
|
|
self.assertTrue(mock_geteuid.called)
|
|
mock_getpwnam.assert_called_once_with('nonroot')
|
|
mock_seteuid.assert_has_calls(calls)
|
|
self.assertTrue(mock_popen.called)
|
|
|
|
def test_run_returns_when_cannot_set_privileges(self, mock_geteuid,
|
|
mock_seteuid,
|
|
mock_getpwnam):
|
|
msg = '[Error 1] Permission Denied'
|
|
mock_seteuid.side_effect = Exception(msg)
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
command = ['/bin/command2']
|
|
cmd = cfn_helper.CommandRunner(command)
|
|
cmd.run(user='nonroot')
|
|
self.assertTrue(mock_getpwnam.called)
|
|
self.assertTrue(mock_seteuid.called)
|
|
self.assertFalse(mock_popen.called)
|
|
self.assertEqual(126, cmd.status)
|
|
self.assertEqual(msg, cmd.stderr)
|
|
|
|
def test_privileges_are_restored_for_command_failure(self, mock_geteuid,
|
|
mock_seteuid,
|
|
mock_getpwnam):
|
|
pw_entry = mock.Mock()
|
|
pw_entry.pw_uid = 1001
|
|
mock_getpwnam.return_value = pw_entry
|
|
mock_geteuid.return_value = 0
|
|
calls = [mock.call(1001), mock.call(0)]
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.side_effect = ValueError('Something wrong')
|
|
command = ['/bin/command', '--option=value', 'arg1', 'arg2']
|
|
cmd = cfn_helper.CommandRunner(command)
|
|
self.assertRaises(ValueError, cmd.run, user='nonroot')
|
|
self.assertTrue(mock_geteuid.called)
|
|
mock_getpwnam.assert_called_once_with('nonroot')
|
|
mock_seteuid.assert_has_calls(calls)
|
|
self.assertTrue(mock_popen.called)
|
|
|
|
|
|
@mock.patch.object(cfn_helper, 'controlled_privileges')
|
|
class TestPackages(testtools.TestCase):
|
|
|
|
def test_yum_install(self, mock_cp):
|
|
|
|
def returns(*args, **kwargs):
|
|
if args[0][0] == 'rpm' and args[0][1] == '-q':
|
|
return FakePOpen(returncode=1)
|
|
else:
|
|
return FakePOpen(returncode=0)
|
|
|
|
calls = [['which', 'yum']]
|
|
for pack in ('httpd', 'wordpress', 'mysql-server'):
|
|
calls.append(['rpm', '-q', pack])
|
|
calls.append(['yum', '-y', '--showduplicates', 'list',
|
|
'available', pack])
|
|
calls = popen_root_calls(calls)
|
|
|
|
packages = {
|
|
"yum": {
|
|
"mysql-server": [],
|
|
"httpd": [],
|
|
"wordpress": []
|
|
}
|
|
}
|
|
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.side_effect = returns
|
|
cfn_helper.PackagesHandler(packages).apply_packages()
|
|
mock_popen.assert_has_calls(calls, any_order=True)
|
|
|
|
def test_dnf_install_yum_unavailable(self, mock_cp):
|
|
|
|
def returns(*args, **kwargs):
|
|
if ((args[0][0] == 'rpm' and args[0][1] == '-q')
|
|
or (args[0][0] == 'which' and args[0][1] == 'yum')):
|
|
return FakePOpen(returncode=1)
|
|
else:
|
|
return FakePOpen(returncode=0)
|
|
|
|
calls = [['which', 'yum']]
|
|
for pack in ('httpd', 'wordpress', 'mysql-server'):
|
|
calls.append(['rpm', '-q', pack])
|
|
calls.append(['dnf', '-y', '--showduplicates', 'list',
|
|
'available', pack])
|
|
calls = popen_root_calls(calls)
|
|
|
|
packages = {
|
|
"yum": {
|
|
"mysql-server": [],
|
|
"httpd": [],
|
|
"wordpress": []
|
|
}
|
|
}
|
|
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.side_effect = returns
|
|
cfn_helper.PackagesHandler(packages).apply_packages()
|
|
mock_popen.assert_has_calls(calls, any_order=True)
|
|
|
|
def test_dnf_install(self, mock_cp):
|
|
|
|
def returns(*args, **kwargs):
|
|
if args[0][0] == 'rpm' and args[0][1] == '-q':
|
|
return FakePOpen(returncode=1)
|
|
else:
|
|
return FakePOpen(returncode=0)
|
|
|
|
calls = []
|
|
for pack in ('httpd', 'wordpress', 'mysql-server'):
|
|
calls.append(['rpm', '-q', pack])
|
|
calls.append(['dnf', '-y', '--showduplicates', 'list',
|
|
'available', pack])
|
|
calls = popen_root_calls(calls)
|
|
|
|
packages = {
|
|
"dnf": {
|
|
"mysql-server": [],
|
|
"httpd": [],
|
|
"wordpress": []
|
|
}
|
|
}
|
|
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.side_effect = returns
|
|
cfn_helper.PackagesHandler(packages).apply_packages()
|
|
mock_popen.assert_has_calls(calls, any_order=True)
|
|
|
|
def test_zypper_install(self, mock_cp):
|
|
|
|
def returns(*args, **kwargs):
|
|
if args[0][0].startswith('rpm') and args[0][1].startswith('-q'):
|
|
return FakePOpen(returncode=1)
|
|
else:
|
|
return FakePOpen(returncode=0)
|
|
|
|
calls = []
|
|
for pack in ('httpd', 'wordpress', 'mysql-server'):
|
|
calls.append(['rpm', '-q', pack])
|
|
calls.append(['zypper', '-n', '--no-refresh', 'search', pack])
|
|
calls = popen_root_calls(calls)
|
|
|
|
packages = {
|
|
"zypper": {
|
|
"mysql-server": [],
|
|
"httpd": [],
|
|
"wordpress": []
|
|
}
|
|
}
|
|
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.side_effect = returns
|
|
cfn_helper.PackagesHandler(packages).apply_packages()
|
|
mock_popen.assert_has_calls(calls, any_order=True)
|
|
|
|
def test_apt_install(self, mock_cp):
|
|
packages = {
|
|
"apt": {
|
|
"mysql-server": [],
|
|
"httpd": [],
|
|
"wordpress": []
|
|
}
|
|
}
|
|
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.return_value = FakePOpen(returncode=0)
|
|
cfn_helper.PackagesHandler(packages).apply_packages()
|
|
self.assertTrue(mock_popen.called)
|
|
|
|
|
|
@mock.patch.object(cfn_helper, 'controlled_privileges')
|
|
class TestServicesHandler(testtools.TestCase):
|
|
|
|
def test_services_handler_systemd(self, mock_cp):
|
|
calls = []
|
|
returns = []
|
|
|
|
# apply_services
|
|
calls.append(['/bin/systemctl', 'enable', 'httpd.service'])
|
|
returns.append(FakePOpen())
|
|
calls.append(['/bin/systemctl', 'status', 'httpd.service'])
|
|
returns.append(FakePOpen(returncode=-1))
|
|
calls.append(['/bin/systemctl', 'start', 'httpd.service'])
|
|
returns.append(FakePOpen())
|
|
calls.append(['/bin/systemctl', 'enable', 'mysqld.service'])
|
|
returns.append(FakePOpen())
|
|
calls.append(['/bin/systemctl', 'status', 'mysqld.service'])
|
|
returns.append(FakePOpen(returncode=-1))
|
|
calls.append(['/bin/systemctl', 'start', 'mysqld.service'])
|
|
returns.append(FakePOpen())
|
|
|
|
# monitor_services not running
|
|
calls.append(['/bin/systemctl', 'status', 'httpd.service'])
|
|
returns.append(FakePOpen(returncode=-1))
|
|
calls.append(['/bin/systemctl', 'start', 'httpd.service'])
|
|
returns.append(FakePOpen())
|
|
|
|
calls = popen_root_calls(calls)
|
|
|
|
calls.extend(popen_root_calls(['/bin/services_restarted'], shell=True))
|
|
returns.append(FakePOpen())
|
|
|
|
calls.extend(popen_root_calls([['/bin/systemctl', 'status',
|
|
'mysqld.service']]))
|
|
returns.append(FakePOpen(returncode=-1))
|
|
calls.extend(popen_root_calls([['/bin/systemctl', 'start',
|
|
'mysqld.service']]))
|
|
returns.append(FakePOpen())
|
|
|
|
calls.extend(popen_root_calls(['/bin/services_restarted'], shell=True))
|
|
returns.append(FakePOpen())
|
|
|
|
# monitor_services running
|
|
calls.extend(popen_root_calls([['/bin/systemctl', 'status',
|
|
'httpd.service']]))
|
|
returns.append(FakePOpen())
|
|
calls.extend(popen_root_calls([['/bin/systemctl', 'status',
|
|
'mysqld.service']]))
|
|
returns.append(FakePOpen())
|
|
|
|
services = {
|
|
"systemd": {
|
|
"mysqld": {"enabled": "true", "ensureRunning": "true"},
|
|
"httpd": {"enabled": "true", "ensureRunning": "true"}
|
|
}
|
|
}
|
|
hooks = [
|
|
cfn_helper.Hook(
|
|
'hook1',
|
|
'service.restarted',
|
|
'Resources.resource1.Metadata',
|
|
'root',
|
|
'/bin/services_restarted')
|
|
]
|
|
|
|
with mock.patch('os.path.exists') as mock_exists:
|
|
mock_exists.return_value = True
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.side_effect = returns
|
|
|
|
sh = cfn_helper.ServicesHandler(services, 'resource1', hooks)
|
|
sh.apply_services()
|
|
# services not running
|
|
sh.monitor_services()
|
|
|
|
# services running
|
|
sh.monitor_services()
|
|
mock_popen.assert_has_calls(calls, any_order=True)
|
|
mock_exists.assert_called_with('/bin/systemctl')
|
|
|
|
def test_services_handler_systemd_disabled(self, mock_cp):
|
|
calls = []
|
|
|
|
# apply_services
|
|
calls.append(['/bin/systemctl', 'disable', 'httpd.service'])
|
|
calls.append(['/bin/systemctl', 'status', 'httpd.service'])
|
|
calls.append(['/bin/systemctl', 'stop', 'httpd.service'])
|
|
calls.append(['/bin/systemctl', 'disable', 'mysqld.service'])
|
|
calls.append(['/bin/systemctl', 'status', 'mysqld.service'])
|
|
calls.append(['/bin/systemctl', 'stop', 'mysqld.service'])
|
|
calls = popen_root_calls(calls)
|
|
|
|
services = {
|
|
"systemd": {
|
|
"mysqld": {"enabled": "false", "ensureRunning": "false"},
|
|
"httpd": {"enabled": "false", "ensureRunning": "false"}
|
|
}
|
|
}
|
|
hooks = [
|
|
cfn_helper.Hook(
|
|
'hook1',
|
|
'service.restarted',
|
|
'Resources.resource1.Metadata',
|
|
'root',
|
|
'/bin/services_restarted')
|
|
]
|
|
with mock.patch('os.path.exists') as mock_exists:
|
|
mock_exists.return_value = True
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.return_value = FakePOpen()
|
|
sh = cfn_helper.ServicesHandler(services, 'resource1', hooks)
|
|
sh.apply_services()
|
|
mock_popen.assert_has_calls(calls, any_order=True)
|
|
mock_exists.assert_called_with('/bin/systemctl')
|
|
|
|
def test_services_handler_sysv_service_chkconfig(self, mock_cp):
|
|
|
|
def exists(*args, **kwargs):
|
|
return args[0] != '/bin/systemctl'
|
|
|
|
calls = []
|
|
returns = []
|
|
|
|
# apply_services
|
|
calls.append(['/sbin/chkconfig', 'httpd', 'on'])
|
|
returns.append(FakePOpen())
|
|
calls.append(['/sbin/service', 'httpd', 'status'])
|
|
returns.append(FakePOpen(returncode=-1))
|
|
calls.append(['/sbin/service', 'httpd', 'start'])
|
|
returns.append(FakePOpen())
|
|
|
|
# monitor_services not running
|
|
calls.append(['/sbin/service', 'httpd', 'status'])
|
|
returns.append(FakePOpen(returncode=-1))
|
|
calls.append(['/sbin/service', 'httpd', 'start'])
|
|
returns.append(FakePOpen())
|
|
|
|
calls = popen_root_calls(calls)
|
|
|
|
calls.extend(popen_root_calls(['/bin/services_restarted'], shell=True))
|
|
returns.append(FakePOpen())
|
|
|
|
# monitor_services running
|
|
calls.extend(popen_root_calls([['/sbin/service', 'httpd', 'status']]))
|
|
returns.append(FakePOpen())
|
|
|
|
services = {
|
|
"sysvinit": {
|
|
"httpd": {"enabled": "true", "ensureRunning": "true"}
|
|
}
|
|
}
|
|
hooks = [
|
|
cfn_helper.Hook(
|
|
'hook1',
|
|
'service.restarted',
|
|
'Resources.resource1.Metadata',
|
|
'root',
|
|
'/bin/services_restarted')
|
|
]
|
|
|
|
with mock.patch('os.path.exists') as mock_exists:
|
|
mock_exists.side_effect = exists
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.side_effect = returns
|
|
sh = cfn_helper.ServicesHandler(services, 'resource1', hooks)
|
|
sh.apply_services()
|
|
# services not running
|
|
sh.monitor_services()
|
|
|
|
# services running
|
|
sh.monitor_services()
|
|
mock_popen.assert_has_calls(calls)
|
|
mock_exists.assert_any_call('/bin/systemctl')
|
|
mock_exists.assert_any_call('/sbin/service')
|
|
mock_exists.assert_any_call('/sbin/chkconfig')
|
|
|
|
def test_services_handler_sysv_disabled_service_chkconfig(self, mock_cp):
|
|
def exists(*args, **kwargs):
|
|
return args[0] != '/bin/systemctl'
|
|
|
|
calls = []
|
|
|
|
# apply_services
|
|
calls.append(['/sbin/chkconfig', 'httpd', 'off'])
|
|
calls.append(['/sbin/service', 'httpd', 'status'])
|
|
calls.append(['/sbin/service', 'httpd', 'stop'])
|
|
|
|
calls = popen_root_calls(calls)
|
|
|
|
services = {
|
|
"sysvinit": {
|
|
"httpd": {"enabled": "false", "ensureRunning": "false"}
|
|
}
|
|
}
|
|
hooks = [
|
|
cfn_helper.Hook(
|
|
'hook1',
|
|
'service.restarted',
|
|
'Resources.resource1.Metadata',
|
|
'root',
|
|
'/bin/services_restarted')
|
|
]
|
|
|
|
with mock.patch('os.path.exists') as mock_exists:
|
|
mock_exists.side_effect = exists
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.return_value = FakePOpen()
|
|
sh = cfn_helper.ServicesHandler(services, 'resource1', hooks)
|
|
sh.apply_services()
|
|
mock_popen.assert_has_calls(calls)
|
|
mock_exists.assert_any_call('/bin/systemctl')
|
|
mock_exists.assert_any_call('/sbin/service')
|
|
mock_exists.assert_any_call('/sbin/chkconfig')
|
|
|
|
def test_services_handler_sysv_systemctl(self, mock_cp):
|
|
calls = []
|
|
returns = []
|
|
|
|
# apply_services
|
|
calls.append(['/bin/systemctl', 'enable', 'httpd.service'])
|
|
returns.append(FakePOpen())
|
|
calls.append(['/bin/systemctl', 'status', 'httpd.service'])
|
|
returns.append(FakePOpen(returncode=-1))
|
|
calls.append(['/bin/systemctl', 'start', 'httpd.service'])
|
|
returns.append(FakePOpen())
|
|
|
|
# monitor_services not running
|
|
calls.append(['/bin/systemctl', 'status', 'httpd.service'])
|
|
returns.append(FakePOpen(returncode=-1))
|
|
calls.append(['/bin/systemctl', 'start', 'httpd.service'])
|
|
returns.append(FakePOpen())
|
|
|
|
shell_calls = []
|
|
shell_calls.append('/bin/services_restarted')
|
|
returns.append(FakePOpen())
|
|
|
|
calls = popen_root_calls(calls)
|
|
calls.extend(popen_root_calls(shell_calls, shell=True))
|
|
|
|
# monitor_services running
|
|
calls.extend(popen_root_calls([['/bin/systemctl', 'status',
|
|
'httpd.service']]))
|
|
returns.append(FakePOpen())
|
|
|
|
services = {
|
|
"sysvinit": {
|
|
"httpd": {"enabled": "true", "ensureRunning": "true"}
|
|
}
|
|
}
|
|
hooks = [
|
|
cfn_helper.Hook(
|
|
'hook1',
|
|
'service.restarted',
|
|
'Resources.resource1.Metadata',
|
|
'root',
|
|
'/bin/services_restarted')
|
|
]
|
|
|
|
with mock.patch('os.path.exists') as mock_exists:
|
|
mock_exists.return_value = True
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.side_effect = returns
|
|
sh = cfn_helper.ServicesHandler(services, 'resource1', hooks)
|
|
sh.apply_services()
|
|
# services not running
|
|
sh.monitor_services()
|
|
|
|
# services running
|
|
sh.monitor_services()
|
|
mock_popen.assert_has_calls(calls)
|
|
mock_exists.assert_called_with('/bin/systemctl')
|
|
|
|
def test_services_handler_sysv_disabled_systemctl(self, mock_cp):
|
|
calls = []
|
|
|
|
# apply_services
|
|
calls.append(['/bin/systemctl', 'disable', 'httpd.service'])
|
|
calls.append(['/bin/systemctl', 'status', 'httpd.service'])
|
|
calls.append(['/bin/systemctl', 'stop', 'httpd.service'])
|
|
|
|
calls = popen_root_calls(calls)
|
|
|
|
services = {
|
|
"sysvinit": {
|
|
"httpd": {"enabled": "false", "ensureRunning": "false"}
|
|
}
|
|
}
|
|
hooks = [
|
|
cfn_helper.Hook(
|
|
'hook1',
|
|
'service.restarted',
|
|
'Resources.resource1.Metadata',
|
|
'root',
|
|
'/bin/services_restarted')
|
|
]
|
|
|
|
with mock.patch('os.path.exists') as mock_exists:
|
|
mock_exists.return_value = True
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.return_value = FakePOpen()
|
|
sh = cfn_helper.ServicesHandler(services, 'resource1', hooks)
|
|
sh.apply_services()
|
|
mock_popen.assert_has_calls(calls)
|
|
mock_exists.assert_called_with('/bin/systemctl')
|
|
|
|
def test_services_handler_sysv_service_updaterc(self, mock_cp):
|
|
calls = []
|
|
returns = []
|
|
|
|
# apply_services
|
|
calls.append(['/usr/sbin/update-rc.d', 'httpd', 'enable'])
|
|
returns.append(FakePOpen())
|
|
calls.append(['/usr/sbin/service', 'httpd', 'status'])
|
|
returns.append(FakePOpen(returncode=-1))
|
|
calls.append(['/usr/sbin/service', 'httpd', 'start'])
|
|
returns.append(FakePOpen())
|
|
|
|
# monitor_services not running
|
|
calls.append(['/usr/sbin/service', 'httpd', 'status'])
|
|
returns.append(FakePOpen(returncode=-1))
|
|
calls.append(['/usr/sbin/service', 'httpd', 'start'])
|
|
returns.append(FakePOpen())
|
|
|
|
shell_calls = []
|
|
shell_calls.append('/bin/services_restarted')
|
|
returns.append(FakePOpen())
|
|
|
|
calls = popen_root_calls(calls)
|
|
calls.extend(popen_root_calls(shell_calls, shell=True))
|
|
|
|
# monitor_services running
|
|
calls.extend(popen_root_calls([['/usr/sbin/service', 'httpd',
|
|
'status']]))
|
|
returns.append(FakePOpen())
|
|
|
|
services = {
|
|
"sysvinit": {
|
|
"httpd": {"enabled": "true", "ensureRunning": "true"}
|
|
}
|
|
}
|
|
hooks = [
|
|
cfn_helper.Hook(
|
|
'hook1',
|
|
'service.restarted',
|
|
'Resources.resource1.Metadata',
|
|
'root',
|
|
'/bin/services_restarted')
|
|
]
|
|
|
|
with mock.patch('os.path.exists') as mock_exists:
|
|
mock_exists.return_value = False
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.side_effect = returns
|
|
sh = cfn_helper.ServicesHandler(services, 'resource1', hooks)
|
|
sh.apply_services()
|
|
# services not running
|
|
sh.monitor_services()
|
|
|
|
# services running
|
|
sh.monitor_services()
|
|
mock_popen.assert_has_calls(calls)
|
|
mock_exists.assert_any_call('/bin/systemctl')
|
|
mock_exists.assert_any_call('/sbin/service')
|
|
mock_exists.assert_any_call('/sbin/chkconfig')
|
|
|
|
def test_services_handler_sysv_disabled_service_updaterc(self, mock_cp):
|
|
calls = []
|
|
returns = []
|
|
|
|
# apply_services
|
|
calls.append(['/usr/sbin/update-rc.d', 'httpd', 'disable'])
|
|
returns.append(FakePOpen())
|
|
calls.append(['/usr/sbin/service', 'httpd', 'status'])
|
|
returns.append(FakePOpen())
|
|
calls.append(['/usr/sbin/service', 'httpd', 'stop'])
|
|
returns.append(FakePOpen())
|
|
|
|
calls = popen_root_calls(calls)
|
|
|
|
services = {
|
|
"sysvinit": {
|
|
"httpd": {"enabled": "false", "ensureRunning": "false"}
|
|
}
|
|
}
|
|
hooks = [
|
|
cfn_helper.Hook(
|
|
'hook1',
|
|
'service.restarted',
|
|
'Resources.resource1.Metadata',
|
|
'root',
|
|
'/bin/services_restarted')
|
|
]
|
|
|
|
with mock.patch('os.path.exists') as mock_exists:
|
|
mock_exists.return_value = False
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.side_effect = returns
|
|
sh = cfn_helper.ServicesHandler(services, 'resource1', hooks)
|
|
sh.apply_services()
|
|
mock_popen.assert_has_calls(calls)
|
|
mock_exists.assert_any_call('/bin/systemctl')
|
|
mock_exists.assert_any_call('/sbin/service')
|
|
mock_exists.assert_any_call('/sbin/chkconfig')
|
|
|
|
|
|
class TestHupConfig(testtools.TestCase):
|
|
|
|
def test_load_main_section(self):
|
|
fcreds = tempfile.NamedTemporaryFile()
|
|
fcreds.write('AWSAccessKeyId=foo\nAWSSecretKey=bar\n'.encode('UTF-8'))
|
|
fcreds.flush()
|
|
|
|
main_conf = tempfile.NamedTemporaryFile()
|
|
main_conf.write(('''[main]
|
|
stack=teststack
|
|
credential-file=%s''' % fcreds.name).encode('UTF-8'))
|
|
main_conf.flush()
|
|
mainconfig = cfn_helper.HupConfig([open(main_conf.name)])
|
|
self.assertEqual(
|
|
'{stack: teststack, credential_file: %s, '
|
|
'region: nova, interval:10}' % fcreds.name,
|
|
str(mainconfig))
|
|
main_conf.close()
|
|
|
|
main_conf = tempfile.NamedTemporaryFile()
|
|
main_conf.write(('''[main]
|
|
stack=teststack
|
|
region=region1
|
|
credential-file=%s-invalid
|
|
interval=120''' % fcreds.name).encode('UTF-8'))
|
|
main_conf.flush()
|
|
e = self.assertRaises(cfn_helper.InvalidCredentialsException,
|
|
cfn_helper.HupConfig,
|
|
[open(main_conf.name)])
|
|
self.assertIn('invalid credentials file', str(e))
|
|
fcreds.close()
|
|
|
|
@mock.patch.object(cfn_helper, 'controlled_privileges')
|
|
def test_hup_config(self, mock_cp):
|
|
hooks_conf = tempfile.NamedTemporaryFile()
|
|
|
|
def write_hook_conf(f, name, triggers, path, action):
|
|
f.write((
|
|
'[%s]\ntriggers=%s\npath=%s\naction=%s\nrunas=root\n\n' % (
|
|
name, triggers, path, action)).encode('UTF-8'))
|
|
|
|
write_hook_conf(
|
|
hooks_conf,
|
|
'hook2',
|
|
'service2.restarted',
|
|
'Resources.resource2.Metadata',
|
|
'/bin/hook2')
|
|
write_hook_conf(
|
|
hooks_conf,
|
|
'hook1',
|
|
'service1.restarted',
|
|
'Resources.resource1.Metadata',
|
|
'/bin/hook1')
|
|
write_hook_conf(
|
|
hooks_conf,
|
|
'hook3',
|
|
'service3.restarted',
|
|
'Resources.resource3.Metadata',
|
|
'/bin/hook3')
|
|
write_hook_conf(
|
|
hooks_conf,
|
|
'cfn-http-restarted',
|
|
'service.restarted',
|
|
'Resources.resource.Metadata',
|
|
'/bin/cfn-http-restarted')
|
|
hooks_conf.flush()
|
|
|
|
fcreds = tempfile.NamedTemporaryFile()
|
|
fcreds.write('AWSAccessKeyId=foo\nAWSSecretKey=bar\n'.encode('UTF-8'))
|
|
fcreds.flush()
|
|
|
|
main_conf = tempfile.NamedTemporaryFile()
|
|
main_conf.write(('''[main]
|
|
stack=teststack
|
|
credential-file=%s
|
|
region=region1
|
|
interval=120''' % fcreds.name).encode('UTF-8'))
|
|
main_conf.flush()
|
|
|
|
mainconfig = cfn_helper.HupConfig([
|
|
open(main_conf.name),
|
|
open(hooks_conf.name)])
|
|
unique_resources = mainconfig.unique_resources_get()
|
|
self.assertThat([
|
|
'resource',
|
|
'resource1',
|
|
'resource2',
|
|
'resource3',
|
|
], ttm.Equals(sorted(unique_resources)))
|
|
|
|
hooks = sorted(mainconfig.hooks,
|
|
key=lambda hook: hook.resource_name_get())
|
|
self.assertEqual(len(hooks), 4)
|
|
self.assertEqual(
|
|
'{cfn-http-restarted, service.restarted,'
|
|
' Resources.resource.Metadata, root, /bin/cfn-http-restarted}',
|
|
str(hooks[0]))
|
|
self.assertEqual(
|
|
'{hook1, service1.restarted, Resources.resource1.Metadata,'
|
|
' root, /bin/hook1}', str(hooks[1]))
|
|
self.assertEqual(
|
|
'{hook2, service2.restarted, Resources.resource2.Metadata,'
|
|
' root, /bin/hook2}', str(hooks[2]))
|
|
self.assertEqual(
|
|
'{hook3, service3.restarted, Resources.resource3.Metadata,'
|
|
' root, /bin/hook3}', str(hooks[3]))
|
|
|
|
calls = []
|
|
calls.extend(popen_root_calls(['/bin/cfn-http-restarted'], shell=True))
|
|
calls.extend(popen_root_calls(['/bin/hook1'], shell=True))
|
|
calls.extend(popen_root_calls(['/bin/hook2'], shell=True))
|
|
calls.extend(popen_root_calls(['/bin/hook3'], shell=True))
|
|
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.return_value = FakePOpen('All good')
|
|
|
|
for hook in hooks:
|
|
hook.event(hook.triggers, None, hook.resource_name_get())
|
|
|
|
hooks_conf.close()
|
|
fcreds.close()
|
|
main_conf.close()
|
|
mock_popen.assert_has_calls(calls)
|
|
|
|
|
|
class TestCfnHelper(testtools.TestCase):
|
|
|
|
def _check_metadata_content(self, content, value):
|
|
with tempfile.NamedTemporaryFile() as metadata_info:
|
|
metadata_info.write(content.encode('UTF-8'))
|
|
metadata_info.flush()
|
|
port = cfn_helper.metadata_server_port(metadata_info.name)
|
|
self.assertEqual(value, port)
|
|
|
|
def test_metadata_server_port(self):
|
|
self._check_metadata_content("http://172.20.42.42:8000\n", 8000)
|
|
|
|
def test_metadata_server_port_https(self):
|
|
self._check_metadata_content("https://abc.foo.bar:6969\n", 6969)
|
|
|
|
def test_metadata_server_port_noport(self):
|
|
self._check_metadata_content("http://172.20.42.42\n", None)
|
|
|
|
def test_metadata_server_port_justip(self):
|
|
self._check_metadata_content("172.20.42.42", None)
|
|
|
|
def test_metadata_server_port_weird(self):
|
|
self._check_metadata_content("::::", None)
|
|
self._check_metadata_content("beforecolons:aftercolons", None)
|
|
|
|
def test_metadata_server_port_emptyfile(self):
|
|
self._check_metadata_content("\n", None)
|
|
self._check_metadata_content("", None)
|
|
|
|
def test_metadata_server_nofile(self):
|
|
random_filename = self.getUniqueString()
|
|
self.assertIsNone(cfn_helper.metadata_server_port(random_filename))
|
|
|
|
def test_to_boolean(self):
|
|
self.assertTrue(cfn_helper.to_boolean(True))
|
|
self.assertTrue(cfn_helper.to_boolean('true'))
|
|
self.assertTrue(cfn_helper.to_boolean('yes'))
|
|
self.assertTrue(cfn_helper.to_boolean('1'))
|
|
self.assertTrue(cfn_helper.to_boolean(1))
|
|
|
|
self.assertFalse(cfn_helper.to_boolean(False))
|
|
self.assertFalse(cfn_helper.to_boolean('false'))
|
|
self.assertFalse(cfn_helper.to_boolean('no'))
|
|
self.assertFalse(cfn_helper.to_boolean('0'))
|
|
self.assertFalse(cfn_helper.to_boolean(0))
|
|
self.assertFalse(cfn_helper.to_boolean(None))
|
|
self.assertFalse(cfn_helper.to_boolean('fingle'))
|
|
|
|
def test_parse_creds_file(self):
|
|
def parse_creds_test(file_contents, creds_match):
|
|
with tempfile.NamedTemporaryFile(mode='w') as fcreds:
|
|
fcreds.write(file_contents)
|
|
fcreds.flush()
|
|
creds = cfn_helper.parse_creds_file(fcreds.name)
|
|
self.assertThat(creds_match, ttm.Equals(creds))
|
|
parse_creds_test(
|
|
'AWSAccessKeyId=foo\nAWSSecretKey=bar\n',
|
|
{'AWSAccessKeyId': 'foo', 'AWSSecretKey': 'bar'}
|
|
)
|
|
parse_creds_test(
|
|
'AWSAccessKeyId =foo\nAWSSecretKey= bar\n',
|
|
{'AWSAccessKeyId': 'foo', 'AWSSecretKey': 'bar'}
|
|
)
|
|
parse_creds_test(
|
|
'AWSAccessKeyId = foo\nAWSSecretKey = bar\n',
|
|
{'AWSAccessKeyId': 'foo', 'AWSSecretKey': 'bar'}
|
|
)
|
|
|
|
|
|
class TestMetadataRetrieve(testtools.TestCase):
|
|
|
|
def setUp(self):
|
|
super(TestMetadataRetrieve, self).setUp()
|
|
self.tdir = self.useFixture(fixtures.TempDir())
|
|
self.last_file = os.path.join(self.tdir.path, 'last_metadata')
|
|
|
|
def test_metadata_retrieve_files(self):
|
|
|
|
md_data = {"AWS::CloudFormation::Init": {"config": {"files": {
|
|
"/tmp/foo": {"content": "bar"}}}}}
|
|
md_str = json.dumps(md_data)
|
|
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
|
|
with tempfile.NamedTemporaryFile(mode='w+') as default_file:
|
|
default_file.write(md_str)
|
|
default_file.flush()
|
|
self.assertThat(default_file.name, ttm.FileContains(md_str))
|
|
|
|
self.assertTrue(
|
|
md.retrieve(default_path=default_file.name,
|
|
last_path=self.last_file))
|
|
|
|
self.assertThat(self.last_file, ttm.FileContains(md_str))
|
|
self.assertThat(md_data, ttm.Equals(md._metadata))
|
|
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
self.assertTrue(md.retrieve(default_path=default_file.name,
|
|
last_path=self.last_file))
|
|
self.assertThat(md_data, ttm.Equals(md._metadata))
|
|
|
|
def test_metadata_retrieve_none(self):
|
|
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
default_file = os.path.join(self.tdir.path, 'default_file')
|
|
|
|
self.assertFalse(md.retrieve(default_path=default_file,
|
|
last_path=self.last_file))
|
|
self.assertIsNone(md._metadata)
|
|
|
|
displayed = self.useFixture(fixtures.StringStream('stdout'))
|
|
fake_stdout = displayed.stream
|
|
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
|
|
md.display()
|
|
fake_stdout.flush()
|
|
self.assertEqual(displayed.getDetails()['stdout'].as_text(), "")
|
|
|
|
def test_metadata_retrieve_passed(self):
|
|
|
|
md_data = {"AWS::CloudFormation::Init": {"config": {"files": {
|
|
"/tmp/foo": {"content": "bar"}}}}}
|
|
md_str = json.dumps(md_data)
|
|
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
self.assertTrue(md.retrieve(meta_str=md_data,
|
|
last_path=self.last_file))
|
|
self.assertThat(md_data, ttm.Equals(md._metadata))
|
|
self.assertEqual(md_str, str(md))
|
|
|
|
displayed = self.useFixture(fixtures.StringStream('stdout'))
|
|
fake_stdout = displayed.stream
|
|
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
|
|
md.display()
|
|
fake_stdout.flush()
|
|
self.assertEqual(displayed.getDetails()['stdout'].as_text(),
|
|
"{\"AWS::CloudFormation::Init\": {\"config\": {"
|
|
"\"files\": {\"/tmp/foo\": {\"content\": \"bar\"}"
|
|
"}}}}\n")
|
|
|
|
def test_metadata_retrieve_by_key_passed(self):
|
|
|
|
md_data = {"foo": {"bar": {"fred.1": "abcd"}}}
|
|
md_str = json.dumps(md_data)
|
|
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
self.assertTrue(md.retrieve(meta_str=md_data,
|
|
last_path=self.last_file))
|
|
self.assertThat(md_data, ttm.Equals(md._metadata))
|
|
self.assertEqual(md_str, str(md))
|
|
|
|
displayed = self.useFixture(fixtures.StringStream('stdout'))
|
|
fake_stdout = displayed.stream
|
|
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
|
|
md.display("foo")
|
|
fake_stdout.flush()
|
|
self.assertEqual(displayed.getDetails()['stdout'].as_text(),
|
|
"{\"bar\": {\"fred.1\": \"abcd\"}}\n")
|
|
|
|
def test_metadata_retrieve_by_nested_key_passed(self):
|
|
|
|
md_data = {"foo": {"bar": {"fred.1": "abcd"}}}
|
|
md_str = json.dumps(md_data)
|
|
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
self.assertTrue(md.retrieve(meta_str=md_data,
|
|
last_path=self.last_file))
|
|
self.assertThat(md_data, ttm.Equals(md._metadata))
|
|
self.assertEqual(md_str, str(md))
|
|
|
|
displayed = self.useFixture(fixtures.StringStream('stdout'))
|
|
fake_stdout = displayed.stream
|
|
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
|
|
md.display("foo.bar.'fred.1'")
|
|
fake_stdout.flush()
|
|
self.assertEqual(displayed.getDetails()['stdout'].as_text(),
|
|
'"abcd"\n')
|
|
|
|
def test_metadata_retrieve_key_none(self):
|
|
|
|
md_data = {"AWS::CloudFormation::Init": {"config": {"files": {
|
|
"/tmp/foo": {"content": "bar"}}}}}
|
|
md_str = json.dumps(md_data)
|
|
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
self.assertTrue(md.retrieve(meta_str=md_data,
|
|
last_path=self.last_file))
|
|
self.assertThat(md_data, ttm.Equals(md._metadata))
|
|
self.assertEqual(md_str, str(md))
|
|
|
|
displayed = self.useFixture(fixtures.StringStream('stdout'))
|
|
fake_stdout = displayed.stream
|
|
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
|
|
md.display("no_key")
|
|
fake_stdout.flush()
|
|
self.assertEqual(displayed.getDetails()['stdout'].as_text(), "")
|
|
|
|
def test_metadata_retrieve_by_nested_key_none(self):
|
|
|
|
md_data = {"foo": {"bar": {"fred.1": "abcd"}}}
|
|
md_str = json.dumps(md_data)
|
|
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
self.assertTrue(md.retrieve(meta_str=md_data,
|
|
last_path=self.last_file))
|
|
self.assertThat(md_data, ttm.Equals(md._metadata))
|
|
self.assertEqual(md_str, str(md))
|
|
|
|
displayed = self.useFixture(fixtures.StringStream('stdout'))
|
|
fake_stdout = displayed.stream
|
|
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
|
|
md.display("foo.fred")
|
|
fake_stdout.flush()
|
|
self.assertEqual(displayed.getDetails()['stdout'].as_text(), "")
|
|
|
|
def test_metadata_retrieve_by_nested_key_none_with_matching_string(self):
|
|
|
|
md_data = {"foo": "bar"}
|
|
md_str = json.dumps(md_data)
|
|
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
self.assertTrue(md.retrieve(meta_str=md_data,
|
|
last_path=self.last_file))
|
|
self.assertThat(md_data, ttm.Equals(md._metadata))
|
|
self.assertEqual(md_str, str(md))
|
|
|
|
displayed = self.useFixture(fixtures.StringStream('stdout'))
|
|
fake_stdout = displayed.stream
|
|
self.useFixture(fixtures.MonkeyPatch('sys.stdout', fake_stdout))
|
|
md.display("foo.bar")
|
|
fake_stdout.flush()
|
|
self.assertEqual(displayed.getDetails()['stdout'].as_text(), "")
|
|
|
|
def test_metadata_creates_cache(self):
|
|
temp_home = tempfile.mkdtemp()
|
|
|
|
def cleanup_temp_home(thome):
|
|
os.unlink(os.path.join(thome, 'cache', 'last_metadata'))
|
|
os.rmdir(os.path.join(thome, 'cache'))
|
|
os.rmdir(os.path.join(thome))
|
|
|
|
self.addCleanup(cleanup_temp_home, temp_home)
|
|
|
|
last_path = os.path.join(temp_home, 'cache', 'last_metadata')
|
|
md_data = {"AWS::CloudFormation::Init": {"config": {"files": {
|
|
"/tmp/foo": {"content": "bar"}}}}}
|
|
md_str = json.dumps(md_data)
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
|
|
self.assertFalse(os.path.exists(last_path),
|
|
"last_metadata file already exists")
|
|
self.assertTrue(md.retrieve(meta_str=md_str, last_path=last_path))
|
|
self.assertTrue(os.path.exists(last_path),
|
|
"last_metadata file should exist")
|
|
# Ensure created dirs and file have right perms
|
|
self.assertTrue(os.stat(last_path).st_mode & 0o600 == 0o600)
|
|
self.assertTrue(
|
|
os.stat(os.path.dirname(last_path)).st_mode & 0o700 == 0o700)
|
|
|
|
def test_is_valid_metadata(self):
|
|
md_data = {"AWS::CloudFormation::Init": {"config": {"files": {
|
|
"/tmp/foo": {"content": "bar"}}}}}
|
|
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
self.assertTrue(
|
|
md.retrieve(meta_str=md_data, last_path=self.last_file))
|
|
|
|
self.assertThat(md_data, ttm.Equals(md._metadata))
|
|
self.assertTrue(md._is_valid_metadata())
|
|
self.assertThat(
|
|
md_data['AWS::CloudFormation::Init'], ttm.Equals(md._metadata))
|
|
|
|
def test_remote_metadata(self):
|
|
md_data = {"AWS::CloudFormation::Init": {"config": {"files": {
|
|
"/tmp/foo": {"content": "bar"}}}}}
|
|
|
|
with mock.patch.object(
|
|
cfn.CloudFormationConnection, 'describe_stack_resource'
|
|
) as mock_dsr:
|
|
mock_dsr.return_value = {
|
|
'DescribeStackResourceResponse': {
|
|
'DescribeStackResourceResult': {
|
|
'StackResourceDetail': {'Metadata': md_data}}}}
|
|
md = cfn_helper.Metadata(
|
|
'teststack',
|
|
None,
|
|
access_key='foo',
|
|
secret_key='bar')
|
|
self.assertTrue(md.retrieve(last_path=self.last_file))
|
|
self.assertThat(md_data, ttm.Equals(md._metadata))
|
|
|
|
with tempfile.NamedTemporaryFile(mode='w') as fcreds:
|
|
fcreds.write('AWSAccessKeyId=foo\nAWSSecretKey=bar\n')
|
|
fcreds.flush()
|
|
md = cfn_helper.Metadata(
|
|
'teststack', None, credentials_file=fcreds.name)
|
|
self.assertTrue(md.retrieve(last_path=self.last_file))
|
|
self.assertThat(md_data, ttm.Equals(md._metadata))
|
|
|
|
def test_nova_meta_with_cache(self):
|
|
meta_in = {"uuid": "f9431d18-d971-434d-9044-5b38f5b4646f",
|
|
"availability_zone": "nova",
|
|
"hostname": "as-wikidatabase-4ykioj3lgi57.novalocal",
|
|
"launch_index": 0,
|
|
"meta": {},
|
|
"public_keys": {"heat_key": "ssh-rsa etc...\n"},
|
|
"name": "as-WikiDatabase-4ykioj3lgi57"}
|
|
md_str = json.dumps(meta_in)
|
|
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
with tempfile.NamedTemporaryFile(mode='w+') as default_file:
|
|
default_file.write(md_str)
|
|
default_file.flush()
|
|
self.assertThat(default_file.name, ttm.FileContains(md_str))
|
|
meta_out = md.get_nova_meta(cache_path=default_file.name)
|
|
|
|
self.assertEqual(meta_in, meta_out)
|
|
|
|
@mock.patch.object(cfn_helper, 'controlled_privileges')
|
|
def test_nova_meta_curl(self, mock_cp):
|
|
url = 'http://169.254.169.254/openstack/2012-08-10/meta_data.json'
|
|
temp_home = tempfile.mkdtemp()
|
|
cache_path = os.path.join(temp_home, 'meta_data.json')
|
|
|
|
def cleanup_temp_home(thome):
|
|
os.unlink(cache_path)
|
|
os.rmdir(thome)
|
|
|
|
self.addCleanup(cleanup_temp_home, temp_home)
|
|
|
|
meta_in = {"uuid": "f9431d18-d971-434d-9044-5b38f5b4646f",
|
|
"availability_zone": "nova",
|
|
"hostname": "as-wikidatabase-4ykioj3lgi57.novalocal",
|
|
"launch_index": 0,
|
|
"meta": {"freddy": "is hungry"},
|
|
"public_keys": {"heat_key": "ssh-rsa etc...\n"},
|
|
"name": "as-WikiDatabase-4ykioj3lgi57"}
|
|
md_str = json.dumps(meta_in)
|
|
|
|
def write_cache_file(*params, **kwargs):
|
|
with open(cache_path, 'w+') as cache_file:
|
|
cache_file.write(md_str)
|
|
cache_file.flush()
|
|
self.assertThat(cache_file.name, ttm.FileContains(md_str))
|
|
return FakePOpen('Downloaded', '', 0)
|
|
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.side_effect = write_cache_file
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
meta_out = md.get_nova_meta(cache_path=cache_path)
|
|
self.assertEqual(meta_in, meta_out)
|
|
mock_popen.assert_has_calls(
|
|
popen_root_calls([['curl', '-o', cache_path, url]]))
|
|
|
|
@mock.patch.object(cfn_helper, 'controlled_privileges')
|
|
def test_nova_meta_curl_corrupt(self, mock_cp):
|
|
url = 'http://169.254.169.254/openstack/2012-08-10/meta_data.json'
|
|
temp_home = tempfile.mkdtemp()
|
|
cache_path = os.path.join(temp_home, 'meta_data.json')
|
|
|
|
def cleanup_temp_home(thome):
|
|
os.unlink(cache_path)
|
|
os.rmdir(thome)
|
|
|
|
self.addCleanup(cleanup_temp_home, temp_home)
|
|
|
|
md_str = "this { is not really json"
|
|
|
|
def write_cache_file(*params, **kwargs):
|
|
with open(cache_path, 'w+') as cache_file:
|
|
cache_file.write(md_str)
|
|
cache_file.flush()
|
|
self.assertThat(cache_file.name, ttm.FileContains(md_str))
|
|
return FakePOpen('Downloaded', '', 0)
|
|
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.side_effect = write_cache_file
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
meta_out = md.get_nova_meta(cache_path=cache_path)
|
|
self.assertIsNone(meta_out)
|
|
mock_popen.assert_has_calls(
|
|
popen_root_calls([['curl', '-o', cache_path, url]]))
|
|
|
|
@mock.patch.object(cfn_helper, 'controlled_privileges')
|
|
def test_nova_meta_curl_failed(self, mock_cp):
|
|
url = 'http://169.254.169.254/openstack/2012-08-10/meta_data.json'
|
|
temp_home = tempfile.mkdtemp()
|
|
cache_path = os.path.join(temp_home, 'meta_data.json')
|
|
|
|
def cleanup_temp_home(thome):
|
|
os.rmdir(thome)
|
|
|
|
self.addCleanup(cleanup_temp_home, temp_home)
|
|
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.return_value = FakePOpen('Failed', '', 1)
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
meta_out = md.get_nova_meta(cache_path=cache_path)
|
|
self.assertIsNone(meta_out)
|
|
mock_popen.assert_has_calls(
|
|
popen_root_calls([['curl', '-o', cache_path, url]]))
|
|
|
|
def test_get_tags(self):
|
|
fake_tags = {'foo': 'fee',
|
|
'apple': 'red'}
|
|
md_data = {"uuid": "f9431d18-d971-434d-9044-5b38f5b4646f",
|
|
"availability_zone": "nova",
|
|
"hostname": "as-wikidatabase-4ykioj3lgi57.novalocal",
|
|
"launch_index": 0,
|
|
"meta": fake_tags,
|
|
"public_keys": {"heat_key": "ssh-rsa etc...\n"},
|
|
"name": "as-WikiDatabase-4ykioj3lgi57"}
|
|
tags_expect = fake_tags
|
|
tags_expect['InstanceId'] = md_data['uuid']
|
|
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
|
|
with mock.patch.object(md, 'get_nova_meta') as mock_method:
|
|
mock_method.return_value = md_data
|
|
tags = md.get_tags()
|
|
mock_method.assert_called_once_with()
|
|
|
|
self.assertEqual(tags_expect, tags)
|
|
|
|
def test_get_instance_id(self):
|
|
uuid = "f9431d18-d971-434d-9044-5b38f5b4646f"
|
|
md_data = {"uuid": uuid,
|
|
"availability_zone": "nova",
|
|
"hostname": "as-wikidatabase-4ykioj3lgi57.novalocal",
|
|
"launch_index": 0,
|
|
"public_keys": {"heat_key": "ssh-rsa etc...\n"},
|
|
"name": "as-WikiDatabase-4ykioj3lgi57"}
|
|
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
|
|
with mock.patch.object(md, 'get_nova_meta') as mock_method:
|
|
mock_method.return_value = md_data
|
|
self.assertEqual(md.get_instance_id(), uuid)
|
|
mock_method.assert_called_once_with()
|
|
|
|
|
|
class TestCfnInit(testtools.TestCase):
|
|
|
|
def setUp(self):
|
|
super(TestCfnInit, self).setUp()
|
|
self.tdir = self.useFixture(fixtures.TempDir())
|
|
self.last_file = os.path.join(self.tdir.path, 'last_metadata')
|
|
|
|
def test_cfn_init(self):
|
|
|
|
with tempfile.NamedTemporaryFile(mode='w+') as foo_file:
|
|
md_data = {"AWS::CloudFormation::Init": {"config": {"files": {
|
|
foo_file.name: {"content": "bar"}}}}}
|
|
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
self.assertTrue(
|
|
md.retrieve(meta_str=md_data, last_path=self.last_file))
|
|
md.cfn_init()
|
|
self.assertThat(foo_file.name, ttm.FileContains('bar'))
|
|
|
|
@mock.patch.object(cfn_helper, 'controlled_privileges')
|
|
def test_cfn_init_with_ignore_errors_false(self, mock_cp):
|
|
md_data = {"AWS::CloudFormation::Init": {"config": {"commands": {
|
|
"00_foo": {"command": "/bin/command1",
|
|
"ignoreErrors": "false"}}}}}
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.return_value = FakePOpen('Doing something', 'error', -1)
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
self.assertTrue(
|
|
md.retrieve(meta_str=md_data, last_path=self.last_file))
|
|
self.assertRaises(cfn_helper.CommandsHandlerRunError, md.cfn_init)
|
|
mock_popen.assert_has_calls(popen_root_calls(['/bin/command1'],
|
|
shell=True))
|
|
|
|
@mock.patch.object(cfn_helper, 'controlled_privileges')
|
|
def test_cfn_init_with_ignore_errors_true(self, mock_cp):
|
|
calls = []
|
|
returns = []
|
|
calls.extend(popen_root_calls(['/bin/command1'], shell=True))
|
|
returns.append(FakePOpen('Doing something', 'error', -1))
|
|
calls.extend(popen_root_calls(['/bin/command2'], shell=True))
|
|
returns.append(FakePOpen('All good'))
|
|
|
|
md_data = {"AWS::CloudFormation::Init": {"config": {"commands": {
|
|
"00_foo": {"command": "/bin/command1",
|
|
"ignoreErrors": "true"},
|
|
"01_bar": {"command": "/bin/command2",
|
|
"ignoreErrors": "false"}
|
|
}}}}
|
|
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.side_effect = returns
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
self.assertTrue(
|
|
md.retrieve(meta_str=md_data, last_path=self.last_file))
|
|
md.cfn_init()
|
|
mock_popen.assert_has_calls(calls)
|
|
|
|
@mock.patch.object(cfn_helper, 'controlled_privileges')
|
|
def test_cfn_init_runs_list_commands_without_shell(self, mock_cp):
|
|
calls = []
|
|
returns = []
|
|
# command supplied as list shouldn't run on shell
|
|
calls.extend(popen_root_calls([['/bin/command1', 'arg']], shell=False))
|
|
returns.append(FakePOpen('Doing something'))
|
|
# command supplied as string should run on shell
|
|
calls.extend(popen_root_calls(['/bin/command2'], shell=True))
|
|
returns.append(FakePOpen('All good'))
|
|
|
|
md_data = {"AWS::CloudFormation::Init": {"config": {"commands": {
|
|
"00_foo": {"command": ["/bin/command1", "arg"]},
|
|
"01_bar": {"command": "/bin/command2"}
|
|
}}}}
|
|
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.side_effect = returns
|
|
md = cfn_helper.Metadata('teststack', None)
|
|
self.assertTrue(
|
|
md.retrieve(meta_str=md_data, last_path=self.last_file))
|
|
md.cfn_init()
|
|
mock_popen.assert_has_calls(calls)
|
|
|
|
|
|
class TestSourcesHandler(testtools.TestCase):
|
|
def test_apply_sources_empty(self):
|
|
sh = cfn_helper.SourcesHandler({})
|
|
sh.apply_sources()
|
|
|
|
def _test_apply_sources(self, url, end_file):
|
|
dest = tempfile.mkdtemp()
|
|
self.addCleanup(os.rmdir, dest)
|
|
sources = {dest: url}
|
|
td = os.path.dirname(end_file)
|
|
er = "mkdir -p '%s'; cd '%s'; curl -s '%s' | gunzip | tar -xvf -"
|
|
calls = popen_root_calls([er % (dest, dest, url)], shell=True)
|
|
|
|
with mock.patch.object(tempfile, 'mkdtemp') as mock_mkdtemp:
|
|
mock_mkdtemp.return_value = td
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.return_value = FakePOpen('Curl good')
|
|
sh = cfn_helper.SourcesHandler(sources)
|
|
sh.apply_sources()
|
|
mock_popen.assert_has_calls(calls)
|
|
mock_mkdtemp.assert_called_with()
|
|
|
|
@mock.patch.object(cfn_helper, 'controlled_privileges')
|
|
def test_apply_sources_github(self, mock_cp):
|
|
url = "https://github.com/NoSuchProject/tarball/NoSuchTarball"
|
|
dest = tempfile.mkdtemp()
|
|
self.addCleanup(os.rmdir, dest)
|
|
sources = {dest: url}
|
|
er = "mkdir -p '%s'; cd '%s'; curl -s '%s' | gunzip | tar -xvf -"
|
|
calls = popen_root_calls([er % (dest, dest, url)], shell=True)
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.return_value = FakePOpen('Curl good')
|
|
sh = cfn_helper.SourcesHandler(sources)
|
|
sh.apply_sources()
|
|
mock_popen.assert_has_calls(calls)
|
|
|
|
@mock.patch.object(cfn_helper, 'controlled_privileges')
|
|
def test_apply_sources_general(self, mock_cp):
|
|
url = "https://website.no.existe/a/b/c/file.tar.gz"
|
|
dest = tempfile.mkdtemp()
|
|
self.addCleanup(os.rmdir, dest)
|
|
sources = {dest: url}
|
|
er = "mkdir -p '%s'; cd '%s'; curl -s '%s' | gunzip | tar -xvf -"
|
|
calls = popen_root_calls([er % (dest, dest, url)], shell=True)
|
|
with mock.patch('subprocess.Popen') as mock_popen:
|
|
mock_popen.return_value = FakePOpen('Curl good')
|
|
sh = cfn_helper.SourcesHandler(sources)
|
|
sh.apply_sources()
|
|
mock_popen.assert_has_calls(calls)
|
|
|
|
def test_apply_source_cmd(self):
|
|
sh = cfn_helper.SourcesHandler({})
|
|
er = "mkdir -p '%s'; cd '%s'; curl -s '%s' | %s | tar -xvf -"
|
|
dest = '/tmp'
|
|
# test tgz
|
|
url = 'http://www.example.com/a.tgz'
|
|
cmd = sh._apply_source_cmd(dest, url)
|
|
self.assertEqual(er % (dest, dest, url, "gunzip"), cmd)
|
|
# test tar.gz
|
|
url = 'http://www.example.com/a.tar.gz'
|
|
cmd = sh._apply_source_cmd(dest, url)
|
|
self.assertEqual(er % (dest, dest, url, "gunzip"), cmd)
|
|
# test github - tarball 1
|
|
url = 'https://github.com/openstack/heat-cfntools/tarball/master'
|
|
cmd = sh._apply_source_cmd(dest, url)
|
|
self.assertEqual(er % (dest, dest, url, "gunzip"), cmd)
|
|
# test github - tarball 2
|
|
url = 'https://github.com/openstack/heat-cfntools/tarball/master/'
|
|
cmd = sh._apply_source_cmd(dest, url)
|
|
self.assertEqual(er % (dest, dest, url, "gunzip"), cmd)
|
|
# test tbz2
|
|
url = 'http://www.example.com/a.tbz2'
|
|
cmd = sh._apply_source_cmd(dest, url)
|
|
self.assertEqual(er % (dest, dest, url, "bunzip2"), cmd)
|
|
# test tar.bz2
|
|
url = 'http://www.example.com/a.tar.bz2'
|
|
cmd = sh._apply_source_cmd(dest, url)
|
|
self.assertEqual(er % (dest, dest, url, "bunzip2"), cmd)
|
|
# test zip
|
|
er = "mkdir -p '%s'; cd '%s'; curl -s -o '%s' '%s' && unzip -o '%s'"
|
|
url = 'http://www.example.com/a.zip'
|
|
d = "/tmp/tmp2I0yNK"
|
|
tmp = "%s/a.zip" % d
|
|
with mock.patch.object(tempfile, 'mkdtemp') as mock_mkdtemp:
|
|
mock_mkdtemp.return_value = d
|
|
|
|
cmd = sh._apply_source_cmd(dest, url)
|
|
self.assertEqual(er % (dest, dest, tmp, url, tmp), cmd)
|
|
# test gz
|
|
er = "mkdir -p '%s'; cd '%s'; curl -s '%s' | %s > '%s'"
|
|
url = 'http://www.example.com/a.sh.gz'
|
|
cmd = sh._apply_source_cmd(dest, url)
|
|
self.assertEqual(er % (dest, dest, url, "gunzip", "a.sh"), cmd)
|
|
# test bz2
|
|
url = 'http://www.example.com/a.sh.bz2'
|
|
cmd = sh._apply_source_cmd(dest, url)
|
|
self.assertEqual(er % (dest, dest, url, "bunzip2", "a.sh"), cmd)
|
|
# test other
|
|
url = 'http://www.example.com/a.sh'
|
|
cmd = sh._apply_source_cmd(dest, url)
|
|
self.assertEqual("", cmd)
|
|
mock_mkdtemp.assert_called_with()
|