# -*- coding: utf-8 -*- # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import functools from oslo_config import cfg from oslo_context import context from oslo_log import log from oslo_utils import timeutils from namos.common import config from namos.common import exception from namos.common import messaging from namos.db import api as db_api from namos.db import openstack_drivers LOG = log.getLogger(__name__) config.register_conductor_opts() CONF = cfg.CONF def request_context(func): @functools.wraps(func) def wrapped(self, ctx, *args, **kwargs): if ctx is not None and not isinstance(ctx, context.RequestContext): ctx = context.RequestContext.from_dict(ctx.to_dict()) return func(self, ctx, *args, **kwargs) return wrapped class ConductorManager(object): RPC_API_VERSION = '1.0' TOPIC = config.MESSAGE_QUEUE_CONDUCTOR_TOPIC @request_context def add_region(self, context, region): # Move this try except to wrpper fn of the db layer try: db_api.region_create(context, region) except: # noqa raise exception.NamosException() @request_context def region_get_all(self, context): return db_api.region_get_all(context) @request_context def register_myself(self, context, registration_info): LOG.info("REGISTER [%s.%s.%s] START" % ( registration_info['project_name'], registration_info['prog_name'], registration_info['identification'] )) # Service processing sp = ServiceProcessor(registration_info) service_worker_id = sp.process_service(context) # Device Driver processing dp = DriverProcessor(service_worker_id, registration_info['config_dict']) dp.process_drivers(context) LOG.info("REGISTER [%s.%s.%s] DONE" % ( registration_info['project_name'], registration_info['prog_name'], registration_info['identification'] )) self._regisgration_ackw(context, registration_info['identification']) return service_worker_id def _regisgration_ackw(self, context, identification): client = messaging.get_rpc_client(topic='namos.CONF.%s' % identification, version=self.RPC_API_VERSION, exchange=config.PROJECT_NAME) client.cast(context, 'regisgration_ackw', identification=identification) LOG.info("REGISTER [%s] ACK" % identification) @request_context def heart_beat(self, context, identification, dieing=False): try: sw = db_api.service_worker_get_all_by(context, pid=identification) if len(sw) == 1: if not dieing: db_api.service_worker_update( context, sw[0].id, dict(updated_at=timeutils.utcnow())) LOG.info("HEART-BEAT LIVE %s " % identification) else: db_api.service_worker_delete(context, sw[0].id) LOG.info("HEART-BEAT STOPPED %s " % identification) else: LOG.error("HEART-BEAT FAILED, No service worker registered " "with identification %s " % identification) except Exception as e: # noqa LOG.error("HEART-BEAT FAILED %s " % e) @request_context def service_perspective_get(self, context, service_id, include_details=False): return db_api.service_perspective_get(context, service_id, include_details) @request_context def device_perspective_get(self, context, device_id, include_details=False): return db_api.device_perspective_get(context, device_id, include_details) @request_context def region_perspective_get(self, context, region_id, include_details=False): return db_api.region_perspective_get(context, region_id, include_details) @request_context def infra_perspective_get(self, context): return db_api.infra_perspective_get(context) @request_context def view_360(self, context): return db_api.view_360(context) @request_context def config_get_by_name_for_service_worker(self, context, service_worker_id, name=None, only_configured=True): return db_api.config_get_by_name_for_service_worker(context, service_worker_id, name, only_configured) class ServiceProcessor(object): def __init__(self, registration_info): self.registration_info = registration_info def process_service(self, context): # Service Node try: # TODO(mrkanag) region_id is hard-coded, fix it ! # user proper node name instead of fqdn node = db_api.service_node_create( context, dict(name=self.registration_info.get('fqdn'), fqdn=self.registration_info.get('fqdn'), region_id='f7dcd175-27ef-46b5-997f-e6e572f320b0')) LOG.info('Service node %s is created' % node) except exception.AlreadyExist: # TODO(mrkanag) is this to be region specifc search node = db_api.service_node_get_by_name( context, self.registration_info.get('fqdn')) LOG.info('Service node %s is existing' % node) # Service try: s_id = 'b9c2549f-f685-4bc2-92e9-ba8af9c18591' service = db_api.service_create( context, # TODO(mrkanag) use keystone python client and # use real service id here dict(name=self.registration_info.get('project_name'), keystone_service_id=s_id)) LOG.info('Service %s is created' % service) except exception.AlreadyExist: service = db_api.service_get_by_name( context, self.registration_info.get('project_name')) LOG.info('Service %s is existing' % service) # Service Component try: service_component = db_api.service_component_create( context, dict(name=self.registration_info['prog_name'], node_id=node.id, service_id=service.id)) LOG.info('Service Component %s is created' % service_component) except exception.AlreadyExist: service_components = \ db_api.service_component_get_all_by_node_for_service( context, node_id=node.id, service_id=service.id, name=self.registration_info['prog_name'] ) if len(service_components) == 1: service_component = service_components[0] LOG.info('Service Component %s is existing' % service_component) # TODO(mrkanag) what to do when service_components size is > 1 # config file for cfg_f in self.registration_info['config_file_list']: try: config_file = db_api.config_file_create( context, dict(name=cfg_f, file=self.registration_info[ 'config_file_dict'][cfg_f], service_component_id=service_component.id, service_node_id=node.id)) LOG.info('Oslo config file %s is created' % config_file) except exception.AlreadyExist: config_files = \ db_api.config_file_get_by_name_for_service_node( context, service_node_id=node.id, name=cfg_f ) if len(config_files) == 1: config_file = \ db_api.config_file_update( context, config_files[0].id, dict(file=self.registration_info[ 'config_file_dict'][cfg_f])) LOG.info('Oslo config file %s is existing and is updated' % config_file) # Service Worker try: service_worker = db_api.service_worker_create( context, # TODO(mrkanag) Fix the name, device driver proper ! dict(name='%s@%s' % (self.registration_info['pid'], service_component.name), pid=self.registration_info['identification'], host=self.registration_info['host'], service_component_id=service_component.id, deleted_at=None )) LOG.info('Service Worker %s is created' % service_worker) except exception.AlreadyExist: # TODO(mrkanag) Find a way to purge the dead service worker # Once each service is enabled with heart beating namos # purging can be done once heart beat stopped. this can be # done from openstack.common.service.py service_workers = \ db_api.service_worker_get_by_host_for_service_component( context, service_component_id=service_component.id, host=self.registration_info['host'] ) if len(service_workers) == 1: service_worker = \ db_api.service_worker_update( context, service_workers[0].id, dict( deleted_at=None, pid=self.registration_info['identification'], name='%s@%s' % (self.registration_info['pid'], service_component.name) )) LOG.info('Service Worker %s is existing and is updated' % service_worker) # TODO(mrkanag) what to do when service_workers size is > 1 # Config # TODO(mrkanag) Optimize the config like per service_component # or per service_worker, for cfg_name, cfg_obj in self.registration_info[ 'config_dict'].iteritems(): cfg_schs = db_api.config_schema_get_by( context=context, group=cfg_obj['group'], name=cfg_obj['name'] ) if len(cfg_schs) > 1: cfg_sche = cfg_schs[0] LOG.debug("Config Schema %s is existing and is updated" % cfg_sche) else: try: cfg_sche = db_api.config_schema_create( context, dict( namespace='UNKNOWN-NAMOS', default_value=cfg_obj['default_value'], type=cfg_obj['type'], help=cfg_obj['help'], required=cfg_obj['required'], secret=cfg_obj['secret'], mutable=False, group_name=cfg_obj['group'], name=cfg_obj['name'] ) ) LOG.debug("Config Schema %s is created" % cfg_sche) except exception.AlreadyExist: cfg_schs = db_api.config_schema_get_by( context=context, group=cfg_obj['group'], name=cfg_obj['name'], namespace='UNKNOWN-NAMOS' ) cfg_sche = cfg_schs[0] LOG.debug("Config Schema %s is existing and is updated" % cfg_sche) cfg_obj_ = dict( service_worker_id=service_worker.id, name="%s.%s" % (cfg_obj['group'], cfg_name), value=cfg_obj['value'], oslo_config_schema_id=cfg_sche.id ) try: config = db_api.config_create(context, cfg_obj_) LOG.debug("Config %s is created" % config) except exception.AlreadyExist: configs = db_api.config_get_by_name_for_service_worker( context, service_worker_id=cfg_obj_['service_worker_id'], name=cfg_obj_['name']) if len(configs) == 1: config = db_api.config_update(context, configs[0].id, cfg_obj_) LOG.debug("Config %s is existing and is updated" % config) return service_worker.id class DriverProcessor(object): def __init__(self, service_worker_id, config_dict): self.config_dict = config_dict self.service_worker_id = service_worker_id def _identify_drivers(self): return (set(openstack_drivers.get_drivers_config().keys()) & set(self.config_dict.keys())) def _get_value(self, name): if name is None: return name if isinstance(name, str): # Constant naming if name[0] == '#': return name[1:] return (self.config_dict[name].get('value') or self.config_dict[name].get('default_value')) elif isinstance(name, tuple): fn = name[0] args = list() for var in name[1:]: args.append(self._get_value(var)) return fn(*args) elif isinstance(name, list): fmt_str = name[0] params = [self._get_value(param) for param in name[1:]] return fmt_str % tuple(params) @staticmethod def _to_list(list_in_str): def strip_out(s): start_idx = 0 end_idx = len(s) if s[start_idx] == '[' \ or s[start_idx] == '\'' \ or s[start_idx] == '"': start_idx += 1 if s[end_idx - 1] == ']' \ or s[end_idx - 1] == '\'' \ or s[end_idx - 1] == '"': end_idx -= 1 return s[start_idx:end_idx] l = [] for s in strip_out(list_in_str.strip()).split(','): s = str(strip_out(s.strip())) l.append(s) return l def process_drivers(self, context): for driver_key in self._identify_drivers(): try: drivers = self._get_value(driver_key) drivers = DriverProcessor._to_list(drivers) for driver_name in drivers: self.process_driver(context, driver_key, driver_name) except KeyError: # noqa # TODO(mrkanag) run namos-manager and restart nova-scheduler # KeyError: 'libvirt.virt_type' is thrown, fix it LOG.error('Failed to process driver %s in service worker %s' % (driver_key, self.service_worker_id)) continue def process_driver(self, context, driver_key, driver_name): driver_config = \ openstack_drivers.get_drivers_config()[driver_key][driver_name] if driver_config.get('alias') is not None: alias = driver_config.get('alias') driver_config = \ openstack_drivers.get_drivers_config() for key in alias.split(':'): driver_config = driver_config[key] driver_name = key driver_def = \ openstack_drivers.get_drivers_def()[driver_name] connection = dict() endpoint_type = None connection_cfg = None device_endpoint_name = None device_cfg = None child_device_cfg = None if driver_config.get('device') is not None: device_cfg = driver_config['device'] if driver_config['endpoint'].get('type') is not None: endpoint_type = driver_config['endpoint']['type'] if endpoint_type[0] != '#': endpoint_type = self._get_value(endpoint_type) connection_cfg = driver_config['endpoint'][endpoint_type][ 'connection'] device_endpoint_name = self._get_value( driver_config['endpoint'][endpoint_type]['name']) # override the device name if driver_config['endpoint'][endpoint_type].get( 'device') is not None: device_cfg = driver_config['endpoint'][endpoint_type][ 'device'] if driver_config['endpoint'][endpoint_type].get( 'child_device') is not None: child_device_cfg = driver_config['endpoint'][ endpoint_type]['child_device'] else: endpoint_type = None connection_cfg = driver_config['endpoint']['connection'] device_endpoint_name = self._get_value( driver_config['endpoint']['name'] ) # override the device name if driver_config['endpoint'].get('device') is not None: device_cfg = driver_config['endpoint']['device'] if driver_config['endpoint'].get('child_device') is not None: child_device_cfg = driver_config['endpoint'][ 'child_device'] # Device device_name = self._get_value(device_cfg['name']) try: # TODO(mrkanag) region_id is hard-coded, fix it ! # Set the right status as well device = db_api.device_create( context, dict(name=device_name, status='active', region_id='f7dcd175-27ef-46b5-997f-e6e572f320b0')) LOG.info('Device %s is created' % device) except exception.AlreadyExist: device = db_api.device_get_by_name( context, device_name) LOG.info('Device %s is existing' % device) # TODO(mrkanag) Poperly Handle child devices if child_device_cfg is not None: for d_name in self._get_value(child_device_cfg['key']): base_name = self._get_value(child_device_cfg['base_name']) d_name = '%s-%s' % (base_name, d_name) try: device = db_api.device_get_by_name( context, d_name) LOG.info('Device %s is existing' % device) except exception.DeviceNotFound: # TODO(mrkanag) region_id is hard-coded, fix it ! # Set the right status as well r_id = 'f7dcd175-27ef-46b5-997f-e6e572f320b0' device = db_api.device_create( context, dict(name=d_name, status='active', parent_id=device.id, region_id=r_id)) LOG.info('Device %s is created' % device) # Device Endpoint try: for k, v in connection_cfg.iteritems(): connection[k] = self._get_value(k) device_endpoint = db_api.device_endpoint_create( context, dict(name=device_endpoint_name, connection=connection, type=endpoint_type, device_id=device.id)) LOG.info('Device Endpoint %s is created' % device_endpoint) except exception.AlreadyExist: device_endpoints = db_api.device_endpoint_get_by_device_type( context, device_id=device.id, type=endpoint_type, name=device_endpoint_name) if len(device_endpoints) >= 1: device_endpoint = device_endpoints[0] LOG.info('Device Endpoint %s is existing' % device_endpoints[0]) # Device Driver Class try: device_driver_class = db_api.device_driver_class_create( context, dict(name=driver_name, python_class=driver_name, type=driver_def['type'], device_id=device.id, endpoint_id=device_endpoint.id, service_worker_id=self.service_worker_id, extra=driver_def.get('extra'))) LOG.info('Device Driver Class %s is created' % device_driver_class) except exception.AlreadyExist: device_driver_class = db_api.device_driver_class_get_by_name( context, driver_name) LOG.info('Device Driver Class %s is existing' % device_driver_class) # Device Driver try: device_driver = db_api.device_driver_create( context, dict(device_id=device.id, name=driver_name, endpoint_id=device_endpoint.id, device_driver_class_id=device_driver_class.id, service_worker_id=self.service_worker_id) ) LOG.info('Device Driver %s is created' % device_driver) except exception.AlreadyExist: device_drivers = \ db_api.device_driver_get_by_device_endpoint_service_worker( context, device_id=device.id, endpoint_id=device_endpoint.id, device_driver_class_id=device_driver_class.id, service_worker_id=self.service_worker_id ) if len(device_drivers) >= 1: device_driver = device_drivers[0] LOG.info('Device Driver %s is existing' % device_driver) if __name__ == '__main__': print (DriverProcessor(None, None)._to_list("[\"file\', \'http\']"))