Skip to content

Core Engine (alnoms.core)

The orchestration and performance profiling logic for Pre-Deployment Governance.

⏱️ Performance Profiling

Industrial‑grade performance analyzer for algorithm benchmarking.

The Profiler supports:

  • Precision timing using timeit.default_timer
  • Warmup runs to stabilize CPU cache and branch predictors
  • Statistical aggregation (min, mean, median)
  • Doubling‑test complexity estimation
  • Decorator‑based profiling for normal program flow
  • Stress‑suite benchmarking for head‑to‑head comparisons

Attributes:

Name Type Description
repeats int

Number of timed runs per benchmark.

warmup int

Number of untimed warmup runs.

mode str

Statistical mode for final timing ('min', 'mean', 'median').

Source code in src/alnoms/core/profiler.py
class Profiler:
    """Industrial-grade performance analyzer for algorithm benchmarking.

    The Profiler supports:

    - Precision timing using `timeit.default_timer`
    - Warmup runs to stabilize CPU cache and branch predictors
    - Statistical aggregation (min, mean, median)
    - Doubling-test complexity estimation
    - Decorator-based profiling for normal program flow
    - Stress-suite benchmarking for head-to-head comparisons

    Attributes:
        repeats (int): Number of timed runs per benchmark.
        warmup (int): Number of untimed warmup runs.
        mode (str): Statistical mode for final timing ('min', 'mean', 'median').
    """

    def __init__(self, repeats: int = 5, warmup: int = 1, mode: str = "min"):
        """Initialize the Profiler with benchmark settings.

        Args:
            repeats (int): Number of timed runs per benchmark (clamped to >= 1).
            warmup (int): Number of warmup runs to prime CPU cache (clamped to >= 0).
            mode (str): Statistical mode ('min', 'mean', 'median').
        """
        self.repeats = max(1, repeats)
        self.warmup = max(0, warmup)
        self.mode = mode
        # Maps label/function name -> list of elapsed times (seconds).
        self._profile_stats: Dict[str, List[float]] = {}

    @contextmanager
    def stopwatch(self, label: str = "Block") -> Generator[None, None, None]:
        """Context manager for precision timing of a code block.

        Args:
            label (str): Identifier for the timed block.

        Yields:
            None: Execution of the wrapped block.

        Side Effects:
            - Records elapsed time under `self._profile_stats[label]`.
        """
        start = timeit.default_timer()
        try:
            yield
        finally:
            # Record even if the block raised, so partial runs are visible.
            elapsed = timeit.default_timer() - start
            self._profile_stats.setdefault(label, []).append(elapsed)

    def benchmark(self, func: Callable, *args: Any) -> float:
        """Benchmark a function with GC disabled for timing purity.

        Args:
            func (Callable): Function to benchmark.
            *args (Any): Arguments passed to the function.

        Returns:
            float: Execution time in seconds, aggregated using the configured mode.

        Notes:
            - Deepcopies arguments to avoid mutation across runs.
            - Disables garbage collection to reduce jitter; restores its
              previous state afterwards.
        """
        # Warmup runs (untimed) to stabilize caches and branch predictors.
        for _ in range(self.warmup):
            safe_args = copy.deepcopy(args)
            func(*safe_args)

        times: List[float] = []
        gc_old = gc.isenabled()
        gc.disable()
        try:
            for _ in range(self.repeats):
                # Fresh copy per run so in-place mutation can't skew later runs.
                safe_args = copy.deepcopy(args)
                start = timeit.default_timer()
                func(*safe_args)
                end = timeit.default_timer()
                times.append(end - start)
        finally:
            # Only re-enable GC if it was enabled when we started.
            if gc_old:
                gc.enable()

        # Statistical mode selection; 'min' is the default (least noisy).
        if self.mode == "median":
            return statistics.median(times)
        elif self.mode == "mean":
            return statistics.mean(times)
        return min(times)

    def run_doubling_test(
        self,
        func: Callable,
        input_gen: Callable[[int], Any],
        start_n: int = 50,
        rounds: int = 3,
        timeout: float = 15.0,
    ) -> List[Dict[str, Any]]:
        """Perform doubling analysis to estimate algorithmic complexity.

        Args:
            func (Callable): Algorithm under test.
            input_gen (Callable): Function generating input for size N.
            start_n (int): Initial input size.
            rounds (int): Number of doubling iterations.
            timeout (float): Maximum allowed runtime for the entire test.

        Returns:
            List[Dict[str, Any]]: A list of records containing:
                - "N": Input size
                - "Time": Execution time
                - "Ratio": T(2N) / T(N)
                - "Complexity": Estimated Big-O class

        Notes:
            - Automatically increases recursion limit for deep algorithms.
            - Stops early if timeout is exceeded.
        """
        sys.setrecursionlimit(max(3000, sys.getrecursionlimit()))
        results: List[Dict[str, Any]] = []
        prev_time = 0.0
        n = start_n
        start_clock = time.perf_counter()

        for _ in range(rounds):
            # Respect the wall-clock budget before starting another round.
            if time.perf_counter() - start_clock > timeout:
                break

            data = input_gen(n)
            args = data if isinstance(data, tuple) else (data,)
            curr_time = self.benchmark(func, *args)

            # First round has no predecessor; flagged with ratio 0.0.
            ratio = curr_time / prev_time if prev_time > 0 else 0.0
            complexity = self._guess_complexity(ratio)

            results.append(
                {"N": n, "Time": curr_time, "Ratio": ratio, "Complexity": complexity}
            )
            prev_time = curr_time
            n *= 2

        return results

    def profile(self, func: Callable) -> Callable:
        """Decorator for lightweight profiling during normal execution.

        Args:
            func (Callable): Function to wrap.

        Returns:
            Callable: Wrapped function that records execution time.

        Notes:
            - Stores timing data under `self._profile_stats[func.__name__]`.
        """

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = timeit.default_timer()
            result = func(*args, **kwargs)
            elapsed = timeit.default_timer() - start
            self._profile_stats.setdefault(func.__name__, []).append(elapsed)
            return result

        return wrapper

    def print_decorator_report(self) -> None:
        """Print a summary table of all decorator-tracked timings.

        Displays:
            - Function/block label
            - Number of calls
            - Average time
            - Total time
        """
        print("\n📝 ALNOMS PROFILE REPORT")
        print(
            f"{'Label/Function':<20} | {'Calls':<6} | {'Avg Time (s)':<12} | {'Total Time'}"
        )
        print("-" * 65)
        for fname, times in self._profile_stats.items():
            avg_t = statistics.mean(times) if times else 0.0
            total_t = sum(times)
            print(f"{fname:<20} | {len(times):<6} | {avg_t:<12.5f} | {total_t:.5f}")

    def _guess_complexity(self, ratio: float) -> str:
        """Map doubling ratios to approximate Big-O complexity classes.

        Args:
            ratio (float): Ratio T(2N) / T(N).

        Returns:
            str: Estimated complexity class.

        Notes:
            - Thresholds are widened to account for CPU jitter and frequency scaling.
        """
        if ratio <= 0:
            return "Initial Round"
        if ratio < 1.4:
            return "O(1) / O(log N)"
        if ratio < 2.8:
            return "O(N)"
        if ratio < 5.5:
            return "O(N^2)"
        if ratio < 10.0:
            return "O(N^3)"
        return "High Growth / Exponential"

    def print_analysis(self, func_name: str, results: List[Dict[str, Any]]) -> None:
        """Print a formatted table from a doubling test.

        Args:
            func_name (str): Name of the analyzed function.
            results (List[Dict[str, Any]]): Output from `run_doubling_test`.
        """
        print(f"\n🔬 ANALYSIS: {func_name} (Mode: {self.mode})")
        print(f"{'N':<10} | {'Time (s)':<12} | {'Ratio':<8} | {'Est. Complexity':<15}")
        print("-" * 55)
        for row in results:
            r_str = f"{row['Ratio']:.2f}" if row["Ratio"] > 0 else "-"
            print(
                f"{row['N']:<10} | {row['Time']:<12.5f} | {r_str:<8} | {row['Complexity']:<15}"
            )

    def run_stress_suite(
        self,
        funcs: Dict[str, Callable],
        input_gen: Callable[[int], Any],
        n_values: "List[int] | None" = None,
    ) -> Dict[int, Dict[str, float]]:
        """Run multiple algorithms across multiple input sizes.

        Useful for head-to-head comparisons in research, teaching, and
        performance governance.

        Args:
            funcs (Dict[str, Callable]): Mapping of function names to callables.
            input_gen (Callable): Data generator for size N.
            n_values (List[int], optional): Input sizes to test.
                Defaults to [1000, 2000, 4000].

        Returns:
            Dict[int, Dict[str, float]]:
                Nested mapping of `{N: {FunctionName: Time}}`.
        """
        # A fresh list per call avoids the shared mutable-default pitfall
        # (callers mutating the default would otherwise affect later calls).
        if n_values is None:
            n_values = [1000, 2000, 4000]

        suite_results: Dict[int, Dict[str, float]] = {}
        for n in n_values:
            suite_results[n] = {}
            # Generate the input once per size; all functions share it.
            data = input_gen(n)
            args = data if isinstance(data, tuple) else (data,)

            for name, func in funcs.items():
                suite_results[n][name] = self.benchmark(func, *args)
        return suite_results

__init__(repeats=5, warmup=1, mode='min')

Initialize the Profiler with benchmark settings.

Parameters:

Name Type Description Default
repeats int

Number of timed runs per benchmark.

5
warmup int

Number of warmup runs to prime CPU cache.

1
mode str

Statistical mode ('min', 'mean', 'median').

'min'
Notes
  • repeats is clamped to at least 1.
  • warmup is clamped to at least 0.
Source code in src/alnoms/core/profiler.py
def __init__(self, repeats: int = 5, warmup: int = 1, mode: str = "min"):
    """Set up benchmark configuration.

    Args:
        repeats (int): Timed runs per benchmark; values below 1 become 1.
        warmup (int): Untimed priming runs; negative values become 0.
        mode (str): Aggregation strategy: 'min', 'mean', or 'median'.
    """
    self.repeats = repeats if repeats > 1 else 1
    self.warmup = warmup if warmup > 0 else 0
    self.mode = mode
    # Timing records accumulated by stopwatch()/profile().
    self._profile_stats = {}

benchmark(func, *args)

Benchmark a function with GC disabled for timing purity.

Parameters:

Name Type Description Default
func Callable

Function to benchmark.

required
*args Any

Arguments passed to the function.

()

Returns:

Name Type Description
float float

Execution time in seconds, aggregated using the configured mode.

Notes
  • Deepcopies arguments to avoid mutation across runs.
  • Disables garbage collection to reduce jitter.
Source code in src/alnoms/core/profiler.py
def benchmark(self, func: Callable, *args: Any) -> float:
    """Time `func` with garbage collection paused for cleaner numbers.

    Args:
        func (Callable): Function to benchmark.
        *args (Any): Arguments forwarded to the function.

    Returns:
        float: Aggregated execution time in seconds, per `self.mode`.

    Notes:
        - Each run receives a deep copy of `args`, so in-place mutation
          cannot leak between repetitions.
        - GC is re-enabled afterwards only if it was enabled before.
    """
    # Untimed priming runs (warm CPU cache / branch predictors).
    for _ in range(self.warmup):
        func(*copy.deepcopy(args))

    samples = []
    was_enabled = gc.isenabled()
    gc.disable()
    try:
        for _ in range(self.repeats):
            run_args = copy.deepcopy(args)
            t0 = timeit.default_timer()
            func(*run_args)
            t1 = timeit.default_timer()
            samples.append(t1 - t0)
    finally:
        if was_enabled:
            gc.enable()

    # Pick the configured aggregate; 'min' is the default path.
    if self.mode == "median":
        return statistics.median(samples)
    if self.mode == "mean":
        return statistics.mean(samples)
    return min(samples)

print_analysis(func_name, results)

Print a formatted table from a doubling test.

Parameters:

Name Type Description Default
func_name str

Name of the analyzed function.

required
results List[Dict[str, Any]]

Output from run_doubling_test.

required
Source code in src/alnoms/core/profiler.py
def print_analysis(self, func_name: str, results: List[Dict[str, Any]]) -> None:
    """Render a doubling-test result set as an aligned console table.

    Args:
        func_name (str): Name of the analyzed function.
        results (List[Dict[str, Any]]): Records from `run_doubling_test`.
    """
    print(f"\n🔬 ANALYSIS: {func_name} (Mode: {self.mode})")
    print(f"{'N':<10} | {'Time (s)':<12} | {'Ratio':<8} | {'Est. Complexity':<15}")
    print("-" * 55)
    for entry in results:
        # First round carries ratio 0.0 and is shown as a dash.
        ratio = entry["Ratio"]
        ratio_text = f"{ratio:.2f}" if ratio > 0 else "-"
        print(
            f"{entry['N']:<10} | {entry['Time']:<12.5f} | {ratio_text:<8} | {entry['Complexity']:<15}"
        )

print_decorator_report()

Print a summary table of all decorator‑tracked timings.

Displays
  • Function/block label
  • Number of calls
  • Average time
  • Total time
Source code in src/alnoms/core/profiler.py
def print_decorator_report(self) -> None:
    """Print a summary table of all decorator‑tracked timings.

    Displays:
        - Function/block label
        - Number of calls
        - Average time
        - Total time
    """
    print("\n📝 ALNOMS PROFILE REPORT")
    print(
        f"{'Label/Function':<20} | {'Calls':<6} | {'Avg Time (s)':<12} | {'Total Time'}"
    )
    print("-" * 65)
    for fname, times in self._profile_stats.items():
        avg_t = statistics.mean(times) if times else 0.0
        total_t = sum(times)
        print(f"{fname:<20} | {len(times):<6} | {avg_t:<12.5f} | {total_t:.5f}")

profile(func)

Decorator for lightweight profiling during normal execution.

Parameters:

Name Type Description Default
func Callable

Function to wrap.

required

Returns:

Name Type Description
Callable Callable

Wrapped function that records execution time.

Notes
  • Stores timing data under self._profile_stats[func.__name__].
Source code in src/alnoms/core/profiler.py
def profile(self, func: Callable) -> Callable:
    """Decorator that records the wall-clock time of each call.

    Args:
        func (Callable): Function to wrap.

    Returns:
        Callable: Wrapper that times calls and passes results through.

    Notes:
        - Timings accumulate under `self._profile_stats[func.__name__]`.
    """

    @functools.wraps(func)
    def timed_call(*args, **kwargs):
        t0 = timeit.default_timer()
        outcome = func(*args, **kwargs)
        self._profile_stats.setdefault(func.__name__, []).append(
            timeit.default_timer() - t0
        )
        return outcome

    return timed_call

run_doubling_test(func, input_gen, start_n=50, rounds=3, timeout=15.0)

Perform doubling analysis to estimate algorithmic complexity.

Parameters:

Name Type Description Default
func Callable

Algorithm under test.

required
input_gen Callable

Function generating input for size N.

required
start_n int

Initial input size.

50
rounds int

Number of doubling iterations.

3
timeout float

Maximum allowed runtime for the entire test.

15.0

Returns:

Type Description
List[Dict[str, Any]]

List[Dict[str, Any]]: A list of records, one per doubling round, each containing:
  • "N": Input size
  • "Time": Execution time
  • "Ratio": T(2N) / T(N)
  • "Complexity": Estimated Big‑O class

Notes
  • Automatically increases recursion limit for deep algorithms.
  • Stops early if timeout is exceeded.
Source code in src/alnoms/core/profiler.py
def run_doubling_test(
    self,
    func: Callable,
    input_gen: Callable[[int], Any],
    start_n: int = 50,
    rounds: int = 3,
    timeout: float = 15.0,
) -> List[Dict[str, Any]]:
    """Estimate algorithmic complexity via input doubling.

    Args:
        func (Callable): Algorithm under test.
        input_gen (Callable): Builds the input for a given size N.
        start_n (int): First input size.
        rounds (int): How many times N is doubled.
        timeout (float): Wall-clock budget for the whole test, in seconds.

    Returns:
        List[Dict[str, Any]]: One record per round with keys
            "N", "Time", "Ratio" (T(2N)/T(N)), and "Complexity".

    Notes:
        - Raises the recursion limit to at least 3000 for deep algorithms.
        - Exits early once the timeout budget is exhausted.
    """
    sys.setrecursionlimit(max(3000, sys.getrecursionlimit()))
    records = []
    size = start_n
    last_time = 0.0
    clock_start = time.perf_counter()

    for _ in range(rounds):
        # Check the budget before committing to another round.
        if time.perf_counter() - clock_start > timeout:
            break

        payload = input_gen(size)
        call_args = payload if isinstance(payload, tuple) else (payload,)
        elapsed = self.benchmark(func, *call_args)

        # No predecessor in round one -> ratio 0.0.
        growth = elapsed / last_time if last_time > 0 else 0.0
        records.append(
            {
                "N": size,
                "Time": elapsed,
                "Ratio": growth,
                "Complexity": self._guess_complexity(growth),
            }
        )
        last_time = elapsed
        size *= 2

    return records

run_stress_suite(funcs, input_gen, n_values=[1000, 2000, 4000])

Run multiple algorithms across multiple input sizes.

Useful for head‑to‑head comparisons in research, teaching, and performance governance.

Parameters:

Name Type Description Default
funcs Dict[str, Callable]

Mapping of function names to callables.

required
input_gen Callable

Data generator for size N.

required
n_values List[int]

Input sizes to test.

[1000, 2000, 4000]

Returns:

Type Description
Dict[int, Dict[str, float]]

Dict[int, Dict[str, float]]: Nested mapping of {N: {FunctionName: Time}}.

Source code in src/alnoms/core/profiler.py
def run_stress_suite(
    self,
    funcs: Dict[str, Callable],
    input_gen: Callable[[int], Any],
    n_values: "List[int] | None" = None,
) -> Dict[int, Dict[str, float]]:
    """Run multiple algorithms across multiple input sizes.

    Useful for head-to-head comparisons in research, teaching, and
    performance governance.

    Args:
        funcs (Dict[str, Callable]): Mapping of function names to callables.
        input_gen (Callable): Data generator for size N.
        n_values (List[int], optional): Input sizes to test.
            Defaults to [1000, 2000, 4000].

    Returns:
        Dict[int, Dict[str, float]]:
            Nested mapping of `{N: {FunctionName: Time}}`.
    """
    # A fresh list per call avoids the shared mutable-default pitfall
    # (a caller mutating the default list would affect all later calls).
    if n_values is None:
        n_values = [1000, 2000, 4000]

    suite_results = {}
    for n in n_values:
        suite_results[n] = {}
        # Generate the input once per size; all functions share it.
        data = input_gen(n)
        args = data if isinstance(data, tuple) else (data,)

        for name, func in funcs.items():
            suite_results[n][name] = self.benchmark(func, *args)
    return suite_results

stopwatch(label='Block')

Context manager for precision timing of a code block.

Parameters:

Name Type Description Default
label str

Identifier for the timed block.

'Block'

Yields:

Name Type Description
None None

Execution of the wrapped block.

Side Effects
  • Records elapsed time under self._profile_stats[label].
Source code in src/alnoms/core/profiler.py
@contextmanager
def stopwatch(self, label: str = "Block") -> Generator[None, None, None]:
    """Time a code block and record the result under `label`.

    Args:
        label (str): Identifier for the timed block.

    Yields:
        None: Control passes to the wrapped block.

    Side Effects:
        - Appends the elapsed time to `self._profile_stats[label]`,
          even when the block raises.
    """
    started = timeit.default_timer()
    try:
        yield
    finally:
        self._profile_stats.setdefault(label, []).append(
            timeit.default_timer() - started
        )

🧠 Analysis & Decision Engine

Central orchestrator for the Alnoms governance pipeline.

This class coordinates:

  • Script execution and dynamic profiling
  • Static AST pattern detection
  • Loop‑depth and static complexity estimation
  • Optional empirical scaling tests
  • Metadata‑driven algorithmic recommendations
  • Fixer‑based prescriptive remediation

All methods are static and the class is stateless.

Source code in src/alnoms/core/analyzer.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
class ScriptAnalyzer:
    """Central orchestrator for the Alnoms governance pipeline.

    This class coordinates:

    - Script execution and dynamic profiling
    - Static AST pattern detection
    - Loop‑depth and static complexity estimation
    - Optional empirical scaling tests
    - Metadata‑driven algorithmic recommendations
    - Fixer‑based prescriptive remediation

    All methods are static and the class is stateless.
    """

    # ----------------------------------------------------------------------
    # LOOP DEPTH ANALYSIS
    # ----------------------------------------------------------------------
    @staticmethod
    def _get_loop_depth(node: ast.AST) -> int:
        """Recursively compute the maximum nesting depth of loops.

        Comprehensions (list, dict, set, generator) are ignored because they
        are optimized internally by CPython and do not represent explicit
        nested loops in the same semantic sense.

        Args:
            node (ast.AST): The AST node to inspect.

        Returns:
            int: Maximum loop nesting depth. Returns 0 if no loops are found.
        """
        if isinstance(
            node, (ast.ListComp, ast.DictComp, ast.SetComp, ast.GeneratorExp)
        ):
            return 0

        if not isinstance(node, (ast.For, ast.While)):
            return 0

        max_child = 0
        for child in getattr(node, "body", []):
            max_child = max(max_child, ScriptAnalyzer._get_loop_depth(child))

        return 1 + max_child

    @staticmethod
    def _find_target_loop_node(tree: ast.AST, lineno: int) -> Optional[ast.AST]:
        """Locate the loop node closest to a given line number.

        This is a Python‑version‑safe method that does not rely on `end_lineno`.
        It finds the deepest loop whose starting line is less than or equal to
        the pattern's line number.

        Args:
            tree (ast.AST): Parsed AST of the entire file.
            lineno (int): Line number associated with a detected pattern.

        Returns:
            Optional[ast.AST]: The best matching loop node, or None.
        """
        best_match = None
        for node in ast.walk(tree):
            if isinstance(node, (ast.For, ast.While)) and hasattr(node, "lineno"):
                if node.lineno <= lineno:
                    if best_match is None or node.lineno > best_match.lineno:
                        best_match = node
        return best_match

    # ----------------------------------------------------------------------
    # SCRIPT EXECUTION & PROFILING
    # ----------------------------------------------------------------------
    @staticmethod
    def run_script(path: str):
        """Execute a Python script in an isolated module namespace.

        The module is registered in `sys.modules` as `__main__`, so
        `if __name__ == "__main__":` guards inside the script fire as they
        would when run from the command line.

        Args:
            path (str): Path to the Python script.

        Returns:
            module: The executed module object.
        """
        spec = importlib.util.spec_from_file_location("__main__", path)
        target = importlib.util.module_from_spec(spec)
        # Register before exec so the script sees itself as the entry point.
        sys.modules["__main__"] = target
        spec.loader.exec_module(target)
        return target

    @staticmethod
    def profile_script(path: str):
        """Profile a script and extract the top slowest developer functions.

        Runs the script under `cProfile`, sorts by cumulative time, and keeps
        only functions defined in the target file itself; synthetic frames
        (e.g. `<module>`, `<listcomp>`) are dropped.

        Args:
            path (str): Path to the Python script.

        Returns:
            tuple: A tuple containing:
                - list: Top 5 slowest functions with timing info.
                - float: Total cumulative execution time.
                - module: The executed module object.
        """
        profiler = cProfile.Profile()
        profiler.enable()
        module = ScriptAnalyzer.run_script(path)
        profiler.disable()

        buffer = io.StringIO()
        stats = pstats.Stats(profiler, stream=buffer).sort_stats("cumulative").stats

        total_time = sum(entry[3] for entry in stats.values())
        script_name = os.path.basename(path)

        rows = []
        for (filename, lineno, funcname), entry in stats.items():
            # Keep only code defined in the analyzed file itself.
            if script_name not in filename:
                continue
            # Skip synthetic frames such as <module> or <lambda>.
            if funcname.startswith("<") and funcname.endswith(">"):
                continue

            cumtime = entry[3]
            share = round((cumtime / total_time) * 100, 2) if total_time else 0
            rows.append(
                {
                    "function": funcname,
                    "time": round(cumtime, 5),
                    "percent": share,
                }
            )

        rows.sort(key=lambda item: item["time"], reverse=True)
        return rows[:5], total_time, module

    # ----------------------------------------------------------------------
    # AST EXTRACTION
    # ----------------------------------------------------------------------
    @staticmethod
    def _get_function_ast(tree: ast.AST, func_name: str) -> Optional[ast.AST]:
        """Extract AST node for a given function name."""
        for node in ast.walk(tree):
            if isinstance(node, ast.FunctionDef) and node.name == func_name:
                return node
        return None

    # ----------------------------------------------------------------------
    # EMPIRICAL SCALING TESTS
    # ----------------------------------------------------------------------
    @staticmethod
    def run_empirical_test(
        module: Any,
        slowest_func_name: str,
        gen_name: str = None,
        data_file: str = None,
        start_n: int = 50,
        rounds: int = 3,
        func_ast: Optional[ast.AST] = None,  # used to classify an AutoGen input pattern
    ) -> Optional[List[Dict[str, Any]]]:
        """Run empirical doubling tests on a target function.

        Input data can come from (checked in this priority order):

        - A data file loaded via `DataReader` (`data_file`)
        - A standard generator in `alnoms.core.generators` (`gen_name`)
        - A script-defined `data_gen()` on the module
        - An AutoGen fallback classified from `func_ast`, else a plain string

        Args:
            module (Any): The executed script module.
            slowest_func_name (str): Function selected for empirical testing.
            gen_name (str, optional): Name of a standard generator.
            data_file (str, optional): Path to a data file.
            start_n (int): Initial input size.
            rounds (int): Number of doubling rounds.
            func_ast (Optional[ast.AST]): AST node of the target function,
                used to pick an AutoGen input pattern when no generator
                is available.

        Returns:
            Optional[List[Dict[str, Any]]]: Empirical results, or None when
            no usable generator/target/argument arrangement could be found.
        """
        input_gen = None

        # 1) File-based input: try ints first, fall back to raw lines.
        if data_file:
            try:
                file_data = std_io.read_all_ints(data_file)
            except ValueError:
                file_data = std_io.read_lines(data_file)

            def input_gen(n):
                # Callers may pass the data itself instead of a size.
                if isinstance(n, (list, tuple)):
                    n = len(n)
                return (file_data[:n],)

        # 2) Named standard generator from alnoms.core.generators.
        elif gen_name:
            raw_gen = getattr(std_gen, gen_name, None)

            def input_gen(n):
                if isinstance(n, (list, tuple)):
                    n = len(n)
                res = raw_gen(n)
                return res if isinstance(res, tuple) else (res,)

        # 3) Script-defined data_gen() OR AutoGen fallback.
        else:
            if hasattr(module, "data_gen"):
                raw_gen = module.data_gen

                def input_gen(n):
                    # If n is a list or tuple, convert to its length
                    if isinstance(n, (list, tuple)):
                        n = len(n)

                    out = raw_gen(n)

                    # Always return a tuple
                    return out if isinstance(out, tuple) else (out,)
            else:
                # AutoGen fallback: synthesize inputs from the function's AST.
                if func_ast:
                    pattern = AutoGen._classify(func_ast)

                    def input_gen(n):
                        if isinstance(n, (list, tuple)):
                            n = len(n)
                        try:
                            samples = AutoGen.generate(pattern, n)
                            return samples

                        except Exception:
                            pass

                        # Hard fallback: a plain string of length n always works.
                        return ("a" * n,)
                else:
                    # Last resort when no AST is available either.
                    def input_gen(n):
                        if isinstance(n, (list, tuple)):
                            n = len(n)
                        return ("a" * n,)

        if not input_gen:
            return None

        # A generator may return a config dict (keys like "start_n", "rounds",
        # "target", "args") instead of plain args — honor any overrides.
        sample_data = input_gen(start_n)
        config = sample_data if isinstance(sample_data, dict) else {}
        final_start_n = config.get("start_n", start_n)
        final_rounds = config.get("rounds", rounds)

        # Determine target function: an explicit "target" in the config wins
        # over the profiled slowest function.
        if isinstance(sample_data, dict) and "target" in sample_data:
            target_name = sample_data["target"]

            def effective_gen(n):
                return input_gen(n)["args"]
        else:
            target_name = slowest_func_name

            def effective_gen(n):
                data = input_gen(n)
                return data if isinstance(data, tuple) else (data,)

        target_func = getattr(module, target_name, None)
        if not target_func:
            return None

        # Bail out if the generator cannot satisfy the function's required
        # positional parameters (avoids a guaranteed TypeError later).
        sig = inspect.signature(target_func)
        required_params = [
            p
            for p in sig.parameters.values()
            if p.default == p.empty and p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
        ]

        test_args = effective_gen(final_start_n)
        args_count = len(test_args) if isinstance(test_args, tuple) else 1

        if args_count < len(required_params):
            return None

        prof = Profiler(repeats=3, warmup=1, mode="min")
        try:
            return prof.run_doubling_test(
                target_func, effective_gen, start_n=final_start_n, rounds=final_rounds
            )
        except Exception:
            # If empirical scaling fails (signature mismatch, bad generator, etc.),
            # degrade gracefully and skip empirical results.
            return None

    # ----------------------------------------------------------------------
    # FULL PIPELINE ORCHESTRATION
    # ----------------------------------------------------------------------
    @staticmethod
    def analyze_file(
        path: str,
        deep: bool = False,
        target_override: str = None,
        gen_name: str = None,
        data_file: str = None,
        start_n: int = 50,
        rounds: int = 3,
    ) -> dict:
        """Perform full governance analysis on a Python script.

        Pipeline:
            1. Execute + profile the script
            2. Run static AST pattern detection
            3. Compute loop depth and static complexity
            4. Optionally run empirical scaling tests
            5. Integrate DecisionEngine metadata
            6. Integrate Fixers for prescriptive remediation
            7. Produce a unified governance report

        Args:
            path (str): Path to the Python script.
            deep (bool): Whether to run empirical scaling tests.
            target_override (str, optional): Explicit function name for empirical tests.
            gen_name (str, optional): Name of a standard generator.
            data_file (str, optional): Path to a data file.
            start_n (int): Initial input size for empirical tests.
            rounds (int): Number of doubling rounds.

        Returns:
            dict: A complete governance analysis report.
        """
        # 1. Profile and Execute
        profile_results, total_time, module = ScriptAnalyzer.profile_script(path)

        # 2. Static Analysis
        raw_patterns = analyze_code(path)
        with open(path, "r", encoding="utf-8") as f:
            full_tree = ast.parse(f.read())
        # Attach parent pointers so AST helpers can classify nodes by context.
        for node in ast.walk(full_tree):
            for child in ast.iter_child_nodes(node):
                child.parent = node

        # Pick the slowest profiled user function, skipping the synthetic
        # `data_gen` helper; fall back to the first function defined in the AST.
        empirical_results = None
        slowest_func_name = next(
            (e["function"] for e in profile_results if e["function"] != "data_gen"),
            None,
        )
        if not slowest_func_name:
            for node in ast.walk(full_tree):
                if isinstance(node, ast.FunctionDef) and node.name != "data_gen":
                    slowest_func_name = node.name
                    break
        empirical_target = target_override or slowest_func_name

        func_ast = ScriptAnalyzer._get_function_ast(full_tree, empirical_target)

        if deep and empirical_target:
            empirical_results = ScriptAnalyzer.run_empirical_test(
                module,
                empirical_target,
                gen_name,
                data_file,
                start_n,
                rounds,
                func_ast=func_ast,
            )

        # 3. Decision Engine
        engine = DecisionEngine(MetadataRegistry.get_all())
        aggregated_findings = {}

        # The last doubling round carries the final complexity verdict.
        detected_complexity = (
            empirical_results[-1].get("Complexity", "Unknown")
            if empirical_results
            else "Unknown"
        )

        # 4. Remediation Orchestration
        for finding in raw_patterns:
            func = finding.get("function", "global")
            pid = finding.get("pattern_id", "unknown")
            line_no = finding.get("line")

            key = (func, pid)

            # Aggregate repeated (function, pattern) findings into one entry;
            # only the first occurrence gets full enrichment below.
            if key not in aggregated_findings:
                aggregated_findings[key] = finding
                finding["occurrence_count"] = 1
                finding["occurrence_lines"] = [line_no]
            else:
                aggregated_findings[key]["occurrence_count"] += 1
                aggregated_findings[key]["occurrence_lines"].append(line_no)
                continue

            # Static loop depth (only meaningful for nested-loop findings).
            static_depth = 1
            if pid == "nested_loops":
                target_node = ScriptAnalyzer._find_target_loop_node(full_tree, line_no)
                if target_node:
                    static_depth = ScriptAnalyzer._get_loop_depth(target_node)

            finding["loop_depth"] = static_depth

            # Static vs empirical complexity
            finding["static_complexity"] = (
                f"O(N^{static_depth})" if pid == "nested_loops" else None
            )
            finding["empirical_complexity"] = detected_complexity

            # Cubic-or-worse findings are treated as domain overrides: no
            # single data-structure swap is recommended for them.
            is_cubic = (
                pid == "nested_loops" and static_depth >= 3
            ) or detected_complexity == "O(N^3)"

            if not is_cubic:
                recommended_algo = engine.decide_algorithm(pid)
                if recommended_algo:
                    finding["dsa_meta"] = engine.decide_metadata(recommended_algo)
            else:
                finding["dsa_meta"] = None
                finding["is_domain_override"] = True

            # Fixer integration
            fixer = get_fixer(pid)
            if fixer:
                finding["cure_type"] = fixer.cure_type()
                finding["explanation"] = fixer.explain(finding, detected_complexity)
                finding["cost_estimate"] = fixer.cost_estimate(
                    finding, detected_complexity
                )
                finding["snippets"] = fixer.snippet_before_after(
                    finding, detected_complexity
                )

        return {
            "file": path,
            "profile": profile_results,
            "patterns": list(aggregated_findings.values()),
            "total_time": round(total_time, 4),
            "empirical": empirical_results,
            "empirical_target": empirical_target,
            "meta": {
                "version": "0.1.3",
                "timestamp": datetime.now(timezone.utc)
                .isoformat()
                .replace("+00:00", "Z"),
            },
        }

analyze_file(path, deep=False, target_override=None, gen_name=None, data_file=None, start_n=50, rounds=3) staticmethod

Perform full governance analysis on a Python script.

Pipeline
  1. Execute + profile the script
  2. Run static AST pattern detection
  3. Compute loop depth and static complexity
  4. Optionally run empirical scaling tests
  5. Integrate DecisionEngine metadata
  6. Integrate Fixers for prescriptive remediation
  7. Produce a unified governance report

Parameters:

Name Type Description Default
path str

Path to the Python script.

required
deep bool

Whether to run empirical scaling tests.

False
target_override str

Explicit function name for empirical tests.

None
gen_name str

Name of a standard generator.

None
data_file str

Path to a data file.

None
start_n int

Initial input size for empirical tests.

50
rounds int

Number of doubling rounds.

3

Returns:

Name Type Description
dict dict

A complete governance analysis report.

Source code in src/alnoms/core/analyzer.py
@staticmethod
def analyze_file(
    path: str,
    deep: bool = False,
    target_override: str = None,
    gen_name: str = None,
    data_file: str = None,
    start_n: int = 50,
    rounds: int = 3,
) -> dict:
    """Perform full governance analysis on a Python script.

    Pipeline:
        1. Execute + profile the script
        2. Run static AST pattern detection
        3. Compute loop depth and static complexity
        4. Optionally run empirical scaling tests
        5. Integrate DecisionEngine metadata
        6. Integrate Fixers for prescriptive remediation
        7. Produce a unified governance report

    Args:
        path (str): Path to the Python script.
        deep (bool): Whether to run empirical scaling tests.
        target_override (str, optional): Explicit function name for empirical tests.
        gen_name (str, optional): Name of a standard generator.
        data_file (str, optional): Path to a data file.
        start_n (int): Initial input size for empirical tests.
        rounds (int): Number of doubling rounds.

    Returns:
        dict: A complete governance analysis report.
    """
    # 1. Profile and Execute
    profile_results, total_time, module = ScriptAnalyzer.profile_script(path)

    # 2. Static Analysis
    raw_patterns = analyze_code(path)
    with open(path, "r", encoding="utf-8") as f:
        full_tree = ast.parse(f.read())
    # Attach parent pointers so AST helpers can classify nodes by context.
    for node in ast.walk(full_tree):
        for child in ast.iter_child_nodes(node):
            child.parent = node

    # Pick the slowest profiled user function, skipping the synthetic
    # `data_gen` helper; fall back to the first function defined in the AST.
    empirical_results = None
    slowest_func_name = next(
        (e["function"] for e in profile_results if e["function"] != "data_gen"),
        None,
    )
    if not slowest_func_name:
        for node in ast.walk(full_tree):
            if isinstance(node, ast.FunctionDef) and node.name != "data_gen":
                slowest_func_name = node.name
                break
    empirical_target = target_override or slowest_func_name

    func_ast = ScriptAnalyzer._get_function_ast(full_tree, empirical_target)

    if deep and empirical_target:
        empirical_results = ScriptAnalyzer.run_empirical_test(
            module,
            empirical_target,
            gen_name,
            data_file,
            start_n,
            rounds,
            func_ast=func_ast,
        )

    # 3. Decision Engine
    engine = DecisionEngine(MetadataRegistry.get_all())
    aggregated_findings = {}

    # The last doubling round carries the final complexity verdict.
    detected_complexity = (
        empirical_results[-1].get("Complexity", "Unknown")
        if empirical_results
        else "Unknown"
    )

    # 4. Remediation Orchestration
    for finding in raw_patterns:
        func = finding.get("function", "global")
        pid = finding.get("pattern_id", "unknown")
        line_no = finding.get("line")

        key = (func, pid)

        # Aggregate repeated (function, pattern) findings into one entry;
        # only the first occurrence gets full enrichment below.
        if key not in aggregated_findings:
            aggregated_findings[key] = finding
            finding["occurrence_count"] = 1
            finding["occurrence_lines"] = [line_no]
        else:
            aggregated_findings[key]["occurrence_count"] += 1
            aggregated_findings[key]["occurrence_lines"].append(line_no)
            continue

        # Static loop depth (only meaningful for nested-loop findings).
        static_depth = 1
        if pid == "nested_loops":
            target_node = ScriptAnalyzer._find_target_loop_node(full_tree, line_no)
            if target_node:
                static_depth = ScriptAnalyzer._get_loop_depth(target_node)

        finding["loop_depth"] = static_depth

        # Static vs empirical complexity
        finding["static_complexity"] = (
            f"O(N^{static_depth})" if pid == "nested_loops" else None
        )
        finding["empirical_complexity"] = detected_complexity

        # Cubic-or-worse findings are treated as domain overrides: no
        # single data-structure swap is recommended for them.
        is_cubic = (
            pid == "nested_loops" and static_depth >= 3
        ) or detected_complexity == "O(N^3)"

        if not is_cubic:
            recommended_algo = engine.decide_algorithm(pid)
            if recommended_algo:
                finding["dsa_meta"] = engine.decide_metadata(recommended_algo)
        else:
            finding["dsa_meta"] = None
            finding["is_domain_override"] = True

        # Fixer integration
        fixer = get_fixer(pid)
        if fixer:
            finding["cure_type"] = fixer.cure_type()
            finding["explanation"] = fixer.explain(finding, detected_complexity)
            finding["cost_estimate"] = fixer.cost_estimate(
                finding, detected_complexity
            )
            finding["snippets"] = fixer.snippet_before_after(
                finding, detected_complexity
            )

    return {
        "file": path,
        "profile": profile_results,
        "patterns": list(aggregated_findings.values()),
        "total_time": round(total_time, 4),
        "empirical": empirical_results,
        "empirical_target": empirical_target,
        "meta": {
            "version": "0.1.3",
            "timestamp": datetime.now(timezone.utc)
            .isoformat()
            .replace("+00:00", "Z"),
        },
    }

profile_script(path) staticmethod

Profile a script and extract the top slowest developer functions.

Uses cProfile to gather cumulative execution time and filters out non‑user code.

Parameters:

Name Type Description Default
path str

Path to the Python script.

required

Returns:

Name Type Description
tuple

A tuple containing:

- list: Top 5 slowest functions with timing info.
- float: Total cumulative execution time.
- module: The executed module object.

Source code in src/alnoms/core/analyzer.py
@staticmethod
def profile_script(path: str):
    """Profile a script and report its slowest developer-written functions.

    Runs the script under `cProfile`, then keeps only entries whose filename
    matches the profiled script and whose function name is a real identifier
    (angle-bracketed pseudo-frames such as `<listcomp>` are dropped).

    Args:
        path (str): Path to the Python script.

    Returns:
        tuple: A tuple containing:
            - list: Top 5 slowest functions with timing info.
            - float: Total cumulative execution time.
            - module: The executed module object.
    """
    profiler = cProfile.Profile()
    profiler.enable()
    executed_module = ScriptAnalyzer.run_script(path)
    profiler.disable()

    # Stats are sorted for the (unused) textual report; the raw dict is
    # what we actually mine below.
    report_buffer = io.StringIO()
    stats_view = pstats.Stats(profiler, stream=report_buffer).sort_stats("cumulative")
    raw_stats = stats_view.stats

    # Grand total across *all* frames; used to express each entry as a %.
    grand_total = sum(entry[3] for entry in raw_stats.values())
    script_basename = os.path.basename(path)

    summaries = []
    for (frame_file, _lineno, func_name), entry in raw_stats.items():
        cumulative = entry[3]  # index 3 = cumulative time in pstats tuples

        # Keep only frames originating from the profiled script itself.
        if script_basename not in frame_file:
            continue
        # Skip synthetic frames like <module> or <listcomp>.
        if func_name.startswith("<") and func_name.endswith(">"):
            continue

        percent = (
            round((cumulative / grand_total) * 100, 2) if grand_total else 0
        )
        summaries.append(
            {
                "function": func_name,
                "time": round(cumulative, 5),
                "percent": percent,
            }
        )

    summaries.sort(key=lambda row: row["time"], reverse=True)
    return summaries[:5], grand_total, executed_module

run_empirical_test(module, slowest_func_name, gen_name=None, data_file=None, start_n=50, rounds=3, func_ast=None) staticmethod

Run empirical doubling tests on a target function.

Input data can come from:

  • A script-defined data_gen()
  • A standard generator in alnoms.core.generators
  • A data file loaded via DataReader

Parameters:

Name Type Description Default
module Any

The executed script module.

required
slowest_func_name str

Function selected for empirical testing.

required
gen_name str

Name of a standard generator.

None
data_file str

Path to a data file.

None
start_n int

Initial input size.

50
rounds int

Number of doubling rounds.

3

Returns:

Type Description
Optional[List[Dict[str, Any]]]

Optional[List[Dict[str, Any]]]: Empirical results or None.

Source code in src/alnoms/core/analyzer.py
@staticmethod
def run_empirical_test(
    module: Any,
    slowest_func_name: str,
    gen_name: str = None,
    data_file: str = None,
    start_n: int = 50,
    rounds: int = 3,
    func_ast: Optional[ast.AST] = None,
) -> Optional[List[Dict[str, Any]]]:
    """Run empirical doubling tests on a target function.

    Input data can come from:

    - A script-defined `data_gen()`
    - A standard generator in `alnoms.core.generators`
    - A data file loaded via `DataReader`

    Args:
        module (Any): The executed script module.
        slowest_func_name (str): Function selected for empirical testing.
        gen_name (str, optional): Name of a standard generator.
        data_file (str, optional): Path to a data file.
        start_n (int): Initial input size.
        rounds (int): Number of doubling rounds.
        func_ast (Optional[ast.AST]): AST of the target function, used by
            the AutoGen fallback to synthesize suitable inputs.

    Returns:
        Optional[List[Dict[str, Any]]]: Empirical results or None.
    """
    input_gen = None

    # File-based generator: try integers first, fall back to raw lines.
    if data_file:
        try:
            file_data = std_io.read_all_ints(data_file)
        except ValueError:
            file_data = std_io.read_lines(data_file)

        def input_gen(n):
            if isinstance(n, (list, tuple)):
                n = len(n)
            return (file_data[:n],)

    # Standard generator
    elif gen_name:
        raw_gen = getattr(std_gen, gen_name, None)
        if raw_gen is None:
            # Unknown generator name: degrade gracefully instead of
            # raising TypeError the first time input_gen is called.
            return None

        def input_gen(n):
            if isinstance(n, (list, tuple)):
                n = len(n)
            res = raw_gen(n)
            return res if isinstance(res, tuple) else (res,)

    # Script-defined generator OR AutoGen fallback
    else:
        if hasattr(module, "data_gen"):
            raw_gen = module.data_gen

            def input_gen(n):
                # If n is a list or tuple, convert to its length
                if isinstance(n, (list, tuple)):
                    n = len(n)

                out = raw_gen(n)

                # Always return a tuple
                return out if isinstance(out, tuple) else (out,)
        else:
            if func_ast:
                # AST-driven input synthesis with a guaranteed fallback.
                pattern = AutoGen._classify(func_ast)

                def input_gen(n):
                    if isinstance(n, (list, tuple)):
                        n = len(n)
                    try:
                        return AutoGen.generate(pattern, n)
                    except Exception:
                        # Hard fallback: a plain string always works.
                        return ("a" * n,)
            else:
                # Last-resort fallback when no AST is available.
                def input_gen(n):
                    if isinstance(n, (list, tuple)):
                        n = len(n)
                    return ("a" * n,)

    if input_gen is None:
        return None

    # A dict sample may carry config overrides (start_n / rounds / target).
    sample_data = input_gen(start_n)
    config = sample_data if isinstance(sample_data, dict) else {}
    final_start_n = config.get("start_n", start_n)
    final_rounds = config.get("rounds", rounds)

    # Determine target function
    if isinstance(sample_data, dict) and "target" in sample_data:
        target_name = sample_data["target"]

        def effective_gen(n):
            return input_gen(n)["args"]
    else:
        target_name = slowest_func_name

        def effective_gen(n):
            data = input_gen(n)
            return data if isinstance(data, tuple) else (data,)

    target_func = getattr(module, target_name, None)
    if not target_func:
        return None

    # Ensure the generator yields enough positional arguments for the target.
    sig = inspect.signature(target_func)
    required_params = [
        p
        for p in sig.parameters.values()
        if p.default == p.empty and p.kind not in (p.VAR_POSITIONAL, p.VAR_KEYWORD)
    ]

    test_args = effective_gen(final_start_n)
    args_count = len(test_args) if isinstance(test_args, tuple) else 1

    if args_count < len(required_params):
        return None

    prof = Profiler(repeats=3, warmup=1, mode="min")
    try:
        return prof.run_doubling_test(
            target_func, effective_gen, start_n=final_start_n, rounds=final_rounds
        )
    except Exception:
        # If empirical scaling fails (signature mismatch, bad generator, etc.),
        # degrade gracefully and skip empirical results.
        return None

run_script(path) staticmethod

Execute a Python script in an isolated module namespace.

Parameters:

Name Type Description Default
path str

Path to the Python script.

required

Returns:

Name Type Description
module

The executed module object.

Source code in src/alnoms/core/analyzer.py
@staticmethod
def run_script(path: str):
    """Execute a Python script in an isolated module namespace.

    Args:
        path (str): Path to the Python script.

    Returns:
        module: The executed module object.
    """
    spec = importlib.util.spec_from_file_location("__main__", path)
    module = importlib.util.module_from_spec(spec)
    sys.modules["__main__"] = module
    spec.loader.exec_module(module)
    return module

Deterministic rule‑based mapping for OSS‑tier algorithm selection.

The DecisionEngine provides a stable, non‑adaptive mapping from detected performance patterns to recommended data‑structure or algorithmic remedies. All identifiers returned by this engine use snake_case to satisfy OSS‑tier test and governance requirements.

Metadata lookup is also performed using snake_case keys, matching the canonical identifiers stored in the MetadataRegistry.

Source code in src/alnoms/core/decision_engine.py
class DecisionEngine:
    """Deterministic rule-based mapping for OSS-tier algorithm selection.

    Maps detected performance patterns onto recommended data-structure or
    algorithmic remedies. All identifiers produced and consumed by this
    engine are snake_case, matching the canonical keys stored in the
    MetadataRegistry. The mapping is stable and non-adaptive by design.
    """

    def __init__(self, metadata: Dict[str, dict]):
        """Store the metadata registry and build the fixed rule tables.

        Args:
            metadata (Dict[str, dict]):
                Snake_case algorithm identifiers mapped to metadata
                dictionaries (typically complexity, category, tier,
                and module import path).
        """
        self.metadata = metadata

        # Pattern id -> recommended algorithm, for non-nested-loop patterns.
        self.rule_map = {
            "inefficient_membership": "separate_chaining_hash_st",
            "redundant_sort": "merge_sort",
            "inplace_concat": "list_concat",
            "expensive_calls": "memoization",
            "high_freq_io": "buffered_io",
        }

        # Developer intent -> recommended algorithm, for nested loops only.
        self.nested_loop_rules = {
            "membership": "separate_chaining_hash_st",
            "sorting": "merge_sort",
            "dfs": "graph_traversal",
            "generic": "pruning",
        }

    def decide_algorithm(
        self, pattern: str, intent: Optional[str] = None
    ) -> Optional[str]:
        """Return the recommended snake_case algorithm identifier.

        Args:
            pattern (str): Detected performance pattern identifier.
            intent (Optional[str]): Developer intent (nested loops only),
                e.g. "membership", "sorting", "dfs", "generic".

        Returns:
            Optional[str]: Snake_case algorithm id, or None if unmapped.
        """
        if pattern != "nested_loops":
            return self.rule_map.get(pattern)
        # Nested loops route on intent; "pruning" is the safe default.
        return self.nested_loop_rules.get(intent, "pruning") if intent else "pruning"

    def decide_metadata(self, algorithm: str) -> Optional[dict]:
        """Look up metadata for an algorithm id, normalizing PascalCase.

        Args:
            algorithm (str): Algorithm identifier from `decide_algorithm`;
                non-canonical PascalCase names are normalized to snake_case.

        Returns:
            Optional[dict]: Metadata dictionary, or None if not found.
        """
        key = algorithm.lower()
        if key not in self.metadata:
            # "MergeSort" -> "merge_sort": prefix "_" to interior capitals.
            key = "".join(
                ("_" + ch.lower()) if (ch.isupper() and idx) else ch.lower()
                for idx, ch in enumerate(algorithm)
            )
        return self.metadata.get(key)

    def decide_fix(self, pattern: str, intent: Optional[str] = None) -> Optional[str]:
        """Return a short human-readable fix recommendation, or None.

        Args:
            pattern (str): Detected performance pattern.
            intent (Optional[str]): Developer intent for nested loops.

        Returns:
            Optional[str]: Prescriptive recommendation string, or None.
        """
        choice = self.decide_algorithm(pattern, intent)
        return f"Use {choice} to reduce complexity." if choice else None

    def decide(self, pattern: str, intent: Optional[str] = None) -> Optional[str]:
        """Primary OSS entrypoint; forwards to `decide_algorithm`.

        Args:
            pattern (str): Detected performance pattern.
            intent (Optional[str]): Developer intent for nested loops.

        Returns:
            Optional[str]: Snake_case recommended algorithm identifier.
        """
        return self.decide_algorithm(pattern, intent)

__init__(metadata)

Initialize the decision engine with metadata.

Parameters:

Name Type Description Default
metadata Dict[str, dict]

Mapping of snake_case algorithm identifiers to metadata dictionaries. Each metadata entry typically includes complexity, category, tier, and module import path.

required
Source code in src/alnoms/core/decision_engine.py
def __init__(self, metadata: Dict[str, dict]):
    """Initialize the decision engine with metadata.

    Args:
        metadata (Dict[str, dict]):
            Mapping of snake_case algorithm identifiers to metadata
            dictionaries. Each metadata entry typically includes
            complexity, category, tier, and module import path.

    Note:
        The rule tables built here are deterministic and non-adaptive:
        each detected pattern id maps to a fixed algorithm id.
    """
    # Registry of algorithm metadata, keyed by snake_case identifier.
    self.metadata = metadata

    # Base rules for non‑nested‑loop patterns (snake_case outward)
    # Keys are detected pattern ids; values are recommended algorithm ids.
    self.rule_map = {
        "inefficient_membership": "separate_chaining_hash_st",
        "redundant_sort": "merge_sort",
        "inplace_concat": "list_concat",
        "expensive_calls": "memoization",
        "high_freq_io": "buffered_io",
    }

    # Intent‑aware rules for nested loops (snake_case outward)
    # Keys are developer intents inferred from AST heuristics.
    self.nested_loop_rules = {
        "membership": "separate_chaining_hash_st",
        "sorting": "merge_sort",
        "dfs": "graph_traversal",
        "generic": "pruning",
    }

decide(pattern, intent=None)

Primary OSS entrypoint for algorithm selection.

Parameters:

Name Type Description Default
pattern str

Detected performance pattern.

required
intent Optional[str]

Developer intent for nested loops.

None

Returns:

Type Description
Optional[str]

Optional[str]: Snake_case recommended algorithm identifier.

Source code in src/alnoms/core/decision_engine.py
def decide(self, pattern: str, intent: Optional[str] = None) -> Optional[str]:
    """Primary OSS entrypoint for algorithm selection.

    Thin convenience wrapper that forwards directly to `decide_algorithm`.

    Args:
        pattern (str): Detected performance pattern.
        intent (Optional[str]): Developer intent for nested loops.

    Returns:
        Optional[str]: Snake_case recommended algorithm identifier.
    """
    selection = self.decide_algorithm(pattern, intent)
    return selection

decide_algorithm(pattern, intent=None)

Return the recommended algorithm identifier (snake_case).

Parameters:

Name Type Description Default
pattern str

Detected performance pattern identifier.

required
intent Optional[str]

Developer intent extracted from AST heuristics. Relevant only for nested‑loop patterns. Examples include: "membership", "sorting", "dfs", "generic".

None

Returns:

Type Description
Optional[str]

Optional[str]: Snake_case algorithm identifier, or None if no mapping exists.

Source code in src/alnoms/core/decision_engine.py
def decide_algorithm(
    self, pattern: str, intent: Optional[str] = None
) -> Optional[str]:
    """Return the recommended algorithm identifier (snake_case).

    Non-nested-loop patterns are resolved through the flat rule table;
    nested loops route on the developer's inferred intent, with
    "pruning" as the default remedy.

    Args:
        pattern (str): Detected performance pattern identifier.
        intent (Optional[str]): Developer intent from AST heuristics,
            e.g. "membership", "sorting", "dfs", "generic".

    Returns:
        Optional[str]: Snake_case algorithm id, or None if no mapping exists.
    """
    if pattern != "nested_loops":
        return self.rule_map.get(pattern)
    return self.nested_loop_rules.get(intent, "pruning") if intent else "pruning"

decide_fix(pattern, intent=None)

Return a human‑readable fix recommendation.

Parameters:

Name Type Description Default
pattern str

Detected performance pattern.

required
intent Optional[str]

Developer intent for nested loops.

None

Returns:

Type Description
Optional[str]

Optional[str]: Short prescriptive recommendation string, or None.

Source code in src/alnoms/core/decision_engine.py
def decide_fix(self, pattern: str, intent: Optional[str] = None) -> Optional[str]:
    """Return a human‑readable fix recommendation.

    Args:
        pattern (str): Detected performance pattern.
        intent (Optional[str]): Developer intent for nested loops.

    Returns:
        Optional[str]: Short prescriptive recommendation string, or None
            when no algorithm mapping exists for the pattern.
    """
    recommendation = self.decide_algorithm(pattern, intent)
    return f"Use {recommendation} to reduce complexity." if recommendation else None

decide_metadata(algorithm)

Retrieve metadata for a recommended algorithm.

Parameters:

Name Type Description Default
algorithm str

Snake_case algorithm identifier returned by decide_algorithm. If a caller passes a non‑canonical identifier, it is normalized to snake_case before lookup.

required

Returns:

Type Description
Optional[dict]

Optional[dict]: Metadata dictionary for the algorithm, or None if not found.

Source code in src/alnoms/core/decision_engine.py
def decide_metadata(self, algorithm: str) -> Optional[dict]:
    """Retrieve metadata for a recommended algorithm.

    Canonical keys are snake_case; a non-canonical PascalCase identifier
    (e.g. "MergeSort") is normalized to snake_case before the lookup.

    Args:
        algorithm (str): Algorithm identifier from `decide_algorithm`.

    Returns:
        Optional[dict]: Metadata dictionary, or None if not found.
    """
    key = algorithm.lower()
    if key not in self.metadata:
        # "MergeSort" -> "merge_sort": prefix "_" to interior capitals.
        key = "".join(
            ("_" + ch.lower()) if (ch.isupper() and idx) else ch.lower()
            for idx, ch in enumerate(algorithm)
        )
    return self.metadata.get(key)

🎲 Data Generators & I/O

Collection of deterministic and high‑performance dataset generators.

These generators are used throughout the Alnoms ecosystem for:

  • Algorithm benchmarking
  • Worst‑case and best‑case scenario construction
  • Empirical scaling tests (doubling tests)
  • Teaching and demonstration notebooks
  • Reproducible research workflows

All methods are static and side‑effect‑free.

Source code in src/alnoms/core/generators.py
class DataGenerator:
    """Collection of deterministic and high-performance dataset generators.

    These generators are used throughout the Alnoms ecosystem for:

    - Algorithm benchmarking
    - Worst-case and best-case scenario construction
    - Empirical scaling tests (doubling tests)
    - Teaching and demonstration notebooks
    - Reproducible research workflows

    All methods are static and side-effect-free.
    """

    @staticmethod
    def random_array(n: int, lo: int = 0, hi: int = 1000) -> List[int]:
        """Generate an array of random integers.

        This is the default dependency-free generator used across the OSS tier.
        It relies solely on Python's built-in `random` module and is suitable
        for lightweight benchmarking or environments where NumPy is unavailable.

        Args:
            n (int): Number of integers to generate.
            lo (int): Lower bound of the random range (inclusive).
            hi (int): Upper bound of the random range (inclusive).

        Returns:
            List[int]: A list of `n` random integers.
        """
        return [random.randint(lo, hi) for _ in range(n)]

    @staticmethod
    def sorted_array(n: int, reverse: bool = False) -> List[int]:
        """Generate a sorted array of integers from 0 to n-1.

        Useful for constructing best-case or worst-case inputs for sorting
        algorithms and search routines.

        Args:
            n (int): Number of elements to generate.
            reverse (bool): If True, return the array in descending order.

        Returns:
            List[int]: A sorted list of integers.
        """
        arr = list(range(n))
        if reverse:
            arr.reverse()
        return arr

    @staticmethod
    def reverse_sorted_array(n: int) -> List[int]:
        """Generate a descending array from n-1 to 0.

        This is a convenience wrapper around `sorted_array(reverse=True)` and
        is frequently used to construct worst-case inputs for algorithms such
        as insertion sort or bubble sort.

        Args:
            n (int): Number of elements to generate.

        Returns:
            List[int]: A descending list of integers.
        """
        return DataGenerator.sorted_array(n, reverse=True)

    @staticmethod
    def large_scale_dataset(n: int) -> List[int]:
        """Generate a large dataset optimized for high-volume research.

        Attempts to use NumPy for high-throughput integer generation. If NumPy
        is unavailable, falls back to the pure-Python `random_array` generator.
        Either path yields integers in the inclusive range 0..1000.

        Args:
            n (int): Number of integers to generate.

        Returns:
            List[int]: A list of random integers suitable for large-scale tests.
        """
        try:
            import numpy as np

            # np.random.randint's upper bound is EXCLUSIVE, so pass 1001 to
            # match the inclusive 0..1000 default of the random_array fallback.
            return np.random.randint(0, 1001, n).tolist()  # pragma: no cover
        except ImportError:
            return DataGenerator.random_array(n)

    @staticmethod
    def square_matrices(n: int) -> tuple:
        """Generate a pair of N×N matrices filled with constant values.

        Designed for benchmarking matrix multiplication algorithms where the
        computational complexity—not the numerical values—is the primary focus.

        Complexity:
            - Time: O(N²) to initialize both matrices.
            - Space: O(N²) for storage.

        Args:
            n (int): Dimension of each square matrix.

        Returns:
            tuple: A tuple `(matrix_a, matrix_b)` where:
                - `matrix_a` is filled with 1s
                - `matrix_b` is filled with 2s
        """
        matrix_a = [[1 for _ in range(n)] for _ in range(n)]
        matrix_b = [[2 for _ in range(n)] for _ in range(n)]
        return (matrix_a, matrix_b)

    @staticmethod
    def random_string(n: int, alphabet: str = "abcdefghijklmnopqrstuvwxyz") -> str:
        """Generate a random string of length `n` drawn from `alphabet`.

        Characters are sampled uniformly with replacement using the built-in
        `random` module, making this suitable for Trie/string-sort benchmarks.

        Args:
            n (int): Length of the string to generate.
            alphabet (str): Pool of characters to sample from. Defaults to
                the lowercase ASCII letters.

        Returns:
            str: A random string of exactly `n` characters.
        """
        return "".join(random.choice(alphabet) for _ in range(n))

large_scale_dataset(n) staticmethod

Generate a large dataset optimized for high‑volume research.

Attempts to use NumPy for high‑throughput integer generation. If NumPy is unavailable, falls back to the pure‑Python random_array generator.

Parameters:

Name Type Description Default
n int

Number of integers to generate.

required

Returns:

Type Description
List[int]

List[int]: A list of random integers suitable for large‑scale tests.

Source code in src/alnoms/core/generators.py
@staticmethod
def large_scale_dataset(n: int) -> List[int]:
    """Produce a large random-integer dataset for high-volume research.

    Prefers NumPy for throughput; silently falls back to the pure-Python
    `random_array` generator when NumPy is not installed.

    Args:
        n (int): Number of integers to generate.

    Returns:
        List[int]: A list of random integers suitable for large-scale tests.
    """
    try:
        import numpy as np
    except ImportError:
        # NumPy is missing: fall back to the dependency-free generator.
        return DataGenerator.random_array(n)
    return np.random.randint(0, 1000, n).tolist()  # pragma: no cover

random_array(n, lo=0, hi=1000) staticmethod

Generate an array of random integers.

This is the default dependency‑free generator used across the OSS tier. It relies solely on Python's built‑in random module and is suitable for lightweight benchmarking or environments where NumPy is unavailable.

Parameters:

Name Type Description Default
n int

Number of integers to generate.

required
lo int

Lower bound of the random range (inclusive).

0
hi int

Upper bound of the random range (inclusive).

1000

Returns:

Type Description
List[int]

List[int]: A list of n random integers.

Source code in src/alnoms/core/generators.py
@staticmethod
def random_array(n: int, lo: int = 0, hi: int = 1000) -> List[int]:
    """Build a list of `n` random integers drawn from the closed range [lo, hi].

    This is the default dependency-free generator for the OSS tier: it uses
    only Python's built-in `random` module, so it works for lightweight
    benchmarking and in environments without NumPy.

    Args:
        n (int): Number of integers to generate.
        lo (int): Inclusive lower bound of the random range.
        hi (int): Inclusive upper bound of the random range.

    Returns:
        List[int]: A list of `n` random integers.
    """
    # Hoist the bound-method lookup out of the comprehension.
    draw = random.randint
    return [draw(lo, hi) for _ in range(n)]

reverse_sorted_array(n) staticmethod

Generate a descending array from n‑1 to 0.

This is a convenience wrapper around sorted_array(reverse=True) and is frequently used to construct worst‑case inputs for algorithms such as insertion sort or bubble sort.

Parameters:

Name Type Description Default
n int

Number of elements to generate.

required

Returns:

Type Description
List[int]

List[int]: A descending list of integers.

Source code in src/alnoms/core/generators.py
@staticmethod
def reverse_sorted_array(n: int) -> List[int]:
    """Return the integers n-1 down to 0 in descending order.

    Thin convenience wrapper over `sorted_array(reverse=True)`; commonly
    used to build worst-case inputs for algorithms such as insertion sort
    and bubble sort.

    Args:
        n (int): Number of elements to generate.

    Returns:
        List[int]: A descending list of integers.
    """
    return DataGenerator.sorted_array(n, reverse=True)

sorted_array(n, reverse=False) staticmethod

Generate a sorted array of integers from 0 to n‑1.

Useful for constructing best‑case or worst‑case inputs for sorting algorithms and search routines.

Parameters:

Name Type Description Default
n int

Number of elements to generate.

required
reverse bool

If True, return the array in descending order.

False

Returns:

Type Description
List[int]

List[int]: A sorted list of integers.

Source code in src/alnoms/core/generators.py
@staticmethod
def sorted_array(n: int, reverse: bool = False) -> List[int]:
    """Return the integers 0..n-1 in ascending (or, optionally, descending) order.

    Handy for building best-case and worst-case inputs for sorting
    algorithms and search routines.

    Args:
        n (int): Number of elements to generate.
        reverse (bool): If True, return the array in descending order.

    Returns:
        List[int]: A sorted list of integers.
    """
    # Build the requested direction directly instead of reversing in place.
    if reverse:
        return list(range(n - 1, -1, -1))
    return list(range(n))

square_matrices(n) staticmethod

Generate a pair of N×N matrices filled with constant values.

Designed for benchmarking matrix multiplication algorithms where the computational complexity—not the numerical values—is the primary focus.

Complexity
  • Time: O(N²) to initialize both matrices.
  • Space: O(N²) for storage.

Parameters:

Name Type Description Default
n int

Dimension of each square matrix.

required

Returns:

Name Type Description
tuple tuple

A tuple (matrix_a, matrix_b), where matrix_a is filled with 1s and matrix_b is filled with 2s.

Source code in src/alnoms/core/generators.py
@staticmethod
def square_matrices(n: int) -> tuple:
    """Create two constant-valued N×N matrices for multiplication benchmarks.

    The numerical values are irrelevant by design: the matrices exist to
    exercise the computational complexity of matrix-multiplication code.

    Complexity:
        - Time: O(N²) to initialize both matrices.
        - Space: O(N²) for storage.

    Args:
        n (int): Dimension of each square matrix.

    Returns:
        tuple: A tuple `(matrix_a, matrix_b)` where:
            - `matrix_a` is filled with 1s
            - `matrix_b` is filled with 2s
    """
    ones_row = [1] * n
    twos_row = [2] * n
    # Copy each template row so every row is an independent list object.
    matrix_a = [list(ones_row) for _ in range(n)]
    matrix_b = [list(twos_row) for _ in range(n)]
    return (matrix_a, matrix_b)

Utility functions for loading test datasets from files.

All methods are static and designed for predictable, dependency‑free behavior. They support common formats used in algorithm benchmarking, including whitespace‑separated integers, tokens, and raw lines.

Source code in src/alnoms/core/io.py
class DataReader:
    """Static helpers for loading test datasets from files.

    Every reader validates the target path before opening it and is
    dependency-free by design. Supported formats are the common benchmark
    inputs: whitespace-separated integers, whitespace-separated tokens,
    and raw text lines.
    """

    @staticmethod
    def read_all_ints(path: str) -> List[int]:
        """Parse every whitespace-separated integer in the file at `path`.

        Integers may be split by spaces, tabs, or newlines — the format
        commonly used for sorting and searching benchmarks.

        Args:
            path (str): Absolute or relative path to the input file.

        Returns:
            List[int]: The parsed integers, in file order.

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If any token cannot be parsed as an integer.
        """
        DataReader._validate_path(path)
        with open(path, "r", encoding="utf-8") as handle:
            return [int(token) for token in handle.read().split()]

    @staticmethod
    def read_all_strings(path: str) -> List[str]:
        """Return every whitespace-separated token in the file at `path`.

        Useful for loading datasets for Trie benchmarks, MSD/LSD string
        sorts, and other token-based algorithm tests.

        Args:
            path (str): Absolute or relative path to the input file.

        Returns:
            List[str]: A list of string tokens.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        DataReader._validate_path(path)
        with open(path, "r", encoding="utf-8") as handle:
            text = handle.read()
        return text.split()

    @staticmethod
    def read_lines(path: str) -> List[str]:
        """Read every line of the file at `path`, stripped of surrounding whitespace.

        Empty lines are preserved as empty strings, which suits
        line-oriented algorithms, text processing, and structured inputs.

        Args:
            path (str): Absolute or relative path to the input file.

        Returns:
            List[str]: A list of cleaned lines.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        DataReader._validate_path(path)
        with open(path, "r", encoding="utf-8") as handle:
            return [raw.strip() for raw in handle]

    @staticmethod
    def _validate_path(path: str) -> None:
        """Ensure a file exists before any reader attempts to open it.

        Args:
            path (str): Path to validate.

        Raises:
            FileNotFoundError: If the file does not exist.
        """
        if not os.path.exists(path):
            raise FileNotFoundError(f"File not found: {path}")

read_all_ints(path) staticmethod

Read all whitespace‑separated integers from a file.

The file may contain integers separated by spaces, tabs, or newlines. This format is commonly used for sorting and searching benchmarks.

Parameters:

Name Type Description Default
path str

Absolute or relative path to the input file.

required

Returns:

Type Description
List[int]

List[int]: A list of parsed integers.

Raises:

Type Description
FileNotFoundError

If the file does not exist.

ValueError

If any token cannot be parsed as an integer.

Source code in src/alnoms/core/io.py
@staticmethod
def read_all_ints(path: str) -> List[int]:
    """Parse all whitespace-separated integers from the file at `path`.

    Integers may be separated by spaces, tabs, or newlines — the format
    commonly used for sorting and searching benchmarks.

    Args:
        path (str): Absolute or relative path to the input file.

    Returns:
        List[int]: The parsed integers, in file order.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If any token cannot be parsed as an integer.
    """
    DataReader._validate_path(path)
    with open(path, "r", encoding="utf-8") as handle:
        return [int(tok) for tok in handle.read().split()]

read_all_strings(path) staticmethod

Read all whitespace‑separated tokens from a file.

Useful for loading datasets for Trie benchmarks, MSD/LSD string sorts, and token‑based algorithm tests.

Parameters:

Name Type Description Default
path str

Absolute or relative path to the input file.

required

Returns:

Type Description
List[str]

List[str]: A list of string tokens.

Raises:

Type Description
FileNotFoundError

If the file does not exist.

Source code in src/alnoms/core/io.py
@staticmethod
def read_all_strings(path: str) -> List[str]:
    """Return all whitespace-separated tokens from the file at `path`.

    Useful for loading datasets for Trie benchmarks, MSD/LSD string
    sorts, and other token-based algorithm tests.

    Args:
        path (str): Absolute or relative path to the input file.

    Returns:
        List[str]: A list of string tokens.

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    DataReader._validate_path(path)
    with open(path, "r", encoding="utf-8") as handle:
        text = handle.read()
    return text.split()

read_lines(path) staticmethod

Read all lines from a file, stripping leading and trailing whitespace.

Empty lines are preserved as empty strings. This is useful for line‑oriented algorithms, text processing, and structured input formats.

Parameters:

Name Type Description Default
path str

Absolute or relative path to the input file.

required

Returns:

Type Description
List[str]

List[str]: A list of cleaned lines.

Raises:

Type Description
FileNotFoundError

If the file does not exist.

Source code in src/alnoms/core/io.py
@staticmethod
def read_lines(path: str) -> List[str]:
    """Read every line of the file at `path`, stripped of surrounding whitespace.

    Empty lines are preserved as empty strings, which suits line-oriented
    algorithms, text processing, and structured input formats.

    Args:
        path (str): Absolute or relative path to the input file.

    Returns:
        List[str]: A list of cleaned lines.

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    DataReader._validate_path(path)
    with open(path, "r", encoding="utf-8") as handle:
        return [raw.strip() for raw in handle]