Fix O-RAN (API v2) pull of current state

While the change in
https://review.opendev.org/c/starlingx/ptp-notification-armada-app/+/860494
solved the problem of moving PTP tracking service across nodes for
notification API v1, it didn't managed properly for v2.
This change fixes the problem for v2 while improves the solution for v1.

Test Plan:
PASS: Both v1 & v2 of API tested for pull of current states.

Closes-Bug: #1991793
Signed-off-by: Douglas Henrique Koerich <douglashenrique.koerich@windriver.com>
Change-Id: I1afe8a864e8cda909743c2e91b93a7bc8dda66e8
This commit is contained in:
Douglas Henrique Koerich 2022-11-09 09:56:37 -03:00
parent 1d0a102859
commit dfd4dc8c36
5 changed files with 69 additions and 31 deletions

View File

@ -19,20 +19,28 @@ LOG = logging.getLogger(__name__)
log_helper.config_logger(LOG)
def ProcessWorkerDefault(event, subscription_event, daemon_context):
def ProcessWorkerDefault(event,
subscription_event,
daemon_context,
service_nodenames):
'''Entry point of Default Process Worker'''
worker = NotificationWorker(event, subscription_event, daemon_context)
worker = NotificationWorker(event,
subscription_event,
daemon_context,
service_nodenames)
worker.run()
class DaemonControl(object):
def __init__(self, daemon_context, process_worker=None):
self.daemon_context = daemon_context
self.residing_node_name = daemon_context['THIS_NODE_NAME']
self.event = mp.Event()
self.subscription_event = mp.Event()
self.manager = mp.Manager()
self.daemon_context = self.manager.dict(daemon_context)
LOG.debug('Managed (shared) daemon_context id %d contents %s' %
(id(self.daemon_context), daemon_context))
self.service_nodenames = self.manager.list()
LOG.debug('Managed (shared) list of nodes id %d contents %s' %
(id(self.service_nodenames), self.service_nodenames))
self.registration_endpoint = RpcEndpointInfo(
daemon_context['REGISTRATION_TRANSPORT_ENDPOINT'])
self.registration_transport = rpc_helper.get_transport(
@ -46,7 +54,8 @@ class DaemonControl(object):
self.mpinstance = mp.Process(target=process_worker,
args=(self.event,
self.subscription_event,
self.daemon_context))
daemon_context,
self.service_nodenames))
self.mpinstance.start()
# initial update
@ -61,5 +70,11 @@ class DaemonControl(object):
self.subscription_event.set()
self.event.set()
def get_service_nodename(self):
return self.daemon_context['SERVICE_NODE_NAME']
def get_residing_nodename(self):
return self.residing_node_name
def in_service_nodenames(self, nodename):
return nodename in self.service_nodenames
def list_of_service_nodenames(self):
return self.service_nodenames[:]

View File

@ -56,7 +56,8 @@ class NotificationWorker:
return self.locationinfo_dispatcher.produce_location_event(
location_info)
def __init__(self, event, subscription_event, daemon_context):
def __init__(self, event, subscription_event, daemon_context,
service_nodenames):
self.__alive = True
self.daemon_context = daemon_context
@ -68,6 +69,8 @@ class NotificationWorker:
self.event = event
self.subscription_event = subscription_event
self.service_nodenames = service_nodenames
self.__locationinfo_handler = \
NotificationWorker.LocationInfoHandler(self)
self.__notification_handler = NotificationHandler()
@ -174,14 +177,14 @@ class NotificationWorker:
self.__persist_locationinfo(location_info, nodeinfo_repo)
_nodeinfo_added = \
_nodeinfo_added + (1 if is_nodeinfo_added else 0)
if is_nodeinfo_added and \
node_name not in self.service_nodenames:
self.service_nodenames.append(node_name)
LOG.debug("List of nodes updated: id %d contents %s" %
(id(self.service_nodenames),
self.service_nodenames))
_nodeinfo_updated = \
_nodeinfo_updated + (1 if is_nodeinfo_updated else 0)
if is_nodeinfo_added or is_nodeinfo_updated:
LOG.debug("Setting daemon's SERVICE_NODE_NAME to %s"
% node_name)
self.daemon_context['SERVICE_NODE_NAME'] = node_name
LOG.debug("Daemon context updated: id %d contents %s"
% (id(self.daemon_context), self.daemon_context))
continue
LOG.debug("Finished consuming location event")

View File

@ -30,18 +30,38 @@ class CurrentStateController(rest.RestController):
@expose('json')
def get(self):
try:
service_nodenames = \
notification_control.list_of_service_nodenames()
LOG.debug('List of service nodes: %s' % service_nodenames)
if len(service_nodenames) == 0:
LOG.warning('No PTP service available yet')
abort(404)
# Starting with residing node, try querying the announced locations
# since the notification app may have moved to another node
nodename = notification_control.get_residing_nodename()
ptpservice = PtpService(notification_control)
service_node_name = notification_control.get_service_nodename()
LOG.debug('service_node_name is %s' % service_node_name)
ptpstatus = ptpservice.query(service_node_name)
LOG.debug('Got ptpstatus: %s' % ptpstatus)
# response.status = 200
return ptpstatus
except client_exception.NodeNotAvailable as ex:
LOG.warning("Node is not available:{0}".format(str(ex)))
while len(service_nodenames) > 0:
try:
LOG.debug('Querying nodename: %s' % nodename)
ptpstatus = ptpservice.query(nodename)
LOG.debug('Got ptpstatus: %s' % ptpstatus)
# response.status = 200
return ptpstatus
except client_exception.NodeNotAvailable as ex:
LOG.warning("{0}".format(str(ex)))
service_nodenames.remove(nodename)
if len(service_nodenames) > 0:
nodename = service_nodenames[0]
except Exception:
raise # break
LOG.warning('No PTP service available')
abort(404)
except client_exception.ResourceNotAvailable as ex:
LOG.warning("Resource is not available:{0}".format(str(ex)))
LOG.warning("{0}".format(str(ex)))
abort(404)
except oslo_messaging.exceptions.MessagingTimeout as ex:
LOG.warning("Resource is not reachable:{0}".format(str(ex)))

View File

@ -34,16 +34,17 @@ class ResourceAddressController(object):
_, nodename, resource, optional, self.resource_address = \
subscription_helper.parse_resource_address(
self.resource_address)
service_node_name = notification_control.get_service_nodename()
LOG.debug('service_node_name is %s' % service_node_name)
if nodename != service_node_name and nodename != '.':
if nodename == '.':
nodename = notification_control.get_residing_nodename()
LOG.debug('Nodename to query: %s' % nodename)
if not notification_control.in_service_nodenames(nodename):
LOG.warning("Node {} is not available".format(nodename))
abort(404)
if resource not in constants.VALID_SOURCE_URI:
LOG.warning("Resource {} is not valid".format(resource))
abort(404)
ptpservice = PtpService(notification_control)
ptpstatus = ptpservice.query(service_node_name,
ptpstatus = ptpservice.query(nodename,
self.resource_address, optional)
LOG.debug('Got ptpstatus: %s' % ptpstatus)
# Change time from float to ascii format
@ -55,10 +56,10 @@ class ResourceAddressController(object):
'%Y-%m-%dT%H:%M:%S%fZ')
return ptpstatus
except client_exception.NodeNotAvailable as ex:
LOG.warning("Node is not available:{0}".format(str(ex)))
LOG.warning("{0}".format(str(ex)))
abort(404)
except client_exception.ResourceNotAvailable as ex:
LOG.warning("Resource is not available:{0}".format(str(ex)))
LOG.warning("{0}".format(str(ex)))
abort(404)
except oslo_messaging.exceptions.MessagingTimeout as ex:
LOG.warning("Resource is not reachable:{0}".format(str(ex)))

View File

@ -33,7 +33,6 @@ sqlalchemy_conf_json = json.dumps(sqlalchemy_conf)
daemon_context = {
'SQLALCHEMY_CONF_JSON': sqlalchemy_conf_json,
'THIS_NODE_NAME': THIS_NODE_NAME,
'SERVICE_NODE_NAME': THIS_NODE_NAME,
'REGISTRATION_TRANSPORT_ENDPOINT': REGISTRATION_TRANSPORT_ENDPOINT,
'NOTIFICATION_BROKER_USER': NOTIFICATION_BROKER_USER,
'NOTIFICATION_BROKER_PASS': NOTIFICATION_BROKER_PASS,