##############################################################################
# Copyright (c) 2020 Nokia Corporation
#
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
"""Workflow admin tool.

Creates AODH event alarms for maintenance notifications, starts a maintenance
session against the Keystone-registered 'maintenance' service endpoint, and
runs a small Flask application that receives the alarm callbacks and logs
session progress.
"""
import aodhclient.client as aodhclient
import argparse
import datetime
from flask import Flask
from flask import request
import json
from keystoneauth1 import loading
from keystoneclient import client as ks_client
import logging as lging
import os
from oslo_config import cfg
from oslo_log import log as logging
import re
import requests
import sys
from threading import Thread
import time
import yaml

try:
    import fenix.utils.identity_auth as identity_auth
except (ImportError, ValueError):
    # Fall back to a sibling checkout of the utils directory when the fenix
    # package cannot be imported.
    sys.path.append('../utils')
    import identity_auth

try:
    _read_line = raw_input  # noqa: F821 - Python 2 builtin
except NameError:
    # Python 3: raw_input() was renamed to input().
    _read_line = input

# Matches the ``u`` prefix of a unicode string literal (u'...' or u"...")
# without touching a ``u`` that merely ends a word, e.g. 'menu'.
_UNICODE_PREFIX_RE = re.compile(r"""(?<!\w)u(?=['"])""")

LOG = logging.getLogger(__name__)
streamlog = lging.StreamHandler(sys.stdout)
formatter = lging.Formatter("%(asctime)s: %(message)s")
streamlog.setFormatter(formatter)
LOG.logger.addHandler(streamlog)
LOG.logger.setLevel(logging.INFO)


def get_identity_auth(conf, project=None, username=None, password=None):
    """Build a keystoneauth 'password' auth plugin.

    :param conf: configuration object exposing the ``service_user`` group
    :param project: project name; defaults to
                    ``conf.service_user.os_project_name``
    :param username: user name; defaults to
                     ``conf.service_user.os_username``
    :param password: password; defaults to
                     ``conf.service_user.os_password``
    :returns: the plugin returned by the loader's ``load_from_options``
    """
    loader = loading.get_plugin_loader('password')
    return loader.load_from_options(
        auth_url=conf.service_user.os_auth_url,
        username=(username or conf.service_user.os_username),
        password=(password or conf.service_user.os_password),
        user_domain_name=conf.service_user.os_user_domain_name,
        project_name=(project or conf.service_user.os_project_name),
        tenant_name=(project or conf.service_user.os_project_name),
        project_domain_name=conf.service_user.os_project_domain_name)


class InfraAdmin(object):
    """Start and stop the InfraAdminManager thread."""

    def __init__(self, conf, log):
        self.conf = conf
        self.log = log
        # Set by start(); None until then.
        self.app = None

    def start(self):
        """Create the manager and start its thread."""
        self.log.info('InfraAdmin start...')
        self.app = InfraAdminManager(self.conf, self.log)
        self.app.start()

    def stop(self):
        """Ask the manager's HTTP server to shut down.

        Does nothing if start() has not been called.
        """
        self.log.info('InfraAdmin stop...')
        if not self.app:
            return
        headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        }
        url = 'http://%s:%d/shutdown'\
              % (self.conf.host, self.conf.port)
        requests.post(url, data='', headers=headers)


class InfraAdminManager(Thread):
    """Thread that starts a maintenance session and serves alarm callbacks.

    The constructor authenticates, creates the AODH alarms, resolves the
    maintenance endpoint and starts the maintenance session (which prompts
    on the console). run() then serves the HTTP callbacks.
    """

    def __init__(self, conf, log, project='service'):
        Thread.__init__(self)
        self.conf = conf
        self.log = log
        self.project = project
        # Now we are as admin:admin:admin by default. This means we listen
        # notifications/events as admin
        # This means Fenix service user needs to be admin:admin:admin
        # self.auth = identity_auth.get_identity_auth(conf,
        #                                             project=self.project)
        self.auth = get_identity_auth(conf,
                                      project='service',
                                      username='fenix',
                                      password='admin')
        self.session = identity_auth.get_session(auth=self.auth)
        self.keystone = ks_client.Client(version='v3', session=self.session)
        self.aodh = aodhclient.Client(2, self.session)
        self.headers = {
            'Content-Type': 'application/json',
            'Accept': 'application/json'}
        self.project_id = self.keystone.projects.list(name=self.project)[0].id
        self.headers['X-Auth-Token'] = self.session.get_token()
        self.create_alarm()
        self.endpoint = self._find_maintenance_endpoint()
        self.log.info('maintenance endpoint: %s' % self.endpoint)
        self.session_request = self._build_session_request()
        self.start_maintenance()

    def _find_maintenance_endpoint(self):
        """Return the public endpoint URL of the 'maintenance' service.

        :raises RuntimeError: if Keystone lists no service of type
                              'maintenance' or that service has no public
                              endpoint
        """
        maint_id = None
        for service in self.keystone.services.list():
            if service.type == 'maintenance':
                self.log.info('maintenance service: %s:%s type %s'
                              % (service.name, service.id, service.type))
                maint_id = service.id
        if maint_id is None:
            raise RuntimeError(
                'No service of type "maintenance" found in Keystone')
        urls = [ep.url for ep in self.keystone.endpoints.list()
                if ep.service_id == maint_id and ep.interface == 'public']
        if not urls:
            raise RuntimeError(
                'No public endpoint found for maintenance service %s'
                % maint_id)
        return urls[0]

    def _build_session_request(self):
        """Return the session creation request body.

        Loaded from ``conf.workflow_file`` when that is set; otherwise built
        from ``conf.workflow`` and ``conf.cloud_type``.
        """
        if self.conf.workflow_file:
            with open(self.conf.workflow_file) as workflow_file:
                return yaml.safe_load(workflow_file)
        if self.conf.cloud_type == 'openstack':
            metadata = {'openstack': 'upgrade'}
        elif self.conf.cloud_type in ['k8s', 'kubernetes']:
            metadata = {'kubernetes': 'upgrade'}
        else:
            metadata = {}
        return {'state': 'MAINTENANCE',
                'workflow': self.conf.workflow,
                'metadata': metadata,
                'actions': [
                    {"plugin": "dummy",
                     "type": "host",
                     "metadata": {"foo": "bar"}}]}

    def _ensure_alarm(self, existing, alarm_name, callback_path, event_type):
        """Create an event alarm unless one named alarm_name is in existing.

        :param existing: container of already existing alarm names
        :param alarm_name: name (and description) of the alarm
        :param callback_path: URL path on this tool's HTTP server that the
                              alarm action calls
        :param event_type: event type for the alarm's event rule
        """
        if alarm_name in existing:
            return
        alarm_request = dict(
            name=alarm_name,
            description=alarm_name,
            enabled=True,
            alarm_actions=[u'http://%s:%d/%s'
                           % (self.conf.host,
                              self.conf.port,
                              callback_path)],
            repeat_actions=True,
            severity=u'moderate',
            type=u'event',
            event_rule=dict(event_type=event_type))
        self.aodh.alarm.create(alarm_request)

    def create_alarm(self):
        """Create the maintenance session and host alarms if missing."""
        alarms = {alarm['name']: alarm for alarm in self.aodh.alarm.list()}
        self._ensure_alarm(alarms,
                           "%s_MAINTENANCE_SESSION" % self.project,
                           'maintenance_session',
                           u'maintenance.session')
        self._ensure_alarm(alarms,
                           "%s_MAINTENANCE_HOST" % self.project,
                           'maintenance_host',
                           u'maintenance.host')

    def start_maintenance(self):
        """Prompt the operator, then create the maintenance session.

        The session is requested to start 20 seconds after the operator
        confirms.

        :raises Exception: with the response text if the create request
                           returns an error status
        """
        self.log.info('Waiting AODH to initialize...')
        time.sleep(5)
        _read_line('--Press ENTER to start maintenance session--')

        maintenance_at = (datetime.datetime.utcnow() +
                          datetime.timedelta(seconds=20)
                          ).strftime('%Y-%m-%d %H:%M:%S')

        self.session_request['maintenance_at'] = maintenance_at

        # Refresh the token after the blocking prompt above.
        self.headers['X-Auth-Token'] = self.session.get_token()
        url = self.endpoint + "/maintenance"
        self.log.info('Start maintenance session: %s\n%s\n%s' %
                      (url, self.headers, self.session_request))
        ret = requests.post(url, data=json.dumps(self.session_request),
                            headers=self.headers)
        if not ret.ok:
            # Surface the server's error instead of failing on the missing
            # 'session_id' key below.
            raise Exception(ret.text)

        session_id = ret.json()['session_id']
        self.log.info('--== Maintenance session %s instantiated ==--'
                      % session_id)

    def _alarm_data_decoder(self, data):
        """Decode a trait value that looks like a list or dict.

        Strings containing '[' or '{' have their unicode literal prefixes
        removed and are parsed as YAML; anything else is returned unchanged.
        """
        if "[" in data or "{" in data:
            # string to list or dict removing unicode
            # safe_load: this text arrives in an HTTP callback, so it must
            # not be able to construct arbitrary Python objects.
            data = yaml.safe_load(_UNICODE_PREFIX_RE.sub('', data))
        return data

    def _alarm_traits_decoder(self, data):
        """Return the alarm's event traits as a {name: decoded value} dict.

        Each trait is indexed as t[0] (name) and t[2] (value).
        """
        return ({str(t[0]): self._alarm_data_decoder(str(t[2]))
                for t in data['reason_data']['event']['traits']})

    def _parse_alarm_request(self):
        """Decode the traits of the alarm in the current Flask request.

        :returns: dict of trait name to decoded value
        :raises Exception: if the traits cannot be decoded; the raw traits
                           are logged first
        """
        data = json.loads(request.data.decode('utf8'))
        try:
            return self._alarm_traits_decoder(data)
        except Exception:
            payload = ({t[0]: t[2] for t in
                       data['reason_data']['event']['traits']})
            self.log.error('cannot parse alarm data: %s' % payload)
            raise Exception('InfraAdmin cannot parse alarm. '
                            'Possibly trait data over 256 char')

    def run(self):
        """Serve the alarm callback and shutdown routes with Flask."""
        app = Flask('InfraAdmin')

        @app.route('/maintenance_host', methods=['POST'])
        def maintenance_host():
            payload = self._parse_alarm_request()

            state = payload['state']
            host = payload['host']
            session_id = payload['session_id']
            self.log.info("%s: Host: %s %s" % (session_id, host, state))
            return 'OK'

        @app.route('/maintenance_session', methods=['POST'])
        def maintenance_session():
            payload = self._parse_alarm_request()

            state = payload['state']
            percent_done = payload['percent_done']
            session_id = payload['session_id']
            self.log.info("%s: %s%% done in state %s" % (session_id,
                                                         percent_done,
                                                         state))
            if state in ['MAINTENANCE_FAILED', 'MAINTENANCE_DONE']:
                _read_line('--Press any key to remove %s session--' %
                           session_id)
                self.log.info('Remove maintenance session %s....' %
                              session_id)

                url = ('%s/maintenance/%s' % (self.endpoint, session_id))
                # Refresh the token after the blocking prompt above.
                self.headers['X-Auth-Token'] = self.session.get_token()

                ret = requests.delete(url, data=None, headers=self.headers)
                self.log.info('Press CTRL + C to quit')
                if ret.status_code != 200:
                    raise Exception(ret.text)

            return 'OK'

        @app.route('/shutdown', methods=['POST'])
        def shutdown():
            self.log.info('shutdown InfraAdmin server at %s' % time.time())
            # NOTE(review): this hook is only present when served by the
            # Werkzeug development server; confirm the installed Werkzeug
            # version still provides it.
            func = request.environ.get('werkzeug.server.shutdown')
            if func is None:
                raise RuntimeError('Not running with the Werkzeug Server')
            func()
            return 'InfraAdmin shutting down...'

        app.run(host=self.conf.host, port=self.conf.port)


def main():
    """Parse arguments, register options and run until CTRL + C."""
    parser = argparse.ArgumentParser(description='Workflow Admin tool')

    parser.add_argument('--file', type=str, default=None,
                        help='Workflow sesssion creation arguments file')

    parser.add_argument('--host', type=str, default=None,
                        help='the ip of InfraAdmin')

    parser.add_argument('--port', type=int, default=None,
                        help='the port of InfraAdmin')

    args = parser.parse_args()

    opts = [
        cfg.StrOpt('host',
                   default=(args.host or '127.0.0.1'),
                   help='the ip of InfraAdmin',
                   required=True),
        cfg.IntOpt('port',
                   default=(args.port or 12349),
                   help='the port of InfraAdmin',
                   required=True),
        cfg.StrOpt('workflow',
                   default=os.environ.get('WORKFLOW', 'vnf'),
                   help='Workflow to be used',
                   required=True),
        cfg.StrOpt('cloud_type',
                   default=os.environ.get('CLOUD_TYPE', 'openstack'),
                   help='Cloud type for metadata',
                   required=True),
        # Optional: without a file the manager builds a default request.
        cfg.StrOpt('workflow_file',
                   default=(args.file or None),
                   help='Workflow session creation arguments file',
                   required=False)]

    CONF = cfg.CONF
    CONF.register_opts(opts)
    CONF.register_opts(identity_auth.os_opts, group='service_user')

    app = InfraAdmin(CONF, LOG)
    app.start()
    try:
        LOG.info('Press CTRL + C to quit')
        while True:
            time.sleep(2)
    except KeyboardInterrupt:
        app.stop()


if __name__ == '__main__':
    main()