Reuse same paramiko connection recreating on errors

Move connection related code to a common object to allow reuse of the
connection setup code. This helps avoiding unnecessary recreation of
the connection where it is being used multiple times within the same
process for multiple calls.

Change-Id: Idf318ce5c36100e550182e773ce55da69fe35063
This commit is contained in:
Darragh Bailey 2016-12-19 16:37:45 +00:00 committed by Darragh Bailey (electrofelix)
parent 609deaf9a5
commit 500c8d2cc9
1 changed files with 71 additions and 58 deletions

View File

@ -36,50 +36,21 @@ UPDATE_ALLOWED_KEYS = ['description', 'submit-type',
'max-object-size-limit']
class GerritWatcher(threading.Thread):
log = logging.getLogger("gerrit.GerritWatcher")
class GerritConnection(object):
log = logging.getLogger("gerrit.GerritConnection")
def __init__(
self, gerrit, username=None, hostname=None, port=None,
keyfile=None, connection_attempts=-1, retry_delay=5):
"""Create a GerritWatcher.
def __init__(self, username=None, hostname=None, port=29418,
keyfile=None, connection_attempts=-1, retry_delay=5):
:param gerrit: A Gerrit instance to pass events to.
All other parameters are optional and if not supplied are sourced from
the gerrit instance.
"""
super(GerritWatcher, self).__init__()
assert retry_delay >= 0, "Retry delay must be >= 0"
self.username = username or gerrit.username
self.keyfile = keyfile or gerrit.keyfile
self.hostname = hostname or gerrit.hostname
self.port = port or gerrit.port
self.gerrit = gerrit
self.username = username
self.hostname = hostname
self.port = port
self.keyfile = keyfile
self.connection_attempts = int(connection_attempts)
self.retry_delay = float(retry_delay)
self.state = IDLE
def _read(self, fd):
l = fd.readline()
data = json.loads(l)
self.log.debug("Received data from Gerrit event stream: \n%s" %
pprint.pformat(data))
self.gerrit.addEvent(data)
def _listen(self, stdout, stderr):
poll = select.poll()
poll.register(stdout.channel)
while True:
ret = poll.poll()
for (fd, event) in ret:
if fd == stdout.channel.fileno():
if event == select.POLLIN:
self._read(stdout)
else:
raise Exception("event on ssh connection")
def _connect(self):
def connect(self):
"""Attempts to connect and returns the connected client."""
def _make_client():
@ -131,6 +102,51 @@ class GerritWatcher(threading.Thread):
else:
raise e
class GerritWatcher(threading.Thread):
log = logging.getLogger("gerrit.GerritWatcher")
def __init__(
self, gerrit, username=None, hostname=None, port=None,
keyfile=None, connection_attempts=-1, retry_delay=5):
"""Create a GerritWatcher.
:param gerrit: A GerritConnection instance to pass events to.
All other parameters are optional and if not supplied are sourced from
the gerrit instance.
"""
super(GerritWatcher, self).__init__()
self.connection = GerritConnection(
username or gerrit.username,
hostname or gerrit.hostname,
port or gerrit.port,
keyfile or gerrit.keyfile,
connection_attempts,
retry_delay
)
self.gerrit = gerrit
self.state = IDLE
def _read(self, fd):
l = fd.readline()
data = json.loads(l)
self.log.debug("Received data from Gerrit event stream: \n%s" %
pprint.pformat(data))
self.gerrit.addEvent(data)
def _listen(self, stdout, stderr):
poll = select.poll()
poll.register(stdout.channel)
while True:
ret = poll.poll()
for (fd, event) in ret:
if fd == stdout.channel.fileno():
if event == select.POLLIN:
self._read(stdout)
else:
raise Exception("event on ssh connection")
def _consume(self, client):
"""Consumes events using the given client."""
stdin, stdout, stderr = client.exec_command("gerrit stream-events")
@ -147,7 +163,7 @@ class GerritWatcher(threading.Thread):
def _run(self):
self.state = CONNECTING
client = self._connect()
client = self.connection.connect()
self.state = CONNECTED
try:
self._consume(client)
@ -178,17 +194,15 @@ class Gerrit(object):
log = logging.getLogger("gerrit.Gerrit")
def __init__(self, hostname, username, port=29418, keyfile=None):
self.username = username
self.hostname = hostname
self.port = port
self.keyfile = keyfile
self.connection = GerritConnection(username, hostname, port, keyfile)
self.client = None
self.watcher_thread = None
self.event_queue = None
self.installed_plugins = None
def startWatching(self, connection_attempts=-1, retry_delay=5):
self.event_queue = six.moves.queue.Queue()
watcher = GerritWatcher(self,
watcher = GerritWatcher(self.connection,
connection_attempts=connection_attempts,
retry_delay=retry_delay)
self.watcher_thread = watcher
@ -396,17 +410,13 @@ class Gerrit(object):
return data
def _ssh(self, command):
try:
client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.WarningPolicy())
client.connect(self.hostname,
username=self.username,
port=self.port,
key_filename=self.keyfile)
if self.client is None:
self.client = self.connection.connect()
self.log.debug("SSH command:\n%s" % command)
stdin, stdout, stderr = client.exec_command(command)
stdin, stdout, stderr = self.client.exec_command(command)
out = stdout.read()
self.log.debug("SSH received stdout:\n%s" % out)
@ -416,9 +426,12 @@ class Gerrit(object):
err = stderr.read()
self.log.debug("SSH received stderr:\n%s" % err)
finally:
if client:
client.close()
if ret:
raise Exception("Gerrit error executing %s" % command)
return (out, err)
if ret:
raise Exception("Gerrit error executing %s" % command)
return (out, err)
except Exception:
if self.client:
self.client.close()
self.client = None
raise