Add pause/resume actions and sync charm-helpers

Adds pause and resume unit to the charm such that the
charm stays paused during maintenance operations.

Change-Id: Ia422aa4077fbdd8cdb0353306c14f5633e2affb8
Partial-Bug: 1558642
This commit is contained in:
Liam Young 2016-03-18 12:22:03 +00:00
parent 6e8dcf8af8
commit bb9ef8b039
15 changed files with 1115 additions and 164 deletions

View File

@ -2,3 +2,7 @@ git-reinstall:
description: Reinstall nova-cloud-controller from the openstack-origin-git repositories.
openstack-upgrade:
description: Perform openstack upgrades. Config option action-managed-upgrade must be set to True.
pause:
description: Pause the nova-cloud-controller unit. This action will stop related services.
resume:
descrpition: Resume the nova-cloud-controller unit. This action will start related services.

48
actions/actions.py Executable file
View File

@ -0,0 +1,48 @@
#!/usr/bin/python
import os
import sys
sys.path.append('hooks/')
from charmhelpers.core.hookenv import action_fail
from nova_cc_utils import (
pause_unit_helper,
resume_unit_helper,
register_configs,
)
def pause(args):
"""Pause the Ceilometer services.
@raises Exception should the service fail to stop.
"""
pause_unit_helper(register_configs())
def resume(args):
"""Resume the Ceilometer services.
@raises Exception should the service fail to start."""
resume_unit_helper(register_configs())
# A dictionary of all the defined actions to callables (which take
# parsed arguments).
ACTIONS = {"pause": pause, "resume": resume}
def main(args):
action_name = os.path.basename(args[0])
try:
action = ACTIONS[action_name]
except KeyError:
return "Action %s undefined" % action_name
else:
try:
action(args)
except Exception as e:
action_fail(str(e))
if __name__ == "__main__":
sys.exit(main(sys.argv))

1
actions/pause Symbolic link
View File

@ -0,0 +1 @@
actions.py

1
actions/resume Symbolic link
View File

@ -0,0 +1 @@
actions.py

View File

@ -27,7 +27,11 @@ import cinderclient.v1.client as cinder_client
import glanceclient.v1.client as glance_client
import heatclient.v1.client as heat_client
import keystoneclient.v2_0 as keystone_client
import novaclient.v1_1.client as nova_client
from keystoneclient.auth.identity import v3 as keystone_id_v3
from keystoneclient import session as keystone_session
from keystoneclient.v3 import client as keystone_client_v3
import novaclient.client as nova_client
import pika
import swiftclient
@ -38,6 +42,8 @@ from charmhelpers.contrib.amulet.utils import (
DEBUG = logging.DEBUG
ERROR = logging.ERROR
NOVA_CLIENT_VERSION = "2"
class OpenStackAmuletUtils(AmuletUtils):
"""OpenStack amulet utilities.
@ -139,7 +145,7 @@ class OpenStackAmuletUtils(AmuletUtils):
return "role {} does not exist".format(e['name'])
return ret
def validate_user_data(self, expected, actual):
def validate_user_data(self, expected, actual, api_version=None):
"""Validate user data.
Validate a list of actual user data vs a list of expected user
@ -150,10 +156,15 @@ class OpenStackAmuletUtils(AmuletUtils):
for e in expected:
found = False
for act in actual:
a = {'enabled': act.enabled, 'name': act.name,
'email': act.email, 'tenantId': act.tenantId,
'id': act.id}
if e['name'] == a['name']:
if e['name'] == act.name:
a = {'enabled': act.enabled, 'name': act.name,
'email': act.email, 'id': act.id}
if api_version == 3:
a['default_project_id'] = getattr(act,
'default_project_id',
'none')
else:
a['tenantId'] = act.tenantId
found = True
ret = self._validate_dict_data(e, a)
if ret:
@ -188,15 +199,30 @@ class OpenStackAmuletUtils(AmuletUtils):
return cinder_client.Client(username, password, tenant, ept)
def authenticate_keystone_admin(self, keystone_sentry, user, password,
tenant):
tenant=None, api_version=None,
keystone_ip=None):
"""Authenticates admin user with the keystone admin endpoint."""
self.log.debug('Authenticating keystone admin...')
unit = keystone_sentry
service_ip = unit.relation('shared-db',
'mysql:shared-db')['private-address']
ep = "http://{}:35357/v2.0".format(service_ip.strip().decode('utf-8'))
return keystone_client.Client(username=user, password=password,
tenant_name=tenant, auth_url=ep)
if not keystone_ip:
keystone_ip = unit.relation('shared-db',
'mysql:shared-db')['private-address']
base_ep = "http://{}:35357".format(keystone_ip.strip().decode('utf-8'))
if not api_version or api_version == 2:
ep = base_ep + "/v2.0"
return keystone_client.Client(username=user, password=password,
tenant_name=tenant, auth_url=ep)
else:
ep = base_ep + "/v3"
auth = keystone_id_v3.Password(
user_domain_name='admin_domain',
username=user,
password=password,
domain_name='admin_domain',
auth_url=ep,
)
sess = keystone_session.Session(auth=auth)
return keystone_client_v3.Client(session=sess)
def authenticate_keystone_user(self, keystone, user, password, tenant):
"""Authenticates a regular user with the keystone public endpoint."""
@ -225,7 +251,8 @@ class OpenStackAmuletUtils(AmuletUtils):
self.log.debug('Authenticating nova user ({})...'.format(user))
ep = keystone.service_catalog.url_for(service_type='identity',
endpoint_type='publicURL')
return nova_client.Client(username=user, api_key=password,
return nova_client.Client(NOVA_CLIENT_VERSION,
username=user, api_key=password,
project_id=tenant, auth_url=ep)
def authenticate_swift_user(self, keystone, user, password, tenant):

View File

@ -24,6 +24,7 @@ import os
import sys
import re
import itertools
import functools
import six
import tempfile
@ -69,7 +70,15 @@ from charmhelpers.contrib.python.packages import (
pip_install,
)
from charmhelpers.core.host import lsb_release, mounts, umount, service_running
from charmhelpers.core.host import (
lsb_release,
mounts,
umount,
service_running,
service_pause,
service_resume,
restart_on_change_helper,
)
from charmhelpers.fetch import apt_install, apt_cache, install_remote
from charmhelpers.contrib.storage.linux.utils import is_block_device, zap_disk
from charmhelpers.contrib.storage.linux.loopback import ensure_loopback_device
@ -128,7 +137,7 @@ SWIFT_CODENAMES = OrderedDict([
('liberty',
['2.3.0', '2.4.0', '2.5.0']),
('mitaka',
['2.5.0']),
['2.5.0', '2.6.0']),
])
# >= Liberty version->codename mapping
@ -763,7 +772,8 @@ def _git_clone_and_install_single(repo, branch, depth, parent_dir, http_proxy,
os.mkdir(parent_dir)
juju_log('Cloning git repo: {}, branch: {}'.format(repo, branch))
repo_dir = install_remote(repo, dest=parent_dir, branch=branch, depth=depth)
repo_dir = install_remote(
repo, dest=parent_dir, branch=branch, depth=depth)
venv = os.path.join(parent_dir, 'venv')
@ -862,66 +872,155 @@ def os_workload_status(configs, required_interfaces, charm_func=None):
return wrap
def set_os_workload_status(configs, required_interfaces, charm_func=None, services=None, ports=None):
"""
Set workload status based on complete contexts.
status-set missing or incomplete contexts
and juju-log details of missing required data.
charm_func is a charm specific function to run checking
for charm specific requirements such as a VIP setting.
def set_os_workload_status(configs, required_interfaces, charm_func=None,
services=None, ports=None):
"""Set the state of the workload status for the charm.
This function also checks for whether the services defined are ACTUALLY
running and that the ports they advertise are open and being listened to.
This calls _determine_os_workload_status() to get the new state, message
and sets the status using status_set()
@param services - OPTIONAL: a [{'service': <string>, 'ports': [<int>]]
The ports are optional.
If services is a [<string>] then ports are ignored.
@param ports - OPTIONAL: an [<int>] representing ports that shoudl be
open.
@returns None
@param configs: a templating.OSConfigRenderer() object
@param required_interfaces: {generic: [specific, specific2, ...]}
@param charm_func: a callable function that returns state, message. The
signature is charm_func(configs) -> (state, message)
@param services: list of strings OR dictionary specifying services/ports
@param ports: OPTIONAL list of port numbers.
@returns state, message: the new workload status, user message
"""
incomplete_rel_data = incomplete_relation_data(configs, required_interfaces)
state = 'active'
missing_relations = []
incomplete_relations = []
state, message = _determine_os_workload_status(
configs, required_interfaces, charm_func, services, ports)
status_set(state, message)
def _determine_os_workload_status(
configs, required_interfaces, charm_func=None,
services=None, ports=None):
"""Determine the state of the workload status for the charm.
This function returns the new workload status for the charm based
on the state of the interfaces, the paused state and whether the
services are actually running and any specified ports are open.
This checks:
1. if the unit should be paused, that it is actually paused. If so the
state is 'maintenance' + message, else 'broken'.
2. that the interfaces/relations are complete. If they are not then
it sets the state to either 'broken' or 'waiting' and an appropriate
message.
3. If all the relation data is set, then it checks that the actual
services really are running. If not it sets the state to 'broken'.
If everything is okay then the state returns 'active'.
@param configs: a templating.OSConfigRenderer() object
@param required_interfaces: {generic: [specific, specific2, ...]}
@param charm_func: a callable function that returns state, message. The
signature is charm_func(configs) -> (state, message)
@param services: list of strings OR dictionary specifying services/ports
@param ports: OPTIONAL list of port numbers.
@returns state, message: the new workload status, user message
"""
state, message = _ows_check_if_paused(services, ports)
if state is None:
state, message = _ows_check_generic_interfaces(
configs, required_interfaces)
if state != 'maintenance' and charm_func:
# _ows_check_charm_func() may modify the state, message
state, message = _ows_check_charm_func(
state, message, lambda: charm_func(configs))
if state is None:
state, message = _ows_check_services_running(services, ports)
if state is None:
state = 'active'
message = "Unit is ready"
juju_log(message, 'INFO')
return state, message
def _ows_check_if_paused(services=None, ports=None):
"""Check if the unit is supposed to be paused, and if so check that the
services/ports (if passed) are actually stopped/not being listened to.
if the unit isn't supposed to be paused, just return None, None
@param services: OPTIONAL services spec or list of service names.
@param ports: OPTIONAL list of port numbers.
@returns state, message or None, None
"""
if is_unit_paused_set():
state, message = check_actually_paused(services=services,
ports=ports)
if state is None:
# we're paused okay, so set maintenance and return
state = "maintenance"
message = "Paused. Use 'resume' action to resume normal service."
return state, message
return None, None
def _ows_check_generic_interfaces(configs, required_interfaces):
"""Check the complete contexts to determine the workload status.
- Checks for missing or incomplete contexts
- juju log details of missing required data.
- determines the correct workload status
- creates an appropriate message for status_set(...)
if there are no problems then the function returns None, None
@param configs: a templating.OSConfigRenderer() object
@params required_interfaces: {generic_interface: [specific_interface], }
@returns state, message or None, None
"""
incomplete_rel_data = incomplete_relation_data(configs,
required_interfaces)
state = None
message = None
charm_state = None
charm_message = None
missing_relations = set()
incomplete_relations = set()
for generic_interface in incomplete_rel_data.keys():
for generic_interface, relations_states in incomplete_rel_data.items():
related_interface = None
missing_data = {}
# Related or not?
for interface in incomplete_rel_data[generic_interface]:
if incomplete_rel_data[generic_interface][interface].get('related'):
for interface, relation_state in relations_states.items():
if relation_state.get('related'):
related_interface = interface
missing_data = incomplete_rel_data[generic_interface][interface].get('missing_data')
# No relation ID for the generic_interface
missing_data = relation_state.get('missing_data')
break
# No relation ID for the generic_interface?
if not related_interface:
juju_log("{} relation is missing and must be related for "
"functionality. ".format(generic_interface), 'WARN')
state = 'blocked'
if generic_interface not in missing_relations:
missing_relations.append(generic_interface)
missing_relations.add(generic_interface)
else:
# Relation ID exists but no related unit
# Relation ID eists but no related unit
if not missing_data:
# Edge case relation ID exists but departing
if ('departed' in hook_name() or 'broken' in hook_name()) \
and related_interface in hook_name():
# Edge case - relation ID exists but departings
_hook_name = hook_name()
if (('departed' in _hook_name or 'broken' in _hook_name) and
related_interface in _hook_name):
state = 'blocked'
if generic_interface not in missing_relations:
missing_relations.append(generic_interface)
missing_relations.add(generic_interface)
juju_log("{} relation's interface, {}, "
"relationship is departed or broken "
"and is required for functionality."
"".format(generic_interface, related_interface), "WARN")
"".format(generic_interface, related_interface),
"WARN")
# Normal case relation ID exists but no related unit
# (joining)
else:
juju_log("{} relations's interface, {}, is related but has "
"no units in the relation."
"".format(generic_interface, related_interface), "INFO")
juju_log("{} relations's interface, {}, is related but has"
" no units in the relation."
"".format(generic_interface, related_interface),
"INFO")
# Related unit exists and data missing on the relation
else:
juju_log("{} relation's interface, {}, is related awaiting "
@ -930,9 +1029,8 @@ def set_os_workload_status(configs, required_interfaces, charm_func=None, servic
", ".join(missing_data)), "INFO")
if state != 'blocked':
state = 'waiting'
if generic_interface not in incomplete_relations \
and generic_interface not in missing_relations:
incomplete_relations.append(generic_interface)
if generic_interface not in missing_relations:
incomplete_relations.add(generic_interface)
if missing_relations:
message = "Missing relations: {}".format(", ".join(missing_relations))
@ -945,9 +1043,22 @@ def set_os_workload_status(configs, required_interfaces, charm_func=None, servic
"".format(", ".join(incomplete_relations))
state = 'waiting'
# Run charm specific checks
if charm_func:
charm_state, charm_message = charm_func(configs)
return state, message
def _ows_check_charm_func(state, message, charm_func_with_configs):
"""Run a custom check function for the charm to see if it wants to
change the state. This is only run if not in 'maintenance' and
tests to see if the new state is more important that the previous
one determined by the interfaces/relations check.
@param state: the previously determined state so far.
@param message: the user orientated message so far.
@param charm_func: a callable function that returns state, message
@returns state, message strings.
"""
if charm_func_with_configs:
charm_state, charm_message = charm_func_with_configs()
if charm_state != 'active' and charm_state != 'unknown':
state = workload_state_compare(state, charm_state)
if message:
@ -956,72 +1067,151 @@ def set_os_workload_status(configs, required_interfaces, charm_func=None, servic
message = "{}, {}".format(message, charm_message)
else:
message = charm_message
return state, message
# If the charm thinks the unit is active, check that the actual services
# really are active.
if services is not None and state == 'active':
# if we're passed the dict() then just grab the values as a list.
if isinstance(services, dict):
services = services.values()
# either extract the list of services from the dictionary, or if
# it is a simple string, use that. i.e. works with mixed lists.
_s = []
for s in services:
if isinstance(s, dict) and 'service' in s:
_s.append(s['service'])
if isinstance(s, str):
_s.append(s)
services_running = [service_running(s) for s in _s]
if not all(services_running):
not_running = [s for s, running in zip(_s, services_running)
if not running]
message = ("Services not running that should be: {}"
.format(", ".join(not_running)))
def _ows_check_services_running(services, ports):
"""Check that the services that should be running are actually running
and that any ports specified are being listened to.
@param services: list of strings OR dictionary specifying services/ports
@param ports: list of ports
@returns state, message: strings or None, None
"""
messages = []
state = None
if services is not None:
services = _extract_services_list_helper(services)
services_running, running = _check_running_services(services)
if not all(running):
messages.append(
"Services not running that should be: {}"
.format(", ".join(_filter_tuples(services_running, False))))
state = 'blocked'
# also verify that the ports that should be open are open
# NB, that ServiceManager objects only OPTIONALLY have ports
port_map = OrderedDict([(s['service'], s['ports'])
for s in services if 'ports' in s])
if state == 'active' and port_map:
all_ports = list(itertools.chain(*port_map.values()))
ports_open = [port_has_listener('0.0.0.0', p)
for p in all_ports]
if not all(ports_open):
not_opened = [p for p, opened in zip(all_ports, ports_open)
if not opened]
map_not_open = OrderedDict()
for service, ports in port_map.items():
closed_ports = set(ports).intersection(not_opened)
if closed_ports:
map_not_open[service] = closed_ports
# find which service has missing ports. They are in service
# order which makes it a bit easier.
message = (
"Services with ports not open that should be: {}"
.format(
", ".join([
"{}: [{}]".format(
service,
", ".join([str(v) for v in ports]))
for service, ports in map_not_open.items()])))
state = 'blocked'
if ports is not None and state == 'active':
# and we can also check ports which we don't know the service for
ports_open = [port_has_listener('0.0.0.0', p) for p in ports]
map_not_open, ports_open = (
_check_listening_on_services_ports(services))
if not all(ports_open):
message = (
# find which service has missing ports. They are in service
# order which makes it a bit easier.
message_parts = {service: ", ".join([str(v) for v in open_ports])
for service, open_ports in map_not_open.items()}
message = ", ".join(
["{}: [{}]".format(s, sp) for s, sp in message_parts.items()])
messages.append(
"Services with ports not open that should be: {}"
.format(message))
state = 'blocked'
if ports is not None:
# and we can also check ports which we don't know the service for
ports_open, ports_open_bools = _check_listening_on_ports_list(ports)
if not all(ports_open_bools):
messages.append(
"Ports which should be open, but are not: {}"
.format(", ".join([str(p) for p, v in zip(ports, ports_open)
.format(", ".join([str(p) for p, v in ports_open
if not v])))
state = 'blocked'
# Set to active if all requirements have been met
if state == 'active':
message = "Unit is ready"
juju_log(message, "INFO")
if state is not None:
message = "; ".join(messages)
return state, message
status_set(state, message)
return None, None
def _extract_services_list_helper(services):
"""Extract a OrderedDict of {service: [ports]} of the supplied services
for use by the other functions.
The services object can either be:
- None : no services were passed (an empty dict is returned)
- a list of strings
- A dictionary (optionally OrderedDict) {service_name: {'service': ..}}
- An array of [{'service': service_name, ...}, ...]
@param services: see above
@returns OrderedDict(service: [ports], ...)
"""
if services is None:
return {}
if isinstance(services, dict):
services = services.values()
# either extract the list of services from the dictionary, or if
# it is a simple string, use that. i.e. works with mixed lists.
_s = OrderedDict()
for s in services:
if isinstance(s, dict) and 'service' in s:
_s[s['service']] = s.get('ports', [])
if isinstance(s, str):
_s[s] = []
return _s
def _check_running_services(services):
"""Check that the services dict provided is actually running and provide
a list of (service, boolean) tuples for each service.
Returns both a zipped list of (service, boolean) and a list of booleans
in the same order as the services.
@param services: OrderedDict of strings: [ports], one for each service to
check.
@returns [(service, boolean), ...], : results for checks
[boolean] : just the result of the service checks
"""
services_running = [service_running(s) for s in services]
return list(zip(services, services_running)), services_running
def _check_listening_on_services_ports(services, test=False):
"""Check that the unit is actually listening (has the port open) on the
ports that the service specifies are open. If test is True then the
function returns the services with ports that are open rather than
closed.
Returns an OrderedDict of service: ports and a list of booleans
@param services: OrderedDict(service: [port, ...], ...)
@param test: default=False, if False, test for closed, otherwise open.
@returns OrderedDict(service: [port-not-open, ...]...), [boolean]
"""
test = not(not(test)) # ensure test is True or False
all_ports = list(itertools.chain(*services.values()))
ports_states = [port_has_listener('0.0.0.0', p) for p in all_ports]
map_ports = OrderedDict()
matched_ports = [p for p, opened in zip(all_ports, ports_states)
if opened == test] # essentially opened xor test
for service, ports in services.items():
set_ports = set(ports).intersection(matched_ports)
if set_ports:
map_ports[service] = set_ports
return map_ports, ports_states
def _check_listening_on_ports_list(ports):
"""Check that the ports list given are being listened to
Returns a list of ports being listened to and a list of the
booleans.
@param ports: LIST or port numbers.
@returns [(port_num, boolean), ...], [boolean]
"""
ports_open = [port_has_listener('0.0.0.0', p) for p in ports]
return zip(ports, ports_open), ports_open
def _filter_tuples(services_states, state):
"""Return a simple list from a list of tuples according to the condition
@param services_states: LIST of (string, boolean): service and running
state.
@param state: Boolean to match the tuple against.
@returns [LIST of strings] that matched the tuple RHS.
"""
return [s for s, b in services_states if b == state]
def workload_state_compare(current_workload_state, workload_state):
@ -1046,8 +1236,7 @@ def workload_state_compare(current_workload_state, workload_state):
def incomplete_relation_data(configs, required_interfaces):
"""
Check complete contexts against required_interfaces
"""Check complete contexts against required_interfaces
Return dictionary of incomplete relation data.
configs is an OSConfigRenderer object with configs registered
@ -1072,19 +1261,13 @@ def incomplete_relation_data(configs, required_interfaces):
'shared-db': {'related': True}}}
"""
complete_ctxts = configs.complete_contexts()
incomplete_relations = []
for svc_type in required_interfaces.keys():
# Avoid duplicates
found_ctxt = False
for interface in required_interfaces[svc_type]:
if interface in complete_ctxts:
found_ctxt = True
if not found_ctxt:
incomplete_relations.append(svc_type)
incomplete_context_data = {}
for i in incomplete_relations:
incomplete_context_data[i] = configs.get_incomplete_context_data(required_interfaces[i])
return incomplete_context_data
incomplete_relations = [
svc_type
for svc_type, interfaces in required_interfaces.items()
if not set(interfaces).intersection(complete_ctxts)]
return {
i: configs.get_incomplete_context_data(required_interfaces[i])
for i in incomplete_relations}
def do_action_openstack_upgrade(package, upgrade_callback, configs):
@ -1145,3 +1328,245 @@ def remote_restart(rel_name, remote_service=None):
relation_set(relation_id=rid,
relation_settings=trigger,
)
def check_actually_paused(services=None, ports=None):
"""Check that services listed in the services object and and ports
are actually closed (not listened to), to verify that the unit is
properly paused.
@param services: See _extract_services_list_helper
@returns status, : string for status (None if okay)
message : string for problem for status_set
"""
state = None
message = None
messages = []
if services is not None:
services = _extract_services_list_helper(services)
services_running, services_states = _check_running_services(services)
if any(services_states):
# there shouldn't be any running so this is a problem
messages.append("these services running: {}"
.format(", ".join(
_filter_tuples(services_running, True))))
state = "blocked"
ports_open, ports_open_bools = (
_check_listening_on_services_ports(services, True))
if any(ports_open_bools):
message_parts = {service: ", ".join([str(v) for v in open_ports])
for service, open_ports in ports_open.items()}
message = ", ".join(
["{}: [{}]".format(s, sp) for s, sp in message_parts.items()])
messages.append(
"these service:ports are open: {}".format(message))
state = 'blocked'
if ports is not None:
ports_open, bools = _check_listening_on_ports_list(ports)
if any(bools):
messages.append(
"these ports which should be closed, but are open: {}"
.format(", ".join([str(p) for p, v in ports_open if v])))
state = 'blocked'
if messages:
message = ("Services should be paused but {}"
.format(", ".join(messages)))
return state, message
def set_unit_paused():
"""Set the unit to a paused state in the local kv() store.
This does NOT actually pause the unit
"""
with unitdata.HookData()() as t:
kv = t[0]
kv.set('unit-paused', True)
def clear_unit_paused():
"""Clear the unit from a paused state in the local kv() store
This does NOT actually restart any services - it only clears the
local state.
"""
with unitdata.HookData()() as t:
kv = t[0]
kv.set('unit-paused', False)
def is_unit_paused_set():
"""Return the state of the kv().get('unit-paused').
This does NOT verify that the unit really is paused.
To help with units that don't have HookData() (testing)
if it excepts, return False
"""
try:
with unitdata.HookData()() as t:
kv = t[0]
# transform something truth-y into a Boolean.
return not(not(kv.get('unit-paused')))
except:
return False
def pause_unit(assess_status_func, services=None, ports=None,
charm_func=None):
"""Pause a unit by stopping the services and setting 'unit-paused'
in the local kv() store.
Also checks that the services have stopped and ports are no longer
being listened to.
An optional charm_func() can be called that can either raise an
Exception or return non None, None to indicate that the unit
didn't pause cleanly.
The signature for charm_func is:
charm_func() -> message: string
charm_func() is executed after any services are stopped, if supplied.
The services object can either be:
- None : no services were passed (an empty dict is returned)
- a list of strings
- A dictionary (optionally OrderedDict) {service_name: {'service': ..}}
- An array of [{'service': service_name, ...}, ...]
@param assess_status_func: (f() -> message: string | None) or None
@param services: OPTIONAL see above
@param ports: OPTIONAL list of port
@param charm_func: function to run for custom charm pausing.
@returns None
@raises Exception(message) on an error for action_fail().
"""
services = _extract_services_list_helper(services)
messages = []
if services:
for service in services.keys():
stopped = service_pause(service)
if not stopped:
messages.append("{} didn't stop cleanly.".format(service))
if charm_func:
try:
message = charm_func()
if message:
messages.append(message)
except Exception as e:
message.append(str(e))
set_unit_paused()
if assess_status_func:
message = assess_status_func()
if message:
messages.append(message)
if messages:
raise Exception("Couldn't pause: {}".format("; ".join(messages)))
def resume_unit(assess_status_func, services=None, ports=None,
charm_func=None):
"""Resume a unit by starting the services and clearning 'unit-paused'
in the local kv() store.
Also checks that the services have started and ports are being listened to.
An optional charm_func() can be called that can either raise an
Exception or return non None to indicate that the unit
didn't resume cleanly.
The signature for charm_func is:
charm_func() -> message: string
charm_func() is executed after any services are started, if supplied.
The services object can either be:
- None : no services were passed (an empty dict is returned)
- a list of strings
- A dictionary (optionally OrderedDict) {service_name: {'service': ..}}
- An array of [{'service': service_name, ...}, ...]
@param assess_status_func: (f() -> message: string | None) or None
@param services: OPTIONAL see above
@param ports: OPTIONAL list of port
@param charm_func: function to run for custom charm resuming.
@returns None
@raises Exception(message) on an error for action_fail().
"""
services = _extract_services_list_helper(services)
messages = []
if services:
for service in services.keys():
started = service_resume(service)
if not started:
messages.append("{} didn't start cleanly.".format(service))
if charm_func:
try:
message = charm_func()
if message:
messages.append(message)
except Exception as e:
message.append(str(e))
clear_unit_paused()
if assess_status_func:
message = assess_status_func()
if message:
messages.append(message)
if messages:
raise Exception("Couldn't resume: {}".format("; ".join(messages)))
def make_assess_status_func(*args, **kwargs):
"""Creates an assess_status_func() suitable for handing to pause_unit()
and resume_unit().
This uses the _determine_os_workload_status(...) function to determine
what the workload_status should be for the unit. If the unit is
not in maintenance or active states, then the message is returned to
the caller. This is so an action that doesn't result in either a
complete pause or complete resume can signal failure with an action_fail()
"""
def _assess_status_func():
state, message = _determine_os_workload_status(*args, **kwargs)
status_set(state, message)
if state not in ['maintenance', 'active']:
return message
return None
return _assess_status_func
def pausable_restart_on_change(restart_map, stopstart=False):
"""A restart_on_change decorator that checks to see if the unit is
paused. If it is paused then the decorated function doesn't fire.
This is provided as a helper, as the @restart_on_change(...) decorator
is in core.host, yet the openstack specific helpers are in this file
(contrib.openstack.utils). Thus, this needs to be an optional feature
for openstack charms (or charms that wish to use the openstack
pause/resume type features).
It is used as follows:
from contrib.openstack.utils import (
pausable_restart_on_change as restart_on_change)
@restart_on_change(restart_map, stopstart=<boolean>)
def some_hook(...):
pass
see core.utils.restart_on_change() for more details.
@param f: the function to decorate
@param restart_map: the restart map {conf_file: [services]}
@param stopstart: DEFAULT false; whether to stop, start or just restart
@returns decorator to use a restart_on_change with pausability
"""
def wrap(f):
@functools.wraps(f)
def wrapped_f(*args, **kwargs):
if is_unit_paused_set():
return f(*args, **kwargs)
# otherwise, normal restart_on_change functionality
return restart_on_change_helper(
(lambda: f(*args, **kwargs)), restart_map, stopstart)
return wrapped_f
return wrap

View File

@ -24,6 +24,8 @@
# Adam Gandelman <adamg@ubuntu.com>
#
import bisect
import errno
import hashlib
import six
import os
@ -163,7 +165,7 @@ class Pool(object):
:return: None
"""
# read-only is easy, writeback is much harder
mode = get_cache_mode(cache_pool)
mode = get_cache_mode(self.service, cache_pool)
if mode == 'readonly':
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'none'])
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
@ -171,7 +173,7 @@ class Pool(object):
elif mode == 'writeback':
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'cache-mode', cache_pool, 'forward'])
# Flush the cache and wait for it to return
check_call(['ceph', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all'])
check_call(['rados', '--id', self.service, '-p', cache_pool, 'cache-flush-evict-all'])
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove-overlay', self.name])
check_call(['ceph', '--id', self.service, 'osd', 'tier', 'remove', self.name, cache_pool])
@ -259,6 +261,134 @@ class ErasurePool(Pool):
Returns json formatted output"""
def get_mon_map(service):
"""
Returns the current monitor map.
:param service: six.string_types. The Ceph user name to run the command under
:return: json string. :raise: ValueError if the monmap fails to parse.
Also raises CalledProcessError if our ceph command fails
"""
try:
mon_status = check_output(
['ceph', '--id', service,
'mon_status', '--format=json'])
try:
return json.loads(mon_status)
except ValueError as v:
log("Unable to parse mon_status json: {}. Error: {}".format(
mon_status, v.message))
raise
except CalledProcessError as e:
log("mon_status command failed with message: {}".format(
e.message))
raise
def hash_monitor_names(service):
"""
Uses the get_mon_map() function to get information about the monitor
cluster.
Hash the name of each monitor. Return a sorted list of monitor hashes
in an ascending order.
:param service: six.string_types. The Ceph user name to run the command under
:rtype : dict. json dict of monitor name, ip address and rank
example: {
'name': 'ip-172-31-13-165',
'rank': 0,
'addr': '172.31.13.165:6789/0'}
"""
try:
hash_list = []
monitor_list = get_mon_map(service=service)
if monitor_list['monmap']['mons']:
for mon in monitor_list['monmap']['mons']:
hash_list.append(
hashlib.sha224(mon['name'].encode('utf-8')).hexdigest())
return sorted(hash_list)
else:
return None
except (ValueError, CalledProcessError):
raise
def monitor_key_delete(service, key):
"""
Delete a key and value pair from the monitor cluster
:param service: six.string_types. The Ceph user name to run the command under
Deletes a key value pair on the monitor cluster.
:param key: six.string_types. The key to delete.
"""
try:
check_output(
['ceph', '--id', service,
'config-key', 'del', str(key)])
except CalledProcessError as e:
log("Monitor config-key put failed with message: {}".format(
e.output))
raise
def monitor_key_set(service, key, value):
"""
Sets a key value pair on the monitor cluster.
:param service: six.string_types. The Ceph user name to run the command under
:param key: six.string_types. The key to set.
:param value: The value to set. This will be converted to a string
before setting
"""
try:
check_output(
['ceph', '--id', service,
'config-key', 'put', str(key), str(value)])
except CalledProcessError as e:
log("Monitor config-key put failed with message: {}".format(
e.output))
raise
def monitor_key_get(service, key):
"""
Gets the value of an existing key in the monitor cluster.
:param service: six.string_types. The Ceph user name to run the command under
:param key: six.string_types. The key to search for.
:return: Returns the value of that key or None if not found.
"""
try:
output = check_output(
['ceph', '--id', service,
'config-key', 'get', str(key)])
return output
except CalledProcessError as e:
log("Monitor config-key get failed with message: {}".format(
e.output))
return None
def monitor_key_exists(service, key):
"""
Searches for the existence of a key in the monitor cluster.
:param service: six.string_types. The Ceph user name to run the command under
:param key: six.string_types. The key to search for
:return: Returns True if the key exists, False if not and raises an
exception if an unknown error occurs. :raise: CalledProcessError if
an unknown error occurs
"""
try:
check_call(
['ceph', '--id', service,
'config-key', 'exists', str(key)])
# I can return true here regardless because Ceph returns
# ENOENT if the key wasn't found
return True
except CalledProcessError as e:
if e.returncode == errno.ENOENT:
return False
else:
log("Unknown error from ceph config-get exists: {} {}".format(
e.returncode, e.output))
raise
def get_erasure_profile(service, name):
"""
:param service: six.string_types. The Ceph user name to run the command under

View File

@ -912,6 +912,24 @@ def payload_status_set(klass, pid, status):
subprocess.check_call(cmd)
@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
def resource_get(name):
"""used to fetch the resource path of the given name.
<name> must match a name of defined resource in metadata.yaml
returns either a path or False if resource not available
"""
if not name:
return False
cmd = ['resource-get', name]
try:
return subprocess.check_output(cmd).decode('UTF-8')
except subprocess.CalledProcessError:
return False
@cached
def juju_version():
"""Full version string (eg. '1.23.3.1-trusty-amd64')"""
@ -976,3 +994,16 @@ def _run_atexit():
for callback, args, kwargs in reversed(_atexit):
callback(*args, **kwargs)
del _atexit[:]
@translate_exc(from_exc=OSError, to_exc=NotImplementedError)
def network_get_primary_address(binding):
'''
Retrieve the primary network address for a named binding
:param binding: string. The name of a relation of extra-binding
:return: string. The primary IP address for the named binding
:raise: NotImplementedError if run on Juju < 2.0
'''
cmd = ['network-get', '--primary-address', binding]
return subprocess.check_output(cmd).strip()

View File

@ -30,6 +30,8 @@ import random
import string
import subprocess
import hashlib
import functools
import itertools
from contextlib import contextmanager
from collections import OrderedDict
@ -428,27 +430,47 @@ def restart_on_change(restart_map, stopstart=False):
restarted if any file matching the pattern got changed, created
or removed. Standard wildcards are supported, see documentation
for the 'glob' module for more information.
@param restart_map: {path_file_name: [service_name, ...]
@param stopstart: DEFAULT false; whether to stop, start OR restart
@returns result from decorated function
"""
def wrap(f):
@functools.wraps(f)
def wrapped_f(*args, **kwargs):
checksums = {path: path_hash(path) for path in restart_map}
f(*args, **kwargs)
restarts = []
for path in restart_map:
if path_hash(path) != checksums[path]:
restarts += restart_map[path]
services_list = list(OrderedDict.fromkeys(restarts))
if not stopstart:
for service_name in services_list:
service('restart', service_name)
else:
for action in ['stop', 'start']:
for service_name in services_list:
service(action, service_name)
return restart_on_change_helper(
(lambda: f(*args, **kwargs)), restart_map, stopstart)
return wrapped_f
return wrap
def restart_on_change_helper(lambda_f, restart_map, stopstart=False):
"""Helper function to perform the restart_on_change function.
This is provided for decorators to restart services if files described
in the restart_map have changed after an invocation of lambda_f().
@param lambda_f: function to call.
@param restart_map: {file: [service, ...]}
@param stopstart: whether to stop, start or restart a service
@returns result of lambda_f()
"""
checksums = {path: path_hash(path) for path in restart_map}
r = lambda_f()
# create a list of lists of the services to restart
restarts = [restart_map[path]
for path in restart_map
if path_hash(path) != checksums[path]]
# create a flat list of ordered services without duplicates from lists
services_list = list(OrderedDict.fromkeys(itertools.chain(*restarts)))
if services_list:
actions = ('stop', 'start') if stopstart else ('restart',)
for action in actions:
for service_name in services_list:
service(action, service_name)
return r
def lsb_release():
"""Return /etc/lsb-release in a dict"""
d = {}

View File

@ -31,7 +31,6 @@ from charmhelpers.core.hookenv import (
)
from charmhelpers.core.host import (
restart_on_change,
service_running,
service_stop,
service_reload,
@ -53,6 +52,8 @@ from charmhelpers.contrib.openstack.utils import (
os_requires_version,
sync_db_with_multi_ipv6_addresses,
set_os_workload_status,
pausable_restart_on_change as restart_on_change,
is_unit_paused_set,
)
from charmhelpers.contrib.openstack.neutron import (
@ -323,7 +324,7 @@ def conditional_neutron_migration():
# running so prod it just in case
[neutron_api_relation_joined(rid=rid, remote_restart=True)
for rid in relation_ids('neutron-api')]
if 'neutron-server' in services():
if 'neutron-server' in services() and not is_unit_paused_set():
service_restart('neutron-server')
@ -895,7 +896,8 @@ def configure_https():
# TODO: improve this by checking if local CN certs are available
# first then checking reload status (see LP #1433114).
service_reload('apache2', restart_on_failure=True)
if not is_unit_paused_set():
service_reload('apache2', restart_on_failure=True)
for rid in relation_ids('identity-service'):
identity_joined(rid=rid)

View File

@ -40,6 +40,10 @@ from charmhelpers.contrib.openstack.utils import (
os_release,
save_script_rc as _save_script_rc,
set_os_workload_status,
is_unit_paused_set,
make_assess_status_func,
pause_unit,
resume_unit,
)
from charmhelpers.fetch import (
@ -619,7 +623,8 @@ def _do_openstack_upgrade(new_src):
if is_elected_leader(CLUSTER_RES):
status_set('maintenance', 'Running nova db migration')
migrate_nova_database()
[service_start(s) for s in services()]
if not is_unit_paused_set():
[service_start(s) for s in services()]
return configs
@ -1018,6 +1023,10 @@ def get_topics():
def cmd_all_services(cmd):
if is_unit_paused_set():
log('Unit is in paused state, not issuing {} to all'
'services'.format(cmd))
return
if cmd == 'start':
for svc in services():
if not service_running(svc):
@ -1337,3 +1346,70 @@ def check_optional_relations(configs):
def is_api_ready(configs):
return (not incomplete_relation_data(configs, REQUIRED_INTERFACES))
def assess_status(configs):
"""Assess status of current unit
Decides what the state of the unit should be based on the current
configuration.
SIDE EFFECT: calls set_os_workload_status(...) which sets the workload
status of the unit.
Also calls status_set(...) directly if paused state isn't complete.
@param configs: a templating.OSConfigRenderer() object
@returns None - this function is executed for its side-effect
"""
assess_status_func(configs)()
def assess_status_func(configs):
"""Helper function to create the function that will assess_status() for
the unit.
Uses charmhelpers.contrib.openstack.utils.make_assess_status_func() to
create the appropriate status function and then returns it.
Used directly by assess_status() and also for pausing and resuming
the unit.
NOTE(ajkavanagh) ports are not checked due to race hazards with services
that don't behave sychronously w.r.t their service scripts. e.g.
apache2.
@param configs: a templating.OSConfigRenderer() object
@return f() -> None : a function that assesses the unit's workload status
"""
return make_assess_status_func(
configs, REQUIRED_INTERFACES,
charm_func=check_optional_relations,
services=services(), ports=None)
def pause_unit_helper(configs):
"""Helper function to pause a unit, and then call assess_status(...) in
effect, so that the status is correctly updated.
Uses charmhelpers.contrib.openstack.utils.pause_unit() to do the work.
@param configs: a templating.OSConfigRenderer() object
@returns None - this function is executed for its side-effect
"""