1329 lines
51 KiB
Python
1329 lines
51 KiB
Python
# Licensed under the Apache License: http://www.apache.org/licenses/LICENSE-2.0
|
|
# For details: https://github.com/nedbat/coveragepy/blob/master/NOTICE.txt
|
|
|
|
"""Code parsing for coverage.py."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import ast
|
|
import functools
|
|
import collections
|
|
import os
|
|
import re
|
|
import sys
|
|
import token
|
|
import tokenize
|
|
|
|
from collections.abc import Iterable, Sequence
|
|
from dataclasses import dataclass
|
|
from types import CodeType
|
|
from typing import cast, Callable, Optional, Protocol
|
|
|
|
from coverage import env
|
|
from coverage.bytecode import code_objects
|
|
from coverage.debug import short_stack
|
|
from coverage.exceptions import NoSource, NotPython
|
|
from coverage.misc import isolate_module, nice_pair
|
|
from coverage.phystokens import generate_tokens
|
|
from coverage.types import TArc, TLineNo
|
|
|
|
# NOTE(review): presumably shields this module from user code monkeypatching
# `os` — confirm against coverage.misc.isolate_module.
os = isolate_module(os)
|
|
|
|
|
|
class PythonParser:
    """Parse code to find executable lines, excluded lines, etc.

    This information is all based on static analysis: no code execution is
    involved.

    """
    def __init__(
        self,
        text: str | None = None,
        filename: str | None = None,
        exclude: str | None = None,
    ) -> None:
        """
        Source can be provided as `text`, the text itself, or `filename`, from
        which the text will be read. Excluded lines are those that match
        `exclude`, a regex string.

        """
        assert text or filename, "PythonParser needs either text or filename"
        self.filename = filename or "<code>"
        if text is not None:
            self.text: str = text
        else:
            from coverage.python import get_python_source
            try:
                self.text = get_python_source(self.filename)
            except OSError as err:
                raise NoSource(f"No source for code: '{self.filename}': {err}") from err

        self.exclude = exclude

        # The parsed AST of the text.
        self._ast_root: ast.AST | None = None

        # The normalized line numbers of the statements in the code. Exclusions
        # are taken into account, and statements are adjusted to their first
        # lines.
        self.statements: set[TLineNo] = set()

        # The normalized line numbers of the excluded lines in the code,
        # adjusted to their first lines.
        self.excluded: set[TLineNo] = set()

        # The raw_* attributes are only used in this class, and in
        # lab/parser.py to show how this class is working.

        # The line numbers that start statements, as reported by the line
        # number table in the bytecode.
        self.raw_statements: set[TLineNo] = set()

        # The raw line numbers of excluded lines of code, as marked by pragmas.
        self.raw_excluded: set[TLineNo] = set()

        # The line numbers of docstring lines.
        self.raw_docstrings: set[TLineNo] = set()

        # Internal detail, used by lab/parser.py.
        self.show_tokens = False

        # A dict mapping line numbers to lexical statement starts for
        # multi-line statements.
        self._multiline: dict[TLineNo, TLineNo] = {}

        # Lazily-created arc data, and missing arc descriptions.
        self._all_arcs: set[TArc] | None = None
        self._missing_arc_fragments: TArcFragments | None = None
        self._with_jump_fixers: dict[TArc, tuple[TArc, TArc]] = {}

    def lines_matching(self, regex: str) -> set[TLineNo]:
        """Find the lines matching a regex.

        Returns a set of line numbers, the lines that contain a match for
        `regex`. The entire line needn't match, just a part of it.
        Handles multiline regex patterns.

        """
        matches: set[TLineNo] = set()

        # Count newlines incrementally from the previous match instead of from
        # the start of the text every time, so long files stay O(len(text)).
        last_start = 0
        last_start_line = 0
        for match in re.finditer(regex, self.text, flags=re.MULTILINE):
            start, end = match.span()
            start_line = last_start_line + self.text.count('\n', last_start, start)
            end_line = last_start_line + self.text.count('\n', last_start, end)
            # A match can span several lines; record each, mapped through the
            # multiline table so continuation lines report their first line.
            # Line numbers are 1-based, hence the +1/+2 offsets.
            matches.update(self._multiline.get(i, i) for i in range(start_line + 1, end_line + 2))
            last_start = start
            last_start_line = start_line
        return matches

    def _raw_parse(self) -> None:
        """Parse the source to find the interesting facts about its lines.

        A handful of attributes are updated.

        """
        # Find lines which match an exclusion pattern.
        if self.exclude:
            self.raw_excluded = self.lines_matching(self.exclude)
            self.excluded = set(self.raw_excluded)

        # The current number of indents.
        indent: int = 0
        # An exclusion comment will exclude an entire clause at this indent.
        exclude_indent: int = 0
        # Are we currently excluding lines?
        excluding: bool = False
        # The line number of the first line in a multi-line statement.
        first_line: int = 0
        # Is the file empty?
        empty: bool = True
        # Parenthesis (and bracket) nesting level.
        nesting: int = 0

        assert self.text is not None
        tokgen = generate_tokens(self.text)
        for toktype, ttext, (slineno, _), (elineno, _), ltext in tokgen:
            if self.show_tokens:  # pragma: debugging
                print("%10s %5s %-20r %r" % (
                    tokenize.tok_name.get(toktype, toktype),
                    nice_pair((slineno, elineno)), ttext, ltext,
                ))
            if toktype == token.INDENT:
                indent += 1
            elif toktype == token.DEDENT:
                indent -= 1
            elif toktype == token.OP:
                if ttext == ":" and nesting == 0:
                    should_exclude = (
                        self.excluded.intersection(range(first_line, elineno + 1))
                    )
                    if not excluding and should_exclude:
                        # Start excluding a suite. We trigger off of the colon
                        # token so that the #pragma comment will be recognized on
                        # the same line as the colon.
                        self.excluded.add(elineno)
                        exclude_indent = indent
                        excluding = True
                elif ttext in "([{":
                    nesting += 1
                elif ttext in ")]}":
                    nesting -= 1
            elif toktype == token.NEWLINE:
                if first_line and elineno != first_line:
                    # We're at the end of a line, and we've ended on a
                    # different line than the first line of the statement,
                    # so record a multi-line range.
                    for l in range(first_line, elineno+1):
                        self._multiline[l] = first_line
                first_line = 0

            if ttext.strip() and toktype != tokenize.COMMENT:
                # A non-white-space token.
                empty = False
                if not first_line:
                    # The token is not white space, and is the first in a statement.
                    first_line = slineno
                    # Check whether to end an excluded suite.
                    if excluding and indent <= exclude_indent:
                        excluding = False
                    if excluding:
                        self.excluded.add(elineno)

        # Find the starts of the executable statements.
        if not empty:
            byte_parser = ByteParser(self.text, filename=self.filename)
            self.raw_statements.update(byte_parser._find_statements())

        self.excluded = self.first_lines(self.excluded)

        # AST lets us find classes, docstrings, and decorator-affected
        # functions and classes.
        assert self._ast_root is not None
        for node in ast.walk(self._ast_root):
            # Find docstrings.
            if isinstance(node, (ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef, ast.Module)):
                if node.body:
                    first = node.body[0]
                    if (
                        isinstance(first, ast.Expr)
                        and isinstance(first.value, ast.Constant)
                        and isinstance(first.value.value, str)
                    ):
                        self.raw_docstrings.update(
                            range(first.lineno, cast(int, first.end_lineno) + 1)
                        )
            # Exclusions carry from decorators and signatures to the bodies of
            # functions and classes.
            if isinstance(node, (ast.ClassDef, ast.FunctionDef, ast.AsyncFunctionDef)):
                first_line = min((d.lineno for d in node.decorator_list), default=node.lineno)
                if self.excluded.intersection(range(first_line, node.lineno + 1)):
                    self.excluded.update(range(first_line, cast(int, node.end_lineno) + 1))

    @functools.lru_cache(maxsize=1000)
    def first_line(self, lineno: TLineNo) -> TLineNo:
        """Return the first line number of the statement including `lineno`."""
        if lineno < 0:
            # Negative line numbers are exit sentinels (see FunctionBlock's
            # use of -start); map the absolute value, then re-negate.
            lineno = -self._multiline.get(-lineno, -lineno)
        else:
            lineno = self._multiline.get(lineno, lineno)
        return lineno

    def first_lines(self, linenos: Iterable[TLineNo]) -> set[TLineNo]:
        """Map the line numbers in `linenos` to the correct first line of the
        statement.

        Returns a set of the first lines.

        """
        return {self.first_line(l) for l in linenos}

    def translate_lines(self, lines: Iterable[TLineNo]) -> set[TLineNo]:
        """Implement `FileReporter.translate_lines`."""
        return self.first_lines(lines)

    def translate_arcs(self, arcs: Iterable[TArc]) -> set[TArc]:
        """Implement `FileReporter.translate_arcs`."""
        return {(self.first_line(a), self.first_line(b)) for (a, b) in self.fix_with_jumps(arcs)}

    def parse_source(self) -> None:
        """Parse source text to find executable lines, excluded lines, etc.

        Sets the .excluded and .statements attributes, normalized to the first
        line of multi-line statements.

        """
        try:
            self._ast_root = ast.parse(self.text)
            self._raw_parse()
        except (tokenize.TokenError, IndentationError, SyntaxError) as err:
            if hasattr(err, "lineno"):
                lineno = err.lineno  # IndentationError
            else:
                lineno = err.args[1][0]  # TokenError
            raise NotPython(
                f"Couldn't parse '{self.filename}' as Python source: " +
                f"{err.args[0]!r} at line {lineno}",
            ) from err

        # Statements are bytecode statement starts, minus excluded lines and
        # docstrings, normalized to first lines.
        ignore = self.excluded | self.raw_docstrings
        starts = self.raw_statements - ignore
        self.statements = self.first_lines(starts) - ignore

    def arcs(self) -> set[TArc]:
        """Get information about the arcs available in the code.

        Returns a set of line number pairs. Line numbers have been normalized
        to the first line of multi-line statements.

        """
        if self._all_arcs is None:
            self._analyze_ast()
            assert self._all_arcs is not None
        return self._all_arcs

    def _analyze_ast(self) -> None:
        """Run the AstArcAnalyzer and save its results.

        `_all_arcs` is the set of arcs in the code.

        """
        assert self._ast_root is not None
        aaa = AstArcAnalyzer(self.filename, self._ast_root, self.raw_statements, self._multiline)
        aaa.analyze()
        arcs = aaa.arcs
        if env.PYBEHAVIOR.exit_through_with:
            self._with_jump_fixers = aaa.with_jump_fixers()
            if self._with_jump_fixers:
                arcs = self.fix_with_jumps(arcs)

        self._all_arcs = set()
        for l1, l2 in arcs:
            fl1 = self.first_line(l1)
            fl2 = self.first_line(l2)
            # Self-arcs within one multi-line statement are not interesting.
            if fl1 != fl2:
                self._all_arcs.add((fl1, fl2))

        self._missing_arc_fragments = aaa.missing_arc_fragments

    def fix_with_jumps(self, arcs: Iterable[TArc]) -> set[TArc]:
        """Adjust arcs to fix jumps leaving `with` statements.

        Consider this code:

            with open("/tmp/test", "w") as f1:
                a = 2
                b = 3
            print(4)

        In 3.10+, we get traces for lines 1, 2, 3, 1, 4. But we want to present
        it to the user as if it had been 1, 2, 3, 4. The arc 3->1 should be
        replaced with 3->4, and 1->4 should be removed.

        For this code, the fixers dict is {(3, 1): ((1, 4), (3, 4))}. The key
        is the actual measured arc from the end of the with block back to the
        start of the with-statement. The values are start_next (the with
        statement to the next statement after the with), and end_next (the end
        of the with-statement to the next statement after the with).

        With nested with-statements, we have to trace through a few levels to
        correct a longer chain of arcs.

        """
        to_remove = set()
        to_add = set()
        for arc in arcs:
            if arc in self._with_jump_fixers:
                end0 = arc[0]
                to_remove.add(arc)
                start_next, end_next = self._with_jump_fixers[arc]
                # Follow chains of nested withs until we reach the outermost.
                while start_next in self._with_jump_fixers:
                    to_remove.add(start_next)
                    start_next, end_next = self._with_jump_fixers[start_next]
                    to_remove.add(end_next)
                to_add.add((end0, end_next[1]))
                to_remove.add(start_next)
        arcs = (set(arcs) | to_add) - to_remove
        return arcs

    @functools.lru_cache
    def exit_counts(self) -> dict[TLineNo, int]:
        """Get a count of exits from that each line.

        Excluded lines are excluded.

        """
        exit_counts: dict[TLineNo, int] = collections.defaultdict(int)
        for l1, l2 in self.arcs():
            assert l1 > 0, f"{l1=} should be greater than zero in {self.filename}"
            if l1 in self.excluded:
                # Don't report excluded lines as line numbers.
                continue
            if l2 in self.excluded:
                # Arcs to excluded lines shouldn't count.
                continue
            exit_counts[l1] += 1

        return exit_counts

    def _finish_action_msg(self, action_msg: str | None, end: TLineNo) -> str:
        """Apply some defaulting and formatting to an arc's description."""
        if action_msg is None:
            if end < 0:
                action_msg = "jump to the function exit"
            else:
                action_msg = "jump to line {lineno}"
        action_msg = action_msg.format(lineno=end)
        return action_msg

    def missing_arc_description(self, start: TLineNo, end: TLineNo) -> str:
        """Provide an English sentence describing a missing arc."""
        if self._missing_arc_fragments is None:
            self._analyze_ast()
            assert self._missing_arc_fragments is not None

        fragment_pairs = self._missing_arc_fragments.get((start, end), [(None, None)])

        msgs = []
        for missing_cause_msg, action_msg in fragment_pairs:
            action_msg = self._finish_action_msg(action_msg, end)
            msg = f"line {start} didn't {action_msg}"
            if missing_cause_msg is not None:
                msg += f" because {missing_cause_msg.format(lineno=start)}"

            msgs.append(msg)

        return " or ".join(msgs)

    def arc_description(self, start: TLineNo, end: TLineNo) -> str:
        """Provide an English description of an arc's effect."""
        if self._missing_arc_fragments is None:
            self._analyze_ast()
            assert self._missing_arc_fragments is not None

        fragment_pairs = self._missing_arc_fragments.get((start, end), [(None, None)])
        action_msg = self._finish_action_msg(fragment_pairs[0][1], end)
        return action_msg
|
|
|
|
|
|
class ByteParser:
    """Parse bytecode to understand the structure of code."""

    def __init__(
        self,
        text: str,
        code: CodeType | None = None,
        filename: str | None = None,
    ) -> None:
        self.text = text
        if code is not None:
            self.code = code
        else:
            assert filename is not None
            # We only get here if earlier ast parsing succeeded, so no need to
            # catch errors.
            self.code = compile(text, filename, "exec", dont_inherit=True)

    def child_parsers(self) -> Iterable[ByteParser]:
        """Iterate over all the code objects nested within this one.

        The iteration includes `self` as its first value.

        We skip code objects named `__annotate__` since they are deferred
        annotations that usually are never run. If there are errors in the
        annotations, they will be caught by type checkers or other tools that
        use annotations.

        """
        return (
            ByteParser(self.text, code=c)
            for c in code_objects(self.code)
            if c.co_name != "__annotate__"
        )

    def _line_numbers(self) -> Iterable[TLineNo]:
        """Yield the line numbers possible in this code object.

        Uses co_lnotab described in Python/compile.c to find the
        line numbers. Produces a sequence: l0, l1, ...
        """
        if hasattr(self.code, "co_lines"):
            # PYVERSIONS: new in 3.10
            for _, _, line in self.code.co_lines():
                # co_lines can yield None for bytecode with no line number.
                if line:
                    yield line
        else:
            # Adapted from dis.py in the standard library.
            byte_increments = self.code.co_lnotab[0::2]
            line_increments = self.code.co_lnotab[1::2]

            last_line_num: TLineNo | None = None
            line_num = self.code.co_firstlineno
            byte_num = 0
            for byte_incr, line_incr in zip(byte_increments, line_increments):
                if byte_incr:
                    # A new bytecode offset: emit the pending line if it is new.
                    if line_num != last_line_num:
                        yield line_num
                        last_line_num = line_num
                    byte_num += byte_incr
                # Line increments are signed bytes: >= 0x80 means negative.
                if line_incr >= 0x80:
                    line_incr -= 0x100
                line_num += line_incr
            if line_num != last_line_num:
                yield line_num

    def _find_statements(self) -> Iterable[TLineNo]:
        """Find the statements in `self.code`.

        Produce a sequence of line numbers that start statements. Recurses
        into all code objects reachable from `self.code`.

        """
        for bp in self.child_parsers():
            # Get all of the lineno information from this code.
            yield from bp._line_numbers()
|
|
|
|
|
|
#
|
|
# AST analysis
|
|
#
|
|
|
|
@dataclass(frozen=True, order=True)
class ArcStart:
    """The information needed to start an arc.

    `lineno` is the line number the arc starts from.

    `cause` is an English text fragment used as the `missing_cause_msg` for
    AstArcAnalyzer.missing_arc_fragments. It will be used to describe why an
    arc wasn't executed, so should fit well into a sentence of the form,
    "Line 17 didn't run because {cause}." The fragment can include "{lineno}"
    to have `lineno` interpolated into it.

    As an example, this code::

        if something(x):    # line 1
            func(x)         # line 2
        more_stuff()        # line 3

    would have two ArcStarts:

    - ArcStart(1, "the condition on line 1 was always true")
    - ArcStart(1, "the condition on line 1 was never true")

    The first would be used to create an arc from 1 to 3, creating a message like
    "line 1 didn't jump to line 3 because the condition on line 1 was always true."

    The second would be used for the arc from 1 to 2, creating a message like
    "line 1 didn't jump to line 2 because the condition on line 1 was never true."

    """
    # The line number the arc starts from.
    lineno: TLineNo
    # Why the arc might not be taken; empty when there is nothing to explain.
    cause: str = ""
|
|
|
|
|
|
class TAddArcFn(Protocol):
    """The type for AstArcAnalyzer.add_arc()."""
    def __call__(
        self,
        start: TLineNo,
        end: TLineNo,
        missing_cause_msg: str | None = None,
        action_msg: str | None = None,
    ) -> None:
        """
        Record an arc from `start` to `end`.

        `missing_cause_msg` is a description of the reason the arc wasn't
        taken if it wasn't taken. For example, "the condition on line 10 was
        never true."

        `action_msg` is a description of what the arc does, like "jump to line
        10" or "exit from function 'fooey'."

        """
|
|
|
|
|
|
# Maps an arc to the list of (missing_cause_msg, action_msg) pairs explaining it.
TArcFragments = dict[TArc, list[tuple[Optional[str], Optional[str]]]]
|
|
|
|
class Block:
    """
    Blocks need to handle various exiting statements in their own ways.

    All of these methods take a list of exits, and a callable `add_arc`
    function that they can use to add arcs if needed. They return True if the
    exits are handled, or False if the search should continue up the block
    stack.
    """
    # pylint: disable=unused-argument
    def process_break_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        """Process break exits."""
        # Base class handles nothing; subclasses override the cases they own.
        return False

    def process_continue_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        """Process continue exits."""
        return False

    def process_raise_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        """Process raise exits."""
        return False

    def process_return_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        """Process return exits."""
        return False
|
|
|
|
|
|
class LoopBlock(Block):
    """A block on the block stack representing a `for` or `while` loop."""

    def __init__(self, start: TLineNo) -> None:
        # First line of the loop statement.
        self.start = start
        # ArcStarts collected from `break` statements leaving this loop; they
        # become arcs only once the whole loop has been processed.
        self.break_exits: set[ArcStart] = set()

    def process_break_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        # Just remember the breaks; no arcs yet.
        self.break_exits |= exits
        return True

    def process_continue_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        # Every `continue` jumps back to the top of this loop.
        for arc_start in exits:
            add_arc(arc_start.lineno, self.start, arc_start.cause)
        return True
|
|
|
|
|
|
class FunctionBlock(Block):
    """A block on the block stack representing a function definition."""

    def __init__(self, start: TLineNo, name: str) -> None:
        # First line of the function definition.
        self.start = start
        # The function's name, used in arc descriptions.
        self.name = name

    def process_raise_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        # An uncaught raise leaves through the function's exit node (-start).
        for arc_start in exits:
            add_arc(
                arc_start.lineno,
                -self.start,
                arc_start.cause,
                f"except from function {self.name!r}",
            )
        return True

    def process_return_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        # A return likewise exits through -start.
        for arc_start in exits:
            add_arc(
                arc_start.lineno,
                -self.start,
                arc_start.cause,
                f"return from function {self.name!r}",
            )
        return True
|
|
|
|
|
|
class TryBlock(Block):
    """A block on the block stack representing a `try` block."""
    def __init__(self, handler_start: TLineNo | None, final_start: TLineNo | None) -> None:
        # The line number of the first "except" handler, if any.
        self.handler_start = handler_start
        # The line number of the "finally:" clause, if any.
        self.final_start = final_start

    def process_raise_exits(self, exits: set[ArcStart], add_arc: TAddArcFn) -> bool:
        if self.handler_start is not None:
            for xit in exits:
                add_arc(xit.lineno, self.handler_start, xit.cause)
        # NOTE(review): the raise is reported handled even with no "except"
        # clause — presumably a "finally" still intercepts it; confirm.
        return True
|
|
|
|
|
|
class NodeList(ast.AST):
    """A synthetic fictitious node, containing a sequence of nodes.

    This is used when collapsing optimized if-statements, to represent the
    unconditional execution of one of the clauses.

    """
    def __init__(self, body: Sequence[ast.AST]) -> None:
        first_child = body[0]
        self.body = body
        self.lineno = first_child.lineno  # type: ignore[attr-defined]
|
|
|
|
# TODO: Shouldn't the cause messages join with "and" instead of "or"?
|
|
|
|
|
|
def is_constant_test_expr(node: ast.AST) -> tuple[bool, bool]:
|
|
"""Is this a compile-time constant test expression?
|
|
|
|
We don't try to mimic all of CPython's optimizations. We just have to
|
|
handle the kinds of constant expressions people might actually use.
|
|
|
|
"""
|
|
if isinstance(node, ast.Constant):
|
|
return True, bool(node.value)
|
|
elif isinstance(node, ast.Name):
|
|
if node.id in ["True", "False", "None", "__debug__"]:
|
|
return True, eval(node.id) # pylint: disable=eval-used
|
|
elif isinstance(node, ast.UnaryOp) and isinstance(node.op, ast.Not):
|
|
is_constant, val = is_constant_test_expr(node.operand)
|
|
return is_constant, not val
|
|
elif isinstance(node, ast.BoolOp):
|
|
rets = [is_constant_test_expr(v) for v in node.values]
|
|
is_constant = all(is_const for is_const, _ in rets)
|
|
if is_constant:
|
|
op = any if isinstance(node.op, ast.Or) else all
|
|
return True, op(v for _, v in rets)
|
|
return False, False
|
|
|
|
|
|
class AstArcAnalyzer:
|
|
"""Analyze source text with an AST to find executable code paths.
|
|
|
|
The .analyze() method does the work, and populates these attributes:
|
|
|
|
`arcs`: a set of (from, to) pairs of the the arcs possible in the code.
|
|
|
|
`missing_arc_fragments`: a dict mapping (from, to) arcs to lists of
|
|
message fragments explaining why the arc is missing from execution::
|
|
|
|
{ (start, end): [(missing_cause_msg, action_msg), ...], }
|
|
|
|
For an arc starting from line 17, they should be usable to form complete
|
|
sentences like: "Line 17 didn't {action_msg} because {missing_cause_msg}".
|
|
|
|
NOTE: Starting in July 2024, I've been whittling this down to only report
|
|
arc that are part of true branches. It's not clear how far this work will
|
|
go.
|
|
|
|
"""
|
|
|
|
    def __init__(
        self,
        filename: str,
        root_node: ast.AST,
        statements: set[TLineNo],
        multiline: dict[TLineNo, TLineNo],
    ) -> None:
        self.filename = filename
        self.root_node = root_node
        # Normalize statements to the first lines of multi-line statements.
        self.statements = {multiline.get(l, l) for l in statements}
        self.multiline = multiline

        # Turn on AST dumps with an environment variable.
        # $set_env.py: COVERAGE_AST_DUMP - Dump the AST nodes when parsing code.
        dump_ast = bool(int(os.getenv("COVERAGE_AST_DUMP", "0")))

        if dump_ast:  # pragma: debugging
            # Dump the AST so that failing tests have helpful output.
            print(f"Statements: {self.statements}")
            print(f"Multiline map: {self.multiline}")
            print(ast.dump(self.root_node, include_attributes=True, indent=4))

        # The results of the analysis, filled in by analyze().
        self.arcs: set[TArc] = set()
        self.missing_arc_fragments: TArcFragments = collections.defaultdict(list)
        self.block_stack: list[Block] = []

        # If `with` clauses jump to their start on the way out, we need
        # information to be able to skip over that jump. We record the arcs
        # from `with` into the clause (with_entries), and the arcs from the
        # clause to the `with` (with_exits).
        self.current_with_starts: set[TLineNo] = set()
        self.all_with_starts: set[TLineNo] = set()
        self.with_entries: set[TArc] = set()
        self.with_exits: set[TArc] = set()

        # $set_env.py: COVERAGE_TRACK_ARCS - Trace possible arcs added while parsing code.
        self.debug = bool(int(os.getenv("COVERAGE_TRACK_ARCS", "0")))
|
|
|
|
def analyze(self) -> None:
|
|
"""Examine the AST tree from `self.root_node` to determine possible arcs."""
|
|
for node in ast.walk(self.root_node):
|
|
node_name = node.__class__.__name__
|
|
code_object_handler = getattr(self, f"_code_object__{node_name}", None)
|
|
if code_object_handler is not None:
|
|
code_object_handler(node)
|
|
|
|
    def with_jump_fixers(self) -> dict[TArc, tuple[TArc, TArc]]:
        """Get a dict with data for fixing jumps out of with statements.

        Returns a dict. The keys are arcs leaving a with-statement by jumping
        back to its start. The values are pairs: first, the arc from the start
        to the next statement, then the arc that exits the with without going
        to the start.

        """
        fixers = {}
        # Arcs leaving a with start that aren't the entry into its own body
        # must be the jump to the statement after the with.
        with_nexts = {
            arc
            for arc in self.arcs
            if arc[0] in self.all_with_starts and arc not in self.with_entries
        }
        for start in self.all_with_starts:
            nexts = {arc[1] for arc in with_nexts if arc[0] == start}
            if not nexts:
                continue
            assert len(nexts) == 1, f"Expected one arc, got {nexts} with {start = }"
            nxt = nexts.pop()
            # Every recorded exit back to this with start gets a fixer entry.
            ends = {arc[0] for arc in self.with_exits if arc[1] == start}
            for end in ends:
                fixers[(end, start)] = ((start, nxt), (end, nxt))
        return fixers
|
|
|
|
# Code object dispatchers: _code_object__*
|
|
#
|
|
# These methods are used by analyze() as the start of the analysis.
|
|
# There is one for each construct with a code object.
|
|
|
|
    def _code_object__Module(self, node: ast.Module) -> None:
        # The module's exit node is the negative of its start line.
        start = self.line_for_node(node)
        if node.body:
            exits = self.process_body(node.body)
            for xit in exits:
                self.add_arc(xit.lineno, -start, xit.cause, "exit the module")
        else:
            # Empty module.
            self.add_arc(start, -start)
|
|
|
|
    def _code_object__FunctionDef(self, node: ast.FunctionDef) -> None:
        # Push a FunctionBlock so raise/return exits inside the body are
        # attributed to this function, then process the body.
        start = self.line_for_node(node)
        self.block_stack.append(FunctionBlock(start=start, name=node.name))
        exits = self.process_body(node.body)
        # Falling off the end of the body is an implicit return.
        self.process_return_exits(exits)
        self.block_stack.pop()

    # Async functions are analyzed exactly like plain ones.
    _code_object__AsyncFunctionDef = _code_object__FunctionDef
|
|
|
|
def _code_object__ClassDef(self, node: ast.ClassDef) -> None:
|
|
start = self.line_for_node(node)
|
|
exits = self.process_body(node.body)
|
|
for xit in exits:
|
|
self.add_arc(xit.lineno, -start, xit.cause, f"exit class {node.name!r}")
|
|
|
|
    def add_arc(
        self,
        start: TLineNo,
        end: TLineNo,
        missing_cause_msg: str | None = None,
        action_msg: str | None = None,
    ) -> None:
        """Add an arc, including message fragments to use if it is missing."""
        if self.debug:  # pragma: debugging
            print(f"Adding possible arc: ({start}, {end}): {missing_cause_msg!r}, {action_msg!r}")
            print(short_stack(), end="\n\n")
        self.arcs.add((start, end))
        # Arcs starting at a with statement currently being processed are
        # entries into its body; remembered for with_jump_fixers().
        if start in self.current_with_starts:
            self.with_entries.add((start, end))

        if missing_cause_msg is not None or action_msg is not None:
            self.missing_arc_fragments[(start, end)].append((missing_cause_msg, action_msg))
|
|
|
|
    def nearest_blocks(self) -> Iterable[Block]:
        """Yield the blocks in nearest-to-farthest order."""
        # The innermost block is at the top of the stack, so iterate reversed.
        return reversed(self.block_stack)
|
|
|
|
    def line_for_node(self, node: ast.AST) -> TLineNo:
        """What is the right line number to use for this node?

        This dispatches to _line__Node functions where needed.

        """
        node_name = node.__class__.__name__
        handler = cast(
            Optional[Callable[[ast.AST], TLineNo]],
            getattr(self, f"_line__{node_name}", None),
        )
        if handler is not None:
            line = handler(node)
        else:
            # Most nodes just use their own lineno attribute.
            line = node.lineno  # type: ignore[attr-defined]
        # Normalize to the first line of a multi-line statement.
        return self.multiline.get(line, line)
|
|
|
|
# First lines: _line__*
|
|
#
|
|
# Dispatched by line_for_node, each method knows how to identify the first
|
|
# line number in the node, as Python will report it.
|
|
|
|
def _line_decorated(self, node: ast.FunctionDef) -> TLineNo:
|
|
"""Compute first line number for things that can be decorated (classes and functions)."""
|
|
if node.decorator_list:
|
|
lineno = node.decorator_list[0].lineno
|
|
else:
|
|
lineno = node.lineno
|
|
return lineno
|
|
|
|
    def _line__Assign(self, node: ast.Assign) -> TLineNo:
        # An assignment is attributed to the line where its value starts.
        return self.line_for_node(node.value)

    _line__ClassDef = _line_decorated
|
|
|
|
def _line__Dict(self, node: ast.Dict) -> TLineNo:
|
|
if node.keys:
|
|
if node.keys[0] is not None:
|
|
return node.keys[0].lineno
|
|
else:
|
|
# Unpacked dict literals `{**{"a":1}}` have None as the key,
|
|
# use the value in that case.
|
|
return node.values[0].lineno
|
|
else:
|
|
return node.lineno
|
|
|
|
_line__FunctionDef = _line_decorated
|
|
_line__AsyncFunctionDef = _line_decorated
|
|
|
|
def _line__List(self, node: ast.List) -> TLineNo:
|
|
if node.elts:
|
|
return self.line_for_node(node.elts[0])
|
|
else:
|
|
return node.lineno
|
|
|
|
def _line__Module(self, node: ast.Module) -> TLineNo:
|
|
if env.PYBEHAVIOR.module_firstline_1:
|
|
return 1
|
|
elif node.body:
|
|
return self.line_for_node(node.body[0])
|
|
else:
|
|
# Empty modules have no line number, they always start at 1.
|
|
return 1
|
|
|
|
    # The node types that just flow to the next node with no complications.
    # node_exits() falls back to a single default exit for these.
    OK_TO_DEFAULT = {
        "AnnAssign", "Assign", "Assert", "AugAssign", "Delete", "Expr", "Global",
        "Import", "ImportFrom", "Nonlocal", "Pass",
    }
|
|
|
|
    def node_exits(self, node: ast.AST) -> set[ArcStart]:
        """Find the set of arc starts that exit this node.

        Return a set of ArcStarts, exits from this node to the next. Because a
        node represents an entire sub-tree (including its children), the exits
        from a node can be arbitrarily complex::

            if something(1):
                if other(2):
                    doit(3)
            else:
                doit(5)

        There are three exits from line 1: they start at lines 1, 3 and 5.
        There are two exits from line 2: lines 3 and 5.

        """
        node_name = node.__class__.__name__
        handler = cast(
            Optional[Callable[[ast.AST], set[ArcStart]]],
            getattr(self, f"_handle__{node_name}", None),
        )
        if handler is not None:
            arc_starts = handler(node)
        else:
            # No handler: either it's something that's ok to default (a simple
            # statement), or it's something we overlooked.
            if env.TESTING:
                if node_name not in self.OK_TO_DEFAULT:
                    raise RuntimeError(f"*** Unhandled: {node}")  # pragma: only failure

            # Default for simple statements: one exit from this node.
            arc_starts = {ArcStart(self.line_for_node(node))}
        return arc_starts
|
|
|
|
def process_body(
|
|
self,
|
|
body: Sequence[ast.AST],
|
|
from_start: ArcStart | None = None,
|
|
prev_starts: set[ArcStart] | None = None,
|
|
) -> set[ArcStart]:
|
|
"""Process the body of a compound statement.
|
|
|
|
`body` is the body node to process.
|
|
|
|
`from_start` is a single `ArcStart` that starts an arc into this body.
|
|
`prev_starts` is a set of ArcStarts that can all be the start of arcs
|
|
into this body. Only one of `from_start` and `prev_starts` should be
|
|
given.
|
|
|
|
Records arcs within the body by calling `self.add_arc`.
|
|
|
|
Returns a set of ArcStarts, the exits from this body.
|
|
|
|
"""
|
|
if prev_starts is None:
|
|
if from_start is None:
|
|
prev_starts = set()
|
|
else:
|
|
prev_starts = {from_start}
|
|
else:
|
|
assert from_start is None
|
|
|
|
# Loop over the nodes in the body, making arcs from each one's exits to
|
|
# the next node.
|
|
for body_node in body:
|
|
lineno = self.line_for_node(body_node)
|
|
if lineno not in self.statements:
|
|
maybe_body_node = self.find_non_missing_node(body_node)
|
|
if maybe_body_node is None:
|
|
continue
|
|
body_node = maybe_body_node
|
|
lineno = self.line_for_node(body_node)
|
|
for prev_start in prev_starts:
|
|
self.add_arc(prev_start.lineno, lineno, prev_start.cause)
|
|
prev_starts = self.node_exits(body_node)
|
|
return prev_starts
|
|
|
|
def find_non_missing_node(self, node: ast.AST) -> ast.AST | None:
|
|
"""Search `node` looking for a child that has not been optimized away.
|
|
|
|
This might return the node you started with, or it will work recursively
|
|
to find a child node in self.statements.
|
|
|
|
Returns a node, or None if none of the node remains.
|
|
|
|
"""
|
|
# This repeats work just done in process_body, but this duplication
|
|
# means we can avoid a function call in the 99.9999% case of not
|
|
# optimizing away statements.
|
|
lineno = self.line_for_node(node)
|
|
if lineno in self.statements:
|
|
return node
|
|
|
|
missing_fn = cast(
|
|
Optional[Callable[[ast.AST], Optional[ast.AST]]],
|
|
getattr(self, f"_missing__{node.__class__.__name__}", None),
|
|
)
|
|
if missing_fn is not None:
|
|
ret_node = missing_fn(node)
|
|
else:
|
|
ret_node = None
|
|
return ret_node
|
|
|
|
# Missing nodes: _missing__*
|
|
#
|
|
# Entire statements can be optimized away by Python. They will appear in
|
|
# the AST, but not the bytecode. These functions are called (by
|
|
# find_non_missing_node) to find a node to use instead of the missing
|
|
# node. They can return None if the node should truly be gone.
|
|
|
|
def _missing__If(self, node: ast.If) -> ast.AST | None:
|
|
# If the if-node is missing, then one of its children might still be
|
|
# here, but not both. So return the first of the two that isn't missing.
|
|
# Use a NodeList to hold the clauses as a single node.
|
|
non_missing = self.find_non_missing_node(NodeList(node.body))
|
|
if non_missing:
|
|
return non_missing
|
|
if node.orelse:
|
|
return self.find_non_missing_node(NodeList(node.orelse))
|
|
return None
|
|
|
|
def _missing__NodeList(self, node: NodeList) -> ast.AST | None:
|
|
# A NodeList might be a mixture of missing and present nodes. Find the
|
|
# ones that are present.
|
|
non_missing_children = []
|
|
for child in node.body:
|
|
maybe_child = self.find_non_missing_node(child)
|
|
if maybe_child is not None:
|
|
non_missing_children.append(maybe_child)
|
|
|
|
# Return the simplest representation of the present children.
|
|
if not non_missing_children:
|
|
return None
|
|
if len(non_missing_children) == 1:
|
|
return non_missing_children[0]
|
|
return NodeList(non_missing_children)
|
|
|
|
    def _missing__While(self, node: ast.While) -> ast.AST | None:
        """If a `while` was optimized away but part of its body survives,
        stand in a synthetic `while True:` over the surviving statements."""
        body_nodes = self.find_non_missing_node(NodeList(node.body))
        if not body_nodes:
            return None
        # Make a synthetic While-true node.
        # The nodes are built field-by-field because ast node constructors
        # don't accept these arguments directly; mypy is silenced per-line.
        new_while = ast.While()  # type: ignore[call-arg]
        new_while.lineno = body_nodes.lineno  # type: ignore[attr-defined]
        new_while.test = ast.Name()  # type: ignore[call-arg]
        new_while.test.lineno = body_nodes.lineno  # type: ignore[attr-defined]
        new_while.test.id = "True"
        assert hasattr(body_nodes, "body")
        new_while.body = body_nodes.body
        new_while.orelse = []
        return new_while
|
|
|
# In the fullness of time, these might be good tests to write:
|
|
# while EXPR:
|
|
# while False:
|
|
# listcomps hidden deep in other expressions
|
|
# listcomps hidden in lists: x = [[i for i in range(10)]]
|
|
# nested function definitions
|
|
|
|
# Exit processing: process_*_exits
|
|
#
|
|
# These functions process the four kinds of jump exits: break, continue,
|
|
# raise, and return. To figure out where an exit goes, we have to look at
|
|
# the block stack context. For example, a break will jump to the nearest
|
|
# enclosing loop block, or the nearest enclosing finally block, whichever
|
|
# is nearer.
|
|
|
|
def process_break_exits(self, exits: set[ArcStart]) -> None:
|
|
"""Add arcs due to jumps from `exits` being breaks."""
|
|
for block in self.nearest_blocks(): # pragma: always breaks
|
|
if block.process_break_exits(exits, self.add_arc):
|
|
break
|
|
|
|
def process_continue_exits(self, exits: set[ArcStart]) -> None:
|
|
"""Add arcs due to jumps from `exits` being continues."""
|
|
for block in self.nearest_blocks(): # pragma: always breaks
|
|
if block.process_continue_exits(exits, self.add_arc):
|
|
break
|
|
|
|
def process_raise_exits(self, exits: set[ArcStart]) -> None:
|
|
"""Add arcs due to jumps from `exits` being raises."""
|
|
for block in self.nearest_blocks():
|
|
if block.process_raise_exits(exits, self.add_arc):
|
|
break
|
|
|
|
def process_return_exits(self, exits: set[ArcStart]) -> None:
|
|
"""Add arcs due to jumps from `exits` being returns."""
|
|
for block in self.nearest_blocks(): # pragma: always breaks
|
|
if block.process_return_exits(exits, self.add_arc):
|
|
break
|
|
|
|
# Node handlers: _handle__*
|
|
#
|
|
# Each handler deals with a specific AST node type, dispatched from
|
|
# node_exits. Handlers return the set of exits from that node, and can
|
|
# also call self.add_arc to record arcs they find. These functions mirror
|
|
# the Python semantics of each syntactic construct. See the docstring
|
|
# for node_exits to understand the concept of exits from a node.
|
|
#
|
|
# Every node type that represents a statement should have a handler, or it
|
|
# should be listed in OK_TO_DEFAULT.
|
|
|
|
def _handle__Break(self, node: ast.Break) -> set[ArcStart]:
|
|
here = self.line_for_node(node)
|
|
break_start = ArcStart(here, cause="the break on line {lineno} wasn't executed")
|
|
self.process_break_exits({break_start})
|
|
return set()
|
|
|
|
def _handle_decorated(self, node: ast.FunctionDef) -> set[ArcStart]:
|
|
"""Add arcs for things that can be decorated (classes and functions)."""
|
|
main_line: TLineNo = node.lineno
|
|
last: TLineNo | None = node.lineno
|
|
decs = node.decorator_list
|
|
if decs:
|
|
last = None
|
|
for dec_node in decs:
|
|
dec_start = self.line_for_node(dec_node)
|
|
if last is not None and dec_start != last:
|
|
self.add_arc(last, dec_start)
|
|
last = dec_start
|
|
assert last is not None
|
|
self.add_arc(last, main_line)
|
|
last = main_line
|
|
# The definition line may have been missed, but we should have it
|
|
# in `self.statements`. For some constructs, `line_for_node` is
|
|
# not what we'd think of as the first line in the statement, so map
|
|
# it to the first one.
|
|
assert node.body, f"Oops: {node.body = } in {self.filename}@{node.lineno}"
|
|
# The body is handled in collect_arcs.
|
|
assert last is not None
|
|
return {ArcStart(last)}
|
|
|
|
    # Class definitions are decoratable and flow like decorated functions.
    _handle__ClassDef = _handle_decorated
|
def _handle__Continue(self, node: ast.Continue) -> set[ArcStart]:
|
|
here = self.line_for_node(node)
|
|
continue_start = ArcStart(here, cause="the continue on line {lineno} wasn't executed")
|
|
self.process_continue_exits({continue_start})
|
|
return set()
|
|
|
|
def _handle__For(self, node: ast.For) -> set[ArcStart]:
|
|
start = self.line_for_node(node.iter)
|
|
self.block_stack.append(LoopBlock(start=start))
|
|
from_start = ArcStart(start, cause="the loop on line {lineno} never started")
|
|
exits = self.process_body(node.body, from_start=from_start)
|
|
# Any exit from the body will go back to the top of the loop.
|
|
for xit in exits:
|
|
self.add_arc(xit.lineno, start, xit.cause)
|
|
my_block = self.block_stack.pop()
|
|
assert isinstance(my_block, LoopBlock)
|
|
exits = my_block.break_exits
|
|
from_start = ArcStart(start, cause="the loop on line {lineno} didn't complete")
|
|
if node.orelse:
|
|
else_exits = self.process_body(node.orelse, from_start=from_start)
|
|
exits |= else_exits
|
|
else:
|
|
# No else clause: exit from the for line.
|
|
exits.add(from_start)
|
|
return exits
|
|
|
|
    # `async for` flows exactly like `for`.
    _handle__AsyncFor = _handle__For

    # Function definitions (sync and async) are decoratable definitions.
    _handle__FunctionDef = _handle_decorated
    _handle__AsyncFunctionDef = _handle_decorated
|
|
def _handle__If(self, node: ast.If) -> set[ArcStart]:
|
|
start = self.line_for_node(node.test)
|
|
constant_test, val = is_constant_test_expr(node.test)
|
|
exits = set()
|
|
if not constant_test or val:
|
|
from_start = ArcStart(start, cause="the condition on line {lineno} was never true")
|
|
exits |= self.process_body(node.body, from_start=from_start)
|
|
if not constant_test or not val:
|
|
from_start = ArcStart(start, cause="the condition on line {lineno} was always true")
|
|
exits |= self.process_body(node.orelse, from_start=from_start)
|
|
return exits
|
|
|
|
    if sys.version_info >= (3, 10):

        def _handle__Match(self, node: ast.Match) -> set[ArcStart]:
            """Add arcs for a `match` statement (Python 3.10+ only)."""
            start = self.line_for_node(node)
            last_start = start
            exits = set()
            for case in node.cases:
                case_start = self.line_for_node(case.pattern)
                # Arc from the previous pattern (or the match line) to this one.
                self.add_arc(last_start, case_start, "the pattern on line {lineno} always matched")
                from_start = ArcStart(
                    case_start,
                    cause="the pattern on line {lineno} never matched",
                )
                exits |= self.process_body(case.body, from_start=from_start)
                last_start = case_start

            # case is now the last case, check for wildcard match.
            # Dig through MatchOr/MatchAs wrappers to find the terminal pattern.
            pattern = case.pattern  # pylint: disable=undefined-loop-variable
            while isinstance(pattern, ast.MatchOr):
                pattern = pattern.patterns[-1]
            while isinstance(pattern, ast.MatchAs) and pattern.pattern is not None:
                pattern = pattern.pattern
            # A bare capture (`case _:` / `case x:`) with no guard matches anything.
            had_wildcard = (
                isinstance(pattern, ast.MatchAs)
                and pattern.pattern is None
                and case.guard is None  # pylint: disable=undefined-loop-variable
            )

            if not had_wildcard:
                # Without a wildcard, the match can fall through past all cases.
                exits.add(
                    ArcStart(case_start, cause="the pattern on line {lineno} always matched"),
                )
            return exits
|
|
def _handle__NodeList(self, node: NodeList) -> set[ArcStart]:
|
|
start = self.line_for_node(node)
|
|
exits = self.process_body(node.body, from_start=ArcStart(start))
|
|
return exits
|
|
|
|
def _handle__Raise(self, node: ast.Raise) -> set[ArcStart]:
|
|
here = self.line_for_node(node)
|
|
raise_start = ArcStart(here, cause="the raise on line {lineno} wasn't executed")
|
|
self.process_raise_exits({raise_start})
|
|
# `raise` statement jumps away, no exits from here.
|
|
return set()
|
|
|
|
def _handle__Return(self, node: ast.Return) -> set[ArcStart]:
|
|
here = self.line_for_node(node)
|
|
return_start = ArcStart(here, cause="the return on line {lineno} wasn't executed")
|
|
self.process_return_exits({return_start})
|
|
# `return` statement jumps away, no exits from here.
|
|
return set()
|
|
|
|
    def _handle__Try(self, node: ast.Try) -> set[ArcStart]:
        """Add arcs for a `try` statement: body, handlers, else, finally."""
        # First line of the first handler, if any.
        if node.handlers:
            handler_start = self.line_for_node(node.handlers[0])
        else:
            handler_start = None

        # First line of the finally clause, if any.
        if node.finalbody:
            final_start = self.line_for_node(node.finalbody[0])
        else:
            final_start = None

        # This is true by virtue of Python syntax: have to have either except
        # or finally, or both.
        assert handler_start is not None or final_start is not None
        try_block = TryBlock(handler_start, final_start)
        self.block_stack.append(try_block)

        start = self.line_for_node(node)
        exits = self.process_body(node.body, from_start=ArcStart(start))

        # We're done with the `try` body, so this block no longer handles
        # exceptions. We keep the block so the `finally` clause can pick up
        # flows from the handlers and `else` clause.
        if node.finalbody:
            try_block.handler_start = None
        else:
            self.block_stack.pop()

        handler_exits: set[ArcStart] = set()

        if node.handlers:
            for handler_node in node.handlers:
                handler_start = self.line_for_node(handler_node)
                from_cause = "the exception caught by line {lineno} didn't happen"
                from_start = ArcStart(handler_start, cause=from_cause)
                handler_exits |= self.process_body(handler_node.body, from_start=from_start)

        # The `else` clause runs only after the `try` body completes, so its
        # predecessors are the body's exits.
        if node.orelse:
            exits = self.process_body(node.orelse, prev_starts=exits)

        exits |= handler_exits

        if node.finalbody:
            self.block_stack.pop()
            final_from = exits

            final_exits = self.process_body(node.finalbody, prev_starts=final_from)

            if exits:
                # The finally clause's exits are only exits for the try block
                # as a whole if the try block had some exits to begin with.
                exits = final_exits

        return exits
|
def _handle__While(self, node: ast.While) -> set[ArcStart]:
|
|
start = to_top = self.line_for_node(node.test)
|
|
constant_test, _ = is_constant_test_expr(node.test)
|
|
top_is_body0 = False
|
|
if constant_test:
|
|
top_is_body0 = True
|
|
if env.PYBEHAVIOR.keep_constant_test:
|
|
top_is_body0 = False
|
|
if top_is_body0:
|
|
to_top = self.line_for_node(node.body[0])
|
|
self.block_stack.append(LoopBlock(start=to_top))
|
|
from_start = ArcStart(start, cause="the condition on line {lineno} was never true")
|
|
exits = self.process_body(node.body, from_start=from_start)
|
|
for xit in exits:
|
|
self.add_arc(xit.lineno, to_top, xit.cause)
|
|
exits = set()
|
|
my_block = self.block_stack.pop()
|
|
assert isinstance(my_block, LoopBlock)
|
|
exits.update(my_block.break_exits)
|
|
from_start = ArcStart(start, cause="the condition on line {lineno} was always true")
|
|
if node.orelse:
|
|
else_exits = self.process_body(node.orelse, from_start=from_start)
|
|
exits |= else_exits
|
|
else:
|
|
# No `else` clause: you can exit from the start.
|
|
if not constant_test:
|
|
exits.add(from_start)
|
|
return exits
|
|
|
|
def _handle__With(self, node: ast.With) -> set[ArcStart]:
|
|
if env.PYBEHAVIOR.exit_with_through_ctxmgr:
|
|
starts = [self.line_for_node(item.context_expr) for item in node.items]
|
|
else:
|
|
starts = [self.line_for_node(node)]
|
|
if env.PYBEHAVIOR.exit_through_with:
|
|
for start in starts:
|
|
self.current_with_starts.add(start)
|
|
self.all_with_starts.add(start)
|
|
|
|
exits = self.process_body(node.body, from_start=ArcStart(starts[-1]))
|
|
|
|
if env.PYBEHAVIOR.exit_through_with:
|
|
start = starts[-1]
|
|
self.current_with_starts.remove(start)
|
|
with_exit = {ArcStart(start)}
|
|
if exits:
|
|
for xit in exits:
|
|
self.add_arc(xit.lineno, start)
|
|
self.with_exits.add((xit.lineno, start))
|
|
exits = with_exit
|
|
|
|
return exits
|
|
|
|
    # `async with` flows exactly like `with`.
    _handle__AsyncWith = _handle__With