# Copyright (c) 2020 Nokia Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import aodhclient.client as aodhclient
import datetime
from flask import Flask
from flask import request
import json
from keystoneauth1 import loading
from keystoneclient import client as ks_client
from kubernetes import client
from kubernetes import config
import logging as lging
from oslo_config import cfg
from oslo_log import log as logging
import requests
import sys
from threading import Thread
import time
import yaml
try:
    import fenix.utils.identity_auth as identity_auth
except (ImportError, ValueError):
    sys.path.append('../utils')
    import identity_auth
LOG = logging.getLogger(__name__)
streamlog = lging.StreamHandler(sys.stdout)
LOG.logger.addHandler(streamlog)
LOG.logger.setLevel(logging.INFO)
opts = [
cfg.StrOpt('ip',
default='127.0.0.1',
help='the ip of VNFM',
required=True),
cfg.IntOpt('port',
               default=12348,
help='the port of VNFM',
required=True),
]
CONF = cfg.CONF
CONF.register_opts(opts)
CONF.register_opts(identity_auth.os_opts, group='service_user')
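# Note: Fenix/AODH delivers maintenance events to this VNFM at
# http://<ip>:<port>/maintenance (see create_alarm() below), so with the
# defaults above the webhook is http://127.0.0.1:12348/maintenance.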
def get_identity_auth(conf, project=None, username=None, password=None):
loader = loading.get_plugin_loader('password')
return loader.load_from_options(
auth_url=conf.service_user.os_auth_url,
username=(username or conf.service_user.os_username),
password=(password or conf.service_user.os_password),
user_domain_name=conf.service_user.os_user_domain_name,
project_name=(project or conf.service_user.os_project_name),
tenant_name=(project or conf.service_user.os_project_name),
project_domain_name=conf.service_user.os_project_domain_name)
class VNFM(object):
def __init__(self, conf, log):
self.conf = conf
self.log = log
self.app = None
def start(self):
LOG.info('VNFM start......')
self.app = VNFManager(self.conf, self.log)
self.app.start()
def stop(self):
LOG.info('VNFM stop......')
if not self.app:
return
self.app.headers['X-Auth-Token'] = self.app.session.get_token()
self.app.delete_constraints()
headers = {
'Content-Type': 'application/json',
'Accept': 'application/json',
}
url = 'http://%s:%d/shutdown'\
% (self.conf.ip,
self.conf.port)
requests.post(url, data='', headers=headers)
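# VNFManager is the worker thread behind VNFM: it registers the demo VNF's
# instance and group constraints with Fenix, subscribes to the AODH
# maintenance alarm, and answers the Fenix workflow states (MAINTENANCE,
# SCALE_IN, PREPARE_MAINTENANCE, PLANNED_MAINTENANCE, INSTANCE_ACTION_DONE,
# MAINTENANCE_COMPLETE) on its /maintenance endpoint.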
class VNFManager(Thread):
def __init__(self, conf, log):
Thread.__init__(self)
self.conf = conf
self.log = log
self.port = self.conf.port
        self.instance_ids = None
        # VNFM is started with OS_* exported as the admin user
        # We need that to query the Fenix endpoint URL
        # Still we work with our own tenant/project/VNF as demo
self.project = "demo"
LOG.info('VNFM project: %s' % self.project)
self.auth = identity_auth.get_identity_auth(conf, project=self.project)
self.session = identity_auth.get_session(auth=self.auth)
self.ks = ks_client.Client(version='v3', session=self.session)
self.aodh = aodhclient.Client(2, self.session)
        # Subscribe to the maintenance event alarm from Fenix via AODH
self.create_alarm()
config.load_kube_config()
self.kaapi = client.AppsV1Api()
self.kapi = client.CoreV1Api()
self.headers = {
'Content-Type': 'application/json',
'Accept': 'application/json'}
self.headers['X-Auth-Token'] = self.session.get_token()
self.orig_number_of_instances = self.number_of_instances()
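        # Remembered so MAINTENANCE_COMPLETE can scale the VNF back up to the
        # original pod count if instances were scaled in during the session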
# List of instances
self.ha_instances = []
self.nonha_instances = []
        # Different instance_id specific constraints {instance_id: {}, ...}
self.instance_constraints = None
# Update existing instances to instance lists
self.update_instances()
        # How many instances need to exist (with the current VNF load).
        # max_impacted_members needs to be updated accordingly
        # if the number of instances is scaled. Example for demo-ha:
        # max_impacted_members = len(self.ha_instances) - ha_group_limit
self.ha_group_limit = 2
self.nonha_group_limit = 2
# Different instance groups constraints dict
self.ha_group = None
self.nonha_group = None
auth = get_identity_auth(conf,
project='service',
username='fenix',
password='admin')
session = identity_auth.get_session(auth=auth)
keystone = ks_client.Client(version='v3', session=session)
# VNF project_id (VNF ID)
self.project_id = self.session.get_project_id()
# HA instance_id that is active has active label
self.active_instance_id = self.active_instance_id()
services = keystone.services.list()
for service in services:
if service.type == 'maintenance':
LOG.info('maintenance service: %s:%s type %s'
% (service.name, service.id, service.type))
maint_id = service.id
self.maint_endpoint = [ep.url for ep in keystone.endpoints.list()
if ep.service_id == maint_id and
ep.interface == 'public'][0]
LOG.info('maintenance endpoint: %s' % self.maint_endpoint)
self.update_constraints_lock = False
self.update_constraints()
        # Instances waiting for their action to be done
self.pending_actions = {}
def create_alarm(self):
alarms = {alarm['name']: alarm for alarm in self.aodh.alarm.list()}
alarm_name = "%s_MAINTENANCE_ALARM" % self.project
if alarm_name in alarms:
return
alarm_request = dict(
name=alarm_name,
description=alarm_name,
enabled=True,
alarm_actions=[u'http://%s:%d/maintenance'
% (self.conf.ip,
self.conf.port)],
repeat_actions=True,
severity=u'moderate',
type=u'event',
event_rule=dict(event_type=u'maintenance.scheduled'))
self.aodh.alarm.create(alarm_request)
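        # The alarm above makes AODH POST "maintenance.scheduled" events to
        # http://<ip>:<port>/maintenance, handled by maintenance_alarm() in
        # run(). A roughly equivalent CLI call (illustrative only) would be:
        #   aodh alarm create --name demo_MAINTENANCE_ALARM --type event \
        #       --event-type maintenance.scheduled \
        #       --alarm-action http://127.0.0.1:12348/maintenance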
def delete_remote_instance_constraints(self, instance_id):
url = "%s/instance/%s" % (self.maint_endpoint, instance_id)
LOG.info('DELETE: %s' % url)
ret = requests.delete(url, data=None, headers=self.headers)
if ret.status_code != 200 and ret.status_code != 204:
if ret.status_code == 404:
LOG.info('Already deleted: %s' % instance_id)
else:
raise Exception(ret.text)
def update_remote_instance_constraints(self, instance):
url = "%s/instance/%s" % (self.maint_endpoint, instance["instance_id"])
LOG.info('PUT: %s' % url)
ret = requests.put(url, data=json.dumps(instance),
headers=self.headers)
if ret.status_code != 200 and ret.status_code != 204:
raise Exception(ret.text)
def delete_remote_group_constraints(self, instance_group):
url = "%s/instance_group/%s" % (self.maint_endpoint,
instance_group["group_id"])
LOG.info('DELETE: %s' % url)
ret = requests.delete(url, data=None, headers=self.headers)
if ret.status_code != 200 and ret.status_code != 204:
raise Exception(ret.text)
def update_remote_group_constraints(self, instance_group):
url = "%s/instance_group/%s" % (self.maint_endpoint,
instance_group["group_id"])
LOG.info('PUT: %s' % url)
ret = requests.put(url, data=json.dumps(instance_group),
headers=self.headers)
if ret.status_code != 200 and ret.status_code != 204:
raise Exception(ret.text)
def delete_constraints(self):
for instance_id in self.instance_constraints:
self.delete_remote_instance_constraints(instance_id)
self.delete_remote_group_constraints(self.nonha_group)
self.delete_remote_group_constraints(self.ha_group)
def update_constraints(self):
while self.update_constraints_lock:
LOG.info('Waiting update_constraints_lock...')
time.sleep(1)
self.update_constraints_lock = True
LOG.info('Update constraints')
        # Pods are grouped by ReplicaSet, so we use its uid
rs = {r.metadata.name: r.metadata.uid for r in
self.kaapi.list_namespaced_replica_set('demo').items}
max_impacted_members = len(self.nonha_instances) - 1
nonha_group = {
"group_id": rs['demo-nonha'],
"project_id": self.project_id,
"group_name": "demo-nonha",
"anti_affinity_group": False,
"max_instances_per_host": 0,
"max_impacted_members": max_impacted_members,
"recovery_time": 10,
"resource_mitigation": True}
LOG.info('create demo-nonha constraints: %s'
% nonha_group)
ha_group = {
"group_id": rs['demo-ha'],
"project_id": self.project_id,
"group_name": "demo-ha",
"anti_affinity_group": True,
"max_instances_per_host": 1,
"max_impacted_members": 1,
"recovery_time": 10,
"resource_mitigation": True}
LOG.info('create demo-ha constraints: %s'
% ha_group)
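        # Rough meaning of the group constraint fields (see the Fenix
        # documentation for the authoritative definitions):
        #   anti_affinity_group    - members must not share the same host
        #   max_instances_per_host - cap of group members per host (0 = no cap)
        #   max_impacted_members   - members Fenix may impact at the same time
        #   recovery_time          - seconds the app needs to recover afterwards
        #   resource_mitigation    - whether impacted capacity must be mitigated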
if not self.ha_group or self.ha_group != ha_group:
LOG.info('ha instance group need update')
self.update_remote_group_constraints(ha_group)
self.ha_group = ha_group.copy()
if not self.nonha_group or self.nonha_group != nonha_group:
LOG.info('nonha instance group need update')
self.update_remote_group_constraints(nonha_group)
self.nonha_group = nonha_group.copy()
instance_constraints = {}
for ha_instance in self.ha_instances:
instance = {
"instance_id": ha_instance.metadata.uid,
"project_id": self.project_id,
"group_id": ha_group["group_id"],
"instance_name": ha_instance.metadata.name,
"max_interruption_time": 120,
"migration_type": "EVICTION",
"resource_mitigation": True,
"lead_time": 40}
LOG.info('create ha instance constraints: %s' % instance)
instance_constraints[ha_instance.metadata.uid] = instance
for nonha_instance in self.nonha_instances:
instance = {
"instance_id": nonha_instance.metadata.uid,
"project_id": self.project_id,
"group_id": nonha_group["group_id"],
"instance_name": nonha_instance.metadata.name,
"max_interruption_time": 120,
"migration_type": "EVICTION",
"resource_mitigation": True,
"lead_time": 40}
LOG.info('create nonha instance constraints: %s' % instance)
instance_constraints[nonha_instance.metadata.uid] = instance
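        # Rough meaning of the per-instance constraint fields:
        #   max_interruption_time - max seconds the instance may be down
        #   migration_type        - how Fenix may move the instance; EVICTION
        #                           here means the pod is deleted and re-created
        #   lead_time             - seconds of notice the VNF needs before the
        #                           action starts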
if not self.instance_constraints:
# Initial instance constraints
LOG.info('create initial instances constraints...')
for instance in [instance_constraints[i] for i
in instance_constraints]:
self.update_remote_instance_constraints(instance)
self.instance_constraints = instance_constraints.copy()
else:
LOG.info('check instances constraints changes...')
added = [i for i in instance_constraints.keys()
if i not in self.instance_constraints]
deleted = [i for i in self.instance_constraints.keys()
if i not in instance_constraints]
modified = [i for i in instance_constraints.keys()
if (i not in added and i not in deleted and
instance_constraints[i] !=
self.instance_constraints[i])]
for instance_id in deleted:
self.delete_remote_instance_constraints(instance_id)
updated = added + modified
for instance in [instance_constraints[i] for i in updated]:
self.update_remote_instance_constraints(instance)
if updated or deleted:
# Some instance constraints have changed
self.instance_constraints = instance_constraints.copy()
self.update_constraints_lock = False
def active_instance_id(self):
        # We dictate the initially active instance
instance = self.ha_instances[0]
LOG.info('Initially Active instance: %s %s' %
(instance.metadata.name, instance.metadata.uid))
name = instance.metadata.name
namespace = instance.metadata.namespace
body = {"metadata": {"labels": {"active": "True"}}}
self.kapi.patch_namespaced_pod(name, namespace, body)
        return instance.metadata.uid
def switch_over_ha_instance(self, instance_id):
if instance_id == self.active_instance_id:
# Need to switchover as instance_id will be affected and is active
for instance in self.ha_instances:
if instance_id == instance.metadata.uid:
LOG.info('Active to Standby: %s %s' %
(instance.metadata.name, instance.metadata.uid))
name = instance.metadata.name
namespace = instance.metadata.namespace
                    body = {"metadata": {"labels": {"active": None}}}
self.kapi.patch_namespaced_pod(name, namespace, body)
else:
LOG.info('Standby to Active: %s %s' %
(instance.metadata.name, instance.metadata.uid))
name = instance.metadata.name
namespace = instance.metadata.namespace
                    body = {"metadata": {"labels": {"active": "True"}}}
self.kapi.patch_namespaced_pod(name, namespace, body)
self.active_instance_id = instance.metadata.uid
self.update_instances()
def get_instance_ids(self):
instances = self.kapi.list_pod_for_all_namespaces().items
return [i.metadata.uid for i in instances
if i.metadata.name.startswith("demo-") and
i.metadata.namespace == "demo"]
def update_instances(self):
instances = self.kapi.list_pod_for_all_namespaces().items
self.ha_instances = [i for i in instances
if i.metadata.name.startswith("demo-ha") and
i.metadata.namespace == "demo"]
self.nonha_instances = [i for i in instances
if i.metadata.name.startswith("demo-nonha") and
i.metadata.namespace == "demo"]
def _alarm_data_decoder(self, data):
if "[" in data or "{" in data:
# string to list or dict removing unicode
            data = yaml.safe_load(data.replace("u'", "'"))
return data
def _alarm_traits_decoder(self, data):
return ({str(t[0]): self._alarm_data_decoder(str(t[2]))
for t in data['reason_data']['event']['traits']})
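    # After decoding, the payload is a flat dict of the event traits, e.g.
    # (illustrative values):
    #   {'state': 'PREPARE_MAINTENANCE',
    #    'session_id': '<uuid>',
    #    'instance_ids': ['<pod uid>'],
    #    'allowed_actions': ['EVICTION'],
    #    'reply_url': 'http://<fenix>/...'}
    # Note: in the MAINTENANCE state 'instance_ids' is a URL to fetch the ids
    # from (see get_session_instance_ids()), in later states it is a list.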
def get_session_instance_ids(self, url, session_id):
ret = requests.get(url, data=None, headers=self.headers)
if ret.status_code != 200:
raise Exception(ret.text)
LOG.info('get_instance_ids %s' % ret.json())
return ret.json()['instance_ids']
def scale_instances(self, scale_instances):
number_of_instances_before = len(self.nonha_instances)
replicas = number_of_instances_before + scale_instances
# We only scale nonha apps
namespace = "demo"
name = "demo-nonha"
body = {'spec': {"replicas": replicas}}
self.kaapi.patch_namespaced_replica_set_scale(name, namespace, body)
time.sleep(3)
# Let's check if scale has taken effect
self.update_instances()
number_of_instances_after = len(self.nonha_instances)
check = 20
while number_of_instances_after == number_of_instances_before:
if check == 0:
LOG.error('scale_instances with: %d failed, still %d instances'
% (scale_instances, number_of_instances_after))
raise Exception('scale_instances failed')
check -= 1
time.sleep(1)
self.update_instances()
number_of_instances_after = len(self.nonha_instances)
LOG.info('scaled instances from %d to %d' %
(number_of_instances_before, number_of_instances_after))
def number_of_instances(self):
instances = self.kapi.list_pod_for_all_namespaces().items
return len([i for i in instances
if i.metadata.name.startswith("demo-")])
def instance_action(self, instance_id, allowed_actions):
        # We should keep the instance constraints in our internal structure
        # and match the instance_id specific allowed actions. For now we
        # assume EVICTION
if 'EVICTION' not in allowed_actions:
            LOG.error('Action for %s not found in %s' %
                      (instance_id, allowed_actions))
return None
return 'EVICTION'
def instance_action_started(self, instance_id, action):
time_now = datetime.datetime.utcnow()
max_interruption_time = (
self.instance_constraints[instance_id]['max_interruption_time'])
self.pending_actions[instance_id] = {
'started': time_now,
'max_interruption_time': max_interruption_time,
'action': action}
def was_instance_action_in_time(self, instance_id):
time_now = datetime.datetime.utcnow()
started = self.pending_actions[instance_id]['started']
limit = self.pending_actions[instance_id]['max_interruption_time']
action = self.pending_actions[instance_id]['action']
td = time_now - started
if td.total_seconds() > limit:
LOG.error('%s %s took too long: %ds' %
(instance_id, action, td.total_seconds()))
LOG.error('%s max_interruption_time %ds might be too short' %
(instance_id, limit))
raise Exception('%s %s took too long: %ds' %
(instance_id, action, td.total_seconds()))
else:
LOG.info('%s %s with recovery time took %ds' %
(instance_id, action, td.total_seconds()))
del self.pending_actions[instance_id]
def run(self):
app = Flask('VNFM')
@app.route('/maintenance', methods=['POST'])
def maintenance_alarm():
data = json.loads(request.data.decode('utf8'))
try:
payload = self._alarm_traits_decoder(data)
except Exception:
payload = ({t[0]: t[2] for t in
data['reason_data']['event']['traits']})
LOG.error('cannot parse alarm data: %s' % payload)
                raise Exception('VNFM cannot parse alarm. '
                                'Possibly trait data over 256 char')
LOG.info('VNFM received data = %s' % payload)
state = payload['state']
reply_state = None
reply = dict()
LOG.info('VNFM state: %s' % state)
if state == 'MAINTENANCE':
self.headers['X-Auth-Token'] = self.session.get_token()
instance_ids = (self.get_session_instance_ids(
payload['instance_ids'],
payload['session_id']))
reply['instance_ids'] = instance_ids
reply_state = 'ACK_MAINTENANCE'
elif state == 'SCALE_IN':
# scale down only nonha instances
nonha_instances = len(self.nonha_instances)
                scale_in = nonha_instances // 2
self.scale_instances(-scale_in)
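                # e.g. with 4 demo-nonha pods this scales 2 of them in,
                # freeing capacity for the maintenance actions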
self.update_constraints()
reply['instance_ids'] = self.get_instance_ids()
reply_state = 'ACK_SCALE_IN'
elif state == 'MAINTENANCE_COMPLETE':
# possibly need to upscale
number_of_instances = self.number_of_instances()
if self.orig_number_of_instances > number_of_instances:
scale_instances = (self.orig_number_of_instances -
number_of_instances)
self.scale_instances(scale_instances)
self.update_constraints()
reply_state = 'ACK_MAINTENANCE_COMPLETE'
elif (state == 'PREPARE_MAINTENANCE' or
state == 'PLANNED_MAINTENANCE'):
instance_id = payload['instance_ids'][0]
instance_action = (self.instance_action(instance_id,
payload['allowed_actions']))
if not instance_action:
raise Exception('Allowed_actions not supported for %s' %
instance_id)
LOG.info('VNFM got instance: %s' % instance_id)
self.switch_over_ha_instance(instance_id)
reply['instance_action'] = instance_action
reply_state = 'ACK_%s' % state
self.instance_action_started(instance_id, instance_action)
elif state == 'INSTANCE_ACTION_DONE':
                # TBD: was the action done within max_interruption_time
                # (live migration)?
                # NOTE: with EVICTION the reported instance_id is the one that
                # was on the evicted node; the new instance_id might differ
LOG.info('%s' % payload['instance_ids'])
self.was_instance_action_in_time(payload['instance_ids'][0])
self.update_instances()
self.update_constraints()
else:
raise Exception('VNFM received event with'
' unknown state %s' % state)
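            # Every handled state is acknowledged back to Fenix via the
            # session's reply_url; the reply body is roughly (illustrative):
            #   {'session_id': '<uuid>', 'state': 'ACK_PREPARE_MAINTENANCE',
            #    'instance_action': 'EVICTION'}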
if reply_state:
reply['session_id'] = payload['session_id']
reply['state'] = reply_state
url = payload['reply_url']
LOG.info('VNFM reply: %s' % reply)
requests.put(url, data=json.dumps(reply), headers=self.headers)
return 'OK'
@app.route('/shutdown', methods=['POST'])
def shutdown():
LOG.info('shutdown VNFM server at %s' % time.time())
func = request.environ.get('werkzeug.server.shutdown')
if func is None:
raise RuntimeError('Not running with the Werkzeug Server')
func()
return 'VNFM shutting down...'
app.run(host="0.0.0.0", port=self.port)
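# Typical (illustrative) usage: export admin OS_* credentials and have a
# valid kubeconfig for the target cluster, then run this file directly:
#   python3 vnfm_k8s.py
# The VNFM then serves http://<ip>:<port>/maintenance until CTRL + C.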
if __name__ == '__main__':
app_manager = VNFM(CONF, LOG)
app_manager.start()
try:
LOG.info('Press CTRL + C to quit')
while True:
time.sleep(2)
except KeyboardInterrupt:
app_manager.stop()