diff --git a/samples/gtaskqueue_sample/README b/samples/gtaskqueue_sample/README new file mode 100644 index 0000000..6535127 --- /dev/null +++ b/samples/gtaskqueue_sample/README @@ -0,0 +1,46 @@ +This acts as a sample as well as commandline tool for accessing Google TaskQueue +APIs. + +Installation +============ + +To install, simply say + + $ python setup.py install --record=files.txt + +to install the files and record what files are installed in files.txt. +Make sure that you have already installed the google-python-client-libraries +using the setup.py in the toplevel directory. + +You may need root priveleges for install. + +Running +======= +This sample provides following: +1. gtaskqueue: This works as a command-line tool to access Google TaskQueue +API. You can perform various operations on taskqueues such as leastask, +getask, listtasks, deletetask, getqueue. +Example usage: + i. To lease a task + gtaskqueue leasetask --taskqueue_name= + -lease_secs=30 --project_name= + ii. To get stats on a queue + gtaskqueue getqueue --taskqueue_name= + --project_name= --get_stats + +2. gtaskqueue_puller: This works as a worker to continuously pull tasks enqueued +by your app,perform the task and the post the output back to your app. +Example Usage: + gtaskqueue_puller --taskqueue_name= + -lease_secs=30 --project_name= + --executable_binary=”cat” --output_url= + +Third Party Libraries +===================== + +These libraries will be installed when you install the client library: + +http://code.google.com/p/httplib2 +http://code.google.com/p/uri-templates +http://github.com/simplegeo/python-oauth2 diff --git a/samples/gtaskqueue_sample/gtaskqueue/client_task.py b/samples/gtaskqueue_sample/gtaskqueue/client_task.py new file mode 100644 index 0000000..5214e81 --- /dev/null +++ b/samples/gtaskqueue_sample/gtaskqueue/client_task.py @@ -0,0 +1,334 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Class to encapsulate task related information and methods on task_puller.""" + + + +import base64 +import oauth2 as oauth +import os +import subprocess +import tempfile +import time +import urllib2 +from apiclient.errors import HttpError +from gtaskqueue.taskqueue_logger import logger +import gflags as flags + + +FLAGS = flags.FLAGS +flags.DEFINE_string( + 'executable_binary', + '/bin/cat', + 'path of the binary to be executed') +flags.DEFINE_string( + 'output_url', + '', + 'url to which output is posted. The url must include param name, ' + 'value for which is populated with task_id from puller while posting ' + 'the data. Format of output url is absolute url which handles the' + 'post request from task queue puller.' + '(Eg: "http://taskpuller.appspot.com/taskdata?name=").' + 'The Param value is always the task_id. The handler for this post' + 'should be able to associate the task with its id and take' + 'appropriate action. Use the appengine_access_token.py tool to' + 'generate the token and store it in a file before you start.') +flags.DEFINE_string( + 'appengine_access_token_file', + None, + 'File containing an Appengine Access token, if any. If present this' + 'token is added to the output_url request, so that the output_url can' + 'be an authenticated end-point. Use the appengine_access_token.py tool' + 'to generate the token and store it in a file before you start.') +flags.DEFINE_float( + 'task_timeout_secs', + '3600', + 'timeout to kill the task') + + +class ClientTaskInitError(Exception): + """Raised when initialization of client task fails.""" + + def __init__(self, task_id, error_str): + Exception.__init__(self) + self.task_id = task_id + self.error_str = error_str + + def __str__(self): + return ('Error initializing task "%s". Error details "%s". ' + % (self.task_id, self.error_str)) + + +class ClientTask(object): + """Class to encapsulate task information pulled by taskqueue_puller module. + + This class is responsible for creating an independent client task object by + taking some information from lease response task object. It encapsulates + methods responsible for spawning an independent subprocess for executing + the task, tracking the status of the task and also deleting the task from + taskqeueue when completed. It also has the functionality to give the output + back to the application by posting to the specified url. + """ + + def __init__(self, task): + self._task = task + self._process = None + self._output_file = None + + # Class method that caches the Appengine Access Token if any + @classmethod + def get_access_token(cls): + if not FLAGS.appengine_access_token_file: + return None + if not _access_token: + fhandle = open(FLAGS.appengine_access_token_file, 'rb') + _access_token = oauth.Token.from_string(fhandle.read()) + fhandle.close() + return _access_token + + def init(self): + """Extracts information from task object and intializes processing. + + Extracts id and payload from task object, decodes the payload and puts + it in input file. After this, it spawns a subprocess to execute the + task. + + Returns: + True if everything till task execution starts fine. + False if anything goes wrong in initialization of task execution. + """ + try: + self.task_id = self._task.get('id') + self._payload = self._decode_base64_payload( + self._task.get('payloadBase64')) + self._payload_file = self._dump_payload_to_file() + self._start_task_execution() + return True + except ClientTaskInitError, ctie: + logger.error(str(ctie)) + return False + + def _decode_base64_payload(self, encoded_str): + """Method to decode payload encoded in base64.""" + try: + # If the payload is empty, do not try to decode it. Payload usually + # not expected to be empty and hence log a warning and then + # continue. + if encoded_str: + decoded_str = base64.urlsafe_b64decode( + encoded_str.encode('utf-8')) + return decoded_str + else: + logger.warn('Empty paylaod for task %s' % self.task_id) + return '' + except base64.binascii.Error, berror: + logger.error('Error decoding payload for task %s. Error details %s' + % (self.task_id, str(berror))) + raise ClientTaskInitError(self.task_id, 'Error decoding payload') + # Generic catch block to avoid crashing of puller due to some bad + # encoding issue wih payload of any task. + except: + raise ClientTaskInitError(self.task_id, 'Error decoding payload') + + def _dump_payload_to_file(self): + """Method to write input extracted from payload to a temporary file.""" + try: + (fd, fname) = tempfile.mkstemp() + f = os.fdopen(fd, 'w') + f.write(self._payload) + f.close() + return fname + except OSError: + logger.error('Error dumping payload %s. Error details %s' % + (self.task_id, str(OSError))) + raise ClientTaskInitError(self.task_id, 'Error dumping payload') + + def _get_input_file(self): + return self._payload_file + + def _post_output(self): + """Posts the outback back to specified url in the form of a byte + array. + + It reads the output generated by the task as a byte-array. It posts the + response to specified url appended with the taskId. The application + using the taskqueue must have a handler to handle the data being posted + from puller. Format of body of response object is byte-array to make + the it genric for any kind of output generated. + + Returns: + True/False based on post status. + """ + if FLAGS.output_url: + try: + f = open(self._get_output_file(), 'rb') + body = f.read() + f.close() + url = FLAGS.output_url + self.task_id + logger.debug('Posting data to url %s' % url) + headers = {'Content-Type': 'byte-array'} + # Add an access token to the headers if specified. + # This enables the output_url to be authenticated and not open. + access_token = ClientTask.get_access_token() + if access_token: + consumer = oauth.Consumer('anonymous', 'anonymous') + oauth_req = oauth.Request.from_consumer_and_token( + consumer, + token=access_token, + http_url=url) + headers.update(oauth_req.to_header()) + # TODO: Use httplib instead of urllib for consistency. + req = urllib2.Request(url, body, headers) + urllib2.urlopen(req) + except ValueError: + logger.error('Error posting data back %s. Error details %s' + % (self.task_id, str(ValueError))) + return False + except Exception: + logger.error('Exception while posting data back %s. Error' + 'details %s' % (self.task_id, str(Exception))) + return False + return True + + def _get_output_file(self): + """Returns the output file if it exists, else creates it and returns + it.""" + if not self._output_file: + (_, self._output_file) = tempfile.mkstemp() + return self._output_file + + def get_task_id(self): + return self.task_id + + def _start_task_execution(self): + """Method to spawn subprocess to execute the tasks. + + This method splits the commands/executable_binary to desired arguments + format for Popen API. It appends input and output files to the + arguments. It is assumed that commands/executable_binary expects input + and output files as first and second positional parameters + respectively. + """ + # TODO: Add code to handle the cleanly shutdown when a process is killed + # by Ctrl+C. + try: + cmdline = FLAGS.executable_binary.split(' ') + cmdline.append(self._get_input_file()) + cmdline.append(self._get_output_file()) + self._process = subprocess.Popen(cmdline) + self.task_start_time = time.time() + except OSError: + logger.error('Error creating subprocess %s. Error details %s' + % (self.task_id, str(OSError))) + self._cleanup() + raise ClientTaskInitError(self.task_id, + 'Error creating subprocess') + except ValueError: + logger.error('Invalid arguments while executing task ', + self.task_id) + self._cleanup() + raise ClientTaskInitError(self.task_id, + 'Invalid arguments while executing task') + + def is_completed(self, task_api): + """Method to check if task has finished executing. + + This is responsible for checking status of task execution. If the task + has already finished executing, it deletes the task from the task + queue. If the task has been running since long time then it assumes + that there is high proabbility that it is dfunct and hence kills the + corresponding subprocess. In this case, task had not completed + successfully and hence we do not delete it form the taskqueue. In above + two cases, task completion status is returned as true since there is + nothing more to run in the task. In all other cases, task is still + running and hence we return false as completion status. + + Args: + task_api: handle for taskqueue api collection. + + Returns: + Task completion status (True/False) + """ + status = False + try: + task_status = self._process.poll() + if task_status == 0: + status = True + if self._post_output(): + self._delete_task_from_queue(task_api) + self._cleanup() + elif self._has_timedout(): + status = True + self._kill_subprocess() + except OSError: + logger.error('Error during polling status of task %s, Error ' + 'details %s' % (self.task_id, str(OSError))) + return status + + def _cleanup(self): + """Cleans up temporary input/output files used in task execution.""" + try: + if os.path.exists(self._get_input_file()): + os.remove(self._get_input_file()) + if os.path.exists(self._get_output_file()): + os.remove(self._get_output_file()) + except OSError: + logger.error('Error during file cleanup for task %s. Error' + 'details %s' % (self.task_id, str(OSError))) + + def _delete_task_from_queue(self, task_api): + """Method to delete the task from the taskqueue. + + First, it tries to post the output back to speified url. On successful + post, the task is deleted from taskqueue since the task has produced + expected output. If the post was unsuccessful, the task is not deleted + form the tskqueue since the expected output has yet not reached the + application. In either case cleanup is performed on the task. + + Args: + task_api: handle for taskqueue api collection. + + Returns: + Delete status (True/False) + """ + + try: + delete_request = task_api.tasks().delete( + project=FLAGS.project_name, + taskqueue=FLAGS.taskqueue_name, + task=self.task_id) + delete_request.execute() + except HttpError, http_error: + logger.error('Error deleting task %s from taskqueue.' + 'Error details %s' + % (self.task_id, str(http_error))) + + def _has_timedout(self): + """Checks if task has been running since long and has timedout.""" + if (time.time() - self.task_start_time) > FLAGS.task_timeout_secs: + return True + else: + return False + + def _kill_subprocess(self): + """Kills the process after cleaning up the task.""" + self._cleanup() + try: + self._process.kill() + logger.info('Trying to kill task %s, since it has been running ' + 'for long' % self.task_id) + except OSError: + logger.error('Error killing task %s. Error details %s' + % (self.task_id, str(OSError))) diff --git a/samples/gtaskqueue_sample/gtaskqueue/gen_appengine_access_token b/samples/gtaskqueue_sample/gtaskqueue/gen_appengine_access_token new file mode 100644 index 0000000..eeef137 --- /dev/null +++ b/samples/gtaskqueue_sample/gtaskqueue/gen_appengine_access_token @@ -0,0 +1,116 @@ +#!/usr/bin/env python +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tool to get an Access Token to access an auth protected Appengine end point. + +This tool talks to the appengine end point, and gets an Access Token that is +stored in a file. This token can be used by a tool to do authorized access to +an appengine end point. +""" + + + +from google.apputils import app +import gflags as flags +import httplib2 +import oauth2 as oauth +import time + +FLAGS = flags.FLAGS + +flags.DEFINE_string( + 'appengine_host', + None, + 'Appengine Host for whom we are trying to get an access token') +flags.DEFINE_string( + 'access_token_file', + None, + 'The file where the access token is stored') + + +def get_access_token(): + if not FLAGS.appengine_host: + print('must supply the appengine host') + exit(1) + + # setup + server = FLAGS.appengine_host + request_token_url = server + '/_ah/OAuthGetRequestToken' + authorization_url = server + '/_ah/OAuthAuthorizeToken' + access_token_url = server + '/_ah/OAuthGetAccessToken' + consumer = oauth.Consumer('anonymous', 'anonymous') + signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1() + + # The Http client that will be used to make the requests. + h = httplib2.Http() + + # get request token + print '* Obtain a request token ...' + parameters = {} + # We dont have a callback server, we're going to use the browser to + # authorize. + + #TODO: Add check for 401 etc + parameters['oauth_callback'] = 'oob' + oauth_req1 = oauth.Request.from_consumer_and_token( + consumer, http_url=request_token_url, parameters=parameters) + oauth_req1.sign_request(signature_method_hmac_sha1, consumer, None) + print 'Request headers: %s' % str(oauth_req1.to_header()) + response, content = h.request(oauth_req1.to_url(), 'GET') + token = oauth.Token.from_string(content) + print 'GOT key: %s secret:%s' % (str(token.key), str(token.secret)) + + print '* Authorize the request token ...' + oauth_req2 = oauth.Request.from_token_and_callback( + token=token, callback='oob', http_url=authorization_url) + print 'Please run this URL in a browser and paste the token back here' + print oauth_req2.to_url() + verification_code = raw_input('Enter verification code: ').strip() + token.set_verifier(verification_code) + + # get access token + print '* Obtain an access token ...' + oauth_req3 = oauth.Request.from_consumer_and_token( + consumer, token=token, http_url=access_token_url) + oauth_req3.sign_request(signature_method_hmac_sha1, consumer, token) + print 'Request headers: %s' % str(oauth_req3.to_header()) + response, content = h.request(oauth_req3.to_url(), 'GET') + access_token = oauth.Token.from_string(content) + print 'Access Token key: %s secret:%s' % (str(access_token.key), + str(access_token.secret)) + + # Save the token to a file if its specified. + if FLAGS.access_token_file: + fhandle = open(FLAGS.access_token_file, 'w') + fhandle.write(access_token.to_string()) + fhandle.close() + + # Example : access some protected resources + print '* Checking the access token against protected resources...' + # Assumes that the server + "/" is protected. + test_url = server + "/" + oauth_req4 = oauth.Request.from_consumer_and_token(consumer, + token=token, + http_url=test_url) + oauth_req4.sign_request(signature_method_hmac_sha1, consumer, token) + resp, content = h.request(test_url, "GET", headers=oauth_req4.to_header()) + print resp + print content + + +def main(argv): + get_access_token() + +if __name__ == '__main__': + app.run() diff --git a/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue b/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue new file mode 100644 index 0000000..ce68ef1 --- /dev/null +++ b/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue @@ -0,0 +1,53 @@ +#!/usr/bin/env python +# +# Copyright (C) 2010 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Command line tool for interacting with Google TaskQueue API.""" + +__version__ = '0.0.1' + + +import logging + +from gtaskqueue import task_cmds +from gtaskqueue import taskqueue_cmds + +from google.apputils import appcommands +import gflags as flags + +LOG_LEVELS = [logging.DEBUG, + logging.INFO, + logging.WARNING, + logging.CRITICAL] +LOG_LEVEL_NAMES = map(logging.getLevelName, LOG_LEVELS) +FLAGS = flags.FLAGS + +flags.DEFINE_enum( + 'log_level', + logging.getLevelName(logging.WARNING), + LOG_LEVEL_NAMES, + 'Logging output level.') + + +def main(unused_argv): + log_level_map = dict( + [(logging.getLevelName(level), level) for level in LOG_LEVELS]) + logging.getLogger().setLevel(log_level_map[FLAGS.log_level]) + taskqueue_cmds.add_commands() + task_cmds.add_commands() + +if __name__ == '__main__': + appcommands.Run() diff --git a/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue_puller b/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue_puller new file mode 100644 index 0000000..f53181a --- /dev/null +++ b/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue_puller @@ -0,0 +1,348 @@ +#!/usr/bin/env python +# +# Copyright (C) 2010 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Module to pull tasks from TaskQueues and execute them. + +This module does the following in an infinite loop. +1. Connects to Task API (of TaskQueues API collection) to request lease on + certain number of tasks (specified by user). +2. Spawns parallel processes to execute the leased tasks. +3. Polls all the tasks continously till they finish. +4. Deletes the tasks from taskqueue on their successful completion. +5. It lets the user specify when to invoke the lease request instead of polling + tasks status in a tight loop for better resource utilization: + a. Invoke the Lease request when runnning tasks go beyound certain + threshold (min_running_tasks) + b. Wait time becomes more than specified poll-time-out interval. +6. Repeat the steps from 1 to 5 when either all tasks have finished executing + or one of the conditions in 5) is met. """ + + + +import sys +import time +from apiclient.errors import HttpError +from gtaskqueue.client_task import ClientTask +from gtaskqueue.taskqueue_client import TaskQueueClient +from gtaskqueue.taskqueue_logger import logger +from gtaskqueue.taskqueue_logger import set_logger +from google.apputils import app +import gflags as flags + +FLAGS = flags.FLAGS +flags.DEFINE_string( + 'project_name', + 'default', + 'The name of the Taskqueue API project.') +flags.DEFINE_string( + 'taskqueue_name', + 'testpuller', + 'taskqueue to which client wants to connect to') +flags.DEFINE_integer( + 'lease_secs', + 30, + 'The lease for the task in seconds') +flags.DEFINE_integer( + 'num_tasks', + 10, + 'The number of tasks to lease') +flags.DEFINE_integer( + 'min_running_tasks', + 0, + 'minmum number of tasks below which lease can be invoked') +flags.DEFINE_float( + 'sleep_interval_secs', + 2, + 'sleep interval when no tasks are found in the taskqueue') +flags.DEFINE_float( + 'timeout_secs_for_next_lease_request', + 600, + 'Wait time before next poll when no tasks are found in the' + 'queue (in seconds)') +flags.DEFINE_integer( + 'taskapi_requests_per_sec', + None, + 'limit on task_api requests per second') +flags.DEFINE_float( + 'sleep_before_next_poll_secs', + 2, + 'sleep interval before next poll') + + +class TaskQueuePuller(object): + """Maintains state information for TaskQueuePuller.""" + + def __init__(self): + self._last_lease_time = None + self._poll_timeout_start = None + self._num_last_leased_tasks = 0 + # Dictionary for running tasks's ids and their corresponding + # client_task object. + self._taskprocess_map = {} + try: + self.__tcq = TaskQueueClient() + self.task_api = self.__tcq.get_taskapi() + except HttpError, http_error: + logger.error('Could not get TaskQueue API handler and hence' \ + 'exiting: %s' % str(http_error)) + sys.exit() + + def _can_lease(self): + """Determines if new tasks can be leased. + + Determines if new taks can be leased based on + 1. Number of tasks already running in the system. + 2. Limit on accessing the taskqueue apirary. + + Returns: + True/False. + """ + if self._num_tasks_to_lease() > 0 and not self._is_rate_exceeded(): + return True + else: + return False + + def _is_rate_exceeded(self): + + """Determines if requests/second to TaskQueue API has exceeded limit. + + We do not access the APIs beyond the specified permissible limit. + If we have run N tasks in elapsed time since last lease, we have + already made N+1 requests to API (1 for collective lease and N for + their individual delete operations). If K reqs/sec is the limit on + accessing APIs, then we sould not invoke any request to API before + N+1/K sec approximately. The above condition is formulated in the + following method. + Returns: + True/False + """ + if not FLAGS.taskapi_requests_per_sec: + return False + if not self._last_lease_time: + return False + curr_time = time.time() + if ((curr_time - self._last_lease_time) < + ((1.0 * (self._num_last_leased_tasks - + len(self._taskprocess_map)) / + FLAGS.taskapi_requests_per_sec))): + return True + else: + return False + + def _num_tasks_to_lease(self): + + """Determines how many tasks can be leased. + + num_tasks is upper limit to running tasks in the system and hence + number of tasks which could be leased is difference of numtasks and + currently running tasks. + + Returns: + Number of tasks to lease. + """ + return FLAGS.num_tasks - len(self._taskprocess_map) + + def _update_last_lease_info(self, result): + + """Updates the information regarding last lease. + + Args: + result: Response object from TaskQueue API, containing list of + tasks. + """ + self._last_lease_time = time.time() + if result: + if result.get('items'): + self._num_last_leased_tasks = len(result.get('items')) + else: + self._num_last_leased_tasks = 0 + else: + self._num_last_leased_tasks = 0 + + def _update_poll_timeout_start(self): + + """Updates the start time for poll-timeout.""" + if not self._poll_timeout_start: + self._poll_timeout_start = time.time() + + def _continue_polling(self): + + """Checks whether lease can be invoked based on running tasks and + timeout. + + Lease can be invoked if + 1. Running tasks in the sytem has gone below the specified + threshold (min_running_tasks). + 2. Wait time has exceeded beyond time-out specified and at least one + tas has finished since last lease invocation. + + By doing this, we are essentially trying to batch the lease requests. + If this is not done and we start off leasing N tasks, its likely tasks + may finish slightly one after another, and we make N lease requests for + each task for next N tasks and so on. This can result in unnecessary + lease API call and hence to avoid that, we try and batch the lease + requests. Also we put certain limit on wait time for batching the + requests by incororating the time-out. + + Returns: + True/False + """ + if len(self._taskprocess_map) <= FLAGS.min_running_tasks: + return False + if self._poll_timeout_start: + elapsed_time = time.time() - self._poll_timeout_start + if elapsed_time > FLAGS.timeout_secs_for_next_lease_request: + self._poll_timeout_start = None + return False + return True + + def _get_tasks_from_queue(self): + + """Gets the available tasks from the taskqueue. + + Returns: + Lease response object. + """ + try: + tasks_to_fetch = self._num_tasks_to_lease() + lease_req = self.task_api.tasks().lease( + project=FLAGS.project_name, + taskqueue=FLAGS.taskqueue_name, + leaseSecs=FLAGS.lease_secs, + numTasks=tasks_to_fetch, + body={}) + result = lease_req.execute() + return result + except HttpError, http_error: + logger.error('Error during lease request: %s' % str(http_error)) + return None + + def _create_subprocesses_for_tasks(self, result): + + """Spawns parallel sub processes to execute tasks for better + throughput. + + Args: + result: lease resonse dictionary object. + """ + if not result: + logger.info('Error: result is not defined') + return None + if result.get('items'): + for task in result.get('items'): + task_id = task.get('id') + # Given that a task may be leased multiple times, we may get a + # task which we are currently executing on, so make sure we + # dont spaw another subprocess for it. + if task_id not in self._taskprocess_map: + ct = ClientTask(task) + # Check if tasks got initialized properly and then pu them + # in running tasks map. + if ct.init(): + # Put the clientTask objects in a dictionary to keep + # track of stats and objects are used later to delete + # the tasks from taskqueue + self._taskprocess_map[ct.get_task_id()] = ct + + def _poll_running_tasks(self): + + """Polls all the running tasks and delete them from taskqueue if + completed.""" + if self._taskprocess_map: + for task in self._taskprocess_map.values(): + if task.is_completed(self.task_api): + del self._taskprocess_map[task.get_task_id()] + # updates scheduling information for later use. + self._update_poll_timeout_start() + + def _sleep_before_next_lease(self): + + """Sleeps before invoking lease if required based on last lease info. + + It sleeps when no tasks were found on the taskqueue during last lease + request. To note, it discount the time taken in polling the tasks and + sleeps for (sleep_interval - time taken in poll). This avoids the + unnecessary wait if tasks could be leased. If no time was taken in + poll since there were not tasks in the system, it waits for full sleep + interval and thus optimizes the CPU cycles. + It does not sleep if the method is called for the first time (when no + lease request has ever been made). + """ + if not self._last_lease_time: + sleep_secs = 0 + elif self._num_last_leased_tasks <= 0: + time_elpased_since_last_lease = time.time() - self._last_lease_time + sleep_secs = (FLAGS.sleep_interval_secs - + time_elpased_since_last_lease) + if sleep_secs > 0: + logger.info('No tasks found and hence sleeping for sometime') + time.sleep(FLAGS.sleep_interval_secs) + + def lease_tasks(self): + + """Requests lease for specified number of tasks. + + It invokes lease request for appropriate number of tasks, spawns + parallel processes to execute them and also maintains scheduling + information. + + LeaseTask also takes care of waiting(sleeping) before invoking lease if + there are no tasks which can be leased in the taskqueue. This results + in better resource utilization. Apart from this, it also controls the + number of requests being sent to taskqueue APIs. + + Returns: + True/False based on if tasks could be leased or not. + """ + self._sleep_before_next_lease() + if self._can_lease(): + result = self._get_tasks_from_queue() + self._update_last_lease_info(result) + self._create_subprocesses_for_tasks(result) + return True + return False + + def poll_tasks(self): + + """Polls the status of running tasks of the system. + + Polls the status of tasks and then decides if it should continue to + poll depending on number of tasks running in the system and timeouts. + Instead of polling in a tight loop, it sleeps for sometime before the + next poll to avoid any unnecessary CPU cycles. poll_tasks returns + only when system has capability to accomodate at least one new task. + """ + + self._poll_running_tasks() + while self._continue_polling(): + logger.info('Sleeping before next poll') + time.sleep(FLAGS.sleep_before_next_poll_secs) + self._poll_running_tasks() + + +def main(argv): + + """Infinite loop to lease new tasks and poll them for completion.""" + # Settings for logger + set_logger() + # Instantiate puller + puller = TaskQueuePuller() + while True: + puller.lease_tasks() + puller.poll_tasks() + +if __name__ == '__main__': + app.run() diff --git a/samples/gtaskqueue_sample/gtaskqueue/task_cmds.py b/samples/gtaskqueue_sample/gtaskqueue/task_cmds.py new file mode 100644 index 0000000..4adea2b --- /dev/null +++ b/samples/gtaskqueue_sample/gtaskqueue/task_cmds.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python +# +# Copyright (C) 2010 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Commands to interact with the Task object of the TaskQueue API.""" + + +__version__ = '0.0.1' + + + +from gtaskqueue.taskqueue_cmd_base import GoogleTaskCommand + +from google.apputils import app +from google.apputils import appcommands +import gflags as flags + +FLAGS = flags.FLAGS + + +class GetTaskCommand(GoogleTaskCommand): + """Get properties of an existing task.""" + + def __init__(self, name, flag_values): + super(GetTaskCommand, self).__init__(name, flag_values) + + def build_request(self, task_api, flag_values): + """Build a request to get properties of a Task. + + Args: + task_api: The handle to the task collection API. + flag_values: The parsed command flags. + Returns: + The properties of the task. + """ + return task_api.get(project=flag_values.project_name, + taskqueue=flag_values.taskqueue_name, + task=flag_values.task_name) + + +class LeaseTaskCommand(GoogleTaskCommand): + """Lease a new task from the queue.""" + + def __init__(self, name, flag_values): + super(LeaseTaskCommand, self).__init__(name, + flag_values, + need_task_flag=False) + flags.DEFINE_integer('lease_secs', + None, + 'The lease for the task in seconds') + flags.DEFINE_integer('num_tasks', + 1, + 'The number of tasks to lease') + flags.DEFINE_integer('payload_size_to_display', + 2 * 1024 * 1024, + 'Size of the payload for leased tasks to show') + + def build_request(self, task_api, flag_values): + """Build a request to lease a pending task from the TaskQueue. + + Args: + task_api: The handle to the task collection API. + flag_values: The parsed command flags. + Returns: + A new leased task. + """ + if not flag_values.lease_secs: + raise app.UsageError('lease_secs must be specified') + + return task_api.lease(project=flag_values.project_name, + taskqueue=flag_values.taskqueue_name, + leaseSecs=flag_values.lease_secs, + numTasks=flag_values.num_tasks, + body={}) + + def print_result(self, result): + """Override to optionally strip the payload since it can be long.""" + if result.get('items'): + items = [] + for task in result.get('items'): + payloadlen = len(task['payloadBase64']) + if payloadlen > FLAGS.payload_size_to_display: + extra = payloadlen - FLAGS.payload_size_to_display + task['payloadBase64'] = ('%s(%d more bytes)' % + (task['payloadBase64'][:FLAGS.payload_size_to_display], + extra)) + items.append(task) + result['items'] = items + GoogleTaskCommand.print_result(self, result) + + +class DeleteTaskCommand(GoogleTaskCommand): + """Delete an existing task.""" + + def __init__(self, name, flag_values): + super(DeleteTaskCommand, self).__init__(name, flag_values) + + def build_request(self, task_api, flag_values): + """Build a request to delete a Task. + + Args: + task_api: The handle to the taskqueue collection API. + flag_values: The parsed command flags. + Returns: + Whether the delete was successful. + """ + return task_api.delete(project=flag_values.project_name, + taskqueue=flag_values.taskqueue_name, + task=flag_values.task_name) + + +class ListTasksCommand(GoogleTaskCommand): + """Lists all tasks in a queue (currently upto a max of 100).""" + + def __init__(self, name, flag_values): + super(ListTasksCommand, self).__init__(name, + flag_values, + need_task_flag=False) + + def build_request(self, task_api, flag_values): + """Build a request to lists tasks in a queue. + + Args: + task_api: The handle to the taskqueue collection API. + flag_values: The parsed command flags. + Returns: + A list of pending tasks in the queue. + """ + return task_api.list(project=flag_values.project_name, + taskqueue=flag_values.taskqueue_name) + + +def add_commands(): + appcommands.AddCmd('listtasks', ListTasksCommand) + appcommands.AddCmd('gettask', GetTaskCommand) + appcommands.AddCmd('deletetask', DeleteTaskCommand) + appcommands.AddCmd('leasetask', LeaseTaskCommand) diff --git a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_client.py b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_client.py new file mode 100644 index 0000000..425e156 --- /dev/null +++ b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_client.py @@ -0,0 +1,189 @@ +#!/usr/bin/env python +# +# Copyright (C) 2010 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Class to connect to TaskQueue API.""" + + + +import os +import sys +import urlparse +from apiclient.anyjson import simplejson as json +from apiclient.discovery import build +from apiclient.errors import HttpError +import httplib2 +from oauth2client.file import Storage +from oauth2client.client import OAuth2WebServerFlow +from oauth2client.tools import run +from gtaskqueue.taskqueue_logger import logger + +from google.apputils import app +import gflags as flags + +FLAGS = flags.FLAGS +flags.DEFINE_string( + 'service_version', + 'v1beta1', + 'Google taskqueue api version.') +flags.DEFINE_string( + 'api_host', + 'https://www.googleapis.com/', + 'API host name') +flags.DEFINE_bool( + 'use_developer_key', + False, + 'User wants to use the developer key while accessing taskqueue apis') +flags.DEFINE_string( + 'developer_key_file', + '~/.taskqueue.apikey', + 'Developer key provisioned from api console') +flags.DEFINE_bool( + 'dump_request', + False, + 'Prints the outgoing HTTP request along with headers and body.') +flags.DEFINE_string( + 'credentials_file', + 'taskqueue.dat', + 'File where you want to store the auth credentails for later user') + +# Set up a Flow object to be used if we need to authenticate. This +# sample uses OAuth 2.0, and we set up the OAuth2WebServerFlow with +# the information it needs to authenticate. Note that it is called +# the Web Server Flow, but it can also handle the flow for native +# applications +# The client_id client_secret are copied from the Identity tab on +# the Google APIs Console +FLOW = OAuth2WebServerFlow( + client_id='157776985798.apps.googleusercontent.com', + client_secret='tlpVCmaS6yLjxnnPu0ARIhNw', + scope='https://www.googleapis.com/auth/taskqueue', + user_agent='taskqueue-cmdline-sample/1.0') + + +class TaskQueueClient: + """Class to setup connection with taskqueue API.""" + + def __init__(self): + if not FLAGS.project_name: + raise app.UsageError('You must specify a project name' + ' using the "--project_name" flag.') + discovery_uri = ( + FLAGS.api_host + 'discovery/v0.3/describe/{api}/{apiVersion}') + logger.info(discovery_uri) + try: + # If the Credentials don't exist or are invalid run through the + # native clien flow. The Storage object will ensure that if + # successful the good Credentials will get written back to a file. + # Setting FLAGS.auth_local_webserver to false since we can run our + # tool on Virtual Machines and we do not want to run the webserver + # on VMs. + FLAGS.auth_local_webserver = False + storage = Storage(FLAGS.credentials_file) + credentials = storage.get() + if credentials is None or credentials.invalid == True: + credentials = run(FLOW, storage) + http = credentials.authorize(self._dump_request_wrapper( + httplib2.Http())) + self.task_api = build('taskqueue', + FLAGS.service_version, + http=http, + discoveryServiceUrl=discovery_uri) + except HttpError, http_error: + logger.error('Error gettin task_api: %s' % http_error) + + def get_taskapi(self): + """Returns handler for tasks API from taskqueue API collection.""" + return self.task_api + + + def _dump_request_wrapper(self, http): + """Dumps the outgoing HTTP request if requested. + + Args: + http: An instance of httplib2.Http or something that acts like it. + + Returns: + httplib2.Http like object. + """ + request_orig = http.request + + def new_request(uri, method='GET', body=None, headers=None, + redirections=httplib2.DEFAULT_MAX_REDIRECTS, + connection_type=None): + """Overrides the http.request method to add some utilities.""" + if (FLAGS.api_host + "discovery/" not in uri and + FLAGS.use_developer_key): + developer_key_path = os.path.expanduser( + FLAGS.developer_key_file) + if not os.path.isfile(developer_key_path): + print 'Please generate developer key from the Google API' \ + 'Console and store it in %s' % (FLAGS.developer_key_file) + sys.exit() + developer_key_file = open(developer_key_path, 'r') + try: + developer_key = developer_key_file.read().strip() + except IOError, io_error: + print 'Error loading developer key from file %s' % ( + FLAGS.developer_key_file) + print 'Error details: %s' % str(io_error) + sys.exit() + finally: + developer_key_file.close() + s = urlparse.urlparse(uri) + query = 'key=' + developer_key + if s.query: + query = s.query + '&key=' + developer_key + d = urlparse.ParseResult(s.scheme, + s.netloc, + s.path, + s.params, + query, + s.fragment) + uri = urlparse.urlunparse(d) + if FLAGS.dump_request: + print '--request-start--' + print '%s %s' % (method, uri) + if headers: + for (h, v) in headers.iteritems(): + print '%s: %s' % (h, v) + print '' + if body: + print json.dumps(json.loads(body), + sort_keys=True, + indent=2) + print '--request-end--' + return request_orig(uri, + method, + body, + headers, + redirections, + connection_type) + http.request = new_request + return http + + def print_result(self, result): + """Pretty-print the result of the command. + + The default behavior is to dump a formatted JSON encoding + of the result. + + Args: + result: The JSON-serializable result to print. + """ + # We could have used the pprint module, but it produces + # noisy output due to all of our keys and values being + # unicode strings rather than simply ascii. + print json.dumps(result, sort_keys=True, indent=2) diff --git a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmd_base.py b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmd_base.py new file mode 100644 index 0000000..42474c6 --- /dev/null +++ b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmd_base.py @@ -0,0 +1,275 @@ +#!/usr/bin/env python +# +# Copyright (C) 2010 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +"""Commands for interacting with Google TaskQueue.""" + +__version__ = '0.0.1' + + +import os +import sys +import urlparse + + +from apiclient.discovery import build +from apiclient.errors import HttpError +from apiclient.anyjson import simplejson as json +import httplib2 +from oauth2client.file import Storage +from oauth2client.client import OAuth2WebServerFlow +from oauth2client.tools import run + +from google.apputils import app +from google.apputils import appcommands +import gflags as flags + + +FLAGS = flags.FLAGS + +flags.DEFINE_string( + 'service_version', + 'v1beta1', + 'Google taskqueue api version.') +flags.DEFINE_string( + 'api_host', + 'https://www.googleapis.com/', + 'API host name') +flags.DEFINE_string( + 'project_name', + 'default', + 'The name of the Taskqueue API project.') +flags.DEFINE_bool( + 'use_developer_key', + False, + 'User wants to use the developer key while accessing taskqueue apis') +flags.DEFINE_string( + 'developer_key_file', + '~/.taskqueue.apikey', + 'Developer key provisioned from api console') +flags.DEFINE_bool( + 'dump_request', + False, + 'Prints the outgoing HTTP request along with headers and body.') +flags.DEFINE_string( + 'credentials_file', + 'taskqueue.dat', + 'File where you want to store the auth credentails for later user') + +# Set up a Flow object to be used if we need to authenticate. This +# sample uses OAuth 2.0, and we set up the OAuth2WebServerFlow with +# the information it needs to authenticate. Note that it is called +# the Web Server Flow, but it can also handle the flow for native +# applications +# The client_id client_secret are copied from the Identity tab on +# the Google APIs Console +FLOW = OAuth2WebServerFlow( + client_id='157776985798.apps.googleusercontent.com', + client_secret='tlpVCmaS6yLjxnnPu0ARIhNw', + scope='https://www.googleapis.com/auth/taskqueue', + user_agent='taskqueue-cmdline-sample/1.0') + +class GoogleTaskQueueCommandBase(appcommands.Cmd): + """Base class for all the Google TaskQueue client commands.""" + + DEFAULT_PROJECT_PATH = 'projects/default' + + def __init__(self, name, flag_values): + super(GoogleTaskQueueCommandBase, self).__init__(name, flag_values) + + def _dump_request_wrapper(self, http): + """Dumps the outgoing HTTP request if requested. + + Args: + http: An instance of httplib2.Http or something that acts like it. + + Returns: + httplib2.Http like object. + """ + request_orig = http.request + + def new_request(uri, method='GET', body=None, headers=None, + redirections=httplib2.DEFAULT_MAX_REDIRECTS, + connection_type=None): + """Overrides the http.request method to add some utilities.""" + if (FLAGS.api_host + "discovery/" not in uri and + FLAGS.use_developer_key): + developer_key_path = os.path.expanduser( + FLAGS.developer_key_file) + if not os.path.isfile(developer_key_path): + print 'Please generate developer key from the Google APIs' \ + 'Console and store it in %s' % (FLAGS.developer_key_file) + sys.exit() + developer_key_file = open(developer_key_path, 'r') + try: + developer_key = developer_key_file.read().strip() + except IOError, io_error: + print 'Error loading developer key from file %s' % ( + FLAGS.developer_key_file) + print 'Error details: %s' % str(io_error) + sys.exit() + finally: + developer_key_file.close() + s = urlparse.urlparse(uri) + query = 'key=' + developer_key + if s.query: + query = s.query + '&key=' + developer_key + d = urlparse.ParseResult(s.scheme, + s.netloc, + s.path, + s.params, + query, + s.fragment) + uri = urlparse.urlunparse(d) + + if FLAGS.dump_request: + print '--request-start--' + print '%s %s' % (method, uri) + if headers: + for (h, v) in headers.iteritems(): + print '%s: %s' % (h, v) + print '' + if body: + print json.dumps(json.loads(body), sort_keys=True, indent=2) + print '--request-end--' + return request_orig(uri, + method, + body, + headers, + redirections, + connection_type) + http.request = new_request + return http + + def Run(self, argv): + """Run the command, printing the result. + + Args: + argv: The non-flag arguments to the command. + """ + if not FLAGS.project_name: + raise app.UsageError('You must specify a project name' + ' using the "--project_name" flag.') + discovery_uri = ( + FLAGS.api_host + 'discovery/v0.3/describe/{api}/{apiVersion}') + try: + # If the Credentials don't exist or are invalid run through the + # native client flow. The Storage object will ensure that if + # successful the good Credentials will get written back to a file. + # Setting FLAGS.auth_local_webserver to false since we can run our + # tool on Virtual Machines and we do not want to run the webserver + # on VMs. + FLAGS.auth_local_webserver = False + storage = Storage(FLAGS.credentials_file) + credentials = storage.get() + if credentials is None or credentials.invalid == True: + credentials = run(FLOW, storage) + http = credentials.authorize(self._dump_request_wrapper( + httplib2.Http())) + api = build('taskqueue', + FLAGS.service_version, + http=http, + discoveryServiceUrl=discovery_uri) + result = self.run_with_api_and_flags_and_args(api, FLAGS, argv) + self.print_result(result) + except HttpError, http_error: + print 'Error Processing request: %s' % str(http_error) + + def run_with_api_and_flags_and_args(self, api, flag_values, unused_argv): + """Run the command given the API, flags, and args. + + The default implementation of this method discards the args and + calls into run_with_api_and_flags. + + Args: + api: The handle to the Google TaskQueue API. + flag_values: The parsed command flags. + unused_argv: The non-flag arguments to the command. + Returns: + The result of running the command + """ + return self.run_with_api_and_flags(api, flag_values) + + def print_result(self, result): + """Pretty-print the result of the command. + + The default behavior is to dump a formatted JSON encoding + of the result. + + Args: + result: The JSON-serializable result to print. + """ + # We could have used the pprint module, but it produces + # noisy output due to all of our keys and values being + # unicode strings rather than simply ascii. + print json.dumps(result, sort_keys=True, indent=2) + + + +class GoogleTaskQueueCommand(GoogleTaskQueueCommandBase): + """Base command for working with the taskqueues collection.""" + + def __init__(self, name, flag_values): + super(GoogleTaskQueueCommand, self).__init__(name, flag_values) + flags.DEFINE_string('taskqueue_name', + 'myqueue', + 'TaskQueue name', + flag_values=flag_values) + + def run_with_api_and_flags(self, api, flag_values): + """Run the command, returning the result. + + Args: + api: The handle to the Google TaskQueue API. + flag_values: The parsed command flags. + Returns: + The result of running the command. + """ + taskqueue_request = self.build_request(api.taskqueues(), flag_values) + return taskqueue_request.execute() + + +class GoogleTaskCommand(GoogleTaskQueueCommandBase): + """Base command for working with the tasks collection.""" + + def __init__(self, name, flag_values, need_task_flag=True): + super(GoogleTaskCommand, self).__init__(name, flag_values) + # Common flags that are shared by all the Task commands. + flags.DEFINE_string('taskqueue_name', + 'myqueue', + 'TaskQueue name', + flag_values=flag_values) + # Not all task commands need the task_name flag. + if need_task_flag: + flags.DEFINE_string('task_name', + None, + 'Task name', + flag_values=flag_values) + + def run_with_api_and_flags(self, api, flag_values): + """Run the command, returning the result. + + Args: + api: The handle to the Google TaskQueue API. + flag_values: The parsed command flags. + flags.DEFINE_string('payload', + None, + 'Payload of the task') + Returns: + The result of running the command. + """ + task_request = self.build_request(api.tasks(), flag_values) + return task_request.execute() diff --git a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmds.py b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmds.py new file mode 100644 index 0000000..99c3f2a --- /dev/null +++ b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmds.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python +# +# Copyright (C) 2010 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Commands to interact with the TaskQueue object of the TaskQueue API.""" + + +__version__ = '0.0.1' + + + +from gtaskqueue.taskqueue_cmd_base import GoogleTaskQueueCommand + +from google.apputils import appcommands +import gflags as flags + +FLAGS = flags.FLAGS + + +class GetTaskQueueCommand(GoogleTaskQueueCommand): + """Get properties of an existing task queue.""" + + def __init__(self, name, flag_values): + super(GetTaskQueueCommand, self).__init__(name, flag_values) + flags.DEFINE_boolean('get_stats', + False, + 'Whether to get Stats') + + def build_request(self, taskqueue_api, flag_values): + """Build a request to get properties of a TaskQueue. + + Args: + taskqueue_api: The handle to the taskqueue collection API. + flag_values: The parsed command flags. + Returns: + The properties of the taskqueue. + """ + return taskqueue_api.get(project=flag_values.project_name, + taskqueue=flag_values.taskqueue_name, + getStats=flag_values.get_stats) + + +def add_commands(): + appcommands.AddCmd('getqueue', GetTaskQueueCommand) diff --git a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_logger.py b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_logger.py new file mode 100644 index 0000000..3f445f3 --- /dev/null +++ b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_logger.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +# +# Copyright (C) 2010 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Log settings for taskqueue_puller module.""" + + + +import logging +import logging.config +from google.apputils import app +import gflags as flags + +FLAGS = flags.FLAGS +flags.DEFINE_string( + 'log_output_file', + '/tmp/taskqueue-puller.log', + 'Logfile name for taskqueue_puller.') + + +logger = logging.getLogger('TaskQueueClient') + + +def set_logger(): + """Settings for taskqueue_puller logger.""" + logger.setLevel(logging.INFO) + + # create formatter + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s') + + # Set size of the log file and the backup count for rotated log files. + handler = logging.handlers.RotatingFileHandler(FLAGS.log_output_file, + maxBytes = 1024 * 1024, + backupCount = 5) + # add formatter to handler + handler.setFormatter(formatter) + # add formatter to handler + logger.addHandler(handler) + +if __name__ == '__main__': + app.run() diff --git a/samples/gtaskqueue_sample/setup.py b/samples/gtaskqueue_sample/setup.py new file mode 100644 index 0000000..b7eff8f --- /dev/null +++ b/samples/gtaskqueue_sample/setup.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +# +# Copyright (C) 2010 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Setup script for the Google TaskQueue API command-line tool.""" + +__version__ = '1.0.2' + + +import sys +try: + from setuptools import setup + print 'Loaded setuptools' +except ImportError: + from distutils.core import setup + print 'Loaded distutils.core' + + +PACKAGE_NAME = 'google-taskqueue-client' +INSTALL_REQUIRES = ['google-apputils==0.1', + 'google-api-python-client', + 'httplib2', + 'oauth2', + 'python-gflags'] +setup(name=PACKAGE_NAME, + version=__version__, + description='Google TaskQueue API command-line tool and utils', + author='Google Inc.', + author_email='google-appengine@googlegroups.com', + url='http://code.google.com/appengine/docs/python/taskqueue/pull/overview.html', + install_requires=INSTALL_REQUIRES, + packages=['gtaskqueue'], + scripts=['gtaskqueue/gtaskqueue', 'gtaskqueue/gtaskqueue_puller', + 'gtaskqueue/gen_appengine_access_token'], + license='Apache 2.0', + keywords='google taskqueue api client', + classifiers=['Development Status :: 3 - Alpha', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: Apache Software License', + 'Operating System :: POSIX', + 'Topic :: Internet :: WWW/HTTP'])