From 79090130378dc22661ef28e72b9d30434bb5a137 Mon Sep 17 00:00:00 2001 From: Joe Gregorio Date: Tue, 6 Nov 2012 13:09:39 -0500 Subject: [PATCH] Remove gtaskqueue sample. Reviewed in https://codereview.appspot.com/6821087/. --- samples/gtaskqueue_sample/README | 49 --- .../gtaskqueue/client_task.py | 334 ----------------- .../gtaskqueue/gen_appengine_access_token | 116 ------ .../gtaskqueue_sample/gtaskqueue/gtaskqueue | 53 --- .../gtaskqueue/gtaskqueue_puller | 348 ------------------ .../gtaskqueue_sample/gtaskqueue/task_cmds.py | 211 ----------- .../gtaskqueue/taskqueue_client.py | 189 ---------- .../gtaskqueue/taskqueue_cmd_base.py | 275 -------------- .../gtaskqueue/taskqueue_cmds.py | 57 --- .../gtaskqueue/taskqueue_logger.py | 54 --- samples/gtaskqueue_sample/setup.py | 53 --- 11 files changed, 1739 deletions(-) delete mode 100644 samples/gtaskqueue_sample/README delete mode 100644 samples/gtaskqueue_sample/gtaskqueue/client_task.py delete mode 100644 samples/gtaskqueue_sample/gtaskqueue/gen_appengine_access_token delete mode 100644 samples/gtaskqueue_sample/gtaskqueue/gtaskqueue delete mode 100644 samples/gtaskqueue_sample/gtaskqueue/gtaskqueue_puller delete mode 100644 samples/gtaskqueue_sample/gtaskqueue/task_cmds.py delete mode 100644 samples/gtaskqueue_sample/gtaskqueue/taskqueue_client.py delete mode 100644 samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmd_base.py delete mode 100644 samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmds.py delete mode 100644 samples/gtaskqueue_sample/gtaskqueue/taskqueue_logger.py delete mode 100644 samples/gtaskqueue_sample/setup.py diff --git a/samples/gtaskqueue_sample/README b/samples/gtaskqueue_sample/README deleted file mode 100644 index f823db0..0000000 --- a/samples/gtaskqueue_sample/README +++ /dev/null @@ -1,49 +0,0 @@ -This acts as a sample as well as commandline tool for accessing Google TaskQueue -APIs. - -api: taskqueue -keywords: cmdline - -Installation -============ - -To install, simply say - - $ python setup.py install --record=files.txt - -to install the files and record what files are installed in files.txt. -Make sure that you have already installed the google-python-client-libraries -using the setup.py in the toplevel directory. - -You may need root priveleges for install. - -Running -======= -This sample provides following: -1. gtaskqueue: This works as a command-line tool to access Google TaskQueue -API. You can perform various operations on taskqueues such as leastask, -getask, listtasks, deletetask, getqueue. -Example usage: - i. To lease a task - gtaskqueue leasetask --taskqueue_name= - -lease_secs=30 --project_name= - ii. To get stats on a queue - gtaskqueue getqueue --taskqueue_name= - --project_name= --get_stats - -2. gtaskqueue_puller: This works as a worker to continuously pull tasks enqueued -by your app,perform the task and the post the output back to your app. -Example Usage: - gtaskqueue_puller --taskqueue_name= - -lease_secs=30 --project_name= - --executable_binary=”cat” --output_url= - -Third Party Libraries -===================== - -These libraries will be installed when you install the client library: - -http://code.google.com/p/httplib2 -http://code.google.com/p/uri-templates -http://github.com/simplegeo/python-oauth2 diff --git a/samples/gtaskqueue_sample/gtaskqueue/client_task.py b/samples/gtaskqueue_sample/gtaskqueue/client_task.py deleted file mode 100644 index 5214e81..0000000 --- a/samples/gtaskqueue_sample/gtaskqueue/client_task.py +++ /dev/null @@ -1,334 +0,0 @@ -#!/usr/bin/env python -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Class to encapsulate task related information and methods on task_puller.""" - - - -import base64 -import oauth2 as oauth -import os -import subprocess -import tempfile -import time -import urllib2 -from apiclient.errors import HttpError -from gtaskqueue.taskqueue_logger import logger -import gflags as flags - - -FLAGS = flags.FLAGS -flags.DEFINE_string( - 'executable_binary', - '/bin/cat', - 'path of the binary to be executed') -flags.DEFINE_string( - 'output_url', - '', - 'url to which output is posted. The url must include param name, ' - 'value for which is populated with task_id from puller while posting ' - 'the data. Format of output url is absolute url which handles the' - 'post request from task queue puller.' - '(Eg: "http://taskpuller.appspot.com/taskdata?name=").' - 'The Param value is always the task_id. The handler for this post' - 'should be able to associate the task with its id and take' - 'appropriate action. Use the appengine_access_token.py tool to' - 'generate the token and store it in a file before you start.') -flags.DEFINE_string( - 'appengine_access_token_file', - None, - 'File containing an Appengine Access token, if any. If present this' - 'token is added to the output_url request, so that the output_url can' - 'be an authenticated end-point. Use the appengine_access_token.py tool' - 'to generate the token and store it in a file before you start.') -flags.DEFINE_float( - 'task_timeout_secs', - '3600', - 'timeout to kill the task') - - -class ClientTaskInitError(Exception): - """Raised when initialization of client task fails.""" - - def __init__(self, task_id, error_str): - Exception.__init__(self) - self.task_id = task_id - self.error_str = error_str - - def __str__(self): - return ('Error initializing task "%s". Error details "%s". ' - % (self.task_id, self.error_str)) - - -class ClientTask(object): - """Class to encapsulate task information pulled by taskqueue_puller module. - - This class is responsible for creating an independent client task object by - taking some information from lease response task object. It encapsulates - methods responsible for spawning an independent subprocess for executing - the task, tracking the status of the task and also deleting the task from - taskqeueue when completed. It also has the functionality to give the output - back to the application by posting to the specified url. - """ - - def __init__(self, task): - self._task = task - self._process = None - self._output_file = None - - # Class method that caches the Appengine Access Token if any - @classmethod - def get_access_token(cls): - if not FLAGS.appengine_access_token_file: - return None - if not _access_token: - fhandle = open(FLAGS.appengine_access_token_file, 'rb') - _access_token = oauth.Token.from_string(fhandle.read()) - fhandle.close() - return _access_token - - def init(self): - """Extracts information from task object and intializes processing. - - Extracts id and payload from task object, decodes the payload and puts - it in input file. After this, it spawns a subprocess to execute the - task. - - Returns: - True if everything till task execution starts fine. - False if anything goes wrong in initialization of task execution. - """ - try: - self.task_id = self._task.get('id') - self._payload = self._decode_base64_payload( - self._task.get('payloadBase64')) - self._payload_file = self._dump_payload_to_file() - self._start_task_execution() - return True - except ClientTaskInitError, ctie: - logger.error(str(ctie)) - return False - - def _decode_base64_payload(self, encoded_str): - """Method to decode payload encoded in base64.""" - try: - # If the payload is empty, do not try to decode it. Payload usually - # not expected to be empty and hence log a warning and then - # continue. - if encoded_str: - decoded_str = base64.urlsafe_b64decode( - encoded_str.encode('utf-8')) - return decoded_str - else: - logger.warn('Empty paylaod for task %s' % self.task_id) - return '' - except base64.binascii.Error, berror: - logger.error('Error decoding payload for task %s. Error details %s' - % (self.task_id, str(berror))) - raise ClientTaskInitError(self.task_id, 'Error decoding payload') - # Generic catch block to avoid crashing of puller due to some bad - # encoding issue wih payload of any task. - except: - raise ClientTaskInitError(self.task_id, 'Error decoding payload') - - def _dump_payload_to_file(self): - """Method to write input extracted from payload to a temporary file.""" - try: - (fd, fname) = tempfile.mkstemp() - f = os.fdopen(fd, 'w') - f.write(self._payload) - f.close() - return fname - except OSError: - logger.error('Error dumping payload %s. Error details %s' % - (self.task_id, str(OSError))) - raise ClientTaskInitError(self.task_id, 'Error dumping payload') - - def _get_input_file(self): - return self._payload_file - - def _post_output(self): - """Posts the outback back to specified url in the form of a byte - array. - - It reads the output generated by the task as a byte-array. It posts the - response to specified url appended with the taskId. The application - using the taskqueue must have a handler to handle the data being posted - from puller. Format of body of response object is byte-array to make - the it genric for any kind of output generated. - - Returns: - True/False based on post status. - """ - if FLAGS.output_url: - try: - f = open(self._get_output_file(), 'rb') - body = f.read() - f.close() - url = FLAGS.output_url + self.task_id - logger.debug('Posting data to url %s' % url) - headers = {'Content-Type': 'byte-array'} - # Add an access token to the headers if specified. - # This enables the output_url to be authenticated and not open. - access_token = ClientTask.get_access_token() - if access_token: - consumer = oauth.Consumer('anonymous', 'anonymous') - oauth_req = oauth.Request.from_consumer_and_token( - consumer, - token=access_token, - http_url=url) - headers.update(oauth_req.to_header()) - # TODO: Use httplib instead of urllib for consistency. - req = urllib2.Request(url, body, headers) - urllib2.urlopen(req) - except ValueError: - logger.error('Error posting data back %s. Error details %s' - % (self.task_id, str(ValueError))) - return False - except Exception: - logger.error('Exception while posting data back %s. Error' - 'details %s' % (self.task_id, str(Exception))) - return False - return True - - def _get_output_file(self): - """Returns the output file if it exists, else creates it and returns - it.""" - if not self._output_file: - (_, self._output_file) = tempfile.mkstemp() - return self._output_file - - def get_task_id(self): - return self.task_id - - def _start_task_execution(self): - """Method to spawn subprocess to execute the tasks. - - This method splits the commands/executable_binary to desired arguments - format for Popen API. It appends input and output files to the - arguments. It is assumed that commands/executable_binary expects input - and output files as first and second positional parameters - respectively. - """ - # TODO: Add code to handle the cleanly shutdown when a process is killed - # by Ctrl+C. - try: - cmdline = FLAGS.executable_binary.split(' ') - cmdline.append(self._get_input_file()) - cmdline.append(self._get_output_file()) - self._process = subprocess.Popen(cmdline) - self.task_start_time = time.time() - except OSError: - logger.error('Error creating subprocess %s. Error details %s' - % (self.task_id, str(OSError))) - self._cleanup() - raise ClientTaskInitError(self.task_id, - 'Error creating subprocess') - except ValueError: - logger.error('Invalid arguments while executing task ', - self.task_id) - self._cleanup() - raise ClientTaskInitError(self.task_id, - 'Invalid arguments while executing task') - - def is_completed(self, task_api): - """Method to check if task has finished executing. - - This is responsible for checking status of task execution. If the task - has already finished executing, it deletes the task from the task - queue. If the task has been running since long time then it assumes - that there is high proabbility that it is dfunct and hence kills the - corresponding subprocess. In this case, task had not completed - successfully and hence we do not delete it form the taskqueue. In above - two cases, task completion status is returned as true since there is - nothing more to run in the task. In all other cases, task is still - running and hence we return false as completion status. - - Args: - task_api: handle for taskqueue api collection. - - Returns: - Task completion status (True/False) - """ - status = False - try: - task_status = self._process.poll() - if task_status == 0: - status = True - if self._post_output(): - self._delete_task_from_queue(task_api) - self._cleanup() - elif self._has_timedout(): - status = True - self._kill_subprocess() - except OSError: - logger.error('Error during polling status of task %s, Error ' - 'details %s' % (self.task_id, str(OSError))) - return status - - def _cleanup(self): - """Cleans up temporary input/output files used in task execution.""" - try: - if os.path.exists(self._get_input_file()): - os.remove(self._get_input_file()) - if os.path.exists(self._get_output_file()): - os.remove(self._get_output_file()) - except OSError: - logger.error('Error during file cleanup for task %s. Error' - 'details %s' % (self.task_id, str(OSError))) - - def _delete_task_from_queue(self, task_api): - """Method to delete the task from the taskqueue. - - First, it tries to post the output back to speified url. On successful - post, the task is deleted from taskqueue since the task has produced - expected output. If the post was unsuccessful, the task is not deleted - form the tskqueue since the expected output has yet not reached the - application. In either case cleanup is performed on the task. - - Args: - task_api: handle for taskqueue api collection. - - Returns: - Delete status (True/False) - """ - - try: - delete_request = task_api.tasks().delete( - project=FLAGS.project_name, - taskqueue=FLAGS.taskqueue_name, - task=self.task_id) - delete_request.execute() - except HttpError, http_error: - logger.error('Error deleting task %s from taskqueue.' - 'Error details %s' - % (self.task_id, str(http_error))) - - def _has_timedout(self): - """Checks if task has been running since long and has timedout.""" - if (time.time() - self.task_start_time) > FLAGS.task_timeout_secs: - return True - else: - return False - - def _kill_subprocess(self): - """Kills the process after cleaning up the task.""" - self._cleanup() - try: - self._process.kill() - logger.info('Trying to kill task %s, since it has been running ' - 'for long' % self.task_id) - except OSError: - logger.error('Error killing task %s. Error details %s' - % (self.task_id, str(OSError))) diff --git a/samples/gtaskqueue_sample/gtaskqueue/gen_appengine_access_token b/samples/gtaskqueue_sample/gtaskqueue/gen_appengine_access_token deleted file mode 100644 index eeef137..0000000 --- a/samples/gtaskqueue_sample/gtaskqueue/gen_appengine_access_token +++ /dev/null @@ -1,116 +0,0 @@ -#!/usr/bin/env python -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Tool to get an Access Token to access an auth protected Appengine end point. - -This tool talks to the appengine end point, and gets an Access Token that is -stored in a file. This token can be used by a tool to do authorized access to -an appengine end point. -""" - - - -from google.apputils import app -import gflags as flags -import httplib2 -import oauth2 as oauth -import time - -FLAGS = flags.FLAGS - -flags.DEFINE_string( - 'appengine_host', - None, - 'Appengine Host for whom we are trying to get an access token') -flags.DEFINE_string( - 'access_token_file', - None, - 'The file where the access token is stored') - - -def get_access_token(): - if not FLAGS.appengine_host: - print('must supply the appengine host') - exit(1) - - # setup - server = FLAGS.appengine_host - request_token_url = server + '/_ah/OAuthGetRequestToken' - authorization_url = server + '/_ah/OAuthAuthorizeToken' - access_token_url = server + '/_ah/OAuthGetAccessToken' - consumer = oauth.Consumer('anonymous', 'anonymous') - signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1() - - # The Http client that will be used to make the requests. - h = httplib2.Http() - - # get request token - print '* Obtain a request token ...' - parameters = {} - # We dont have a callback server, we're going to use the browser to - # authorize. - - #TODO: Add check for 401 etc - parameters['oauth_callback'] = 'oob' - oauth_req1 = oauth.Request.from_consumer_and_token( - consumer, http_url=request_token_url, parameters=parameters) - oauth_req1.sign_request(signature_method_hmac_sha1, consumer, None) - print 'Request headers: %s' % str(oauth_req1.to_header()) - response, content = h.request(oauth_req1.to_url(), 'GET') - token = oauth.Token.from_string(content) - print 'GOT key: %s secret:%s' % (str(token.key), str(token.secret)) - - print '* Authorize the request token ...' - oauth_req2 = oauth.Request.from_token_and_callback( - token=token, callback='oob', http_url=authorization_url) - print 'Please run this URL in a browser and paste the token back here' - print oauth_req2.to_url() - verification_code = raw_input('Enter verification code: ').strip() - token.set_verifier(verification_code) - - # get access token - print '* Obtain an access token ...' - oauth_req3 = oauth.Request.from_consumer_and_token( - consumer, token=token, http_url=access_token_url) - oauth_req3.sign_request(signature_method_hmac_sha1, consumer, token) - print 'Request headers: %s' % str(oauth_req3.to_header()) - response, content = h.request(oauth_req3.to_url(), 'GET') - access_token = oauth.Token.from_string(content) - print 'Access Token key: %s secret:%s' % (str(access_token.key), - str(access_token.secret)) - - # Save the token to a file if its specified. - if FLAGS.access_token_file: - fhandle = open(FLAGS.access_token_file, 'w') - fhandle.write(access_token.to_string()) - fhandle.close() - - # Example : access some protected resources - print '* Checking the access token against protected resources...' - # Assumes that the server + "/" is protected. - test_url = server + "/" - oauth_req4 = oauth.Request.from_consumer_and_token(consumer, - token=token, - http_url=test_url) - oauth_req4.sign_request(signature_method_hmac_sha1, consumer, token) - resp, content = h.request(test_url, "GET", headers=oauth_req4.to_header()) - print resp - print content - - -def main(argv): - get_access_token() - -if __name__ == '__main__': - app.run() diff --git a/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue b/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue deleted file mode 100644 index ce68ef1..0000000 --- a/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue +++ /dev/null @@ -1,53 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2010 Google Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Command line tool for interacting with Google TaskQueue API.""" - -__version__ = '0.0.1' - - -import logging - -from gtaskqueue import task_cmds -from gtaskqueue import taskqueue_cmds - -from google.apputils import appcommands -import gflags as flags - -LOG_LEVELS = [logging.DEBUG, - logging.INFO, - logging.WARNING, - logging.CRITICAL] -LOG_LEVEL_NAMES = map(logging.getLevelName, LOG_LEVELS) -FLAGS = flags.FLAGS - -flags.DEFINE_enum( - 'log_level', - logging.getLevelName(logging.WARNING), - LOG_LEVEL_NAMES, - 'Logging output level.') - - -def main(unused_argv): - log_level_map = dict( - [(logging.getLevelName(level), level) for level in LOG_LEVELS]) - logging.getLogger().setLevel(log_level_map[FLAGS.log_level]) - taskqueue_cmds.add_commands() - task_cmds.add_commands() - -if __name__ == '__main__': - appcommands.Run() diff --git a/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue_puller b/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue_puller deleted file mode 100644 index f53181a..0000000 --- a/samples/gtaskqueue_sample/gtaskqueue/gtaskqueue_puller +++ /dev/null @@ -1,348 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2010 Google Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Module to pull tasks from TaskQueues and execute them. - -This module does the following in an infinite loop. -1. Connects to Task API (of TaskQueues API collection) to request lease on - certain number of tasks (specified by user). -2. Spawns parallel processes to execute the leased tasks. -3. Polls all the tasks continously till they finish. -4. Deletes the tasks from taskqueue on their successful completion. -5. It lets the user specify when to invoke the lease request instead of polling - tasks status in a tight loop for better resource utilization: - a. Invoke the Lease request when runnning tasks go beyound certain - threshold (min_running_tasks) - b. Wait time becomes more than specified poll-time-out interval. -6. Repeat the steps from 1 to 5 when either all tasks have finished executing - or one of the conditions in 5) is met. """ - - - -import sys -import time -from apiclient.errors import HttpError -from gtaskqueue.client_task import ClientTask -from gtaskqueue.taskqueue_client import TaskQueueClient -from gtaskqueue.taskqueue_logger import logger -from gtaskqueue.taskqueue_logger import set_logger -from google.apputils import app -import gflags as flags - -FLAGS = flags.FLAGS -flags.DEFINE_string( - 'project_name', - 'default', - 'The name of the Taskqueue API project.') -flags.DEFINE_string( - 'taskqueue_name', - 'testpuller', - 'taskqueue to which client wants to connect to') -flags.DEFINE_integer( - 'lease_secs', - 30, - 'The lease for the task in seconds') -flags.DEFINE_integer( - 'num_tasks', - 10, - 'The number of tasks to lease') -flags.DEFINE_integer( - 'min_running_tasks', - 0, - 'minmum number of tasks below which lease can be invoked') -flags.DEFINE_float( - 'sleep_interval_secs', - 2, - 'sleep interval when no tasks are found in the taskqueue') -flags.DEFINE_float( - 'timeout_secs_for_next_lease_request', - 600, - 'Wait time before next poll when no tasks are found in the' - 'queue (in seconds)') -flags.DEFINE_integer( - 'taskapi_requests_per_sec', - None, - 'limit on task_api requests per second') -flags.DEFINE_float( - 'sleep_before_next_poll_secs', - 2, - 'sleep interval before next poll') - - -class TaskQueuePuller(object): - """Maintains state information for TaskQueuePuller.""" - - def __init__(self): - self._last_lease_time = None - self._poll_timeout_start = None - self._num_last_leased_tasks = 0 - # Dictionary for running tasks's ids and their corresponding - # client_task object. - self._taskprocess_map = {} - try: - self.__tcq = TaskQueueClient() - self.task_api = self.__tcq.get_taskapi() - except HttpError, http_error: - logger.error('Could not get TaskQueue API handler and hence' \ - 'exiting: %s' % str(http_error)) - sys.exit() - - def _can_lease(self): - """Determines if new tasks can be leased. - - Determines if new taks can be leased based on - 1. Number of tasks already running in the system. - 2. Limit on accessing the taskqueue apirary. - - Returns: - True/False. - """ - if self._num_tasks_to_lease() > 0 and not self._is_rate_exceeded(): - return True - else: - return False - - def _is_rate_exceeded(self): - - """Determines if requests/second to TaskQueue API has exceeded limit. - - We do not access the APIs beyond the specified permissible limit. - If we have run N tasks in elapsed time since last lease, we have - already made N+1 requests to API (1 for collective lease and N for - their individual delete operations). If K reqs/sec is the limit on - accessing APIs, then we sould not invoke any request to API before - N+1/K sec approximately. The above condition is formulated in the - following method. - Returns: - True/False - """ - if not FLAGS.taskapi_requests_per_sec: - return False - if not self._last_lease_time: - return False - curr_time = time.time() - if ((curr_time - self._last_lease_time) < - ((1.0 * (self._num_last_leased_tasks - - len(self._taskprocess_map)) / - FLAGS.taskapi_requests_per_sec))): - return True - else: - return False - - def _num_tasks_to_lease(self): - - """Determines how many tasks can be leased. - - num_tasks is upper limit to running tasks in the system and hence - number of tasks which could be leased is difference of numtasks and - currently running tasks. - - Returns: - Number of tasks to lease. - """ - return FLAGS.num_tasks - len(self._taskprocess_map) - - def _update_last_lease_info(self, result): - - """Updates the information regarding last lease. - - Args: - result: Response object from TaskQueue API, containing list of - tasks. - """ - self._last_lease_time = time.time() - if result: - if result.get('items'): - self._num_last_leased_tasks = len(result.get('items')) - else: - self._num_last_leased_tasks = 0 - else: - self._num_last_leased_tasks = 0 - - def _update_poll_timeout_start(self): - - """Updates the start time for poll-timeout.""" - if not self._poll_timeout_start: - self._poll_timeout_start = time.time() - - def _continue_polling(self): - - """Checks whether lease can be invoked based on running tasks and - timeout. - - Lease can be invoked if - 1. Running tasks in the sytem has gone below the specified - threshold (min_running_tasks). - 2. Wait time has exceeded beyond time-out specified and at least one - tas has finished since last lease invocation. - - By doing this, we are essentially trying to batch the lease requests. - If this is not done and we start off leasing N tasks, its likely tasks - may finish slightly one after another, and we make N lease requests for - each task for next N tasks and so on. This can result in unnecessary - lease API call and hence to avoid that, we try and batch the lease - requests. Also we put certain limit on wait time for batching the - requests by incororating the time-out. - - Returns: - True/False - """ - if len(self._taskprocess_map) <= FLAGS.min_running_tasks: - return False - if self._poll_timeout_start: - elapsed_time = time.time() - self._poll_timeout_start - if elapsed_time > FLAGS.timeout_secs_for_next_lease_request: - self._poll_timeout_start = None - return False - return True - - def _get_tasks_from_queue(self): - - """Gets the available tasks from the taskqueue. - - Returns: - Lease response object. - """ - try: - tasks_to_fetch = self._num_tasks_to_lease() - lease_req = self.task_api.tasks().lease( - project=FLAGS.project_name, - taskqueue=FLAGS.taskqueue_name, - leaseSecs=FLAGS.lease_secs, - numTasks=tasks_to_fetch, - body={}) - result = lease_req.execute() - return result - except HttpError, http_error: - logger.error('Error during lease request: %s' % str(http_error)) - return None - - def _create_subprocesses_for_tasks(self, result): - - """Spawns parallel sub processes to execute tasks for better - throughput. - - Args: - result: lease resonse dictionary object. - """ - if not result: - logger.info('Error: result is not defined') - return None - if result.get('items'): - for task in result.get('items'): - task_id = task.get('id') - # Given that a task may be leased multiple times, we may get a - # task which we are currently executing on, so make sure we - # dont spaw another subprocess for it. - if task_id not in self._taskprocess_map: - ct = ClientTask(task) - # Check if tasks got initialized properly and then pu them - # in running tasks map. - if ct.init(): - # Put the clientTask objects in a dictionary to keep - # track of stats and objects are used later to delete - # the tasks from taskqueue - self._taskprocess_map[ct.get_task_id()] = ct - - def _poll_running_tasks(self): - - """Polls all the running tasks and delete them from taskqueue if - completed.""" - if self._taskprocess_map: - for task in self._taskprocess_map.values(): - if task.is_completed(self.task_api): - del self._taskprocess_map[task.get_task_id()] - # updates scheduling information for later use. - self._update_poll_timeout_start() - - def _sleep_before_next_lease(self): - - """Sleeps before invoking lease if required based on last lease info. - - It sleeps when no tasks were found on the taskqueue during last lease - request. To note, it discount the time taken in polling the tasks and - sleeps for (sleep_interval - time taken in poll). This avoids the - unnecessary wait if tasks could be leased. If no time was taken in - poll since there were not tasks in the system, it waits for full sleep - interval and thus optimizes the CPU cycles. - It does not sleep if the method is called for the first time (when no - lease request has ever been made). - """ - if not self._last_lease_time: - sleep_secs = 0 - elif self._num_last_leased_tasks <= 0: - time_elpased_since_last_lease = time.time() - self._last_lease_time - sleep_secs = (FLAGS.sleep_interval_secs - - time_elpased_since_last_lease) - if sleep_secs > 0: - logger.info('No tasks found and hence sleeping for sometime') - time.sleep(FLAGS.sleep_interval_secs) - - def lease_tasks(self): - - """Requests lease for specified number of tasks. - - It invokes lease request for appropriate number of tasks, spawns - parallel processes to execute them and also maintains scheduling - information. - - LeaseTask also takes care of waiting(sleeping) before invoking lease if - there are no tasks which can be leased in the taskqueue. This results - in better resource utilization. Apart from this, it also controls the - number of requests being sent to taskqueue APIs. - - Returns: - True/False based on if tasks could be leased or not. - """ - self._sleep_before_next_lease() - if self._can_lease(): - result = self._get_tasks_from_queue() - self._update_last_lease_info(result) - self._create_subprocesses_for_tasks(result) - return True - return False - - def poll_tasks(self): - - """Polls the status of running tasks of the system. - - Polls the status of tasks and then decides if it should continue to - poll depending on number of tasks running in the system and timeouts. - Instead of polling in a tight loop, it sleeps for sometime before the - next poll to avoid any unnecessary CPU cycles. poll_tasks returns - only when system has capability to accomodate at least one new task. - """ - - self._poll_running_tasks() - while self._continue_polling(): - logger.info('Sleeping before next poll') - time.sleep(FLAGS.sleep_before_next_poll_secs) - self._poll_running_tasks() - - -def main(argv): - - """Infinite loop to lease new tasks and poll them for completion.""" - # Settings for logger - set_logger() - # Instantiate puller - puller = TaskQueuePuller() - while True: - puller.lease_tasks() - puller.poll_tasks() - -if __name__ == '__main__': - app.run() diff --git a/samples/gtaskqueue_sample/gtaskqueue/task_cmds.py b/samples/gtaskqueue_sample/gtaskqueue/task_cmds.py deleted file mode 100644 index d4d651a..0000000 --- a/samples/gtaskqueue_sample/gtaskqueue/task_cmds.py +++ /dev/null @@ -1,211 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2010 Google Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -"""Commands to interact with the Task object of the TaskQueue API.""" - - -__version__ = '0.0.1' - - - -from gtaskqueue.taskqueue_cmd_base import GoogleTaskCommand - -from google.apputils import app -from google.apputils import appcommands -import gflags as flags - -FLAGS = flags.FLAGS - - -class GetTaskCommand(GoogleTaskCommand): - """Get properties of an existing task.""" - - def __init__(self, name, flag_values): - super(GetTaskCommand, self).__init__(name, flag_values) - - def build_request(self, task_api, flag_values): - """Build a request to get properties of a Task. - - Args: - task_api: The handle to the task collection API. - flag_values: The parsed command flags. - Returns: - The properties of the task. - """ - return task_api.get(project=flag_values.project_name, - taskqueue=flag_values.taskqueue_name, - task=flag_values.task_name) - - -class LeaseTaskCommand(GoogleTaskCommand): - """Lease a new task from the queue.""" - - def __init__(self, name, flag_values): - flags.DEFINE_integer('lease_secs', - None, - 'The lease for the task in seconds', - flag_values=flag_values) - flags.DEFINE_integer('num_tasks', - 1, - 'The number of tasks to lease', - flag_values=flag_values) - flags.DEFINE_integer('payload_size_to_display', - 2 * 1024 * 1024, - 'Size of the payload for leased tasks to show', - flag_values=flag_values) - super(LeaseTaskCommand, self).__init__(name, - flag_values, - need_task_flag=False) - - def build_request(self, task_api, flag_values): - """Build a request to lease a pending task from the TaskQueue. - - Args: - task_api: The handle to the task collection API. - flag_values: The parsed command flags. - Returns: - A new leased task. - """ - if not flag_values.lease_secs: - raise app.UsageError('lease_secs must be specified') - - return task_api.lease(project=flag_values.project_name, - taskqueue=flag_values.taskqueue_name, - leaseSecs=flag_values.lease_secs, - numTasks=flag_values.num_tasks) - - def print_result(self, result): - """Override to optionally strip the payload since it can be long.""" - if result.get('items'): - items = [] - for task in result.get('items'): - payloadlen = len(task['payloadBase64']) - if payloadlen > FLAGS.payload_size_to_display: - extra = payloadlen - FLAGS.payload_size_to_display - task['payloadBase64'] = ('%s(%d more bytes)' % - (task['payloadBase64'][:FLAGS.payload_size_to_display], - extra)) - items.append(task) - result['items'] = items - GoogleTaskCommand.print_result(self, result) - - -class DeleteTaskCommand(GoogleTaskCommand): - """Delete an existing task.""" - - def __init__(self, name, flag_values): - super(DeleteTaskCommand, self).__init__(name, flag_values) - - def build_request(self, task_api, flag_values): - """Build a request to delete a Task. - - Args: - task_api: The handle to the taskqueue collection API. - flag_values: The parsed command flags. - Returns: - Whether the delete was successful. - """ - return task_api.delete(project=flag_values.project_name, - taskqueue=flag_values.taskqueue_name, - task=flag_values.task_name) - - -class ListTasksCommand(GoogleTaskCommand): - """Lists all tasks in a queue (currently upto a max of 100).""" - - def __init__(self, name, flag_values): - super(ListTasksCommand, self).__init__(name, - flag_values, - need_task_flag=False) - - def build_request(self, task_api, flag_values): - """Build a request to lists tasks in a queue. - - Args: - task_api: The handle to the taskqueue collection API. - flag_values: The parsed command flags. - Returns: - A list of pending tasks in the queue. - """ - return task_api.list(project=flag_values.project_name, - taskqueue=flag_values.taskqueue_name) - - -class ClearTaskQueueCommand(GoogleTaskCommand): - """Deletes all tasks in a queue (default to a max of 100).""" - - - def __init__(self, name, flag_values): - flags.DEFINE_integer('max_delete', 100, 'How many to clear at most', - flag_values=flag_values) - super(ClearTaskQueueCommand, self).__init__(name, - flag_values, - need_task_flag=False) - - def run_with_api_and_flags(self, api, flag_values): - """Run the command, returning the result. - - Args: - api: The handle to the Google TaskQueue API. - flag_values: The parsed command flags. - Returns: - The result of running the command. - """ - tasks_api = api.tasks() - self._flag_values = flag_values - self._to_delete = flag_values.max_delete - total_deleted = 0 - while self._to_delete > 0: - n_deleted = self._delete_a_batch(tasks_api) - if n_deleted <= 0: - break - total_deleted += n_deleted - return {'deleted': total_deleted} - - def _delete_a_batch(self, tasks): - """Delete a batch of tasks. - - Since the list method only gives us back 100 at a time, we may have - to call it several times to clear the entire queue. - - Args: - tasks: The handle to the Google TaskQueue API Tasks resource. - Returns: - The number of tasks deleted. - """ - list_request = tasks.list(project=self._flag_values.project_name, - taskqueue=self._flag_values.taskqueue_name) - result = list_request.execute() - n_deleted = 0 - if result: - for task in result.get('items', []): - if self._to_delete > 0: - self._to_delete -= 1 - n_deleted += 1 - print 'Deleting: %s' % task['id'] - tasks.delete(project=self._flag_values.project_name, - taskqueue=self._flag_values.taskqueue_name, - task=task['id']).execute() - return n_deleted - - -def add_commands(): - appcommands.AddCmd('listtasks', ListTasksCommand) - appcommands.AddCmd('gettask', GetTaskCommand) - appcommands.AddCmd('deletetask', DeleteTaskCommand) - appcommands.AddCmd('leasetask', LeaseTaskCommand) - appcommands.AddCmd('clear', ClearTaskQueueCommand) diff --git a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_client.py b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_client.py deleted file mode 100644 index 1df012d..0000000 --- a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_client.py +++ /dev/null @@ -1,189 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2010 Google Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Class to connect to TaskQueue API.""" - - - -import os -import sys -import urlparse -from apiclient.discovery import build -from apiclient.errors import HttpError -import httplib2 -from oauth2client.anyjson import simplejson as json -from oauth2client.file import Storage -from oauth2client.client import OAuth2WebServerFlow -from oauth2client.tools import run -from gtaskqueue.taskqueue_logger import logger - -from google.apputils import app -import gflags as flags - -FLAGS = flags.FLAGS -flags.DEFINE_string( - 'service_version', - 'v1beta1', - 'Google taskqueue api version.') -flags.DEFINE_string( - 'api_host', - 'https://www.googleapis.com/', - 'API host name') -flags.DEFINE_bool( - 'use_developer_key', - False, - 'User wants to use the developer key while accessing taskqueue apis') -flags.DEFINE_string( - 'developer_key_file', - '~/.taskqueue.apikey', - 'Developer key provisioned from api console') -flags.DEFINE_bool( - 'dump_request', - False, - 'Prints the outgoing HTTP request along with headers and body.') -flags.DEFINE_string( - 'credentials_file', - 'taskqueue.dat', - 'File where you want to store the auth credentails for later user') - -# Set up a Flow object to be used if we need to authenticate. This -# sample uses OAuth 2.0, and we set up the OAuth2WebServerFlow with -# the information it needs to authenticate. Note that it is called -# the Web Server Flow, but it can also handle the flow for native -# applications -# The client_id client_secret are copied from the Identity tab on -# the Google APIs Console -FLOW = OAuth2WebServerFlow( - client_id='157776985798.apps.googleusercontent.com', - client_secret='tlpVCmaS6yLjxnnPu0ARIhNw', - scope='https://www.googleapis.com/auth/taskqueue', - user_agent='taskqueue-cmdline-sample/1.0') - - -class TaskQueueClient: - """Class to setup connection with taskqueue API.""" - - def __init__(self): - if not FLAGS.project_name: - raise app.UsageError('You must specify a project name' - ' using the "--project_name" flag.') - discovery_uri = ( - FLAGS.api_host + 'discovery/v1/apis/{api}/{apiVersion}/rest') - logger.info(discovery_uri) - try: - # If the Credentials don't exist or are invalid run through the - # native clien flow. The Storage object will ensure that if - # successful the good Credentials will get written back to a file. - # Setting FLAGS.auth_local_webserver to false since we can run our - # tool on Virtual Machines and we do not want to run the webserver - # on VMs. - FLAGS.auth_local_webserver = False - storage = Storage(FLAGS.credentials_file) - credentials = storage.get() - if credentials is None or credentials.invalid == True: - credentials = run(FLOW, storage) - http = credentials.authorize(self._dump_request_wrapper( - httplib2.Http())) - self.task_api = build('taskqueue', - FLAGS.service_version, - http=http, - discoveryServiceUrl=discovery_uri) - except HttpError, http_error: - logger.error('Error gettin task_api: %s' % http_error) - - def get_taskapi(self): - """Returns handler for tasks API from taskqueue API collection.""" - return self.task_api - - - def _dump_request_wrapper(self, http): - """Dumps the outgoing HTTP request if requested. - - Args: - http: An instance of httplib2.Http or something that acts like it. - - Returns: - httplib2.Http like object. - """ - request_orig = http.request - - def new_request(uri, method='GET', body=None, headers=None, - redirections=httplib2.DEFAULT_MAX_REDIRECTS, - connection_type=None): - """Overrides the http.request method to add some utilities.""" - if (FLAGS.api_host + "discovery/" not in uri and - FLAGS.use_developer_key): - developer_key_path = os.path.expanduser( - FLAGS.developer_key_file) - if not os.path.isfile(developer_key_path): - print 'Please generate developer key from the Google API' \ - 'Console and store it in %s' % (FLAGS.developer_key_file) - sys.exit() - developer_key_file = open(developer_key_path, 'r') - try: - developer_key = developer_key_file.read().strip() - except IOError, io_error: - print 'Error loading developer key from file %s' % ( - FLAGS.developer_key_file) - print 'Error details: %s' % str(io_error) - sys.exit() - finally: - developer_key_file.close() - s = urlparse.urlparse(uri) - query = 'key=' + developer_key - if s.query: - query = s.query + '&key=' + developer_key - d = urlparse.ParseResult(s.scheme, - s.netloc, - s.path, - s.params, - query, - s.fragment) - uri = urlparse.urlunparse(d) - if FLAGS.dump_request: - print '--request-start--' - print '%s %s' % (method, uri) - if headers: - for (h, v) in headers.iteritems(): - print '%s: %s' % (h, v) - print '' - if body: - print json.dumps(json.loads(body), - sort_keys=True, - indent=2) - print '--request-end--' - return request_orig(uri, - method, - body, - headers, - redirections, - connection_type) - http.request = new_request - return http - - def print_result(self, result): - """Pretty-print the result of the command. - - The default behavior is to dump a formatted JSON encoding - of the result. - - Args: - result: The JSON-serializable result to print. - """ - # We could have used the pprint module, but it produces - # noisy output due to all of our keys and values being - # unicode strings rather than simply ascii. - print json.dumps(result, sort_keys=True, indent=2) diff --git a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmd_base.py b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmd_base.py deleted file mode 100644 index 0400acd..0000000 --- a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmd_base.py +++ /dev/null @@ -1,275 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2010 Google Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -"""Commands for interacting with Google TaskQueue.""" - -__version__ = '0.0.1' - - -import os -import sys -import urlparse - - -from apiclient.discovery import build -from apiclient.errors import HttpError -import httplib2 -from oauth2client.anyjson import simplejson as json -from oauth2client.file import Storage -from oauth2client.client import OAuth2WebServerFlow -from oauth2client.tools import run - -from google.apputils import app -from google.apputils import appcommands -import gflags as flags - - -FLAGS = flags.FLAGS - -flags.DEFINE_string( - 'service_version', - 'v1beta1', - 'Google taskqueue api version.') -flags.DEFINE_string( - 'api_host', - 'https://www.googleapis.com/', - 'API host name') -flags.DEFINE_string( - 'project_name', - 'default', - 'The name of the Taskqueue API project.') -flags.DEFINE_bool( - 'use_developer_key', - False, - 'User wants to use the developer key while accessing taskqueue apis') -flags.DEFINE_string( - 'developer_key_file', - '~/.taskqueue.apikey', - 'Developer key provisioned from api console') -flags.DEFINE_bool( - 'dump_request', - False, - 'Prints the outgoing HTTP request along with headers and body.') -flags.DEFINE_string( - 'credentials_file', - 'taskqueue.dat', - 'File where you want to store the auth credentails for later user') - -# Set up a Flow object to be used if we need to authenticate. This -# sample uses OAuth 2.0, and we set up the OAuth2WebServerFlow with -# the information it needs to authenticate. Note that it is called -# the Web Server Flow, but it can also handle the flow for native -# applications -# The client_id client_secret are copied from the Identity tab on -# the Google APIs Console -FLOW = OAuth2WebServerFlow( - client_id='157776985798.apps.googleusercontent.com', - client_secret='tlpVCmaS6yLjxnnPu0ARIhNw', - scope='https://www.googleapis.com/auth/taskqueue', - user_agent='taskqueue-cmdline-sample/1.0') - -class GoogleTaskQueueCommandBase(appcommands.Cmd): - """Base class for all the Google TaskQueue client commands.""" - - DEFAULT_PROJECT_PATH = 'projects/default' - - def __init__(self, name, flag_values): - super(GoogleTaskQueueCommandBase, self).__init__(name, flag_values) - - def _dump_request_wrapper(self, http): - """Dumps the outgoing HTTP request if requested. - - Args: - http: An instance of httplib2.Http or something that acts like it. - - Returns: - httplib2.Http like object. - """ - request_orig = http.request - - def new_request(uri, method='GET', body=None, headers=None, - redirections=httplib2.DEFAULT_MAX_REDIRECTS, - connection_type=None): - """Overrides the http.request method to add some utilities.""" - if (FLAGS.api_host + "discovery/" not in uri and - FLAGS.use_developer_key): - developer_key_path = os.path.expanduser( - FLAGS.developer_key_file) - if not os.path.isfile(developer_key_path): - print 'Please generate developer key from the Google APIs' \ - 'Console and store it in %s' % (FLAGS.developer_key_file) - sys.exit() - developer_key_file = open(developer_key_path, 'r') - try: - developer_key = developer_key_file.read().strip() - except IOError, io_error: - print 'Error loading developer key from file %s' % ( - FLAGS.developer_key_file) - print 'Error details: %s' % str(io_error) - sys.exit() - finally: - developer_key_file.close() - s = urlparse.urlparse(uri) - query = 'key=' + developer_key - if s.query: - query = s.query + '&key=' + developer_key - d = urlparse.ParseResult(s.scheme, - s.netloc, - s.path, - s.params, - query, - s.fragment) - uri = urlparse.urlunparse(d) - - if FLAGS.dump_request: - print '--request-start--' - print '%s %s' % (method, uri) - if headers: - for (h, v) in headers.iteritems(): - print '%s: %s' % (h, v) - print '' - if body: - print json.dumps(json.loads(body), sort_keys=True, indent=2) - print '--request-end--' - return request_orig(uri, - method, - body, - headers, - redirections, - connection_type) - http.request = new_request - return http - - def Run(self, argv): - """Run the command, printing the result. - - Args: - argv: The non-flag arguments to the command. - """ - if not FLAGS.project_name: - raise app.UsageError('You must specify a project name' - ' using the "--project_name" flag.') - discovery_uri = ( - FLAGS.api_host + 'discovery/v1/apis/{api}/{apiVersion}/rest') - try: - # If the Credentials don't exist or are invalid run through the - # native client flow. The Storage object will ensure that if - # successful the good Credentials will get written back to a file. - # Setting FLAGS.auth_local_webserver to false since we can run our - # tool on Virtual Machines and we do not want to run the webserver - # on VMs. - FLAGS.auth_local_webserver = False - storage = Storage(FLAGS.credentials_file) - credentials = storage.get() - if credentials is None or credentials.invalid == True: - credentials = run(FLOW, storage) - http = credentials.authorize(self._dump_request_wrapper( - httplib2.Http())) - api = build('taskqueue', - FLAGS.service_version, - http=http, - discoveryServiceUrl=discovery_uri) - result = self.run_with_api_and_flags_and_args(api, FLAGS, argv) - self.print_result(result) - except HttpError, http_error: - print 'Error Processing request: %s' % str(http_error) - - def run_with_api_and_flags_and_args(self, api, flag_values, unused_argv): - """Run the command given the API, flags, and args. - - The default implementation of this method discards the args and - calls into run_with_api_and_flags. - - Args: - api: The handle to the Google TaskQueue API. - flag_values: The parsed command flags. - unused_argv: The non-flag arguments to the command. - Returns: - The result of running the command - """ - return self.run_with_api_and_flags(api, flag_values) - - def print_result(self, result): - """Pretty-print the result of the command. - - The default behavior is to dump a formatted JSON encoding - of the result. - - Args: - result: The JSON-serializable result to print. - """ - # We could have used the pprint module, but it produces - # noisy output due to all of our keys and values being - # unicode strings rather than simply ascii. - print json.dumps(result, sort_keys=True, indent=2) - - - -class GoogleTaskQueueCommand(GoogleTaskQueueCommandBase): - """Base command for working with the taskqueues collection.""" - - def __init__(self, name, flag_values): - super(GoogleTaskQueueCommand, self).__init__(name, flag_values) - flags.DEFINE_string('taskqueue_name', - 'myqueue', - 'TaskQueue name', - flag_values=flag_values) - - def run_with_api_and_flags(self, api, flag_values): - """Run the command, returning the result. - - Args: - api: The handle to the Google TaskQueue API. - flag_values: The parsed command flags. - Returns: - The result of running the command. - """ - taskqueue_request = self.build_request(api.taskqueues(), flag_values) - return taskqueue_request.execute() - - -class GoogleTaskCommand(GoogleTaskQueueCommandBase): - """Base command for working with the tasks collection.""" - - def __init__(self, name, flag_values, need_task_flag=True): - super(GoogleTaskCommand, self).__init__(name, flag_values) - # Common flags that are shared by all the Task commands. - flags.DEFINE_string('taskqueue_name', - 'myqueue', - 'TaskQueue name', - flag_values=flag_values) - # Not all task commands need the task_name flag. - if need_task_flag: - flags.DEFINE_string('task_name', - None, - 'Task name', - flag_values=flag_values) - - def run_with_api_and_flags(self, api, flag_values): - """Run the command, returning the result. - - Args: - api: The handle to the Google TaskQueue API. - flag_values: The parsed command flags. - flags.DEFINE_string('payload', - None, - 'Payload of the task') - Returns: - The result of running the command. - """ - task_request = self.build_request(api.tasks(), flag_values) - return task_request.execute() diff --git a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmds.py b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmds.py deleted file mode 100644 index f21840a..0000000 --- a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_cmds.py +++ /dev/null @@ -1,57 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2010 Google Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Commands to interact with the TaskQueue object of the TaskQueue API.""" - - -__version__ = '0.0.1' - - - -from gtaskqueue.taskqueue_cmd_base import GoogleTaskQueueCommand - -from google.apputils import appcommands -import gflags as flags - -FLAGS = flags.FLAGS - - -class GetTaskQueueCommand(GoogleTaskQueueCommand): - """Get properties of an existing task queue.""" - - def __init__(self, name, flag_values): - flags.DEFINE_boolean('get_stats', - False, - 'Whether to get Stats', - flag_values=flag_values) - super(GetTaskQueueCommand, self).__init__(name, flag_values) - - def build_request(self, taskqueue_api, flag_values): - """Build a request to get properties of a TaskQueue. - - Args: - taskqueue_api: The handle to the taskqueue collection API. - flag_values: The parsed command flags. - Returns: - The properties of the taskqueue. - """ - return taskqueue_api.get(project=flag_values.project_name, - taskqueue=flag_values.taskqueue_name, - getStats=flag_values.get_stats) - - -def add_commands(): - appcommands.AddCmd('getqueue', GetTaskQueueCommand) diff --git a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_logger.py b/samples/gtaskqueue_sample/gtaskqueue/taskqueue_logger.py deleted file mode 100644 index 3f445f3..0000000 --- a/samples/gtaskqueue_sample/gtaskqueue/taskqueue_logger.py +++ /dev/null @@ -1,54 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2010 Google Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Log settings for taskqueue_puller module.""" - - - -import logging -import logging.config -from google.apputils import app -import gflags as flags - -FLAGS = flags.FLAGS -flags.DEFINE_string( - 'log_output_file', - '/tmp/taskqueue-puller.log', - 'Logfile name for taskqueue_puller.') - - -logger = logging.getLogger('TaskQueueClient') - - -def set_logger(): - """Settings for taskqueue_puller logger.""" - logger.setLevel(logging.INFO) - - # create formatter - formatter = logging.Formatter( - '%(asctime)s - %(name)s - %(levelname)s - %(message)s') - - # Set size of the log file and the backup count for rotated log files. - handler = logging.handlers.RotatingFileHandler(FLAGS.log_output_file, - maxBytes = 1024 * 1024, - backupCount = 5) - # add formatter to handler - handler.setFormatter(formatter) - # add formatter to handler - logger.addHandler(handler) - -if __name__ == '__main__': - app.run() diff --git a/samples/gtaskqueue_sample/setup.py b/samples/gtaskqueue_sample/setup.py deleted file mode 100644 index b7eff8f..0000000 --- a/samples/gtaskqueue_sample/setup.py +++ /dev/null @@ -1,53 +0,0 @@ -#!/usr/bin/env python -# -# Copyright (C) 2010 Google Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Setup script for the Google TaskQueue API command-line tool.""" - -__version__ = '1.0.2' - - -import sys -try: - from setuptools import setup - print 'Loaded setuptools' -except ImportError: - from distutils.core import setup - print 'Loaded distutils.core' - - -PACKAGE_NAME = 'google-taskqueue-client' -INSTALL_REQUIRES = ['google-apputils==0.1', - 'google-api-python-client', - 'httplib2', - 'oauth2', - 'python-gflags'] -setup(name=PACKAGE_NAME, - version=__version__, - description='Google TaskQueue API command-line tool and utils', - author='Google Inc.', - author_email='google-appengine@googlegroups.com', - url='http://code.google.com/appengine/docs/python/taskqueue/pull/overview.html', - install_requires=INSTALL_REQUIRES, - packages=['gtaskqueue'], - scripts=['gtaskqueue/gtaskqueue', 'gtaskqueue/gtaskqueue_puller', - 'gtaskqueue/gen_appengine_access_token'], - license='Apache 2.0', - keywords='google taskqueue api client', - classifiers=['Development Status :: 3 - Alpha', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: Apache Software License', - 'Operating System :: POSIX', - 'Topic :: Internet :: WWW/HTTP'])