996 lines
44 KiB
Python
996 lines
44 KiB
Python
# Copyright (c) 2020 Nokia Corporation.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
import datetime
|
|
from importlib import import_module
|
|
try:
    import sys

    from importlib.util import module_from_spec
    from importlib.util import spec_from_file_location

    def mod_loader_action_instance(mname, mpath, session_instance,
                                   ap_db_instance):
        """Load an action plug-in from a source file and instantiate it.

        Uses the importlib.util spec API instead of the deprecated
        ``SourceFileLoader.load_module()``. Like ``load_module()``, the
        module is registered in ``sys.modules`` under ``mname`` so later
        ``import_module(mname)`` calls reuse it; the entry is removed again
        if executing the file fails.

        :param mname: module name to load the source file as
        :param mpath: path of the plug-in ``.py`` file
        :param session_instance: first argument for ``ActionPlugin``
        :param ap_db_instance: second argument for ``ActionPlugin``
        :returns: ``ActionPlugin(session_instance, ap_db_instance)`` from
            the loaded module
        """
        spec = spec_from_file_location(mname, mpath)
        mi = module_from_spec(spec)
        sys.modules[mname] = mi
        try:
            spec.loader.exec_module(mi)
        except BaseException:
            sys.modules.pop(mname, None)
            raise
        return mi.ActionPlugin(session_instance, ap_db_instance)
except ImportError:
    # Fallback for interpreters without importlib.util.module_from_spec
    # (Python 2 and Python < 3.5).
    from imp import load_source

    def mod_loader_action_instance(mname, mpath, session_instance,
                                   ap_db_instance):
        """Load an action plug-in with imp.load_source and instantiate it."""
        mi = load_source(mname, mpath)
        return mi.ActionPlugin(session_instance, ap_db_instance)
|
|
|
|
from keystoneclient import client as ks_client
|
|
from kubernetes import client
|
|
from kubernetes.client.rest import ApiException
|
|
from kubernetes import config
|
|
import os
|
|
from oslo_log import log as logging
|
|
import time
|
|
|
|
from fenix.db import api as db_api
|
|
from fenix.db import exceptions as db_exc
|
|
from fenix.utils.thread import run_async
|
|
from fenix.utils.time import datetime_to_str
|
|
from fenix.utils.time import is_time_after_time
|
|
from fenix.utils.time import reply_time_str
|
|
from fenix.utils.time import time_now_str
|
|
|
|
|
|
from fenix.workflow.workflow import BaseWorkflow
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class Workflow(BaseWorkflow):
|
|
|
|
def __init__(self, conf, session_id, data):
    """Set up the Kubernetes workflow for one maintenance session.

    Loads the local kube config and creates the Kubernetes core API and
    Keystone v3 clients. Session hosts are reloaded from the DB when
    ``data`` is empty, otherwise they are discovered from the cluster
    nodes. Finally the ``pre`` action plug-ins are run for ``localhost``.
    """
    super(Workflow, self).__init__(conf, session_id, data)

    config.load_kube_config()
    version_api = client.VersionApi()
    self.kapi = client.CoreV1Api()
    self.ks = ks_client.Client(version='v3', session=self.auth_session)
    k8s_version = version_api.get_code_with_http_info()[0].git_version
    LOG.info("%s: initialized with Kubernetes: %s" %
             (self.session_id, k8s_version))

    self.hosts = (self._init_hosts_by_services() if data
                  else db_api.get_hosts(session_id))

    LOG.info('%s: Execute pre action plugins' % (self.session_id))
    self.maintenance_by_plugin_type("localhost", "pre")
    # Per instance group count of members currently being acted upon
    self.group_impacted_members = {}
|
|
|
|
def _init_hosts_by_services(self):
    """Discover the cluster nodes and store them as session hosts.

    Every Kubernetes node becomes a host record. A node labelled as
    control plane (``node-role.kubernetes.io/master`` or, on newer
    clusters, ``node-role.kubernetes.io/control-plane``) gets type
    ``controller``; all other nodes are ``compute``. A node that is
    already unschedulable (cordoned) is recorded as disabled.

    :returns: the result of ``db_api.create_hosts_by_details``
    """
    LOG.info("%s: Discovering hosts by services" % self.session_id)
    control_plane_labels = ('node-role.kubernetes.io/master',
                            'node-role.kubernetes.io/control-plane')
    hosts = []
    for node in self.kapi.list_node().items:
        # A node without any labels has metadata.labels set to None
        labels = node.metadata.labels or {}
        is_controller = any(label in labels
                            for label in control_plane_labels)
        hosts.append({'hostname': node.metadata.name,
                      'type': 'controller' if is_controller else 'compute',
                      'disabled': bool(node.spec.unschedulable),
                      'maintained': False})
    return db_api.create_hosts_by_details(self.session_id, hosts)
|
|
|
|
def get_worker_nodes(self):
    """Return the Kubernetes node objects of the session's compute hosts."""
    all_nodes = self.kapi.list_node().items
    compute_names = self.get_compute_hosts()
    return list(filter(lambda node: node.metadata.name in compute_names,
                       all_nodes))
|
|
|
|
def is_node_cordoned(self, node_name):
    """Tell whether host ``node_name`` is recorded as disabled."""
    return self.get_host_by_name(node_name).disabled
|
|
|
|
def cordon(self, node_name):
    """Make ``node_name`` unschedulable and record the host as disabled."""
    LOG.info("%s: cordon %s" % (self.session_id, node_name))
    record = self.get_host_by_name(node_name)
    self.kapi.patch_node(node_name,
                         {"apiVersion": "v1",
                          "spec": {"unschedulable": True}})
    record.disabled = True
    db_api.update_host(record)
|
|
|
|
def uncordon(self, node_name):
    """Make ``node_name`` schedulable again and record it as enabled.

    The patch sets ``spec.unschedulable`` to None, removing the field.
    """
    LOG.info("%s: uncordon %s" % (self.session_id, node_name))
    record = self.get_host_by_name(node_name)
    self.kapi.patch_node(node_name,
                         {"apiVersion": "v1",
                          "spec": {"unschedulable": None}})
    record.disabled = False
    db_api.update_host(record)
|
|
|
|
def _pod_by_id(self, pod_id):
|
|
return [p for p in self.kapi.list_pod_for_all_namespaces().items
|
|
if p.metadata.uid == pod_id][0]
|
|
|
|
def _pods_by_node_and_controller(self, node_name, contoller):
|
|
return [p for p in self.kapi.list_pod_for_all_namespaces().items
|
|
if p.metadata.owner_references[0].kind == contoller and
|
|
p.spec.node_name == node_name and
|
|
p.metadata.namespace != 'kube-system']
|
|
|
|
def _pods_by_nodes_and_controller(self, node_names, contoller):
|
|
return [p for p in self.kapi.list_pod_for_all_namespaces().items
|
|
if p.metadata.owner_references[0].kind == contoller and
|
|
p.spec.node_name in node_names and
|
|
p.metadata.namespace != 'kube-system']
|
|
|
|
def _get_pod_by_name_and_namespace(self, name, namespace):
|
|
try:
|
|
pod = self.kapi.read_namespaced_pod(name, namespace)
|
|
except ApiException:
|
|
pod = None
|
|
return pod
|
|
|
|
# TBD remove as deprecated
|
|
def _get_pod_host_and_state(self, name):
|
|
return [(p.spec.node_name, p.status.phase) for p in
|
|
self.kapi.list_pod_for_all_namespaces().items
|
|
if p.metadata.name == name][0]
|
|
|
|
# TBD remove as deprecated
def wait_pod_evicted(self, name, orig_host, orig_state):
    """Poll once a second until pod ``name`` runs elsewhere in ``orig_state``.

    Host/state changes are logged as they are seen. Raises after 60
    unsuccessful polls.
    """
    last_host, last_state = orig_host, orig_state
    remaining = 60
    host, state = self._get_pod_host_and_state(name)
    while host == orig_host or state != orig_state:
        if (host, state) != (last_host, last_state):
            # log only if either value changed since last round
            LOG.info("%s: pod: %s %s on host %s" %
                     (self.session_id, name, state, host))
            last_host, last_state = host, state
        if not remaining:
            raise Exception('Pod %s eviction timout' % name)
        remaining -= 1
        time.sleep(1)
        host, state = self._get_pod_host_and_state(name)
|
|
|
|
# TBD remove as deprecated
def drain(self, node_name):
    """Cordon ``node_name`` and request eviction of its ReplicaSet pods.

    Eviction API errors are only logged, and the method does not wait for
    the pods to actually leave the node.
    """
    LOG.info("%s: drain %s" % (self.session_id, node_name))
    if not self.is_node_cordoned(node_name):
        self.cordon(node_name)
    for pod in self._pods_by_node_and_controller(node_name, 'ReplicaSet'):
        pod_ns = pod.metadata.namespace
        pod_name = pod.metadata.name
        src_host, src_phase = pod.spec.node_name, pod.status.phase
        # For now k8s namespace will be the user and project in OpenStack
        # keystone. Keycloak or webhook for keystone should be used
        eviction = client.V1beta1Eviction()
        eviction.api_version = "policy/v1beta1"
        eviction.kind = "Eviction"
        eviction.metadata = {"name": pod_name, "namespace": pod_ns}
        LOG.info("%s: Evicting pod: %s %s on host %s" %
                 (self.session_id, pod_name, src_phase, src_host))
        try:
            self.kapi.create_namespaced_pod_eviction(pod_name, pod_ns,
                                                     eviction)
        except ApiException as e:
            LOG.error("Exception when calling create_namespaced_pod_"
                      "eviction: %s\n" % e)
        LOG.info("%s: Evicted pod: %s" % (self.session_id, pod_name))
    LOG.info("%s: drained %s" % (self.session_id, node_name))
|
|
|
|
def evict(self, pod, recovery_time):
    """Evict one pod and wait until it can no longer be read from the API.

    Posts a ``policy/v1beta1`` Eviction for ``pod``. It then starts the
    ``RECOVERY_<pod uid>_TIMEOUT`` timer with ``recovery_time`` and polls
    ``_get_pod_by_name_and_namespace`` once a second until that returns a
    falsy value.

    :param pod: Kubernetes pod object (metadata, spec and status are read)
    :param recovery_time: duration passed to ``self.start_timer`` for the
        recovery timer (unit as expected by start_timer - TODO confirm)
    :returns: True once the pod is no longer found
    :raises Exception: if the pod is still found after 40 further polls
    """
    namespace = pod.metadata.namespace
    name = pod.metadata.name
    pod_id = pod.metadata.uid
    LOG.info("%s: Evict: %s: %s" % (self.session_id, pod_id, name))
    orig_host = pod.spec.node_name
    orig_state = pod.status.phase
    # For now k8s namespace will be the user and project in OpenStack
    # keystone. Keycloak or webhook for keystone should be used
    body = client.V1beta1Eviction()
    body.api_version = "policy/v1beta1"
    body.kind = "Eviction"
    body.metadata = {"name": name,
                     "namespace": namespace}

    LOG.info("%s: Evicting pod: %s %s on host %s" %
             (self.session_id, name, orig_state, orig_host))
    try:
        self.kapi.create_namespaced_pod_eviction(name,
                                                 namespace,
                                                 body)
    except ApiException as e:
        # NOTE(review): a refused eviction is only logged; the wait below
        # still runs and raises on timeout if the pod is not deleted.
        LOG.error("Exception when calling create_namespaced_pod_"
                  "eviction: %s\n" % e)
    # Need to start timer to wait new POD initialization with recovery time
    # TBD this might first check new POD STATUS == running and then
    # still wait instance_group.recovery_time. This might be tricky as we
    # do not know new POD. We might check new pods, but there might be more
    # than one because of parallel actions. Somehow we would need to be able
    # to map evicted POD to the new one to make this enhancement.
    # Tried adding "labels": {"previous_pod_id": pod_id} in above body;
    # that did not result in this label being in the new POD.
    # The timer therefore counts from the eviction request.
    timer = 'RECOVERY_%s_TIMEOUT' % pod_id
    self.start_timer(recovery_time, timer)
    time.sleep(1)
    pod = self._get_pod_by_name_and_namespace(name, namespace)
    check = 40
    LOG.info("%s: Waiting pod: %s eviction from host %s ..." %
             (self.session_id, name, orig_host))
    # Poll once a second until the old pod can no longer be read
    while pod:
        if check == 0:
            raise Exception('Pod %s still not deleted in eviction' % name)
        check -= 1
        time.sleep(1)
        pod = self._get_pod_by_name_and_namespace(name, namespace)
    LOG.info("%s: Evicted pod: %s: %s" % (self.session_id, pod_id, name))
    return True
|
|
|
|
def _fenix_instance(self, project_id, instance_id, instance_name, host,
|
|
state, details=None, action=None, project_state=None,
|
|
action_done=False):
|
|
instance = {'session_id': self.session_id,
|
|
'instance_id': instance_id,
|
|
'action': action,
|
|
'project_id': project_id,
|
|
'instance_id': instance_id,
|
|
'project_state': project_state,
|
|
'state': state,
|
|
'instance_name': instance_name,
|
|
'action_done': action_done,
|
|
'host': host,
|
|
'details': details}
|
|
return instance
|
|
|
|
def initialize_server_info(self):
    """Build the session's project and instance records from worker pods.

    Each ReplicaSet pod on a compute host becomes a Fenix instance. The
    pod's namespace is mapped to the Keystone project of the same name,
    which is looked up once per namespace.
    """
    namespace_to_project = {}
    new_instances = []
    pods = self._pods_by_nodes_and_controller(self.get_compute_hosts(),
                                              'ReplicaSet')
    for pod in pods:
        namespace = pod.metadata.namespace
        # Map K8S namespace as user and project in keystone
        if namespace not in namespace_to_project:
            keystone_project = self.ks.projects.list(name=namespace)[0]
            namespace_to_project[namespace] = str(keystone_project.id)
        new_instances.append(
            self._fenix_instance(namespace_to_project[namespace],
                                 pod.metadata.uid,
                                 pod.metadata.name,
                                 pod.spec.node_name,
                                 pod.status.phase))  # phase e.g. Running
    if not namespace_to_project:
        LOG.info('%s: No projects on nodes under maintenance' %
                 self.session_id)
    else:
        self.projects = self.init_projects(namespace_to_project.values())
    if not new_instances:
        LOG.info('%s: No instances on nodes under maintenance' %
                 self.session_id)
    else:
        self.instances = self.add_instances(new_instances)
    LOG.info(str(self))
|
|
|
|
def update_instance(self, project_id, instance_id, instance_name, host,
                    state, details=None):
    """Reconcile one observed pod with the session's instance records.

    * Known instance id: nothing is done.
    * Known name but new id (the project re-instantiated the pod): a new
      record is added that carries over action, project_state and
      action_done from the old one, then the old record is removed.
    * Otherwise (the project added instances): a new record is added.
    """
    if self.instance_id_found(instance_id):
        # TBD Might need to update instance variables here if not done
        # somewhere else
        return
    if not self.instance_name_found(instance_name):
        # Instance new, as project has added instances
        fresh = self._fenix_instance(project_id, instance_id,
                                     instance_name, host, state, details)
        self.instances.append(self.add_instance(fresh))
        return
    # Project has made re-instantiation, remove old add new
    previous = self.instance_by_name(instance_name)
    replacement = self._fenix_instance(
        project_id, instance_id, instance_name, host, state, details,
        action=previous.action,
        project_state=previous.project_state,
        action_done=previous.action_done)
    self.instances.append(self.add_instance(replacement))
    self.remove_instance(previous)
|
|
|
|
def remove_non_existing_instances(self, instance_ids):
    """Drop every instance record whose id is not in ``instance_ids``."""
    stale = list(filter(lambda inst: inst.instance_id not in instance_ids,
                        self.instances))
    # Instances deleted, as project possibly scaled down
    for inst in stale:
        self.remove_instance(inst)
|
|
|
|
def update_server_info(self):
    """Re-read worker pods and reconcile them with the instance records.

    TBD This keeps internal instance information up-to-date and prints it
    out. Same could be done by updating the information when changed.
    Anyhow this also double checks information against K8S.
    """
    namespace_to_project = {}
    seen_ids = []
    pods = self._pods_by_nodes_and_controller(self.get_compute_hosts(),
                                              'ReplicaSet')
    for pod in pods:
        namespace = pod.metadata.namespace
        # Map K8S namespace as user and project in keystone
        if namespace not in namespace_to_project:
            namespace_to_project[namespace] = self.ks.projects.list(
                name=namespace)[0].id
        uid = pod.metadata.uid
        self.update_instance(namespace_to_project[namespace], uid,
                             pod.metadata.name, pod.spec.node_name,
                             pod.status.phase, None)
        seen_ids.append(uid)
    self.remove_non_existing_instances(seen_ids)
    LOG.info(str(self))
|
|
|
|
def projects_with_constraints(self):
    """Check that every project in the session has an instance group.

    :returns: True when each project id from ``self.project_names()`` is
        among ``self.project_ids_with_instance_group()``; otherwise the
        first project without one is logged and False is returned.
    """
    project_ids = self.project_ids_with_instance_group()
    # self.projects holds the records assigned from init_projects() and is
    # not callable; project_names() yields the project ids, as used when
    # notifying projects.
    for project_id in self.project_names():
        if project_id not in project_ids:
            LOG.error('%s: project_id %s not '
                      'set any instance_group' %
                      (self.session_id, project_id))
            return False
    return True
|
|
|
|
def confirm_maintenance(self):
    """Ask every project to confirm the upcoming MAINTENANCE window.

    A MAINTENANCE notification is sent to each project (on retries only to
    the projects still in the MAINTENANCE state). The method then waits for
    the replies, retrying up to two more times.

    :returns: True if all projects replied, False otherwise. If a reply
        deadline would fall after ``session.maintenance_at``, the state is
        set to MAINTENANCE_FAILED and False is returned.
    """
    allowed_actions = []
    actions_at = self.session.maintenance_at
    state = 'MAINTENANCE'
    self.set_projets_state(state)
    pending = None
    all_replied = False
    for attempt in range(3):
        for project in self.project_names():
            if pending is not None and project not in pending:
                continue
            LOG.info('\nMAINTENANCE to project %s\n' % project)
            project_url = '%s/v1/maintenance/%s/%s' % (
                self.url, self.session_id, project)
            reply_at = reply_time_str(self.conf.project_maintenance_reply)
            if is_time_after_time(reply_at, actions_at):
                LOG.error('%s: No time for project to answer in state: %s'
                          % (self.session_id, state))
                self.state("MAINTENANCE_FAILED")
                return False
            self._project_notify(project, project_url, allowed_actions,
                                 actions_at, reply_at, state,
                                 self.session.meta)
        self.start_timer(self.conf.project_maintenance_reply,
                         'MAINTENANCE_TIMEOUT')
        all_replied = self.wait_projects_state(state, '%s_TIMEOUT' % state)
        if all_replied:
            break
        if attempt == 2:
            LOG.info('confirm_maintenance failed after retries')
        else:
            LOG.info('confirm_maintenance retry')
            pending = self._project_names_in_state(
                self.get_projects_with_state(), state)
    return all_replied
|
|
|
|
def worker_nodes_cpu_info(self, system_reserved):
    """Summarize CPU capacity and ReplicaSet pod CPU requests per worker.

    TBD system_reserved is now just guesstimated according to what
    flannel, kubelet... needs on top of pods

    :param system_reserved: CPUs subtracted from each node's capacity
    :returns: dict keyed by node name with ``cpus`` (capacity minus
        ``system_reserved``), ``cpus_used`` (sum of container CPU requests
        of the node's ReplicaSet pods) and ``name``
    :raises Exception: if the requests on a node exceed its usable CPUs
    """
    def cpu_request_cores(quantity):
        # Kubernetes CPU quantities may be whole cores ("2"), decimal
        # cores ("0.5") or millicores ("500m").
        text = str(quantity)
        if text.endswith('m'):
            return float(text[:-1]) / 1000
        try:
            return int(text)
        except ValueError:
            return float(text)

    workers_info = {}
    worker_hosts = self.get_compute_hosts()
    workers = self.get_worker_nodes()
    pods = self._pods_by_nodes_and_controller(worker_hosts, 'ReplicaSet')
    for worker in workers:
        cpus = int(worker.status.capacity[u'cpu']) - system_reserved
        name = worker.metadata.name
        workers_info[name] = {'cpus_used': 0,
                              'cpus': cpus,
                              'name': name}
        for pod in [p for p in pods if p.spec.node_name == name]:
            cpus_used = 0
            for container in pod.spec.containers:
                # A container does not need to have resources.requests.cpu:
                # resources or requests may be None, or lack a 'cpu' key.
                resources = getattr(container, 'resources', None)
                requests = getattr(resources, 'requests', None) or {}
                if u'cpu' in requests:
                    cpus_used += cpu_request_cores(requests[u'cpu'])
            if cpus_used > 0:
                workers_info[name]['cpus_used'] += cpus_used
        if workers_info[name]['cpus_used'] > workers_info[name]['cpus']:
            LOG.error('%s overbooked: %s' %
                      (name, workers_info[name]))
            raise Exception('%s overbooked: %s' %
                            (name, workers_info[name]))
    LOG.info('workers_info:\n%s' % workers_info)
    return workers_info
|
|
|
|
def confirm_scale_in(self):
    """Ask the projects to scale in and wait for their replies.

    A SCALE_IN notification is sent to every project (on retries only to
    the projects still in the SCALE_IN state). Both the reply deadline and
    the action time are ``conf.project_scale_in_reply`` from now. Up to two
    retries are made.

    :returns: True if all projects replied, False otherwise
    """
    allowed_actions = []
    deadline = reply_time_str(self.conf.project_scale_in_reply)
    state = 'SCALE_IN'
    self.set_projets_state(state)
    pending = None
    all_replied = False
    for attempt in range(3):
        for project in self.project_names():
            if pending is not None and project not in pending:
                continue
            LOG.info('\nSCALE_IN to project %s\n' % project)
            project_url = '%s/v1/maintenance/%s/%s' % (
                self.url, self.session_id, project)
            self._project_notify(project, project_url, allowed_actions,
                                 deadline, deadline, state,
                                 self.session.meta)
        self.start_timer(self.conf.project_scale_in_reply,
                         'SCALE_IN_TIMEOUT')
        all_replied = self.wait_projects_state(state, '%s_TIMEOUT' % state)
        if all_replied:
            break
        if attempt == 2:
            LOG.info('confirm_scale_in failed after retries')
        else:
            LOG.info('confirm_scale_in retry')
            pending = self._project_names_in_state(
                self.get_projects_with_state(), state)
    return all_replied
|
|
|
|
def need_scale_in(self, system_reserved=2):
    """Tell whether projects must scale in to free one whole worker.

    TBD see if there is enough free capacity, so we do not need to scale
    TBD this should be calculated according to instance and
    instance_group constraints

    :param system_reserved: CPUs per worker reserved for the system,
        forwarded to ``worker_nodes_cpu_info`` (default 2, as before)
    :returns: False when the free CPUs summed over all workers amount to
        at least one worker's CPUs, True otherwise
    :raises Exception: if workers have different usable CPU counts or if
        there are no worker nodes at all
    """
    workers_info = self.worker_nodes_cpu_info(system_reserved)
    if not workers_info:
        # Previously this surfaced as an UnboundLocalError on 'cpus'
        raise Exception('%s: no worker nodes to check CPU capacity of' %
                        self.session_id)
    prev_cpus = 0
    free_cpus = 0
    prev_hostname = ''
    LOG.info('checking workers CPU capacity')
    for worker in workers_info.values():
        hostname = worker['name']
        cpus = worker['cpus']
        cpus_used = worker['cpus_used']
        # The calculation below assumes all workers have equal capacity
        if prev_cpus != 0 and prev_cpus != cpus:
            raise Exception('%s: %d cpus on %s does not match to '
                            '%d on %s'
                            % (self.session_id, cpus, hostname,
                               prev_cpus, prev_hostname))
        free_cpus += cpus - cpus_used
        prev_cpus = cpus
        prev_hostname = hostname
    # TBD cpu capacity might be too scattered so moving instances from
    # one host to other host still might not succeed. At least with
    # NUMA and CPU pinning, one should calculate and ask specific
    # instances
    return free_cpus < cpus
|
|
|
|
def find_host_to_be_empty(self, need_empty, weighted_hosts):
    """Pick the ``need_empty`` hosts that are cheapest to empty.

    :param need_empty: number of hosts wanted
    :param weighted_hosts: dict mapping an instance count to the list of
        hosts running that many instances
    :returns: up to ``need_empty`` host names, taken in ascending order of
        instance count (fewer when there are not enough candidates)
    """
    LOG.info("need_empty: %s" % need_empty)
    hosts_to_be_empty = []
    for instances in sorted(weighted_hosts):
        missing = need_empty - len(hosts_to_be_empty)
        if missing <= 0:
            break
        candidates = weighted_hosts[instances]
        LOG.info("hosts with %s instances: %s" % (instances, candidates))
        # Lighter groups are used first; a group smaller than what is
        # still missing is taken whole instead of being skipped.
        hosts_to_be_empty.extend(candidates[:missing])
    if len(hosts_to_be_empty) != need_empty:
        LOG.error("failed to find %s hosts to be empty, found: %s" %
                  (need_empty, hosts_to_be_empty))
    else:
        LOG.info("hosts to be empty: %s" % hosts_to_be_empty)
    return hosts_to_be_empty
|
|
|
|
def make_empty_hosts(self, state):
    """Make sure at least one compute host is free of instances.

    TBD, calculate how many nodes can be made empty, now just very simple
    According to where is least pods

    Compute hosts that already run no instances are cordoned. If none is
    empty, the host with the fewest instances is chosen and all of its
    instances are moved away. Each chosen host is handled in its own thread
    (``actions_to_have_empty_host``), and those threads are joined here.

    :param state: notification state forwarded to
        ``actions_to_have_empty_host``
    :returns: True when an empty host exists or the chosen hosts were
        processed, False when no host could be chosen
    """
    weighted_hosts = {}
    empty_hosts = []
    for host in self.get_compute_hosts():
        instances = len(self.instances_by_host(host))
        if instances == 0:
            # Track in the local list that is checked below
            empty_hosts.append(host)
            self.cordon(host)
            LOG.info("host %s empty" % host)
        else:
            weighted_hosts.setdefault(instances, []).append(host)
    if empty_hosts:
        # TBD We just need empty host to initial POC testing
        return True
    need_empty = 1
    hosts_to_be_empty = self.find_host_to_be_empty(need_empty,
                                                   weighted_hosts)
    if not hosts_to_be_empty:
        LOG.error('%s: no host found to be made empty' % self.session_id)
        return False
    thrs = [self.actions_to_have_empty_host(host, state)
            for host in hosts_to_be_empty]
    for thr in thrs:
        thr.join()
    return True
|
|
|
|
@run_async
def instance_action(self, instance, state, target_host=None):
    """Confirm and run the maintenance action of one instance.

    Runs via ``@run_async``. Once the project confirms the action, a slot
    in the instance's instance group is reserved so that at most
    ``max_impacted_members`` members of the group are impacted at the same
    time. The slot is held until the group's ``recovery_time`` has passed
    after the action, and it is always released, also when the action
    fails.

    :param instance: Fenix instance record to act on
    :param state: notification state used for the confirmation
    :param target_host: not used here; kept for interface compatibility
    :raises Exception: when confirmation fails, the instance has no
        constraints, the action fails or the action is not supported
    """
    if not self.confirm_instance_action(instance, state):
        raise Exception('%s: instance %s action %s '
                        'confirmation failed' %
                        (self.session_id, instance.instance_id,
                         instance.action))
    # TBD from constraints or override in instance.action
    LOG.info('Action %s instance %s ' % (instance.action,
                                         instance.instance_id))
    try:
        instance_constraints = (
            db_api.project_instance_get(instance.instance_id))
        group_id = instance_constraints.group_id
        instance_group = db_api.instance_group_get(group_id)
        if group_id not in self.group_impacted_members:
            self.group_impacted_members[group_id] = 0
        max_parallel = instance_group.max_impacted_members
        LOG.info("%s - instance_group: %s max_impacted_members: %s "
                 "recovery_time: %s" %
                 (instance.instance_id, instance_group.group_name,
                  max_parallel, instance_group.recovery_time))
    except db_exc.FenixDBNotFound:
        raise Exception('failed to get %s constraints' %
                        (instance.instance_id))

    # Wait while max_parallel members of the group are already impacted
    # (the old '<' test let max_parallel + 1 run at once). A limit below 1
    # would block forever, so at least one member is always allowed.
    limit = max(max_parallel, 1)
    while self.group_impacted_members[group_id] >= limit:
        LOG.info('%s waiting in group queue / max_parallel %s/%s' %
                 (instance.instance_id,
                  self.group_impacted_members[group_id],
                  max_parallel))
        time.sleep(5)
    self.group_impacted_members[group_id] += 1
    LOG.debug("%s Reserved / max_impacted_members: %s/%s" %
              (instance.instance_id, self.group_impacted_members[group_id],
               max_parallel))
    try:
        if instance.action == 'OWN_ACTION':
            pass
        elif instance.action == 'EVICTION':
            pod = self._pod_by_id(instance.instance_id)
            if not self.evict(pod, instance_group.recovery_time):
                raise Exception('%s: instance %s action '
                                '%s failed' %
                                (self.session_id, instance.instance_id,
                                 instance.action))
        else:
            raise Exception('%s: instance %s action '
                            '%s not supported' %
                            (self.session_id, instance.instance_id,
                             instance.action))
        # We need to obey recovery time for instance group before
        # decrease self.group_impacted_members[group_id] to allow
        # one more instances of same group to be affected by any move
        # operation
        if instance_group.recovery_time > 0:
            timer = 'RECOVERY_%s_TIMEOUT' % instance.instance_id
            LOG.info("%s wait POD to recover from move..."
                     % instance.instance_id)
            while not self.is_timer_expired(timer):
                time.sleep(1)
        self.notify_action_done(instance)
    finally:
        # Release the slot on success and on failure alike, so that other
        # members of the group waiting above are not blocked forever.
        self.group_impacted_members[group_id] -= 1
        LOG.debug("%s Reservation freed. remain / max_impacted_members: "
                  "%s/%s" % (instance.instance_id,
                             self.group_impacted_members[group_id],
                             max_parallel))
|
|
|
|
@run_async
def actions_to_have_empty_host(self, host, state, target_host=None):
    """Move every instance off ``host`` (runs via ``@run_async``).

    TBD we only support EVICTION of all pods with drain(host)
    Need parallel hosts and make_empty_hosts to calculate

    Cordons the host, starts one ``instance_action`` per instance and waits
    for all of them. In PLANNED_MAINTENANCE the host is then maintained.

    :raises Exception: if the host has no instances
    """
    LOG.info('actions_to_have_empty_host %s' % host)
    host_instances = self.instances_by_host(host)
    if not host_instances:
        raise Exception('No instances on host: %s' % host)
    self.cordon(host)
    workers = []
    for inst in host_instances:
        LOG.info('move %s from %s' % (inst.instance_name, host))
        workers.append(self.instance_action(inst, state, target_host))
    for worker in workers:
        worker.join()
    if state == 'PLANNED_MAINTENANCE':
        self.host_maintenance(host)
|
|
|
|
def confirm_instance_action(self, instance, state):
    """Ask the instance's project to confirm an action on the instance.

    The action time is the constraints' ``lead_time`` from now when the
    instance has constraints, else ``conf.project_maintenance_reply``. The
    instance's project_state is set to ``state`` and saved. The
    notification is retried up to two more times.

    :returns: True if the project replied, False after all retries
    """
    instance_id = instance.instance_id
    LOG.info('%s to instance %s' % (state, instance_id))
    allowed_actions = ['EVICTION', 'OWN_ACTION']
    try:
        wait_time = db_api.project_instance_get(instance_id).lead_time
        LOG.info("%s actions_at from constraints lead_time: %s" %
                 (instance_id, wait_time))
    except db_exc.FenixDBNotFound:
        wait_time = self.conf.project_maintenance_reply
    actions_at = reply_time_str(wait_time)
    reply_at = actions_at
    instance.project_state = state
    db_api.update_instance(instance)
    metadata = self.session.meta
    timer = '%s_%s_TIMEOUT' % (state, instance_id)
    replied = False
    for attempt in range(3):
        metadata = self.session.meta
        self._project_notify(instance.project_id, [instance_id],
                             allowed_actions, actions_at, reply_at,
                             state, metadata)
        self.start_timer(self.conf.project_maintenance_reply, timer)
        replied = self.wait_instance_reply_state(state, instance, timer)
        if replied:
            break
        if attempt == 2:
            LOG.info('confirm_instance_action for %s failed after '
                     'retries' % instance.instance_id)
        else:
            LOG.info('confirm_instance_action for %s retry'
                     % instance.instance_id)
    return replied
|
|
|
|
def confirm_maintenance_complete(self):
    """Tell the projects that maintenance is complete and wait for replies.

    A MAINTENANCE_COMPLETE notification is sent to every project (on
    retries only to the projects still in that state). Up to two retries
    are made.

    :returns: True if all projects replied, False otherwise
    """
    state = 'MAINTENANCE_COMPLETE'
    metadata = self.session.meta
    deadline = reply_time_str(self.conf.project_scale_in_reply)
    self.set_projets_state(state)
    timeout_name = '%s_TIMEOUT' % state
    pending = None
    all_replied = False
    for attempt in range(3):
        for project in self.project_names():
            if pending is not None and project not in pending:
                continue
            LOG.info('%s to project %s' % (state, project))
            project_url = '%s/v1/maintenance/%s/%s' % (
                self.url, self.session_id, project)
            self._project_notify(project, project_url, [],
                                 deadline, deadline, state, metadata)
        self.start_timer(self.conf.project_scale_in_reply, timeout_name)
        all_replied = self.wait_projects_state(state, timeout_name)
        if all_replied:
            break
        if attempt == 2:
            LOG.info('confirm_maintenance_complete failed after '
                     'retries')
        else:
            LOG.info('confirm_maintenance_complete retry')
            pending = self._project_names_in_state(
                self.get_projects_with_state(), state)
    return all_replied
|
|
|
|
def notify_action_done(self, instance):
    """Tell the instance's project that the action on it is done.

    Stores INSTANCE_ACTION_DONE as the instance's project_state and sends
    the notification with no allowed actions, no deadlines and empty
    metadata.
    """
    done_state = "INSTANCE_ACTION_DONE"
    instance.project_state = done_state
    db_api.update_instance(instance)
    self._project_notify(instance.project_id, [instance.instance_id], [],
                         None, None, done_state, "{}")
|
|
|
|
def maintenance_by_plugin_type(self, hostname, plugin_type):
    """Run every action plug-in of ``plugin_type`` against ``hostname``.

    Each plug-in is imported as ``fenix.workflow.actions.<plugin>``. If
    that import fails, it is loaded from
    ``<local_cache_dir>/<session>/actions/<plugin>.py``. A plug-in must
    leave a state on its DB record. Any state containing 'FAILED', or no
    state at all, raises and keeps the record; otherwise the record is
    removed.
    """
    plugins = self.get_action_plugins_by_type(plugin_type)
    plugin_dir = "%s/%s" % (self.conf.engine.local_cache_dir,
                            self.session_id) + "/actions/"
    if not plugins:
        LOG.info("%s: No action plug-ins with type %s" %
                 (self.session_id, plugin_type))
        return
    LOG.info("%s: Calling action plug-ins with type %s" %
             (self.session_id, plugin_type))
    for ap in plugins:
        module_name = "fenix.workflow.actions.%s" % ap.plugin
        LOG.info("%s: Calling action plug-in module: %s" %
                 (self.session_id, module_name))
        ap_record = self._create_action_plugin_instance(ap.plugin,
                                                        hostname)
        try:
            plugin_cls = getattr(import_module(module_name),
                                 'ActionPlugin')
            plugin = plugin_cls(self, ap_record)
        except ImportError:
            plugin_file = "%s/%s.py" % (plugin_dir, ap.plugin)
            LOG.info("%s: Trying from: %s" % (self.session_id,
                                              plugin_file))
            if not os.path.isfile(plugin_file):
                raise Exception('%s: could not find action plugin %s' %
                                (self.session_id, ap.plugin))
            plugin = mod_loader_action_instance(module_name, plugin_file,
                                                self, ap_record)
        plugin.run()
        outcome = ap_record.state
        if not outcome:
            raise Exception('%s: %s reported no state for host %s' %
                            (self.session_id, ap.plugin, hostname))
        LOG.info('%s: %s finished with %s host %s' %
                 (self.session_id, ap.plugin, outcome, hostname))
        if 'FAILED' in outcome:
            raise Exception('%s: %s finished with %s host %s' %
                            (self.session_id, ap.plugin, outcome,
                             hostname))
        # If ap_db_instance failed, we keep it for state
        db_api.remove_action_plugin_instance(ap_record)
|
|
|
|
def _wait_host_empty(self, host):
|
|
check = 60
|
|
pods = self._pods_by_node_and_controller(host, 'ReplicaSet')
|
|
while pods:
|
|
if check == 0:
|
|
raise Exception('Wait empty host %s timout' % host)
|
|
elif not check % 5:
|
|
LOG.info('...waiting host %s empty' % host)
|
|
check -= 1
|
|
time.sleep(1)
|
|
pods = self._pods_by_node_and_controller(host, 'ReplicaSet')
|
|
LOG.info('Host %s empty' % host)
|
|
|
|
@run_async
def host_maintenance_async(self, hostname):
    """Run ``host_maintenance(hostname)`` via ``@run_async``."""
    return self.host_maintenance(hostname)
|
|
|
|
def host_maintenance(self, hostname):
    """Maintain one host and report progress.

    For a compute host, first waits until it has no ReplicaSet pods. It
    then sends IN_MAINTENANCE to the admin, runs the ``host`` plug-ins and
    the plug-ins of the host's own type, and sends MAINTENANCE_COMPLETE. A
    compute host is then uncordoned. Finally the host is marked maintained
    and the session progress is notified.
    """
    host = self.get_host_by_name(hostname)
    if host.type == "compute":
        self._wait_host_empty(hostname)
    LOG.info('IN_MAINTENANCE %s' % hostname)
    self._admin_notify(hostname, 'IN_MAINTENANCE', self.session_id)
    for plugin_type in ("host", host.type):
        LOG.info('%s: Execute %s action plugins' % (self.session_id,
                                                    plugin_type))
        self.maintenance_by_plugin_type(hostname, plugin_type)
    self._admin_notify(hostname, 'MAINTENANCE_COMPLETE', self.session_id)
    if host.type == "compute":
        self.uncordon(hostname)
    LOG.info('MAINTENANCE_COMPLETE %s' % hostname)
    host.maintained = True
    db_api.update_host(host)
    self._session_notify(self.session.state,
                         self.get_maintained_percent(),
                         self.session_id)
|
|
|
|
def maintenance(self):
    """MAINTENANCE state: confirm maintenance and choose the next state.

    Collects the instances on the worker nodes, checks that every project
    has set instance group constraints, and asks all projects to confirm
    the maintenance. Depending on free capacity, the next state is
    SCALE_IN, PREPARE_MAINTENANCE or START_MAINTENANCE; a failed check
    sets MAINTENANCE_FAILED. Finally blocks until
    ``session.maintenance_at`` is reached.
    """
    LOG.info("%s: maintenance called" % self.session_id)
    self.initialize_server_info()
    time.sleep(1)
    self.state('START_MAINTENANCE')

    # Every project must have instance group constraints, because
    # instance_action() cannot act on an instance without them. (The old
    # 'if not self.projects_with_constraints:' tested the bound method,
    # which is always truthy, so the check never ran.)
    grouped = self.project_ids_with_instance_group()
    ungrouped = [project_id for project_id in self.project_names()
                 if project_id not in grouped]
    if ungrouped:
        LOG.error('%s: projects %s have not set any instance_group' %
                  (self.session_id, ', '.join(str(p) for p in ungrouped)))
        self.state('MAINTENANCE_FAILED')
        return

    if not self.confirm_maintenance():
        self.state('MAINTENANCE_FAILED')
        return

    if self.get_empty_computes():
        LOG.info('Empty host found')
        self.state('START_MAINTENANCE')
    elif self.need_scale_in():
        LOG.info('%s: Need to scale in to get capacity for '
                 'empty host' % (self.session_id))
        self.state('SCALE_IN')
    else:
        LOG.info('%s: Free capacity, but need empty host' %
                 (self.session_id))
        self.state('PREPARE_MAINTENANCE')

    if self.session.maintenance_at > datetime.datetime.utcnow():
        time_now = time_now_str()
        LOG.info('Time now: %s maintenance starts: %s....' %
                 (time_now, datetime_to_str(self.session.maintenance_at)))
        td = self.session.maintenance_at - datetime.datetime.utcnow()
        self.start_timer(td.total_seconds(), 'MAINTENANCE_START_TIMEOUT')
        while not self.is_timer_expired('MAINTENANCE_START_TIMEOUT'):
            time.sleep(1)

    time_now = time_now_str()
    LOG.info('Time to start maintenance: %s' % time_now)
|
|
|
|
def scale_in(self):
    """Handle the SCALE_IN state.

    Asks for a scale-in via ``confirm_scale_in()``. If that is not
    confirmed, the session moves to ``MAINTENANCE_FAILED`` (now logged).
    Otherwise, after a short fixed wait and a server-info refresh, the next
    state is chosen:

    * an empty compute host exists -> ``START_MAINTENANCE``
    * ``need_scale_in()`` is true  -> ``SCALE_IN`` again
    * otherwise                     -> ``PREPARE_MAINTENANCE``

    NOTE(review): re-entering SCALE_IN has no visible upper bound here; if
    projects never free a host this state may repeat indefinitely.
    """
    LOG.info("%s: scale in" % self.session_id)
    # TBD we just blindly ask to scale_in to have at least one
    # empty compute. With NUMA and CPU pinning and together with
    # how many instances can be affected at the same time, we should
    # calculate and ask scaling of specific instances
    if not self.confirm_scale_in():
        LOG.error("%s: confirm_scale_in failed" % self.session_id)
        self.state('MAINTENANCE_FAILED')
        return
    # TBD it takes time to have proper information updated about free
    # capacity. Should make sure instances removed by other means than
    # sleeping here
    time.sleep(4)
    self.update_server_info()

    if self.get_empty_computes():
        LOG.info('Empty host found')
        self.state('START_MAINTENANCE')
    elif self.need_scale_in():
        LOG.info('%s: Need to scale in more to get capacity for '
                 'empty host' % (self.session_id))
        self.state('SCALE_IN')
    else:
        LOG.info('%s: Free capacity, but need empty host' %
                 (self.session_id))
        self.state('PREPARE_MAINTENANCE')
|
|
|
|
def prepare_maintenance(self):
    """Handle the PREPARE_MAINTENANCE state.

    Calls ``make_empty_hosts('PREPARE_MAINTENANCE')``. On a truthy result
    the session moves to ``START_MAINTENANCE`` and server information is
    refreshed. Otherwise an error is logged and the session moves to
    ``MAINTENANCE_FAILED``.
    """
    LOG.info("%s: prepare_maintenance called" % self.session_id)
    emptied = self.make_empty_hosts('PREPARE_MAINTENANCE')
    if emptied:
        self.state('START_MAINTENANCE')
        self.update_server_info()
        return
    LOG.error('make_empty_hosts failed')
    self.state('MAINTENANCE_FAILED')
|
|
|
|
def start_maintenance(self):
    """Handle the START_MAINTENANCE state.

    Fails (``MAINTENANCE_FAILED``) when ``get_empty_computes()`` returns
    nothing. Otherwise it:

    1. cordons every compute host,
    2. runs ``host_maintenance`` on each controller, one after another,
    3. starts ``host_maintenance_async`` for every empty compute host and
       joins each returned handle,

    then refreshes server information and moves to ``PLANNED_MAINTENANCE``.
    """
    LOG.info("%s: start_maintenance called" % self.session_id)
    empty_hosts = self.get_empty_computes()
    if not empty_hosts:
        LOG.error("%s: No empty host to be maintained" % self.session_id)
        self.state('MAINTENANCE_FAILED')
        return

    for compute_name in self.get_compute_hosts():
        self.cordon(compute_name)

    # TBD one might need to change this. Now all controllers
    # maintenance serialized
    for controller in self.get_controller_hosts():
        self.host_maintenance(controller)

    workers = [self.host_maintenance_async(name) for name in empty_hosts]
    for worker in workers:
        worker.join()

    time.sleep(1)
    self.update_server_info()
    self.state('PLANNED_MAINTENANCE')
|
|
|
|
def planned_maintenance(self):
    """Handle the PLANNED_MAINTENANCE state.

    Works through the compute hosts not yet maintained in batches. Each
    batch pairs up to ``len(empty compute hosts)`` remaining hosts with the
    currently empty compute hosts. It calls
    ``actions_to_have_empty_host(source, 'PLANNED_MAINTENANCE', target)``
    for every pair and joins the returned handles before the next batch.
    Moves to ``MAINTENANCE_COMPLETE`` once every host has been handled.

    If hosts remain but no empty compute host is available even after one
    server-info refresh, the session moves to ``MAINTENANCE_FAILED``.
    Previously the loop spun forever in that case.
    """
    LOG.info("%s: planned_maintenance called" % self.session_id)
    maintained_hosts = self.get_maintained_hosts_by_type('compute')
    compute_hosts = self.get_compute_hosts()
    not_maintained_hosts = [host for host in compute_hosts
                            if host not in maintained_hosts]
    empty_compute_hosts = self.get_empty_computes()

    while not_maintained_hosts:
        if not empty_compute_hosts:
            # The empty-host list may have been computed from stale server
            # info, so refresh once before giving up.
            self.update_server_info()
            empty_compute_hosts = self.get_empty_computes()
        if not empty_compute_hosts:
            # With zero targets the batch below would be empty and
            # not_maintained_hosts would never shrink (infinite loop).
            LOG.error("%s: No empty compute host to continue planned "
                      "maintenance; %d host(s) not maintained" %
                      (self.session_id, len(not_maintained_hosts)))
            self.state('MAINTENANCE_FAILED')
            return

        batch = not_maintained_hosts[:len(empty_compute_hosts)]
        thrs = [self.actions_to_have_empty_host(shost,
                                                'PLANNED_MAINTENANCE',
                                                thost)
                for shost, thost in zip(batch, empty_compute_hosts)]
        for thr in thrs:
            thr.join()
        empty_compute_hosts = self.get_empty_computes()
        del not_maintained_hosts[:len(batch)]
        self.update_server_info()

    LOG.info("%s: planned_maintenance done" % self.session_id)
    self.state('MAINTENANCE_COMPLETE')
|
|
|
|
def maintenance_complete(self):
    """Handle the MAINTENANCE_COMPLETE state.

    Runs the ``post`` action plugins for ``localhost`` and then asks for
    confirmation via ``confirm_maintenance_complete()``. If that is not
    confirmed, the session moves to ``MAINTENANCE_FAILED`` (now logged).
    Otherwise server information is refreshed and the session moves to
    ``MAINTENANCE_DONE``.
    """
    LOG.info("%s: maintenance_complete called" % self.session_id)
    LOG.info('%s: Execute post action plugins' % self.session_id)
    self.maintenance_by_plugin_type("localhost", "post")
    LOG.info('Projects may still need to up scale back to full '
             'capacity')
    if not self.confirm_maintenance_complete():
        LOG.error("%s: confirm_maintenance_complete failed"
                  % self.session_id)
        self.state('MAINTENANCE_FAILED')
        return
    self.update_server_info()
    self.state('MAINTENANCE_DONE')
|
|
|
|
def maintenance_done(self):
    """Handle the MAINTENANCE_DONE state by logging it."""
    session = self.session_id
    LOG.info("%s: MAINTENANCE_DONE" % session)
|
|
|
|
def maintenance_failed(self):
    """Handle the MAINTENANCE_FAILED state by logging it."""
    session = self.session_id
    LOG.info("%s: MAINTENANCE_FAILED" % session)
|
|
|
|
def cleanup(self):
    """Log the cleanup, then remove this session via db_api."""
    sid = self.session_id
    LOG.info("%s: cleanup" % sid)
    db_api.remove_session(sid)
|