Skip to content

API Reference

Complete reference for the triadic-engine Python package.

neurosym

neurosym

Triadic Neurosymbolic Engine — Core Package

Deterministic algebraic framework for neurosymbolic validation, semantic projection, and AI model auditing.

Install: `pip install triadic-engine` — Docs: https://github.com/arturoornelasb/Triadic-Neurosymbolic-Engine

Encoder

Multi-backend embedding encoder with four LSH-to-prime projection modes.

neurosym.encoder

BaseEncoder

Bases: ABC

Abstract interface for all embedding backends. Subclass this to add new providers (OpenAI, Cohere, etc.)

Source code in engine-src/src/neurosym/encoder.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class BaseEncoder(ABC):
    """
    Abstract interface for all embedding backends.
    Subclass this to add new providers (OpenAI, Cohere, etc.)

    Concrete subclasses must implement both ``encode`` and ``name``.
    """

    @abstractmethod
    def encode(self, concepts: List[str]) -> np.ndarray:
        """Returns a 2D numpy array of shape (len(concepts), dim)."""
        ...

    @property
    @abstractmethod
    def name(self) -> str:
        """Human-readable model identifier."""
        ...

name: str abstractmethod property

Human-readable model identifier.

encode(concepts: List[str]) -> np.ndarray abstractmethod

Returns a 2D numpy array of shape (len(concepts), dim).

Source code in engine-src/src/neurosym/encoder.py
20
21
22
23
# Abstract hook: concrete backends turn a batch of concept strings into
# one embedding row per concept.
@abstractmethod
def encode(self, concepts: List[str]) -> np.ndarray:
    """Returns a 2D numpy array of shape (len(concepts), dim)."""
    ...

ContinuousEncoder

Bases: BaseEncoder

A lightweight wrapper around sentence-transformers to generate continuous vector embeddings for natural language concepts. Optimized for CPU usage. Runs entirely offline.

Source code in engine-src/src/neurosym/encoder.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
class ContinuousEncoder(BaseEncoder):
    """
    Local embedding backend built on sentence-transformers.

    Generates continuous vectors for natural-language concepts with no
    network access; tuned for plain-CPU execution.
    """

    def __init__(self, model_name: str = 'all-MiniLM-L6-v2'):
        logger.info(f"Loading local embedding model: {model_name}")
        import os
        from sentence_transformers import SentenceTransformer
        # Temporarily silence tqdm while the model loads: progress-bar writes
        # can blow up when stdout is redirected (Streamlit, Jupyter, ...).
        saved = os.environ.get("TQDM_DISABLE")
        os.environ["TQDM_DISABLE"] = "1"
        try:
            self.model = SentenceTransformer(model_name)
        finally:
            # Put the caller's environment back exactly as we found it.
            if saved is not None:
                os.environ["TQDM_DISABLE"] = saved
            else:
                os.environ.pop("TQDM_DISABLE", None)
        self._name = model_name

    def encode(self, concepts: List[str]) -> np.ndarray:
        logger.info(f"Encoding {len(concepts)} concepts with {self._name}...")
        return self.model.encode(concepts, convert_to_numpy=True)

    @property
    def name(self) -> str:
        return self._name

OpenAIEncoder

Bases: BaseEncoder

Embedding backend using OpenAI's text-embedding API. Requires: pip install openai Set OPENAI_API_KEY environment variable.

Source code in engine-src/src/neurosym/encoder.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
class OpenAIEncoder(BaseEncoder):
    """
    Embedding backend using OpenAI's text-embedding API.
    Requires: pip install openai
    Set OPENAI_API_KEY environment variable.
    """
    def __init__(self, model_name: str = "text-embedding-3-small"):
        self._name = model_name
        import os
        # Keep the try body minimal: only the import may be reported as a
        # missing-package error; client construction failures surface as-is.
        try:
            from openai import OpenAI
        except ImportError as err:
            # Chain so the original import failure stays visible.
            raise ImportError("OpenAI backend requires: pip install openai") from err
        # NOTE(review): a missing OPENAI_API_KEY is passed through as None and
        # only fails at request time — confirm whether an early check is wanted.
        self.client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

    def encode(self, concepts: List[str]) -> np.ndarray:
        """Embed the concepts remotely; returns a float32 (len(concepts), dim) array."""
        logger.info(f"Encoding {len(concepts)} concepts with OpenAI/{self._name}...")
        response = self.client.embeddings.create(input=concepts, model=self._name)
        embeddings = [item.embedding for item in response.data]
        return np.array(embeddings, dtype=np.float32)

    @property
    def name(self) -> str:
        """Provider-qualified model identifier, e.g. 'openai/text-embedding-3-small'."""
        return f"openai/{self._name}"

CohereEncoder

Bases: BaseEncoder

Embedding backend using Cohere's embed API. Requires: pip install cohere Set COHERE_API_KEY environment variable.

Source code in engine-src/src/neurosym/encoder.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
class CohereEncoder(BaseEncoder):
    """
    Embedding backend using Cohere's embed API.
    Requires: pip install cohere
    Set COHERE_API_KEY environment variable.
    """
    def __init__(self, model_name: str = "embed-english-v3.0"):
        self._name = model_name
        import os
        # Keep the try body minimal: only the import may be reported as a
        # missing-package error; client construction failures surface as-is.
        try:
            import cohere
        except ImportError as err:
            # Chain so the original import failure stays visible.
            raise ImportError("Cohere backend requires: pip install cohere") from err
        # NOTE(review): a missing COHERE_API_KEY is passed through as None and
        # only fails at request time — confirm whether an early check is wanted.
        self.client = cohere.Client(api_key=os.environ.get("COHERE_API_KEY"))

    def encode(self, concepts: List[str]) -> np.ndarray:
        """Embed the concepts remotely; returns a float32 (len(concepts), dim) array."""
        logger.info(f"Encoding {len(concepts)} concepts with Cohere/{self._name}...")
        response = self.client.embed(
            texts=concepts,
            model=self._name,
            input_type="search_document"
        )
        return np.array(response.embeddings, dtype=np.float32)

    @property
    def name(self) -> str:
        """Provider-qualified model identifier, e.g. 'cohere/embed-english-v3.0'."""
        return f"cohere/{self._name}"

DiscreteMapper

Maps continuous dense vectors to discrete integer space using Locality Sensitive Hashing (LSH) and Prime Factorization.

Each LSH hyperplane is assigned a unique prime number. A concept's discrete representation is the product of all primes corresponding to hyperplanes where its projection is positive.

Supports four projection modes:
  • 'random' (default): Random hyperplanes from N(0, I_d)
  • 'pca': Principal Component directions from the corpus
  • 'consensus': Multi-seed voting — keeps only stable primes
  • 'contrastive': Trains hyperplanes on known hypernym pairs
Source code in engine-src/src/neurosym/encoder.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
class DiscreteMapper:
    """
    Maps continuous dense vectors to discrete integer space 
    using Locality Sensitive Hashing (LSH) and Prime Factorization.

    Each LSH hyperplane is assigned a unique prime number. A concept's
    discrete representation is the product of all primes corresponding
    to hyperplanes where its projection is positive.

    Supports four projection modes:
        - 'random' (default): Random hyperplanes from N(0, I_d)
        - 'pca': Principal Component directions from the corpus
        - 'consensus': Multi-seed voting — keeps only stable primes
        - 'contrastive': Trains hyperplanes on known hypernym pairs
    """
    def __init__(self, n_bits: int = 16, seed: int = 42, projection: str = "random",
                 consensus_seeds: int = 20, consensus_threshold: float = 0.7,
                 hypernym_pairs: Optional[List[Tuple[str, str]]] = None):
        self.n_bits = n_bits
        self.seed = seed
        self.random_state = np.random.RandomState(seed)
        self.projection = projection
        # (n_bits, dim) hyperplane matrix; created lazily at fit time.
        self.planes = None
        self.concept_to_prime: Dict[str, int] = {}
        # Consensus parameters
        self.consensus_seeds = consensus_seeds
        self.consensus_threshold = consensus_threshold
        # Contrastive parameters
        self.hypernym_pairs = hypernym_pairs or []

    def _generate_random_planes(self, dim: int):
        """Generate random hyperplanes from N(0, I_d)."""
        self.planes = self.random_state.randn(self.n_bits, dim)

    def _generate_pca_planes(self, embeddings: np.ndarray):
        """
        Generate hyperplanes from the top-k principal components.
        Deterministic, seed-independent, corpus-adapted.
        """
        from sklearn.decomposition import PCA

        # PCA cannot yield more components than min(n_samples, n_features).
        k = min(self.n_bits, embeddings.shape[1], embeddings.shape[0])
        pca = PCA(n_components=k)
        pca.fit(embeddings)
        self.planes = pca.components_[:self.n_bits]

        # Pad with random planes if PCA produced fewer than n_bits directions.
        if self.planes.shape[0] < self.n_bits:
            extra = self.random_state.randn(
                self.n_bits - self.planes.shape[0], embeddings.shape[1]
            )
            self.planes = np.vstack([self.planes, extra])

        logger.info(
            f"PCA projection: {k} components explain "
            f"{sum(pca.explained_variance_ratio_[:k])*100:.1f}% of variance"
        )

    def _generate_consensus_encoding(self, concepts: List[str], embeddings: np.ndarray) -> Dict[str, int]:
        """
        Multi-seed consensus encoding.

        Runs N random projections and, for each concept, keeps only the
        prime factors that appear in more than `threshold` fraction of runs.
        This filters out random projection noise and retains only stable
        semantic features.
        """
        from collections import Counter

        plane_primes = [sympy.prime(i + 1) for i in range(self.n_bits)]
        concept_prime_votes: Dict[str, Counter] = {c: Counter() for c in concepts}

        # One independent random projection per seed; every positive
        # projection counts as one vote for that hyperplane's prime.
        for seed in range(self.consensus_seeds):
            rng = np.random.RandomState(seed)
            planes = rng.randn(self.n_bits, embeddings.shape[1])

            for concept, emb in zip(concepts, embeddings):
                projections = np.dot(planes, emb)
                bits = (projections > 0).astype(int)
                for bit, prime in zip(bits, plane_primes):
                    if bit == 1:
                        concept_prime_votes[concept][prime] += 1

        # Keep only primes that appear in > threshold fraction of seeds
        # (the >= below actually means "at least" the threshold fraction).
        min_votes = int(self.consensus_seeds * self.consensus_threshold)
        result = {}
        for concept in concepts:
            composite = 1
            for prime, votes in concept_prime_votes[concept].items():
                if votes >= min_votes:
                    composite *= prime
            # No stable factor survived voting: pin to the first prime so the
            # concept never collapses to the multiplicative identity.
            if composite == 1:
                composite = 2
            result[concept] = composite

        stable_count = sum(
            1 for c in concepts 
            if len([v for v in concept_prime_votes[c].values() if v >= min_votes]) > 0
        )
        logger.info(
            f"Consensus encoding: {self.consensus_seeds} seeds, "
            f"threshold={self.consensus_threshold}, "
            f"{stable_count}/{len(concepts)} concepts have stable factors"
        )
        return result

    def _generate_contrastive_planes(self, embeddings: np.ndarray, concepts: List[str]):
        """
        Contrastive hyperplane learning.

        Given known hypernym pairs (broader ⊇ narrower), optimize hyperplane
        directions to maximize the number of correct subsumption relationships
        (broader % narrower == 0).

        Uses gradient-free optimization: start with PCA directions, then
        iteratively perturb each hyperplane and keep changes that improve
        subsumption accuracy on the training pairs.
        """
        if not self.hypernym_pairs:
            logger.warning("No hypernym pairs provided for contrastive learning, falling back to PCA")
            self._generate_pca_planes(embeddings)
            return

        # Only pairs whose both ends are in the vocabulary can be scored.
        concept_idx = {c: i for i, c in enumerate(concepts)}
        valid_pairs = [
            (broader, narrower) 
            for broader, narrower in self.hypernym_pairs
            if broader in concept_idx and narrower in concept_idx
        ]

        if not valid_pairs:
            logger.warning("No valid hypernym pairs found in vocabulary, falling back to PCA")
            self._generate_pca_planes(embeddings)
            return

        # Start from PCA directions
        self._generate_pca_planes(embeddings)
        best_planes = self.planes.copy()
        best_score = self._score_subsumption(best_planes, embeddings, concepts, valid_pairs)

        logger.info(f"Contrastive learning: {len(valid_pairs)} hypernym pairs, initial score={best_score}")

        # Iterative perturbation (gradient-free optimization)
        rng = np.random.RandomState(self.seed)
        n_iterations = 200
        temperature = 0.1

        for iteration in range(n_iterations):
            # Perturb one randomly chosen hyperplane, renormalize, and keep
            # the candidate only if it strictly improves the score.
            plane_idx = rng.randint(self.n_bits)
            perturbation = rng.randn(embeddings.shape[1]) * temperature

            candidate = best_planes.copy()
            candidate[plane_idx] += perturbation
            candidate[plane_idx] /= np.linalg.norm(candidate[plane_idx])

            score = self._score_subsumption(candidate, embeddings, concepts, valid_pairs)

            if score > best_score:
                best_planes = candidate
                best_score = score

            # Anneal: shrink the perturbation size every 50 iterations.
            if iteration % 50 == 49:
                temperature *= 0.8

        self.planes = best_planes
        logger.info(f"Contrastive learning complete: final score={best_score}")

    def _score_subsumption(self, planes, embeddings, concepts, hypernym_pairs):
        """Score a set of planes by how many hypernym relationships they capture correctly."""
        plane_primes = [sympy.prime(i + 1) for i in range(self.n_bits)]

        # Encode every concept under the candidate planes.
        prime_map = {}
        for concept, emb in zip(concepts, embeddings):
            projections = np.dot(planes, emb)
            bits = (projections > 0).astype(int)
            composite = 1
            for bit, prime in zip(bits, plane_primes):
                if bit == 1:
                    composite *= prime
            if composite == 1:
                composite = 2
            prime_map[concept] = composite

        # Count correct subsumptions
        tp = sum(
            1 for broader, narrower in hypernym_pairs
            if broader in prime_map and narrower in prime_map
            and prime_map[broader] % prime_map[narrower] == 0
        )

        # Penalize false positives
        # (fixed seed 99 keeps the negative sample identical across calls,
        # so candidate scores stay comparable during the search).
        rng = np.random.RandomState(99)
        fp = 0
        for _ in range(50):
            i, j = rng.choice(len(concepts), 2, replace=False)
            a, b = concepts[i], concepts[j]
            if a in prime_map and b in prime_map:
                if prime_map[a] % prime_map[b] == 0:
                    fp += 1

        return tp - fp * 0.5

    def fit_transform(self, concepts: List[str], embeddings: np.ndarray) -> Dict[str, int]:
        """
        Maps a list of concepts and their continuous embeddings to discrete composite primes.
        Each LSH hyperplane corresponds to a unique prime number.
        """
        logger.info(f"Mapping {len(concepts)} concepts to discrete integer space (mode={self.projection})...")

        # Consensus mode has its own encoding pipeline
        if self.projection == "consensus":
            self.concept_to_prime = self._generate_consensus_encoding(concepts, embeddings)
            return self.concept_to_prime

        # Planes are fitted once; repeated calls reuse the existing projection.
        if self.planes is None:
            if self.projection == "pca":
                self._generate_pca_planes(embeddings)
            elif self.projection == "contrastive":
                self._generate_contrastive_planes(embeddings, concepts)
            else:
                self._generate_random_planes(embeddings.shape[1])

        # Assign a prime to each hyperplane (Semantic Feature)
        plane_primes = [sympy.prime(i + 1) for i in range(self.n_bits)]

        for concept, emb in zip(concepts, embeddings):
            projections = np.dot(self.planes, emb)
            bits = (projections > 0).astype(int)

            composite_integer = 1
            for bit, prime_factor in zip(bits, plane_primes):
                if bit == 1:
                    composite_integer *= prime_factor

            # All-negative projections: pin to the first prime so the code
            # is never the multiplicative identity.
            if composite_integer == 1:
                composite_integer = 2

            self.concept_to_prime[concept] = composite_integer

        return self.concept_to_prime

    def get_factor(self, concept: str) -> int:
        # Lookup-only accessor; fit_transform/transform populate the mapping.
        if concept not in self.concept_to_prime:
            raise ValueError(f"Concept '{concept}' not found in the discrete mapping. Use transform() for new concepts.")
        return self.concept_to_prime[concept]

    def transform(self, concepts: List[str], embeddings: np.ndarray) -> Dict[str, int]:
        """
        Maps a list of new concepts to discrete composite primes using EXISTING planes.
        Does NOT update internal planes.
        """
        if self.planes is None:
            raise ValueError("DiscreteMapper has not been fitted. Call fit_transform first.")

        plane_primes = [sympy.prime(i + 1) for i in range(self.n_bits)]
        results = {}

        for concept, emb in zip(concepts, embeddings):
            projections = np.dot(self.planes, emb)
            bits = (projections > 0).astype(int)

            composite_integer = 1
            for bit, prime_factor in zip(bits, plane_primes):
                if bit == 1:
                    composite_integer *= prime_factor

            if composite_integer == 1:
                composite_integer = 2

            results[concept] = composite_integer
            # Optionally cache it
            self.concept_to_prime[concept] = composite_integer

        return results

fit_transform(concepts: List[str], embeddings: np.ndarray) -> Dict[str, int]

Maps a list of concepts and their continuous embeddings to discrete composite primes. Each LSH hyperplane corresponds to a unique prime number.

Source code in engine-src/src/neurosym/encoder.py
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
def fit_transform(self, concepts: List[str], embeddings: np.ndarray) -> Dict[str, int]:
    """
    Fit the projection (when not yet fitted) and encode every concept as a
    composite integer whose prime factors are the hyperplanes with a
    positive projection.
    """
    logger.info(f"Mapping {len(concepts)} concepts to discrete integer space (mode={self.projection})...")

    # Consensus mode bypasses the plane machinery entirely.
    if self.projection == "consensus":
        self.concept_to_prime = self._generate_consensus_encoding(concepts, embeddings)
        return self.concept_to_prime

    # Fit the hyperplanes once; later calls reuse them.
    if self.planes is None:
        mode = self.projection
        if mode == "contrastive":
            self._generate_contrastive_planes(embeddings, concepts)
        elif mode == "pca":
            self._generate_pca_planes(embeddings)
        else:
            self._generate_random_planes(embeddings.shape[1])

    # One unique prime per hyperplane (semantic feature).
    primes = [sympy.prime(i + 1) for i in range(self.n_bits)]

    for concept, vector in zip(concepts, embeddings):
        hits = np.dot(self.planes, vector) > 0

        code = 1
        for active, p in zip(hits, primes):
            if active:
                code *= p

        # An all-negative signature would collapse to the multiplicative
        # identity; pin it to the first prime instead.
        self.concept_to_prime[concept] = code if code != 1 else 2

    return self.concept_to_prime

transform(concepts: List[str], embeddings: np.ndarray) -> Dict[str, int]

Maps a list of new concepts to discrete composite primes using EXISTING planes. Does NOT update internal planes.

Source code in engine-src/src/neurosym/encoder.py
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
def transform(self, concepts: List[str], embeddings: np.ndarray) -> Dict[str, int]:
    """
    Encode new concepts with the already-fitted hyperplanes.

    The planes themselves are left untouched; each result is additionally
    cached in ``concept_to_prime`` for later lookups.
    """
    if self.planes is None:
        raise ValueError("DiscreteMapper has not been fitted. Call fit_transform first.")

    primes = [sympy.prime(i + 1) for i in range(self.n_bits)]
    mapping = {}

    for concept, vector in zip(concepts, embeddings):
        active = np.dot(self.planes, vector) > 0

        code = 1
        for keep, p in zip(active, primes):
            if keep:
                code *= p

        if code == 1:
            code = 2  # empty signature -> first prime, never the identity

        mapping[concept] = code
        self.concept_to_prime[concept] = code  # cache for get_factor()

    return mapping

create_encoder(provider: str = 'local', model: str = None) -> BaseEncoder

Factory function to create an encoder from a provider name.

Examples:

create_encoder("local", "all-MiniLM-L6-v2") create_encoder("openai", "text-embedding-3-large") create_encoder("cohere", "embed-english-v3.0")

Source code in engine-src/src/neurosym/encoder.py
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def create_encoder(provider: str = "local", model: Optional[str] = None) -> BaseEncoder:
    """
    Factory function to create an encoder from a provider name.

    Args:
        provider: Registry key identifying the backend (e.g. "local",
            "openai", "cohere").
        model: Optional backend-specific model name; when omitted, the
            encoder class's default model is used.

    Raises:
        ValueError: If *provider* is not a key of ENCODER_REGISTRY.

    Examples:
        create_encoder("local", "all-MiniLM-L6-v2")
        create_encoder("openai", "text-embedding-3-large")
        create_encoder("cohere", "embed-english-v3.0")
    """
    if provider not in ENCODER_REGISTRY:
        raise ValueError(f"Unknown provider '{provider}'. Available: {list(ENCODER_REGISTRY.keys())}")

    cls = ENCODER_REGISTRY[provider]
    # Falsy model (None or "") falls back to the class default.
    return cls(model_name=model) if model else cls()

Discrete Validator

Algebraic operations on prime-factor signatures: subsumption, composition, gap analysis, analogy.

neurosym.triadic

ValidationResult dataclass

Result of a discrete algebraic validation.

Source code in engine-src/src/neurosym/triadic.py
 8
 9
10
11
12
13
14
15
16
17
18
@dataclass
class ValidationResult:
    """
    Result of a discrete algebraic validation.
    """
    # True when the relational equation resolves to a clean integer.
    is_valid: bool
    # The resolved (or hypothetically resolved) target value C4; 0 on zero input.
    output_value: float
    # Simplicity constant K = 1 / (rule_a * rule_b); 0.0 on zero input.
    simplicity_k: float
    # True when resolution required an abductively discovered factor.
    is_hypothetical: bool = False
    # The discovered factor when validation failed; None otherwise.
    # NOTE(review): annotation should arguably be Optional[int] — confirm
    # typing imports in this module before tightening it.
    missing_factor: int = None
    # Human-readable derivation of the computation.
    trace: str = ""

DiscreteValidator

Abductive Algebraic Resolver for Relational Data. Merges Triadic Relational Framework and Shadow Engine logic. Projects continuous latent spaces into discrete integer factors for rigorous validation.

Source code in engine-src/src/neurosym/triadic.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
class DiscreteValidator:
    """
    Abductive Algebraic Resolver for Relational Data.
    Merges Triadic Relational Framework and Shadow Engine logic.
    Project continuous latent spaces into discrete integer factors for rigorous validation.
    """
    def __init__(self):
        pass

    @staticmethod
    def _compute_simplicity(rule_a: int, rule_b: int) -> float:
        """
        Computes simplicity K constant based on the balancing rules.
        K = 1 / (a * b)
        """
        return float(Fraction(1, rule_a * rule_b))

    @staticmethod
    def discover_missing_factor(numerator: int, denominator: int) -> int:
        """
        Abductive Discovery: Identifies the missing integer factor that prevents
        the equation from resolving to a clean integer state (Topological Obstruction).
        """
        common = math.gcd(numerator, denominator)
        missing_link = denominator // common
        return missing_link

    def validate_relationship(self, source: int, transform1: int, transform2: int, 
                              rule_a: int = 1, rule_b: int = 1) -> ValidationResult:
        """
        Validates if the relationship: 
        (rule_a * transform1 * transform2) / (rule_b * source) resolves cleanly.

        Args:
            source (C1): The base concept/entity (e.g. integer projection of a word)
            transform1 (C2): The first relational modifier
            transform2 (C3): The second relational modifier
            rule_a, rule_b: The invariant ratio (default 1:1)

        Returns:
            ValidationResult containing the expected target (C4) or the missing abductive factor.
        """
        # Guard against zero inputs
        if source == 0 or transform1 == 0 or transform2 == 0:
            return ValidationResult(
                is_valid=False,
                output_value=0,
                simplicity_k=0.0,
                trace="Zero input detected — cannot validate."
            )

        # 1. GCD Normalization (Projection to Terminal Object)
        gcd_in = math.gcd(source, math.gcd(transform1, transform2))
        c1_p = source // gcd_in
        c2_p = transform1 // gcd_in
        c3_p = transform2 // gcd_in

        # 2. Relational Path Construction
        numerator = rule_a * c2_p * c3_p
        denominator = rule_b * c1_p

        trace = f"({rule_a} * {c2_p} * {c3_p}) / ({rule_b} * {c1_p})"
        simplicity = self._compute_simplicity(rule_a, rule_b)

        # 3. Validation and Abductive Discovery
        if numerator % denominator != 0:
            logger.info("Topological obstruction detected. Initiating Abductive Discovery.")
            missing = self.discover_missing_factor(numerator, denominator)

            # Hypothetical resolution
            c4_hypothetical_p = (numerator * missing) // denominator
            c4_final = c4_hypothetical_p * gcd_in

            return ValidationResult(
                is_valid=False,
                output_value=c4_final,
                simplicity_k=simplicity,
                is_hypothetical=True,
                missing_factor=missing,
                trace=trace + f" -> Fails. Requires factor: {missing}"
            )

        # 4. Success Path
        c4_p = numerator // denominator
        c4 = c4_p * gcd_in

        return ValidationResult(
            is_valid=True,
            output_value=c4,
            simplicity_k=simplicity,
            is_hypothetical=False,
            trace=trace + f" -> Success: {c4}"
        )

    def analogy_prediction(self, source_a: int, source_b: int, target_a: int) -> ValidationResult:
        """
        Resolves semantic analogies (A:B :: C:D)
        Returns the integer prediction for D.
        Formula: D = (C * B) / A
        """
        return self.validate_relationship(
            source=source_a, 
            transform1=source_b, 
            transform2=target_a,
            rule_a=1, 
            rule_b=1
        )

    # --- Logical Verification API (Unique to Prime Representation) ---

    @staticmethod
    def _prime_factors(n: int) -> list[int]:
        """Returns the unique prime factors of n."""
        if n <= 1:
            return []
        import sympy
        return list(sympy.factorint(n).keys())

    @staticmethod
    def subsumes(a: int, b: int) -> bool:
        """
        Logical Subsumption: Does concept A contain ALL semantic features of B?

        In prime space: A subsumes B iff B divides A (A % B == 0).
        Example: King(2*3*5) subsumes Male(3) → True (King contains 'male' feature)

        This operation is IMPOSSIBLE with Hamming distance over bitstrings.
        Hamming only tells you "how many bits differ", not containment.
        """
        return a % b == 0

    @staticmethod
    def compose(*concepts: int) -> int:
        """
        Algebraic Composition: Create a new concept by combining features.

        In prime space: the union of all semantic features = LCM of the integers.
        This preserves uniqueness: compose(Royal, Male) contains all features of both.

        Example: compose(Royal=2*5, Male=3*7) → 2*3*5*7 = 210

        This operation is IMPOSSIBLE with vector cosine similarity.
        """
        result = concepts[0]
        for c in concepts[1:]:
            result = (result * c) // math.gcd(result, c)  # LCM
            # Guard against astronomically large composites
            if result.bit_length() > 4096:
                raise OverflowError(
                    f"Compose result exceeds 4096 bits ({result.bit_length()} bits). "
                    f"Reduce the number of concepts or LSH bits."
                )
        return result

    @staticmethod
    def explain_gap(a: int, b: int) -> dict:
        """
        Abductive Discovery: Explains exactly WHY two concepts differ.

        Returns:
            - shared: GCD(a, b) — the common semantic backbone
            - only_in_a: features present in A but missing from B
            - only_in_b: features present in B but missing from A

        Example: explain_gap(King=30, Queen=10)
            → shared=10, only_in_king=3 (the 'male' factor), only_in_queen=1

        This deterministic decomposition is IMPOSSIBLE with continuous vectors.
        Cosine similarity only says "0.87 similar" — not WHICH features differ.
        """
        shared = math.gcd(a, b)
        only_in_a = a // shared
        only_in_b = b // shared
        return {
            "shared": shared,
            "only_in_a": only_in_a,
            "only_in_b": only_in_b,
            "a_contains_b": (a % b == 0),
            "b_contains_a": (b % a == 0),
        }

discover_missing_factor(numerator: int, denominator: int) -> int staticmethod

Abductive Discovery: Identifies the missing integer factor that prevents the equation from resolving to a clean integer state (Topological Obstruction).

Source code in engine-src/src/neurosym/triadic.py
37
38
39
40
41
42
43
44
45
@staticmethod
def discover_missing_factor(numerator: int, denominator: int) -> int:
    """
    Abductive Discovery: Identifies the missing integer factor that prevents
    the equation from resolving to a clean integer state (Topological Obstruction).
    """
    common = math.gcd(numerator, denominator)
    missing_link = denominator // common
    return missing_link

validate_relationship(source: int, transform1: int, transform2: int, rule_a: int = 1, rule_b: int = 1) -> ValidationResult

Validates if the relationship: (rule_a * transform1 * transform2) / (rule_b * source) resolves cleanly.

Parameters:

Name Type Description Default
source C1

The base concept/entity (e.g. integer projection of a word)

required
transform1 C2

The first relational modifier

required
transform2 C3

The second relational modifier

required
rule_a, rule_b

The invariant ratio (default 1:1)

1

Returns:

Type Description
ValidationResult

ValidationResult containing the expected target (C4) or the missing abductive factor.

Source code in engine-src/src/neurosym/triadic.py
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def validate_relationship(self, source: int, transform1: int, transform2: int, 
                          rule_a: int = 1, rule_b: int = 1) -> ValidationResult:
    """
    Validates if the relationship: 
    (rule_a * transform1 * transform2) / (rule_b * source) resolves cleanly.

    Args:
        source (C1): The base concept/entity (e.g. integer projection of a word)
        transform1 (C2): The first relational modifier
        transform2 (C3): The second relational modifier
        rule_a, rule_b: The invariant ratio (default 1:1)

    Returns:
        ValidationResult containing the expected target (C4) or the missing abductive factor.
    """
    # Guard against zero inputs
    if source == 0 or transform1 == 0 or transform2 == 0:
        return ValidationResult(
            is_valid=False,
            output_value=0,
            simplicity_k=0.0,
            trace="Zero input detected — cannot validate."
        )

    # 1. GCD Normalization (Projection to Terminal Object)
    gcd_in = math.gcd(source, math.gcd(transform1, transform2))
    c1_p = source // gcd_in
    c2_p = transform1 // gcd_in
    c3_p = transform2 // gcd_in

    # 2. Relational Path Construction
    numerator = rule_a * c2_p * c3_p
    denominator = rule_b * c1_p

    trace = f"({rule_a} * {c2_p} * {c3_p}) / ({rule_b} * {c1_p})"
    simplicity = self._compute_simplicity(rule_a, rule_b)

    # 3. Validation and Abductive Discovery
    if numerator % denominator != 0:
        logger.info("Topological obstruction detected. Initiating Abductive Discovery.")
        missing = self.discover_missing_factor(numerator, denominator)

        # Hypothetical resolution
        c4_hypothetical_p = (numerator * missing) // denominator
        c4_final = c4_hypothetical_p * gcd_in

        return ValidationResult(
            is_valid=False,
            output_value=c4_final,
            simplicity_k=simplicity,
            is_hypothetical=True,
            missing_factor=missing,
            trace=trace + f" -> Fails. Requires factor: {missing}"
        )

    # 4. Success Path
    c4_p = numerator // denominator
    c4 = c4_p * gcd_in

    return ValidationResult(
        is_valid=True,
        output_value=c4,
        simplicity_k=simplicity,
        is_hypothetical=False,
        trace=trace + f" -> Success: {c4}"
    )

analogy_prediction(source_a: int, source_b: int, target_a: int) -> ValidationResult

Resolves semantic analogies (A:B :: C:D) Returns the integer prediction for D. Formula: D = (C * B) / A

Source code in engine-src/src/neurosym/triadic.py
114
115
116
117
118
119
120
121
122
123
124
125
126
def analogy_prediction(self, source_a: int, source_b: int, target_a: int) -> ValidationResult:
    """
    Resolves semantic analogies (A:B :: C:D)
    Returns the integer prediction for D.
    Formula: D = (C * B) / A
    """
    return self.validate_relationship(
        source=source_a, 
        transform1=source_b, 
        transform2=target_a,
        rule_a=1, 
        rule_b=1
    )

subsumes(a: int, b: int) -> bool staticmethod

Logical Subsumption: Does concept A contain ALL semantic features of B?

In prime space: A subsumes B iff B divides A (A % B == 0). Example: King(2*3*5) subsumes Male(3) → True (King contains 'male' feature)

This operation is IMPOSSIBLE with Hamming distance over bitstrings. Hamming only tells you "how many bits differ", not containment.

Source code in engine-src/src/neurosym/triadic.py
138
139
140
141
142
143
144
145
146
147
148
149
@staticmethod
def subsumes(a: int, b: int) -> bool:
    """
    Logical Subsumption: Does concept A contain ALL semantic features of B?

    In prime space: A subsumes B iff B divides A (A % B == 0).
    Example: King(2*3*5) subsumes Male(3) → True (King contains 'male' feature)

    This operation is IMPOSSIBLE with Hamming distance over bitstrings.
    Hamming only tells you "how many bits differ", not containment.
    """
    return a % b == 0

compose(*concepts: int) -> int staticmethod

Algebraic Composition: Create a new concept by combining features.

In prime space: the union of all semantic features = LCM of the integers. This preserves uniqueness: compose(Royal, Male) contains all features of both.

Example: compose(Royal=2*5, Male=3*7) → 2*3*5*7 = 210

This operation is IMPOSSIBLE with vector cosine similarity.

Source code in engine-src/src/neurosym/triadic.py
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
@staticmethod
def compose(*concepts: int) -> int:
    """
    Algebraic Composition: Create a new concept by combining features.

    In prime space: the union of all semantic features = LCM of the integers.
    This preserves uniqueness: compose(Royal, Male) contains all features of both.

    Example: compose(Royal=2*5, Male=3*7) → 2*3*5*7 = 210

    This operation is IMPOSSIBLE with vector cosine similarity.
    """
    result = concepts[0]
    for c in concepts[1:]:
        result = (result * c) // math.gcd(result, c)  # LCM
        # Guard against astronomically large composites
        if result.bit_length() > 4096:
            raise OverflowError(
                f"Compose result exceeds 4096 bits ({result.bit_length()} bits). "
                f"Reduce the number of concepts or LSH bits."
            )
    return result

explain_gap(a: int, b: int) -> dict staticmethod

Abductive Discovery: Explains exactly WHY two concepts differ.

Returns:

Type Description
dict
  • shared: GCD(a, b) — the common semantic backbone
dict
  • only_in_a: features present in A but missing from B
dict
  • only_in_b: features present in B but missing from A
Example: explain_gap(King=30, Queen=10)

→ shared=10, only_in_king=3 (the 'male' factor), only_in_queen=1

This deterministic decomposition is IMPOSSIBLE with continuous vectors. Cosine similarity only says "0.87 similar" — not WHICH features differ.

Source code in engine-src/src/neurosym/triadic.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
@staticmethod
def explain_gap(a: int, b: int) -> dict:
    """
    Abductive Discovery: Explains exactly WHY two concepts differ.

    Returns:
        - shared: GCD(a, b) — the common semantic backbone
        - only_in_a: features present in A but missing from B
        - only_in_b: features present in B but missing from A

    Example: explain_gap(King=30, Queen=10)
        → shared=10, only_in_king=3 (the 'male' factor), only_in_queen=1

    This deterministic decomposition is IMPOSSIBLE with continuous vectors.
    Cosine similarity only says "0.87 similar" — not WHICH features differ.
    """
    shared = math.gcd(a, b)
    only_in_a = a // shared
    only_in_b = b // shared
    return {
        "shared": shared,
        "only_in_a": only_in_a,
        "only_in_b": only_in_b,
        "a_contains_b": (a % b == 0),
        "b_contains_a": (b % a == 0),
    }

Graph Builder

Scalable graph construction with inverted prime index.

neurosym.graph

Scalable Graph Builder for the Triadic Neurosymbolic Engine.

Replaces the naive O(N²) all-pairs GCD scan with an inverted prime index approach that only compares concepts sharing at least one prime factor.

Complexity: O(N * F * B) where:

- N = number of concepts
- F = average number of prime factors per concept
- B = average bucket size per prime factor

For sparse LSH projections (k=8-16), F ≈ 4-8 and B << N, so this is dramatically faster than O(N²) for large datasets.

ScalableGraphBuilder

Builds semantic graphs using an inverted prime factor index instead of brute-force all-pairs comparison.

Source code in engine-src/src/neurosym/graph.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class ScalableGraphBuilder:
    """
    Builds semantic graphs using an inverted prime factor index
    instead of brute-force all-pairs comparison.
    """

    def __init__(self):
        self.inverted_index: Dict[int, Set[str]] = defaultdict(set)
        self.concept_factors: Dict[str, List[int]] = {}

    def _factorize(self, n: int) -> List[int]:
        if n <= 1:
            return []
        return list(sympy.factorint(n).keys())

    def build_index(self, prime_map: Dict[str, int]):
        """
        Build the inverted index from a prime map.
        Maps each prime factor → set of concepts containing that factor.
        """
        self.inverted_index.clear()
        self.concept_factors.clear()

        for concept, composite in prime_map.items():
            factors = self._factorize(composite)
            self.concept_factors[concept] = factors
            for f in factors:
                self.inverted_index[f].add(concept)

        logger.info(
            f"Built inverted index: {len(prime_map)} concepts, "
            f"{len(self.inverted_index)} prime buckets"
        )

    def find_edges(self, prime_map: Dict[str, int], min_shared: int = 1) -> List[Tuple[str, str, int, List[int]]]:
        """
        Find all edges (concept pairs with shared prime factors) using
        the inverted index. Returns list of (concept_a, concept_b, weight, shared_factors).

        This is O(N * F * B) instead of O(N²).
        """
        if not self.inverted_index:
            self.build_index(prime_map)

        # Collect candidate pairs from the inverted index
        pair_shared: Dict[Tuple[str, str], Set[int]] = defaultdict(set)

        for prime_factor, concepts in self.inverted_index.items():
            concept_list = sorted(concepts)  # Sort for consistent pair ordering
            for i in range(len(concept_list)):
                for j in range(i + 1, len(concept_list)):
                    pair = (concept_list[i], concept_list[j])
                    pair_shared[pair].add(prime_factor)

        # Filter by minimum shared weight
        edges = []
        for (a, b), shared_primes in pair_shared.items():
            weight = len(shared_primes)
            if weight >= min_shared:
                edges.append((a, b, weight, sorted(shared_primes)))

        logger.info(
            f"Found {len(edges)} edges (min_shared={min_shared}) "
            f"from {len(pair_shared)} candidate pairs"
        )
        return edges

    def find_neighbors(self, concept: str, prime_map: Dict[str, int], min_shared: int = 1) -> List[Tuple[str, int, List[int]]]:
        """
        Find all neighbors of a single concept using the inverted index.
        Returns list of (neighbor, weight, shared_factors).

        This is O(F * B) — constant relative to total dataset size.
        """
        if not self.inverted_index:
            self.build_index(prime_map)

        if concept not in self.concept_factors:
            return []

        neighbor_shared: Dict[str, Set[int]] = defaultdict(set)

        for factor in self.concept_factors[concept]:
            for other in self.inverted_index[factor]:
                if other != concept:
                    neighbor_shared[other].add(factor)

        results = []
        for neighbor, shared_primes in neighbor_shared.items():
            weight = len(shared_primes)
            if weight >= min_shared:
                results.append((neighbor, weight, sorted(shared_primes)))

        results.sort(key=lambda x: -x[1])
        return results

    def get_stats(self) -> dict:
        """Return index statistics."""
        if not self.inverted_index:
            return {"indexed": False}

        bucket_sizes = [len(v) for v in self.inverted_index.values()]
        return {
            "indexed": True,
            "total_concepts": len(self.concept_factors),
            "total_prime_buckets": len(self.inverted_index),
            "avg_bucket_size": sum(bucket_sizes) / max(1, len(bucket_sizes)),
            "max_bucket_size": max(bucket_sizes) if bucket_sizes else 0,
            "avg_factors_per_concept": sum(len(f) for f in self.concept_factors.values()) / max(1, len(self.concept_factors)),
        }

build_index(prime_map: Dict[str, int])

Build the inverted index from a prime map. Maps each prime factor → set of concepts containing that factor.

Source code in engine-src/src/neurosym/graph.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def build_index(self, prime_map: Dict[str, int]):
    """
    Build the inverted index from a prime map.
    Maps each prime factor → set of concepts containing that factor.
    """
    self.inverted_index.clear()
    self.concept_factors.clear()

    for concept, composite in prime_map.items():
        factors = self._factorize(composite)
        self.concept_factors[concept] = factors
        for f in factors:
            self.inverted_index[f].add(concept)

    logger.info(
        f"Built inverted index: {len(prime_map)} concepts, "
        f"{len(self.inverted_index)} prime buckets"
    )

find_edges(prime_map: Dict[str, int], min_shared: int = 1) -> List[Tuple[str, str, int, List[int]]]

Find all edges (concept pairs with shared prime factors) using the inverted index. Returns list of (concept_a, concept_b, weight, shared_factors).

This is O(N * F * B) instead of O(N²).

Source code in engine-src/src/neurosym/graph.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def find_edges(self, prime_map: Dict[str, int], min_shared: int = 1) -> List[Tuple[str, str, int, List[int]]]:
    """
    Find all edges (concept pairs with shared prime factors) using
    the inverted index. Returns list of (concept_a, concept_b, weight, shared_factors).

    This is O(N * F * B) instead of O(N²).
    """
    if not self.inverted_index:
        self.build_index(prime_map)

    # Collect candidate pairs from the inverted index
    pair_shared: Dict[Tuple[str, str], Set[int]] = defaultdict(set)

    for prime_factor, concepts in self.inverted_index.items():
        concept_list = sorted(concepts)  # Sort for consistent pair ordering
        for i in range(len(concept_list)):
            for j in range(i + 1, len(concept_list)):
                pair = (concept_list[i], concept_list[j])
                pair_shared[pair].add(prime_factor)

    # Filter by minimum shared weight
    edges = []
    for (a, b), shared_primes in pair_shared.items():
        weight = len(shared_primes)
        if weight >= min_shared:
            edges.append((a, b, weight, sorted(shared_primes)))

    logger.info(
        f"Found {len(edges)} edges (min_shared={min_shared}) "
        f"from {len(pair_shared)} candidate pairs"
    )
    return edges

find_neighbors(concept: str, prime_map: Dict[str, int], min_shared: int = 1) -> List[Tuple[str, int, List[int]]]

Find all neighbors of a single concept using the inverted index. Returns list of (neighbor, weight, shared_factors).

This is O(F * B) — independent of the total number of concepts N (for bounded bucket sizes).

Source code in engine-src/src/neurosym/graph.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
def find_neighbors(self, concept: str, prime_map: Dict[str, int], min_shared: int = 1) -> List[Tuple[str, int, List[int]]]:
    """
    Find all neighbors of a single concept using the inverted index.
    Returns list of (neighbor, weight, shared_factors).

    This is O(F * B) — constant relative to total dataset size.
    """
    if not self.inverted_index:
        self.build_index(prime_map)

    if concept not in self.concept_factors:
        return []

    neighbor_shared: Dict[str, Set[int]] = defaultdict(set)

    for factor in self.concept_factors[concept]:
        for other in self.inverted_index[factor]:
            if other != concept:
                neighbor_shared[other].add(factor)

    results = []
    for neighbor, shared_primes in neighbor_shared.items():
        weight = len(shared_primes)
        if weight >= min_shared:
            results.append((neighbor, weight, sorted(shared_primes)))

    results.sort(key=lambda x: -x[1])
    return results

get_stats() -> dict

Return index statistics.

Source code in engine-src/src/neurosym/graph.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def get_stats(self) -> dict:
    """Return index statistics."""
    if not self.inverted_index:
        return {"indexed": False}

    bucket_sizes = [len(v) for v in self.inverted_index.values()]
    return {
        "indexed": True,
        "total_concepts": len(self.concept_factors),
        "total_prime_buckets": len(self.inverted_index),
        "avg_bucket_size": sum(bucket_sizes) / max(1, len(bucket_sizes)),
        "max_bucket_size": max(bucket_sizes) if bucket_sizes else 0,
        "avg_factors_per_concept": sum(len(f) for f in self.concept_factors.values()) / max(1, len(self.concept_factors)),
    }

Storage

SQLite persistence for prime indices and audit results.

neurosym.storage

SQLite persistence layer for the Triadic Neurosymbolic Engine. Stores prime indexes so they survive server restarts.

PrimeIndexDB

Persistent storage for discrete prime indexes using SQLite.

Default location: ~/.neurosym/index.db

Source code in engine-src/src/neurosym/storage.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
class PrimeIndexDB:
    """
    Persistent storage for discrete prime indexes using SQLite.

    Default location: ~/.neurosym/index.db
    """

    def __init__(self, db_path: str = None):
        if db_path is None:
            db_dir = os.path.join(os.path.expanduser("~"), ".neurosym")
            os.makedirs(db_dir, exist_ok=True)
            db_path = os.path.join(db_dir, "index.db")

        self.db_path = db_path
        self._init_db()

    def _init_db(self):
        """Create tables if they don't exist."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS concepts (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    text TEXT NOT NULL,
                    prime_factor INTEGER NOT NULL,
                    model TEXT NOT NULL,
                    lsh_bits INTEGER NOT NULL,
                    seed INTEGER NOT NULL,
                    created_at TEXT NOT NULL,
                    UNIQUE(text, model, lsh_bits, seed)
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS audit_results (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    concept_a TEXT NOT NULL,
                    concept_b TEXT NOT NULL,
                    model_a TEXT NOT NULL,
                    model_b TEXT NOT NULL,
                    dist_model_a TEXT,
                    dist_model_b TEXT,
                    chain TEXT,
                    created_at TEXT NOT NULL
                )
            """)
            conn.execute("""
                CREATE INDEX IF NOT EXISTS idx_concepts_model 
                ON concepts(model, lsh_bits, seed)
            """)

    def save_index(self, prime_map: Dict[str, int], model: str, lsh_bits: int, seed: int):
        """Save a prime map to the database. Upserts on conflict."""
        now = datetime.now(timezone.utc).isoformat()
        with sqlite3.connect(self.db_path) as conn:
            for text, prime_factor in prime_map.items():
                conn.execute("""
                    INSERT INTO concepts (text, prime_factor, model, lsh_bits, seed, created_at)
                    VALUES (?, ?, ?, ?, ?, ?)
                    ON CONFLICT(text, model, lsh_bits, seed) 
                    DO UPDATE SET prime_factor = excluded.prime_factor, created_at = excluded.created_at
                """, (text, prime_factor, model, lsh_bits, seed, now))

    def load_index(self, model: str, lsh_bits: int, seed: int) -> Dict[str, int]:
        """Load a previously saved prime map from the database."""
        with sqlite3.connect(self.db_path) as conn:
            rows = conn.execute(
                "SELECT text, prime_factor FROM concepts WHERE model = ? AND lsh_bits = ? AND seed = ?",
                (model, lsh_bits, seed)
            ).fetchall()
        return {text: prime for text, prime in rows}

    def save_audit(self, results: List[dict], model_a: str, model_b: str):
        """Save audit discrepancy results."""
        now = datetime.now(timezone.utc).isoformat()
        with sqlite3.connect(self.db_path) as conn:
            for r in results:
                conn.execute("""
                    INSERT INTO audit_results (concept_a, concept_b, model_a, model_b, dist_model_a, dist_model_b, chain, created_at)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    r.get("concept_a", ""), r.get("concept_b", ""),
                    model_a, model_b,
                    str(r.get("distance_model_a", "")), str(r.get("distance_model_b", "")),
                    r.get("chain", ""), now
                ))

    def list_indexes(self) -> List[dict]:
        """List all stored indexes with their metadata."""
        with sqlite3.connect(self.db_path) as conn:
            rows = conn.execute("""
                SELECT model, lsh_bits, seed, COUNT(*) as concept_count, MAX(created_at) as last_updated
                FROM concepts
                GROUP BY model, lsh_bits, seed
                ORDER BY last_updated DESC
            """).fetchall()
        return [
            {"model": r[0], "lsh_bits": r[1], "seed": r[2], "concept_count": r[3], "last_updated": r[4]}
            for r in rows
        ]

    def export_csv(self, model: str = None) -> str:
        """Export concepts to CSV string."""
        with sqlite3.connect(self.db_path) as conn:
            if model:
                rows = conn.execute(
                    "SELECT text, prime_factor, model, lsh_bits, seed, created_at FROM concepts WHERE model = ?",
                    (model,)
                ).fetchall()
            else:
                rows = conn.execute(
                    "SELECT text, prime_factor, model, lsh_bits, seed, created_at FROM concepts"
                ).fetchall()

        output = io.StringIO()
        writer = csv.writer(output)
        writer.writerow(["text", "prime_factor", "model", "lsh_bits", "seed", "created_at"])
        writer.writerows(rows)
        return output.getvalue()

    def delete_index(self, model: str, lsh_bits: int, seed: int) -> int:
        """Delete all concepts for a given named index. Returns number of rows deleted."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "DELETE FROM concepts WHERE model = ? AND lsh_bits = ? AND seed = ?",
                (model, lsh_bits, seed)
            )
        return cursor.rowcount

    def concept_count(self) -> int:
        """Return total number of stored concepts."""
        with sqlite3.connect(self.db_path) as conn:
            row = conn.execute("SELECT COUNT(*) FROM concepts").fetchone()
        return row[0] if row else 0

save_index(prime_map: Dict[str, int], model: str, lsh_bits: int, seed: int)

Save a prime map to the database. Upserts on conflict.

Source code in engine-src/src/neurosym/storage.py
62
63
64
65
66
67
68
69
70
71
72
def save_index(self, prime_map: Dict[str, int], model: str, lsh_bits: int, seed: int):
    """Save a prime map to the database. Upserts on conflict."""
    now = datetime.now(timezone.utc).isoformat()
    with sqlite3.connect(self.db_path) as conn:
        for text, prime_factor in prime_map.items():
            conn.execute("""
                INSERT INTO concepts (text, prime_factor, model, lsh_bits, seed, created_at)
                VALUES (?, ?, ?, ?, ?, ?)
                ON CONFLICT(text, model, lsh_bits, seed) 
                DO UPDATE SET prime_factor = excluded.prime_factor, created_at = excluded.created_at
            """, (text, prime_factor, model, lsh_bits, seed, now))

load_index(model: str, lsh_bits: int, seed: int) -> Dict[str, int]

Load a previously saved prime map from the database.

Source code in engine-src/src/neurosym/storage.py
74
75
76
77
78
79
80
81
def load_index(self, model: str, lsh_bits: int, seed: int) -> Dict[str, int]:
    """Load a previously saved prime map from the database."""
    with sqlite3.connect(self.db_path) as conn:
        rows = conn.execute(
            "SELECT text, prime_factor FROM concepts WHERE model = ? AND lsh_bits = ? AND seed = ?",
            (model, lsh_bits, seed)
        ).fetchall()
    return {text: prime for text, prime in rows}

save_audit(results: List[dict], model_a: str, model_b: str)

Save audit discrepancy results.

Source code in engine-src/src/neurosym/storage.py
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def save_audit(self, results: List[dict], model_a: str, model_b: str):
    """Save audit discrepancy results."""
    now = datetime.now(timezone.utc).isoformat()
    with sqlite3.connect(self.db_path) as conn:
        for r in results:
            conn.execute("""
                INSERT INTO audit_results (concept_a, concept_b, model_a, model_b, dist_model_a, dist_model_b, chain, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                r.get("concept_a", ""), r.get("concept_b", ""),
                model_a, model_b,
                str(r.get("distance_model_a", "")), str(r.get("distance_model_b", "")),
                r.get("chain", ""), now
            ))

list_indexes() -> List[dict]

List all stored indexes with their metadata.

Source code in engine-src/src/neurosym/storage.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def list_indexes(self) -> List[dict]:
    """List all stored indexes with their metadata."""
    with sqlite3.connect(self.db_path) as conn:
        rows = conn.execute("""
            SELECT model, lsh_bits, seed, COUNT(*) as concept_count, MAX(created_at) as last_updated
            FROM concepts
            GROUP BY model, lsh_bits, seed
            ORDER BY last_updated DESC
        """).fetchall()
    return [
        {"model": r[0], "lsh_bits": r[1], "seed": r[2], "concept_count": r[3], "last_updated": r[4]}
        for r in rows
    ]

export_csv(model: str = None) -> str

Export concepts to CSV string.

Source code in engine-src/src/neurosym/storage.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def export_csv(self, model: str = None) -> str:
    """Export concepts to CSV string."""
    with sqlite3.connect(self.db_path) as conn:
        if model:
            rows = conn.execute(
                "SELECT text, prime_factor, model, lsh_bits, seed, created_at FROM concepts WHERE model = ?",
                (model,)
            ).fetchall()
        else:
            rows = conn.execute(
                "SELECT text, prime_factor, model, lsh_bits, seed, created_at FROM concepts"
            ).fetchall()

    output = io.StringIO()
    writer = csv.writer(output)
    writer.writerow(["text", "prime_factor", "model", "lsh_bits", "seed", "created_at"])
    writer.writerows(rows)
    return output.getvalue()

delete_index(model: str, lsh_bits: int, seed: int) -> int

Delete all concepts for a given named index. Returns number of rows deleted.

Source code in engine-src/src/neurosym/storage.py
131
132
133
134
135
136
137
138
def delete_index(self, model: str, lsh_bits: int, seed: int) -> int:
    """Delete all concepts for a given named index. Returns number of rows deleted."""
    with sqlite3.connect(self.db_path) as conn:
        cursor = conn.execute(
            "DELETE FROM concepts WHERE model = ? AND lsh_bits = ? AND seed = ?",
            (model, lsh_bits, seed)
        )
    return cursor.rowcount

concept_count() -> int

Return total number of stored concepts.

Source code in engine-src/src/neurosym/storage.py
140
141
142
143
144
def concept_count(self) -> int:
    """Return total number of stored concepts."""
    with sqlite3.connect(self.db_path) as conn:
        row = conn.execute("SELECT COUNT(*) FROM concepts").fetchone()
    return row[0] if row else 0

Reports

Exportable reports in HTML, JSON, and CSV formats.

neurosym.reports

Report Generator for the Triadic Neurosymbolic Engine.

Generates exportable audit reports in HTML, JSON, and CSV formats. No external dependencies — uses Python stdlib only.

ReportGenerator

Generates structured audit reports from Triadic Engine results. Supports HTML (standalone, embeddable), JSON, and CSV output.

Source code in engine-src/src/neurosym/reports.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
class ReportGenerator:
    """
    Generates structured audit reports from Triadic Engine results.
    Supports HTML (standalone, embeddable), JSON, and CSV output.

    Sections are accumulated via the add_*_section methods and rendered
    on demand by to_html()/to_json()/to_csv(), or written to disk by save().
    """

    def __init__(self, title: str = "Triadic Neurosymbolic Audit Report"):
        # Local import keeps this fix self-contained: the module-level import
        # block is not guaranteed to bring `timezone` into scope.
        from datetime import timezone

        self.title = title
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC
        # timestamp instead (the ISO string now carries a "+00:00" offset).
        self.timestamp = datetime.now(timezone.utc).isoformat()
        self.sections: List[dict] = []

    def add_encoding_section(self, prime_map: Dict[str, int], model: str, lsh_bits: int, factorize_fn=None):
        """Add the prime encoding results to the report.

        prime_map maps concept text -> assigned (composite) prime;
        factorize_fn, if given, decomposes each prime into its factors.
        """
        rows = []
        # Sort by prime value so clustered concepts appear adjacent.
        for concept, prime in sorted(prime_map.items(), key=lambda x: x[1]):
            factors = factorize_fn(prime) if factorize_fn else []
            rows.append({
                "concept": concept,
                "prime_factor": prime,
                "decomposition": factors,
            })

        self.sections.append({
            "type": "encoding",
            "title": "Prime Factor Encoding",
            "model": model,
            "lsh_bits": lsh_bits,
            "total_concepts": len(prime_map),
            "unique_primes": len(set(prime_map.values())),
            "rows": rows,
        })

    def add_audit_section(self, discrepancies: List[dict], model_a: str, model_b: str,
                          total_pairs: int, total_concepts: int):
        """Add the model audit discrepancy results to the report."""
        self.sections.append({
            "type": "audit",
            "title": f"Model Comparison: {model_a} vs {model_b}",
            "model_a": model_a,
            "model_b": model_b,
            "total_concepts": total_concepts,
            "total_pairs": total_pairs,
            "discrepancies_found": len(discrepancies),
            # max(1, ...) guards the division when no pairs were tested.
            "discrepancy_rate": f"{len(discrepancies) / max(1, total_pairs) * 100:.1f}%",
            "rows": discrepancies,
        })

    def add_graph_section(self, edges: list, node_count: int):
        """Add graph statistics to the report.

        Each edge is a 4-tuple (concept_a, concept_b, weight, shared_primes).
        """
        self.sections.append({
            "type": "graph",
            "title": "Semantic Graph Statistics",
            "total_nodes": node_count,
            "total_edges": len(edges),
            "avg_edge_weight": sum(e[2] for e in edges) / max(1, len(edges)) if edges else 0,
            # Keep only the 20 heaviest edges, descending by weight.
            "top_connections": [
                {"concept_a": a, "concept_b": b, "weight": w, "shared_primes": p}
                for a, b, w, p in sorted(edges, key=lambda x: -x[2])[:20]
            ],
        })

    # === Export Methods ===

    def to_json(self, indent: int = 2) -> str:
        """Export the full report as a JSON string."""
        report = {
            "title": self.title,
            "generated_at": self.timestamp,
            "engine": f"Triadic Neurosymbolic Engine v{_engine_version()}",
            "sections": self.sections,
        }
        # default=str stringifies any value json cannot encode natively.
        return json.dumps(report, indent=indent, default=str)

    def to_csv(self) -> str:
        """Export encoding and audit rows as CSV (graph sections are skipped)."""
        output = io.StringIO()
        writer = csv.writer(output)

        for section in self.sections:
            if section["type"] == "encoding":
                writer.writerow(["--- Encoding Results ---"])
                writer.writerow(["Concept", "Prime Factor", "Decomposition"])
                for row in section["rows"]:
                    writer.writerow([row["concept"], row["prime_factor"], str(row["decomposition"])])
                writer.writerow([])

            elif section["type"] == "audit":
                writer.writerow([f"--- Audit: {section['model_a']} vs {section['model_b']} ---"])
                writer.writerow(["Concept A", "Concept B", f"Dist {section['model_a']}", f"Dist {section['model_b']}", "Chain"])
                for row in section["rows"]:
                    writer.writerow([
                        row.get("concept_a", ""),
                        row.get("concept_b", ""),
                        row.get("distance_model_a", ""),
                        row.get("distance_model_b", ""),
                        row.get("chain", ""),
                    ])
                writer.writerow([])

        return output.getvalue()

    def to_html(self) -> str:
        """Export a standalone HTML report with embedded dark-theme styling."""
        html_sections = []

        for section in self.sections:
            # Each section type renders its own stat cards plus a data table.
            if section["type"] == "encoding":
                rows_html = ""
                for row in section["rows"]:
                    factors_str = ", ".join(str(f) for f in row["decomposition"])
                    # Escape concept text: it is user-supplied data.
                    safe_concept = html_mod.escape(str(row['concept']))
                    rows_html += f"""
                    <tr>
                        <td>{safe_concept}</td>
                        <td class="mono">{row['prime_factor']}</td>
                        <td class="mono">[{factors_str}]</td>
                    </tr>"""

                html_sections.append(f"""
                <div class="section">
                    <h2>🧬 {section['title']}</h2>
                    <div class="stats">
                        <div class="stat"><span class="stat-value">{section['total_concepts']}</span><span class="stat-label">Concepts</span></div>
                        <div class="stat"><span class="stat-value">{section['unique_primes']}</span><span class="stat-label">Unique Clusters</span></div>
                        <div class="stat"><span class="stat-value">{section['model']}</span><span class="stat-label">Model</span></div>
                        <div class="stat"><span class="stat-value">{section['lsh_bits']}</span><span class="stat-label">LSH Bits</span></div>
                    </div>
                    <table>
                        <thead><tr><th>Concept</th><th>Prime Factor</th><th>Decomposition</th></tr></thead>
                        <tbody>{rows_html}</tbody>
                    </table>
                </div>""")

            elif section["type"] == "audit":
                rows_html = ""
                for row in section["rows"]:
                    safe_a = html_mod.escape(str(row.get('concept_a', '')))
                    safe_b = html_mod.escape(str(row.get('concept_b', '')))
                    safe_chain = html_mod.escape(str(row.get('chain', '')))
                    rows_html += f"""
                    <tr>
                        <td>{safe_a}</td>
                        <td>{safe_b}</td>
                        <td class="mono">{html_mod.escape(str(row.get('distance_model_a', '')))}</td>
                        <td class="mono">{html_mod.escape(str(row.get('distance_model_b', '')))}</td>
                        <td>{safe_chain}</td>
                    </tr>"""

                # Green badge when clean, red when discrepancies were found.
                badge_class = "badge-ok" if section['discrepancies_found'] == 0 else "badge-warn"
                html_sections.append(f"""
                <div class="section">
                    <h2>🤖 {section['title']}</h2>
                    <div class="stats">
                        <div class="stat"><span class="stat-value">{section['total_concepts']}</span><span class="stat-label">Concepts</span></div>
                        <div class="stat"><span class="stat-value">{section['total_pairs']}</span><span class="stat-label">Pairs Tested</span></div>
                        <div class="stat"><span class="stat-value {badge_class}">{section['discrepancies_found']}</span><span class="stat-label">Discrepancies</span></div>
                        <div class="stat"><span class="stat-value">{section['discrepancy_rate']}</span><span class="stat-label">Bias Rate</span></div>
                    </div>
                    <table>
                        <thead><tr><th>Concept A</th><th>Concept B</th><th>Dist Model A</th><th>Dist Model B</th><th>Chain</th></tr></thead>
                        <tbody>{rows_html}</tbody>
                    </table>
                </div>""")

            elif section["type"] == "graph":
                conn_html = ""
                for c in section.get("top_connections", []):
                    safe_a = html_mod.escape(str(c['concept_a']))
                    safe_b = html_mod.escape(str(c['concept_b']))
                    conn_html += f"""
                    <tr>
                        <td>{safe_a}</td>
                        <td>{safe_b}</td>
                        <td class="mono">{c['weight']}</td>
                        <td class="mono">{c['shared_primes']}</td>
                    </tr>"""

                html_sections.append(f"""
                <div class="section">
                    <h2>🌐 {section['title']}</h2>
                    <div class="stats">
                        <div class="stat"><span class="stat-value">{section['total_nodes']}</span><span class="stat-label">Nodes</span></div>
                        <div class="stat"><span class="stat-value">{section['total_edges']}</span><span class="stat-label">Edges</span></div>
                        <div class="stat"><span class="stat-value">{section['avg_edge_weight']:.1f}</span><span class="stat-label">Avg Weight</span></div>
                    </div>
                    <h3>Top 20 Strongest Connections</h3>
                    <table>
                        <thead><tr><th>Concept A</th><th>Concept B</th><th>Weight</th><th>Shared Primes</th></tr></thead>
                        <tbody>{conn_html}</tbody>
                    </table>
                </div>""")

        sections_joined = "\n".join(html_sections)

        return f"""<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>{self.title}</title>
    <style>
        * {{ margin: 0; padding: 0; box-sizing: border-box; }}
        body {{ 
            font-family: 'Segoe UI', -apple-system, system-ui, sans-serif; 
            background: #0d1117; color: #c9d1d9; 
            padding: 40px; line-height: 1.6;
        }}
        h1 {{ 
            color: #e94560; font-size: 2em; margin-bottom: 8px;
            background: linear-gradient(135deg, #e94560, #0f3460);
            -webkit-background-clip: text; -webkit-text-fill-color: transparent;
        }}
        .subtitle {{ color: #8b949e; margin-bottom: 32px; }}
        .section {{ 
            background: #161b22; border: 1px solid #30363d; 
            border-radius: 12px; padding: 24px; margin-bottom: 24px;
        }}
        h2 {{ color: #58a6ff; margin-bottom: 16px; }}
        h3 {{ color: #8b949e; margin: 16px 0 8px; }}
        .stats {{ 
            display: flex; gap: 16px; margin-bottom: 20px; flex-wrap: wrap;
        }}
        .stat {{ 
            background: linear-gradient(135deg, #1a1a2e, #16213e);
            border: 1px solid #0f3460; border-radius: 8px; 
            padding: 12px 20px; text-align: center; min-width: 120px;
        }}
        .stat-value {{ display: block; font-size: 1.5em; font-weight: 700; color: #e94560; }}
        .stat-label {{ display: block; font-size: 0.8em; color: #8b949e; margin-top: 4px; }}
        table {{ 
            width: 100%; border-collapse: collapse; margin-top: 12px;
        }}
        th {{ 
            background: #21262d; color: #58a6ff; padding: 10px 14px; 
            text-align: left; font-weight: 600; border-bottom: 2px solid #30363d;
        }}
        td {{ 
            padding: 8px 14px; border-bottom: 1px solid #21262d; 
        }}
        tr:hover {{ background: #1c2128; }}
        .mono {{ font-family: 'Fira Code', 'Consolas', monospace; font-size: 0.9em; }}
        .badge-ok {{ color: #3fb950; }}
        .badge-warn {{ color: #f85149; }}
        .footer {{ 
            text-align: center; color: #484f58; margin-top: 40px; 
            padding-top: 20px; border-top: 1px solid #21262d;
        }}
    </style>
</head>
<body>
    <h1>⚛️ {self.title}</h1>
    <p class="subtitle">Generated {self.timestamp} · Triadic Neurosymbolic Engine v{_engine_version()}</p>
    {sections_joined}
    <div class="footer">
        <p>© 2026 J. Arturo Ornelas Brand · Triadic Neurosymbolic Engine</p>
        <p>Deterministic Algebraic Framework for AI Interpretability</p>
    </div>
</body>
</html>"""

    def save(self, path: str, format: str = "html"):
        """Save the report to a file.

        Raises ValueError for an unknown *format*.
        """
        if format == "html":
            content = self.to_html()
        elif format == "json":
            content = self.to_json()
        elif format == "csv":
            content = self.to_csv()
        else:
            raise ValueError(f"Unsupported format: {format}. Use 'html', 'json', or 'csv'.")

        with open(path, "w", encoding="utf-8") as f:
            f.write(content)

add_encoding_section(prime_map: Dict[str, int], model: str, lsh_bits: int, factorize_fn=None)

Add the prime encoding results to the report.

Source code in engine-src/src/neurosym/reports.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def add_encoding_section(self, prime_map: Dict[str, int], model: str, lsh_bits: int, factorize_fn=None):
    """Record prime-encoding results as a report section."""
    # Order concepts by their assigned prime so clustered concepts sit together.
    ordered = sorted(prime_map.items(), key=lambda item: item[1])
    rows = [
        {
            "concept": name,
            "prime_factor": value,
            "decomposition": factorize_fn(value) if factorize_fn else [],
        }
        for name, value in ordered
    ]

    section = {
        "type": "encoding",
        "title": "Prime Factor Encoding",
        "model": model,
        "lsh_bits": lsh_bits,
        "total_concepts": len(prime_map),
        "unique_primes": len(set(prime_map.values())),
        "rows": rows,
    }
    self.sections.append(section)

add_audit_section(discrepancies: List[dict], model_a: str, model_b: str, total_pairs: int, total_concepts: int)

Add the model audit discrepancy results to the report.

Source code in engine-src/src/neurosym/reports.py
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def add_audit_section(self, discrepancies: List[dict], model_a: str, model_b: str,
                      total_pairs: int, total_concepts: int):
    """Record model-comparison discrepancy results as a report section."""
    found = len(discrepancies)
    # max(1, ...) guards the division when zero pairs were tested.
    rate = found / max(1, total_pairs) * 100
    self.sections.append({
        "type": "audit",
        "title": f"Model Comparison: {model_a} vs {model_b}",
        "model_a": model_a,
        "model_b": model_b,
        "total_concepts": total_concepts,
        "total_pairs": total_pairs,
        "discrepancies_found": found,
        "discrepancy_rate": f"{rate:.1f}%",
        "rows": discrepancies,
    })

add_graph_section(edges: list, node_count: int)

Add graph statistics to the report.

Source code in engine-src/src/neurosym/reports.py
70
71
72
73
74
75
76
77
78
79
80
81
82
def add_graph_section(self, edges: list, node_count: int):
    """Record semantic-graph statistics as a report section.

    Each edge is a 4-tuple (concept_a, concept_b, weight, shared_primes).
    """
    if edges:
        mean_weight = sum(edge[2] for edge in edges) / max(1, len(edges))
    else:
        mean_weight = 0
    # Keep only the 20 heaviest edges, ordered by descending weight.
    strongest = sorted(edges, key=lambda edge: edge[2], reverse=True)[:20]
    self.sections.append({
        "type": "graph",
        "title": "Semantic Graph Statistics",
        "total_nodes": node_count,
        "total_edges": len(edges),
        "avg_edge_weight": mean_weight,
        "top_connections": [
            {"concept_a": a, "concept_b": b, "weight": w, "shared_primes": p}
            for a, b, w, p in strongest
        ],
    })

to_json(indent: int = 2) -> str

Export the full report as a JSON string.

Source code in engine-src/src/neurosym/reports.py
86
87
88
89
90
91
92
93
94
def to_json(self, indent: int = 2) -> str:
    """Serialize the whole report (title, timestamp, all sections) to JSON."""
    payload = {
        "title": self.title,
        "generated_at": self.timestamp,
        "engine": f"Triadic Neurosymbolic Engine v{_engine_version()}",
        "sections": self.sections,
    }
    # default=str stringifies any value json cannot encode natively.
    return json.dumps(payload, indent=indent, default=str)

to_csv() -> str

Export encoding and audit rows as CSV.

Source code in engine-src/src/neurosym/reports.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
def to_csv(self) -> str:
    """Flatten encoding and audit sections into one CSV string (graph sections are skipped)."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)

    for section in self.sections:
        kind = section["type"]
        if kind == "encoding":
            writer.writerow(["--- Encoding Results ---"])
            writer.writerow(["Concept", "Prime Factor", "Decomposition"])
            writer.writerows(
                [row["concept"], row["prime_factor"], str(row["decomposition"])]
                for row in section["rows"]
            )
            writer.writerow([])
        elif kind == "audit":
            writer.writerow([f"--- Audit: {section['model_a']} vs {section['model_b']} ---"])
            writer.writerow(["Concept A", "Concept B", f"Dist {section['model_a']}", f"Dist {section['model_b']}", "Chain"])
            for row in section["rows"]:
                keys = ("concept_a", "concept_b", "distance_model_a", "distance_model_b", "chain")
                writer.writerow([row.get(key, "") for key in keys])
            writer.writerow([])

    return buffer.getvalue()

to_html() -> str

Export a standalone HTML report with embedded dark-theme styling.

Source code in engine-src/src/neurosym/reports.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
    def to_html(self) -> str:
        """Export a standalone HTML report with embedded dark-theme styling."""
        html_sections = []

        # Each section type renders its own stat cards plus a data table.
        for section in self.sections:
            if section["type"] == "encoding":
                rows_html = ""
                for row in section["rows"]:
                    factors_str = ", ".join(str(f) for f in row["decomposition"])
                    # Escape concept text: it is user-supplied data.
                    safe_concept = html_mod.escape(str(row['concept']))
                    rows_html += f"""
                    <tr>
                        <td>{safe_concept}</td>
                        <td class="mono">{row['prime_factor']}</td>
                        <td class="mono">[{factors_str}]</td>
                    </tr>"""

                html_sections.append(f"""
                <div class="section">
                    <h2>🧬 {section['title']}</h2>
                    <div class="stats">
                        <div class="stat"><span class="stat-value">{section['total_concepts']}</span><span class="stat-label">Concepts</span></div>
                        <div class="stat"><span class="stat-value">{section['unique_primes']}</span><span class="stat-label">Unique Clusters</span></div>
                        <div class="stat"><span class="stat-value">{section['model']}</span><span class="stat-label">Model</span></div>
                        <div class="stat"><span class="stat-value">{section['lsh_bits']}</span><span class="stat-label">LSH Bits</span></div>
                    </div>
                    <table>
                        <thead><tr><th>Concept</th><th>Prime Factor</th><th>Decomposition</th></tr></thead>
                        <tbody>{rows_html}</tbody>
                    </table>
                </div>""")

            elif section["type"] == "audit":
                rows_html = ""
                for row in section["rows"]:
                    # All row fields are escaped — they originate from model output.
                    safe_a = html_mod.escape(str(row.get('concept_a', '')))
                    safe_b = html_mod.escape(str(row.get('concept_b', '')))
                    safe_chain = html_mod.escape(str(row.get('chain', '')))
                    rows_html += f"""
                    <tr>
                        <td>{safe_a}</td>
                        <td>{safe_b}</td>
                        <td class="mono">{html_mod.escape(str(row.get('distance_model_a', '')))}</td>
                        <td class="mono">{html_mod.escape(str(row.get('distance_model_b', '')))}</td>
                        <td>{safe_chain}</td>
                    </tr>"""

                # Green badge when clean, red when discrepancies were found.
                badge_class = "badge-ok" if section['discrepancies_found'] == 0 else "badge-warn"
                html_sections.append(f"""
                <div class="section">
                    <h2>🤖 {section['title']}</h2>
                    <div class="stats">
                        <div class="stat"><span class="stat-value">{section['total_concepts']}</span><span class="stat-label">Concepts</span></div>
                        <div class="stat"><span class="stat-value">{section['total_pairs']}</span><span class="stat-label">Pairs Tested</span></div>
                        <div class="stat"><span class="stat-value {badge_class}">{section['discrepancies_found']}</span><span class="stat-label">Discrepancies</span></div>
                        <div class="stat"><span class="stat-value">{section['discrepancy_rate']}</span><span class="stat-label">Bias Rate</span></div>
                    </div>
                    <table>
                        <thead><tr><th>Concept A</th><th>Concept B</th><th>Dist Model A</th><th>Dist Model B</th><th>Chain</th></tr></thead>
                        <tbody>{rows_html}</tbody>
                    </table>
                </div>""")

            elif section["type"] == "graph":
                conn_html = ""
                # top_connections was pre-truncated to 20 by add_graph_section.
                for c in section.get("top_connections", []):
                    safe_a = html_mod.escape(str(c['concept_a']))
                    safe_b = html_mod.escape(str(c['concept_b']))
                    conn_html += f"""
                    <tr>
                        <td>{safe_a}</td>
                        <td>{safe_b}</td>
                        <td class="mono">{c['weight']}</td>
                        <td class="mono">{c['shared_primes']}</td>
                    </tr>"""

                html_sections.append(f"""
                <div class="section">
                    <h2>🌐 {section['title']}</h2>
                    <div class="stats">
                        <div class="stat"><span class="stat-value">{section['total_nodes']}</span><span class="stat-label">Nodes</span></div>
                        <div class="stat"><span class="stat-value">{section['total_edges']}</span><span class="stat-label">Edges</span></div>
                        <div class="stat"><span class="stat-value">{section['avg_edge_weight']:.1f}</span><span class="stat-label">Avg Weight</span></div>
                    </div>
                    <h3>Top 20 Strongest Connections</h3>
                    <table>
                        <thead><tr><th>Concept A</th><th>Concept B</th><th>Weight</th><th>Shared Primes</th></tr></thead>
                        <tbody>{conn_html}</tbody>
                    </table>
                </div>""")

        sections_joined = "\n".join(html_sections)

        # Wrap sections in a fully self-contained page (inline CSS, no assets).
        return f"""<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>{self.title}</title>
    <style>
        * {{ margin: 0; padding: 0; box-sizing: border-box; }}
        body {{ 
            font-family: 'Segoe UI', -apple-system, system-ui, sans-serif; 
            background: #0d1117; color: #c9d1d9; 
            padding: 40px; line-height: 1.6;
        }}
        h1 {{ 
            color: #e94560; font-size: 2em; margin-bottom: 8px;
            background: linear-gradient(135deg, #e94560, #0f3460);
            -webkit-background-clip: text; -webkit-text-fill-color: transparent;
        }}
        .subtitle {{ color: #8b949e; margin-bottom: 32px; }}
        .section {{ 
            background: #161b22; border: 1px solid #30363d; 
            border-radius: 12px; padding: 24px; margin-bottom: 24px;
        }}
        h2 {{ color: #58a6ff; margin-bottom: 16px; }}
        h3 {{ color: #8b949e; margin: 16px 0 8px; }}
        .stats {{ 
            display: flex; gap: 16px; margin-bottom: 20px; flex-wrap: wrap;
        }}
        .stat {{ 
            background: linear-gradient(135deg, #1a1a2e, #16213e);
            border: 1px solid #0f3460; border-radius: 8px; 
            padding: 12px 20px; text-align: center; min-width: 120px;
        }}
        .stat-value {{ display: block; font-size: 1.5em; font-weight: 700; color: #e94560; }}
        .stat-label {{ display: block; font-size: 0.8em; color: #8b949e; margin-top: 4px; }}
        table {{ 
            width: 100%; border-collapse: collapse; margin-top: 12px;
        }}
        th {{ 
            background: #21262d; color: #58a6ff; padding: 10px 14px; 
            text-align: left; font-weight: 600; border-bottom: 2px solid #30363d;
        }}
        td {{ 
            padding: 8px 14px; border-bottom: 1px solid #21262d; 
        }}
        tr:hover {{ background: #1c2128; }}
        .mono {{ font-family: 'Fira Code', 'Consolas', monospace; font-size: 0.9em; }}
        .badge-ok {{ color: #3fb950; }}
        .badge-warn {{ color: #f85149; }}
        .footer {{ 
            text-align: center; color: #484f58; margin-top: 40px; 
            padding-top: 20px; border-top: 1px solid #21262d;
        }}
    </style>
</head>
<body>
    <h1>⚛️ {self.title}</h1>
    <p class="subtitle">Generated {self.timestamp} · Triadic Neurosymbolic Engine v{_engine_version()}</p>
    {sections_joined}
    <div class="footer">
        <p>© 2026 J. Arturo Ornelas Brand · Triadic Neurosymbolic Engine</p>
        <p>Deterministic Algebraic Framework for AI Interpretability</p>
    </div>
</body>
</html>"""

save(path: str, format: str = 'html')

Save the report to a file.

Source code in engine-src/src/neurosym/reports.py
283
284
285
286
287
288
289
290
291
292
293
294
295
def save(self, path: str, format: str = "html"):
    """Render the report in the requested format and write it to *path*."""
    # Dispatch table instead of an if/elif chain.
    renderers = {
        "html": self.to_html,
        "json": self.to_json,
        "csv": self.to_csv,
    }
    if format not in renderers:
        raise ValueError(f"Unsupported format: {format}. Use 'html', 'json', or 'csv'.")

    content = renderers[format]()
    with open(path, "w", encoding="utf-8") as f:
        f.write(content)

Ingest

DataFrame ingestion with inverted prime index and semantic search.

neurosym.ingest

DatabaseIngestor

Ingests tabular data (CSV) and builds a Discrete Prime Index for fast semantic search using integer arithmetic instead of vector distance.

Uses an inverted index keyed by prime factors for sub-linear lookup.

Source code in engine-src/src/neurosym/ingest.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
class DatabaseIngestor:
    """
    Ingests tabular data (CSV) and builds a Discrete Prime Index for fast
    semantic search using integer arithmetic instead of vector distance.

    Uses an inverted index keyed by prime factors for sub-linear lookup.
    """
    def __init__(self, encoder: ContinuousEncoder, mapper: DiscreteMapper):
        self.encoder = encoder
        self.mapper = mapper
        self.discrete_index: Dict[int, Dict] = {}  # {id: {text, prime_factor}}
        self.inverted_index: Dict[int, Set[int]] = defaultdict(set)  # {prime: {record_ids}}
        self.is_indexed = False

    def _factorize(self, n: int) -> List[int]:
        """Returns the list of unique prime factors of n (empty for n <= 1)."""
        if n <= 1:
            return []
        return list(sympy.factorint(n).keys())

    def ingest_dataframe(self, df: pd.DataFrame, text_column: str, id_column: str = None) -> Dict:
        """
        Takes a pandas DataFrame, vectorizes the text column, hashes it 
        into composite prime factors, and builds an inverted index.

        Complexity: O(N * E) where N = records and E = embedding time.
        """
        logger.info(f"Ingesting {len(df)} records into the Triadic Engine...")

        texts = df[text_column].astype(str).tolist()

        # 1. Continuous Vector Batch Encoding
        embeddings = self.encoder.encode(texts)

        # 2. Discrete Projection (LSH → Composite Primes)
        prime_map = self.mapper.fit_transform(texts, embeddings)

        # 3. Build the forward index AND the inverted index
        self.discrete_index.clear()
        self.inverted_index.clear()

        for idx, row in df.iterrows():
            record_id = row[id_column] if id_column and id_column in df.columns else idx
            # BUGFIX: prime_map was keyed by df[text_column].astype(str), so the
            # lookup must apply the same str() conversion — otherwise non-string
            # text columns (e.g. numeric values) raise KeyError here.
            text_content = str(row[text_column])
            prime_factor = prime_map[text_content]

            self.discrete_index[record_id] = {
                "text": text_content,
                "prime_factor": prime_factor
            }

            # Inverted index: map each prime factor to the records that contain it
            for p in self._factorize(prime_factor):
                self.inverted_index[p].add(record_id)

        self.is_indexed = True
        logger.info(f"Indexed {len(df)} records. Inverted index has {len(self.inverted_index)} prime buckets.")
        return self.discrete_index

    def triadic_search(self, query: str, top_k: int = 5) -> List[Tuple[int, str, int, int]]:
        """
        Semantic search using the inverted prime index.

        1. Encode the query into a composite prime.
        2. Factorize the query prime to get its active semantic features.
        3. Use the inverted index to find candidate records that share
           at least one prime factor (set union over factor buckets).
        4. Rank candidates by GCD-based distance.

        Complexity: O(C * log(C)) where C = number of candidate records
        sharing at least one factor. In the best case C << N.
        Falls back to full scan if no candidates are found.

        Returns (record_id, text, distance, prime) tuples, closest first.
        """
        if not self.is_indexed:
            raise ValueError("No database loaded. Please ingest data first.")

        # 1. Project query into discrete prime space (reuse existing planes)
        query_emb = self.encoder.encode([query])
        query_prime_map = self.mapper.transform([query], query_emb)
        query_prime = query_prime_map[query]

        # 2. Find candidates via inverted index (set union)
        query_factors = self._factorize(query_prime)
        candidate_ids: Set[int] = set()
        for p in query_factors:
            if p in self.inverted_index:
                candidate_ids.update(self.inverted_index[p])

        # 3. If no candidates share any factor, fall back to full scan
        if not candidate_ids:
            logger.info("No shared factors found. Falling back to full scan.")
            candidate_ids = set(self.discrete_index.keys())

        logger.info(f"Query '{query}' (prime={query_prime}): {len(candidate_ids)} candidates from {len(self.discrete_index)} total records.")

        # 4. Rank candidates by GCD-based distance
        results = []
        for record_id in candidate_ids:
            data = self.discrete_index[record_id]
            db_prime = data["prime_factor"]

            # gcd = shared semantic features; the quotients are what each side
            # has that the other lacks. distance == 0 only for identical primes.
            shared = math.gcd(query_prime, db_prime)
            missing = query_prime // shared
            extra = db_prime // shared

            if extra == 1 and missing == 1:
                distance = 0
            else:
                distance = abs(extra - missing) + (extra * missing)

            results.append((distance, record_id, data["text"], db_prime))

        results.sort(key=lambda x: x[0])
        return [(r[1], r[2], r[0], r[3]) for r in results[:top_k]]

ingest_dataframe(df: pd.DataFrame, text_column: str, id_column: str = None) -> Dict

Takes a pandas DataFrame, vectorizes the text column, hashes it into composite prime factors, and builds an inverted index.

Complexity: O(N * E) where N = records and E = embedding time.

Source code in engine-src/src/neurosym/ingest.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def ingest_dataframe(self, df: pd.DataFrame, text_column: str, id_column: str = None) -> Dict:
    """
    Takes a pandas DataFrame, vectorizes the text column, hashes it
    into composite prime factors, and builds an inverted index.

    Args:
        df: Source records to index.
        text_column: Column containing the text to embed and index.
        id_column: Optional column to use as record ids; falls back to the
            DataFrame index when None or not present in ``df``.

    Returns:
        The forward index: record_id -> {"text": ..., "prime_factor": ...}.

    Complexity: O(N * E) where N = records and E = embedding time.
    """
    logger.info(f"Ingesting {len(df)} records into the Triadic Engine...")

    # Normalize to str exactly once; prime_map below is keyed by these
    # normalized strings.
    texts = df[text_column].astype(str).tolist()

    # 1. Continuous Vector Batch Encoding
    embeddings = self.encoder.encode(texts)

    # 2. Discrete Projection (LSH → Composite Primes)
    prime_map = self.mapper.fit_transform(texts, embeddings)

    # 3. Build the forward index AND the inverted index
    self.discrete_index.clear()
    self.inverted_index.clear()

    for pos, (idx, row) in enumerate(df.iterrows()):
        record_id = row[id_column] if id_column and id_column in df.columns else idx
        # BUG FIX: look up by the str-normalized text (texts[pos]) rather than
        # the raw cell value. prime_map's keys come from astype(str), so a
        # non-string column (e.g. numeric) previously raised KeyError here.
        text_content = texts[pos]
        prime_factor = prime_map[text_content]

        self.discrete_index[record_id] = {
            "text": text_content,
            "prime_factor": prime_factor
        }

        # Inverted index: map each prime factor to the records that contain it
        for p in self._factorize(prime_factor):
            self.inverted_index[p].add(record_id)

    self.is_indexed = True
    logger.info(f"Indexed {len(df)} records. Inverted index has {len(self.inverted_index)} prime buckets.")
    return self.discrete_index

Semantic search using the inverted prime index.

  1. Encode the query into a composite prime.
  2. Factorize the query prime to get its active semantic features.
  3. Use the inverted index to find candidate records that share at least one prime factor (set union over factor buckets).
  4. Rank candidates by GCD-based distance.

Complexity: O(C · log C) for the ranking step, where C is the number of candidate records sharing at least one prime factor with the query; in favorable cases C ≪ N. Falls back to a full scan when no candidates are found.

Source code in engine-src/src/neurosym/ingest.py
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def triadic_search(self, query: str, top_k: int = 5) -> List[Tuple[int, str, int, int]]:
    """
    Semantic search over the inverted prime index.

    Pipeline: (1) project the query to a composite prime via the existing
    LSH planes, (2) factorize it into active semantic features, (3) union
    the inverted-index buckets of those factors to collect candidates,
    (4) rank the candidates by a GCD-based distance.

    Returns up to ``top_k`` tuples of (record_id, text, distance, prime),
    best (smallest distance) first. If no record shares a factor with the
    query, every indexed record is ranked (full-scan fallback).

    Raises:
        ValueError: if no data has been ingested yet.
    """
    if not self.is_indexed:
        raise ValueError("No database loaded. Please ingest data first.")

    # Project the query into discrete prime space (reuses fitted planes).
    emb = self.encoder.encode([query])
    q_prime = self.mapper.transform([query], emb)[query]

    # Union the bucket of every prime factor the query activates.
    candidates: Set[int] = set()
    for factor in self._factorize(q_prime):
        candidates |= self.inverted_index.get(factor, set())

    # No overlap at all → rank the whole index instead.
    if not candidates:
        logger.info("No shared factors found. Falling back to full scan.")
        candidates = set(self.discrete_index.keys())

    logger.info(f"Query '{query}' (prime={q_prime}): {len(candidates)} candidates from {len(self.discrete_index)} total records.")

    # GCD-based ranking: factors absent from one side inflate the distance.
    scored = []
    for rid in candidates:
        entry = self.discrete_index[rid]
        db_prime = entry["prime_factor"]

        g = math.gcd(q_prime, db_prime)
        lack = q_prime // g        # query factors the record lacks
        surplus = db_prime // g    # record factors the query lacks

        dist = 0 if lack == 1 and surplus == 1 else abs(surplus - lack) + surplus * lack
        scored.append((dist, rid, entry["text"], db_prime))

    scored.sort(key=lambda t: t[0])
    return [(rid, text, dist, prime) for dist, rid, text, prime in scored[:top_k]]

Anomaly Detection

Multiplicative anomaly detection for tabular data.

neurosym.anomaly

RelationalRule dataclass

Defines a multiplicative relationship between columns in a DataFrame.

Total = Quantity × Unit_Price × Tax_Rate

rule = RelationalRule( name="Invoice Total", factor_columns=["qty", "unit_price", "tax_rate"], result_column="total" )

Source code in engine-src/src/neurosym/anomaly.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@dataclass
class RelationalRule:
    """
    Defines a multiplicative relationship between columns in a DataFrame:
    the product of the factor columns is expected to equal the result
    column, up to the given fractional tolerance.

    Example: Total = Quantity × Unit_Price × Tax_Rate
        rule = RelationalRule(
            name="Invoice Total",
            factor_columns=["qty", "unit_price", "tax_rate"],
            result_column="total"
        )
    """
    name: str                            # Human-readable label used in logs and explanations
    factor_columns: List[str]            # Columns whose product should equal the result
    result_column: str                    # Column that holds the expected product
    tolerance: float = 0.01              # Fractional tolerance for floating point (1%)

Anomaly dataclass

A detected anomaly in one row of data.

Source code in engine-src/src/neurosym/anomaly.py
27
28
29
30
31
32
33
34
35
36
37
@dataclass
class Anomaly:
    """A detected anomaly in one row of data, as produced by AnomalyDetector.scan()."""
    row_index: int                       # DataFrame index of the offending row
    rule_name: str                       # Name of the violated RelationalRule
    expected: float                      # Product of the rule's factor columns
    actual: float                        # Value found in the result column
    ratio: float                         # actual / expected — should be 1.0 for clean data
    missing_factor: float                # What factor would fix it
    severity: str                        # "CRITICAL", "WARNING", "INFO"
    explanation: str                     # Human-readable description of the discrepancy

AnomalyDetector

Detects anomalies in tabular data by verifying multiplicative relationships.

Uses the Shadow Engine's core insight: when a product relationship fails, GCD-based factor analysis identifies EXACTLY what is wrong and by how much.

This works on REAL numbers in your data — no embeddings, no LSH, no random primes.

Source code in engine-src/src/neurosym/anomaly.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
class AnomalyDetector:
    """
    Detects anomalies in tabular data by verifying multiplicative relationships.

    Uses the Shadow Engine's core insight: when a product relationship fails,
    GCD-based factor analysis identifies EXACTLY what is wrong and by how much.

    This works on REAL numbers in your data — no embeddings, no LSH, no random primes.
    """

    def __init__(self):
        # Rules registered via add_rule(); scan() checks them in insertion order.
        self.rules: List[RelationalRule] = []

    def add_rule(self, rule: RelationalRule):
        """Register a multiplicative relationship rule to be checked by scan()."""
        self.rules.append(rule)
        logger.info(f"Added rule '{rule.name}': {' × '.join(rule.factor_columns)} = {rule.result_column}")

    def _classify_severity(self, ratio: float, tolerance: float) -> str:
        """Classify how severe the anomaly is based on deviation relative to tolerance.

        Thresholds are expressed as multiples of the rule's tolerance, so a tight
        tolerance (0.001) flags small deviations as CRITICAL while a loose tolerance
        (0.05) only escalates large ones.

        Returns one of "CLEAN", "INFO", "WARNING", "CRITICAL".
        """
        deviation = abs(ratio - 1.0)
        if deviation <= tolerance:
            return "CLEAN"
        elif deviation <= tolerance * 5:
            return "INFO"      # Up to 5× tolerance — likely rounding
        elif deviation <= tolerance * 20:
            return "WARNING"   # Up to 20× tolerance — significant
        else:
            return "CRITICAL"  # More than 20× tolerance — major discrepancy

    def scan(self, df: pd.DataFrame) -> List[Anomaly]:
        """
        Scan every row of the DataFrame against all registered rules.
        Returns a list of Anomaly objects for rows that violate any rule,
        most severe first.

        Rows are skipped (not flagged) when any factor or the result column
        is NaN or zero, since no meaningful ratio exists there.

        Raises:
            ValueError: if no rules have been registered.
        """
        if not self.rules:
            raise ValueError("No rules defined. Use add_rule() first.")

        anomalies = []

        for rule in self.rules:
            # Validate columns exist
            missing_cols = [c for c in rule.factor_columns + [rule.result_column] if c not in df.columns]
            if missing_cols:
                logger.warning(f"Rule '{rule.name}': columns {missing_cols} not found. Skipping.")
                continue

            for idx, row in df.iterrows():
                # Compute expected product
                expected_product = 1.0
                factor_parts = []
                skip = False

                for col in rule.factor_columns:
                    val = row[col]
                    if pd.isna(val) or val == 0:
                        skip = True
                        break
                    expected_product *= float(val)
                    factor_parts.append(f"{col}={val}")

                if skip:
                    continue

                raw_actual = row[rule.result_column]
                # BUG FIX: a NaN result previously produced ratio=NaN; since
                # nan <= x is False for every threshold, _classify_severity
                # misreported such rows as CRITICAL with nan values. Skip
                # them like a missing factor instead.
                if pd.isna(raw_actual):
                    continue

                actual = float(raw_actual)
                if actual == 0:
                    continue

                # The ratio tells us the discrepancy factor
                ratio = actual / expected_product
                severity = self._classify_severity(ratio, rule.tolerance)

                if severity == "CLEAN":
                    continue

                # What factor would fix this row?
                missing_factor = ratio  # actual = expected × missing_factor

                # Build human-readable explanation
                explanation = (
                    f"Rule '{rule.name}': expected {' × '.join(factor_parts)} = {expected_product:.2f}, "
                    f"but {rule.result_column} = {actual:.2f}. "
                    f"Off by factor {ratio:.4f}x."
                )

                anomalies.append(Anomaly(
                    row_index=idx,
                    rule_name=rule.name,
                    expected=expected_product,
                    actual=actual,
                    ratio=ratio,
                    missing_factor=missing_factor,
                    severity=severity,
                    explanation=explanation
                ))

        # Sort by severity (CRITICAL first); ties broken by deviation size,
        # worst offenders first within each severity band.
        severity_order = {"CRITICAL": 0, "WARNING": 1, "INFO": 2}
        anomalies.sort(key=lambda a: (severity_order.get(a.severity, 3), -abs(a.ratio - 1.0)))

        logger.info(f"Scan complete: {len(anomalies)} anomalies found in {len(df)} rows.")
        return anomalies

add_rule(rule: RelationalRule)

Register a multiplicative relationship rule.

Source code in engine-src/src/neurosym/anomaly.py
53
54
55
56
def add_rule(self, rule: RelationalRule):
    """Register a multiplicative relationship rule.

    The rule is appended to self.rules and will be checked by scan().
    """
    self.rules.append(rule)
    logger.info(f"Added rule '{rule.name}': {' × '.join(rule.factor_columns)} = {rule.result_column}")

scan(df: pd.DataFrame) -> List[Anomaly]

Scan every row of the DataFrame against all registered rules. Returns a list of Anomaly objects for rows that violate any rule.

Source code in engine-src/src/neurosym/anomaly.py
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
def scan(self, df: pd.DataFrame) -> List[Anomaly]:
    """
    Scan every row of the DataFrame against all registered rules.
    Returns a list of Anomaly objects for rows that violate any rule,
    most severe first.

    Rows are skipped (not flagged) when any factor or the result column
    is NaN or zero, since no meaningful ratio exists there.

    Raises:
        ValueError: if no rules have been registered.
    """
    if not self.rules:
        raise ValueError("No rules defined. Use add_rule() first.")

    anomalies = []

    for rule in self.rules:
        # Validate columns exist
        missing_cols = [c for c in rule.factor_columns + [rule.result_column] if c not in df.columns]
        if missing_cols:
            logger.warning(f"Rule '{rule.name}': columns {missing_cols} not found. Skipping.")
            continue

        for idx, row in df.iterrows():
            # Compute expected product
            expected_product = 1.0
            factor_parts = []
            skip = False

            for col in rule.factor_columns:
                val = row[col]
                if pd.isna(val) or val == 0:
                    skip = True
                    break
                expected_product *= float(val)
                factor_parts.append(f"{col}={val}")

            if skip:
                continue

            raw_actual = row[rule.result_column]
            # BUG FIX: a NaN result previously produced ratio=NaN; since
            # nan <= x is False for every threshold, _classify_severity
            # misreported such rows as CRITICAL with nan values. Skip
            # them like a missing factor instead.
            if pd.isna(raw_actual):
                continue

            actual = float(raw_actual)
            if actual == 0:
                continue

            # The ratio tells us the discrepancy factor
            ratio = actual / expected_product
            severity = self._classify_severity(ratio, rule.tolerance)

            if severity == "CLEAN":
                continue

            # What factor would fix this row?
            missing_factor = ratio  # actual = expected × missing_factor

            # Build human-readable explanation
            explanation = (
                f"Rule '{rule.name}': expected {' × '.join(factor_parts)} = {expected_product:.2f}, "
                f"but {rule.result_column} = {actual:.2f}. "
                f"Off by factor {ratio:.4f}x."
            )

            anomalies.append(Anomaly(
                row_index=idx,
                rule_name=rule.name,
                expected=expected_product,
                actual=actual,
                ratio=ratio,
                missing_factor=missing_factor,
                severity=severity,
                explanation=explanation
            ))

    # Sort by severity (CRITICAL first); ties broken by deviation size,
    # worst offenders first within each severity band.
    severity_order = {"CRITICAL": 0, "WARNING": 1, "INFO": 2}
    anomalies.sort(key=lambda a: (severity_order.get(a.severity, 3), -abs(a.ratio - 1.0)))

    logger.info(f"Scan complete: {len(anomalies)} anomalies found in {len(df)} rows.")
    return anomalies