Source code for grammarinator.tool.generator

# Copyright (c) 2017-2026 Renata Hodovan, Akos Kiss.
#
# Licensed under the BSD 3-Clause License
# <LICENSE.rst or https://opensource.org/licenses/BSD-3-Clause>.
# This file may not be copied, modified, or distributed except
# according to those terms.

import os
import random

from contextlib import nullcontext
from copy import deepcopy
from os.path import abspath, dirname
from shutil import rmtree
from typing import Callable, Sequence

import xxhash

from inators import log as logging

from ..runtime import Generator, DefaultModel, Individual, Listener, Model, Population, Rule, RuleSize, UnlexerRule, UnparserRule, WeightedModel

logger = logging.getLogger(__name__)


[docs] class GeneratorFactory: """ Base class of generator factories. A generator factory is a generalization of a generator class. It has to be a callable that, when called, must return a generator instance. It must also expose some properties of the generator class it generalizes that are required to guide generation or mutation by :class:`GeneratorTool`. This factory generalizes a generator class by simply wrapping it and forwarding call operations to instantiations of the wrapped class. Furthermore, generator factories deriving from this base class are guaranteed to expose all the required generator class properties. """ def __init__(self, generator_class: type[Generator]) -> None: """ :param generator_class: The class of the wrapped generator. """ self._generator_class: type[Generator] = generator_class #: The class of the wrapped generator. # Exposing some class variables of the encapsulated generator. # In the generator class, they start with `_` to avoid any kind of # collision with rule names, so they start with `_` here as well. self._rule_sizes: dict[str, RuleSize] = generator_class._rule_sizes # Sizes of the rules, used to determine the minimum size of the generated trees. Generated into the generator subclasses by processor. self._alt_sizes: tuple[tuple[RuleSize, ...], ...] = generator_class._alt_sizes # Sizes of the alternatives of the rules, used to determine the minimum size of the generated trees. Generated into the generator subclasses by processor. self._quant_sizes: tuple[RuleSize, ...] = generator_class._quant_sizes # Sizes of the quantifiers of the rules, used to determine the minimum size of the generated trees. Generated into the generator subclasses by processor.
[docs] def __call__(self, limit: RuleSize | None = None) -> Generator: """ Create a new generator instance. :param limit: The limit on the depth of the trees and on the number of tokens (number of unlexer rule calls), i.e., it must be possible to finish generation from the selected node so that the overall depth and token count of the tree does not exceed these limits (default: :class:`~grammarinator.runtime.RuleSize`. ``max``). Used to instantiate the generator. :return: The created generator instance. """ return self._generator_class(limit=limit)
[docs] class DefaultGeneratorFactory(GeneratorFactory): """ The default generator factory implementation. When called, a new generator instance is created backed by a new decision model instance and a set of newly created listener objects is attached. """ def __init__(self, generator_class: type[Generator], *, model_class: type[Model] | None = None, weights: dict[tuple[str, int, int], float] | None = None, probs: dict[tuple[str, int], float] | None = None, listener_classes: list[type[Listener]] | None = None) -> None: """ :param generator_class: The class of the generator to instantiate. :param model_class: The class of the model to instantiate. The model instance is used to instantiate the generator. :param weights: Initial multipliers of alternatives. Used to instantiate a :class:`~grammarinator.runtime.WeightedModel` wrapper around the model. :param probs: Initial custom probabilities for quantifiers. Used to instantiate a :class:`~grammarinator.runtime.WeightedModel` wrapper around the model. :param listener_classes: List of listener classes to instantiate and attach to the generator. """ super().__init__(generator_class) self._model_class: type[Model] = model_class or DefaultModel self._weights: dict[tuple[str, int, int], float] | None = weights self._probs: dict[tuple[str, int], float] | None = probs self._listener_classes: list[type[Listener]] = listener_classes or []
[docs] def __call__(self, limit: RuleSize | None = None) -> Generator: """ Create a new generator instance according to the settings specified for the factory instance and for this method. :param limit: The limit on the depth of the trees and on the number of tokens (number of unlexer rule calls), i.e., it must be possible to finish generation from the selected node so that the overall depth and token count of the tree does not exceed these limits (default: :class:`~grammarinator.runtime.RuleSize`. ``max``). Used to instantiate the generator. :return: The created generator instance. """ model = self._model_class() if self._weights or self._probs: model = WeightedModel(model, weights=self._weights, probs=self._probs) listeners = [] for listener_class in self._listener_classes: listeners.append(listener_class()) generator = self._generator_class(model=model, listeners=listeners, limit=limit) return generator
[docs] class GeneratorTool: """ Tool to create new test cases using the generator produced by ``grammarinator-process``. """ def __init__(self, generator_factory: type[Generator] | GeneratorFactory, out_format: str, lock=None, rule: str | None = None, limit: RuleSize | None = None, population: Population | None = None, keep_trees: bool = False, generate: bool = True, mutate: bool = True, recombine: bool = True, unrestricted: bool = True, allowlist: list[str] | None = None, blocklist: list[str] | None = None, transformers: list[Callable[[Rule], Rule]] | None = None, serializer: Callable[[Rule], str] | None = None, memo_size: int = 0, unique_attempts: int = 2, cleanup: bool = True, encoding: str = 'utf-8', errors: str = 'strict', dry_run: bool = False): """ :param generator_factory: A callable that can produce instances of a generator. It is a generalization of a generator class: it has to instantiate a generator object, and it may also set the decision model and the listeners of the generator as well. It also has to expose some properties of the generator class necessary to guide generation or mutation. In the simplest case, it can be a ``grammarinator-process``-created subclass of :class:`Generator`, but in more complex scenarios a factory can be used, e.g., an instance of a subclass of :class:`GeneratorFactory`, like :class:`DefaultGeneratorFactory`. :param rule: Name of the rule to start generation from (default: the default rule of the generator). :param out_format: Test output description. It can be a file path pattern possibly including the ``%d`` placeholder which will be replaced by the index of the test case. Otherwise, it can be an empty string, which will result in printing the test case to the stdout (i.e., not saving to file system). :param lock: Lock object necessary when printing test cases in parallel (optional). :type lock: :class:`multiprocessing.Lock` | None :param limit: The limit on the depth of the trees and on the number of tokens (number of unlexer rule calls), i.e., it must be possible to finish generation from the selected node so that the overall depth and token count of the tree does not exceed these limits (default: :class:`~grammarinator.runtime.RuleSize`. ``max``). :param population: Tree pool for mutation and recombination, e.g., an instance of :class:`FilePopulation`. :param keep_trees: Keep generated trees to participate in further mutations or recombinations (otherwise, only the initial population will be mutated or recombined). It has effect only if population is defined. :param generate: Enable generating new test cases from scratch, i.e., purely based on grammar. :param mutate: Enable mutating existing test cases, i.e., re-generate part of an existing test case based on grammar. :param recombine: Enable recombining existing test cases, i.e., replace part of a test case with a compatible part from another test case. :param unrestricted: Enable applying possibly grammar-violating creators. :param allowlist: List of mutators to allow (by default, all mutators are allowed). :param blocklist: List of mutators to block (by default, no mutators are blocked). :param transformers: List of transformers to be applied to postprocess the generated tree before serializing it. :param serializer: A serializer that takes a tree and produces a string from it (default: :class:`str`). See :func:`grammarinator.runtime.simple_space_serializer` for a simple solution that concatenates tokens with spaces. :param memo_size: The number of most recently created unique tests memoized (default: 0). :param unique_attempts: The limit on how many times to try to generate a unique (i.e., non-memoized) test case. It has no effect if ``memo_size`` is 0 (default: 2). :param cleanup: Enable deleting the generated tests at :meth:`__exit__`. :param encoding: Output file encoding. :param errors: Encoding error handling scheme. :param dry_run: Enable or disable the saving or printing of the result of generation. """ self._generator_factory = generator_factory self._transformers = transformers or [] self._serializer = serializer or str self._rule = rule if out_format and not dry_run: os.makedirs(abspath(dirname(out_format)), exist_ok=True) self._out_format = out_format self._lock = lock or nullcontext() self._limit = limit or RuleSize.max self._population = population self._enable_generation = generate self._enable_mutation = mutate self._enable_recombination = recombine self._keep_trees = keep_trees self._memo: dict[int, None] = {} # NOTE: value associated to test is unimportant, but dict keeps insertion order while set does not self._memo_size = memo_size self._unique_attempts = max(unique_attempts, 1) self._cleanup = cleanup self._encoding = encoding self._errors = errors self._dry_run = dry_run self.allowlist = allowlist or [] self.blocklist = blocklist or [] self._generators: list[Callable[[Individual | None, Individual | None], Rule | None]] = [] self._mutators: list[Callable[[Individual | None, Individual | None], Rule | None]] = [] self._recombiners: list[Callable[[Individual | None, Individual | None], Rule | None]] = [] if generate: self._allow_creator(self._generators, "generate", self.generate) if mutate: self._allow_creator(self._mutators, "regenerate_rule", self.regenerate_rule) self._allow_creator(self._mutators, "delete_quantified", self.delete_quantified) self._allow_creator(self._mutators, "replicate_quantified", self.replicate_quantified) self._allow_creator(self._mutators, "shuffle_quantifieds", self.shuffle_quantifieds) self._allow_creator(self._mutators, "hoist_rule", self.hoist_rule) self._allow_creator(self._mutators, "swap_local_nodes", self.swap_local_nodes) self._allow_creator(self._mutators, "insert_local_node", self.insert_local_node) if unrestricted: self._allow_creator(self._mutators, "unrestricted_delete", self.unrestricted_delete) self._allow_creator(self._mutators, "unrestricted_hoist_rule", self.unrestricted_hoist_rule) if recombine: self._allow_creator(self._recombiners, "replace_node", self.replace_node) self._allow_creator(self._recombiners, "insert_quantified", self.insert_quantified) def __enter__(self): return self
[docs] def __exit__(self, exc_type, exc_val, exc_tb): """ Delete the output directory if the tests were saved to files and if ``cleanup`` was enabled. """ if self._cleanup and self._out_format and not self._dry_run: rmtree(dirname(self._out_format))
def _allow_creator(self, creator_map: list[Callable[[Individual | None, Individual | None], Rule | None]], creator_name: str, creator: Callable[[Individual | None, Individual | None], Rule | None]): if creator_name not in self.blocklist and (not self.allowlist or creator_name in self.allowlist): creator_map.append(creator) def print_mutator(self, fmt: str, *args) -> None: logger.debug('GrammarinatorMutator [%s]', fmt.format(*args))
[docs] def create_test(self, index: int) -> str | None: """ Create a new test case with a randomly selected generator method from the available options (see :meth:`generate`, :meth:`mutate`, and :meth:`recombine`). The generated tree is transformed, serialized and saved according to the parameters used to initialize the current tool object. :param index: Index of the test case to be generated. :return: Path to the generated serialized test file. It may be empty if the tool object was initialized with an empty ``out_format`` or ``None`` if ``dry_run`` was enabled, and hence the test file was not saved. """ for attempt in range(1, self._unique_attempts + 1): root = self.create() if not root: logger.debug("test case #%d, attempt %d/%d: could not be generated", index, attempt, self._unique_attempts) continue test = self._serializer(root) if self._memoize_test(test): break logger.debug("test case #%d, attempt %d/%d: already generated among the last %d unique test cases", index, attempt, self._unique_attempts, len(self._memo)) if self._dry_run or not root: return None test_fn = self._out_format % index if '%d' in self._out_format else self._out_format if self._population is not None and self._keep_trees: self._population.add_individual(root, path=test_fn) if test_fn: with open(test_fn, 'w', encoding=self._encoding, errors=self._errors, newline='') as f: f.write(test) else: with self._lock: print(test) return test_fn
def _memoize_test(self, input: str) -> bool: # Memoize the (hash of the) test case. The size of the memo is capped # by ``memo_size``, i.e., it contains at most that many test cases. # Returns ``False`` if the test case was already in the memo, ``True`` # if it got added now (or memoization is disabled by ``memo_size=0``). # When the memo is full and a new test case is added, the oldest entry # is evicted. if self._memo_size < 1: return True test = xxhash.xxh3_64_intdigest(input.encode(encoding=self._encoding, errors=self._errors)) if test in self._memo: return False self._memo[test] = None if len(self._memo) > self._memo_size: del self._memo[next(iter(self._memo))] return True def _select_creator(self, creators: list[Callable[[Individual | None, Individual | None], Rule | None]], individual1: Individual | None, individual2: Individual | None) -> Callable[[Individual | None, Individual | None], Rule | None]: # pylint: disable=unused-argument # NOTE: May be overridden. return random.choice(creators) def _create_tree(self, creators: list[Callable[[Individual | None, Individual | None], Rule | None]], individual1: Individual | None, individual2: Individual | None) -> Rule | None: # Note: creators is potentially modified (creators that return None are removed). Always ensure it is a copy when calling this method. while creators: creator = self._select_creator(creators, individual1, individual2) if individual1 is not None: logger.trace("Original test: '%s'", self._serializer(individual1.root)) root = creator(individual1, individual2) if root: break creators.remove(creator) else: logger.warning("No test creators could produce a tree") return None for transformer in self._transformers: root = transformer(root) return root
[docs] def create(self) -> Rule | None: """ Create a new tree with a randomly selected generator method from the available options (see :meth:`generate`, :meth:`mutate`, and :meth:`recombine`). The generated tree is also transformed according to the parameters used to initialize the current tool object. :return: The root of the created tree. """ individual1, individual2 = (self._ensure_individuals(None, None)) if self._population else (None, None) creators = [] creators.extend(self._generators) if self._population: creators.extend(self._mutators) creators.extend(self._recombiners) return self._create_tree(creators, individual1, individual2)
[docs] def mutate(self, individual: Individual | None = None) -> Rule | None: """ Dispatcher method for mutation operators: it picks one operator randomly and creates a new tree by applying the operator to an individual. The generated tree is also transformed according to the parameters used to initialize the current tool object. Supported mutation operators: :meth:`regenerate_rule`, :meth:`delete_quantified`, :meth:`replicate_quantified`, :meth:`shuffle_quantifieds`, :meth:`hoist_rule`, :meth:`unrestricted_delete`, :meth:`unrestricted_hoist_rule`, :meth:`swap_local_nodes`, :meth:`insert_local_node` :param individual: The population item to be mutated. :return: The root of the mutated tree. """ # NOTE: Intentionally does not check self._enable_mutation! # If you call this explicitly, then so be it, even if mutation is disabled. # If individual is None, population MUST exist. individual = self._ensure_individual(individual) return self._create_tree(self._mutators[:], individual, None)
[docs] def recombine(self, individual1: Individual | None = None, individual2: Individual | None = None) -> Rule | None: """ Dispatcher method for recombination operators: it picks one operator randomly and creates a new tree by applying the operator to an individual. The generated tree is also transformed according to the parameters used to initialize the current tool object. Supported recombination operators: :meth:`replace_node`, :meth:`insert_quantified` :param individual1: The population item to be used as a recipient during crossover. :param individual2: The population item to be used as a donor during crossover. :return: The root of the recombined tree. """ # NOTE: Intentionally does not check self._enable_recombination! # If you call this explicitly, then so be it, even if recombination is disabled. # If any of the individuals is None, population MUST exist. individual1, individual2 = self._ensure_individuals(individual1, individual2) return self._create_tree(self._recombiners[:], individual1, individual2)
[docs] def generate(self, _individual1: Individual | None = None, _individual2: Individual | None = None, *, rule: str | None = None, reserve: RuleSize | None = None) -> UnlexerRule | UnparserRule: """ Instantiate a new generator and generate a new tree from scratch. :param rule: Name of the rule to start generation from. :param reserve: Size budget that needs to be put in reserve before generating the tree. Practically, deduced from the initially specified limit. (default values: 0, 0) :return: The root of the generated tree. """ # NOTE: Intentionally does not check self._enable_generation! # If you call this explicitly, then so be it, even if generation is disabled. reserve = reserve if reserve is not None else RuleSize() generator = self._generator_factory(limit=self._limit - reserve) rule = rule or self._rule or generator._default_rule.__name__ if not hasattr(generator, rule): logger.error('Rule %s not found.', rule) raise AttributeError(rule) self.print_mutator('{}: {}', self.generate.__name__, rule) return getattr(generator, rule)()
def _ensure_individual(self, individual: Individual | None) -> Individual: if individual is None: assert self._population is not None individual = self._population.select_individual() return individual def _ensure_individuals(self, individual1: Individual | None, individual2: Individual | None) -> tuple[Individual, Individual]: individual1 = self._ensure_individual(individual1) if individual2 is None: assert self._population is not None individual2 = self._population.select_individual(individual1) return individual1, individual2
[docs] def regenerate_rule(self, individual: Individual | None = None, _=None) -> Rule: """ Mutate a tree at a random position, i.e., discard and re-generate its sub-tree at a randomly selected node. :param individual: The population item to be mutated. :return: The root of the mutated tree. """ individual = self._ensure_individual(individual) root, annot = individual.root, individual.annotations # Filter items from the nodes of the selected tree that can be regenerated # within the current maximum depth and token limit (except immutable nodes). root_tokens = annot.node_tokens[root] options = [] for rule_name, nodes in annot.rules_by_name.items(): rule_size = self._generator_factory._rule_sizes.get(rule_name) if rule_size is None: logger.error('Unknown rule name in generation: %s.', rule_name) continue for node in nodes: if (node.parent is not None and annot.node_levels[node] + rule_size.depth < self._limit.depth and root_tokens - annot.node_tokens[node] + rule_size.tokens < self._limit.tokens): options.append(node) if options: mutated_node = random.choice(options) self.print_mutator('{}: {}', self.regenerate_rule.__name__, mutated_node.name) reserve = RuleSize(depth=annot.node_levels[mutated_node], tokens=root_tokens - annot.node_tokens[mutated_node]) mutated_node = mutated_node.replace(self.generate(rule=mutated_node.name, reserve=reserve)) # type: ignore[assignment] return mutated_node.root # If selection strategy fails, we fallback and discard the whole tree # and generate a brand new one instead. logger.trace('%s failed.', self.regenerate_rule.__name__) return self.generate(rule=root.name)
[docs] def replace_node(self, recipient_individual: Individual | None = None, donor_individual: Individual | None = None) -> Rule | None: """ Recombine two trees at random positions where the nodes are compatible with each other (i.e., they share the same node name). One of the trees is called the recipient while the other is the donor. The sub-tree rooted at a random node of the recipient is discarded and replaced by the sub-tree rooted at a random node of the donor. :param recipient_individual: The population item to be used as a recipient during crossover. :param donor_individual: The population item to be used as a donor during crossover. :return: The root of the recombined tree. """ recipient_individual, donor_individual = self._ensure_individuals(recipient_individual, donor_individual) recipient_root, recipient_annot = recipient_individual.root, recipient_individual.annotations donor_annot = donor_individual.annotations recipient_lookup: dict[str, Sequence[Rule]] = dict(recipient_annot.nodes_by_name) donor_lookup: dict[str, Sequence[Rule]] = dict(donor_annot.nodes_by_name) common_types = sorted(set(recipient_lookup.keys()) & set(donor_lookup.keys())) recipient_options = [(rule_name, node) for rule_name in common_types for node in recipient_lookup[rule_name] if node.parent] recipient_root_tokens = recipient_annot.node_tokens[recipient_root] # Shuffle suitable nodes with sample. for rule_name, recipient_node in random.sample(recipient_options, k=len(recipient_options)): donor_options = donor_lookup[rule_name] recipient_node_level = recipient_annot.node_levels[recipient_node] recipient_node_tokens = recipient_annot.node_tokens[recipient_node] for donor_node in random.sample(donor_options, k=len(donor_options)): # Make sure that the output tree won't exceed the depth limit. if (recipient_node_level + donor_annot.node_depths[donor_node] <= self._limit.depth and recipient_root_tokens - recipient_node_tokens + donor_annot.node_tokens[donor_node] < self._limit.tokens): self.print_mutator('{}: {}', self.replace_node.__name__, recipient_node.rule_name) recipient_node.replace(donor_node) return recipient_root logger.trace('%s failed.', self.replace_node.__name__) return None
[docs] def insert_quantified(self, recipient_individual: Individual | None = None, donor_individual: Individual | None = None) -> Rule | None: """ Selects two compatible quantifier nodes from two trees randomly and if the quantifier node of the recipient tree is not full (the number of its children is less than the maximum count), then add one new child to it at a random position from the children of donors quantifier node. :param recipient_individual: The population item to be used as a recipient during crossover. :param donor_individual: The population item to be used as a donor during crossover. :return: The root of the extended tree. """ recipient_individual, donor_individual = self._ensure_individuals(recipient_individual, donor_individual) recipient_root, recipient_annot = recipient_individual.root, recipient_individual.annotations donor_annot = donor_individual.annotations common_types = sorted(set(recipient_annot.quants_by_name.keys()) & set(donor_annot.quants_by_name.keys())) recipient_options = [(name, node) for name in common_types for node in recipient_annot.quants_by_name[name] if len(node.children) < node.stop] recipient_root_tokens = recipient_annot.node_tokens[recipient_root] for rule_name, recipient_node in random.sample(recipient_options, k=len(recipient_options)): recipient_node_level = recipient_annot.node_levels[recipient_node] donor_options = [quantified for quantifier in donor_annot.quants_by_name[rule_name] for quantified in quantifier.children] for donor_node in random.sample(donor_options, k=len(donor_options)): # Make sure that the output tree won't exceed the depth and token limits. if (recipient_node_level + donor_annot.node_depths[donor_node] <= self._limit.depth and recipient_root_tokens + donor_annot.node_tokens[donor_node] < self._limit.tokens): self.print_mutator('{}: {}, {}', self.insert_quantified.__name__, recipient_node.rule_name, recipient_node.idx) recipient_node.insert_child(random.randint(0, len(recipient_node.children)), donor_node) return recipient_root logger.trace('%s failed.', self.insert_quantified.__name__) return None
[docs] def delete_quantified(self, individual: Individual | None = None, _=None) -> Rule | None: """ Removes an optional subtree randomly from a quantifier node. :param individual: The population item to be mutated. :return: The root of the modified tree. """ individual = self._ensure_individual(individual) root, annot = individual.root, individual.annotations options = [child for node in annot.quants if len(node.children) > node.start for child in node.children] if options: removed_node = random.choice(options) self.print_mutator('{}: {}, {}', self.delete_quantified.__name__, removed_node.rule_name, removed_node.parent.idx) # type: ignore[union-attr] removed_node.remove() return root logger.trace('%s failed.', self.delete_quantified.__name__) return None
[docs] def unrestricted_delete(self, individual: Individual | None = None, _=None) -> Rule | None: """ Remove a subtree rooted in any kind of rule node randomly without any further restriction. :param individual: The population item to be mutated. :return: The root of the modified tree. """ individual = self._ensure_individual(individual) root, annot = individual.root, individual.annotations options = [node for node in annot.rules if node != root] if options: removed_node = random.choice(options) self.print_mutator('{}: {}', self.unrestricted_delete.__name__, removed_node.name) removed_node.remove() return root logger.trace('%s failed.', self.unrestricted_delete.__name__) return None
[docs] def replicate_quantified(self, individual: Individual | None = None, _=None) -> Rule | None: """ Select a quantified sub-tree randomly, replicate it and insert it again if the maximum quantification count is not reached yet. :param individual: The population item to be mutated. :return: The root of the modified tree. """ individual = self._ensure_individual(individual) root, annot = individual.root, individual.annotations root_options = [node for node in annot.quants if node.stop > len(node.children)] recipient_root_tokens = annot.node_tokens[root] node_options = [child for root in root_options for child in root.children if recipient_root_tokens < recipient_root_tokens + annot.node_tokens[child] <= self._limit.tokens] if node_options: node_to_repeat = random.choice(node_options) max_repeat = (self._limit.tokens - recipient_root_tokens) // annot.node_tokens[node_to_repeat] if self._limit.tokens != RuleSize.max.tokens else 1 repeat = random.randint(1, int(max_repeat)) if max_repeat > 1 else 1 for _ in range(repeat): node_to_repeat.parent.insert_child(idx=random.randint(0, len(node_to_repeat.parent.children)), node=deepcopy(node_to_repeat)) # type: ignore[union-attr] self.print_mutator('{}: {}, {}', self.replicate_quantified.__name__, node_to_repeat.rule_name, node_to_repeat.parent.idx) # type: ignore[union-attr] return root logger.trace('%s failed.', self.replicate_quantified.__name__) return None
[docs] def shuffle_quantifieds(self, individual: Individual | None = None, _=None) -> Rule | None: """ Select a quantifier node and shuffle its quantified sub-trees. :param individual: The population item to be mutated. :return: The root of the modified tree. """ individual = self._ensure_individual(individual) root, annot = individual.root, individual.annotations options = [node for node in annot.quants if len(node.children) > 1] if options: node_to_shuffle = random.choice(options) random.shuffle(node_to_shuffle.children) self.print_mutator('{}: {}, {}', self.shuffle_quantifieds.__name__, node_to_shuffle.rule_name, node_to_shuffle.idx) return root logger.trace('%s failed.', self.shuffle_quantifieds.__name__) return None
[docs] def hoist_rule(self, individual: Individual | None = None, _=None) -> Rule | None: """ Select an individual of the population to be mutated and select two rule nodes from it which share the same rule name and are in ancestor-descendant relationship making possible for the descendant to replace its ancestor. :param individual: The population item to be mutated. :return: The root of the hoisted tree. """ individual = self._ensure_individual(individual) root, annot = individual.root, individual.annotations for rule in random.sample(annot.rules, k=len(annot.rules)): parent = rule.parent while parent: if parent.name == rule.name: self.print_mutator('{}: {}', self.hoist_rule.__name__, parent.name) parent.replace(rule) return root parent = parent.parent logger.trace('%s failed.', self.hoist_rule.__name__) return None
[docs] def unrestricted_hoist_rule(self, individual: Individual | None = None, _=None) -> Rule | None: """ Select two rule nodes from the input individual which are in ancestor-descendant relationship (without type compatibility check) and replace the ancestor with the selected descendant. :param individual: The population item to be mutated. :return: The root of the modified tree. """ individual = self._ensure_individual(individual) root, annot = individual.root, individual.annotations for rule in random.sample(annot.rules, k=len(annot.rules)): options = [] parent = rule.parent while parent and parent != root: if isinstance(parent, UnparserRule) and len(parent.children) > 1 and not rule.equalTokens(parent): options.append(parent) parent = parent.parent if options: hoist_parent = random.choice(options) self.print_mutator('{}: {}, {}', self.unrestricted_hoist_rule.__name__, hoist_parent.name, rule.name) hoist_parent.replace(rule) return root logger.trace('%s failed.', self.unrestricted_hoist_rule.__name__) return None
[docs] def swap_local_nodes(self, individual: Individual | None = None, _=None) -> Rule | None: """ Swap two non-overlapping subtrees at random positions in a single test where the nodes are compatible with each other (i.e., they share the same node name). :param individual: The population item to be mutated :return: The root of the mutated tree. """ individual = self._ensure_individual(individual) root, annot = individual.root, individual.annotations options: dict[str, Sequence[Rule]] = dict(annot.nodes_by_name) for _, nodes in random.sample(list(options.items()), k=len(options)): # Skip node types without two instances. if len(nodes) < 2: continue shuffled = random.sample(nodes, k=len(nodes)) for i, first_node in enumerate(shuffled[:-1]): first_node_level = annot.node_levels[first_node] first_node_depth = annot.node_depths[first_node] for second_node in shuffled[i + 1:]: second_node_level = annot.node_levels[second_node] second_node_depth = annot.node_depths[second_node] if (first_node_level + second_node_depth > self._limit.depth and second_node_level + first_node_depth > self._limit.depth): continue # Avoid swapping two identical nodes with each other. if first_node.equalTokens(second_node): continue # Ensure the subtrees rooted at recipient and donor nodes are disjunct. upper_node, lower_node = (first_node, second_node) if first_node_level < second_node_level else (second_node, first_node) disjunct = True parent = lower_node.parent while parent and disjunct: disjunct = parent != upper_node parent = parent.parent if not disjunct: continue first_parent = first_node.parent second_parent = second_node.parent assert first_parent is not None and second_parent is not None, 'Both nodes must have a parent.' first_parent.children[first_parent.children.index(first_node)] = second_node second_parent.children[second_parent.children.index(second_node)] = first_node first_node.parent = second_parent second_node.parent = first_parent self.print_mutator('{}: {}', self.swap_local_nodes.__name__, first_node.rule_name) return root logger.trace('%s failed.', self.swap_local_nodes.__name__) return None
[docs] def insert_local_node(self, individual: Individual | None = None, _=None) -> Rule | None: """ Select two compatible quantifier nodes from a single test and insert a random quantified subtree of the second one into the first one at a random position, while the quantifier restrictions are ensured. :param individual: The population item to be mutated :return: The root of the mutated tree. """ individual = self._ensure_individual(individual) root, annot = individual.root, individual.annotations options = [quantifiers for quantifiers in annot.quants_by_name.values() if len(quantifiers) > 1] if not options: logger.trace('%s failed.', self.insert_local_node.__name__) return None root_tokens = annot.node_tokens[root] for quantifiers in random.sample(options, k=len(options)): shuffled = random.sample(quantifiers, k=len(quantifiers)) for i, recipient_node in enumerate(shuffled[:-1]): if len(recipient_node.children) >= recipient_node.stop: continue recipient_node_level = annot.node_levels[recipient_node] for donor_quantifier in shuffled[i + 1:]: for donor_quantified_node in donor_quantifier.children: if (recipient_node_level + annot.node_depths[donor_quantified_node] <= self._limit.depth and root_tokens + annot.node_tokens[donor_quantified_node] <= self._limit.tokens): recipient_node.insert_child(random.randint(0, len(recipient_node.children)) if recipient_node.children else 0, deepcopy(donor_quantified_node)) self.print_mutator('{}: {}, {}', self.insert_local_node.__name__, recipient_node.rule_name, recipient_node.idx) return root logger.trace('%s failed.', self.insert_local_node.__name__) return None