Adapt Watcher to Python3.7

There are some issues with bundle of eventlet, taskflow and
apscheduler on Python 3.7.
According to [0], replace ThreadPoolExecutor with
GreenThreadPoolExecutor

[0]: http://lists.openstack.org/pipermail/openstack-dev/2018-July/132481.html

Co-Authored-By: Canwei Li <li.canwei2@zte.com.cn>

Change-Id: I989d7155eb8764088f91b3ca0d1f47ac6332bf11
This commit is contained in:
Alexander Chadin 2018-10-16 18:13:15 +03:00 committed by licanwei
parent ab03bf9bb0
commit 6dfeeb7359
7 changed files with 35 additions and 6 deletions

View File

@ -6,6 +6,7 @@
- openstack-python-jobs - openstack-python-jobs
- openstack-python35-jobs - openstack-python35-jobs
- openstack-python36-jobs - openstack-python36-jobs
- openstack-python37-jobs
- publish-openstack-docs-pti - publish-openstack-docs-pti
- release-notes-jobs-python3 - release-notes-jobs-python3
check: check:

View File

@ -34,7 +34,7 @@ fixtures==3.0.0
flake8==2.5.5 flake8==2.5.5
freezegun==0.3.10 freezegun==0.3.10
future==0.16.0 future==0.16.0
futurist==1.6.0 futurist==1.8.0
gitdb2==2.0.3 gitdb2==2.0.3
GitPython==2.1.8 GitPython==2.1.8
gnocchiclient==7.0.1 gnocchiclient==7.0.1

View File

@ -47,3 +47,4 @@ WebOb>=1.7.4 # MIT
WSME>=0.9.2 # MIT WSME>=0.9.2 # MIT
networkx>=1.11 # BSD networkx>=1.11 # BSD
microversion_parse>=0.2.1 # Apache-2.0 microversion_parse>=0.2.1 # Apache-2.0
futurist>=1.8.0 # Apache-2.0

View File

@ -16,7 +16,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# #
from concurrent import futures import futurist
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
@ -31,7 +31,7 @@ class TriggerActionPlan(object):
def __init__(self, applier_manager): def __init__(self, applier_manager):
self.applier_manager = applier_manager self.applier_manager = applier_manager
workers = CONF.watcher_applier.workers workers = CONF.watcher_applier.workers
self.executor = futures.ThreadPoolExecutor(max_workers=workers) self.executor = futurist.GreenThreadPoolExecutor(max_workers=workers)
def do_launch_action_plan(self, context, action_plan_uuid): def do_launch_action_plan(self, context, action_plan_uuid):
try: try:

View File

@ -106,7 +106,7 @@ class DefaultWorkFlowEngine(base.BaseWorkFlowEngine):
decider=self.decider) decider=self.decider)
e = engines.load( e = engines.load(
flow, engine='parallel', flow, executor='greenthreaded', engine='parallel',
max_workers=self.config.max_workers) max_workers=self.config.max_workers)
e.run() e.run()

View File

@ -17,14 +17,41 @@
# limitations under the License. # limitations under the License.
from apscheduler import events from apscheduler import events
from apscheduler.executors.pool import BasePoolExecutor
from apscheduler.schedulers import background from apscheduler.schedulers import background
import futurist
from oslo_service import service from oslo_service import service
job_events = events job_events = events
class GreenThreadPoolExecutor(BasePoolExecutor):
"""Green thread pool
An executor that runs jobs in a green thread pool.
Plugin alias: ``threadpool``
:param max_workers: the maximum number of spawned threads.
"""
def __init__(self, max_workers=10):
pool = futurist.GreenThreadPoolExecutor(int(max_workers))
super(GreenThreadPoolExecutor, self).__init__(pool)
executors = {
'default': GreenThreadPoolExecutor(),
}
class BackgroundSchedulerService(service.ServiceBase, class BackgroundSchedulerService(service.ServiceBase,
background.BackgroundScheduler): background.BackgroundScheduler):
def __init__(self, gconfig={}, **options):
if options is None:
options = {'executors': executors}
else:
if 'executors' not in options.keys():
options['executors'] = executors
super(BackgroundSchedulerService, self).__init__(
gconfig, **options)
def start(self): def start(self):
"""Start service.""" """Start service."""

View File

@ -16,7 +16,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
# #
from concurrent import futures import futurist
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
@ -34,7 +34,7 @@ class AuditEndpoint(object):
def __init__(self, messaging): def __init__(self, messaging):
self._messaging = messaging self._messaging = messaging
self._executor = futures.ThreadPoolExecutor( self._executor = futurist.GreenThreadPoolExecutor(
max_workers=CONF.watcher_decision_engine.max_workers) max_workers=CONF.watcher_decision_engine.max_workers)
self._oneshot_handler = o_handler.OneShotAuditHandler() self._oneshot_handler = o_handler.OneShotAuditHandler()
self._continuous_handler = c_handler.ContinuousAuditHandler().start() self._continuous_handler = c_handler.ContinuousAuditHandler().start()