Liam Young b156c2a94a Add broken event
Add event to signify the relation is broken and allow charms to
list all remote units.

Change-Id: I06df13e0039f65174a94ab8b9da0daeae0ab8818
2022-12-06 18:55:27 +00:00

318 lines
11 KiB
Python

#!/usr/bin/env python3
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import functools
import json
import logging
from ops.charm import CharmBase, RelationEvent
from ops.framework import (
StoredState,
EventBase,
ObjectEvents,
EventSource,
Object)
# Keys copied out of each endpoint entry in a unit's relation data when the
# provider builds the list of backend members for a service.
UNIT_DATA_KEYS = ['backend-port', 'backend-ip']
# Keys copied out of each endpoint entry in the application relation data when
# the provider builds the requested frontend settings for a service.
APP_DATA_KEYS = ['frontend-port', 'check-type']
# Key that names the service an endpoint entry belongs to.
SERVICE_NAME_KEY = 'service-name'
# Binding names used to look up a loadbalancer endpoint for a service.
PUBLIC_SPACE = "public"
ADMIN_SPACE = "admin"
INTERNAL_SPACE = "internal"
class LoadbalancerRelationBrokenEvent(EventBase):
    """Event emitted when the loadbalancer relation is broken."""
class LoadbalancerRelationReadyEvent(EventBase):
    """Event emitted when the loadbalancer relation has been joined."""
class LoadbalancerRequestsEvent(EventBase):
    """Event emitted on the provider when the relation data changes."""
class LoadbalancerConfiguredEvent(EventBase):
    """Event emitted on the requirer once frontend data is available."""
class OSLoadbalancerEvents(ObjectEvents):
    """Custom events exposed by both sides of the loadbalancer interface."""

    # Relation lifecycle notifications.
    lb_relation_ready = EventSource(LoadbalancerRelationReadyEvent)
    lb_relation_broken = EventSource(LoadbalancerRelationBrokenEvent)

    # Request / response notifications.
    lb_requested = EventSource(LoadbalancerRequestsEvent)
    lb_configured = EventSource(LoadbalancerConfiguredEvent)
class OSLoadbalancerRequires(Object):
    """Requires side of the loadbalancer interface.

    Publishes loadbalancer requests in the local application and unit
    relation data, and reads the ``frontends`` entry published in the remote
    application data to find out which loadbalancers have been created.
    """

    on = OSLoadbalancerEvents()
    _stored = StoredState()

    def __init__(self, charm: CharmBase, relation_name: str) -> None:
        """Initialise class

        :param charm: The charm using this interface.
        :param relation_name: Name of the relation this interface handles.
        """
        super().__init__(charm, relation_name)
        self.charm = charm
        self.relation_name = relation_name
        relation_events = charm.on[self.relation_name]
        self.framework.observe(
            relation_events.relation_changed,
            self._on_relation_changed)
        self.framework.observe(
            relation_events.relation_joined,
            self._on_relation_joined)
        self.framework.observe(
            relation_events.relation_broken,
            self._on_relation_broken)

    @property
    def relations(self) -> list:
        """List the relations established under ``relation_name``."""
        return self.model.relations[self.relation_name]

    def _on_relation_joined(self, event: RelationEvent) -> None:
        """Handle relation joined event

        :param event: Event triggering action
        """
        self.on.lb_relation_ready.emit()

    def _on_relation_broken(self, event: RelationEvent) -> None:
        """Handle relation broken event

        :param event: Event triggering action
        """
        self.on.lb_relation_broken.emit()

    def _on_relation_changed(self, event: RelationEvent) -> None:
        """Handle relation changed event

        :param event: Event triggering action
        """
        self._process_response()

    @property
    def units(self) -> list:
        """List all remote units across every relation."""
        units = []
        for relation in self.relations:
            units.extend(relation.units)
        return units

    def _update_relation_data(self, relation_data: dict,
                              service: dict) -> list:
        """Update or add service to requests

        The endpoints are a list of dicts for both app data and unit data.
        This method replaces the entry for the service if one already exists
        or adds a new one if it does not.

        :param relation_data: Relation data dict.
        :param service: Service data
        :returns: The new list of endpoint entries; the caller is responsible
                  for serialising it back into the relation data.
        """
        endpoints = [
            entry
            for entry in json.loads(relation_data.get('endpoints', '[]'))
            if entry[SERVICE_NAME_KEY] != service[SERVICE_NAME_KEY]]
        endpoints.append(service)
        return endpoints

    def request_loadbalancer(self, service_name: str, lb_port: int,
                             backend_port: int, backend_ip: str,
                             lb_check_type: str = 'http') -> None:
        """Send request for loadbalancer.

        The frontend settings are written to the application data (leader
        only) and the backend settings to this unit's data, on every relation.

        :param service_name: Name of service
        :param lb_port: Port the loadbalancer should bind to.
        :param backend_port: Port backend is bound to.
        :param backend_ip: IP address backend is listening on.
        :param lb_check_type: Value published as ``check-type`` alongside the
                              frontend port.
        """
        unit_data = {
            SERVICE_NAME_KEY: service_name,
            'backend-port': backend_port,
            'backend-ip': backend_ip}
        app_data = {
            SERVICE_NAME_KEY: service_name,
            'frontend-port': lb_port,
            'check-type': lb_check_type}
        # Leadership does not depend on the relation, so look it up once.
        is_leader = self.model.unit.is_leader()
        for relation in self.relations:
            if is_leader:
                relation.data[self.model.app]['endpoints'] = json.dumps(
                    self._update_relation_data(
                        relation.data[self.model.app],
                        app_data),
                    sort_keys=True)
            relation.data[self.model.unit]['endpoints'] = json.dumps(
                self._update_relation_data(
                    relation.data[self.model.unit],
                    unit_data),
                sort_keys=True)

    def get_frontend_data(self) -> dict:
        """Get the details of the loadbalancers that have been created.

        Reads the ``frontends`` entry from the remote application data. If
        several relations carry frontend data the last one wins; a relation
        without frontend data never discards data found on another relation.

        :returns: Decoded frontend data, or None if no relation provides any.
        """
        frontends = None
        for relation in self.relations:
            # The remote application can be unset (for example while the
            # relation is being torn down); indexing relation data with None
            # raises KeyError, so there is nothing to read from it.
            if relation.app is None:
                continue
            raw = relation.data[relation.app].get('frontends')
            if raw:
                frontends = json.loads(raw)
        return frontends

    def _process_response(self) -> None:
        """Check for a complete response from loadbalancer"""
        if self.get_frontend_data():
            self.on.lb_configured.emit()

    def get_lb_endpoint(self, service_name: str, binding: str):
        """Return the loadbalancer details on a given binding.

        :param service_name: Name of service
        :param binding: Name of the binding (space) to look up, e.g.
                        ``PUBLIC_SPACE``.
        :returns: The details advertised for the service on that binding, or
                  None if there are none yet.
        """
        lb_endpoints = self.get_frontend_data()
        if not lb_endpoints:
            return None
        # A service that was requested but is not ready yet is advertised as
        # null, so the key can be present with a None value.
        service_endpoints = lb_endpoints.get(service_name) or {}
        return service_endpoints.get(binding)

    get_lb_public_endpoint = functools.partialmethod(
        get_lb_endpoint,
        binding=PUBLIC_SPACE)
    get_lb_internal_endpoint = functools.partialmethod(
        get_lb_endpoint,
        binding=INTERNAL_SPACE)
    get_lb_admin_endpoint = functools.partialmethod(
        get_lb_endpoint,
        binding=ADMIN_SPACE)
class OSLoadbalancerProvides(Object):
    """Provides side of the loadbalancer interface.

    Collects the loadbalancer requests published by related applications and
    advertises back, through the ``frontends`` entry of the local application
    data, the listeners that have been registered as ready.
    """

    on = OSLoadbalancerEvents()
    _stored = StoredState()

    def __init__(self, charm: CharmBase,
                 relation_name: str = 'loadbalancer') -> None:
        """Initialise class

        :param charm: The charm using this interface.
        :param relation_name: Name of the relation this interface handles.
        """
        super().__init__(charm, relation_name)
        self.relation_name = relation_name
        # Observe the relation that was asked for rather than a fixed name so
        # that a non-default relation_name is honoured.
        self.framework.observe(
            charm.on[self.relation_name].relation_changed,
            self._on_relation_changed)
        self.charm = charm
        # service name -> space -> listener details; held in memory only.
        self.service_listeners = collections.defaultdict(dict)

    def _on_relation_changed(self, event: RelationEvent) -> None:
        """Handle relation changed event

        :param event: Event triggering action
        """
        self.on.lb_requested.emit()

    def _get_frontends(self) -> dict:
        """Get a dict of requested loadbalancers.

        Examine the application data bag across all relations to construct
        a dictionary of all requested loadbalancers and their settings.

        :returns: ``{'endpoints': {service_name: settings}}`` where the
                  settings keys are ``APP_DATA_KEYS`` with ``-`` replaced by
                  ``_``.
        """
        ep_data = collections.defaultdict(dict)
        for relation in self.model.relations[self.relation_name]:
            endpoints = json.loads(
                relation.data[relation.app].get('endpoints', '[]'))
            for ep in endpoints:
                for config in APP_DATA_KEYS:
                    _config_key = config.replace('-', '_')
                    ep_data[ep[SERVICE_NAME_KEY]][_config_key] = ep[config]
        return {'endpoints': ep_data}

    def _get_backends(self) -> dict:
        """Get a dict of registered backends.

        Examine the unit data bag across all relations to construct
        a dictionary of all registered backends for a service.

        :returns: Mapping of service name to a list of member dicts, each
                  holding ``unit_name`` plus ``UNIT_DATA_KEYS`` with ``-``
                  replaced by ``_``.
        """
        members = collections.defaultdict(list)
        for relation in self.model.relations[self.relation_name]:
            # Sort by unit name so the member order is stable between calls.
            units = sorted(relation.units, key=lambda unit: unit.name)
            for unit in units:
                unit_name = unit.name.replace('/', '_')
                eps = json.loads(relation.data[unit].get('endpoints', '[]'))
                for ep in eps:
                    member_data = {
                        'unit_name': unit_name}
                    for config in UNIT_DATA_KEYS:
                        _config_key = config.replace('-', '_')
                        member_data[_config_key] = ep[config]
                    members[ep[SERVICE_NAME_KEY]].append(member_data)
        return members

    def get_loadbalancer_requests(self) -> dict:
        """Return dict of loadbalancer requests.

        Match loadbalancer requests with advertised backends. Backends for a
        service that has no frontend request are ignored.
        """
        ep_data = self._get_frontends()
        for ep, members in self._get_backends().items():
            if ep_data['endpoints'].get(ep):
                ep_data['endpoints'][ep]['members'] = members
        return ep_data

    def _get_requested_service_names(self, relation) -> list:
        """A list of loadbalancer service name requests for a relation"""
        requests = json.loads(
            relation.data[relation.app].get('endpoints', '[]'))
        return [e[SERVICE_NAME_KEY] for e in requests]

    def loadbalancer_ready(self, service_name: str, space: str, ips: list,
                           port: int, protocol: str) -> None:
        """Register a loadbalancer as ready.

        Only records the listener locally; nothing is sent to the requesters
        until advertise_loadbalancers is called.

        :param service_name: Name of service
        :param space: Space the listener was created on.
        :param ips: IP addresses of the listener.
        :param port: Port of the listener.
        :param protocol: Protocol of the listener.
        """
        self.service_listeners[service_name][space] = {
            'ip': ips,
            'port': port,
            'protocol': protocol}

    def advertise_loadbalancers(self) -> None:
        """Advertise loadbalancers as ready down the requesting relation

        Tell requesters whether their requested loadbalancers are ready. A
        requested service with no registered listener is advertised as null.
        Only the leader writes the response.
        """
        if not self.model.unit.is_leader():
            logging.info("Not sending response, not leader")
            return
        for relation in self.model.relations[self.relation_name]:
            _listeners = {}
            for service_name in self._get_requested_service_names(relation):
                # .get() so that looking up an unknown service does not add
                # an empty entry to the defaultdict.
                _listeners[service_name] = self.service_listeners.get(
                    service_name)
            relation.data[self.model.app]['frontends'] = json.dumps(
                _listeners,
                sort_keys=True)