Major Cleanup - Get command line working
* Remove Dead code * Deal better with various directories * Add logging for missing stuff Change-Id: Ia9f6b406c54f3f561ae360a708966e8fa49fb396
This commit is contained in:
parent
0b225f9563
commit
e73c875bde
|
@ -15,7 +15,6 @@ from oslo_config import cfg
|
|||
from oslo_log import log
|
||||
|
||||
|
||||
from kolla_kubernetes.common import file_utils
|
||||
from kolla_kubernetes import service
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
@ -31,8 +30,7 @@ class Run(command.Command):
|
|||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
service.run_service(parsed_args.service,
|
||||
file_utils.get_services_dir(CONF.service_dir))
|
||||
service.run_service(parsed_args.service)
|
||||
|
||||
|
||||
class Kill(command.Command):
|
||||
|
@ -44,5 +42,4 @@ class Kill(command.Command):
|
|||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
service.kill_service(parsed_args.service,
|
||||
file_utils.get_services_dir(CONF.service_dir))
|
||||
service.kill_service(parsed_args.service)
|
||||
|
|
|
@ -11,7 +11,6 @@
|
|||
# limitations under the License.
|
||||
|
||||
|
||||
import os.path
|
||||
import shlex
|
||||
import sys
|
||||
|
||||
|
@ -21,7 +20,6 @@ from cliff import interactive
|
|||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
from kolla_kubernetes.common import file_utils
|
||||
from kolla_kubernetes.common import utils
|
||||
|
||||
PROJECT = 'kolla_kubernetes'
|
||||
|
@ -37,10 +35,8 @@ log.set_defaults(
|
|||
|
||||
cli_opts = [
|
||||
cfg.StrOpt('service-dir',
|
||||
default=utils.env(
|
||||
'KM_SERVICE_DIR', default=os.path.join(
|
||||
file_utils.find_base_dir(), 'services')),
|
||||
help='Directory with services, (Env: KM_SERVICE_DIR)'),
|
||||
default=utils.env('K8S_SERVICE_DIR'),
|
||||
help='Directory with services, (Env: K8S_SERVICE_DIR)'),
|
||||
]
|
||||
CONF.register_cli_opts(cli_opts)
|
||||
|
||||
|
@ -74,9 +70,6 @@ class KollaKubernetesShell(app.App):
|
|||
def configure_logging(self):
|
||||
return
|
||||
|
||||
def initialize_app(self, argv):
|
||||
self.options.service_dir = CONF.service_dir
|
||||
|
||||
def print_help(self):
|
||||
outputs = []
|
||||
max_len = 0
|
||||
|
|
|
@ -10,11 +10,8 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import errno
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import sys
|
||||
|
||||
from oslo_utils import importutils
|
||||
|
||||
|
@ -24,29 +21,19 @@ from kolla_kubernetes import exception
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def find_os_type():
|
||||
return platform.linux_distribution()[0]
|
||||
|
||||
|
||||
def mkdir_p(path):
|
||||
try:
|
||||
os.makedirs(path)
|
||||
except OSError as exc: # Python >2.5
|
||||
if exc.errno == errno.EEXIST and os.path.isdir(path):
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
def get_services_dir(base_dir):
|
||||
if os.path.exists(os.path.join(base_dir, 'services')):
|
||||
return os.path.join(base_dir, 'services')
|
||||
elif os.path.exists(os.path.join(get_src_dir(), 'services')):
|
||||
return os.path.join(get_src_dir(), 'services')
|
||||
def find_config_file(filename):
|
||||
filepath = os.path.join('/etc/kolla', filename)
|
||||
if os.access(filepath, os.R_OK):
|
||||
return filepath
|
||||
raise exception.KollaDirNotFoundException(
|
||||
'Unable to detect kolla-kubernetes directory'
|
||||
)
|
||||
|
||||
def get_service_config_files(service):
|
||||
directory = os.path.join('/etc/kolla/', service)
|
||||
for dirpath, _, filenames in os.walk(directory):
|
||||
for f in filenames:
|
||||
yield os.path.abspath(os.path.join(dirpath, f))
|
||||
|
||||
def get_src_dir():
|
||||
kolla_kubernetes = importutils.import_module('kolla_kubernetes')
|
||||
|
@ -60,50 +47,14 @@ def get_shared_directory():
|
|||
return '/usr/local/share/kolla'
|
||||
elif os.path.exists('/usr/share/kolla'):
|
||||
return '/usr/share/kolla'
|
||||
return None
|
||||
|
||||
|
||||
def find_base_dir():
|
||||
script_path = os.path.dirname(os.path.realpath(sys.argv[0]))
|
||||
base_script_path = os.path.basename(script_path)
|
||||
if base_script_path == 'kolla-kubernetes':
|
||||
return script_path
|
||||
if base_script_path == 'kolla_kubernetes':
|
||||
return os.path.join(script_path, '..')
|
||||
if base_script_path == 'cmd':
|
||||
return os.path.join(script_path, '..', '..')
|
||||
if base_script_path == 'subunit':
|
||||
return get_src_dir()
|
||||
if base_script_path == 'bin':
|
||||
if os.path.exists('/usr/local/share/kolla'):
|
||||
return '/usr/local/share/kolla'
|
||||
elif os.path.exists('/usr/share/kolla'):
|
||||
return '/usr/share/kolla'
|
||||
else:
|
||||
return get_src_dir()
|
||||
raise exception.KollaDirNotFoundException(
|
||||
'Unable to detect kolla-kubernetes directory'
|
||||
)
|
||||
|
||||
|
||||
def find_config_file(filename):
|
||||
filepath = os.path.join('/etc/kolla-kubernetes', filename)
|
||||
if os.access(filepath, os.R_OK):
|
||||
config_file = filepath
|
||||
def find_base_dir():
|
||||
if os.path.exists('/usr/local/share/kolla'):
|
||||
return '/usr/local/share/kolla'
|
||||
elif os.path.exists('/usr/share/kolla'):
|
||||
return '/usr/share/kolla'
|
||||
else:
|
||||
config_file = os.path.join(find_base_dir(),
|
||||
'etc', filename)
|
||||
return config_file
|
||||
|
||||
|
||||
POSSIBLE_PATHS = {'/usr/share/kolla-kubernetes',
|
||||
get_src_dir(),
|
||||
find_base_dir()}
|
||||
|
||||
|
||||
def find_file(filename):
|
||||
for path in POSSIBLE_PATHS:
|
||||
file_path = os.path.join(path, filename)
|
||||
if os.path.exists(file_path):
|
||||
return file_path
|
||||
raise exception.KollaNotFoundException(filename, entity='file')
|
||||
return get_src_dir()
|
||||
|
|
|
@ -10,9 +10,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import collections
|
||||
import os
|
||||
from six.moves.urllib import parse
|
||||
|
||||
|
||||
def env(*args, **kwargs):
|
||||
|
@ -23,24 +21,31 @@ def env(*args, **kwargs):
|
|||
return kwargs.get('default', '')
|
||||
|
||||
|
||||
def dict_update(d, u):
|
||||
"""Recursively update 'd' with 'u' and return the result."""
|
||||
class JvarsDict(dict):
|
||||
"""Dict which can contain the 'global_vars' which are always preserved.
|
||||
|
||||
if not isinstance(u, collections.Mapping):
|
||||
return u
|
||||
They cannot be be overriden by any update nor single item setting.
|
||||
"""
|
||||
|
||||
for k, v in u.items():
|
||||
if isinstance(v, collections.Mapping):
|
||||
d[k] = dict_update(d.get(k, {}), v)
|
||||
else:
|
||||
d[k] = u[k]
|
||||
return d
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(JvarsDict, self).__init__(*args, **kwargs)
|
||||
self.global_vars = {}
|
||||
|
||||
def __setitem__(self, key, value, force=False):
|
||||
if not force and key in self.global_vars:
|
||||
return
|
||||
return super(JvarsDict, self).__setitem__(key, value)
|
||||
|
||||
def get_query_string(search_opts):
|
||||
if search_opts:
|
||||
qparams = sorted(search_opts.items(), key=lambda x: x[0])
|
||||
query_string = "?%s" % parse.urlencode(qparams, doseq=True)
|
||||
else:
|
||||
query_string = ""
|
||||
return query_string
|
||||
def set_force(self, key, value):
|
||||
"""Sets the variable even if it will override a global variable."""
|
||||
return self.__setitem__(key, value, force=True)
|
||||
|
||||
def update(self, other_dict, force=False):
|
||||
if not force:
|
||||
other_dict = {key: value for key, value in other_dict.items()
|
||||
if key not in self.global_vars}
|
||||
super(JvarsDict, self).update(other_dict)
|
||||
|
||||
def set_global_vars(self, global_vars):
|
||||
self.update(global_vars)
|
||||
self.global_vars = global_vars
|
||||
|
|
|
@ -11,7 +11,6 @@
|
|||
# limitations under the License.
|
||||
|
||||
import datetime
|
||||
import functools
|
||||
import os.path
|
||||
import subprocess
|
||||
import tempfile
|
||||
|
@ -23,6 +22,7 @@ import yaml
|
|||
|
||||
from kolla_kubernetes.common import file_utils
|
||||
from kolla_kubernetes.common import jinja_utils
|
||||
from kolla_kubernetes.common import utils
|
||||
from kolla_kubernetes import service_definition
|
||||
|
||||
LOG = logging.getLogger()
|
||||
|
@ -31,72 +31,29 @@ CONF.import_group('kolla', 'kolla_kubernetes.config')
|
|||
CONF.import_group('kolla_kubernetes', 'kolla_kubernetes.config')
|
||||
|
||||
|
||||
def execute_if_enabled(f):
|
||||
"""Decorator for executing methods only if runner is enabled."""
|
||||
|
||||
@functools.wraps(f)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
if not self._enabled:
|
||||
return
|
||||
return f(self, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
def _create_working_directory():
|
||||
ts = time.time()
|
||||
ts = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d_%H-%M-%S_')
|
||||
temp_dir = tempfile.mkdtemp(prefix='kolla-' + ts)
|
||||
working_dir = os.path.join(temp_dir, 'kubernetes')
|
||||
os.makedirs(working_dir)
|
||||
return working_dir
|
||||
|
||||
|
||||
class File(object):
|
||||
def __init__(self, conf, name, service_name):
|
||||
self._conf = conf
|
||||
self._name = name
|
||||
self._service_name = service_name
|
||||
|
||||
|
||||
class Command(object):
|
||||
def __init__(self, conf, name, service_name):
|
||||
self._conf = conf
|
||||
self._name = name
|
||||
self._service_name = service_name
|
||||
|
||||
|
||||
class JvarsDict(dict):
|
||||
"""Dict which can contain the 'global_vars' which are always preserved.
|
||||
|
||||
They cannot be be overriden by any update nor single item setting.
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(JvarsDict, self).__init__(*args, **kwargs)
|
||||
self.global_vars = {}
|
||||
|
||||
def __setitem__(self, key, value, force=False):
|
||||
if not force and key in self.global_vars:
|
||||
return
|
||||
return super(JvarsDict, self).__setitem__(key, value)
|
||||
|
||||
def set_force(self, key, value):
|
||||
"""Sets the variable even if it will override a global variable."""
|
||||
return self.__setitem__(key, value, force=True)
|
||||
|
||||
def update(self, other_dict, force=False):
|
||||
if not force:
|
||||
other_dict = {key: value for key, value in other_dict.items()
|
||||
if key not in self.global_vars}
|
||||
super(JvarsDict, self).update(other_dict)
|
||||
|
||||
def set_global_vars(self, global_vars):
|
||||
self.update(global_vars)
|
||||
self.global_vars = global_vars
|
||||
|
||||
|
||||
def _load_variables_from_file(service_dir, project_name):
|
||||
jvars = JvarsDict()
|
||||
def _load_variables_from_file(project_name):
|
||||
jvars = utils.JvarsDict()
|
||||
f = file_utils.find_config_file('globals.yml')
|
||||
if os.path.exists(f):
|
||||
with open(f, 'r') as gf:
|
||||
jvars.set_global_vars(yaml.load(gf))
|
||||
else:
|
||||
LOG.warning('Unable to load %s', f)
|
||||
f = file_utils.find_config_file('passwords.yml')
|
||||
if os.path.exists(f):
|
||||
with open(f, 'r') as gf:
|
||||
jvars.update(yaml.load(gf))
|
||||
else:
|
||||
LOG.warning('Unable to load %s', f)
|
||||
# Apply the basic variables that aren't defined in any config file.
|
||||
jvars.update({
|
||||
'deployment_id': CONF.kolla.deployment_id,
|
||||
|
@ -105,38 +62,30 @@ def _load_variables_from_file(service_dir, project_name):
|
|||
})
|
||||
|
||||
dir = file_utils.get_shared_directory()
|
||||
if dir and os.path.exists(os.path.join(dir, 'ansible/group_vars/all.yml')):
|
||||
all_yml_name = os.path.join(dir, 'ansible/group_vars/all.yml')
|
||||
all_yml = os.path.join(dir, 'ansible/group_vars/all.yml')
|
||||
if dir and os.path.exists(all_yml):
|
||||
all_yml_name = os.path.join(dir, all_yml)
|
||||
jinja_utils.yaml_jinja_render(all_yml_name, jvars)
|
||||
else:
|
||||
LOG.warning('Unable to load %s', all_yml)
|
||||
|
||||
proj_yml_name = os.path.join(dir, 'ansible/roles',
|
||||
project_name, 'defaults', 'main.yml')
|
||||
if dir and os.path.exists(proj_yml_name):
|
||||
jinja_utils.yaml_jinja_render(proj_yml_name, jvars)
|
||||
else:
|
||||
LOG.warning('Path missing %s' % proj_yml_name)
|
||||
LOG.warning('Unable to load %s', proj_yml_name)
|
||||
return jvars
|
||||
|
||||
|
||||
def _build_runner(service_name, service_dir, variables=None):
|
||||
ts = time.time()
|
||||
ts = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d_%H-%M-%S_')
|
||||
temp_dir = tempfile.mkdtemp(prefix='kolla-' + ts)
|
||||
working_dir = os.path.join(temp_dir, 'kubernetes')
|
||||
os.makedirs(working_dir)
|
||||
|
||||
for filename in service_definition.find_service_files(service_name,
|
||||
service_dir):
|
||||
def _build_runner(working_dir, service_name, variables=None):
|
||||
for filename in service_definition.find_service_files(service_name):
|
||||
proj_filename = filename.split('/')[-1].replace('.j2', '')
|
||||
proj_name = filename.split('/')[-2]
|
||||
LOG.debug(
|
||||
'proj_filename : %s proj_name: %s' % (proj_filename, proj_name))
|
||||
|
||||
# is this a snapshot or from original src?
|
||||
variables = _load_variables_from_file(service_dir, proj_name)
|
||||
|
||||
# 1. validate the definition with the given variables
|
||||
service_definition.validate(service_name, service_dir, variables)
|
||||
variables = _load_variables_from_file(proj_name)
|
||||
|
||||
content = yaml.load(
|
||||
jinja_utils.jinja_render(filename, variables))
|
||||
|
@ -145,17 +94,17 @@ def _build_runner(service_name, service_dir, variables=None):
|
|||
os.path.join(working_dir, proj_filename))
|
||||
f.write(yaml.dump(content, default_flow_style=False))
|
||||
|
||||
return working_dir
|
||||
|
||||
def run_service(service_name, variables=None):
|
||||
working_dir = _create_working_directory()
|
||||
_build_runner(working_dir, service_name, variables=variables)
|
||||
_deploy_instance(working_dir, service_name)
|
||||
|
||||
|
||||
def run_service(service_name, service_dir, variables=None):
|
||||
directory = _build_runner(service_name, service_dir, variables=variables)
|
||||
_deploy_instance(directory, service_name)
|
||||
|
||||
|
||||
def kill_service(service_name, service_dir, variables=None):
|
||||
directory = _build_runner(service_name, service_dir, variables=variables)
|
||||
_delete_instance(directory, service_name)
|
||||
def kill_service(service_name, variables=None):
|
||||
working_dir = _create_working_directory()
|
||||
_build_runner(working_dir, service_name, variables=variables)
|
||||
_delete_instance(working_dir, service_name)
|
||||
|
||||
|
||||
def _deploy_instance(directory, service_name):
|
||||
|
|
|
@ -12,15 +12,14 @@
|
|||
|
||||
|
||||
import os.path
|
||||
import socket
|
||||
import yaml
|
||||
|
||||
import jinja2
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
|
||||
from kolla_kubernetes.common import jinja_utils
|
||||
from kolla_kubernetes.common import file_utils
|
||||
from kolla_kubernetes import exception
|
||||
|
||||
CONF = cfg.CONF
|
||||
CNF_FIELDS = ('source', 'dest', 'owner', 'perm')
|
||||
CMD_FIELDS = ('run_once', 'dependencies', 'command', 'env',
|
||||
'delay', 'retries', 'files')
|
||||
|
@ -29,123 +28,38 @@ SCOPE_OPTS = ('global', 'local')
|
|||
LOG = log.getLogger()
|
||||
|
||||
|
||||
def find_service_file(service_name, service_dir):
|
||||
# let's be flexible with the input, to make life easy
|
||||
# for users.
|
||||
def get_services_directory():
|
||||
return (CONF.service_dir or
|
||||
os.path.join(file_utils.find_base_dir(), 'services'))
|
||||
|
||||
|
||||
def find_service_files(service_name):
|
||||
service_dir = get_services_directory()
|
||||
LOG.debug('Looking for services files in %s', service_dir)
|
||||
if not os.path.exists(service_dir):
|
||||
raise exception.KollaNotFoundException(service_dir,
|
||||
entity='service directory')
|
||||
|
||||
short_name = service_name.split('/')[-1].replace('_ansible_tasks', '-init')
|
||||
for root, dirs, names in os.walk(service_dir):
|
||||
for name in names:
|
||||
if short_name in name:
|
||||
return os.path.join(root, name)
|
||||
bootstrap_dir = os.path.join(service_dir, '../bootstrap/')
|
||||
LOG.debug('Looking for bootstrap files in %s', service_dir)
|
||||
if not os.path.exists(bootstrap_dir):
|
||||
raise exception.KollaNotFoundException(bootstrap_dir,
|
||||
entity='bootstrap directory')
|
||||
|
||||
raise exception.KollaNotFoundException(service_name,
|
||||
entity='service definition')
|
||||
short_name = service_name.split('/')[-1]
|
||||
|
||||
|
||||
def find_service_files(service_name, service_dir):
|
||||
# let's be flexible with the input, to make life easy
|
||||
# for users.
|
||||
if not os.path.exists(service_dir):
|
||||
raise exception.KollaNotFoundException(service_dir,
|
||||
entity='service directory')
|
||||
|
||||
short_name = service_name.split('/')[-1].replace('_ansible_tasks', '-init')
|
||||
files = []
|
||||
for root, dirs, names in os.walk(service_dir):
|
||||
for name in names:
|
||||
if short_name in name:
|
||||
files.append(os.path.join(root, name))
|
||||
|
||||
for root, dirs, names in os.walk(bootstrap_dir):
|
||||
for name in names:
|
||||
if short_name in name:
|
||||
files.append(os.path.join(root, name))
|
||||
|
||||
if not files:
|
||||
raise exception.KollaNotFoundException(service_name,
|
||||
raise exception.KollaNotFoundException(service_dir,
|
||||
entity='service definition')
|
||||
return files
|
||||
|
||||
|
||||
def inspect(service_name, service_dir):
|
||||
filename = find_service_file(service_name, service_dir)
|
||||
try:
|
||||
required_variables = set.union(
|
||||
jinja_utils.jinja_find_required_variables(filename))
|
||||
except jinja2.exceptions.TemplateNotFound:
|
||||
raise exception.KollaNotFoundException(filename,
|
||||
entity='service definition')
|
||||
return dict(required_variables=list(required_variables))
|
||||
|
||||
|
||||
def validate(service_name, service_dir, variables=None, deps=None):
|
||||
if variables is None:
|
||||
variables = {}
|
||||
if deps is None:
|
||||
deps = {}
|
||||
|
||||
filename = find_service_file(service_name, service_dir)
|
||||
try:
|
||||
cnf = yaml.load(jinja_utils.jinja_render(filename, variables))
|
||||
except jinja2.exceptions.TemplateNotFound:
|
||||
raise exception.KollaNotFoundException(filename,
|
||||
entity='service definition')
|
||||
|
||||
def get_commands():
|
||||
for cmd in cnf.get('commands', {}):
|
||||
yield cmd, cnf['commands'][cmd]
|
||||
if 'service' in cnf:
|
||||
yield 'daemon', cnf['service']['daemon']
|
||||
|
||||
LOG.debug('%s: file found at %s' % (cnf['metadata']['name'], filename))
|
||||
for cmd, cmd_info in get_commands():
|
||||
_validate_command(filename, cmd, cmd_info, deps,
|
||||
cnf['name'], service_dir)
|
||||
return deps
|
||||
|
||||
|
||||
def _validate_config(filename, conf, service_dir):
|
||||
for file in conf:
|
||||
for key in conf[file]:
|
||||
assert key in CNF_FIELDS, '%s: %s not in %s' % (filename,
|
||||
key, CNF_FIELDS)
|
||||
srcs = conf[file]['source']
|
||||
if isinstance(srcs, str):
|
||||
srcs = [srcs]
|
||||
for src in srcs:
|
||||
file_path = os.path.join(service_dir, '..', src)
|
||||
if not file_path.startswith('/etc'):
|
||||
assert os.path.exists(file_path), '%s missing' % file_path
|
||||
|
||||
|
||||
def _validate_command(filename, cmd, cmd_info, deps,
|
||||
service_name, service_dir):
|
||||
for key in cmd_info:
|
||||
assert key in CMD_FIELDS, '%s not in %s' % (key, CMD_FIELDS)
|
||||
|
||||
_, group, role = service_name.split('/')
|
||||
regs = ['%s/%s' % (role, cmd),
|
||||
'%s/%s/%s' % (socket.gethostname(), role, cmd)]
|
||||
reqs = cmd_info.get('dependencies', [])
|
||||
for reg in regs:
|
||||
if reg not in deps:
|
||||
deps[reg] = {'waiters': {}}
|
||||
deps[reg]['registered_by'] = cmd
|
||||
deps[reg]['name'] = cmd
|
||||
deps[reg]['run_by'] = filename
|
||||
for req in reqs:
|
||||
for key in req:
|
||||
assert key in DEP_FIELDS, '%s: %s not in %s' % (filename,
|
||||
key, DEP_FIELDS)
|
||||
scope = req.get('scope', 'global')
|
||||
assert scope in SCOPE_OPTS, '%s: %s not in %s' % (filename,
|
||||
scope, SCOPE_OPTS)
|
||||
req_path = req['path']
|
||||
if scope == 'local':
|
||||
req_path = os.path.join(socket.gethostname(), req_path)
|
||||
if req_path not in deps:
|
||||
deps[req_path] = {'waiters': {}}
|
||||
for reg in regs:
|
||||
deps[req_path]['waiters'][cmd] = reg
|
||||
if 'files' in cmd_info:
|
||||
_validate_config(filename, cmd_info['files'], service_dir)
|
||||
LOG.debug('%s: command "%s" validated' % (service_name, cmd))
|
||||
|
|
|
@ -18,4 +18,4 @@ class FindBaseDirTest(base.BaseTestCase):
|
|||
|
||||
def test_when_is_a_test(self):
|
||||
tdir = file_utils.find_base_dir()
|
||||
self.assertEqual(self.project_dir, tdir)
|
||||
self.assertIsNotNone(tdir)
|
||||
|
|
|
@ -1,41 +0,0 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
from kolla_kubernetes.common import utils
|
||||
from kolla_kubernetes.tests import base
|
||||
|
||||
|
||||
class TestDictUpdate(base.BaseTestCase):
|
||||
|
||||
def test_flat_no_overwrites(self):
|
||||
a = {'a': 'foo', 'b': 'no'}
|
||||
b = {'c': 'foo', 'd': 'no'}
|
||||
expect = {'a': 'foo', 'c': 'foo', 'b': 'no', 'd': 'no'}
|
||||
self.assertEqual(expect, utils.dict_update(a, b))
|
||||
|
||||
def test_flat_with_overwrites(self):
|
||||
a = {'a': 'foo', 'b': 'no'}
|
||||
b = {'c': 'foo', 'b': 'yes'}
|
||||
expect = {'a': 'foo', 'c': 'foo', 'b': 'yes'}
|
||||
self.assertEqual(expect, utils.dict_update(a, b))
|
||||
|
||||
def test_nested_no_overwrites(self):
|
||||
a = {'a': 'foo', 'b': {'bb': 'no'}}
|
||||
b = {'c': 'foo'}
|
||||
expect = {'a': 'foo', 'c': 'foo', 'b': {'bb': 'no'}}
|
||||
self.assertEqual(expect, utils.dict_update(a, b))
|
||||
|
||||
def test_nested_with_overwrites(self):
|
||||
a = {'a': 'foo', 'b': {'bb': 'no'}}
|
||||
b = {'c': 'foo', 'b': {'bb': 'yes'}}
|
||||
expect = {'a': 'foo', 'c': 'foo', 'b': {'bb': 'yes'}}
|
||||
self.assertEqual(expect, utils.dict_update(a, b))
|
Loading…
Reference in New Issue