From 1ac45c8f432113b7f22d35a8c7a898f1d9dd51d7 Mon Sep 17 00:00:00 2001
From: Andre Mauricio Zelak
Date: Fri, 3 Mar 2023 15:10:15 -0300
Subject: [PATCH] Add support to '/sync' aka sync all subscription

When adding a new subscription, check for an existing matching one,
considering the source URI hierarchy. Deny a new individual
subscription if there is already a sync-all subscription, and deny a
new sync-all subscription if there is already an individual one.

After a new sync-all subscription is created, a set of event messages
is sent to the client containing the initial state of each source down
the hierarchy. And, every time one of the source states changes, a new
message is sent.

Test Plan:
PASS: Build the container images
PASS: Manually deploy them and test with the v2 client
PASS: Create a '/././sync' subscription and check the event messages
PASS: Check current subscription list
PASS: Change GNSS sync state and check the event messages
PASS: Attempt to create a new individual subscription and check it fails
PASS: Delete the '/././sync' subscription
PASS: Check current subscription list again

Closes-bug: 2009188
Signed-off-by: Andre Mauricio Zelak
Change-Id: I90b642e73f30fb1798f4a93ab5313411c177949c
---
 .../common/helpers/subscription_helper.py   |  13 +-
 .../exception/client_exception.py           |   7 +
 .../services/notification_handler.py        |   6 +-
 .../notificationclientsdk/services/ptp.py   | 132 ++++++++++--------
 .../sidecar/controllers/v2/subscriptions.py |   2 +
 .../trackingfunctionsdk/services/daemon.py  |  65 ++++++---
 6 files changed, 141 insertions(+), 84 deletions(-)
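
Reviewer note (placed between the diffstat and the diff, so it does not
affect how the patch applies): the duplicate check introduced in ptp.py
below boils down to a prefix comparison of the resource paths once the
node-name wildcards are resolved. A minimal standalone sketch of that
idea, assuming '/{clusterName}/{nodeName}/{path}' addresses; _parse(),
addresses_overlap() and the 'controller-0' default are illustrative
stand-ins, not the helpers used by the patch:

    def _parse(resource_address):
        # '/./controller-0/sync' -> ('.', 'controller-0', '/sync')
        _, cluster, node, *rest = resource_address.split('/')
        return cluster, node, '/' + '/'.join(rest)

    def addresses_overlap(addr_a, addr_b, residing_node='controller-0'):
        cluster_a, node_a, path_a = _parse(addr_a)
        cluster_b, node_b, path_b = _parse(addr_b)
        if cluster_a != cluster_b:
            return False
        if '*' not in (node_a, node_b):
            # '.' is shorthand for the residing (current) node
            def resolve(name):
                return residing_node if name == '.' else name
            if resolve(node_a) != resolve(node_b):
                return False
        # Either resource path being a prefix of the other counts as a
        # match, so an existing '/sync' subscription blocks new
        # per-source ones and vice versa.
        return path_a.startswith(path_b) or path_b.startswith(path_a)

    # addresses_overlap('/././sync',
    #                   '/././sync/gnss-status/gnss-sync-status')
    # -> True, which is why the controller answers 409 for the second
    #    subscription request.
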
diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py
index 7a6344f..e5a868c 100644
--- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py
+++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/common/helpers/subscription_helper.py
@@ -1,5 +1,5 @@
 #
-# Copyright (c) 2021-2022 Wind River Systems, Inc.
+# Copyright (c) 2021-2023 Wind River Systems, Inc.
 #
 # SPDX-License-Identifier: Apache-2.0
 #
@@ -24,12 +24,13 @@ def notify(subscriptioninfo, notification, timeout=2, retry=3):
         retry = retry - 1
         try:
             headers = {'Content-Type': 'application/json'}
-            data = format_notification_data(subscriptioninfo, notification)
-            data = json.dumps(data)
             url = subscriptioninfo.EndpointUri
-            response = requests.post(url, data=data, headers=headers,
-                                     timeout=timeout)
-            response.raise_for_status()
+            for item in notification:
+                data = format_notification_data(subscriptioninfo, {item: notification[item]})
+                data = json.dumps(data)
+                response = requests.post(url, data=data, headers=headers,
+                                         timeout=timeout)
+                response.raise_for_status()
             result = True
             return response
         except client_exception.InvalidResource as ex:
diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/exception/client_exception.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/exception/client_exception.py
index bb89576..1784b0d 100644
--- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/exception/client_exception.py
+++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/exception/client_exception.py
@@ -44,6 +44,13 @@ class InvalidSubscription(Exception):
         return "Subscription is invalid:{0}".format(
             self.subscriptioninfo.to_dict())
 
+class SubscriptionAlreadyExists(Exception):
+    def __init__(self, subscriptioninfo):
+        self.subscriptioninfo = subscriptioninfo
+
+    def __str__(self):
+        return "Subscription already exists: {0}".format(
+            self.subscriptioninfo)
 
 class ServiceError(Exception):
     def __init__(self, code, *args):
diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py
index 8745284..412ff50 100644
--- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py
+++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py
@@ -1,5 +1,5 @@
 #
-# Copyright (c) 2021-2022 Wind River Systems, Inc.
+# Copyright (c) 2021-2023 Wind River Systems, Inc.
 #
 # SPDX-License-Identifier: Apache-2.0
 #
@@ -82,7 +82,9 @@ class NotificationHandler(NotificationHandlerBase):
                 if entry.ResourceAddress:
                     _, entry_node_name, entry_resource_path, _, _ = \
                         subscription_helper.parse_resource_address(entry.ResourceAddress)
-                    if entry_resource_path not in resource_address:
+                    _, _, event_resource_path, _, _ = \
+                        subscription_helper.parse_resource_address(resource_address)
+                    if not event_resource_path.startswith(entry_resource_path):
                         continue
                     subscription_dto2 = SubscriptionInfoV2(entry)
                 else:
diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py
index 8262615..b5297e5 100644
--- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py
+++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/ptp.py
@@ -1,5 +1,5 @@
 #
-# Copyright (c) 2021-2022 Wind River Systems, Inc.
+# Copyright (c) 2021-2023 Wind River Systems, Inc.
 #
 # SPDX-License-Identifier: Apache-2.0
 #
@@ -127,75 +127,97 @@ class PtpService(object):
             notificationservice_client.cleanup()
             del notificationservice_client
 
+    def _match_resource_address(self, resource_address_a, resource_address_b):
+
+        clustername_a, nodename_a, resource_path_a, _, _ = \
+            subscription_helper.parse_resource_address(
+                resource_address_a)
+
+        clustername_b, nodename_b, resource_path_b, _, _ = \
+            subscription_helper.parse_resource_address(
+                resource_address_b)
+
+        # Compare cluster names
+        if clustername_a != clustername_b:
+            return False, "clusterName {0} is different from {1}".format(
+                clustername_a, clustername_b)
+
+        # Compare node names
+        # If one of them is '*' skip comparison
+        if nodename_a != constants.WILDCARD_ALL_NODES and \
+                nodename_b != constants.WILDCARD_ALL_NODES:
+
+            # If one of the nodename is '.' replace by
+            # current node.
+            if nodename_a == constants.WILDCARD_CURRENT_NODE:
+                nodename_a = self.daemon_control.get_residing_nodename()
+            if nodename_b == constants.WILDCARD_CURRENT_NODE:
+                nodename_b = self.daemon_control.get_residing_nodename()
+
+            if nodename_a != nodename_b:
+                return False, "nodeName {0} is different from {1}".format(
+                    nodename_a, nodename_b)
+
+        # Compare resource path
+        if resource_path_a == resource_path_b:
+            return True, "resourceAddress {0} is equal to {1}".format(
+                resource_address_a, resource_address_b)
+
+        if resource_path_a.startswith(resource_path_b):
+            return True, "resourceAddress {1} contains {0}".format(
+                resource_address_a, resource_address_b)
+
+        if resource_path_b.startswith(resource_path_a):
+            return True, "resourceAddress {0} contains {1}".format(
+                resource_address_a, resource_address_b)
+
+        return False, "resourceAddress {0} is different from {1}".format(
+            resource_address_a, resource_address_b)
+
+
     def add_subscription(self, subscription_dto):
         resource_address = None
         if hasattr(subscription_dto, 'ResourceAddress'):
             version = 2
-            _, nodename, _, _, _ = subscription_helper.parse_resource_address(
-                subscription_dto.ResourceAddress)
-            LOG.debug("nodename in ResourceAddress is '%s', residing is %s" %
-                      (nodename, self.daemon_control.get_residing_nodename()))
-
+            endpoint = subscription_dto.EndpointUri
             resource_address = subscription_dto.ResourceAddress
+
             LOG.debug('Looking for existing subscription for EndpointUri %s '
-                      'ResourceAddress %s' % (subscription_dto.EndpointUri,
-                                              resource_address))
+                      'ResourceAddress %s' % (endpoint, resource_address))
+
             entry = self.subscription_repo.get_one(
-                EndpointUri=subscription_dto.EndpointUri,
+                EndpointUri=endpoint,
                 ResourceAddress=resource_address)
+
+            # Did not find matched duplicated, but needs to look for other
+            # cases...
             if entry is None:
-                # Did not find matched duplicated, but needs to look for other
-                # cases...
-                if nodename != constants.WILDCARD_ALL_NODES:
-                    # There may be a subscription for all nodes already in
-                    # place
-                    resource_address_star = \
-                        subscription_helper.set_nodename_in_resource_address(
-                            resource_address, constants.WILDCARD_ALL_NODES)
-                    LOG.debug('Additional lookup for existing subscription '
-                              'for EndpointUri %s ResourceAddress %s'
-                              % (subscription_dto.EndpointUri,
-                                 resource_address_star))
-                    if self.subscription_repo.get_one(
-                            EndpointUri=subscription_dto.EndpointUri,
-                            ResourceAddress=resource_address_star) is not None:
-                        LOG.debug('Found existing %s entry in subscription '
-                                  'repo' % constants.WILDCARD_ALL_NODES)
-                        raise client_exception.ServiceError(409)
-                if nodename == constants.WILDCARD_CURRENT_NODE:
-                    # There may be a subscription for the residing (current)
-                    # node already in place
-                    resource_address_synonym = \
-                        subscription_helper.set_nodename_in_resource_address(
-                            resource_address,
-                            self.daemon_control.get_residing_nodename())
-                    LOG.debug('In addition, looking for existing subscription '
-                              'for EndpointUri %s ResourceAddress %s' % (
-                                  subscription_dto.EndpointUri,
-                                  resource_address_synonym))
-                    entry = self.subscription_repo.get_one(
-                        EndpointUri=subscription_dto.EndpointUri,
-                        ResourceAddress=resource_address_synonym)
+                subscriptions = self.subscription_repo.get(
+                    EndpointUri=endpoint)
 
-                if nodename == self.daemon_control.get_residing_nodename():
-                    # There may be a subscription for '.' (current node)
-                    # already in place
-                    resource_address_synonym = \
-                        subscription_helper.set_nodename_in_resource_address(
-                            resource_address, constants.WILDCARD_CURRENT_NODE)
-                    LOG.debug('In addition, looking for existing subscription '
-                              'for EndpointUri %s ResourceAddress %s' % (
-                                  subscription_dto.EndpointUri,
-                                  resource_address_synonym))
-                    entry = self.subscription_repo.get_one(
-                        EndpointUri=subscription_dto.EndpointUri,
-                        ResourceAddress=resource_address_synonym)
+                for subscription in subscriptions:
+                    match, message = self._match_resource_address(
+                        subscription.ResourceAddress, resource_address)
+
+                    if match:
+                        entry = subscription
+                        LOG.debug(message)
+                        break
 
             if entry is not None:
                 LOG.debug('Found existing v2 entry in subscription repo')
-                raise client_exception.ServiceError(409)
+                subscriptioninfo = {
+                    'SubscriptionId': entry.SubscriptionId,
+                    'UriLocation': entry.UriLocation,
+                    'EndpointUri': entry.EndpointUri,
+                    'ResourceAddress': entry.ResourceAddress
+                }
+                raise client_exception.SubscriptionAlreadyExists(
+                    subscriptioninfo)
+
+            _, nodename, _, _, _ = subscription_helper.parse_resource_address(
+                resource_address)
 
             if nodename == constants.WILDCARD_ALL_NODES:
                 broker_names = self.daemon_control.list_of_service_nodenames()
diff --git a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py
index ecf04cf..610db77 100644
--- a/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py
+++ b/notificationclient-base/docker/notificationclient-sidecar/sidecar/controllers/v2/subscriptions.py
@@ -63,6 +63,8 @@ class SubscriptionsControllerV2(rest.RestController):
             return subscription
         except client_exception.ServiceError as err:
             abort(int(str(err)))
+        except client_exception.SubscriptionAlreadyExists as ex:
+            abort(409, str(ex))
         except client_exception.InvalidSubscription:
             abort(400)
         except client_exception.InvalidEndpoint as ex:
diff --git a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py
index eb4f57b..799f744 100644
--- a/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py
+++ b/notificationservice-base-v2/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py
@@ -123,9 +123,8 @@ class PtpWatcherDefault:
         if resource_address:
             _, nodename, resource_path = utils.parse_resource_address(
                 resource_address)
-            if resource_path == constants.SOURCE_SYNC_ALL:
-                resource_path = constants.SOURCE_SYNC_SYNC_STATE
-            if resource_path == constants.SOURCE_SYNC_GNSS_SYNC_STATUS:
+            if resource_path == constants.SOURCE_SYNC_GNSS_SYNC_STATUS or \
+                    resource_path == constants.SOURCE_SYNC_ALL:
                 self.watcher.gnsstracker_context_lock.acquire()
                 if optional and self.watcher.gnsstracker_context.get(optional):
                     sync_state = \
@@ -135,7 +134,10 @@
                        self.watcher.gnsstracker_context[optional].get(
                            'last_event_time', time.time())
                     lastStatus[optional] = self._build_event_response(
-                        resource_path, last_event_time, resource_address,
+                        constants.SOURCE_SYNC_GNSS_SYNC_STATUS,
+                        last_event_time,
+                        utils.format_resource_address(nodename,
+                            constants.SOURCE_SYNC_GNSS_SYNC_STATUS),
                         sync_state)
                 elif not optional:
                     for config in self.daemon_context['GNSS_INSTANCES']:
                         sync_state = \
@@ -146,12 +148,16 @@
                            self.watcher.gnsstracker_context[config].get(
                                'last_event_time', time.time())
                         lastStatus[config] = self._build_event_response(
-                            resource_path, last_event_time,
-                            resource_address, sync_state)
+                            constants.SOURCE_SYNC_GNSS_SYNC_STATUS,
+                            last_event_time,
+                            utils.format_resource_address(nodename,
+                                constants.SOURCE_SYNC_GNSS_SYNC_STATUS),
+                            sync_state)
                 else:
                     lastStatus = None
                 self.watcher.gnsstracker_context_lock.release()
-            elif resource_path == constants.SOURCE_SYNC_PTP_CLOCK_CLASS:
+            if resource_path == constants.SOURCE_SYNC_PTP_CLOCK_CLASS or \
+                    resource_path == constants.SOURCE_SYNC_ALL:
                 self.watcher.ptptracker_context_lock.acquire()
                 if optional and self.watcher.ptptracker_context.get(optional):
                     clock_class = \
@@ -161,9 +167,11 @@
                        self.watcher.ptptracker_context[optional].get(
                            'last_clock_class_event_time', time.time())
                     lastStatus[optional] = self._build_event_response(
-                        resource_path, last_clock_class_event_time,
-                        resource_address, clock_class,
-                        constants.VALUE_TYPE_METRIC)
+                        constants.SOURCE_SYNC_PTP_CLOCK_CLASS,
+                        last_clock_class_event_time,
+                        utils.format_resource_address(nodename,
+                            constants.SOURCE_SYNC_PTP_CLOCK_CLASS),
+                        clock_class, constants.VALUE_TYPE_METRIC)
                 elif not optional:
                     for config in self.daemon_context['PTP4L_INSTANCES']:
                         clock_class = \
@@ -174,13 +182,16 @@
                                'last_clock_class_event_time', time.time())
                         lastStatus[config] = self._build_event_response(
-                            resource_path, last_clock_class_event_time,
-                            resource_address, clock_class,
-                            constants.VALUE_TYPE_METRIC)
+                            constants.SOURCE_SYNC_PTP_CLOCK_CLASS,
+                            last_clock_class_event_time,
+                            utils.format_resource_address(nodename,
+                                constants.SOURCE_SYNC_PTP_CLOCK_CLASS),
+                            clock_class, constants.VALUE_TYPE_METRIC)
                 else:
                     lastStatus = None
                 self.watcher.ptptracker_context_lock.release()
-            elif resource_path == constants.SOURCE_SYNC_PTP_LOCK_STATE:
+            if resource_path == constants.SOURCE_SYNC_PTP_LOCK_STATE or \
+                    resource_path == constants.SOURCE_SYNC_ALL:
                 self.watcher.ptptracker_context_lock.acquire()
                 if optional and self.watcher.ptptracker_context.get(optional):
                    sync_state = \
@@ -190,7 +201,10 @@
                        self.watcher.ptptracker_context[optional].get(
                            'last_event_time', time.time())
                     lastStatus[optional] = self._build_event_response(
-                        resource_path, last_event_time, resource_address,
+                        constants.SOURCE_SYNC_PTP_LOCK_STATE,
+                        last_event_time,
+                        utils.format_resource_address(nodename,
+                            constants.SOURCE_SYNC_PTP_LOCK_STATE),
                         sync_state)
                 elif not optional:
                     for config in self.daemon_context['PTP4L_INSTANCES']:
@@ -201,13 +215,17 @@
                            self.watcher.ptptracker_context[config].get(
                                'last_event_time', time.time())
                         lastStatus[config] = self._build_event_response(
-                            resource_path, last_event_time,
-                            resource_address, sync_state)
+                            constants.SOURCE_SYNC_PTP_LOCK_STATE,
+                            last_event_time,
+                            utils.format_resource_address(nodename,
+                                constants.SOURCE_SYNC_PTP_LOCK_STATE),
+                            sync_state)
                 else:
                     lastStatus = None
                 self.watcher.ptptracker_context_lock.release()
-            elif resource_path == constants.SOURCE_SYNC_OS_CLOCK:
+            if resource_path == constants.SOURCE_SYNC_OS_CLOCK or \
+                    resource_path == constants.SOURCE_SYNC_ALL:
                 self.watcher.osclocktracker_context_lock.acquire()
                 sync_state = \
                     self.watcher.osclocktracker_context.get(
@@ -217,9 +235,12 @@
                        'last_event_time', time.time())
                 self.watcher.osclocktracker_context_lock.release()
                 lastStatus['os_clock_status'] = self._build_event_response(
-                    resource_path, last_event_time, resource_address,
+                    constants.SOURCE_SYNC_OS_CLOCK, last_event_time,
+                    utils.format_resource_address(nodename,
+                        constants.SOURCE_SYNC_OS_CLOCK),
                     sync_state)
-            elif resource_path == constants.SOURCE_SYNC_SYNC_STATE:
+            if resource_path == constants.SOURCE_SYNC_SYNC_STATE or \
+                    resource_path == constants.SOURCE_SYNC_ALL:
                 self.watcher.overalltracker_context_lock.acquire()
                 sync_state = self.watcher.overalltracker_context.get(
                     'sync_state', OverallClockState.Freerun)
@@ -228,7 +249,9 @@
                 self.watcher.overalltracker_context_lock.release()
                 lastStatus['overall_sync_status'] = \
                     self._build_event_response(
-                        resource_path, last_event_time, resource_address,
+                        constants.SOURCE_SYNC_SYNC_STATE, last_event_time,
+                        utils.format_resource_address(nodename,
+                            constants.SOURCE_SYNC_SYNC_STATE),
                         sync_state)
             LOG.debug("query_status: {}".format(lastStatus))
         else:
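
Note for reviewers (kept after the diff so it does not change how the
patch applies): with the notify() change in subscription_helper.py
above, a status dictionary keyed by source is delivered as one POST per
source rather than a single aggregated body. A rough sketch of that
delivery loop, assuming the requests library; deliver(), endpoint_uri
and status_by_source are illustrative names, and the real sidecar
additionally runs each item through format_notification_data():

    import json
    import requests

    def deliver(endpoint_uri, status_by_source, timeout=2):
        # One POST per source, mirroring the per-item loop added to
        # subscription_helper.notify() in this patch.
        headers = {'Content-Type': 'application/json'}
        for source, event in status_by_source.items():
            response = requests.post(endpoint_uri,
                                     data=json.dumps({source: event}),
                                     headers=headers,
                                     timeout=timeout)
            response.raise_for_status()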