mirror of
https://github.com/fish-shell/fish-shell.git
synced 2025-01-23 09:15:32 +08:00
64b5a22274
In #7459, asan printed error output. However, because we had a failure on stdout already, littlecheck would only print the first unmatched line from stderr, leading to output like ``` additional output on stderr:1: ================================================================= ``` Which is of course entirely useless. So in that case we just let it print *all* unmatched stderr lines, so you'd get the full asan output, which presumably is of more use. This upgrades littlecheck to 5f7deafcea4e58dd3d369eae069a3781bb6ce75e.
572 lines
19 KiB
Python
Executable File
572 lines
19 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
""" Command line test driver. """
|
|
|
|
from __future__ import unicode_literals
|
|
from __future__ import print_function
|
|
|
|
import argparse
|
|
from collections import deque
|
|
import datetime
|
|
import io
|
|
import re
|
|
import shlex
|
|
import subprocess
|
|
import sys
|
|
|
|
# Directives can occur at the beginning of a line, or anywhere in a line that does not start with #.
|
|
COMMENT_RE = r'^(?:[^#].*)?#\s*'
|
|
|
|
# A regex showing how to run the file.
|
|
RUN_RE = re.compile(COMMENT_RE + r"RUN:\s+(.*)\n")
|
|
|
|
# A regex capturing lines that should be checked against stdout.
|
|
CHECK_STDOUT_RE = re.compile(COMMENT_RE + r"CHECK:\s+(.*)\n")
|
|
|
|
# A regex capturing lines that should be checked against stderr.
|
|
CHECK_STDERR_RE = re.compile(COMMENT_RE + r"CHECKERR:\s+(.*)\n")
|
|
|
|
|
|
class Config(object):
|
|
def __init__(self):
|
|
# Whether to have verbose output.
|
|
self.verbose = False
|
|
# Whether output gets ANSI colorization.
|
|
self.colorize = False
|
|
# Whether to show which file was tested.
|
|
self.progress = False
|
|
# How many after lines to print
|
|
self.after = 5
|
|
# How many before lines to print
|
|
self.before = 5
|
|
|
|
def colors(self):
|
|
""" Return a dictionary mapping color names to ANSI escapes """
|
|
|
|
def ansic(n):
|
|
return "\033[%dm" % n if self.colorize else ""
|
|
|
|
return {
|
|
"RESET": ansic(0),
|
|
"BOLD": ansic(1),
|
|
"NORMAL": ansic(39),
|
|
"BLACK": ansic(30),
|
|
"RED": ansic(31),
|
|
"GREEN": ansic(32),
|
|
"YELLOW": ansic(33),
|
|
"BLUE": ansic(34),
|
|
"MAGENTA": ansic(35),
|
|
"CYAN": ansic(36),
|
|
"LIGHTGRAY": ansic(37),
|
|
"DARKGRAY": ansic(90),
|
|
"LIGHTRED": ansic(91),
|
|
"LIGHTGREEN": ansic(92),
|
|
"LIGHTYELLOW": ansic(93),
|
|
"LIGHTBLUE": ansic(94),
|
|
"LIGHTMAGENTA": ansic(95),
|
|
"LIGHTCYAN": ansic(96),
|
|
"WHITE": ansic(97),
|
|
}
|
|
|
|
|
|
def output(*args):
|
|
print("".join(args) + "\n")
|
|
|
|
|
|
import unicodedata
|
|
|
|
|
|
def esc(m):
|
|
map = {
|
|
"\n": "\\n",
|
|
"\\": "\\\\",
|
|
"'": "\\'",
|
|
'"': '\\"',
|
|
"\a": "\\a",
|
|
"\b": "\\b",
|
|
"\f": "\\f",
|
|
"\r": "\\r",
|
|
"\t": "\\t",
|
|
"\v": "\\v",
|
|
}
|
|
if m in map:
|
|
return map[m]
|
|
if unicodedata.category(m)[0] == "C":
|
|
return "\\x{:02x}".format(ord(m))
|
|
else:
|
|
return m
|
|
|
|
|
|
def escape_string(s):
|
|
return "".join(esc(ch) for ch in s)
|
|
|
|
|
|
class CheckerError(Exception):
|
|
"""Exception subclass for check line parsing.
|
|
|
|
Attributes:
|
|
line: the Line object on which the exception occurred.
|
|
"""
|
|
|
|
def __init__(self, message, line=None):
|
|
super(CheckerError, self).__init__(message)
|
|
self.line = line
|
|
|
|
|
|
class Line(object):
|
|
""" A line that remembers where it came from. """
|
|
|
|
def __init__(self, text, number, file):
|
|
self.text = text
|
|
self.number = number
|
|
self.file = file
|
|
|
|
def subline(self, text):
|
|
""" Return a substring of our line with the given text, preserving number and file. """
|
|
return Line(text, self.number, self.file)
|
|
|
|
@staticmethod
|
|
def readfile(file, name):
|
|
return [Line(text, idx + 1, name) for idx, text in enumerate(file)]
|
|
|
|
def is_empty_space(self):
|
|
return not self.text or self.text.isspace()
|
|
|
|
|
|
class RunCmd(object):
|
|
""" A command to run on a given Checker.
|
|
|
|
Attributes:
|
|
args: Unexpanded shell command as a string.
|
|
"""
|
|
|
|
def __init__(self, args, line):
|
|
self.args = args
|
|
self.line = line
|
|
|
|
@staticmethod
|
|
def parse(line):
|
|
if not shlex.split(line.text):
|
|
raise CheckerError("Invalid RUN command", line)
|
|
return RunCmd(line.text, line)
|
|
|
|
|
|
class TestFailure(object):
|
|
def __init__(self, line, check, testrun, before=None, after=None):
|
|
self.line = line
|
|
self.check = check
|
|
self.testrun = testrun
|
|
self.error_annotation_lines = None
|
|
# The output that comes *after* the failure.
|
|
self.after = after
|
|
self.before = before
|
|
|
|
def message(self):
|
|
afterlines = self.testrun.config.after
|
|
fields = self.testrun.config.colors()
|
|
fields["name"] = self.testrun.name
|
|
fields["subbed_command"] = self.testrun.subbed_command
|
|
if self.line:
|
|
fields.update(
|
|
{
|
|
"output_file": self.line.file,
|
|
"output_lineno": self.line.number,
|
|
"output_line": self.line.text.rstrip("\n"),
|
|
}
|
|
)
|
|
if self.check:
|
|
fields.update(
|
|
{
|
|
"input_file": self.check.line.file,
|
|
"input_lineno": self.check.line.number,
|
|
"input_line": self.check.line.text,
|
|
"check_type": self.check.type,
|
|
}
|
|
)
|
|
filemsg = "" if self.testrun.config.progress else " in {name}"
|
|
fmtstrs = ["{RED}Failure{RESET}" + filemsg + ":", ""]
|
|
if self.line and self.check:
|
|
fmtstrs += [
|
|
" The {check_type} on line {input_lineno} wants:",
|
|
" {BOLD}{input_line}{RESET}",
|
|
"",
|
|
" which failed to match line {output_file}:{output_lineno}:",
|
|
" {BOLD}{output_line}{RESET}",
|
|
"",
|
|
]
|
|
|
|
elif self.check:
|
|
fmtstrs += [
|
|
" The {check_type} on line {input_lineno} wants:",
|
|
" {BOLD}{input_line}{RESET}",
|
|
"",
|
|
" but there was no remaining output to match.",
|
|
"",
|
|
]
|
|
else:
|
|
fmtstrs += [
|
|
" There were no remaining checks left to match {output_file}:{output_lineno}:",
|
|
" {BOLD}{output_line}{RESET}",
|
|
"",
|
|
]
|
|
if self.error_annotation_lines:
|
|
fields["error_annotation"] = " ".join([x.text for x in self.error_annotation_lines])
|
|
fields["error_annotation_lineno"] = str(self.error_annotation_lines[0].number)
|
|
if len(self.error_annotation_lines) > 1:
|
|
fields["error_annotation_lineno"] += ":" + str(self.error_annotation_lines[-1].number)
|
|
fmtstrs += [
|
|
" additional output on stderr:{error_annotation_lineno}:",
|
|
" {BOLD}{error_annotation}{RESET}",
|
|
]
|
|
if self.before or self.after:
|
|
fmtstrs += [" Context:"]
|
|
|
|
if self.before:
|
|
fields["before_output"] = " ".join(self.before)[:-1]
|
|
fmtstrs += [" {BOLD}{before_output}"]
|
|
|
|
fmtstrs += [
|
|
" {RED}{output_line}{RESET} <= does not match '{LIGHTBLUE}{input_line}{RESET}'",
|
|
]
|
|
|
|
if self.after is not None:
|
|
fields["additional_output"] = " ".join(self.after[:afterlines])
|
|
fmtstrs += [" {BOLD}{additional_output}{RESET}"]
|
|
fmtstrs += [" when running command:", " {subbed_command}"]
|
|
return "\n".join(fmtstrs).format(**fields)
|
|
|
|
def print_message(self):
|
|
""" Print our message to stdout. """
|
|
print(self.message())
|
|
|
|
|
|
def perform_substitution(input_str, subs):
|
|
""" Perform the substitutions described by subs to str
|
|
Return the substituted string.
|
|
"""
|
|
# Sort our substitutions into a list of tuples (key, value), descending by length.
|
|
# It needs to be descending because we need to try longer substitutions first.
|
|
subs_ordered = sorted(subs.items(), key=lambda s: len(s[0]), reverse=True)
|
|
|
|
def subber(m):
|
|
# We get the entire sequence of characters.
|
|
# Replace just the prefix and return it.
|
|
text = m.group(1)
|
|
for key, replacement in subs_ordered:
|
|
if text.startswith(key):
|
|
return replacement + text[len(key) :]
|
|
# No substitution found, so we default to running it as-is,
|
|
# which will end up running it via $PATH.
|
|
return text
|
|
|
|
return re.sub(r"%(%|[a-zA-Z0-9_-]+)", subber, input_str)
|
|
|
|
|
|
class TestRun(object):
|
|
def __init__(self, name, runcmd, checker, subs, config):
|
|
self.name = name
|
|
self.runcmd = runcmd
|
|
self.subbed_command = perform_substitution(runcmd.args, subs)
|
|
self.checker = checker
|
|
self.subs = subs
|
|
self.config = config
|
|
|
|
def check(self, lines, checks):
|
|
# Reverse our lines and checks so we can pop off the end.
|
|
lineq = lines[::-1]
|
|
checkq = checks[::-1]
|
|
# We keep the last couple of lines in a deque so we can show context.
|
|
before = deque(maxlen=self.config.before)
|
|
while lineq and checkq:
|
|
line = lineq[-1]
|
|
check = checkq[-1]
|
|
if check.regex.match(line.text):
|
|
# This line matched this checker, continue on.
|
|
lineq.pop()
|
|
checkq.pop()
|
|
before.append(line)
|
|
elif line.is_empty_space():
|
|
# Skip all whitespace input lines.
|
|
lineq.pop()
|
|
else:
|
|
# Failed to match.
|
|
lineq.pop()
|
|
line.text = escape_string(line.text.strip()) + "\n"
|
|
# Add context, ignoring empty lines.
|
|
return TestFailure(
|
|
line,
|
|
check,
|
|
self,
|
|
before=[escape_string(line.text.strip()) + "\n" for line in before],
|
|
after=[
|
|
escape_string(line.text.strip()) + "\n"
|
|
for line in lineq[::-1]
|
|
if not line.is_empty_space()
|
|
],
|
|
)
|
|
# Drain empties.
|
|
while lineq and lineq[-1].is_empty_space():
|
|
lineq.pop()
|
|
# If there's still lines or checkers, we have a failure.
|
|
# Otherwise it's success.
|
|
if lineq:
|
|
return TestFailure(lineq[-1], None, self)
|
|
elif checkq:
|
|
return TestFailure(None, checkq[-1], self)
|
|
else:
|
|
return None
|
|
|
|
def run(self):
|
|
""" Run the command. Return a TestFailure, or None. """
|
|
|
|
def split_by_newlines(s):
|
|
""" Decode a string and split it by newlines only,
|
|
retaining the newlines.
|
|
"""
|
|
return [s + "\n" for s in s.decode("utf-8").split("\n")]
|
|
|
|
PIPE = subprocess.PIPE
|
|
if self.config.verbose:
|
|
print(self.subbed_command)
|
|
proc = subprocess.Popen(
|
|
self.subbed_command,
|
|
stdin=PIPE,
|
|
stdout=PIPE,
|
|
stderr=PIPE,
|
|
shell=True,
|
|
close_fds=True, # For Python 2.6 as shipped on RHEL 6
|
|
)
|
|
stdout, stderr = proc.communicate()
|
|
# HACK: This is quite cheesy: POSIX specifies that sh should return 127 for a missing command.
|
|
# Technically it's also possible to return it in other conditions.
|
|
# Practically, that's *probably* not going to happen.
|
|
status = proc.returncode
|
|
if status == 127:
|
|
raise CheckerError("Command could not be found: " + self.subbed_command)
|
|
|
|
outlines = [
|
|
Line(text, idx + 1, "stdout")
|
|
for idx, text in enumerate(split_by_newlines(stdout))
|
|
]
|
|
errlines = [
|
|
Line(text, idx + 1, "stderr")
|
|
for idx, text in enumerate(split_by_newlines(stderr))
|
|
]
|
|
outfail = self.check(outlines, self.checker.outchecks)
|
|
errfail = self.check(errlines, self.checker.errchecks)
|
|
# It's possible that something going wrong on stdout resulted in new
|
|
# text being printed on stderr. If we have an outfailure, and either
|
|
# non-matching or unmatched stderr text, then annotate the outfail
|
|
# with it.
|
|
if outfail and errfail and errfail.line:
|
|
outfail.error_annotation_lines = errlines[errfail.line.number - 1:]
|
|
# Trim a trailing newline
|
|
if outfail.error_annotation_lines[-1].text == "\n":
|
|
del outfail.error_annotation_lines[-1]
|
|
return outfail if outfail else errfail
|
|
|
|
|
|
class CheckCmd(object):
|
|
def __init__(self, line, checktype, regex):
|
|
self.line = line
|
|
self.type = checktype
|
|
self.regex = regex
|
|
|
|
@staticmethod
|
|
def parse(line, checktype):
|
|
# type: (Line) -> CheckCmd
|
|
# Everything inside {{}} is a regular expression.
|
|
# Everything outside of it is a literal string.
|
|
# Split around {{...}}. Then every odd index will be a regex, and
|
|
# evens will be literals.
|
|
# Note that if {{...}} appears first we will get an empty string in
|
|
# the split array, so the {{...}} matches are always at odd indexes.
|
|
bracket_re = re.compile(
|
|
r"""
|
|
\{\{ # Two open brackets
|
|
(.*?) # Nongreedy capture
|
|
\}\} # Two close brackets
|
|
""",
|
|
re.VERBOSE,
|
|
)
|
|
pieces = bracket_re.split(line.text)
|
|
even = True
|
|
re_strings = []
|
|
for piece in pieces:
|
|
if even:
|
|
# piece is a literal string.
|
|
re_strings.append(re.escape(piece))
|
|
else:
|
|
# piece is a regex (found inside {{...}}).
|
|
# Verify the regex can be compiled.
|
|
try:
|
|
re.compile(piece)
|
|
except re.error:
|
|
raise CheckerError("Invalid regular expression: '%s'" % piece, line)
|
|
re_strings.append(piece)
|
|
even = not even
|
|
# Enclose each piece in a non-capturing group.
|
|
# This ensures that lower-precedence operators don't trip up catenation.
|
|
# For example: {{b|c}}d would result in /b|cd/ which is different.
|
|
# Backreferences are assumed to match across the entire string.
|
|
re_strings = ["(?:%s)" % s for s in re_strings]
|
|
# Anchor at beginning and end (allowing arbitrary whitespace), and maybe
|
|
# a terminating newline.
|
|
# We need the anchors because Python's match() matches an arbitrary prefix,
|
|
# not the entire string.
|
|
re_strings = [r"^\s*"] + re_strings + [r"\s*\n?$"]
|
|
full_re = re.compile("".join(re_strings))
|
|
return CheckCmd(line, checktype, full_re)
|
|
|
|
|
|
class Checker(object):
|
|
def __init__(self, name, lines):
|
|
self.name = name
|
|
# Helper to yield subline containing group1 from all matching lines.
|
|
def group1s(regex):
|
|
for line in lines:
|
|
m = regex.match(line.text)
|
|
if m:
|
|
yield line.subline(m.group(1))
|
|
|
|
# Find run commands.
|
|
self.runcmds = [RunCmd.parse(sl) for sl in group1s(RUN_RE)]
|
|
if not self.runcmds:
|
|
# If no RUN command has been given, fall back to the shebang.
|
|
if lines[0].text.startswith("#!"):
|
|
# Remove the "#!" at the beginning, and the newline at the end.
|
|
self.runcmds = [RunCmd(lines[0].text[2:-1] + " %s", lines[0])]
|
|
else:
|
|
raise CheckerError("No runlines ('# RUN') found")
|
|
|
|
# Find check cmds.
|
|
self.outchecks = [
|
|
CheckCmd.parse(sl, "CHECK") for sl in group1s(CHECK_STDOUT_RE)
|
|
]
|
|
self.errchecks = [
|
|
CheckCmd.parse(sl, "CHECKERR") for sl in group1s(CHECK_STDERR_RE)
|
|
]
|
|
|
|
|
|
def check_file(input_file, name, subs, config, failure_handler):
|
|
""" Check a single file. Return a True on success, False on error. """
|
|
success = True
|
|
lines = Line.readfile(input_file, name)
|
|
checker = Checker(name, lines)
|
|
for runcmd in checker.runcmds:
|
|
failure = TestRun(name, runcmd, checker, subs, config).run()
|
|
if failure:
|
|
failure_handler(failure)
|
|
success = False
|
|
return success
|
|
|
|
|
|
def check_path(path, subs, config, failure_handler):
|
|
with io.open(path, encoding="utf-8") as fd:
|
|
return check_file(fd, path, subs, config, failure_handler)
|
|
|
|
|
|
def parse_subs(subs):
|
|
""" Given a list of input substitutions like 'foo=bar',
|
|
return a dictionary like {foo:bar}, or exit if invalid.
|
|
"""
|
|
result = {}
|
|
for sub in subs:
|
|
try:
|
|
key, val = sub.split("=", 1)
|
|
if not key:
|
|
print("Invalid substitution %s: empty key" % sub)
|
|
sys.exit(1)
|
|
if not val:
|
|
print("Invalid substitution %s: empty value" % sub)
|
|
sys.exit(1)
|
|
result[key] = val
|
|
except ValueError:
|
|
print("Invalid substitution %s: equal sign not found" % sub)
|
|
sys.exit(1)
|
|
return result
|
|
|
|
|
|
def get_argparse():
|
|
""" Return a littlecheck argument parser. """
|
|
parser = argparse.ArgumentParser(
|
|
description="littlecheck: command line tool tester."
|
|
)
|
|
parser.add_argument(
|
|
"-s",
|
|
"--substitute",
|
|
type=str,
|
|
help="Add a new substitution for RUN lines. Example: bash=/bin/bash",
|
|
action="append",
|
|
default=[],
|
|
)
|
|
parser.add_argument(
|
|
"-p",
|
|
"--progress",
|
|
action="store_true",
|
|
dest="progress",
|
|
help="Show the files to be checked",
|
|
default=False,
|
|
)
|
|
parser.add_argument("file", nargs="+", help="File to check")
|
|
parser.add_argument(
|
|
"-A",
|
|
"--after",
|
|
type=int,
|
|
help="How many non-empty lines of output after a failure to print (default: 5)",
|
|
action="store",
|
|
default=5,
|
|
)
|
|
parser.add_argument(
|
|
"-B",
|
|
"--before",
|
|
type=int,
|
|
help="How many non-empty lines of output before a failure to print (default: 5)",
|
|
action="store",
|
|
default=5,
|
|
)
|
|
return parser
|
|
|
|
|
|
def main():
|
|
args = get_argparse().parse_args()
|
|
# Default substitution is %% -> %
|
|
def_subs = {"%": "%"}
|
|
def_subs.update(parse_subs(args.substitute))
|
|
|
|
failure_count = 0
|
|
config = Config()
|
|
config.colorize = sys.stdout.isatty()
|
|
config.progress = args.progress
|
|
fields = config.colors()
|
|
config.after = args.after
|
|
config.before = args.before
|
|
if config.before < 0:
|
|
raise ValueError("Before must be at least 0")
|
|
if config.after < 0:
|
|
raise ValueError("After must be at least 0")
|
|
|
|
for path in args.file:
|
|
fields["path"] = path
|
|
if config.progress:
|
|
print("Testing file {path} ... ".format(**fields), end="")
|
|
sys.stdout.flush()
|
|
subs = def_subs.copy()
|
|
subs["s"] = path
|
|
starttime = datetime.datetime.now()
|
|
if not check_path(path, subs, config, TestFailure.print_message):
|
|
failure_count += 1
|
|
elif config.progress:
|
|
endtime = datetime.datetime.now()
|
|
duration_ms = round((endtime - starttime).total_seconds() * 1000)
|
|
print(
|
|
"{GREEN}ok{RESET} ({duration} ms)".format(
|
|
duration=duration_ms, **fields
|
|
)
|
|
)
|
|
sys.exit(failure_count)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|