Provide TextJob and TextWorker for convenience
These classes will provide a smoother path from python2 -> python3 when using gear, as it will relieve the need for encoding/decoding. Change-Id: I93bfe33f898294f30a82c0a24a18a081f9752354
This commit is contained in:
parent
6d479a0196
commit
afb9001c28
425
gear/__init__.py
425
gear/__init__.py
|
@ -1426,8 +1426,8 @@ class Client(BaseClient):
|
|||
if job.unique is None:
|
||||
unique = b''
|
||||
else:
|
||||
unique = job.unique
|
||||
data = b'\x00'.join((job.binary_name, unique, job.arguments))
|
||||
unique = job.binary_unique
|
||||
data = b'\x00'.join((job.binary_name, unique, job.binary_arguments))
|
||||
if background:
|
||||
if precedence == PRECEDENCE_NORMAL:
|
||||
cmd = constants.SUBMIT_JOB_BG
|
||||
|
@ -1696,6 +1696,159 @@ class FunctionRecord(object):
|
|||
id(self), self.name, self.timeout)
|
||||
|
||||
|
||||
class BaseJob(object):
|
||||
def __init__(self, name, arguments, unique=None, handle=None):
|
||||
self._name = convert_to_bytes(name)
|
||||
self._validate_arguments(arguments)
|
||||
self._arguments = convert_to_bytes(arguments)
|
||||
self._unique = convert_to_bytes(unique)
|
||||
self.handle = handle
|
||||
self.connection = None
|
||||
|
||||
def _validate_arguments(self, arguments):
|
||||
if (not isinstance(arguments, bytes) and
|
||||
not isinstance(arguments, bytearray)):
|
||||
raise TypeError("arguments must be of type bytes or bytearray")
|
||||
|
||||
@property
|
||||
def arguments(self):
|
||||
return self._arguments
|
||||
|
||||
@arguments.setter
|
||||
def arguments(self, value):
|
||||
self._arguments = value
|
||||
|
||||
@property
|
||||
def unique(self):
|
||||
return self._unique
|
||||
|
||||
@unique.setter
|
||||
def unique(self, value):
|
||||
self._unique = value
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
if isinstance(self._name, six.binary_type):
|
||||
return self._name.decode('utf-8')
|
||||
return self._name
|
||||
|
||||
@name.setter
|
||||
def name(self, value):
|
||||
if isinstance(value, six.text_type):
|
||||
value = value.encode('utf-8')
|
||||
self._name = value
|
||||
|
||||
@property
|
||||
def binary_name(self):
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def binary_arguments(self):
|
||||
return self._arguments
|
||||
|
||||
@property
|
||||
def binary_unique(self):
|
||||
return self._unique
|
||||
|
||||
def __repr__(self):
|
||||
return '<gear.Job 0x%x handle: %s name: %s unique: %s>' % (
|
||||
id(self), self.handle, self.name, self.unique)
|
||||
|
||||
|
||||
class WorkerJob(BaseJob):
|
||||
"""A job that Gearman has assigned to a Worker. Not intended to
|
||||
be instantiated directly, but rather returned by
|
||||
:py:meth:`Worker.getJob`.
|
||||
|
||||
:arg str handle: The job handle assigned by gearman.
|
||||
:arg str name: The name of the job.
|
||||
:arg bytes arguments: The opaque data blob passed to the worker
|
||||
as arguments.
|
||||
:arg str unique: A byte string to uniquely identify the job to Gearman
|
||||
(optional).
|
||||
|
||||
The following instance attributes are available:
|
||||
|
||||
**name** (str)
|
||||
The name of the job. Assumed to be utf-8.
|
||||
**arguments** (bytes)
|
||||
The opaque data blob passed to the worker as arguments.
|
||||
**unique** (str or None)
|
||||
The unique ID of the job (if supplied).
|
||||
**handle** (bytes)
|
||||
The Gearman job handle.
|
||||
**connection** (:py:class:`Connection` or None)
|
||||
The connection associated with the job. Only set after the job
|
||||
has been submitted to a Gearman server.
|
||||
"""
|
||||
|
||||
def __init__(self, handle, name, arguments, unique=None):
|
||||
super(WorkerJob, self).__init__(name, arguments, unique, handle)
|
||||
|
||||
def sendWorkData(self, data=b''):
|
||||
"""Send a WORK_DATA packet to the client.
|
||||
|
||||
:arg bytes data: The data to be sent to the client (optional).
|
||||
"""
|
||||
|
||||
data = self.handle + b'\x00' + data
|
||||
p = Packet(constants.REQ, constants.WORK_DATA, data)
|
||||
self.connection.sendPacket(p)
|
||||
|
||||
def sendWorkWarning(self, data=b''):
|
||||
"""Send a WORK_WARNING packet to the client.
|
||||
|
||||
:arg bytes data: The data to be sent to the client (optional).
|
||||
"""
|
||||
|
||||
data = self.handle + b'\x00' + data
|
||||
p = Packet(constants.REQ, constants.WORK_WARNING, data)
|
||||
self.connection.sendPacket(p)
|
||||
|
||||
def sendWorkStatus(self, numerator, denominator):
|
||||
"""Send a WORK_STATUS packet to the client.
|
||||
|
||||
Sends a numerator and denominator that together represent the
|
||||
fraction complete of the job.
|
||||
|
||||
:arg numeric numerator: The numerator of the fraction complete.
|
||||
:arg numeric denominator: The denominator of the fraction complete.
|
||||
"""
|
||||
|
||||
data = (self.handle + b'\x00' +
|
||||
str(numerator).encode('utf8') + b'\x00' +
|
||||
str(denominator).encode('utf8'))
|
||||
p = Packet(constants.REQ, constants.WORK_STATUS, data)
|
||||
self.connection.sendPacket(p)
|
||||
|
||||
def sendWorkComplete(self, data=b''):
|
||||
"""Send a WORK_COMPLETE packet to the client.
|
||||
|
||||
:arg bytes data: The data to be sent to the client (optional).
|
||||
"""
|
||||
|
||||
data = self.handle + b'\x00' + data
|
||||
p = Packet(constants.REQ, constants.WORK_COMPLETE, data)
|
||||
self.connection.sendPacket(p)
|
||||
|
||||
def sendWorkFail(self):
|
||||
"Send a WORK_FAIL packet to the client."
|
||||
|
||||
p = Packet(constants.REQ, constants.WORK_FAIL, self.handle)
|
||||
self.connection.sendPacket(p)
|
||||
|
||||
def sendWorkException(self, data=b''):
|
||||
"""Send a WORK_EXCEPTION packet to the client.
|
||||
|
||||
:arg bytes data: The exception data to be sent to the client
|
||||
(optional).
|
||||
"""
|
||||
|
||||
data = self.handle + b'\x00' + data
|
||||
p = Packet(constants.REQ, constants.WORK_EXCEPTION, data)
|
||||
self.connection.sendPacket(p)
|
||||
|
||||
|
||||
class Worker(BaseClient):
|
||||
"""A Gearman worker.
|
||||
|
||||
|
@ -1708,6 +1861,8 @@ class Worker(BaseClient):
|
|||
is deprecated, use client_id instead.
|
||||
"""
|
||||
|
||||
job_class = WorkerJob
|
||||
|
||||
def __init__(self, client_id=None, worker_id=None):
|
||||
if not client_id or worker_id:
|
||||
raise Exception("A client_id must be provided")
|
||||
|
@ -2027,7 +2182,7 @@ class Worker(BaseClient):
|
|||
arguments, unique)
|
||||
|
||||
def _handleJobAssignment(self, packet, handle, name, arguments, unique):
|
||||
job = WorkerJob(handle, name, arguments, unique)
|
||||
job = self.job_class(handle, name, arguments, unique)
|
||||
job.connection = packet.connection
|
||||
|
||||
self.job_lock.acquire()
|
||||
|
@ -2043,38 +2198,6 @@ class Worker(BaseClient):
|
|||
self.job_lock.release()
|
||||
|
||||
|
||||
class BaseJob(object):
|
||||
def __init__(self, name, arguments, unique=None, handle=None):
|
||||
self._name = convert_to_bytes(name)
|
||||
if (not isinstance(arguments, bytes) and
|
||||
not isinstance(arguments, bytearray)):
|
||||
raise TypeError("arguments must be of type bytes or bytearray")
|
||||
self.arguments = arguments
|
||||
self.unique = convert_to_bytes(unique)
|
||||
self.handle = handle
|
||||
self.connection = None
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
if isinstance(self._name, six.binary_type):
|
||||
return self._name.decode('utf-8')
|
||||
return self._name
|
||||
|
||||
@name.setter
|
||||
def name(self, value):
|
||||
if isinstance(value, six.text_type):
|
||||
value = value.encode('utf-8')
|
||||
self._name = value
|
||||
|
||||
@property
|
||||
def binary_name(self):
|
||||
return self._name
|
||||
|
||||
def __repr__(self):
|
||||
return '<gear.Job 0x%x handle: %s name: %s unique: %s>' % (
|
||||
id(self), self.handle, self.name, self.unique)
|
||||
|
||||
|
||||
class Job(BaseJob):
|
||||
"""A job to run or being run by Gearman.
|
||||
|
||||
|
@ -2097,7 +2220,9 @@ class Job(BaseJob):
|
|||
**data** (list of byte-arrays)
|
||||
The result data returned from Gearman. Each packet appends an
|
||||
element to the list. Depending on the nature of the data, the
|
||||
elements may need to be concatenated before use.
|
||||
elements may need to be concatenated before use. This is returned
|
||||
as a snapshot copy of the data to prevent accidental attempts at
|
||||
modification which will be lost.
|
||||
**exception** (bytes or None)
|
||||
Exception information returned from Gearman. None if no exception
|
||||
has been received.
|
||||
|
@ -2127,10 +2252,12 @@ class Job(BaseJob):
|
|||
has been submitted to a Gearman server.
|
||||
"""
|
||||
|
||||
data_type = list
|
||||
|
||||
def __init__(self, name, arguments, unique=None):
|
||||
super(Job, self).__init__(name, arguments, unique)
|
||||
self.data = []
|
||||
self.exception = None
|
||||
self._data = self.data_type()
|
||||
self._exception = None
|
||||
self.warning = False
|
||||
self.complete = False
|
||||
self.failure = False
|
||||
|
@ -2140,99 +2267,181 @@ class Job(BaseJob):
|
|||
self.known = None
|
||||
self.running = None
|
||||
|
||||
@property
|
||||
def binary_data(self):
|
||||
for value in self._data:
|
||||
if isinstance(value, six.text_type):
|
||||
value = value.encode('utf-8')
|
||||
yield value
|
||||
|
||||
class WorkerJob(BaseJob):
|
||||
"""A job that Gearman has assigned to a Worker. Not intended to
|
||||
be instantiated directly, but rather returned by
|
||||
:py:meth:`Worker.getJob`.
|
||||
@property
|
||||
def data(self):
|
||||
return self._data
|
||||
|
||||
:arg str handle: The job handle assigned by gearman.
|
||||
:arg str name: The name of the job.
|
||||
:arg bytes arguments: The opaque data blob passed to the worker
|
||||
as arguments.
|
||||
:arg str unique: A byte string to uniquely identify the job to Gearman
|
||||
(optional).
|
||||
@data.setter
|
||||
def data(self, value):
|
||||
if not isinstance(value, self.data_type):
|
||||
raise ValueError(
|
||||
"data attribute must be {}".format(self.data_type))
|
||||
self._data = value
|
||||
|
||||
The following instance attributes are available:
|
||||
@property
|
||||
def exception(self):
|
||||
return self._exception
|
||||
|
||||
@exception.setter
|
||||
def exception(self, value):
|
||||
self._data = value
|
||||
|
||||
|
||||
class TextJobArguments(object):
|
||||
"""Assumes utf-8 arguments in addition to name
|
||||
|
||||
If one is always dealing in valid utf-8, using this job class relieves one
|
||||
of the need to encode/decode constantly."""
|
||||
|
||||
def _validate_arguments(self, arguments):
|
||||
pass
|
||||
|
||||
@property
|
||||
def arguments(self):
|
||||
args = self._arguments
|
||||
if isinstance(args, six.binary_type):
|
||||
return args.decode('utf-8')
|
||||
return args
|
||||
|
||||
@arguments.setter
|
||||
def arguments(self, value):
|
||||
if not isinstance(value, six.binary_type):
|
||||
value = value.encode('utf-8')
|
||||
self._arguments = value
|
||||
|
||||
|
||||
class TextJobUnique(object):
|
||||
"""Assumes utf-8 unique
|
||||
|
||||
If one is always dealing in valid utf-8, using this job class relieves one
|
||||
of the need to encode/decode constantly."""
|
||||
|
||||
@property
|
||||
def unique(self):
|
||||
unique = self._unique
|
||||
if isinstance(unique, six.binary_type):
|
||||
return unique.decode('utf-8')
|
||||
return unique
|
||||
|
||||
@unique.setter
|
||||
def unique(self, value):
|
||||
if not isinstance(value, six.binary_type):
|
||||
value = value.encode('utf-8')
|
||||
self._unique = value
|
||||
|
||||
|
||||
class TextList(list):
|
||||
def append(self, x):
|
||||
if isinstance(x, six.binary_type):
|
||||
x = x.decode('utf-8')
|
||||
super(TextList, self).append(x)
|
||||
|
||||
def extend(self, iterable):
|
||||
def _iter():
|
||||
for value in iterable:
|
||||
if isinstance(value, six.binary_type):
|
||||
yield value.decode('utf-8')
|
||||
else:
|
||||
yield value
|
||||
super(TextList, self).extend(_iter)
|
||||
|
||||
def insert(self, i, x):
|
||||
if isinstance(x, six.binary_type):
|
||||
x = x.decode('utf-8')
|
||||
super(TextList, self).insert(i, x)
|
||||
|
||||
|
||||
class TextJob(TextJobArguments, TextJobUnique, Job):
|
||||
""" Sends and receives UTF-8 arguments and data.
|
||||
|
||||
Use this instead of Job when you only expect to send valid UTF-8 through
|
||||
gearman. It will automatically encode arguments and work data as UTF-8, and
|
||||
any jobs fetched from this worker will have their arguments and data
|
||||
decoded assuming they are valid UTF-8, and thus return strings.
|
||||
|
||||
Attributes and method signatures are thes ame as Job except as noted here:
|
||||
|
||||
** arguments ** (str) This will be returned as a string.
|
||||
** data ** (tuple of str) This will be returned as a tuble of strings.
|
||||
|
||||
**name** (str)
|
||||
The name of the job. Assumed to be utf-8.
|
||||
**arguments** (bytes)
|
||||
The opaque data blob passed to the worker as arguments.
|
||||
**unique** (str or None)
|
||||
The unique ID of the job (if supplied).
|
||||
**handle** (bytes)
|
||||
The Gearman job handle.
|
||||
**connection** (:py:class:`Connection` or None)
|
||||
The connection associated with the job. Only set after the job
|
||||
has been submitted to a Gearman server.
|
||||
"""
|
||||
|
||||
def __init__(self, handle, name, arguments, unique=None):
|
||||
super(WorkerJob, self).__init__(name, arguments, unique, handle)
|
||||
data_type = TextList
|
||||
|
||||
def sendWorkData(self, data=b''):
|
||||
@property
|
||||
def exception(self):
|
||||
exception = self._exception
|
||||
if isinstance(exception, six.binary_type):
|
||||
return exception.decode('utf-8')
|
||||
return exception
|
||||
|
||||
@exception.setter
|
||||
def exception(self, value):
|
||||
if not isinstance(value, six.binary_type):
|
||||
value = value.encode('utf-8')
|
||||
self._exception = value
|
||||
|
||||
|
||||
class TextWorkerJob(TextJobArguments, TextJobUnique, WorkerJob):
|
||||
""" Sends and receives UTF-8 arguments and data.
|
||||
|
||||
See TextJob. sendWorkData and sendWorkWarning accept strings
|
||||
and will encode them as UTF-8.
|
||||
"""
|
||||
def sendWorkData(self, data=''):
|
||||
"""Send a WORK_DATA packet to the client.
|
||||
|
||||
:arg bytes data: The data to be sent to the client (optional).
|
||||
:arg str data: The data to be sent to the client (optional).
|
||||
"""
|
||||
if isinstance(data, six.text_type):
|
||||
data = data.encode('utf8')
|
||||
return super(TextWorkerJob, self).sendWorkData(data)
|
||||
|
||||
data = self.handle + b'\x00' + data
|
||||
p = Packet(constants.REQ, constants.WORK_DATA, data)
|
||||
self.connection.sendPacket(p)
|
||||
|
||||
def sendWorkWarning(self, data=b''):
|
||||
def sendWorkWarning(self, data=''):
|
||||
"""Send a WORK_WARNING packet to the client.
|
||||
|
||||
:arg bytes data: The data to be sent to the client (optional).
|
||||
:arg str data: The data to be sent to the client (optional).
|
||||
"""
|
||||
|
||||
data = self.handle + b'\x00' + data
|
||||
p = Packet(constants.REQ, constants.WORK_WARNING, data)
|
||||
self.connection.sendPacket(p)
|
||||
if isinstance(data, six.text_type):
|
||||
data = data.encode('utf8')
|
||||
return super(TextWorkerJob, self).sendWorkWarning(data)
|
||||
|
||||
def sendWorkStatus(self, numerator, denominator):
|
||||
"""Send a WORK_STATUS packet to the client.
|
||||
|
||||
Sends a numerator and denominator that together represent the
|
||||
fraction complete of the job.
|
||||
|
||||
:arg numeric numerator: The numerator of the fraction complete.
|
||||
:arg numeric denominator: The denominator of the fraction complete.
|
||||
"""
|
||||
|
||||
data = (self.handle + b'\x00' +
|
||||
str(numerator).encode('utf8') + b'\x00' +
|
||||
str(denominator).encode('utf8'))
|
||||
p = Packet(constants.REQ, constants.WORK_STATUS, data)
|
||||
self.connection.sendPacket(p)
|
||||
|
||||
def sendWorkComplete(self, data=b''):
|
||||
def sendWorkComplete(self, data=''):
|
||||
"""Send a WORK_COMPLETE packet to the client.
|
||||
|
||||
:arg bytes data: The data to be sent to the client (optional).
|
||||
:arg str data: The data to be sent to the client (optional).
|
||||
"""
|
||||
if isinstance(data, six.text_type):
|
||||
data = data.encode('utf8')
|
||||
return super(TextWorkerJob, self).sendWorkComplete(data)
|
||||
|
||||
data = self.handle + b'\x00' + data
|
||||
p = Packet(constants.REQ, constants.WORK_COMPLETE, data)
|
||||
self.connection.sendPacket(p)
|
||||
|
||||
def sendWorkFail(self):
|
||||
"Send a WORK_FAIL packet to the client."
|
||||
|
||||
p = Packet(constants.REQ, constants.WORK_FAIL, self.handle)
|
||||
self.connection.sendPacket(p)
|
||||
|
||||
def sendWorkException(self, data=b''):
|
||||
def sendWorkException(self, data=''):
|
||||
"""Send a WORK_EXCEPTION packet to the client.
|
||||
|
||||
:arg bytes data: The exception data to be sent to the client
|
||||
(optional).
|
||||
:arg str data: The data to be sent to the client (optional).
|
||||
"""
|
||||
|
||||
data = self.handle + b'\x00' + data
|
||||
p = Packet(constants.REQ, constants.WORK_EXCEPTION, data)
|
||||
self.connection.sendPacket(p)
|
||||
if isinstance(data, six.text_type):
|
||||
data = data.encode('utf8')
|
||||
return super(TextWorkerJob, self).sendWorkException(data)
|
||||
|
||||
|
||||
class TextWorker(Worker):
|
||||
""" Sends and receives UTF-8 only.
|
||||
|
||||
See TextJob.
|
||||
|
||||
"""
|
||||
|
||||
job_class = TextWorkerJob
|
||||
|
||||
|
||||
class BaseBinaryJob(object):
|
||||
|
@ -3153,7 +3362,7 @@ class Server(BaseClientServer):
|
|||
self.sendNoJob(packet.connection)
|
||||
|
||||
def sendJobAssignUniq(self, connection, job):
|
||||
unique = job.unique
|
||||
unique = job.binary_unique
|
||||
if not unique:
|
||||
unique = b''
|
||||
data = b'\x00'.join((job.handle, job.name, unique, job.arguments))
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
import os
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from OpenSSL import crypto
|
||||
import fixtures
|
||||
|
@ -149,5 +150,79 @@ class TestFunctional(tests.BaseTestCase):
|
|||
self.assertEqual('test', workerjob.name)
|
||||
|
||||
|
||||
class TestFunctionalText(tests.BaseTestCase):
|
||||
def setUp(self):
|
||||
super(TestFunctionalText, self).setUp()
|
||||
self.server = gear.Server(0)
|
||||
self.client = gear.Client('client')
|
||||
self.worker = gear.TextWorker('worker')
|
||||
self.client.addServer('127.0.0.1', self.server.port)
|
||||
self.worker.addServer('127.0.0.1', self.server.port)
|
||||
self.client.waitForServer()
|
||||
self.worker.waitForServer()
|
||||
|
||||
def test_text_job(self):
|
||||
self.worker.registerFunction('test')
|
||||
|
||||
for jobcount in range(2):
|
||||
job = gear.TextJob('test', 'testdata')
|
||||
self.client.submitJob(job)
|
||||
self.assertNotEqual(job.handle, None)
|
||||
|
||||
workerjob = self.worker.getJob()
|
||||
self.assertEqual(workerjob.handle, job.handle)
|
||||
self.assertEqual(workerjob.arguments, 'testdata')
|
||||
workerjob.sendWorkData('workdata')
|
||||
workerjob.sendWorkComplete()
|
||||
|
||||
for count in iterate_timeout(30, "job completion"):
|
||||
if job.complete:
|
||||
break
|
||||
self.assertTrue(job.complete)
|
||||
self.assertEqual(job.data, ['workdata'])
|
||||
|
||||
def test_text_job_unique(self):
|
||||
self.worker.registerFunction('test')
|
||||
|
||||
for jobcount in range(2):
|
||||
jobunique = uuid.uuid4().hex
|
||||
job = gear.TextJob('test', 'testdata', unique=jobunique)
|
||||
self.client.submitJob(job)
|
||||
self.assertNotEqual(job.handle, None)
|
||||
|
||||
workerjob = self.worker.getJob()
|
||||
self.assertEqual(workerjob.handle, job.handle)
|
||||
self.assertEqual(workerjob.arguments, 'testdata')
|
||||
workerjob.sendWorkData('workdata')
|
||||
workerjob.sendWorkComplete()
|
||||
|
||||
for count in iterate_timeout(30, "job completion"):
|
||||
if job.complete:
|
||||
break
|
||||
self.assertTrue(job.complete)
|
||||
self.assertEqual(job.data, ['workdata'])
|
||||
self.assertEqual(job.unique, jobunique)
|
||||
self.assertEqual(workerjob.unique, jobunique)
|
||||
|
||||
def test_text_job_exception(self):
|
||||
self.worker.registerFunction('test')
|
||||
|
||||
for jobcount in range(2):
|
||||
job = gear.TextJob('test', 'testdata')
|
||||
self.client.submitJob(job)
|
||||
self.assertNotEqual(job.handle, None)
|
||||
|
||||
workerjob = self.worker.getJob()
|
||||
self.assertEqual(workerjob.handle, job.handle)
|
||||
self.assertEqual(workerjob.arguments, 'testdata')
|
||||
workerjob.sendWorkException('work failed')
|
||||
|
||||
for count in iterate_timeout(30, "job completion"):
|
||||
if job.complete:
|
||||
break
|
||||
self.assertTrue(job.complete)
|
||||
self.assertEqual(job.exception, 'work failed')
|
||||
|
||||
|
||||
def load_tests(loader, in_tests, pattern):
|
||||
return testscenarios.load_tests_apply_scenarios(loader, in_tests, pattern)
|
||||
|
|
Loading…
Reference in New Issue