Refactoring tobiko lockers

Tobiko has used the oslo_concurrency.lockutils.synchronized to lock
execution of functions inter-process/worker.
The synchronized decorator does not support to disable intra-worker
locks. This is a problem for tobiko because there are some functions
that need to be locked between different workers, but not inside
a worker.
Why?
- Tobiko execution (based on pytest) is multi-worker, but not
  multi-thread.
- The creation of Tobiko resources (which is what we need to lock
  to avoid concurrency issues between workers) sometimes depends on the
  creation of other Tobiko resources that are defined in either parent
  or child classes from the first ones, getting blocked due to the
  locks applied.

Change-Id: I0b1c2d707b585fd4e45cad9968c88cedf2932eed
This commit is contained in:
Eduardo Olivares 2024-03-06 13:13:00 +01:00
parent c1bd1e8323
commit 72dbd0242e
13 changed files with 132 additions and 45 deletions

View File

@ -13,6 +13,7 @@ openstacksdk==0.31.2
oslo.concurrency==3.26.0 oslo.concurrency==3.26.0
oslo.config==8.4.0 oslo.config==8.4.0
oslo.log==4.4.0 oslo.log==4.4.0
oslo.utils==4.12.3
packaging==20.4 packaging==20.4
paramiko==2.9.2 paramiko==2.9.2
pbr==5.5.1 pbr==5.5.1

View File

@ -13,6 +13,7 @@ openstacksdk>=0.31.2 # Apache-2.0
oslo.concurrency>=3.26.0 # Apache-2.0 oslo.concurrency>=3.26.0 # Apache-2.0
oslo.config>=8.4.0 # Apache-2.0 oslo.config>=8.4.0 # Apache-2.0
oslo.log>=4.4.0 # Apache-2.0 oslo.log>=4.4.0 # Apache-2.0
oslo.utils>=4.12.3 # Apache-2.0
packaging>=20.4 # Apache-2.0 packaging>=20.4 # Apache-2.0
paramiko>=2.9.2 # LGPLv2.1 paramiko>=2.9.2 # LGPLv2.1
pbr>=5.5.1 # Apache-2.0 pbr>=5.5.1 # Apache-2.0

View File

@ -25,6 +25,7 @@ from tobiko.common import _exception
from tobiko.common import _fixture from tobiko.common import _fixture
from tobiko.common import _ini from tobiko.common import _ini
from tobiko.common import _loader from tobiko.common import _loader
from tobiko.common import _lockutils
from tobiko.common import _logging from tobiko.common import _logging
from tobiko.common import _operation from tobiko.common import _operation
from tobiko.common import _os from tobiko.common import _os
@ -107,6 +108,8 @@ CaptureLogFixture = _logging.CaptureLogFixture
load_object = _loader.load_object load_object = _loader.load_object
load_module = _loader.load_module load_module = _loader.load_module
interworker_synched = _lockutils.interworker_synched
makedirs = _os.makedirs makedirs = _os.makedirs
open_output_file = _os.open_output_file open_output_file = _os.open_output_file
@ -174,4 +177,3 @@ load_yaml = _yaml.load_yaml
from tobiko import config # noqa from tobiko import config # noqa
config.init_config() config.init_config()
LOCK_DIR = os.path.expanduser(config.CONF.tobiko.common.lock_dir)

108
tobiko/common/_lockutils.py Normal file
View File

@ -0,0 +1,108 @@
# Copyright (c) 2024 Red Hat, Inc.
#
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import
import contextlib
import functools
import os
from oslo_concurrency import lockutils
from oslo_log import log
from oslo_utils import reflection
from oslo_utils import timeutils
LOG = log.getLogger(__name__)
def interworker_synched(name):
"""Re-definition of oslo_concurrency.lockutils.synchronized.
Tobiko needs to re-difine this decorator in order to avoid intra-
process/worker locks. This is because tobiko is executed in multiple
processes (using pytest), but each process only runs one thread.
The intra-process lock should not be applied in tobiko because some of the
locked methods could be called recurrently.
Example:
The creation (setup_fixture) of CirrosPeerServerStackFixture depends on the
creation of CirrosServerStackFixture, which is also its parent class.
With intra-process locks, the creation of CirrosServerStackFixture could
not be started (would be locked by the creation of
CirrosPeerServerStackFixture).
"""
def wrap(f):
@functools.wraps(f)
def inner(*args, **kwargs):
t1 = timeutils.now()
t2 = None
gotten = True
f_name = reflection.get_callable_name(f)
try:
with lock(name):
t2 = timeutils.now()
LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
'waited %(wait_secs)0.3fs',
{'name': name,
'function': f_name,
'wait_secs': (t2 - t1)})
return f(*args, **kwargs)
except lockutils.AcquireLockFailedException:
gotten = False
finally:
t3 = timeutils.now()
if t2 is None:
held_secs = "N/A"
else:
held_secs = "%0.3fs" % (t3 - t2)
LOG.debug('Lock "%(name)s" "%(gotten)s" by "%(function)s" ::'
' held %(held_secs)s',
{'name': name,
'gotten': 'released' if gotten else 'unacquired',
'function': f_name,
'held_secs': held_secs})
return inner
return wrap
@contextlib.contextmanager
def lock(name):
"""Re-definition of oslo_concurrency.lockutils.lock that does not apply
intra-worker locks. Only inter-worker locks are applied.
"""
from tobiko import config
lock_path = os.path.expanduser(config.CONF.tobiko.common.lock_dir)
LOG.debug('Acquired lock "%(lock)s"', {'lock': name})
try:
ext_lock = lockutils.external_lock(name,
lock_file_prefix='tobiko',
lock_path=lock_path)
gotten = ext_lock.acquire(delay=0.01, blocking=True)
if not gotten:
raise lockutils.AcquireLockFailedException(name)
LOG.debug('Acquired external semaphore "%(lock)s"',
{'lock': name})
try:
yield ext_lock
finally:
ext_lock.release()
finally:
LOG.debug('Releasing lock "%(lock)s"', {'lock': name})

View File

@ -16,9 +16,9 @@ from __future__ import absolute_import
import io import io
import typing import typing
from oslo_concurrency import lockutils
from paramiko import sftp_file from paramiko import sftp_file
import tobiko
from tobiko import config from tobiko import config
from tobiko.openstack import glance from tobiko.openstack import glance
from tobiko.openstack import neutron from tobiko.openstack import neutron
@ -55,11 +55,6 @@ class CirrosImageFixture(glance.URLGlanceImageFixture):
# when using recent Paramiko versions (>= 2.9.2) # when using recent Paramiko versions (>= 2.9.2)
'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']} 'pubkeys': ['rsa-sha2-256', 'rsa-sha2-512']}
@lockutils.synchronized(
'cirros_image_setup_fixture', external=True, lock_path=tobiko.LOCK_DIR)
def setup_fixture(self):
super(CirrosImageFixture, self).setup_fixture()
class CirrosFlavorStackFixture(_nova.FlavorStackFixture): class CirrosFlavorStackFixture(_nova.FlavorStackFixture):
ram = 128 ram = 128
@ -79,6 +74,10 @@ class CirrosServerStackFixture(_nova.ServerStackFixture):
#: We expect CirrOS based servers to be fast to boot #: We expect CirrOS based servers to be fast to boot
is_reachable_timeout: tobiko.Seconds = 300. is_reachable_timeout: tobiko.Seconds = 300.
@tobiko.interworker_synched('cirros_server_setup_fixture')
def setup_fixture(self):
super(CirrosServerStackFixture, self).setup_fixture()
@property @property
def ssh_client(self) -> ssh.SSHClientFixture: def ssh_client(self) -> ssh.SSHClientFixture:
ssh_client = super().ssh_client ssh_client = super().ssh_client

View File

@ -13,8 +13,6 @@
# under the License. # under the License.
from __future__ import absolute_import from __future__ import absolute_import
from oslo_concurrency import lockutils
import tobiko import tobiko
from tobiko.openstack import neutron from tobiko.openstack import neutron
from tobiko.openstack.stacks import _cirros from tobiko.openstack.stacks import _cirros
@ -31,8 +29,7 @@ class L3haRouterStackFixture(_neutron.RouterStackFixture):
class L3haNetworkStackFixture(_neutron.NetworkBaseStackFixture): class L3haNetworkStackFixture(_neutron.NetworkBaseStackFixture):
gateway_stack = tobiko.required_fixture(L3haRouterStackFixture) gateway_stack = tobiko.required_fixture(L3haRouterStackFixture)
@lockutils.synchronized( @tobiko.interworker_synched('create_l3ha_network_stack')
'create_l3ha_network_stack', external=True, lock_path=tobiko.LOCK_DIR)
def setup_stack(self): def setup_stack(self):
super().setup_stack() super().setup_stack()

View File

@ -19,7 +19,6 @@ import json
import typing import typing
import netaddr import netaddr
from oslo_concurrency import lockutils
from oslo_log import log from oslo_log import log
import tobiko import tobiko
@ -331,8 +330,7 @@ class SubnetPoolFixture(tobiko.SharedFixture):
if self.subnet_pool: if self.subnet_pool:
tobiko.addme_to_shared_resource(__name__, self.name) tobiko.addme_to_shared_resource(__name__, self.name)
@lockutils.synchronized( @tobiko.interworker_synched('create_subnet_pool')
'create_subnet_pool', external=True, lock_path=tobiko.LOCK_DIR)
def try_create_subnet_pool(self): def try_create_subnet_pool(self):
if not self.subnet_pool: if not self.subnet_pool:
self._subnet_pool = neutron.create_subnet_pool( self._subnet_pool = neutron.create_subnet_pool(
@ -575,8 +573,7 @@ class NetworkBaseStackFixture(heat.HeatStackFixture):
class NetworkStackFixture(NetworkBaseStackFixture): class NetworkStackFixture(NetworkBaseStackFixture):
@lockutils.synchronized( @tobiko.interworker_synched('create_network_stack')
'create_network_stack', external=True, lock_path=tobiko.LOCK_DIR)
def setup_stack(self): def setup_stack(self):
super().setup_stack() super().setup_stack()
@ -585,8 +582,7 @@ class NetworkNoFipStackFixture(NetworkBaseStackFixture):
"""Extra Network Stack where VMs will not have FIPs""" """Extra Network Stack where VMs will not have FIPs"""
gateway_stack = tobiko.required_fixture(RouterNoSnatStackFixture) gateway_stack = tobiko.required_fixture(RouterNoSnatStackFixture)
@lockutils.synchronized( @tobiko.interworker_synched('create_network_nofip_stack')
'create_network_nofip_stack', external=True, lock_path=tobiko.LOCK_DIR)
def setup_stack(self): def setup_stack(self):
super().setup_stack() super().setup_stack()
@ -594,9 +590,7 @@ class NetworkNoFipStackFixture(NetworkBaseStackFixture):
@neutron.skip_if_missing_networking_extensions('net-mtu-writable') @neutron.skip_if_missing_networking_extensions('net-mtu-writable')
class NetworkWithNetMtuWriteStackFixture(NetworkBaseStackFixture): class NetworkWithNetMtuWriteStackFixture(NetworkBaseStackFixture):
@lockutils.synchronized( @tobiko.interworker_synched('create_network_withnetmtuwrite_stack')
'create_network_withnetmtuwrite_stack',
external=True, lock_path=tobiko.LOCK_DIR)
def setup_stack(self): def setup_stack(self):
super().setup_stack() super().setup_stack()
@ -676,8 +670,7 @@ class StatelessSecurityGroupFixture(tobiko.SharedFixture):
if self.security_group: if self.security_group:
tobiko.addme_to_shared_resource(__name__, self.name) tobiko.addme_to_shared_resource(__name__, self.name)
@lockutils.synchronized( @tobiko.interworker_synched('create_security_group')
'create_security_group', external=True, lock_path=tobiko.LOCK_DIR)
def try_create_security_group(self): def try_create_security_group(self):
if not self.security_group: if not self.security_group:
self._security_group = neutron.create_security_group( self._security_group = neutron.create_security_group(

View File

@ -424,6 +424,10 @@ class CloudInitServerStackFixture(ServerStackFixture, ABC):
#: nax SWAP file size in bytes #: nax SWAP file size in bytes
swap_maxsize: typing.Optional[int] = None swap_maxsize: typing.Optional[int] = None
@tobiko.interworker_synched('cloudinit_server_setup_fixture')
def setup_fixture(self):
super(CloudInitServerStackFixture, self).setup_fixture()
@property @property
def is_reachable_timeout(self) -> tobiko.Seconds: def is_reachable_timeout(self) -> tobiko.Seconds:
# I expect cloud-init based servers to be slow to boot # I expect cloud-init based servers to be slow to boot

View File

@ -15,8 +15,6 @@
# under the License. # under the License.
from __future__ import absolute_import from __future__ import absolute_import
from oslo_concurrency import lockutils
import tobiko import tobiko
from tobiko import config from tobiko import config
from tobiko.openstack import heat from tobiko.openstack import heat
@ -58,8 +56,7 @@ class QosNetworkStackFixture(_neutron.NetworkBaseStackFixture):
value_specs = super().network_value_specs value_specs = super().network_value_specs
return dict(value_specs, qos_policy_id=self.qos_stack.qos_policy_id) return dict(value_specs, qos_policy_id=self.qos_stack.qos_policy_id)
@lockutils.synchronized( @tobiko.interworker_synched('create_qos_network_stack')
'create_qos_network_stack', external=True, lock_path=tobiko.LOCK_DIR)
def setup_stack(self): def setup_stack(self):
super().setup_stack() super().setup_stack()

View File

@ -15,8 +15,6 @@ from __future__ import absolute_import
import typing import typing
from oslo_concurrency import lockutils
import tobiko import tobiko
from tobiko import config from tobiko import config
from tobiko.openstack import glance from tobiko.openstack import glance
@ -40,12 +38,6 @@ class UbuntuMinimalImageFixture(glance.FileGlanceImageFixture):
disabled_algorithms = CONF.tobiko.ubuntu.disabled_algorithms disabled_algorithms = CONF.tobiko.ubuntu.disabled_algorithms
is_reachable_timeout = CONF.tobiko.nova.ubuntu_is_reachable_timeout is_reachable_timeout = CONF.tobiko.nova.ubuntu_is_reachable_timeout
@lockutils.synchronized(
'ubuntu_minimal_setup_fixture',
external=True, lock_path=tobiko.LOCK_DIR)
def setup_fixture(self):
super(UbuntuMinimalImageFixture, self).setup_fixture()
IPERF3_SERVICE_FILE = """ IPERF3_SERVICE_FILE = """
[Unit] [Unit]

View File

@ -19,7 +19,6 @@ import abc
import typing import typing
import netaddr import netaddr
from oslo_concurrency import lockutils
import tobiko import tobiko
from tobiko import config from tobiko import config
@ -35,8 +34,7 @@ CONF = config.CONF
class VlanNetworkStackFixture(_neutron.NetworkBaseStackFixture): class VlanNetworkStackFixture(_neutron.NetworkBaseStackFixture):
@lockutils.synchronized( @tobiko.interworker_synched('create_vlan_network_stack')
'create_vlan_network_stack', external=True, lock_path=tobiko.LOCK_DIR)
def setup_stack(self): def setup_stack(self):
super().setup_stack() super().setup_stack()

View File

@ -15,7 +15,6 @@
# under the License. # under the License.
from __future__ import absolute_import from __future__ import absolute_import
from oslo_concurrency import lockutils
from oslo_log import log from oslo_log import log
import testtools import testtools
@ -152,9 +151,7 @@ class RouterInterfaceTestRouter(stacks.RouterStackFixture):
class RouterInterfaceTestNetwork(stacks.NetworkBaseStackFixture): class RouterInterfaceTestNetwork(stacks.NetworkBaseStackFixture):
@lockutils.synchronized( @tobiko.interworker_synched('create_router_interface_network_stack')
'create_router_interface_network_stack',
external=True, lock_path=tobiko.LOCK_DIR)
def setup_stack(self): def setup_stack(self):
super().setup_stack() super().setup_stack()

View File

@ -20,7 +20,6 @@ import re
import typing import typing
import pytest import pytest
from oslo_concurrency import lockutils
from oslo_log import log from oslo_log import log
import testtools import testtools
@ -211,8 +210,7 @@ class DvrNetworkStackFixture(stacks.NetworkBaseStackFixture):
gateway_stack = tobiko.required_fixture(DvrRouterStackFixture, gateway_stack = tobiko.required_fixture(DvrRouterStackFixture,
setup=False) setup=False)
@lockutils.synchronized( @tobiko.interworker_synched('create_dvr_network_stack')
'create_dvr_network_stack', external=True, lock_path=tobiko.LOCK_DIR)
def setup_stack(self): def setup_stack(self):
super().setup_stack() super().setup_stack()