200 lines
5.8 KiB
Python
200 lines
5.8 KiB
Python
import functools
|
|
import sys
|
|
import time
|
|
|
|
from dcos import emitting, util
|
|
from dcos.errors import DCOSException
|
|
|
|
# Module-level logger, obtained through the dcos `util` helper.
logger = util.get_logger(__name__)

# Shared emitter; `_output` publishes every header and line through it.
emitter = emitting.FlatEmitter()
|
|
|
|
|
|
def _no_file_exception():
    """Build (but do not raise) the error reported when no readable files
    are left to print.

    :returns: exception for the caller to raise
    :rtype: DCOSException
    """
    message = 'No files exist. Exiting.'
    return DCOSException(message)
|
|
|
|
|
|
def log_files(mesos_files, follow, lines):
    """Print the contents of `mesos_files`, in the manner of unix `tail`.

    First prints (up to) the last `lines` lines of every file. When
    `follow` is true it then polls the files, sleeping one second between
    polls, and prints whatever has been appended since, like `tail -f`.
    Files whose reads fail are dropped along the way (see `_stream_files`).

    :param mesos_files: file objects to print
    :type mesos_files: [MesosFile]
    :param follow: same as unix tail's -f
    :type follow: bool
    :param lines: number of lines to print initially
    :type lines: int
    :raises DCOSException: when no readable file remains
    :rtype: None
    """
    read_tail = functools.partial(_read_last_lines, lines)
    header, remaining = _stream_files(None, read_tail, mesos_files)
    if not remaining:
        raise _no_file_exception()

    if not follow:
        return

    while True:
        # Flushing matters only for testing: stdout is fully buffered
        # (rather than line-buffered) when redirected to a pipe, so the
        # --follow tests, which read from a pipe, would otherwise never
        # see the data.
        sys.stdout.flush()

        header, remaining = _stream_files(header, _read_rest, remaining)
        if not remaining:
            raise _no_file_exception()

        time.sleep(1)
|
|
|
|
|
|
def _stream_files(curr_header, fn, mesos_files):
    """Read every file in `mesos_files` with `fn` and print the results.

    `fn` is applied to each file through `util.stream` (intended to run
    the reads in parallel); each call must return a list of strings,
    which are then printed serially, one per line.

    `curr_header` is the header printed most recently. Every line is
    associated with a header -- the string form of the MesosFile it was
    read from -- and a header is printed only when it differs from the
    previous one, so consecutive lines from the same file are grouped
    together. Headers are printed only while more than one file is still
    reachable.

    :param curr_header: most recently printed header
    :type curr_header: str
    :param fn: function that reads a sequence of lines from a MesosFile
    :type fn: MesosFile -> [str]
    :param mesos_files: files to read
    :type mesos_files: [MesosFile]
    :returns: the most recently printed header, and the files that are
              still reachable. A file whose read raises DCOSException is
              removed from that list so it is not read again.
    :rtype: (str, [MesosFile])
    """
    still_reachable = list(mesos_files)

    # TODO switch to map
    for pending, mesos_file in util.stream(fn, mesos_files):
        try:
            file_lines = pending.result()
        except DCOSException as e:
            # The read function may raise if read.json is unavailable, or
            # if the file doesn't exist in the sandbox. Either way, log it,
            # drop the file and carry on with the remaining ones.
            logger.exception("Error reading file: {}".format(e))
            still_reachable.remove(mesos_file)
        else:
            if file_lines:
                several_files = len(still_reachable) > 1
                curr_header = _output(curr_header,
                                      several_files,
                                      str(mesos_file),
                                      file_lines)

    return curr_header, still_reachable
|
|
|
|
|
|
def _output(curr_header, output_header, header, lines):
|
|
"""Prints a sequence of lines. If `header` is different than
|
|
`curr_header`, first print the header.
|
|
|
|
:param curr_header: most recently printed header
|
|
:type curr_header: str
|
|
:param output_header: whether or not to output the header
|
|
:type output_header: bool
|
|
:param header: header for `lines`
|
|
:type header: str
|
|
:param lines: lines to print
|
|
:type lines: [str]
|
|
:returns: `header`
|
|
:rtype: str
|
|
"""
|
|
|
|
if lines:
|
|
if output_header and header != curr_header:
|
|
emitter.publish('===> {} <==='.format(header))
|
|
for line in lines:
|
|
emitter.publish(line)
|
|
return header
|
|
|
|
|
|
# A liberal (generous) estimate of the length of one line, in the same units
# as MesosFile `size()`/`seek()`. `_read_last_lines` multiplies it by the
# requested number of lines to decide how much data to fetch per read window.
LINE_SIZE = 200
|
|
|
|
|
|
def _read_last_lines(num_lines, mesos_file):
    """Return the last `num_lines` lines of `mesos_file`, or fewer if the
    file is shorter, then seek to the end of the file.

    The file is read backwards in windows of about `num_lines` lines
    (sized with `LINE_SIZE`) until enough lines are collected or the start
    of the file is reached. Afterwards the file is positioned at the size
    it had when this call started, so a following `_read_rest` returns
    only data appended after that point.

    :param num_lines: number of lines to read; zero or a negative value
                      reads nothing
    :type num_lines: int
    :param mesos_file: file to read
    :type mesos_file: MesosFile
    :returns: lines read, without their trailing newline characters; an
              empty list if the file is empty or `num_lines` <= 0
    :rtype: [str]
    """
    file_size = mesos_file.size()

    # Nothing requested. Without this guard, `lines[-0:]` below is the
    # whole list rather than an empty one, so a spurious [''] (a blank
    # line) would be returned; a negative count would also produce a
    # negative fetch size and hence a negative read length.
    if num_lines <= 0:
        mesos_file.seek(file_size)
        return []

    # estimate how much data we need to fetch to read `num_lines`.
    fetch_size = LINE_SIZE * num_lines

    end = file_size
    start = max(end - fetch_size, 0)
    data = ''
    while True:
        # fetch the current window and prepend it to what we already have
        mesos_file.seek(start)
        data = mesos_file.read(end - start) + data

        lines = _strip_trailing_newline(data).split('\n')
        # When the window starts mid-file its first piece may be a partial
        # line, so we need strictly more than `num_lines` pieces before
        # the last `num_lines` of them can be trusted.
        if len(lines) > num_lines:
            ret = lines[-num_lines:]
            break
        if start == 0:
            ret = lines
            break

        # otherwise shift our read window back and repeat
        end = start
        start = max(end - fetch_size, 0)

    mesos_file.seek(file_size)

    # ''.split('\n') is [''], which would print one blank line for an
    # empty file; report it as "no lines", as `_read_rest` does.
    if data == '':
        return []
    return ret
|
|
|
|
|
|
def _read_rest(mesos_file):
|
|
""" Reads the rest of the file, and returns the lines.
|
|
|
|
:param mesos_file: file to read
|
|
:type mesos_file: MesosFile
|
|
:returns: lines read
|
|
:rtype: [str]
|
|
"""
|
|
data = mesos_file.read()
|
|
if data == '':
|
|
return []
|
|
else:
|
|
data_tmp = _strip_trailing_newline(data)
|
|
return data_tmp.split('\n')
|
|
|
|
|
|
def _strip_trailing_newline(s):
|
|
"""Returns a modified version of the string with the last character
|
|
truncated if it's a newline.
|
|
|
|
:param s: string to trim
|
|
:type s: str
|
|
:returns: modified string
|
|
:rtype: str
|
|
"""
|
|
|
|
if s == "":
|
|
return s
|
|
else:
|
|
return s[:-1] if s[-1] == '\n' else s
|