Refactor loguserdata.py so it can be tested.

- Use distutils.version.LooseVersion for cloud-init version check
- Fix bug 1100287 by setting the following modes:
  - 0600 /var/log/heat-provision.log
  - 0700 /var/lib/heat
  - 0700 /var/lib/cloud/data/cfn-userdata (was 0111!)
- Full test coverage except for where __name__ == '__main__'
- File size has gone from 1218 bytes to 1636. If necessary we could reduce size in the future by using short names

This works for me when launching a template. At least if there are any regressions they can have a test written for the fix.
Change-Id: I04e773a743ec210e90394e50d2bb70c70664e80e
This commit is contained in:
Steve Baker 2013-02-05 10:10:11 +13:00
parent ed14ca1426
commit c598d0d3e8
3 changed files with 202 additions and 21 deletions

View File

View File

@ -2,40 +2,64 @@
import sys
import os
import stat
import subprocess
import datetime
import pkg_resources
from distutils.version import LooseVersion
import errno
path = '/var/lib/cloud/data'
ci_version = pkg_resources.get_distribution('cloud-init').version.split('.')
if ci_version[0] <= 0 and ci_version[1] < 6:
# pre 0.6.0 - user data executed via cloudinit, not this helper
with open('/var/log/heat-provision.log', 'w') as log:
def chk_ci_version():
v = LooseVersion(pkg_resources.get_distribution('cloud-init').version)
return v >= LooseVersion('0.6.0')
def create_log(path):
fd = os.open(path, os.O_WRONLY | os.O_CREAT, 0600)
return os.fdopen(fd, 'w')
def call(args, log):
log.write('%s\n' % ' '.join(args))
log.flush()
p = subprocess.Popen(args, stdout=log, stderr=log)
p.wait()
return p.returncode
def main(log):
if not chk_ci_version():
# pre 0.6.0 - user data executed via cloudinit, not this helper
log.write('Unable to log provisioning, need a newer version of'
' cloud-init\n')
sys.exit(0)
return -1
os.chmod(os.path.join(path, 'cfn-userdata'),
stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
userdata_path = os.path.join(path, 'cfn-userdata')
os.chmod(userdata_path, 0700)
with open('/var/log/heat-provision.log', 'w') as log:
log.write('Provision began: %s\n' % datetime.datetime.now())
log.flush()
p = subprocess.Popen(os.path.join(path, 'cfn-userdata'),
stdout=log, stderr=log)
p.wait()
returncode = call([userdata_path], log)
log.write('Provision done: %s\n' % datetime.datetime.now())
if p.returncode:
sys.exit(p.returncode)
if returncode:
return returncode
try:
os.makedirs('/var/lib/heat')
except OSError as e:
if e.errno != errno.EEXIST:
raise
try:
os.makedirs('/var/lib/heat', 0700)
except OSError as e:
if e.errno != errno.EEXIST:
raise
with open('/var/lib/heat/provision-finished', 'w') as log:
log.write('%s\n' % datetime.datetime.now())
if __name__ == '__main__':
with create_log('/var/log/heat-provision.log') as log:
returncode = main(log)
if returncode:
log.write('Provision failed')
sys.exit(returncode)
with create_log('/var/lib/heat/provision-finished') as log:
log.write('%s\n' % datetime.datetime.now())

View File

@ -0,0 +1,157 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mox
import os
import pkg_resources
import subprocess
import unittest
import stat
import StringIO
from nose.plugins.attrib import attr
import heat.cloudinit.loguserdata as loguserdata
class FakeCiVersion():
def __init__(self, version=None):
self.version = version
class FakePOpen():
def __init__(self, returncode=0):
self.returncode = returncode
def wait(self):
pass
@attr(tag=['unit'])
@attr(speed='fast')
class LoguserdataTest(unittest.TestCase):
def setUp(self):
self.m = mox.Mox()
self.m.StubOutWithMock(pkg_resources, 'get_distribution')
self.m.StubOutWithMock(subprocess, 'Popen')
self.m.StubOutWithMock(os, 'chmod')
self.m.StubOutWithMock(os, 'makedirs')
def tearDown(self):
self.m.UnsetStubs()
def test_ci_version(self):
# too old versions
pkg_resources.get_distribution('cloud-init').AndReturn(
FakeCiVersion('0.5.0'))
pkg_resources.get_distribution('cloud-init').AndReturn(
FakeCiVersion('0.5.9'))
# new enough versions
pkg_resources.get_distribution('cloud-init').AndReturn(
FakeCiVersion('0.6.0'))
pkg_resources.get_distribution('cloud-init').AndReturn(
FakeCiVersion('0.7.0'))
pkg_resources.get_distribution('cloud-init').AndReturn(
FakeCiVersion('1.0'))
pkg_resources.get_distribution('cloud-init').AndReturn(
FakeCiVersion('2.0'))
self.m.ReplayAll()
self.assertFalse(loguserdata.chk_ci_version())
self.assertFalse(loguserdata.chk_ci_version())
self.assertTrue(loguserdata.chk_ci_version())
self.assertTrue(loguserdata.chk_ci_version())
self.assertTrue(loguserdata.chk_ci_version())
self.assertTrue(loguserdata.chk_ci_version())
self.m.VerifyAll()
def test_call(self):
log = StringIO.StringIO()
subprocess.Popen(
['echo', 'hi'],
stderr=log,
stdout=log).AndReturn(FakePOpen(0))
self.m.ReplayAll()
self.assertEqual(0, loguserdata.call(['echo', 'hi'], log))
self.m.VerifyAll()
def test_create_log(self):
log_name = os.tmpnam()
with loguserdata.create_log(log_name) as log:
log.write('testing')
log = open(log_name, 'r')
self.assertEqual('testing', log.read())
mode = os.stat(log_name).st_mode
self.assertEqual(0600, stat.S_IMODE(mode))
def test_main(self):
log = StringIO.StringIO()
pkg_resources.get_distribution('cloud-init').AndReturn(
FakeCiVersion('0.7.0'))
os.chmod('/var/lib/cloud/data/cfn-userdata', 0700).AndReturn(None)
subprocess.Popen(
['/var/lib/cloud/data/cfn-userdata'],
stderr=log,
stdout=log).AndReturn(FakePOpen(0))
os.makedirs('/var/lib/heat', 0700).AndReturn(None)
self.m.ReplayAll()
loguserdata.main(log)
self.m.VerifyAll()
def test_main_fails(self):
log = StringIO.StringIO()
#fail on ci version
pkg_resources.get_distribution('cloud-init').AndReturn(
FakeCiVersion('0.5.0'))
#fail on execute cfn-userdata
pkg_resources.get_distribution('cloud-init').AndReturn(
FakeCiVersion('0.7.0'))
os.chmod('/var/lib/cloud/data/cfn-userdata', 0700).AndReturn(None)
subprocess.Popen(
['/var/lib/cloud/data/cfn-userdata'],
stderr=log,
stdout=log).AndReturn(FakePOpen(-2))
#fail on create directories
pkg_resources.get_distribution('cloud-init').AndReturn(
FakeCiVersion('0.7.0'))
os.chmod('/var/lib/cloud/data/cfn-userdata', 0700).AndReturn(None)
subprocess.Popen(
['/var/lib/cloud/data/cfn-userdata'],
stderr=log,
stdout=log).AndReturn(FakePOpen(0))
os.makedirs('/var/lib/heat', 0700).AndRaise(OSError())
self.m.ReplayAll()
self.assertEqual(-1, loguserdata.main(log))
self.assertEqual(-2, loguserdata.main(log))
self.assertRaises(OSError, loguserdata.main, log)
self.m.VerifyAll()