197 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			197 lines
		
	
	
		
			7.1 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Copyright 2013-2014 DataStax, Inc.
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
# http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
 | 
						|
import six
 | 
						|
import sys
 | 
						|
 | 
						|
from itertools import count, cycle
 | 
						|
import logging
 | 
						|
from six.moves import xrange
 | 
						|
from threading import Event
 | 
						|
 | 
						|
from cassandra.cluster import PagedResult
 | 
						|
 | 
						|
log = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
def execute_concurrent(session, statements_and_parameters, concurrency=100, raise_on_first_error=True):
 | 
						|
    """
 | 
						|
    Executes a sequence of (statement, parameters) tuples concurrently.  Each
 | 
						|
    ``parameters`` item must be a sequence or :const:`None`.
 | 
						|
 | 
						|
    A sequence of ``(success, result_or_exc)`` tuples is returned in the same
 | 
						|
    order that the statements were passed in.  If ``success`` is :const:`False`,
 | 
						|
    there was an error executing the statement, and ``result_or_exc`` will be
 | 
						|
    an :class:`Exception`.  If ``success`` is :const:`True`, ``result_or_exc``
 | 
						|
    will be the query result.
 | 
						|
 | 
						|
    If `raise_on_first_error` is left as :const:`True`, execution will stop
 | 
						|
    after the first failed statement and the corresponding exception will be
 | 
						|
    raised.
 | 
						|
 | 
						|
    The `concurrency` parameter controls how many statements will be executed
 | 
						|
    concurrently.  When :attr:`.Cluster.protocol_version` is set to 1 or 2,
 | 
						|
    it is recommended that this be kept below 100 times the number of
 | 
						|
    core connections per host times the number of connected hosts (see
 | 
						|
    :meth:`.Cluster.set_core_connections_per_host`).  If that amount is exceeded,
 | 
						|
    the event loop thread may attempt to block on new connection creation,
 | 
						|
    substantially impacting throughput.  If :attr:`~.Cluster.protocol_version`
 | 
						|
    is 3 or higher, you can safely experiment with higher levels of concurrency.
 | 
						|
 | 
						|
    Example usage::
 | 
						|
 | 
						|
        select_statement = session.prepare("SELECT * FROM users WHERE id=?")
 | 
						|
 | 
						|
        statements_and_params = []
 | 
						|
        for user_id in user_ids:
 | 
						|
            params = (user_id, )
 | 
						|
            statements_and_params.append((select_statement, params))
 | 
						|
 | 
						|
        results = execute_concurrent(
 | 
						|
            session, statements_and_params, raise_on_first_error=False)
 | 
						|
 | 
						|
        for (success, result) in results:
 | 
						|
            if not success:
 | 
						|
                handle_error(result)  # result will be an Exception
 | 
						|
            else:
 | 
						|
                process_user(result[0])  # result will be a list of rows
 | 
						|
 | 
						|
    """
 | 
						|
    if concurrency <= 0:
 | 
						|
        raise ValueError("concurrency must be greater than 0")
 | 
						|
 | 
						|
    if not statements_and_parameters:
 | 
						|
        return []
 | 
						|
 | 
						|
    # TODO handle iterators and generators naturally without converting the
 | 
						|
    # whole thing to a list.  This would require not building a result
 | 
						|
    # list of Nones up front (we don't know how many results there will be),
 | 
						|
    # so a dict keyed by index should be used instead.  The tricky part is
 | 
						|
    # knowing when you're the final statement to finish.
 | 
						|
    statements_and_parameters = list(statements_and_parameters)
 | 
						|
 | 
						|
    event = Event()
 | 
						|
    first_error = [] if raise_on_first_error else None
 | 
						|
    to_execute = len(statements_and_parameters)
 | 
						|
    results = [None] * to_execute
 | 
						|
    num_finished = count(1)
 | 
						|
    statements = enumerate(iter(statements_and_parameters))
 | 
						|
    for i in xrange(min(concurrency, len(statements_and_parameters))):
 | 
						|
        _execute_next(_sentinel, i, event, session, statements, results, None, num_finished, to_execute, first_error)
 | 
						|
 | 
						|
    event.wait()
 | 
						|
    if first_error:
 | 
						|
        exc = first_error[0]
 | 
						|
        if six.PY2 and isinstance(exc, tuple):
 | 
						|
            (exc_type, value, traceback) = exc
 | 
						|
            six.reraise(exc_type, value, traceback)
 | 
						|
        else:
 | 
						|
            raise exc
 | 
						|
    else:
 | 
						|
        return results
 | 
						|
 | 
						|
 | 
						|
def execute_concurrent_with_args(session, statement, parameters, *args, **kwargs):
 | 
						|
    """
 | 
						|
    Like :meth:`~cassandra.concurrent.execute_concurrent()`, but takes a single
 | 
						|
    statement and a sequence of parameters.  Each item in ``parameters``
 | 
						|
    should be a sequence or :const:`None`.
 | 
						|
 | 
						|
    Example usage::
 | 
						|
 | 
						|
        statement = session.prepare("INSERT INTO mytable (a, b) VALUES (1, ?)")
 | 
						|
        parameters = [(x,) for x in range(1000)]
 | 
						|
        execute_concurrent_with_args(session, statement, parameters, concurrency=50)
 | 
						|
    """
 | 
						|
    return execute_concurrent(session, list(zip(cycle((statement,)), parameters)), *args, **kwargs)
 | 
						|
 | 
						|
 | 
						|
_sentinel = object()
 | 
						|
 | 
						|
 | 
						|
def _handle_error(error, result_index, event, session, statements, results,
 | 
						|
                  future, num_finished, to_execute, first_error):
 | 
						|
    if first_error is not None:
 | 
						|
        first_error.append(error)
 | 
						|
        event.set()
 | 
						|
        return
 | 
						|
    else:
 | 
						|
        results[result_index] = (False, error)
 | 
						|
        if next(num_finished) >= to_execute:
 | 
						|
            event.set()
 | 
						|
            return
 | 
						|
 | 
						|
    try:
 | 
						|
        (next_index, (statement, params)) = next(statements)
 | 
						|
    except StopIteration:
 | 
						|
        return
 | 
						|
 | 
						|
    try:
 | 
						|
        future = session.execute_async(statement, params)
 | 
						|
        args = (next_index, event, session, statements, results, future, num_finished, to_execute, first_error)
 | 
						|
        future.add_callbacks(
 | 
						|
            callback=_execute_next, callback_args=args,
 | 
						|
            errback=_handle_error, errback_args=args)
 | 
						|
    except Exception as exc:
 | 
						|
        if first_error is not None:
 | 
						|
            if six.PY2:
 | 
						|
                first_error.append(sys.exc_info())
 | 
						|
            else:
 | 
						|
                first_error.append(exc)
 | 
						|
            event.set()
 | 
						|
            return
 | 
						|
        else:
 | 
						|
            results[next_index] = (False, exc)
 | 
						|
            if next(num_finished) >= to_execute:
 | 
						|
                event.set()
 | 
						|
                return
 | 
						|
 | 
						|
 | 
						|
def _execute_next(result, result_index, event, session, statements, results,
 | 
						|
                  future, num_finished, to_execute, first_error):
 | 
						|
    if result is not _sentinel:
 | 
						|
        if future.has_more_pages:
 | 
						|
            result = PagedResult(future, result)
 | 
						|
            future.clear_callbacks()
 | 
						|
        results[result_index] = (True, result)
 | 
						|
        finished = next(num_finished)
 | 
						|
        if finished >= to_execute:
 | 
						|
            event.set()
 | 
						|
            return
 | 
						|
 | 
						|
    try:
 | 
						|
        (next_index, (statement, params)) = next(statements)
 | 
						|
    except StopIteration:
 | 
						|
        return
 | 
						|
 | 
						|
    try:
 | 
						|
        future = session.execute_async(statement, params)
 | 
						|
        args = (next_index, event, session, statements, results, future, num_finished, to_execute, first_error)
 | 
						|
        future.add_callbacks(
 | 
						|
            callback=_execute_next, callback_args=args,
 | 
						|
            errback=_handle_error, errback_args=args)
 | 
						|
    except Exception as exc:
 | 
						|
        if first_error is not None:
 | 
						|
            if six.PY2:
 | 
						|
                first_error.append(sys.exc_info())
 | 
						|
            else:
 | 
						|
                first_error.append(exc)
 | 
						|
            event.set()
 | 
						|
            return
 | 
						|
        else:
 | 
						|
            results[next_index] = (False, exc)
 | 
						|
            if next(num_finished) >= to_execute:
 | 
						|
                event.set()
 | 
						|
                return
 |