From 61bd6ce5708d27015bb9ccf01aff99bb2b0ac2d8 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Thu, 2 May 2013 17:38:47 -0700 Subject: [PATCH] Add simple Gearman server. For testing, not for real use under load. Change-Id: I9c84b1eea7d868e907b80b6edf60c49c172c356b --- doc/source/index.rst | 17 ++ gear/__init__.py | 690 +++++++++++++++++++++++++++++++------------ 2 files changed, 518 insertions(+), 189 deletions(-) diff --git a/doc/source/index.rst b/doc/source/index.rst index 31bd184..584bd37 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -10,6 +10,9 @@ it simple, with a relatively thin abstration of the Gearman protocol itself. It should be easy to use to build a client or worker that operates either synchronously or asynchronously. +The module also provides a simple Gearman server for use as a +convenience in unit tests. The server is not designed for production +use under load. Client Example -------------- @@ -137,6 +140,20 @@ AdminRequest Objects :inherited-members: +Server Usage +------------ + +A simple Gearman server is provided for convenience in unit testing, +but is not designed for production use at scale. It takes no +parameters other than the port number on which to listen. + +Server Objects +^^^^^^^^^^^^^^ +.. autoclass:: gear.Server + :members: + :inherited-members: + + Common ------ diff --git a/gear/__init__.py b/gear/__init__.py index 9d2cbfb..9d18890 100644 --- a/gear/__init__.py +++ b/gear/__init__.py @@ -121,6 +121,12 @@ class Connection(object): data. """ + if self.conn: + try: + self.conn.close() + except Exception: + pass + self.log.debug("Disconnected from %s port %s" % (self.host, self.port)) self._init() @@ -136,8 +142,12 @@ class Connection(object): :arg Packet packet: The :py:class:`Packet` to send. 
""" + self.log.debug("Sending packet: %s" % packet) self.conn.send(packet.toBinary()) + def _getAdminRequest(self): + return self.admin_requests.pop(0) + def readPacket(self): """Read one packet or administrative response from the server. @@ -161,7 +171,7 @@ class Connection(object): admin = False else: admin = True - admin_request = self.admin_requests.pop(0) + admin_request = self._getAdminRequest() packet += c if admin: if admin_request.isComplete(packet): @@ -369,10 +379,11 @@ class Packet(object): return job -class BaseClient(object): - log = logging.getLogger("gear.BaseClient") +class BaseClientServer(object): + log = logging.getLogger("gear.BaseClientServer") def __init__(self): + self.running = True self.active_connections = [] self.inactive_connections = [] @@ -391,9 +402,231 @@ class BaseClient(object): target=self._doConnectLoop) self.connect_thread.start() - def __repr__(self): - return '' % id(self) + def _doConnectLoop(self): + # Outer run method of the reconnection thread + while self.running: + self.connections_condition.acquire() + while self.running and not self.inactive_connections: + self.log.debug("Waiting for change in available servers " + "to reconnect") + self.connections_condition.wait() + self.connections_condition.release() + self.log.debug("Checking if servers need to be reconnected") + try: + if self.running and not self._connectLoop(): + # Nothing happened + time.sleep(2) + except Exception: + self.log.exception("Exception in connect loop:") + def _connectLoop(self): + # Inner method of the reconnection loop, triggered by + # a connection change + success = False + for conn in self.inactive_connections[:]: + self.log.debug("Trying to reconnect %s" % conn) + try: + conn.reconnect() + except ConnectionError: + self.log.debug("Unable to connect to %s" % conn) + continue + except Exception: + self.log.exception("Exception while connecting to %s" % conn) + continue + + try: + self._onConnect(conn) + except Exception: + 
self.log.exception("Exception while performing on-connect " + "tasks for %s" % conn) + continue + self.connections_condition.acquire() + self.inactive_connections.remove(conn) + self.active_connections.append(conn) + self.connections_condition.notifyAll() + os.write(self.wake_write, '1\n') + self.connections_condition.release() + + try: + self._onActiveConnection(conn) + except Exception: + self.log.exception("Exception while performing active conn " + "tasks for %s" % conn) + + success = True + return success + + def _onConnect(self, conn): + # Called immediately after a successful (re-)connection + pass + + def _onActiveConnection(self, conn): + # Called immediately after a connection is activated + pass + + def _lostConnection(self, conn): + # Called as soon as a connection is detected as faulty. Remove + # it and return ASAP and let the connection thread deal with it. + self.log.debug("Marking %s as disconnected" % conn) + self.connections_condition.acquire() + jobs = conn.pending_jobs + conn.related_jobs.values() + self.active_connections.remove(conn) + self.inactive_connections.append(conn) + self.connections_condition.notifyAll() + self.connections_condition.release() + for job in jobs: + self.handleDisconnect(job) + + def _doPollLoop(self): + # Outer run method of poll thread. 
+ while self.running: + self.connections_condition.acquire() + while self.running and not self.active_connections: + self.log.debug("Waiting for change in available connections " + "to poll") + self.connections_condition.wait() + self.connections_condition.release() + try: + self._pollLoop() + except Exception: + self.log.exception("Exception in poll loop:") + + def _pollLoop(self): + # Inner method of poll loop + self.log.debug("Preparing to poll") + poll = select.poll() + bitmask = (select.POLLIN | select.POLLERR | + select.POLLHUP | select.POLLNVAL) + # Reverse mapping of fd -> connection + conn_dict = {} + for conn in self.active_connections: + poll.register(conn.conn.fileno(), bitmask) + conn_dict[conn.conn.fileno()] = conn + # Register the wake pipe so that we can break if we need to + # reconfigure connections + poll.register(self.wake_read, bitmask) + while self.running: + self.log.debug("Polling %s connections" % + len(self.active_connections)) + ret = poll.poll() + for fd, event in ret: + if fd == self.wake_read: + self.log.debug("Woken by pipe") + while True: + if os.read(self.wake_read, 1) == '\n': + break + return + if event & select.POLLIN: + self.log.debug("Processing input on %s", conn_dict[fd]) + p = conn_dict[fd].readPacket() + if p: + if isinstance(p, Packet): + self.handlePacket(p) + else: + self.handleAdminRequest(p) + else: + self.log.debug("Received no data on %s", conn_dict[fd]) + self._lostConnection(conn_dict[fd]) + return + else: + self.log.debug("Received error event on %s", conn_dict[fd]) + self._lostConnection(conn_dict[fd]) + return + + def handlePacket(self, packet): + """Handle a received packet. + + This method is called whenever a packet is received from any + connection. It normally calls the handle method appropriate + for the specific packet. + + :arg Packet packet: The :py:class:`Packet` that was received.
+ """ + + self.log.debug("Received packet %s" % packet) + if packet.ptype == constants.JOB_CREATED: + self.handleJobCreated(packet) + elif packet.ptype == constants.WORK_COMPLETE: + self.handleWorkComplete(packet) + elif packet.ptype == constants.WORK_FAIL: + self.handleWorkFail(packet) + elif packet.ptype == constants.WORK_EXCEPTION: + self.handleWorkException(packet) + elif packet.ptype == constants.WORK_DATA: + self.handleWorkData(packet) + elif packet.ptype == constants.WORK_WARNING: + self.handleWorkWarning(packet) + elif packet.ptype == constants.WORK_STATUS: + self.handleWorkStatus(packet) + elif packet.ptype == constants.STATUS_RES: + self.handleStatusRes(packet) + elif packet.ptype == constants.JOB_ASSIGN_UNIQ: + self.handleJobAssignUnique(packet) + elif packet.ptype == constants.NO_JOB: + self.handleNoJob(packet) + elif packet.ptype == constants.NOOP: + self.handleNoop(packet) + elif packet.ptype == constants.SUBMIT_JOB: + self.handleSubmitJob(packet) + elif packet.ptype == constants.GRAB_JOB_UNIQ: + self.handleGrabJobUniq(packet) + elif packet.ptype == constants.PRE_SLEEP: + self.handlePreSleep(packet) + elif packet.ptype == constants.SET_CLIENT_ID: + self.handleSetClientID(packet) + elif packet.ptype == constants.CAN_DO: + self.handleCanDo(packet) + elif packet.ptype == constants.CANT_DO: + self.handleCantDo(packet) + elif packet.ptype == constants.RESET_ABILITIES: + self.handleResetAbilities(packet) + else: + self.log.error("Received unknown packet %s" % packet) + + def handleAdminRequest(self, request): + """Handle an administrative command response from Gearman. + + This method is called whenever a response to a previously + issued administrative command is received from one of this + client's connections. It normally releases the wait lock on + the initiating AdminRequest object. + + :arg AdminRequest request: The :py:class:`AdminRequest` that + initiated the received response.
+ """ + + self.log.debug("Received admin data %s" % request) + request.setComplete() + + def shutdown(self): + """Close all connections and stop all running threads. + + The object may no longer be used after shutdown is called. + """ + self._shutdown() + self._cleanup() + + def _shutdown(self): + # The first part of the shutdown process where all threads + # are told to exit. + self.running = False + self.connections_condition.acquire() + self.connections_condition.notifyAll() + os.write(self.wake_write, '1\n') + self.connections_condition.release() + + def _cleanup(self): + # The second part of the shutdown process where we wait for all + # threads to exit and then clean up. + self.poll_thread.join() + self.connect_thread.join() + for connection in self.active_connections: + connection.disconnect() + self.active_connections = [] + self.inactive_connections = [] + + +class BaseClient(BaseClientServer): def addServer(self, host, port=4730): """Add a server to the client's connection pool. @@ -434,9 +667,9 @@ class BaseClient(object): Block until at least one gearman server is connected. 
""" connected = False - while True: + while self.running: self.connections_condition.acquire() - while not self.active_connections: + while self.running and not self.active_connections: self.log.debug("Waiting for at least one active connection") self.connections_condition.wait() if self.active_connections: @@ -446,70 +679,6 @@ class BaseClient(object): if connected: return - def _doConnectLoop(self): - # Outer run method of the reconnection thread - while True: - self.connections_condition.acquire() - while not self.inactive_connections: - self.log.debug("Waiting for change in available servers " - "to reconnect") - self.connections_condition.wait() - self.connections_condition.release() - self.log.debug("Checking if servers need to be reconnected") - try: - if not self._connectLoop(): - # Nothing happened - time.sleep(2) - except Exception: - self.log.exception("Exception in connect loop:") - - def _connectLoop(self): - # Inner method of the reconnection loop, triggered by - # a connection change - success = False - for conn in self.inactive_connections[:]: - self.log.debug("Trying to reconnect %s" % conn) - try: - conn.reconnect() - except ConnectionError: - self.log.debug("Unable to connect to %s" % conn) - continue - except Exception: - self.log.exception("Exception while connecting to %s" % conn) - continue - - try: - self._onConnect(conn) - except Exception: - self.log.exception("Exception while performing on-connect " - "tasks for %s" % conn) - continue - self.connections_condition.acquire() - self.inactive_connections.remove(conn) - self.active_connections.append(conn) - self.connections_condition.notifyAll() - os.write(self.wake_write, '1\n') - self.connections_condition.release() - success = True - return success - - def _onConnect(self, conn): - # Called immediately after a successful (re-)connection - pass - - def _lostConnection(self, conn): - # Called as soon as a connection is detected as faulty. 
Remove - # it and return ASAP and let the connection thread deal with it. - self.log.debug("Marking %s as disconnected" % conn) - self.connections_condition.acquire() - jobs = conn.pending_jobs + conn.related_jobs.values() - self.active_connections.remove(conn) - self.inactive_connections.append(conn) - self.connections_condition.notifyAll() - self.connections_condition.release() - for job in jobs: - self.handleDisconnect(job) - def getConnection(self): """Return a connected server. @@ -539,62 +708,6 @@ class BaseClient(object): self.connections_condition.release() return conn - def _doPollLoop(self): - # Outer run method of poll thread. - while True: - self.connections_condition.acquire() - while not self.active_connections: - self.log.debug("Waiting for change in available servers " - "to poll") - self.connections_condition.wait() - self.connections_condition.release() - try: - self._pollLoop() - except Exception: - self.log.exception("Exception in poll loop:") - - def _pollLoop(self): - # Inner method of poll loop - self.log.debug("Preparing to poll") - poll = select.poll() - bitmask = (select.POLLIN | select.POLLERR | - select.POLLHUP | select.POLLNVAL) - # Reverse mapping of fd -> connection - conn_dict = {} - for conn in self.active_connections: - poll.register(conn.conn.fileno(), bitmask) - conn_dict[conn.conn.fileno()] = conn - # Register the wake pipe so that we can break if we need to - # reconfigure connections - poll.register(self.wake_read, bitmask) - while True: - self.log.debug("Polling %s connections" % - len(self.active_connections)) - ret = poll.poll() - for fd, event in ret: - if fd == self.wake_read: - self.log.debug("Woken by pipe") - while True: - if os.read(self.wake_read, 1) == '\n': - break - return - if event & select.POLLIN: - self.log.debug("Processing input on %s" % conn) - p = conn_dict[fd].readPacket() - if p: - if isinstance(p, Packet): - self.handlePacket(p) - else: - self.handleAdminResponse(p) - else: - self.log.debug("Received no 
data on %s" % conn) - self._lostConnection(conn_dict[fd]) - return - else: - self.log.debug("Received error event on %s" % conn) - self._lostConnection(conn_dict[fd]) - return - def broadcast(self, packet): """Send a packet to all currently connected servers. @@ -626,21 +739,6 @@ class BaseClient(object): self._lostConnection(connection) raise - def handleAdminResponse(self, request): - """Handle an administrative command response from Gearman. - - This method is called whenever a response to a previously - issued administrative command is received from one of this - client's connections. It normally releases the wait lock on - the initiating AdminRequest object. - - :arg AdminRequest request: The :py:class:`AdminRequest` that - initiated the received response. - """ - - self.log.debug("Received admin response %s" % request) - request.setComplete() - class Client(BaseClient): """A Gearman client. @@ -653,6 +751,9 @@ class Client(BaseClient): log = logging.getLogger("gear.Client") + def __repr__(self): + return '' % id(self) + def submitJob(self, job, background=False, precedence=PRECEDENCE_NORMAL): """Submit a job to a Gearman server. @@ -709,34 +810,6 @@ class Client(BaseClient): # try again self._lostConnection(conn) - def handlePacket(self, packet): - """Handle a packet received from a Gearman server. - - This method is called whenever a packet is received from any - of this client's connections. It normally calls the handle - method appropriate for the specific packet. - - :arg Packet packet: The :py:class:`Packet` that was received. 
- """ - - self.log.debug("Received packet %s" % packet) - if packet.ptype == constants.JOB_CREATED: - self.handleJobCreated(packet) - elif packet.ptype == constants.WORK_COMPLETE: - self.handleWorkComplete(packet) - elif packet.ptype == constants.WORK_FAIL: - self.handleWorkFail(packet) - elif packet.ptype == constants.WORK_EXCEPTION: - self.handleWorkException(packet) - elif packet.ptype == constants.WORK_DATA: - self.handleWorkData(packet) - elif packet.ptype == constants.WORK_WARNING: - self.handleWorkWarning(packet) - elif packet.ptype == constants.WORK_STATUS: - self.handleWorkStatus(packet) - elif packet.ptype == constants.STATUS_RES: - self.handleStatusRes(packet) - def handleJobCreated(self, packet): """Handle a JOB_CREATED packet. @@ -946,6 +1019,9 @@ class Worker(BaseClient): self.job_queue = Queue.Queue() super(Worker, self).__init__() + def __repr__(self): + return '' % id(self) + def registerFunction(self, name, timeout=None): """Register a function with Gearman. @@ -1038,6 +1114,14 @@ class Worker(BaseClient): # Any exceptions will be handled by the calling function, and the # connection will not be put into the pool. + def _onActiveConnection(self, conn): + self.job_lock.acquire() + try: + if self.waiting_for_jobs > 0: + self._updateStateMachines() + finally: + self.job_lock.release() + def _updateStateMachines(self): connections = self.active_connections[:] @@ -1097,6 +1181,8 @@ class Worker(BaseClient): ok = True for connection in connections: if connection.state == "GRAB_WAIT": + # Replies to GRAB_JOB should be fast, give up if we've + # been waiting for more than 5 seconds. if now - connection.state_time > 5: self._lostConnection(connection) else: @@ -1115,6 +1201,10 @@ class Worker(BaseClient): self._updateStateMachines() self.job_lock.release() + def _shutdown(self): + super(Worker, self)._shutdown() + self.stopWaitingForJobs() + def handleNoop(self, packet): """Handle a NOOP packet. 
@@ -1188,25 +1278,6 @@ class Worker(BaseClient): finally: self.job_lock.release() - def handlePacket(self, packet): - """Handle a packet received from a Gearman server. - - This method is called whenever a packet is received from any - of this worker's connections. It normally calls the handle - method appropriate for the specific packet. - - :arg Packet packet: The :py:class:`Packet` that was received. - """ - - self.log.debug("Received packet %s" % packet) - - if packet.ptype == constants.JOB_ASSIGN_UNIQ: - self.handleJobAssignUnique(packet) - elif packet.ptype == constants.NO_JOB: - self.handleNoJob(packet) - elif packet.ptype == constants.NOOP: - self.handleNoop(packet) - class BaseJob(object): log = logging.getLogger("gear.Job") @@ -1396,3 +1467,244 @@ class WorkerJob(BaseJob): data = self.handle + '\x00' + data p = Packet(constants.REQ, constants.WORK_EXCEPTION, data) self.connection.sendPacket(p) + +# Below are classes for use in the server implementation: + + +class ServerAdminRequest(AdminRequest): + """An administrative request sent to a server.""" + + finished_re = re.compile('^.*\r?\n', re.M) + + def __init__(self, connection): + super(ServerAdminRequest, self).__init__() + self.connection = connection + + def isComplete(self, data): + if self.finished_re.search(data): + self.command = data.strip() + return True + return False + + +class ServerConnection(Connection): + """A Connection to a Gearman Client.""" + + def __init__(self, addr, conn): + self.host = addr[0] + self.port = addr[1] + self.conn = conn + self.max_handle = 0 + self.client_id = None + self.functions = set() + self.changeState("INIT") + + def _getAdminRequest(self): + return ServerAdminRequest(self) + + def __repr__(self): + if self.client_id: + name = self.client_id + else: + name = '0x%x' % id(self) + return '' % ( + name, self.host, self.port) + + +class Server(BaseClientServer): + """A simple gearman server implementation for testing + (not for production use). 
+ + :arg int port: The TCP port on which to listen. + """ + + def __init__(self, port=4730): + self.port = port + self.queue = [] + self.jobs = {} + self.connect_wake_read, self.connect_wake_write = os.pipe() + + for res in socket.getaddrinfo(None, self.port, socket.AF_UNSPEC, + socket.SOCK_STREAM, 0, + socket.AI_PASSIVE): + af, socktype, proto, canonname, sa = res + try: + self.socket = socket.socket(af, socktype, proto) + self.socket.setsockopt(socket.SOL_SOCKET, + socket.SO_REUSEADDR, 1) + except socket.error: + self.socket = None + continue + try: + self.socket.bind(sa) + self.socket.listen(1) + except socket.error: + self.socket.close() + self.socket = None + continue + break + + if self.socket is None: + raise Exception("Could not open socket") + + super(Server, self).__init__() + + def _doConnectLoop(self): + while self.running: + try: + self.connectLoop() + except Exception: + self.log.exception("Exception in connect loop:") + + def connectLoop(self): + poll = select.poll() + bitmask = (select.POLLIN | select.POLLERR | + select.POLLHUP | select.POLLNVAL) + # Register the wake pipe so that we can break if we need to + # shutdown.
+ poll.register(self.connect_wake_read, bitmask) + poll.register(self.socket.fileno(), bitmask) + while self.running: + ret = poll.poll() + for fd, event in ret: + if fd == self.connect_wake_read: + self.log.debug("Accept woken by pipe") + while True: + if os.read(self.connect_wake_read, 1) == '\n': + break + return + if event & select.POLLIN: + self.log.debug("Accepting new connection") + c, addr = self.socket.accept() + self.log.debug("Accepted new connection") + conn = ServerConnection(addr, c) + self.connections_condition.acquire() + self.active_connections.append(conn) + self.connections_condition.notifyAll() + os.write(self.wake_write, '1\n') + self.connections_condition.release() + + def _shutdown(self): + super(Server, self)._shutdown() + os.write(self.connect_wake_write, '1\n') + + def _cleanup(self): + super(Server, self)._cleanup() + self.socket.close() + + def _lostConnection(self, conn): + # Called as soon as a connection is detected as faulty. Remove + # it and return ASAP and let the connection thread deal with it. 
+ self.log.debug("Marking %s as disconnected" % conn) + self.connections_condition.acquire() + self.active_connections.remove(conn) + self.connections_condition.notifyAll() + self.connections_condition.release() + + def handleAdminRequest(self, request): + if request.command.startswith('cancel job'): + self.handleCancelJob(request) + + def handleCancelJob(self, request): + words = request.command.split() + handle = words[2] + + if handle in self.jobs: + for job in self.queue: + if handle == job.handle: + self.queue.remove(job) + del self.jobs[handle] + request.connection.conn.send("OK\n") + return + request.connection.conn.send("ERR UNKNOWN_JOB\n") + + def wakeConnections(self): + p = Packet(constants.REQ, constants.NOOP, '') + for connection in self.active_connections: + if connection.state == 'SLEEP': + connection.sendPacket(p) + + def handleSubmitJob(self, packet): + name = packet.getArgument(0) + unique = packet.getArgument(1) + if not unique: + unique = None + arguments = packet.getArgument(2) + packet.connection.max_handle += 1 + handle = 'H:%s:%s' % (packet.connection.host, + str(packet.connection.max_handle)) + job = BaseJob(name, arguments, unique, handle) + job.connection = packet.connection + p = Packet(constants.REQ, constants.JOB_CREATED, handle) + packet.connection.sendPacket(p) + self.jobs[handle] = job + self.queue.append(job) + self.wakeConnections() + + def handleGrabJobUniq(self, packet): + job = None + for j in self.queue: + if j.name in packet.connection.functions: + job = j + self.queue.remove(j) + break + if job: + unique = job.unique + if not unique: + unique = '' + data = '%s\x00%s\x00%s\x00%s' % (job.handle, job.name, + unique, job.arguments) + p = Packet(constants.REQ, constants.JOB_ASSIGN_UNIQ, data) + packet.connection.sendPacket(p) + else: + p = Packet(constants.REQ, constants.NO_JOB, "") + packet.connection.sendPacket(p) + + def handlePreSleep(self, packet): + packet.connection.changeState("SLEEP") + + def handleWorkComplete(self, 
packet): + self.handlePassthrough(packet, True) + + def handleWorkFail(self, packet): + self.handlePassthrough(packet, True) + + def handleWorkException(self, packet): + self.handlePassthrough(packet, True) + + def handleWorkData(self, packet): + self.handlePassthrough(packet) + + def handleWorkWarning(self, packet): + self.handlePassthrough(packet) + + def handleWorkStatus(self, packet): + self.handlePassthrough(packet) + + def handlePassthrough(self, packet, finished=False): + handle = packet.getArgument(0) + job = self.jobs.get(handle) + if not job: + raise UnknownJobError() + job.connection.sendPacket(packet) + if finished: + del self.jobs[handle] + + def handleSetClientID(self, packet): + name = packet.getArgument(0) + packet.connection.client_id = name + + def handleCanDo(self, packet): + name = packet.getArgument(0) + self.log.debug("Adding function %s to %s" % (name, packet.connection)) + packet.connection.functions.add(name) + + def handleCantDo(self, packet): + name = packet.getArgument(0) + self.log.debug("Removing function %s from %s" % + (name, packet.connection)) + packet.connection.functions.discard(name) + + def handleResetAbilities(self, packet): + self.log.debug("Resetting functions for %s" % packet.connection) + packet.connection.functions = set()