Add unit tests for dccertmon

Introduce mocked unit tests to cover the core functionality of the
dccertmon service. These include tests migrated from the config
repository that previously validated subcloud certificate audits.

Additionally, this change adds dedicated tests for the Notification
Audit Queue, covering:
 - Enqueuing logic
 - Timestamp-based ordering
 - Requeuing on failure with delay
 - Concurrent audits from both queues
 - Lock enforcement when auditing the same subcloud

The unit test coverage for certificate_monitor_manager was improved
to 93%, and for subcloud_audit_queue to 97%.

Test Plan:
PASS: Successfully built distributedcloud package with these changes.
PASS: Run tox and verify that all tests pass.
PASS: All of the tests were created considering the output of 'tox -c
      tox.ini -e cover' command.

Story: 2011311
Task: 52181

Change-Id: I6b07d30c380cd9fee761f5f11876073ef9353645
Signed-off-by: Enzo Candotti <Enzo.Candotti@windriver.com>
This commit is contained in:
Enzo Candotti
2025-05-13 19:41:06 -03:00
parent 2f6eb4c374
commit 2fb911b015
13 changed files with 1120 additions and 2 deletions

View File

@@ -5,6 +5,7 @@ source =
dcmanager
dcorch
dccommon
dccertmon
dcagent
dcdbsync
omit =

View File

@@ -20,6 +20,7 @@ from oslo_log import log
from oslo_serialization import base64
from oslo_utils import encodeutils
# TODO(ecandotti): Replace six library with urllib/requests
from six.moves.urllib.error import HTTPError
from six.moves.urllib.error import URLError
from six.moves.urllib.parse import urlparse
@@ -27,7 +28,7 @@ from six.moves.urllib.request import Request
from six.moves.urllib.request import urlopen
# pylint: disable=import-error
# TODO(srana): copy sys_kube to dccertmon/common
# TODO(ecandotti):Import from dccommon/kubeoperator.py
from sysinv.common import kubernetes as sys_kube
# pylint: enable=import-error
@@ -52,6 +53,7 @@ INVALID_SUBCLOUD_AUDIT_DEPLOY_STATES = [
"secondary-failed",
]
# TODO(ecandotti): Move constants to dccommon and remove if already present
# Subcloud sync status
ENDPOINT_TYPE_DC_CERT = "dc-cert"
@@ -405,6 +407,7 @@ def get_token():
return token
# TODO(ecandotti): Improve token retrieval using EndpointCache or keystoneauth1
def get_dc_token(region_name=constants.SYSTEM_CONTROLLER_REGION):
"""Get token for the dcmanager user.

View File

@@ -10,6 +10,8 @@ import os
import re
from dateutil.parser import parse
# TODO(ecandotti): Replace six library with urllib/requests
from six.moves.urllib.error import URLError
from kubernetes import __version__ as K8S_MODULE_VERSION
@@ -23,7 +25,7 @@ from oslo_serialization import base64
from oslo_utils import encodeutils
# pylint: disable=import-error
# TODO(srana): copy sys_kube to dccertmon/common
# TODO(ecandotti): import from dccommon/kubeoperator.py
from sysinv.common import kubernetes as sys_kube
# pylint: enable=import-error

View File

@@ -0,0 +1,30 @@
#
# Copyright (c) 2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import mock
from oslo_config import cfg
from oslotest import base
from dccertmon.common import config
class DCCertMonTestCase(base.BaseTestCase):
"""Test case base class for all unit tests."""
def setUp(self):
super().setUp()
config.register_config_opts()
cfg.CONF([], project="dccertmon", default_config_files=[])
cfg.CONF.set_override("auth_uri", "http://fake:5000/v3", group="endpoint_cache")
def _mock_object(self, target, attribute, wraps=None):
"""Mock a specified target's attribute and return the mock object"""
mock_patch_object = mock.patch.object(target, attribute, wraps=wraps)
self.addCleanup(mock_patch_object.stop)
return mock_patch_object.start()

View File

@@ -0,0 +1,7 @@
from oslo_config import cfg
from dccertmon.common import config
# Ensure config options are registered before importing any module using CONF
config.register_config_opts()
cfg.CONF([], project="dccertmon", default_config_files=[])

View File

@@ -0,0 +1,64 @@
-----BEGIN CERTIFICATE-----
MIIFmDCCA4CgAwIBAgIBATANBgkqhkiG9w0BAQsFADBcMQswCQYDVQQGEwJDQTEL
MAkGA1UECAwCT04xDzANBgNVBAcMBk90dGF3YTENMAsGA1UECgwEV1JDUDEMMAoG
A1UECwwDT3JnMRIwEAYDVQQDDAlsb2NhbGhvc3QwHhcNMjEwNjI4MTkwOTM5WhcN
MjgwOTI5MTkwOTM5WjBbMQswCQYDVQQGEwJDQTELMAkGA1UECAwCT04xDTALBgNV
BAoMBFdSQ1AxDDAKBgNVBAsMA09yZzEiMCAGA1UEAwwZbG9jYWxob3N0IEludGVy
bWVkaWF0ZSBDQTCCAiIwDQYJKoZIhvcNAQEBBQADggIPADCCAgoCggIBAKp9wfxx
Ei6farnJEKvYqvrRCKxfJPXsxkl6L5VzPmhNcE1ZVKS0kJwKGOFj11u9oVjZW0mg
FvKkO9wyQi51qH7oP+wapcY4D3eWsxwnmb22T4rN9y4pR8p7oZDBdDQnTYWOskiD
y/zZktmiJ2q7CqT50a8ZULK35TSoyZGL3f5QFambgmEtmicYmKsytkWWnmyj0sJG
Rs8sgnKO6LxG6M0k4Kyl2uesSVvoITPXveZclDLTaT0xLZfLA/CppaFWaoLJfyDB
F4xT62ExXzVKO+uEoJugBM7GBTqrdnQHKwMz+5MG6PrRPpgMuu5nlzXyKpT/RStJ
rOAuoIUNF+f1vJKzp2S6nXe+VgKJ8IkxONAu6GZTxkf3KNG3WN3JsTA7a1WVB1Lj
0e+ZJv7JeomwRZP6Qi+V+FI/rzRMlk0fRSLKh0NzAvJMsLrjxicpUQyhdctgqz7s
Rop0d1W9ZEwG2JyjoNC5T9cu++M3sSkyr4AZU/OZjIx7Fsv5IJKY/d39Mpb5ohzA
6e5ytt/f0vihvvZ5iDQo863KVz6EHii+Y43/V/ZU1XgJfffooQpIbadbNSgGD5Ja
B40gWq2NxtVzdA0BW+zq3o5wocuxON0esJqjBJV0g+D+iY/P+mceQgJE3nloSG4h
i8Vgs44+evcdi/Theyywwja9fx/rNdHDRhudAgMBAAGjZjBkMB0GA1UdDgQWBBQO
85wRSbaj2r7tboYBpRzcaWiP9zAfBgNVHSMEGDAWgBQLijSZJvvFXfbD8I2BdwMc
NhdAlDASBgNVHRMBAf8ECDAGAQH/AgEAMA4GA1UdDwEB/wQEAwIBhjANBgkqhkiG
9w0BAQsFAAOCAgEAnJtwZeMfgZcDjJviPS5dxv9RxDMebRidBcFhUNr2uZRLhz/x
HSZ+/u8kYM2DkylAvVRypoIvFwzef2tX1OIJnn5P8ww//uAYh6Z8fPuT/++8M7da
76u1DfsUgOnwTAGd99m4SFlh19hjpkn8+cQ/JOLhjldQw5LnWmqlTc/dPnO7fRLF
af8kqHNTHaz1yJJMvYd3IhZYW+sB7uQk/+bdutBycBpVNeTXV7bT1e+zVllYuAY2
Sw0CxQENb4h/QKNu0wpTVvk0wJaEVB5Zg69/gFzJPVZmjJ8RAUuGkPlSZ+rgi1Xk
tZ3CvJEuOqvGj3q30Zh6hGqvT6pL+kI+MAn0tiPT5+xGXKGOCpsmOSYx+O8ncynS
ZHP3NkRMPaj6x/zx2OkCmsK22WLa5Yv3x4eIRmPWScyj+Rw9pPlmMiSgdDAUAmXL
XN6EuveNUnjmPPnSE8R7UO3dUlrV12Mc6yCu+M8fljInpA5oanXfFf095U79RKYS
P88Ov1NooqzAUtvScfOp1CEZRpWCktkXz0GdiphZjiIqWWlod7nS9L96Xy83Sm+L
/kDmTF8Rswrd5fedBbf+ZKzdZYl5lC/6RYsWc4tK5B64qZYmYD2YGcM8i8xMn45V
O8QS40gegrVJamKMwEhhcaXjgZTymaXAG0ypkAU1PorSWS3DxgKBaP4I3fA=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFmTCCA4GgAwIBAgIUHzDAj50gsWu1ur+dW7z/6Zkx3t0wDQYJKoZIhvcNAQEL
BQAwXDELMAkGA1UEBhMCQ0ExCzAJBgNVBAgMAk9OMQ8wDQYDVQQHDAZPdHRhd2Ex
DTALBgNVBAoMBFdSQ1AxDDAKBgNVBAsMA09yZzESMBAGA1UEAwwJbG9jYWxob3N0
MB4XDTIxMDYyODE4MDMyN1oXDTMxMDYyNjE4MDMyN1owXDELMAkGA1UEBhMCQ0Ex
CzAJBgNVBAgMAk9OMQ8wDQYDVQQHDAZPdHRhd2ExDTALBgNVBAoMBFdSQ1AxDDAK
BgNVBAsMA09yZzESMBAGA1UEAwwJbG9jYWxob3N0MIICIjANBgkqhkiG9w0BAQEF
AAOCAg8AMIICCgKCAgEAtQj+/6gE5gSKFys6QoUUEFQRjdPWFJ+upjJtNdZhNOEf
rrJUj79NL6OK00E6QvhloMPtRwFUujaiVz6mHsv+NFRy+2hyPepTMZWU+g1JlPdY
nU/VAf2yiCVQ8V6npu/sexK+H9Uqwg14LJ8pZDfVpxpja6RvW/dXm3MSvf1Yw6Jk
MCfetOCakcqwxI4BA0y95dfJdxhHjd3RkpjGUehlVyGILBNXNFJ3Re1FPl3ZjwHT
9DhtI4LB14+A0n2iAfz9oQpA0Y2uJaXLjmNHTBGvXqHlxZUGIORKy2H/R0D/1e93
SOAQ9MBSOkzV0Bf9JOgUOKvwD3HfbS5gXo9AX3oPkpGqdQDGqU4O+KjlzMlUJrNm
GBspYQfjGouimXFO7XaNgC3TX5w+p0HNL4JPxP49QuBiEpp6wRHmlUa2vAcLR+Ok
jcFz5KRFAd9dOlkD2ZIRHPHBQw5GToHH2gxQ9SScIYqLeUCy3LO0QbclsV5b/JLq
YJOKNwqdfhdU19s1uFbYmKGa1KWaAbsrZwDWEYs3ZR5vGGF0R62Y1w48BgxasNK4
lZL5dmtg1pHvhwCmBuWrs5HTAiWwnC+CqMexSmh5Bp7dFjljN0jifnDOI8K+nfeJ
+BELCfQC3rVP4Ce+Tqd93fmFYWXeirtk1V2svqFsa2CfycEbQSY7U43FwLjrVE0C
AwEAAaNTMFEwHQYDVR0OBBYEFAuKNJkm+8Vd9sPwjYF3Axw2F0CUMB8GA1UdIwQY
MBaAFAuKNJkm+8Vd9sPwjYF3Axw2F0CUMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZI
hvcNAQELBQADggIBACzBZlCt7rHjHM9VdK+Bhu+WUeLV76jdq4NRCUaz4peizHxY
3+Cyon43LxGTxuUKVb+OibkkJiOBr5M8aIXXBD+d4UEfjccEpDGHEppMiYx2uo8O
Jj04rP1Nm1ZrZ/qe5CDPAPCLIDFYhsf1SJ01ypmrsL633IpGxqcLm0mYqk4ypgf6
sa+cxlWv44cAvrvUjL+oUqqFdwMFRu2SRSe0Q/zJo8ZTxM0CQI+vq7rMWwBLgZcF
OLHMhdNRUXWhlpQvnNHN2j35WpY+c+udHpeOcgD03PYIjtf69H8FXclXTYXIrdM9
H39BH1oZd0NwtVaAzzQZl07wmlUW7pwFEhrpq4PwrOLgjIW3qr/6snIODkoManhg
fhwExaLr/GDS0tIxt7CNd349vWDGstFIoP1otfKqApsC24kY/icOyZ+IpoZTf2lf
ipy2R32FEMu/u0Y/DEYKxlaqFyZvOYdgUDMQ7IP7aveykw1iRAtCW5zJRjpaYb1n
juqBou6WqzpcLsBPABn0ELSc7IDr8V1PQNx4mEtXOhb6cyZ2+V59rhq+Jf8Fn196
p/TxP8+5GwgGhXTQJpoSC/PV6Hclqm9FGfomVqZLfEiHVC2xsfBf4GNi/DIqo0VL
ee9mqghQGDQ6agMfIEw8dsX19uEKvsfCx5O0VvCzZmre4U1ZzqtuLY2OMhjU
-----END CERTIFICATE-----

View File

@@ -0,0 +1,32 @@
-----BEGIN CERTIFICATE-----
MIIFmTCCA4GgAwIBAgIUHzDAj50gsWu1ur+dW7z/6Zkx3t0wDQYJKoZIhvcNAQEL
BQAwXDELMAkGA1UEBhMCQ0ExCzAJBgNVBAgMAk9OMQ8wDQYDVQQHDAZPdHRhd2Ex
DTALBgNVBAoMBFdSQ1AxDDAKBgNVBAsMA09yZzESMBAGA1UEAwwJbG9jYWxob3N0
MB4XDTIxMDYyODE4MDMyN1oXDTMxMDYyNjE4MDMyN1owXDELMAkGA1UEBhMCQ0Ex
CzAJBgNVBAgMAk9OMQ8wDQYDVQQHDAZPdHRhd2ExDTALBgNVBAoMBFdSQ1AxDDAK
BgNVBAsMA09yZzESMBAGA1UEAwwJbG9jYWxob3N0MIICIjANBgkqhkiG9w0BAQEF
AAOCAg8AMIICCgKCAgEAtQj+/6gE5gSKFys6QoUUEFQRjdPWFJ+upjJtNdZhNOEf
rrJUj79NL6OK00E6QvhloMPtRwFUujaiVz6mHsv+NFRy+2hyPepTMZWU+g1JlPdY
nU/VAf2yiCVQ8V6npu/sexK+H9Uqwg14LJ8pZDfVpxpja6RvW/dXm3MSvf1Yw6Jk
MCfetOCakcqwxI4BA0y95dfJdxhHjd3RkpjGUehlVyGILBNXNFJ3Re1FPl3ZjwHT
9DhtI4LB14+A0n2iAfz9oQpA0Y2uJaXLjmNHTBGvXqHlxZUGIORKy2H/R0D/1e93
SOAQ9MBSOkzV0Bf9JOgUOKvwD3HfbS5gXo9AX3oPkpGqdQDGqU4O+KjlzMlUJrNm
GBspYQfjGouimXFO7XaNgC3TX5w+p0HNL4JPxP49QuBiEpp6wRHmlUa2vAcLR+Ok
jcFz5KRFAd9dOlkD2ZIRHPHBQw5GToHH2gxQ9SScIYqLeUCy3LO0QbclsV5b/JLq
YJOKNwqdfhdU19s1uFbYmKGa1KWaAbsrZwDWEYs3ZR5vGGF0R62Y1w48BgxasNK4
lZL5dmtg1pHvhwCmBuWrs5HTAiWwnC+CqMexSmh5Bp7dFjljN0jifnDOI8K+nfeJ
+BELCfQC3rVP4Ce+Tqd93fmFYWXeirtk1V2svqFsa2CfycEbQSY7U43FwLjrVE0C
AwEAAaNTMFEwHQYDVR0OBBYEFAuKNJkm+8Vd9sPwjYF3Axw2F0CUMB8GA1UdIwQY
MBaAFAuKNJkm+8Vd9sPwjYF3Axw2F0CUMA8GA1UdEwEB/wQFMAMBAf8wDQYJKoZI
hvcNAQELBQADggIBACzBZlCt7rHjHM9VdK+Bhu+WUeLV76jdq4NRCUaz4peizHxY
3+Cyon43LxGTxuUKVb+OibkkJiOBr5M8aIXXBD+d4UEfjccEpDGHEppMiYx2uo8O
Jj04rP1Nm1ZrZ/qe5CDPAPCLIDFYhsf1SJ01ypmrsL633IpGxqcLm0mYqk4ypgf6
sa+cxlWv44cAvrvUjL+oUqqFdwMFRu2SRSe0Q/zJo8ZTxM0CQI+vq7rMWwBLgZcF
OLHMhdNRUXWhlpQvnNHN2j35WpY+c+udHpeOcgD03PYIjtf69H8FXclXTYXIrdM9
H39BH1oZd0NwtVaAzzQZl07wmlUW7pwFEhrpq4PwrOLgjIW3qr/6snIODkoManhg
fhwExaLr/GDS0tIxt7CNd349vWDGstFIoP1otfKqApsC24kY/icOyZ+IpoZTf2lf
ipy2R32FEMu/u0Y/DEYKxlaqFyZvOYdgUDMQ7IP7aveykw1iRAtCW5zJRjpaYb1n
juqBou6WqzpcLsBPABn0ELSc7IDr8V1PQNx4mEtXOhb6cyZ2+V59rhq+Jf8Fn196
p/TxP8+5GwgGhXTQJpoSC/PV6Hclqm9FGfomVqZLfEiHVC2xsfBf4GNi/DIqo0VL
ee9mqghQGDQ6agMfIEw8dsX19uEKvsfCx5O0VvCzZmre4U1ZzqtuLY2OMhjU
-----END CERTIFICATE-----

View File

@@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEA0wlvfCcZ6G/BfdiRp6i0K3aNaGeFJ/9b7QSOsSxa4FQGGqnB
GzH5mqiJ3OUU0gPh11jPYPtJvszH2aAaesNIemK+oWYG5Py73f2XEr6IhziFEsny
h+ckjlwG0paW5fmnrXQODcqYWRGZbNIciRFxo8V0RSQ5yXcUlaU2zi5UGUhB5O7U
sNZZl1Bq/914dg3oSCTsLrzVSXbdpKRK3GRlWdH17XzfjUQxVMvMz4trWMYDAELv
XBk0rGJztMAj0GrLLy44QwO/5OcBEwk6gc01Iazh+t7/RUbTA7+ZVa2cmc7ztK0+
nhpgF3FWzitVi13hetkx3OuCuhj8rJ0xdTcCewIDAQABAoIBAQDSgxEAiYigggpD
XKs+0VyYFnRMdyculN2/+tHUDmjAaVCFfFwyopFYI1MSVDmGnXTE+cQz+7a+a0vX
3ItNdktwOdvq5/lspmdIs9PlUMakDE9CRMx3oKyojUgI/UqdMYJ/1crHGxcJkjK3
iIgKeqofMbIf3lj+jJiuBdY7qZ8eb1N84P20o+CAq1ILDVmD7GYdm5tJGbRDpcp6
HV/3cLZgMfcZwTYdtvOre+p1yK3BAC0uX8366TzaoceFc2po8QgEm99PIEXBlLk1
6uaHKWf27pxB4mIM3tRvEZDhKUzIn1Zdl5mkLGUX4NozpVpLDlu/C4wXM4h+2UGI
AjWFviVBAoGBAPPKw/9mdGmNMQc/Ob4sci8sNT3iAqfHZovkHs8nHLexDGsC3QWG
tXLj55KYjt14AHcti7oQt+IERq6y60aAhd+uiMdlOo6mHUhzlnBBu+QwFXeWUsNk
LHfVC/vRzuv0mm8gKxI0/q8/lhFX99aDNrAYMIgnpUuh6i8pD8UiLrzxAoGBAN2a
xby7NcDvxOPFR0U2P46/Hzl+Gm8OMPBE7WtmJOvZN9v2MNvq1CD2zWHQWHaATJEV
fnsKzT0upkaAjB1qGhAsNh6Aqq8OjLc57G7IH3rHullPPhyB/0rAZ9GaGpSZiQMP
nhzbfgnP4kfRMYXYDhq052pIY7AMV47srjWKaaYrAoGBAJfjSW+S5o5ogBZSxuf8
Cvvm4Bj4+cyMSBB6BNPtO1Aax3DYbNmnjt7QrRNBZykGPcwCnV5EUjxHi74GDN10
73Nn6yHHqM79IZtlIGI6IhTN/GHwTwobHDVgj8HJetC5KYp4kT0btV18EUQm+Ws1
ftoBiCMcCDjx3NSbEY8xd8sRAoGAdYeOc+g+PBNuMvcAM57v2n9WyiEtV9UI84U6
/gjwb/2GeKx7gUdsNgdvKf9by42EDjZ+HDfAXkCNgzrOFROeuxEXqtnGrI1k6BBa
au7Mc0vWc2Npp6jyguzEow++JS3A0tTHoLpwgDe25Sv3veq2oEdtrJqz9Dy3e1/f
jRnZqn0CgYEA1ywZJiktYA7dw4C2rvK7SdXwZhdNiko/gXVOHMgpiUxK49NBERaL
UjVDEYFfGEva4+5DhbXH06FsU6E14bgjmx8+3DJZwUO3Lt7IzEauF1AHNb4Rl8Fe
UFvtzHl1kxHWammr0sHGedApGaF9EZMPMrl0GaBuZwXryeHiKmTyRic=
-----END RSA PRIVATE KEY-----

View File

@@ -0,0 +1,597 @@
# Copyright (c) 2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import os.path
import time
import base64 as pybase64
import eventlet
import greenlet
import mock
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from dccertmon.common import certificate_monitor_manager as cert_mon_manager
from dccertmon.common import constants
from dccertmon.common import subcloud_audit_queue
import dccertmon.common.watcher
from dccertmon.tests.base import DCCertMonTestCase
OPT_GROUP_NAME = "dccertmon"
cfg.CONF.import_group(OPT_GROUP_NAME, "keystonemiddleware.auth_token")
class CertMonManagerBase(DCCertMonTestCase):
def setUp(self):
super(CertMonManagerBase, self).setUp()
self.manager = cert_mon_manager.CertificateMonitorManager()
self.manager.sc_audit_pool = None # Force serial audit for testing
self.config_fixture = self.useFixture(config_fixture.Config(cfg.CONF))
self.manager.dc_token_cache = self._mock_token_cache()
self.mock_get_subcloud = self._mock_object(
cert_mon_manager.utils, "get_subcloud"
)
self.mock_update_subcloud_status = self._mock_object(
cert_mon_manager.utils, "update_subcloud_status"
)
self.mock_is_subcloud_online = self._mock_object(
cert_mon_manager.utils, "is_subcloud_online"
)
self.mock_build_endpoint = self._mock_object(
cert_mon_manager.utils.SubcloudSysinvEndpointCache, "build_endpoint"
)
self.mock_get_endpoint_certificate = self._mock_object(
cert_mon_manager.utils, "get_endpoint_certificate"
)
self.mock_get_intermediate_ca = self._mock_object(
cert_mon_manager.utils, "get_sc_intermediate_ca_secret"
)
self.mock_verify_ca = self._mock_object(
cert_mon_manager.utils, "verify_intermediate_ca_cert"
)
self.mock_update_ca_cert = self._mock_object(
cert_mon_manager.utils, "update_subcloud_ca_cert"
)
self.mock_get_dc_token = self._mock_object(
cert_mon_manager.utils, "get_dc_token"
)
# Default return values
self.mock_get_subcloud.return_value = self._mock_subcloud()
self.mock_build_endpoint.return_value = "http://fake"
self.mock_get_endpoint_certificate.return_value = (
self._get_valid_certificate_pem()
)
self.mock_get_intermediate_ca.return_value = (
self._get_sc_intermediate_ca_secret()
)
self.mock_is_subcloud_online.return_value = True
self.mock_get_dc_token.return_value = "fake-token"
def _mock_token_cache(self):
mock_token_cache = mock.Mock()
mock_token_cache.get_token.return_value = "fake-token"
return mock_token_cache
def _mock_subcloud(self, status="completed", ip="1.1.1.1"):
return {"deploy-status": status, "management-start-ip": ip}
def _mock_dcmanager_subcloud(self, name, ip, status):
return {"name": name, "management_ip": ip, "dc-cert": status}
def _get_data_file_path(self, file_name):
return os.path.join(os.path.dirname(__file__), "data", file_name)
def _get_valid_certificate_pem(self):
cert_filename = self._get_data_file_path("audit/cacert.pem")
with open(cert_filename, "r") as cfile:
cert_file = cfile.read()
return cert_file
def _get_sc_intermediate_ca_secret(self):
cert_filename = self._get_data_file_path("audit/ca-chain-bundle.cert.pem")
key_filename = self._get_data_file_path("audit/cakey.pem")
cacert_filename = self._get_data_file_path("audit/cacert.pem")
with open(cert_filename, "rb") as cfile:
tls_cert = pybase64.b64encode(cfile.read()).decode("utf-8")
with open(key_filename, "rb") as kfile:
tls_key = pybase64.b64encode(kfile.read()).decode("utf-8")
with open(cacert_filename, "rb") as cafile:
ca_cert = pybase64.b64encode(cafile.read()).decode("utf-8")
secret = mock.Mock()
secret.data = {
"tls.crt": tls_cert,
"tls.key": tls_key,
"ca.crt": ca_cert,
}
return secret
class TestAuditQueueBehavior(CertMonManagerBase):
def test_audit_notification_queue_task_delegates_to_process_audit_queue(self):
"""Test that audit_notification_queue_task delegates to _process_audit_queue."""
mock_process = self._mock_object(self.manager, "_process_audit_queue")
self.manager.audit_notification_queue_task(None)
mock_process.assert_called_once_with(
self.manager.sc_notify_audit_queue, "audit_notification_queue_task"
)
def test_do_subcloud_audit_logs_exception(self):
"""Ensure that exceptions during subcloud audit are logged."""
item = subcloud_audit_queue.SubcloudAuditData("fail-subcloud")
mock_queue = mock.Mock()
mock_queue.task_done = mock.Mock()
mock_sc_audit = self._mock_object(self.manager, "_subcloud_audit")
mock_sc_audit.side_effect = Exception("exc")
mock_log = self._mock_object(cert_mon_manager.LOG, "exception")
self.manager.do_subcloud_audit(mock_queue, item)
mock_log.assert_called_once_with(
"An error occurred during the subcloud audit task"
)
mock_queue.task_done.assert_called_once()
def test_process_audit_queue_uses_greenpool(self):
"""Test that _process_audit_queue uses GreenPool to spawn audit workers."""
queue = subcloud_audit_queue.SubcloudAuditPriorityQueue()
item = subcloud_audit_queue.SubcloudAuditData("test-subcloud")
queue.enqueue(item, delay_secs=0)
self.manager.sc_audit_pool = mock.Mock()
self.manager._process_audit_queue(queue, "test-queue")
self.manager.sc_audit_pool.spawn_n.assert_called_once()
def test_audit_sc_cert_task_shallow(self):
"""Test the audit_sc_cert_task basic queuing functionality.
Mocks beginning at do_subcloud_audit.
"""
mock_do_audit = self._mock_object(self.manager, "do_subcloud_audit")
mock_do_audit.return_value = None
# Add two items with future timestamps
self.manager.sc_audit_queue.enqueue(
subcloud_audit_queue.SubcloudAuditData("test1"), delay_secs=1
)
self.manager.sc_audit_queue.enqueue(
subcloud_audit_queue.SubcloudAuditData("test2"), delay_secs=2
)
self.assertEqual(self.manager.sc_audit_queue.qsize(), 2)
# Run audit immediately, it should not have picked up anything
self.manager.audit_sc_cert_task(None)
mock_do_audit.assert_not_called()
self.assertEqual(self.manager.sc_audit_queue.qsize(), 2)
time.sleep(3)
self.manager.audit_sc_cert_task(None)
# It should now be drained:
mock_do_audit.assert_called()
self.assertEqual(self.manager.sc_audit_queue.qsize(), 0)
mock_do_audit.reset_mock()
self.manager.audit_sc_cert_task(None)
mock_do_audit.assert_not_called()
def test_audit_sc_cert_task_deep(self):
"""Validate a complete subcloud audit flow with all utils mocked"""
# also need to mock the TokenCache
with mock.patch(
"sysinv.cert_mon.utils.TokenCache", get_token=mock.DEFAULT
) as token_cache_mock:
token_cache_mock["get_token"].return_value = None # don"t care
self.manager.sc_audit_queue.enqueue(
subcloud_audit_queue.SubcloudAuditData("test1"), delay_secs=1
)
self.manager.sc_audit_queue.enqueue(
subcloud_audit_queue.SubcloudAuditData("test2"), delay_secs=2
)
self.assertEqual(self.manager.sc_audit_queue.qsize(), 2)
# Run audit immediately, it should not have picked up anything
self.manager.audit_sc_cert_task(None)
self.assertEqual(self.manager.sc_audit_queue.qsize(), 2)
time.sleep(3)
self.manager.audit_sc_cert_task(None)
# It should now be drained:
self.assertEqual(self.manager.sc_audit_queue.qsize(), 0)
def test_requeue_audit_subcloud_enqueues_if_not_present(self):
"""Test that requeue_audit_subcloud adds the subcloud if not present."""
item = subcloud_audit_queue.SubcloudAuditData("subcloud1")
self.manager.sc_audit_queue = mock.Mock()
self.manager.sc_audit_queue.contains.return_value = False
self.manager.requeue_audit_subcloud(
self.manager.sc_audit_queue, item, delay_secs=42
)
self.manager.sc_audit_queue.enqueue.assert_called_once_with(item, 42)
def test_requeue_audit_subcloud_skips_if_already_queued(self):
"""Test that requeue_audit_subcloud does not enqueue duplicates."""
queue = mock.Mock()
queue.contains.return_value = True
item = subcloud_audit_queue.SubcloudAuditData("subcloud1")
self.manager.requeue_audit_subcloud(queue, item, delay_secs=42)
queue.enqueue.assert_not_called()
def test_audit_subcloud_allow_requeue(self):
"""Test that audit_subcloud requeues if allow_requeue is True and count < 2."""
queue = subcloud_audit_queue.SubcloudAuditPriorityQueue()
subcloud_name = "subcloud1"
queue.enqueue(subcloud_audit_queue.SubcloudAuditData(subcloud_name))
mock_enqueue = self._mock_object(queue, "enqueue")
self.manager.audit_subcloud(subcloud_name, queue, allow_requeue=True)
mock_enqueue.assert_called_once()
def test_audit_subcloud_ignored_if_already_in_queue(self):
"""Test that audit_subcloud does not requeue if allow_requeue is False."""
queue = subcloud_audit_queue.SubcloudAuditPriorityQueue()
subcloud_name = "subcloud2"
# Enqueue once so it's already present
queue.enqueue(subcloud_audit_queue.SubcloudAuditData(subcloud_name))
mock_enqueue = self._mock_object(queue, "enqueue")
self.manager.audit_subcloud(subcloud_name, queue, allow_requeue=False)
mock_enqueue.assert_not_called()
class TestSubcloudAuditFlow(CertMonManagerBase):
def setUp(self):
super().setUp()
self.mock_requeue = self._mock_object(self.manager, "requeue_audit_subcloud")
def test_subcloud_sysinv_endpoint_update(self):
"""Test that subcloud_sysinv_endpoint_update calls update_endpoints."""
self.mock_update_endpoints = self._mock_object(
cert_mon_manager.utils.SubcloudSysinvEndpointCache, "update_endpoints"
)
self.manager.subcloud_sysinv_endpoint_update("sc1", "http://sysinv.sc1")
self.mock_update_endpoints.assert_called_once_with({"sc1": "http://sysinv.sc1"})
def test_subcloud_audit_invalid_deploy_status(self):
"""Test that subcloud audit exits early for invalid deploy status."""
item = subcloud_audit_queue.SubcloudAuditData("subcloud1")
self.mock_get_subcloud.return_value = self._mock_subcloud(
status="create-failed"
)
self.manager._subcloud_audit(
self.manager.sc_audit_queue, item, subcloud_name="subcloud1"
)
self.mock_update_subcloud_status.assert_not_called()
def test_subcloud_audit_endpoint_failure_then_retry(self):
"""Test that audit retries on endpoint failure when subcloud is online."""
item = subcloud_audit_queue.SubcloudAuditData("subcloud1")
item.audit_count = 0
self.mock_build_endpoint.side_effect = Exception("network error")
self.manager._subcloud_audit(
self.manager.sc_audit_queue, item, subcloud_name="subcloud1"
)
self.mock_requeue.assert_called_once_with(
self.manager.sc_audit_queue, item, mock.ANY
)
self.mock_update_subcloud_status.assert_not_called()
def test_subcloud_audit_subcloud_offline(self):
"""Test that no retry happens when the subcloud is offline."""
item = subcloud_audit_queue.SubcloudAuditData("subcloud1")
item.audit_count = 0
self.mock_build_endpoint.side_effect = Exception("network error")
self.mock_is_subcloud_online.return_value = False
self.manager._subcloud_audit(
self.manager.sc_audit_queue, item, subcloud_name="subcloud1"
)
self.mock_requeue.assert_not_called()
self.mock_update_subcloud_status.assert_not_called()
def test_subcloud_audit_missing_cert_data(self):
"""Test that audit exits when intermediate cert data is incomplete."""
item = subcloud_audit_queue.SubcloudAuditData("subcloud1")
self.mock_get_endpoint_certificate.return_value = "cert"
self.mock_get_intermediate_ca.return_value = {"data": {"ca.crt": "ca"}}
self.manager._subcloud_audit(
self.manager.sc_audit_queue, item, subcloud_name="subcloud1"
)
self.mock_requeue.assert_not_called()
self.mock_update_subcloud_status.assert_not_called()
def test_subcloud_audit_cert_chain_out_of_sync(self):
"""Test audit flow when intermediate CA is out-of-sync and needs update."""
item = subcloud_audit_queue.SubcloudAuditData("subcloud1")
self.mock_verify_ca.return_value = False
mock_log = self._mock_object(cert_mon_manager.LOG, "exception")
self.manager._subcloud_audit(
self.manager.sc_audit_queue, item, subcloud_name="subcloud1"
)
mock_log.assert_not_called()
self.mock_verify_ca.assert_called_once()
self.mock_update_ca_cert.assert_called_once()
self.mock_update_subcloud_status.assert_not_called()
def test_subcloud_audit_cert_already_in_sync(self):
"""Test audit flow when the intermediate CA cert is already in sync."""
item = subcloud_audit_queue.SubcloudAuditData("subcloud1")
self.mock_verify_ca.return_value = True
self.manager._subcloud_audit(
self.manager.sc_audit_queue,
item,
subcloud_name="subcloud1",
)
self.mock_verify_ca.assert_called_once()
self.mock_update_subcloud_status.assert_called_once_with(
self.manager.dc_token_cache.get_token(),
"subcloud1",
mock.ANY,
)
class TestStartupAuditBehavior(CertMonManagerBase):
"""Tests related to subcloud auditing triggered during service startup."""
def setUp(self):
super().setUp()
# Patch functions and methods commonly used across tests
self.mock_get_dc_role = self._mock_object(cert_mon_manager.utils, "get_dc_role")
self.mock_get_subclouds = self._mock_object(
cert_mon_manager.utils, "get_subclouds_from_dcmanager"
)
self.mock_cache_endpoints = self._mock_object(
cert_mon_manager.utils.SubcloudSysinvEndpointCache,
"cache_endpoints_by_ip",
)
def test_on_start_audits_out_of_sync_subclouds(self):
"""Test that on_start enqueues only out-of-sync subclouds."""
self.mock_get_dc_role.return_value = constants.DC_ROLE_SYSTEMCONTROLLER
self.mock_get_subclouds.return_value = [
self._mock_dcmanager_subcloud("subcloud1", "192.168.101.2", "out-of-sync"),
self._mock_dcmanager_subcloud("subcloud2", "192.168.101.3", "in-sync"),
]
self.manager.token_cache = self._mock_token_cache()
mock_enqueue = self._mock_object(self.manager.sc_audit_queue, "enqueue")
self.manager.on_start()
mock_enqueue.assert_called_once()
audit_data = mock_enqueue.call_args[0][0]
self.assertEqual(audit_data.name, "subcloud1")
def test_on_start_not_systemcontroller(self):
"""Test that on_start does nothing if not systemcontroller."""
self.mock_get_dc_role.return_value = "subcloud"
mock_enqueue = self._mock_object(self.manager.sc_audit_queue, "enqueue")
self.manager.on_start()
mock_enqueue.assert_not_called()
def test_on_start_with_startup_audit_all(self):
"""Test that on_start triggers full audit when startup_audit_all is set."""
self.mock_get_dc_role.return_value = constants.DC_ROLE_SYSTEMCONTROLLER
self.config_fixture.config(startup_audit_all=True, group=OPT_GROUP_NAME)
mock_audit = self._mock_object(self.manager, "audit_sc_cert_start")
self.manager.on_start()
mock_audit.assert_called_once()
def test_on_start_skips_subcloud_already_under_audit(self):
"""Test that subcloud already under audit is not re-enqueued."""
self.config_fixture.config(startup_audit_all=False, group=OPT_GROUP_NAME)
self.mock_get_dc_role.return_value = constants.DC_ROLE_SYSTEMCONTROLLER
self.mock_get_subclouds.return_value = [
self._mock_dcmanager_subcloud("subcloud1", "192.168.101.2", "out-of-sync")
]
self.manager.token_cache = self._mock_token_cache()
# Manually enqueue before running on_start
self.manager.sc_audit_queue.enqueue(
subcloud_audit_queue.SubcloudAuditData("subcloud1")
)
mock_enqueue = self._mock_object(self.manager.sc_audit_queue, "enqueue")
self.manager.on_start()
mock_enqueue.assert_not_called()
def test_audit_sc_cert_start_enqueues_all_subclouds(self):
"""Test that audit_sc_cert_start enqueues all subclouds for auditing."""
self.mock_get_dc_role.return_value = constants.DC_ROLE_SYSTEMCONTROLLER
self.mock_get_subclouds.return_value = [
{"name": "sc1", "management_ip": "1.2.3.4"},
{"name": "sc2", "management_ip": "1.2.3.5"},
]
self.manager.token_cache = self._mock_token_cache()
mock_enqueue = self._mock_object(self.manager.sc_audit_queue, "enqueue")
self.manager.audit_sc_cert_start(None)
self.assertEqual(mock_enqueue.call_count, 2)
def test_audit_sc_cert_start_not_systemcontroller(self):
"""Test that audit_sc_cert_start skips if not systemcontroller."""
self.mock_get_dc_role.return_value = "subcloud"
mock_enqueue = self._mock_object(self.manager.sc_audit_queue, "enqueue")
self.manager.audit_sc_cert_start(None)
mock_enqueue.assert_not_called()
class TestRetryMechanism(CertMonManagerBase):
def setUp(self):
super().setUp()
# Create a reusable mock task
self.fake_task = mock.Mock()
self.fake_task.get_id.return_value = "task1"
self.fake_task.run.return_value = True
self.fake_task.number_of_reattempt = 1
self.fake_task.failed = mock.Mock()
def test_retry_monitor_task(self):
"""Test that retry_monitor_task removes successful tasks from the queue."""
self.manager.reattempt_monitor_tasks = [self.fake_task]
self._mock_object(time, "sleep")
self.manager.retry_monitor_task(None)
self.assertNotIn(self.fake_task, self.manager.reattempt_monitor_tasks)
def test_retry_monitor_task_failed_permanently(self):
"""Test retry_monitor_task removes and fails task after max attempts."""
self.fake_task.run.return_value = False
self.fake_task.number_of_reattempt = cfg.CONF.dccertmon.max_retry
self.manager.reattempt_monitor_tasks = [self.fake_task]
self._mock_object(eventlet, "sleep")
self.manager.retry_monitor_task(None)
self.assertNotIn(self.fake_task, self.manager.reattempt_monitor_tasks)
self.fake_task.failed.assert_called_once()
def test_add_reattempt_monitor_task(self):
"""Test that a new reattempt task is added to the retry list."""
self.fake_task.get_id.return_value = "task-123"
self.manager.reattempt_monitor_tasks = [self.fake_task]
mock_purge = self._mock_object(self.manager, "_purge_reattempt_monitor_task")
self.manager._add_reattempt_monitor_task(self.fake_task)
mock_purge.assert_called_once_with("task-123", "for new reattempt")
self.assertIn(self.fake_task, self.manager.reattempt_monitor_tasks)
def test_purge_reattempt_monitor_task_removes_task(self):
"""Test that purge does nothing if the task is not found."""
self.fake_task.get_id.return_value = "task-abc"
self.manager.reattempt_monitor_tasks = [self.fake_task]
self.manager._purge_reattempt_monitor_task("task-abc", "reason")
self.assertNotIn(self.fake_task, self.manager.reattempt_monitor_tasks)
def test_purge_reattempt_monitor_task_not_found(self):
"""Test that purge does nothing if the task is not found."""
self.fake_task.get_id.return_value = "other-task"
self.manager.reattempt_monitor_tasks = [self.fake_task]
# Should not raise
self.manager._purge_reattempt_monitor_task("non-existent", "reason")
self.assertIn(self.fake_task, self.manager.reattempt_monitor_tasks)
class TestTaskExecutorLifecycle(CertMonManagerBase):
def test_start_and_stop_task_executor(self):
"""Test that the task executor thread starts and stops correctly."""
mock_on_start = self._mock_object(self.manager, "on_start")
mock_spawn = self._mock_object(eventlet.greenthread, "spawn")
thread_mock = mock.Mock()
thread_mock.kill = mock.Mock()
thread_mock.wait = mock.Mock()
mock_spawn.return_value = thread_mock
self.manager.start_task_executor()
mock_on_start.assert_called_once()
mock_spawn.assert_called_once()
self.assertIsNotNone(self.manager.worker_thread)
self.manager.stop_task_executor()
thread_mock.kill.assert_called_once()
thread_mock.wait.assert_called_once()
self.assertIsNone(self.manager.worker_thread)
def test_worker_task_loop_exits_on_greenlet_exit(self):
"""Test that worker_task_loop exits gracefully on GreenletExit."""
mock_sleep = self._mock_object(time, "sleep")
mock_sleep.side_effect = greenlet.GreenletExit
self.manager.worker_task_loop() # Should exit cleanly without error
def test_worker_task_loop_handles_generic_exception(self):
"""Test worker_task_loop logs exceptions and continues."""
self.manager.run_periodic_tasks = mock.Mock(
side_effect=[Exception("exc"), greenlet.GreenletExit]
)
self._mock_object(time, "sleep")
mock_log = self._mock_object(cert_mon_manager.LOG, "exception")
self.manager.worker_task_loop()
mock_log.assert_called_once()
class TestCertWatcherBehavior(CertMonManagerBase):
def test_stop_cert_watcher(self):
"""Test that stop_cert_watcher kills and clears mon_thread."""
thread = mock.Mock()
self.manager.mon_thread = thread
self.manager.stop_cert_watcher()
thread.kill.assert_called_once()
thread.wait.assert_called_once()
self.assertIsNone(self.manager.mon_thread)
def test_start_cert_watcher_retry_on_exception(self):
"""Test that start_cert_watcher retries once on failure."""
mock_class = self._mock_object(dccertmon.common.watcher, "DC_CertWatcher")
mock_class.side_effect = Exception("fail"), mock.Mock()
mock_sleep = self._mock_object(time, "sleep")
mock_spawn = self._mock_object(eventlet.greenthread, "spawn")
self.manager.start_cert_watcher()
self.assertEqual(mock_class.call_count, 2)
mock_sleep.assert_called_once()
mock_spawn.assert_called_once()
def test_stop_cert_watcher_when_mon_thread_is_none(self):
"""Test stop_cert_watcher does nothing if mon_thread is None."""
self.manager.mon_thread = None
self.manager.stop_cert_watcher()
def test_monitor_cert_loop_greenlet_exit(self):
"""Test that monitor_cert_loop exits on GreenletExit from start_watch."""
fake_monitor = mock.Mock()
fake_monitor.start_watch.side_effect = greenlet.GreenletExit
self.manager.monitor_cert_loop(fake_monitor) # Should exit without exception
def test_monitor_cert_loop_handles_unexpected_exception(self):
"""Test monitor_cert_loop handles exceptions and sleeps before retry."""
fake_monitor = mock.Mock()
# Raise Exception once, then GreenletExit to break loop
fake_monitor.start_watch.side_effect = [Exception("exc"), greenlet.GreenletExit]
mock_sleep = self._mock_object(time, "sleep")
self.manager.monitor_cert_loop(fake_monitor)
self.assertTrue(mock_sleep.called)

View File

@@ -0,0 +1,226 @@
# Copyright (c) 2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import time
import eventlet
eventlet.monkey_patch(os=False)
# pylint: disable=wrong-import-position
from dccertmon.common import ( # noqa: E402
certificate_monitor_manager as cert_mon_manager,
)
from dccertmon.common.service import CertificateMonitorService # noqa: E402
from dccertmon.common.subcloud_audit_queue import ( # noqa: E402
NotificationAuditQueue,
SubcloudAuditData,
SubcloudAuditException,
)
from dccertmon.tests.base import DCCertMonTestCase # noqa: E402
# pylint: enable=wrong-import-position
# Shared function for mocking certificate retrieval
audit_order = []
first_started = eventlet.event.Event()
AUDIT_SLEEP_TIME = 3
def slow_get_cert(*args, **kwargs):
"""Returns a side-effect function that simulates a delayed cert fetch.
Records the start time of the audit in `audit_order`.
If `first_started` is provided, it will be triggered once the first audit starts.
"""
ts = time.time()
audit_order.append(ts)
if len(audit_order) == 1:
first_started.send()
# Simulate long-running audit for holding the lock
eventlet.sleep(AUDIT_SLEEP_TIME)
return None
class NotificationAuditQueueTestCase(DCCertMonTestCase):
def setUp(self):
super().setUp()
self.queue = NotificationAuditQueue()
def tearDown(self):
self.queue = None
super().tearDown()
def test_enqueue_single(self):
item = SubcloudAuditData("subcloud1")
self.queue.enqueue(item)
self.assertEqual(self.queue.qsize(), 1)
self.assertTrue(self.queue.contains("subcloud1"))
def test_enqueue_duplicate_raises(self):
item = SubcloudAuditData("subcloud1")
self.queue.enqueue(item)
self.assertRaises(SubcloudAuditException, self.queue.enqueue, item)
def test_enqueue_with_timestamp_ordering(self):
items = [SubcloudAuditData(f"subcloud{i}") for i in range(3)]
timestamp = int(time.time())
self.queue.enqueue(items[2], timestamp=timestamp + 20)
self.queue.enqueue(items[0], timestamp=timestamp + 0)
self.queue.enqueue(items[1], timestamp=timestamp + 10)
first = self.queue.get()[1]
second = self.queue.get()[1]
third = self.queue.get()[1]
self.assertEqual(first.name, "subcloud0")
self.assertEqual(second.name, "subcloud1")
self.assertEqual(third.name, "subcloud2")
def test_contains_and_qsize(self):
self.assertFalse(self.queue.contains("subcloudX"))
self.assertEqual(self.queue.qsize(), 0)
item = SubcloudAuditData("subcloudX")
self.queue.enqueue(item)
self.assertTrue(self.queue.contains("subcloudX"))
self.assertEqual(self.queue.qsize(), 1)
class NotificationAuditBehaviorTestCase(DCCertMonTestCase):
def setUp(self):
super().setUp()
global audit_order, first_started
audit_order = []
first_started = eventlet.event.Event()
self.manager = cert_mon_manager.CertificateMonitorManager()
self.manager.sc_audit_pool = None # Force serial execution
self.service = CertificateMonitorService()
self.service.manager = self.manager
# Store common mocks as instance attributes
self.mock_get_subcloud = self._mock_object(
cert_mon_manager.utils, "get_subcloud"
)
self.mock_is_subcloud_online = self._mock_object(
cert_mon_manager.utils, "is_subcloud_online"
)
self.mock_get_dc_token = self._mock_object(
cert_mon_manager.utils, "get_dc_token"
)
self.mock_slow_get_cert = self._mock_object(
cert_mon_manager.utils, "get_endpoint_certificate"
)
self.mock_get_subcloud.return_value = {
"name": "subcloud",
"deploy-status": "complete",
"availability-status": "online",
"management-start-ip": "1.2.3.4",
}
self.mock_is_subcloud_online.return_value = True
self.mock_get_dc_token.return_value = "fake-token"
def test_subcloud_added_to_notification_queue(self):
"""Ensure subcloud is enqueued when marked online."""
subcloud = "subcloud1"
self.assertFalse(self.manager.sc_notify_audit_queue.contains(subcloud))
self.manager.audit_subcloud(subcloud, self.manager.sc_notify_audit_queue)
self.assertTrue(self.manager.sc_notify_audit_queue.contains(subcloud))
def test_failed_audit_requeues_with_delay(self):
"""Ensure that an audit failure requeues the subcloud with delay."""
subcloud = "subcloud2"
audit_data = SubcloudAuditData(subcloud)
self.manager.sc_notify_audit_queue.enqueue(audit_data)
# Patch internal utils to simulate failure in cert retrieval
self.mock_slow_get_cert.side_effect = Exception("fail")
_, item = self.manager.sc_notify_audit_queue.get()
self.manager._subcloud_audit(self.manager.sc_notify_audit_queue, item)
# The item should have been re-enqueued with now+60
self.assertTrue(self.manager.sc_notify_audit_queue.contains(subcloud))
next_timestamp, _ = self.manager.sc_notify_audit_queue.queue[0]
now = int(time.time())
self.assertGreaterEqual(next_timestamp, now + 59)
def test_audit_same_subclouds_is_serialized(self):
"""Ensure audits for the same subcloud run sequentially using the lock."""
subcloud = "subcloud-lock-test"
item1 = SubcloudAuditData(subcloud)
item2 = SubcloudAuditData(subcloud)
self.manager.sc_audit_queue.enqueue(item1)
self.manager.sc_notify_audit_queue.enqueue(item2)
self.mock_slow_get_cert.side_effect = slow_get_cert
# Spawn first audit and wait until it starts (acquires the lock)
t1 = eventlet.spawn(
self.manager.do_subcloud_audit, self.manager.sc_audit_queue, item1
)
first_started.wait(timeout=5)
# Spawn second audit while first is still holding the lock
t2 = eventlet.spawn(
self.manager.do_subcloud_audit,
self.manager.sc_notify_audit_queue,
item2,
)
eventlet.sleep(1)
# Assert that the second audit hasn't started yet
self.assertEqual(
len(audit_order),
1,
"Second audit should still be waiting for the lock",
)
# Wait for both audits to complete
t1.wait()
t2.wait()
self.assertEqual(len(audit_order), 2, "Both audits should have run")
self.assertLess(
audit_order[0] + AUDIT_SLEEP_TIME,
audit_order[1],
"Second audit should have started after the first released the lock",
)
def test_audit_different_subclouds_run_concurrently(self):
"""Ensure audits for different subclouds are not blocked by the lock."""
subcloud1 = "subcloud-lock-test1"
subcloud2 = "subcloud-lock-test2"
item1 = SubcloudAuditData(subcloud1)
item2 = SubcloudAuditData(subcloud2)
self.manager.sc_audit_queue.enqueue(item1)
self.manager.sc_notify_audit_queue.enqueue(item2)
self.mock_slow_get_cert.side_effect = slow_get_cert
# Spawn both audits for different subclouds simultaneously
t1 = eventlet.spawn(
self.manager.do_subcloud_audit, self.manager.sc_audit_queue, item1
)
t2 = eventlet.spawn(
self.manager.do_subcloud_audit,
self.manager.sc_notify_audit_queue,
item2,
)
eventlet.sleep(1)
# Both audits should have started within a short time
self.assertEqual(len(audit_order), 2, "Both audits should have started")
t1.wait()
t2.wait()
self.assertLess(
abs(audit_order[0] - audit_order[1]),
AUDIT_SLEEP_TIME,
"Audits for different subclouds should run concurrently",
)

View File

@@ -0,0 +1,127 @@
# Copyright (c) 2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
import time
from dccertmon.common.subcloud_audit_queue import (
SubcloudAuditData,
SubcloudAuditException,
SubcloudAuditPriorityQueue,
)
from dccertmon.tests.base import DCCertMonTestCase
class SubcloudAuditQueueTestCase(DCCertMonTestCase):
def setUp(self):
super().setUp()
self.sc_audit_queue = SubcloudAuditPriorityQueue()
def tearDown(self):
self.sc_audit_queue = None
super().tearDown()
def test_audit_item(self):
item1 = SubcloudAuditData("item1")
self.assertEqual(item1.name, "item1")
self.assertEqual(item1.audit_count, 0)
self.assertEqual(item1, SubcloudAuditData("item1", 0))
self.assertEqual(item1, SubcloudAuditData("item1", 1))
def test_subcloud_audit_queue_single(self):
sc_name = "subcloud1"
subcloud = SubcloudAuditData(sc_name)
self.sc_audit_queue.enqueue(subcloud)
self.assertTrue(self.sc_audit_queue.contains(sc_name))
self.assertEqual(self.sc_audit_queue.qsize(), 1)
# peek using the underlying queue
_, sc_audit_item1 = self.sc_audit_queue.queue[0]
self.assertEqual(sc_audit_item1.name, sc_name)
self.assertEqual(sc_audit_item1.audit_count, 1)
def test_subcloud_audit_queue_multiple(self):
subclouds = [SubcloudAuditData("subcloud%s" % i) for i in range(20)]
delay = 0
for i in range(20):
self.sc_audit_queue.enqueue(subclouds[i], delay)
delay += 10
self.assertEqual(self.sc_audit_queue.qsize(), 20)
_, first = self.sc_audit_queue.get()
self.assertEqual(first.name, subclouds[0].name)
self.assertFalse(self.sc_audit_queue.contains(subclouds[0].name))
self.assertEqual(self.sc_audit_queue.qsize(), 19)
# re-enqueue with no delay; it should come out first again
self.sc_audit_queue.enqueue(first, 0)
_, first = self.sc_audit_queue.get()
self.assertEqual(first.name, subclouds[0].name)
timestamp, second = self.sc_audit_queue.get()
self.assertEqual(second.name, subclouds[1].name)
# The time now should be well under the timestamp for this item
self.assertLess(int(time.time()), timestamp)
def test_subcloud_audit_queue_custom_timestamp(self):
subclouds = [SubcloudAuditData("subcloud%s" % i) for i in range(20)]
timestamp = 0
for i in range(20):
self.sc_audit_queue.enqueue(subclouds[i], timestamp=timestamp)
timestamp += 10
self.assertEqual(self.sc_audit_queue.qsize(), 20)
_, first = self.sc_audit_queue.get()
self.assertEqual(first.name, subclouds[0].name)
self.assertFalse(self.sc_audit_queue.contains(subclouds[0].name))
self.assertEqual(self.sc_audit_queue.qsize(), 19)
# re-enqueue with no delay; it should come out first again
self.sc_audit_queue.enqueue(first, timestamp=0)
_, first = self.sc_audit_queue.get()
self.assertEqual(first.name, subclouds[0].name)
self.assertEqual(first, subclouds[0])
self.sc_audit_queue.enqueue(subclouds[0], timestamp=10000)
prev_timestamp = 0
for i in range(19):
next_timestamp, next_item = self.sc_audit_queue.get()
self.assertLess(prev_timestamp, next_timestamp)
self.assertNotEqual(next_item.name, subclouds[0].name)
prev_timestamp = next_timestamp
next_timestamp, next_item = self.sc_audit_queue.get()
self.assertEqual(next_timestamp, 10000)
self.assertEqual(next_item.name, subclouds[0].name)
def test_subcloud_audit_requeue(self):
subclouds = [SubcloudAuditData("subcloud%s" % i, 0) for i in range(20)]
timestamp = 0
for i in range(20):
self.sc_audit_queue.enqueue(subclouds[i], timestamp=timestamp)
timestamp += 10
self.assertEqual(self.sc_audit_queue.qsize(), 20)
self.assertTrue(self.sc_audit_queue.contains(subclouds[0].name))
got_exception = False
try:
self.sc_audit_queue.enqueue(subclouds[0], timestamp=timestamp)
except SubcloudAuditException:
got_exception = True
self.assertTrue(got_exception)
got_exception = False
try:
self.sc_audit_queue.enqueue(
subclouds[0], timestamp=timestamp, allow_requeue=True
)
except SubcloudAuditException:
got_exception = True
self.assertFalse(got_exception)
count = 0
for name in self.sc_audit_queue.enqueued_subcloud_names:
if name == subclouds[0].name:
count += 1
self.assertEqual(count, 2)

View File

@@ -17,6 +17,7 @@ fm_api_src_dir = {[dc]stx_fault_dir}/fm-api/source
nfv_client_src_dir = ../../nfv/nfv/nfv-client
tsconfig_src_dir = {[dc]stx_config_dir}/tsconfig/tsconfig
software_src_dir = ../../update/software
sysinv_src_dir = {[dc]stx_config_dir}/sysinv/sysinv/sysinv
[testenv]
basepython = python3.9
@@ -40,6 +41,7 @@ deps =
-e{[dc]nfv_client_src_dir}
-e{[dc]tsconfig_src_dir}
-e{[dc]software_src_dir}
-e{[dc]sysinv_src_dir}
allowlist_externals =
rm
find