diff --git a/neutron/plugins/ml2/driver_api.py b/neutron/plugins/ml2/driver_api.py index c2216c5ec91..4ca1a3c97cb 100644 --- a/neutron/plugins/ml2/driver_api.py +++ b/neutron/plugins/ml2/driver_api.py @@ -980,10 +980,10 @@ class MechanismDriver(object): pass def get_workers(self): - """Get any NeutronWorker instances that should have their own process + """Get any worker instances that should have their own process Any driver that needs to run processes separate from the API or RPC - workers, can return a sequence of NeutronWorker instances. + workers, can return a sequence of worker instances. """ return () diff --git a/neutron/service.py b/neutron/service.py index 9c71238c786..7c83aa2f821 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -22,6 +22,7 @@ from neutron_lib.callbacks import registry from neutron_lib.callbacks import resources from neutron_lib import context from neutron_lib.plugins import directory +from neutron_lib import worker as neutron_worker from oslo_concurrency import processutils from oslo_config import cfg from oslo_log import log as logging @@ -38,7 +39,6 @@ from neutron.common import profiler from neutron.common import rpc as n_rpc from neutron.conf import service from neutron.db import api as session -from neutron import worker as neutron_worker from neutron import wsgi @@ -95,7 +95,7 @@ def serve_wsgi(cls): return service -class RpcWorker(neutron_worker.NeutronWorker): +class RpcWorker(neutron_worker.BaseWorker): """Wraps a worker to be handled by ProcessLauncher""" start_listeners_method = 'start_rpc_listeners' @@ -198,7 +198,7 @@ def _get_plugins_workers(): ] -class AllServicesNeutronWorker(neutron_worker.NeutronWorker): +class AllServicesNeutronWorker(neutron_worker.BaseWorker): def __init__(self, services, worker_process_count=1): super(AllServicesNeutronWorker, self).__init__(worker_process_count) self._services = services diff --git a/neutron/tests/functional/test_server.py b/neutron/tests/functional/test_server.py index 7eb66d4663f..b0a28c05efe 100644 --- a/neutron/tests/functional/test_server.py +++ b/neutron/tests/functional/test_server.py @@ -21,6 +21,7 @@ import traceback import httplib2 import mock +from neutron_lib import worker as neutron_worker from oslo_config import cfg import psutil @@ -28,7 +29,6 @@ from neutron.common import utils from neutron import manager from neutron import service from neutron.tests import base -from neutron import worker as neutron_worker from neutron import wsgi @@ -273,7 +273,7 @@ class TestPluginWorker(TestNeutronServer): plugin_workers_launcher.wait() def test_start(self): - class FakeWorker(neutron_worker.NeutronWorker): + class FakeWorker(neutron_worker.BaseWorker): def start(self): pass diff --git a/neutron/worker.py b/neutron/worker.py index 2f434a8d291..97693daaf71 100644 --- a/neutron/worker.py +++ b/neutron/worker.py @@ -10,11 +10,8 @@ # License for the specific language governing permissions and limitations # under the License. -from neutron_lib.callbacks import events -from neutron_lib.callbacks import registry -from neutron_lib.callbacks import resources +from neutron_lib import worker from oslo_service import loopingcall -from oslo_service import service class WorkerSupportServiceMixin(object): @@ -28,12 +25,13 @@ class WorkerSupportServiceMixin(object): return self.__workers def get_workers(self): - """Returns a collection NeutronWorker instances needed by this service + """Returns a collection neutron_lib.worker.BaseWorker instances + needed by this service """ return list(self._workers) def add_worker(self, worker): - """Adds NeutronWorker needed for this service + """Adds neutron_lib.worker.BaseWorker needed for this service If a object needs to define workers thread/processes outside of API/RPC workers then it will call this method to register worker. Should be @@ -42,57 +40,14 @@ class WorkerSupportServiceMixin(object): self._workers.append(worker) def add_workers(self, workers): - """Adds NeutronWorker list needed for this service + """Adds neutron_lib.worker.BaseWorker list needed for this service The same as add_worker but adds a list of workers """ self._workers.extend(workers) -class NeutronWorker(service.ServiceBase): - """Partial implementation of the ServiceBase ABC - - Subclasses will still need to add the other abstract methods defined in - service.ServiceBase. See oslo_service for more details. - - If a plugin needs to handle synchronization with the Neutron database and - do this only once instead of in every API worker, for instance, it would - define a NeutronWorker class and the plugin would have get_workers return - an array of NeutronWorker instances. For example: - class MyPlugin(...): - def get_workers(self): - return [MyPluginWorker()] - - class MyPluginWorker(NeutronWorker): - def start(self): - super(MyPluginWorker, self).start() - do_sync() - """ - - # default class value for case when super().__init__ is not called - _worker_process_count = 1 - - def __init__(self, worker_process_count=_worker_process_count): - """ - Initialize worker - - :param worker_process_count: Defines how many processes to spawn for - worker: - 0 - spawn 1 new worker thread, - 1..N - spawn N new worker processes - """ - self._worker_process_count = worker_process_count - - @property - def worker_process_count(self): - return self._worker_process_count - - def start(self): - if self.worker_process_count > 0: - registry.notify(resources.PROCESS, events.AFTER_INIT, self.start) - - -class PeriodicWorker(NeutronWorker): +class PeriodicWorker(worker.BaseWorker): """A worker that runs a function at a fixed interval.""" def __init__(self, check_func, interval, initial_delay): diff --git a/neutron/wsgi.py b/neutron/wsgi.py index a7b13c7c0b7..b309584e144 100644 --- a/neutron/wsgi.py +++ b/neutron/wsgi.py @@ -24,6 +24,7 @@ import time import eventlet.wsgi from neutron_lib import context from neutron_lib import exceptions as exception +from neutron_lib import worker as neutron_worker from oslo_config import cfg import oslo_i18n from oslo_log import log as logging @@ -43,7 +44,6 @@ from neutron.common import config from neutron.common import exceptions as n_exc from neutron.conf import wsgi as wsgi_config from neutron.db import api -from neutron import worker as neutron_worker CONF = cfg.CONF wsgi_config.register_socket_opts() @@ -59,7 +59,7 @@ def encode_body(body): return encodeutils.to_utf8(body) -class WorkerService(neutron_worker.NeutronWorker): +class WorkerService(neutron_worker.BaseWorker): """Wraps a worker to be handled by ProcessLauncher""" def __init__(self, service, application, disable_ssl=False, worker_process_count=0):