Move AgentStatusCheckWorker to PeriodicWorker
Renames the AgentStatusCheckWorker class to PeriodicWorker and moves it into the worker module since there isn't anything agent-specific about it and it can be used for other periodic jobs server side. TrivialFix Change-Id: Ic7a55ef534f64e6bfc60ae38bb0e139a0078510b
This commit is contained in:
parent
7c7e2418a2
commit
8eac5e2db7
@ -21,7 +21,6 @@ from neutron_lib import constants
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
from oslo_service import loopingcall
|
||||
from oslo_utils import timeutils
|
||||
from sqlalchemy import orm
|
||||
from sqlalchemy.orm import exc
|
||||
@ -72,37 +71,6 @@ AGENTS_SCHEDULER_OPTS = [
|
||||
cfg.CONF.register_opts(AGENTS_SCHEDULER_OPTS)
|
||||
|
||||
|
||||
class AgentStatusCheckWorker(neutron_worker.NeutronWorker):
|
||||
|
||||
def __init__(self, check_func, interval, initial_delay):
|
||||
super(AgentStatusCheckWorker, self).__init__(worker_process_count=0)
|
||||
|
||||
self._check_func = check_func
|
||||
self._loop = None
|
||||
self._interval = interval
|
||||
self._initial_delay = initial_delay
|
||||
|
||||
def start(self):
|
||||
super(AgentStatusCheckWorker, self).start()
|
||||
if self._loop is None:
|
||||
self._loop = loopingcall.FixedIntervalLoopingCall(self._check_func)
|
||||
self._loop.start(interval=self._interval,
|
||||
initial_delay=self._initial_delay)
|
||||
|
||||
def wait(self):
|
||||
if self._loop is not None:
|
||||
self._loop.wait()
|
||||
|
||||
def stop(self):
|
||||
if self._loop is not None:
|
||||
self._loop.stop()
|
||||
|
||||
def reset(self):
|
||||
self.stop()
|
||||
self.wait()
|
||||
self.start()
|
||||
|
||||
|
||||
class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
|
||||
"""Common class for agent scheduler mixins."""
|
||||
|
||||
@ -148,9 +116,8 @@ class AgentSchedulerDbMixin(agents_db.AgentDbMixin):
|
||||
# neutron server first starts. random to offset multiple servers
|
||||
initial_delay = random.randint(interval, interval * 2)
|
||||
|
||||
check_worker = AgentStatusCheckWorker(function, interval,
|
||||
initial_delay)
|
||||
|
||||
check_worker = neutron_worker.PeriodicWorker(function, interval,
|
||||
initial_delay)
|
||||
self.add_worker(check_worker)
|
||||
|
||||
def agent_dead_limit_seconds(self):
|
||||
|
@ -29,7 +29,6 @@ from neutron.api.rpc.handlers import dhcp_rpc
|
||||
from neutron.api.rpc.handlers import l3_rpc
|
||||
from neutron.api.v2 import attributes
|
||||
from neutron.common import constants as n_const
|
||||
from neutron.common import utils
|
||||
from neutron import context
|
||||
from neutron.db import agents_db
|
||||
from neutron.db import agentschedulers_db
|
||||
@ -40,7 +39,6 @@ from neutron.extensions import dhcpagentscheduler
|
||||
from neutron.extensions import l3agentscheduler
|
||||
from neutron import manager
|
||||
from neutron.plugins.common import constants as service_constants
|
||||
from neutron.tests import base
|
||||
from neutron.tests.common import helpers
|
||||
from neutron.tests import fake_notifier
|
||||
from neutron.tests import tools
|
||||
@ -1453,31 +1451,6 @@ class OvsDhcpAgentNotifierTestCase(test_agent.AgentDBTestMixIn,
|
||||
self.assertTrue(self._is_schedule_network_called(device_id))
|
||||
|
||||
|
||||
class AgentStatusCheckWorkerTestCase(base.BaseTestCase):
|
||||
|
||||
def test_agent_worker_lifecycle(self):
|
||||
check_function = mock.Mock()
|
||||
worker = agentschedulers_db.AgentStatusCheckWorker(
|
||||
check_function, interval=1, initial_delay=1)
|
||||
self.addCleanup(worker.stop)
|
||||
worker.wait()
|
||||
self.assertFalse(check_function.called)
|
||||
worker.start()
|
||||
utils.wait_until_true(
|
||||
lambda: check_function.called,
|
||||
timeout=5,
|
||||
exception=RuntimeError("check_function not called"))
|
||||
worker.stop()
|
||||
check_function.reset_mock()
|
||||
worker.wait()
|
||||
self.assertFalse(check_function.called)
|
||||
worker.reset()
|
||||
utils.wait_until_true(
|
||||
lambda: check_function.called,
|
||||
timeout=5,
|
||||
exception=RuntimeError("check_function not called"))
|
||||
|
||||
|
||||
class OvsL3AgentNotifierTestCase(test_l3.L3NatTestCaseMixin,
|
||||
test_agent.AgentDBTestMixIn,
|
||||
AgentSchedulerTestMixIn,
|
||||
|
44
neutron/tests/unit/test_worker.py
Normal file
44
neutron/tests/unit/test_worker.py
Normal file
@ -0,0 +1,44 @@
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from neutron.common import utils
|
||||
from neutron.tests import base
|
||||
from neutron import worker as neutron_worker
|
||||
|
||||
|
||||
class PeriodicWorkerTestCase(base.BaseTestCase):
|
||||
|
||||
def test_periodic_worker_lifecycle(self):
|
||||
check_function = mock.Mock()
|
||||
worker = neutron_worker.PeriodicWorker(
|
||||
check_function, interval=1, initial_delay=1)
|
||||
self.addCleanup(worker.stop)
|
||||
worker.wait()
|
||||
self.assertFalse(check_function.called)
|
||||
worker.start()
|
||||
utils.wait_until_true(
|
||||
lambda: check_function.called,
|
||||
timeout=5,
|
||||
exception=RuntimeError("check_function not called"))
|
||||
worker.stop()
|
||||
check_function.reset_mock()
|
||||
worker.wait()
|
||||
self.assertFalse(check_function.called)
|
||||
worker.reset()
|
||||
utils.wait_until_true(
|
||||
lambda: check_function.called,
|
||||
timeout=5,
|
||||
exception=RuntimeError("check_function not called"))
|
@ -10,6 +10,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_service import loopingcall
|
||||
from oslo_service import service
|
||||
|
||||
from neutron.callbacks import events
|
||||
@ -90,3 +91,35 @@ class NeutronWorker(service.ServiceBase):
|
||||
def start(self):
|
||||
if self.worker_process_count > 0:
|
||||
registry.notify(resources.PROCESS, events.AFTER_INIT, self.start)
|
||||
|
||||
|
||||
class PeriodicWorker(NeutronWorker):
|
||||
"""A worker that runs a function at a fixed interval."""
|
||||
|
||||
def __init__(self, check_func, interval, initial_delay):
|
||||
super(PeriodicWorker, self).__init__(worker_process_count=0)
|
||||
|
||||
self._check_func = check_func
|
||||
self._loop = None
|
||||
self._interval = interval
|
||||
self._initial_delay = initial_delay
|
||||
|
||||
def start(self):
|
||||
super(PeriodicWorker, self).start()
|
||||
if self._loop is None:
|
||||
self._loop = loopingcall.FixedIntervalLoopingCall(self._check_func)
|
||||
self._loop.start(interval=self._interval,
|
||||
initial_delay=self._initial_delay)
|
||||
|
||||
def wait(self):
|
||||
if self._loop is not None:
|
||||
self._loop.wait()
|
||||
|
||||
def stop(self):
|
||||
if self._loop is not None:
|
||||
self._loop.stop()
|
||||
|
||||
def reset(self):
|
||||
self.stop()
|
||||
self.wait()
|
||||
self.start()
|
||||
|
Loading…
Reference in New Issue
Block a user