Skip to content

API Reference

Complete reference for the reptimeline Python package.

Core Data Structures

Foundational dataclasses: ConceptSnapshot, CodeEvent, ConnectionEvent, PhaseTransition, Timeline.

reptimeline.core

Core data structures for representation timeline tracking.

These dataclasses are backend-agnostic — they work with any discrete representation system (triadic bits, VQ-VAE, FSQ, sparse autoencoders).

ConceptSnapshot dataclass

What a model "thinks" about a set of concepts at a given training step.

This is the universal exchange format between extractors and the tracker. Every backend (triadic, VQ-VAE, FSQ) produces these.

Source code in triadic-microgpt-src/reptimeline/core.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@dataclass
class ConceptSnapshot:
    """What a model "thinks" about a set of concepts at a given training step.

    This is the universal exchange format between extractors and the tracker.
    Every backend (triadic, VQ-VAE, FSQ) produces these.
    """
    step: int
    codes: Dict[str, List[int]]
    continuous: Optional[Dict[str, List[float]]] = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def concepts(self) -> List[str]:
        """Names of every concept captured in this snapshot."""
        return list(self.codes)

    @property
    def code_dim(self) -> int:
        """Length of the code vectors; 0 when the snapshot holds no codes."""
        for code in self.codes.values():
            return len(code)
        return 0

    def hamming(self, concept_a: str, concept_b: str) -> int:
        """Hamming distance between two concept codes (-1 if either is missing)."""
        code_a = self.codes.get(concept_a)
        code_b = self.codes.get(concept_b)
        if code_a is None or code_b is None:
            return -1
        distance = 0
        for bit_a, bit_b in zip(code_a, code_b):
            if bit_a != bit_b:
                distance += 1
        return distance

    def active_indices(self, concept: str) -> List[int]:
        """Positions where the concept's code equals 1 ([] if concept unknown)."""
        code = self.codes.get(concept)
        if code is None:
            return []
        return [idx for idx, bit in enumerate(code) if bit == 1]

hamming(concept_a: str, concept_b: str) -> int

Hamming distance between two concept codes.

Source code in triadic-microgpt-src/reptimeline/core.py
36
37
38
39
40
41
def hamming(self, concept_a: str, concept_b: str) -> int:
    """Hamming distance between two concept codes (-1 if either is missing)."""
    code_a = self.codes.get(concept_a)
    code_b = self.codes.get(concept_b)
    if code_a is None or code_b is None:
        return -1
    distance = 0
    for bit_a, bit_b in zip(code_a, code_b):
        if bit_a != bit_b:
            distance += 1
    return distance

active_indices(concept: str) -> List[int]

Indices where code == 1 for a concept.

Source code in triadic-microgpt-src/reptimeline/core.py
43
44
45
46
47
48
def active_indices(self, concept: str) -> List[int]:
    """Positions where the concept's code equals 1 ([] if concept unknown)."""
    code = self.codes.get(concept)
    if code is None:
        return []
    return [idx for idx, bit in enumerate(code) if bit == 1]

CodeEvent dataclass

A discrete event in a code element's lifecycle.

Source code in triadic-microgpt-src/reptimeline/core.py
51
52
53
54
55
56
57
58
@dataclass
class CodeEvent:
    """A discrete event in a code element's lifecycle.

    Emitted by the tracker when a single code element (bit) of a concept's
    code changes state across training snapshots.
    """
    event_type: str  # 'birth', 'death', 'flip', 'stabilize'
    step: int  # training step at which the event was detected
    concept: str  # concept whose code changed
    code_index: int  # position of the affected code element (bit)
    detail: Dict[str, Any] = field(default_factory=dict)  # optional extra info

ConnectionEvent dataclass

When two concepts first share (or lose) a discrete feature.

Source code in triadic-microgpt-src/reptimeline/core.py
61
62
63
64
65
66
67
68
@dataclass
class ConnectionEvent:
    """When two concepts first share (or lose) a discrete feature.

    The pair (concept_a, concept_b) is unordered; 'break' is part of the
    schema although the tracker shown in this package emits 'form' events.
    """
    event_type: str  # 'form', 'break'
    step: int  # training step at which the event was detected
    concept_a: str  # first concept of the pair
    concept_b: str  # second concept of the pair
    shared_indices: List[int] = field(default_factory=list)  # shared feature indices at `step`

PhaseTransition dataclass

A detected discontinuity in a training metric.

Source code in triadic-microgpt-src/reptimeline/core.py
71
72
73
74
75
76
77
@dataclass
class PhaseTransition:
    """A detected discontinuity in a training metric.

    Produced when a metric's step-to-step change is unusually large relative
    to the typical change magnitude for that metric.
    """
    step: int  # step at which the jump lands (the later of the two steps)
    metric: str  # name of the affected curve, e.g. 'entropy', 'churn_rate'
    delta: float  # absolute size of the jump
    direction: str  # 'increase' or 'decrease'

Timeline dataclass

Complete analysis result from TimelineTracker.

Source code in triadic-microgpt-src/reptimeline/core.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
@dataclass
class Timeline:
    """Complete analysis result from TimelineTracker."""
    steps: List[int]
    snapshots: List[ConceptSnapshot]
    births: List[CodeEvent]
    deaths: List[CodeEvent]
    connections: List[ConnectionEvent]
    phase_transitions: List[PhaseTransition]
    curves: Dict[str, List[float]]  # metric_name -> values per step
    stability: Dict[int, float]  # code_index -> stability score [0,1]

    def print_summary(self):
        """Print a concise console summary of the timeline to stdout."""
        rule = "=" * 60
        print()
        print(rule)
        print("  REPRESENTATION TIMELINE")
        print(rule)
        last_snap = self.snapshots[-1] if self.snapshots else None
        tracked = len(last_snap.concepts) if last_snap else 0
        dim = last_snap.code_dim if last_snap else 0
        print(f"  Steps:              {self.steps[0]:,} -> {self.steps[-1]:,} ({len(self.steps)} checkpoints)")
        print(f"  Concepts tracked:   {tracked}")
        print(f"  Code dimension:     {dim}")
        print()
        formed = sum(1 for event in self.connections if event.event_type == 'form')
        print(f"  Bit births:         {len(self.births)}")
        print(f"  Bit deaths:         {len(self.deaths)}")
        print(f"  Connections formed: {formed}")
        print(f"  Phase transitions:  {len(self.phase_transitions)}")
        print()

        if 'churn_rate' in self.curves:
            series = self.curves['churn_rate']
            print(f"  Code churn:         {series[0]:.3f} -> {series[-1]:.3f}")
        if 'utilization' in self.curves:
            series = self.curves['utilization']
            print(f"  Code utilization:   {series[0]:.3f} -> {series[-1]:.3f}")
        if 'entropy' in self.curves:
            series = self.curves['entropy']
            print(f"  Mean entropy:       {series[0]:.4f} -> {series[-1]:.4f}")

        if self.phase_transitions:
            print()
            print("  Phase transitions:")
            for transition in self.phase_transitions:
                print(f"    step {transition.step:>6,}  {transition.metric:<15s}  "
                      f"{transition.direction} delta={transition.delta:.4f}")

        # Show the extremes of the per-bit stability ranking.
        if self.stability:
            ranked_bits = sorted(self.stability, key=self.stability.get)
            print()
            print(f"  Least stable bits:  {ranked_bits[:5]}")
            print(f"  Most stable bits:   {ranked_bits[-5:]}")

        print(rule)

print_summary()

Print a concise console summary.

Source code in triadic-microgpt-src/reptimeline/core.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def print_summary(self):
    """Print a concise console summary of the timeline to stdout."""
    rule = "=" * 60
    print()
    print(rule)
    print("  REPRESENTATION TIMELINE")
    print(rule)
    last_snap = self.snapshots[-1] if self.snapshots else None
    tracked = len(last_snap.concepts) if last_snap else 0
    dim = last_snap.code_dim if last_snap else 0
    print(f"  Steps:              {self.steps[0]:,} -> {self.steps[-1]:,} ({len(self.steps)} checkpoints)")
    print(f"  Concepts tracked:   {tracked}")
    print(f"  Code dimension:     {dim}")
    print()
    formed = sum(1 for event in self.connections if event.event_type == 'form')
    print(f"  Bit births:         {len(self.births)}")
    print(f"  Bit deaths:         {len(self.deaths)}")
    print(f"  Connections formed: {formed}")
    print(f"  Phase transitions:  {len(self.phase_transitions)}")
    print()

    if 'churn_rate' in self.curves:
        series = self.curves['churn_rate']
        print(f"  Code churn:         {series[0]:.3f} -> {series[-1]:.3f}")
    if 'utilization' in self.curves:
        series = self.curves['utilization']
        print(f"  Code utilization:   {series[0]:.3f} -> {series[-1]:.3f}")
    if 'entropy' in self.curves:
        series = self.curves['entropy']
        print(f"  Mean entropy:       {series[0]:.4f} -> {series[-1]:.4f}")

    if self.phase_transitions:
        print()
        print("  Phase transitions:")
        for transition in self.phase_transitions:
            print(f"    step {transition.step:>6,}  {transition.metric:<15s}  "
                  f"{transition.direction} delta={transition.delta:.4f}")

    # Show the extremes of the per-bit stability ranking.
    if self.stability:
        ranked_bits = sorted(self.stability, key=self.stability.get)
        print()
        print(f"  Least stable bits:  {ranked_bits[:5]}")
        print(f"  Most stable bits:   {ranked_bits[-5:]}")

    print(rule)

Timeline Tracker

Analyzes representation evolution across training snapshots. Detects births, deaths, connections, and phase transitions.

reptimeline.tracker

TimelineTracker — Backend-agnostic analysis of representation evolution.

Consumes a sequence of ConceptSnapshot objects and computes lifecycle events: births, deaths, connections, phase transitions, churn, stability.

TimelineTracker

Analyzes how discrete representations evolve across training snapshots.

Source code in triadic-microgpt-src/reptimeline/tracker.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
class TimelineTracker:
    """Analyzes how discrete representations evolve across training snapshots.

    Given an ordered sequence of ConceptSnapshot objects, computes:
      * births/deaths of individual code elements (bits) per concept,
      * first-time connections between concept pairs,
      * per-step metric curves (entropy, churn, utilization),
      * per-bit stability scores,
      * phase transitions (abrupt jumps) in the metric curves.
    """

    def __init__(self, extractor: RepresentationExtractor,
                 stability_window: int = 3):
        """
        Args:
            extractor: Backend-specific extractor for similarity/shared features.
            stability_window: Number of consecutive snapshots a code element must
                remain unchanged to count as "stabilized".
        """
        self.extractor = extractor
        # NOTE(review): stability_window is stored but not read by any method
        # in this class — confirm whether other code uses it or it is vestigial.
        self.stability_window = stability_window

    def analyze(self, snapshots: List[ConceptSnapshot],
                concept_pairs: Optional[List[Tuple[str, str]]] = None,
                ) -> Timeline:
        """Run full timeline analysis on a sequence of snapshots.

        Args:
            snapshots: List of ConceptSnapshot sorted by step.
            concept_pairs: Optional pairs to track connections for.
                If None, tracks all pairs (can be slow for many concepts).

        Returns:
            Timeline with all lifecycle events and curves.

        Raises:
            ValueError: If `snapshots` is empty.
        """
        if not snapshots:
            raise ValueError("Need at least one snapshot")

        steps = [s.step for s in snapshots]
        # Union over ALL snapshots: a concept may appear or disappear mid-run.
        all_concepts = sorted(set().union(*(s.concepts for s in snapshots)))

        births = self._compute_births(snapshots, steps, all_concepts)
        deaths = self._compute_deaths(snapshots, steps, all_concepts)
        connections = self._compute_connections(snapshots, steps, concept_pairs)

        curves = {}
        curves['entropy'] = self._entropy_curve(snapshots)
        curves['churn_rate'] = self._churn_curve(snapshots)
        curves['utilization'] = self._utilization_curve(snapshots)

        stability = self._compute_stability(snapshots, all_concepts)

        phase_transitions = self._detect_phase_transitions(steps, curves)

        return Timeline(
            steps=steps,
            snapshots=snapshots,
            births=births,
            deaths=deaths,
            connections=connections,
            phase_transitions=phase_transitions,
            curves=curves,
            stability=stability,
        )

    # ------------------------------------------------------------------
    # Births: first step where a code element activates for a concept
    # ------------------------------------------------------------------

    def _compute_births(self, snapshots, steps, concepts):
        """Emit one 'birth' CodeEvent per (concept, bit) at its first active step.

        Snapshots are scanned in step order, so the first time a (concept, idx)
        key is encountered active it is also its earliest activation; the inner
        scan over earlier snapshots re-verifies that (belt-and-braces — it makes
        the worst case quadratic in the number of snapshots).
        """
        births = []
        seen = set()  # (concept, code_index) pairs already credited with a birth

        for t, snap in enumerate(snapshots):
            for concept in concepts:
                code = snap.codes.get(concept)
                if code is None:
                    continue  # concept absent from this snapshot
                for idx, val in enumerate(code):
                    if val == 1:
                        key = (concept, idx)
                        if key not in seen:
                            # Verify it wasn't active in earlier snapshots
                            was_active = False
                            for prev_t in range(t):
                                prev = snapshots[prev_t].codes.get(concept)
                                if prev and prev[idx] == 1:
                                    was_active = True
                                    break
                            if not was_active:
                                seen.add(key)
                                births.append(CodeEvent(
                                    event_type='birth',
                                    step=steps[t],
                                    concept=concept,
                                    code_index=idx,
                                ))
        return births

    # ------------------------------------------------------------------
    # Deaths: first step where a code element permanently deactivates
    # ------------------------------------------------------------------

    def _compute_deaths(self, snapshots, steps, concepts):
        """Emit one 'death' CodeEvent per (concept, bit): the first 1 -> 0
        transition after which the bit never activates again.

        A concept missing from a later snapshot counts as all-zeros for the
        permanence check (the `[0] * n_bits` default below).
        """
        deaths = []
        n_steps = len(snapshots)

        for concept in concepts:
            # Code length is taken from the first snapshot containing the concept.
            first_code = None
            for snap in snapshots:
                if concept in snap.codes:
                    first_code = snap.codes[concept]
                    break
            if first_code is None:
                continue  # concept never appears in any snapshot

            n_bits = len(first_code)
            for idx in range(n_bits):
                for t in range(1, n_steps):
                    prev = snapshots[t - 1].codes.get(concept)
                    curr = snapshots[t].codes.get(concept)
                    if prev and curr and prev[idx] == 1 and curr[idx] == 0:
                        # Check permanence
                        stays_dead = all(
                            snapshots[ft].codes.get(concept, [0] * n_bits)[idx] == 0
                            for ft in range(t + 1, n_steps)
                        )
                        if stays_dead:
                            deaths.append(CodeEvent(
                                event_type='death',
                                step=steps[t],
                                concept=concept,
                                code_index=idx,
                            ))
                            break  # at most one death per (concept, bit)
        return deaths

    # ------------------------------------------------------------------
    # Connections: when two concepts first share a feature
    # ------------------------------------------------------------------

    def _compute_connections(self, snapshots, steps, pairs):
        """Emit a 'form' ConnectionEvent at the first step each pair shares
        at least one feature (per extractor.shared_features).

        Only formation is detected here — no 'break' events are produced.
        When `pairs` is None, all unordered pairs drawn from the LAST
        snapshot's concept set are tracked.
        """
        connections = []
        if pairs is None:
            # All pairs from last snapshot
            concepts = snapshots[-1].concepts if snapshots else []
            pairs = [(a, b) for i, a in enumerate(concepts)
                     for b in concepts[i + 1:]]

        for a, b in pairs:
            for t, snap in enumerate(snapshots):
                code_a = snap.codes.get(a)
                code_b = snap.codes.get(b)
                if code_a is None or code_b is None:
                    continue

                shared = self.extractor.shared_features(code_a, code_b)
                if shared:
                    # Check if first time
                    was_connected = False
                    for prev_t in range(t):
                        prev_a = snapshots[prev_t].codes.get(a)
                        prev_b = snapshots[prev_t].codes.get(b)
                        if prev_a and prev_b:
                            if self.extractor.shared_features(prev_a, prev_b):
                                was_connected = True
                                break
                    if not was_connected:
                        connections.append(ConnectionEvent(
                            event_type='form',
                            step=steps[t],
                            concept_a=a,
                            concept_b=b,
                            shared_indices=shared,
                        ))
                    break  # stop at the first step where the pair is connected
        return connections

    # ------------------------------------------------------------------
    # Curves
    # ------------------------------------------------------------------

    def _entropy_curve(self, snapshots):
        """Per-step mean entropy across all code elements.

        For each bit, computes the binary (Bernoulli) entropy of the fraction
        of concepts activating it, then averages over bits.
        """
        curve = []
        for snap in snapshots:
            if not snap.codes:
                curve.append(0.0)
                continue
            n_bits = snap.code_dim
            # NOTE(review): assumes every code in the snapshot has length
            # n_bits (code_dim comes from one arbitrary entry) — confirm.
            codes = list(snap.codes.values())
            n = len(codes)
            entropies = []
            for bit_idx in range(n_bits):
                active = sum(1 for c in codes if c[bit_idx] == 1)
                p = active / n if n > 0 else 0
                if 0 < p < 1:
                    entropies.append(-p * np.log2(p) - (1 - p) * np.log2(1 - p))
                else:
                    # p == 0 or p == 1: entropy is 0 by convention (avoids log2(0))
                    entropies.append(0.0)
            curve.append(float(np.mean(entropies)))
        return curve

    def _churn_curve(self, snapshots):
        """Fraction of concepts whose code changed between consecutive steps."""
        curve = [0.0]  # No churn at first step
        for t in range(1, len(snapshots)):
            prev, curr = snapshots[t - 1], snapshots[t]
            # Only compare concepts present in BOTH consecutive snapshots.
            common = set(prev.codes.keys()) & set(curr.codes.keys())
            if not common:
                curve.append(0.0)
                continue
            changed = sum(1 for c in common if prev.codes[c] != curr.codes[c])
            curve.append(changed / len(common))
        return curve

    def _utilization_curve(self, snapshots):
        """Fraction of unique codes out of total concepts at each step.

        1.0 means every concept has a distinct code; lower values indicate
        collisions (several concepts sharing the exact same code).
        """
        curve = []
        for snap in snapshots:
            if not snap.codes:
                curve.append(0.0)
                continue
            unique = len(set(tuple(v) for v in snap.codes.values()))
            curve.append(unique / len(snap.codes))
        return curve

    # ------------------------------------------------------------------
    # Stability: per code-element stability score
    # ------------------------------------------------------------------

    def _compute_stability(self, snapshots, concepts):
        """For each code index, compute stability = fraction of (concept, step)
        pairs where the code element didn't change from the previous step.

        Returns {} when fewer than two snapshots exist (no transitions to
        measure) or when the final snapshot has zero code dimension.
        """
        if len(snapshots) < 2:
            return {}

        n_bits = snapshots[-1].code_dim
        if n_bits == 0:
            return {}

        stable_counts = [0] * n_bits  # transitions where the bit kept its value
        total_counts = [0] * n_bits  # transitions observed for the bit

        for t in range(1, len(snapshots)):
            for concept in concepts:
                prev = snapshots[t - 1].codes.get(concept)
                curr = snapshots[t].codes.get(concept)
                if prev is None or curr is None:
                    continue
                # min() guards against codes of differing lengths across steps.
                for idx in range(min(n_bits, len(prev), len(curr))):
                    total_counts[idx] += 1
                    if prev[idx] == curr[idx]:
                        stable_counts[idx] += 1

        # max(..., 1) avoids division by zero for bits never observed.
        return {
            idx: stable_counts[idx] / max(total_counts[idx], 1)
            for idx in range(n_bits)
        }

    # ------------------------------------------------------------------
    # Phase transition detection
    # ------------------------------------------------------------------

    def _detect_phase_transitions(self, steps, curves, sigma_threshold=2.0):
        """Detect steps where a metric's step-to-step change exceeds
        mean(|delta|) + sigma_threshold * std(|delta|) for that metric.

        Note: mean/std are computed over ALL deltas, including any outlier
        being tested, so a very large jump raises its own threshold.
        """
        transitions = []
        for metric_name, curve in curves.items():
            if len(curve) < 3:
                continue  # too few points to estimate a typical change size
            deltas = [curve[i] - curve[i - 1] for i in range(1, len(curve))]
            abs_deltas = [abs(d) for d in deltas]
            mean_d = np.mean(abs_deltas)
            std_d = np.std(abs_deltas)
            if std_d < 1e-8:
                continue  # essentially constant deltas — nothing to flag
            threshold = mean_d + sigma_threshold * std_d
            for i, (d, ad) in enumerate(zip(deltas, abs_deltas)):
                if ad > threshold:
                    transitions.append(PhaseTransition(
                        # deltas[i] spans steps[i] -> steps[i + 1]; report the
                        # later step as the transition point.
                        step=steps[i + 1],
                        metric=metric_name,
                        delta=float(ad),
                        direction='increase' if d > 0 else 'decrease',
                    ))
        return transitions

analyze(snapshots: List[ConceptSnapshot], concept_pairs: Optional[List[Tuple[str, str]]] = None) -> Timeline

Run full timeline analysis on a sequence of snapshots.

Parameters:

Name Type Description Default
snapshots List[ConceptSnapshot]

List of ConceptSnapshot sorted by step.

required
concept_pairs Optional[List[Tuple[str, str]]]

Optional pairs to track connections for. If None, tracks all pairs (can be slow for many concepts).

None

Returns:

Type Description
Timeline

Timeline with all lifecycle events and curves.

Source code in triadic-microgpt-src/reptimeline/tracker.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def analyze(self, snapshots: List[ConceptSnapshot],
            concept_pairs: Optional[List[Tuple[str, str]]] = None,
            ) -> Timeline:
    """Run full timeline analysis on a sequence of snapshots.

    Args:
        snapshots: List of ConceptSnapshot sorted by step.
        concept_pairs: Optional pairs to track connections for.
            If None, tracks all pairs (can be slow for many concepts).

    Returns:
        Timeline with all lifecycle events and curves.

    Raises:
        ValueError: If `snapshots` is empty.
    """
    if not snapshots:
        raise ValueError("Need at least one snapshot")

    steps = [snap.step for snap in snapshots]
    # Union over all snapshots — concepts may appear or vanish mid-run.
    concept_union = set()
    for snap in snapshots:
        concept_union.update(snap.concepts)
    all_concepts = sorted(concept_union)

    curves = {
        'entropy': self._entropy_curve(snapshots),
        'churn_rate': self._churn_curve(snapshots),
        'utilization': self._utilization_curve(snapshots),
    }

    return Timeline(
        steps=steps,
        snapshots=snapshots,
        births=self._compute_births(snapshots, steps, all_concepts),
        deaths=self._compute_deaths(snapshots, steps, all_concepts),
        connections=self._compute_connections(snapshots, steps, concept_pairs),
        phase_transitions=self._detect_phase_transitions(steps, curves),
        curves=curves,
        stability=self._compute_stability(snapshots, all_concepts),
    )

Discovery

Bottom-up ontology discovery without prior knowledge. Finds duals, dependencies, triadic interactions, and hierarchy.

reptimeline.discovery

BitDiscovery — Discover what each bit "means" from unsupervised training.

Instead of pre-defining primitives and supervising, this module:

1. Takes a trained model (no anchor supervision needed)
2. Runs a large concept vocabulary through it
3. Analyzes which concepts activate which bits
4. Discovers bit semantics, hierarchy, duals, and dependencies

This enables bottom-up primitive discovery: the model invents its own ontology and reptimeline discovers what it is.

BitSemantics dataclass

What a single bit "means" based on what concepts activate it.

Source code in triadic-microgpt-src/reptimeline/discovery.py
23
24
25
26
27
28
29
30
@dataclass
class BitSemantics:
    """What a single bit "means" based on what concepts activate it."""
    bit_index: int  # which bit this record describes
    activation_rate: float  # fraction of concepts that activate this bit
    top_concepts: List[str]  # concepts most associated with this bit
    anti_concepts: List[str]  # concepts that never activate this bit
    label: str = ""  # auto-generated semantic label

DiscoveredDual dataclass

A pair of bits that behave as opposites (anti-correlated).

Source code in triadic-microgpt-src/reptimeline/discovery.py
33
34
35
36
37
38
39
40
@dataclass
class DiscoveredDual:
    """A pair of bits that behave as opposites (anti-correlated)."""
    bit_a: int  # first bit of the pair
    bit_b: int  # second bit of the pair
    anti_correlation: float  # -1 = perfect opposites, 0 = independent
    concepts_exclusive: int  # times exactly one is active
    concepts_both: int  # times both are active (should be rare)

DiscoveredDependency dataclass

Bit B almost never activates without bit A being active first.

Source code in triadic-microgpt-src/reptimeline/discovery.py
43
44
45
46
47
48
49
@dataclass
class DiscoveredDependency:
    """Bit B almost never activates without bit A being active first."""
    bit_parent: int  # the prerequisite bit (A)
    bit_child: int  # the dependent bit (B)
    confidence: float  # P(parent=1 | child=1)
    support: int  # how many concepts show this pattern

DiscoveredTriadicDep dataclass

A 3-way interaction: bit r activates only when bits i AND j are both active.

This is an AND-gate in semantic space: neither i alone nor j alone predicts r, but their conjunction does. Analogous to epistasis in genetics.

Source code in triadic-microgpt-src/reptimeline/discovery.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
@dataclass
class DiscoveredTriadicDep:
    """A 3-way interaction: bit r activates only when bits i AND j are both active.

    This is an AND-gate in semantic space: neither i alone nor j alone
    predicts r, but their conjunction does. Analogous to epistasis in genetics.
    """
    bit_i: int  # first input bit of the conjunction
    bit_j: int  # second input bit of the conjunction
    bit_r: int  # the emergent bit
    p_r_given_ij: float  # P(r=1 | i=1, j=1) — should be high
    p_r_given_i: float   # P(r=1 | i=1) — should be low
    p_r_given_j: float   # P(r=1 | j=1) — should be low
    interaction_strength: float  # p_r_given_ij - max(p_r_given_i, p_r_given_j)
    support: int  # how many concepts have i=1 AND j=1

DiscoveredHierarchy dataclass

Bits ordered by when they first stabilize during training.

Source code in triadic-microgpt-src/reptimeline/discovery.py
69
70
71
72
73
74
75
@dataclass
class DiscoveredHierarchy:
    """Bits ordered by when they first stabilize during training."""
    bit_index: int  # which bit this record describes
    first_stable_step: Optional[int]  # first step where meaning stabilizes; None if never
    layer: int  # discovered layer (1=earliest, N=latest)
    n_dependents: int  # how many other bits depend on this one

DiscoveryReport dataclass

Complete bottom-up discovery of what a model learned.

Source code in triadic-microgpt-src/reptimeline/discovery.py
78
79
80
81
82
83
84
85
86
87
88
@dataclass
class DiscoveryReport:
    """Complete bottom-up discovery of what a model learned."""
    bit_semantics: List[BitSemantics]  # per-bit meaning summaries
    discovered_duals: List[DiscoveredDual]  # anti-correlated bit pairs
    discovered_deps: List[DiscoveredDependency]  # pairwise parent->child dependencies
    discovered_triadic_deps: List[DiscoveredTriadicDep]  # AND-gate interactions
    discovered_hierarchy: List[DiscoveredHierarchy]  # stabilization-order layering
    n_active_bits: int  # bits with activation_rate > threshold
    n_dead_bits: int  # bits that almost never activate
    metadata: Dict = field(default_factory=dict)  # free-form run information

BitDiscovery

Discovers what each bit encodes without prior knowledge of primitives.

This is the inverse of PrimitiveOverlay: instead of mapping known primitives onto bits, it discovers what the bits mean by analyzing activation patterns across a large concept vocabulary.

Source code in triadic-microgpt-src/reptimeline/discovery.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
class BitDiscovery:
    """Discovers what each bit encodes without prior knowledge of primitives.

    This is the inverse of PrimitiveOverlay: instead of mapping known
    primitives onto bits, it discovers what the bits mean by analyzing
    activation patterns across a large concept vocabulary.
    """

    def __init__(self, dead_threshold: float = 0.02,
                 dual_threshold: float = -0.3,
                 dep_confidence: float = 0.9,
                 triadic_threshold: float = 0.7,
                 triadic_min_interaction: float = 0.2):
        """
        Args:
            dead_threshold: Bits with activation rate below this are "dead".
            dual_threshold: Correlation below this counts as a dual pair.
            dep_confidence: P(parent|child) above this counts as dependency.
            triadic_threshold: P(r|i,j) above this for triadic detection.
            triadic_min_interaction: Minimum interaction strength
                (P(r|i,j) - max(P(r|i), P(r|j))).
        """
        self.dead_threshold = dead_threshold
        self.dual_threshold = dual_threshold
        self.dep_confidence = dep_confidence
        self.triadic_threshold = triadic_threshold
        self.triadic_min_interaction = triadic_min_interaction

    def discover(self, snapshot: ConceptSnapshot,
                 timeline: Optional[Timeline] = None,
                 top_k: int = 10) -> DiscoveryReport:
        """Discover bit semantics from a single snapshot (or a full timeline).

        Args:
            snapshot: A ConceptSnapshot with codes for many concepts.
                Use the LAST snapshot from training for best results.
            timeline: Optional full timeline for hierarchy discovery.
            top_k: Number of top/anti concepts per bit.

        Returns:
            A DiscoveryReport aggregating per-bit semantics, dual pairs,
            pairwise dependencies, triadic interactions, and (when a
            timeline is given) the stability hierarchy.

        Raises:
            ValueError: If the snapshot contains no concepts.
        """
        concepts = list(snapshot.codes.keys())
        if not concepts:
            raise ValueError("Snapshot has no concepts")

        n_bits = snapshot.code_dim
        # Rows are concepts, columns are bits.
        codes_matrix = np.array([snapshot.codes[c] for c in concepts])

        bit_semantics = self._discover_semantics(
            codes_matrix, concepts, n_bits, top_k
        )
        duals = self._discover_duals(codes_matrix, n_bits)
        deps = self._discover_dependencies(codes_matrix, n_bits)
        triadic_deps = self._discover_triadic_deps(codes_matrix, n_bits)

        hierarchy = []
        if timeline is not None:
            hierarchy = self._discover_hierarchy(timeline, n_bits, deps)

        # NOTE: a bit whose rate equals dead_threshold exactly is counted as
        # dead here but is not given the "_DEAD" label in _discover_semantics
        # (which uses a strict "<"). Float equality makes this vanishingly
        # rare in practice.
        n_active = sum(1 for bs in bit_semantics
                       if bs.activation_rate > self.dead_threshold)

        return DiscoveryReport(
            bit_semantics=bit_semantics,
            discovered_duals=duals,
            discovered_deps=deps,
            discovered_triadic_deps=triadic_deps,
            discovered_hierarchy=hierarchy,
            n_active_bits=n_active,
            n_dead_bits=n_bits - n_active,
            metadata={
                'n_concepts': len(concepts),
                'n_bits': n_bits,
            },
        )

    # ------------------------------------------------------------------
    # Bit semantics: what does each bit mean?
    # ------------------------------------------------------------------

    def _discover_semantics(self, codes: np.ndarray, concepts: List[str],
                            n_bits: int, top_k: int) -> List[BitSemantics]:
        """For each bit, find which concepts activate it most/least."""
        results = []
        n_concepts = len(concepts)

        for bit_idx in range(n_bits):
            column = codes[:, bit_idx]
            activation_rate = float(column.mean())

            # Concepts where this bit is active
            active_mask = column == 1
            active_concepts = [concepts[i] for i in range(n_concepts)
                               if active_mask[i]]
            inactive_concepts = [concepts[i] for i in range(n_concepts)
                                 if not active_mask[i]]

            # Top concepts: those where this bit is active (limited to top_k)
            top = active_concepts[:top_k]
            anti = inactive_concepts[:top_k]

            # Auto-label: common theme among top concepts (placeholder)
            label = f"bit_{bit_idx}"
            if activation_rate < self.dead_threshold:
                label = f"bit_{bit_idx}_DEAD"

            results.append(BitSemantics(
                bit_index=bit_idx,
                activation_rate=activation_rate,
                top_concepts=top,
                anti_concepts=anti,
                label=label,
            ))
        return results

    # ------------------------------------------------------------------
    # Dual discovery: which bits are opposites?
    # ------------------------------------------------------------------

    def _discover_duals(self, codes: np.ndarray,
                        n_bits: int) -> List[DiscoveredDual]:
        """Find bit pairs that anti-correlate (mutual exclusion)."""
        # Use numpy's corrcoef for correct Pearson correlation.
        # Constant columns (dead bits) produce NaN rows; map them to 0 so
        # they can never fall below the (negative) dual threshold.
        with np.errstate(divide='ignore', invalid='ignore'):
            corr = np.corrcoef(codes.T)
        corr = np.nan_to_num(corr, nan=0.0)

        duals = []
        # j > i visits each unordered pair exactly once, so no dedup set
        # is needed.
        for i in range(n_bits):
            for j in range(i + 1, n_bits):
                if corr[i, j] < self.dual_threshold:
                    both = int(((codes[:, i] == 1) & (codes[:, j] == 1)).sum())
                    excl = int(((codes[:, i] == 1) ^ (codes[:, j] == 1)).sum())
                    duals.append(DiscoveredDual(
                        bit_a=i, bit_b=j,
                        anti_correlation=float(corr[i, j]),
                        concepts_exclusive=excl,
                        concepts_both=both,
                    ))

        # Strongest anti-correlation (most negative) first.
        duals.sort(key=lambda d: d.anti_correlation)
        return duals

    # ------------------------------------------------------------------
    # Dependency discovery: which bits require other bits?
    # ------------------------------------------------------------------

    def _discover_dependencies(self, codes: np.ndarray,
                               n_bits: int) -> List[DiscoveredDependency]:
        """Find bit pairs where child almost never activates without parent."""
        deps = []
        for child in range(n_bits):
            child_active = codes[:, child] == 1
            n_child = int(child_active.sum())
            if n_child < 3:  # too few activations to trust a conditional rate
                continue

            for parent in range(n_bits):
                if parent == child:
                    continue
                parent_active = codes[:, parent] == 1
                # P(parent=1 | child=1)
                both = int((child_active & parent_active).sum())
                confidence = both / n_child

                if confidence >= self.dep_confidence:
                    deps.append(DiscoveredDependency(
                        bit_parent=parent,
                        bit_child=child,
                        confidence=confidence,
                        support=n_child,
                    ))

        deps.sort(key=lambda d: d.confidence, reverse=True)
        return deps

    # ------------------------------------------------------------------
    # Triadic dependency discovery: 3-way interactions
    # ------------------------------------------------------------------

    def _discover_triadic_deps(self, codes: np.ndarray,
                                n_bits: int,
                                min_support: int = 3,
                                ) -> List[DiscoveredTriadicDep]:
        """Find 3-way interactions: bit r activates when i AND j together,
        but not when either is active alone.

        For all triples (i, j, r):
            P(r=1 | i=1, j=1) > triadic_threshold
            P(r=1 | i=1)      < triadic_threshold
            P(r=1 | j=1)      < triadic_threshold
            interaction = P(r|i,j) - max(P(r|i), P(r|j)) > min_interaction

        Complexity: O(K^2 * K) where K = active bits. With 37 active bits:
        ~23,000 triples — runs in seconds.
        """
        triadic = []

        # Pre-compute active masks and counts for all bits
        active_masks = []
        active_counts = []
        for b in range(n_bits):
            mask = codes[:, b] == 1
            active_masks.append(mask)
            active_counts.append(int(mask.sum()))

        for i in range(n_bits):
            if active_counts[i] < min_support:
                continue
            i_mask = active_masks[i]

            for j in range(i + 1, n_bits):
                if active_counts[j] < min_support:
                    continue
                j_mask = active_masks[j]

                # Conjunction mask: both i and j active
                ij_mask = i_mask & j_mask
                n_ij = int(ij_mask.sum())
                if n_ij < min_support:
                    continue

                for r in range(n_bits):
                    if r == i or r == j:
                        continue
                    if active_counts[r] < min_support:
                        continue

                    r_mask = active_masks[r]

                    # P(r|i,j)
                    p_r_ij = int((ij_mask & r_mask).sum()) / n_ij
                    if p_r_ij < self.triadic_threshold:
                        continue

                    # P(r|i) and P(r|j)
                    p_r_i = int((i_mask & r_mask).sum()) / active_counts[i]
                    p_r_j = int((j_mask & r_mask).sum()) / active_counts[j]

                    # Both must be below threshold individually
                    if p_r_i >= self.triadic_threshold or p_r_j >= self.triadic_threshold:
                        continue

                    interaction = p_r_ij - max(p_r_i, p_r_j)
                    if interaction < self.triadic_min_interaction:
                        continue

                    triadic.append(DiscoveredTriadicDep(
                        bit_i=i, bit_j=j, bit_r=r,
                        p_r_given_ij=p_r_ij,
                        p_r_given_i=p_r_i,
                        p_r_given_j=p_r_j,
                        interaction_strength=interaction,
                        support=n_ij,
                    ))

        triadic.sort(key=lambda t: t.interaction_strength, reverse=True)
        return triadic

    # ------------------------------------------------------------------
    # Hierarchy: which bits stabilize first?
    # ------------------------------------------------------------------

    def _discover_hierarchy(self, timeline: Timeline, n_bits: int,
                            deps: List[DiscoveredDependency],
                            ) -> List[DiscoveredHierarchy]:
        """Order bits by when they first stabilize during training."""
        # Stability: first step where a bit's meaning doesn't change
        # for `window` consecutive snapshot transitions
        first_stable = {}
        window = 3

        for bit_idx in range(n_bits):
            consecutive_stable = 0
            for t in range(1, len(timeline.snapshots)):
                prev_snap = timeline.snapshots[t - 1]
                curr_snap = timeline.snapshots[t]

                # Check if this bit's activation pattern is the same
                changed = False
                for concept in curr_snap.concepts:
                    prev_code = prev_snap.codes.get(concept)
                    curr_code = curr_snap.codes.get(concept)
                    if prev_code and curr_code:
                        if (bit_idx < len(prev_code) and bit_idx < len(curr_code)
                                and prev_code[bit_idx] != curr_code[bit_idx]):
                            changed = True
                            break

                if not changed:
                    consecutive_stable += 1
                    if consecutive_stable >= window and bit_idx not in first_stable:
                        first_stable[bit_idx] = timeline.steps[t]
                else:
                    consecutive_stable = 0

        # Count dependents per bit
        n_dependents = defaultdict(int)
        for dep in deps:
            n_dependents[dep.bit_parent] += 1

        # Assign layers by stability order
        stable_bits = sorted(first_stable.items(), key=lambda x: x[1])
        if stable_bits:
            steps_sorted = sorted(set(s for _, s in stable_bits))
            step_to_layer = {s: i + 1 for i, s in enumerate(steps_sorted)}
        else:
            step_to_layer = {}

        results = []
        for bit_idx in range(n_bits):
            step = first_stable.get(bit_idx)
            # Explicit None check: step 0 is a valid (but falsy) training
            # step and must not be treated as "never stabilized".
            layer = step_to_layer.get(step, 0) if step is not None else 0
            results.append(DiscoveredHierarchy(
                bit_index=bit_idx,
                first_stable_step=step,
                layer=layer,
                n_dependents=n_dependents[bit_idx],
            ))

        # Never-stable bits (step is None) sort last; again a step of 0 must
        # not be conflated with None, hence the explicit check.
        results.sort(key=lambda h: (
            float('inf') if h.first_stable_step is None
            else h.first_stable_step))
        return results

    # ------------------------------------------------------------------
    # Pretty print
    # ------------------------------------------------------------------

    def print_report(self, report: DiscoveryReport):
        """Print discovery results."""
        print()
        print("=" * 60)
        print("  BIT DISCOVERY REPORT")
        print("=" * 60)
        print(f"  Concepts analyzed: {report.metadata.get('n_concepts', 0)}")
        print(f"  Total bits:        {report.metadata.get('n_bits', 0)}")
        print(f"  Active bits:       {report.n_active_bits}")
        print(f"  Dead bits:         {report.n_dead_bits}")
        print()

        # Top active bits with their concepts
        active_bits = [bs for bs in report.bit_semantics
                       if bs.activation_rate > self.dead_threshold]
        active_bits.sort(key=lambda b: b.activation_rate, reverse=True)

        print("  MOST ACTIVE BITS (what activates most concepts)")
        print("  " + "-" * 56)
        for bs in active_bits[:10]:
            concepts_str = ", ".join(bs.top_concepts[:5])
            print(f"    bit {bs.bit_index:>2d}  rate={bs.activation_rate:.2f}"
                  f"  [{concepts_str}]")
        print()

        # Discovered duals
        if report.discovered_duals:
            print(f"  DISCOVERED DUALS ({len(report.discovered_duals)} pairs)")
            print("  " + "-" * 56)
            for dual in report.discovered_duals[:10]:
                print(f"    bit {dual.bit_a:>2d} <-> bit {dual.bit_b:>2d}"
                      f"  corr={dual.anti_correlation:+.3f}"
                      f"  (excl={dual.concepts_exclusive},"
                      f" both={dual.concepts_both})")
            print()

        # Discovered dependencies
        if report.discovered_deps:
            print(f"  DISCOVERED DEPENDENCIES ({len(report.discovered_deps)} edges)")
            print("  " + "-" * 56)
            for dep in report.discovered_deps[:15]:
                print(f"    bit {dep.bit_parent:>2d} -> bit {dep.bit_child:>2d}"
                      f"  P(parent|child)={dep.confidence:.2f}"
                      f"  support={dep.support}")
            print()

        # Triadic dependencies
        if report.discovered_triadic_deps:
            print(f"  TRIADIC INTERACTIONS ({len(report.discovered_triadic_deps)} triples)")
            print("  " + "-" * 56)
            for td in report.discovered_triadic_deps[:15]:
                print(f"    bit {td.bit_i:>2d} + bit {td.bit_j:>2d} -> bit {td.bit_r:>2d}"
                      f"  P(r|i,j)={td.p_r_given_ij:.2f}"
                      f"  P(r|i)={td.p_r_given_i:.2f}"
                      f"  P(r|j)={td.p_r_given_j:.2f}"
                      f"  strength={td.interaction_strength:.2f}"
                      f"  n={td.support}")
            print()

        # Hierarchy
        if report.discovered_hierarchy:
            layers = defaultdict(list)
            for h in report.discovered_hierarchy:
                if h.layer > 0:
                    layers[h.layer].append(h)

            if layers:
                print(f"  DISCOVERED HIERARCHY ({len(layers)} layers)")
                print("  " + "-" * 56)
                for layer_num in sorted(layers.keys())[:8]:
                    bits_in_layer = layers[layer_num]
                    bit_ids = [str(h.bit_index) for h in bits_in_layer[:8]]
                    step = bits_in_layer[0].first_stable_step
                    print(f"    Layer {layer_num}: bits [{', '.join(bit_ids)}]"
                          f"  stable at step {step:,}")
                print()

        print("=" * 60)

discover(snapshot: ConceptSnapshot, timeline: Optional[Timeline] = None, top_k: int = 10) -> DiscoveryReport

Discover bit semantics from a single snapshot (or a full timeline).

Parameters:

Name Type Description Default
snapshot ConceptSnapshot

A ConceptSnapshot with codes for many concepts. Use the LAST snapshot from training for best results.

required
timeline Optional[Timeline]

Optional full timeline for hierarchy discovery.

None
top_k int

Number of top/anti concepts per bit.

10
Source code in triadic-microgpt-src/reptimeline/discovery.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def discover(self, snapshot: ConceptSnapshot,
             timeline: Optional[Timeline] = None,
             top_k: int = 10) -> DiscoveryReport:
    """Discover bit semantics from a single snapshot (or a full timeline).

    Args:
        snapshot: A ConceptSnapshot with codes for many concepts.
            Use the LAST snapshot from training for best results.
        timeline: Optional full timeline for hierarchy discovery.
        top_k: Number of top/anti concepts per bit.

    Returns:
        A DiscoveryReport aggregating per-bit semantics, dual pairs,
        pairwise dependencies, triadic interactions, and (when a
        timeline is given) the stability hierarchy.

    Raises:
        ValueError: If the snapshot contains no concepts.
    """
    concepts = list(snapshot.codes.keys())
    if not concepts:
        raise ValueError("Snapshot has no concepts")

    n_bits = snapshot.code_dim
    # Rows are concepts, columns are bits.
    codes_matrix = np.array([snapshot.codes[c] for c in concepts])

    bit_semantics = self._discover_semantics(
        codes_matrix, concepts, n_bits, top_k
    )
    duals = self._discover_duals(codes_matrix, n_bits)
    deps = self._discover_dependencies(codes_matrix, n_bits)
    triadic_deps = self._discover_triadic_deps(codes_matrix, n_bits)

    # Hierarchy needs training history, so it is only computed when a
    # timeline is supplied.
    hierarchy = []
    if timeline is not None:
        hierarchy = self._discover_hierarchy(timeline, n_bits, deps)

    # A bit counts as active when its activation rate strictly exceeds
    # the dead threshold.
    n_active = sum(1 for bs in bit_semantics
                   if bs.activation_rate > self.dead_threshold)

    return DiscoveryReport(
        bit_semantics=bit_semantics,
        discovered_duals=duals,
        discovered_deps=deps,
        discovered_triadic_deps=triadic_deps,
        discovered_hierarchy=hierarchy,
        n_active_bits=n_active,
        n_dead_bits=n_bits - n_active,
        metadata={
            'n_concepts': len(concepts),
            'n_bits': n_bits,
        },
    )

print_report(report: DiscoveryReport)

Print discovery results.

Source code in triadic-microgpt-src/reptimeline/discovery.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
def print_report(self, report: DiscoveryReport):
    """Print discovery results.

    Renders a fixed-width text summary of a DiscoveryReport to stdout.
    Sections for duals, dependencies, triadic interactions, and hierarchy
    are printed only when the report contains entries for them, and each
    section is truncated to a fixed number of rows.
    """
    print()
    print("=" * 60)
    print("  BIT DISCOVERY REPORT")
    print("=" * 60)
    print(f"  Concepts analyzed: {report.metadata.get('n_concepts', 0)}")
    print(f"  Total bits:        {report.metadata.get('n_bits', 0)}")
    print(f"  Active bits:       {report.n_active_bits}")
    print(f"  Dead bits:         {report.n_dead_bits}")
    print()

    # Top active bits with their concepts (highest activation rate first)
    active_bits = [bs for bs in report.bit_semantics
                   if bs.activation_rate > self.dead_threshold]
    active_bits.sort(key=lambda b: b.activation_rate, reverse=True)

    print("  MOST ACTIVE BITS (what activates most concepts)")
    print("  " + "-" * 56)
    for bs in active_bits[:10]:
        concepts_str = ", ".join(bs.top_concepts[:5])
        print(f"    bit {bs.bit_index:>2d}  rate={bs.activation_rate:.2f}"
              f"  [{concepts_str}]")
    print()

    # Discovered duals (top 10 pairs)
    if report.discovered_duals:
        print(f"  DISCOVERED DUALS ({len(report.discovered_duals)} pairs)")
        print("  " + "-" * 56)
        for dual in report.discovered_duals[:10]:
            print(f"    bit {dual.bit_a:>2d} <-> bit {dual.bit_b:>2d}"
                  f"  corr={dual.anti_correlation:+.3f}"
                  f"  (excl={dual.concepts_exclusive},"
                  f" both={dual.concepts_both})")
        print()

    # Discovered dependencies (top 15 edges)
    if report.discovered_deps:
        print(f"  DISCOVERED DEPENDENCIES ({len(report.discovered_deps)} edges)")
        print("  " + "-" * 56)
        for dep in report.discovered_deps[:15]:
            print(f"    bit {dep.bit_parent:>2d} -> bit {dep.bit_child:>2d}"
                  f"  P(parent|child)={dep.confidence:.2f}"
                  f"  support={dep.support}")
        print()

    # Triadic dependencies (top 15 triples)
    if report.discovered_triadic_deps:
        print(f"  TRIADIC INTERACTIONS ({len(report.discovered_triadic_deps)} triples)")
        print("  " + "-" * 56)
        for td in report.discovered_triadic_deps[:15]:
            print(f"    bit {td.bit_i:>2d} + bit {td.bit_j:>2d} -> bit {td.bit_r:>2d}"
                  f"  P(r|i,j)={td.p_r_given_ij:.2f}"
                  f"  P(r|i)={td.p_r_given_i:.2f}"
                  f"  P(r|j)={td.p_r_given_j:.2f}"
                  f"  strength={td.interaction_strength:.2f}"
                  f"  n={td.support}")
        print()

    # Hierarchy: group bits by layer, skipping layer 0 (never stabilized)
    if report.discovered_hierarchy:
        layers = defaultdict(list)
        for h in report.discovered_hierarchy:
            if h.layer > 0:
                layers[h.layer].append(h)

        if layers:
            print(f"  DISCOVERED HIERARCHY ({len(layers)} layers)")
            print("  " + "-" * 56)
            # Show at most 8 layers, and at most 8 bit ids per layer
            for layer_num in sorted(layers.keys())[:8]:
                bits_in_layer = layers[layer_num]
                bit_ids = [str(h.bit_index) for h in bits_in_layer[:8]]
                step = bits_in_layer[0].first_stable_step
                print(f"    Layer {layer_num}: bits [{', '.join(bit_ids)}]"
                      f"  stable at step {step:,}")
            print()

    print("=" * 60)

AutoLabeler

Translates discovered bit semantics to human-readable labels using three strategies: embedding, contrastive, and LLM.

reptimeline.autolabel

AutoLabeler — Automatically name discovered bits using semantic analysis.

Three strategies
  1. Embedding-based: find the word closest to the centroid of active concepts
  2. Contrastive: find the word that best separates active vs inactive concepts
  3. LLM-based: ask an LLM "what do these concepts have in common?"

Strategy 1 and 2 work offline (no API needed). Strategy 3 is most accurate but requires an LLM API call.

BitLabel dataclass

A discovered label for a bit.

Source code in triadic-microgpt-src/reptimeline/autolabel.py
23
24
25
26
27
28
29
30
31
@dataclass
class BitLabel:
    """A discovered label for a bit.

    Constructed by AutoLabeler strategies; `method` records which
    strategy produced the label.
    """
    bit_index: int  # index of the bit this label describes
    label: str  # human-readable name ("DEAD" for inactive bits)
    confidence: float  # 0-1
    method: str  # 'embedding', 'contrastive', 'llm', 'manual'
    active_concepts: List[str]  # sample concepts with this bit set — presumably from discovery's top concepts; confirm in AutoLabeler
    inactive_concepts: List[str]  # sample concepts without this bit set — same caveat

AutoLabeler

Assigns human-readable labels to discovered bits.

Source code in triadic-microgpt-src/reptimeline/autolabel.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
class AutoLabeler:
    """Assigns human-readable labels to discovered bits.

    Three labeling strategies are available:

    * ``label_by_embedding`` — word closest to the centroid of a bit's
      active concepts (offline).
    * ``label_by_contrast`` — word most aligned with the direction that
      separates active from inactive concepts (offline).
    * ``label_by_llm`` — ask an LLM what the active concepts share
      (requires a user-supplied API wrapper).
    """

    # Bits with an activation rate below this fraction are labeled "DEAD".
    DEAD_ACTIVATION_RATE = 0.02
    # Hard cap on the length of an LLM-produced label.
    MAX_LLM_LABEL_LEN = 30

    def __init__(self):
        # Reserved for future embedding/vocabulary caching; currently unused.
        self._embeddings = None
        self._vocab = None

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _prepare_candidates(embeddings: Dict[str, np.ndarray],
                            candidate_labels: Optional[List[str]]):
        """Collect candidate words and their unit-normalized embedding matrix.

        Returns:
            (words, normed_matrix), or (None, None) when no candidate
            word has an embedding.
        """
        if candidate_labels is None:
            candidate_labels = list(embeddings.keys())
        pairs = [(w, embeddings[w]) for w in candidate_labels
                 if w in embeddings]
        if not pairs:
            return None, None
        words, vecs = zip(*pairs)
        matrix = np.stack(vecs)
        norms = np.linalg.norm(matrix, axis=1, keepdims=True)
        # Guard against zero vectors so the division below cannot blow up.
        norms = np.where(norms < 1e-8, 1.0, norms)
        return words, matrix / norms

    @staticmethod
    def _dead_label(bs, method: str) -> BitLabel:
        """Placeholder label for a bit that (almost) never activates."""
        return BitLabel(
            bit_index=bs.bit_index, label="DEAD",
            confidence=0.0, method=method,
            active_concepts=[], inactive_concepts=[],
        )

    @staticmethod
    def _fallback_label(bs, method: str) -> BitLabel:
        """Zero-confidence label used when no usable embedding evidence exists."""
        return BitLabel(
            bit_index=bs.bit_index, label=f"bit_{bs.bit_index}",
            confidence=0.0, method=method,
            active_concepts=bs.top_concepts,
            inactive_concepts=bs.anti_concepts,
        )

    # ------------------------------------------------------------------
    # Strategy 1: Embedding centroid
    # ------------------------------------------------------------------

    def label_by_embedding(self, report: DiscoveryReport,
                           embeddings: Dict[str, np.ndarray],
                           candidate_labels: Optional[List[str]] = None,
                           ) -> List[BitLabel]:
        """Name each bit by finding the word closest to the centroid
        of its active concepts in embedding space.

        Args:
            report: DiscoveryReport from BitDiscovery.
            embeddings: Dict mapping concept -> embedding vector.
            candidate_labels: Optional restricted vocabulary for labels.
                If None, uses all keys in embeddings.

        Returns:
            One BitLabel per entry in ``report.bit_semantics`` (empty list
            when no candidate word has an embedding). Degenerate bits get
            a zero-confidence fallback label rather than being dropped.
        """
        cand_words, cand_normed = self._prepare_candidates(
            embeddings, candidate_labels)
        if cand_words is None:
            return []

        labels: List[BitLabel] = []
        for bs in report.bit_semantics:
            if bs.activation_rate < self.DEAD_ACTIVATION_RATE:
                labels.append(self._dead_label(bs, 'embedding'))
                continue

            # Centroid of the active concepts that actually have embeddings.
            active_vecs = [embeddings[c] for c in bs.top_concepts
                           if c in embeddings]
            if not active_vecs:
                labels.append(self._fallback_label(bs, 'embedding'))
                continue

            centroid = np.mean(active_vecs, axis=0)
            centroid_norm = np.linalg.norm(centroid)
            if centroid_norm < 1e-8:
                # Bug fix: a degenerate centroid previously hit a bare
                # `continue`, silently dropping the bit from the output;
                # emit a zero-confidence fallback instead.
                labels.append(self._fallback_label(bs, 'embedding'))
                continue

            # Cosine similarity of every candidate with the unit centroid.
            sims = cand_normed @ (centroid / centroid_norm)
            best_idx = int(np.argmax(sims))
            labels.append(BitLabel(
                bit_index=bs.bit_index,
                label=cand_words[best_idx],
                confidence=float(sims[best_idx]),
                method='embedding',
                active_concepts=bs.top_concepts,
                inactive_concepts=bs.anti_concepts,
            ))

        return labels

    # ------------------------------------------------------------------
    # Strategy 2: Contrastive (active vs inactive)
    # ------------------------------------------------------------------

    def label_by_contrast(self, report: DiscoveryReport,
                          embeddings: Dict[str, np.ndarray],
                          candidate_labels: Optional[List[str]] = None,
                          ) -> List[BitLabel]:
        """Name each bit by finding the word that best separates
        active concepts from inactive concepts.

        The label is the word whose embedding is most similar to
        (centroid_active - centroid_inactive).

        Args:
            report: DiscoveryReport from BitDiscovery.
            embeddings: Dict mapping concept -> embedding vector.
            candidate_labels: Optional restricted vocabulary for labels.
                If None, uses all keys in embeddings.
        """
        cand_words, cand_normed = self._prepare_candidates(
            embeddings, candidate_labels)
        if cand_words is None:
            return []

        labels: List[BitLabel] = []
        for bs in report.bit_semantics:
            if bs.activation_rate < self.DEAD_ACTIVATION_RATE:
                labels.append(self._dead_label(bs, 'contrastive'))
                continue

            active_vecs = [embeddings[c] for c in bs.top_concepts
                           if c in embeddings]
            inactive_vecs = [embeddings[c] for c in bs.anti_concepts
                             if c in embeddings]
            if not active_vecs or not inactive_vecs:
                labels.append(self._fallback_label(bs, 'contrastive'))
                continue

            direction = (np.mean(active_vecs, axis=0)
                         - np.mean(inactive_vecs, axis=0))
            direction_norm = np.linalg.norm(direction)
            if direction_norm < 1e-8:
                # Bug fix: a near-zero separating direction previously hit
                # a bare `continue`, silently dropping the bit; emit a
                # zero-confidence fallback instead.
                labels.append(self._fallback_label(bs, 'contrastive'))
                continue

            sims = cand_normed @ (direction / direction_norm)
            best_idx = int(np.argmax(sims))
            labels.append(BitLabel(
                bit_index=bs.bit_index,
                label=cand_words[best_idx],
                confidence=float(sims[best_idx]),
                method='contrastive',
                active_concepts=bs.top_concepts,
                inactive_concepts=bs.anti_concepts,
            ))

        return labels

    # ------------------------------------------------------------------
    # Strategy 3: LLM-based
    # ------------------------------------------------------------------

    def label_by_llm(self, report: DiscoveryReport,
                     llm_fn: Callable[[str], str],
                     ) -> List[BitLabel]:
        """Name each bit by asking an LLM what the active concepts
        have in common.

        Args:
            report: DiscoveryReport.
            llm_fn: Function that takes a prompt string and returns
                the LLM's response string. User provides their own
                API wrapper.

        Example llm_fn:
            def my_llm(prompt):
                response = openai.chat.completions.create(
                    model="gpt-4", messages=[{"role":"user","content":prompt}])
                return response.choices[0].message.content
        """
        labels: List[BitLabel] = []
        for bs in report.bit_semantics:
            if bs.activation_rate < self.DEAD_ACTIVATION_RATE:
                labels.append(self._dead_label(bs, 'llm'))
                continue

            prompt = (
                f"I have a neural network bit that activates for these "
                f"concepts: {', '.join(bs.top_concepts[:15])}\n\n"
                f"And does NOT activate for these concepts: "
                f"{', '.join(bs.anti_concepts[:10])}\n\n"
                f"What single abstract concept or property do the "
                f"activating concepts share that the non-activating "
                f"concepts lack? Answer with just one or two words."
            )

            try:
                label = llm_fn(prompt).strip().lower()
                # Clean up common LLM verbosity: keep the first line,
                # strip surrounding punctuation, cap the length.
                label = label.split('\n')[0].strip('"\'.,! ')
                if len(label) > self.MAX_LLM_LABEL_LEN:
                    label = label[:self.MAX_LLM_LABEL_LEN]
            except Exception:
                # Best-effort: a failing API call must not abort labeling.
                label = f"bit_{bs.bit_index}_ERROR"

            labels.append(BitLabel(
                bit_index=bs.bit_index,
                label=label,
                confidence=0.8,  # LLM labels assumed high quality
                method='llm',
                active_concepts=bs.top_concepts,
                inactive_concepts=bs.anti_concepts,
            ))

        return labels

    # ------------------------------------------------------------------
    # Output
    # ------------------------------------------------------------------

    def print_labels(self, labels: List[BitLabel]):
        """Print discovered labels, highest confidence first.

        Dead bits are omitted from the listing and only summarized by count.
        """
        print()
        print("=" * 60)
        print("  AUTO-LABELED BITS")
        print("=" * 60)
        active_labels = [lab for lab in labels if lab.label != "DEAD"]
        dead_labels = [lab for lab in labels if lab.label == "DEAD"]

        active_labels.sort(key=lambda lab: lab.confidence, reverse=True)
        for bl in active_labels:
            concepts = ", ".join(bl.active_concepts[:4])
            print(f"    bit {bl.bit_index:>2d} = {bl.label:<20s}"
                  f"  conf={bl.confidence:.2f}  [{concepts}]")

        if dead_labels:
            print(f"\n    ({len(dead_labels)} dead bits omitted)")
        print("=" * 60)

    def export_as_primitives(self, labels: List[BitLabel],
                             output_path: str):
        """Export discovered labels as a primitivos.json-compatible file.

        This allows using discovered primitives as if they were
        manually defined — closing the loop between discovery and
        supervision.

        Args:
            labels: Labels to export; "DEAD" entries are skipped.
            output_path: Destination path for the JSON file.
        """
        primitives = []
        for bl in labels:
            if bl.label == "DEAD":
                continue
            primitives.append({
                'bit': bl.bit_index,
                'nombre': bl.label,
                'discovered': True,
                'confidence': round(bl.confidence, 3),
                'method': bl.method,
                'top_concepts': bl.active_concepts[:10],
                'anti_concepts': bl.inactive_concepts[:5],
            })

        data = {
            'version': 'discovered_1.0',
            'total': len(primitives),
            'description': ('Primitives discovered by reptimeline AutoLabeler. '
                            'Not manually defined — learned from model behavior.'),
            'primitivos': primitives,
        }

        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, ensure_ascii=False)

label_by_embedding(report: DiscoveryReport, embeddings: Dict[str, np.ndarray], candidate_labels: Optional[List[str]] = None) -> List[BitLabel]

Name each bit by finding the word closest to the centroid of its active concepts in embedding space.

Parameters:

Name Type Description Default
report DiscoveryReport

DiscoveryReport from BitDiscovery.

required
embeddings Dict[str, ndarray]

Dict mapping concept -> embedding vector.

required
candidate_labels Optional[List[str]]

Optional restricted vocabulary for labels. If None, uses all keys in embeddings.

None
Source code in triadic-microgpt-src/reptimeline/autolabel.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def label_by_embedding(self, report: DiscoveryReport,
                       embeddings: Dict[str, np.ndarray],
                       candidate_labels: Optional[List[str]] = None,
                       ) -> List[BitLabel]:
    """Name each bit by finding the word closest to the centroid
    of its active concepts in embedding space.

    Args:
        report: DiscoveryReport from BitDiscovery.
        embeddings: Dict mapping concept -> embedding vector.
        candidate_labels: Optional restricted vocabulary for labels.
            If None, uses all keys in embeddings.

    Returns:
        One BitLabel per entry in ``report.bit_semantics``; degenerate
        bits get a zero-confidence fallback rather than being dropped.
    """
    if candidate_labels is None:
        candidate_labels = list(embeddings.keys())

    # Pre-compute candidate matrix (unit-normalized once, reused per bit).
    candidates = [(w, embeddings[w]) for w in candidate_labels
                   if w in embeddings]
    if not candidates:
        return []
    cand_words, cand_vecs = zip(*candidates)
    cand_matrix = np.stack(cand_vecs)
    cand_norms = np.linalg.norm(cand_matrix, axis=1, keepdims=True)
    cand_norms = np.where(cand_norms < 1e-8, 1.0, cand_norms)  # avoid /0
    cand_normed = cand_matrix / cand_norms

    labels = []
    for bs in report.bit_semantics:
        # Bits that essentially never fire are marked DEAD.
        if bs.activation_rate < 0.02:
            labels.append(BitLabel(
                bit_index=bs.bit_index, label="DEAD",
                confidence=0.0, method='embedding',
                active_concepts=[], inactive_concepts=[],
            ))
            continue

        # Centroid of the active concepts that actually have embeddings.
        active_vecs = [embeddings[c] for c in bs.top_concepts
                       if c in embeddings]
        if not active_vecs:
            labels.append(BitLabel(
                bit_index=bs.bit_index, label=f"bit_{bs.bit_index}",
                confidence=0.0, method='embedding',
                active_concepts=bs.top_concepts,
                inactive_concepts=bs.anti_concepts,
            ))
            continue

        centroid = np.mean(active_vecs, axis=0)
        centroid_norm = np.linalg.norm(centroid)
        if centroid_norm < 1e-8:
            # Bug fix: previously a bare `continue` silently dropped this
            # bit from the output; emit a zero-confidence fallback instead.
            labels.append(BitLabel(
                bit_index=bs.bit_index, label=f"bit_{bs.bit_index}",
                confidence=0.0, method='embedding',
                active_concepts=bs.top_concepts,
                inactive_concepts=bs.anti_concepts,
            ))
            continue
        centroid_normed = centroid / centroid_norm

        # Cosine similarity with all candidates.
        sims = cand_normed @ centroid_normed
        best_idx = int(np.argmax(sims))
        best_word = cand_words[best_idx]
        best_sim = float(sims[best_idx])

        labels.append(BitLabel(
            bit_index=bs.bit_index,
            label=best_word,
            confidence=best_sim,
            method='embedding',
            active_concepts=bs.top_concepts,
            inactive_concepts=bs.anti_concepts,
        ))

    return labels

label_by_contrast(report: DiscoveryReport, embeddings: Dict[str, np.ndarray], candidate_labels: Optional[List[str]] = None) -> List[BitLabel]

Name each bit by finding the word that best separates active concepts from inactive concepts.

The label is the word whose embedding is most similar to (centroid_active - centroid_inactive).

Source code in triadic-microgpt-src/reptimeline/autolabel.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
def label_by_contrast(self, report: DiscoveryReport,
                      embeddings: Dict[str, np.ndarray],
                      candidate_labels: Optional[List[str]] = None,
                      ) -> List[BitLabel]:
    """Name each bit by finding the word that best separates
    active concepts from inactive concepts.

    The label is the word whose embedding is most similar to
    (centroid_active - centroid_inactive).

    Args:
        report: DiscoveryReport from BitDiscovery.
        embeddings: Dict mapping concept -> embedding vector.
        candidate_labels: Optional restricted vocabulary for labels.
            If None, uses all keys in embeddings.
    """
    if candidate_labels is None:
        candidate_labels = list(embeddings.keys())

    # Pre-compute candidate matrix (unit-normalized once, reused per bit).
    candidates = [(w, embeddings[w]) for w in candidate_labels
                   if w in embeddings]
    if not candidates:
        return []
    cand_words, cand_vecs = zip(*candidates)
    cand_matrix = np.stack(cand_vecs)
    cand_norms = np.linalg.norm(cand_matrix, axis=1, keepdims=True)
    cand_norms = np.where(cand_norms < 1e-8, 1.0, cand_norms)  # avoid /0
    cand_normed = cand_matrix / cand_norms

    labels = []
    for bs in report.bit_semantics:
        # Bits that essentially never fire are marked DEAD.
        if bs.activation_rate < 0.02:
            labels.append(BitLabel(
                bit_index=bs.bit_index, label="DEAD",
                confidence=0.0, method='contrastive',
                active_concepts=[], inactive_concepts=[],
            ))
            continue

        active_vecs = [embeddings[c] for c in bs.top_concepts
                       if c in embeddings]
        inactive_vecs = [embeddings[c] for c in bs.anti_concepts
                         if c in embeddings]

        if not active_vecs or not inactive_vecs:
            labels.append(BitLabel(
                bit_index=bs.bit_index, label=f"bit_{bs.bit_index}",
                confidence=0.0, method='contrastive',
                active_concepts=bs.top_concepts,
                inactive_concepts=bs.anti_concepts,
            ))
            continue

        centroid_active = np.mean(active_vecs, axis=0)
        centroid_inactive = np.mean(inactive_vecs, axis=0)
        direction = centroid_active - centroid_inactive
        direction_norm = np.linalg.norm(direction)
        if direction_norm < 1e-8:
            # Bug fix: previously a bare `continue` silently dropped this
            # bit from the output; emit a zero-confidence fallback instead.
            labels.append(BitLabel(
                bit_index=bs.bit_index, label=f"bit_{bs.bit_index}",
                confidence=0.0, method='contrastive',
                active_concepts=bs.top_concepts,
                inactive_concepts=bs.anti_concepts,
            ))
            continue
        direction_normed = direction / direction_norm

        sims = cand_normed @ direction_normed
        best_idx = int(np.argmax(sims))

        labels.append(BitLabel(
            bit_index=bs.bit_index,
            label=cand_words[best_idx],
            confidence=float(sims[best_idx]),
            method='contrastive',
            active_concepts=bs.top_concepts,
            inactive_concepts=bs.anti_concepts,
        ))

    return labels

label_by_llm(report: DiscoveryReport, llm_fn: Callable[[str], str]) -> List[BitLabel]

Name each bit by asking an LLM what the active concepts have in common.

Parameters:

Name Type Description Default
report DiscoveryReport

DiscoveryReport.

required
llm_fn Callable[[str], str]

Function that takes a prompt string and returns the LLM's response string. User provides their own API wrapper.

required
Example llm_fn

def my_llm(prompt): response = openai.chat.completions.create( model="gpt-4", messages=[{"role":"user","content":prompt}]) return response.choices[0].message.content

Source code in triadic-microgpt-src/reptimeline/autolabel.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def label_by_llm(self, report: DiscoveryReport,
                 llm_fn: Callable[[str], str],
                 ) -> List[BitLabel]:
    """Name each bit by asking an LLM what the active concepts
    have in common.

    Args:
        report: DiscoveryReport.
        llm_fn: Function that takes a prompt string and returns
            the LLM's response string. User provides their own
            API wrapper.

    Example llm_fn:
        def my_llm(prompt):
            response = openai.chat.completions.create(
                model="gpt-4", messages=[{"role":"user","content":prompt}])
            return response.choices[0].message.content
    """
    labels = []
    for bs in report.bit_semantics:
        # Bits that essentially never fire are marked DEAD.
        if bs.activation_rate < 0.02:
            labels.append(BitLabel(
                bit_index=bs.bit_index, label="DEAD",
                confidence=0.0, method='llm',
                active_concepts=[], inactive_concepts=[],
            ))
            continue

        prompt = (
            f"I have a neural network bit that activates for these "
            f"concepts: {', '.join(bs.top_concepts[:15])}\n\n"
            f"And does NOT activate for these concepts: "
            f"{', '.join(bs.anti_concepts[:10])}\n\n"
            f"What single abstract concept or property do the "
            f"activating concepts share that the non-activating "
            f"concepts lack? Answer with just one or two words."
        )

        try:
            label = llm_fn(prompt).strip().lower()
            # Clean up common LLM verbosity: keep the first line,
            # strip surrounding punctuation, cap the length.
            label = label.split('\n')[0].strip('"\'.,! ')
            if len(label) > 30:
                label = label[:30]
        except Exception:
            # Best-effort: a failing API call must not abort labeling.
            # (Was `except Exception as e` with an unused binding.)
            label = f"bit_{bs.bit_index}_ERROR"

        labels.append(BitLabel(
            bit_index=bs.bit_index,
            label=label,
            confidence=0.8,  # LLM labels assumed high quality
            method='llm',
            active_concepts=bs.top_concepts,
            inactive_concepts=bs.anti_concepts,
        ))

    return labels

print_labels(labels: List[BitLabel])

Print discovered labels.

Source code in triadic-microgpt-src/reptimeline/autolabel.py
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
def print_labels(self, labels: List[BitLabel]):
    """Print discovered labels."""
    print()
    print("=" * 60)
    print("  AUTO-LABELED BITS")
    print("=" * 60)
    # Partition into live entries and dead bits (summarized by count only).
    live = [lab for lab in labels if lab.label != "DEAD"]
    n_dead = len([lab for lab in labels if lab.label == "DEAD"])

    # Most confident labels come first.
    for lab in sorted(live, key=lambda item: item.confidence, reverse=True):
        sample = ", ".join(lab.active_concepts[:4])
        print(f"    bit {lab.bit_index:>2d} = {lab.label:<20s}"
              f"  conf={lab.confidence:.2f}  [{sample}]")

    if n_dead:
        print(f"\n    ({n_dead} dead bits omitted)")
    print("=" * 60)

export_as_primitives(labels: List[BitLabel], output_path: str)

Export discovered labels as a primitivos.json-compatible file.

This allows using discovered primitives as if they were manually defined — closing the loop between discovery and supervision.

Source code in triadic-microgpt-src/reptimeline/autolabel.py
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
def export_as_primitives(self, labels: List[BitLabel],
                         output_path: str):
    """Export discovered labels as a primitivos.json-compatible file.

    This allows using discovered primitives as if they were
    manually defined — closing the loop between discovery and
    supervision.
    """
    # Dead bits carry no information and are excluded from the export.
    entries = [
        {
            'bit': lab.bit_index,
            'nombre': lab.label,
            'discovered': True,
            'confidence': round(lab.confidence, 3),
            'method': lab.method,
            'top_concepts': lab.active_concepts[:10],
            'anti_concepts': lab.inactive_concepts[:5],
        }
        for lab in labels
        if lab.label != "DEAD"
    ]

    payload = {
        'version': 'discovered_1.0',
        'total': len(entries),
        'description': ('Primitives discovered by reptimeline AutoLabeler. '
                        'Not manually defined — learned from model behavior.'),
        'primitivos': entries,
    }

    with open(output_path, 'w', encoding='utf-8') as fh:
        json.dump(payload, fh, indent=2, ensure_ascii=False)

Reconciler

Compares discovered ontology against theoretical expectations. Suggests corrections in both directions.

reptimeline.reconcile

Reconciler — Compare discovered ontology vs manual ontology, suggest corrections.

Closes the loop
  1. Train model (with or without supervision)
  2. BitDiscovery discovers what the model learned
  3. PrimitiveOverlay defines what the theory says
  4. Reconciler finds mismatches and suggests corrections
Corrections can go in BOTH directions
  • Fix the model: generate corrected anchors for retraining
  • Fix the theory: suggest changes to primitivos.json

BitMismatch dataclass

A discovered incongruence between model and theory.

Source code in triadic-microgpt-src/reptimeline/reconcile.py
26
27
28
29
30
31
32
33
34
35
36
37
@dataclass
class BitMismatch:
    """A discovered incongruence between model and theory."""
    bit_index: int  # bit whose assignment is being questioned
    primitive_name: str  # name of the theoretical primitive tied to this bit
    mismatch_type: str  # 'semantic_drift', 'wrong_dual', 'missing_dep',
                        # 'extra_dep', 'dead_but_assigned', 'active_but_unassigned',
                        # 'swapped_bits'
    severity: str  # 'critical', 'warning', 'info'
    description: str  # human-readable explanation of the mismatch
    suggestion: str  # recommended corrective action
    evidence: Dict[str, Any] = field(default_factory=dict)  # supporting data

DualMismatch dataclass

A dual pair that doesn't match between discovery and theory.

Source code in triadic-microgpt-src/reptimeline/reconcile.py
40
41
42
43
44
45
46
47
48
49
@dataclass
class DualMismatch:
    """A dual pair that doesn't match between discovery and theory."""
    mismatch_type: str  # 'missing_in_theory', 'missing_in_model', 'inverted'
    bit_a: int  # first bit of the pair
    bit_b: int  # second bit of the pair
    name_a: str  # name associated with bit_a
    name_b: str  # name associated with bit_b
    model_correlation: float  # correlation observed between the two bits in the model
    description: str  # human-readable explanation of the mismatch

DepMismatch dataclass

A dependency that doesn't match between discovery and theory.

Source code in triadic-microgpt-src/reptimeline/reconcile.py
52
53
54
55
56
57
58
59
60
61
@dataclass
class DepMismatch:
    """A dependency that doesn't match between discovery and theory."""
    mismatch_type: str  # 'missing_in_theory', 'missing_in_model', 'contradicted'
    parent_bit: int  # bit on the parent side of the dependency
    child_bit: int  # bit on the child side of the dependency
    parent_name: str  # name associated with parent_bit
    child_name: str  # name associated with child_bit
    confidence: float  # confidence in the reported mismatch
    description: str  # human-readable explanation of the mismatch

ReconciliationReport dataclass

Full comparison between discovered and theoretical ontology.

Source code in triadic-microgpt-src/reptimeline/reconcile.py
64
65
66
67
68
69
70
71
72
73
@dataclass
class ReconciliationReport:
    """Full comparison between discovered and theoretical ontology."""
    bit_mismatches: List[BitMismatch]  # per-bit assignment problems
    dual_mismatches: List[DualMismatch]  # dual-pair disagreements
    dep_mismatches: List[DepMismatch]  # dependency disagreements
    agreement_score: float  # 0-1, how well model matches theory
    suggested_anchor_corrections: Dict[str, Any]  # corrections aimed at the model side
    suggested_theory_corrections: Dict[str, Any]  # corrections aimed at the theory side
    metadata: Dict[str, Any] = field(default_factory=dict)  # extra context about the run

Reconciler

Compares discovered structure with theoretical primitives.

Finds mismatches and suggests corrections in both directions.

Source code in triadic-microgpt-src/reptimeline/reconcile.py
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
class Reconciler:
    """Compares discovered structure with theoretical primitives.

    Finds mismatches and suggests corrections in both directions:
    anchor-file fixes for retraining, and edits to the theoretical
    primitive definitions (primitivos.json).
    """

    def __init__(self, overlay: PrimitiveOverlay,
                 semantic_drift_threshold: float = 0.3):
        """
        Args:
            overlay: PrimitiveOverlay with loaded primitivos.json.
            semantic_drift_threshold: How different a bit's actual usage
                can be from its theoretical meaning before flagging.
        """
        self.overlay = overlay
        # NOTE(review): drift_threshold is stored but not read by any method
        # in this class — confirm whether callers elsewhere rely on it.
        self.drift_threshold = semantic_drift_threshold

    def reconcile(self, discovery_report: DiscoveryReport,
                  snapshot_codes: Dict[str, List[int]],
                  ) -> ReconciliationReport:
        """Compare discovered ontology with theoretical primitives.

        Args:
            discovery_report: Output from BitDiscovery.discover().
            snapshot_codes: The codes dict from the snapshot used for discovery.

        Returns:
            ReconciliationReport aggregating bit/dual/dependency mismatches,
            an agreement score in [0, 1], and correction suggestions for
            both the anchors and the theory file.
        """
        bit_mismatches = self._check_bit_assignments(
            discovery_report, snapshot_codes
        )
        dual_mismatches = self._check_duals(discovery_report)
        dep_mismatches = self._check_dependencies(discovery_report)

        total_checks = (len(bit_mismatches) + len(dual_mismatches)
                        + len(dep_mismatches))
        # Only critical bit mismatches and theoretical duals the model
        # failed to learn count toward the "critical" tally.
        critical = sum(1 for m in bit_mismatches if m.severity == 'critical')
        critical += sum(1 for m in dual_mismatches
                        if m.mismatch_type == 'missing_in_model')

        n_prims = len(self.overlay.primitives)
        # Score drops by 1/n_prims per critical mismatch; clamped to >= 0
        # when building the report below.
        agreement = 1.0 - (critical / max(n_prims, 1))

        anchor_corrections = self._suggest_anchor_corrections(
            bit_mismatches, dep_mismatches
        )
        theory_corrections = self._suggest_theory_corrections(
            bit_mismatches, dual_mismatches, dep_mismatches
        )

        return ReconciliationReport(
            bit_mismatches=bit_mismatches,
            dual_mismatches=dual_mismatches,
            dep_mismatches=dep_mismatches,
            agreement_score=max(0.0, agreement),
            suggested_anchor_corrections=anchor_corrections,
            suggested_theory_corrections=theory_corrections,
            metadata={
                'n_primitives': n_prims,
                'n_discovered_active': discovery_report.n_active_bits,
                'n_discovered_dead': discovery_report.n_dead_bits,
                'total_mismatches': total_checks,
                'critical_mismatches': critical,
            },
        )

    # ------------------------------------------------------------------
    # Bit assignment checks
    # ------------------------------------------------------------------

    def _check_bit_assignments(self, report: DiscoveryReport,
                               codes: Dict[str, List[int]],
                               ) -> List[BitMismatch]:
        """Check if each primitive's bit is used as the theory expects."""
        # NOTE(review): `codes` is accepted but currently unused here —
        # kept for interface stability; confirm intended use.
        mismatches = []
        semantics = {bs.bit_index: bs for bs in report.bit_semantics}
        assigned_bits = {p.bit for p in self.overlay.primitives}

        for prim in self.overlay.primitives:
            bs = semantics.get(prim.bit)
            if bs is None:
                continue

            # Dead bit that should be active (fires for <2% of concepts)
            if bs.activation_rate < 0.02:
                mismatches.append(BitMismatch(
                    bit_index=prim.bit,
                    primitive_name=prim.name,
                    mismatch_type='dead_but_assigned',
                    severity='critical',
                    description=(f"Bit {prim.bit} ({prim.name}) is dead "
                                 f"(rate={bs.activation_rate:.3f}) but "
                                 f"theory assigns it as a primitive"),
                    suggestion=(f"Check if {prim.name} concepts are in "
                                f"training data. Consider reassigning to "
                                f"an active bit."),
                    evidence={'activation_rate': bs.activation_rate},
                ))

            # Very high activation (>95% of concepts) = not discriminative
            elif bs.activation_rate > 0.95:
                mismatches.append(BitMismatch(
                    bit_index=prim.bit,
                    primitive_name=prim.name,
                    mismatch_type='semantic_drift',
                    severity='warning',
                    description=(f"Bit {prim.bit} ({prim.name}) activates "
                                 f"for {bs.activation_rate:.0%} of concepts — "
                                 f"not discriminative"),
                    suggestion=(f"This bit may have collapsed to 'always on'. "
                                f"Check discretization pressure."),
                    evidence={
                        'activation_rate': bs.activation_rate,
                        'top_concepts': bs.top_concepts[:5],
                    },
                ))

        # Active bits (>5% activation) that have no primitive assigned
        for bs in report.bit_semantics:
            if bs.activation_rate > 0.05 and bs.bit_index not in assigned_bits:
                mismatches.append(BitMismatch(
                    bit_index=bs.bit_index,
                    primitive_name='(unassigned)',
                    mismatch_type='active_but_unassigned',
                    severity='info',
                    description=(f"Bit {bs.bit_index} is active "
                                 f"(rate={bs.activation_rate:.2f}) but has "
                                 f"no primitive assigned"),
                    suggestion=(f"Consider assigning a primitive to this bit. "
                                f"Top concepts: {bs.top_concepts[:5]}"),
                    evidence={
                        'activation_rate': bs.activation_rate,
                        'top_concepts': bs.top_concepts[:5],
                    },
                ))

        return mismatches

    # ------------------------------------------------------------------
    # Dual pair checks
    # ------------------------------------------------------------------

    def _check_duals(self, report: DiscoveryReport) -> List[DualMismatch]:
        """Compare discovered duals with theoretical duals."""
        mismatches = []

        # Theoretical duals, keyed by sorted bit pair so each pair is
        # recorded once regardless of which member we see first.
        theory_duals: Dict[Tuple[int, int], Tuple[str, str]] = {}
        for prim in self.overlay.primitives:
            if prim.dual:
                dual_info = self.overlay._name_to_info.get(prim.dual)
                if dual_info:
                    key = tuple(sorted([prim.bit, dual_info.bit]))
                    theory_duals[key] = (prim.name, prim.dual)

        # Discovered duals
        discovered_pairs: Dict[Tuple[int, int], float] = {}
        for dd in report.discovered_duals:
            key = tuple(sorted([dd.bit_a, dd.bit_b]))
            discovered_pairs[key] = dd.anti_correlation

        # Theory says dual but model doesn't show it
        for bits, names in theory_duals.items():
            if bits not in discovered_pairs:
                mismatches.append(DualMismatch(
                    mismatch_type='missing_in_model',
                    bit_a=bits[0], bit_b=bits[1],
                    name_a=names[0], name_b=names[1],
                    model_correlation=0.0,
                    description=(f"Theory says {names[0]}/{names[1]} "
                                 f"(bits {bits[0]},{bits[1]}) are duals, "
                                 f"but model shows no anti-correlation"),
                ))

        # Model shows dual but theory doesn't list it; only flag strong
        # anti-correlations (< -0.5) to limit noise.
        for bits, corr in discovered_pairs.items():
            if bits not in theory_duals and corr < -0.5:
                name_a = self.overlay._bit_to_name.get(bits[0], f"bit_{bits[0]}")
                name_b = self.overlay._bit_to_name.get(bits[1], f"bit_{bits[1]}")
                mismatches.append(DualMismatch(
                    mismatch_type='missing_in_theory',
                    bit_a=bits[0], bit_b=bits[1],
                    name_a=name_a, name_b=name_b,
                    model_correlation=corr,
                    description=(f"Model shows {name_a}/{name_b} "
                                 f"(bits {bits[0]},{bits[1]}) as duals "
                                 f"(corr={corr:.3f}), but theory doesn't "
                                 f"list them"),
                ))

        return mismatches

    # ------------------------------------------------------------------
    # Dependency checks
    # ------------------------------------------------------------------

    def _check_dependencies(self, report: DiscoveryReport) -> List[DepMismatch]:
        """Compare discovered deps with theoretical deps."""
        mismatches = []

        # Build discovered dep set
        discovered_deps: Dict[Tuple[int, int], float] = {}
        for dd in report.discovered_deps:
            discovered_deps[(dd.bit_parent, dd.bit_child)] = dd.confidence

        # Single pass over the theory: record every theoretical
        # (parent_bit, child_bit) edge and flag those the model failed to
        # learn.  (Previously this information was computed in two
        # separate passes over self.overlay.primitives.)
        theory_deps: Set[Tuple[int, int]] = set()
        for prim in self.overlay.primitives:
            child_bit = prim.bit
            for dep_name in prim.deps:
                dep_info = self.overlay._name_to_info.get(dep_name)
                if dep_info is None:
                    continue  # dep references an unknown primitive name
                parent_bit = dep_info.bit
                key = (parent_bit, child_bit)
                theory_deps.add(key)

                if key not in discovered_deps:
                    mismatches.append(DepMismatch(
                        mismatch_type='missing_in_model',
                        parent_bit=parent_bit, child_bit=child_bit,
                        parent_name=dep_name, child_name=prim.name,
                        confidence=0.0,
                        description=(f"Theory says {prim.name} depends on "
                                     f"{dep_name}, but model doesn't show "
                                     f"this dependency"),
                    ))

        # Check for discovered deps not in theory
        assigned_bits = {p.bit: p.name for p in self.overlay.primitives}
        for (parent, child), conf in discovered_deps.items():
            if (parent, child) not in theory_deps:
                parent_name = assigned_bits.get(parent, f"bit_{parent}")
                child_name = assigned_bits.get(child, f"bit_{child}")
                if conf > 0.95:  # only flag strong unexpected deps
                    mismatches.append(DepMismatch(
                        mismatch_type='missing_in_theory',
                        parent_bit=parent, child_bit=child,
                        parent_name=parent_name, child_name=child_name,
                        confidence=conf,
                        description=(f"Model shows {child_name} depends on "
                                     f"{parent_name} (conf={conf:.2f}), "
                                     f"but theory doesn't list this"),
                    ))

        return mismatches

    # ------------------------------------------------------------------
    # Correction suggestions
    # ------------------------------------------------------------------

    def _suggest_anchor_corrections(self, bit_mismatches, dep_mismatches):
        """Suggest corrections to anchor files for retraining.

        NOTE(review): dep_mismatches is accepted but currently unused —
        kept for interface stability.
        """
        corrections = {
            'add_anchors_for': [],
            'remove_anchors_for': [],  # reserved; not populated here
            'modify_bit_targets': [],
        }

        for m in bit_mismatches:
            if m.mismatch_type == 'dead_but_assigned':
                corrections['add_anchors_for'].append({
                    'primitive': m.primitive_name,
                    'bit': m.bit_index,
                    'reason': 'Bit is dead — add more training examples '
                              'that should activate this bit',
                })
            elif m.mismatch_type == 'semantic_drift':
                corrections['modify_bit_targets'].append({
                    'primitive': m.primitive_name,
                    'bit': m.bit_index,
                    'reason': 'Bit activates for too many concepts — '
                              'add negative examples to make it selective',
                })

        return corrections

    def _suggest_theory_corrections(self, bit_mismatches, dual_mismatches,
                                    dep_mismatches):
        """Suggest corrections to primitivos.json based on what model learned.

        NOTE(review): bit_mismatches is accepted but currently unused —
        kept for interface stability.
        """
        corrections = {
            'add_duals': [],
            'remove_duals': [],  # reserved; not populated here
            'add_dependencies': [],
            'remove_dependencies': [],
            'review_primitives': [],
        }

        for m in dual_mismatches:
            if m.mismatch_type == 'missing_in_theory':
                corrections['add_duals'].append({
                    'pair': [m.name_a, m.name_b],
                    'correlation': m.model_correlation,
                    'reason': m.description,
                })
            elif m.mismatch_type == 'missing_in_model':
                corrections['review_primitives'].append({
                    'pair': [m.name_a, m.name_b],
                    'reason': f"Listed as duals but model doesn't "
                              f"anti-correlate them",
                })

        for m in dep_mismatches:
            if m.mismatch_type == 'missing_in_theory':
                corrections['add_dependencies'].append({
                    'child': m.child_name,
                    'parent': m.parent_name,
                    'confidence': m.confidence,
                    'reason': m.description,
                })
            elif m.mismatch_type == 'missing_in_model':
                corrections['remove_dependencies'].append({
                    'child': m.child_name,
                    'parent': m.parent_name,
                    'reason': m.description,
                })

        return corrections

    # ------------------------------------------------------------------
    # Pretty print
    # ------------------------------------------------------------------

    def print_report(self, report: ReconciliationReport):
        """Print reconciliation results."""
        print()
        print("=" * 60)
        print("  RECONCILIATION REPORT")
        print("=" * 60)
        meta = report.metadata
        print(f"  Primitives:          {meta.get('n_primitives', 0)}")
        print(f"  Active bits (model): {meta.get('n_discovered_active', 0)}")
        print(f"  Dead bits (model):   {meta.get('n_discovered_dead', 0)}")
        print(f"  Agreement score:     {report.agreement_score:.1%}")
        print(f"  Total mismatches:    {meta.get('total_mismatches', 0)}")
        print(f"  Critical:            {meta.get('critical_mismatches', 0)}")
        print()

        # Bit mismatches — critical first, then warnings, then info.
        if report.bit_mismatches:
            print("  BIT MISMATCHES")
            print("  " + "-" * 56)
            # Loop-invariant lookup tables hoisted out of the loop.
            severity_order = {'critical': 0, 'warning': 1, 'info': 2}
            icon = {'critical': '!!', 'warning': '! ', 'info': '  '}
            for m in sorted(report.bit_mismatches,
                            key=lambda x: severity_order[x.severity]):
                print(f"    {icon[m.severity]} [{m.severity.upper()}] {m.description}")
                print(f"       -> {m.suggestion}")
            print()

        # Dual mismatches
        if report.dual_mismatches:
            print("  DUAL MISMATCHES")
            print("  " + "-" * 56)
            for m in report.dual_mismatches:
                print(f"    {m.description}")
            print()

        # Dep mismatches (listing capped at 20 lines)
        if report.dep_mismatches:
            n_model = sum(1 for m in report.dep_mismatches
                          if m.mismatch_type == 'missing_in_model')
            n_theory = sum(1 for m in report.dep_mismatches
                           if m.mismatch_type == 'missing_in_theory')
            print(f"  DEPENDENCY MISMATCHES "
                  f"({n_model} theory-only, {n_theory} model-only)")
            print("  " + "-" * 56)
            for m in report.dep_mismatches[:20]:
                print(f"    {m.description}")
            if len(report.dep_mismatches) > 20:
                print(f"    ... and {len(report.dep_mismatches) - 20} more")
            print()

        # Suggestions
        ac = report.suggested_anchor_corrections
        tc = report.suggested_theory_corrections

        has_suggestions = (ac.get('add_anchors_for')
                           or ac.get('modify_bit_targets')
                           or tc.get('add_duals')
                           or tc.get('add_dependencies'))

        if has_suggestions:
            print("  SUGGESTED CORRECTIONS")
            print("  " + "-" * 56)
            if ac.get('add_anchors_for'):
                print("  For retraining (fix anchors):")
                for s in ac['add_anchors_for']:
                    print(f"    + Add anchors for '{s['primitive']}' "
                          f"(bit {s['bit']}): {s['reason']}")
            if ac.get('modify_bit_targets'):
                for s in ac['modify_bit_targets']:
                    print(f"    ~ Modify targets for '{s['primitive']}' "
                          f"(bit {s['bit']}): {s['reason']}")
            if tc.get('add_duals'):
                print("  For theory (fix primitivos.json):")
                for s in tc['add_duals']:
                    print(f"    + Add dual pair: {s['pair']} "
                          f"(corr={s['correlation']:.3f})")
            if tc.get('add_dependencies'):
                for s in tc['add_dependencies'][:10]:
                    print(f"    + Add dep: {s['child']} -> {s['parent']} "
                          f"(conf={s['confidence']:.2f})")
            print()

        print("=" * 60)

reconcile(discovery_report: DiscoveryReport, snapshot_codes: Dict[str, List[int]]) -> ReconciliationReport

Compare discovered ontology with theoretical primitives.

Parameters:

Name Type Description Default
discovery_report DiscoveryReport

Output from BitDiscovery.discover().

required
snapshot_codes Dict[str, List[int]]

The codes dict from the snapshot used for discovery.

required
Source code in triadic-microgpt-src/reptimeline/reconcile.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
def reconcile(self, discovery_report: DiscoveryReport,
              snapshot_codes: Dict[str, List[int]],
              ) -> ReconciliationReport:
    """Compare discovered ontology with theoretical primitives.

    Args:
        discovery_report: Output from BitDiscovery.discover().
        snapshot_codes: The codes dict from the snapshot used for discovery.

    Returns:
        ReconciliationReport aggregating bit/dual/dependency mismatches,
        an agreement score, and correction suggestions.
    """
    bit_mismatches = self._check_bit_assignments(
        discovery_report, snapshot_codes
    )
    dual_mismatches = self._check_duals(discovery_report)
    dep_mismatches = self._check_dependencies(discovery_report)

    total_checks = (len(bit_mismatches) + len(dual_mismatches)
                    + len(dep_mismatches))
    # Only critical bit mismatches and theoretical duals the model failed
    # to learn count toward the "critical" tally used for scoring.
    critical = sum(1 for m in bit_mismatches if m.severity == 'critical')
    critical += sum(1 for m in dual_mismatches
                    if m.mismatch_type == 'missing_in_model')

    n_prims = len(self.overlay.primitives)
    # Score drops by 1/n_prims per critical mismatch; clamped to >= 0 below.
    agreement = 1.0 - (critical / max(n_prims, 1))

    anchor_corrections = self._suggest_anchor_corrections(
        bit_mismatches, dep_mismatches
    )
    theory_corrections = self._suggest_theory_corrections(
        bit_mismatches, dual_mismatches, dep_mismatches
    )

    return ReconciliationReport(
        bit_mismatches=bit_mismatches,
        dual_mismatches=dual_mismatches,
        dep_mismatches=dep_mismatches,
        agreement_score=max(0.0, agreement),
        suggested_anchor_corrections=anchor_corrections,
        suggested_theory_corrections=theory_corrections,
        metadata={
            'n_primitives': n_prims,
            'n_discovered_active': discovery_report.n_active_bits,
            'n_discovered_dead': discovery_report.n_dead_bits,
            'total_mismatches': total_checks,
            'critical_mismatches': critical,
        },
    )

print_report(report: ReconciliationReport)

Print reconciliation results.

Source code in triadic-microgpt-src/reptimeline/reconcile.py
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
def print_report(self, report: ReconciliationReport):
    """Print reconciliation results."""
    print()
    print("=" * 60)
    print("  RECONCILIATION REPORT")
    print("=" * 60)
    meta = report.metadata
    print(f"  Primitives:          {meta.get('n_primitives', 0)}")
    print(f"  Active bits (model): {meta.get('n_discovered_active', 0)}")
    print(f"  Dead bits (model):   {meta.get('n_discovered_dead', 0)}")
    print(f"  Agreement score:     {report.agreement_score:.1%}")
    print(f"  Total mismatches:    {meta.get('total_mismatches', 0)}")
    print(f"  Critical:            {meta.get('critical_mismatches', 0)}")
    print()

    # Bit mismatches — printed critical first, then warnings, then info.
    if report.bit_mismatches:
        print("  BIT MISMATCHES")
        print("  " + "-" * 56)
        for m in sorted(report.bit_mismatches,
                        key=lambda x: {'critical': 0, 'warning': 1,
                                       'info': 2}[x.severity]):
            icon = {'critical': '!!', 'warning': '! ', 'info': '  '}
            print(f"    {icon[m.severity]} [{m.severity.upper()}] {m.description}")
            print(f"       -> {m.suggestion}")
        print()

    # Dual mismatches
    if report.dual_mismatches:
        print("  DUAL MISMATCHES")
        print("  " + "-" * 56)
        for m in report.dual_mismatches:
            print(f"    {m.description}")
        print()

    # Dep mismatches (listing is capped at 20 lines)
    if report.dep_mismatches:
        n_model = sum(1 for m in report.dep_mismatches
                      if m.mismatch_type == 'missing_in_model')
        n_theory = sum(1 for m in report.dep_mismatches
                       if m.mismatch_type == 'missing_in_theory')
        print(f"  DEPENDENCY MISMATCHES "
              f"({n_model} theory-only, {n_theory} model-only)")
        print("  " + "-" * 56)
        for m in report.dep_mismatches[:20]:
            print(f"    {m.description}")
        if len(report.dep_mismatches) > 20:
            print(f"    ... and {len(report.dep_mismatches) - 20} more")
        print()

    # Suggestions — only a subset of correction categories is surfaced here.
    ac = report.suggested_anchor_corrections
    tc = report.suggested_theory_corrections

    has_suggestions = (ac.get('add_anchors_for')
                       or ac.get('modify_bit_targets')
                       or tc.get('add_duals')
                       or tc.get('add_dependencies'))

    if has_suggestions:
        print("  SUGGESTED CORRECTIONS")
        print("  " + "-" * 56)
        if ac.get('add_anchors_for'):
            print("  For retraining (fix anchors):")
            for s in ac['add_anchors_for']:
                print(f"    + Add anchors for '{s['primitive']}' "
                      f"(bit {s['bit']}): {s['reason']}")
        if ac.get('modify_bit_targets'):
            for s in ac['modify_bit_targets']:
                print(f"    ~ Modify targets for '{s['primitive']}' "
                      f"(bit {s['bit']}): {s['reason']}")
        if tc.get('add_duals'):
            print("  For theory (fix primitivos.json):")
            for s in tc['add_duals']:
                print(f"    + Add dual pair: {s['pair']} "
                      f"(corr={s['correlation']:.3f})")
        if tc.get('add_dependencies'):
            # Dependency suggestions are capped at 10 lines.
            for s in tc['add_dependencies'][:10]:
                print(f"    + Add dep: {s['child']} -> {s['parent']} "
                      f"(conf={s['confidence']:.2f})")
        print()

    print("=" * 60)

Extractors

Base Extractor

Abstract interface for representation backends. Implement three methods to add a new backend.

reptimeline.extractors.base

Abstract base class for representation extractors.

Each backend (triadic, VQ-VAE, FSQ, sparse autoencoder) implements this interface to produce standardized ConceptSnapshot objects.

RepresentationExtractor

Bases: ABC

Extracts discrete concept representations from model checkpoints.

Source code in triadic-microgpt-src/reptimeline/extractors/base.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class RepresentationExtractor(ABC):
    """Extracts discrete concept representations from model checkpoints.

    Each backend (triadic, VQ-VAE, FSQ, sparse autoencoder) implements the
    three abstract methods to produce standardized ConceptSnapshot objects.
    """

    @abstractmethod
    def extract(self, checkpoint_path: str, concepts: List[str],
                device: str = 'cpu') -> "ConceptSnapshot":
        """Extract a snapshot from a single checkpoint.

        Args:
            checkpoint_path: Path to the model checkpoint file.
            concepts: List of concept strings to extract.
            device: Torch device string.

        Returns:
            ConceptSnapshot with codes for each concept found.
        """
        ...

    @abstractmethod
    def similarity(self, code_a: List[int], code_b: List[int]) -> float:
        """Compute similarity between two concept codes.

        Returns a value in [0, 1] where 1 means identical.
        """
        ...

    @abstractmethod
    def shared_features(self, code_a: List[int], code_b: List[int]) -> List[int]:
        """Return indices of features shared by both codes."""
        ...

    def discover_checkpoints(self, directory: str) -> List[Tuple[int, str]]:
        """Find all step checkpoints in a directory, sorted by step.

        Handles common naming patterns:
          - model_step5000.pt
          - model_xl_step5000.pt
          - model_best.pt (excluded — not a step checkpoint)

        Returns:
            List of (step, path) tuples sorted ascending.
        """
        # NOTE(review): the second alternative is redundant — the first
        # already matches 'model_step<N>.pt' (with '.*?' empty). Kept
        # as-is for byte-identical matching behavior.
        pattern = re.compile(r'model_.*?step(\d+)\.pt$|model_step(\d+)\.pt$')
        results = []
        for fname in os.listdir(directory):
            m = pattern.match(fname)
            if m:
                step = int(m.group(1) or m.group(2))
                results.append((step, os.path.join(directory, fname)))
        results.sort(key=lambda x: x[0])
        return results

    def extract_sequence(self, directory: str, concepts: List[str],
                         device: str = 'cpu',
                         max_checkpoints: Optional[int] = None
                         ) -> List["ConceptSnapshot"]:
        """Extract snapshots from all checkpoints in a directory.

        Args:
            directory: Checkpoint directory.
            concepts: Concepts to track.
            device: Torch device.
            max_checkpoints: Limit number of checkpoints (evenly spaced).

        Returns:
            List of ConceptSnapshot sorted by step.

        Raises:
            FileNotFoundError: If the directory has no step checkpoints.
        """
        checkpoints = self.discover_checkpoints(directory)
        if not checkpoints:
            raise FileNotFoundError(f"No step checkpoints found in {directory}")

        if max_checkpoints and len(checkpoints) > max_checkpoints:
            if max_checkpoints == 1:
                # "Evenly spaced" is undefined for a single slot (the
                # previous formula divided by max_checkpoints - 1 == 0);
                # keep the final, most-trained checkpoint.
                checkpoints = [checkpoints[-1]]
            else:
                # Evenly spaced indices, always including first and last.
                indices = [int(i * (len(checkpoints) - 1)
                               / (max_checkpoints - 1))
                           for i in range(max_checkpoints)]
                checkpoints = [checkpoints[i] for i in indices]

        snapshots = []
        for i, (step, path) in enumerate(checkpoints):
            print(f"  [{i+1}/{len(checkpoints)}] Extracting step {step:,}...")
            snapshot = self.extract(path, concepts, device)
            snapshots.append(snapshot)

        return snapshots

extract(checkpoint_path: str, concepts: List[str], device: str = 'cpu') -> ConceptSnapshot abstractmethod

Extract a snapshot from a single checkpoint.

Parameters:

Name Type Description Default
checkpoint_path str

Path to the model checkpoint file.

required
concepts List[str]

List of concept strings to extract.

required
device str

Torch device string.

'cpu'

Returns:

Type Description
ConceptSnapshot

ConceptSnapshot with codes for each concept found.

Source code in triadic-microgpt-src/reptimeline/extractors/base.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@abstractmethod
def extract(self, checkpoint_path: str, concepts: List[str],
            device: str = 'cpu') -> ConceptSnapshot:
    """Extract a snapshot from a single checkpoint.

    Abstract: each backend (triadic, VQ-VAE, FSQ, sparse autoencoder)
    implements this to produce the standardized exchange format.

    Args:
        checkpoint_path: Path to the model checkpoint file.
        concepts: List of concept strings to extract.
        device: Torch device string.

    Returns:
        ConceptSnapshot with codes for each concept found.
    """
    ...

similarity(code_a: List[int], code_b: List[int]) -> float abstractmethod

Compute similarity between two concept codes.

Returns a value in [0, 1] where 1 means identical.

Source code in triadic-microgpt-src/reptimeline/extractors/base.py
34
35
36
37
38
39
40
@abstractmethod
def similarity(self, code_a: List[int], code_b: List[int]) -> float:
    """Compute similarity between two concept codes.

    Returns a value in [0, 1] where 1 means identical.
    """
    # Abstract hook — each backend supplies its own metric (e.g. the
    # triadic backend uses Jaccard similarity on prime factors).
    ...

shared_features(code_a: List[int], code_b: List[int]) -> List[int] abstractmethod

Return indices of features shared by both codes.

Source code in triadic-microgpt-src/reptimeline/extractors/base.py
42
43
44
45
@abstractmethod
def shared_features(self, code_a: List[int], code_b: List[int]) -> List[int]:
    """Return indices of features shared by both codes."""
    # Abstract hook — what counts as "shared" is backend-defined.
    ...

discover_checkpoints(directory: str) -> List[Tuple[int, str]]

Find all step checkpoints in a directory, sorted by step.

Handles common naming patterns:
  • model_step5000.pt
  • model_xl_step5000.pt
  • model_best.pt (excluded — not a step checkpoint)

Returns:

Type Description
List[Tuple[int, str]]

List of (step, path) tuples sorted ascending.

Source code in triadic-microgpt-src/reptimeline/extractors/base.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def discover_checkpoints(self, directory: str) -> List[Tuple[int, str]]:
    """Scan *directory* for per-step model checkpoints.

    Recognized filenames look like ``model_step5000.pt`` or
    ``model_xl_step5000.pt``; files without a step number in this
    form (e.g. ``model_best.pt``) are ignored.

    Returns:
        List of (step, full_path) pairs in ascending step order.
    """
    step_re = re.compile(r'model_.*?step(\d+)\.pt$|model_step(\d+)\.pt$')
    found = []
    for entry in os.listdir(directory):
        match = step_re.match(entry)
        if match is None:
            continue
        step_num = int(match.group(1) or match.group(2))
        found.append((step_num, os.path.join(directory, entry)))
    return sorted(found, key=lambda pair: pair[0])

extract_sequence(directory: str, concepts: List[str], device: str = 'cpu', max_checkpoints: Optional[int] = None) -> List[ConceptSnapshot]

Extract snapshots from all checkpoints in a directory.

Parameters:

Name Type Description Default
directory str

Checkpoint directory.

required
concepts List[str]

Concepts to track.

required
device str

Torch device.

'cpu'
max_checkpoints Optional[int]

Limit number of checkpoints (evenly spaced).

None

Returns:

Type Description
List[ConceptSnapshot]

List of ConceptSnapshot sorted by step.

Source code in triadic-microgpt-src/reptimeline/extractors/base.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def extract_sequence(self, directory: str, concepts: List[str],
                     device: str = 'cpu',
                     max_checkpoints: Optional[int] = None
                     ) -> List["ConceptSnapshot"]:
    """Extract snapshots from all checkpoints in a directory.

    Args:
        directory: Checkpoint directory.
        concepts: Concepts to track.
        device: Torch device.
        max_checkpoints: Limit number of checkpoints (evenly spaced,
            endpoints included; if 1, only the final checkpoint is kept).

    Returns:
        List of ConceptSnapshot sorted by step.

    Raises:
        FileNotFoundError: If no step checkpoints are found in `directory`.
    """
    checkpoints = self.discover_checkpoints(directory)
    if not checkpoints:
        raise FileNotFoundError(f"No step checkpoints found in {directory}")

    if max_checkpoints and len(checkpoints) > max_checkpoints:
        if max_checkpoints == 1:
            # The evenly-spaced formula below divides by (max_checkpoints - 1),
            # which is 0 here; keep the final (most-trained) checkpoint instead.
            checkpoints = [checkpoints[-1]]
        else:
            indices = [int(i * (len(checkpoints) - 1) / (max_checkpoints - 1))
                       for i in range(max_checkpoints)]
            checkpoints = [checkpoints[i] for i in indices]

    snapshots = []
    for i, (step, path) in enumerate(checkpoints):
        print(f"  [{i+1}/{len(checkpoints)}] Extracting step {step:,}...")
        snapshot = self.extract(path, concepts, device)
        snapshots.append(snapshot)

    return snapshots

Triadic Extractor

TriadicGPT-specific extractor that loads model checkpoints and extracts 63-bit codes.

reptimeline.extractors.triadic

Triadic extractor — loads TriadicGPT checkpoints and extracts 63-bit codes.

This is the concrete proof-of-concept backend. It wraps the existing src/evaluate.py, src/triadic.py infrastructure.

TriadicExtractor

Bases: RepresentationExtractor

Extracts triadic bit representations from TriadicGPT checkpoints.

Uses PrimeMapper for bit->prime mapping and TriadicValidator for algebraic similarity (Jaccard on prime factors, GCD-based connections).

Source code in triadic-microgpt-src/reptimeline/extractors/triadic.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
class TriadicExtractor(RepresentationExtractor):
    """Extracts triadic bit representations from TriadicGPT checkpoints.

    Uses PrimeMapper for bit->prime mapping and TriadicValidator for
    algebraic similarity (Jaccard on prime factors, GCD-based connections).
    """

    def __init__(self, tokenizer_path: Optional[str] = None,
                 n_bits: int = 63, max_tokens: int = 4):
        """
        Args:
            tokenizer_path: Path to tokenizer.json. If None, auto-detected
                from checkpoint directory.
            n_bits: Number of triadic bits (default: 63).
            max_tokens: Max tokens per concept (4 for custom BPE, 8 for GPT-2).
        """
        self.tokenizer_path = tokenizer_path
        self.n_bits = n_bits
        self.max_tokens = max_tokens

    def extract(self, checkpoint_path: str, concepts: List[str],
                device: str = 'cpu') -> ConceptSnapshot:
        """Extract triadic snapshot from a TriadicGPT checkpoint."""
        # Lazy imports keep the training stack optional at module import time.
        from src.evaluate import load_model
        from src.triadic import PrimeMapper

        # Auto-detect tokenizer: assume tokenizer.json sits next to the checkpoint.
        tok_path = self.tokenizer_path
        if tok_path is None:
            ckpt_dir = os.path.dirname(checkpoint_path)
            tok_path = os.path.join(ckpt_dir, 'tokenizer.json')

        model, tokenizer, config = load_model(checkpoint_path, tok_path, device)
        mapper = PrimeMapper(config.n_triadic_bits)

        codes = {}
        continuous = {}
        composites = {}

        for concept in concepts:
            try:
                ids = tokenizer.encode(concept, add_special=False)[:self.max_tokens]
            except Exception:
                # Best-effort: concepts the tokenizer rejects are skipped.
                continue
            if not ids:
                continue

            x = torch.tensor([ids], dtype=torch.long, device=device)
            with torch.no_grad():
                # NOTE(review): assumes the forward pass returns a tuple whose
                # second element is the triadic projection — confirm vs. model.
                triadic_proj = model(x)[1]

            # Mean-pool per-token projections into one concept vector.
            proj = triadic_proj[0].mean(dim=0).cpu().numpy()
            bits = mapper.get_bits(proj)
            composite = int(mapper.map(proj.tolist()))

            codes[concept] = bits
            continuous[concept] = proj.tolist()
            composites[concept] = composite

        # Parse step from filename
        basename = os.path.basename(checkpoint_path)
        step = 0
        import re
        m = re.search(r'step(\d+)', basename)
        if m:
            step = int(m.group(1))

        # Free GPU
        del model
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        return ConceptSnapshot(
            step=step,
            codes=codes,
            continuous=continuous,
            metadata={'composites': composites, 'n_bits': config.n_triadic_bits},
        )

    def similarity(self, code_a: List[int], code_b: List[int]) -> float:
        """Jaccard similarity on active bits.

        Two all-zero codes are defined as identical (1.0).  Otherwise at
        least one active set is non-empty, so the union is non-empty and
        the division is safe (a former `if not union` guard was
        unreachable and has been removed).
        """
        active_a = set(i for i, v in enumerate(code_a) if v == 1)
        active_b = set(i for i, v in enumerate(code_b) if v == 1)
        if not active_a and not active_b:
            return 1.0
        union = active_a | active_b
        return len(active_a & active_b) / len(union)

    def shared_features(self, code_a: List[int], code_b: List[int]) -> List[int]:
        """Indices where both codes are active (both == 1)."""
        return [i for i in range(min(len(code_a), len(code_b)))
                if code_a[i] == 1 and code_b[i] == 1]

    def algebraic_similarity(self, composite_a: int, composite_b: int) -> float:
        """GCD-based similarity using prime factorization.

        This is the triadic-specific similarity: Jaccard on prime factors.
        More meaningful than bit-level Jaccard because it respects the
        algebraic structure.
        """
        from src.triadic import prime_factors
        factors_a = set(prime_factors(composite_a))
        factors_b = set(prime_factors(composite_b))
        union = factors_a | factors_b
        if not union:
            return 0.0
        return len(factors_a & factors_b) / len(union)

    def are_connected(self, composite_a: int, composite_b: int) -> bool:
        """Two concepts are connected if they share at least one prime factor."""
        return composite_a > 1 and composite_b > 1 and math.gcd(composite_a, composite_b) > 1

extract(checkpoint_path: str, concepts: List[str], device: str = 'cpu') -> ConceptSnapshot

Extract triadic snapshot from a TriadicGPT checkpoint.

Source code in triadic-microgpt-src/reptimeline/extractors/triadic.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def extract(self, checkpoint_path: str, concepts: List[str],
            device: str = 'cpu') -> ConceptSnapshot:
    """Extract triadic snapshot from a TriadicGPT checkpoint.

    Loads the model + tokenizer, encodes each concept, runs a forward
    pass, and converts the pooled triadic projection into a bit code and
    a composite number.  Concepts the tokenizer rejects (or that encode
    to nothing) are skipped silently.
    """
    # Lazy imports keep the training stack optional at module import time.
    from src.evaluate import load_model
    from src.triadic import PrimeMapper

    # Auto-detect tokenizer: assume tokenizer.json sits next to the checkpoint.
    tok_path = self.tokenizer_path
    if tok_path is None:
        ckpt_dir = os.path.dirname(checkpoint_path)
        tok_path = os.path.join(ckpt_dir, 'tokenizer.json')

    model, tokenizer, config = load_model(checkpoint_path, tok_path, device)
    mapper = PrimeMapper(config.n_triadic_bits)

    codes = {}
    continuous = {}
    composites = {}

    for concept in concepts:
        try:
            ids = tokenizer.encode(concept, add_special=False)[:self.max_tokens]
        except Exception:
            # Best-effort: skip concepts the tokenizer cannot encode.
            continue
        if not ids:
            continue

        x = torch.tensor([ids], dtype=torch.long, device=device)
        with torch.no_grad():
            # NOTE(review): assumes the forward pass returns a tuple whose
            # second element is the triadic projection — confirm vs. model.
            triadic_proj = model(x)[1]

        # Mean-pool the per-token projections into one concept vector.
        proj = triadic_proj[0].mean(dim=0).cpu().numpy()
        bits = mapper.get_bits(proj)
        composite = int(mapper.map(proj.tolist()))

        codes[concept] = bits
        continuous[concept] = proj.tolist()
        composites[concept] = composite

    # Parse step from filename
    basename = os.path.basename(checkpoint_path)
    step = 0
    import re
    m = re.search(r'step(\d+)', basename)
    if m:
        step = int(m.group(1))

    # Free GPU
    del model
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    return ConceptSnapshot(
        step=step,
        codes=codes,
        continuous=continuous,
        metadata={'composites': composites, 'n_bits': config.n_triadic_bits},
    )

similarity(code_a: List[int], code_b: List[int]) -> float

Jaccard similarity on active bits.

Source code in triadic-microgpt-src/reptimeline/extractors/triadic.py
105
106
107
108
109
110
111
112
113
114
def similarity(self, code_a: List[int], code_b: List[int]) -> float:
    """Jaccard similarity on active (== 1) bits.

    Two all-zero codes are defined as identical (1.0).  Otherwise at
    least one active set is non-empty, so the union is non-empty and the
    division below is safe (a former `if not union: return 0.0` branch
    was unreachable and has been removed).
    """
    active_a = {i for i, v in enumerate(code_a) if v == 1}
    active_b = {i for i, v in enumerate(code_b) if v == 1}
    if not active_a and not active_b:
        return 1.0
    return len(active_a & active_b) / len(active_a | active_b)

shared_features(code_a: List[int], code_b: List[int]) -> List[int]

Indices where both codes are active (both == 1).

Source code in triadic-microgpt-src/reptimeline/extractors/triadic.py
116
117
118
119
def shared_features(self, code_a: List[int], code_b: List[int]) -> List[int]:
    """Return every index at which both codes carry an active (== 1) bit.

    Only positions present in both codes are considered; extra trailing
    bits of the longer code are ignored.
    """
    shared = []
    for idx, (bit_a, bit_b) in enumerate(zip(code_a, code_b)):
        if bit_a == 1 and bit_b == 1:
            shared.append(idx)
    return shared

algebraic_similarity(composite_a: int, composite_b: int) -> float

GCD-based similarity using prime factorization.

This is the triadic-specific similarity: Jaccard on prime factors. More meaningful than bit-level Jaccard because it respects the algebraic structure.

Source code in triadic-microgpt-src/reptimeline/extractors/triadic.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def algebraic_similarity(self, composite_a: int, composite_b: int) -> float:
    """GCD-based similarity: Jaccard index over prime-factor sets.

    Triadic-specific measure — more meaningful than bit-level Jaccard
    because it respects the algebraic structure of the composites.
    Returns 0.0 when neither composite has any prime factor.
    """
    from src.triadic import prime_factors
    primes_a = set(prime_factors(composite_a))
    primes_b = set(prime_factors(composite_b))
    combined = primes_a | primes_b
    if not combined:
        return 0.0
    shared = primes_a & primes_b
    return len(shared) / len(combined)

are_connected(composite_a: int, composite_b: int) -> bool

Two concepts are connected if they share at least one prime factor.

Source code in triadic-microgpt-src/reptimeline/extractors/triadic.py
136
137
138
def are_connected(self, composite_a: int, composite_b: int) -> bool:
    """Report whether two concepts share at least one prime factor.

    Composites of 1 or less carry no prime factors and are never
    connected to anything.
    """
    if composite_a <= 1 or composite_b <= 1:
        return False
    return math.gcd(composite_a, composite_b) > 1

Overlays

Primitive Overlay

Triadic-specific analysis: layer emergence, dual coherence, dependency completions.

reptimeline.overlays.primitive_overlay

PrimitiveOverlay — Maps generic timeline events onto the 63 triadic primitives.

This overlay adds triadic-specific semantics on top of the backend-agnostic Timeline produced by TimelineTracker:

  • Primitive activation epochs: when each of the 63 primitives first activates
  • Dependency chain completion: when all deps of a primitive are satisfied
  • Layer emergence order: do layer-1 primitives stabilize before layer-4?
  • Dual axis coherence: when dual pairs become anti-correlated

PrimitiveInfo dataclass

Parsed info for one of the 63 primitives.

Source code in triadic-microgpt-src/reptimeline/overlays/primitive_overlay.py
21
22
23
24
25
26
27
28
29
30
@dataclass
class PrimitiveInfo:
    """Parsed info for one of the 63 primitives (from primitivos.json)."""
    bit: int                    # bit index in the code ('bit' in the JSON)
    prime: int                  # associated prime ('primo')
    name: str                   # primitive name ('nombre')
    layer: int                  # layer number ('capa')
    deps: List[str]             # names of prerequisite primitives ('deps')
    dual: Optional[str] = None  # name of the dual primitive, if any ('dual')
    definition: str = ""        # free-text definition ('def')

ActivationEpoch dataclass

When a primitive first becomes active for a given concept.

Source code in triadic-microgpt-src/reptimeline/overlays/primitive_overlay.py
33
34
35
36
37
38
39
@dataclass
class ActivationEpoch:
    """When a primitive first becomes active for a given concept."""
    primitive: str  # primitive name
    concept: str    # concept whose code activated the bit
    step: int       # training step of first activation
    bit_index: int  # position of the primitive's bit in the code

DepsCompletion dataclass

When all dependencies of a primitive are simultaneously active.

Source code in triadic-microgpt-src/reptimeline/overlays/primitive_overlay.py
42
43
44
45
46
47
48
49
@dataclass
class DepsCompletion:
    """When all dependencies of a primitive are simultaneously active."""
    primitive: str    # primitive whose dependency chain completed
    concept: str      # concept being tracked
    step: int         # first step where all deps are active at the same time
    deps: List[str]   # dependency names required by the primitive
    deps_met_steps: Dict[str, int]  # dep_name -> step it was first active (individually)

LayerEmergence dataclass

Aggregate statistics for when a layer's primitives emerge.

Source code in triadic-microgpt-src/reptimeline/overlays/primitive_overlay.py
52
53
54
55
56
57
58
59
60
61
@dataclass
class LayerEmergence:
    """Aggregate statistics for when a layer's primitives emerge."""
    layer: int                               # layer number
    layer_name: str                          # human-readable layer label
    n_primitives: int                        # primitives belonging to this layer
    first_activation_step: Optional[int]     # earliest first-activation step, or None
    median_activation_step: Optional[float]  # middle first-activation step, or None
    last_activation_step: Optional[int]      # latest first-activation step, or None
    primitives_activated: int                # how many of the layer's primitives activated

DualCoherence dataclass

Tracks whether dual pairs show expected anti-correlation.

Source code in triadic-microgpt-src/reptimeline/overlays/primitive_overlay.py
64
65
66
67
68
69
70
71
@dataclass
class DualCoherence:
    """Tracks whether dual pairs show expected anti-correlation."""
    primitive_a: str
    primitive_b: str
    steps_both_active: List[int]  # steps where both bits were on (counts against coherence)
    steps_exclusive: List[int]  # one active, other not
    coherence_score: float  # fraction of steps that are exclusive (anti-correlated)

PrimitiveReport dataclass

Full overlay analysis output.

Source code in triadic-microgpt-src/reptimeline/overlays/primitive_overlay.py
74
75
76
77
78
79
80
81
@dataclass
class PrimitiveReport:
    """Full overlay analysis output."""
    activations: List[ActivationEpoch]      # first-activation events per (primitive, concept)
    deps_completions: List[DepsCompletion]  # dependency-chain completion events
    layer_emergence: List[LayerEmergence]   # per-layer aggregate emergence stats
    dual_coherence: List[DualCoherence]     # anti-correlation stats per dual pair
    metadata: Dict[str, Any] = field(default_factory=dict)  # counts: n_primitives, n_concepts, n_steps

PrimitiveOverlay

Interprets a Timeline through the lens of the 63 triadic primitives.

This is not an extractor or tracker — it takes an already-computed Timeline and overlays domain-specific analysis.

Source code in triadic-microgpt-src/reptimeline/overlays/primitive_overlay.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
class PrimitiveOverlay:
    """Interprets a Timeline through the lens of the 63 triadic primitives.

    This is not an extractor or tracker — it takes an already-computed Timeline
    and overlays domain-specific analysis.
    """

    def __init__(self, primitivos_path: Optional[str] = None):
        """
        Args:
            primitivos_path: Path to primitivos.json. If None, auto-detected
                from the project layout.
        """
        if primitivos_path is None:
            # Default layout: <repo>/playground/danza_data/primitivos.json,
            # resolved relative to this file's location.
            here = os.path.dirname(os.path.abspath(__file__))
            primitivos_path = os.path.join(
                here, '..', '..', 'playground', 'danza_data', 'primitivos.json'
            )
        self.primitivos_path = primitivos_path
        self.primitives = self._load_primitives()
        # Lookup tables: primitive name <-> bit index, name -> full record.
        self._name_to_bit = {p.name: p.bit for p in self.primitives}
        self._bit_to_name = {p.bit: p.name for p in self.primitives}
        self._name_to_info = {p.name: p for p in self.primitives}

    def _load_primitives(self) -> List[PrimitiveInfo]:
        # Parse primitivos.json into PrimitiveInfo records.  The JSON uses
        # Spanish keys: primo=prime, nombre=name, capa=layer, def=definition.
        with open(self.primitivos_path, 'r', encoding='utf-8') as f:
            data = json.load(f)

        prims = []
        for entry in data['primitivos']:
            prims.append(PrimitiveInfo(
                bit=entry['bit'],
                prime=entry['primo'],
                name=entry['nombre'],
                layer=entry['capa'],
                deps=entry.get('deps', []),
                dual=entry.get('dual'),
                definition=entry.get('def', ''),
            ))
        return prims

    def analyze(self, timeline: Timeline,
                concepts: Optional[List[str]] = None) -> PrimitiveReport:
        """Run full primitive overlay analysis on a Timeline.

        Args:
            timeline: A Timeline from TimelineTracker.analyze().
            concepts: Subset of concepts to analyze. If None, uses all
                concepts from the last snapshot.
        """
        if concepts is None:
            concepts = timeline.snapshots[-1].concepts if timeline.snapshots else []

        # Activations are computed first: deps_completions reuses them.
        activations = self._compute_activations(timeline, concepts)
        deps_completions = self._compute_deps_completions(timeline, concepts, activations)
        layer_emergence = self._compute_layer_emergence(activations)
        dual_coherence = self._compute_dual_coherence(timeline, concepts)

        return PrimitiveReport(
            activations=activations,
            deps_completions=deps_completions,
            layer_emergence=layer_emergence,
            dual_coherence=dual_coherence,
            metadata={
                'n_primitives': len(self.primitives),
                'n_concepts': len(concepts),
                'n_steps': len(timeline.steps),
            },
        )

    # ------------------------------------------------------------------
    # Activation epochs
    # ------------------------------------------------------------------

    def _compute_activations(self, timeline: Timeline,
                             concepts: List[str]) -> List[ActivationEpoch]:
        """Find the first step where each primitive bit activates per concept."""
        activations = []
        for concept in concepts:
            for prim in self.primitives:
                bit_idx = prim.bit
                for snap in timeline.snapshots:
                    code = snap.codes.get(concept)
                    # Skip snapshots that lack this concept or whose code is
                    # too short to contain this primitive's bit.
                    if code is None or bit_idx >= len(code):
                        continue
                    if code[bit_idx] == 1:
                        activations.append(ActivationEpoch(
                            primitive=prim.name,
                            concept=concept,
                            step=snap.step,
                            bit_index=bit_idx,
                        ))
                        # Only the FIRST activation is recorded per pair.
                        break
        return activations

    # ------------------------------------------------------------------
    # Dependency chain completion
    # ------------------------------------------------------------------

    def _compute_deps_completions(self, timeline: Timeline,
                                  concepts: List[str],
                                  activations: List[ActivationEpoch],
                                  ) -> List[DepsCompletion]:
        """Find when all deps of a primitive are simultaneously active."""
        # Build activation lookup: (concept, primitive_name) -> step
        act_lookup: Dict[Tuple[str, str], int] = {}
        for act in activations:
            act_lookup[(act.concept, act.primitive)] = act.step

        completions = []
        for concept in concepts:
            for prim in self.primitives:
                if not prim.deps:
                    continue

                # Check if all deps have activated (individually, at any step)
                deps_steps: Dict[str, int] = {}
                all_met = True
                for dep_name in prim.deps:
                    key = (concept, dep_name)
                    if key in act_lookup:
                        deps_steps[dep_name] = act_lookup[key]
                    else:
                        all_met = False
                        break

                if not all_met:
                    continue

                # Find first step where all deps are simultaneously active —
                # this can be later than each dep's individual first activation.
                completion_step = self._find_simultaneous_activation(
                    timeline, concept, prim.deps
                )
                if completion_step is not None:
                    completions.append(DepsCompletion(
                        primitive=prim.name,
                        concept=concept,
                        step=completion_step,
                        deps=prim.deps,
                        deps_met_steps=deps_steps,
                    ))

        return completions

    def _find_simultaneous_activation(self, timeline: Timeline,
                                      concept: str,
                                      dep_names: List[str]) -> Optional[int]:
        """Return first step where all dep bits are == 1 for a concept."""
        dep_bits = [self._name_to_bit[d] for d in dep_names if d in self._name_to_bit]
        # Any unknown dep name makes the chain unresolvable.
        if len(dep_bits) != len(dep_names):
            return None

        for snap in timeline.snapshots:
            code = snap.codes.get(concept)
            if code is None:
                continue
            if all(b < len(code) and code[b] == 1 for b in dep_bits):
                return snap.step
        return None

    # ------------------------------------------------------------------
    # Layer emergence
    # ------------------------------------------------------------------

    def _compute_layer_emergence(self, activations: List[ActivationEpoch],
                                 ) -> List[LayerEmergence]:
        """Aggregate activation epochs by layer."""
        layer_names = {
            1: "Punto (0D)", 2: "Línea (1D)", 3: "Tiempo (1D+t)",
            4: "Plano (2D)", 5: "Volumen (3D)", 6: "Meta (3D+)",
        }

        # Group primitives by layer
        prims_per_layer: Dict[int, List[str]] = {}
        for p in self.primitives:
            prims_per_layer.setdefault(p.layer, []).append(p.name)

        # Earliest activation per primitive (across all concepts)
        prim_first_step: Dict[str, int] = {}
        for act in activations:
            if act.primitive not in prim_first_step:
                prim_first_step[act.primitive] = act.step
            else:
                prim_first_step[act.primitive] = min(
                    prim_first_step[act.primitive], act.step
                )

        results = []
        for layer in sorted(prims_per_layer.keys()):
            prims = prims_per_layer[layer]
            steps = [prim_first_step[p] for p in prims if p in prim_first_step]

            results.append(LayerEmergence(
                layer=layer,
                layer_name=layer_names.get(layer, f"Layer {layer}"),
                n_primitives=len(prims),
                first_activation_step=min(steps) if steps else None,
                # NOTE(review): upper-middle element, not the statistical
                # median for even-length lists — confirm this is intended.
                median_activation_step=sorted(steps)[len(steps) // 2] if steps else None,
                last_activation_step=max(steps) if steps else None,
                primitives_activated=len(steps),
            ))
        return results

    # ------------------------------------------------------------------
    # Dual coherence
    # ------------------------------------------------------------------

    def _compute_dual_coherence(self, timeline: Timeline,
                                concepts: List[str]) -> List[DualCoherence]:
        """Check whether dual pairs show anti-correlation across training."""
        dual_pairs = [(p.name, p.dual) for p in self.primitives if p.dual]
        # Deduplicate (a,b) and (b,a)
        seen = set()
        unique_pairs = []
        for a, b in dual_pairs:
            key = tuple(sorted([a, b]))
            if key not in seen:
                seen.add(key)
                unique_pairs.append((a, b))

        results = []
        for prim_a, prim_b in unique_pairs:
            bit_a = self._name_to_bit.get(prim_a)
            bit_b = self._name_to_bit.get(prim_b)
            if bit_a is None or bit_b is None:
                continue

            both_active = []
            exclusive = []

            # One observation per (snapshot, concept) pair; a step may
            # therefore appear multiple times in the lists below.
            for snap in timeline.snapshots:
                for concept in concepts:
                    code = snap.codes.get(concept)
                    if code is None:
                        continue
                    if bit_a >= len(code) or bit_b >= len(code):
                        continue

                    a_on = code[bit_a] == 1
                    b_on = code[bit_b] == 1

                    if a_on and b_on:
                        both_active.append(snap.step)
                    elif a_on or b_on:
                        exclusive.append(snap.step)

            # Coherence is 0.0 when neither bit was ever observed active.
            total = len(both_active) + len(exclusive)
            coherence = len(exclusive) / total if total > 0 else 0.0

            results.append(DualCoherence(
                primitive_a=prim_a,
                primitive_b=prim_b,
                steps_both_active=both_active,
                steps_exclusive=exclusive,
                coherence_score=coherence,
            ))
        return results

    # ------------------------------------------------------------------
    # Pretty print
    # ------------------------------------------------------------------

    def print_report(self, report: PrimitiveReport):
        """Print a structured console summary."""
        print()
        print("=" * 60)
        print("  PRIMITIVE OVERLAY REPORT")
        print("=" * 60)
        meta = report.metadata
        print(f"  Primitives: {meta.get('n_primitives', 0)}")
        print(f"  Concepts:   {meta.get('n_concepts', 0)}")
        print(f"  Steps:      {meta.get('n_steps', 0)}")
        print()

        # Layer emergence
        print("  LAYER EMERGENCE ORDER")
        print("  " + "-" * 56)
        for le in report.layer_emergence:
            activated = f"{le.primitives_activated}/{le.n_primitives}"
            if le.first_activation_step is not None:
                print(f"    L{le.layer} {le.layer_name:<18s}  "
                      f"first={le.first_activation_step:>6,}  "
                      f"median={le.median_activation_step:>6,}  "
                      f"activated={activated}")
            else:
                print(f"    L{le.layer} {le.layer_name:<18s}  "
                      f"NOT YET ACTIVATED  ({activated})")
        print()

        # Dual coherence (most anti-correlated pairs first)
        if report.dual_coherence:
            print("  DUAL AXIS COHERENCE")
            print("  " + "-" * 56)
            sorted_dc = sorted(report.dual_coherence,
                               key=lambda d: d.coherence_score, reverse=True)
            for dc in sorted_dc:
                n_both = len(dc.steps_both_active)
                n_excl = len(dc.steps_exclusive)
                print(f"    {dc.primitive_a:<16s} <-> {dc.primitive_b:<16s}  "
                      f"coherence={dc.coherence_score:.2f}  "
                      f"(excl={n_excl}, both={n_both})")
            print()

        # Activation summary by primitive
        if report.activations:
            print(f"  ACTIVATIONS: {len(report.activations)} "
                  f"(primitive x concept events)")
            # Show earliest activation per primitive
            earliest: Dict[str, int] = {}
            for act in report.activations:
                if act.primitive not in earliest:
                    earliest[act.primitive] = act.step
                else:
                    earliest[act.primitive] = min(earliest[act.primitive], act.step)
            sorted_prims = sorted(earliest.items(), key=lambda x: x[1])
            print("  First 10 primitives to activate:")
            for name, step in sorted_prims[:10]:
                info = self._name_to_info.get(name)
                layer = f"L{info.layer}" if info else "?"
                print(f"    {layer}  {name:<20s}  step {step:>6,}")
            print()

        # Deps completion
        if report.deps_completions:
            print(f"  DEPENDENCY COMPLETIONS: {len(report.deps_completions)}")
            print()

        print("=" * 60)

analyze(timeline: Timeline, concepts: Optional[List[str]] = None) -> PrimitiveReport

Run full primitive overlay analysis on a Timeline.

Parameters:

Name Type Description Default
timeline Timeline

A Timeline from TimelineTracker.analyze().

required
concepts Optional[List[str]]

Subset of concepts to analyze. If None, uses all concepts from the last snapshot.

None
Source code in triadic-microgpt-src/reptimeline/overlays/primitive_overlay.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def analyze(self, timeline: Timeline,
            concepts: Optional[List[str]] = None) -> PrimitiveReport:
    """Run the full primitive-overlay analysis over a Timeline.

    Args:
        timeline: A Timeline produced by TimelineTracker.analyze().
        concepts: Concepts to restrict the analysis to; defaults to every
            concept present in the timeline's final snapshot.
    """
    if concepts is None:
        if timeline.snapshots:
            concepts = timeline.snapshots[-1].concepts
        else:
            concepts = []

    # Activations feed the dependency-completion pass, so compute them first.
    first_active = self._compute_activations(timeline, concepts)

    return PrimitiveReport(
        activations=first_active,
        deps_completions=self._compute_deps_completions(
            timeline, concepts, first_active),
        layer_emergence=self._compute_layer_emergence(first_active),
        dual_coherence=self._compute_dual_coherence(timeline, concepts),
        metadata={
            'n_primitives': len(self.primitives),
            'n_concepts': len(concepts),
            'n_steps': len(timeline.steps),
        },
    )

print_report(report: PrimitiveReport)

Print a structured console summary.

Source code in triadic-microgpt-src/reptimeline/overlays/primitive_overlay.py
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
def print_report(self, report: PrimitiveReport):
    """Print a structured console summary of a PrimitiveReport."""
    rule = "=" * 60
    divider = "  " + "-" * 56

    print()
    print(rule)
    print("  PRIMITIVE OVERLAY REPORT")
    print(rule)
    meta = report.metadata
    # Labels are padded so the values line up in one column.
    for label, key in (("Primitives:", 'n_primitives'),
                       ("Concepts:  ", 'n_concepts'),
                       ("Steps:     ", 'n_steps')):
        print(f"  {label} {meta.get(key, 0)}")
    print()

    # Layer emergence
    print("  LAYER EMERGENCE ORDER")
    print(divider)
    for entry in report.layer_emergence:
        frac = f"{entry.primitives_activated}/{entry.n_primitives}"
        prefix = f"    L{entry.layer} {entry.layer_name:<18s}  "
        if entry.first_activation_step is None:
            print(prefix + f"NOT YET ACTIVATED  ({frac})")
        else:
            print(prefix
                  + f"first={entry.first_activation_step:>6,}  "
                  + f"median={entry.median_activation_step:>6,}  "
                  + f"activated={frac}")
    print()

    # Dual coherence, most coherent pairs first
    if report.dual_coherence:
        print("  DUAL AXIS COHERENCE")
        print(divider)
        ranked = sorted(report.dual_coherence,
                        key=lambda d: d.coherence_score, reverse=True)
        for pair in ranked:
            print(f"    {pair.primitive_a:<16s} <-> {pair.primitive_b:<16s}  "
                  f"coherence={pair.coherence_score:.2f}  "
                  f"(excl={len(pair.steps_exclusive)}, "
                  f"both={len(pair.steps_both_active)})")
        print()

    # Activation summary: earliest activation step per primitive
    if report.activations:
        print(f"  ACTIVATIONS: {len(report.activations)} "
              f"(primitive x concept events)")
        first_seen: Dict[str, int] = {}
        for event in report.activations:
            prev = first_seen.get(event.primitive)
            first_seen[event.primitive] = (event.step if prev is None
                                           else min(prev, event.step))
        print("  First 10 primitives to activate:")
        for name, step in sorted(first_seen.items(), key=lambda kv: kv[1])[:10]:
            info = self._name_to_info.get(name)
            tag = f"L{info.layer}" if info else "?"
            print(f"    {tag}  {name:<20s}  step {step:>6,}")
        print()

    # Dependency completions (count only)
    if report.deps_completions:
        print(f"  DEPENDENCY COMPLETIONS: {len(report.deps_completions)}")
        print()

    print(rule)