Add compound status for richer status msgs
Implement a pool of statuses, define a status pool on the charm, and set up some default opinionated statuses in the pool for use by the main workload and the relations. Change-Id: I1aa094138e66bffd02d1bbcce3db79c7fd4058c4
This commit is contained in:
parent
193fe99c40
commit
1517456e9a
@ -38,9 +38,12 @@ import ops.framework
|
||||
import ops.model
|
||||
import ops.pebble
|
||||
|
||||
from ops.model import ActiveStatus
|
||||
|
||||
from lightkube import Client
|
||||
from lightkube.resources.core_v1 import Service
|
||||
|
||||
import ops_sunbeam.compound_status as compound_status
|
||||
import ops_sunbeam.config_contexts as sunbeam_config_contexts
|
||||
import ops_sunbeam.container_handlers as sunbeam_chandlers
|
||||
import ops_sunbeam.core as sunbeam_core
|
||||
@ -62,12 +65,14 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
|
||||
def __init__(self, framework: ops.framework.Framework) -> None:
|
||||
"""Run constructor."""
|
||||
super().__init__(framework)
|
||||
|
||||
self.status = compound_status.Status("workload", priority=100)
|
||||
self.status_pool = compound_status.StatusPool(self)
|
||||
self.status_pool.add(self.status)
|
||||
self._state.set_default(bootstrapped=False)
|
||||
self.relation_handlers = self.get_relation_handlers()
|
||||
self.pebble_handlers = self.get_pebble_handlers()
|
||||
self.framework.observe(self.on.config_changed, self._on_config_changed)
|
||||
# TODO: change update_status based on compound_status feature
|
||||
self.framework.observe(self.on.update_status, self._on_update_status)
|
||||
|
||||
def can_add_handler(
|
||||
self,
|
||||
@ -258,7 +263,7 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
|
||||
for ph in self.pebble_handlers:
|
||||
ph.add_healthchecks()
|
||||
|
||||
self.unit.status = ops.model.ActiveStatus()
|
||||
self.status.set(ActiveStatus(""))
|
||||
self._state.bootstrapped = True
|
||||
|
||||
@property
|
||||
@ -425,22 +430,6 @@ class OSBaseOperatorCharm(ops.charm.CharmBase):
|
||||
logger.error(' %s', line)
|
||||
return False
|
||||
|
||||
def _on_update_status(self, event: ops.framework.EventBase) -> None:
|
||||
"""Update status event handler."""
|
||||
status = []
|
||||
for ph in self.pebble_handlers:
|
||||
ph.assess_status()
|
||||
# Below lines are not required with compound status feature
|
||||
if ph.status:
|
||||
status.append(ph.status)
|
||||
|
||||
# Need to be changed once compound status in place
|
||||
if len(status) == 0:
|
||||
self.unit.status = ops.model.ActiveStatus()
|
||||
else:
|
||||
status_msg = ','.join(status)
|
||||
self.unit.status = ops.model.BlockedStatus(status_msg)
|
||||
|
||||
|
||||
class OSBaseOperatorAPICharm(OSBaseOperatorCharm):
|
||||
"""Base class for OpenStack API operators."""
|
||||
|
227
ops-sunbeam/ops_sunbeam/compound_status.py
Normal file
227
ops-sunbeam/ops_sunbeam/compound_status.py
Normal file
@ -0,0 +1,227 @@
|
||||
"""
|
||||
A mini library for tracking status messages.
|
||||
|
||||
We want this because keeping track of everything
|
||||
with a single unit.status is too difficult.
|
||||
|
||||
The user will still see a single status and message
|
||||
(one deemed to be the highest priority),
|
||||
but the charm can easily set the status of various
|
||||
aspects of the application without clobbering other parts.
|
||||
"""
|
||||
import json
|
||||
import logging
|
||||
from typing import Callable, Dict, Tuple, Optional
|
||||
|
||||
from ops.charm import CharmBase
|
||||
from ops.framework import Handle, Object, StoredStateData, CommitEvent
|
||||
from ops.model import ActiveStatus, StatusBase, UnknownStatus, WaitingStatus
|
||||
from ops.storage import NoSnapshotError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
STATUS_PRIORITIES = {
|
||||
"blocked": 1,
|
||||
"waiting": 2,
|
||||
"maintenance": 3,
|
||||
"active": 4,
|
||||
"unknown": 5,
|
||||
}
|
||||
|
||||
|
||||
class Status:
|
||||
"""
|
||||
An atomic status.
|
||||
|
||||
A wrapper around a StatusBase from ops,
|
||||
that adds a priority, label,
|
||||
and methods for use with a pool of statuses.
|
||||
"""
|
||||
|
||||
def __init__(self, label: str, priority: int = 0) -> None:
|
||||
"""
|
||||
Create a new Status object.
|
||||
|
||||
label: string label
|
||||
priority: integer, higher number is higher priority, default is 0
|
||||
"""
|
||||
self.label: str = label
|
||||
self._priority: int = priority
|
||||
self.never_set = True
|
||||
|
||||
# The actual status of this Status object.
|
||||
# Use `self.set(...)` to update it.
|
||||
self.status: StatusBase = UnknownStatus()
|
||||
|
||||
# if on_update is set,
|
||||
# it will be called as a function with no arguments
|
||||
# whenever the status is set.
|
||||
self.on_update: Optional[Callable[[], None]] = None
|
||||
|
||||
def set(self, status: StatusBase) -> None:
|
||||
"""
|
||||
Set the status.
|
||||
|
||||
Will also run the on_update hook if available
|
||||
(should be set by the pool so the pool knows when it should update).
|
||||
"""
|
||||
self.status = status
|
||||
self.never_set = False
|
||||
if self.on_update is not None:
|
||||
self.on_update()
|
||||
|
||||
def message(self) -> str:
|
||||
"""
|
||||
Get the status message consistently.
|
||||
|
||||
Useful because UnknownStatus has no message attribute.
|
||||
"""
|
||||
if self.status.name == "unknown":
|
||||
return ""
|
||||
return self.status.message
|
||||
|
||||
def priority(self) -> Tuple[int, int]:
|
||||
"""
|
||||
Return a value to use for sorting statuses by priority.
|
||||
|
||||
Used by the pool to retrieve the highest priority status
|
||||
to display to the user.
|
||||
"""
|
||||
return STATUS_PRIORITIES[self.status.name], -self._priority
|
||||
|
||||
def _serialize(self) -> dict:
|
||||
"""Serialize Status for storage."""
|
||||
return {
|
||||
"status": self.status.name,
|
||||
"message": self.message(),
|
||||
}
|
||||
|
||||
|
||||
class StatusPool(Object):
|
||||
"""
|
||||
A pool of Status objects.
|
||||
|
||||
This is implemented as an `Object`,
|
||||
so we can more simply save state between hook executions.
|
||||
"""
|
||||
|
||||
def __init__(self, charm: CharmBase) -> None:
|
||||
"""
|
||||
Init the status pool and restore from stored state if available.
|
||||
|
||||
Note that instantiating more than one StatusPool here is not supported,
|
||||
due to hardcoded framework stored data IDs.
|
||||
If we want that in the future,
|
||||
we'll need to generate a custom deterministic ID.
|
||||
I can't think of any cases where
|
||||
more than one StatusPool is required though...
|
||||
"""
|
||||
super().__init__(charm, "status_pool")
|
||||
self._pool: Dict[str, Status] = {}
|
||||
self._charm = charm
|
||||
|
||||
# Restore info from the charm's state.
|
||||
# We need to do this on init,
|
||||
# so we can retain previous statuses that were set.
|
||||
charm.framework.register_type(
|
||||
StoredStateData, self, StoredStateData.handle_kind
|
||||
)
|
||||
stored_handle = Handle(
|
||||
self, StoredStateData.handle_kind, "_status_pool"
|
||||
)
|
||||
|
||||
try:
|
||||
self._state = charm.framework.load_snapshot(stored_handle)
|
||||
status_state = json.loads(self._state["statuses"])
|
||||
except NoSnapshotError:
|
||||
self._state = StoredStateData(self, "_status_pool")
|
||||
status_state = []
|
||||
self._status_state = status_state
|
||||
|
||||
# 'commit' is an ops framework event
|
||||
# that tells the object to save a snapshot of its state for later.
|
||||
charm.framework.observe(charm.framework.on.commit, self._on_commit)
|
||||
|
||||
def add(self, status: Status) -> None:
|
||||
"""
|
||||
Idempotently add a status object to the pool.
|
||||
|
||||
Reconstitute from saved state if it's a new status.
|
||||
"""
|
||||
if (
|
||||
status.never_set and
|
||||
status.label in self._status_state and
|
||||
status.label not in self._pool
|
||||
):
|
||||
# If this status hasn't been seen or set yet,
|
||||
# and we have saved state for it,
|
||||
# then reconstitute it.
|
||||
# This allows us to retain statuses across hook invocations.
|
||||
saved = self._status_state[status.label]
|
||||
status.status = StatusBase.from_name(
|
||||
saved["status"],
|
||||
saved["message"],
|
||||
)
|
||||
|
||||
self._pool[status.label] = status
|
||||
status.on_update = self.on_update
|
||||
self.on_update()
|
||||
|
||||
def summarise(self) -> str:
|
||||
"""
|
||||
Return a human readable summary of all the statuses in the pool.
|
||||
|
||||
Will be a multi-line string.
|
||||
"""
|
||||
lines = []
|
||||
for status in sorted(self._pool.values(), key=lambda x: x.priority()):
|
||||
lines.append("{label:>30}: {status:>10} | {message}".format(
|
||||
label=status.label,
|
||||
message=status.message(),
|
||||
status=status.status.name,
|
||||
))
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def _on_commit(self, _event: CommitEvent) -> None:
|
||||
"""
|
||||
Store the current state of statuses.
|
||||
|
||||
So we can restore them on the next run of the charm.
|
||||
"""
|
||||
self._state["statuses"] = json.dumps(
|
||||
{
|
||||
status.label: status._serialize()
|
||||
for status in self._pool.values()
|
||||
}
|
||||
)
|
||||
self._charm.framework.save_snapshot(self._state)
|
||||
self._charm.framework._storage.commit()
|
||||
|
||||
def on_update(self) -> None:
|
||||
"""
|
||||
Update the unit status with the current highest priority status.
|
||||
|
||||
Use as a hook to run whenever a status is updated in the pool.
|
||||
"""
|
||||
status = (
|
||||
sorted(self._pool.values(), key=lambda x: x.priority())[0]
|
||||
if self._pool
|
||||
else None
|
||||
)
|
||||
if status is None or status.status.name == "unknown":
|
||||
self._charm.unit.status = WaitingStatus("no status set yet")
|
||||
elif status.status.name == "active" and not status.message():
|
||||
# Avoid status name prefix if everything is active with no message.
|
||||
# If there's a message, then we want the prefix
|
||||
# to help identify where the message originates.
|
||||
self._charm.unit.status = ActiveStatus("")
|
||||
else:
|
||||
message = status.message()
|
||||
self._charm.unit.status = StatusBase.from_name(
|
||||
status.status.name,
|
||||
"({}){}".format(
|
||||
status.label,
|
||||
" " + message if message else "",
|
||||
)
|
||||
)
|
@ -22,11 +22,14 @@ in the container.
|
||||
import collections
|
||||
import logging
|
||||
|
||||
import ops_sunbeam.compound_status as compound_status
|
||||
import ops_sunbeam.core as sunbeam_core
|
||||
import ops_sunbeam.templating as sunbeam_templating
|
||||
import ops.charm
|
||||
import ops.pebble
|
||||
|
||||
from ops.model import ActiveStatus, WaitingStatus, BlockedStatus
|
||||
|
||||
from collections.abc import Callable
|
||||
from typing import List, TypedDict
|
||||
|
||||
@ -65,9 +68,14 @@ class PebbleHandler(ops.charm.Object):
|
||||
self.openstack_release = openstack_release
|
||||
self.callback_f = callback_f
|
||||
self.setup_pebble_handler()
|
||||
# The structure of status variable and corresponding logic
|
||||
# will change with compund status feature
|
||||
self.status = ""
|
||||
|
||||
self.status = compound_status.Status("container:" + container_name)
|
||||
self.charm.status_pool.add(self.status)
|
||||
|
||||
self.framework.observe(
|
||||
self.charm.on.update_status,
|
||||
self._on_update_status
|
||||
)
|
||||
|
||||
def setup_pebble_handler(self) -> None:
|
||||
"""Configure handler for pebble ready event."""
|
||||
@ -233,42 +241,42 @@ class PebbleHandler(ops.charm.Object):
|
||||
logger.error("Not able to add Healthcheck layer")
|
||||
logger.exception(connect_error)
|
||||
|
||||
def assess_status(self) -> str:
|
||||
"""Assess Healthcheck status.
|
||||
def _on_update_status(self, event: ops.framework.EventBase) -> None:
|
||||
"""Assess and set status.
|
||||
|
||||
:return: status message based on healthchecks
|
||||
:rtype: str
|
||||
Also takes into account healthchecks.
|
||||
"""
|
||||
failed_checks = []
|
||||
if not self.pebble_ready:
|
||||
self.status.set(WaitingStatus("pebble not ready"))
|
||||
return
|
||||
|
||||
if not self.service_ready:
|
||||
self.status.set(WaitingStatus("service not ready"))
|
||||
return
|
||||
|
||||
failed = []
|
||||
container = self.charm.unit.get_container(self.container_name)
|
||||
try:
|
||||
checks = container.get_checks(level=ops.pebble.CheckLevel.READY)
|
||||
checks = container.get_checks(level=ops.pebble.CheckLevel.READY)
|
||||
for name, check in checks.items():
|
||||
if check.status != ops.pebble.CheckStatus.UP:
|
||||
failed.append(name)
|
||||
|
||||
# Verify alive checks if ready checks are missing
|
||||
if not checks:
|
||||
checks = container.get_checks(
|
||||
level=ops.pebble.CheckLevel.ALIVE)
|
||||
for name, check in checks.items():
|
||||
if check.status != ops.pebble.CheckStatus.UP:
|
||||
failed_checks.append(name)
|
||||
failed.append(name)
|
||||
|
||||
# Verify alive checks if ready checks are missing
|
||||
if not checks:
|
||||
checks = container.get_checks(
|
||||
level=ops.pebble.CheckLevel.ALIVE)
|
||||
for name, check in checks.items():
|
||||
if check.status != ops.pebble.CheckStatus.UP:
|
||||
failed_checks.append(name)
|
||||
if failed:
|
||||
self.status.set(BlockedStatus('healthcheck{} failed: {}'.format(
|
||||
's' if len(failed) > 1 else '',
|
||||
', '.join(failed)
|
||||
)))
|
||||
return
|
||||
|
||||
except ops.model.ModelError:
|
||||
logger.warning(
|
||||
f'Health check online for {self.container_name} not defined')
|
||||
except ops.pebble.ConnectionError as connect_error:
|
||||
logger.exception(connect_error)
|
||||
failed_checks.append("Pebble Connection Error")
|
||||
|
||||
if failed_checks:
|
||||
self.status = (
|
||||
f'Health check failed for {self.container_name}: '
|
||||
f'{failed_checks}'
|
||||
)
|
||||
else:
|
||||
self.status = ''
|
||||
self.status.set(ActiveStatus(""))
|
||||
|
||||
def _start_all(self, restart: bool = True) -> None:
|
||||
"""Start services in container.
|
||||
|
@ -52,7 +52,7 @@ def guard(
|
||||
It also handles errors which can be interpreted as a Block rather than the
|
||||
charm going into error.
|
||||
|
||||
:param charm: the charm class (so that unit status can be set)
|
||||
:param charm: the charm class (so that status can be set)
|
||||
:param section: the name of the section (for debugging/info purposes)
|
||||
:handle_exception: whether to handle the exception to a BlockedStatus()
|
||||
:log_traceback: whether to log the traceback for debugging purposes.
|
||||
@ -72,7 +72,7 @@ def guard(
|
||||
logger.warning(
|
||||
"Charm is blocked in section '%s' due to '%s'", section, str(e)
|
||||
)
|
||||
charm.unit.status = BlockedStatus(e.msg)
|
||||
charm.status.set(BlockedStatus(e.msg))
|
||||
except Exception as e:
|
||||
# something else went wrong
|
||||
if handle_exception:
|
||||
@ -83,8 +83,8 @@ def guard(
|
||||
import traceback
|
||||
|
||||
logging.error(traceback.format_exc())
|
||||
charm.unit.status = BlockedStatus(
|
||||
charm.status.set(BlockedStatus(
|
||||
"Error in charm (see logs): {}".format(str(e))
|
||||
)
|
||||
))
|
||||
return
|
||||
raise
|
||||
|
@ -22,8 +22,10 @@ from urllib.parse import urlparse
|
||||
|
||||
import ops.charm
|
||||
import ops.framework
|
||||
from ops.model import BlockedStatus, ActiveStatus, WaitingStatus, UnknownStatus
|
||||
|
||||
import ops_sunbeam.interfaces as sunbeam_interfaces
|
||||
import ops_sunbeam.compound_status as compound_status
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -64,6 +66,27 @@ class RelationHandler(ops.charm.Object):
|
||||
self.callback_f = callback_f
|
||||
self.interface = self.setup_event_handler()
|
||||
self.mandatory = mandatory
|
||||
status = compound_status.Status(self.relation_name)
|
||||
self.charm.status_pool.add(status)
|
||||
self.set_status(status)
|
||||
|
||||
def set_status(self, status: compound_status.Status) -> None:
|
||||
"""
|
||||
Set the status based on current state.
|
||||
|
||||
Will be called once, during construction,
|
||||
after everything else is initialised.
|
||||
Override this in a child class if custom logic should be used.
|
||||
"""
|
||||
if not self.model.relations.get(self.relation_name):
|
||||
if self.mandatory:
|
||||
status.set(BlockedStatus("relation missing"))
|
||||
else:
|
||||
status.set(UnknownStatus())
|
||||
elif self.ready:
|
||||
status.set(ActiveStatus(""))
|
||||
else:
|
||||
status.set(WaitingStatus("relation incomplete"))
|
||||
|
||||
def setup_event_handler(self) -> ops.charm.Object:
|
||||
"""Configure event handlers for the relation.
|
||||
|
@ -51,7 +51,9 @@ deps = {[testenv:py3]deps}
|
||||
|
||||
[testenv:pep8]
|
||||
basepython = python3
|
||||
deps = {[testenv]deps}
|
||||
deps =
|
||||
{[testenv]deps}
|
||||
-r{toxinidir}/requirements.txt
|
||||
commands = flake8 {posargs} unit_tests ops_sunbeam --exclude unit_tests/lib
|
||||
|
||||
[testenv:cover]
|
||||
|
@ -49,7 +49,7 @@ class ApplicationCharm(CharmBase):
|
||||
self._start_application(config_file)
|
||||
|
||||
# Set active status
|
||||
self.unit.status = ActiveStatus("received database credentials")
|
||||
self.status.set(ActiveStatus("received database credentials"))
|
||||
```
|
||||
|
||||
As shown above, the library provides some custom events to handle specific situations,
|
||||
|
179
ops-sunbeam/unit_tests/test_compound_status.py
Normal file
179
ops-sunbeam/unit_tests/test_compound_status.py
Normal file
@ -0,0 +1,179 @@
|
||||
# Copyright 2021 Canonical Ltd.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""Test compound_status."""
|
||||
|
||||
import mock
|
||||
import sys
|
||||
|
||||
sys.path.append("lib") # noqa
|
||||
sys.path.append("src") # noqa
|
||||
|
||||
from ops.model import ActiveStatus, BlockedStatus, UnknownStatus, WaitingStatus
|
||||
|
||||
import ops_sunbeam.charm as sunbeam_charm
|
||||
import ops_sunbeam.compound_status as compound_status
|
||||
import ops_sunbeam.test_utils as test_utils
|
||||
from . import test_charms
|
||||
|
||||
|
||||
class TestCompoundStatus(test_utils.CharmTestCase):
|
||||
"""Test for the compound_status module."""
|
||||
|
||||
PATCHES = []
|
||||
|
||||
def setUp(self) -> None:
|
||||
"""Charm test class setup."""
|
||||
self.container_calls = test_utils.ContainerCalls()
|
||||
super().setUp(sunbeam_charm, self.PATCHES)
|
||||
self.harness = test_utils.get_harness(
|
||||
test_charms.MyCharm,
|
||||
test_charms.CHARM_METADATA,
|
||||
self.container_calls,
|
||||
charm_config=test_charms.CHARM_CONFIG,
|
||||
initial_charm_config=test_charms.INITIAL_CHARM_CONFIG,
|
||||
)
|
||||
self.harness.begin()
|
||||
self.addCleanup(self.harness.cleanup)
|
||||
|
||||
def test_status_triggering_on_set(self) -> None:
|
||||
"""Updating a status should call the on_update function if set."""
|
||||
status = compound_status.Status("test")
|
||||
|
||||
# this shouldn't fail, even though it's not connected to a pool yet,
|
||||
# and thus has no on_update set.
|
||||
status.set(WaitingStatus("test"))
|
||||
|
||||
# manually set the on_update hook and verify it is called
|
||||
on_update_mock = mock.Mock()
|
||||
status.on_update = on_update_mock
|
||||
status.set(ActiveStatus("test"))
|
||||
on_update_mock.assert_called_once_with()
|
||||
|
||||
def test_status_new_unknown_message(self) -> None:
|
||||
"""New status should be unknown status and empty message."""
|
||||
status = compound_status.Status("test")
|
||||
self.assertIsInstance(status.status, UnknownStatus)
|
||||
self.assertEqual(status.message(), "")
|
||||
|
||||
def test_serializing_status(self) -> None:
|
||||
"""Serialising a status should work as expected."""
|
||||
status = compound_status.Status("mylabel")
|
||||
self.assertEqual(
|
||||
status._serialize(),
|
||||
{
|
||||
"status": "unknown",
|
||||
"message": "",
|
||||
},
|
||||
)
|
||||
|
||||
# now with a message and new status
|
||||
status.set(WaitingStatus("still waiting..."))
|
||||
self.assertEqual(
|
||||
status._serialize(),
|
||||
{
|
||||
"status": "waiting",
|
||||
"message": "still waiting...",
|
||||
},
|
||||
)
|
||||
|
||||
# with a custom priority
|
||||
status = compound_status.Status("mylabel", priority=12)
|
||||
self.assertEqual(
|
||||
status._serialize(),
|
||||
{
|
||||
"status": "unknown",
|
||||
"message": "",
|
||||
},
|
||||
)
|
||||
|
||||
def test_status_pool_priority(self) -> None:
|
||||
"""A status pool should display the highest priority status."""
|
||||
pool = self.harness.charm.status_pool
|
||||
|
||||
status1 = compound_status.Status("test1")
|
||||
pool.add(status1)
|
||||
status2 = compound_status.Status("test2", priority=100)
|
||||
pool.add(status2)
|
||||
status3 = compound_status.Status("test3", priority=30)
|
||||
pool.add(status3)
|
||||
|
||||
status1.set(WaitingStatus(""))
|
||||
status2.set(WaitingStatus(""))
|
||||
status3.set(WaitingStatus(""))
|
||||
|
||||
# status2 has highest priority
|
||||
self.assertEqual(
|
||||
self.harness.charm.unit.status, WaitingStatus("(test2)")
|
||||
)
|
||||
|
||||
# status3 will new be displayed,
|
||||
# since blocked is more severe than waiting
|
||||
status3.set(BlockedStatus(":("))
|
||||
self.assertEqual(
|
||||
self.harness.charm.unit.status, BlockedStatus("(test3) :(")
|
||||
)
|
||||
|
||||
def test_add_status_idempotency(self) -> None:
|
||||
"""Should not be issues if add same status twice."""
|
||||
pool = self.harness.charm.status_pool
|
||||
|
||||
status1 = compound_status.Status("test1", priority=200)
|
||||
pool.add(status1)
|
||||
|
||||
status1.set(WaitingStatus("test"))
|
||||
self.assertEqual(
|
||||
self.harness.charm.unit.status,
|
||||
WaitingStatus("(test1) test"),
|
||||
)
|
||||
|
||||
new_status1 = compound_status.Status("test1", priority=201)
|
||||
new_status1.set(BlockedStatus(""))
|
||||
pool.add(new_status1)
|
||||
|
||||
# should be the new object in the pool
|
||||
self.assertIs(new_status1, pool._pool["test1"])
|
||||
self.assertEqual(new_status1.priority(), (1, -201))
|
||||
self.assertEqual(
|
||||
self.harness.charm.unit.status,
|
||||
BlockedStatus("(test1)"),
|
||||
)
|
||||
|
||||
def test_all_active_status(self) -> None:
|
||||
"""Should not be issues if add same status twice."""
|
||||
pool = self.harness.charm.status_pool
|
||||
|
||||
status1 = compound_status.Status("test1")
|
||||
pool.add(status1)
|
||||
status2 = compound_status.Status("test2", priority=150)
|
||||
pool.add(status2)
|
||||
status3 = compound_status.Status("test3", priority=30)
|
||||
pool.add(status3)
|
||||
|
||||
status1.set(ActiveStatus(""))
|
||||
status2.set(ActiveStatus(""))
|
||||
status3.set(ActiveStatus(""))
|
||||
|
||||
# also need to manually activate other default statuses
|
||||
pool._pool["container:my-service"].set(ActiveStatus(""))
|
||||
|
||||
# all empty messages should end up as an empty unit status
|
||||
self.assertEqual(self.harness.charm.unit.status, ActiveStatus(""))
|
||||
|
||||
# if there's a message (on the highest priority status),
|
||||
# it should also show the status prefix
|
||||
status2.set(ActiveStatus("a message"))
|
||||
self.assertEqual(
|
||||
self.harness.charm.unit.status, ActiveStatus("(test2) a message")
|
||||
)
|
Loading…
x
Reference in New Issue
Block a user