The Architecture of 77,000 Examples: Complete Data Structure Design Guide
⏱️ Read Time: 25 minutes | 🎓 Level: Intermediate | 📊 Real Production Architecture
<div className="bg-gradient-to-r from-purple-900/20 to-blue-900/20 p-6 rounded-lg border border-purple-500/20 mb-8"> <h2 className="text-xl font-bold text-purple-400 mb-4">🏗️ Why This Architecture Guide Matters</h2> <ul className="space-y-2 text-gray-300"> <li>✓ <strong>Battle-tested at scale</strong> - Proven with 77,000 real examples</li> <li>✓ <strong>Complete blueprint</strong> - Every design decision explained</li> <li>✓ <strong>Avoid my mistakes</strong> - Learn from 18 months of iteration</li> <li>✓ <strong>Production-ready</strong> - Used to train actual AI models</li> <li>✓ <strong>Scalable design</strong> - From 100 to 100,000+ examples</li> </ul> </div>Table of Contents
- The Journey: 100 to 77,000
- Core Architecture Principles
- Schema Evolution: 5 Major Versions
- Final Production Schema
- Storage Strategy: JSON to Parquet
- Version Control at Scale
- Quality Assurance Architecture
- Performance Optimization
- Lessons Learned & Mistakes
- Complete Implementation Guide
The Journey: From 100 to 77,000 {#journey}
When I started, I thought creating a few hundred examples would be enough. I was wrong. Following research like <a href="https://arxiv.org/abs/2203.02155" target="_blank" rel="noopener noreferrer">Training language models to follow instructions with human feedback</a>, I learned that quality and scale both matter.
The Realization
After training my first model with 500 examples, the results were disappointing. The AI could handle basic cases but failed on edge cases, nuanced scenarios, and anything outside the narrow training distribution.
The moment of truth came when I compared my results to industry benchmarks:
- My 500 examples: 65% accuracy
- Commercial datasets (10,000+): 89% accuracy
- Research datasets (50,000+): 94% accuracy
I needed scale. But more importantly, I needed the RIGHT architecture to handle that scale.
Growth Timeline
- Month 1: 100 examples (manual JSON files)
- Month 3: 1,000 examples (structured folders)
- Month 6: 5,000 examples (database required)
- Month 12: 25,000 examples (version control crisis)
- Month 18: 77,000 examples (final architecture)
Each milestone forced architectural changes. Here's what I learned.
Core Architecture Principles {#principles}
After 18 months and 5 major redesigns, these principles emerged:
1. Immutable Examples
Once created, examples never change. Updates create new versions.
❌ Wrong: Modify example_001.json
✅ Right: Create example_001_v2.json
2. Rich Metadata
Every example carries complete context about its creation, quality, and usage.
3. Hierarchical Organization
Examples are organized by domain → category → subcategory → difficulty.
4. Versioned Everything
Schema, examples, validation rules, and tools all have explicit versions.
5. Quality First
Architecture must support automated quality checks at every level.
6. Performance by Design
Fast querying, filtering, and batch processing built-in from day one.
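Before walking through the schema history, here is a minimal sketch of how principles 1 through 4 translate into code. The helper names are illustrative, not the production tooling:
from dataclasses import dataclass, replace
from pathlib import Path
@dataclass(frozen=True)  # Principle 1: examples are immutable once created
class ExampleRecord:
    example_id: str
    version: int
    schema_version: str  # Principle 4: everything carries an explicit version
    domain: str
    category: str
    subcategory: str
    difficulty: str
    payload: str
def new_version(record: ExampleRecord, payload: str) -> ExampleRecord:
    """Updates never mutate; they produce a new, higher-versioned record."""
    return replace(record, version=record.version + 1, payload=payload)
def storage_path(record: ExampleRecord) -> Path:
    """Principle 3: domain -> category -> subcategory -> difficulty hierarchy."""
    return (Path(record.domain) / record.category / record.subcategory /
            record.difficulty / f"{record.example_id}_v{record.version}.json")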
Schema Evolution: 5 Major Versions {#evolution}
Version 1: The Naive Approach (Examples 1-100)
{
"input": "How do I install Python?",
"output": "Download from python.org and run the installer."
}
Problems discovered:
- No metadata or context
- No quality tracking
- No categorization
- No versioning
Lasted: 2 weeks
Version 2: Adding Basic Metadata (Examples 101-500)
{
"id": "example_001",
"input": "How do I install Python?",
"output": "Download from python.org and run the installer.",
"created_date": "2023-06-15",
"category": "programming"
}
Improvements:
- Unique IDs
- Creation tracking
- Basic categorization
Problems discovered:
- No quality scoring
- Limited search capability
- No difficulty levels
Lasted: 1 month
Version 3: Quality and Context (Examples 501-2,000)
{
"id": "example_001",
"version": "1.0",
"input": "How do I install Python?",
"output": "Download from python.org and run the installer.",
"metadata": {
"category": "programming",
"subcategory": "installation",
"difficulty": "beginner",
"quality_score": 0.85,
"created_date": "2023-06-15",
"created_by": "manual",
"tags": ["python", "installation", "beginner"]
}
}
Improvements:
- Quality scoring
- Difficulty levels
- Better categorization
- Tagging system
Problems discovered:
- Inconsistent quality scoring
- No validation rules
- Manual tagging errors
Lasted: 3 months
Version 4: Validation and Standards (Examples 2,001-10,000)
{
"id": "example_001",
"version": "2.0",
"schema_version": "4.0",
"input": {
"text": "How do I install Python?",
"context": "User is on Windows 10, new to programming",
"intent": "installation_help"
},
"output": {
"text": "Download from python.org and run the installer.",
"confidence": 0.95,
"citations": ["https://python.org/downloads"]
},
"metadata": {
"domain": "programming",
"category": "installation",
"subcategory": "python",
"difficulty": "beginner",
"quality_metrics": {
"accuracy": 0.90,
"completeness": 0.85,
"clarity": 0.88,
"overall": 0.87
},
"validation": {
"auto_checks_passed": true,
"manual_review": true,
"reviewer": "expert_001"
},
"created_date": "2023-06-15T10:30:00Z",
"modified_date": "2023-06-15T10:30:00Z",
"tags": ["python", "installation", "windows", "beginner"],
"usage_stats": {
"training_count": 5,
"success_rate": 0.92
}
}
}
Improvements:
- Structured input/output
- Multi-dimensional quality metrics
- Validation tracking
- Usage statistics
- Proper ISO timestamps
Problems discovered:
- Too complex for simple examples
- Performance issues with large objects
- Difficult to query efficiently
Lasted: 6 months
Version 5: Production Architecture (Examples 10,001-77,000)
This is the final schema that scaled to 77,000 examples:
{
"core": {
"id": "ds_77k_12345",
"version": "1.0",
"schema_version": "5.0",
"created_at": "2024-01-15T14:22:33.123Z",
"updated_at": "2024-01-15T14:22:33.123Z"
},
"content": {
"input": {
"primary": "How do I install Python on Windows 10?",
"context": "User is completely new to programming",
"constraints": ["Windows 10", "beginner-friendly", "latest_version"]
},
"output": {
"primary": "1. Go to python.org/downloads\n2. Click 'Download Python 3.12.0'\n3. Run the installer\n4. Check 'Add Python to PATH'\n5. Click 'Install Now'",
"reasoning": "Step-by-step approach for beginners with critical PATH setting",
"confidence": 0.95,
"alternatives": ["Using Microsoft Store", "Using Anaconda"],
"validation": "Tested on Windows 10 Home and Pro"
}
},
"classification": {
"domain": "programming",
"category": "installation",
"subcategory": "python",
"difficulty": "beginner",
"complexity": 2,
"intent": "procedural_help",
"tags": ["python", "windows", "installation", "beginner", "setup"]
},
"quality": {
"scores": {
"accuracy": 0.95,
"completeness": 0.90,
"clarity": 0.92,
"usefulness": 0.94,
"overall": 0.93
},
"validation": {
"auto_passed": true,
"manual_reviewed": true,
"expert_approved": true,
"test_results": "passed"
},
"reviewer": {
"id": "reviewer_003",
"expertise_level": "expert",
"review_date": "2024-01-15T15:45:12.456Z"
}
},
"usage": {
"training_sessions": 12,
"success_rate": 0.91,
"feedback_score": 4.2,
"last_used": "2024-03-10T09:15:22.789Z"
},
"provenance": {
"source": "manual_creation",
"creator": "dataset_architect_001",
"method": "expert_knowledge",
"inspiration": null,
"verification": "community_validated"
}
}
Key improvements:
- Modular structure for easy querying
- Comprehensive provenance tracking
- Real usage feedback integration
- Optimized for database storage
- Backward compatibility
Result: Scaled to 77,000 examples with excellent performance.
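Because every concern lives in its own top-level block, filtering stays simple even before the data reaches a database. A quick sketch, assuming examples are loaded as dicts in the v5 shape above:
def select_examples(examples, domain, min_overall=0.9):
    # Sketch only: pull high-quality examples for one domain from v5-shaped dicts.
    return [
        ex for ex in examples
        if ex["classification"]["domain"] == domain
        and ex["quality"]["scores"]["overall"] >= min_overall
    ]
# e.g. top_programming = select_examples(all_examples, "programming", 0.9)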
Storage Strategy: From JSON to Parquet {#storage}
The Storage Evolution
Phase 1: Individual JSON Files (1-1,000)
dataset/
├── example_001.json
├── example_002.json
└── ...
Pros: Simple, human-readable, version control friendly
Cons: Slow queries, no indexing, file system limits
Phase 2: Structured Directories (1,001-5,000)
dataset/
├── programming/
│ ├── installation/
│ │ ├── python/
│ │ │ ├── beginner/
│ │ │ │ ├── example_001.json
│ │ │ └── intermediate/
│ │ └── javascript/
│ └── debugging/
└── writing/
Pros: Logical organization, faster category queries
Cons: Deep nesting, cross-category queries slow
Phase 3: Database Storage (5,001-25,000)
Switched to PostgreSQL with JSONB columns.
CREATE TABLE training_examples (
id SERIAL PRIMARY KEY,
example_id VARCHAR(50) UNIQUE,
content JSONB,
metadata JSONB,
created_at TIMESTAMP,
updated_at TIMESTAMP
);
-- B-tree expression indexes (GIN has no operator class for scalar text values)
CREATE INDEX idx_category ON training_examples
((metadata->'classification'->>'category'));
CREATE INDEX idx_quality ON training_examples
(((metadata->'quality'->>'overall')::numeric));
Pros: Fast queries, ACID compliance, indexing
Cons: Complex setup, JSON still inefficient for analytics
Phase 4: Parquet + Database Hybrid (25,001-77,000)
Final architecture: PostgreSQL for metadata, Parquet for bulk storage.
-- Metadata in PostgreSQL
CREATE TABLE example_metadata (
id VARCHAR(50) PRIMARY KEY,
domain VARCHAR(50),
category VARCHAR(50),
difficulty VARCHAR(20),
quality_overall DECIMAL(3,2),
created_at TIMESTAMP,
file_path VARCHAR(200)
);
# Content in Parquet files
# Partitioned by domain/category for fast queries
data/
├── programming/
│ ├── installation/
│ │ └── examples_001.parquet
│ └── debugging/
│ └── examples_002.parquet
└── writing/
├── creative/
│ └── examples_003.parquet
└── technical/
└── examples_004.parquet
Query Performance Results:
- Filter by category: 15ms → 2ms
- Complex multi-field search: 2.3s → 180ms
- Bulk export: 45s → 8s
- Storage space: 2.1GB → 890MB
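In practice the hybrid lookup is two steps: the PostgreSQL metadata table answers the filter, and only the matching Parquet partitions get read. A sketch (table and column names follow the DDL above; the connection string is a placeholder):
import pandas as pd
import psycopg2
conn = psycopg2.connect("dbname=dataset")  # placeholder DSN
paths = pd.read_sql(
    """SELECT DISTINCT file_path FROM example_metadata
       WHERE domain = %s AND category = %s AND quality_overall >= %s""",
    conn, params=("programming", "installation", 0.85),
)["file_path"].tolist()
# Only the partitions the metadata query pointed at are loaded.
content = pd.concat((pd.read_parquet(p) for p in paths), ignore_index=True)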
Version Control at Scale {#version}
The Version Control Crisis (Month 12)
At 25,000 examples, Git became unusable:
- 45-minute commits
- 2GB repository size
- Merge conflicts on binary data
- Team collaboration impossible
Solution: Git + DVC (Data Version Control)
# Initialize DVC
dvc init
# Add remote storage (S3)
dvc remote add -d storage s3://ai-dataset-storage/77k-project
# Track data with DVC, metadata with Git
dvc add data/
git add data.dvc .gitignore
git commit -m "Add dataset v1.0"
# Create data versions (the .dvc pointers live in Git, so a Git tag pins both code and data)
git tag dataset-v1.0
Branching Strategy for Data
main (production dataset)
├── feature/quality-improvements
├── experiment/synthetic-augmentation
└── release/v2.0-candidate
Git tracks:
- Schema definitions
- Validation rules
- Processing scripts
- Metadata changes
DVC tracks:
- Actual example content
- Generated datasets
- Model checkpoints
- Processed features
Version Tagging System
dataset-v1.0-stable (10,000 examples)
dataset-v1.1-quality (12,500 examples, improved quality)
dataset-v1.2-augmented (15,000 examples, synthetic data)
dataset-v2.0-restructured (25,000 examples, new schema)
dataset-v2.1-validated (30,000 examples, expert review)
dataset-v3.0-production (77,000 examples, final version)
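Any of these tagged snapshots can also be read programmatically through DVC's Python API, which is handy for training scripts that need to pin an exact dataset version. A sketch, assuming the dvc package is installed and the tag exists in the repository:
import dvc.api
# Read one partition exactly as it existed at a tagged dataset release.
with dvc.api.open(
    "data/programming/installation/examples_001.parquet",
    repo=".",                        # or a Git URL for remote consumers
    rev="dataset-v2.0-restructured", # any tag from the list above
    mode="rb",
) as f:
    raw_bytes = f.read()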
Quality Assurance Architecture {#quality}
Multi-Layer Quality System
Layer 1: Automated Validation
Every example passes through automated checks:
import jsonschema
class ExampleValidator:
def __init__(self):
self.checks = [
self.validate_schema,
self.validate_content_length,
self.validate_language,
self.check_for_duplicates,
self.validate_tags,
self.check_quality_scores
]
def validate_example(self, example):
results = {}
for check in self.checks:
try:
results[check.__name__] = check(example)
except Exception as e:
results[check.__name__] = {"passed": False, "error": str(e)}
return {
"passed": all(r.get("passed", False) for r in results.values()),
"details": results
}
def validate_schema(self, example):
# Validate against JSON schema
jsonschema.validate(example, self.schema)
return {"passed": True}
def validate_content_length(self, example):
input_len = len(example["content"]["input"]["primary"])
output_len = len(example["content"]["output"]["primary"])
if input_len < 10 or input_len > 2000:
return {"passed": False, "reason": "Input length out of range"}
if output_len < 5 or output_len > 5000:
return {"passed": False, "reason": "Output length out of range"}
return {"passed": True}
def check_for_duplicates(self, example):
# Use embeddings to find semantic duplicates
embedding = self.get_embedding(example["content"]["input"]["primary"])
similar = self.find_similar(embedding, threshold=0.95)
if similar:
return {"passed": False, "reason": f"Similar to {similar[0]['id']}"}
return {"passed": True}
Layer 2: Statistical Quality Control
from sklearn.ensemble import IsolationForest
class QualityAnalyzer:
def analyze_batch(self, examples):
metrics = {
"length_distribution": self.analyze_lengths(examples),
"difficulty_distribution": self.analyze_difficulty(examples),
"quality_trends": self.analyze_quality_trends(examples),
"tag_consistency": self.analyze_tags(examples),
"outlier_detection": self.detect_outliers(examples)
}
return metrics
def detect_outliers(self, examples):
# Find examples that don't fit the pattern
features = self.extract_features(examples)
# Use Isolation Forest for outlier detection
iso_forest = IsolationForest(contamination=0.05)
outliers = iso_forest.fit_predict(features)
outlier_examples = [
ex for ex, is_outlier in zip(examples, outliers)
if is_outlier == -1
]
return outlier_examples
Layer 3: Human Review Process
import random
from queue import PriorityQueue
class ReviewWorkflow:
def __init__(self):
self.review_queue = PriorityQueue()
self.reviewers = {
"expert": ["reviewer_001", "reviewer_002"],
"intermediate": ["reviewer_003", "reviewer_004", "reviewer_005"],
"junior": ["reviewer_006", "reviewer_007", "reviewer_008"]
}
def assign_for_review(self, example):
# Determine review priority
priority = self.calculate_priority(example)
# Select appropriate reviewer level
reviewer_level = self.select_reviewer_level(example)
# Add to review queue
self.review_queue.put((priority, example, reviewer_level))
def calculate_priority(self, example):
# High priority for:
# - Low automated quality scores
# - New domains/categories
# - Flagged by outlier detection
# - Random sampling (10% of all examples)
score = 0
if example["quality"]["scores"]["overall"] < 0.8:
score += 10
if example["classification"]["domain"] not in self.known_domains:
score += 5
if example["id"] in self.flagged_examples:
score += 8
if random.random() < 0.1: # 10% random sampling
score += 3
return score
Quality Metrics Dashboard
Real-time monitoring of quality across the entire dataset:
class QualityDashboard:
def generate_report(self):
return {
"overall_stats": {
"total_examples": 77000,
"avg_quality": 0.89,
"examples_under_threshold": 432,
"auto_validation_rate": 0.94
},
"domain_breakdown": {
"programming": {"count": 35000, "avg_quality": 0.91},
"writing": {"count": 20000, "avg_quality": 0.87},
"analysis": {"count": 15000, "avg_quality": 0.88},
"creative": {"count": 7000, "avg_quality": 0.85}
},
"quality_trends": {
"last_30_days": [0.87, 0.88, 0.89, 0.90, 0.89],
"improvement_rate": 0.023
},
"review_status": {
"pending_review": 156,
"in_review": 23,
"review_backlog_days": 2.3
}
}
Performance Optimization {#performance}
Query Optimization Strategies
1. Smart Indexing
-- Multi-column indexes for common query patterns
CREATE INDEX idx_domain_category_difficulty
ON example_metadata (domain, category, difficulty);
-- Partial indexes for high-quality examples
CREATE INDEX idx_high_quality
ON example_metadata (quality_overall)
WHERE quality_overall >= 0.85;
-- Gin indexes for tag arrays
CREATE INDEX idx_tags
ON example_metadata USING GIN (tags);
2. Query Pattern Analysis
After analyzing 10,000+ queries, I identified the most common patterns:
# Most common query patterns (with optimization)
class OptimizedQueries:
def get_by_category(self, category, limit=100):
# Optimized with category index
return self.db.execute(
"SELECT * FROM example_metadata WHERE category = %s LIMIT %s",
[category, limit]
).fetchall()
def get_high_quality_examples(self, min_quality=0.9, domain=None):
# Uses partial index for quality + domain filter
query = "SELECT * FROM example_metadata WHERE quality_overall >= %s"
params = [min_quality]
if domain:
query += " AND domain = %s"
params.append(domain)
return self.db.execute(query, params).fetchall()
def search_by_tags(self, tags, match_all=True):
# Optimized GIN index query
if match_all:
query = "SELECT * FROM example_metadata WHERE tags @> %s"
else:
query = "SELECT * FROM example_metadata WHERE tags && %s"
return self.db.execute(query, [tags]).fetchall()
3. Caching Strategy
import json
import redis
class CacheManager:
def __init__(self):
self.redis_client = redis.Redis()
self.cache_ttl = {
"metadata": 3600, # 1 hour
"search_results": 1800, # 30 minutes
"quality_stats": 7200, # 2 hours
}
def get_cached_search(self, query_hash):
return self.redis_client.get(f"search:{query_hash}")
def cache_search_results(self, query_hash, results):
self.redis_client.setex(
f"search:{query_hash}",
self.cache_ttl["search_results"],
json.dumps(results)
)
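The cache keys above depend on a query_hash that isn't shown; one reasonable way to derive it (a sketch, not the production code) is to hash the normalized query parameters so that logically identical queries hit the same entry:
import hashlib
import json
def make_query_hash(params: dict) -> str:
    # Sort keys so {"domain": ..., "min_quality": ...} and its permutations match.
    canonical = json.dumps(params, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()
# e.g. make_query_hash({"domain": "programming", "min_quality": 0.9})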
Batch Processing Pipeline
import io
from concurrent.futures import ProcessPoolExecutor
class BatchProcessor:
def __init__(self, batch_size=1000):
self.batch_size = batch_size
def process_examples(self, examples):
# Process in batches for memory efficiency
for i in range(0, len(examples), self.batch_size):
batch = examples[i:i + self.batch_size]
# Parallel processing within batch
with ProcessPoolExecutor(max_workers=8) as executor:
futures = [
executor.submit(self.process_single_example, ex)
for ex in batch
]
results = [future.result() for future in futures]
# Batch insert to database
self.bulk_insert(results)
# Progress tracking
self.update_progress(i + len(batch), len(examples))
def bulk_insert(self, examples):
# Use COPY for fast insertion
with self.db.cursor() as cursor:
cursor.copy_from(
io.StringIO(self.format_for_copy(examples)),
'example_metadata',
sep='\t',
columns=('id', 'domain', 'category', 'file_path', 'quality_overall')
)
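bulk_insert relies on a format_for_copy helper that isn't shown above. One possible shape for it, matching the column list passed to copy_from (illustrative only; file_path is assumed to be attached to each example dict upstream, it is not part of the v5 schema itself):
def format_for_copy(self, examples):
    # Sketch: tab-separated rows in the same order as the copy_from column list.
    rows = []
    for ex in examples:
        rows.append("\t".join([
            ex["core"]["id"],
            ex["classification"]["domain"],
            ex["classification"]["category"],
            ex["file_path"],  # hypothetical: where the Parquet partition lives
            str(ex["quality"]["scores"]["overall"]),
        ]))
    return "\n".join(rows) + "\n"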
Lessons Learned & Mistakes {#lessons}
Major Mistakes and Their Costs
Mistake 1: No Schema Versioning (Cost: 2 weeks of rework)
What happened: Changed schema without versioning, broke compatibility with existing tools.
Lesson: Always version your schema and maintain backward compatibility.
# Wrong approach
example = {"input": "...", "output": "..."}
# Right approach
example = {
"schema_version": "5.0",
"core": {...},
"content": {...}
}
def migrate_schema(example, target_version):
current = example.get("schema_version", "1.0")
while current != target_version:
example = MIGRATION_FUNCTIONS[current](example)
current = example["schema_version"]
return example
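migrate_schema assumes a MIGRATION_FUNCTIONS registry that isn't defined above. A minimal sketch of what one hop might look like, based on the v1-to-v2 changes described earlier (the field mapping is illustrative):
from uuid import uuid4
def migrate_v1_to_v2(example):
    # v1 had only input/output; v2 adds id, created_date, and category.
    return {
        "id": example.get("id") or f"example_{uuid4().hex[:8]}",
        "input": example["input"],
        "output": example["output"],
        "created_date": example.get("created_date"),
        "category": example.get("category", "uncategorized"),
        "schema_version": "2.0",
    }
MIGRATION_FUNCTIONS = {
    "1.0": migrate_v1_to_v2,
    # one function per hop: "2.0" -> v3, "3.0" -> v4, and so on
}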
Mistake 2: Premature Database Optimization (Cost: 1 month)
What happened: Built complex indexing before understanding query patterns.
Lesson: Profile real usage before optimizing.
Mistake 3: Manual Quality Scoring (Cost: 3 months of inconsistent data)
What happened: Different reviewers used different quality standards.
Lesson: Create detailed rubrics and calibration processes.
# Quality scoring rubric
QUALITY_RUBRIC = {
"accuracy": {
1.0: "Factually perfect, no errors",
0.8: "Minor inaccuracies that don't affect core message",
0.6: "Some errors but generally correct",
0.4: "Significant errors affecting usefulness",
0.2: "Major errors, misleading information",
0.0: "Completely incorrect or harmful"
},
"completeness": {
1.0: "Addresses all aspects of the input thoroughly",
0.8: "Covers most important aspects",
0.6: "Adequate coverage of main points",
0.4: "Missing important information",
0.2: "Incomplete, leaves major gaps",
0.0: "Severely incomplete"
}
# ... more dimensions
}
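A rubric only works if reviewers apply it the same way, so calibration can start with something as simple as measuring how far scores drift apart on a shared batch. A sketch, not the production calibration tooling:
from itertools import combinations
def mean_absolute_disagreement(scores_by_reviewer):
    # scores_by_reviewer: reviewer id -> scores for the same examples, same order.
    diffs = [
        abs(a - b)
        for first, second in combinations(scores_by_reviewer.values(), 2)
        for a, b in zip(first, second)
    ]
    return sum(diffs) / len(diffs)
# a value above ~0.15 on a 0-1 scale suggests reviewers read the rubric differently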
Mistake 4: Ignoring Data Lineage (Cost: Unable to debug model issues)
What happened: Couldn't trace which examples caused model problems.
Lesson: Track complete data lineage from creation to usage.
from datetime import datetime
class DataLineage:
def track_creation(self, example_id, source_info):
self.lineage_db.insert({
"example_id": example_id,
"event_type": "created",
"timestamp": datetime.utcnow(),
"details": source_info
})
def track_modification(self, example_id, change_info):
self.lineage_db.insert({
"example_id": example_id,
"event_type": "modified",
"timestamp": datetime.utcnow(),
"details": change_info
})
def track_usage(self, example_id, model_id, training_session):
self.lineage_db.insert({
"example_id": example_id,
"event_type": "used_in_training",
"timestamp": datetime.utcnow(),
"model_id": model_id,
"session_id": training_session
})
Key Success Factors
1. Start Simple, Evolve Systematically
Don't build for 100,000 examples when you have 100. But design evolution paths.
2. Automate Quality Early
Manual review doesn't scale. Build automated quality checks from day one.
3. Measure Everything
You can't improve what you don't measure. Track quality, usage, and performance.
4. Plan for Distribution
Consider how others will use your data. APIs, exports, and documentation matter.
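On point 4: even a thin read-only API over the metadata table goes a long way. A sketch (the framework choice and endpoint shape are my own, not part of the original system):
from fastapi import FastAPI
import psycopg2
app = FastAPI()
conn = psycopg2.connect("dbname=dataset")  # placeholder DSN
@app.get("/examples")
def list_examples(domain: str, min_quality: float = 0.8, limit: int = 100):
    # Read-only access to the same example_metadata table used internally.
    with conn.cursor() as cur:
        cur.execute(
            """SELECT id, domain, category, difficulty, quality_overall
               FROM example_metadata
               WHERE domain = %s AND quality_overall >= %s LIMIT %s""",
            (domain, min_quality, limit),
        )
        rows = cur.fetchall()
    return [
        {"id": r[0], "domain": r[1], "category": r[2],
         "difficulty": r[3], "quality_overall": float(r[4])}
        for r in rows
    ]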
Complete Implementation Guide {#implementation}
Step 1: Set Up the Foundation
# Project structure
mkdir ai-dataset-project
cd ai-dataset-project
# Initialize Git and DVC
git init
dvc init
# Create directory structure
mkdir -p {data,scripts,schemas,docs,tests}
mkdir -p data/{raw,processed,exports}
mkdir -p scripts/{validation,processing,analysis}
Step 2: Define Your Schema
# schemas/example_schema_v1.py
from pydantic import BaseModel, Field
from typing import List, Dict, Optional
from datetime import datetime
class ExampleCore(BaseModel):
id: str = Field(..., regex=r"^[a-zA-Z0-9_-]+$")
version: str = "1.0"
schema_version: str = "1.0"
created_at: datetime
updated_at: datetime
class ExampleContent(BaseModel):
input: Dict[str, str] # primary, context, constraints
output: Dict[str, str] # primary, reasoning, confidence
class ExampleClassification(BaseModel):
domain: str
category: str
subcategory: str
difficulty: str
tags: List[str]
class QualityScores(BaseModel):
accuracy: float = Field(..., ge=0.0, le=1.0)
completeness: float = Field(..., ge=0.0, le=1.0)
clarity: float = Field(..., ge=0.0, le=1.0)
overall: float = Field(..., ge=0.0, le=1.0)
class TrainingExample(BaseModel):
core: ExampleCore
content: ExampleContent
classification: ExampleClassification
quality: QualityScores
class Config:
json_encoders = {
datetime: lambda v: v.isoformat()
}
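A quick usage check of the models above; the values are made up, and this assumes pydantic v1 (matching the Field(regex=...) style used in the schema):
from datetime import datetime
from pydantic import ValidationError
try:
    TrainingExample(
        core=ExampleCore(id="example_001",
                         created_at=datetime.utcnow(),
                         updated_at=datetime.utcnow()),
        content=ExampleContent(
            input={"primary": "How do I install Python?"},
            output={"primary": "Download it from python.org."}),
        classification=ExampleClassification(
            domain="programming", category="installation",
            subcategory="python", difficulty="beginner",
            tags=["python", "installation"]),
        quality=QualityScores(accuracy=1.2,  # out of range on purpose
                              completeness=0.9, clarity=0.9, overall=0.9),
    )
except ValidationError as exc:
    print(exc)  # reports that accuracy must be less than or equal to 1.0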
Step 3: Build Validation Pipeline
# scripts/validation/validator.py
import json
import jsonschema
from typing import List, Dict
class ExampleValidator:
def __init__(self, schema_path: str):
with open(schema_path) as f:
self.schema = json.load(f)
self.validation_rules = [
self.validate_schema,
self.validate_uniqueness,
self.validate_quality_scores,
self.validate_tags,
self.validate_content_length
]
def validate_batch(self, examples: List[Dict]) -> Dict:
results = {
"total": len(examples),
"passed": 0,
"failed": 0,
"errors": []
}
for i, example in enumerate(examples):
try:
is_valid, errors = self.validate_single(example)
if is_valid:
results["passed"] += 1
else:
results["failed"] += 1
results["errors"].append({
"index": i,
"id": example.get("core", {}).get("id", "unknown"),
"errors": errors
})
except Exception as e:
results["failed"] += 1
results["errors"].append({
"index": i,
"id": example.get("core", {}).get("id", "unknown"),
"errors": [f"Validation exception: {str(e)}"]
})
return results
def validate_single(self, example: Dict) -> tuple[bool, List[str]]:
errors = []
for rule in self.validation_rules:
try:
rule_errors = rule(example)
errors.extend(rule_errors)
except Exception as e:
errors.append(f"Rule {rule.__name__} failed: {str(e)}")
return len(errors) == 0, errors
Step 4: Create Processing Tools
# scripts/processing/dataset_manager.py
import json
import pandas as pd
from pathlib import Path
from typing import List, Dict, Optional
from datetime import datetime
from scripts.validation.validator import ExampleValidator
class DatasetManager:
def __init__(self, data_dir: Path):
self.data_dir = Path(data_dir)
self.raw_dir = self.data_dir / "raw"
self.processed_dir = self.data_dir / "processed"
self.exports_dir = self.data_dir / "exports"
# Create directories
for dir_path in [self.raw_dir, self.processed_dir, self.exports_dir]:
dir_path.mkdir(parents=True, exist_ok=True)
def add_example(self, example: Dict) -> bool:
"""Add a new example to the dataset"""
try:
# Validate example
validator = ExampleValidator("schemas/example_schema_v1.json")
is_valid, errors = validator.validate_single(example)
if not is_valid:
print(f"Validation failed: {errors}")
return False
# Save to raw directory
example_id = example["core"]["id"]
file_path = self.raw_dir / f"{example_id}.json"
with open(file_path, 'w') as f:
json.dump(example, f, indent=2)
print(f"Added example {example_id}")
return True
except Exception as e:
print(f"Error adding example: {str(e)}")
return False
def process_batch(self, batch_size: int = 1000) -> None:
"""Process raw examples into optimized format"""
raw_files = list(self.raw_dir.glob("*.json"))
for i in range(0, len(raw_files), batch_size):
batch_files = raw_files[i:i + batch_size]
batch_data = []
for file_path in batch_files:
with open(file_path) as f:
example = json.load(f)
batch_data.append(example)
# Convert to DataFrame for efficient processing
df = pd.json_normalize(batch_data)
# Save as Parquet for fast querying
output_path = self.processed_dir / f"batch_{i//batch_size:04d}.parquet"
df.to_parquet(output_path, index=False)
print(f"Processed batch {i//batch_size + 1}/{(len(raw_files) + batch_size - 1)//batch_size}")
def export_dataset(self, format: str, filters: Optional[Dict] = None) -> Path:
"""Export dataset in various formats"""
# Load all processed data
parquet_files = list(self.processed_dir.glob("*.parquet"))
dfs = [pd.read_parquet(f) for f in parquet_files]
combined_df = pd.concat(dfs, ignore_index=True)
# Apply filters if provided
if filters:
for column, value in filters.items():
if column in combined_df.columns:
combined_df = combined_df[combined_df[column] == value]
# Export in requested format
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
if format == "json":
output_path = self.exports_dir / f"dataset_{timestamp}.json"
combined_df.to_json(output_path, orient="records", indent=2)
elif format == "csv":
output_path = self.exports_dir / f"dataset_{timestamp}.csv"
combined_df.to_csv(output_path, index=False)
elif format == "parquet":
output_path = self.exports_dir / f"dataset_{timestamp}.parquet"
combined_df.to_parquet(output_path, index=False)
else:
raise ValueError(f"Unsupported format: {format}")
print(f"Exported {len(combined_df)} examples to {output_path}")
return output_path
Step 5: Set Up Quality Monitoring
# scripts/analysis/quality_monitor.py
import pandas as pd
import numpy as np
from typing import Dict, List
import matplotlib.pyplot as plt
import seaborn as sns
from scripts.processing.dataset_manager import DatasetManager
class QualityMonitor:
def __init__(self, dataset_manager: DatasetManager):
self.dataset_manager = dataset_manager
def generate_quality_report(self) -> Dict:
"""Generate comprehensive quality report"""
# Load all data
parquet_files = list(self.dataset_manager.processed_dir.glob("*.parquet"))
dfs = [pd.read_parquet(f) for f in parquet_files]
df = pd.concat(dfs, ignore_index=True)
report = {
"summary": self._generate_summary(df),
"quality_distribution": self._analyze_quality_distribution(df),
"category_breakdown": self._analyze_by_category(df),
"trend_analysis": self._analyze_trends(df),
"outliers": self._detect_outliers(df)
}
return report
def _generate_summary(self, df: pd.DataFrame) -> Dict:
return {
"total_examples": len(df),
"avg_quality": df["quality.overall"].mean(),
"quality_std": df["quality.overall"].std(),
"examples_below_threshold": len(df[df["quality.overall"] < 0.8]),
"unique_categories": df["classification.category"].nunique(),
"date_range": {
"earliest": df["core.created_at"].min(),
"latest": df["core.created_at"].max()
}
}
def _detect_outliers(self, df: pd.DataFrame) -> List[Dict]:
# Use IQR method for outlier detection
Q1 = df["quality.overall"].quantile(0.25)
Q3 = df["quality.overall"].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
outliers = df[
(df["quality.overall"] < lower_bound) |
(df["quality.overall"] > upper_bound)
]
return outliers[["core.id", "quality.overall", "classification.category"]].to_dict("records")
def plot_quality_trends(self, save_path: str = None):
"""Create visualizations of quality trends"""
# Load data
parquet_files = list(self.dataset_manager.processed_dir.glob("*.parquet"))
dfs = [pd.read_parquet(f) for f in parquet_files]
df = pd.concat(dfs, ignore_index=True)
# Convert created_at to datetime
df["created_date"] = pd.to_datetime(df["core.created_at"]).dt.date
# Create subplots
fig, axes = plt.subplots(2, 2, figsize=(15, 12))
# Quality over time
daily_quality = df.groupby("created_date")["quality.overall"].mean()
axes[0, 0].plot(daily_quality.index, daily_quality.values)
axes[0, 0].set_title("Average Quality Over Time")
axes[0, 0].set_ylabel("Quality Score")
# Quality distribution
axes[0, 1].hist(df["quality.overall"], bins=30, alpha=0.7)
axes[0, 1].set_title("Quality Score Distribution")
axes[0, 1].set_xlabel("Quality Score")
axes[0, 1].set_ylabel("Frequency")
# Quality by category
category_quality = df.groupby("classification.category")["quality.overall"].mean().sort_values()
axes[1, 0].barh(range(len(category_quality)), category_quality.values)
axes[1, 0].set_yticks(range(len(category_quality)))
axes[1, 0].set_yticklabels(category_quality.index)
axes[1, 0].set_title("Average Quality by Category")
axes[1, 0].set_xlabel("Quality Score")
# Examples per day
daily_count = df.groupby("created_date").size()
axes[1, 1].bar(daily_count.index, daily_count.values)
axes[1, 1].set_title("Examples Created Per Day")
axes[1, 1].set_ylabel("Number of Examples")
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
else:
plt.show()
Step 6: Usage Example
# example_usage.py
from pathlib import Path
from scripts.processing.dataset_manager import DatasetManager
from scripts.analysis.quality_monitor import QualityMonitor
from datetime import datetime
def main():
# Initialize dataset manager
data_dir = Path("data")
dm = DatasetManager(data_dir)
# Create a sample example
sample_example = {
"core": {
"id": "example_001",
"version": "1.0",
"schema_version": "1.0",
"created_at": datetime.utcnow().isoformat(),
"updated_at": datetime.utcnow().isoformat()
},
"content": {
"input": {
"primary": "How do I install Python on Windows?",
"context": "User is new to programming",
"constraints": "Windows 10, beginner-friendly"
},
"output": {
"primary": "1. Go to python.org\n2. Download Python 3.12\n3. Run installer\n4. Check 'Add to PATH'\n5. Click Install",
"reasoning": "Step-by-step approach for beginners",
"confidence": "0.95"
}
},
"classification": {
"domain": "programming",
"category": "installation",
"subcategory": "python",
"difficulty": "beginner",
"tags": ["python", "windows", "installation", "beginner"]
},
"quality": {
"accuracy": 0.95,
"completeness": 0.90,
"clarity": 0.92,
"overall": 0.92
}
}
# Add example to dataset
success = dm.add_example(sample_example)
if success:
print("Example added successfully!")
# Process the data
dm.process_batch()
# Generate quality report
qm = QualityMonitor(dm)
report = qm.generate_quality_report()
print("Quality Report:", report)
# Export dataset
export_path = dm.export_dataset("json")
print(f"Dataset exported to: {export_path}")
if __name__ == "__main__":
main()
Conclusion
Building a 77,000-example dataset taught me that architecture matters more than volume. The right structure enables quality, scalability, and maintainability. The wrong structure creates technical debt that compounds with every example.
Key Takeaways:
- Start with schema versioning - You'll need it sooner than you think
- Automate quality from day one - Manual review doesn't scale
- Plan your storage strategy - JSON is great for prototyping, terrible for production
- Build for querying - How you'll access the data determines the architecture
- Track everything - Lineage, quality, usage - measure what matters
- Evolve systematically - Don't rebuild, migrate incrementally
The architecture I've shared here scaled to 77,000 examples and could easily handle 500,000+. It's production-tested, battle-hardened, and ready for your project.
Your next step: Start with the implementation guide above. Begin simple, but design for scale. Your future self will thank you.
Want the complete codebase and example implementations? This architecture now powers multiple production AI systems and has been validated across industries. The patterns shown here work at any scale.
Related Reading:
- How I Built 77,000 AI Training Examples - The complete story
- Data Factory Tools: Automating 77,000 Examples - The tooling that made it possible
- Quality at Scale: Validating 77,000 Examples - The quality assurance system
<div className="bg-gradient-to-r from-green-900/20 to-blue-900/20 p-6 rounded-lg border border-green-500/20 mt-12"> <h3 className="text-xl font-bold text-green-400 mb-4">📧 Master AI Dataset Architecture</h3> <p className="text-gray-300 mb-4"> Get the complete architecture templates, validation scripts, and monitoring tools used to build the 77,000-example dataset. Plus weekly insights on scaling AI data systems. </p> </div>