From 1e76bfa886dd340069a5bf80535e5eb8f37a7c88 Mon Sep 17 00:00:00 2001 From: Clark Boylan Date: Mon, 3 Jun 2013 20:09:34 -0700 Subject: [PATCH] Support python3. Import queue on python3 and Queue on python2. Use bytestrings for all string constants. Slice byte arrays when comparing to bytestrings. Don't use basestring. Gear now expects its packet data to be byte strings. Non data fields may be passed as unicode strings, bytes or bytearrays. Unicode strings will be converted to bytes using the utf8 encoding. Change-Id: Iccdd01f9ce6f649f8bbaa93370d330c78c89f087 --- .testr.conf | 2 +- gear/__init__.py | 226 +++++++++++++++++++++++++--------------------- gear/constants.py | 4 +- 3 files changed, 126 insertions(+), 106 deletions(-) diff --git a/.testr.conf b/.testr.conf index 7409a5a..4c3aa44 100644 --- a/.testr.conf +++ b/.testr.conf @@ -1,4 +1,4 @@ [DEFAULT] -test_command=OS_TEST_TIMEOUT=60 ${PYTHON:-python} -m subunit.run discover -t ./ . $LISTOPT $IDOPTION +test_command=OS_TEST_TIMEOUT=60 ${PYTHON:-python} -m subunit.run discover -t ./ gear/tests/ $LISTOPT $IDOPTION test_id_option=--load-list $IDFILE test_list_option=--list diff --git a/gear/__init__.py b/gear/__init__.py index 134f03b..e62a830 100644 --- a/gear/__init__.py +++ b/gear/__init__.py @@ -14,7 +14,6 @@ import logging import os -import Queue import select import socket import struct @@ -24,6 +23,11 @@ import uuid as uuid_module from gear import constants +try: + import Queue as queue +except ImportError: + import queue as queue + PRECEDENCE_NORMAL = 0 PRECEDENCE_LOW = 1 PRECEDENCE_HIGH = 2 @@ -61,6 +65,14 @@ class GearmanError(Exception): pass +def convert_to_bytes(data): + try: + data = data.encode('utf8') + except AttributeError: + pass + return data + + class Task(object): def __init__(self): self._wait_event = threading.Event() @@ -208,7 +220,7 @@ class Connection(object): if not c: return None if admin is None: - if c == '\x00': + if c == b'\x00': admin = False else: admin = True @@ -244,15 +256,15 @@ class Connection(object): This method waits until the echo response has been received or the timeout has been reached. - :arg str data: The data to request be echoed. If None, a random - unique string will be generated. + :arg bytes data: The data to request be echoed. If None, a random + unique byte string will be generated. :arg numeric timeout: Number of seconds to wait until the response is received. If None, wait forever (default: 30 seconds). :raises TimeoutError: If the timeout is reached before the response is received. """ if data is None: - data = str(uuid_module.uuid4().hex) + data = uuid_module.uuid4().hex.encode('utf8') self.echo_lock.acquire() try: if data in self.echo_conditions: @@ -302,15 +314,15 @@ class AdminRequest(object): instantiated dircectly; a subclass implementing a specific command must be used instead. - :arg list arguments: A list of string arguments for the command. + :arg list arguments: A list of byte string arguments for the command. The following instance attributes are available: - **response** (str) + **response** (bytes) The response from the server. - **arguments** (str) + **arguments** (bytes) The argument supplied with the constructor. - **command** (str) + **command** (bytes) The administrative command. """ @@ -333,15 +345,15 @@ class AdminRequest(object): def getCommand(self): cmd = self.command if self.arguments: - cmd += ' ' + ' '.join(self.arguments) - cmd += '\n' + cmd += b' ' + b' '.join(self.arguments) + cmd += b'\n' return cmd def isComplete(self, data): - if (data[-3:] == '\n.\n' or - data[-5:] == '\r\n.\r\n' or - data == '.\n' or - data == '.\r\n'): + if (data[-3:] == b'\n.\n' or + data[-5:] == b'\r\n.\r\n' or + data == b'.\n' or + data == b'.\r\n'): self.response = data return True return False @@ -359,7 +371,7 @@ class StatusAdminRequest(AdminRequest): The response from gearman may be found in the **response** attribute. """ - command = 'status' + command = b'status' def __init__(self): super(StatusAdminRequest, self).__init__() @@ -370,7 +382,7 @@ class ShowJobsAdminRequest(AdminRequest): The response from gearman may be found in the **response** attribute. """ - command = 'show jobs' + command = b'show jobs' def __init__(self): super(ShowJobsAdminRequest, self).__init__() @@ -382,7 +394,7 @@ class ShowUniqueJobsAdminRequest(AdminRequest): The response from gearman may be found in the **response** attribute. """ - command = 'show unique jobs' + command = b'show unique jobs' def __init__(self): super(ShowUniqueJobsAdminRequest, self).__init__() @@ -396,13 +408,14 @@ class CancelJobAdminRequest(AdminRequest): The response from gearman may be found in the **response** attribute. """ - command = 'cancel job' + command = b'cancel job' def __init__(self, handle): + handle = convert_to_bytes(handle) super(CancelJobAdminRequest, self).__init__(handle) def isComplete(self, data): - if data[-1] == '\n': + if data[-1:] == b'\n': self.response = data return True return False @@ -414,13 +427,13 @@ class VersionAdminRequest(AdminRequest): The response from gearman may be found in the **response** attribute. """ - command = 'version' + command = b'version' def __init__(self): super(VersionAdminRequest, self).__init__() def isComplete(self, data): - if data[-1] == '\n': + if data[-1:] == b'\n': self.response = data return True return False @@ -430,22 +443,26 @@ class Packet(object): """A data packet received from or to be sent over a :py:class:`Connection`. - :arg str code: The Gearman magic code (:py:data:`constants.REQ` or + :arg bytes code: The Gearman magic code (:py:data:`constants.REQ` or :py:data:`constants.RES`) - :arg str ptype: The packet type (one of the packet types in constasts). - :arg str data: The data portion of the packet. - :arg str connection: The connection on which the packet was received - (optional). + :arg bytes ptype: The packet type (one of the packet types in constasts). + :arg bytes data: The data portion of the packet. + :arg Connection connection: The connection on which the packet + was received (optional). :raises InvalidDataError: If the magic code is unknown. """ log = logging.getLogger("gear.Packet") def __init__(self, code, ptype, data, connection=None): - if code[0] != '\x00': + if not isinstance(code, bytes) and not isinstance(code, bytearray): + raise TypeError("code must be of type bytes or bytearray") + if code[0:1] != b'\x00': raise InvalidDataError("First byte of packet must be 0") self.code = code self.ptype = ptype + if not isinstance(data, bytes) and not isinstance(data, bytearray): + raise TypeError("data must be of type bytes or bytearray") self.data = data self.connection = connection @@ -457,15 +474,11 @@ class Packet(object): """Return a Gearman wire protocol binary representation of the packet. :returns: The packet in binary form. - :rtype: str + :rtype: bytes """ b = struct.pack('!4sii', self.code, self.ptype, len(self.data)) b = bytearray(b) - if isinstance(self.data, basestring): - data = bytearray(self.data, 'utf8') - else: - data = self.data - b += data + b += self.data return b def getArgument(self, index, last=False): @@ -475,7 +488,7 @@ class Packet(object): :arg bool last: Whether this is the last argument (and thus nulls should be ignored) :returns: The argument value. - :rtype: str + :rtype: bytes """ parts = self.data.split(b'\x00') @@ -488,7 +501,7 @@ class Packet(object): this packet. :returns: The :py:class:`Job` for this packet. - :rtype: str + :rtype: Job :raises UnknownJobError: If the job is not known. """ handle = self.getArgument(0) @@ -563,7 +576,7 @@ class BaseClientServer(object): self.inactive_connections.remove(conn) self.active_connections.append(conn) self.connections_condition.notifyAll() - os.write(self.wake_write, '1\n') + os.write(self.wake_write, b'1\n') self.connections_condition.release() try: @@ -632,7 +645,7 @@ class BaseClientServer(object): if fd == self.wake_read: self.log.debug("Woken by pipe") while True: - if os.read(self.wake_read, 1) == '\n': + if os.read(self.wake_read, 1) == b'\n': break return conn = conn_dict[fd] @@ -877,7 +890,7 @@ class BaseClientServer(object): self.running = False self.connections_condition.acquire() self.connections_condition.notifyAll() - os.write(self.wake_write, '1\n') + os.write(self.wake_write, b'1\n') self.connections_condition.release() def _cleanup(self): @@ -1093,6 +1106,7 @@ class Client(BaseClient): :rtype: bool """ tasks = {} + name = convert_to_bytes(name) self.broadcast_lock.acquire() try: @@ -1140,10 +1154,10 @@ class Client(BaseClient): is supplied. """ if job.unique is None: - unique = '' + unique = b'' else: unique = job.unique - data = '%s\x00%s\x00%s' % (job.name, unique, job.arguments) + data = b'\x00'.join((job.name, unique, job.arguments)) if background: if precedence == PRECEDENCE_NORMAL: cmd = constants.SUBMIT_JOB_BG @@ -1418,11 +1432,11 @@ class Worker(BaseClient): log = logging.getLogger("gear.Worker") def __init__(self, worker_id): - self.worker_id = worker_id + self.worker_id = convert_to_bytes(worker_id) self.functions = {} self.job_lock = threading.Lock() self.waiting_for_jobs = 0 - self.job_queue = Queue.Queue() + self.job_queue = queue.Queue() super(Worker, self).__init__() def __repr__(self): @@ -1437,6 +1451,7 @@ class Worker(BaseClient): :arg str name: The name of the function to register. :arg numeric timeout: The timeout value (optional). """ + name = convert_to_bytes(name) self.functions[name] = FunctionRecord(name, timeout) if timeout: self._sendCanDoTimeout(name, timeout) @@ -1448,6 +1463,7 @@ class Worker(BaseClient): :arg str name: The name of the function to remove. """ + name = convert_to_bytes(name) del self.functions[name] self._sendCantDo(name) @@ -1488,7 +1504,7 @@ class Worker(BaseClient): def _sendCanDoTimeout(self, name, timeout): self.broadcast_lock.acquire() try: - data = name + '\x00' + timeout + data = name + b'\x00' + timeout p = Packet(constants.REQ, constants.CAN_DO_TIMEOUT, data) self.broadcast(p) finally: @@ -1505,17 +1521,17 @@ class Worker(BaseClient): def _sendResetAbilities(self): self.broadcast_lock.acquire() try: - p = Packet(constants.REQ, constants.RESET_ABILITIES, '') + p = Packet(constants.REQ, constants.RESET_ABILITIES, b'') self.broadcast(p) finally: self.broadcast_lock.release() def _sendPreSleep(self, connection): - p = Packet(constants.REQ, constants.PRE_SLEEP, '') + p = Packet(constants.REQ, constants.PRE_SLEEP, b'') self.sendPacket(p, connection) def _sendGrabJobUniq(self, connection=None): - p = Packet(constants.REQ, constants.GRAB_JOB_UNIQ, '') + p = Packet(constants.REQ, constants.GRAB_JOB_UNIQ, b'') if connection: self.sendPacket(p, connection) else: @@ -1530,7 +1546,7 @@ class Worker(BaseClient): super(Worker, self)._onConnect(conn) for f in self.functions.values(): if f.timeout: - data = f.name + '\x00' + f.timeout + data = f.name + b'\x00' + f.timeout p = Packet(constants.REQ, constants.CAN_DO_TIMEOUT, data) else: p = Packet(constants.REQ, constants.CAN_DO, f.name) @@ -1580,7 +1596,7 @@ class Worker(BaseClient): try: job = self.job_queue.get(False) - except Queue.Empty: + except queue.Empty: job = None if not job: @@ -1702,7 +1718,7 @@ class Worker(BaseClient): handle = packet.getArgument(0) name = packet.getArgument(1) unique = packet.getArgument(2) - if unique == '': + if unique == b'': unique = None arguments = packet.getArgument(3, True) return self._handleJobAssignment(packet, handle, name, @@ -1729,9 +1745,12 @@ class BaseJob(object): log = logging.getLogger("gear.Job") def __init__(self, name, arguments, unique=None, handle=None): - self.name = name + self.name = convert_to_bytes(name) + if (not isinstance(arguments, bytes) and + not isinstance(arguments, bytearray)): + raise TypeError("arguments must be of type bytes or bytearray") self.arguments = arguments - self.unique = unique + self.unique = convert_to_bytes(unique) self.handle = handle self.connection = None @@ -1744,26 +1763,26 @@ class Job(BaseJob): """A job to run or being run by Gearman. :arg str name: The name of the job. - :arg str arguments: The opaque data blob to be passed to the worker + :arg bytes arguments: The opaque data blob to be passed to the worker as arguments. - :arg str unique: A string to uniquely identify the job to Gearman + :arg str unique: A byte string to uniquely identify the job to Gearman (optional). The following instance attributes are available: **name** (str) The name of the job. - **arguments** (str) + **arguments** (bytes) The opaque data blob passed to the worker as arguments. **unique** (str or None) The unique ID of the job (if supplied). - **handle** (str or None) + **handle** (bytes or None) The Gearman job handle. None if no job handle has been received yet. **data** (list of byte-arrays) The result data returned from Gearman. Each packet appends an element to the list. Depending on the nature of the data, the elements may need to be concatenated before use. - **exception** (str or None) + **exception** (bytes or None) Exception information returned from Gearman. None if no exception has been received. **warning** (bool) @@ -1772,10 +1791,10 @@ class Job(BaseJob): Whether the job is complete. **failure** (bool) Whether the job has failed. Only set when complete is True. - **numerator** (str or None) + **numerator** (bytes or None) The numerator of the completion ratio reported by the worker. Only set when a status update is sent by the worker. - **denominator** (str or None) + **denominator** (bytes or None) The denominator of the completion ratio reported by the worker. Only set when a status update is sent by the worker. **fraction_complete** (float or None) @@ -1815,20 +1834,20 @@ class WorkerJob(BaseJob): :arg str handle: The job handle assigned by gearman. :arg str name: The name of the job. - :arg str arguments: The opaque data blob passed to the worker + :arg bytes arguments: The opaque data blob passed to the worker as arguments. - :arg str unique: A string to uniquely identify the job to Gearman + :arg str unique: A byte string to uniquely identify the job to Gearman (optional). The following instance attributes are available: **name** (str) The name of the job. - **arguments** (str) + **arguments** (bytes) The opaque data blob passed to the worker as arguments. **unique** (str or None) The unique ID of the job (if supplied). - **handle** (str) + **handle** (bytes) The Gearman job handle. **connection** (:py:class:`Connection` or None) The connection associated with the job. Only set after the job @@ -1840,23 +1859,23 @@ class WorkerJob(BaseJob): def __init__(self, handle, name, arguments, unique=None): super(WorkerJob, self).__init__(name, arguments, unique, handle) - def sendWorkData(self, data=''): + def sendWorkData(self, data=b''): """Send a WORK_DATA packet to the client. - :arg str data: The data to be sent to the client (optional). + :arg bytes data: The data to be sent to the client (optional). """ - data = self.handle + '\x00' + data + data = self.handle + b'\x00' + data p = Packet(constants.REQ, constants.WORK_DATA, data) self.connection.sendPacket(p) - def sendWorkWarning(self, data=''): + def sendWorkWarning(self, data=b''): """Send a WORK_WARNING packet to the client. - :arg str data: The data to be sent to the client (optional). + :arg bytes data: The data to be sent to the client (optional). """ - data = self.handle + '\x00' + data + data = self.handle + b'\x00' + data p = Packet(constants.REQ, constants.WORK_WARNING, data) self.connection.sendPacket(p) @@ -1870,18 +1889,19 @@ class WorkerJob(BaseJob): :arg numeric denominator: The denominator of the fraction complete. """ - data = (self.handle + '\x00' + - str(numerator) + '\x00' + str(denominator)) + data = (self.handle + b'\x00' + + str(numerator).encode('utf8') + b'\x00' + + str(denominator).encode('utf8')) p = Packet(constants.REQ, constants.WORK_STATUS, data) self.connection.sendPacket(p) - def sendWorkComplete(self, data=''): + def sendWorkComplete(self, data=b''): """Send a WORK_COMPLETE packet to the client. - :arg str data: The data to be sent to the client (optional). + :arg bytes data: The data to be sent to the client (optional). """ - data = self.handle + '\x00' + data + data = self.handle + b'\x00' + data p = Packet(constants.REQ, constants.WORK_COMPLETE, data) self.connection.sendPacket(p) @@ -1891,13 +1911,14 @@ class WorkerJob(BaseJob): p = Packet(constants.REQ, constants.WORK_FAIL, self.handle) self.connection.sendPacket(p) - def sendWorkException(self, data=''): + def sendWorkException(self, data=b''): """Send a WORK_EXCEPTION packet to the client. - :arg str data: The exception data to be sent to the client (optional). + :arg bytes data: The exception data to be sent to the client + (optional). """ - data = self.handle + '\x00' + data + data = self.handle + b'\x00' + data p = Packet(constants.REQ, constants.WORK_EXCEPTION, data) self.connection.sendPacket(p) @@ -1912,7 +1933,7 @@ class ServerAdminRequest(AdminRequest): self.connection = connection def isComplete(self, data): - if data[-1] == '\n': + if data[-1:] == b'\n': self.command = data.strip() return True return False @@ -1946,7 +1967,7 @@ class Server(BaseClientServer): """A simple gearman server implementation for testing (not for production use). - :arg str port: The TCP port on which to listen. + :arg int port: The TCP port on which to listen. """ def __init__(self, port=4730): @@ -2005,7 +2026,7 @@ class Server(BaseClientServer): if fd == self.connect_wake_read: self.log.debug("Accept woken by pipe") while True: - if os.read(self.connect_wake_read, 1) == '\n': + if os.read(self.connect_wake_read, 1) == b'\n': break return if event & select.POLLIN: @@ -2016,12 +2037,12 @@ class Server(BaseClientServer): self.connections_condition.acquire() self.active_connections.append(conn) self.connections_condition.notifyAll() - os.write(self.wake_write, '1\n') + os.write(self.wake_write, b'1\n') self.connections_condition.release() def _shutdown(self): super(Server, self)._shutdown() - os.write(self.connect_wake_write, '1\n') + os.write(self.connect_wake_write, b'1\n') def _cleanup(self): super(Server, self)._cleanup() @@ -2037,9 +2058,9 @@ class Server(BaseClientServer): self.connections_condition.release() def handleAdminRequest(self, request): - if request.command.startswith('cancel job'): + if request.command.startswith(b'cancel job'): self.handleCancelJob(request) - elif request.command.startswith('status'): + elif request.command.startswith(b'status'): self.handleStatus(request) def handleCancelJob(self, request): @@ -2051,9 +2072,9 @@ class Server(BaseClientServer): if handle == job.handle: self.queue.remove(job) del self.jobs[handle] - request.connection.conn.send("OK\n") + request.connection.conn.send(b'OK\n') return - request.connection.conn.send("ERR UNKNOWN_JOB\n") + request.connection.conn.send(b'ERR UNKNOWN_JOB\n') def handleStatus(self, request): functions = {} @@ -2070,13 +2091,13 @@ class Server(BaseClientServer): for function in connection.functions: functions[function][2] += 1 for name, values in functions.items(): - request.connection.conn.send("%s\t%s\t%s\t%s\n" % + request.connection.conn.send(("%s\t%s\t%s\t%s\n" % (name, values[0], values[1], - values[2])) - request.connection.conn.send(".\n") + values[2])).encode('utf8')) + request.connection.conn.send(b'.\n') def wakeConnections(self): - p = Packet(constants.RES, constants.NOOP, '') + p = Packet(constants.RES, constants.NOOP, b'') for connection in self.active_connections: if connection.state == 'SLEEP': connection.sendPacket(p) @@ -2089,8 +2110,8 @@ class Server(BaseClientServer): unique = None arguments = packet.getArgument(2, True) packet.connection.max_handle += 1 - handle = 'H:%s:%s' % (packet.connection.host, - str(packet.connection.max_handle)) + handle = ('H:%s:%s' % (packet.connection.host, + packet.connection.max_handle)).encode('utf8') job = Job(name, arguments, unique) job.handle = handle job.connection = packet.connection @@ -2119,14 +2140,13 @@ class Server(BaseClientServer): def sendJobAssignUniq(self, connection, job): unique = job.unique if not unique: - unique = '' - data = '%s\x00%s\x00%s\x00%s' % (job.handle, job.name, - unique, job.arguments) + unique = b'' + data = b'\x00'.join((job.handle, job.name, unique, job.arguments)) p = Packet(constants.RES, constants.JOB_ASSIGN_UNIQ, data) connection.sendPacket(p) def sendNoJob(self, connection): - p = Packet(constants.RES, constants.NO_JOB, "") + p = Packet(constants.RES, constants.NO_JOB, b'') connection.sendPacket(p) def handlePreSleep(self, packet): @@ -2194,8 +2214,8 @@ class Server(BaseClientServer): known = 0 running = 0 - numerator = '' - denominator = '' + numerator = b'' + denominator = b'' job = self.jobs.get(handle) if job: known = 1 @@ -2204,10 +2224,10 @@ class Server(BaseClientServer): numerator = job.numerator denominator = job.denominator - data = (handle + '\x00' + - str(known) + '\x00' + - str(running) + '\x00' + - str(numerator) + '\x00' + - str(denominator)) + data = (handle + b'\x00' + + str(known).encode('utf8') + b'\x00' + + str(running).encode('utf8') + b'\x00' + + numerator + b'\x00' + + denominator) p = Packet(constants.RES, constants.STATUS_RES, data) packet.connection.sendPacket(p) diff --git a/gear/constants.py b/gear/constants.py index ad90d03..e10eba4 100644 --- a/gear/constants.py +++ b/gear/constants.py @@ -79,5 +79,5 @@ for i, name in types.items(): globals()[name] = i __doc__ += '\n.. py:data:: %s\n' % name -REQ = '\x00REQ' -RES = '\x00RES' +REQ = b'\x00REQ' +RES = b'\x00RES'