#!/usr/bin/env python # # Copyright (C) 2010 Google Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Module to pull tasks from TaskQueues and execute them. This module does the following in an infinite loop. 1. Connects to Task API (of TaskQueues API collection) to request lease on certain number of tasks (specified by user). 2. Spawns parallel processes to execute the leased tasks. 3. Polls all the tasks continously till they finish. 4. Deletes the tasks from taskqueue on their successful completion. 5. It lets the user specify when to invoke the lease request instead of polling tasks status in a tight loop for better resource utilization: a. Invoke the Lease request when runnning tasks go beyound certain threshold (min_running_tasks) b. Wait time becomes more than specified poll-time-out interval. 6. Repeat the steps from 1 to 5 when either all tasks have finished executing or one of the conditions in 5) is met. """ import sys import time from apiclient.errors import HttpError from gtaskqueue.client_task import ClientTask from gtaskqueue.taskqueue_client import TaskQueueClient from gtaskqueue.taskqueue_logger import logger from gtaskqueue.taskqueue_logger import set_logger from google.apputils import app import gflags as flags FLAGS = flags.FLAGS flags.DEFINE_string( 'project_name', 'default', 'The name of the Taskqueue API project.') flags.DEFINE_string( 'taskqueue_name', 'testpuller', 'taskqueue to which client wants to connect to') flags.DEFINE_integer( 'lease_secs', 30, 'The lease for the task in seconds') flags.DEFINE_integer( 'num_tasks', 10, 'The number of tasks to lease') flags.DEFINE_integer( 'min_running_tasks', 0, 'minmum number of tasks below which lease can be invoked') flags.DEFINE_float( 'sleep_interval_secs', 2, 'sleep interval when no tasks are found in the taskqueue') flags.DEFINE_float( 'timeout_secs_for_next_lease_request', 600, 'Wait time before next poll when no tasks are found in the' 'queue (in seconds)') flags.DEFINE_integer( 'taskapi_requests_per_sec', None, 'limit on task_api requests per second') flags.DEFINE_float( 'sleep_before_next_poll_secs', 2, 'sleep interval before next poll') class TaskQueuePuller(object): """Maintains state information for TaskQueuePuller.""" def __init__(self): self._last_lease_time = None self._poll_timeout_start = None self._num_last_leased_tasks = 0 # Dictionary for running tasks's ids and their corresponding # client_task object. self._taskprocess_map = {} try: self.__tcq = TaskQueueClient() self.task_api = self.__tcq.get_taskapi() except HttpError, http_error: logger.error('Could not get TaskQueue API handler and hence' \ 'exiting: %s' % str(http_error)) sys.exit() def _can_lease(self): """Determines if new tasks can be leased. Determines if new taks can be leased based on 1. Number of tasks already running in the system. 2. Limit on accessing the taskqueue apirary. Returns: True/False. """ if self._num_tasks_to_lease() > 0 and not self._is_rate_exceeded(): return True else: return False def _is_rate_exceeded(self): """Determines if requests/second to TaskQueue API has exceeded limit. We do not access the APIs beyond the specified permissible limit. If we have run N tasks in elapsed time since last lease, we have already made N+1 requests to API (1 for collective lease and N for their individual delete operations). If K reqs/sec is the limit on accessing APIs, then we sould not invoke any request to API before N+1/K sec approximately. The above condition is formulated in the following method. Returns: True/False """ if not FLAGS.taskapi_requests_per_sec: return False if not self._last_lease_time: return False curr_time = time.time() if ((curr_time - self._last_lease_time) < ((1.0 * (self._num_last_leased_tasks - len(self._taskprocess_map)) / FLAGS.taskapi_requests_per_sec))): return True else: return False def _num_tasks_to_lease(self): """Determines how many tasks can be leased. num_tasks is upper limit to running tasks in the system and hence number of tasks which could be leased is difference of numtasks and currently running tasks. Returns: Number of tasks to lease. """ return FLAGS.num_tasks - len(self._taskprocess_map) def _update_last_lease_info(self, result): """Updates the information regarding last lease. Args: result: Response object from TaskQueue API, containing list of tasks. """ self._last_lease_time = time.time() if result: if result.get('items'): self._num_last_leased_tasks = len(result.get('items')) else: self._num_last_leased_tasks = 0 else: self._num_last_leased_tasks = 0 def _update_poll_timeout_start(self): """Updates the start time for poll-timeout.""" if not self._poll_timeout_start: self._poll_timeout_start = time.time() def _continue_polling(self): """Checks whether lease can be invoked based on running tasks and timeout. Lease can be invoked if 1. Running tasks in the sytem has gone below the specified threshold (min_running_tasks). 2. Wait time has exceeded beyond time-out specified and at least one tas has finished since last lease invocation. By doing this, we are essentially trying to batch the lease requests. If this is not done and we start off leasing N tasks, its likely tasks may finish slightly one after another, and we make N lease requests for each task for next N tasks and so on. This can result in unnecessary lease API call and hence to avoid that, we try and batch the lease requests. Also we put certain limit on wait time for batching the requests by incororating the time-out. Returns: True/False """ if len(self._taskprocess_map) <= FLAGS.min_running_tasks: return False if self._poll_timeout_start: elapsed_time = time.time() - self._poll_timeout_start if elapsed_time > FLAGS.timeout_secs_for_next_lease_request: self._poll_timeout_start = None return False return True def _get_tasks_from_queue(self): """Gets the available tasks from the taskqueue. Returns: Lease response object. """ try: tasks_to_fetch = self._num_tasks_to_lease() lease_req = self.task_api.tasks().lease( project=FLAGS.project_name, taskqueue=FLAGS.taskqueue_name, leaseSecs=FLAGS.lease_secs, numTasks=tasks_to_fetch, body={}) result = lease_req.execute() return result except HttpError, http_error: logger.error('Error during lease request: %s' % str(http_error)) return None def _create_subprocesses_for_tasks(self, result): """Spawns parallel sub processes to execute tasks for better throughput. Args: result: lease resonse dictionary object. """ if not result: logger.info('Error: result is not defined') return None if result.get('items'): for task in result.get('items'): task_id = task.get('id') # Given that a task may be leased multiple times, we may get a # task which we are currently executing on, so make sure we # dont spaw another subprocess for it. if task_id not in self._taskprocess_map: ct = ClientTask(task) # Check if tasks got initialized properly and then pu them # in running tasks map. if ct.init(): # Put the clientTask objects in a dictionary to keep # track of stats and objects are used later to delete # the tasks from taskqueue self._taskprocess_map[ct.get_task_id()] = ct def _poll_running_tasks(self): """Polls all the running tasks and delete them from taskqueue if completed.""" if self._taskprocess_map: for task in self._taskprocess_map.values(): if task.is_completed(self.task_api): del self._taskprocess_map[task.get_task_id()] # updates scheduling information for later use. self._update_poll_timeout_start() def _sleep_before_next_lease(self): """Sleeps before invoking lease if required based on last lease info. It sleeps when no tasks were found on the taskqueue during last lease request. To note, it discount the time taken in polling the tasks and sleeps for (sleep_interval - time taken in poll). This avoids the unnecessary wait if tasks could be leased. If no time was taken in poll since there were not tasks in the system, it waits for full sleep interval and thus optimizes the CPU cycles. It does not sleep if the method is called for the first time (when no lease request has ever been made). """ if not self._last_lease_time: sleep_secs = 0 elif self._num_last_leased_tasks <= 0: time_elpased_since_last_lease = time.time() - self._last_lease_time sleep_secs = (FLAGS.sleep_interval_secs - time_elpased_since_last_lease) if sleep_secs > 0: logger.info('No tasks found and hence sleeping for sometime') time.sleep(FLAGS.sleep_interval_secs) def lease_tasks(self): """Requests lease for specified number of tasks. It invokes lease request for appropriate number of tasks, spawns parallel processes to execute them and also maintains scheduling information. LeaseTask also takes care of waiting(sleeping) before invoking lease if there are no tasks which can be leased in the taskqueue. This results in better resource utilization. Apart from this, it also controls the number of requests being sent to taskqueue APIs. Returns: True/False based on if tasks could be leased or not. """ self._sleep_before_next_lease() if self._can_lease(): result = self._get_tasks_from_queue() self._update_last_lease_info(result) self._create_subprocesses_for_tasks(result) return True return False def poll_tasks(self): """Polls the status of running tasks of the system. Polls the status of tasks and then decides if it should continue to poll depending on number of tasks running in the system and timeouts. Instead of polling in a tight loop, it sleeps for sometime before the next poll to avoid any unnecessary CPU cycles. poll_tasks returns only when system has capability to accomodate at least one new task. """ self._poll_running_tasks() while self._continue_polling(): logger.info('Sleeping before next poll') time.sleep(FLAGS.sleep_before_next_poll_secs) self._poll_running_tasks() def main(argv): """Infinite loop to lease new tasks and poll them for completion.""" # Settings for logger set_logger() # Instantiate puller puller = TaskQueuePuller() while True: puller.lease_tasks() puller.poll_tasks() if __name__ == '__main__': app.run()