diff --git a/.zuul.yaml b/.zuul.yaml index 788105ccf..61667fd26 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -6,6 +6,7 @@ - openstack-python-jobs - openstack-python35-jobs - openstack-python36-jobs + - openstack-python37-jobs - publish-openstack-docs-pti - release-notes-jobs-python3 check: diff --git a/lower-constraints.txt b/lower-constraints.txt index 7bef8b7a8..17e2a20ef 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -34,7 +34,7 @@ fixtures==3.0.0 flake8==2.5.5 freezegun==0.3.10 future==0.16.0 -futurist==1.6.0 +futurist==1.8.0 gitdb2==2.0.3 GitPython==2.1.8 gnocchiclient==7.0.1 diff --git a/requirements.txt b/requirements.txt index cc5b2e83f..365539840 100644 --- a/requirements.txt +++ b/requirements.txt @@ -47,3 +47,4 @@ WebOb>=1.8.5 # MIT WSME>=0.9.2 # MIT networkx>=1.11 # BSD microversion_parse>=0.2.1 # Apache-2.0 +futurist>=1.8.0 # Apache-2.0 diff --git a/watcher/applier/messaging/trigger.py b/watcher/applier/messaging/trigger.py index 1c4b3a756..03502cd66 100644 --- a/watcher/applier/messaging/trigger.py +++ b/watcher/applier/messaging/trigger.py @@ -16,7 +16,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from concurrent import futures +import futurist from oslo_config import cfg from oslo_log import log @@ -31,7 +31,7 @@ class TriggerActionPlan(object): def __init__(self, applier_manager): self.applier_manager = applier_manager workers = CONF.watcher_applier.workers - self.executor = futures.ThreadPoolExecutor(max_workers=workers) + self.executor = futurist.GreenThreadPoolExecutor(max_workers=workers) def do_launch_action_plan(self, context, action_plan_uuid): try: diff --git a/watcher/applier/workflow_engine/default.py b/watcher/applier/workflow_engine/default.py index 764addb3b..7ae72c920 100644 --- a/watcher/applier/workflow_engine/default.py +++ b/watcher/applier/workflow_engine/default.py @@ -106,7 +106,7 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine): decider=self.decider) e = engines.load( - flow, engine='parallel', + flow, executor='greenthreaded', engine='parallel', max_workers=self.config.max_workers) e.run() diff --git a/watcher/common/scheduling.py b/watcher/common/scheduling.py index 90884d111..6d42d0883 100644 --- a/watcher/common/scheduling.py +++ b/watcher/common/scheduling.py @@ -17,14 +17,41 @@ # limitations under the License. from apscheduler import events +from apscheduler.executors.pool import BasePoolExecutor from apscheduler.schedulers import background +import futurist from oslo_service import service job_events = events +class GreenThreadPoolExecutor(BasePoolExecutor): + """Green thread pool + + An executor that runs jobs in a green thread pool. + Plugin alias: ``threadpool`` + :param max_workers: the maximum number of spawned threads. + """ + + def __init__(self, max_workers=10): + pool = futurist.GreenThreadPoolExecutor(int(max_workers)) + super(GreenThreadPoolExecutor, self).__init__(pool) + +executors = { + 'default': GreenThreadPoolExecutor(), +} + + class BackgroundSchedulerService(service.ServiceBase, background.BackgroundScheduler): + def __init__(self, gconfig={}, **options): + if options is None: + options = {'executors': executors} + else: + if 'executors' not in options.keys(): + options['executors'] = executors + super(BackgroundSchedulerService, self).__init__( + gconfig, **options) def start(self): """Start service.""" diff --git a/watcher/decision_engine/messaging/audit_endpoint.py b/watcher/decision_engine/messaging/audit_endpoint.py index 9f1a15fe2..19492443a 100644 --- a/watcher/decision_engine/messaging/audit_endpoint.py +++ b/watcher/decision_engine/messaging/audit_endpoint.py @@ -16,7 +16,7 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from concurrent import futures +import futurist from oslo_config import cfg from oslo_log import log @@ -34,7 +34,7 @@ class AuditEndpoint(object): def __init__(self, messaging): self._messaging = messaging - self._executor = futures.ThreadPoolExecutor( + self._executor = futurist.GreenThreadPoolExecutor( max_workers=CONF.watcher_decision_engine.max_workers) self._oneshot_handler = o_handler.OneShotAuditHandler() self._continuous_handler = c_handler.ContinuousAuditHandler().start()