Use a thread pool to poll load balancer status

Closes-Bug: #2055161
Change-Id: Ifeb6473c2fd2efb6c8be4608296ba7458ca5b741
This commit is contained in:
Jacky Hu 2019-10-08 11:17:16 +08:00 committed by Tobias Urdin
parent 58fb718540
commit 8cf3e72c9e
2 changed files with 28 additions and 20 deletions

View File

@ -14,7 +14,8 @@
"""API over the neutron LBaaS v2 service.
"""
import _thread as thread
from django.conf import settings
from multiprocessing import pool as mpp
import time
from django.views import generic
@ -26,8 +27,12 @@ from openstack_dashboard.api import neutron
from openstack_dashboard.api.rest import urls
from openstack_dashboard.api.rest import utils as rest_utils
neutronclient = neutron.neutronclient
MAX_THREADS = getattr(settings, 'OCTAVIA_DASHBOARD_MAX_THREADS', 10)
POOL = mpp.ThreadPool(processes=MAX_THREADS)
def _sdk_object_to_list(object):
"""Converts an SDK generator object to a list of dictionaries.
@ -117,13 +122,14 @@ def retry_on_conflict(conn, func, *args, retry_timeout=120, **kwargs):
try:
func(*args, **kwargs)
except exceptions.ConflictException:
thread.start_new_thread(
_retry_on_conflict,
(conn, func, *args),
{'retry_timeout': retry_timeout,
'load_balancer_getter': load_balancer_getter,
'resource_id': resource_id,
**kwargs})
retry_kwargs = {
'retry_timeout': retry_timeout,
'load_balancer_getter': load_balancer_getter,
'resource_id': resource_id,
**kwargs,
}
POOL.apply_async(
_retry_on_conflict, (conn, func, *args), kwds=retry_kwargs)
def listener_get_load_balancer_id(conn, listener_id):
@ -175,7 +181,7 @@ def create_loadbalancer(request):
# active.
args = (request, loadbalancer.id, create_listener)
kwargs = {'from_state': 'PENDING_CREATE'}
thread.start_new_thread(poll_loadbalancer_status, args, kwargs)
POOL.apply_async(poll_loadbalancer_status, args, kwds=kwargs)
return _get_sdk_object_dict(loadbalancer)
@ -217,7 +223,7 @@ def create_listener(request, **kwargs):
if data.get('pool'):
args = (request, kwargs['loadbalancer_id'], create_pool)
kwargs = {'callback_kwargs': {'listener_id': listener.id}}
thread.start_new_thread(poll_loadbalancer_status, args, kwargs)
POOL.apply_async(poll_loadbalancer_status, args, kwds=kwargs)
return _get_sdk_object_dict(listener)
@ -288,11 +294,11 @@ def create_pool(request, **kwargs):
args = (request, kwargs['loadbalancer_id'], add_member)
kwargs = {'callback_kwargs': {'pool_id': pool.id,
'index': 0}}
thread.start_new_thread(poll_loadbalancer_status, args, kwargs)
POOL.apply_async(poll_loadbalancer_status, args, kwds=kwargs)
elif data.get('monitor'):
args = (request, kwargs['loadbalancer_id'], create_health_monitor)
kwargs = {'callback_kwargs': {'pool_id': pool.id}}
thread.start_new_thread(poll_loadbalancer_status, args, kwargs)
POOL.apply_async(poll_loadbalancer_status, args, kwds=kwargs)
return _get_sdk_object_dict(pool)
@ -398,16 +404,16 @@ def add_member(request, **kwargs):
'members_to_add': members_to_add,
'members_to_delete': kwargs.get('members_to_delete'),
'pool_id': pool_id}}
thread.start_new_thread(poll_loadbalancer_status, args, kwargs)
POOL.apply_async(poll_loadbalancer_status, args, kwds=kwargs)
elif len(members) > index:
args = (request, loadbalancer_id, add_member)
kwargs = {'callback_kwargs': {'pool_id': pool_id,
'index': index}}
thread.start_new_thread(poll_loadbalancer_status, args, kwargs)
POOL.apply_async(poll_loadbalancer_status, args, kwds=kwargs)
elif data.get('monitor'):
args = (request, loadbalancer_id, create_health_monitor)
kwargs = {'callback_kwargs': {'pool_id': pool_id}}
thread.start_new_thread(poll_loadbalancer_status, args, kwargs)
POOL.apply_async(poll_loadbalancer_status, args, kwds=kwargs)
return _get_sdk_object_dict(member)
@ -434,7 +440,7 @@ def remove_member(request, **kwargs):
'members_to_add': kwargs.get('members_to_add'),
'members_to_delete': members_to_delete,
'pool_id': pool_id}}
thread.start_new_thread(poll_loadbalancer_status, args, kwargs)
POOL.apply_async(poll_loadbalancer_status, args, kwds=kwargs)
def update_loadbalancer(request, **kwargs):
@ -494,7 +500,7 @@ def update_listener(request, **kwargs):
if data.get('pool'):
args = (request, loadbalancer_id, update_pool)
thread.start_new_thread(poll_loadbalancer_status, args)
POOL.apply_async(poll_loadbalancer_status, args)
return _get_sdk_object_dict(listener)
@ -578,10 +584,10 @@ def update_pool(request, **kwargs):
'members_to_add': members_to_add,
'members_to_delete': members_to_delete,
'pool_id': pool_id}}
thread.start_new_thread(poll_loadbalancer_status, args, kwargs)
POOL.apply_async(poll_loadbalancer_status, args, kwds=kwargs)
elif data.get('monitor'):
args = (request, loadbalancer_id, update_monitor)
thread.start_new_thread(poll_loadbalancer_status, args)
POOL.apply_async(poll_loadbalancer_status, args)
return _get_sdk_object_dict(pool)
@ -679,7 +685,7 @@ def update_member_list(request, **kwargs):
add_member(request, **kwargs)
elif data.get('monitor'):
args = (request, loadbalancer_id, update_monitor)
thread.start_new_thread(poll_loadbalancer_status, args)
POOL.apply_async(poll_loadbalancer_status, args)
def get_members_to_add_remove(request_member_data, existing_members):

View File

@ -34,3 +34,5 @@ settings.POLICY_FILES.update({
# 'propagate': False,
# }
# })
OCTAVIA_DASHBOARD_MAX_THREADS = 10