# Copyright (c) 2020 Nokia Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import aodhclient.client as aodhclient
import datetime
from flask import Flask
from flask import request
import json
from keystoneauth1 import loading
from keystoneclient import client as ks_client
from kubernetes import client
from kubernetes import config
import logging as lging
from oslo_config import cfg
from oslo_log import log as logging
import requests
import sys
from threading import Thread
import time
import yaml

try:
    import fenix.utils.identity_auth as identity_auth
except (ImportError, ValueError):
    sys.path.append('../utils')
    import identity_auth

LOG = logging.getLogger(__name__)
streamlog = lging.StreamHandler(sys.stdout)
LOG.logger.addHandler(streamlog)
LOG.logger.setLevel(lging.INFO)

opts = [
    cfg.StrOpt('ip',
               default='127.0.0.1',
               help='the ip of VNFM',
               required=True),
    cfg.IntOpt('port',
               default=12348,
               help='the port of VNFM',
               required=True),
]

CONF = cfg.CONF
CONF.register_opts(opts)
CONF.register_opts(identity_auth.os_opts, group='service_user')


def get_identity_auth(conf, project=None, username=None, password=None):
    loader = loading.get_plugin_loader('password')
    return loader.load_from_options(
        auth_url=conf.service_user.os_auth_url,
        username=(username or conf.service_user.os_username),
        password=(password or conf.service_user.os_password),
        user_domain_name=conf.service_user.os_user_domain_name,
        project_name=(project or conf.service_user.os_project_name),
        tenant_name=(project or conf.service_user.os_project_name),
        project_domain_name=conf.service_user.os_project_domain_name)


class VNFM(object):

    def __init__(self, conf, log):
        self.conf = conf
        self.log = log
        self.app = None

    def start(self):
        LOG.info('VNFM start......')
        self.app = VNFManager(self.conf, self.log)
        self.app.start()

    def stop(self):
        LOG.info('VNFM stop......')
        if not self.app:
            return
        self.app.headers['X-Auth-Token'] = self.app.session.get_token()
        self.app.delete_constraints()
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }
        url = ('http://%s:%d/shutdown'
               % (self.conf.ip, self.conf.port))
        requests.post(url, data='', headers=headers)


class VNFManager(Thread):

    def __init__(self, conf, log):
        Thread.__init__(self)
        self.conf = conf
        self.log = log
        self.port = self.conf.port
        self.instance_ids = None
        # VNFM is started with OS_* exported as the admin user.
        # We need that to query the Fenix endpoint URL.
        # Still, we work with our own tenant/project/VNF: "demo"
        self.project = "demo"
        LOG.info('VNFM project: %s' % self.project)
        self.auth = identity_auth.get_identity_auth(conf,
                                                    project=self.project)
        self.session = identity_auth.get_session(auth=self.auth)
        self.ks = ks_client.Client(version='v3', session=self.session)
        self.aodh = aodhclient.Client(2, self.session)
        # Subscribe to the maintenance event alarm from Fenix via AODH
        self.create_alarm()
        config.load_kube_config()
        self.kaapi = client.AppsV1Api()
        self.kapi = client.CoreV1Api()
        self.headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'}
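        # Requests towards the Fenix admin API below authenticate with the
        # Keystone token of the demo project in the X-Auth-Token header; the
        # token is refreshed from the session before longer operations
        # (see VNFM.stop() and the MAINTENANCE handling in run()).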
        self.headers['X-Auth-Token'] = self.session.get_token()
        self.orig_number_of_instances = self.number_of_instances()
        # Lists of instances
        self.ha_instances = []
        self.nonha_instances = []
        # Different instance_id specific constraints {instance_id: {}, ...}
        self.instance_constraints = None
        # Update existing instances to instance lists
        self.update_instances()
        # How many instances need to exist (with the current VNF load).
        # max_impacted_members needs to be updated accordingly if the
        # number of instances is scaled. Example for demo-ha:
        # max_impacted_members = len(self.ha_instances) - ha_group_limit
        self.ha_group_limit = 2
        self.nonha_group_limit = 2
        # Different instance group constraints dicts
        self.ha_group = None
        self.nonha_group = None
        auth = get_identity_auth(conf,
                                 project='service',
                                 username='fenix',
                                 password='admin')
        session = identity_auth.get_session(auth=auth)
        keystone = ks_client.Client(version='v3', session=session)
        # VNF project_id (VNF ID)
        self.project_id = self.session.get_project_id()
        # The HA instance that is active carries the "active" label
        self.active_instance_id = self.active_instance_id()
        services = keystone.services.list()
        for service in services:
            if service.type == 'maintenance':
                LOG.info('maintenance service: %s:%s type %s'
                         % (service.name, service.id, service.type))
                maint_id = service.id
        self.maint_endpoint = [ep.url for ep in keystone.endpoints.list()
                               if ep.service_id == maint_id and
                               ep.interface == 'public'][0]
        LOG.info('maintenance endpoint: %s' % self.maint_endpoint)
        self.update_constraints_lock = False
        self.update_constraints()
        # Instances waiting for an action to be done
        self.pending_actions = {}

    def create_alarm(self):
        alarms = {alarm['name']: alarm for alarm in self.aodh.alarm.list()}
        alarm_name = "%s_MAINTENANCE_ALARM" % self.project
        if alarm_name in alarms:
            return
        alarm_request = dict(
            name=alarm_name,
            description=alarm_name,
            enabled=True,
            alarm_actions=[u'http://%s:%d/maintenance'
                           % (self.conf.ip, self.conf.port)],
            repeat_actions=True,
            severity=u'moderate',
            type=u'event',
            event_rule=dict(event_type=u'maintenance.scheduled'))
        self.aodh.alarm.create(alarm_request)

    def delete_remote_instance_constraints(self, instance_id):
        url = "%s/instance/%s" % (self.maint_endpoint, instance_id)
        LOG.info('DELETE: %s' % url)
        ret = requests.delete(url, data=None, headers=self.headers)
        if ret.status_code != 200 and ret.status_code != 204:
            if ret.status_code == 404:
                LOG.info('Already deleted: %s' % instance_id)
            else:
                raise Exception(ret.text)

    def update_remote_instance_constraints(self, instance):
        url = "%s/instance/%s" % (self.maint_endpoint,
                                  instance["instance_id"])
        LOG.info('PUT: %s' % url)
        ret = requests.put(url, data=json.dumps(instance),
                           headers=self.headers)
        if ret.status_code != 200 and ret.status_code != 204:
            raise Exception(ret.text)

    def delete_remote_group_constraints(self, instance_group):
        url = "%s/instance_group/%s" % (self.maint_endpoint,
                                        instance_group["group_id"])
        LOG.info('DELETE: %s' % url)
        ret = requests.delete(url, data=None, headers=self.headers)
        if ret.status_code != 200 and ret.status_code != 204:
            raise Exception(ret.text)

    def update_remote_group_constraints(self, instance_group):
        url = "%s/instance_group/%s" % (self.maint_endpoint,
                                        instance_group["group_id"])
        LOG.info('PUT: %s' % url)
        ret = requests.put(url, data=json.dumps(instance_group),
                           headers=self.headers)
        if ret.status_code != 200 and ret.status_code != 204:
            raise Exception(ret.text)

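    # Called from VNFM.stop(): removes every constraint this VNFM has
    # registered in Fenix so nothing lingers after the VNFM exits.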
    def delete_constraints(self):
        for instance_id in self.instance_constraints:
            self.delete_remote_instance_constraints(instance_id)
        self.delete_remote_group_constraints(self.nonha_group)
        self.delete_remote_group_constraints(self.ha_group)

    def update_constraints(self):
        while self.update_constraints_lock:
            LOG.info('Waiting update_constraints_lock...')
            time.sleep(1)
        self.update_constraints_lock = True
        LOG.info('Update constraints')
        # Pods are grouped by ReplicaSet, so we use its id
        rs = {r.metadata.name: r.metadata.uid for r in
              self.kaapi.list_namespaced_replica_set('demo').items}
        max_impacted_members = len(self.nonha_instances) - 1
        nonha_group = {
            "group_id": rs['demo-nonha'],
            "project_id": self.project_id,
            "group_name": "demo-nonha",
            "anti_affinity_group": False,
            "max_instances_per_host": 0,
            "max_impacted_members": max_impacted_members,
            "recovery_time": 10,
            "resource_mitigation": True}
        LOG.info('create demo-nonha constraints: %s' % nonha_group)

        ha_group = {
            "group_id": rs['demo-ha'],
            "project_id": self.project_id,
            "group_name": "demo-ha",
            "anti_affinity_group": True,
            "max_instances_per_host": 1,
            "max_impacted_members": 1,
            "recovery_time": 10,
            "resource_mitigation": True}
        LOG.info('create demo-ha constraints: %s' % ha_group)

        if not self.ha_group or self.ha_group != ha_group:
            LOG.info('ha instance group needs update')
            self.update_remote_group_constraints(ha_group)
            self.ha_group = ha_group.copy()
        if not self.nonha_group or self.nonha_group != nonha_group:
            LOG.info('nonha instance group needs update')
            self.update_remote_group_constraints(nonha_group)
            self.nonha_group = nonha_group.copy()

        instance_constraints = {}
        for ha_instance in self.ha_instances:
            instance = {
                "instance_id": ha_instance.metadata.uid,
                "project_id": self.project_id,
                "group_id": ha_group["group_id"],
                "instance_name": ha_instance.metadata.name,
                "max_interruption_time": 120,
                "migration_type": "EVICTION",
                "resource_mitigation": True,
                "lead_time": 40}
            LOG.info('create ha instance constraints: %s' % instance)
            instance_constraints[ha_instance.metadata.uid] = instance
        for nonha_instance in self.nonha_instances:
            instance = {
                "instance_id": nonha_instance.metadata.uid,
                "project_id": self.project_id,
                "group_id": nonha_group["group_id"],
                "instance_name": nonha_instance.metadata.name,
                "max_interruption_time": 120,
                "migration_type": "EVICTION",
                "resource_mitigation": True,
                "lead_time": 40}
            LOG.info('create nonha instance constraints: %s' % instance)
            instance_constraints[nonha_instance.metadata.uid] = instance
        if not self.instance_constraints:
            # Initial instance constraints
            LOG.info('create initial instances constraints...')
            for instance in [instance_constraints[i] for i
                             in instance_constraints]:
                self.update_remote_instance_constraints(instance)
            self.instance_constraints = instance_constraints.copy()
        else:
            LOG.info('check instances constraints changes...')
            added = [i for i in instance_constraints.keys()
                     if i not in self.instance_constraints]
            deleted = [i for i in self.instance_constraints.keys()
                       if i not in instance_constraints]
            modified = [i for i in instance_constraints.keys()
                        if (i not in added and i not in deleted and
                            instance_constraints[i] !=
                            self.instance_constraints[i])]
            for instance_id in deleted:
                self.delete_remote_instance_constraints(instance_id)
            updated = added + modified
            for instance in [instance_constraints[i] for i in updated]:
                self.update_remote_instance_constraints(instance)
            if updated or deleted:
                # Some instance constraints have changed
                self.instance_constraints = instance_constraints.copy()
        self.update_constraints_lock = False

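    # One of the demo-ha pods is labeled "active": "True" to mark the active
    # instance; switch_over_ha_instance() moves the label to the standby pod
    # before the currently active pod is affected by maintenance.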
    def active_instance_id(self):
        # We dictate the active instance in the beginning
        instance = self.ha_instances[0]
        LOG.info('Initially Active instance: %s %s'
                 % (instance.metadata.name, instance.metadata.uid))
        name = instance.metadata.name
        namespace = instance.metadata.namespace
        body = {"metadata": {"labels": {"active": "True"}}}
        self.kapi.patch_namespaced_pod(name, namespace, body)
        # Return the uid; __init__ stores it as self.active_instance_id
        return instance.metadata.uid

    def switch_over_ha_instance(self, instance_id):
        if instance_id == self.active_instance_id:
            # Need to switch over as instance_id is active and will be
            # affected by maintenance
            for instance in self.ha_instances:
                if instance_id == instance.metadata.uid:
                    LOG.info('Active to Standby: %s %s'
                             % (instance.metadata.name,
                                instance.metadata.uid))
                    name = instance.metadata.name
                    namespace = instance.metadata.namespace
                    # Patching the label to None removes it from the pod
                    body = {"metadata": {"labels": {"active": None}}}
                    self.kapi.patch_namespaced_pod(name, namespace, body)
                else:
                    LOG.info('Standby to Active: %s %s'
                             % (instance.metadata.name,
                                instance.metadata.uid))
                    name = instance.metadata.name
                    namespace = instance.metadata.namespace
                    body = {"metadata": {"labels": {"active": "True"}}}
                    self.kapi.patch_namespaced_pod(name, namespace, body)
                    self.active_instance_id = instance.metadata.uid
            self.update_instances()

    def get_instance_ids(self):
        instances = self.kapi.list_pod_for_all_namespaces().items
        return [i.metadata.uid for i in instances
                if i.metadata.name.startswith("demo-") and
                i.metadata.namespace == "demo"]

    def update_instances(self):
        instances = self.kapi.list_pod_for_all_namespaces().items
        self.ha_instances = [i for i in instances
                             if i.metadata.name.startswith("demo-ha") and
                             i.metadata.namespace == "demo"]
        self.nonha_instances = [i for i in instances
                                if i.metadata.name.startswith("demo-nonha")
                                and i.metadata.namespace == "demo"]

    def _alarm_data_decoder(self, data):
        if "[" in data or "{" in data:
            # string to list or dict removing unicode
            data = yaml.safe_load(data.replace("u'", "'"))
        return data

    def _alarm_traits_decoder(self, data):
        return ({str(t[0]): self._alarm_data_decoder(str(t[2]))
                 for t in data['reason_data']['event']['traits']})

    def get_session_instance_ids(self, url, session_id):
        ret = requests.get(url, data=None, headers=self.headers)
        if ret.status_code != 200:
            raise Exception(ret.text)
        LOG.info('get_instance_ids %s' % ret.json())
        return ret.json()['instance_ids']

    def scale_instances(self, scale_instances):
        number_of_instances_before = len(self.nonha_instances)
        replicas = number_of_instances_before + scale_instances
        # We only scale the nonha apps
        namespace = "demo"
        name = "demo-nonha"
        body = {'spec': {"replicas": replicas}}
        self.kaapi.patch_namespaced_replica_set_scale(name, namespace, body)
        time.sleep(3)
        # Let's check if the scale has taken effect
        self.update_instances()
        number_of_instances_after = len(self.nonha_instances)
        check = 20
        while number_of_instances_after == number_of_instances_before:
            if check == 0:
                LOG.error('scale_instances with: %d failed, still %d '
                          'instances' % (scale_instances,
                                         number_of_instances_after))
                raise Exception('scale_instances failed')
            check -= 1
            time.sleep(1)
            self.update_instances()
            number_of_instances_after = len(self.nonha_instances)
        LOG.info('scaled instances from %d to %d'
                 % (number_of_instances_before,
                    number_of_instances_after))

    def number_of_instances(self):
        instances = self.kapi.list_pod_for_all_namespaces().items
        return len([i for i in instances
                    if i.metadata.name.startswith("demo-")])

    def instance_action(self, instance_id, allowed_actions):
        # We should keep the instance constraints in our internal structure
        # and match the instance_id specific allowed action. For now we
        # assume EVICTION
        if 'EVICTION' not in allowed_actions:
            LOG.error('Action for %s not found from %s'
                      % (instance_id, allowed_actions))
            return None
        return 'EVICTION'

    def instance_action_started(self, instance_id, action):
        time_now = datetime.datetime.utcnow()
        max_interruption_time = (
            self.instance_constraints[instance_id]['max_interruption_time'])
        self.pending_actions[instance_id] = {
            'started': time_now,
            'max_interruption_time': max_interruption_time,
            'action': action}

    def was_instance_action_in_time(self, instance_id):
        time_now = datetime.datetime.utcnow()
        started = self.pending_actions[instance_id]['started']
        limit = self.pending_actions[instance_id]['max_interruption_time']
        action = self.pending_actions[instance_id]['action']
        td = time_now - started
        if td.total_seconds() > limit:
            LOG.error('%s %s took too long: %ds'
                      % (instance_id, action, td.total_seconds()))
            LOG.error('%s max_interruption_time %ds might be too short'
                      % (instance_id, limit))
            raise Exception('%s %s took too long: %ds'
                            % (instance_id, action, td.total_seconds()))
        else:
            LOG.info('%s %s with recovery time took %ds'
                     % (instance_id, action, td.total_seconds()))
        del self.pending_actions[instance_id]

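    # The Flask app below receives the AODH event alarm from Fenix. Within
    # one maintenance session the handler sees these states:
    #   MAINTENANCE           -> reply ACK_MAINTENANCE with instance ids
    #   SCALE_IN              -> scale down nonha pods, reply ACK_SCALE_IN
    #   PREPARE_MAINTENANCE /
    #   PLANNED_MAINTENANCE   -> switch over if active, allow EVICTION,
    #                            reply ACK_<state>
    #   INSTANCE_ACTION_DONE  -> check the action duration, no reply
    #   MAINTENANCE_COMPLETE  -> scale back up, reply ACK_MAINTENANCE_COMPLETE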
    def run(self):
        app = Flask('VNFM')

        @app.route('/maintenance', methods=['POST'])
        def maintenance_alarm():
            data = json.loads(request.data.decode('utf8'))
            try:
                payload = self._alarm_traits_decoder(data)
            except Exception:
                payload = ({t[0]: t[2]
                            for t in data['reason_data']['event']['traits']})
                LOG.error('cannot parse alarm data: %s' % payload)
                raise Exception('VNFM cannot parse alarm. '
                                'Possibly trait data over 256 char')

            LOG.info('VNFM received data = %s' % payload)

            state = payload['state']
            reply_state = None
            reply = dict()

            LOG.info('VNFM state: %s' % state)

            if state == 'MAINTENANCE':
                self.headers['X-Auth-Token'] = self.session.get_token()
                instance_ids = (self.get_session_instance_ids(
                                payload['instance_ids'],
                                payload['session_id']))
                reply['instance_ids'] = instance_ids
                reply_state = 'ACK_MAINTENANCE'

            elif state == 'SCALE_IN':
                # scale down only nonha instances
                nonha_instances = len(self.nonha_instances)
                scale_in = nonha_instances // 2
                self.scale_instances(-scale_in)
                self.update_constraints()
                reply['instance_ids'] = self.get_instance_ids()
                reply_state = 'ACK_SCALE_IN'

            elif state == 'MAINTENANCE_COMPLETE':
                # possibly need to scale back up
                number_of_instances = self.number_of_instances()
                if self.orig_number_of_instances > number_of_instances:
                    scale_instances = (self.orig_number_of_instances -
                                       number_of_instances)
                    self.scale_instances(scale_instances)
                self.update_constraints()
                reply_state = 'ACK_MAINTENANCE_COMPLETE'

            elif (state == 'PREPARE_MAINTENANCE' or
                  state == 'PLANNED_MAINTENANCE'):
                instance_id = payload['instance_ids'][0]
                instance_action = (self.instance_action(
                                   instance_id, payload['allowed_actions']))
                if not instance_action:
                    raise Exception('Allowed_actions not supported for %s'
                                    % instance_id)
                LOG.info('VNFM got instance: %s' % instance_id)
                self.switch_over_ha_instance(instance_id)
                reply['instance_action'] = instance_action
                reply_state = 'ACK_%s' % state
                self.instance_action_started(instance_id, instance_action)

            elif state == 'INSTANCE_ACTION_DONE':
                # TBD was the action done within max_interruption_time
                # (live migration). NOTE: with EVICTION the reported
                # instance_id is the one that was on the evicted node;
                # the new instance_id might be different.
                LOG.info('%s' % payload['instance_ids'])
                self.was_instance_action_in_time(payload['instance_ids'][0])
                self.update_instances()
                self.update_constraints()
            else:
                raise Exception('VNFM received event with'
                                ' unknown state %s' % state)

            if reply_state:
                reply['session_id'] = payload['session_id']
                reply['state'] = reply_state
                url = payload['reply_url']
                LOG.info('VNFM reply: %s' % reply)
                requests.put(url, data=json.dumps(reply),
                             headers=self.headers)
            return 'OK'

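        # VNFM.stop() posts to this endpoint; the handler uses the shutdown
        # function that the Werkzeug development server places in the WSGI
        # environment to terminate the server thread.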
        @app.route('/shutdown', methods=['POST'])
        def shutdown():
            LOG.info('shutdown VNFM server at %s' % time.time())
            func = request.environ.get('werkzeug.server.shutdown')
            if func is None:
                raise RuntimeError('Not running with the Werkzeug Server')
            func()
            return 'VNFM shutting down...'

        app.run(host="0.0.0.0", port=self.port)


if __name__ == '__main__':
    app_manager = VNFM(CONF, LOG)
    app_manager.start()
    try:
        LOG.info('Press CTRL + C to quit')
        while True:
            time.sleep(2)
    except KeyboardInterrupt:
        app_manager.stop()