Add run_once_per_unit decorator

This change adds a decorator `run_once_per_unit` which can be used
to limit running a method to once per instantiation of a unit.

This decorator relies on using ops framework BoundStoredState with
use_juju_for_storage=False as the storage must be cleared when
the unit is recycled. (use_juju_for_storage=True is deprecated
by the ops framework so should not be used anyway). To enforce
this the charm will throw an exeption if use_juju_for_storage=True
all existing sunbeam charms will need updating to accomadate
this.

There is a follow up patch which adds `run_once_per_app` which is
a decorator that runs once per version of the application (so only
the leader runs it) but this is more complicated and less useful
(imo) so I will propose it seperatly.

Change-Id: I5109f1a462208eba5bfa7dd484448511d686bf19
This commit is contained in:
Liam Young 2023-02-24 09:52:34 +00:00
parent e56122a481
commit a086398d86
3 changed files with 208 additions and 1 deletions

View File

@ -41,6 +41,7 @@ import ops.charm
import ops.framework
import ops.model
import ops.pebble
import ops.storage
from lightkube import (
Client,
)
@ -72,7 +73,13 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
def __init__(self, framework: ops.framework.Framework) -> None:
"""Run constructor."""
super().__init__(framework)
if isinstance(self.framework._storage, ops.storage.JujuStorage):
raise ValueError(
(
"use_juju_for_storage=True is deprecated and not supported "
"by ops_sunbeam"
)
)
self.status = compound_status.Status("workload", priority=100)
self.status_pool = compound_status.StatusPool(self)
self.status_pool.add(self.status)

View File

@ -0,0 +1,99 @@
# Copyright 2023 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helpers for controlling whether jobs should run.
In general it is better for a command to be a noop if run when it is not
needed but in some cases the commands are expensive or disruptive in which
case these helpers can limit how frequently they are run.
"""
import logging
import time
from functools import (
wraps,
)
import ops.framework
import ops_sunbeam
logger = logging.getLogger(__name__)
def run_once_per_unit(label):
"""Run once per instantiation of a unit.
This is designed for commands which only need to be run once on each
instantiation of a unit.
Note: This decorator can only be used within a charm derived from
ops_sunbeam.charm.OSBaseOperatorCharm.
Example usage:
class MyCharm(ops_sunbeam.charm.OSBaseOperatorCharm):
...
@run_once_per_unit('a2enmod')
def enable_apache_module(self):
check_call(['a2enmod', 'wsgi'])
"""
def wrap(f):
@wraps(f)
def wrapped_f(
charm: ops_sunbeam.charm.OSBaseOperatorCharm, *args, **kwargs
):
"""Run once decorator.
:param charm: Instance of charm
"""
storage = LocalJobStorage(charm._state)
if label in storage:
logging.warning(
f"Not running {label}, it has run previously for this unit"
)
else:
logging.warning(
f"Running {label}, it has not run on this unit before"
)
f(charm, *args, **kwargs)
storage.add(label)
return wrapped_f
return wrap
class LocalJobStorage:
"""Class to store job info of jobs run on the local unit."""
def __init__(self, storage: ops.framework.BoundStoredState):
"""Setup job history storage."""
self.storage = storage
try:
self.storage.run_once
except AttributeError:
self.storage.run_once = {}
def get_labels(self):
"""Return all job entries."""
return self.storage.run_once
def __contains__(self, key):
"""Check if label is in list or run jobs."""
return key in self.get_labels().keys()
def add(self, key):
"""Add the label of job that has run."""
self.storage.run_once[key] = str(time.time())

View File

@ -0,0 +1,101 @@
# Copyright 2021 Canonical Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Test job ctrl code."""
import sys
import mock
sys.path.append("lib") # noqa
sys.path.append("src") # noqa
import ops_sunbeam.job_ctrl as sunbeam_job_ctrl
import ops_sunbeam.test_utils as test_utils
from . import (
test_charms,
)
class JobCtrlCharm(test_charms.MyAPICharm):
"""Test charm that use job ctrl code."""
unit_job_counter = 1
@sunbeam_job_ctrl.run_once_per_unit("unit-job")
def unit_specific_job(self):
"""Run a dummy once per unit job."""
self.unit_job_counter = self.unit_job_counter + 1
class TestJobCtrl(test_utils.CharmTestCase):
"""Test for the OSBaseOperatorCharm class."""
PATCHES = ["time"]
@mock.patch(
"charms.observability_libs.v0.kubernetes_service_patch."
"KubernetesServicePatch"
)
def setUp(self, mock_svc_patch: mock.patch) -> None:
"""Charm test class setup."""
self.container_calls = test_utils.ContainerCalls()
super().setUp(sunbeam_job_ctrl, self.PATCHES)
self.harness = test_utils.get_harness(
JobCtrlCharm,
test_charms.API_CHARM_METADATA,
self.container_calls,
charm_config=test_charms.CHARM_CONFIG,
initial_charm_config=test_charms.INITIAL_CHARM_CONFIG,
)
# clean up events that were dynamically defined,
# otherwise we get issues because they'll be redefined,
# which is not allowed.
from charms.data_platform_libs.v0.database_requires import (
DatabaseEvents,
)
for attr in (
"database_database_created",
"database_endpoints_changed",
"database_read_only_endpoints_changed",
):
try:
delattr(DatabaseEvents, attr)
except AttributeError:
pass
self.addCleanup(self.harness.cleanup)
self.harness.begin()
def test_local_job_storage(self) -> None:
"""Test local job storage."""
local_job_storage = sunbeam_job_ctrl.LocalJobStorage(
self.harness.charm._state
)
self.assertEqual(dict(local_job_storage.get_labels()), {})
local_job_storage.add("my-job")
self.assertIn("my-job", local_job_storage.get_labels())
def test_run_once_per_unit(self) -> None:
"""Test run_once_per_unit decorator."""
self.harness.charm._state.run_once = {}
call_counter = self.harness.charm.unit_job_counter
self.harness.charm.unit_specific_job()
expected_count = call_counter + 1
self.assertEqual(expected_count, self.harness.charm.unit_job_counter)
self.harness.charm.unit_specific_job()
# The call count should be unchanged as the job should not have
# run
self.assertEqual(expected_count, self.harness.charm.unit_job_counter)