Add a bunch of new merging test files + runner.

This commit is contained in:
Joshua Harlow 2013-04-23 22:53:53 -07:00
parent c7e659f466
commit 7ea735dd35
13 changed files with 94 additions and 137 deletions

View File

@ -0,0 +1 @@
Blah: ['blah2', 'b']

View File

@ -0,0 +1,3 @@
Blah: 3
Blah2: 2
Blah3: [1]

View File

@ -0,0 +1 @@
Blah: [blah2, 'blah1']

View File

@ -0,0 +1,2 @@
#cloud-config
Blah: {}

View File

@ -0,0 +1,3 @@
#cloud-config
Blah: ['blah2']

View File

@ -0,0 +1,5 @@
#cloud-config
Blah: ['b']
merge_how: 'dict(recurse_array,no_replace)+list(append)'

View File

@ -0,0 +1,6 @@
#cloud-config
Blah: 1
Blah2: 2
Blah3: 3

View File

@ -0,0 +1,5 @@
#cloud-config
Blah: 3
Blah2: 2
Blah3: [1]

View File

@ -0,0 +1,4 @@
#cloud-config
Blah: ['blah1']

View File

@ -0,0 +1,4 @@
#cloud-config
Blah: ['blah2']
merge_how: 'dict(recurse_array,no_replace)+list(prepend)'

View File

@ -0,0 +1,3 @@
#cloud-config
Blah:
b: 1

View File

@ -0,0 +1,6 @@
#cloud-config
Blah:
b: null
merge_how: 'dict(allow_delete,no_replace)+list()'

View File

@ -1,142 +1,56 @@
from tests.unittests import helpers
from cloudinit import mergers
from cloudinit.handlers import cloud_config
from cloudinit.handlers import (CONTENT_START, CONTENT_END)
from cloudinit import helpers as c_helpers
from cloudinit import util
import collections
import glob
import os
import re
class TestSimpleRun(helpers.MockerTestCase):
def test_basic_merge(self):
source = {
'Blah': ['blah2'],
'Blah3': 'c',
}
merge_with = {
'Blah2': ['blah3'],
'Blah3': 'b',
'Blah': ['123'],
}
# Basic merge should not do thing special
merge_how = "list()+dict()+str()"
merger_set = mergers.string_extract_mergers(merge_how)
self.assertEquals(3, len(merger_set))
merger = mergers.construct(merger_set)
merged = merger.merge(source, merge_with)
self.assertEquals(merged['Blah'], ['blah2'])
self.assertEquals(merged['Blah2'], ['blah3'])
self.assertEquals(merged['Blah3'], 'c')
class TestSimpleRun(helpers.ResourceUsingTestCase):
def _load_merge_files(self, data_dir):
merge_root = self.resourceLocation(data_dir)
tests = []
source_ids = collections.defaultdict(list)
expected_files = {}
for fn in glob.glob(os.path.join(merge_root, "source*.*yaml")):
base_fn = os.path.basename(fn)
file_id = re.match(r"source(\d+)\-(\d+)[.]yaml", base_fn)
if not file_id:
raise IOError("File %s does not have a numeric identifier"
% (fn))
file_id = int(file_id.group(1))
source_ids[file_id].append(fn)
expected_fn = os.path.join(merge_root,
"expected%s.yaml" % (file_id))
if not os.path.isfile(expected_fn):
raise IOError("No expected file found at %s" % (expected_fn))
expected_files[file_id] = expected_fn
for id in sorted(source_ids.keys()):
source_file_contents = []
for fn in sorted(source_ids[id]):
source_file_contents.append(util.load_file(fn))
expected = util.load_yaml(util.load_file(expected_files[id]))
tests.append((source_file_contents, expected))
return tests
def test_dict_overwrite(self):
source = {
'Blah': ['blah2'],
}
merge_with = {
'Blah': ['123'],
}
# Now lets try a dict overwrite
merge_how = "list()+dict(overwrite)+str()"
merger_set = mergers.string_extract_mergers(merge_how)
self.assertEquals(3, len(merger_set))
merger = mergers.construct(merger_set)
merged = merger.merge(source, merge_with)
self.assertEquals(merged['Blah'], ['123'])
def test_string_append(self):
source = {
'Blah': 'blah2',
}
merge_with = {
'Blah': '345',
}
merge_how = "list()+dict()+str(append)"
merger_set = mergers.string_extract_mergers(merge_how)
self.assertEquals(3, len(merger_set))
merger = mergers.construct(merger_set)
merged = merger.merge(source, merge_with)
self.assertEquals(merged['Blah'], 'blah2345')
def test_list_extend(self):
source = ['abc']
merge_with = ['123']
merge_how = "list(extend)+dict()+str()"
merger_set = mergers.string_extract_mergers(merge_how)
self.assertEquals(3, len(merger_set))
merger = mergers.construct(merger_set)
merged = merger.merge(source, merge_with)
self.assertEquals(merged, ['abc', '123'])
def test_deep_merge(self):
source = {
'a': [1, 'b', 2],
'b': 'blahblah',
'c': {
'e': [1, 2, 3],
'f': 'bigblobof',
'iamadict': {
'ok': 'ok',
}
},
'run': [
'runme',
'runme2',
],
'runmereally': [
'e', ['a'], 'd',
],
}
merge_with = {
'a': ['e', 'f', 'g'],
'b': 'more',
'c': {
'a': 'b',
'f': 'stuff',
},
'run': [
'morecmd',
'moremoremore',
],
'runmereally': [
'blah', ['b'], 'e',
],
}
merge_how = "list(extend)+dict()+str(append)"
merger_set = mergers.string_extract_mergers(merge_how)
self.assertEquals(3, len(merger_set))
merger = mergers.construct(merger_set)
merged = merger.merge(source, merge_with)
self.assertEquals(merged['a'], [1, 'b', 2, 'e', 'f', 'g'])
self.assertEquals(merged['b'], 'blahblahmore')
self.assertEquals(merged['c']['f'], 'bigblobofstuff')
self.assertEquals(merged['run'], ['runme', 'runme2', 'morecmd',
'moremoremore'])
self.assertEquals(merged['runmereally'], ['e', ['a'], 'd', 'blah',
['b'], 'e'])
def test_dict_overwrite_layered(self):
source = {
'Blah3': {
'f': '3',
'g': {
'a': 'b',
}
}
}
merge_with = {
'Blah3': {
'e': '2',
'g': {
'e': 'f',
}
}
}
merge_how = "list()+dict()+str()"
merger_set = mergers.string_extract_mergers(merge_how)
self.assertEquals(3, len(merger_set))
merger = mergers.construct(merger_set)
merged = merger.merge(source, merge_with)
self.assertEquals(merged['Blah3'], {
'e': '2',
'f': '3',
'g': {
'a': 'b',
'e': 'f',
}
})
def test_merge_samples(self):
tests = self._load_merge_files('merge_sources')
paths = c_helpers.Paths({})
cc_handler = cloud_config.CloudConfigPartHandler(paths)
cc_handler.cloud_fn = None
for (payloads, expected_merge) in tests:
cc_handler.handle_part(None, CONTENT_START, None,
None, None, None)
for (i, p) in enumerate(payloads):
cc_handler.handle_part(None, None, "t-%s.yaml" % (i + 1),
p, None, {})
merged_buf = cc_handler.cloud_buf
cc_handler.handle_part(None, CONTENT_END, None,
None, None, None)
self.assertEquals(expected_merge, merged_buf)