neutron/neutron/worker.py
Boden R 7cae827b6b replace WorkerSupportServiceMixin with neutron-lib's WorkerBase
neutron-lib contains the WorkerBase class [1] that is the equivalent of
neutron's WorkerSupportServiceMixin. This patch switches over to
neutron-lib's version.

Note: IIUC no consumers are using WorkerSupportServiceMixin [2], so this
patch can land without waiting for them to sync up. I've included
the lib impact tag just in case.

NeutronLibImpact

[1] https://review.openstack.org/#/c/424151/
[2] http://codesearch.openstack.org/?q=WorkerSupportServiceMixin

Change-Id: Iacf1b6dfe318e3e6cfc76e61c65d407cf9ae7b36
2017-06-14 06:56:48 -06:00

47 lines
1.5 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib import worker
from oslo_service import loopingcall
class PeriodicWorker(worker.BaseWorker):
    """Worker that calls a function repeatedly on a fixed-interval loop.

    The loop object is an ``oslo_service`` ``FixedIntervalLoopingCall``.
    It is created on the first ``start()`` and reused by later starts,
    which is what lets ``reset()`` restart the same loop.
    """

    def __init__(self, check_func, interval, initial_delay):
        """Record the loop configuration; the loop itself is built lazily.

        :param check_func: callable handed to ``FixedIntervalLoopingCall``.
        :param interval: forwarded as ``interval`` when the loop is started.
        :param initial_delay: forwarded as ``initial_delay`` when the loop
            is started.
        """
        # NOTE(review): the meaning of worker_process_count=0 is defined by
        # neutron_lib's BaseWorker -- confirm there before changing it.
        super(PeriodicWorker, self).__init__(worker_process_count=0)

        self._interval = interval
        self._initial_delay = initial_delay
        self._check_func = check_func
        # No looping call exists until start() is invoked.
        self._loop = None

    def start(self):
        """Start the base worker, then start the periodic loop."""
        super(PeriodicWorker, self).start()

        periodic = self._loop
        if periodic is None:
            # First start: build the looping call once and keep it around.
            periodic = loopingcall.FixedIntervalLoopingCall(self._check_func)
            self._loop = periodic

        periodic.start(initial_delay=self._initial_delay,
                       interval=self._interval)

    def wait(self):
        """Wait on the periodic loop; a no-op if it was never created."""
        if self._loop is None:
            return
        self._loop.wait()

    def stop(self):
        """Stop the periodic loop; a no-op if it was never created."""
        if self._loop is None:
            return
        self._loop.stop()

    def reset(self):
        """Restart the worker: stop the loop, wait on it, start it again."""
        self.stop()
        self.wait()
        self.start()