fenix/fenix/workflow/workflows/k8s.py

# Copyright (c) 2020 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from importlib import import_module
try:
from importlib.machinery import SourceFileLoader
def mod_loader_action_instance(mname, mpath, session_instance,
ap_db_instance):
mi = SourceFileLoader(mname, mpath).load_module()
return mi.ActionPlugin(session_instance, ap_db_instance)
except ImportError:
from imp import load_source
def mod_loader_action_instance(mname, mpath, session_instance,
ap_db_instance):
mi = load_source(mname, mpath)
return mi.ActionPlugin(session_instance, ap_db_instance)
from keystoneclient import client as ks_client
from kubernetes import client
from kubernetes import config
from kubernetes.client.rest import ApiException
import os
from oslo_log import log as logging
import time
from fenix.db import api as db_api
from fenix.db import exceptions as db_exc
from fenix.utils.thread import run_async
from fenix.utils.time import datetime_to_str
from fenix.utils.time import is_time_after_time
from fenix.utils.time import reply_time_str
from fenix.utils.time import time_now_str
from fenix.workflow.workflow import BaseWorkflow
LOG = logging.getLogger(__name__)
class Workflow(BaseWorkflow):
def __init__(self, conf, session_id, data):
super(Workflow, self).__init__(conf, session_id, data)
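# Note (assumption): load_kube_config() reads the local kubeconfig
# (by default ~/.kube/config), so Fenix is expected to run outside the
# cluster with admin credentials available there. For an in-cluster
# deployment, kubernetes.config.load_incluster_config() would be the
# alternative.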
config.load_kube_config()
v_api = client.VersionApi()
self.kapi = client.CoreV1Api()
self.ks = ks_client.Client(version='v3', session=self.auth_session)
LOG.info("%s: initialized with Kubernetes: %s" %
(self.session_id,
v_api.get_code_with_http_info()[0].git_version))
self.hosts = self._init_hosts_by_services()
LOG.info('%s: Execute pre action plugins' % (self.session_id))
self.maintenance_by_plugin_type("localhost", "pre")
self.group_impacted_members = {}
def _init_hosts_by_services(self):
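# Nodes labelled 'node-role.kubernetes.io/master' are stored as
# 'controller' hosts and all other nodes as 'compute' (worker) hosts.
# A node that is already unschedulable is recorded as disabled.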
LOG.info("%s: Dicovering hosts by services" % self.session_id)
nodes = self.kapi.list_node().items
hosts = []
for node in nodes:
host = {}
host['hostname'] = node.metadata.name
if 'node-role.kubernetes.io/master' in node.metadata.labels.keys():
host['type'] = 'controller'
else:
host['type'] = 'compute'
if node.spec.unschedulable:
host['disabled'] = True
else:
host['disabled'] = False
host['maintained'] = False
hosts.append(host)
return db_api.create_hosts_by_details(self.session_id, hosts)
def get_worker_nodes(self):
nodes = self.kapi.list_node().items
worker_hosts = self.get_compute_hosts()
return [n for n in nodes if n.metadata.name in worker_hosts]
def is_node_cordoned(self, node_name):
host = self.get_host_by_name(node_name)
return host.disabled
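# cordon()/uncordon() below patch spec.unschedulable on the node, which
# is what 'kubectl cordon <node>' / 'kubectl uncordon <node>' do; setting
# the field to None clears it so the node becomes schedulable again.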
def cordon(self, node_name):
LOG.info("%s: cordon %s" % (self.session_id, node_name))
host = self.get_host_by_name(node_name)
body = {"apiVersion": "v1", "spec": {"unschedulable": True}}
self.kapi.patch_node(node_name, body)
host.disabled = True
def uncordon(self, node_name):
LOG.info("%s: uncordon %s" % (self.session_id, node_name))
host = self.get_host_by_name(node_name)
body = {"apiVersion": "v1", "spec": {"unschedulable": None}}
self.kapi.patch_node(node_name, body)
host.disabled = False
def _pod_by_id(self, pod_id):
return [p for p in self.kapi.list_pod_for_all_namespaces().items
if p.metadata.uid == pod_id][0]
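# The helpers below only consider pods whose first owner reference
# matches the given controller kind (e.g. 'ReplicaSet') and skip the
# kube-system namespace, so infrastructure pods are never touched by
# this workflow.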
def _pods_by_node_and_controller(self, node_name, controller):
return [p for p in self.kapi.list_pod_for_all_namespaces().items
if p.metadata.owner_references[0].kind == controller and
p.spec.node_name == node_name and
p.metadata.namespace != 'kube-system']
def _pods_by_nodes_and_controller(self, node_names, controller):
return [p for p in self.kapi.list_pod_for_all_namespaces().items
if p.metadata.owner_references[0].kind == controller and
p.spec.node_name in node_names and
p.metadata.namespace != 'kube-system']
def _get_pod_by_name_and_namespace(self, name, namespace):
try:
pod = self.kapi.read_namespaced_pod(name, namespace)
except ApiException:
pod = None
return pod
# TBD remove as deprecated
def _get_pod_host_and_state(self, name):
return [(p.spec.node_name, p.status.phase) for p in
self.kapi.list_pod_for_all_namespaces().items
if p.metadata.name == name][0]
# TBD remove as deprecated
def wait_pod_evicted(self, name, orig_host, orig_state):
host, state = self._get_pod_host_and_state(name)
check = 60
last_state = orig_state
last_host = orig_host
while host == orig_host or state != orig_state:
if host != last_host or state != last_state:
# log only if either value changed since last round
LOG.info("%s: pod: %s %s on host %s" %
(self.session_id, name, state, host))
last_state = state
last_host = host
if check == 0:
raise Exception('Pod %s eviction timeout' % name)
check -= 1
time.sleep(1)
host, state = self._get_pod_host_and_state(name)
# TBD remove as deprecated
def drain(self, node_name):
LOG.info("%s: drain %s" % (self.session_id, node_name))
if not self.is_node_cordoned(node_name):
self.cordon(node_name)
for pod in self._pods_by_node_and_controller(node_name,
'ReplicaSet'):
namespace = pod.metadata.namespace
name = pod.metadata.name
orig_host = pod.spec.node_name
orig_state = pod.status.phase
# For now the k8s namespace is used as the user and project in OpenStack
# keystone. Keycloak or a keystone webhook should be used instead
body = client.V1beta1Eviction()
body.api_version = "policy/v1beta1"
body.kind = "Eviction"
body.metadata = {"name": name, "namespace": namespace}
LOG.info("%s: Evicting pod: %s %s on host %s" %
(self.session_id, name, orig_state, orig_host))
try:
self.kapi.create_namespaced_pod_eviction(name,
namespace,
body)
except ApiException as e:
LOG.error("Exception when calling create_namespaced_pod_"
"eviction: %s\n" % e)
# self.wait_pod_evicted(name, orig_host, orig_state)
LOG.info("%s: Evicted pod: %s" % (self.session_id, name))
# VNFM should keep track of constraints, not Fenix
# db_api.remove_project_instance(pod_id)
# self.notify_action_done(self.instance_by_id(pod_id))
LOG.info("%s: drained %s" % (self.session_id, node_name))
def evict(self, pod, recovery_time):
namespace = pod.metadata.namespace
name = pod.metadata.name
pod_id = pod.metadata.uid
LOG.info("%s: Evict: %s: %s" % (self.session_id, pod_id, name))
orig_host = pod.spec.node_name
orig_state = pod.status.phase
# For now the k8s namespace is used as the user and project in OpenStack
# keystone. Keycloak or a keystone webhook should be used instead
body = client.V1beta1Eviction()
body.api_version = "policy/v1beta1"
body.kind = "Eviction"
body.metadata = {"name": name,
"namespace": namespace}
LOG.info("%s: Evicting pod: %s %s on host %s" %
(self.session_id, name, orig_state, orig_host))
try:
self.kapi.create_namespaced_pod_eviction(name,
namespace,
body)
except ApiException as e:
LOG.error("Exception when calling create_namespaced_pod_"
"eviction: %s\n" % e)
# Start a timer to wait for the new POD to initialize within the
# recovery time.
# TBD this could first check that the new POD STATUS == Running and only
# then wait instance_group.recovery_time. This is tricky as we do not
# know which POD is the replacement. We could check new pods, but there
# might be more than one because of parallel actions. We would need a way
# to map the evicted POD to its replacement to make this enhancement.
# Adding "labels": {"previous_pod_id": pod_id} to the body above was
# tried, but the label did not end up on the new POD.
timer = 'RECOVERY_%s_TIMEOUT' % pod_id
self.start_timer(recovery_time, timer)
time.sleep(1)
pod = self._get_pod_by_name_and_namespace(name, namespace)
check = 40
LOG.info("%s: Waiting pod: %s eviction from host %s ..." %
(self.session_id, name, orig_host))
while pod:
if check == 0:
raise Exception('Pod %s still not deleted in eviction' % name)
check -= 1
time.sleep(1)
pod = self._get_pod_by_name_and_namespace(name, namespace)
LOG.info("%s: Evicted pod: %s: %s" % (self.session_id, pod_id, name))
return True
def _fenix_instance(self, project_id, instance_id, instance_name, host,
state, details=None, action=None, project_state=None,
action_done=False):
instance = {'session_id': self.session_id,
'instance_id': instance_id,
'action': action,
'project_id': project_id,
'project_state': project_state,
'state': state,
'instance_name': instance_name,
'action_done': action_done,
'host': host,
'details': details}
return instance
def initialize_server_info(self):
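# Pods on worker nodes are stored as Fenix "instances". The pod namespace
# is looked up as a keystone project of the same name, so each namespace
# used by pods must already exist in keystone for the lookup to succeed.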
project_ids = {}
instances = []
worker_hosts = self.get_compute_hosts()
pods = self._pods_by_nodes_and_controller(worker_hosts, 'ReplicaSet')
for pod in pods:
host = pod.spec.node_name
# Map K8S namespace as user and project in keystone
if pod.metadata.namespace not in project_ids.keys():
project_id = str(self.ks.projects.list(
name=pod.metadata.namespace)[0].id)
project_ids[pod.metadata.namespace] = project_id
else:
project_id = project_ids[pod.metadata.namespace]
instance_name = pod.metadata.name
instance_id = pod.metadata.uid
state = pod.status.phase # Running
instances.append(self._fenix_instance(project_id, instance_id,
instance_name, host, state))
if project_ids:
self.projects = self.init_projects(project_ids.values())
else:
LOG.info('%s: No projects on nodes under maintenance' %
self.session_id)
if len(instances):
self.instances = self.add_instances(instances)
else:
LOG.info('%s: No instances on nodes under maintenance' %
self.session_id)
LOG.info(str(self))
def update_instance(self, project_id, instance_id, instance_name, host,
state, details=None):
if self.instance_id_found(instance_id):
# TBD Might need to update instance variables here if not done
# somewhere else
return
elif self.instance_name_found(instance_name):
# Project has made re-instantiation, remove old add new
old_instance = self.instance_by_name(instance_name)
instance = self._fenix_instance(project_id, instance_id,
instance_name, host,
state, details,
old_instance.action,
old_instance.project_state,
old_instance.action_done)
self.instances.append(self.add_instance(instance))
self.remove_instance(old_instance)
else:
# Instance new, as project has added instances
instance = self._fenix_instance(project_id, instance_id,
instance_name, host,
state, details)
self.instances.append(self.add_instance(instance))
def remove_non_existing_instances(self, instance_ids):
remove_instances = [instance for instance in
self.instances if instance.instance_id not in
instance_ids]
for instance in remove_instances:
# Instance deleted, as project possibly scaled down
self.remove_instance(instance)
def update_server_info(self):
# TBD This keeps the internal instance information up-to-date and logs
# it. The same could be done by updating the information when it changes.
# Anyhow this also double checks the information against K8s
project_ids = {}
instance_ids = []
worker_hosts = self.get_compute_hosts()
pods = self._pods_by_nodes_and_controller(worker_hosts, 'ReplicaSet')
for pod in pods:
host = pod.spec.node_name
# Map K8S namespace as user and project in keystone
if pod.metadata.namespace not in project_ids.keys():
project_id = self.ks.projects.list(
name=pod.metadata.namespace)[0].id
project_ids[pod.metadata.namespace] = project_id
else:
project_id = project_ids[pod.metadata.namespace]
instance_name = pod.metadata.name
instance_id = pod.metadata.uid
state = pod.status.phase # Running
details = None
self.update_instance(project_id, instance_id, instance_name, host,
state, details)
instance_ids.append(instance_id)
self.remove_non_existing_instances(instance_ids)
LOG.info(str(self))
def projects_with_constraints(self):
project_ids = self.project_ids_with_instance_group()
for project_id in self.project_names():
if project_id not in project_ids:
LOG.error('%s: project_id %s has not '
'set any instance_group' %
(self.session_id, project_id))
return False
return True
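# confirm_maintenance() notifies every project in state 'MAINTENANCE' and
# waits conf.project_maintenance_reply seconds for replies; projects that
# have not replied are re-notified, with two retries before giving up. It
# also fails the session if the reply deadline would pass maintenance_at.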
def confirm_maintenance(self):
allowed_actions = []
actions_at = self.session.maintenance_at
state = 'MAINTENANCE'
self.set_projets_state(state)
all_replied = False
project_not_replied = None
retry = 2
while not all_replied:
for project in self.project_names():
if (project_not_replied is not None and project not in
project_not_replied):
continue
LOG.info('\nMAINTENANCE to project %s\n' % project)
instance_ids = '%s/v1/maintenance/%s/%s' % (self.url,
self.session_id,
project)
reply_at = reply_time_str(self.conf.project_maintenance_reply)
if is_time_after_time(reply_at, actions_at):
LOG.error('%s: No time for project to answer in state: %s'
% (self.session_id, state))
self.state("MAINTENANCE_FAILED")
return False
metadata = self.session.meta
self._project_notify(project, instance_ids, allowed_actions,
actions_at, reply_at, state, metadata)
self.start_timer(self.conf.project_maintenance_reply,
'MAINTENANCE_TIMEOUT')
all_replied = self.wait_projects_state(state, '%s_TIMEOUT' % state)
if not all_replied:
if retry == 0:
LOG.info('confirm_maintenance failed after retries')
break
else:
LOG.info('confirm_maintenance retry')
projects = self.get_projects_with_state()
project_not_replied = (
self._project_names_in_state(projects, state))
retry -= 1
return all_replied
def worker_nodes_cpu_info(self, system_reserved):
# TBD system_reserved is for now just a guesstimate of what flannel,
# kubelet and other system components need on top of the pods
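# Note: container CPU requests are summed with int(), so whole-core
# requests (e.g. "2") are assumed; millicpu values such as "500m" are
# not handled here.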
workers_info = {}
worker_hosts = self.get_compute_hosts()
workers = self.get_worker_nodes()
pods = self._pods_by_nodes_and_controller(worker_hosts, 'ReplicaSet')
for worker in workers:
cpus = int(worker.status.capacity[u'cpu']) - system_reserved
name = worker.metadata.name
workers_info[name] = {'cpus_used': 0,
'cpus': cpus,
'name': name}
for pod in [p for p in pods if p.spec.node_name == name]:
cpus_used = 0
for container in pod.spec.containers:
try:
cpus_used += int(container.resources.requests[u'cpu'])
except AttributeError:
# container does not need to have
# resources.requests.cpu
pass
if cpus_used > 0:
workers_info[name]['cpus_used'] += cpus_used
if workers_info[name]['cpus_used'] > workers_info[name]['cpus']:
LOG.error('%s overbooked: %s' %
(name, workers_info[name]))
raise Exception('%s overbooked: %s' %
(name, workers_info[name]))
LOG.info('workers_info:\n%s' % workers_info)
return workers_info
def confirm_scale_in(self):
allowed_actions = []
actions_at = reply_time_str(self.conf.project_scale_in_reply)
reply_at = actions_at
state = 'SCALE_IN'
self.set_projets_state(state)
all_replied = False
project_not_replied = None
retry = 2
while not all_replied:
for project in self.project_names():
if (project_not_replied is not None and project not in
project_not_replied):
continue
LOG.info('\nSCALE_IN to project %s\n' % project)
instance_ids = '%s/v1/maintenance/%s/%s' % (self.url,
self.session_id,
project)
metadata = self.session.meta
self._project_notify(project, instance_ids, allowed_actions,
actions_at, reply_at, state, metadata)
self.start_timer(self.conf.project_scale_in_reply,
'SCALE_IN_TIMEOUT')
all_replied = self.wait_projects_state(state, '%s_TIMEOUT' % state)
if not all_replied:
if retry == 0:
LOG.info('confirm_scale_in failed after retries')
break
else:
LOG.info('confirm_scale_in retry')
projects = self.get_projects_with_state()
project_not_replied = (
self._project_names_in_state(projects, state))
retry -= 1
return all_replied
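# need_scale_in() sums the free CPUs over all workers and asks for scale
# in unless at least one worker's worth of capacity is free; it assumes a
# homogeneous worker pool and raises if worker CPU capacities differ.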
def need_scale_in(self):
# TBD see if there is enough free capacity, so we do not need to scale
# TBD this should be calculated according to instance and
# instance_group constraints
workers_info = self.worker_nodes_cpu_info(2)
prev_cpus = 0
free_cpus = 0
prev_hostname = ''
LOG.info('checking workers CPU capacity')
for worker in workers_info.values():
hostname = worker['name']
cpus = worker['cpus']
cpus_used = worker['cpus_used']
if prev_cpus != 0 and prev_cpus != cpus:
raise Exception('%s: %d cpus on %s does not match '
'%d cpus on %s'
% (self.session_id, cpus, hostname,
prev_cpus, prev_hostname))
free_cpus += cpus - cpus_used
prev_cpus = cpus
prev_hostname = hostname
if free_cpus >= cpus:
# TBD cpu capacity might be too scattered so moving instances from
# one host to other host still might not succeed. At least with
# NUMA and CPU pinning, one should calculate and ask specific
# instances
return False
else:
return True
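# find_host_to_be_empty() picks the hosts with the fewest instances from
# weighted_hosts, a mapping of instance count to hosts, e.g.
# (illustrative) {1: ['worker-2'], 3: ['worker-0', 'worker-1']}.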
def find_host_to_be_empty(self, need_empty, weighted_hosts):
LOG.info("need_empty: %s" % need_empty)
hosts_to_be_empty = []
for instances in sorted(weighted_hosts.keys()):
LOG.info("instances in weighted_hosts: %s" % instances)
weighted_candidates = weighted_hosts[instances]
if len(weighted_candidates) == need_empty:
# Happened to be an exact match to what is needed
hosts_to_be_empty = weighted_hosts[instances]
LOG.info("hosts to be empty: %s" % hosts_to_be_empty)
elif len(weighted_candidates) > need_empty:
# More candidates than we need, take only as many as needed
for host in weighted_candidates:
LOG.info("host to be empty: %s" % host)
hosts_to_be_empty.append(host)
if len(hosts_to_be_empty) == need_empty:
break
if len(hosts_to_be_empty) == need_empty:
break
if len(hosts_to_be_empty) != need_empty:
LOG.error("failed to find enough hosts to be made empty")
return hosts_to_be_empty
def make_empty_hosts(self, state):
# TBD calculate how many nodes can be made empty; for now very simple:
# pick the hosts with the fewest pods
weighted_hosts = {}
empty_hosts = []
for host in self.get_compute_hosts():
instances = len(self.instances_by_host(host))
if instances == 0:
empty_hosts.append(host)
self.cordon(host)
LOG.info("host %s empty" % host)
else:
if instances not in weighted_hosts:
weighted_hosts[instances] = [host]
else:
weighted_hosts[instances].append(host)
if len(empty_hosts):
# TBD We just need one empty host for initial PoC testing
return True
else:
need_empty = 1
hosts_to_be_empty = self.find_host_to_be_empty(need_empty,
weighted_hosts)
thrs = []
for host in hosts_to_be_empty:
thrs.append(self.actions_to_have_empty_host(host, state))
# self._wait_host_empty(host)
for thr in thrs:
thr.join()
return True
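# instance_action() runs as its own thread per instance: it first
# confirms the action with the project (VNFM), then throttles concurrent
# actions per instance group by comparing group_impacted_members against
# max_impacted_members, performs EVICTION (or lets the project handle
# OWN_ACTION), waits the group recovery_time and finally notifies the
# project that the action is done.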
@run_async
def instance_action(self, instance, state, target_host=None):
if not self.confirm_instance_action(instance, state):
raise Exception('%s: instance %s action %s '
'confirmation failed' %
(self.session_id, instance.instance_id,
instance.action))
# TBD from constraints or override in instance.action
LOG.info('Action %s instance %s ' % (instance.action,
instance.instance_id))
try:
instance_constraints = (
db_api.project_instance_get(instance.instance_id))
group_id = instance_constraints.group_id
instance_group = db_api.instance_group_get(group_id)
if group_id not in self.group_impacted_members:
self.group_impacted_members[group_id] = 0
max_parallel = instance_group.max_impacted_members
LOG.info("%s - instance_group: %s max_impacted_members: %s "
"recovery_time: %s" %
(instance.instance_id, instance_group.group_name,
max_parallel, instance_group.recovery_time))
except db_exc.FenixDBNotFound:
raise Exception('failed to get %s constraints' %
(instance.instance_id))
while self.group_impacted_members[group_id] >= max_parallel:
LOG.info('%s waiting in group queue / max_parallel %s/%s' %
(instance.instance_id,
self.group_impacted_members[group_id],
max_parallel))
time.sleep(5)
self.group_impacted_members[group_id] += 1
LOG.debug("%s Reserved / max_impacted_members: %s/%s" %
(instance.instance_id, self.group_impacted_members[group_id],
max_parallel))
if instance.action == 'OWN_ACTION':
pass
elif instance.action == 'EVICTION':
pod = self._pod_by_id(instance.instance_id)
if not self.evict(pod, instance_group.recovery_time):
self.group_impacted_members[group_id] -= 1
LOG.debug("%s Reservation freed. remain / "
"max_impacted_members:%s/%s"
% (instance.instance_id,
self.group_impacted_members[group_id],
max_parallel))
raise Exception('%s: instance %s action '
'%s failed' %
(self.session_id, instance.instance_id,
instance.action))
else:
self.group_impacted_members[group_id] -= 1
LOG.debug("%s Reservation freed. remain / "
"max_impacted_members:%s/%s"
% (instance.instance_id,
self.group_impacted_members[group_id],
max_parallel))
raise Exception('%s: instance %s action '
'%s not supported' %
(self.session_id, instance.instance_id,
instance.action))
# We need to obey the recovery time of the instance group before
# decreasing self.group_impacted_members[group_id] to allow
# one more instance of the same group to be affected by any move
# operation
if instance_group.recovery_time > 0:
timer = 'RECOVERY_%s_TIMEOUT' % instance.instance_id
LOG.info("%s wait POD to recover from move..."
% instance.instance_id)
while not self.is_timer_expired(timer):
time.sleep(1)
self.notify_action_done(instance)
self.group_impacted_members[group_id] -= 1
LOG.debug("%s Reservation freed. remain / max_impacted_members: %s/%s"
% (instance.instance_id,
self.group_impacted_members[group_id],
max_parallel))
@run_async
def actions_to_have_empty_host(self, host, state, target_host=None):
# TBD we only support EVICTION of all pods with drain(host)
# Need parallel hosts and make_empty_hosts to calculate
thrs = []
LOG.info('actions_to_have_empty_host %s' % host)
instances = self.instances_by_host(host)
if not instances:
raise Exception('No instances on host: %s' % host)
self.cordon(host)
for instance in instances:
LOG.info('move %s from %s' % (instance.instance_name, host))
thrs.append(self.instance_action(instance, state,
target_host))
# thrs.append(self.confirm_instance_action(instance, state))
for thr in thrs:
thr.join()
if state == 'PLANNED_MAINTENANCE':
self.host_maintenance(host)
def confirm_instance_action(self, instance, state):
instance_id = instance.instance_id
LOG.info('%s to instance %s' % (state, instance_id))
allowed_actions = ['EVICTION', 'OWN_ACTION']
try:
instance_constraints = db_api.project_instance_get(instance_id)
wait_time = instance_constraints.lead_time
LOG.info("%s actions_at from constraints lead_time: %s" %
(instance_id, wait_time))
except db_exc.FenixDBNotFound:
wait_time = self.conf.project_maintenance_reply
actions_at = reply_time_str(wait_time)
reply_at = actions_at
instance.project_state = state
metadata = self.session.meta
retry = 2
replied = False
while not replied:
metadata = self.session.meta
self._project_notify(instance.project_id, [instance_id],
allowed_actions, actions_at, reply_at,
state, metadata)
timer = '%s_%s_TIMEOUT' % (state, instance_id)
self.start_timer(self.conf.project_maintenance_reply, timer)
replied = self.wait_instance_reply_state(state, instance, timer)
if not replied:
if retry == 0:
LOG.info('confirm_instance_action for %s failed after '
'retries' % instance.instance_id)
break
else:
LOG.info('confirm_instance_action for %s retry'
% instance.instance_id)
else:
break
retry -= 1
return replied
def confirm_maintenance_complete(self):
state = 'MAINTENANCE_COMPLETE'
metadata = self.session.meta
actions_at = reply_time_str(self.conf.project_scale_in_reply)
reply_at = actions_at
self.set_projets_state(state)
all_replied = False
project_not_replied = None
retry = 2
while not all_replied:
for project in self.project_names():
if (project_not_replied is not None and project not in
project_not_replied):
continue
LOG.info('%s to project %s' % (state, project))
instance_ids = '%s/v1/maintenance/%s/%s' % (self.url,
self.session_id,
project)
allowed_actions = []
self._project_notify(project, instance_ids, allowed_actions,
actions_at, reply_at, state, metadata)
self.start_timer(self.conf.project_scale_in_reply,
'%s_TIMEOUT' % state)
all_replied = self.wait_projects_state(state, '%s_TIMEOUT' % state)
if not all_replied:
if retry == 0:
LOG.info('confirm_maintenance_complete failed after '
'retries')
break
else:
LOG.info('confirm_maintenance_complete retry')
projects = self.get_projects_with_state()
project_not_replied = (
self._project_names_in_state(projects, state))
retry -= 1
return all_replied
def notify_action_done(self, instance):
instance_ids = [instance.instance_id]
project = instance.project_id
allowed_actions = []
actions_at = None
reply_at = None
state = "INSTANCE_ACTION_DONE"
instance.project_state = state
metadata = "{}"
self._project_notify(project, instance_ids, allowed_actions,
actions_at, reply_at, state, metadata)
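# Action plug-ins are first imported as fenix.workflow.actions.<plugin>;
# if that fails, the plug-in is loaded from the session download
# directory <local_cache_dir>/<session_id>/actions/<plugin>.py. Each
# plug-in must report a state; a state containing 'FAILED' aborts the
# workflow.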
def maintenance_by_plugin_type(self, hostname, plugin_type):
aps = self.get_action_plugins_by_type(plugin_type)
session_dir = "%s/%s" % (self.conf.engine.local_cache_dir,
self.session_id)
download_plugin_dir = session_dir + "/actions/"
if aps:
LOG.info("%s: Calling action plug-ins with type %s" %
(self.session_id, plugin_type))
for ap in aps:
ap_name = "fenix.workflow.actions.%s" % ap.plugin
LOG.info("%s: Calling action plug-in module: %s" %
(self.session_id, ap_name))
ap_db_instance = self._create_action_plugin_instance(ap.plugin,
hostname)
try:
action_plugin = getattr(import_module(ap_name),
'ActionPlugin')
ap_instance = action_plugin(self, ap_db_instance)
except ImportError:
download_plugin_file = "%s/%s.py" % (download_plugin_dir,
ap.plugin)
LOG.info("%s: Trying from: %s" % (self.session_id,
download_plugin_file))
if os.path.isfile(download_plugin_file):
ap_instance = (
mod_loader_action_instance(ap_name,
download_plugin_file,
self,
ap_db_instance))
else:
raise Exception('%s: could not find action plugin %s' %
(self.session_id, ap.plugin))
ap_instance.run()
if ap_db_instance.state:
LOG.info('%s: %s finished with %s host %s' %
(self.session_id, ap.plugin,
ap_db_instance.state, hostname))
if 'FAILED' in ap_db_instance.state:
raise Exception('%s: %s finished with %s host %s' %
(self.session_id, ap.plugin,
ap_db_instance.state, hostname))
else:
raise Exception('%s: %s reported no state for host %s' %
(self.session_id, ap.plugin, hostname))
# If ap_db_instance failed, we keep it for state
db_api.remove_action_plugin_instance(ap_db_instance)
else:
LOG.info("%s: No action plug-ins with type %s" %
(self.session_id, plugin_type))
def _wait_host_empty(self, host):
check = 60
pods = self._pods_by_node_and_controller(host, 'ReplicaSet')
while pods:
if check == 0:
raise Exception('Wait empty host %s timeout' % host)
elif not check % 5:
LOG.info('...waiting host %s empty' % host)
check -= 1
time.sleep(1)
pods = self._pods_by_node_and_controller(host, 'ReplicaSet')
LOG.info('Host %s empty' % host)
@run_async
def host_maintenance_async(self, hostname):
self.host_maintenance(hostname)
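# host_maintenance() waits until a compute host has no pods left,
# notifies the admin ('IN_MAINTENANCE'), runs the generic 'host' action
# plug-ins and the host type specific ones, then notifies
# 'MAINTENANCE_COMPLETE' and uncordons compute hosts again.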
def host_maintenance(self, hostname):
host = self.get_host_by_name(hostname)
if host.type == "compute":
self._wait_host_empty(hostname)
LOG.info('IN_MAINTENANCE %s' % hostname)
self._admin_notify(self.conf.service_user.os_project_name,
hostname,
'IN_MAINTENANCE',
self.session_id)
for plugin_type in ["host", host.type]:
LOG.info('%s: Execute %s action plugins' % (self.session_id,
plugin_type))
self.maintenance_by_plugin_type(hostname, plugin_type)
self._admin_notify(self.conf.service_user.os_project_name,
hostname,
'MAINTENANCE_COMPLETE',
self.session_id)
if host.type == "compute":
self.uncordon(hostname)
LOG.info('MAINTENANCE_COMPLETE %s' % hostname)
host.maintained = True
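# The methods below map to the workflow session states (maintenance,
# scale_in, prepare_maintenance, start_maintenance, planned_maintenance,
# maintenance_complete, maintenance_done, maintenance_failed); the base
# workflow is assumed to dispatch to them by the current session state.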
def maintenance(self):
LOG.info("%s: maintenance called" % self.session_id)
self.initialize_server_info()
time.sleep(1)
self.state('START_MAINTENANCE')
if not self.projects_with_constraints():
self.state('MAINTENANCE_FAILED')
return
if not self.confirm_maintenance():
self.state('MAINTENANCE_FAILED')
return
maintenance_empty_hosts = self.get_empty_computes()
if len(maintenance_empty_hosts) == 0:
if self.need_scale_in():
LOG.info('%s: Need to scale in to get capacity for '
'empty host' % (self.session_id))
self.state('SCALE_IN')
else:
LOG.info('%s: Free capacity, but need empty host' %
(self.session_id))
self.state('PREPARE_MAINTENANCE')
else:
LOG.info('Empty host found')
self.state('START_MAINTENANCE')
if self.session.maintenance_at > datetime.datetime.utcnow():
time_now = time_now_str()
LOG.info('Time now: %s maintenance starts: %s....' %
(time_now, datetime_to_str(self.session.maintenance_at)))
td = self.session.maintenance_at - datetime.datetime.utcnow()
self.start_timer(td.total_seconds(), 'MAINTENANCE_START_TIMEOUT')
while not self.is_timer_expired('MAINTENANCE_START_TIMEOUT'):
time.sleep(1)
time_now = time_now_str()
LOG.info('Time to start maintenance: %s' % time_now)
def scale_in(self):
LOG.info("%s: scale in" % self.session_id)
# TBD we just blindly ask to scale_in to have at least one
# empty compute. With NUMA and CPU pinning and together with
# how many instances can be affected at the same time, we should
# calculate and ask scaling of specific instances
if not self.confirm_scale_in():
self.state('MAINTENANCE_FAILED')
return
# TBD it takes time before the information about free capacity is
# properly updated. We should make sure the instances are removed by
# other means than sleeping here
time.sleep(4)
self.update_server_info()
maintenance_empty_hosts = self.get_empty_computes()
if len(maintenance_empty_hosts) == 0:
if self.need_scale_in():
LOG.info('%s: Need to scale in more to get capacity for '
'empty host' % (self.session_id))
self.state('SCALE_IN')
else:
LOG.info('%s: Free capacity, but need empty host' %
(self.session_id))
self.state('PREPARE_MAINTENANCE')
else:
LOG.info('Empty host found')
self.state('START_MAINTENANCE')
def prepare_maintenance(self):
LOG.info("%s: prepare_maintenance called" % self.session_id)
if not self.make_empty_hosts('PREPARE_MAINTENANCE'):
LOG.error('make_empty_hosts failed')
self.state('MAINTENANCE_FAILED')
else:
self.state('START_MAINTENANCE')
self.update_server_info()
def start_maintenance(self):
LOG.info("%s: start_maintenance called" % self.session_id)
empty_hosts = self.get_empty_computes()
if not empty_hosts:
LOG.error("%s: No empty host to be maintained" % self.session_id)
self.state('MAINTENANCE_FAILED')
return
for host_name in self.get_compute_hosts():
self.cordon(host_name)
thrs = []
for host_name in empty_hosts:
# LOG.info("%s: Maintaining %s" % (self.session_id, host_name))
thrs.append(self.host_maintenance_async(host_name))
# LOG.info("%s: Maintained %s" % (self.session_id, host_name))
for thr in thrs:
thr.join()
time.sleep(1)
self.update_server_info()
self.state('PLANNED_MAINTENANCE')
def planned_maintenance(self):
LOG.info("%s: planned_maintenance called" % self.session_id)
maintained_hosts = self.get_maintained_hosts_by_type('compute')
compute_hosts = self.get_compute_hosts()
not_maintained_hosts = ([host for host in compute_hosts if host
not in maintained_hosts])
empty_compute_hosts = self.get_empty_computes()
parallel = len(empty_compute_hosts)
not_maintained = len(not_maintained_hosts)
while not_maintained:
if not_maintained < parallel:
parallel = not_maintained
thrs = []
for index in range(parallel):
shost = not_maintained_hosts[index]
thost = empty_compute_hosts[index]
thrs.append(
self.actions_to_have_empty_host(shost,
'PLANNED_MAINTENANCE',
thost))
for thr in thrs:
thr.join()
empty_compute_hosts = self.get_empty_computes()
del not_maintained_hosts[:parallel]
parallel = len(empty_compute_hosts)
not_maintained = len(not_maintained_hosts)
self.update_server_info()
LOG.info("%s: planned_maintenance done" % self.session_id)
self.state('MAINTENANCE_COMPLETE')
def maintenance_complete(self):
LOG.info("%s: maintenance_complete called" % self.session_id)
LOG.info('%s: Execute post action plugins' % self.session_id)
self.maintenance_by_plugin_type("localhost", "post")
LOG.info('Projects may still need to scale back up to full '
'capacity')
if not self.confirm_maintenance_complete():
self.state('MAINTENANCE_FAILED')
return
self.update_server_info()
self.state('MAINTENANCE_DONE')
def maintenance_done(self):
LOG.info("%s: MAINTENANCE_DONE" % self.session_id)
def maintenance_failed(self):
LOG.info("%s: MAINTENANCE_FAILED" % self.session_id)
def cleanup(self):
LOG.info("%s: cleanup" % self.session_id)
db_api.remove_session(self.session_id)