Convert to python3

Convert the percona cluster charm to python3.

Remove Trusty testing.

Change-Id: Ia5ae43f16caffb5c4356d3f5616e0383e23b5f50
This commit is contained in:
David Ames 2019-06-27 16:28:45 -07:00
parent 9fc4f23b80
commit 681cdf8e45
16 changed files with 231 additions and 120 deletions

View File

@ -1,4 +1,3 @@
- project:
templates:
- python-charm-jobs
- openstack-python35-jobs-nonvoting
- python35-charm-jobs

13
actions/__init__.py Normal file
View File

@ -0,0 +1,13 @@
# Copyright 2019 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -1,4 +1,4 @@
#!/usr/bin/python
#!/usr/bin/env python3
import os
import sys
@ -6,7 +6,18 @@ import subprocess
import traceback
from time import gmtime, strftime
sys.path.append('hooks')
_path = os.path.dirname(os.path.realpath(__file__))
_hooks = os.path.abspath(os.path.join(_path, '../hooks'))
_root = os.path.abspath(os.path.join(_path, '..'))
def _add_path(path):
if path not in sys.path:
sys.path.insert(1, path)
_add_path(_hooks)
_add_path(_root)
from charmhelpers.core.hookenv import (
action_get,

13
hooks/__init__.py Normal file
View File

@ -0,0 +1,13 @@
# Copyright 2019 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@ -1 +0,0 @@
../charmhelpers

View File

@ -11,7 +11,7 @@ check_and_install() {
fi
}
PYTHON="python"
PYTHON="python3"
for dep in ${DEPS[@]}; do
check_and_install ${PYTHON} ${dep}

View File

@ -1,11 +1,23 @@
#!/usr/bin/python
#!/usr/bin/env python3
# TODO: Support changes to root and sstuser passwords
import collections
import sys
import json
import os
import socket
import subprocess
_path = os.path.dirname(os.path.realpath(__file__))
_root = os.path.abspath(os.path.join(_path, '..'))
def _add_path(path):
if path not in sys.path:
sys.path.insert(1, path)
_add_path(_root)
from charmhelpers.core.hookenv import (
Hooks, UnregisteredHookError,
is_relation_made,
@ -531,7 +543,8 @@ def config_changed():
# Empty hosts if cluster_series_upgrading
if not clustered_once() or cluster_series_upgrading:
hosts = []
log("Leader unit - bootstrap required=%s" % (not leader_bootstrapped),
log("Leader unit - bootstrap required={}"
.format(not leader_bootstrapped),
DEBUG)
render_config_restart_on_changed(hosts,
bootstrap=not leader_bootstrapped)
@ -598,7 +611,7 @@ def cluster_joined():
relation_settings['cluster-address'] = get_cluster_host_ip()
log("Setting cluster relation: '%s'" % (relation_settings),
log("Setting cluster relation: '{}'".format(relation_settings),
level=INFO)
relation_set(relation_settings=relation_settings)
@ -613,7 +626,7 @@ def cluster_changed():
# NOTE(jamespage): deprecated - leader-election
rdata = relation_get()
inc_list = []
for attr in rdata.iterkeys():
for attr in rdata.keys():
if attr not in ['hostname', 'private-address', 'cluster-address',
'public-address', 'ready']:
inc_list.append(attr)
@ -717,13 +730,13 @@ def get_db_host(client_hostname, interface='shared-db'):
if is_address_in_network(access_network, vip):
return vip
log("Unable to identify a VIP in the access-network '%s'" %
(access_network), level=WARNING)
log("Unable to identify a VIP in the access-network '{}'"
.format(access_network), level=WARNING)
else:
return get_address_in_network(access_network)
else:
log("Client address '%s' not in access-network '%s'" %
(client_ip, access_network), level=WARNING)
log("Client address '{}' not in access-network '{}'"
.format(client_ip, access_network), level=WARNING)
else:
try:
# NOTE(jamespage)
@ -755,10 +768,11 @@ def configure_db_for_hosts(hosts, database, username, db_helper):
"""Hosts may be a json-encoded list of hosts or a single hostname."""
try:
hosts = json.loads(hosts)
log("Multiple hostnames provided by relation: %s" % (', '.join(hosts)),
log("Multiple hostnames provided by relation: {}"
.format(', '.join(hosts)),
level=DEBUG)
except ValueError:
log("Single hostname provided by relation: %s" % (hosts),
log("Single hostname provided by relation: {}".format(hosts),
level=DEBUG)
hosts = [hosts]
@ -791,7 +805,7 @@ def shared_db_changed(relation_id=None, unit=None):
peer_store_and_set(relation_id=relation_id,
relation_settings={'access-network': access_network})
singleset = set(['database', 'username', 'hostname'])
singleset = {'database', 'username', 'hostname'}
if singleset.issubset(settings):
# Process a single database configuration
hostname = settings['hostname']
@ -805,8 +819,8 @@ def shared_db_changed(relation_id=None, unit=None):
# database access if remote unit has presented a
# hostname or ip address thats within the configured
# network cidr
log("Host '%s' not in access-network '%s' - ignoring" %
(normalized_address, access_network), level=INFO)
log("Host '{}' not in access-network '{}' - ignoring"
.format(normalized_address, access_network), level=INFO)
return
# NOTE: do this before querying access grants
@ -843,16 +857,16 @@ def shared_db_changed(relation_id=None, unit=None):
# }
# }
#
databases = {}
for k, v in settings.iteritems():
databases = collections.OrderedDict()
for k, v in settings.items():
db = k.split('_')[0]
x = '_'.join(k.split('_')[1:])
if db not in databases:
databases[db] = {}
databases[db] = collections.OrderedDict()
databases[db][x] = v
allowed_units = {}
return_data = {}
allowed_units = collections.OrderedDict()
return_data = collections.OrderedDict()
for db in databases:
if singleset.issubset(databases[db]):
database = databases[db]['database']
@ -876,10 +890,10 @@ def shared_db_changed(relation_id=None, unit=None):
a_units = db_helper.get_allowed_units(database, username,
relation_id=relation_id)
a_units = ' '.join(unit_sorted(a_units))
allowed_units_key = '%s_allowed_units' % (db)
allowed_units_key = '{}_allowed_units'.format(db)
allowed_units[allowed_units_key] = a_units
return_data['%s_password' % (db)] = password
return_data['{}_password'.format(db)] = password
return_data[allowed_units_key] = a_units
db_host = get_db_host(hostname)

View File

@ -1,4 +1,5 @@
''' General utilities for percona '''
import collections
import subprocess
from subprocess import Popen, PIPE
import socket
@ -101,8 +102,8 @@ class InconsistentUUIDError(Exception):
"""Raised when the leader and the unit have different UUIDs set"""
def __init__(self, leader_uuid, unit_uuid):
super(InconsistentUUIDError, self).__init__(
"Leader UUID ('%s') != Unit UUID ('%s')" % (leader_uuid,
unit_uuid))
"Leader UUID ('{}') != Unit UUID ('{}')"
.format(leader_uuid, unit_uuid))
class DesyncedException(Exception):
@ -147,14 +148,16 @@ def seeded():
def mark_seeded():
''' Mark service unit as seeded '''
with open(SEEDED_MARKER.format(data_dir=resolve_data_dir()),
'w') as seeded:
'w', encoding="utf-8") as seeded:
seeded.write('done')
def setup_percona_repo():
''' Configure service unit to use percona repositories '''
with open('/etc/apt/sources.list.d/percona.list', 'w') as sources:
sources.write(REPO.format(release=lsb_release()['DISTRIB_CODENAME']))
sources.write(
REPO.format(
release=lsb_release()['DISTRIB_CODENAME']).encode('utf-8'))
subprocess.check_call(['apt-key', 'add', KEY])
@ -167,7 +170,7 @@ def resolve_hostname_to_ip(hostname):
try:
import dns.resolver
except ImportError:
apt_install(filter_installed_packages(['python-dnspython']),
apt_install(filter_installed_packages(['python3-dnspython']),
fatal=True)
import dns.resolver
@ -211,7 +214,7 @@ def is_sufficient_peers():
if units < min_size:
log("Insufficient number of peer units to form cluster "
"(expected=%s, got=%s)" % (min_size, units), level=INFO)
"(expected={}, got={})".format(min_size, units), level=INFO)
return False
else:
log("Sufficient number of peer units to form cluster {}"
@ -238,7 +241,7 @@ def get_cluster_hosts():
@side_effect update_hosts_file called for IPv6 hostname resolution
@returns list of hosts
"""
hosts_map = {}
hosts_map = collections.OrderedDict()
local_cluster_address = get_cluster_host_ip()
@ -263,8 +266,8 @@ def get_cluster_hosts():
(unit, hostname, cluster_address), level=DEBUG)
continue
else:
log("(unit=%s) hostname '%s' provided by cluster relation "
"for addr %s" % (unit, hostname, cluster_address),
log("(unit=%s) hostname '{}' provided by cluster relation "
"for addr {}".format(unit, hostname, cluster_address),
level=DEBUG)
hosts_map[cluster_address] = hostname
@ -287,8 +290,7 @@ def get_cluster_hosts():
update_hosts_file(hosts_map)
# Return a sorted list to avoid uneccessary restarts
hosts.sort()
return hosts
return sorted(hosts)
SQL_SST_USER_SETUP = ("GRANT {permissions} ON *.* "
@ -339,10 +341,12 @@ def configure_mysql_root_password(password):
m_helper = get_db_helper()
root_pass = m_helper.get_mysql_root_password(password)
for package in packages:
dconf.stdin.write("%s %s/root_password password %s\n" %
(package, package, root_pass))
dconf.stdin.write("%s %s/root_password_again password %s\n" %
(package, package, root_pass))
dconf.stdin.write("{} {}/root_password password {}\n"
.format(package, package, root_pass)
.encode("utf-8"))
dconf.stdin.write("{} {}/root_password_again password {}\n"
.format(package, package, root_pass)
.encode("utf-8"))
dconf.communicate()
dconf.wait()
@ -359,21 +363,21 @@ def relation_clear(r_id=None):
**settings)
def update_hosts_file(map):
def update_hosts_file(_map):
"""Percona does not currently like ipv6 addresses so we need to use dns
names instead. In order to make them resolvable we ensure they are in
/etc/hosts.
See https://bugs.launchpad.net/galera/+bug/1130595 for some more info.
"""
with open(HOSTS_FILE, 'r') as hosts:
with open(HOSTS_FILE, 'r', encoding="utf-8") as hosts:
lines = hosts.readlines()
log("Updating %s with: %s (current: %s)" % (HOSTS_FILE, map, lines),
log("Updating {} with: {} (current: {})".format(HOSTS_FILE, _map, lines),
level=DEBUG)
newlines = []
for ip, hostname in map.items():
for ip, hostname in list(_map.items()):
if not ip or not hostname:
continue
@ -383,16 +387,17 @@ def update_hosts_file(map):
if len(line) < 2 or not (_line[0] == ip or hostname in _line[1:]):
keepers.append(line)
else:
log("Marking line '%s' for update or removal" % (line.strip()),
log("Marking line '{}' for update or removal"
.format(line.strip()),
level=DEBUG)
lines = keepers
newlines.append("%s %s\n" % (ip, hostname))
newlines.append("{} {}\n".format(ip, hostname))
lines += newlines
with tempfile.NamedTemporaryFile(delete=False) as tmpfile:
with open(tmpfile.name, 'w') as hosts:
with open(tmpfile.name, 'w', encoding="utf-8") as hosts:
for line in lines:
hosts.write(line)
@ -408,15 +413,10 @@ def assert_charm_supports_ipv6():
"versions less than Trusty 14.04")
def _cmp(x, y):
"""Shim for py2 py3 compat."""
return (x > y) - (x < y)
def unit_sorted(units):
"""Return a sorted list of unit names."""
return sorted(
units, lambda a, b: _cmp(int(a.split('/')[-1]), int(b.split('/')[-1])))
units, key=lambda a: int(a.split('/')[-1]))
def install_mysql_ocf():
@ -431,7 +431,8 @@ def install_mysql_ocf():
log('Installing %s' % dest_file, level='INFO')
shutil.copy(src_file, dest_file)
else:
log("'%s' already exists, skipping" % dest_file, level='INFO')
log("'{}' already exists, skipping"
.format(dest_file), level='INFO')
def get_wsrep_value(key):
@ -445,7 +446,7 @@ def get_wsrep_value(key):
cursor = m_helper.connection.cursor()
ret = None
try:
cursor.execute("show status like '%s'" % (key))
cursor.execute("show status like '{}'".format(key))
ret = cursor.fetchall()
except:
log("Failed to get '%s'", ERROR)
@ -630,14 +631,14 @@ def update_bootstrap_uuid():
raise LeaderNoBootstrapUUIDError()
wsrep_ready = get_wsrep_value('wsrep_ready') or ""
log("wsrep_ready: '%s'" % wsrep_ready, DEBUG)
log("wsrep_ready: '{}'".format(wsrep_ready), DEBUG)
if wsrep_ready.lower() in ['on', 'ready']:
cluster_state_uuid = get_wsrep_value('wsrep_cluster_state_uuid')
else:
cluster_state_uuid = None
if not cluster_state_uuid:
log("UUID is empty: '%s'" % cluster_state_uuid, level=DEBUG)
log("UUID is empty: '{}'".format(cluster_state_uuid), level=DEBUG)
return False
elif lead_cluster_state_uuid != cluster_state_uuid:
# this may mean 2 things:
@ -851,8 +852,8 @@ def create_binlogs_directory():
binlogs_directory = os.path.dirname(config('binlogs-path'))
data_dir = resolve_data_dir() + '/'
if binlogs_directory.startswith(data_dir):
raise Exception("Configured binlogs directory (%s) must not be inside "
"mysql data dir" % (binlogs_directory))
raise Exception("Configured binlogs directory ({}) must not be inside "
"mysql data dir".format(binlogs_directory))
if not os.path.isdir(binlogs_directory):
mkdir(binlogs_directory, 'mysql', 'mysql', 0o750)
@ -917,7 +918,7 @@ def cluster_ready():
if int(min_cluster_size) == 1:
return seeded()
peers = {}
peers = collections.OrderedDict()
for relation_id in relation_ids('cluster'):
units = related_units(relation_id) or []
if local_unit() not in units:
@ -1025,7 +1026,8 @@ def update_root_password():
m_helper.connect(user='root', password=new_root_passwd)
m_helper.execute('select 1;')
except OperationalError as ex:
log("Error connecting using new password: %s" % str(ex), level=DEBUG)
log("Error connecting using new password: {}"
.format(str(ex)), level=DEBUG)
log(('Cannot connect using new password, not updating password in '
'the relation'), level=WARNING)
return
@ -1200,7 +1202,7 @@ def get_databases_to_replicate():
entries = config('databases-to-replicate').strip().split(';')
try:
for entry in entries:
databases_and_tables = {}
databases_and_tables = collections.OrderedDict()
entry_split = entry.split(':')
databases_and_tables['database'] = (
check_invalid_chars(entry_split[0]))
@ -1215,7 +1217,7 @@ def get_databases_to_replicate():
except InvalidCharacters as e:
raise InvalidDatabasesToReplicate(
"The configuration setting databases-to-replicate is malformed. {}"
.format(e.message))
.format(e))
return databases_to_replicate
@ -1244,9 +1246,9 @@ def check_invalid_chars(data, bad_chars_re="[\^\\/?%*:|\"'<>., ]"):
for data_string in data_strings:
m = re.search(bad_chars_re, data_string)
if m:
raise(InvalidCharacters(
raise InvalidCharacters(
"Invalid character '{}' in '{}'"
.format(m.group(0), data_string)))
.format(m.group(0), data_string))
return data

View File

@ -12,7 +12,6 @@ series:
- bionic
- cosmic
- disco
- trusty
extra-bindings:
access:
provides:

View File

@ -1,8 +0,0 @@
#!/usr/bin/env python
import basic_deployment
if __name__ == "__main__":
t = basic_deployment.BasicDeployment(units=3, series='trusty')
t.run()

41
tox.ini
View File

@ -2,7 +2,7 @@
# This file is managed centrally by release-tools and should not be modified
# within individual charm repos.
[tox]
envlist = pep8,py27
envlist = pep8,py3
skipsdist = True
[testenv]
@ -15,11 +15,7 @@ install_command =
commands = stestr run {posargs}
whitelist_externals = juju
passenv = HOME TERM AMULET_* CS_API_*
[testenv:py27]
basepython = python2.7
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
deps = -r{toxinidir}/test-requirements.txt
[testenv:py35]
basepython = python3.5
@ -31,6 +27,11 @@ basepython = python3.6
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
[testenv:py3]
basepython = python3
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
[testenv:pep8]
basepython = python3
deps = -r{toxinidir}/requirements.txt
@ -38,6 +39,34 @@ deps = -r{toxinidir}/requirements.txt
commands = flake8 {posargs} hooks unit_tests tests actions lib
charm-proof
[testenv:cover]
# Technique based heavily upon
# https://github.com/openstack/nova/blob/master/tox.ini
basepython = python3
deps = -r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
setenv =
{[testenv]setenv}
PYTHON=coverage run
commands =
coverage erase
stestr run {posargs}
coverage combine
coverage html -d cover
coverage xml -o cover/coverage.xml
coverage report
[coverage:run]
branch = True
concurrency = multiprocessing
parallel = True
source =
.
omit =
.tox/*
*/charmhelpers/*
unit_tests/*
[testenv:venv]
basepython = python3
commands = {posargs}

View File

@ -1,4 +1,40 @@
# Copyright 2016 Canonical Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import os
import sys
sys.path.append('hooks')
sys.path.append('actions')
_path = os.path.dirname(os.path.realpath(__file__))
_actions = os.path.abspath(os.path.join(_path, '../actions'))
_hooks = os.path.abspath(os.path.join(_path, '../hooks'))
_charmhelpers = os.path.abspath(os.path.join(_path, '../charmhelpers'))
_unit_tests = os.path.abspath(os.path.join(_path, '../unit_tests'))
_scripts = os.path.abspath(os.path.join(_path, '../scripts'))
def _add_path(path):
if path not in sys.path:
sys.path.insert(1, path)
_add_path(_actions)
_add_path(_hooks)
_add_path(_charmhelpers)
_add_path(_unit_tests)
# python-apt is not installed as part of test-requirements but is imported by
# some charmhelpers modules so create a fake import.
sys.modules['apt'] = mock.Mock()
sys.modules['MySQLdb'] = mock.Mock()

View File

@ -1,15 +1,8 @@
import sys
import mock
from mock import patch
from test_utils import CharmTestCase
# python-apt is not installed as part of test-requirements but is imported by
# some charmhelpers modules so create a fake import.
sys.modules['apt'] = mock.Mock()
sys.modules['MySQLdb'] = mock.Mock()
# we have to patch out harden decorator because hooks/percona_hooks.py gets
# imported via actions.py and will freak out if it trys to run in the context
# of a test.
@ -18,7 +11,7 @@ with patch('percona_utils.register_configs') as configs, \
mock_dec.side_effect = (lambda *dargs, **dkwargs: lambda f:
lambda *args, **kwargs: f(*args, **kwargs))
configs.return_value = 'test-config'
import actions
from actions import actions
class PauseTestCase(CharmTestCase):
@ -39,7 +32,7 @@ class ResumeTestCase(CharmTestCase):
actions, ["resume_unit_helper"])
def test_pauses_services(self):
with patch('actions.config_changed') as config_changed:
with patch('actions.actions.config_changed') as config_changed:
actions.resume([])
self.resume_unit_helper.assert_called_once_with('test-config')
config_changed.assert_called_once_with()

View File

@ -637,7 +637,7 @@ class TestUpgradeCharm(CharmTestCase):
]
def print_log(self, msg, level=None):
print('juju-log: %s: %s' % (level, msg))
print("juju-log: {}: {}".format(level, msg))
def setUp(self):
CharmTestCase.setUp(self, hooks, self.TO_PATCH)
@ -709,7 +709,7 @@ class TestConfigs(CharmTestCase):
'of %s. ' % f)
raise Exception
return yaml.safe_load(open(config).read())['options']
return yaml.safe_load(open(config, encoding="UTF-8").read())['options']
def _get_default_config(self):
'''Load default charm config from config.yaml return as a dict.
@ -717,7 +717,7 @@ class TestConfigs(CharmTestCase):
'''
default_config = {}
config = self._load_config()
for k, v in config.iteritems():
for k, v in config.items():
if 'default' in v:
default_config[k] = v['default']
else:

View File

@ -1,10 +1,9 @@
import collections
import os
import sys
import tempfile
import mock
sys.modules['MySQLdb'] = mock.Mock()
import percona_utils
from test_utils import CharmTestCase
@ -27,61 +26,73 @@ class UtilsTests(CharmTestCase):
@mock.patch("percona_utils.log")
def test_update_empty_hosts_file(self, mock_log):
map = {'1.2.3.4': 'my-host'}
_map = {'1.2.3.4': 'my-host'}
with tempfile.NamedTemporaryFile(delete=False) as tmpfile:
percona_utils.HOSTS_FILE = tmpfile.name
percona_utils.HOSTS_FILE = tmpfile.name
percona_utils.update_hosts_file(map)
percona_utils.update_hosts_file(_map)
with open(tmpfile.name, 'r') as fd:
with open(tmpfile.name, 'r', encoding="UTF-8") as fd:
lines = fd.readlines()
os.remove(tmpfile.name)
self.assertEqual(len(lines), 1)
self.assertEqual(lines[0], "%s %s\n" % (map.items()[0]))
self.assertEqual(lines[0],
"{} {}\n".format(list(_map.keys())[0],
list(_map.values())[0]))
@mock.patch("percona_utils.log")
def test_update_hosts_file_w_dup(self, mock_log):
map = {'1.2.3.4': 'my-host'}
_map = {'1.2.3.4': 'my-host'}
with tempfile.NamedTemporaryFile(delete=False) as tmpfile:
percona_utils.HOSTS_FILE = tmpfile.name
with open(tmpfile.name, 'w') as fd:
fd.write("%s %s\n" % (map.items()[0]))
with open(tmpfile.name, 'w', encoding="UTF-8") as fd:
fd.write("{} {}\n".format(list(_map.keys())[0],
list(_map.values())[0]))
percona_utils.update_hosts_file(map)
percona_utils.update_hosts_file(_map)
with open(tmpfile.name, 'r') as fd:
with open(tmpfile.name, 'r', encoding="UTF-8") as fd:
lines = fd.readlines()
os.remove(tmpfile.name)
self.assertEqual(len(lines), 1)
self.assertEqual(lines[0], "%s %s\n" % (map.items()[0]))
self.assertEqual(lines[0],
"{} {}\n".format(list(_map.keys())[0],
list(_map.values())[0]))
@mock.patch("percona_utils.log")
def test_update_hosts_file_entry(self, mock_log):
altmap = {'1.1.1.1': 'alt-host'}
map = {'1.1.1.1': 'hostA',
'2.2.2.2': 'hostB',
'3.3.3.3': 'hostC',
'4.4.4.4': 'hostD'}
_map = collections.OrderedDict()
_map['1.1.1.1'] = 'hostA'
_map['2.2.2.2'] = 'hostB'
_map['3.3.3.3'] = 'hostC'
_map['4.4.4.4'] = 'hostD'
with tempfile.NamedTemporaryFile(delete=False) as tmpfile:
percona_utils.HOSTS_FILE = tmpfile.name
with open(tmpfile.name, 'w') as fd:
with open(tmpfile.name, 'w', encoding="UTF-8") as fd:
fd.write("#somedata\n")
fd.write("%s %s\n" % (altmap.items()[0]))
fd.write("{} {}\n".format(list(altmap.keys())[0],
list(altmap.values())[0]))
percona_utils.update_hosts_file(map)
percona_utils.update_hosts_file(_map)
with open(percona_utils.HOSTS_FILE, 'r') as fd:
with open(percona_utils.HOSTS_FILE, 'r', encoding="UTF-8") as fd:
lines = fd.readlines()
os.remove(tmpfile.name)
self.assertEqual(len(lines), 5)
print("XXX", lines)
self.assertEqual(lines[0], "#somedata\n")
self.assertEqual(lines[1], "%s %s\n" % (map.items()[0]))
self.assertEqual(lines[4], "%s %s\n" % (map.items()[3]))
self.assertEqual(lines[1],
"{} {}\n".format(list(_map.keys())[0],
list(_map.values())[0]))
self.assertEqual(lines[4],
"{} {}\n".format(list(_map.keys())[3],
list(_map.values())[3]))
@mock.patch("percona_utils.get_cluster_host_ip")
@mock.patch("percona_utils.log")
@ -885,7 +896,7 @@ class TestUpdateBootstrapUUID(CharmTestCase):
self.log.side_effect = self.juju_log
def juju_log(self, msg, level=None):
print('juju-log %s: %s' % (level, msg))
print("juju-log {}: {}".format(level, msg))
def test_no_bootstrap_uuid(self):
self.leader_get.return_value = None

View File

@ -27,7 +27,7 @@ def load_config():
'of %s. ' % __file__)
raise Exception
return yaml.safe_load(open(config).read())['options']
return yaml.safe_load(open(config, encoding="UTF-8").read())['options']
def get_default_config():
@ -37,7 +37,7 @@ def get_default_config():
'''
default_config = {}
config = load_config()
for k, v in config.iteritems():
for k, v in config.items():
if 'default' in v:
default_config[k] = v['default']
else: