#!/usr/bin/env python3.4

"""A simple web crawler."""

# TODO:
# - More organized logging (with task ID or URL?).
# - Use logging module for Logger.
# - KeyboardInterrupt in HTML parsing may hang or report unretrieved error.
# - Support gzip encoding.
# - Close connection if HTTP/1.0 response.
# - Add timeouts.  (E.g. when switching networks, all seems to hang.)
# - Add arguments to specify TLS settings (e.g. cert/key files).
# - Skip reading large non-text/html files?
# - Use ETag and If-Modified-Since?
# - Handle out of file descriptors directly?  (How?)

import argparse
import asyncio
import asyncio.locks
import cgi
from http.client import BadStatusLine
import logging
import re
import sys
import time
import urllib.parse


ARGS = argparse.ArgumentParser(description="Web crawler")
ARGS.add_argument(
    '--iocp', action='store_true', dest='iocp',
    default=False, help='Use IOCP event loop (Windows only)')
ARGS.add_argument(
    '--select', action='store_true', dest='select',
    default=False, help='Use Select event loop instead of default')
ARGS.add_argument(
    'roots', nargs='*',
    default=[], help='Root URL (may be repeated)')
ARGS.add_argument(
    '--max_redirect', action='store', type=int, metavar='N',
    default=10, help='Limit redirection chains (for 301, 302 etc.)')
ARGS.add_argument(
    '--max_tries', action='store', type=int, metavar='N',
    default=4, help='Limit retries on network errors')
ARGS.add_argument(
    '--max_tasks', action='store', type=int, metavar='N',
    default=100, help='Limit concurrent connections')
ARGS.add_argument(
    '--max_pool', action='store', type=int, metavar='N',
    default=100, help='Limit connection pool size')
ARGS.add_argument(
    '--exclude', action='store', metavar='REGEX',
    help='Exclude matching URLs')
ARGS.add_argument(
    '--strict', action='store_true',
    default=True, help='Strict host matching (default)')
ARGS.add_argument(
    '--lenient', action='store_false', dest='strict',
    default=False, help='Lenient host matching')
ARGS.add_argument(
    '-v', '--verbose', action='count', dest='level',
    default=1, help='Verbose logging (repeat for more verbose)')
ARGS.add_argument(
    '-q', '--quiet', action='store_const', const=0, dest='level',
    default=1, help='Quiet logging (opposite of --verbose)')
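

# Example invocations (a sketch; the script's filename is assumed to be
# crawl.py here -- adjust the URL and limits to taste):
#
#     python3 crawl.py http://example.com/
#     python3 crawl.py -v --max_tasks 20 --exclude '\.pdf$' http://example.com/
#
# The positional arguments are the root URLs; the --max_* options tune the
# limits defined above.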


ESCAPES = [('quot', '"'),
           ('gt', '>'),
           ('lt', '<'),
           ('amp', '&')  # Must be last.
           ]


def unescape(url):
    """Turn &amp; into &, and so on.

    This is the inverse of cgi.escape().
    """
    for name, char in ESCAPES:
        url = url.replace('&' + name + ';', char)
    return url
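

# For example, unescape('/a?x=1&amp;y=2') returns '/a?x=1&y=2'; the
# replacements come straight from the ESCAPES table above.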


def fix_url(url):
    """Prefix a scheme-less URL with http://."""
    if '://' not in url:
        url = 'http://' + url
    return url


class Logger:

    def __init__(self, level):
        self.level = level

    def _log(self, n, args):
        if self.level >= n:
            print(*args, file=sys.stderr, flush=True)

    def log(self, n, *args):
        self._log(n, args)

    def __call__(self, n, *args):
        self._log(n, args)
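

# A note on the verbosity levels used throughout (inferred from the log calls
# below): 0 = errors and reports only (-q), 1 = progress (the default),
# 2 = request/response header lines, 3 = chunked-transfer details.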


class ConnectionPool:
    """A connection pool.

    To open a connection, use get_connection().  To recycle it, call
    conn.close(recycle=True).

    The pool is mostly just a mapping from (host, port, ssl) tuples to
    lists of Connections.  The currently active connections are *not*
    in the data structure; get_connection() takes the connection out,
    and recycle_connection() puts it back in.

    There are limits to both the overall pool and the per-key pool.
    """

    def __init__(self, log, max_pool=10, max_tasks=5):
        self.log = log
        self.max_pool = max_pool  # Overall limit.
        self.max_tasks = max_tasks  # Per-key limit.
        self.loop = asyncio.get_event_loop()
        self.connections = {}  # {(host, port, ssl): [Connection, ...], ...}
        self.queue = []  # [Connection, ...]

    def close(self):
        """Close all connections available for reuse."""
        for conns in self.connections.values():
            for conn in conns:
                conn.close()
        self.connections.clear()
        self.queue.clear()

    @asyncio.coroutine
    def get_connection(self, host, port, ssl):
        """Create or reuse a connection."""
        port = port or (443 if ssl else 80)
        try:
            ipaddrs = yield from self.loop.getaddrinfo(host, port)
        except Exception as exc:
            self.log(0, 'Exception %r for (%r, %r)' % (exc, host, port))
            raise
        self.log(1, '* %s resolves to %s' %
                    (host, ', '.join(ip[4][0] for ip in ipaddrs)))

        # Look for a reusable connection.
        for _, _, _, _, (h, p, *_) in ipaddrs:
            key = h, p, ssl
            conn = None
            conns = self.connections.get(key)
            while conns:
                conn = conns.pop(0)
                self.queue.remove(conn)
                if not conns:
                    del self.connections[key]
                if conn.stale():
                    self.log(1, 'closing stale connection for', key)
                    conn.close()  # Just in case.
                else:
                    self.log(1, '* Reusing pooled connection', key,
                                'FD =', conn.fileno())
                    return conn

        # Create a new connection.
        conn = Connection(self.log, self, host, port, ssl)
        yield from conn.connect()
        self.log(1, '* New connection', conn.key, 'FD =', conn.fileno())
        return conn

    def recycle_connection(self, conn):
        """Make a connection available for reuse.

        This also prunes the pool if it exceeds the size limits.
        """
        if conn.stale():
            conn.close()
            return

        key = conn.key
        conns = self.connections.setdefault(key, [])
        conns.append(conn)
        self.queue.append(conn)

        if len(conns) <= self.max_tasks and len(self.queue) <= self.max_pool:
            return

        # Prune the queue.

        # Close stale connections for this key first.
        stale = [conn for conn in conns if conn.stale()]
        if stale:
            for conn in stale:
                conns.remove(conn)
                self.queue.remove(conn)
                self.log(1, 'closing stale connection for', key)
                conn.close()
            if not conns:
                del self.connections[key]

        # Close oldest connection(s) for this key if limit reached.
        while len(conns) > self.max_tasks:
            conn = conns.pop(0)
            self.queue.remove(conn)
            self.log(1, 'closing oldest connection for', key)
            conn.close()

        if len(self.queue) <= self.max_pool:
            return

        # Close overall stale connections.
        stale = [conn for conn in self.queue if conn.stale()]
        if stale:
            for conn in stale:
                conns = self.connections.get(conn.key)
                conns.remove(conn)
                self.queue.remove(conn)
                self.log(1, 'closing stale connection for', conn.key)
                conn.close()

        # Close oldest overall connection(s) if limit reached.
        while len(self.queue) > self.max_pool:
            conn = self.queue.pop(0)
            conns = self.connections.get(conn.key)
            c = conns.pop(0)
            assert conn == c, (conn.key, conn, c, conns)
            self.log(1, 'closing overall oldest connection for', conn.key)
            conn.close()


class Connection:

    def __init__(self, log, pool, host, port, ssl):
        self.log = log
        self.pool = pool
        self.host = host
        self.port = port
        self.ssl = ssl
        self.reader = None
        self.writer = None
        self.key = None

    def stale(self):
        return self.reader is None or self.reader.at_eof()

    def fileno(self):
        writer = self.writer
        if writer is not None:
            transport = writer.transport
            if transport is not None:
                sock = transport.get_extra_info('socket')
                if sock is not None:
                    return sock.fileno()
        return None

    @asyncio.coroutine
    def connect(self):
        self.reader, self.writer = yield from asyncio.open_connection(
            self.host, self.port, ssl=self.ssl)
        peername = self.writer.get_extra_info('peername')
        if peername:
            self.host, self.port = peername[:2]
        else:
            self.log(1, 'NO PEERNAME???', self.host, self.port, self.ssl)
        self.key = self.host, self.port, self.ssl

    def close(self, recycle=False):
        if recycle and not self.stale():
            self.pool.recycle_connection(self)
        else:
            self.writer.close()
            self.pool = self.reader = self.writer = None
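

# A minimal usage sketch for the pool and connections above (inside a
# coroutine, with some Logger instance `log`; the host and port here are
# illustrative):
#
#     pool = ConnectionPool(log, max_pool=10, max_tasks=5)
#     conn = yield from pool.get_connection('example.com', 80, False)
#     # ... use conn.reader and conn.writer ...
#     conn.close(recycle=True)  # Hands the connection back to the pool.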


class Request:
    """HTTP request.

    Use connect() to open a connection; send_request() to send the
    request; get_response() to receive the response headers.
    """

    def __init__(self, log, url, pool):
        self.log = log
        self.url = url
        self.pool = pool
        self.parts = urllib.parse.urlparse(self.url)
        self.scheme = self.parts.scheme
        assert self.scheme in ('http', 'https'), repr(url)
        self.ssl = self.parts.scheme == 'https'
        self.netloc = self.parts.netloc
        self.hostname = self.parts.hostname
        self.port = self.parts.port or (443 if self.ssl else 80)
        self.path = (self.parts.path or '/')
        self.query = self.parts.query
        if self.query:
            self.full_path = '%s?%s' % (self.path, self.query)
        else:
            self.full_path = self.path
        self.http_version = 'HTTP/1.1'
        self.method = 'GET'
        self.headers = []
        self.conn = None

    @asyncio.coroutine
    def connect(self):
        """Open a connection to the server."""
        self.log(1, '* Connecting to %s:%s using %s for %s' %
                    (self.hostname, self.port,
                     'ssl' if self.ssl else 'tcp',
                     self.url))
        self.conn = yield from self.pool.get_connection(self.hostname,
                                                        self.port, self.ssl)

    def close(self, recycle=False):
        """Close the connection, recycle if requested."""
        if self.conn is not None:
            if not recycle:
                self.log(1, 'closing connection for', self.conn.key)
            self.conn.close(recycle)
            self.conn = None

    @asyncio.coroutine
    def putline(self, line):
        """Write a line to the connection.

        Used for the request line and headers.
        """
        self.log(2, '>', line)
        self.conn.writer.write(line.encode('latin-1') + b'\r\n')

    @asyncio.coroutine
    def send_request(self):
        """Send the request."""
        request_line = '%s %s %s' % (self.method, self.full_path,
                                     self.http_version)
        yield from self.putline(request_line)
        # TODO: What if a header is already set?
        self.headers.append(('User-Agent', 'asyncio-example-crawl/0.0'))
        self.headers.append(('Host', self.netloc))
        self.headers.append(('Accept', '*/*'))
        ##self.headers.append(('Accept-Encoding', 'gzip'))
        for key, value in self.headers:
            line = '%s: %s' % (key, value)
            yield from self.putline(line)
        yield from self.putline('')

    @asyncio.coroutine
    def get_response(self):
        """Receive the response."""
        response = Response(self.log, self.conn.reader)
        yield from response.read_headers()
        return response


class Response:
    """HTTP response.

    Call read_headers() to receive the response headers.  Then check
    the status attribute and call get_header() to inspect the headers.
    Finally call read() to receive the body.
    """

    def __init__(self, log, reader):
        self.log = log
        self.reader = reader
        self.http_version = None  # 'HTTP/1.1'
        self.status = None  # 200
        self.reason = None  # 'Ok'
        self.headers = []  # [('Content-Type', 'text/html')]

    @asyncio.coroutine
    def getline(self):
        """Read one line from the connection."""
        line = (yield from self.reader.readline()).decode('latin-1').rstrip()
        self.log(2, '<', line)
        return line

    @asyncio.coroutine
    def read_headers(self):
        """Read the response status line and the response headers."""
        status_line = yield from self.getline()
        status_parts = status_line.split(None, 2)
        if len(status_parts) != 3:
            self.log(0, 'bad status_line', repr(status_line))
            raise BadStatusLine(status_line)
        self.http_version, status, self.reason = status_parts
        self.status = int(status)
        while True:
            header_line = yield from self.getline()
            if not header_line:
                break
            # TODO: Continuation lines.
            key, value = header_line.split(':', 1)
            self.headers.append((key, value.strip()))

    def get_redirect_url(self, default=''):
        """Inspect the status and return the redirect url if appropriate."""
        if self.status not in (300, 301, 302, 303, 307):
            return default
        return self.get_header('Location', default)

    def get_header(self, key, default=''):
        """Get one header value, using a case insensitive header name."""
        key = key.lower()
        for k, v in self.headers:
            if k.lower() == key:
                return v
        return default

    @asyncio.coroutine
    def read(self):
        """Read the response body.

        This honors Content-Length and Transfer-Encoding: chunked.
        """
        nbytes = None
        for key, value in self.headers:
            if key.lower() == 'content-length':
                nbytes = int(value)
                break
        if nbytes is None:
            if self.get_header('transfer-encoding').lower() == 'chunked':
                self.log(2, 'parsing chunked response')
                blocks = []
                while True:
                    size_header = yield from self.reader.readline()
                    if not size_header:
                        self.log(0, 'premature end of chunked response')
                        break
                    self.log(3, 'size_header =', repr(size_header))
                    parts = size_header.split(b';')
                    size = int(parts[0], 16)
                    if size:
                        self.log(3, 'reading chunk of', size, 'bytes')
                        block = yield from self.reader.readexactly(size)
                        assert len(block) == size, (len(block), size)
                        blocks.append(block)
                    crlf = yield from self.reader.readline()
                    assert crlf == b'\r\n', repr(crlf)
                    if not size:
                        break
                body = b''.join(blocks)
                self.log(1, 'chunked response had', len(body),
                            'bytes in', len(blocks), 'blocks')
            else:
                self.log(3, 'reading until EOF')
                body = yield from self.reader.read()
                # TODO: Should make sure not to recycle the connection
                # in this case.
        else:
            body = yield from self.reader.readexactly(nbytes)
        return body
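

# A minimal sketch of the request/response flow implemented above (this is
# essentially what Fetcher.fetch() below does, inside a coroutine, given a
# Logger `log` and a ConnectionPool `pool`):
#
#     request = Request(log, 'http://example.com/', pool)
#     yield from request.connect()
#     yield from request.send_request()
#     response = yield from request.get_response()
#     body = yield from response.read()
#     request.close(recycle=True)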


class Fetcher:
    """Logic and state for one URL.

    When found in crawler.busy, this represents a URL to be fetched or
    in the process of being fetched; when found in crawler.done, this
    holds the results from fetching it.

    This is usually associated with a task.  This references the
    crawler for the connection pool and to add more URLs to its todo
    list.

    Call fetch() to do the fetching, then report() to print the results.
    """

    def __init__(self, log, url, crawler, max_redirect=10, max_tries=4):
        self.log = log
        self.url = url
        self.crawler = crawler
        # We don't loop resolving redirects here -- we just use this
        # to decide whether to add the redirect URL to crawler.todo.
        self.max_redirect = max_redirect
        # But we do loop to retry on errors a few times.
        self.max_tries = max_tries
        # Everything we collect from the response goes here.
        self.task = None
        self.exceptions = []
        self.tries = 0
        self.request = None
        self.response = None
        self.body = None
        self.next_url = None
        self.ctype = None
        self.pdict = None
        self.encoding = None
        self.urls = None
        self.new_urls = None

    @asyncio.coroutine
    def fetch(self):
        """Attempt to fetch the contents of the URL.

        If successful, and the data is HTML, extract further links and
        add them to the crawler.  Redirects are also added back there.
        """
        while self.tries < self.max_tries:
            self.tries += 1
            self.request = None
            try:
                self.request = Request(self.log, self.url, self.crawler.pool)
                yield from self.request.connect()
                yield from self.request.send_request()
                self.response = yield from self.request.get_response()
                self.body = yield from self.response.read()
                h_conn = self.response.get_header('connection').lower()
                if h_conn != 'close':
                    self.request.close(recycle=True)
                    self.request = None
                if self.tries > 1:
                    self.log(1, 'try', self.tries, 'for', self.url, 'success')
                break
            except (BadStatusLine, OSError) as exc:
                self.exceptions.append(exc)
                self.log(1, 'try', self.tries, 'for', self.url,
                            'raised', repr(exc))
                ##import pdb; pdb.set_trace()
                # Don't reuse the connection in this case.
            finally:
                if self.request is not None:
                    self.request.close()
        else:
            # We never broke out of the while loop, i.e. all tries failed.
            self.log(0, 'no success for', self.url,
                        'in', self.max_tries, 'tries')
            return
        next_url = self.response.get_redirect_url()
        if next_url:
            self.next_url = urllib.parse.urljoin(self.url, next_url)
            if self.max_redirect > 0:
                self.log(1, 'redirect to', self.next_url, 'from', self.url)
                self.crawler.add_url(self.next_url, self.max_redirect-1)
            else:
                self.log(0, 'redirect limit reached for', self.next_url,
                            'from', self.url)
        else:
            if self.response.status == 200:
                self.ctype = self.response.get_header('content-type')
                self.pdict = {}
                if self.ctype:
                    self.ctype, self.pdict = cgi.parse_header(self.ctype)
                self.encoding = self.pdict.get('charset', 'utf-8')
                if self.ctype == 'text/html':
                    body = self.body.decode(self.encoding, 'replace')
                    # Replace href with (?:href|src) to follow image links.
                    self.urls = set(re.findall(r'(?i)href=["\']?([^\s"\'<>]+)',
                                               body))
                    if self.urls:
                        self.log(1, 'got', len(self.urls),
                                    'distinct urls from', self.url)
                    self.new_urls = set()
                    for url in self.urls:
                        url = unescape(url)
                        url = urllib.parse.urljoin(self.url, url)
                        url, frag = urllib.parse.urldefrag(url)
                        if self.crawler.add_url(url):
                            self.new_urls.add(url)

    def report(self, stats, file=None):
        """Print a report on the state for this URL.

        Also update the Stats instance.
        """
        if self.task is not None:
            if not self.task.done():
                stats.add('pending')
                print(self.url, 'pending', file=file)
                return
            elif self.task.cancelled():
                stats.add('cancelled')
                print(self.url, 'cancelled', file=file)
                return
            elif self.task.exception():
                stats.add('exception')
                exc = self.task.exception()
                stats.add('exception_' + exc.__class__.__name__)
                print(self.url, exc, file=file)
                return
        if len(self.exceptions) == self.tries:
            stats.add('fail')
            exc = self.exceptions[-1]
            stats.add('fail_' + str(exc.__class__.__name__))
            print(self.url, 'error', exc, file=file)
        elif self.next_url:
            stats.add('redirect')
            print(self.url, self.response.status, 'redirect', self.next_url,
                  file=file)
        elif self.ctype == 'text/html':
            stats.add('html')
            size = len(self.body or b'')
            stats.add('html_bytes', size)
            print(self.url, self.response.status,
                  self.ctype, self.encoding,
                  size,
                  '%d/%d' % (len(self.new_urls or ()), len(self.urls or ())),
                  file=file)
        elif self.response is None:
            print(self.url, 'no response object', file=file)
        else:
            size = len(self.body or b'')
            if self.response.status == 200:
                stats.add('other')
                stats.add('other_bytes', size)
            else:
                stats.add('error')
                stats.add('error_bytes', size)
                stats.add('status_%s' % self.response.status)
            print(self.url, self.response.status,
                  self.ctype, self.encoding,
                  size,
                  file=file)
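

# Each URL pulled from crawler.todo is wrapped in a Fetcher and run as a task;
# see Crawler.crawl() below.  Roughly:
#
#     fetcher = Fetcher(log, url, crawler, max_redirect, max_tries)
#     fetcher.task = asyncio.Task(crawler.fetch(fetcher))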


class Stats:
    """Record stats of various sorts."""

    def __init__(self):
        self.stats = {}

    def add(self, key, count=1):
        self.stats[key] = self.stats.get(key, 0) + count

    def report(self, file=None):
        for key, count in sorted(self.stats.items()):
            print('%10d' % count, key, file=file)


class Crawler:
    """Crawl a set of URLs.

    This manages three disjoint sets of URLs (todo, busy, done).  The
    data structures actually store dicts -- the values in todo give
    the redirect limit, while the values in busy and done are Fetcher
    instances.
    """

    def __init__(self, log,
                 roots, exclude=None, strict=True,  # What to crawl.
                 max_redirect=10, max_tries=4,  # Per-url limits.
                 max_tasks=10, max_pool=10,  # Global limits.
                 ):
        self.log = log
        self.roots = roots
        self.exclude = exclude
        self.strict = strict
        self.max_redirect = max_redirect
        self.max_tries = max_tries
        self.max_tasks = max_tasks
        self.max_pool = max_pool
        self.todo = {}
        self.busy = {}
        self.done = {}
        self.pool = ConnectionPool(self.log, max_pool, max_tasks)
        self.root_domains = set()
        for root in roots:
            parts = urllib.parse.urlparse(root)
            host, port = urllib.parse.splitport(parts.netloc)
            if not host:
                continue
            if re.match(r'\A[\d\.]*\Z', host):
                # A root that looks like an IP address is recorded as-is.
                self.root_domains.add(host)
            else:
                host = host.lower()
                if self.strict:
                    self.root_domains.add(host)
                    if host.startswith('www.'):
                        self.root_domains.add(host[4:])
                    else:
                        self.root_domains.add('www.' + host)
                else:
                    parts = host.split('.')
                    if len(parts) > 2:
                        host = '.'.join(parts[-2:])
                    self.root_domains.add(host)
        for root in roots:
            self.add_url(root)
        self.governor = asyncio.locks.Semaphore(max_tasks)
        self.termination = asyncio.locks.Condition()
        self.t0 = time.time()
        self.t1 = None

    def close(self):
        """Close resources (currently only the pool)."""
        self.pool.close()

    def host_okay(self, host):
        """Check if a host should be crawled.

        A literal match (after lowercasing) is always good.  For hosts
        that don't look like IP addresses, some approximate matches
        are okay depending on the strict flag.
        """
        host = host.lower()
        if host in self.root_domains:
            return True
        if re.match(r'\A[\d\.]*\Z', host):
            # A host that looks like an IP address must match a root exactly.
            return False
        if self.strict:
            return self._host_okay_strictish(host)
        else:
            return self._host_okay_lenient(host)

    def _host_okay_strictish(self, host):
        """Check if a host should be crawled, strict-ish version.

        This checks for equality modulo an initial 'www.' component.
        """
        if host.startswith('www.'):
            if host[4:] in self.root_domains:
                return True
        else:
            if 'www.' + host in self.root_domains:
                return True
        return False

    def _host_okay_lenient(self, host):
        """Check if a host should be crawled, lenient version.

        This compares the last two components of the host.
        """
        parts = host.split('.')
        if len(parts) > 2:
            host = '.'.join(parts[-2:])
        return host in self.root_domains
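
    # For example, with root http://example.com/: strict matching accepts
    # example.com and www.example.com only, while --lenient also accepts any
    # host whose last two components are example.com (e.g. docs.example.com).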

    def add_url(self, url, max_redirect=None):
        """Add a URL to the todo list if not seen before."""
        if self.exclude and re.search(self.exclude, url):
            return False
        parts = urllib.parse.urlparse(url)
        if parts.scheme not in ('http', 'https'):
            self.log(2, 'skipping non-http scheme in', url)
            return False
        host, port = urllib.parse.splitport(parts.netloc)
        if not self.host_okay(host):
            self.log(2, 'skipping non-root host in', url)
            return False
        if max_redirect is None:
            max_redirect = self.max_redirect
        if url in self.todo or url in self.busy or url in self.done:
            return False
        self.log(1, 'adding', url, max_redirect)
        self.todo[url] = max_redirect
        return True

    @asyncio.coroutine
    def crawl(self):
        """Run the crawler until all finished."""
        with (yield from self.termination):
            while self.todo or self.busy:
                if self.todo:
                    url, max_redirect = self.todo.popitem()
                    fetcher = Fetcher(self.log, url,
                                      crawler=self,
                                      max_redirect=max_redirect,
                                      max_tries=self.max_tries,
                                      )
                    self.busy[url] = fetcher
                    fetcher.task = asyncio.Task(self.fetch(fetcher))
                else:
                    yield from self.termination.wait()
        self.t1 = time.time()

    @asyncio.coroutine
    def fetch(self, fetcher):
        """Call the Fetcher's fetch(), with a limit on concurrency.

        Once this returns, move the fetcher from busy to done.
        """
        url = fetcher.url
        with (yield from self.governor):
            try:
                yield from fetcher.fetch()  # Fetcher gonna fetch.
            finally:
                # Force GC of the task, so the error is logged.
                fetcher.task = None
        with (yield from self.termination):
            self.done[url] = fetcher
            del self.busy[url]
            self.termination.notify()

    def report(self, file=None):
        """Print a report on all completed URLs."""
        if self.t1 is None:
            self.t1 = time.time()
        dt = self.t1 - self.t0
        if dt and self.max_tasks:
            speed = len(self.done) / dt / self.max_tasks
        else:
            speed = 0
        stats = Stats()
        print('*** Report ***', file=file)
        try:
            show = []
            show.extend(self.done.items())
            show.extend(self.busy.items())
            show.sort()
            for url, fetcher in show:
                fetcher.report(stats, file=file)
        except KeyboardInterrupt:
            print('\nInterrupted', file=file)
        print('Finished', len(self.done),
              'urls in %.3f secs' % dt,
              '(max_tasks=%d)' % self.max_tasks,
              '(%.3f urls/sec/task)' % speed,
              file=file)
        stats.report(file=file)
        print('Todo:', len(self.todo), file=file)
        print('Busy:', len(self.busy), file=file)
        print('Done:', len(self.done), file=file)
        print('Date:', time.ctime(), 'local time', file=file)
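

# A minimal programmatic driver (a sketch; main() below does the same thing,
# plus argument parsing and event-loop selection):
#
#     loop = asyncio.get_event_loop()
#     crawler = Crawler(Logger(1), ['http://example.com/'], max_tasks=10)
#     try:
#         loop.run_until_complete(crawler.crawl())
#     finally:
#         crawler.report()
#         crawler.close()
#         loop.close()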


def main():
    """Main program.

    Parse arguments, set up event loop, run crawler, print report.
    """
    args = ARGS.parse_args()
    if not args.roots:
        print('Use --help for command line help')
        return

    log = Logger(args.level)

    if args.iocp:
        from asyncio.windows_events import ProactorEventLoop
        loop = ProactorEventLoop()
        asyncio.set_event_loop(loop)
    elif args.select:
        loop = asyncio.SelectorEventLoop()
        asyncio.set_event_loop(loop)
    else:
        loop = asyncio.get_event_loop()

    roots = {fix_url(root) for root in args.roots}

    crawler = Crawler(log,
                      roots, exclude=args.exclude,
                      strict=args.strict,
                      max_redirect=args.max_redirect,
                      max_tries=args.max_tries,
                      max_tasks=args.max_tasks,
                      max_pool=args.max_pool,
                      )
    try:
        loop.run_until_complete(crawler.crawl())  # Crawler gonna crawl.
    except KeyboardInterrupt:
        sys.stderr.flush()
        print('\nInterrupted\n')
    finally:
        crawler.report()
        crawler.close()
        loop.close()


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    main()