diff --git a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py index 722b463..8745284 100644 --- a/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py +++ b/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/services/notification_handler.py @@ -1,147 +1,157 @@ -# -# Copyright (c) 2021-2022 Wind River Systems, Inc. -# -# SPDX-License-Identifier: Apache-2.0 -# - -import json -import logging - -import multiprocessing as mp -import threading -import time -from datetime import datetime, timezone - -from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 -from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2 -from notificationclientsdk.model.dto.resourcetype import ResourceType - -from notificationclientsdk.repository.subscription_repo import SubscriptionRepo - -from notificationclientsdk.common.helpers import subscription_helper -from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper - -from notificationclientsdk.client.notificationservice import NotificationHandlerBase - -LOG = logging.getLogger(__name__) - -from notificationclientsdk.common.helpers import log_helper - -log_helper.config_logger(LOG) - - -class NotificationHandler(NotificationHandlerBase): - - def __init__(self): - self.__supported_resource_types = (ResourceType.TypePTP,) - self.__init_notification_channel() - pass - - def __init_notification_channel(self): - self.notification_lock = threading.Lock() - self.notification_stat = {} - - # def handle_notification_delivery(self, notification_info): - def handle(self, notification_info): - LOG.debug("start notification delivery") - subscription_repo = None - try: - self.notification_lock.acquire() - subscription_repo = SubscriptionRepo(autocommit=True) - resource_type = notification_info.get('ResourceType', None) - # Get nodename from resource address - if resource_type: - node_name = notification_info.get('ResourceQualifier', {}).get('NodeName', None) - if not resource_type: - raise Exception("abnormal notification@{0}".format(node_name)) - if not resource_type in self.__supported_resource_types: - raise Exception( - "notification with unsupported resource type:{0}".format(resource_type)) - this_delivery_time = notification_info['EventTimestamp'] - else: - parent_key = list(notification_info.keys())[0] - source = notification_info[parent_key].get('source', None) - values = notification_info[parent_key].get('data', {}).get('values', []) - resource_address = values[0].get('ResourceAddress', None) - this_delivery_time = notification_info[parent_key].get('time') - if not resource_address: - raise Exception("No resource address in notification source".format(source)) - _, node_name, _, _, _ = subscription_helper.parse_resource_address(resource_address) - - entries = subscription_repo.get(Status=1) - for entry in entries: - subscriptionid = entry.SubscriptionId - if entry.ResourceAddress: - _, entry_node_name, _, _, _ = subscription_helper.parse_resource_address( - entry.ResourceAddress) - subscription_dto2 = SubscriptionInfoV2(entry) - else: - ResourceQualifierJson = entry.ResourceQualifierJson or '{}' - ResourceQualifier = json.loads(ResourceQualifierJson) - # qualify by NodeName - entry_node_name = ResourceQualifier.get('NodeName', None) - subscription_dto2 = SubscriptionInfoV1(entry) - node_name_matched = NodeInfoHelper.match_node_name(entry_node_name, node_name) - if not node_name_matched: - continue - - try: - last_delivery_time = self.__get_latest_delivery_timestamp(node_name, - subscriptionid) - if last_delivery_time and last_delivery_time >= this_delivery_time: - # skip this entry since already delivered - LOG.debug("Ignore the outdated notification for: {0}".format( - entry.SubscriptionId)) - continue - - subscription_helper.notify(subscription_dto2, notification_info) - LOG.debug("notification is delivered successfully to {0}".format( - entry.SubscriptionId)) - - self.update_delivery_timestamp(node_name, subscriptionid, this_delivery_time) - - except Exception as ex: - LOG.warning("notification is not delivered to {0}:{1}".format( - entry.SubscriptionId, str(ex))) - # proceed to next entry - continue - finally: - pass - LOG.debug("Finished notification delivery") - return True - except Exception as ex: - LOG.warning("Failed to delivery notification:{0}".format(str(ex))) - return False - finally: - self.notification_lock.release() - if not subscription_repo: - del subscription_repo - - def __get_latest_delivery_timestamp(self, node_name, subscriptionid): - last_delivery_stat = self.notification_stat.get(node_name, {}).get(subscriptionid, {}) - last_delivery_time = last_delivery_stat.get('EventTimestamp', None) - return last_delivery_time - - def update_delivery_timestamp(self, node_name, subscriptionid, this_delivery_time): - if not self.notification_stat.get(node_name, None): - self.notification_stat[node_name] = { - subscriptionid: { - 'EventTimestamp': this_delivery_time - } - } - LOG.debug("delivery time @node: {0},subscription:{1} is added".format( - node_name, subscriptionid)) - elif not self.notification_stat[node_name].get(subscriptionid, None): - self.notification_stat[node_name][subscriptionid] = { - 'EventTimestamp': this_delivery_time - } - LOG.debug("delivery time @node: {0},subscription:{1} is added".format( - node_name, subscriptionid)) - else: - last_delivery_stat = self.notification_stat.get(node_name, {}).get(subscriptionid, {}) - last_delivery_time = last_delivery_stat.get('EventTimestamp', None) - if (last_delivery_time and last_delivery_time >= this_delivery_time): - return - last_delivery_stat['EventTimestamp'] = this_delivery_time - LOG.debug("delivery time @node: {0},subscription:{1} is updated".format( - node_name, subscriptionid)) +# +# Copyright (c) 2021-2022 Wind River Systems, Inc. +# +# SPDX-License-Identifier: Apache-2.0 +# + +import json +import logging + +import multiprocessing as mp +import threading +import time +from datetime import datetime, timezone + +from notificationclientsdk.model.dto.subscription import SubscriptionInfoV1 +from notificationclientsdk.model.dto.subscription import SubscriptionInfoV2 +from notificationclientsdk.model.dto.resourcetype import ResourceType + +from notificationclientsdk.repository.subscription_repo import SubscriptionRepo + +from notificationclientsdk.common.helpers import subscription_helper +from notificationclientsdk.common.helpers.nodeinfo_helper import NodeInfoHelper + +from notificationclientsdk.client.notificationservice import NotificationHandlerBase + +LOG = logging.getLogger(__name__) + +from notificationclientsdk.common.helpers import log_helper + +log_helper.config_logger(LOG) + + +class NotificationHandler(NotificationHandlerBase): + + def __init__(self): + self.__supported_resource_types = (ResourceType.TypePTP,) + self.__init_notification_channel() + pass + + def __init_notification_channel(self): + self.notification_lock = threading.Lock() + self.notification_stat = {} + + # def handle_notification_delivery(self, notification_info): + def handle(self, notification_info): + LOG.debug("start notification delivery") + subscription_repo = None + resource_address = None + try: + self.notification_lock.acquire() + subscription_repo = SubscriptionRepo(autocommit=True) + resource_type = notification_info.get('ResourceType', None) + # Get nodename from resource address + if resource_type: + node_name = notification_info.get('ResourceQualifier', {}).get('NodeName', None) + if not resource_type: + raise Exception("abnormal notification@{0}".format(node_name)) + if not resource_type in self.__supported_resource_types: + raise Exception( + "notification with unsupported resource type:{0}".format(resource_type)) + this_delivery_time = notification_info['EventTimestamp'] + # Get subscriptions from DB to deliver notification to + entries = subscription_repo.get(Status=1, ResourceType=resource_type) + else: + parent_key = list(notification_info.keys())[0] + source = notification_info[parent_key].get('source', None) + values = notification_info[parent_key].get('data', {}).get('values', []) + resource_address = values[0].get('ResourceAddress', None) + this_delivery_time = notification_info[parent_key].get('time') + if not resource_address: + raise Exception("No resource address in notification source".format(source)) + _, node_name, _, _, _ = subscription_helper.parse_resource_address(resource_address) + # Get subscriptions from DB to deliver notification to. + # Unable to filter on resource_address here because resource_address may contain + # either an unknown node name (ie. controller-0) or a '/./' resulting in entries + # being missed. Instead, filter these v2 subscriptions in the for loop below once + # the resource path has been obtained. + entries = subscription_repo.get(Status=1) + + for entry in entries: + subscriptionid = entry.SubscriptionId + if entry.ResourceAddress: + _, entry_node_name, entry_resource_path, _, _ = \ + subscription_helper.parse_resource_address(entry.ResourceAddress) + if entry_resource_path not in resource_address: + continue + subscription_dto2 = SubscriptionInfoV2(entry) + else: + ResourceQualifierJson = entry.ResourceQualifierJson or '{}' + ResourceQualifier = json.loads(ResourceQualifierJson) + # qualify by NodeName + entry_node_name = ResourceQualifier.get('NodeName', None) + subscription_dto2 = SubscriptionInfoV1(entry) + node_name_matched = NodeInfoHelper.match_node_name(entry_node_name, node_name) + if not node_name_matched: + continue + + try: + last_delivery_time = self.__get_latest_delivery_timestamp(node_name, + subscriptionid) + if last_delivery_time and last_delivery_time >= this_delivery_time: + # skip this entry since already delivered + LOG.debug("Ignore the outdated notification for: {0}".format( + entry.SubscriptionId)) + continue + + subscription_helper.notify(subscription_dto2, notification_info) + LOG.debug("notification is delivered successfully to {0}".format( + entry.SubscriptionId)) + + self.update_delivery_timestamp(node_name, subscriptionid, this_delivery_time) + + except Exception as ex: + LOG.warning("notification is not delivered to {0}:{1}".format( + entry.SubscriptionId, str(ex))) + # proceed to next entry + continue + finally: + pass + LOG.debug("Finished notification delivery") + return True + except Exception as ex: + LOG.warning("Failed to delivery notification:{0}".format(str(ex))) + return False + finally: + self.notification_lock.release() + if not subscription_repo: + del subscription_repo + + def __get_latest_delivery_timestamp(self, node_name, subscriptionid): + last_delivery_stat = self.notification_stat.get(node_name, {}).get(subscriptionid, {}) + last_delivery_time = last_delivery_stat.get('EventTimestamp', None) + return last_delivery_time + + def update_delivery_timestamp(self, node_name, subscriptionid, this_delivery_time): + if not self.notification_stat.get(node_name, None): + self.notification_stat[node_name] = { + subscriptionid: { + 'EventTimestamp': this_delivery_time + } + } + LOG.debug("delivery time @node: {0},subscription:{1} is added".format( + node_name, subscriptionid)) + elif not self.notification_stat[node_name].get(subscriptionid, None): + self.notification_stat[node_name][subscriptionid] = { + 'EventTimestamp': this_delivery_time + } + LOG.debug("delivery time @node: {0},subscription:{1} is added".format( + node_name, subscriptionid)) + else: + last_delivery_stat = self.notification_stat.get(node_name, {}).get(subscriptionid, {}) + last_delivery_time = last_delivery_stat.get('EventTimestamp', None) + if (last_delivery_time and last_delivery_time >= this_delivery_time): + return + last_delivery_stat['EventTimestamp'] = this_delivery_time + LOG.debug("delivery time @node: {0},subscription:{1} is updated".format( + node_name, subscriptionid)) diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/gnss_monitor.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/gnss_monitor.py index 60d01fc..b13e170 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/gnss_monitor.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/common/helpers/gnss_monitor.py @@ -5,6 +5,7 @@ # import logging import datetime +import os.path import re from abc import ABC, abstractmethod @@ -71,6 +72,13 @@ class GnssMonitor(Observer): self.set_gnss_status() def set_gnss_status(self): + # Check that ts2phc is running, else Freerun + if not os.path.isfile('/var/run/ts2phc-%s.pid' % self.ts2phc_service_name): + LOG.warning("TS2PHC instance %s is not running, reporting GNSS unlocked." + % self.ts2phc_service_name) + self._state = GnssState.Failure_Nofix + return + self.gnss_cgu_handler.read_cgu() self.gnss_cgu_handler.cgu_output_to_dict() self.gnss_eec_state = self.gnss_eec_state = \ @@ -79,9 +87,9 @@ class GnssMonitor(Observer): LOG.debug("GNSS EEC Status is: %s" % self.gnss_eec_state) LOG.debug("GNSS PPS Status is: %s" % self.gnss_pps_state) if self.gnss_pps_state == 'locked_ho_ack' and self.gnss_eec_state == 'locked_ho_ack': - self._state = GnssState.Locked + self._state = GnssState.Synchronized else: - self._state = GnssState.Freerun + self._state = GnssState.Failure_Nofix LOG.debug("Set state GNSS to %s" % self._state) @@ -93,18 +101,7 @@ class GnssMonitor(Observer): self.set_gnss_status() - if self._state == constants.FREERUN_PHC_STATE: - if previous_sync_state in [constants.UNKNOWN_PHC_STATE, constants.FREERUN_PHC_STATE]: - self._state = constants.FREERUN_PHC_STATE - elif previous_sync_state == constants.LOCKED_PHC_STATE: - self._state = constants.HOLDOVER_PHC_STATE - elif previous_sync_state == constants.HOLDOVER_PHC_STATE and \ - time_in_holdover < max_holdover_time: - self._state = constants.HOLDOVER_PHC_STATE - else: - self._state = constants.FREERUN_PHC_STATE - - # determine if os clock sync state has changed since the last check + # determine if GNSS state has changed since the last check if self._state != previous_sync_state: new_event = True event_time = datetime.datetime.utcnow().timestamp() diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/gnssstate.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/gnssstate.py index 654b812..7a8afcd 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/gnssstate.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/model/dto/gnssstate.py @@ -3,7 +3,14 @@ from wsme import types as wtypes EnumGnssState = wtypes.Enum(str, 'Locked', 'Freerun', 'Holdover') class GnssState(object): - Locked = "Locked" - Freerun = "Freerun" - Holdover = "Holdover" + # Not all states are implemented on some hardware + Synchronized = "SYNCHRONIZED" + Acquiring_Sync = "ACQUIRING-SYNC" + Antenna_Disconnected = "ANTENNA-DISCONNECTED" + Booting = "BOOTING" + Antenna_Short_Circuit = "ANTENNA-SHORT-CIRCUIT" + Failure_Multipath = "FAULURE-MULTIPATH" + Failure_Nofix = "FAILURE-NOFIX" + Failure_Low_SNR = "FAILURE-LOW-SNR" + Failure_PLL = "FAILURE-PLL" diff --git a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py index ebfc145..9f5c3d3 100644 --- a/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py +++ b/notificationservice-base/docker/ptptrackingfunction/trackingfunctionsdk/services/daemon.py @@ -119,7 +119,7 @@ class PtpWatcherDefault: self.watcher.gnsstracker_context_lock.acquire() if optional: sync_state = self.watcher.gnsstracker_context[optional]. \ - get('sync_state', GnssState.Freerun) + get('sync_state', GnssState.Failure_Nofix) last_event_time = self.watcher.gnsstracker_context[optional].get( 'last_event_time', time.time()) @@ -130,7 +130,7 @@ class PtpWatcherDefault: else: for config in self.daemon_context['GNSS_INSTANCES']: sync_state = self.watcher.gnsstracker_context[config] \ - .get('sync_state', GnssState.Freerun) + .get('sync_state', GnssState.Failure_Nofix) last_event_time = self.watcher.gnsstracker_context[config].get( 'last_event_time', time.time()) @@ -272,7 +272,7 @@ class PtpWatcherDefault: self.gnsstracker_context = {} for config in self.daemon_context['GNSS_INSTANCES']: self.gnsstracker_context[config] = PtpWatcherDefault.DEFAULT_GNSSTRACKER_CONTEXT.copy() - self.gnsstracker_context[config]['sync_state'] = GnssState.Freerun + self.gnsstracker_context[config]['sync_state'] = GnssState.Failure_Nofix self.gnsstracker_context[config]['last_event_time'] = self.init_time self.gnsstracker_context_lock = threading.Lock() LOG.debug("gnsstracker_context: %s" % self.gnsstracker_context) @@ -406,10 +406,10 @@ class PtpWatcherDefault: LOG.debug("Getting overall sync state.") for gnss in self.observer_list: - if gnss._state == GnssState.Holdover or gnss._state == GnssState.Freerun: - gnss_state = GnssState.Freerun - elif gnss._state == GnssState.Locked and gnss_state != GnssState.Freerun: - gnss_state = GnssState.Locked + if gnss._state == constants.UNKNOWN_PHC_STATE or gnss._state == GnssState.Failure_Nofix: + gnss_state = GnssState.Failure_Nofix + elif gnss._state == GnssState.Synchronized and gnss_state != GnssState.Failure_Nofix: + gnss_state = GnssState.Synchronized for ptp4l in self.ptp_monitor_list: _, read_state, _ = ptp4l.get_ptp_sync_state() @@ -421,8 +421,8 @@ class PtpWatcherDefault: os_clock_state = self.os_clock_monitor.get_os_clock_state() - if gnss_state is GnssState.Freerun or os_clock_state is OsClockState.Freerun or ptp_state \ - is PtpState.Freerun: + if gnss_state is GnssState.Failure_Nofix or os_clock_state is OsClockState.Freerun or \ + ptp_state is PtpState.Freerun: sync_state = OverallClockState.Freerun else: sync_state = OverallClockState.Locked