ptp-notification-armada-app/notificationclient-base/docker/notificationclient-sidecar/notificationclientsdk/model/dto/subscription.py
Teresa Ho 67452b94db Implement ORAN v2 api on notificationserver
The commit added the support to publish ptp status for v2 API in the
notification-service.
It also renamed the previously named v0 API to v1 API.

Test Plan:
Pass: PTP status push notification v2 API
Pass: PTP status push notification v1 API
Pass: Subscribe/List/Delete subscription in v2 API
Pass: Subscribe/List/Delete subscription in v1 API

Story: 2010056
Task: 45809

Signed-off-by: Teresa Ho <teresa.ho@windriver.com>
Change-Id: Id5a1ff955eade59d68b6bcadfea4ffe6ed1567cd
2022-08-02 13:59:52 -04:00

132 lines
3.8 KiB
Python

#coding=utf-8
#
# Copyright (c) 2021-2022 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import os
import json
from wsme import types as wtypes
import datetime
import time
import uuid
from notificationclientsdk.model.dto.resourcetype import EnumResourceType, ResourceType
'''
Base for Resource Qualifiers
'''
class ResourceQualifierBase(wtypes.Base):
    """Common parent type for resource qualifier DTOs.

    The class holds no fields of its own. Its ``to_dict`` is only a
    placeholder that yields ``None``; ``ResourceQualifierPtp`` in this
    module overrides it with a real serializer.
    """

    def __init__(self, **kw):
        # Nothing to initialise locally: every keyword argument is handed
        # straight to the wsme base class.
        super(ResourceQualifierBase, self).__init__(**kw)

    def to_dict(self):
        """Placeholder serializer; always returns ``None``."""
        return None
'''
Resource Qualifiers PTP
'''
class ResourceQualifierPtp(ResourceQualifierBase):
    """Resource qualifier for PTP resources, carrying a single ``NodeName``."""

    NodeName = wtypes.text

    def __init__(self, **kw):
        # Take 'NodeName' out of the keyword arguments (None when absent)
        # before the remaining ones are forwarded to the parent class.
        node_name = kw.pop('NodeName', None)
        self.NodeName = node_name
        super(ResourceQualifierPtp, self).__init__(**kw)

    def to_dict(self):
        """Return the qualifier as ``{'NodeName': <value>}``."""
        return {'NodeName': self.NodeName}
'''
ViewModel of Subscription
'''
class SubscriptionInfoV1(wtypes.Base):
    """View model of a v1 subscription.

    ``ResourceQualifier`` is exposed through a wsme property. Values that
    are not ``wtypes.Base`` instances are kept as raw data by the setter
    and turned into a qualifier object by the getter, whose concrete type
    depends on ``ResourceType``.
    """

    SubscriptionId = wtypes.text
    UriLocation = wtypes.text
    ResourceType = EnumResourceType
    EndpointUri = wtypes.text

    # dynamic type depending on ResourceType
    def set_resource_qualifier(self, value):
        """Store *value* as the resource qualifier.

        A ``wtypes.Base`` instance is used as the qualifier directly.
        Anything else is remembered as raw data and the cached qualifier
        object is cleared, so the next read rebuilds it.
        """
        if isinstance(value, wtypes.Base):
            self._ResourceQualifer = value
        else:
            self._ResourceQualifierJson = value
            self._ResourceQualifer = None

    def get_resource_qualifier(self):
        """Return the qualifier object, or ``None`` when there is none.

        The object is built from the stored raw data on first read when
        ``ResourceType`` is PTP. ``None`` is returned when the type is not
        PTP, or when no raw data has been stored.
        """
        # The backing attributes only exist once the setter has run, so
        # they are read with a default rather than raising AttributeError
        # on an instance whose qualifier was never assigned.
        qualifier = getattr(self, '_ResourceQualifer', None)
        if not qualifier:
            raw = getattr(self, '_ResourceQualifierJson', None)
            if self.ResourceType == ResourceType.TypePTP and raw is not None:
                qualifier = ResourceQualifierPtp(**raw)
            else:
                qualifier = None
            self._ResourceQualifer = qualifier
        return qualifier

    ResourceQualifier = wtypes.wsproperty(wtypes.Base,
        get_resource_qualifier, set_resource_qualifier)

    def __init__(self, orm_entry=None):
        """Optionally populate the fields from *orm_entry*.

        ``orm_entry`` must provide ``SubscriptionId``, ``ResourceType``,
        ``UriLocation``, ``ResourceQualifierJson`` and ``EndpointUri``.
        A falsy ``orm_entry`` leaves the instance unpopulated.
        """
        if orm_entry:
            self.SubscriptionId = orm_entry.SubscriptionId
            self.ResourceType = orm_entry.ResourceType
            self.UriLocation = orm_entry.UriLocation
            # An empty or missing JSON document means "no qualifier";
            # json.loads() would raise on it.
            raw_json = orm_entry.ResourceQualifierJson
            self.ResourceQualifier = json.loads(raw_json) if raw_json else None
            self.EndpointUri = orm_entry.EndpointUri

    def to_dict(self):
        """Return the subscription as a plain dict.

        ``'ResourceQualifier'`` holds the qualifier's own ``to_dict()``
        result, or ``None`` when the subscription has no qualifier.
        """
        qualifier = self.ResourceQualifier
        d = {
            'SubscriptionId': self.SubscriptionId,
            'ResourceType': self.ResourceType,
            'UriLocation': self.UriLocation,
            'EndpointUri': self.EndpointUri,
            'ResourceQualifier':
                qualifier.to_dict() if qualifier is not None else None,
        }
        return d

    def to_orm(self):
        """Return the field values keyed by ORM column name.

        The qualifier is stored as a JSON document under
        ``'ResourceQualifierJson'``; a missing qualifier is serialized as
        JSON ``null``.
        """
        qualifier = self.ResourceQualifier
        qualifier_dict = qualifier.to_dict() if qualifier is not None else None
        d = {
            'SubscriptionId': self.SubscriptionId,
            'ResourceType': self.ResourceType or '',
            'UriLocation': self.UriLocation,
            'EndpointUri': self.EndpointUri or '',
            # json.dumps() never returns an empty string, so no '' fallback
            # is needed here.
            'ResourceQualifierJson': json.dumps(qualifier_dict),
        }
        return d
class SubscriptionInfoV2(wtypes.Base):
    """View model of a v2 subscription.

    Carries the subscription id, its URI location, the endpoint URI and
    the resource address.
    """

    SubscriptionId = wtypes.text
    UriLocation = wtypes.text
    EndpointUri = wtypes.text
    ResourceAddress = wtypes.text

    def __init__(self, orm_entry=None):
        """Optionally copy the four fields from *orm_entry*.

        A falsy ``orm_entry`` leaves the instance unpopulated.
        """
        if not orm_entry:
            return
        # Same field order as the attribute declarations above.
        for field in ('SubscriptionId', 'UriLocation',
                      'EndpointUri', 'ResourceAddress'):
            setattr(self, field, getattr(orm_entry, field))

    def to_dict(self):
        """Return the four fields as a plain dict, values unmodified."""
        return dict(
            SubscriptionId=self.SubscriptionId,
            UriLocation=self.UriLocation,
            EndpointUri=self.EndpointUri,
            ResourceAddress=self.ResourceAddress,
        )

    def to_orm(self):
        """Return the field values keyed by ORM column name.

        Falsy ``EndpointUri`` and ``ResourceAddress`` values are replaced
        by ``''``; the other two fields are passed through as they are.
        """
        endpoint_uri = self.EndpointUri or ''
        resource_address = self.ResourceAddress or ''
        return dict(
            SubscriptionId=self.SubscriptionId,
            UriLocation=self.UriLocation,
            EndpointUri=endpoint_uri,
            ResourceAddress=resource_address,
        )