Remove interface libraries in ops-sunbeam tests

Remove ops-sunbeam/tests/lib folder and copy them
when running py3 tests on ops-sunbeam.

Change-Id: I4f266761fa07dcd6f9c3cddb7151393296948dac
This commit is contained in:
Hemanth Nakkina 2024-02-22 10:37:21 +05:30
parent b0493c2d31
commit 1fd1f6e5de
No known key found for this signature in database
GPG Key ID: 2E4970F7B143168E
12 changed files with 13 additions and 6152 deletions

View File

@ -479,3 +479,12 @@ function pop_common_files {
popd
}
function copy_libs_for_ops_sunbeam {
mkdir -p tests/lib
cp -rf ../libs/external/lib ../libs/internal/lib tests/
}
function remove_libs_for_ops_sunbeam {
rm -rf tests/lib
}

View File

@ -1,224 +0,0 @@
"""CeilometerServiceProvides and Requires module.
This library contains the Requires and Provides classes for handling
the ceilometer_service interface.
Import `CeilometerServiceRequires` in your charm, with the charm object and the
relation name:
- self
- "ceilometer_service"
Two events are also available to respond to:
- config_changed
- goneaway
A basic example showing the usage of this relation follows:
```
from charms.ceilometer_k8s.v0.ceilometer_service import (
CeilometerServiceRequires
)
class CeilometerServiceClientCharm(CharmBase):
def __init__(self, *args):
super().__init__(*args)
# CeilometerService Requires
self.ceilometer_service = CeilometerServiceRequires(
self, "ceilometer_service",
)
self.framework.observe(
self.ceilometer_service.on.config_changed,
self._on_ceilometer_service_config_changed
)
self.framework.observe(
self.ceilometer_service.on.goneaway,
self._on_ceiometer_service_goneaway
)
def _on_ceilometer_service_config_changed(self, event):
'''React to the Ceilometer service config changed event.
This event happens when CeilometerService relation is added to the
model and relation data is changed.
'''
# Do something with the configuration provided by relation.
pass
def _on_ceilometer_service_goneaway(self, event):
'''React to the CeilometerService goneaway event.
This event happens when CeilometerService relation is removed.
'''
# CeilometerService Relation has goneaway.
pass
```
"""
import logging
from typing import (
Optional,
)
from ops.charm import (
CharmBase,
RelationBrokenEvent,
RelationChangedEvent,
RelationEvent,
)
from ops.framework import (
EventSource,
Object,
ObjectEvents,
)
from ops.model import (
Relation,
)
logger = logging.getLogger(__name__)
# The unique Charmhub library identifier, never change it
LIBID = "fcbb94e7a18740729eaf9e2c3b90017f"
# Increment this major API version when introducing breaking changes
LIBAPI = 0
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 1
class CeilometerConfigRequestEvent(RelationEvent):
"""CeilometerConfigRequest Event."""
pass
class CeilometerServiceProviderEvents(ObjectEvents):
"""Events class for `on`."""
config_request = EventSource(CeilometerConfigRequestEvent)
class CeilometerServiceProvides(Object):
"""CeilometerServiceProvides class."""
on = CeilometerServiceProviderEvents()
def __init__(self, charm: CharmBase, relation_name: str):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
self.framework.observe(
self.charm.on[relation_name].relation_changed,
self._on_ceilometer_service_relation_changed,
)
def _on_ceilometer_service_relation_changed(
self, event: RelationChangedEvent
):
"""Handle CeilometerService relation changed."""
logging.debug("CeilometerService relation changed")
self.on.config_request.emit(event.relation)
def set_config(
self, relation: Optional[Relation], telemetry_secret: str
) -> None:
"""Set ceilometer configuration on the relation."""
if not self.charm.unit.is_leader():
logging.debug("Not a leader unit, skipping set config")
return
# If relation is not provided send config to all the related
# applications. This happens usually when config data is
# updated by provider and wants to send the data to all
# related applications
if relation is None:
logging.debug(
"Sending config to all related applications of relation"
f"{self.relation_name}"
)
for relation in self.framework.model.relations[self.relation_name]:
relation.data[self.charm.app][
"telemetry-secret"
] = telemetry_secret
else:
logging.debug(
f"Sending config on relation {relation.app.name} "
f"{relation.name}/{relation.id}"
)
relation.data[self.charm.app][
"telemetry-secret"
] = telemetry_secret
class CeilometerConfigChangedEvent(RelationEvent):
"""CeilometerConfigChanged Event."""
pass
class CeilometerServiceGoneAwayEvent(RelationEvent):
"""CeilometerServiceGoneAway Event."""
pass
class CeilometerServiceRequirerEvents(ObjectEvents):
"""Events class for `on`."""
config_changed = EventSource(CeilometerConfigChangedEvent)
goneaway = EventSource(CeilometerServiceGoneAwayEvent)
class CeilometerServiceRequires(Object):
"""CeilometerServiceRequires class."""
on = CeilometerServiceRequirerEvents()
def __init__(self, charm: CharmBase, relation_name: str):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
self.framework.observe(
self.charm.on[relation_name].relation_changed,
self._on_ceilometer_service_relation_changed,
)
self.framework.observe(
self.charm.on[relation_name].relation_broken,
self._on_ceilometer_service_relation_broken,
)
def _on_ceilometer_service_relation_changed(
self, event: RelationChangedEvent
):
"""Handle CeilometerService relation changed."""
logging.debug("CeilometerService config data changed")
self.on.config_changed.emit(event.relation)
def _on_ceilometer_service_relation_broken(
self, event: RelationBrokenEvent
):
"""Handle CeilometerService relation changed."""
logging.debug("CeilometerService on_broken")
self.on.goneaway.emit(event.relation)
@property
def _ceilometer_service_rel(self) -> Optional[Relation]:
"""The ceilometer service relation."""
return self.framework.model.get_relation(self.relation_name)
def get_remote_app_data(self, key: str) -> Optional[str]:
"""Return the value for the given key from remote app data."""
if self._ceilometer_service_rel:
data = self._ceilometer_service_rel.data[
self._ceilometer_service_rel.app
]
return data.get(key)
return None
@property
def telemetry_secret(self) -> Optional[str]:
"""Return the telemetry_secret."""
return self.get_remote_app_data("telemetry-secret")

View File

@ -1,265 +0,0 @@
"""CephAccess Provides and Requires module.
This library contains the Requires and Provides classes for handling
the ceph-access interface.
Import `CephAccessRequires` in your charm, with the charm object and the
relation name:
- self
- "ceph_access"
Three events are also available to respond to:
- connected
- ready
- goneaway
A basic example showing the usage of this relation follows:
```
from charms.cinder_ceph_k8s.v0.ceph_access import CephAccessRequires
class CephAccessClientCharm(CharmBase):
def __init__(self, *args):
super().__init__(*args)
# CephAccess Requires
self.ceph_access = CephAccessRequires(
self,
relation_name="ceph_access",
)
self.framework.observe(
self.ceph_access.on.connected, self._on_ceph_access_connected)
self.framework.observe(
self.ceph_access.on.ready, self._on_ceph_access_ready)
self.framework.observe(
self.ceph_access.on.goneaway, self._on_ceph_access_goneaway)
def _on_ceph_access_connected(self, event):
'''React to the CephAccess connected event.
This event happens when n CephAccess relation is added to the
model before credentials etc have been provided.
'''
# Do something before the relation is complete
pass
def _on_ceph_access_ready(self, event):
'''React to the CephAccess ready event.
This event happens when an CephAccess relation is removed.
'''
# IdentityService Relation has goneaway. shutdown services or suchlike
pass
```
"""
# The unique Charmhub library identifier, never change it
LIBID = "7fa8d4f8407c4f31ab1deb51c0c046f1"
# Increment this major API version when introducing breaking changes
LIBAPI = 0
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 1
import logging
from typing import Optional
from ops import (
RelationEvent
)
from ops.model import (
Relation,
Secret,
SecretNotFoundError,
)
from ops.framework import (
EventBase,
ObjectEvents,
EventSource,
Object,
)
logger = logging.getLogger(__name__)
class CephAccessConnectedEvent(EventBase):
"""CephAccess connected Event."""
pass
class CephAccessReadyEvent(EventBase):
"""CephAccess ready for use Event."""
pass
class CephAccessGoneAwayEvent(EventBase):
"""CephAccess relation has gone-away Event"""
pass
class CephAccessServerEvents(ObjectEvents):
"""Events class for `on`"""
connected = EventSource(CephAccessConnectedEvent)
ready = EventSource(CephAccessReadyEvent)
goneaway = EventSource(CephAccessGoneAwayEvent)
class CephAccessRequires(Object):
"""
CephAccessRequires class
"""
on = CephAccessServerEvents()
def __init__(self, charm, relation_name: str):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
self.framework.observe(
self.charm.on[relation_name].relation_joined,
self._on_ceph_access_relation_joined,
)
self.framework.observe(
self.charm.on[relation_name].relation_changed,
self._on_ceph_access_relation_changed,
)
self.framework.observe(
self.charm.on[relation_name].relation_departed,
self._on_ceph_access_relation_changed,
)
self.framework.observe(
self.charm.on[relation_name].relation_broken,
self._on_ceph_access_relation_broken,
)
@property
def _ceph_access_rel(self) -> Relation:
"""The CephAccess relation."""
return self.framework.model.get_relation(self.relation_name)
def get_remote_app_data(self, key: str) -> Optional[str]:
"""Return the value for the given key from remote app data."""
data = self._ceph_access_rel.data[self._ceph_access_rel.app]
return data.get(key)
def _on_ceph_access_relation_joined(self, event):
"""CephAccess relation joined."""
logging.debug("CephAccess on_joined")
self.on.connected.emit()
def _on_ceph_access_relation_changed(self, event):
"""CephAccess relation changed."""
logging.debug("CephAccess on_changed")
try:
if self.ready:
self.on.ready.emit()
except (AttributeError, KeyError):
pass
def _on_ceph_access_relation_broken(self, event):
"""CephAccess relation broken."""
logging.debug("CephAccess on_broken")
self.on.goneaway.emit()
def _retrieve_secret(self) -> Optional[Secret]:
try:
credentials_id = self.get_remote_app_data('access-credentials')
if not credentials_id:
return None
credentials = self.charm.model.get_secret(id=credentials_id)
return credentials
except SecretNotFoundError:
logger.warning(f"Secret {credentials_id} not found")
return None
@property
def ceph_access_data(self) -> dict:
"""Return the service_password."""
secret = self._retrieve_secret()
if not secret:
return {}
return secret.get_content()
@property
def ready(self) -> bool:
"""Return the service_password."""
return all(k in self.ceph_access_data for k in ["uuid", "key"])
class HasCephAccessClientsEvent(EventBase):
"""Has CephAccessClients Event."""
pass
class ReadyCephAccessClientsEvent(RelationEvent):
"""Has ReadyCephAccessClients Event."""
pass
class CephAccessClientEvents(ObjectEvents):
"""Events class for `on`"""
has_ceph_access_clients = EventSource(HasCephAccessClientsEvent)
ready_ceph_access_clients = EventSource(ReadyCephAccessClientsEvent)
class CephAccessProvides(Object):
"""
CephAccessProvides class
"""
on = CephAccessClientEvents()
def __init__(self, charm, relation_name):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
self.framework.observe(
self.charm.on[relation_name].relation_joined,
self._on_ceph_access_relation_joined,
)
self.framework.observe(
self.charm.on[relation_name].relation_changed,
self._on_ceph_access_relation_changed,
)
self.framework.observe(
self.charm.on[relation_name].relation_broken,
self._on_ceph_access_relation_broken,
)
def _on_ceph_access_relation_joined(self, event):
"""Handle CephAccess joined."""
logging.debug("CephAccess on_joined")
self.on.has_ceph_access_clients.emit()
def _on_ceph_access_relation_changed(self, event):
"""Handle CephAccess joined."""
logging.debug("CephAccess on_changed")
self.on.ready_ceph_access_clients.emit(
event.relation,
app=event.app,
unit=event.unit)
def _on_ceph_access_relation_broken(self, event):
"""Handle CephAccess broken."""
logging.debug("CephAccessProvides on_broken")
def set_ceph_access_credentials(self, relation_name: int,
relation_id: str,
access_credentials: str):
logging.debug("Setting ceph_access connection information.")
_ceph_access_rel = None
for relation in self.framework.model.relations[relation_name]:
if relation.id == relation_id:
_ceph_access_rel = relation
if not _ceph_access_rel:
# Relation has disappeared so skip send of data
return
app_data = _ceph_access_rel.data[self.charm.app]
logging.debug(access_credentials)
app_data["access-credentials"] = access_credentials

View File

@ -1,439 +0,0 @@
"""IdentityCredentialsProvides and Requires module.
This library contains the Requires and Provides classes for handling
the identity_credentials interface.
Import `IdentityCredentialsRequires` in your charm, with the charm object and the
relation name:
- self
- "identity_credentials"
Also provide additional parameters to the charm object:
- service
- internal_url
- public_url
- admin_url
- region
- username
- vhost
Two events are also available to respond to:
- connected
- ready
- goneaway
A basic example showing the usage of this relation follows:
```
from charms.keystone_k8s.v0.identity_credentials import IdentityCredentialsRequires
class IdentityCredentialsClientCharm(CharmBase):
def __init__(self, *args):
super().__init__(*args)
# IdentityCredentials Requires
self.identity_credentials = IdentityCredentialsRequires(
self, "identity_credentials",
service = "my-service"
internal_url = "http://internal-url"
public_url = "http://public-url"
admin_url = "http://admin-url"
region = "region"
)
self.framework.observe(
self.identity_credentials.on.connected, self._on_identity_credentials_connected)
self.framework.observe(
self.identity_credentials.on.ready, self._on_identity_credentials_ready)
self.framework.observe(
self.identity_credentials.on.goneaway, self._on_identity_credentials_goneaway)
def _on_identity_credentials_connected(self, event):
'''React to the IdentityCredentials connected event.
This event happens when IdentityCredentials relation is added to the
model before credentials etc have been provided.
'''
# Do something before the relation is complete
pass
def _on_identity_credentials_ready(self, event):
'''React to the IdentityCredentials ready event.
The IdentityCredentials interface will use the provided config for the
request to the identity server.
'''
# IdentityCredentials Relation is ready. Do something with the completed relation.
pass
def _on_identity_credentials_goneaway(self, event):
'''React to the IdentityCredentials goneaway event.
This event happens when an IdentityCredentials relation is removed.
'''
# IdentityCredentials Relation has goneaway. shutdown services or suchlike
pass
```
"""
import logging
from ops.framework import (
StoredState,
EventBase,
ObjectEvents,
EventSource,
Object,
)
from ops.model import (
Relation,
SecretNotFoundError,
)
# The unique Charmhub library identifier, never change it
LIBID = "b5fa18d4427c4ab9a269c3a2fbed545c"
# Increment this major API version when introducing breaking changes
LIBAPI = 0
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 1
logger = logging.getLogger(__name__)
class IdentityCredentialsConnectedEvent(EventBase):
"""IdentityCredentials connected Event."""
pass
class IdentityCredentialsReadyEvent(EventBase):
"""IdentityCredentials ready for use Event."""
pass
class IdentityCredentialsGoneAwayEvent(EventBase):
"""IdentityCredentials relation has gone-away Event"""
pass
class IdentityCredentialsServerEvents(ObjectEvents):
"""Events class for `on`"""
connected = EventSource(IdentityCredentialsConnectedEvent)
ready = EventSource(IdentityCredentialsReadyEvent)
goneaway = EventSource(IdentityCredentialsGoneAwayEvent)
class IdentityCredentialsRequires(Object):
"""
IdentityCredentialsRequires class
"""
on = IdentityCredentialsServerEvents()
_stored = StoredState()
def __init__(self, charm, relation_name: str):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
self.framework.observe(
self.charm.on[relation_name].relation_joined,
self._on_identity_credentials_relation_joined,
)
self.framework.observe(
self.charm.on[relation_name].relation_changed,
self._on_identity_credentials_relation_changed,
)
self.framework.observe(
self.charm.on[relation_name].relation_departed,
self._on_identity_credentials_relation_changed,
)
self.framework.observe(
self.charm.on[relation_name].relation_broken,
self._on_identity_credentials_relation_broken,
)
def _on_identity_credentials_relation_joined(self, event):
"""IdentityCredentials relation joined."""
logging.debug("IdentityCredentials on_joined")
self.on.connected.emit()
self.request_credentials()
def _on_identity_credentials_relation_changed(self, event):
"""IdentityCredentials relation changed."""
logging.debug("IdentityCredentials on_changed")
try:
self.on.ready.emit()
except (AttributeError, KeyError):
logger.exception('Error when emitting event')
def _on_identity_credentials_relation_broken(self, event):
"""IdentityCredentials relation broken."""
logging.debug("IdentityCredentials on_broken")
self.on.goneaway.emit()
@property
def _identity_credentials_rel(self) -> Relation:
"""The IdentityCredentials relation."""
return self.framework.model.get_relation(self.relation_name)
def get_remote_app_data(self, key: str) -> str:
"""Return the value for the given key from remote app data."""
data = self._identity_credentials_rel.data[self._identity_credentials_rel.app]
return data.get(key)
@property
def api_version(self) -> str:
"""Return the api_version."""
return self.get_remote_app_data('api-version')
@property
def auth_host(self) -> str:
"""Return the auth_host."""
return self.get_remote_app_data('auth-host')
@property
def auth_port(self) -> str:
"""Return the auth_port."""
return self.get_remote_app_data('auth-port')
@property
def auth_protocol(self) -> str:
"""Return the auth_protocol."""
return self.get_remote_app_data('auth-protocol')
@property
def internal_host(self) -> str:
"""Return the internal_host."""
return self.get_remote_app_data('internal-host')
@property
def internal_port(self) -> str:
"""Return the internal_port."""
return self.get_remote_app_data('internal-port')
@property
def internal_protocol(self) -> str:
"""Return the internal_protocol."""
return self.get_remote_app_data('internal-protocol')
@property
def credentials(self) -> str:
return self.get_remote_app_data('credentials')
@property
def username(self) -> str:
credentials_id = self.get_remote_app_data('credentials')
if not credentials_id:
return None
try:
credentials = self.charm.model.get_secret(id=credentials_id)
return credentials.get_content().get("username")
except SecretNotFoundError:
logger.warning(f"Secret {credentials_id} not found")
return None
@property
def password(self) -> str:
credentials_id = self.get_remote_app_data('credentials')
if not credentials_id:
return None
try:
credentials = self.charm.model.get_secret(id=credentials_id)
return credentials.get_content().get("password")
except SecretNotFoundError:
logger.warning(f"Secret {credentials_id} not found")
return None
@property
def project_name(self) -> str:
"""Return the project name."""
return self.get_remote_app_data('project-name')
@property
def project_id(self) -> str:
"""Return the project id."""
return self.get_remote_app_data('project-id')
@property
def user_domain_name(self) -> str:
"""Return the name of the user domain."""
return self.get_remote_app_data('user-domain-name')
@property
def user_domain_id(self) -> str:
"""Return the id of the user domain."""
return self.get_remote_app_data('user-domain-id')
@property
def project_domain_name(self) -> str:
"""Return the name of the project domain."""
return self.get_remote_app_data('project-domain-name')
@property
def project_domain_id(self) -> str:
"""Return the id of the project domain."""
return self.get_remote_app_data('project-domain-id')
@property
def region(self) -> str:
"""Return the region for the auth urls."""
return self.get_remote_app_data('region')
def request_credentials(self) -> None:
"""Request credentials from the IdentityCredentials server."""
if self.model.unit.is_leader():
logging.debug(f'Requesting credentials for {self.charm.app.name}')
app_data = self._identity_credentials_rel.data[self.charm.app]
app_data['username'] = self.charm.app.name
class HasIdentityCredentialsClientsEvent(EventBase):
"""Has IdentityCredentialsClients Event."""
pass
class ReadyIdentityCredentialsClientsEvent(EventBase):
"""IdentityCredentialsClients Ready Event."""
def __init__(self, handle, relation_id, relation_name, username):
super().__init__(handle)
self.relation_id = relation_id
self.relation_name = relation_name
self.username = username
def snapshot(self):
return {
"relation_id": self.relation_id,
"relation_name": self.relation_name,
"username": self.username,
}
def restore(self, snapshot):
super().restore(snapshot)
self.relation_id = snapshot["relation_id"]
self.relation_name = snapshot["relation_name"]
self.username = snapshot["username"]
class IdentityCredentialsClientsGoneAwayEvent(EventBase):
"""Has IdentityCredentialsClientsGoneAwayEvent Event."""
pass
class IdentityCredentialsClientEvents(ObjectEvents):
"""Events class for `on`"""
has_identity_credentials_clients = EventSource(
HasIdentityCredentialsClientsEvent
)
ready_identity_credentials_clients = EventSource(
ReadyIdentityCredentialsClientsEvent
)
identity_credentials_clients_gone = EventSource(
IdentityCredentialsClientsGoneAwayEvent
)
class IdentityCredentialsProvides(Object):
"""
IdentityCredentialsProvides class
"""
on = IdentityCredentialsClientEvents()
_stored = StoredState()
def __init__(self, charm, relation_name):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
self.framework.observe(
self.charm.on[relation_name].relation_joined,
self._on_identity_credentials_relation_joined,
)
self.framework.observe(
self.charm.on[relation_name].relation_changed,
self._on_identity_credentials_relation_changed,
)
self.framework.observe(
self.charm.on[relation_name].relation_broken,
self._on_identity_credentials_relation_broken,
)
def _on_identity_credentials_relation_joined(self, event):
"""Handle IdentityCredentials joined."""
logging.debug("IdentityCredentialsProvides on_joined")
self.on.has_identity_credentials_clients.emit()
def _on_identity_credentials_relation_changed(self, event):
"""Handle IdentityCredentials changed."""
logging.debug("IdentityCredentials on_changed")
REQUIRED_KEYS = ['username']
values = [
event.relation.data[event.relation.app].get(k)
for k in REQUIRED_KEYS
]
# Validate data on the relation
if all(values):
username = event.relation.data[event.relation.app]['username']
self.on.ready_identity_credentials_clients.emit(
event.relation.id,
event.relation.name,
username,
)
def _on_identity_credentials_relation_broken(self, event):
"""Handle IdentityCredentials broken."""
logging.debug("IdentityCredentialsProvides on_departed")
self.on.identity_credentials_clients_gone.emit()
def set_identity_credentials(self, relation_name: int,
relation_id: str,
api_version: str,
auth_host: str,
auth_port: str,
auth_protocol: str,
internal_host: str,
internal_port: str,
internal_protocol: str,
credentials: str,
project_name: str,
project_id: str,
user_domain_name: str,
user_domain_id: str,
project_domain_name: str,
project_domain_id: str,
region: str):
logging.debug("Setting identity_credentials connection information.")
_identity_credentials_rel = None
for relation in self.framework.model.relations[relation_name]:
if relation.id == relation_id:
_identity_credentials_rel = relation
if not _identity_credentials_rel:
# Relation has disappeared so don't send the data
return
app_data = _identity_credentials_rel.data[self.charm.app]
app_data["api-version"] = api_version
app_data["auth-host"] = auth_host
app_data["auth-port"] = str(auth_port)
app_data["auth-protocol"] = auth_protocol
app_data["internal-host"] = internal_host
app_data["internal-port"] = str(internal_port)
app_data["internal-protocol"] = internal_protocol
app_data["credentials"] = credentials
app_data["project-name"] = project_name
app_data["project-id"] = project_id
app_data["user-domain-name"] = user_domain_name
app_data["user-domain-id"] = user_domain_id
app_data["project-domain-name"] = project_domain_name
app_data["project-domain-id"] = project_domain_id
app_data["region"] = region

View File

@ -1,373 +0,0 @@
"""IdentityResourceProvides and Requires module.
This library contains the Requires and Provides classes for handling
the identity_ops interface.
Import `IdentityResourceRequires` in your charm, with the charm object and the
relation name:
- self
- "identity_ops"
Also provide additional parameters to the charm object:
- request
Three events are also available to respond to:
- provider_ready
- provider_goneaway
- response_avaialable
A basic example showing the usage of this relation follows:
```
from charms.keystone_k8s.v0.identity_resource import IdentityResourceRequires
class IdentityResourceClientCharm(CharmBase):
def __init__(self, *args):
super().__init__(*args)
# IdentityResource Requires
self.identity_resource = IdentityResourceRequires(
self, "identity_ops",
)
self.framework.observe(
self.identity_resource.on.provider_ready, self._on_identity_resource_ready)
self.framework.observe(
self.identity_resource.on.provider_goneaway, self._on_identity_resource_goneaway)
self.framework.observe(
self.identity_resource.on.response_available, self._on_identity_resource_response)
def _on_identity_resource_ready(self, event):
'''React to the IdentityResource provider_ready event.
This event happens when n IdentityResource relation is added to the
model. Ready to send any ops to keystone.
'''
# Ready to send any ops.
pass
def _on_identity_resource_response(self, event):
'''React to the IdentityResource response_available event.
The IdentityResource interface will provide the response for the ops sent.
'''
# Read the response for the ops sent.
pass
def _on_identity_resource_goneaway(self, event):
'''React to the IdentityResource goneaway event.
This event happens when an IdentityResource relation is removed.
'''
# IdentityResource Relation has goneaway. No ops can be sent.
pass
```
A sample ops request can be of format
{
"id": <request id>
"tag": <string to identify request>
"ops": [
{
"name": <op name>,
"params": {
<param 1>: <value 1>,
<param 2>: <value 2>
}
}
]
}
For any sensitive data in the ops params, the charm can create secrets and pass
secret id instead of sensitive data as part of ops request. The charm should
ensure to grant secret access to provider charm i.e., keystone over relation.
The secret content should hold the sensitive data with same name as param name.
"""
import json
import logging
from ops.charm import (
RelationEvent,
)
from ops.framework import (
EventBase,
EventSource,
Object,
ObjectEvents,
StoredState,
)
from ops.model import (
Relation,
)
logger = logging.getLogger(__name__)
# The unique Charmhub library identifier, never change it
LIBID = "b419d4d8249e423487daafc3665ed06f"
# Increment this major API version when introducing breaking changes
LIBAPI = 0
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 2
REQUEST_NOT_SENT = 1
REQUEST_SENT = 2
REQUEST_PROCESSED = 3
class IdentityOpsProviderReadyEvent(RelationEvent):
"""Has IdentityOpsProviderReady Event."""
pass
class IdentityOpsResponseEvent(RelationEvent):
"""Has IdentityOpsResponse Event."""
pass
class IdentityOpsProviderGoneAwayEvent(RelationEvent):
"""Has IdentityOpsProviderGoneAway Event."""
pass
class IdentityResourceResponseEvents(ObjectEvents):
"""Events class for `on`."""
provider_ready = EventSource(IdentityOpsProviderReadyEvent)
response_available = EventSource(IdentityOpsResponseEvent)
provider_goneaway = EventSource(IdentityOpsProviderGoneAwayEvent)
class IdentityResourceRequires(Object):
"""IdentityResourceRequires class."""
on = IdentityResourceResponseEvents()
_stored = StoredState()
def __init__(self, charm, relation_name):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
self._stored.set_default(provider_ready=False, requests=[])
self.framework.observe(
self.charm.on[relation_name].relation_joined,
self._on_identity_resource_relation_joined,
)
self.framework.observe(
self.charm.on[relation_name].relation_changed,
self._on_identity_resource_relation_changed,
)
self.framework.observe(
self.charm.on[relation_name].relation_broken,
self._on_identity_resource_relation_broken,
)
def _on_identity_resource_relation_joined(self, event):
"""Handle IdentityResource joined."""
self._stored.provider_ready = True
self.on.provider_ready.emit(event.relation)
def _on_identity_resource_relation_changed(self, event):
"""Handle IdentityResource changed."""
id_ = self.response.get("id")
self.save_request_in_store(id_, None, None, REQUEST_PROCESSED)
self.on.response_available.emit(event.relation)
def _on_identity_resource_relation_broken(self, event):
"""Handle IdentityResource broken."""
self._stored.provider_ready = False
self.on.provider_goneaway.emit(event.relation)
@property
def _identity_resource_rel(self) -> Relation:
"""The IdentityResource relation."""
return self.framework.model.get_relation(self.relation_name)
@property
def response(self) -> dict:
"""Response object from keystone."""
response = self.get_remote_app_data("response")
if not response:
return {}
try:
return json.loads(response)
except Exception as e:
logger.debug(str(e))
return {}
def save_request_in_store(self, id: str, tag: str, ops: list, state: int):
"""Save request in the store."""
if id is None:
return
for request in self._stored.requests:
if request.get("id") == id:
if tag:
request["tag"] = tag
if ops:
request["ops"] = ops
request["state"] = state
return
# New request
self._stored.requests.append(
{"id": id, "tag": tag, "ops": ops, "state": state}
)
def get_request_from_store(self, id: str) -> dict:
"""Get request from the stote."""
for request in self._stored.requests:
if request.get("id") == id:
return request
return {}
def is_request_processed(self, id: str) -> bool:
"""Check if request is processed."""
for request in self._stored.requests:
if (
request.get("id") == id
and request.get("state") == REQUEST_PROCESSED
):
return True
return False
def get_remote_app_data(self, key: str) -> str:
"""Return the value for the given key from remote app data."""
data = self._identity_resource_rel.data[
self._identity_resource_rel.app
]
return data.get(key)
def ready(self) -> bool:
"""Interface is ready or not.
Interface is considered ready if the op request is processed
and response is sent. In case of non leader unit, just consider
the interface is ready.
"""
if not self.model.unit.is_leader():
logger.debug("Not a leader unit, set the interface to ready")
return True
try:
app_data = self._identity_resource_rel.data[self.charm.app]
if "request" not in app_data:
return False
request = json.loads(app_data["request"])
request_id = request.get("id")
response_id = self.response.get("id")
if request_id == response_id:
return True
except Exception as e:
logger.debug(str(e))
return False
def request_ops(self, request: dict) -> None:
"""Request keystone ops."""
if not self.model.unit.is_leader():
logger.debug("Not a leader unit, not sending request")
return
id_ = request.get("id")
tag = request.get("tag")
ops = request.get("ops")
req = self.get_request_from_store(id_)
if req and req.get("state") == REQUEST_PROCESSED:
logger.debug("Request {id_} already processed")
return
if not self._stored.provider_ready:
self.save_request_in_store(id_, tag, ops, REQUEST_NOT_SENT)
logger.debug("Keystone not yet ready to take requests")
return
logger.debug("Requesting ops to keystone")
app_data = self._identity_resource_rel.data[self.charm.app]
app_data["request"] = json.dumps(request)
self.save_request_in_store(id_, tag, ops, REQUEST_SENT)
class IdentityOpsRequestEvent(EventBase):
"""Has IdentityOpsRequest Event."""
def __init__(self, handle, relation_id, relation_name, request):
"""Initialise event."""
super().__init__(handle)
self.relation_id = relation_id
self.relation_name = relation_name
self.request = request
def snapshot(self):
"""Snapshot the event."""
return {
"relation_id": self.relation_id,
"relation_name": self.relation_name,
"request": self.request,
}
def restore(self, snapshot):
"""Restore the event."""
super().restore(snapshot)
self.relation_id = snapshot["relation_id"]
self.relation_name = snapshot["relation_name"]
self.request = snapshot["request"]
class IdentityResourceProviderEvents(ObjectEvents):
"""Events class for `on`."""
process_op = EventSource(IdentityOpsRequestEvent)
class IdentityResourceProvides(Object):
"""IdentityResourceProvides class."""
on = IdentityResourceProviderEvents()
def __init__(self, charm, relation_name):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
self.framework.observe(
self.charm.on[relation_name].relation_changed,
self._on_identity_resource_relation_changed,
)
def _on_identity_resource_relation_changed(self, event):
"""Handle IdentityResource changed."""
request = event.relation.data[event.relation.app].get("request", {})
self.on.process_op.emit(
event.relation.id, event.relation.name, request
)
def set_ops_response(
self, relation_id: str, relation_name: str, ops_response: dict
):
"""Set response to ops request."""
if not self.model.unit.is_leader():
logger.debug("Not a leader unit, not sending response")
return
logger.debug("Update response from keystone")
_identity_resource_rel = self.charm.model.get_relation(
relation_name, relation_id
)
if not _identity_resource_rel:
# Relation has disappeared so skip send of data
return
app_data = _identity_resource_rel.data[self.charm.app]
app_data["response"] = json.dumps(ops_response)

View File

@ -1,525 +0,0 @@
"""IdentityServiceProvides and Requires module.
This library contains the Requires and Provides classes for handling
the identity_service interface.
Import `IdentityServiceRequires` in your charm, with the charm object and the
relation name:
- self
- "identity_service"
Also provide additional parameters to the charm object:
- service
- internal_url
- public_url
- admin_url
- region
- username
- vhost
Two events are also available to respond to:
- connected
- ready
- goneaway
A basic example showing the usage of this relation follows:
```
from charms.keystone_k8s.v1.identity_service import IdentityServiceRequires
class IdentityServiceClientCharm(CharmBase):
def __init__(self, *args):
super().__init__(*args)
# IdentityService Requires
self.identity_service = IdentityServiceRequires(
self, "identity_service",
service = "my-service"
internal_url = "http://internal-url"
public_url = "http://public-url"
admin_url = "http://admin-url"
region = "region"
)
self.framework.observe(
self.identity_service.on.connected, self._on_identity_service_connected)
self.framework.observe(
self.identity_service.on.ready, self._on_identity_service_ready)
self.framework.observe(
self.identity_service.on.goneaway, self._on_identity_service_goneaway)
def _on_identity_service_connected(self, event):
'''React to the IdentityService connected event.
This event happens when n IdentityService relation is added to the
model before credentials etc have been provided.
'''
# Do something before the relation is complete
pass
def _on_identity_service_ready(self, event):
'''React to the IdentityService ready event.
The IdentityService interface will use the provided config for the
request to the identity server.
'''
# IdentityService Relation is ready. Do something with the completed relation.
pass
def _on_identity_service_goneaway(self, event):
'''React to the IdentityService goneaway event.
This event happens when an IdentityService relation is removed.
'''
# IdentityService Relation has goneaway. shutdown services or suchlike
pass
```
"""
import json
import logging
from ops.framework import (
StoredState,
EventBase,
ObjectEvents,
EventSource,
Object,
)
from ops.model import (
Relation,
SecretNotFoundError,
)
logger = logging.getLogger(__name__)
# The unique Charmhub library identifier, never change it
LIBID = "0fa7fe7236c14c6e9624acf232b9a3b0"
# Increment this major API version when introducing breaking changes
LIBAPI = 1
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 1
logger = logging.getLogger(__name__)
class IdentityServiceConnectedEvent(EventBase):
"""IdentityService connected Event."""
pass
class IdentityServiceReadyEvent(EventBase):
"""IdentityService ready for use Event."""
pass
class IdentityServiceGoneAwayEvent(EventBase):
"""IdentityService relation has gone-away Event"""
pass
class IdentityServiceServerEvents(ObjectEvents):
"""Events class for `on`"""
connected = EventSource(IdentityServiceConnectedEvent)
ready = EventSource(IdentityServiceReadyEvent)
goneaway = EventSource(IdentityServiceGoneAwayEvent)
class IdentityServiceRequires(Object):
"""
IdentityServiceRequires class
"""
on = IdentityServiceServerEvents()
_stored = StoredState()
def __init__(self, charm, relation_name: str, service_endpoints: dict,
region: str):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
self.service_endpoints = service_endpoints
self.region = region
self.framework.observe(
self.charm.on[relation_name].relation_joined,
self._on_identity_service_relation_joined,
)
self.framework.observe(
self.charm.on[relation_name].relation_changed,
self._on_identity_service_relation_changed,
)
self.framework.observe(
self.charm.on[relation_name].relation_departed,
self._on_identity_service_relation_changed,
)
self.framework.observe(
self.charm.on[relation_name].relation_broken,
self._on_identity_service_relation_broken,
)
def _on_identity_service_relation_joined(self, event):
"""IdentityService relation joined."""
logging.debug("IdentityService on_joined")
self.on.connected.emit()
self.register_services(
self.service_endpoints,
self.region)
def _on_identity_service_relation_changed(self, event):
"""IdentityService relation changed."""
logging.debug("IdentityService on_changed")
try:
self.service_password
self.on.ready.emit()
except (AttributeError, KeyError):
pass
def _on_identity_service_relation_broken(self, event):
"""IdentityService relation broken."""
logging.debug("IdentityService on_broken")
self.on.goneaway.emit()
@property
def _identity_service_rel(self) -> Relation:
"""The IdentityService relation."""
return self.framework.model.get_relation(self.relation_name)
def get_remote_app_data(self, key: str) -> str:
"""Return the value for the given key from remote app data."""
data = self._identity_service_rel.data[self._identity_service_rel.app]
return data.get(key)
@property
def api_version(self) -> str:
"""Return the api_version."""
return self.get_remote_app_data('api-version')
@property
def auth_host(self) -> str:
"""Return the auth_host."""
return self.get_remote_app_data('auth-host')
@property
def auth_port(self) -> str:
"""Return the auth_port."""
return self.get_remote_app_data('auth-port')
@property
def auth_protocol(self) -> str:
"""Return the auth_protocol."""
return self.get_remote_app_data('auth-protocol')
@property
def internal_host(self) -> str:
"""Return the internal_host."""
return self.get_remote_app_data('internal-host')
@property
def internal_port(self) -> str:
"""Return the internal_port."""
return self.get_remote_app_data('internal-port')
@property
def internal_protocol(self) -> str:
"""Return the internal_protocol."""
return self.get_remote_app_data('internal-protocol')
@property
def admin_domain_name(self) -> str:
"""Return the admin_domain_name."""
return self.get_remote_app_data('admin-domain-name')
@property
def admin_domain_id(self) -> str:
"""Return the admin_domain_id."""
return self.get_remote_app_data('admin-domain-id')
@property
def admin_project_name(self) -> str:
"""Return the admin_project_name."""
return self.get_remote_app_data('admin-project-name')
@property
def admin_project_id(self) -> str:
"""Return the admin_project_id."""
return self.get_remote_app_data('admin-project-id')
@property
def admin_user_name(self) -> str:
"""Return the admin_user_name."""
return self.get_remote_app_data('admin-user-name')
@property
def admin_user_id(self) -> str:
"""Return the admin_user_id."""
return self.get_remote_app_data('admin-user-id')
@property
def service_domain_name(self) -> str:
"""Return the service_domain_name."""
return self.get_remote_app_data('service-domain-name')
@property
def service_domain_id(self) -> str:
"""Return the service_domain_id."""
return self.get_remote_app_data('service-domain-id')
@property
def service_host(self) -> str:
"""Return the service_host."""
return self.get_remote_app_data('service-host')
@property
def service_credentials(self) -> str:
"""Return the service_credentials secret."""
return self.get_remote_app_data('service-credentials')
@property
def service_password(self) -> str:
"""Return the service_password."""
credentials_id = self.get_remote_app_data('service-credentials')
if not credentials_id:
return None
try:
credentials = self.charm.model.get_secret(id=credentials_id)
return credentials.get_content().get("password")
except SecretNotFoundError:
logger.warning(f"Secret {credentials_id} not found")
return None
@property
def service_port(self) -> str:
"""Return the service_port."""
return self.get_remote_app_data('service-port')
@property
def service_protocol(self) -> str:
"""Return the service_protocol."""
return self.get_remote_app_data('service-protocol')
@property
def service_project_name(self) -> str:
"""Return the service_project_name."""
return self.get_remote_app_data('service-project-name')
@property
def service_project_id(self) -> str:
"""Return the service_project_id."""
return self.get_remote_app_data('service-project-id')
@property
def service_user_name(self) -> str:
"""Return the service_user_name."""
credentials_id = self.get_remote_app_data('service-credentials')
if not credentials_id:
return None
try:
credentials = self.charm.model.get_secret(id=credentials_id)
return credentials.get_content().get("username")
except SecretNotFoundError:
logger.warning(f"Secret {credentials_id} not found")
return None
@property
def service_user_id(self) -> str:
"""Return the service_user_id."""
return self.get_remote_app_data('service-user-id')
@property
def internal_auth_url(self) -> str:
"""Return the internal_auth_url."""
return self.get_remote_app_data('internal-auth-url')
@property
def admin_auth_url(self) -> str:
"""Return the admin_auth_url."""
return self.get_remote_app_data('admin-auth-url')
@property
def public_auth_url(self) -> str:
"""Return the public_auth_url."""
return self.get_remote_app_data('public-auth-url')
@property
def admin_role(self) -> str:
"""Return the admin_role."""
return self.get_remote_app_data('admin-role')
def register_services(self, service_endpoints: dict,
region: str) -> None:
"""Request access to the IdentityService server."""
if self.model.unit.is_leader():
logging.debug("Requesting service registration")
app_data = self._identity_service_rel.data[self.charm.app]
app_data["service-endpoints"] = json.dumps(
service_endpoints, sort_keys=True
)
app_data["region"] = region
class HasIdentityServiceClientsEvent(EventBase):
"""Has IdentityServiceClients Event."""
pass
class ReadyIdentityServiceClientsEvent(EventBase):
"""IdentityServiceClients Ready Event."""
def __init__(self, handle, relation_id, relation_name, service_endpoints,
region, client_app_name):
super().__init__(handle)
self.relation_id = relation_id
self.relation_name = relation_name
self.service_endpoints = service_endpoints
self.region = region
self.client_app_name = client_app_name
def snapshot(self):
return {
"relation_id": self.relation_id,
"relation_name": self.relation_name,
"service_endpoints": self.service_endpoints,
"client_app_name": self.client_app_name,
"region": self.region}
def restore(self, snapshot):
super().restore(snapshot)
self.relation_id = snapshot["relation_id"]
self.relation_name = snapshot["relation_name"]
self.service_endpoints = snapshot["service_endpoints"]
self.region = snapshot["region"]
self.client_app_name = snapshot["client_app_name"]
class IdentityServiceClientEvents(ObjectEvents):
"""Events class for `on`"""
has_identity_service_clients = EventSource(HasIdentityServiceClientsEvent)
ready_identity_service_clients = EventSource(ReadyIdentityServiceClientsEvent)
class IdentityServiceProvides(Object):
"""
IdentityServiceProvides class
"""
on = IdentityServiceClientEvents()
_stored = StoredState()
def __init__(self, charm, relation_name):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
self.framework.observe(
self.charm.on[relation_name].relation_joined,
self._on_identity_service_relation_joined,
)
self.framework.observe(
self.charm.on[relation_name].relation_changed,
self._on_identity_service_relation_changed,
)
self.framework.observe(
self.charm.on[relation_name].relation_broken,
self._on_identity_service_relation_broken,
)
def _on_identity_service_relation_joined(self, event):
"""Handle IdentityService joined."""
logging.debug("IdentityService on_joined")
self.on.has_identity_service_clients.emit()
def _on_identity_service_relation_changed(self, event):
"""Handle IdentityService changed."""
logging.debug("IdentityService on_changed")
REQUIRED_KEYS = [
'service-endpoints',
'region']
values = [
event.relation.data[event.relation.app].get(k)
for k in REQUIRED_KEYS
]
# Validate data on the relation
if all(values):
service_eps = json.loads(
event.relation.data[event.relation.app]['service-endpoints'])
self.on.ready_identity_service_clients.emit(
event.relation.id,
event.relation.name,
service_eps,
event.relation.data[event.relation.app]['region'],
event.relation.app.name)
def _on_identity_service_relation_broken(self, event):
"""Handle IdentityService broken."""
logging.debug("IdentityServiceProvides on_departed")
# TODO clear data on the relation
def set_identity_service_credentials(self, relation_name: int,
relation_id: str,
api_version: str,
auth_host: str,
auth_port: str,
auth_protocol: str,
internal_host: str,
internal_port: str,
internal_protocol: str,
service_host: str,
service_port: str,
service_protocol: str,
admin_domain: str,
admin_project: str,
admin_user: str,
service_domain: str,
service_project: str,
service_user: str,
internal_auth_url: str,
admin_auth_url: str,
public_auth_url: str,
service_credentials: str,
admin_role: str):
logging.debug("Setting identity_service connection information.")
_identity_service_rel = None
for relation in self.framework.model.relations[relation_name]:
if relation.id == relation_id:
_identity_service_rel = relation
if not _identity_service_rel:
# Relation has disappeared so skip send of data
return
app_data = _identity_service_rel.data[self.charm.app]
app_data["api-version"] = api_version
app_data["auth-host"] = auth_host
app_data["auth-port"] = str(auth_port)
app_data["auth-protocol"] = auth_protocol
app_data["internal-host"] = internal_host
app_data["internal-port"] = str(internal_port)
app_data["internal-protocol"] = internal_protocol
app_data["service-host"] = service_host
app_data["service-port"] = str(service_port)
app_data["service-protocol"] = service_protocol
app_data["admin-domain-name"] = admin_domain.name
app_data["admin-domain-id"] = admin_domain.id
app_data["admin-project-name"] = admin_project.name
app_data["admin-project-id"] = admin_project.id
app_data["admin-user-name"] = admin_user.name
app_data["admin-user-id"] = admin_user.id
app_data["service-domain-name"] = service_domain.name
app_data["service-domain-id"] = service_domain.id
app_data["service-project-name"] = service_project.name
app_data["service-project-id"] = service_project.id
app_data["service-user-id"] = service_user.id
app_data["internal-auth-url"] = internal_auth_url
app_data["admin-auth-url"] = admin_auth_url
app_data["public-auth-url"] = public_auth_url
app_data["service-credentials"] = service_credentials
app_data["admin-role"] = admin_role

View File

@ -1,416 +0,0 @@
# Copyright 2023 Canonical Ltd.
# Licensed under the Apache2.0. See LICENSE file in charm source for details.
"""Library for the ingress relation.
This library contains the Requires and Provides classes for handling
the ingress interface.
Import `IngressRequires` in your charm, with two required options:
- "self" (the charm itself)
- config_dict
`config_dict` accepts the following keys:
- additional-hostnames
- backend-protocol
- limit-rps
- limit-whitelist
- max-body-size
- owasp-modsecurity-crs
- owasp-modsecurity-custom-rules
- path-routes
- retry-errors
- rewrite-enabled
- rewrite-target
- service-hostname (required)
- service-name (required)
- service-namespace
- service-port (required)
- session-cookie-max-age
- tls-secret-name
See [the config section](https://charmhub.io/nginx-ingress-integrator/configure) for descriptions
of each, along with the required type.
As an example, add the following to `src/charm.py`:
```
from charms.nginx_ingress_integrator.v0.ingress import IngressRequires
# In your charm's `__init__` method (assuming your app is listening on port 8080).
self.ingress = IngressRequires(self, {
"service-hostname": self.app.name,
"service-name": self.app.name,
"service-port": 8080,
}
)
```
And then add the following to `metadata.yaml`:
```
requires:
ingress:
interface: ingress
```
You _must_ register the IngressRequires class as part of the `__init__` method
rather than, for instance, a config-changed event handler, for the relation
changed event to be properly handled.
In the example above we're setting `service-hostname` (which translates to the
external hostname for the application when related to nginx-ingress-integrator)
to `self.app.name` here. This ensures by default the charm will be available on
the name of the deployed juju application, but can be overridden in a
production deployment by setting `service-hostname` on the
nginx-ingress-integrator charm. For example:
```bash
juju deploy nginx-ingress-integrator
juju deploy my-charm
juju relate nginx-ingress-integrator my-charm:ingress
# The service is now reachable on the ingress IP(s) of your k8s cluster at
# 'http://my-charm'.
juju config nginx-ingress-integrator service-hostname='my-charm.example.com'
# The service is now reachable on the ingress IP(s) of your k8s cluster at
# 'http://my-charm.example.com'.
"""
import copy
import logging
from typing import Dict
from ops.charm import CharmBase, CharmEvents, RelationBrokenEvent, RelationChangedEvent
from ops.framework import EventBase, EventSource, Object
from ops.model import BlockedStatus
INGRESS_RELATION_NAME = "ingress"
INGRESS_PROXY_RELATION_NAME = "ingress-proxy"
# The unique Charmhub library identifier, never change it
LIBID = "db0af4367506491c91663468fb5caa4c"
# Increment this major API version when introducing breaking changes
LIBAPI = 0
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 17
LOGGER = logging.getLogger(__name__)
REQUIRED_INGRESS_RELATION_FIELDS = {"service-hostname", "service-name", "service-port"}
OPTIONAL_INGRESS_RELATION_FIELDS = {
"additional-hostnames",
"backend-protocol",
"limit-rps",
"limit-whitelist",
"max-body-size",
"owasp-modsecurity-crs",
"owasp-modsecurity-custom-rules",
"path-routes",
"retry-errors",
"rewrite-target",
"rewrite-enabled",
"service-namespace",
"session-cookie-max-age",
"tls-secret-name",
}
RELATION_INTERFACES_MAPPINGS = {
"service-hostname": "host",
"service-name": "name",
"service-namespace": "model",
"service-port": "port",
}
RELATION_INTERFACES_MAPPINGS_VALUES = set(RELATION_INTERFACES_MAPPINGS.values())
class IngressAvailableEvent(EventBase):
"""IngressAvailableEvent custom event.
This event indicates the Ingress provider is available.
"""
class IngressProxyAvailableEvent(EventBase):
"""IngressProxyAvailableEvent custom event.
This event indicates the IngressProxy provider is available.
"""
class IngressBrokenEvent(RelationBrokenEvent):
"""IngressBrokenEvent custom event.
This event indicates the Ingress provider is broken.
"""
class IngressCharmEvents(CharmEvents):
"""Custom charm events.
Attrs:
ingress_available: Event to indicate that Ingress is available.
ingress_proxy_available: Event to indicate that IngressProxy is available.
ingress_broken: Event to indicate that Ingress is broken.
"""
ingress_available = EventSource(IngressAvailableEvent)
ingress_proxy_available = EventSource(IngressProxyAvailableEvent)
ingress_broken = EventSource(IngressBrokenEvent)
class IngressRequires(Object):
"""This class defines the functionality for the 'requires' side of the 'ingress' relation.
Hook events observed:
- relation-changed
Attrs:
model: Juju model where the charm is deployed.
config_dict: Contains all the configuration options for Ingress.
"""
def __init__(self, charm: CharmBase, config_dict: Dict) -> None:
"""Init function for the IngressRequires class.
Args:
charm: The charm that requires the ingress relation.
config_dict: Contains all the configuration options for Ingress.
"""
super().__init__(charm, INGRESS_RELATION_NAME)
self.framework.observe(
charm.on[INGRESS_RELATION_NAME].relation_changed, self._on_relation_changed
)
# Set default values.
default_relation_fields = {
"service-namespace": self.model.name,
}
config_dict.update(
(key, value)
for key, value in default_relation_fields.items()
if key not in config_dict or not config_dict[key]
)
self.config_dict = self._convert_to_relation_interface(config_dict)
@staticmethod
def _convert_to_relation_interface(config_dict: Dict) -> Dict:
"""Create a new relation dict that conforms with charm-relation-interfaces.
Args:
config_dict: Ingress configuration that doesn't conform with charm-relation-interfaces.
Returns:
The Ingress configuration conforming with charm-relation-interfaces.
"""
config_dict = copy.copy(config_dict)
config_dict.update(
(key, config_dict[old_key])
for old_key, key in RELATION_INTERFACES_MAPPINGS.items()
if old_key in config_dict and config_dict[old_key]
)
return config_dict
def _config_dict_errors(self, config_dict: Dict, update_only: bool = False) -> bool:
"""Check our config dict for errors.
Args:
config_dict: Contains all the configuration options for Ingress.
update_only: If the charm needs to update only existing keys.
Returns:
If we need to update the config dict or not.
"""
blocked_message = "Error in ingress relation, check `juju debug-log`"
unknown = [
config_key
for config_key in config_dict
if config_key
not in REQUIRED_INGRESS_RELATION_FIELDS
| OPTIONAL_INGRESS_RELATION_FIELDS
| RELATION_INTERFACES_MAPPINGS_VALUES
]
if unknown:
LOGGER.error(
"Ingress relation error, unknown key(s) in config dictionary found: %s",
", ".join(unknown),
)
self.model.unit.status = BlockedStatus(blocked_message)
return True
if not update_only:
missing = tuple(
config_key
for config_key in REQUIRED_INGRESS_RELATION_FIELDS
if config_key not in self.config_dict
)
if missing:
LOGGER.error(
"Ingress relation error, missing required key(s) in config dictionary: %s",
", ".join(sorted(missing)),
)
self.model.unit.status = BlockedStatus(blocked_message)
return True
return False
def _on_relation_changed(self, event: RelationChangedEvent) -> None:
"""Handle the relation-changed event.
Args:
event: Event triggering the relation-changed hook for the relation.
"""
# `self.unit` isn't available here, so use `self.model.unit`.
if self.model.unit.is_leader():
if self._config_dict_errors(config_dict=self.config_dict):
return
event.relation.data[self.model.app].update(
(key, str(self.config_dict[key])) for key in self.config_dict
)
def update_config(self, config_dict: Dict) -> None:
"""Allow for updates to relation.
Args:
config_dict: Contains all the configuration options for Ingress.
Attrs:
config_dict: Contains all the configuration options for Ingress.
"""
if self.model.unit.is_leader():
self.config_dict = self._convert_to_relation_interface(config_dict)
if self._config_dict_errors(self.config_dict, update_only=True):
return
relation = self.model.get_relation(INGRESS_RELATION_NAME)
if relation:
for key in self.config_dict:
relation.data[self.model.app][key] = str(self.config_dict[key])
class IngressBaseProvides(Object):
"""Parent class for IngressProvides and IngressProxyProvides.
Attrs:
model: Juju model where the charm is deployed.
"""
def __init__(self, charm: CharmBase, relation_name: str) -> None:
"""Init function for the IngressProxyProvides class.
Args:
charm: The charm that provides the ingress-proxy relation.
relation_name: The name of the relation.
"""
super().__init__(charm, relation_name)
self.charm = charm
def _on_relation_changed(self, event: RelationChangedEvent) -> None:
"""Handle a change to the ingress/ingress-proxy relation.
Confirm we have the fields we expect to receive.
Args:
event: Event triggering the relation-changed hook for the relation.
"""
# `self.unit` isn't available here, so use `self.model.unit`.
if not self.model.unit.is_leader():
return
relation_name = event.relation.name
assert event.app is not None # nosec
if not event.relation.data[event.app]:
LOGGER.info(
"%s hasn't finished configuring, waiting until relation is changed again.",
relation_name,
)
return
ingress_data = {
field: event.relation.data[event.app].get(field)
for field in REQUIRED_INGRESS_RELATION_FIELDS | OPTIONAL_INGRESS_RELATION_FIELDS
}
missing_fields = sorted(
field for field in REQUIRED_INGRESS_RELATION_FIELDS if ingress_data.get(field) is None
)
if missing_fields:
LOGGER.warning(
"Missing required data fields for %s relation: %s",
relation_name,
", ".join(missing_fields),
)
self.model.unit.status = BlockedStatus(
f"Missing fields for {relation_name}: {', '.join(missing_fields)}"
)
if relation_name == INGRESS_RELATION_NAME:
# Conform to charm-relation-interfaces.
if "name" in ingress_data and "port" in ingress_data:
name = ingress_data["name"]
port = ingress_data["port"]
else:
name = ingress_data["service-name"]
port = ingress_data["service-port"]
event.relation.data[self.model.app]["url"] = f"http://{name}:{port}/"
# Create an event that our charm can use to decide it's okay to
# configure the ingress.
self.charm.on.ingress_available.emit()
elif relation_name == INGRESS_PROXY_RELATION_NAME:
self.charm.on.ingress_proxy_available.emit()
class IngressProvides(IngressBaseProvides):
"""Class containing the functionality for the 'provides' side of the 'ingress' relation.
Hook events observed:
- relation-changed
"""
def __init__(self, charm: CharmBase) -> None:
"""Init function for the IngressProvides class.
Args:
charm: The charm that provides the ingress relation.
"""
super().__init__(charm, INGRESS_RELATION_NAME)
# Observe the relation-changed hook event and bind
# self.on_relation_changed() to handle the event.
self.framework.observe(
charm.on[INGRESS_RELATION_NAME].relation_changed, self._on_relation_changed
)
self.framework.observe(
charm.on[INGRESS_RELATION_NAME].relation_broken, self._on_relation_broken
)
def _on_relation_broken(self, event: RelationBrokenEvent) -> None:
"""Handle a relation-broken event in the ingress relation.
Args:
event: Event triggering the relation-broken hook for the relation.
"""
if not self.model.unit.is_leader():
return
# Create an event that our charm can use to remove the ingress resource.
self.charm.on.ingress_broken.emit(event.relation)
class IngressProxyProvides(IngressBaseProvides):
"""Class containing the functionality for the 'provides' side of the 'ingress-proxy' relation.
Hook events observed:
- relation-changed
"""
def __init__(self, charm: CharmBase) -> None:
"""Init function for the IngressProxyProvides class.
Args:
charm: The charm that provides the ingress-proxy relation.
"""
super().__init__(charm, INGRESS_PROXY_RELATION_NAME)
# Observe the relation-changed hook event and bind
# self.on_relation_changed() to handle the event.
self.framework.observe(
charm.on[INGRESS_PROXY_RELATION_NAME].relation_changed, self._on_relation_changed
)

View File

@ -1,206 +0,0 @@
"""TODO: Add a proper docstring here.
This is a placeholder docstring for this charm library. Docstrings are
presented on Charmhub and updated whenever you push a new version of the
library.
Complete documentation about creating and documenting libraries can be found
in the SDK docs at https://juju.is/docs/sdk/libraries.
See `charmcraft publish-lib` and `charmcraft fetch-lib` for details of how to
share and consume charm libraries. They serve to enhance collaboration
between charmers. Use a charmer's libraries for classes that handle
integration with their charm.
Bear in mind that new revisions of the different major API versions (v0, v1,
v2 etc) are maintained independently. You can continue to update v0 and v1
after you have pushed v3.
Markdown is supported, following the CommonMark specification.
"""
import logging
import typing
from ops.framework import (
StoredState,
EventBase,
ObjectEvents,
EventSource,
Object,
)
# The unique Charmhub library identifier, never change it
LIBID = "114b7bb1970445daa61650e451f9da62"
# Increment this major API version when introducing breaking changes
LIBAPI = 0
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 3
# TODO: add your code here! Happy coding!
class OVSDBCMSConnectedEvent(EventBase):
"""OVSDBCMS connected Event."""
pass
class OVSDBCMSReadyEvent(EventBase):
"""OVSDBCMS ready for use Event."""
pass
class OVSDBCMSGoneAwayEvent(EventBase):
"""OVSDBCMS relation has gone-away Event"""
pass
class OVSDBCMSServerEvents(ObjectEvents):
"""Events class for `on`"""
connected = EventSource(OVSDBCMSConnectedEvent)
ready = EventSource(OVSDBCMSReadyEvent)
goneaway = EventSource(OVSDBCMSGoneAwayEvent)
class OVSDBCMSRequires(Object):
"""
OVSDBCMSRequires class
"""
on = OVSDBCMSServerEvents()
_stored = StoredState()
def __init__(self, charm, relation_name: str):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
self.framework.observe(
self.charm.on[relation_name].relation_joined,
self._on_ovsdb_cms_relation_joined,
)
self.framework.observe(
self.charm.on[relation_name].relation_changed,
self._on_ovsdb_cms_relation_changed,
)
self.framework.observe(
self.charm.on[relation_name].relation_departed,
self._on_ovsdb_cms_relation_changed,
)
self.framework.observe(
self.charm.on[relation_name].relation_broken,
self._on_ovsdb_cms_relation_broken,
)
def _on_ovsdb_cms_relation_joined(self, event):
"""OVSDBCMS relation joined."""
logging.debug("OVSDBCMSRequires on_joined")
self.on.connected.emit()
def bound_hostnames(self):
return self.get_all_unit_values("bound-hostname")
def bound_addresses(self):
return self.get_all_unit_values("bound-address")
def remote_ready(self):
return all(self.bound_hostnames()) or all(self.bound_addresses())
def _on_ovsdb_cms_relation_changed(self, event):
"""OVSDBCMS relation changed."""
logging.debug("OVSDBCMSRequires on_changed")
if self.remote_ready():
self.on.ready.emit()
def _on_ovsdb_cms_relation_broken(self, event):
"""OVSDBCMS relation broken."""
logging.debug("OVSDBCMSRequires on_broken")
self.on.goneaway.emit()
def get_all_unit_values(self, key: str) -> typing.List[str]:
"""Retrieve value for key from all related units."""
values = []
relation = self.framework.model.get_relation(self.relation_name)
if relation:
for unit in relation.units:
values.append(relation.data[unit].get(key))
return values
class OVSDBCMSClientConnectedEvent(EventBase):
"""OVSDBCMS connected Event."""
pass
class OVSDBCMSClientReadyEvent(EventBase):
"""OVSDBCMS ready for use Event."""
pass
class OVSDBCMSClientGoneAwayEvent(EventBase):
"""OVSDBCMS relation has gone-away Event"""
pass
class OVSDBCMSClientEvents(ObjectEvents):
"""Events class for `on`"""
connected = EventSource(OVSDBCMSClientConnectedEvent)
ready = EventSource(OVSDBCMSClientReadyEvent)
goneaway = EventSource(OVSDBCMSClientGoneAwayEvent)
class OVSDBCMSProvides(Object):
"""
OVSDBCMSProvides class
"""
on = OVSDBCMSClientEvents()
_stored = StoredState()
def __init__(self, charm, relation_name):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
self.framework.observe(
self.charm.on[relation_name].relation_joined,
self._on_ovsdb_cms_relation_joined,
)
self.framework.observe(
self.charm.on[relation_name].relation_changed,
self._on_ovsdb_cms_relation_changed,
)
self.framework.observe(
self.charm.on[relation_name].relation_broken,
self._on_ovsdb_cms_relation_broken,
)
def _on_ovsdb_cms_relation_joined(self, event):
"""Handle ovsdb-cms joined."""
logging.debug("OVSDBCMSProvides on_joined")
self.on.connected.emit()
def _on_ovsdb_cms_relation_changed(self, event):
"""Handle ovsdb-cms changed."""
logging.debug("OVSDBCMSProvides on_changed")
self.on.ready.emit()
def _on_ovsdb_cms_relation_broken(self, event):
"""Handle ovsdb-cms broken."""
logging.debug("OVSDBCMSProvides on_departed")
self.on.goneaway.emit()
def set_unit_data(self, settings: typing.Dict[str, str]) -> None:
"""Publish settings on the peer unit data bag."""
relations = self.framework.model.relations[self.relation_name]
for relation in relations:
for k, v in settings.items():
relation.data[self.model.unit][k] = v

View File

@ -1,286 +0,0 @@
"""RabbitMQProvides and Requires module.
This library contains the Requires and Provides classes for handling
the rabbitmq interface.
Import `RabbitMQRequires` in your charm, with the charm object and the
relation name:
- self
- "amqp"
Also provide two additional parameters to the charm object:
- username
- vhost
Two events are also available to respond to:
- connected
- ready
- goneaway
A basic example showing the usage of this relation follows:
```
from charms.rabbitmq_k8s.v0.rabbitmq import RabbitMQRequires
class RabbitMQClientCharm(CharmBase):
def __init__(self, *args):
super().__init__(*args)
# RabbitMQ Requires
self.amqp = RabbitMQRequires(
self, "amqp",
username="myusername",
vhost="vhostname"
)
self.framework.observe(
self.amqp.on.connected, self._on_amqp_connected)
self.framework.observe(
self.amqp.on.ready, self._on_amqp_ready)
self.framework.observe(
self.amqp.on.goneaway, self._on_amqp_goneaway)
def _on_amqp_connected(self, event):
'''React to the RabbitMQ connected event.
This event happens when n RabbitMQ relation is added to the
model before credentials etc have been provided.
'''
# Do something before the relation is complete
pass
def _on_amqp_ready(self, event):
'''React to the RabbitMQ ready event.
The RabbitMQ interface will use the provided username and vhost for the
request to the rabbitmq server.
'''
# RabbitMQ Relation is ready. Do something with the completed relation.
pass
def _on_amqp_goneaway(self, event):
'''React to the RabbitMQ goneaway event.
This event happens when an RabbitMQ relation is removed.
'''
# RabbitMQ Relation has goneaway. shutdown services or suchlike
pass
```
"""
# The unique Charmhub library identifier, never change it
LIBID = "45622352791142fd9cf87232e3bd6f2a"
# Increment this major API version when introducing breaking changes
LIBAPI = 0
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 1
import logging
from ops.framework import (
StoredState,
EventBase,
ObjectEvents,
EventSource,
Object,
)
from ops.model import Relation
from typing import List
logger = logging.getLogger(__name__)
class RabbitMQConnectedEvent(EventBase):
"""RabbitMQ connected Event."""
pass
class RabbitMQReadyEvent(EventBase):
"""RabbitMQ ready for use Event."""
pass
class RabbitMQGoneAwayEvent(EventBase):
"""RabbitMQ relation has gone-away Event"""
pass
class RabbitMQServerEvents(ObjectEvents):
"""Events class for `on`"""
connected = EventSource(RabbitMQConnectedEvent)
ready = EventSource(RabbitMQReadyEvent)
goneaway = EventSource(RabbitMQGoneAwayEvent)
class RabbitMQRequires(Object):
"""
RabbitMQRequires class
"""
on = RabbitMQServerEvents()
def __init__(self, charm, relation_name: str, username: str, vhost: str):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
self.username = username
self.vhost = vhost
self.framework.observe(
self.charm.on[relation_name].relation_joined,
self._on_amqp_relation_joined,
)
self.framework.observe(
self.charm.on[relation_name].relation_changed,
self._on_amqp_relation_changed,
)
self.framework.observe(
self.charm.on[relation_name].relation_departed,
self._on_amqp_relation_changed,
)
self.framework.observe(
self.charm.on[relation_name].relation_broken,
self._on_amqp_relation_broken,
)
def _on_amqp_relation_joined(self, event):
"""RabbitMQ relation joined."""
logging.debug("RabbitMQRabbitMQRequires on_joined")
self.on.connected.emit()
self.request_access(self.username, self.vhost)
def _on_amqp_relation_changed(self, event):
"""RabbitMQ relation changed."""
logging.debug("RabbitMQRabbitMQRequires on_changed/departed")
if self.password:
self.on.ready.emit()
def _on_amqp_relation_broken(self, event):
"""RabbitMQ relation broken."""
logging.debug("RabbitMQRabbitMQRequires on_broken")
self.on.goneaway.emit()
@property
def _amqp_rel(self) -> Relation:
"""The RabbitMQ relation."""
return self.framework.model.get_relation(self.relation_name)
@property
def password(self) -> str:
"""Return the RabbitMQ password from the server side of the relation."""
return self._amqp_rel.data[self._amqp_rel.app].get("password")
@property
def hostname(self) -> str:
"""Return the hostname from the RabbitMQ relation"""
return self._amqp_rel.data[self._amqp_rel.app].get("hostname")
@property
def ssl_port(self) -> str:
"""Return the SSL port from the RabbitMQ relation"""
return self._amqp_rel.data[self._amqp_rel.app].get("ssl_port")
@property
def ssl_ca(self) -> str:
"""Return the SSL port from the RabbitMQ relation"""
return self._amqp_rel.data[self._amqp_rel.app].get("ssl_ca")
@property
def hostnames(self) -> List[str]:
"""Return a list of remote RMQ hosts from the RabbitMQ relation"""
_hosts = []
for unit in self._amqp_rel.units:
_hosts.append(self._amqp_rel.data[unit].get("ingress-address"))
return _hosts
def request_access(self, username: str, vhost: str) -> None:
"""Request access to the RabbitMQ server."""
if self.model.unit.is_leader():
logging.debug("Requesting RabbitMQ user and vhost")
self._amqp_rel.data[self.charm.app]["username"] = username
self._amqp_rel.data[self.charm.app]["vhost"] = vhost
class HasRabbitMQClientsEvent(EventBase):
"""Has RabbitMQClients Event."""
pass
class ReadyRabbitMQClientsEvent(EventBase):
"""RabbitMQClients Ready Event."""
pass
class RabbitMQClientEvents(ObjectEvents):
"""Events class for `on`"""
has_amqp_clients = EventSource(HasRabbitMQClientsEvent)
ready_amqp_clients = EventSource(ReadyRabbitMQClientsEvent)
class RabbitMQProvides(Object):
"""
RabbitMQProvides class
"""
on = RabbitMQClientEvents()
def __init__(self, charm, relation_name, callback):
super().__init__(charm, relation_name)
self.charm = charm
self.relation_name = relation_name
self.callback = callback
self.framework.observe(
self.charm.on[relation_name].relation_joined,
self._on_amqp_relation_joined,
)
self.framework.observe(
self.charm.on[relation_name].relation_changed,
self._on_amqp_relation_changed,
)
self.framework.observe(
self.charm.on[relation_name].relation_broken,
self._on_amqp_relation_broken,
)
def _on_amqp_relation_joined(self, event):
"""Handle RabbitMQ joined."""
logging.debug("RabbitMQRabbitMQProvides on_joined data={}"
.format(event.relation.data[event.relation.app]))
self.on.has_amqp_clients.emit()
def _on_amqp_relation_changed(self, event):
"""Handle RabbitMQ changed."""
logging.debug("RabbitMQRabbitMQProvides on_changed data={}"
.format(event.relation.data[event.relation.app]))
# Validate data on the relation
if self.username(event) and self.vhost(event):
self.on.ready_amqp_clients.emit()
if self.charm.unit.is_leader():
self.callback(event, self.username(event), self.vhost(event))
else:
logging.warning("Received RabbitMQ changed event without the "
"expected keys ('username', 'vhost') in the "
"application data bag. Incompatible charm in "
"other end of relation?")
def _on_amqp_relation_broken(self, event):
"""Handle RabbitMQ broken."""
logging.debug("RabbitMQRabbitMQProvides on_departed")
# TODO clear data on the relation
def username(self, event):
"""Return the RabbitMQ username from the client side of the relation."""
return event.relation.data[event.relation.app].get("username")
def vhost(self, event):
"""Return the RabbitMQ vhost from the client side of the relation."""
return event.relation.data[event.relation.app].get("vhost")

View File

@ -1,734 +0,0 @@
# Copyright 2023 Canonical Ltd.
# See LICENSE file for licensing details.
r"""# Interface Library for ingress.
This library wraps relation endpoints using the `ingress` interface
and provides a Python API for both requesting and providing per-application
ingress, with load-balancing occurring across all units.
## Getting Started
To get started using the library, you just need to fetch the library using `charmcraft`.
```shell
cd some-charm
charmcraft fetch-lib charms.traefik_k8s.v1.ingress
```
In the `metadata.yaml` of the charm, add the following:
```yaml
requires:
ingress:
interface: ingress
limit: 1
```
Then, to initialise the library:
```python
from charms.traefik_k8s.v2.ingress import (IngressPerAppRequirer,
IngressPerAppReadyEvent, IngressPerAppRevokedEvent)
class SomeCharm(CharmBase):
def __init__(self, *args):
# ...
self.ingress = IngressPerAppRequirer(self, port=80)
# The following event is triggered when the ingress URL to be used
# by this deployment of the `SomeCharm` is ready (or changes).
self.framework.observe(
self.ingress.on.ready, self._on_ingress_ready
)
self.framework.observe(
self.ingress.on.revoked, self._on_ingress_revoked
)
def _on_ingress_ready(self, event: IngressPerAppReadyEvent):
logger.info("This app's ingress URL: %s", event.url)
def _on_ingress_revoked(self, event: IngressPerAppRevokedEvent):
logger.info("This app no longer has ingress")
"""
import json
import logging
import socket
import typing
from dataclasses import dataclass
from typing import (
Any,
Dict,
List,
MutableMapping,
Optional,
Sequence,
Tuple,
)
import pydantic
from ops.charm import CharmBase, RelationBrokenEvent, RelationEvent
from ops.framework import EventSource, Object, ObjectEvents, StoredState
from ops.model import ModelError, Relation, Unit
from pydantic import AnyHttpUrl, BaseModel, Field, validator
# The unique Charmhub library identifier, never change it
LIBID = "e6de2a5cd5b34422a204668f3b8f90d2"
# Increment this major API version when introducing breaking changes
LIBAPI = 2
# Increment this PATCH version before using `charmcraft publish-lib` or reset
# to 0 if you are raising the major API version
LIBPATCH = 6
PYDEPS = ["pydantic<2.0"]
DEFAULT_RELATION_NAME = "ingress"
RELATION_INTERFACE = "ingress"
log = logging.getLogger(__name__)
BUILTIN_JUJU_KEYS = {"ingress-address", "private-address", "egress-subnets"}
class DatabagModel(BaseModel):
"""Base databag model."""
class Config:
"""Pydantic config."""
allow_population_by_field_name = True
"""Allow instantiating this class by field name (instead of forcing alias)."""
_NEST_UNDER = None
@classmethod
def load(cls, databag: MutableMapping):
"""Load this model from a Juju databag."""
if cls._NEST_UNDER:
return cls.parse_obj(json.loads(databag[cls._NEST_UNDER]))
try:
data = {k: json.loads(v) for k, v in databag.items() if k not in BUILTIN_JUJU_KEYS}
except json.JSONDecodeError as e:
msg = f"invalid databag contents: expecting json. {databag}"
log.error(msg)
raise DataValidationError(msg) from e
try:
return cls.parse_raw(json.dumps(data)) # type: ignore
except pydantic.ValidationError as e:
msg = f"failed to validate databag: {databag}"
log.error(msg, exc_info=True)
raise DataValidationError(msg) from e
def dump(self, databag: Optional[MutableMapping] = None, clear: bool = True):
"""Write the contents of this model to Juju databag.
:param databag: the databag to write the data to.
:param clear: ensure the databag is cleared before writing it.
"""
if clear and databag:
databag.clear()
if databag is None:
databag = {}
if self._NEST_UNDER:
databag[self._NEST_UNDER] = self.json()
dct = self.dict()
for key, field in self.__fields__.items(): # type: ignore
value = dct[key]
databag[field.alias or key] = json.dumps(value)
return databag
# todo: import these models from charm-relation-interfaces/ingress/v2 instead of redeclaring them
class IngressUrl(BaseModel):
"""Ingress url schema."""
url: AnyHttpUrl
class IngressProviderAppData(DatabagModel):
"""Ingress application databag schema."""
ingress: IngressUrl
class ProviderSchema(BaseModel):
"""Provider schema for Ingress."""
app: IngressProviderAppData
class IngressRequirerAppData(DatabagModel):
"""Ingress requirer application databag model."""
model: str = Field(description="The model the application is in.")
name: str = Field(description="the name of the app requesting ingress.")
port: int = Field(description="The port the app wishes to be exposed.")
# fields on top of vanilla 'ingress' interface:
strip_prefix: Optional[bool] = Field(
description="Whether to strip the prefix from the ingress url.", alias="strip-prefix"
)
redirect_https: Optional[bool] = Field(
description="Whether to redirect http traffic to https.", alias="redirect-https"
)
scheme: Optional[str] = Field(
default="http", description="What scheme to use in the generated ingress url"
)
@validator("scheme", pre=True)
def validate_scheme(cls, scheme): # noqa: N805 # pydantic wants 'cls' as first arg
"""Validate scheme arg."""
if scheme not in {"http", "https", "h2c"}:
raise ValueError("invalid scheme: should be one of `http|https|h2c`")
return scheme
@validator("port", pre=True)
def validate_port(cls, port): # noqa: N805 # pydantic wants 'cls' as first arg
"""Validate port."""
assert isinstance(port, int), type(port)
assert 0 < port < 65535, "port out of TCP range"
return port
class IngressRequirerUnitData(DatabagModel):
"""Ingress requirer unit databag model."""
host: str = Field(description="Hostname the unit wishes to be exposed.")
@validator("host", pre=True)
def validate_host(cls, host): # noqa: N805 # pydantic wants 'cls' as first arg
"""Validate host."""
assert isinstance(host, str), type(host)
return host
class RequirerSchema(BaseModel):
"""Requirer schema for Ingress."""
app: IngressRequirerAppData
unit: IngressRequirerUnitData
class IngressError(RuntimeError):
"""Base class for custom errors raised by this library."""
class NotReadyError(IngressError):
"""Raised when a relation is not ready."""
class DataValidationError(IngressError):
"""Raised when data validation fails on IPU relation data."""
class _IngressPerAppBase(Object):
"""Base class for IngressPerUnit interface classes."""
def __init__(self, charm: CharmBase, relation_name: str = DEFAULT_RELATION_NAME):
super().__init__(charm, relation_name)
self.charm: CharmBase = charm
self.relation_name = relation_name
self.app = self.charm.app
self.unit = self.charm.unit
observe = self.framework.observe
rel_events = charm.on[relation_name]
observe(rel_events.relation_created, self._handle_relation)
observe(rel_events.relation_joined, self._handle_relation)
observe(rel_events.relation_changed, self._handle_relation)
observe(rel_events.relation_broken, self._handle_relation_broken)
observe(charm.on.leader_elected, self._handle_upgrade_or_leader) # type: ignore
observe(charm.on.upgrade_charm, self._handle_upgrade_or_leader) # type: ignore
@property
def relations(self):
"""The list of Relation instances associated with this endpoint."""
return list(self.charm.model.relations[self.relation_name])
def _handle_relation(self, event):
"""Subclasses should implement this method to handle a relation update."""
pass
def _handle_relation_broken(self, event):
"""Subclasses should implement this method to handle a relation breaking."""
pass
def _handle_upgrade_or_leader(self, event):
"""Subclasses should implement this method to handle upgrades or leadership change."""
pass
class _IPAEvent(RelationEvent):
__args__: Tuple[str, ...] = ()
__optional_kwargs__: Dict[str, Any] = {}
@classmethod
def __attrs__(cls):
return cls.__args__ + tuple(cls.__optional_kwargs__.keys())
def __init__(self, handle, relation, *args, **kwargs):
super().__init__(handle, relation)
if not len(self.__args__) == len(args):
raise TypeError("expected {} args, got {}".format(len(self.__args__), len(args)))
for attr, obj in zip(self.__args__, args):
setattr(self, attr, obj)
for attr, default in self.__optional_kwargs__.items():
obj = kwargs.get(attr, default)
setattr(self, attr, obj)
def snapshot(self):
dct = super().snapshot()
for attr in self.__attrs__():
obj = getattr(self, attr)
try:
dct[attr] = obj
except ValueError as e:
raise ValueError(
"cannot automagically serialize {}: "
"override this method and do it "
"manually.".format(obj)
) from e
return dct
def restore(self, snapshot) -> None:
super().restore(snapshot)
for attr, obj in snapshot.items():
setattr(self, attr, obj)
class IngressPerAppDataProvidedEvent(_IPAEvent):
"""Event representing that ingress data has been provided for an app."""
__args__ = ("name", "model", "hosts", "strip_prefix", "redirect_https")
if typing.TYPE_CHECKING:
name: Optional[str] = None
model: Optional[str] = None
# sequence of hostname, port dicts
hosts: Sequence["IngressRequirerUnitData"] = ()
strip_prefix: bool = False
redirect_https: bool = False
class IngressPerAppDataRemovedEvent(RelationEvent):
"""Event representing that ingress data has been removed for an app."""
class IngressPerAppProviderEvents(ObjectEvents):
"""Container for IPA Provider events."""
data_provided = EventSource(IngressPerAppDataProvidedEvent)
data_removed = EventSource(IngressPerAppDataRemovedEvent)
@dataclass
class IngressRequirerData:
"""Data exposed by the ingress requirer to the provider."""
app: "IngressRequirerAppData"
units: List["IngressRequirerUnitData"]
class TlsProviderType(typing.Protocol):
"""Placeholder."""
@property
def enabled(self) -> bool: # type: ignore
"""Placeholder."""
class IngressPerAppProvider(_IngressPerAppBase):
"""Implementation of the provider of ingress."""
on = IngressPerAppProviderEvents() # type: ignore
def __init__(
self,
charm: CharmBase,
relation_name: str = DEFAULT_RELATION_NAME,
):
"""Constructor for IngressPerAppProvider.
Args:
charm: The charm that is instantiating the instance.
relation_name: The name of the relation endpoint to bind to
(defaults to "ingress").
"""
super().__init__(charm, relation_name)
def _handle_relation(self, event):
# created, joined or changed: if remote side has sent the required data:
# notify listeners.
if self.is_ready(event.relation):
data = self.get_data(event.relation)
self.on.data_provided.emit( # type: ignore
event.relation,
data.app.name,
data.app.model,
[unit.dict() for unit in data.units],
data.app.strip_prefix or False,
data.app.redirect_https or False,
)
def _handle_relation_broken(self, event):
self.on.data_removed.emit(event.relation) # type: ignore
def wipe_ingress_data(self, relation: Relation):
"""Clear ingress data from relation."""
assert self.unit.is_leader(), "only leaders can do this"
try:
relation.data
except ModelError as e:
log.warning(
"error {} accessing relation data for {!r}. "
"Probably a ghost of a dead relation is still "
"lingering around.".format(e, relation.name)
)
return
del relation.data[self.app]["ingress"]
def _get_requirer_units_data(self, relation: Relation) -> List["IngressRequirerUnitData"]:
"""Fetch and validate the requirer's app databag."""
out: List["IngressRequirerUnitData"] = []
unit: Unit
for unit in relation.units:
databag = relation.data[unit]
try:
data = IngressRequirerUnitData.load(databag)
out.append(data)
except pydantic.ValidationError:
log.info(f"failed to validate remote unit data for {unit}")
raise
return out
@staticmethod
def _get_requirer_app_data(relation: Relation) -> "IngressRequirerAppData":
"""Fetch and validate the requirer's app databag."""
app = relation.app
if app is None:
raise NotReadyError(relation)
databag = relation.data[app]
return IngressRequirerAppData.load(databag)
def get_data(self, relation: Relation) -> IngressRequirerData:
"""Fetch the remote (requirer) app and units' databags."""
try:
return IngressRequirerData(
self._get_requirer_app_data(relation), self._get_requirer_units_data(relation)
)
except (pydantic.ValidationError, DataValidationError) as e:
raise DataValidationError("failed to validate ingress requirer data") from e
def is_ready(self, relation: Optional[Relation] = None):
"""The Provider is ready if the requirer has sent valid data."""
if not relation:
return any(map(self.is_ready, self.relations))
try:
self.get_data(relation)
except (DataValidationError, NotReadyError) as e:
log.debug("Provider not ready; validation error encountered: %s" % str(e))
return False
return True
def _published_url(self, relation: Relation) -> Optional["IngressProviderAppData"]:
"""Fetch and validate this app databag; return the ingress url."""
if not self.is_ready(relation) or not self.unit.is_leader():
# Handle edge case where remote app name can be missing, e.g.,
# relation_broken events.
# Also, only leader units can read own app databags.
# FIXME https://github.com/canonical/traefik-k8s-operator/issues/34
return None
# fetch the provider's app databag
databag = relation.data[self.app]
if not databag.get("ingress"):
raise NotReadyError("This application did not `publish_url` yet.")
return IngressProviderAppData.load(databag)
def publish_url(self, relation: Relation, url: str):
"""Publish to the app databag the ingress url."""
ingress_url = {"url": url}
IngressProviderAppData.parse_obj({"ingress": ingress_url}).dump(relation.data[self.app])
@property
def proxied_endpoints(self) -> Dict[str, str]:
"""Returns the ingress settings provided to applications by this IngressPerAppProvider.
For example, when this IngressPerAppProvider has provided the
`http://foo.bar/my-model.my-app` URL to the my-app application, the returned dictionary
will be:
```
{
"my-app": {
"url": "http://foo.bar/my-model.my-app"
}
}
```
"""
results = {}
for ingress_relation in self.relations:
if not ingress_relation.app:
log.warning(
f"no app in relation {ingress_relation} when fetching proxied endpoints: skipping"
)
continue
try:
ingress_data = self._published_url(ingress_relation)
except NotReadyError:
log.warning(
f"no published url found in {ingress_relation}: "
f"traefik didn't publish_url yet to this relation."
)
continue
if not ingress_data:
log.warning(f"relation {ingress_relation} not ready yet: try again in some time.")
continue
results[ingress_relation.app.name] = ingress_data.ingress.dict()
return results
class IngressPerAppReadyEvent(_IPAEvent):
"""Event representing that ingress for an app is ready."""
__args__ = ("url",)
if typing.TYPE_CHECKING:
url: Optional[str] = None
class IngressPerAppRevokedEvent(RelationEvent):
"""Event representing that ingress for an app has been revoked."""
class IngressPerAppRequirerEvents(ObjectEvents):
"""Container for IPA Requirer events."""
ready = EventSource(IngressPerAppReadyEvent)
revoked = EventSource(IngressPerAppRevokedEvent)
class IngressPerAppRequirer(_IngressPerAppBase):
"""Implementation of the requirer of the ingress relation."""
on = IngressPerAppRequirerEvents() # type: ignore
# used to prevent spurious urls to be sent out if the event we're currently
# handling is a relation-broken one.
_stored = StoredState()
def __init__(
self,
charm: CharmBase,
relation_name: str = DEFAULT_RELATION_NAME,
*,
host: Optional[str] = None,
port: Optional[int] = None,
strip_prefix: bool = False,
redirect_https: bool = False,
# fixme: this is horrible UX.
# shall we switch to manually calling provide_ingress_requirements with all args when ready?
scheme: typing.Callable[[], str] = lambda: "http",
):
"""Constructor for IngressRequirer.
The request args can be used to specify the ingress properties when the
instance is created. If any are set, at least `port` is required, and
they will be sent to the ingress provider as soon as it is available.
All request args must be given as keyword args.
Args:
charm: the charm that is instantiating the library.
relation_name: the name of the relation endpoint to bind to (defaults to `ingress`);
relation must be of interface type `ingress` and have "limit: 1")
host: Hostname to be used by the ingress provider to address the requiring
application; if unspecified, the default Kubernetes service name will be used.
strip_prefix: configure Traefik to strip the path prefix.
redirect_https: redirect incoming requests to HTTPS.
scheme: callable returning the scheme to use when constructing the ingress url.
Request Args:
port: the port of the service
"""
super().__init__(charm, relation_name)
self.charm: CharmBase = charm
self.relation_name = relation_name
self._strip_prefix = strip_prefix
self._redirect_https = redirect_https
self._get_scheme = scheme
self._stored.set_default(current_url=None) # type: ignore
# if instantiated with a port, and we are related, then
# we immediately publish our ingress data to speed up the process.
if port:
self._auto_data = host, port
else:
self._auto_data = None
def _handle_relation(self, event):
# created, joined or changed: if we have auto data: publish it
self._publish_auto_data()
if self.is_ready():
# Avoid spurious events, emit only when there is a NEW URL available
new_url = (
None
if isinstance(event, RelationBrokenEvent)
else self._get_url_from_relation_data()
)
if self._stored.current_url != new_url: # type: ignore
self._stored.current_url = new_url # type: ignore
self.on.ready.emit(event.relation, new_url) # type: ignore
def _handle_relation_broken(self, event):
self._stored.current_url = None # type: ignore
self.on.revoked.emit(event.relation) # type: ignore
def _handle_upgrade_or_leader(self, event):
"""On upgrade/leadership change: ensure we publish the data we have."""
self._publish_auto_data()
def is_ready(self):
"""The Requirer is ready if the Provider has sent valid data."""
try:
return bool(self._get_url_from_relation_data())
except DataValidationError as e:
log.debug("Requirer not ready; validation error encountered: %s" % str(e))
return False
def _publish_auto_data(self):
if self._auto_data:
host, port = self._auto_data
self.provide_ingress_requirements(host=host, port=port)
def provide_ingress_requirements(
self,
*,
scheme: Optional[str] = None,
host: Optional[str] = None,
port: int,
):
"""Publishes the data that Traefik needs to provide ingress.
Args:
scheme: Scheme to be used; if unspecified, use the one used by __init__.
host: Hostname to be used by the ingress provider to address the
requirer unit; if unspecified, FQDN will be used instead
port: the port of the service (required)
"""
for relation in self.relations:
self._provide_ingress_requirements(scheme, host, port, relation)
def _provide_ingress_requirements(
self,
scheme: Optional[str],
host: Optional[str],
port: int,
relation: Relation,
):
if self.unit.is_leader():
self._publish_app_data(scheme, port, relation)
self._publish_unit_data(host, relation)
def _publish_unit_data(
self,
host: Optional[str],
relation: Relation,
):
if not host:
host = socket.getfqdn()
unit_databag = relation.data[self.unit]
try:
IngressRequirerUnitData(host=host).dump(unit_databag)
except pydantic.ValidationError as e:
msg = "failed to validate unit data"
log.info(msg, exc_info=True) # log to INFO because this might be expected
raise DataValidationError(msg) from e
def _publish_app_data(
self,
scheme: Optional[str],
port: int,
relation: Relation,
):
# assumes leadership!
app_databag = relation.data[self.app]
if not scheme:
# If scheme was not provided, use the one given to the constructor.
scheme = self._get_scheme()
try:
IngressRequirerAppData( # type: ignore # pyright does not like aliases
model=self.model.name,
name=self.app.name,
scheme=scheme,
port=port,
strip_prefix=self._strip_prefix, # type: ignore # pyright does not like aliases
redirect_https=self._redirect_https, # type: ignore # pyright does not like aliases
).dump(app_databag)
except pydantic.ValidationError as e:
msg = "failed to validate app data"
log.info(msg, exc_info=True) # log to INFO because this might be expected
raise DataValidationError(msg) from e
@property
def relation(self):
"""The established Relation instance, or None."""
return self.relations[0] if self.relations else None
def _get_url_from_relation_data(self) -> Optional[str]:
"""The full ingress URL to reach the current unit.
Returns None if the URL isn't available yet.
"""
relation = self.relation
if not relation or not relation.app:
return None
# fetch the provider's app databag
try:
databag = relation.data[relation.app]
except ModelError as e:
log.debug(
f"Error {e} attempting to read remote app data; "
f"probably we are in a relation_departed hook"
)
return None
if not databag: # not ready yet
return None
return str(IngressProviderAppData.load(databag).ingress.url)
@property
def url(self) -> Optional[str]:
"""The full ingress URL to reach the current unit.
Returns None if the URL isn't available yet.
"""
data = (
typing.cast(Optional[str], self._stored.current_url) # type: ignore
or self._get_url_from_relation_data()
)
return data

View File

@ -70,7 +70,9 @@ then
# Run py3 on ops-sunbeam
if should_test_ops_sunbeam $2; then
pushd ops-sunbeam
copy_libs_for_ops_sunbeam
stestr run --slowest || exit 1
remove_libs_for_ops_sunbeam
popd
fi
@ -90,9 +92,11 @@ then
# Run coverage on ops-sunbeam
if should_test_ops_sunbeam $2; then
pushd ops-sunbeam
copy_libs_for_ops_sunbeam
coverage erase
PYTHON="coverage run --parallel-mode --omit .tox/*" stestr run --slowest || exit 1
coverage combine
remove_libs_for_ops_sunbeam
popd
fi