Claude API Integration Patterns: Architecture for Production SaaS
Production-ready patterns for integrating Claude into Rails, Next.js, and Node apps. Error handling, streaming, and cost control.
The challenge
You've decided to use Claude. Now: how do you actually integrate it into your production app?
Common mistakes:
- Call Claude synchronously (blocks requests)
- No error handling (crashes on API errors)
- Unbounded token usage (destroys margins)
- No observability (can't debug failures)
This guide shows the patterns we use at Techcologic.
Pattern 1: Simple synchronous (for prototypes only)
```python
from anthropic import Anthropic

def summarize_document(text):
    client = Anthropic()  # reads ANTHROPIC_API_KEY from the environment
    response = client.messages.create(
        model="claude-opus-4-0",
        max_tokens=500,
        messages=[{"role": "user", "content": f"Summarize this document:\n\n{text}"}],
    )
    return response.content[0].text
```

- Pros: Simple, works immediately
- Cons: Blocks the request, no error handling, unbounded costs
- Use when: Prototyping only
Pattern 2: Async with error handling (recommended)
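```python
import logging

import anthropic
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)

# Failures worth retrying: rate limits, server errors, dropped connections
TRANSIENT_ERRORS = (
    anthropic.RateLimitError,
    anthropic.InternalServerError,
    anthropic.APIConnectionError,
)

@retry(
    retry=retry_if_exception_type(TRANSIENT_ERRORS),
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    reraise=True,  # after the last attempt, raise the original error
)
async def call_claude_safe(messages, model="claude-sonnet-4-0"):
    """Call Claude with retries and error handling"""
    # max_retries=0 disables the SDK's built-in retries so tenacity is the only retry layer
    async with anthropic.AsyncAnthropic(max_retries=0) as client:
        try:
            response = await client.messages.create(
                model=model,
                max_tokens=1000,
                messages=messages,
            )
            return {"success": True, "response": response.content[0].text}
        except TRANSIENT_ERRORS:
            # Transient failure: re-raise so tenacity retries with backoff
            raise
        except anthropic.APIError as e:
            # Permanent API error (bad request, auth, ...): log and fail gracefully
            logger.error("Claude API error: %s", e)
            return {"success": False, "error": "Service temporarily unavailable"}
```

- Pros: Handles failures, retries automatically, safe for production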
- Cons: Slightly more code
- Use when: Production systems
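The key decision in this pattern is which failures deserve a retry. The sketch below is a standalone, framework-free version of that decision, assuming you have the HTTP status code of a failed call. It treats the same status codes the Anthropic Python SDK retries by default (408, 409, 429, and 5xx) as transient, and computes a backoff delay with "full jitter" so that many workers hitting a rate limit at once don't retry in lockstep. The function names are hypothetical, and jitter is a swap-in: the tenacity config above uses plain exponential waits (tenacity's `wait_random_exponential` is its jittered counterpart).

```python
import random
from typing import Optional

# Status codes treated as transient, mirroring the Anthropic Python SDK's
# default retry policy: request timeout, conflict, rate limit, server errors.
RETRYABLE_STATUS = {408, 409, 429}


def is_retryable(status_code: int) -> bool:
    """Return True if a failed call with this HTTP status is worth retrying."""
    return status_code in RETRYABLE_STATUS or status_code >= 500


def backoff_delay(attempt: int, base: float = 2.0, cap: float = 10.0,
                  rng: Optional[random.Random] = None) -> float:
    """Seconds to sleep before retry number `attempt` (1-based).

    The ceiling doubles each attempt (base, 2*base, 4*base, ...) up to `cap`;
    full jitter then picks a uniform random delay in [0, ceiling].
    """
    if attempt < 1:
        raise ValueError("attempt must be >= 1")
    rng = rng or random.Random()
    ceiling = min(cap, base * 2 ** (attempt - 1))
    return rng.uniform(0, ceiling)


if __name__ == "__main__":
    for status in (400, 401, 429, 500, 529):
        print(status, "retry" if is_retryable(status) else "fail fast")
    for attempt in range(1, 5):
        print(f"attempt {attempt}: sleep {backoff_delay(attempt):.2f}s")
```

Failing fast on 400/401 matters as much as retrying 429: a malformed request or a bad API key will fail identically on every attempt, so retrying it only adds latency.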
Pattern 3: Streaming for better UX
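```python
import json

import anthropic

client = anthropic.Anthropic()

def stream_response(messages):
    """Stream Claude's response to the client in real time as server-sent events"""
    with client.messages.stream(
        model="claude-sonnet-4-0",
        max_tokens=1000,
        messages=messages,
    ) as stream:
        for text in stream.text_stream:
            # Send each text chunk immediately as an SSE "data:" line;
            # JSON-encoding keeps newlines inside a chunk from breaking the framing
            yield f"data: {json.dumps(text)}\n\n"
        # Chunks are not tokens: read exact usage from the final message
        usage = stream.get_final_message().usage
        track_usage(usage.input_tokens, usage.output_tokens)  # your own cost tracker
```

Flask example:

```python
from flask import Flask, Response, request

app = Flask(__name__)

@app.route("/stream", methods=["POST"])
def stream():
    messages = request.json["messages"]
    return Response(
        stream_response(messages),
        mimetype="text/event-stream",
    )
```

- Pros: 4–5× faster perceived latency, better UX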
- Cons: Client must handle streaming
- Use when: Chat interfaces, long-form generation
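In practice, "the client must handle streaming" means reassembling the chunks as they arrive. As a sketch, assuming the server frames each chunk as an SSE `data:` line carrying a JSON-encoded string, a Python client could decode the stream as follows. `iter_sse_text` is a hypothetical helper. Its input is any iterable of decoded lines, such as what `requests`' `Response.iter_lines(decode_unicode=True)` yields. It handles only single-line `data:` events, not the full SSE spec (multi-line data, `event:` and `id:` fields).

```python
import json
from typing import Iterable, Iterator


def iter_sse_text(lines: Iterable[str]) -> Iterator[str]:
    """Yield the text chunks carried by single-line SSE `data:` events.

    Assumes each event looks like `data: "<json string>"` followed by a blank
    line; blank separators, comments (':' lines) and other fields are skipped.
    """
    for line in lines:
        if not line.startswith("data:"):
            continue
        payload = line[len("data:"):].strip()
        yield json.loads(payload)


if __name__ == "__main__":
    raw = 'data: "Hello"\n\ndata: ", wor"\n\n: keep-alive\n\ndata: "ld\\n"\n\n'
    text = "".join(iter_sse_text(raw.splitlines()))
    print(repr(text))  # 'Hello, world\n'
```

Browser clients get the same framing for free from `EventSource`, though it only issues GET requests, so a POST endpoint like the one above is usually read with `fetch` and a stream reader instead.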
Pattern 4: Background jobs (for heavy processing)
```python
# In your SaaS app:
import asyncio

from celery import shared_task

@shared_task
def analyze_document(document_id):
    """Process a document with Claude in the background"""
    doc = Document.get(document_id)
    # call_claude_safe is async; run it to completion inside the sync Celery task
    result = asyncio.run(call_claude_safe(
        messages=[{"role": "user", "content": doc.text}]
    ))
    # Store the analysis text (not the whole result dict), only on success
    if result["success"]:
        doc.analysis = result["response"]
        doc.save()
    # Notify the user either way
    notify_user(document_id, "Analysis complete" if result["success"] else "Analysis failed")
```

In your web request:

```python
@app.route("/analyze", methods=["POST"])
def analyze():
    doc_id = request.json["document_id"]
    # Queue the job and return immediately with 202 Accepted
    analyze_document.delay(doc_id)
    return {"status": "queued"}, 202
```

- Pros: Non-blocking, handles slow Claude calls gracefully
- Cons: Requires a task queue and a message broker (e.g. Celery with Redis)
- Use when: Reports, batch processing, anything >5 seconds
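Stripped of Celery and Redis, the pattern is simple. The request handler enqueues a job and returns at once, a worker makes the slow call, and the client polls a status record. The sketch below shows that shape with only the standard library (`queue` plus a worker thread). It assumes a hypothetical `slow_analyze` function in place of the Claude call. It is meant for illustrating or unit-testing the flow in one process, not as a substitute for a durable queue: jobs are lost if the process dies.

```python
import queue
import threading
import uuid
from typing import Callable, Dict


class JobRunner:
    """In-process stand-in for a task queue: submit() returns immediately."""

    def __init__(self, work: Callable[[str], str]):
        self.work = work                   # the slow task, e.g. a Claude call
        self.jobs = queue.Queue()          # (job_id, payload) pairs
        self.status: Dict[str, dict] = {}  # job_id -> {"state", "result"}
        threading.Thread(target=self._worker, daemon=True).start()

    def submit(self, payload: str) -> str:
        job_id = uuid.uuid4().hex
        self.status[job_id] = {"state": "queued", "result": None}
        self.jobs.put((job_id, payload))
        return job_id                      # the caller polls status[job_id]

    def _worker(self) -> None:
        while True:
            job_id, payload = self.jobs.get()
            self.status[job_id]["state"] = "running"
            try:
                self.status[job_id].update(state="done", result=self.work(payload))
            except Exception as exc:       # record the failure instead of dying
                self.status[job_id].update(state="failed", result=str(exc))
            finally:
                self.jobs.task_done()


if __name__ == "__main__":
    import time

    def slow_analyze(text: str) -> str:    # hypothetical stand-in for Claude
        time.sleep(0.1)
        return f"analysis of {len(text)} chars"

    runner = JobRunner(slow_analyze)
    job = runner.submit("some document text")
    runner.jobs.join()                     # wait for the worker (demo only)
    print(runner.status[job])
```

The `failed` state is the part that is easy to forget: a status endpoint that can only report `queued` or `done` leaves users waiting forever when the Claude call errors out.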
Pattern 5: Cost-controlled wrapper
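```python
import anthropic

# Prices assume Sonnet: USD per million tokens
INPUT_PRICE_PER_M = 3.00
OUTPUT_PRICE_PER_M = 15.00


class BudgetExceededError(Exception):
    pass


class ClaudeAPI:
    def __init__(self, monthly_budget=1000):
        self.monthly_budget = monthly_budget  # USD
        self.spent = 0.0  # in-memory and per-process; persist it in production
        self.requests = 0
        self.client = anthropic.Anthropic()

    def call(self, messages, model="claude-sonnet-4-0", max_tokens=1000):
        # Pessimistic estimate: count every character as an input token and
        # assume the full max_tokens of output
        worst_case = (len(str(messages)) * INPUT_PRICE_PER_M
                      + max_tokens * OUTPUT_PRICE_PER_M) / 1_000_000
        if self.spent + worst_case > self.monthly_budget:
            raise BudgetExceededError(
                f"Budget exceeded: ${self.spent:.2f} / ${self.monthly_budget}"
            )
        # Call the SDK directly: call_claude_safe returns only the text,
        # and we need response.usage
        response = self.client.messages.create(
            model=model, max_tokens=max_tokens, messages=messages
        )
        # Track actual spending from the reported token counts
        usage = response.usage
        self.spent += (usage.input_tokens * INPUT_PRICE_PER_M
                       + usage.output_tokens * OUTPUT_PRICE_PER_M) / 1_000_000
        self.requests += 1
        return response
```

Use this if: you're bootstrapped or have cost constraints.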
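One gap in the wrapper: `self.spent` never resets, so a "monthly" budget quietly becomes a lifetime one. A minimal sketch of the reset logic follows, assuming spend is keyed by calendar month; in production the dictionary would be a database row or a Redis counter shared by all workers. `MonthlyBudget` and `request_cost` are hypothetical names, and the prices are Sonnet's list prices of $3 per million input tokens and $15 per million output tokens. For example, a request with 2,000 input and 500 output tokens costs 2,000 × $3/1M + 500 × $15/1M = $0.006 + $0.0075 = $0.0135.

```python
from datetime import date
from typing import Dict, Tuple

INPUT_PRICE_PER_M = 3.00    # USD per million input tokens (Sonnet list price)
OUTPUT_PRICE_PER_M = 15.00  # USD per million output tokens


def request_cost(input_tokens: int, output_tokens: int) -> float:
    """Dollar cost of one request from its reported token usage."""
    return (input_tokens * INPUT_PRICE_PER_M
            + output_tokens * OUTPUT_PRICE_PER_M) / 1_000_000


class MonthlyBudget:
    """Tracks spend per calendar month, so each month starts from zero."""

    def __init__(self, limit_usd: float):
        self.limit = limit_usd
        self.spent: Dict[Tuple[int, int], float] = {}  # (year, month) -> USD

    @staticmethod
    def _key(day: date) -> Tuple[int, int]:
        return (day.year, day.month)

    def can_afford(self, estimated_usd: float, day: date) -> bool:
        return self.spent.get(self._key(day), 0.0) + estimated_usd <= self.limit

    def charge(self, actual_usd: float, day: date) -> None:
        key = self._key(day)
        self.spent[key] = self.spent.get(key, 0.0) + actual_usd


if __name__ == "__main__":
    print(round(request_cost(2_000, 500), 4))          # 0.0135
    budget = MonthlyBudget(limit_usd=10.0)
    budget.charge(9.99, date(2025, 1, 31))
    print(budget.can_afford(0.05, date(2025, 1, 31)))  # False: January is used up
    print(budget.can_afford(0.05, date(2025, 2, 1)))   # True: new month
```

Passing the date in explicitly, rather than reading the clock inside the class, keeps the month rollover easy to test.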
Pattern 6: Multi-model smart routing
```python
async def call_claude_smart(messages, complexity="medium"):
    """Route to the right model based on task complexity"""
    model_choice = {
        "simple": "claude-3-5-haiku-latest",  # $0.80/M input tokens
        "medium": "claude-sonnet-4-0",        # $3/M input tokens
        "complex": "claude-opus-4-0",         # $15/M input tokens
    }[complexity]
    # call_claude_safe is async, so await it
    return await call_claude_safe(messages, model_choice)
```

Usage (inside an async function):

```python
# Simple summarization
summary = await call_claude_smart(
    [{"role": "user", "content": text}],
    complexity="simple",  # Uses Haiku
)
# Complex reasoning
plan = await call_claude_smart(
    [{"role": "user", "content": requirements}],
    complexity="complex",  # Uses Opus
)
```

Impact: roughly 80% cost reduction, depending on your traffic mix.
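The savings you actually see depend on how your traffic splits across tiers. A quick way to estimate them for your own app is to compute a blended per-token price. The sketch below does that for input tokens only, using input list prices of $0.80 (Haiku 3.5), $3 (Sonnet 4), and $15 (Opus 4) per million tokens. The 70/25/5 split between simple, medium, and complex tasks is a made-up example, not a measurement.

```python
# Input list prices, USD per million tokens (Haiku 3.5, Sonnet 4, Opus 4)
PRICE_PER_M = {"simple": 0.80, "medium": 3.00, "complex": 15.00}


def blended_price(mix: dict) -> float:
    """Average price per million input tokens, given each tier's share of traffic."""
    if abs(sum(mix.values()) - 1.0) > 1e-9:
        raise ValueError("traffic shares must sum to 1")
    return sum(share * PRICE_PER_M[tier] for tier, share in mix.items())


if __name__ == "__main__":
    mix = {"simple": 0.70, "medium": 0.25, "complex": 0.05}  # hypothetical mix
    routed = blended_price(mix)
    all_opus = PRICE_PER_M["complex"]
    print(f"routed: ${routed:.2f}/M, all-Opus: ${all_opus:.2f}/M, "
          f"savings: {1 - routed / all_opus:.0%}")
    # routed: $2.06/M, all-Opus: $15.00/M, savings: 86%
```

Run it with your own split. If most requests genuinely need Opus, routing saves little, and measuring the mix first shows you that before you build the router.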
Pattern 7: Observability & monitoring
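```python
import time
from datetime import datetime, timezone

# Prices assume Sonnet: USD per million tokens
INPUT_PRICE_PER_M = 3.00
OUTPUT_PRICE_PER_M = 15.00


class ClaudeLogger:
    def __init__(self, db):
        # `db` is your own storage layer: insert() returns the new row id,
        # update(table, row_id, fields) modifies that row
        self.db = db

    def log_request(self, model):
        """Record a pending request; returns (log_id, start_time)."""
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "model": model,
            "status": "pending",
        }
        log_id = self.db.insert("claude_logs", log_entry)
        return log_id, time.monotonic()

    def log_response(self, log_id, start_time, response):
        """Fill in real token counts, latency, and cost once Claude responds."""
        usage = response.usage
        self.db.update("claude_logs", log_id, {
            "input_tokens": usage.input_tokens,
            "output_tokens": usage.output_tokens,
            "duration_ms": (time.monotonic() - start_time) * 1000,
            "cost": (usage.input_tokens * INPUT_PRICE_PER_M
                     + usage.output_tokens * OUTPUT_PRICE_PER_M) / 1_000_000,
            "status": "success",
        })

    def log_error(self, log_id, start_time, error):
        """Record failures too, so the error rate can be measured."""
        self.db.update("claude_logs", log_id, {
            "duration_ms": (time.monotonic() - start_time) * 1000,
            "status": "error",
            "error": str(error),
        })
```

Monitor: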
- Total tokens per day
- Cost per request
- Error rate
- Average latency
- Model distribution
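Given rows shaped like the log entries above (`timestamp`, `model`, `status`, token counts, `cost`, `duration_ms`), the five metrics in this list take only a few lines of aggregation. The sketch below works over plain dictionaries. It assumes ISO-8601 timestamps and that failed rows may lack token and cost fields. In production this would more likely be a SQL `GROUP BY` over the `claude_logs` table; `summarize` is a hypothetical name.

```python
from collections import Counter, defaultdict
from typing import Iterable


def summarize(rows: Iterable[dict]) -> dict:
    """Daily tokens, cost per request, error rate, latency, and model mix."""
    rows = list(rows)
    if not rows:
        return {}
    tokens_per_day = defaultdict(int)
    for r in rows:
        day = r["timestamp"][:10]  # "YYYY-MM-DD" prefix of an ISO-8601 timestamp
        tokens_per_day[day] += r.get("input_tokens", 0) + r.get("output_tokens", 0)
    ok = [r for r in rows if r["status"] == "success"]
    return {
        "tokens_per_day": dict(tokens_per_day),
        "avg_cost_per_request": sum(r.get("cost", 0.0) for r in rows) / len(rows),
        "error_rate": sum(r["status"] == "error" for r in rows) / len(rows),
        # Latency over successful calls only; fast failures would skew it down
        "avg_latency_ms": sum(r["duration_ms"] for r in ok) / len(ok) if ok else None,
        "model_distribution": dict(Counter(r["model"] for r in rows)),
    }


if __name__ == "__main__":
    logs = [
        {"timestamp": "2025-01-01T10:00:00+00:00", "model": "claude-sonnet-4-0",
         "status": "success", "input_tokens": 1200, "output_tokens": 300,
         "cost": 0.0081, "duration_ms": 2400.0},
        {"timestamp": "2025-01-01T11:00:00+00:00", "model": "claude-3-5-haiku-latest",
         "status": "error", "duration_ms": 150.0},
    ]
    print(summarize(logs))
```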
Complete example: Rails integration
```ruby
# app/services/claude_service.rb
class ClaudeService
  # Prices assume Sonnet: USD per million tokens
  INPUT_PRICE_PER_M = 3.0
  OUTPUT_PRICE_PER_M = 15.0

  def self.analyze(text)
    client = Anthropic::Client.new(api_key: ENV["ANTHROPIC_API_KEY"])
    response = client.messages.create(
      model: "claude-sonnet-4-0",
      max_tokens: 1000,
      messages: [
        { role: "user", content: text }
      ]
    )
    ClaudeLog.create(
      tokens: response.usage.input_tokens + response.usage.output_tokens,
      cost: calculate_cost(response)
    )
    response.content[0].text
  rescue Anthropic::Errors::APIError => e
    Rails.logger.error("Claude API error: #{e.message}")
    raise
  end

  def self.calculate_cost(response)
    (response.usage.input_tokens * INPUT_PRICE_PER_M +
      response.usage.output_tokens * OUTPUT_PRICE_PER_M) / 1_000_000
  end
end
```

```ruby
# app/controllers/analyses_controller.rb
class AnalysesController < ApplicationController
  def create
    @analysis = Analysis.create(
      result: ClaudeService.analyze(params[:text])
    )
    render json: @analysis
  end
end
```

Best practices checklist
- All Claude calls have error handling
- Implement exponential backoff for retries
- Log every request for debugging
- Monitor token usage per endpoint
- Set monthly budget limits
- Use the right model for the task (don't use Opus everywhere)
- Stream responses for better UX
- Use background jobs for long tasks (>5s)
- Test failover scenarios
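Failover is easiest to test when the fallback logic takes the API call as a parameter, so a test can inject failures without touching the network. The sketch below assumes a hypothetical `call_with_fallback` that tries models in order and moves to the next one only on errors your code classifies as transient. `TransientError` stands in for whatever you map rate-limit and server errors to (in the Python SDK, for example, `anthropic.RateLimitError` or `anthropic.InternalServerError`). `FakeClaude` is a test double, not part of any SDK.

```python
from typing import Callable, List, Sequence, Tuple


class TransientError(Exception):
    """Stand-in for rate-limit, overload, and server errors."""


def call_with_fallback(call: Callable[[str], str], models: Sequence[str]) -> Tuple[str, str]:
    """Try each model in order; return (model_used, text) from the first success.

    Non-transient errors (bad request, auth) propagate immediately, because a
    different model would fail the same way.
    """
    last_error = None
    for model in models:
        try:
            return model, call(model)
        except TransientError as exc:
            last_error = exc  # move on to the next model
    raise RuntimeError("all models failed") from last_error


class FakeClaude:
    """Test double: fails for the listed models and records every call."""

    def __init__(self, failing: List[str]):
        self.failing = set(failing)
        self.calls: List[str] = []

    def __call__(self, model: str) -> str:
        self.calls.append(model)
        if model in self.failing:
            raise TransientError(f"{model} overloaded")
        return f"answer from {model}"


def test_falls_back_when_primary_is_overloaded():
    fake = FakeClaude(failing=["claude-sonnet-4-0"])
    model, text = call_with_fallback(fake, ["claude-sonnet-4-0", "claude-3-5-haiku-latest"])
    assert model == "claude-3-5-haiku-latest"
    assert text == "answer from claude-3-5-haiku-latest"
    assert fake.calls == ["claude-sonnet-4-0", "claude-3-5-haiku-latest"]


def test_raises_when_every_model_fails():
    fake = FakeClaude(failing=["a", "b"])
    try:
        call_with_fallback(fake, ["a", "b"])
    except RuntimeError:
        return
    raise AssertionError("expected RuntimeError")
```

The same fake can be given to the retry and background-job code paths, which is usually where untested failure handling hides.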
The takeaway
Production Claude integration isn't just calling the API. It requires:
- Error handling (retries, fallbacks)
- Cost control (right model, monitoring)
- Performance (streaming, async, jobs)
- Observability (logging everything)
Start with Pattern 2 (async with error handling) for 80% of cases.
Need help architecting Claude into your SaaS? Book an architecture call to review your design. We've shipped production systems with Claude — we'll make sure your integration is reliable, cost-effective, and performant.