How to Build a Memory Consolidation Pipeline
Before You Start
Consolidation operates on an existing memory store that has been accumulating entries over time. You need at least a few hundred memories before consolidation produces meaningful results, because smaller stores rarely have enough redundancy to warrant merging. You also need entity data on your memories, because entity overlap is the primary signal for identifying related memories. If your memories do not have extracted entities, start with entity extraction before building the consolidation pipeline.
If you are using Adaptive Recall, the reflect tool handles consolidation automatically. This guide is for developers who want to understand the mechanics or build consolidation into a custom memory system.
Step-by-Step Implementation
Two memories are consolidation candidates when they share enough entities or have high enough semantic similarity that they are likely about the same topic. Set an entity overlap threshold, typically two or more shared entities, and a semantic similarity floor, typically 0.75 cosine similarity or higher. Memories that meet either threshold enter the clustering phase. Setting these values too low produces clusters of loosely related memories that should not be merged. Setting them too high misses genuinely redundant entries. Start conservatively with high thresholds and lower them after observing results.
ENTITY_OVERLAP_MIN = 2
SIMILARITY_THRESHOLD = 0.75

def are_candidates(mem_a, mem_b):
    # entity overlap is the cheaper check, so run it first
    shared = set(mem_a['entities']).intersection(mem_b['entities'])
    if len(shared) >= ENTITY_OVERLAP_MIN:
        return True
    # fall back to semantic similarity between stored embeddings;
    # cosine_similarity is assumed to come from your vector utilities
    sim = cosine_similarity(mem_a['embedding'], mem_b['embedding'])
    return sim >= SIMILARITY_THRESHOLD

Iterate through your memory store and group memories that meet the consolidation criteria. Use a union-find or connected-components algorithm so that if memory A is a candidate with memory B, and memory B is a candidate with memory C, all three end up in the same cluster even if A and C do not directly meet the threshold. Cap cluster size at a reasonable limit, such as 20 memories, to prevent runaway merges that combine loosely connected chains into a single oversized entry, as the MAX_CLUSTER_SIZE check below does.
MAX_CLUSTER_SIZE = 20

def cluster_memories(memories):
    # union-find over memory IDs, tracking cluster sizes to enforce the cap
    parent = {m['id']: m['id'] for m in memories}
    size = {m['id']: 1 for m in memories}

    def find(x):
        while parent[x] != x:
            parent[x] = parent[parent[x]]  # path compression
            x = parent[x]
        return x

    def union(a, b):
        ra, rb = find(a), find(b)
        # skip the union if it would push the cluster past the size cap
        if ra != rb and size[ra] + size[rb] <= MAX_CLUSTER_SIZE:
            parent[ra] = rb
            size[rb] += size[ra]

    for i, ma in enumerate(memories):
        for mb in memories[i + 1:]:
            if are_candidates(ma, mb):
                union(ma['id'], mb['id'])

    clusters = {}
    for m in memories:
        root = find(m['id'])
        clusters.setdefault(root, []).append(m)

    # only clusters with more than one memory need consolidation
    return [c for c in clusters.values() if len(c) > 1]

For each cluster, compare every pair of memories to determine which ones are saying essentially the same thing. Two memories are redundant when their semantic similarity exceeds 0.90 and they share the same core entities. Redundant pairs should be merged, with the more recent or more detailed version serving as the base. Non-redundant memories in the same cluster may still benefit from being linked in the knowledge graph even if they are not merged, because they cover different aspects of the same topic.
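A minimal sketch of that pairwise check, reusing the thresholds defined earlier. The REDUNDANCY_THRESHOLD constant and the find_redundant_pairs helper are illustrative names rather than part of any library, and "same core entities" is approximated here as meeting the same ENTITY_OVERLAP_MIN bar used for candidacy; tighten that definition if your entity data supports it.

REDUNDANCY_THRESHOLD = 0.90  # assumed cutoff for "saying the same thing"

def find_redundant_pairs(cluster):
    # compare every pair in the cluster and collect the ones to merge
    redundant = []
    for i, ma in enumerate(cluster):
        for mb in cluster[i + 1:]:
            sim = cosine_similarity(ma['embedding'], mb['embedding'])
            shared = set(ma['entities']) & set(mb['entities'])
            # "same core entities" approximated as sharing at least
            # ENTITY_OVERLAP_MIN entities
            if sim >= REDUNDANCY_THRESHOLD and len(shared) >= ENTITY_OVERLAP_MIN:
                redundant.append((ma, mb))
    return redundant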
Within each cluster, scan for factual conflicts. This is the hardest part of consolidation because automated systems cannot always determine which claim is correct. Use a priority order: first check recency, because newer information is more likely current. Then check confidence scores, because memories recorded with higher confidence reflect stronger evidence at the time they were stored. Then check corroboration count, because a fact confirmed by three independent observations is more reliable than one confirmed by a single source. Log every contradiction and its resolution so the pipeline is auditable.
def resolve_contradiction(mem_a, mem_b):
    # 1. prefer the more recently updated memory
    if mem_a['updated_at'] > mem_b['updated_at']:
        winner, loser = mem_a, mem_b
    elif mem_b['updated_at'] > mem_a['updated_at']:
        winner, loser = mem_b, mem_a
    # 2. fall back to confidence
    elif mem_a['confidence'] > mem_b['confidence']:
        winner, loser = mem_a, mem_b
    elif mem_b['confidence'] > mem_a['confidence']:
        winner, loser = mem_b, mem_a
    # 3. finally, fall back to corroboration count
    elif mem_a.get('corroboration_count', 1) >= mem_b.get('corroboration_count', 1):
        winner, loser = mem_a, mem_b
    else:
        winner, loser = mem_b, mem_a
    # keep an audit trail of every resolution
    log_resolution(winner, loser, reason='contradiction')
    return winner

For each set of redundant memories, create a single merged entry. The merged content should be the most complete version available, incorporating unique details from each source. The activation value should be the maximum from the group, because the merged memory should be at least as accessible as the most-accessed source. The confidence score should reflect the combined corroboration, typically increasing when multiple independent sources agree. The entity list should be the union of all source entities. Delete the source memories after the merge is complete and verified.
def merge_memories(cluster):
    # sort by confidence then recency to pick the best base memory
    ranked = sorted(cluster,
                    key=lambda m: (m['confidence'], m['updated_at']),
                    reverse=True)
    base = ranked[0].copy()

    all_entities = set(base.get('entities', []))
    max_activation = base.get('activation', 0)
    total_corroboration = base.get('corroboration_count', 1)

    # fold in entities, activation, and corroboration from the other sources
    for m in ranked[1:]:
        all_entities.update(m.get('entities', []))
        max_activation = max(max_activation, m.get('activation', 0))
        total_corroboration += m.get('corroboration_count', 1)

    base['entities'] = list(all_entities)
    base['activation'] = max_activation
    base['corroboration_count'] = total_corroboration
    # modest confidence boost, capped at the top of the scale
    base['confidence'] = min(10.0, base['confidence'] + 0.5)
    # re-embed the merged content; generate_embedding wraps your embedding model
    base['embedding'] = generate_embedding(base['content'])
    return base

After running consolidation, verify that retrieval quality has not degraded. Run a set of known queries and confirm that the expected memories still appear in the top results. Log every merge operation with the source memory IDs, the resulting merged memory ID, and the reason for the merge. This audit trail lets you reverse merges if consolidation produces unexpected results, and it provides data for tuning your thresholds over time.
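One way to make that check repeatable is a small regression harness. In the sketch below, retrieve(query, top_k) stands in for your own retrieval entry point and KNOWN_QUERIES maps queries to the memory IDs you expect back; both names, and the example IDs, are illustrative rather than part of any particular library.

# hypothetical regression check for post-consolidation retrieval quality
KNOWN_QUERIES = {
    "preferred deployment region": {"mem_123", "mem_456"},  # example IDs only
    # add the queries that matter most for your application
}

def verify_retrieval(retrieve, top_k=10):
    failures = []
    for query, expected_ids in KNOWN_QUERIES.items():
        result_ids = {m['id'] for m in retrieve(query, top_k=top_k)}
        missing = expected_ids - result_ids
        # if an expected memory was merged away, swap in its successor ID
        # from the merge audit log before treating this as a failure
        if missing:
            failures.append((query, missing))
    return failures  # an empty list means the known queries still resolve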
Scheduling Consolidation
Consolidation should run periodically rather than after every new memory, because the clustering and comparison operations scale quadratically with the number of memories in the worst case. For most applications, running consolidation weekly or every two weeks strikes the right balance between keeping the store clean and minimizing compute costs. High-volume systems that ingest hundreds of memories per day may benefit from nightly consolidation runs.
Run consolidation during off-peak hours when retrieval traffic is low. The process can temporarily lock memory entries during merging, which may affect concurrent retrieval calls. Adaptive Recall's reflect tool handles this by operating on snapshots and applying merges atomically, so retrieval is never blocked, but custom implementations should account for concurrency.
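For a custom implementation, the scheduler can be as simple as a cron-triggered script that runs during that off-peak window. The run_consolidation driver below is a hypothetical wrapper around the functions defined earlier; it merges a whole cluster whenever it contains redundant pairs, which simplifies the per-set merging described above, and the crontab line in the comment is only an example.

# run_consolidation.py -- triggered by a scheduler, e.g. a crontab entry such as
#   0 3 * * 0  /usr/bin/python3 /opt/memory/run_consolidation.py
# (03:00 every Sunday, assuming that is an off-peak window for retrieval traffic)

def run_consolidation(memories):
    merged_entries = []
    for cluster in cluster_memories(memories):
        # contradiction scanning and resolution belong here, before the merge,
        # so conflicting facts are settled first
        if find_redundant_pairs(cluster):
            merged_entries.append(merge_memories(cluster))
    return merged_entries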
Expected Results
A well-tuned consolidation pipeline typically reduces memory count by 30% to 40% on the first run, with smaller reductions on subsequent runs as the store reaches a steady state. Retrieval quality improves because results are drawn from more complete, higher-confidence memories with richer entity connections. Storage and embedding costs decrease proportionally to the reduction in memory count.
Skip the pipeline and let consolidation run automatically. Adaptive Recall's reflect tool handles clustering, merging, and contradiction resolution out of the box.
Get Started Free