@ -14,7 +14,6 @@
import logging
import os
import Queue
import select
import socket
import struct
@ -24,6 +23,11 @@ import uuid as uuid_module
from gear import constants
try :
import Queue as queue
except ImportError :
import queue as queue
PRECEDENCE_NORMAL = 0
PRECEDENCE_LOW = 1
PRECEDENCE_HIGH = 2
@ -61,6 +65,14 @@ class GearmanError(Exception):
pass
def convert_to_bytes ( data ) :
try :
data = data . encode ( ' utf8 ' )
except AttributeError :
pass
return data
class Task ( object ) :
def __init__ ( self ) :
self . _wait_event = threading . Event ( )
@ -208,7 +220,7 @@ class Connection(object):
if not c :
return None
if admin is None :
if c == ' \x00 ' :
if c == b ' \x00 ' :
admin = False
else :
admin = True
@ -244,15 +256,15 @@ class Connection(object):
This method waits until the echo response has been received or the
timeout has been reached .
: arg str data : The data to request be echoed . If None , a random
unique string will be generated .
: arg bytes data : The data to request be echoed . If None , a random
unique byte string will be generated .
: arg numeric timeout : Number of seconds to wait until the response
is received . If None , wait forever ( default : 30 seconds ) .
: raises TimeoutError : If the timeout is reached before the response
is received .
"""
if data is None :
data = str ( uuid_module . uuid4 ( ) . hex )
data = uuid_module . uuid4 ( ) . hex . encode ( ' utf8 ' )
self . echo_lock . acquire ( )
try :
if data in self . echo_conditions :
@ -302,15 +314,15 @@ class AdminRequest(object):
instantiated dircectly ; a subclass implementing a specific command
must be used instead .
: arg list arguments : A list of string arguments for the command .
: arg list arguments : A list of byte string arguments for the command .
The following instance attributes are available :
* * response * * ( str )
* * response * * ( bytes )
The response from the server .
* * arguments * * ( str )
* * arguments * * ( bytes )
The argument supplied with the constructor .
* * command * * ( str )
* * command * * ( bytes )
The administrative command .
"""
@ -333,15 +345,15 @@ class AdminRequest(object):
def getCommand ( self ) :
cmd = self . command
if self . arguments :
cmd + = ' ' + ' ' . join ( self . arguments )
cmd + = ' \n '
cmd + = b ' ' + b ' ' . join ( self . arguments )
cmd + = b ' \n '
return cmd
def isComplete ( self , data ) :
if ( data [ - 3 : ] == ' \n . \n ' or
data [ - 5 : ] == ' \r \n . \r \n ' or
data == ' . \n ' or
data == ' . \r \n ' ) :
if ( data [ - 3 : ] == b ' \n . \n ' or
data [ - 5 : ] == b ' \r \n . \r \n ' or
data == b ' . \n ' or
data == b ' . \r \n ' ) :
self . response = data
return True
return False
@ -359,7 +371,7 @@ class StatusAdminRequest(AdminRequest):
The response from gearman may be found in the * * response * * attribute .
"""
command = ' status '
command = b ' status '
def __init__ ( self ) :
super ( StatusAdminRequest , self ) . __init__ ( )
@ -370,7 +382,7 @@ class ShowJobsAdminRequest(AdminRequest):
The response from gearman may be found in the * * response * * attribute .
"""
command = ' show jobs '
command = b ' show jobs '
def __init__ ( self ) :
super ( ShowJobsAdminRequest , self ) . __init__ ( )
@ -382,7 +394,7 @@ class ShowUniqueJobsAdminRequest(AdminRequest):
The response from gearman may be found in the * * response * * attribute .
"""
command = ' show unique jobs '
command = b ' show unique jobs '
def __init__ ( self ) :
super ( ShowUniqueJobsAdminRequest , self ) . __init__ ( )
@ -396,13 +408,14 @@ class CancelJobAdminRequest(AdminRequest):
The response from gearman may be found in the * * response * * attribute .
"""
command = ' cancel job '
command = b ' cancel job '
def __init__ ( self , handle ) :
handle = convert_to_bytes ( handle )
super ( CancelJobAdminRequest , self ) . __init__ ( handle )
def isComplete ( self , data ) :
if data [ - 1 ] == ' \n ' :
if data [ - 1 : ] == b ' \n ' :
self . response = data
return True
return False
@ -414,13 +427,13 @@ class VersionAdminRequest(AdminRequest):
The response from gearman may be found in the * * response * * attribute .
"""
command = ' version '
command = b ' version '
def __init__ ( self ) :
super ( VersionAdminRequest , self ) . __init__ ( )
def isComplete ( self , data ) :
if data [ - 1 ] == ' \n ' :
if data [ - 1 : ] == b ' \n ' :
self . response = data
return True
return False
@ -430,22 +443,26 @@ class Packet(object):
""" A data packet received from or to be sent over a
: py : class : ` Connection ` .
: arg str code : The Gearman magic code ( : py : data : ` constants . REQ ` or
: arg bytes code : The Gearman magic code ( : py : data : ` constants . REQ ` or
: py : data : ` constants . RES ` )
: arg str ptype : The packet type ( one of the packet types in constasts ) .
: arg str data : The data portion of the packet .
: arg str connection : The connection on which the packet was received
( optional ) .
: arg bytes ptype : The packet type ( one of the packet types in constasts ) .
: arg bytes data : The data portion of the packet .
: arg Connection connection : The connection on which the packet
was received ( optional ) .
: raises InvalidDataError : If the magic code is unknown .
"""
log = logging . getLogger ( " gear.Packet " )
def __init__ ( self , code , ptype , data , connection = None ) :
if code [ 0 ] != ' \x00 ' :
if not isinstance ( code , bytes ) and not isinstance ( code , bytearray ) :
raise TypeError ( " code must be of type bytes or bytearray " )
if code [ 0 : 1 ] != b ' \x00 ' :
raise InvalidDataError ( " First byte of packet must be 0 " )
self . code = code
self . ptype = ptype
if not isinstance ( data , bytes ) and not isinstance ( data , bytearray ) :
raise TypeError ( " data must be of type bytes or bytearray " )
self . data = data
self . connection = connection
@ -457,15 +474,11 @@ class Packet(object):
""" Return a Gearman wire protocol binary representation of the packet.
: returns : The packet in binary form .
: rtype : str
: rtype : bytes
"""
b = struct . pack ( ' !4sii ' , self . code , self . ptype , len ( self . data ) )
b = bytearray ( b )
if isinstance ( self . data , basestring ) :
data = bytearray ( self . data , ' utf8 ' )
else :
data = self . data
b + = data
b + = self . data
return b
def getArgument ( self , index , last = False ) :
@ -475,7 +488,7 @@ class Packet(object):
: arg bool last : Whether this is the last argument ( and thus
nulls should be ignored )
: returns : The argument value .
: rtype : str
: rtype : bytes
"""
parts = self . data . split ( b ' \x00 ' )
@ -488,7 +501,7 @@ class Packet(object):
this packet .
: returns : The : py : class : ` Job ` for this packet .
: rtype : str
: rtype : Job
: raises UnknownJobError : If the job is not known .
"""
handle = self . getArgument ( 0 )
@ -563,7 +576,7 @@ class BaseClientServer(object):
self . inactive_connections . remove ( conn )
self . active_connections . append ( conn )
self . connections_condition . notifyAll ( )
os . write ( self . wake_write , ' 1 \n ' )
os . write ( self . wake_write , b ' 1 \n ' )
self . connections_condition . release ( )
try :
@ -632,7 +645,7 @@ class BaseClientServer(object):
if fd == self . wake_read :
self . log . debug ( " Woken by pipe " )
while True :
if os . read ( self . wake_read , 1 ) == ' \n ' :
if os . read ( self . wake_read , 1 ) == b ' \n ' :
break
return
conn = conn_dict [ fd ]
@ -877,7 +890,7 @@ class BaseClientServer(object):
self . running = False
self . connections_condition . acquire ( )
self . connections_condition . notifyAll ( )
os . write ( self . wake_write , ' 1 \n ' )
os . write ( self . wake_write , b ' 1 \n ' )
self . connections_condition . release ( )
def _cleanup ( self ) :
@ -1093,6 +1106,7 @@ class Client(BaseClient):
: rtype : bool
"""
tasks = { }
name = convert_to_bytes ( name )
self . broadcast_lock . acquire ( )
try :
@ -1140,10 +1154,10 @@ class Client(BaseClient):
is supplied .
"""
if job . unique is None :
unique = ' '
unique = b ' '
else :
unique = job . unique
data = ' %s \x00 %s \x00 %s ' % ( job . name , unique , job . arguments )
data = b ' \x00 ' . join ( ( job . name , unique , job . arguments ) )
if background :
if precedence == PRECEDENCE_NORMAL :
cmd = constants . SUBMIT_JOB_BG
@ -1418,11 +1432,11 @@ class Worker(BaseClient):
log = logging . getLogger ( " gear.Worker " )
def __init__ ( self , worker_id ) :
self . worker_id = worker_id
self . worker_id = convert_to_bytes ( worker_id )
self . functions = { }
self . job_lock = threading . Lock ( )
self . waiting_for_jobs = 0
self . job_queue = Q ueue. Queue ( )
self . job_queue = q ueue. Queue ( )
super ( Worker , self ) . __init__ ( )
def __repr__ ( self ) :
@ -1437,6 +1451,7 @@ class Worker(BaseClient):
: arg str name : The name of the function to register .
: arg numeric timeout : The timeout value ( optional ) .
"""
name = convert_to_bytes ( name )
self . functions [ name ] = FunctionRecord ( name , timeout )
if timeout :
self . _sendCanDoTimeout ( name , timeout )
@ -1448,6 +1463,7 @@ class Worker(BaseClient):
: arg str name : The name of the function to remove .
"""
name = convert_to_bytes ( name )
del self . functions [ name ]
self . _sendCantDo ( name )
@ -1488,7 +1504,7 @@ class Worker(BaseClient):
def _sendCanDoTimeout ( self , name , timeout ) :
self . broadcast_lock . acquire ( )
try :
data = name + ' \x00 ' + timeout
data = name + b ' \x00 ' + timeout
p = Packet ( constants . REQ , constants . CAN_DO_TIMEOUT , data )
self . broadcast ( p )
finally :
@ -1505,17 +1521,17 @@ class Worker(BaseClient):
def _sendResetAbilities ( self ) :
self . broadcast_lock . acquire ( )
try :
p = Packet ( constants . REQ , constants . RESET_ABILITIES , ' ' )
p = Packet ( constants . REQ , constants . RESET_ABILITIES , b ' ' )
self . broadcast ( p )
finally :
self . broadcast_lock . release ( )
def _sendPreSleep ( self , connection ) :
p = Packet ( constants . REQ , constants . PRE_SLEEP , ' ' )
p = Packet ( constants . REQ , constants . PRE_SLEEP , b ' ' )
self . sendPacket ( p , connection )
def _sendGrabJobUniq ( self , connection = None ) :
p = Packet ( constants . REQ , constants . GRAB_JOB_UNIQ , ' ' )
p = Packet ( constants . REQ , constants . GRAB_JOB_UNIQ , b ' ' )
if connection :
self . sendPacket ( p , connection )
else :
@ -1530,7 +1546,7 @@ class Worker(BaseClient):
super ( Worker , self ) . _onConnect ( conn )
for f in self . functions . values ( ) :
if f . timeout :
data = f . name + ' \x00 ' + f . timeout
data = f . name + b ' \x00 ' + f . timeout
p = Packet ( constants . REQ , constants . CAN_DO_TIMEOUT , data )
else :
p = Packet ( constants . REQ , constants . CAN_DO , f . name )
@ -1580,7 +1596,7 @@ class Worker(BaseClient):
try :
job = self . job_queue . get ( False )
except Q ueue. Empty :
except q ueue. Empty :
job = None
if not job :
@ -1702,7 +1718,7 @@ class Worker(BaseClient):
handle = packet . getArgument ( 0 )
name = packet . getArgument ( 1 )
unique = packet . getArgument ( 2 )
if unique == ' ' :
if unique == b ' ' :
unique = None
arguments = packet . getArgument ( 3 , True )
return self . _handleJobAssignment ( packet , handle , name ,
@ -1729,9 +1745,12 @@ class BaseJob(object):
log = logging . getLogger ( " gear.Job " )
def __init__ ( self , name , arguments , unique = None , handle = None ) :
self . name = name
self . name = convert_to_bytes ( name )
if ( not isinstance ( arguments , bytes ) and
not isinstance ( arguments , bytearray ) ) :
raise TypeError ( " arguments must be of type bytes or bytearray " )
self . arguments = arguments
self . unique = unique
self . unique = convert_to_bytes ( unique )
self . handle = handle
self . connection = None
@ -1744,26 +1763,26 @@ class Job(BaseJob):
""" A job to run or being run by Gearman.
: arg str name : The name of the job .
: arg str arguments : The opaque data blob to be passed to the worker
: arg bytes arguments : The opaque data blob to be passed to the worker
as arguments .
: arg str unique : A string to uniquely identify the job to Gearman
: arg str unique : A byte string to uniquely identify the job to Gearman
( optional ) .
The following instance attributes are available :
* * name * * ( str )
The name of the job .
* * arguments * * ( str )
* * arguments * * ( bytes )
The opaque data blob passed to the worker as arguments .
* * unique * * ( str or None )
The unique ID of the job ( if supplied ) .
* * handle * * ( str or None )
* * handle * * ( bytes or None )
The Gearman job handle . None if no job handle has been received yet .
* * data * * ( list of byte - arrays )
The result data returned from Gearman . Each packet appends an
element to the list . Depending on the nature of the data , the
elements may need to be concatenated before use .
* * exception * * ( str or None )
* * exception * * ( bytes or None )
Exception information returned from Gearman . None if no exception
has been received .
* * warning * * ( bool )
@ -1772,10 +1791,10 @@ class Job(BaseJob):
Whether the job is complete .
* * failure * * ( bool )
Whether the job has failed . Only set when complete is True .
* * numerator * * ( str or None )
* * numerator * * ( bytes or None )
The numerator of the completion ratio reported by the worker .
Only set when a status update is sent by the worker .
* * denominator * * ( str or None )
* * denominator * * ( bytes or None )
The denominator of the completion ratio reported by the
worker . Only set when a status update is sent by the worker .
* * fraction_complete * * ( float or None )
@ -1815,20 +1834,20 @@ class WorkerJob(BaseJob):
: arg str handle : The job handle assigned by gearman .
: arg str name : The name of the job .
: arg str arguments : The opaque data blob passed to the worker
: arg bytes arguments : The opaque data blob passed to the worker
as arguments .
: arg str unique : A string to uniquely identify the job to Gearman
: arg str unique : A byte string to uniquely identify the job to Gearman
( optional ) .
The following instance attributes are available :
* * name * * ( str )
The name of the job .
* * arguments * * ( str )
* * arguments * * ( bytes )
The opaque data blob passed to the worker as arguments .
* * unique * * ( str or None )
The unique ID of the job ( if supplied ) .
* * handle * * ( str )
* * handle * * ( bytes )
The Gearman job handle .
* * connection * * ( : py : class : ` Connection ` or None )
The connection associated with the job . Only set after the job
@ -1840,23 +1859,23 @@ class WorkerJob(BaseJob):
def __init__ ( self , handle , name , arguments , unique = None ) :
super ( WorkerJob , self ) . __init__ ( name , arguments , unique , handle )
def sendWorkData ( self , data = ' ' ) :
def sendWorkData ( self , data = b ' ' ) :
""" Send a WORK_DATA packet to the client.
: arg str data : The data to be sent to the client ( optional ) .
: arg bytes data : The data to be sent to the client ( optional ) .
"""
data = self . handle + ' \x00 ' + data
data = self . handle + b ' \x00 ' + data
p = Packet ( constants . REQ , constants . WORK_DATA , data )
self . connection . sendPacket ( p )
def sendWorkWarning ( self , data = ' ' ) :
def sendWorkWarning ( self , data = b ' ' ) :
""" Send a WORK_WARNING packet to the client.
: arg str data : The data to be sent to the client ( optional ) .
: arg bytes data : The data to be sent to the client ( optional ) .
"""
data = self . handle + ' \x00 ' + data
data = self . handle + b ' \x00 ' + data
p = Packet ( constants . REQ , constants . WORK_WARNING , data )
self . connection . sendPacket ( p )
@ -1870,18 +1889,19 @@ class WorkerJob(BaseJob):
: arg numeric denominator : The denominator of the fraction complete .
"""
data = ( self . handle + ' \x00 ' +
str ( numerator ) + ' \x00 ' + str ( denominator ) )
data = ( self . handle + b ' \x00 ' +
str ( numerator ) . encode ( ' utf8 ' ) + b ' \x00 ' +
str ( denominator ) . encode ( ' utf8 ' ) )
p = Packet ( constants . REQ , constants . WORK_STATUS , data )
self . connection . sendPacket ( p )
def sendWorkComplete ( self , data = ' ' ) :
def sendWorkComplete ( self , data = b ' ' ) :
""" Send a WORK_COMPLETE packet to the client.
: arg str data : The data to be sent to the client ( optional ) .
: arg bytes data : The data to be sent to the client ( optional ) .
"""
data = self . handle + ' \x00 ' + data
data = self . handle + b ' \x00 ' + data
p = Packet ( constants . REQ , constants . WORK_COMPLETE , data )
self . connection . sendPacket ( p )
@ -1891,13 +1911,14 @@ class WorkerJob(BaseJob):
p = Packet ( constants . REQ , constants . WORK_FAIL , self . handle )
self . connection . sendPacket ( p )
def sendWorkException ( self , data = ' ' ) :
def sendWorkException ( self , data = b ' ' ) :
""" Send a WORK_EXCEPTION packet to the client.
: arg str data : The exception data to be sent to the client ( optional ) .
: arg bytes data : The exception data to be sent to the client
( optional ) .
"""
data = self . handle + ' \x00 ' + data
data = self . handle + b ' \x00 ' + data
p = Packet ( constants . REQ , constants . WORK_EXCEPTION , data )
self . connection . sendPacket ( p )
@ -1912,7 +1933,7 @@ class ServerAdminRequest(AdminRequest):
self . connection = connection
def isComplete ( self , data ) :
if data [ - 1 ] == ' \n ' :
if data [ - 1 : ] == b ' \n ' :
self . command = data . strip ( )
return True
return False
@ -1946,7 +1967,7 @@ class Server(BaseClientServer):
""" A simple gearman server implementation for testing
( not for production use ) .
: arg str port : The TCP port on which to listen .
: arg int port : The TCP port on which to listen .
"""
def __init__ ( self , port = 4730 ) :
@ -2005,7 +2026,7 @@ class Server(BaseClientServer):
if fd == self . connect_wake_read :
self . log . debug ( " Accept woken by pipe " )
while True :
if os . read ( self . connect_wake_read , 1 ) == ' \n ' :
if os . read ( self . connect_wake_read , 1 ) == b ' \n ' :
break
return
if event & select . POLLIN :
@ -2016,12 +2037,12 @@ class Server(BaseClientServer):
self . connections_condition . acquire ( )
self . active_connections . append ( conn )
self . connections_condition . notifyAll ( )
os . write ( self . wake_write , ' 1 \n ' )
os . write ( self . wake_write , b ' 1 \n ' )
self . connections_condition . release ( )
def _shutdown ( self ) :
super ( Server , self ) . _shutdown ( )
os . write ( self . connect_wake_write , ' 1 \n ' )
os . write ( self . connect_wake_write , b ' 1 \n ' )
def _cleanup ( self ) :
super ( Server , self ) . _cleanup ( )
@ -2037,9 +2058,9 @@ class Server(BaseClientServer):
self . connections_condition . release ( )
def handleAdminRequest ( self , request ) :
if request . command . startswith ( ' cancel job ' ) :
if request . command . startswith ( b ' cancel job ' ) :
self . handleCancelJob ( request )
elif request . command . startswith ( ' status ' ) :
elif request . command . startswith ( b ' status ' ) :
self . handleStatus ( request )
def handleCancelJob ( self , request ) :
@ -2051,9 +2072,9 @@ class Server(BaseClientServer):
if handle == job . handle :
self . queue . remove ( job )
del self . jobs [ handle ]
request . connection . conn . send ( " OK \n " )
request . connection . conn . send ( b ' OK \n ' )
return
request . connection . conn . send ( " ERR UNKNOWN_JOB \n " )
request . connection . conn . send ( b ' ERR UNKNOWN_JOB \n ' )
def handleStatus ( self , request ) :
functions = { }
@ -2070,13 +2091,13 @@ class Server(BaseClientServer):
for function in connection . functions :
functions [ function ] [ 2 ] + = 1
for name , values in functions . items ( ) :
request . connection . conn . send ( " %s \t %s \t %s \t %s \n " %
request . connection . conn . send ( ( " %s \t %s \t %s \t %s \n " %
( name , values [ 0 ] , values [ 1 ] ,
values [ 2 ] ) )
request . connection . conn . send ( " . \n " )
values [ 2 ] ) ) . encode ( ' utf8 ' ) )
request . connection . conn . send ( b ' . \n ' )
def wakeConnections ( self ) :
p = Packet ( constants . RES , constants . NOOP , ' ' )
p = Packet ( constants . RES , constants . NOOP , b ' ' )
for connection in self . active_connections :
if connection . state == ' SLEEP ' :
connection . sendPacket ( p )
@ -2089,8 +2110,8 @@ class Server(BaseClientServer):
unique = None
arguments = packet . getArgument ( 2 , True )
packet . connection . max_handle + = 1
handle = ' H: %s : %s ' % ( packet . connection . host ,
str ( packet . connection . max_handle ) )
handle = ( ' H: %s : %s ' % ( packet . connection . host ,
packet . connection . max_handle ) ) . encode ( ' utf8 ' )
job = Job ( name , arguments , unique )
job . handle = handle
job . connection = packet . connection
@ -2119,14 +2140,13 @@ class Server(BaseClientServer):
def sendJobAssignUniq ( self , connection , job ) :
unique = job . unique
if not unique :
unique = ' '
data = ' %s \x00 %s \x00 %s \x00 %s ' % ( job . handle , job . name ,
unique , job . arguments )
unique = b ' '
data = b ' \x00 ' . join ( ( job . handle , job . name , unique , job . arguments ) )
p = Packet ( constants . RES , constants . JOB_ASSIGN_UNIQ , data )
connection . sendPacket ( p )
def sendNoJob ( self , connection ) :
p = Packet ( constants . RES , constants . NO_JOB , " " )
p = Packet ( constants . RES , constants . NO_JOB , b ' ' )
connection . sendPacket ( p )
def handlePreSleep ( self , packet ) :
@ -2194,8 +2214,8 @@ class Server(BaseClientServer):
known = 0
running = 0
numerator = ' '
denominator = ' '
numerator = b ' '
denominator = b ' '
job = self . jobs . get ( handle )
if job :
known = 1
@ -2204,10 +2224,10 @@ class Server(BaseClientServer):
numerator = job . numerator
denominator = job . denominator
data = ( handle + ' \x00 ' +
str ( known ) + ' \x00 ' +
str ( running ) + ' \x00 ' +
str ( numerator ) + ' \x00 ' +
str ( denominator ) )
data = ( handle + b ' \x00 ' +
str ( known ) . encode ( ' utf8 ' ) + b ' \x00 ' +
str ( running ) . encode ( ' utf8 ' ) + b ' \x00 ' +
numerator + b ' \x00 ' +
denominator )
p = Packet ( constants . RES , constants . STATUS_RES , data )
packet . connection . sendPacket ( p )