How to Implement Experience Replay in Production
Before You Start
You need a retrieval system with a feedback loop that produces reward signals for each interaction. If you have not built that yet, start with How to Build a Feedback Loop for AI Retrieval. Experience replay sits on top of the feedback loop, acting as a training data management layer that decouples feedback collection from ranking parameter updates.
Step-by-Step Implementation
Each experience in the replay buffer captures the full context of a retrieval interaction: the query, the state of the ranking model at the time, the results that were served, the user's behavioral response, and the computed reward. This tuple contains everything needed to learn from the experience without accessing external systems.
```python
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class Experience:
    event_id: str
    timestamp: float
    query: str
    query_embedding: List[float]
    results_served: List[Dict]
    ranking_params: Dict[str, float]
    feedback_signals: Dict[str, float]
    reward: float
    priority: float = 1.0
```

The ranking_params field captures the scoring weights used when the results were generated. This is critical for learning: you need to know which parameter configuration produced which outcome in order to update the parameters effectively.
The replay buffer is a fixed-capacity store that holds recent experiences. When the buffer is full, new experiences replace the oldest ones. This circular buffer design ensures bounded memory usage while always containing the most recent interactions.
```python
from collections import deque
import random

class ReplayBuffer:
    def __init__(self, capacity=10000):
        self.buffer = deque(maxlen=capacity)
        self.capacity = capacity

    def add(self, experience):
        self.buffer.append(experience)

    def size(self):
        return len(self.buffer)

    def sample_uniform(self, batch_size):
        batch_size = min(batch_size, len(self.buffer))
        return random.sample(list(self.buffer), batch_size)

    def sample_priority(self, batch_size):
        batch_size = min(batch_size, len(self.buffer))
        weights = [exp.priority for exp in self.buffer]
        total = sum(weights)
        probs = [w / total for w in weights]
        indices = random.choices(
            range(len(self.buffer)),
            weights=probs,
            k=batch_size
        )
        return [self.buffer[i] for i in indices]

    def clear(self):
        self.buffer.clear()
```

Uniform sampling treats all experiences equally. Priority sampling weights experiences by their learning value, so unusual or surprising outcomes are replayed more often. Priority sampling accelerates learning but can introduce bias if not managed carefully.
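One standard way to manage that bias is the importance-sampling correction from prioritized experience replay: experiences that were over-sampled get proportionally smaller training weight. The sketch below is illustrative and not part of the buffer above; the `beta` exponent (typically annealed from ~0.4 toward 1.0 over training) and the max-normalization are conventional choices, not requirements of this system.

```python
def importance_weights(sampled, buffer_size, total_priority, beta=0.4):
    # Each experience sampled with probability p = priority / total_priority
    # gets weight (1 / (N * p)) ** beta, so frequently sampled experiences
    # contribute less per replay. Normalize so the largest weight is 1.0
    # to keep update magnitudes stable.
    raw = []
    for exp in sampled:
        p = exp.priority / total_priority
        raw.append((1.0 / (buffer_size * p)) ** beta)
    max_w = max(raw)
    return [w / max_w for w in raw]
```

Multiply each experience's gradient (or reward) by its importance weight during the training step to offset the sampling skew.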
Set priority based on the absolute value of the reward (strong positive or strong negative outcomes are more informative than neutral ones) and the prediction error (how different the actual reward was from what the system expected). Experiences where the system was confidently wrong are the most valuable for learning.
```python
def compute_priority(experience, predicted_reward):
    # TD error: how surprised we were by the actual reward
    td_error = abs(experience.reward - predicted_reward)
    # Also prioritize extreme outcomes
    magnitude = abs(experience.reward)
    # A small epsilon ensures every experience has
    # non-zero priority
    return td_error + 0.1 * magnitude + 0.01

def add_experience_with_priority(buffer, experience,
                                 reward_predictor):
    predicted = reward_predictor.predict(
        experience.query_embedding,
        experience.ranking_params
    )
    experience.priority = compute_priority(
        experience, predicted
    )
    buffer.add(experience)
```

Periodically sample a batch from the replay buffer and use it to update the ranking parameters. The update step takes each experience in the batch, computes what the reward would have been under the current ranking parameters (counterfactual evaluation), and adjusts the parameters to increase the expected reward.
```python
def train_from_replay(buffer, ranking_model,
                      batch_size=64, learning_rate=0.01):
    if buffer.size() < batch_size:
        return  # Not enough data yet
    batch = buffer.sample_priority(batch_size)
    for experience in batch:
        # Gradient direction: what parameter change
        # would have increased the reward?
        gradient = ranking_model.compute_gradient(
            query=experience.query_embedding,
            results=experience.results_served,
            reward=experience.reward,
            old_params=experience.ranking_params
        )
        # Apply the update, scaled by the learning rate
        ranking_model.update_params(
            gradient,
            learning_rate=learning_rate
        )
```

Recent experiences should influence learning more than old ones because user behavior and content drift over time. Apply a temporal discount that reduces the weight of older experiences during training without removing them from the buffer entirely.
```python
import math
import time

def temporal_weight(experience, current_time,
                    half_life_hours=168):
    age_hours = (current_time - experience.timestamp) / 3600
    # ln(2) ~= 0.693, so the weight halves every half_life_hours
    return math.exp(-0.693 * age_hours / half_life_hours)

def train_with_temporal_weighting(buffer, ranking_model,
                                  batch_size=64):
    batch = buffer.sample_priority(batch_size)
    current_time = time.time()
    for experience in batch:
        weight = temporal_weight(experience, current_time)
        gradient = ranking_model.compute_gradient(
            query=experience.query_embedding,
            results=experience.results_served,
            reward=experience.reward * weight,
            old_params=experience.ranking_params
        )
        ranking_model.update_params(
            gradient,
            learning_rate=0.01 * weight
        )
```

Track buffer utilization (how full the buffer is), sample diversity (whether training batches cover a range of query types and users), and learning stability (whether ranking parameters are converging or oscillating). Alert if the buffer becomes dominated by a narrow set of query types, since the system would then overfit to those patterns.
```python
import statistics
import time

def buffer_health_report(buffer):
    experiences = list(buffer.buffer)
    if not experiences:
        return {"status": "empty"}
    rewards = [e.reward for e in experiences]
    ages = [time.time() - e.timestamp for e in experiences]
    users = set(e.event_id.split("_")[0]
                for e in experiences)
    return {
        "size": len(experiences),
        "capacity_pct": len(experiences) / buffer.capacity,
        "avg_reward": sum(rewards) / len(rewards),
        "reward_std": statistics.stdev(rewards)
        if len(rewards) > 1 else 0,
        "avg_age_hours": sum(ages) / len(ages) / 3600,
        "unique_contexts": len(users),
        "oldest_hours": max(ages) / 3600,
        "newest_hours": min(ages) / 3600
    }
```

How Adaptive Recall Handles This
Adaptive Recall's ACT-R activation system functions as a continuous experience replay mechanism. Every retrieval event updates the activation history of each memory, and the activation equation considers all past access events (weighted by recency) when computing the current activation level. This is mathematically equivalent to replaying every past experience with a recency-based temporal decay, but it is computed as a single equation rather than through explicit buffer management and batch training.
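To make the equivalence concrete, here is a minimal sketch of the ACT-R base-level activation equation the paragraph above refers to. The decay exponent `d = 0.5` is the conventional ACT-R default, not a documented Adaptive Recall parameter, and the function itself is illustrative rather than the product's implementation.

```python
import math

def base_level_activation(access_times, now, d=0.5):
    # ACT-R base-level activation: A = ln( sum over past accesses
    # of (now - t) ** -d ). Each access contributes a term that
    # decays with its age, so one equation summarizes the entire
    # access history -- no explicit replay buffer is needed.
    return math.log(sum((now - t) ** -d for t in access_times))
```

A memory accessed recently and often ends up with higher activation than one whose accesses are all far in the past, which is the same recency weighting the replay buffer's temporal discount approximates in batches.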
Get experience-driven retrieval improvement without managing replay infrastructure. Adaptive Recall's activation dynamics handle the learning automatically.
Get Started Free