diff --git a/namos/cmd/manage.py b/namos/cmd/manage.py index 8e072be..5805f3d 100644 --- a/namos/cmd/manage.py +++ b/namos/cmd/manage.py @@ -30,6 +30,9 @@ MANAGE_COMMAND_NAME = 'namos-manage' class HeartBeat(object): + def cleanup(self): + api.cleanup(None) + def report_status(self): print_format = "%-20s%-15s%-15s%-35s%-10s" strip = 90 * '-' @@ -40,12 +43,19 @@ class HeartBeat(object): 'Component', 'Status')) print(strip) - for k, s in api.get_status(None).items(): + for k, s in api.get_status( + None, + CONF.command.node, + CONF.command.service, + CONF.command.type, + CONF.command.component + ).items(): + # if not CONF.command.component and s['is_launcher']: print(print_format % (s['node'], s['type'], s['service'], - s['component'], - s['status'])) + s['worker'], + ':)' if s['status'] else 'XXX')) print(strip) @@ -193,8 +203,15 @@ def add_command_parsers(subparsers): parser.set_defaults(func=OsloConfigSchemaManager().sync) parser = subparsers.add_parser('status') + parser.add_argument('-n', '--node') + parser.add_argument('-s', '--service') + parser.add_argument('-c', '--component') + parser.add_argument('-t', '--type') parser.set_defaults(func=HeartBeat().report_status) + parser = subparsers.add_parser('cleanup') + parser.set_defaults(func=HeartBeat().cleanup) + command_opt = cfg.SubCommandOpt('command', title='Command', help='Available commands', diff --git a/namos/conductor/manager.py b/namos/conductor/manager.py index 8ce4896..07dab39 100644 --- a/namos/conductor/manager.py +++ b/namos/conductor/manager.py @@ -69,7 +69,7 @@ class ConductorManager(object): return db_api.region_delete(context. region_id) @request_context - def add_service_node(self, context, service_node): + def service_node_create(self, context, service_node): return db_api.service_node_create(context, service_node) @request_context @@ -137,12 +137,12 @@ class ConductorManager(object): # TODO(mrkanag) Move this to periofic task, before deleting each # sw, make usre its created atleast 5 mins before - # sp.cleanup(service_component_id) + sp.cleanup(service_component_id) return service_worker_id def _regisgration_ackw(self, context, identification): client = messaging.get_rpc_client( - topic=self.os_namos_listener_topic(identification), + topic=self._os_namos_listener_topic(identification), version=self.RPC_API_VERSION, exchange=namos_config.PROJECT_NAME) client.cast(context, @@ -150,12 +150,12 @@ class ConductorManager(object): identification=identification) LOG.info("REGISTER [%s] ACK" % identification) - def os_namos_listener_topic(self, identification): + def _os_namos_listener_topic(self, identification): return 'namos.CONF.%s' % identification def _ping(self, context, identification): client = messaging.get_rpc_client( - topic=self.os_namos_listener_topic(identification), + topic=self._os_namos_listener_topic(identification), version=self.RPC_API_VERSION, exchange=namos_config.PROJECT_NAME, timeout=1) @@ -170,9 +170,9 @@ class ConductorManager(object): LOG.info("PING [%s] FAILED" % identification) return False - def update_config_file(self, context, identification, name, content): + def _update_config_file(self, context, identification, name, content): client = messaging.get_rpc_client( - topic=self.os_namos_listener_topic(identification), + topic=self._os_namos_listener_topic(identification), version=self.RPC_API_VERSION, exchange=namos_config.PROJECT_NAME, timeout=2) @@ -304,10 +304,10 @@ class ConductorManager(object): # TODO(mrkanag) is ping() better option instead? if utils.find_status(sw): try: - self.update_config_file(context, - sw.pid, - cf.name, - cf.file) + self._update_config_file(context, + sw.pid, + cf.name, + cf.file) cf['status'] = 'completed' return cf except: # noqa @@ -448,8 +448,8 @@ class ServiceProcessor(object): service_worker = db_api.service_worker_create( self.context, # TODO(mrkanag) Fix the name, device driver proper ! - dict(name='%s@%s' % (self.registration_info['pid'], - service_component.name), + dict(name='%s@%s' % (service_component.name, + self.registration_info['pid']), pid=self.registration_info['identification'], host=self.registration_info['host'], service_component_id=service_component.id, @@ -470,37 +470,7 @@ class ServiceProcessor(object): def cleanup(self, service_component_id): # clean up the dead service workers - # TODO(mrkanag) Make this into thread - service_workers = \ - db_api.service_worker_get_all_by( - self.context, - service_component_id=service_component_id - ) - - for srv_wkr in service_workers: - # TODO(mrkanag) Move this to db layer and query non deleted entries - if srv_wkr.deleted_at is not None: - continue - - # TODO(mrkanag) is this interval ok - if utils.find_status(srv_wkr, report_interval=60): - LOG.info('Service Worker %s is live' - % srv_wkr.id) - continue - else: - confs = db_api.config_get_by_name_for_service_worker( - self.context, - service_worker_id=srv_wkr.id - ) - - for conf in confs: - db_api.config_delete(self.context, conf.id) - LOG.info('Config %s is deleted' - % conf.id) - - db_api.service_worker_delete(self.context, srv_wkr.id) - LOG.info('Service Worker %s is deleted' - % srv_wkr.id) + db_api.cleanup(self.context, service_component_id) class ConfigProcessor(object): diff --git a/namos/conductor/rpcapi.py b/namos/conductor/rpcapi.py index 43d3933..39580a4 100644 --- a/namos/conductor/rpcapi.py +++ b/namos/conductor/rpcapi.py @@ -95,7 +95,7 @@ class ConductorAPI(object): region_id=region_id) @wrapper_function - def add_service_node(self, context, service_node): + def service_node_create(self, context, service_node): return self.client.call( context, 'service_node_create', @@ -248,6 +248,6 @@ if __name__ == '__main__': 'dcf0f17b-99f6-49e9-8d5f-23b3ad1167dc', content)) - # print_config_schema() - # print_view_360() - sample_config_update() + print_config_schema() + print_view_360() + # sample_config_update() diff --git a/namos/db/api.py b/namos/db/api.py index e75ccae..1c68ba9 100644 --- a/namos/db/api.py +++ b/namos/db/api.py @@ -484,5 +484,9 @@ def view_360(context, include_conf_file=False, include_status=False): include_status=include_status) -def get_status(context): - return IMPL.get_status(context) +def get_status(context, node=None, service=None, type=None, component=None): + return IMPL.get_status(context, node, service, type, component) + + +def cleanup(context, service_component_id=None, dead_since=300): + return IMPL.cleanup(context, service_component_id, dead_since) diff --git a/namos/db/sqlalchemy/api.py b/namos/db/sqlalchemy/api.py index 81d98e6..40a6500 100644 --- a/namos/db/sqlalchemy/api.py +++ b/namos/db/sqlalchemy/api.py @@ -175,7 +175,7 @@ def device_get_all(context): return _get_all(context, models.Device) -def _device_get_all_by(context, **kwargs): +def device_get_all_by(context, **kwargs): return _get_all_by(context, models.Device, **kwargs) @@ -227,7 +227,7 @@ def device_endpoint_get_all(context): return _get_all(context, models.DeviceEndpoint) -def _device_endpoint_get_all_by(context, **kwargs): +def device_endpoint_get_all_by(context, **kwargs): return _get_all_by(context, models.DeviceEndpoint, **kwargs) @@ -282,7 +282,7 @@ def device_driver_get_all(context): return _get_all(context, models.DeviceDriver) -def _device_driver_get_all_by(context, **kwargs): +def device_driver_get_all_by(context, **kwargs): return _get_all_by(context, models.DeviceDriver, **kwargs) @@ -320,7 +320,7 @@ def device_driver_class_get_all(context): return _get_all(context, models.DeviceDriverClass) -def _device_driver_classget_all_by(context, **kwargs): +def device_driver_class_get_all_by(context, **kwargs): return _get_all_by(context, models.DeviceDriverClass, **kwargs) @@ -358,7 +358,7 @@ def service_get_all(context): return _get_all(context, models.Service) -def _service_get_all_by(context, **kwargs): +def service_get_all_by(context, **kwargs): return _get_all_by(context, models.Service, **kwargs) @@ -396,7 +396,7 @@ def service_node_get_all(context): return _get_all(context, models.ServiceNode) -def _service_node_get_all_by(context, **kwargs): +def service_node_get_all_by(context, **kwargs): return _get_all_by(context, models.ServiceNode, **kwargs) @@ -460,7 +460,7 @@ def service_component_get_all(context): return _get_all(context, models.ServiceComponent) -def _service_component_get_all_by(context, **kwargs): +def service_component_get_all_by(context, **kwargs): return _get_all_by(context, models.ServiceComponent, **kwargs) @@ -510,10 +510,6 @@ def service_worker_get_all(context): def service_worker_get_all_by(context, **kwargs): - return _service_worker_get_all_by(context, **kwargs) - - -def _service_worker_get_all_by(context, **kwargs): return _get_all_by(context, models.ServiceWorker, **kwargs) @@ -547,7 +543,6 @@ def config_schema_get_by_name(context, name): return config -# TODO(mrkanag) fix it to take **kwargs def config_schema_get_by(context, namespace=None, group=None, @@ -622,7 +617,7 @@ def config_get_all(context): return _get_all(context, models.OsloConfig) -def _config_get_all_by(context, **kwargs): +def config_get_all_by(context, **kwargs): return _get_all_by(context, models.OsloConfig, **kwargs) @@ -721,7 +716,7 @@ def config_file_get_all(context): return _get_all(context, models.OsloConfigFile) -def _config_file_get_all_by(context, **kwargs): +def config_file_get_all_by(context, **kwargs): return _get_all_by(context, models.OsloConfigFile, **kwargs) @@ -743,8 +738,8 @@ def service_perspective_get(context, service_id, include_details=False): # on include_details, for each of the entity, include complete details service_perspective = dict() service_perspective['service'] = service_get(context, service_id).to_dict() - service_components = _service_component_get_all_by(context, - service_id=service_id) + service_components = service_component_get_all_by(context, + service_id=service_id) service_perspective['service_components'] = dict() # service_perspective['service_components']['size'] = # len(service_components) @@ -755,7 +750,7 @@ def service_perspective_get(context, service_id, include_details=False): = sc.to_dict() service_perspective['service_components'][sc.id]['service_node']\ = service_node_get(context, sc.node_id).to_dict() - service_workers = _service_worker_get_all_by( + service_workers = service_worker_get_all_by( context, service_component_id=sc.id) service_perspective['service_components'][sc.id]['service_workers'] \ @@ -770,7 +765,7 @@ def service_perspective_get(context, service_id, include_details=False): sc.id]['service_workers'][sw.id][ 'service_worker'] = sw.to_dict() - device_drivers = _device_driver_get_all_by( + device_drivers = device_driver_get_all_by( context, service_worker_id=sw.id) service_perspective['service_components'][ @@ -819,8 +814,8 @@ def device_perspective_get(context, device_id, include_details=False): # on include_details, for each of the entity, include complete details device_perspective = dict() device_perspective['device'] = device_get(context, device_id).to_dict() - endpoints = _device_endpoint_get_all_by(context, - device_id=device_id) + endpoints = device_endpoint_get_all_by(context, + device_id=device_id) device_perspective['device_endpoints'] = dict() # device_perspective['device_endpoints']['size'] = len(endpoints) @@ -829,8 +824,8 @@ def device_perspective_get(context, device_id, include_details=False): device_perspective['device_endpoints'][ ep.id]['device_endpoint'] = ep.to_dict() - device_drivers = _device_driver_get_all_by(context, - endpoint_id=ep.id) + device_drivers = device_driver_get_all_by(context, + endpoint_id=ep.id) device_perspective['device_endpoints'][ ep.id]['device_drivers'] = dict() # device_perspective['device_endpoints'][ep.id] \ @@ -881,8 +876,8 @@ def region_perspective_get(context, region_id, include_details=False): region_perspective = dict() region_perspective['region'] = region_get(context, region_id).to_dict() - s_nodes = _service_node_get_all_by(context, - region_id=region_id) + s_nodes = service_node_get_all_by(context, + region_id=region_id) # region_perspective['service_nodes'] = dict() # region_perspective['service_nodes']['size'] = len(s_nodes) # for s_node in s_nodes: @@ -906,7 +901,7 @@ def region_perspective_get(context, region_id, include_details=False): region_perspective['services'] = dict() for s_node in s_nodes: - s_components = _service_component_get_all_by( + s_components = service_component_get_all_by( context, node_id=s_node.id) srvs = list() @@ -920,7 +915,7 @@ def region_perspective_get(context, region_id, include_details=False): s = service_get(context, s_id) region_perspective['services'][s_id] = s.to_dict() - devices = _device_get_all_by(context, region_id=region_id) + devices = device_get_all_by(context, region_id=region_id) region_perspective['devices'] = dict() # region_perspective['devices']['size'] = len(devices) for d in devices: @@ -980,8 +975,8 @@ def view_360(context, include_conf_file=False, include_status=False): view['region'][rg.id] = region_get(context, rg.id).to_dict() view['region'][rg.id]['service_node'] = dict() - srv_nd_lst = _service_node_get_all_by(context, - region_id=rg.id) + srv_nd_lst = service_node_get_all_by(context, + region_id=rg.id) for srv_nd in srv_nd_lst: # service node view['service_node'][srv_nd.id] = service_node_get( @@ -1051,7 +1046,7 @@ def view_360(context, include_conf_file=False, include_status=False): view['region'][rg.id]['service_node'][srv_nd.id][ 'service_component'][srv_cmp.id][ 'service_worker'][srv_wkr.id]['device_driver'] = dict() - dvc_drv_list = _device_driver_get_all_by( + dvc_drv_list = device_driver_get_all_by( context, service_worker_id=srv_wkr.id ) @@ -1111,33 +1106,75 @@ def view_360(context, include_conf_file=False, include_status=False): return view -def get_status(context): +def get_status(context, node=None, service=None, type=None, component=None): sr = {} for sn in service_node_get_all(context): + if node and not node == sn.name: + continue + for sc in service_component_get_all_by_node_for_service( context, node_id=sn.id ): - service = service_get(context, sc.service_id) + if sc.deleted_at is not None: + continue + + s = service_get(context, sc.service_id) + if service and not s.name == service: + continue + + if type and not sc.type == type: + continue + + if component and not sc.name == component: + continue + for sw in service_worker_get_by_host_for_service_component( context, service_component_id=sc.id ): # TODO(mrkanag) Move this to db layer and query non deleted - # if sw.deleted_at is not None: - # continue + if sw.deleted_at is not None: + continue sr[sw.pid] = ( dict(node=sn.name, type=sc.type, - service=service.name, - component=sw.name, + service=s.name, + component=sc.name, + worker=sw.name, status=utils.find_status(sw), is_launcher=sw.is_launcher)) return sr +def cleanup(context, service_component_id=None, dead_since=300): + # clean up the dead service workers + service_workers = \ + service_worker_get_all_by( + context, + service_component_id=service_component_id + ) + + for srv_wkr in service_workers: + if srv_wkr.deleted_at is not None: + continue + + if utils.find_status(srv_wkr, report_interval=dead_since): + continue + else: + confs = config_get_by_name_for_service_worker( + context, + service_worker_id=srv_wkr.id + ) + + for conf in confs: + config_delete(context, conf.id) + + service_worker_delete(context, srv_wkr.id) + + if __name__ == '__main__': from namos.common import config