160 lines
		
	
	
		
			5.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			160 lines
		
	
	
		
			5.0 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
"""\
 | 
						|
@file test_pools.py
 | 
						|
@author Donovan Preston, Aaron Brashears
 | 
						|
 | 
						|
Copyright (c) 2006-2007, Linden Research, Inc.
 | 
						|
Permission is hereby granted, free of charge, to any person obtaining a copy
 | 
						|
of this software and associated documentation files (the "Software"), to deal
 | 
						|
in the Software without restriction, including without limitation the rights
 | 
						|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 | 
						|
copies of the Software, and to permit persons to whom the Software is
 | 
						|
furnished to do so, subject to the following conditions:
 | 
						|
 | 
						|
The above copyright notice and this permission notice shall be included in
 | 
						|
all copies or substantial portions of the Software.
 | 
						|
 | 
						|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 | 
						|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 | 
						|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 | 
						|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 | 
						|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 | 
						|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 | 
						|
THE SOFTWARE.
 | 
						|
"""
 | 
						|
 | 
						|
import time
 | 
						|
import unittest
 | 
						|
 | 
						|
from eventlet import api
 | 
						|
from eventlet import coros
 | 
						|
from eventlet import pools
 | 
						|
 | 
						|
 | 
						|
class IntPool(pools.Pool):
 | 
						|
    def create(self):
 | 
						|
        self.current_integer = getattr(self, 'current_integer', 0) + 1
 | 
						|
        return self.current_integer
 | 
						|
 | 
						|
 | 
						|
class TestIntPool(unittest.TestCase):
 | 
						|
    mode = 'static'
 | 
						|
    def setUp(self):
 | 
						|
        self.pool = IntPool(min_size=0, max_size=4)
 | 
						|
 | 
						|
    def test_integers(self):
 | 
						|
        # Do not actually use this pattern in your code. The pool will be
 | 
						|
        # exhausted, and unrestoreable.
 | 
						|
        # If you do a get, you should ALWAYS do a put, probably like this:
 | 
						|
        # try:
 | 
						|
        #     thing = self.pool.get()
 | 
						|
        #     # do stuff
 | 
						|
        # finally:
 | 
						|
        #     self.pool.put(thing)
 | 
						|
 | 
						|
        # with self.pool.some_api_name() as thing:
 | 
						|
        #     # do stuff
 | 
						|
        self.assertEquals(self.pool.get(), 1)
 | 
						|
        self.assertEquals(self.pool.get(), 2)
 | 
						|
        self.assertEquals(self.pool.get(), 3)
 | 
						|
        self.assertEquals(self.pool.get(), 4)
 | 
						|
 | 
						|
    def test_free(self):
 | 
						|
        self.assertEquals(self.pool.free(), 4)
 | 
						|
        gotten = self.pool.get()
 | 
						|
        self.assertEquals(self.pool.free(), 3)
 | 
						|
        self.pool.put(gotten)
 | 
						|
        self.assertEquals(self.pool.free(), 4)
 | 
						|
 | 
						|
    def test_exhaustion(self):
 | 
						|
        waiter = coros.event()
 | 
						|
        def consumer():
 | 
						|
            gotten = None
 | 
						|
            cancel = api.exc_after(1, api.TimeoutError)
 | 
						|
            try:
 | 
						|
                print time.asctime(), "getting"
 | 
						|
                gotten = self.pool.get()
 | 
						|
                print time.asctime(), "got"
 | 
						|
            finally:
 | 
						|
                cancel.cancel()
 | 
						|
                print "waiter send"
 | 
						|
                waiter.send(gotten)
 | 
						|
                print "waiter sent"
 | 
						|
 | 
						|
        api.spawn(consumer)
 | 
						|
 | 
						|
        one, two, three, four = (
 | 
						|
            self.pool.get(), self.pool.get(), self.pool.get(), self.pool.get())
 | 
						|
        self.assertEquals(self.pool.free(), 0)
 | 
						|
 | 
						|
        # Let consumer run; nothing will be in the pool, so he will wait
 | 
						|
        api.sleep(0)
 | 
						|
 | 
						|
        print "put in pool", one
 | 
						|
        # Wake consumer
 | 
						|
        self.pool.put(one)
 | 
						|
        print "done put"
 | 
						|
 | 
						|
        # wait for the consumer
 | 
						|
        self.assertEquals(waiter.wait(), one)
 | 
						|
        print "done wait"
 | 
						|
 | 
						|
    def test_blocks_on_pool(self):
 | 
						|
        waiter = coros.event()
 | 
						|
        def greedy():
 | 
						|
            self.pool.get()
 | 
						|
            self.pool.get()
 | 
						|
            self.pool.get()
 | 
						|
            self.pool.get()
 | 
						|
            # No one should be waiting yet.
 | 
						|
            self.assertEquals(self.pool.waiting(), 0)
 | 
						|
            # The call to the next get will unschedule this routine.
 | 
						|
            print "calling get"
 | 
						|
            self.pool.get()
 | 
						|
            print "called get"
 | 
						|
            # So this send should never be called.
 | 
						|
            waiter.send('Failed!')
 | 
						|
 | 
						|
        killable = api.spawn(greedy)
 | 
						|
 | 
						|
        # no one should be waiting yet.
 | 
						|
        self.assertEquals(self.pool.waiting(), 0)
 | 
						|
 | 
						|
        ## Wait for greedy
 | 
						|
        api.sleep(0)
 | 
						|
 | 
						|
        ## Greedy should be blocking on the last get
 | 
						|
        self.assertEquals(self.pool.waiting(), 1)
 | 
						|
 | 
						|
        ## Send will never be called, so the event should not be ready.
 | 
						|
        self.assertEquals(waiter.ready(), False)
 | 
						|
 | 
						|
        api.kill(killable)
 | 
						|
 | 
						|
 | 
						|
class TestAbstract(unittest.TestCase):
 | 
						|
    mode = 'static'
 | 
						|
    def test_abstract(self):
 | 
						|
        ## Going for 100% coverage here
 | 
						|
        ## A Pool cannot be used without overriding create()
 | 
						|
        pool = pools.Pool()
 | 
						|
        self.assertRaises(NotImplementedError, pool.get)
 | 
						|
 | 
						|
 | 
						|
class TestIntPool2(unittest.TestCase):
 | 
						|
    mode = 'static'
 | 
						|
    def setUp(self):
 | 
						|
        self.pool = IntPool(min_size=3, max_size=3)
 | 
						|
 | 
						|
    def test_something(self):
 | 
						|
        self.assertEquals(len(self.pool.free_items), 3)
 | 
						|
        ## Cover the clause in get where we get from the free list instead of creating
 | 
						|
        ## an item on get
 | 
						|
        gotten = self.pool.get()
 | 
						|
        self.assertEquals(gotten, 1)
 | 
						|
 | 
						|
        
 | 
						|
    
 | 
						|
if __name__ == '__main__':
 | 
						|
    unittest.main()
 | 
						|
 |