# Copyright (c) 2019 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
from importlib import import_module
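# Python 3 provides importlib SourceFileLoader, while Python 2 only has
# the deprecated imp.load_source. Both branches below define the same
# mod_loader_action_instance() helper for loading an action plug-in
# class from a file path.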
try:
    from importlib.machinery import SourceFileLoader

    def mod_loader_action_instance(mname, mpath, session_instance,
                                   ap_db_instance):
        mi = SourceFileLoader(mname, mpath).load_module()
        return mi.ActionPlugin(session_instance, ap_db_instance)
except ImportError:
    from imp import load_source

    def mod_loader_action_instance(mname, mpath, session_instance,
                                   ap_db_instance):
        mi = load_source(mname, mpath)
        return mi.ActionPlugin(session_instance, ap_db_instance)

from novaclient import API_MAX_VERSION as nova_max_version
import novaclient.client as novaclient
from novaclient.exceptions import BadRequest

import os
from oslo_log import log as logging
import time

from fenix.db import api as db_api
from fenix.db import exceptions as db_exc
from fenix.utils.thread import run_async
from fenix.utils.time import datetime_to_str
from fenix.utils.time import is_time_after_time
from fenix.utils.time import reply_time_str
from fenix.utils.time import time_now_str


from fenix.workflow.workflow import BaseWorkflow

LOG = logging.getLogger(__name__)


class Workflow(BaseWorkflow):

    def __init__(self, conf, session_id, data):
        super(Workflow, self).__init__(conf, session_id, data)
        nova_version = 2.53
        self.nova = novaclient.Client(nova_version, session=self.auth_session)
        max_nova_server_ver = float(self.nova.versions.get_current().version)
        max_nova_client_ver = float(nova_max_version.get_string())
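        # Pick the highest microversion supported by both the client and
        # the server, but only upgrade past the 2.53 baseline when both
        # sides support something newer.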
        if max_nova_server_ver > 2.53 and max_nova_client_ver > 2.53:
            if max_nova_client_ver <= max_nova_server_ver:
                nova_version = max_nova_client_ver
            else:
                nova_version = max_nova_server_ver
            self.nova = novaclient.Client(nova_version,
                                          session=self.auth_session)
        if not self.hosts:
            self.hosts = self._init_hosts_by_services()
        else:
            self._init_update_hosts()
        LOG.info("%s: initialized. Nova version %f" % (self.session_id,
                                                       nova_version))

        LOG.info('%s: Execute pre action plugins' % (self.session_id))
        self.maintenance_by_plugin_type("localhost", "pre")
        # How many members of each instance group are currently affected
        self.group_impacted_members = {}

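    # Hosts may be given when the maintenance session is created; if not,
    # they are discovered from the nova-conductor and nova-compute
    # service lists below.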
    def _init_hosts_by_services(self):
        LOG.info("%s: Discovering hosts by Nova services" % self.session_id)
        hosts = []
        hostnames = []
        controllers = self.nova.services.list(binary='nova-conductor')
        for controller in controllers:
            host = {}
            service_host = str(controller.__dict__.get(u'host'))
            if service_host in hostnames:
                continue
            host['hostname'] = service_host
            hostnames.append(service_host)
            host['type'] = 'controller'
            if str(controller.__dict__.get(u'status')) == 'disabled':
                LOG.error("%s: %s nova-conductor disabled before maintenance"
                          % (self.session_id, service_host))
                raise Exception("%s: %s already disabled"
                                % (self.session_id, service_host))
            host['disabled'] = False
            host['details'] = str(controller.__dict__.get(u'id'))
            host['maintained'] = False
            hosts.append(host)

        computes = self.nova.services.list(binary='nova-compute')
        for compute in computes:
            host = {}
            service_host = str(compute.__dict__.get(u'host'))
            host['hostname'] = service_host
            host['type'] = 'compute'
            if str(compute.__dict__.get(u'status')) == 'disabled':
                LOG.error("%s: %s nova-compute disabled before maintenance"
                          % (self.session_id, service_host))
                raise Exception("%s: %s already disabled"
                                % (self.session_id, service_host))
            host['disabled'] = False
            host['details'] = str(compute.__dict__.get(u'id'))
            host['maintained'] = False
            hosts.append(host)

        return db_api.create_hosts_by_details(self.session_id, hosts)

    def _init_update_hosts(self):
        LOG.info("%s: Update given hosts" % self.session_id)
        controllers = self.nova.services.list(binary='nova-conductor')
        computes = self.nova.services.list(binary='nova-compute')

        for host in self.hosts:
            hostname = host.hostname
            host.disabled = False
            host.maintained = False
            match = [compute for compute in computes if
                     hostname == compute.host]
            if match:
                host.type = 'compute'
                if match[0].status == 'disabled':
                    LOG.error("%s: %s nova-compute disabled before maintenance"
                              % (self.session_id, hostname))
                    raise Exception("%s: %s already disabled"
                                    % (self.session_id, hostname))
                host.details = match[0].id
                continue
            if ([controller for controller in controllers if
                 hostname == controller.host]):
                host.type = 'controller'
                continue
            host.type = 'other'

    def disable_host_nova_compute(self, hostname):
        LOG.info('%s: disable nova-compute on host %s' % (self.session_id,
                                                          hostname))
        host = self.get_host_by_name(hostname)
        try:
            self.nova.services.disable_log_reason(host.details, "maintenance")
        except TypeError:
            LOG.debug('%s: Using old API to disable nova-compute on host %s' %
                      (self.session_id, hostname))
            self.nova.services.disable_log_reason(hostname, "nova-compute",
                                                  "maintenance")
        host.disabled = True

    def enable_host_nova_compute(self, hostname):
        LOG.info('%s: enable nova-compute on host %s' % (self.session_id,
                                                         hostname))
        host = self.get_host_by_name(hostname)
        try:
            self.nova.services.enable(host.details)
        except TypeError:
            LOG.debug('%s: Using old API to enable nova-compute on host %s' %
                      (self.session_id, hostname))
            self.nova.services.enable(hostname, "nova-compute")
        host.disabled = False

    def get_compute_hosts(self):
        return [host.hostname for host in self.hosts
                if host.type == 'compute']

    def get_empty_computes(self):
        all_computes = self.get_compute_hosts()
        instance_computes = []
        for instance in self.instances:
            if instance.host not in instance_computes:
                instance_computes.append(instance.host)
        return [host for host in all_computes if host not in instance_computes]

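    # Instances with a floating IP are marked with the "floating_ip"
    # detail; they count as active instances when ranking which hosts
    # to empty first (hosts with fewer of them are preferred).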
    def get_instance_details(self, instance):
        network_interfaces = next(iter(instance.addresses.values()))
        for network_interface in network_interfaces:
            _type = network_interface.get('OS-EXT-IPS:type')
            if _type == "floating":
                LOG.info('Instance with floating ip: %s %s' %
                         (instance.id, instance.name))
                return "floating_ip"
        return None

    def _fenix_instance(self, project_id, instance_id, instance_name, host,
                        state, details, action=None, project_state=None,
                        action_done=False):
        instance = {'session_id': self.session_id,
                    'instance_id': instance_id,
                    'action': action,
                    'project_id': project_id,
                    'project_state': project_state,
                    'state': state,
                    'instance_name': instance_name,
                    'action_done': action_done,
                    'host': host,
                    'details': details}
        return instance

    def initialize_server_info(self):
        project_ids = []
        instances = []
        compute_hosts = self.get_compute_hosts()
        opts = {'all_tenants': True}
        servers = self.nova.servers.list(detailed=True, search_opts=opts)
        for server in servers:
            try:
                host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host'))
                if host not in compute_hosts:
                    continue
                project_id = str(server.tenant_id)
                instance_name = str(server.name)
                instance_id = str(server.id)
                details = self.get_instance_details(server)
                state = str(server.__dict__.get('OS-EXT-STS:vm_state'))
            except Exception:
                raise Exception('can not get params from server=%s' % server)
            instances.append(self._fenix_instance(project_id, instance_id,
                                                  instance_name, host, state,
                                                  details))
            if project_id not in project_ids:
                project_ids.append(project_id)

        if len(project_ids):
            self.projects = self.init_projects(project_ids)
        else:
            LOG.info('%s: No projects on computes under maintenance' %
                     self.session_id)
        if len(instances):
            self.instances = self.add_instances(instances)
        else:
            LOG.info('%s: No instances on computes under maintenance' %
                     self.session_id)
        LOG.info(str(self))

    def update_instance(self, project_id, instance_id, instance_name, host,
                        state, details):
        if self.instance_id_found(instance_id):
            # TBD Might need to update instance variables here if not done
            # somewhere else
            return
        elif self.instance_name_found(instance_name):
            # Project has re-instantiated; remove the old instance and
            # add the new one
            old_instance = self.instance_by_name(instance_name)
            instance = self._fenix_instance(project_id, instance_id,
                                            instance_name, host,
                                            state, details,
                                            old_instance.action,
                                            old_instance.project_state,
                                            old_instance.action_done)
            self.instances.append(self.add_instance(instance))
            self.remove_instance(old_instance)
        else:
            # Instance is new, as the project has added instances
            instance = self._fenix_instance(project_id, instance_id,
                                            instance_name, host,
                                            state, details)
            self.instances.append(self.add_instance(instance))

    def remove_non_existing_instances(self, instance_ids):
        remove_instances = [instance for instance in
                            self.instances if instance.instance_id not in
                            instance_ids]
        for instance in remove_instances:
            # Instance deleted, as project possibly scaled down
            self.remove_instance(instance)

    def update_server_info(self):
        # TBD This keeps internal instance information up-to-date and prints
        # it out. Same could be done by updating the information when changed
        # Anyhow this also double checks information against Nova
        instance_ids = []
        compute_hosts = self.get_compute_hosts()
        opts = {'all_tenants': True}
        servers = self.nova.servers.list(detailed=True, search_opts=opts)
        for server in servers:
            try:
                host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host'))
                if host not in compute_hosts:
                    continue
                project_id = str(server.tenant_id)
                instance_name = str(server.name)
                instance_id = str(server.id)
                details = self.get_instance_details(server)
                state = str(server.__dict__.get('OS-EXT-STS:vm_state'))
            except Exception:
                LOG.error('can not get params from server: %s, retry...' %
                          str(server.id))
                # TBD sometimes cannot get all parameters, this retry can be
                # enhanced when caught better
                time.sleep(5)
                try:
                    server = self.nova.servers.get(str(server.id))
                    host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host'))
                    if host not in compute_hosts:
                        continue
                    project_id = str(server.tenant_id)
                    instance_name = str(server.name)
                    instance_id = str(server.id)
                    details = self.get_instance_details(server)
                    state = str(server.__dict__.get('OS-EXT-STS:vm_state'))
                except Exception:
                    raise Exception('can not get params from server: %s' %
                                    str(server.id))
                LOG.info('got params from server: %s' % str(server.id))
            self.update_instance(project_id, instance_id, instance_name, host,
                                 state, details)
            instance_ids.append(instance_id)
        self.remove_non_existing_instances(instance_ids)

        LOG.info(str(self))

    def confirm_maintenance(self):
        allowed_actions = []
        actions_at = self.session.maintenance_at
        state = 'MAINTENANCE'
        self.set_projets_state(state)
        all_replied = False
        project_not_replied = None
        retry = 2
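        # Notify every project and wait for replies. On timeout, only the
        # projects that have not yet replied are re-notified, up to two
        # retries.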
        while not all_replied:
            for project in self.project_names():
                if (project_not_replied is not None and project not in
                        project_not_replied):
                    continue
                LOG.info('\nMAINTENANCE to project %s\n' % project)
                instance_ids = '%s/v1/maintenance/%s/%s' % (self.url,
                                                            self.session_id,
                                                            project)
                reply_at = reply_time_str(self.conf.project_maintenance_reply)
                if is_time_after_time(reply_at, actions_at):
                    LOG.error('%s: No time for project to answer in state: %s'
                              % (self.session_id, state))
                    self.state("MAINTENANCE_FAILED")
                    return False
                metadata = self.session.meta
                self._project_notify(project, instance_ids, allowed_actions,
                                     actions_at, reply_at, state, metadata)
            self.start_timer(self.conf.project_maintenance_reply,
                             'MAINTENANCE_TIMEOUT')

            all_replied = self.wait_projects_state(state, '%s_TIMEOUT' % state)
            if not all_replied:
                if retry == 0:
                    LOG.info('confirm_maintenance failed after retries')
                    break
                else:
                    LOG.info('confirm_maintenance retry')
                    projects = self.get_projects_with_state()
                    project_not_replied = (
                        self._project_names_in_state(projects, state))
            retry -= 1
        return all_replied

    def confirm_scale_in(self):
        allowed_actions = []
        actions_at = reply_time_str(self.conf.project_scale_in_reply)
        reply_at = actions_at
        state = 'SCALE_IN'
        self.set_projets_state(state)
        all_replied = False
        project_not_replied = None
        retry = 2
        while not all_replied:
            for project in self.project_names():
                if (project_not_replied is not None and project not in
                        project_not_replied):
                    continue
                LOG.info('\nSCALE_IN to project %s\n' % project)
                instance_ids = '%s/v1/maintenance/%s/%s' % (self.url,
                                                            self.session_id,
                                                            project)
                metadata = self.session.meta
                self._project_notify(project, instance_ids, allowed_actions,
                                     actions_at, reply_at, state, metadata)
            self.start_timer(self.conf.project_scale_in_reply,
                             'SCALE_IN_TIMEOUT')

            all_replied = self.wait_projects_state(state, '%s_TIMEOUT' % state)
            if not all_replied:
                if retry == 0:
                    LOG.info('confirm_scale_in failed after retries')
                    break
                else:
                    LOG.info('confirm_scale_in retry')
                    projects = self.get_projects_with_state()
                    project_not_replied = (
                        self._project_names_in_state(projects, state))
            retry -= 1
        return all_replied

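    # Scale-in is needed when the free VCPU capacity across the computes
    # under maintenance is less than the capacity of one host, i.e. no
    # host could be emptied by moving its instances elsewhere.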
    def need_scale_in(self):
        hvisors = self.nova.hypervisors.list(detailed=True)
        prev_vcpus = 0
        free_vcpus = 0
        prev_hostname = ''
        LOG.info('checking hypervisors for VCPU capacity')
        for hvisor in hvisors:
            hostname = (
                hvisor.__getattr__('hypervisor_hostname').split(".", 1)[0])
            if hostname not in self.get_compute_hosts():
                continue
            vcpus = hvisor.__getattr__('vcpus')
            vcpus_used = hvisor.__getattr__('vcpus_used')
            if prev_vcpus != 0 and prev_vcpus != vcpus:
                raise Exception('%s: %d vcpus on %s does not match to '
                                '%d on %s'
                                % (self.session_id, vcpus, hostname,
                                   prev_vcpus, prev_hostname))
            free_vcpus += vcpus - vcpus_used
            prev_vcpus = vcpus
            prev_hostname = hostname
        if free_vcpus >= vcpus:
            # TBD vcpu capacity might be too scattered so moving instances
            # from one host to another host still might not succeed. At least
            # with NUMA and CPU pinning, one should calculate and ask specific
            # instances
            return False
        else:
            return True

    def get_vcpus_by_host(self, host, hvisors):
        hvisor = ([h for h in hvisors if
                   h.__getattr__('hypervisor_hostname').split(".", 1)[0]
                   == host][0])
        vcpus = hvisor.__getattr__('vcpus')
        vcpus_used = hvisor.__getattr__('vcpus_used')
        return vcpus, vcpus_used

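    # Choose the hosts to be emptied: iterate hosts by ascending used
    # VCPUs and, when there are more candidates than needed, prefer the
    # hosts with the fewest instances that need a project action.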
    def find_host_to_be_empty(self, need_empty, weighted_hosts,
                              act_instances_hosts):
        LOG.info("need_empty: %s" % need_empty)
        hosts_to_be_empty = []
        for vcpus_used in sorted(weighted_hosts.keys()):
            LOG.info("vcpus_used in weighted_hosts: %s" % vcpus_used)
            weighted_candidates = weighted_hosts[vcpus_used]
            if len(weighted_candidates) == need_empty:
                # Happened to be an exact match to what is needed
                hosts_to_be_empty = weighted_hosts[vcpus_used]
                LOG.info("hosts to be empty: %s" % hosts_to_be_empty)
            elif len(weighted_candidates) > need_empty:
                # More candidates than we need, dig deeper to act_instances
                for act_instances in sorted(act_instances_hosts.keys()):
                    LOG.info("act_instances in act_instances_hosts: %s"
                             % act_instances)
                    for candidate in weighted_candidates:
                        LOG.info("candidate: %s" % candidate)
                        if candidate in act_instances_hosts[act_instances]:
                            LOG.info("host to be empty: %s" % candidate)
                            hosts_to_be_empty.append(candidate)
                        if len(hosts_to_be_empty) == need_empty:
                            break
                    if len(hosts_to_be_empty) == need_empty:
                        break
            if len(hosts_to_be_empty) == need_empty:
                break
        if len(hosts_to_be_empty) != need_empty:
            LOG.error("failed to find enough hosts to be emptied")
        return hosts_to_be_empty

    def make_empty_hosts(self, state):
        # nova-compute is disabled on every host that is already empty or
        # about to be emptied in parallel, so no instance can be moved
        # there. Otherwise do the math to find the target hosts and hand
        # them to the move operations; this is tracked with
        # temporarily_disabled_hosts.
        temporarily_disabled_hosts = []

        hvisors = self.nova.hypervisors.list(detailed=True)
        weighted_hosts = {}
        act_instances_hosts = {}
        self.empty_hosts = []
        full_capacity = 0
        used_capacity = 0
        for host in self.get_compute_hosts():
            vcpus, vcpus_used = self.get_vcpus_by_host(host, hvisors)
            full_capacity += vcpus
            used_capacity += vcpus_used
            act_instances = 0
            for project in self.project_names():
                for instance in (self.instances_by_host_and_project(host,
                                                                    project)):
                    if instance.details and "floating_ip" in instance.details:
                        act_instances += 1
            if vcpus_used == 0:
                self.empty_hosts.append(host)
                temporarily_disabled_hosts.append(host)
                self.disable_host_nova_compute(host)
                LOG.info("host %s empty" % host)
            elif vcpus_used == vcpus:
                # We do not choose a full host
                continue
            else:
                if vcpus_used not in weighted_hosts:
                    weighted_hosts[vcpus_used] = [host]
                else:
                    weighted_hosts[vcpus_used].append(host)
                if act_instances not in act_instances_hosts:
                    act_instances_hosts[act_instances] = [host]
                else:
                    act_instances_hosts[act_instances].append(host)
        # how many empty hosts are possible
        parallel_hosts = int((full_capacity - used_capacity) / vcpus)
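        # For example, four identical 16-VCPU hosts with 34 free VCPUs in
        # total allow int(34 / 16) = 2 hosts to be emptied in parallel.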
        need_empty = parallel_hosts - len(self.empty_hosts)

        if need_empty != 0:
            hosts_to_be_empty = self.find_host_to_be_empty(need_empty,
                                                           weighted_hosts,
                                                           act_instances_hosts)
            for host in hosts_to_be_empty:
                temporarily_disabled_hosts.append(host)
                self.disable_host_nova_compute(host)
            thrs = []
            for host in hosts_to_be_empty:
                thrs.append(self.actions_to_have_empty_host(host, state))
            LOG.info("waiting hosts %s to be empty..." % hosts_to_be_empty)
            for thr in thrs:
                thr.join()
            LOG.info("hosts %s made empty" % hosts_to_be_empty)
        for host in temporarily_disabled_hosts:
            self.enable_host_nova_compute(host)
        return True

    def confirm_instance_action(self, instance, state):
        instance_id = instance.instance_id
        LOG.info('%s to instance %s' % (state, instance_id))
        allowed_actions = ['MIGRATE', 'LIVE_MIGRATE', 'OWN_ACTION']
        try:
            instance_constraints = db_api.project_instance_get(instance_id)
            wait_time = instance_constraints.lead_time
            LOG.info("%s actions_at from constraints lead_time: %s" %
                     (instance_id, wait_time))
        except db_exc.FenixDBNotFound:
            wait_time = self.conf.project_maintenance_reply
        actions_at = reply_time_str(wait_time)
        reply_at = actions_at
        instance.project_state = state
        metadata = self.session.meta
        retry = 2
        replied = False
        while not replied:
            metadata = self.session.meta
            self._project_notify(instance.project_id, [instance_id],
                                 allowed_actions, actions_at, reply_at,
                                 state, metadata)
            timer = '%s_%s_TIMEOUT' % (state, instance_id)
            self.start_timer(self.conf.project_maintenance_reply, timer)
            replied = self.wait_instance_reply_state(state, instance, timer)
            if not replied:
                if retry == 0:
                    LOG.info('confirm_instance_action for %s failed after '
                             'retries' % instance.instance_id)
                    break
                else:
                    LOG.info('confirm_instance_action for %s retry'
                             % instance.instance_id)
            else:
                break
            retry -= 1
        return replied

    def confirm_maintenance_complete(self):
        state = 'MAINTENANCE_COMPLETE'
        metadata = self.session.meta
        actions_at = reply_time_str(self.conf.project_scale_in_reply)
        reply_at = actions_at
        self.set_projets_state(state)
        all_replied = False
        project_not_replied = None
        retry = 2
        while not all_replied:
            for project in self.project_names():
                if (project_not_replied is not None and project not in
                        project_not_replied):
                    continue
                LOG.info('%s to project %s' % (state, project))
                instance_ids = '%s/v1/maintenance/%s/%s' % (self.url,
                                                            self.session_id,
                                                            project)
                allowed_actions = []
                self._project_notify(project, instance_ids, allowed_actions,
                                     actions_at, reply_at, state, metadata)
            self.start_timer(self.conf.project_scale_in_reply,
                             '%s_TIMEOUT' % state)

            all_replied = self.wait_projects_state(state, '%s_TIMEOUT' % state)
            if not all_replied:
                if retry == 0:
                    LOG.info('confirm_maintenance_complete failed after '
                             'retries')
                    break
                else:
                    LOG.info('confirm_maintenance_complete retry')
                    projects = self.get_projects_with_state()
                    project_not_replied = (
                        self._project_names_in_state(projects, state))
            retry -= 1
        return all_replied

    def notify_action_done(self, instance):
        instance_ids = [instance.instance_id]
        project = instance.project_id
        allowed_actions = []
        actions_at = None
        reply_at = None
        state = "INSTANCE_ACTION_DONE"
        instance.project_state = state
        metadata = "{}"
        self._project_notify(project, instance_ids, allowed_actions,
                             actions_at, reply_at, state, metadata)

    @run_async
    def instance_action(self, instance, state, target_host=None):
        if not self.confirm_instance_action(instance, state):
            raise Exception('%s: instance %s action %s '
                            'confirmation failed' %
                            (self.session_id, instance.instance_id,
                             instance.action))
        # TBD from constraints or override in instance.action
        LOG.info('Action %s instance %s ' % (instance.action,
                                             instance.instance_id))
        try:
            instance_constraints = (
                db_api.project_instance_get(instance.instance_id))
            group_id = instance_constraints.group_id
            instance_group = db_api.instance_group_get(group_id)
            if group_id not in self.group_impacted_members:
                self.group_impacted_members[group_id] = 0
            max_parallel = instance_group.max_impacted_members
            LOG.info("%s - instance_group: %s max_impacted_members: %s "
                     "recovery_time: %s" %
                     (instance.instance_id, instance_group.group_name,
                      max_parallel, instance_group.recovery_time))
        except db_exc.FenixDBNotFound:
            raise Exception('failed to get %s constraints' %
                            (instance.instance_id))
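        # Busy-wait until the number of in-flight moves within this
        # instance group is below max_impacted_members, then reserve a
        # slot for this instance.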
        while max_parallel < self.group_impacted_members[group_id]:
            LOG.info('%s waiting in group queue / max_parallel %s/%s' %
                     (instance.instance_id,
                      self.group_impacted_members[group_id],
                      max_parallel))
            time.sleep(5)
        self.group_impacted_members[group_id] += 1
        LOG.debug("%s Reserved / max_impacted_members: %s/%s" %
                  (instance.instance_id, self.group_impacted_members[group_id],
                   max_parallel))
        if instance.action == 'MIGRATE':
            if not self.migrate_server(instance, target_host):
                self.group_impacted_members[group_id] -= 1
                LOG.debug("%s Reservation freed. remain / "
                          "max_impacted_members:%s/%s"
                          % (instance.instance_id,
                             self.group_impacted_members[group_id],
                             max_parallel))
                raise Exception('%s: instance %s action '
                                '%s failed' %
                                (self.session_id, instance.instance_id,
                                 instance.action))
            self.notify_action_done(instance)
        elif instance.action == 'OWN_ACTION':
            pass
        elif instance.action == 'LIVE_MIGRATE':
            if not self.live_migrate_server(instance, target_host):
                self.group_impacted_members[group_id] -= 1
                LOG.debug("%s Reservation freed. remain / "
                          "max_impacted_members:%s/%s"
                          % (instance.instance_id,
                             self.group_impacted_members[group_id],
                             max_parallel))
                raise Exception('%s: instance %s action '
                                '%s failed' %
                                (self.session_id, instance.instance_id,
                                 instance.action))
            self.notify_action_done(instance)
        else:
            self.group_impacted_members[group_id] -= 1
            LOG.debug("%s Reservation freed. remain / "
                      "max_impacted_members:%s/%s"
                      % (instance.instance_id,
                         self.group_impacted_members[group_id],
                         max_parallel))
            raise Exception('%s: instance %s action '
                            '%s not supported' %
                            (self.session_id, instance.instance_id,
                             instance.action))
        # We need to obey the recovery time of the instance group before
        # decreasing self.group_impacted_members[group_id], to allow one
        # more instance of the same group to be affected by any move
        # operation
        if instance_group.recovery_time > 0:
            LOG.debug("%s wait VNF to recover from move for %ssec"
                      % (instance.instance_id,
                         instance_group.recovery_time))
            time.sleep(instance_group.recovery_time)
        self.group_impacted_members[group_id] -= 1
        LOG.debug("%s Reservation freed. remain / max_impacted_members: %s/%s"
                  % (instance.instance_id,
                     self.group_impacted_members[group_id],
                     max_parallel))

    @run_async
    def actions_to_have_empty_host(self, host, state, target_host=None):
        thrs = []
        LOG.info('actions_to_have_empty_host %s' % host)
        instances = self.instances_by_host(host)
        if not instances:
            raise Exception('No instances on host: %s' % host)
        for instance in instances:
            LOG.info('move %s from %s' % (instance.instance_name, host))
            thrs.append(self.instance_action(instance, state,
                                             target_host))
        for thr in thrs:
            thr.join()
        return self._wait_host_empty(host)

    def _wait_host_empty(self, host):
        hid = self.nova.hypervisors.search(host)[0].id
        vcpus_used_last = 0
        # wait up to 4 min for the host to become empty
        for j in range(48):
            hvisor = self.nova.hypervisors.get(hid)
            vcpus_used = hvisor.__getattr__('vcpus_used')
            if vcpus_used > 0:
                if vcpus_used != vcpus_used_last or vcpus_used_last == 0:
                    LOG.info('%s still has %d vcpus reserved. wait...'
                             % (host, vcpus_used))
                vcpus_used_last = vcpus_used
                time.sleep(5)
            else:
                LOG.info('%s empty' % host)
                return True
        LOG.info('%s host still not empty' % host)
        return False

    def live_migrate_server(self, instance, target_host=None):
        server_id = instance.instance_id
        server = self.nova.servers.get(server_id)
        instance.state = server.__dict__.get('OS-EXT-STS:vm_state')
        orig_host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host'))
        LOG.info('live_migrate_server %s state %s host %s to %s' %
                 (server_id, instance.state, orig_host, target_host))
        orig_vm_state = instance.state
        last_vm_status = str(server.__dict__.get('status'))
        last_migration_status = "active"
        try:
            server.live_migrate(host=target_host)
            waited = 0
            migrate_retries = 0
            while waited != self.conf.live_migration_wait_time:
                time.sleep(1)
                server = self.nova.servers.get(server_id)
                host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host'))
                vm_status = str(server.__dict__.get('status'))
                instance.state = server.__dict__.get('OS-EXT-STS:vm_state')
                instance.host = host
                if vm_status != last_vm_status:
                    LOG.info('instance %s status changed: %s' % (server_id,
                                                                 vm_status))
                if instance.state == 'error':
                    LOG.error('instance %s live migration failed'
                              % server_id)
                    return False
                elif orig_vm_state != instance.state:
                    LOG.info('instance %s state changed: %s' %
                             (server_id, instance.state))
                elif host != orig_host:
                    LOG.info('instance %s live migrated to host %s' %
                             (server_id, host))
                    return True
                migration = (
                    self.nova.migrations.list(instance_uuid=server_id)[0])
                if migration.status == 'error':
                    if migrate_retries == self.conf.live_migration_retries:
                        LOG.error('instance %s live migration failed after '
                                  '%d retries' %
                                  (server_id,
                                   self.conf.live_migration_retries))
                        return False
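                    # Back off before retrying: 2 * (migrate_retries + 5)
                    # gives 10s, 12s, 14s, ... as migrate_retries grows.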
                    # When a live migration fails, it can fail fast right
                    # after the call. Nova needs time to get ready for the
                    # next live migration, so wait before retrying; waiting
                    # longer on each retry gives the migration a better
                    # chance to finally go through.
                    time.sleep(2 * (migrate_retries + 5))
                    LOG.info('instance %s live migration failed, retry'
                             % server_id)
                    server.live_migrate(host=target_host)
                    waited = 0
                    migrate_retries = migrate_retries + 1
                elif migration.status != last_migration_status:
                    LOG.info('instance %s live migration status changed: %s'
                             % (server_id, migration.status))
                waited = waited + 1
                last_migration_status = migration.status
                last_vm_status = vm_status
            LOG.error('instance %s live migration did not finish in %ss, '
                      'state: %s' % (server_id, waited, instance.state))
        except Exception as e:
            LOG.error('server %s live migration failed, Exception=%s' %
                      (server_id, e))
        return False

    def migrate_server(self, instance, target_host=None):
        server_id = instance.instance_id
        server = self.nova.servers.get(server_id)
        instance.state = server.__dict__.get('OS-EXT-STS:vm_state')
        orig_host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host'))
        LOG.info('migrate_server %s state %s host %s to %s' %
                 (server_id, instance.state, orig_host, target_host))
        last_vm_state = instance.state
        retry_migrate = 7
        while True:
            try:
                server.migrate(host=target_host)
                time.sleep(5)
                retries = 48
                while instance.state != 'resized' and retries > 0:
                    # try to confirm within 4 min
                    server = self.nova.servers.get(server_id)
                    host = str(server.__dict__.get('OS-EXT-SRV-ATTR:host'))
                    instance.state = server.__dict__.get('OS-EXT-STS:vm_state')
                    if instance.state == 'resized':
                        server.confirm_resize()
                        LOG.info('instance %s migration resized to host %s' %
                                 (server_id, host))
                        instance.host = host
                        return True
                    if last_vm_state != instance.state:
                        LOG.info('instance %s state changed: %s' %
                                 (server_id, instance.state))
                    if instance.state == 'error':
                        LOG.error('instance %s migration failed, state: %s'
                                  % (server_id, instance.state))
                        instance.host = host
                        return False
                    time.sleep(5)
                    retries = retries - 1
                    last_vm_state = instance.state
                # Timeout waiting for the state to change
                break

            except BadRequest:
                if retry_migrate == 0:
                    LOG.error('server %s migrate failed after retries' %
                              server_id)
                    return False
                # Might take time for scheduler to sync inconsistent instance
                # list for host.
                retry_timeout = 40 - (retry_migrate * 5)
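                # retry_migrate counts down from 7, so successive waits
                # grow 5s, 10s, ... up to 35s before giving up.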
                LOG.info('server %s migrate failed, retry in %s sec'
                         % (server_id, retry_timeout))
                time.sleep(retry_timeout)
                # Somehow the retry might work when refreshing the server
                server = self.nova.servers.get(server_id)
            except Exception as e:
                LOG.error('server %s migration failed, Exception=%s' %
                          (server_id, e))
                return False
            finally:
                retry_migrate = retry_migrate - 1
        LOG.error('instance %s migration timeout, state: %s' %
                  (server_id, instance.state))
        return False

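    # Action plug-ins are first imported from fenix.workflow.actions; if
    # not installed there, they are loaded from the per-session download
    # directory under the local cache.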
    def maintenance_by_plugin_type(self, hostname, plugin_type):
        aps = self.get_action_plugins_by_type(plugin_type)
        session_dir = "%s/%s" % (self.conf.engine.local_cache_dir,
                                 self.session_id)
        download_plugin_dir = session_dir + "/actions/"
        if aps:
            LOG.info("%s: Calling action plug-ins with type %s" %
                     (self.session_id, plugin_type))
            for ap in aps:
                ap_name = "fenix.workflow.actions.%s" % ap.plugin
                LOG.info("%s: Calling action plug-in module: %s" %
                         (self.session_id, ap_name))
                ap_db_instance = self._create_action_plugin_instance(ap.plugin,
                                                                     hostname)
                try:
                    action_plugin = getattr(import_module(ap_name),
                                            'ActionPlugin')
                    ap_instance = action_plugin(self, ap_db_instance)
                except ImportError:
                    download_plugin_file = "%s/%s.py" % (download_plugin_dir,
                                                         ap.plugin)
                    LOG.info("%s: Trying from: %s" % (self.session_id,
                                                      download_plugin_file))
                    if os.path.isfile(download_plugin_file):
                        ap_instance = (
                            mod_loader_action_instance(ap_name,
                                                       download_plugin_file,
                                                       self,
                                                       ap_db_instance))
                    else:
                        raise Exception('%s: could not find action plugin %s' %
                                        (self.session_id, ap.plugin))

                ap_instance.run()
                if ap_db_instance.state:
                    LOG.info('%s: %s finished with %s host %s' %
                             (self.session_id, ap.plugin,
                              ap_db_instance.state, hostname))
                    if 'FAILED' in ap_db_instance.state:
                        raise Exception('%s: %s finished with %s host %s' %
                                        (self.session_id, ap.plugin,
                                         ap_db_instance.state, hostname))
                else:
                    raise Exception('%s: %s reported no state for host %s' %
                                    (self.session_id, ap.plugin, hostname))
                # If ap_db_instance failed, we keep it for state
                db_api.remove_action_plugin_instance(ap_db_instance)
        else:
            LOG.info("%s: No action plug-ins with type %s" %
                     (self.session_id, plugin_type))

    @run_async
    def host_maintenance_async(self, hostname):
        self.host_maintenance(hostname)

    def host_maintenance(self, hostname):
        host = self.get_host_by_name(hostname)
        if host.type == "compute":
            self._wait_host_empty(hostname)
        LOG.info('IN_MAINTENANCE %s' % hostname)
        self._admin_notify(self.conf.service_user.os_project_name,
                           hostname,
                           'IN_MAINTENANCE',
                           self.session_id)
        for plugin_type in ["host", host.type]:
            LOG.info('%s: Execute %s action plugins' % (self.session_id,
                                                        plugin_type))
            self.maintenance_by_plugin_type(hostname, plugin_type)
        self._admin_notify(self.conf.service_user.os_project_name,
                           hostname,
                           'MAINTENANCE_COMPLETE',
                           self.session_id)
        if host.type == "compute":
            self.enable_host_nova_compute(hostname)
        LOG.info('MAINTENANCE_COMPLETE %s' % hostname)
        host.maintained = True

    def maintenance(self):
        LOG.info("%s: maintenance called" % self.session_id)
        self.initialize_server_info()

        if not self.projects_listen_alarm('maintenance.scheduled'):
            self.state('MAINTENANCE_FAILED')
            return

        if not self.confirm_maintenance():
            self.state('MAINTENANCE_FAILED')
            return

        maintenance_empty_hosts = self.get_empty_computes()

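        # Choose the next state: SCALE_IN when there is not even one
        # host's worth of free capacity, PREPARE_MAINTENANCE when capacity
        # exists but no host is empty yet, and START_MAINTENANCE when an
        # empty host is already available.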
if len(maintenance_empty_hosts) == 0:
|
|
if self.need_scale_in():
|
|
LOG.info('%s: Need to scale in to get capacity for '
|
|
'empty host' % (self.session_id))
|
|
self.state('SCALE_IN')
|
|
else:
|
|
LOG.info('%s: Free capacity, but need empty host' %
|
|
(self.session_id))
|
|
self.state('PREPARE_MAINTENANCE')
|
|
else:
|
|
LOG.info('Empty host found')
|
|
self.state('START_MAINTENANCE')
|
|
|
|
if self.session.maintenance_at > datetime.datetime.utcnow():
|
|
time_now = time_now_str()
|
|
LOG.info('Time now: %s maintenance starts: %s....' %
|
|
(time_now, datetime_to_str(self.session.maintenance_at)))
|
|
td = self.session.maintenance_at - datetime.datetime.utcnow()
|
|
self.start_timer(td.total_seconds(), 'MAINTENANCE_START_TIMEOUT')
|
|
while not self.is_timer_expired('MAINTENANCE_START_TIMEOUT'):
|
|
time.sleep(1)
|
|
|
|
time_now = time_now_str()
|
|
LOG.info('Time to start maintenance: %s' % time_now)
|
|
|
|
    def scale_in(self):
        LOG.info("%s: scale in" % self.session_id)
        # TBD we just blindly ask to scale_in to have at least one
        # empty compute. With NUMA and CPU pinning and together with
        # how many instances can be affected at the same time, we should
        # calculate and ask scaling of specific instances
        if not self.confirm_scale_in():
            self.state('MAINTENANCE_FAILED')
            return
        # TBD it takes time to have proper information updated about free
        # capacity. Should make sure instances removed have also VCPUs removed
        self.update_server_info()
        maintenance_empty_hosts = self.get_empty_computes()

        if len(maintenance_empty_hosts) == 0:
            if self.need_scale_in():
                LOG.info('%s: Need to scale in more to get capacity for '
                         'empty host' % (self.session_id))
                self.state('SCALE_IN')
            else:
                LOG.info('%s: Free capacity, but need empty host' %
                         (self.session_id))
                self.state('PREPARE_MAINTENANCE')
        else:
            LOG.info('Empty host found')
            for host in maintenance_empty_hosts:
                self._wait_host_empty(host)
            self.state('START_MAINTENANCE')

    def prepare_maintenance(self):
        LOG.info("%s: prepare_maintenance called" % self.session_id)
        if not self.make_empty_hosts('PREPARE_MAINTENANCE'):
            LOG.error('make_empty_hosts failed')
            self.state('MAINTENANCE_FAILED')
        else:
            self.state('START_MAINTENANCE')
        self.update_server_info()

    def start_maintenance(self):
        LOG.info("%s: start_maintenance called" % self.session_id)
        empty_hosts = self.get_empty_computes()
        if not empty_hosts:
            LOG.info("%s: No empty host to be maintained" % self.session_id)
            self.state('MAINTENANCE_FAILED')
            return
        maintained_hosts = self.get_maintained_hosts_by_type('compute')
        if not maintained_hosts:
            computes = self.get_compute_hosts()
            for compute in computes:
                # When we start to maintain compute hosts, nova-compute is
                # disabled on all of them, so projects cannot get instances
                # scheduled to not yet maintained hosts
                self.disable_host_nova_compute(compute)
            for host in self.get_controller_hosts():
                # TBD one might need to change this. Now all controllers
                # maintenance is serialized
                self.host_maintenance(host)
        # First we maintain all empty compute hosts
        thrs = []
        for host in empty_hosts:
            thrs.append(self.host_maintenance_async(host))
        for thr in thrs:
            thr.join()
        self.state('PLANNED_MAINTENANCE')

    def planned_maintenance(self):
        LOG.info("%s: planned_maintenance called" % self.session_id)
        maintained_hosts = self.get_maintained_hosts_by_type('compute')
        compute_hosts = self.get_compute_hosts()
        not_maintained_hosts = ([host for host in compute_hosts if host
                                 not in maintained_hosts])
        empty_compute_hosts = self.get_empty_computes()
        parallel = len(empty_compute_hosts)
        not_maintained = len(not_maintained_hosts)
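        # Maintain the remaining hosts in rolling batches: each round
        # empties as many not yet maintained hosts as there are empty
        # hosts to receive their instances, then maintains the hosts
        # just emptied.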
        while not_maintained:
            if not_maintained < parallel:
                parallel = not_maintained
            thrs = []
            for index in range(parallel):
                shost = not_maintained_hosts[index]
                thost = empty_compute_hosts[index]
                thrs.append(
                    self.actions_to_have_empty_host(shost,
                                                    'PLANNED_MAINTENANCE',
                                                    thost))
            for thr in thrs:
                thr.join()
            thrs = []
            for index in range(parallel):
                host = not_maintained_hosts[index]
                thrs.append(self.host_maintenance_async(host))
            for thr in thrs:
                thr.join()
            empty_compute_hosts = self.get_empty_computes()
            del not_maintained_hosts[:parallel]
            parallel = len(empty_compute_hosts)
            not_maintained = len(not_maintained_hosts)
            self.update_server_info()

        LOG.info("%s: planned_maintenance done" % self.session_id)
        self.state('MAINTENANCE_COMPLETE')

    def maintenance_complete(self):
        LOG.info("%s: maintenance_complete called" % self.session_id)
        LOG.info('%s: Execute post action plugins' % self.session_id)
        self.maintenance_by_plugin_type("localhost", "post")
        LOG.info('Projects may still need to scale back up to full '
                 'capacity')
        if not self.confirm_maintenance_complete():
            self.state('MAINTENANCE_FAILED')
            return
        self.update_server_info()
        self.state('MAINTENANCE_DONE')

    def maintenance_done(self):
        pass

    def maintenance_failed(self):
        LOG.info("%s: maintenance_failed called" % self.session_id)

    def cleanup(self):
        LOG.info("%s: cleanup" % self.session_id)
        db_api.remove_session(self.session_id)