Prevent hung process/threads when server_endpoint is not reachable
This commit is to address an issue that can cause Shaker to get stuck indefinately waiting for a zeroMQ message that will never arrive. To reproduce set server_endpoint to 10.0.0.1:8080 (assuming nothing is listening and/or this address is unreachable). Shaker will go through launching the heartbeat thread, and creating the heat stack, but then will be stuck at: Waiting for quorum of agents: If debug is turned on, there are no heartbeats coming through. In order to fix this situation this commit sends 3 "tcp pings" to the server_endpoint in order to test it's response before the heartbeat thread and heat stack are created. Change-Id: Ifcfacfb383e7553b53c5b2a20ad39ceccb833c8f
This commit is contained in:
parent
b948585b71
commit
639760a0fa
@ -14,13 +14,15 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import multiprocessing
|
import multiprocessing
|
||||||
|
import socket
|
||||||
from oslo_log import log as logging
|
import time
|
||||||
import zmq
|
import zmq
|
||||||
|
|
||||||
|
from oslo_log import log as logging
|
||||||
from shaker.agent import agent
|
from shaker.agent import agent
|
||||||
from shaker.engine import utils
|
from shaker.engine import utils
|
||||||
|
from six.moves import zip_longest
|
||||||
|
from timeit import default_timer as timer
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -29,13 +31,27 @@ HEARTBEAT_AGENT = '__heartbeat'
|
|||||||
|
|
||||||
class MessageQueue(object):
|
class MessageQueue(object):
|
||||||
def __init__(self, endpoint):
|
def __init__(self, endpoint):
|
||||||
_, port = utils.split_address(endpoint)
|
ip, port = utils.split_address(endpoint)
|
||||||
|
|
||||||
context = zmq.Context()
|
context = zmq.Context()
|
||||||
self.socket = context.socket(zmq.REP)
|
self.socket = context.socket(zmq.REP)
|
||||||
self.socket.bind("tcp://*:%s" % port)
|
self.socket.bind("tcp://*:%s" % port)
|
||||||
LOG.info('Listening on *:%s', port)
|
LOG.info('Listening on *:%s', port)
|
||||||
|
|
||||||
|
# Test that endpoint is actually reachable
|
||||||
|
# otherwise the process will get stuck indefinately waiting
|
||||||
|
# for a REQ/REP that will never happen.
|
||||||
|
# The code to support this was adapted from pypi package tcping
|
||||||
|
try:
|
||||||
|
LOG.info("Testing route to %s" % endpoint)
|
||||||
|
ping_test = Ping(ip, port)
|
||||||
|
ping_test.ping(3)
|
||||||
|
if ping_test._success == 0:
|
||||||
|
raise socket.timeout("No valid route to %s" % endpoint)
|
||||||
|
except socket.error as e:
|
||||||
|
LOG.exception(e)
|
||||||
|
raise
|
||||||
|
|
||||||
heartbeat = multiprocessing.Process(
|
heartbeat = multiprocessing.Process(
|
||||||
target=agent.work,
|
target=agent.work,
|
||||||
kwargs=dict(agent_id=HEARTBEAT_AGENT, endpoint=endpoint,
|
kwargs=dict(agent_id=HEARTBEAT_AGENT, endpoint=endpoint,
|
||||||
@ -65,3 +81,85 @@ class MessageQueue(object):
|
|||||||
else:
|
else:
|
||||||
LOG.exception(e)
|
LOG.exception(e)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
class Socket(object):
|
||||||
|
def __init__(self, family, type_, timeout):
|
||||||
|
s = socket.socket(family, type_)
|
||||||
|
s.settimeout(timeout)
|
||||||
|
self._s = s
|
||||||
|
|
||||||
|
def connect(self, host, port=80):
|
||||||
|
self._s.connect((host, int(port)))
|
||||||
|
|
||||||
|
def shutdown(self):
|
||||||
|
self._s.shutdown(socket.SHUT_RD)
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
self._s.close()
|
||||||
|
|
||||||
|
|
||||||
|
class Timer(object):
|
||||||
|
def __init__(self):
|
||||||
|
self._start = 0
|
||||||
|
self._stop = 0
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
self._start = timer()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self._stop = timer()
|
||||||
|
|
||||||
|
def cost(self, funcs, args):
|
||||||
|
self.start()
|
||||||
|
for func, arg in zip_longest(funcs, args):
|
||||||
|
if arg:
|
||||||
|
func(*arg)
|
||||||
|
else:
|
||||||
|
func()
|
||||||
|
|
||||||
|
self.stop()
|
||||||
|
return self._stop - self._start
|
||||||
|
|
||||||
|
|
||||||
|
class Ping(object):
|
||||||
|
def __init__(self, host, port=80, timeout=1):
|
||||||
|
self.timer = Timer()
|
||||||
|
|
||||||
|
self._success = 0
|
||||||
|
self._failed = 0
|
||||||
|
self._conn_times = []
|
||||||
|
self._host = host
|
||||||
|
self._port = port
|
||||||
|
self._timeout = timeout
|
||||||
|
|
||||||
|
def _create_socket(self, family, type_):
|
||||||
|
return Socket(family, type_, self._timeout)
|
||||||
|
|
||||||
|
def ping(self, count=10):
|
||||||
|
for n in range(1, count + 1):
|
||||||
|
s = self._create_socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
try:
|
||||||
|
time.sleep(1)
|
||||||
|
cost_time = self.timer.cost(
|
||||||
|
(s.connect, s.shutdown),
|
||||||
|
((self._host, self._port), None))
|
||||||
|
s_runtime = 1000 * (cost_time)
|
||||||
|
|
||||||
|
LOG.debug("Connected to %s[:%s]: seq=%d time=%.2f ms" % (
|
||||||
|
self._host, self._port, n, s_runtime))
|
||||||
|
|
||||||
|
self._conn_times.append(s_runtime)
|
||||||
|
except socket.timeout:
|
||||||
|
LOG.error("Connected to %s[:%s]: seq=%d time out!" % (
|
||||||
|
self._host, self._port, n))
|
||||||
|
self._failed += 1
|
||||||
|
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
raise KeyboardInterrupt()
|
||||||
|
|
||||||
|
else:
|
||||||
|
self._success += 1
|
||||||
|
|
||||||
|
finally:
|
||||||
|
s.close()
|
||||||
|
Loading…
Reference in New Issue
Block a user