Merge "Use paramiko for stream-events."

This commit is contained in:
Jenkins 2012-06-06 16:55:28 +00:00 committed by Gerrit Code Review
commit 2eea2b69dd
2 changed files with 36 additions and 47 deletions

View File

@ -17,80 +17,63 @@ import threading
import select
import json
import time
import subprocess
import Queue
import paramiko
import logging
import pprint
# TODO: switch this to paramiko?
class GerritWatcher(threading.Thread):
log = logging.getLogger("gerrit.GerritWatcher")
def __init__(self, gerrit, username, server, port=29418, keyfile=None):
def __init__(self, gerrit, username, hostname, port=29418, keyfile=None):
threading.Thread.__init__(self)
self.username = username
self.keyfile = keyfile
self.server = server
self.hostname = hostname
self.port = port
self.proc = None
self.poll = select.poll()
self.gerrit = gerrit
def _open(self):
self.log.debug("Opening ssh connection to %s" % self.server)
cmd = ['/usr/bin/ssh', '-p', str(self.port)]
if self.keyfile:
cmd += ['-i', self.keyfile]
cmd += ['-l', self.username, self.server,
'gerrit', 'stream-events']
self.proc = subprocess.Popen(cmd,
bufsize=1,
stdin=None,
stdout=subprocess.PIPE,
stderr=None,
)
self.poll.register(self.proc.stdout)
def _close(self):
self.log.debug("Closing ssh connection")
try:
self.poll.unregister(self.proc.stdout)
except:
pass
try:
self.proc.kill()
except:
pass
self.proc = None
def _read(self):
l = self.proc.stdout.readline()
def _read(self, fd):
l = fd.readline()
data = json.loads(l)
self.log.debug("Received data from Gerrit event stream: \n%s" %
pprint.pformat(data))
self.gerrit.addEvent(data)
def _listen(self):
def _listen(self, stdout, stderr):
poll = select.poll()
poll.register(stdout.channel)
while True:
ret = self.poll.poll()
ret = poll.poll()
for (fd, event) in ret:
if fd == self.proc.stdout.fileno():
if fd == stdout.channel.fileno():
if event == select.POLLIN:
self._read()
self._read(stdout)
else:
raise Exception("event on ssh connection")
def _run(self):
try:
if not self.proc:
self._open()
self._listen()
client = paramiko.SSHClient()
client.load_system_host_keys()
client.set_missing_host_key_policy(paramiko.WarningPolicy())
client.connect(self.hostname,
username=self.username,
port=self.port,
key_filename=self.keyfile)
stdin, stdout, stderr = client.exec_command("gerrit stream-events")
self._listen(stdout, stderr)
ret = stdout.channel.recv_exit_status()
self.log.debug("SSH exit status: %s" % ret)
if ret:
raise Exception("Gerrit error executing stream-events")
except:
self.log.exception("Exception on ssh event stream:")
self._close()
time.sleep(5)
def run(self):
@ -101,9 +84,10 @@ class GerritWatcher(threading.Thread):
class Gerrit(object):
log = logging.getLogger("gerrit.Gerrit")
def __init__(self, hostname, username, keyfile=None):
def __init__(self, hostname, username, port=29418, keyfile=None):
self.username = username
self.hostname = hostname
self.port = port
self.keyfile = keyfile
self.watcher_thread = None
self.event_queue = None
@ -157,7 +141,8 @@ class Gerrit(object):
client.set_missing_host_key_policy(paramiko.WarningPolicy())
client.connect(self.hostname,
username=self.username,
port=29418)
port=self.port,
key_filename=self.keyfile)
self.log.debug("SSH command:\n%s" % command)
stdin, stdout, stderr = client.exec_command(command)

View File

@ -69,7 +69,11 @@ class Gerrit(object):
sshkey = config.get('gerrit', 'sshkey')
else:
sshkey = None
self.gerrit = gerrit.Gerrit(server, user, sshkey)
if config.has_option('gerrit', 'port'):
port = config.get('gerrit', 'port')
else:
port = 29418
self.gerrit = gerrit.Gerrit(server, user, port, sshkey)
self.gerrit.startWatching()
self.gerrit_connector = GerritEventConnector(
self.gerrit, sched)