Module loggers in test/fixtures and test/service
This commit is contained in:
@@ -11,6 +11,10 @@ from six.moves.urllib.parse import urlparse # pylint: disable-msg=E0611
|
||||
from test.service import ExternalService, SpawnedService
|
||||
from test.testutil import get_open_port
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Fixture(object):
|
||||
kafka_version = os.environ.get('KAFKA_VERSION', '0.8.0')
|
||||
scala_version = os.environ.get("SCALA_VERSION", '2.8.0')
|
||||
@@ -35,21 +39,21 @@ class Fixture(object):
|
||||
output_file = os.path.join(output_dir, distfile + '.tgz')
|
||||
|
||||
if os.path.isfile(output_file):
|
||||
logging.info("Found file already on disk: %s", output_file)
|
||||
log.info("Found file already on disk: %s", output_file)
|
||||
return output_file
|
||||
|
||||
# New tarballs are .tgz, older ones are sometimes .tar.gz
|
||||
try:
|
||||
url = url_base + distfile + '.tgz'
|
||||
logging.info("Attempting to download %s", url)
|
||||
log.info("Attempting to download %s", url)
|
||||
response = urllib.request.urlopen(url)
|
||||
except urllib.error.HTTPError:
|
||||
logging.exception("HTTP Error")
|
||||
log.exception("HTTP Error")
|
||||
url = url_base + distfile + '.tar.gz'
|
||||
logging.info("Attempting to download %s", url)
|
||||
log.info("Attempting to download %s", url)
|
||||
response = urllib.request.urlopen(url)
|
||||
|
||||
logging.info("Saving distribution file to %s", output_file)
|
||||
log.info("Saving distribution file to %s", output_file)
|
||||
with open(output_file, 'w') as output_file_fd:
|
||||
output_file_fd.write(response.read())
|
||||
|
||||
@@ -101,14 +105,14 @@ class ZookeeperFixture(Fixture):
|
||||
self.child = None
|
||||
|
||||
def out(self, message):
|
||||
logging.info("*** Zookeeper [%s:%d]: %s", self.host, self.port, message)
|
||||
log.info("*** Zookeeper [%s:%d]: %s", self.host, self.port, message)
|
||||
|
||||
def open(self):
|
||||
self.tmp_dir = tempfile.mkdtemp()
|
||||
self.out("Running local instance...")
|
||||
logging.info(" host = %s", self.host)
|
||||
logging.info(" port = %s", self.port)
|
||||
logging.info(" tmp_dir = %s", self.tmp_dir)
|
||||
log.info(" host = %s", self.host)
|
||||
log.info(" port = %s", self.port)
|
||||
log.info(" tmp_dir = %s", self.tmp_dir)
|
||||
|
||||
# Generate configs
|
||||
template = self.test_resource("zookeeper.properties")
|
||||
@@ -167,7 +171,7 @@ class KafkaFixture(Fixture):
|
||||
self.running = False
|
||||
|
||||
def out(self, message):
|
||||
logging.info("*** Kafka [%s:%d]: %s", self.host, self.port, message)
|
||||
log.info("*** Kafka [%s:%d]: %s", self.host, self.port, message)
|
||||
|
||||
def open(self):
|
||||
if self.running:
|
||||
@@ -176,15 +180,15 @@ class KafkaFixture(Fixture):
|
||||
|
||||
self.tmp_dir = tempfile.mkdtemp()
|
||||
self.out("Running local instance...")
|
||||
logging.info(" host = %s", self.host)
|
||||
logging.info(" port = %s", self.port)
|
||||
logging.info(" broker_id = %s", self.broker_id)
|
||||
logging.info(" zk_host = %s", self.zk_host)
|
||||
logging.info(" zk_port = %s", self.zk_port)
|
||||
logging.info(" zk_chroot = %s", self.zk_chroot)
|
||||
logging.info(" replicas = %s", self.replicas)
|
||||
logging.info(" partitions = %s", self.partitions)
|
||||
logging.info(" tmp_dir = %s", self.tmp_dir)
|
||||
log.info(" host = %s", self.host)
|
||||
log.info(" port = %s", self.port)
|
||||
log.info(" broker_id = %s", self.broker_id)
|
||||
log.info(" zk_host = %s", self.zk_host)
|
||||
log.info(" zk_port = %s", self.zk_port)
|
||||
log.info(" zk_chroot = %s", self.zk_chroot)
|
||||
log.info(" replicas = %s", self.replicas)
|
||||
log.info(" partitions = %s", self.partitions)
|
||||
log.info(" tmp_dir = %s", self.tmp_dir)
|
||||
|
||||
# Create directories
|
||||
os.mkdir(os.path.join(self.tmp_dir, "logs"))
|
||||
|
@@ -11,9 +11,13 @@ __all__ = [
|
||||
|
||||
]
|
||||
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ExternalService(object):
|
||||
def __init__(self, host, port):
|
||||
logging.info("Using already running service at %s:%d", host, port)
|
||||
log.info("Using already running service at %s:%d", host, port)
|
||||
self.host = host
|
||||
self.port = port
|
||||
|
||||
@@ -73,13 +77,13 @@ class SpawnedService(threading.Thread):
|
||||
raise RuntimeError("Subprocess has died. Aborting. (args=%s)" % ' '.join(str(x) for x in self.args))
|
||||
|
||||
def dump_logs(self):
|
||||
logging.critical('stderr')
|
||||
log.critical('stderr')
|
||||
for line in self.captured_stderr:
|
||||
logging.critical(line.rstrip())
|
||||
log.critical(line.rstrip())
|
||||
|
||||
logging.critical('stdout')
|
||||
log.critical('stdout')
|
||||
for line in self.captured_stdout:
|
||||
logging.critical(line.rstrip())
|
||||
log.critical(line.rstrip())
|
||||
|
||||
def wait_for(self, pattern, timeout=30):
|
||||
t1 = time.time()
|
||||
@@ -89,16 +93,16 @@ class SpawnedService(threading.Thread):
|
||||
try:
|
||||
self.child.kill()
|
||||
except:
|
||||
logging.exception("Received exception when killing child process")
|
||||
log.exception("Received exception when killing child process")
|
||||
self.dump_logs()
|
||||
|
||||
raise RuntimeError("Waiting for %r timed out after %d seconds" % (pattern, timeout))
|
||||
|
||||
if re.search(pattern, '\n'.join(self.captured_stdout), re.IGNORECASE) is not None:
|
||||
logging.info("Found pattern %r in %d seconds via stdout", pattern, (t2 - t1))
|
||||
log.info("Found pattern %r in %d seconds via stdout", pattern, (t2 - t1))
|
||||
return
|
||||
if re.search(pattern, '\n'.join(self.captured_stderr), re.IGNORECASE) is not None:
|
||||
logging.info("Found pattern %r in %d seconds via stderr", pattern, (t2 - t1))
|
||||
log.info("Found pattern %r in %d seconds via stderr", pattern, (t2 - t1))
|
||||
return
|
||||
time.sleep(0.1)
|
||||
|
||||
|
Reference in New Issue
Block a user