diff --git a/.testr.conf b/.testr.conf index 7409a5a..4c3aa44 100644 --- a/.testr.conf +++ b/.testr.conf @@ -1,4 +1,4 @@ [DEFAULT] -test_command=OS_TEST_TIMEOUT=60 ${PYTHON:-python} -m subunit.run discover -t ./ . $LISTOPT $IDOPTION +test_command=OS_TEST_TIMEOUT=60 ${PYTHON:-python} -m subunit.run discover -t ./ gear/tests/ $LISTOPT $IDOPTION test_id_option=--load-list $IDFILE test_list_option=--list diff --git a/gear/__init__.py b/gear/__init__.py index 134f03b..e62a830 100644 --- a/gear/__init__.py +++ b/gear/__init__.py @@ -14,7 +14,6 @@ import logging import os -import Queue import select import socket import struct @@ -24,6 +23,11 @@ import uuid as uuid_module from gear import constants +try: + import Queue as queue +except ImportError: + import queue as queue + PRECEDENCE_NORMAL = 0 PRECEDENCE_LOW = 1 PRECEDENCE_HIGH = 2 @@ -61,6 +65,14 @@ class GearmanError(Exception): pass +def convert_to_bytes(data): + try: + data = data.encode('utf8') + except AttributeError: + pass + return data + + class Task(object): def __init__(self): self._wait_event = threading.Event() @@ -208,7 +220,7 @@ class Connection(object): if not c: return None if admin is None: - if c == '\x00': + if c == b'\x00': admin = False else: admin = True @@ -244,15 +256,15 @@ class Connection(object): This method waits until the echo response has been received or the timeout has been reached. - :arg str data: The data to request be echoed. If None, a random - unique string will be generated. + :arg bytes data: The data to request be echoed. If None, a random + unique byte string will be generated. :arg numeric timeout: Number of seconds to wait until the response is received. If None, wait forever (default: 30 seconds). :raises TimeoutError: If the timeout is reached before the response is received. """ if data is None: - data = str(uuid_module.uuid4().hex) + data = uuid_module.uuid4().hex.encode('utf8') self.echo_lock.acquire() try: if data in self.echo_conditions: @@ -302,15 +314,15 @@ class AdminRequest(object): instantiated dircectly; a subclass implementing a specific command must be used instead. - :arg list arguments: A list of string arguments for the command. + :arg list arguments: A list of byte string arguments for the command. The following instance attributes are available: - **response** (str) + **response** (bytes) The response from the server. - **arguments** (str) + **arguments** (bytes) The argument supplied with the constructor. - **command** (str) + **command** (bytes) The administrative command. """ @@ -333,15 +345,15 @@ class AdminRequest(object): def getCommand(self): cmd = self.command if self.arguments: - cmd += ' ' + ' '.join(self.arguments) - cmd += '\n' + cmd += b' ' + b' '.join(self.arguments) + cmd += b'\n' return cmd def isComplete(self, data): - if (data[-3:] == '\n.\n' or - data[-5:] == '\r\n.\r\n' or - data == '.\n' or - data == '.\r\n'): + if (data[-3:] == b'\n.\n' or + data[-5:] == b'\r\n.\r\n' or + data == b'.\n' or + data == b'.\r\n'): self.response = data return True return False @@ -359,7 +371,7 @@ class StatusAdminRequest(AdminRequest): The response from gearman may be found in the **response** attribute. """ - command = 'status' + command = b'status' def __init__(self): super(StatusAdminRequest, self).__init__() @@ -370,7 +382,7 @@ class ShowJobsAdminRequest(AdminRequest): The response from gearman may be found in the **response** attribute. """ - command = 'show jobs' + command = b'show jobs' def __init__(self): super(ShowJobsAdminRequest, self).__init__() @@ -382,7 +394,7 @@ class ShowUniqueJobsAdminRequest(AdminRequest): The response from gearman may be found in the **response** attribute. """ - command = 'show unique jobs' + command = b'show unique jobs' def __init__(self): super(ShowUniqueJobsAdminRequest, self).__init__() @@ -396,13 +408,14 @@ class CancelJobAdminRequest(AdminRequest): The response from gearman may be found in the **response** attribute. """ - command = 'cancel job' + command = b'cancel job' def __init__(self, handle): + handle = convert_to_bytes(handle) super(CancelJobAdminRequest, self).__init__(handle) def isComplete(self, data): - if data[-1] == '\n': + if data[-1:] == b'\n': self.response = data return True return False @@ -414,13 +427,13 @@ class VersionAdminRequest(AdminRequest): The response from gearman may be found in the **response** attribute. """ - command = 'version' + command = b'version' def __init__(self): super(VersionAdminRequest, self).__init__() def isComplete(self, data): - if data[-1] == '\n': + if data[-1:] == b'\n': self.response = data return True return False @@ -430,22 +443,26 @@ class Packet(object): """A data packet received from or to be sent over a :py:class:`Connection`. - :arg str code: The Gearman magic code (:py:data:`constants.REQ` or + :arg bytes code: The Gearman magic code (:py:data:`constants.REQ` or :py:data:`constants.RES`) - :arg str ptype: The packet type (one of the packet types in constasts). - :arg str data: The data portion of the packet. - :arg str connection: The connection on which the packet was received - (optional). + :arg bytes ptype: The packet type (one of the packet types in constasts). + :arg bytes data: The data portion of the packet. + :arg Connection connection: The connection on which the packet + was received (optional). :raises InvalidDataError: If the magic code is unknown. """ log = logging.getLogger("gear.Packet") def __init__(self, code, ptype, data, connection=None): - if code[0] != '\x00': + if not isinstance(code, bytes) and not isinstance(code, bytearray): + raise TypeError("code must be of type bytes or bytearray") + if code[0:1] != b'\x00': raise InvalidDataError("First byte of packet must be 0") self.code = code self.ptype = ptype + if not isinstance(data, bytes) and not isinstance(data, bytearray): + raise TypeError("data must be of type bytes or bytearray") self.data = data self.connection = connection @@ -457,15 +474,11 @@ class Packet(object): """Return a Gearman wire protocol binary representation of the packet. :returns: The packet in binary form. - :rtype: str + :rtype: bytes """ b = struct.pack('!4sii', self.code, self.ptype, len(self.data)) b = bytearray(b) - if isinstance(self.data, basestring): - data = bytearray(self.data, 'utf8') - else: - data = self.data - b += data + b += self.data return b def getArgument(self, index, last=False): @@ -475,7 +488,7 @@ class Packet(object): :arg bool last: Whether this is the last argument (and thus nulls should be ignored) :returns: The argument value. - :rtype: str + :rtype: bytes """ parts = self.data.split(b'\x00') @@ -488,7 +501,7 @@ class Packet(object): this packet. :returns: The :py:class:`Job` for this packet. - :rtype: str + :rtype: Job :raises UnknownJobError: If the job is not known. """ handle = self.getArgument(0) @@ -563,7 +576,7 @@ class BaseClientServer(object): self.inactive_connections.remove(conn) self.active_connections.append(conn) self.connections_condition.notifyAll() - os.write(self.wake_write, '1\n') + os.write(self.wake_write, b'1\n') self.connections_condition.release() try: @@ -632,7 +645,7 @@ class BaseClientServer(object): if fd == self.wake_read: self.log.debug("Woken by pipe") while True: - if os.read(self.wake_read, 1) == '\n': + if os.read(self.wake_read, 1) == b'\n': break return conn = conn_dict[fd] @@ -877,7 +890,7 @@ class BaseClientServer(object): self.running = False self.connections_condition.acquire() self.connections_condition.notifyAll() - os.write(self.wake_write, '1\n') + os.write(self.wake_write, b'1\n') self.connections_condition.release() def _cleanup(self): @@ -1093,6 +1106,7 @@ class Client(BaseClient): :rtype: bool """ tasks = {} + name = convert_to_bytes(name) self.broadcast_lock.acquire() try: @@ -1140,10 +1154,10 @@ class Client(BaseClient): is supplied. """ if job.unique is None: - unique = '' + unique = b'' else: unique = job.unique - data = '%s\x00%s\x00%s' % (job.name, unique, job.arguments) + data = b'\x00'.join((job.name, unique, job.arguments)) if background: if precedence == PRECEDENCE_NORMAL: cmd = constants.SUBMIT_JOB_BG @@ -1418,11 +1432,11 @@ class Worker(BaseClient): log = logging.getLogger("gear.Worker") def __init__(self, worker_id): - self.worker_id = worker_id + self.worker_id = convert_to_bytes(worker_id) self.functions = {} self.job_lock = threading.Lock() self.waiting_for_jobs = 0 - self.job_queue = Queue.Queue() + self.job_queue = queue.Queue() super(Worker, self).__init__() def __repr__(self): @@ -1437,6 +1451,7 @@ class Worker(BaseClient): :arg str name: The name of the function to register. :arg numeric timeout: The timeout value (optional). """ + name = convert_to_bytes(name) self.functions[name] = FunctionRecord(name, timeout) if timeout: self._sendCanDoTimeout(name, timeout) @@ -1448,6 +1463,7 @@ class Worker(BaseClient): :arg str name: The name of the function to remove. """ + name = convert_to_bytes(name) del self.functions[name] self._sendCantDo(name) @@ -1488,7 +1504,7 @@ class Worker(BaseClient): def _sendCanDoTimeout(self, name, timeout): self.broadcast_lock.acquire() try: - data = name + '\x00' + timeout + data = name + b'\x00' + timeout p = Packet(constants.REQ, constants.CAN_DO_TIMEOUT, data) self.broadcast(p) finally: @@ -1505,17 +1521,17 @@ class Worker(BaseClient): def _sendResetAbilities(self): self.broadcast_lock.acquire() try: - p = Packet(constants.REQ, constants.RESET_ABILITIES, '') + p = Packet(constants.REQ, constants.RESET_ABILITIES, b'') self.broadcast(p) finally: self.broadcast_lock.release() def _sendPreSleep(self, connection): - p = Packet(constants.REQ, constants.PRE_SLEEP, '') + p = Packet(constants.REQ, constants.PRE_SLEEP, b'') self.sendPacket(p, connection) def _sendGrabJobUniq(self, connection=None): - p = Packet(constants.REQ, constants.GRAB_JOB_UNIQ, '') + p = Packet(constants.REQ, constants.GRAB_JOB_UNIQ, b'') if connection: self.sendPacket(p, connection) else: @@ -1530,7 +1546,7 @@ class Worker(BaseClient): super(Worker, self)._onConnect(conn) for f in self.functions.values(): if f.timeout: - data = f.name + '\x00' + f.timeout + data = f.name + b'\x00' + f.timeout p = Packet(constants.REQ, constants.CAN_DO_TIMEOUT, data) else: p = Packet(constants.REQ, constants.CAN_DO, f.name) @@ -1580,7 +1596,7 @@ class Worker(BaseClient): try: job = self.job_queue.get(False) - except Queue.Empty: + except queue.Empty: job = None if not job: @@ -1702,7 +1718,7 @@ class Worker(BaseClient): handle = packet.getArgument(0) name = packet.getArgument(1) unique = packet.getArgument(2) - if unique == '': + if unique == b'': unique = None arguments = packet.getArgument(3, True) return self._handleJobAssignment(packet, handle, name, @@ -1729,9 +1745,12 @@ class BaseJob(object): log = logging.getLogger("gear.Job") def __init__(self, name, arguments, unique=None, handle=None): - self.name = name + self.name = convert_to_bytes(name) + if (not isinstance(arguments, bytes) and + not isinstance(arguments, bytearray)): + raise TypeError("arguments must be of type bytes or bytearray") self.arguments = arguments - self.unique = unique + self.unique = convert_to_bytes(unique) self.handle = handle self.connection = None @@ -1744,26 +1763,26 @@ class Job(BaseJob): """A job to run or being run by Gearman. :arg str name: The name of the job. - :arg str arguments: The opaque data blob to be passed to the worker + :arg bytes arguments: The opaque data blob to be passed to the worker as arguments. - :arg str unique: A string to uniquely identify the job to Gearman + :arg str unique: A byte string to uniquely identify the job to Gearman (optional). The following instance attributes are available: **name** (str) The name of the job. - **arguments** (str) + **arguments** (bytes) The opaque data blob passed to the worker as arguments. **unique** (str or None) The unique ID of the job (if supplied). - **handle** (str or None) + **handle** (bytes or None) The Gearman job handle. None if no job handle has been received yet. **data** (list of byte-arrays) The result data returned from Gearman. Each packet appends an element to the list. Depending on the nature of the data, the elements may need to be concatenated before use. - **exception** (str or None) + **exception** (bytes or None) Exception information returned from Gearman. None if no exception has been received. **warning** (bool) @@ -1772,10 +1791,10 @@ class Job(BaseJob): Whether the job is complete. **failure** (bool) Whether the job has failed. Only set when complete is True. - **numerator** (str or None) + **numerator** (bytes or None) The numerator of the completion ratio reported by the worker. Only set when a status update is sent by the worker. - **denominator** (str or None) + **denominator** (bytes or None) The denominator of the completion ratio reported by the worker. Only set when a status update is sent by the worker. **fraction_complete** (float or None) @@ -1815,20 +1834,20 @@ class WorkerJob(BaseJob): :arg str handle: The job handle assigned by gearman. :arg str name: The name of the job. - :arg str arguments: The opaque data blob passed to the worker + :arg bytes arguments: The opaque data blob passed to the worker as arguments. - :arg str unique: A string to uniquely identify the job to Gearman + :arg str unique: A byte string to uniquely identify the job to Gearman (optional). The following instance attributes are available: **name** (str) The name of the job. - **arguments** (str) + **arguments** (bytes) The opaque data blob passed to the worker as arguments. **unique** (str or None) The unique ID of the job (if supplied). - **handle** (str) + **handle** (bytes) The Gearman job handle. **connection** (:py:class:`Connection` or None) The connection associated with the job. Only set after the job @@ -1840,23 +1859,23 @@ class WorkerJob(BaseJob): def __init__(self, handle, name, arguments, unique=None): super(WorkerJob, self).__init__(name, arguments, unique, handle) - def sendWorkData(self, data=''): + def sendWorkData(self, data=b''): """Send a WORK_DATA packet to the client. - :arg str data: The data to be sent to the client (optional). + :arg bytes data: The data to be sent to the client (optional). """ - data = self.handle + '\x00' + data + data = self.handle + b'\x00' + data p = Packet(constants.REQ, constants.WORK_DATA, data) self.connection.sendPacket(p) - def sendWorkWarning(self, data=''): + def sendWorkWarning(self, data=b''): """Send a WORK_WARNING packet to the client. - :arg str data: The data to be sent to the client (optional). + :arg bytes data: The data to be sent to the client (optional). """ - data = self.handle + '\x00' + data + data = self.handle + b'\x00' + data p = Packet(constants.REQ, constants.WORK_WARNING, data) self.connection.sendPacket(p) @@ -1870,18 +1889,19 @@ class WorkerJob(BaseJob): :arg numeric denominator: The denominator of the fraction complete. """ - data = (self.handle + '\x00' + - str(numerator) + '\x00' + str(denominator)) + data = (self.handle + b'\x00' + + str(numerator).encode('utf8') + b'\x00' + + str(denominator).encode('utf8')) p = Packet(constants.REQ, constants.WORK_STATUS, data) self.connection.sendPacket(p) - def sendWorkComplete(self, data=''): + def sendWorkComplete(self, data=b''): """Send a WORK_COMPLETE packet to the client. - :arg str data: The data to be sent to the client (optional). + :arg bytes data: The data to be sent to the client (optional). """ - data = self.handle + '\x00' + data + data = self.handle + b'\x00' + data p = Packet(constants.REQ, constants.WORK_COMPLETE, data) self.connection.sendPacket(p) @@ -1891,13 +1911,14 @@ class WorkerJob(BaseJob): p = Packet(constants.REQ, constants.WORK_FAIL, self.handle) self.connection.sendPacket(p) - def sendWorkException(self, data=''): + def sendWorkException(self, data=b''): """Send a WORK_EXCEPTION packet to the client. - :arg str data: The exception data to be sent to the client (optional). + :arg bytes data: The exception data to be sent to the client + (optional). """ - data = self.handle + '\x00' + data + data = self.handle + b'\x00' + data p = Packet(constants.REQ, constants.WORK_EXCEPTION, data) self.connection.sendPacket(p) @@ -1912,7 +1933,7 @@ class ServerAdminRequest(AdminRequest): self.connection = connection def isComplete(self, data): - if data[-1] == '\n': + if data[-1:] == b'\n': self.command = data.strip() return True return False @@ -1946,7 +1967,7 @@ class Server(BaseClientServer): """A simple gearman server implementation for testing (not for production use). - :arg str port: The TCP port on which to listen. + :arg int port: The TCP port on which to listen. """ def __init__(self, port=4730): @@ -2005,7 +2026,7 @@ class Server(BaseClientServer): if fd == self.connect_wake_read: self.log.debug("Accept woken by pipe") while True: - if os.read(self.connect_wake_read, 1) == '\n': + if os.read(self.connect_wake_read, 1) == b'\n': break return if event & select.POLLIN: @@ -2016,12 +2037,12 @@ class Server(BaseClientServer): self.connections_condition.acquire() self.active_connections.append(conn) self.connections_condition.notifyAll() - os.write(self.wake_write, '1\n') + os.write(self.wake_write, b'1\n') self.connections_condition.release() def _shutdown(self): super(Server, self)._shutdown() - os.write(self.connect_wake_write, '1\n') + os.write(self.connect_wake_write, b'1\n') def _cleanup(self): super(Server, self)._cleanup() @@ -2037,9 +2058,9 @@ class Server(BaseClientServer): self.connections_condition.release() def handleAdminRequest(self, request): - if request.command.startswith('cancel job'): + if request.command.startswith(b'cancel job'): self.handleCancelJob(request) - elif request.command.startswith('status'): + elif request.command.startswith(b'status'): self.handleStatus(request) def handleCancelJob(self, request): @@ -2051,9 +2072,9 @@ class Server(BaseClientServer): if handle == job.handle: self.queue.remove(job) del self.jobs[handle] - request.connection.conn.send("OK\n") + request.connection.conn.send(b'OK\n') return - request.connection.conn.send("ERR UNKNOWN_JOB\n") + request.connection.conn.send(b'ERR UNKNOWN_JOB\n') def handleStatus(self, request): functions = {} @@ -2070,13 +2091,13 @@ class Server(BaseClientServer): for function in connection.functions: functions[function][2] += 1 for name, values in functions.items(): - request.connection.conn.send("%s\t%s\t%s\t%s\n" % + request.connection.conn.send(("%s\t%s\t%s\t%s\n" % (name, values[0], values[1], - values[2])) - request.connection.conn.send(".\n") + values[2])).encode('utf8')) + request.connection.conn.send(b'.\n') def wakeConnections(self): - p = Packet(constants.RES, constants.NOOP, '') + p = Packet(constants.RES, constants.NOOP, b'') for connection in self.active_connections: if connection.state == 'SLEEP': connection.sendPacket(p) @@ -2089,8 +2110,8 @@ class Server(BaseClientServer): unique = None arguments = packet.getArgument(2, True) packet.connection.max_handle += 1 - handle = 'H:%s:%s' % (packet.connection.host, - str(packet.connection.max_handle)) + handle = ('H:%s:%s' % (packet.connection.host, + packet.connection.max_handle)).encode('utf8') job = Job(name, arguments, unique) job.handle = handle job.connection = packet.connection @@ -2119,14 +2140,13 @@ class Server(BaseClientServer): def sendJobAssignUniq(self, connection, job): unique = job.unique if not unique: - unique = '' - data = '%s\x00%s\x00%s\x00%s' % (job.handle, job.name, - unique, job.arguments) + unique = b'' + data = b'\x00'.join((job.handle, job.name, unique, job.arguments)) p = Packet(constants.RES, constants.JOB_ASSIGN_UNIQ, data) connection.sendPacket(p) def sendNoJob(self, connection): - p = Packet(constants.RES, constants.NO_JOB, "") + p = Packet(constants.RES, constants.NO_JOB, b'') connection.sendPacket(p) def handlePreSleep(self, packet): @@ -2194,8 +2214,8 @@ class Server(BaseClientServer): known = 0 running = 0 - numerator = '' - denominator = '' + numerator = b'' + denominator = b'' job = self.jobs.get(handle) if job: known = 1 @@ -2204,10 +2224,10 @@ class Server(BaseClientServer): numerator = job.numerator denominator = job.denominator - data = (handle + '\x00' + - str(known) + '\x00' + - str(running) + '\x00' + - str(numerator) + '\x00' + - str(denominator)) + data = (handle + b'\x00' + + str(known).encode('utf8') + b'\x00' + + str(running).encode('utf8') + b'\x00' + + numerator + b'\x00' + + denominator) p = Packet(constants.RES, constants.STATUS_RES, data) packet.connection.sendPacket(p) diff --git a/gear/constants.py b/gear/constants.py index ad90d03..e10eba4 100644 --- a/gear/constants.py +++ b/gear/constants.py @@ -79,5 +79,5 @@ for i, name in types.items(): globals()[name] = i __doc__ += '\n.. py:data:: %s\n' % name -REQ = '\x00REQ' -RES = '\x00RES' +REQ = b'\x00REQ' +RES = b'\x00RES'