Merge "Cancelling thread start while unit tests running"

This commit is contained in:
Jenkins 2014-03-31 20:31:00 +00:00 committed by Gerrit Code Review
commit 9e90eb6965
2 changed files with 103 additions and 106 deletions

View File

@ -177,7 +177,7 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
plugin)
self.workflow_templates_exists = False
self.completion_handler.setDaemon(True)
self.completion_handler.start()
self.completion_handler_started = False
def create_vip(self, context, vip):
LOG.debug(_('create_vip. vip: %s'), str(vip))
@ -340,6 +340,12 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
context, extended_vip['subnet_id'])
return subnet['network_id']
def _start_completion_handling_thread(self):
if not self.completion_handler_started:
LOG.info(_('Starting operation completion handling thread'))
self.completion_handler.start()
self.completion_handler_started = True
@call_log.log
def _update_workflow(self, wf_name, action,
wf_params, context,
@ -371,6 +377,8 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
entity_id,
delete=delete)
LOG.debug(_('Pushing operation %s to the queue'), oper)
self._start_completion_handling_thread()
self.queue.put_nowait(oper)
def _remove_workflow(self, ids, context):
@ -391,6 +399,8 @@ class LoadBalancerDriver(abstract_driver.LoadBalancerAbstractDriver):
ids['vip'],
delete=True)
LOG.debug(_('Pushing operation %s to the queue'), oper)
self._start_completion_handling_thread()
self.queue.put_nowait(oper)
def _remove_service(self, service_name):
@ -619,24 +629,52 @@ class OperationCompletionHandler(threading.Thread):
self.stoprequest = threading.Event()
self.opers_to_handle_before_rest = 0
def _get_db_status(self, operation, success, messages=None):
"""Get the db_status based on the status of the vdirect operation."""
if not success:
# we have a failure - log it and set the return ERROR as DB state
msg = ', '.join(messages) if messages else "unknown"
error_params = {"operation": operation, "msg": msg}
LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'),
error_params)
return constants.ERROR
if operation.delete:
return None
else:
return constants.ACTIVE
def join(self, timeout=None):
self.stoprequest.set()
super(OperationCompletionHandler, self).join(timeout)
def handle_operation_completion(self, oper):
result = self.rest_client.call('GET',
oper.operation_url,
None,
None)
completed = result[RESP_DATA]['complete']
reason = result[RESP_REASON],
description = result[RESP_STR]
if completed:
# operation is done - update the DB with the status
# or delete the entire graph from DB
success = result[RESP_DATA]['success']
sec_to_completion = time.time() - oper.creation_time
debug_data = {'oper': oper,
'sec_to_completion': sec_to_completion,
'success': success}
LOG.debug(_('Operation %(oper)s is completed after '
'%(sec_to_completion)d sec '
'with success status: %(success)s :'),
debug_data)
db_status = None
if not success:
# failure - log it and set the return ERROR as DB state
if reason or description:
msg = 'Reason:%s. Description:%s' % (reason, description)
else:
msg = "unknown"
error_params = {"operation": oper, "msg": msg}
LOG.error(_('Operation %(operation)s failed. Reason: %(msg)s'),
error_params)
db_status = constants.ERROR
else:
if oper.delete:
_remove_object_from_db(self.plugin, oper)
else:
db_status = constants.ACTIVE
if db_status:
_update_vip_graph_status(self.plugin, oper, db_status)
return completed
def run(self):
oper = None
while not self.stoprequest.isSet():
@ -653,31 +691,7 @@ class OperationCompletionHandler(threading.Thread):
str(oper))
# check the status - if oper is done: update the db ,
# else push the oper again to the queue
result = self.rest_client.call('GET',
oper.operation_url,
None,
None)
completed = result[RESP_DATA]['complete']
if completed:
# operation is done - update the DB with the status
# or delete the entire graph from DB
success = result[RESP_DATA]['success']
sec_to_completion = time.time() - oper.creation_time
debug_data = {'oper': oper,
'sec_to_completion': sec_to_completion,
'success': success}
LOG.debug(_('Operation %(oper)s is completed after '
'%(sec_to_completion)d sec '
'with success status: %(success)s :'),
debug_data)
db_status = self._get_db_status(oper, success)
if db_status:
_update_vip_graph_status(
self.plugin, oper, db_status)
else:
_remove_object_from_db(
self.plugin, oper)
else:
if not self.handle_operation_completion(oper):
LOG.debug(_('Operation %s is not completed yet..') % oper)
# Not completed - push to the queue again
self.queue.put_nowait(oper)

View File

@ -16,10 +16,10 @@
#
# @author: Avishay Balderman, Radware
import Queue
import re
import contextlib
import eventlet
import mock
from neutron import context
@ -34,8 +34,16 @@ from neutron.tests.unit.db.loadbalancer import test_db_loadbalancer
GET_200 = ('/api/workflow/', '/api/service/', '/api/workflowTemplate')
def rest_call_function_mock(action, resource, data, headers, binary=False):
class QueueMock(Queue.Queue):
def __init__(self, completion_handler):
self.completion_handler = completion_handler
super(QueueMock, self).__init__()
def put_nowait(self, oper):
self.completion_handler(oper)
def rest_call_function_mock(action, resource, data, headers, binary=False):
if rest_call_function_mock.RESPOND_WITH_ERROR:
return 400, 'error_status', 'error_description', None
@ -107,13 +115,24 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
rest_call_function_mock.__dict__.update(
{'TEMPLATES_MISSING': False})
self.rest_call_mock = mock.Mock(name='rest_call_mock',
side_effect=rest_call_function_mock,
spec=self.plugin_instance.
drivers['radware'].
rest_client.call)
self.operation_completer_start_mock = mock.Mock(
return_value=None)
self.operation_completer_join_mock = mock.Mock(
return_value=None)
self.driver_rest_call_mock = mock.Mock(
side_effect=rest_call_function_mock)
radware_driver = self.plugin_instance.drivers['radware']
radware_driver.rest_client.call = self.rest_call_mock
radware_driver.completion_handler.start = (
self.operation_completer_start_mock)
radware_driver.completion_handler.join = (
self.operation_completer_join_mock)
radware_driver.rest_client.call = self.driver_rest_call_mock
radware_driver.completion_handler.rest_client.call = (
self.driver_rest_call_mock)
radware_driver.queue = QueueMock(
radware_driver.completion_handler.handle_operation_completion)
self.addCleanup(radware_driver.completion_handler.join)
@ -128,7 +147,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
def test_create_vip_failure(self):
"""Test the rest call failure handling by Exception raising."""
self.rest_call_mock.reset_mock()
with self.network(do_delete=False) as network:
with self.subnet(network=network, do_delete=False) as subnet:
with self.pool(no_delete=True, provider='radware') as pool:
@ -155,9 +173,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
{'vip': vip_data})
def test_create_vip(self):
self.skipTest("Skipping test till bug 1288312 is fixed")
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware') as pool:
vip_data = {
@ -211,10 +226,8 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
mock.call('GET', '/api/workflow/' +
pool['pool']['id'], None, None)
]
self.rest_call_mock.assert_has_calls(calls, any_order=True)
# sleep to wait for the operation completion
eventlet.greenthread.sleep(0)
self.driver_rest_call_mock.assert_has_calls(calls,
any_order=True)
#Test DB
new_vip = self.plugin_instance.get_vip(
@ -232,12 +245,10 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
mock.call('DELETE', u'/api/workflow/' + pool['pool']['id'],
None, None)
]
self.rest_call_mock.assert_has_calls(calls, any_order=True)
# need to wait some time to allow driver to delete vip
eventlet.greenthread.sleep(1)
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
def test_update_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware', no_delete=True) as pool:
vip_data = {
@ -268,15 +279,9 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
'/action/BaseCreate',
mock.ANY, driver.TEMPLATE_HEADER),
]
self.rest_call_mock.assert_has_calls(calls, any_order=True)
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
updated_vip = self.plugin_instance.get_vip(
context.get_admin_context(), vip['id'])
self.assertEqual(updated_vip['status'],
constants.PENDING_UPDATE)
# sleep to wait for the operation completion
eventlet.greenthread.sleep(1)
updated_vip = self.plugin_instance.get_vip(
context.get_admin_context(), vip['id'])
self.assertEqual(updated_vip['status'], constants.ACTIVE)
@ -286,7 +291,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
context.get_admin_context(), vip['id'])
def test_delete_vip_failure(self):
self.rest_call_mock.reset_mock()
plugin = self.plugin_instance
with self.network(do_delete=False) as network:
@ -306,8 +310,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
context.get_admin_context(), hm, pool['pool']['id']
)
eventlet.greenthread.sleep(1)
rest_call_function_mock.__dict__.update(
{'RESPOND_WITH_ERROR': True})
@ -333,7 +335,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
self.assertEqual(u_phm['status'], constants.ACTIVE)
def test_delete_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware', no_delete=True) as pool:
vip_data = {
@ -360,14 +361,14 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
mock.call('DELETE', '/api/workflow/' + pool['pool']['id'],
None, None)
]
self.rest_call_mock.assert_has_calls(calls, any_order=True)
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
self.assertRaises(loadbalancer.VipNotFound,
self.plugin_instance.get_vip,
context.get_admin_context(), vip['id'])
def test_update_pool(self):
self.rest_call_mock.reset_mock()
with self.subnet():
with self.pool() as pool:
del pool['pool']['provider']
@ -380,7 +381,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
self.assertEqual(pool_db['status'], constants.PENDING_UPDATE)
def test_delete_pool_with_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware', no_delete=True) as pool:
with self.vip(pool=pool, subnet=subnet):
@ -390,7 +390,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
pool['pool']['id'])
def test_create_member_with_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware') as p:
with self.vip(pool=p, subnet=subnet):
@ -407,11 +406,10 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
mock.ANY, driver.TEMPLATE_HEADER
)
]
self.rest_call_mock.assert_has_calls(calls,
any_order=True)
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
def test_update_member_with_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware') as p:
with self.member(pool_id=p['pool']['id']) as member:
@ -432,16 +430,14 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
mock.ANY, driver.TEMPLATE_HEADER
)
]
self.rest_call_mock.assert_has_calls(calls,
any_order=True)
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
updated_member = self.plugin_instance.get_member(
context.get_admin_context(),
member['member']['id']
)
# sleep to wait for the operation completion
eventlet.greenthread.sleep(0)
updated_member = self.plugin_instance.get_member(
context.get_admin_context(),
member['member']['id']
@ -450,7 +446,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
constants.ACTIVE)
def test_update_member_without_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet():
with self.pool(provider='radware') as pool:
with self.member(pool_id=pool['pool']['id']) as member:
@ -463,7 +458,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
constants.PENDING_UPDATE)
def test_delete_member_with_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.pool(provider='radware') as p:
with self.member(pool_id=p['pool']['id'],
@ -474,21 +468,22 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
# wait for being sure the member
# Changed status from PENDING-CREATE
# to ACTIVE
self.rest_call_mock.reset_mock()
eventlet.greenthread.sleep(1)
self.plugin_instance.delete_member(
context.get_admin_context(),
m['member']['id']
)
args, kwargs = self.rest_call_mock.call_args
name, args, kwargs = (
self.driver_rest_call_mock.mock_calls[-2]
)
deletion_post_graph = str(args[2])
self.assertTrue(re.search(
r'.*\'member_address_array\': \[\].*',
deletion_post_graph
))
calls = [
mock.call(
'POST', '/api/workflow/' + p['pool']['id'] +
@ -496,17 +491,15 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
mock.ANY, driver.TEMPLATE_HEADER
)
]
self.rest_call_mock.assert_has_calls(
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
eventlet.greenthread.sleep(1)
self.assertRaises(loadbalancer.MemberNotFound,
self.plugin_instance.get_member,
context.get_admin_context(),
m['member']['id'])
def test_delete_member_without_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet():
with self.pool(provider='radware') as p:
with self.member(pool_id=p['pool']['id'], no_delete=True) as m:
@ -519,7 +512,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
m['member']['id'])
def test_create_hm_with_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.health_monitor() as hm:
with self.pool(provider='radware') as pool:
@ -543,11 +535,9 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
mock.ANY, driver.TEMPLATE_HEADER
)
]
self.rest_call_mock.assert_has_calls(
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
eventlet.greenthread.sleep(1)
phm = self.plugin_instance.get_pool_health_monitor(
context.get_admin_context(),
hm['health_monitor']['id'], pool['pool']['id']
@ -555,7 +545,6 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
self.assertEqual(phm['status'], constants.ACTIVE)
def test_delete_pool_hm_with_vip(self):
self.rest_call_mock.reset_mock()
with self.subnet() as subnet:
with self.health_monitor(no_delete=True) as hm:
with self.pool(provider='radware') as pool:
@ -565,21 +554,15 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
hm, pool['pool']['id']
)
# Reset mock and
# wait for being sure that status
# changed from PENDING-CREATE
# to ACTIVE
self.rest_call_mock.reset_mock()
eventlet.greenthread.sleep(1)
self.plugin_instance.delete_pool_health_monitor(
context.get_admin_context(),
hm['health_monitor']['id'],
pool['pool']['id']
)
eventlet.greenthread.sleep(1)
name, args, kwargs = self.rest_call_mock.mock_calls[-2]
name, args, kwargs = (
self.driver_rest_call_mock.mock_calls[-2]
)
deletion_post_graph = str(args[2])
self.assertTrue(re.search(
@ -594,7 +577,7 @@ class TestLoadBalancerPlugin(TestLoadBalancerPluginBase):
mock.ANY, driver.TEMPLATE_HEADER
)
]
self.rest_call_mock.assert_has_calls(
self.driver_rest_call_mock.assert_has_calls(
calls, any_order=True)
self.assertRaises(