Source code for grammarinator.tool.parser

# Copyright (c) 2018-2025 Renata Hodovan, Akos Kiss.
#
# Licensed under the BSD 3-Clause License
# <LICENSE.rst or https://opensource.org/licenses/BSD-3-Clause>.
# This file may not be copied, modified, or distributed except
# according to those terms.

import itertools
import logging
import math
import os
import shutil
import sys

from math import inf
from os import listdir
from os.path import basename, commonprefix, split, splitext
from subprocess import CalledProcessError, PIPE, run
from typing import Callable, Iterable

from antlr4 import CommonTokenStream, error, FileStream, InputStream, Lexer, ParseTreeListener, Parser, ParserRuleContext, TerminalNode, Token

from ..runtime import Population, Rule, RuleSize, UnlexerRule, UnparserRule, UnparserRuleAlternative, UnparserRuleQuantified, UnparserRuleQuantifier
from .processor import AlternationNode, AlternativeNode, LambdaNode, ProcessorTool, QuantifierNode, UnlexerRuleNode, UnparserRuleNode


logger = logging.getLogger(__name__)


# Override ConsoleErrorListener to suppress parse issues in non-verbose mode.
class ConsoleListener(error.ErrorListener.ConsoleErrorListener):
    def syntaxError(self, recognizer, offendingSymbol, line, column, msg, e):
        logger.info('line %d:%d %s', line, column, msg)


error.ErrorListener.ConsoleErrorListener.INSTANCE = ConsoleListener()


class ExtendedErrorListener(error.ErrorListener.ErrorListener):
    """
    Custom error listener for the ANTLR lexer ensuring to insert the
    unrecognized tokens into the tree as well.
    """
    def syntaxError(self, recognizer, offendingSymbol, line, column, msg, e):
        recognizer.inputStream.consume()
        recognizer.type = Token.INVALID_TYPE
        recognizer.channel = Token.DEFAULT_CHANNEL
        recognizer.emit()
        recognizer.type = Token.MIN_USER_TOKEN_TYPE


[docs] class ParserTool: """ Tool to parse existing sources and create a tree pool from them. These trees can be reused later by generation. """ def __init__(self, grammars: list[str], parser_dir: str, antlr: str, population: Population | None, rule: str | None = None, hidden: list[str] | None = None, transformers: list[Callable[[Rule], Rule]] | None = None, max_depth: int | float = RuleSize.max.depth, strict: bool = False, lib_dir: str | None = None, cleanup: bool = True, encoding: str = 'utf-8', errors: str = 'strict'): """ :param grammars: List of resources (grammars and additional sources) needed to parse the input. :param parser_dir: Directory where grammars and the generated parser will be placed. :param antlr: Path to the ANTLR4 tool (Java jar binary). :param population: Tree pool where the trees will be saved, e.g., an instance of :class:`FilePopulation`. :param rule: Name of the rule to start parsing with (default: first parser rule in the grammar). :param hidden: List of hidden rule names that are expected to be added to the grammar tree (hidden rules are skipped by default). :param transformers: List of transformers to be applied to postprocess the parsed tree before serializing it. :param max_depth: Maximum depth of trees. Deeper trees are not saved. :param strict: Tests that contain syntax errors are discarded. :param lib_dir: Alternative directory to look for grammar imports beside the current working directory. :param cleanup: Boolean to enable the removal of the helper parser resources after processing the inputs. :param encoding: Encoding of the input file. :param errors: Encoding error handling scheme. """ self._population = population self._hidden = hidden or [] self._transformers = transformers or [] self._max_depth = max_depth self._strict = strict self._cleanup = cleanup self._encoding = encoding self._errors = errors self._parser_dir = parser_dir os.makedirs(self._parser_dir, exist_ok=True) lexer_root, parser_root = ProcessorTool.parse_grammars(grammars, parser_dir, encoding, errors, lib_dir) self._graph = ProcessorTool.build_graph(False, lexer_root, parser_root, None, rule) for i, grammar in enumerate(grammars): shutil.copy(grammar, self._parser_dir) grammars[i] = basename(grammar) self._lexer_cls, self._parser_cls, self._listener_cls = self._build_grammars(grammars, self._parser_dir, antlr, lib_dir) self._rule = rule or self._parser_cls.ruleNames[0] def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): if self._cleanup: shutil.rmtree(self._parser_dir, ignore_errors=True) @staticmethod def _build_grammars(in_files: list[str], out: str, antlr: str, lib_dir: str | None = None) -> tuple[type[Lexer], type[Parser], type[ParseTreeListener]]: """ Build lexer and grammar from ANTLRv4 grammar files in Python3 target. :param in_files: List resources (grammars and additional sources) needed to parse the input. :param out: Directory where grammars are placed and where the output will be generated to. :param antlr: Path to the ANTLR4 tool (Java jar binary). :param lib_dir: Alternative directory to look for grammar imports beside the current working directory. :return: Tuple of references/names of the lexer, parser and listener classes of the target. """ try: # TODO: support Java parsers too. languages = { 'python': {'antlr_arg': '-Dlanguage=Python3', 'ext': 'py', 'listener_format': 'Listener'} } grammars = tuple(fn for fn in in_files if fn.endswith('.g4')) # Generate parser and lexer in the target language and return either with # python class ref or the name of java classes. try: run(('java', '-jar', antlr, languages['python']['antlr_arg']) + (('-lib', lib_dir) if lib_dir else ()) + grammars, stdout=PIPE, stderr=PIPE, cwd=out, check=True) except CalledProcessError as e: logger.error('Building grammars %r failed!\n%s\n%s\n', grammars, e.stdout.decode('utf-8', 'ignore'), e.stderr.decode('utf-8', 'ignore')) raise files = set(listdir(out)) - set(in_files) filename = basename(grammars[0]) def file_endswith(end_pattern): return splitext(split(list( filter(lambda x: len(commonprefix([filename, x])) > 0 and x.endswith(end_pattern), files))[0])[1])[0] # Extract the name of lexer and parser from their path. lexer = file_endswith(f'Lexer.{languages["python"]["ext"]}') parser = file_endswith(f'Parser.{languages["python"]["ext"]}') # The name of the generated listeners differs if Python or other language target is used. listener = file_endswith(f'{languages["python"]["listener_format"]}.{languages["python"]["ext"]}') # Add the path of the built lexer and parser to the Python path to be available for importing. if out not in sys.path: sys.path.append(out) return tuple(getattr(__import__(x, globals(), locals(), [x], 0), x) for x in [lexer, parser, listener]) except Exception as e: logger.error('Exception while loading parser modules', exc_info=e) raise # TODO: Typing of _antlr_to_grammarinator_tree is a nightmare, hence it's postponed. def _antlr_to_grammarinator_tree(self, antlr_node, parser, visited=None): """ Convert an ANTRL tree to Grammarinator tree. :param antlr4.ParserRuleContext or antlr4.TerminalNode antlr_node: Root of ANTLR tree to convert. :param antlr4.Parser parser: Parser object that created the ANTLR tree. :param set visited: Set of visited ANTLR nodes in the tree to be transformed. """ rules = set() if visited is None: visited = set() if isinstance(antlr_node, ParserRuleContext): rule_name = parser.ruleNames[antlr_node.getRuleIndex()] class_name = antlr_node.__class__.__name__ # Temporary use tuples as rule names to ease their comparison with grammar nodes, # while adjusting the decision nodes. However, they will be stringified eventually. node = UnparserRule(name=(rule_name,)) rules.add(node) parent_node = node # Check if the rule is a labeled alternative. if class_name.endswith('Context') and class_name.lower() != rule_name.lower() + 'context': alt_name = class_name[:-len('Context')] labeled_alt_node = UnparserRule(name=(rule_name, alt_name[0].upper() + alt_name[1:])) rules.add(labeled_alt_node) node += labeled_alt_node parent_node = labeled_alt_node assert node.name, 'Node name of a parser rule is empty or None.' depth = 0 for antlr_child in (antlr_node.children or []): child, child_depth, child_rules = self._antlr_to_grammarinator_tree(antlr_child, parser, visited) if not child: continue rules.update(child_rules) parent_node += child depth = max(depth, child_depth + 1) else: assert isinstance(antlr_node, TerminalNode), f'An ANTLR node must either be a ParserRuleContext or a TerminalNode but {antlr_node.__class__.__name__} was found.' name, text = parser.symbolicNames[antlr_node.symbol.type] if len(parser.symbolicNames) > antlr_node.symbol.type else '<INVALID>', antlr_node.symbol.text assert name, f'{name} is None or empty' if (antlr_node.symbol.type == Token.EOF # The DefaultErrorStrategy of ANTLR creates and adds artificial tokens # to the parse tree if it sees that even if a token is missing but the next # one in the stream is what it was assumed. In such cases, it creates a # CommonToken with the prefix '<missing ' and postfix '>'. However, these # must be filtered out, since it would cause extra tokens in our test cases. # Link: https://github.com/antlr/antlr4/blob/67228355c5bfd1ed5ebb89e726992ec43dda7b53/runtime/Java/src/org/antlr/v4/runtime/DefaultErrorStrategy.java#L583 or text.startswith('<missing') and text.endswith('>')): return None, 0, [] if not self._hidden: node = UnlexerRule(name=(name,), src=text, immutable=(name,) in self._graph.immutables or name == '<INVALID>') rules.add(node) else: node = [] hidden_tokens_to_left = parser.getTokenStream().getHiddenTokensToLeft(antlr_node.symbol.tokenIndex, -1) or [] for token in hidden_tokens_to_left: if parser.symbolicNames[token.type] in self._hidden: if token not in visited: hidden_name = (parser.symbolicNames[token.type],) node.append(UnlexerRule(name=hidden_name, src=token.text, immutable=hidden_name in self._graph.immutables)) visited.add(token) node.append(UnlexerRule(name=(name,), src=text, immutable=(name,) in self._graph.immutables or name == '<INVALID>')) hidden_tokens_to_right = parser.getTokenStream().getHiddenTokensToRight(antlr_node.symbol.tokenIndex, -1) or [] for token in hidden_tokens_to_right: if parser.symbolicNames[token.type] in self._hidden: if token not in visited: hidden_name = (parser.symbolicNames[token.type],) node.append(UnlexerRule(name=hidden_name, src=token.text, immutable=hidden_name in self._graph.immutables)) visited.add(token) rules.update(node) depth = 0 return node, depth, rules # The parse trees generated by the ANTLR parser consist solely of a rule hierarchy, lacking # information about the decisions made during parsing. As a result, they do not include # alternative, quantifier, or quantified nodes in the output tree, unlike the generator tool. # The function below reconstructs such structures. The concept is that since we can assign # to every tree node a grammar rule that produced it, it is sufficient to perform the # matching - and hence the reconstruction - on a single rule level. In other words, there is # no need to recursively match the entire tree. def _adjust_tree_to_generator(self, rules: Iterable[UnlexerRule | UnparserRule]) -> None: def _adjust_rule(rule): def _match_seq(grammar_vertices, tree_node_pos): seq_children = [] for vertex_pos, vertex in enumerate(grammar_vertices): if vertex is None: # end-of-rule marker return seq_children if tree_node_pos == len(tree_nodes) else None, tree_node_pos if isinstance(vertex, LambdaNode): continue if isinstance(vertex, UnparserRuleNode): if tree_node_pos < len(tree_nodes) and isinstance(tree_nodes[tree_node_pos], UnparserRule) and vertex.name == '_'.join(tree_nodes[tree_node_pos].name): seq_children += [tree_nodes[tree_node_pos]] tree_node_pos += 1 continue return None, tree_node_pos if isinstance(vertex, UnlexerRuleNode): if (tree_node_pos < len(tree_nodes) and isinstance(tree_nodes[tree_node_pos], UnlexerRule) and (vertex.name == '_'.join(tree_nodes[tree_node_pos].name) or tree_nodes[tree_node_pos].name == ('<INVALID>',) and vertex.name.startswith('T__') and tree_nodes[tree_node_pos].src == vertex.out_neighbours[0].src)): seq_children += [tree_nodes[tree_node_pos]] tree_node_pos += 1 continue return None, tree_node_pos if isinstance(vertex, AlternationNode): for alternative_vertex in vertex.out_neighbours: assert isinstance(alternative_vertex, AlternativeNode), alternative_vertex out_neighbours = alternative_vertex.out_neighbours # If the next alternative is a labelled alternative with recurring name, then # compare the tree nodes to the children of this alternative. if (len(out_neighbours) == 1 and isinstance(out_neighbours[0], UnparserRuleNode) and len(out_neighbours[0].id) == 3 and out_neighbours[0].name == '_'.join(vertex.rule_id)): out_neighbours = out_neighbours[0].out_neighbours alt_children, alt_tree_node_pos = _match_seq(out_neighbours, tree_node_pos) if alt_children is not None: rest_children, rest_tree_node_pos = _match_seq(grammar_vertices[vertex_pos + 1:], alt_tree_node_pos) if rest_children is not None: return seq_children + [(UnparserRuleAlternative(alt_idx=alternative_vertex.alt_idx, idx=alternative_vertex.idx), alt_children)] + rest_children, rest_tree_node_pos return None, tree_node_pos if isinstance(vertex, QuantifierNode): quantifier_children = [] for _ in range(0, int(vertex.start)): quant_children, quant_tree_node_pos = _match_seq(vertex.out_neighbours, tree_node_pos) if quant_children is None: return None, tree_node_pos quantifier_children += [(UnparserRuleQuantified(), quant_children)] tree_node_pos = quant_tree_node_pos # Track the positions of quantified subexpressions for potential backtracking. backtrack_positions = [] for _ in range(int(vertex.start), int(vertex.stop)) if vertex.stop != inf else itertools.count(): quant_children, quant_tree_node_pos = _match_seq(vertex.out_neighbours, tree_node_pos) if quant_children is None: rest_children, rest_tree_node_pos = _match_seq(grammar_vertices[vertex_pos + 1:], tree_node_pos) if rest_children is not None: return seq_children + [(UnparserRuleQuantifier(idx=vertex.idx, start=vertex.start, stop=vertex.stop if vertex.stop != 'inf' else math.inf), quantifier_children)] + rest_children, rest_tree_node_pos # If matching fails, attempt to backtrack to a previous quantified position. while backtrack_positions: tree_node_pos = backtrack_positions.pop() quantifier_children.pop() # Remove the last unsuccessful quantified attempt rest_children, rest_tree_node_pos = _match_seq(grammar_vertices[vertex_pos + 1:], tree_node_pos) if rest_children is not None: return seq_children + [(UnparserRuleQuantifier(idx=vertex.idx, start=vertex.start, stop=vertex.stop if vertex.stop != 'inf' else math.inf), quantifier_children)] + rest_children, rest_tree_node_pos return None, tree_node_pos # Complete failure, return the last unmatched position # Successful match, store the position for possible backtracking. quantifier_children += [(UnparserRuleQuantified(), quant_children)] backtrack_positions.append(tree_node_pos) tree_node_pos = quant_tree_node_pos rest_children, rest_tree_node_pos = _match_seq(grammar_vertices[vertex_pos + 1:], tree_node_pos) if rest_children is not None: return seq_children + [(UnparserRuleQuantifier(idx=vertex.idx, start=vertex.start, stop=vertex.stop), quantifier_children)] + rest_children, rest_tree_node_pos return None, tree_node_pos assert False, vertex return seq_children, tree_node_pos # Separate regular and hidden children of a tree node tree_nodes, hidden_nodes = [], [] prev_child = None for child in rule.children: if isinstance(child, UnlexerRule) and '_'.join(child.name) in self._hidden: hidden_nodes.append((child, prev_child)) else: tree_nodes.append(child) prev_child = child # Match the right-hand side of the parser rule to the regular children of the tree node # They MUST match, since ANTLR has already parsed them # During matching, quantifier and alternation structures are identified rule_children, rule_tree_node_pos = _match_seq(self._graph.vertices[rule.name].out_neighbours + [None], 0) if rule_children is None: logger.warning('Failed to match %s tree node to the related grammar rule at %d.', rule.name, rule_tree_node_pos) return # Detach all children from the tree node so that they can be reattached # in a structured way afterwards for child in rule.children: child.parent = None rule.children = [] # Reattach all regular children def _reattach_children(rule, children): for child in children: if isinstance(child, tuple): child, grandchildren = child _reattach_children(child, grandchildren) rule.add_child(child) _reattach_children(rule, rule_children) # Reattach all hidden children for child, prev_child in hidden_nodes: if prev_child is None: rule.insert_child(0, child) else: prev_child.parent.insert_child(prev_child.parent.children.index(prev_child) + 1, child) # Adjust all rules ... for rule in rules: # ... except for unlexer rules. if isinstance(rule, UnlexerRule) or not rule.children: continue _adjust_rule(rule) # Post-process parser rules to remove the artificial alternative inserted # above labelled alternatives with recurring label and fix the alternative # index of the root alternative of such constructs. for rule in rules: if not isinstance(rule, UnparserRule): continue for child in rule.children: if (isinstance(child, UnparserRuleAlternative) and len(child.children) == 1 and isinstance(grandchild := child.children[0], UnparserRule) and len(grandchild.children) == 1 and isinstance(grandgrandchild := grandchild.children[0], UnparserRuleAlternative) and len(rule.name) == 1 and len(grandchild.name) == 2 and rule.name[0] == grandchild.name[0] and child.alt_idx == grandgrandchild.alt_idx): child.idx = grandgrandchild.idx children_to_hoist = list(grandgrandchild.children) grandgrandchild.remove() grandchild.add_children(children_to_hoist) # Stringify rule names. for rule in rules: assert isinstance(rule.name, tuple), rule.name rule.name = '_'.join(rule.name) # Create an ANTLR tree from the input stream and convert it to Grammarinator tree. def _create_tree(self, input_stream: InputStream, fn: str | None) -> Rule | None: try: lexer = self._lexer_cls(input_stream) lexer.addErrorListener(ExtendedErrorListener()) parser = self._parser_cls(CommonTokenStream(lexer)) parse_tree_root = getattr(parser, self._rule)() if parser._syntaxErrors: if self._strict: logger.warning('%s syntax errors detected in %s. Skipping.', parser._syntaxErrors, fn) return None logger.warning('%s syntax errors detected in %s.', parser._syntaxErrors, fn) root, depth, rules = self._antlr_to_grammarinator_tree(parse_tree_root, parser) assert root and isinstance(root, UnparserRule), 'The root of the converted tree is None or not a parser rule.' if depth > self._max_depth: logger.warning('The tree representation of %s is %s, too deep. Skipping.', fn, depth) return None self._adjust_tree_to_generator(rules) for transformer in self._transformers: root = transformer(root) return root except Exception as e: logger.warning('Exception while parsing %s.', fn or '', exc_info=e) return None
[docs] def parse(self, fn: str) -> None: """ Load content from file, parse it to an ANTLR tree, convert it to Grammarinator tree, and save it to population. :param fn: Path to the input file. """ logger.info('Process file %s.', fn) try: root = self._create_tree(FileStream(fn, encoding=self._encoding, errors=self._errors), fn) if root is not None and self._population is not None: self._population.add_individual(root, path=fn) except Exception as e: logger.warning('Exception while processing %s.', fn, exc_info=e)