Building Event-Driven Microservices with Pub/Sub: A Production Architecture
Recently, I built a production backend that processes complex workflows using event-driven microservices. The goal was simple: build a system that's reliable, cost-efficient, and can handle unpredictable load patterns. Here's what I learned building a fully event-driven architecture with Google Cloud.
The Architecture Challenge
The system needed to:
- Process multi-step workflows that could take minutes to hours
- Scale from zero to hundreds of concurrent operations
- Maintain complete audit trails for compliance
- Keep costs minimal during idle periods
- Handle failures gracefully without data loss
Traditional monolithic or REST-based microservices wouldn't cut it. I needed true event-driven architecture.
Core Architecture: Pub/Sub as the Backbone
The entire system communicates through Google Cloud Pub/Sub. No service talks directly to another. Every interaction is a message on a topic.
The Pub/Sub Client
I built a simple publisher that every service uses:
```python
import json
from datetime import datetime, timezone

from google.cloud import pubsub_v1


class EventPublisher:
    def __init__(self):
        self.project_id = "project-id"
        self.publisher = pubsub_v1.PublisherClient()

    def publish(self, topic_name, message_data):
        topic_path = self.publisher.topic_path(self.project_id, topic_name)
        message_data['timestamp'] = datetime.now(timezone.utc).isoformat()
        encoded = json.dumps(message_data).encode('utf-8')
        future = self.publisher.publish(topic_path, encoded)
        return future.result()  # blocks until Pub/Sub returns the message ID
```
Simple. Every service imports this, publishes messages, and moves on.
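Because the payload is just UTF-8 JSON, the publish/consume contract can be exercised without touching GCP. A minimal sketch (no Pub/Sub client involved; `encode_event` mirrors the encoding step of the publisher above, `decode_event` is what a subscriber does in reverse):

```python
import json
from datetime import datetime, timezone

def encode_event(message_data: dict) -> bytes:
    """Stamp the event and serialize it, as the publisher does."""
    message_data['timestamp'] = datetime.now(timezone.utc).isoformat()
    return json.dumps(message_data).encode('utf-8')

def decode_event(raw: bytes) -> dict:
    """Reverse the encoding on the subscriber side."""
    return json.loads(raw.decode('utf-8'))

raw = encode_event({'caseId': 'WF-123', 'action': 'initiate'})
event = decode_event(raw)
assert event['caseId'] == 'WF-123' and 'timestamp' in event
```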
16 Topics, 6 Services, Zero Direct Calls
The system has 16 Pub/Sub topics organizing the entire workflow:
Onboarding Flow:
- workflow.initiate - Triggers case creation
- data.fetch - Initiates data retrieval
Processing Flow:
- data.analyze - Starts AI analysis
- document.generate - Creates documents
- document.submit - Sends to external systems
Monitoring Flow:
- workflow.track - Monitors progress
- response.received - Handles responses
- workflow.followup - Schedules follow-ups
Admin Flow:
- workflow.complete - Marks completion
- workflow.escalate - Handles escalations
- compliance.review - Compliance checks
Communication:
- notify.client - Client notifications
- notify.admin - Admin alerts
Infrastructure:
- dead.letter - Failed message handling
- audit.log - Complete audit trail
- retry.failed - Retry mechanism
Why So Many Topics?
Each topic represents a single responsibility. When you see document.generate in the logs, you know exactly what's happening. No guessing.
Microservices That Scale to Zero
All services run on Google Cloud Run with minInstances=0. They literally shut down when not processing.
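For reference, scale-to-zero is just an autoscaling setting on the Cloud Run service. A sketch of the Knative service spec (service and image names are placeholders; the same effect comes from `gcloud run deploy --min-instances=0`):

```yaml
# service.yaml -- illustrative; names are placeholders
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
  name: intake-service
spec:
  template:
    metadata:
      annotations:
        autoscaling.knative.dev/minScale: "0"    # scale to zero when idle
        autoscaling.knative.dev/maxScale: "10"   # cap concurrent instances
    spec:
      containers:
        - image: gcr.io/project-id/intake-service
```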
Service Structure
Every service follows the same pattern:
```python
import base64
import json

from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/pubsub-push")
async def handle_push(request: Request):
    envelope = await request.json()
    message_data = envelope['message'].get('data', '')
    decoded = base64.b64decode(message_data).decode('utf-8')
    message = json.loads(decoded)

    result = await process_message(message)   # service-specific work
    await log_event(message, result)          # audit trail entry
    await publish_next_step(result)           # hand off to the next topic
    return {"status": "processed"}
```
Push, not pull. Pub/Sub wakes the service, delivers the message, and the service shuts down after responding.
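The push envelope itself is easy to simulate in a unit test. A sketch of the decode step with a hand-built envelope (field values are illustrative; this is the same shape Pub/Sub POSTs to the endpoint):

```python
import base64
import json

def decode_push_envelope(envelope: dict) -> dict:
    """Extract and decode the JSON payload from a Pub/Sub push envelope."""
    data = envelope['message'].get('data', '')
    return json.loads(base64.b64decode(data).decode('utf-8'))

# Simulate what Pub/Sub would POST to /pubsub-push
payload = {'caseId': 'WF-123', 'action': 'initiate'}
envelope = {
    'message': {
        'data': base64.b64encode(json.dumps(payload).encode('utf-8')).decode('ascii'),
        'messageId': '1234567890',
    },
    'subscription': 'projects/project-id/subscriptions/intake-sub',
}
assert decode_push_envelope(envelope) == payload
```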
The Cost Benefit
Traditional always-on services: ~$210/month (6 services × $35/month)
Scale-to-zero services: ~$1.40/month for 100 operations
Cold starts? 1-3 seconds. For async workflows, it's perfectly acceptable.
Multi-Agent AI System
The system uses 4 specialized AI agents built with an agent framework, all powered by modern LLMs.
Agent Architecture
```python
# Specialized agent for processing
processor_agent = Agent(
    model="llm-model",
    name="processor_agent",
    instruction="You process data and identify patterns...",
    tools=[
        FunctionTool(func=fetch_data),
        FunctionTool(func=analyze_patterns),
        FunctionTool(func=generate_output),
    ],
)
```
Why 4 Agents Instead of 1?
Each agent has domain expertise:
- Intake Agent - Client qualification, validation, routing
- Processor Agent - Data processing, pattern recognition, output generation
- Communication Agent - Email, SMS, notifications, authentication
- Admin Agent - Dashboards, compliance, escalations
Benefits:
- Smaller context windows - Agents only need relevant instructions
- Lower token costs - No wasted tokens on irrelevant context
- Better accuracy - Specialized agents make fewer mistakes
- Independent updates - Change one agent without affecting others
Agent Tool Pattern
Agents don't call services directly. They call tools, which publish Pub/Sub messages:
```python
from datetime import datetime, timezone

def submit_tool(case_id: str, data: dict):
    """Submit processed data."""
    message = {
        'caseId': case_id,
        'action': 'submit_data',
        'payload': data,
        'timestamp': datetime.now(timezone.utc).isoformat(),
    }
    # publish() returns the message ID (the EventPublisher resolves
    # the future before returning)
    message_id = pubsub_client.publish('document.submit', message)
    return {
        'status': 'submitted',
        'message_id': message_id,
    }
```
The agent gets immediate feedback, but the actual work happens asynchronously.
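The pattern is framework-agnostic and easy to unit-test when the publisher is injected. A sketch with a stub standing in for the real Pub/Sub client (`StubPublisher` and `make_submit_tool` are illustrative names, not framework APIs):

```python
from datetime import datetime, timezone

class StubPublisher:
    """Stand-in for the real Pub/Sub client so the tool can be unit-tested."""
    def __init__(self):
        self.published = []  # (topic, message) pairs

    def publish(self, topic: str, message: dict) -> str:
        self.published.append((topic, message))
        return str(len(self.published))  # fake message ID

def make_submit_tool(publisher):
    def submit_tool(case_id: str, data: dict) -> dict:
        """Publish processed data; return immediate feedback to the agent."""
        message = {
            'caseId': case_id,
            'action': 'submit_data',
            'payload': data,
            'timestamp': datetime.now(timezone.utc).isoformat(),
        }
        message_id = publisher.publish('document.submit', message)
        return {'status': 'submitted', 'message_id': message_id}
    return submit_tool

pub = StubPublisher()
submit = make_submit_tool(pub)
result = submit('WF-123', {'items': 3})
assert result['status'] == 'submitted'
assert pub.published[0][0] == 'document.submit'
```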
State Management: Firestore
All state lives in Firestore. Services are completely stateless.
Document Structure
```json
{
  "id": "WF-20250129-1234",
  "status": "processing",
  "currentPhase": 4,
  "clientId": "client_456",
  "createdAt": "2025-01-29T10:30:00Z",
  "updatedAt": "2025-01-29T11:45:00Z",
  "intake": {
    "validated": true,
    "documentsReceived": true,
    "approved": true
  },
  "processing": {
    "startedAt": "2025-01-29T10:35:00Z",
    "completedAt": "2025-01-29T11:20:00Z",
    "itemsProcessed": 12,
    "outputsGenerated": 8
  }
}
```
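Phase transitions boil down to a merge-plus-timestamp update. An in-memory sketch (with Firestore this would be a `doc_ref.update()` or `set(merge=True)` call; `advance_phase` is an illustrative helper, not part of the system above):

```python
from datetime import datetime, timezone

def advance_phase(doc: dict, phase: int, section: str, fields: dict) -> dict:
    """Merge a phase update into a workflow document and stamp updatedAt."""
    updated = dict(doc)
    updated['currentPhase'] = phase
    updated['updatedAt'] = datetime.now(timezone.utc).isoformat()
    # Merge the section rather than overwrite it, like a Firestore merge write
    updated[section] = {**doc.get(section, {}), **fields}
    return updated

doc = {'id': 'WF-123', 'status': 'processing', 'currentPhase': 3}
doc = advance_phase(doc, 4, 'processing', {'itemsProcessed': 12})
assert doc['currentPhase'] == 4
assert doc['processing']['itemsProcessed'] == 12
```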
Event Log Collection
Every message gets logged:
```python
from google.cloud import firestore

def log_event(case_id, from_agent, to_agent, topic, message_id, payload):
    firestore_client.collection('event_log').add({
        'caseId': case_id,
        'fromAgent': from_agent,
        'toAgent': to_agent,
        'topic': topic,
        'messageId': message_id,
        'payload': payload,
        'timestamp': firestore.SERVER_TIMESTAMP,  # server-side clock
        'status': 'published'
    })
```
This gives us:
- Complete audit trail - Every state transition tracked
- Debugging - Trace any case through the entire system
- Compliance - Proof of every action taken
- Analytics - Performance metrics per agent/service
Data Flow: A Complete Journey
Let's trace a workflow from creation to completion:
1. Initiation
Frontend → POST /api/workflows/create
→ Firestore: Create document
→ Cloud Function: on_created
→ Pub/Sub: workflow.initiate
2. Intake Service Wakes
Pub/Sub push → Intake Service
→ Validates: payment, documents, consent
→ Updates status
→ Pub/Sub: data.fetch
→ Service scales to zero
3. Data Service Wakes
Pub/Sub push → Data Service
→ Retrieves data from external API
→ Stores in Cloud Storage
→ Pub/Sub: data.analyze
→ Service scales to zero
4. AI Processing
Pub/Sub push → Processing Service
→ Loads data from Cloud Storage
→ Agent processes data
→ Identifies patterns
→ Stores results in Firestore
→ Pub/Sub: document.generate
→ Service scales to zero
5. Document Generation
Pub/Sub push → Processing Service
→ Agent generates documents
→ Stores PDFs in Cloud Storage
→ Pub/Sub: document.submit
→ Service scales to zero
6. Submission
Pub/Sub push → Processing Service
→ Submits to external systems
→ Pub/Sub: workflow.track
→ Service scales to zero
7. Tracking & Notifications
Pub/Sub push → Tracking Service
→ Sets up monitoring
→ Pub/Sub: notify.client
Pub/Sub push → Communication Service
→ Sends email + SMS
→ Service scales to zero
Total Time: 2-5 minutes
Services Active: Only during processing
Cost per Operation: ~$0.01-0.02
Messages Exchanged: 8-12
Event Log Entries: 8-12
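The happy path above is effectively a static routing table: each service looks up the next topic and publishes to it. A sketch (the mapping reflects the steps above; the real system also branches to monitoring and admin topics):

```python
# Happy-path routing: each service publishes the next topic in the chain.
NEXT_TOPIC = {
    'workflow.initiate': 'data.fetch',
    'data.fetch': 'data.analyze',
    'data.analyze': 'document.generate',
    'document.generate': 'document.submit',
    'document.submit': 'workflow.track',
    'workflow.track': 'notify.client',
}

def trace_happy_path(start: str) -> list:
    """Walk the chain from a starting topic to the terminal step."""
    path = [start]
    while path[-1] in NEXT_TOPIC:
        path.append(NEXT_TOPIC[path[-1]])
    return path

assert trace_happy_path('workflow.initiate')[-1] == 'notify.client'
```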
Handling Failures: Dead Letter Queues
Services fail. Networks timeout. External APIs go down. The system handles it.
Pub/Sub Retry Configuration
Every subscription has retry settings:
```yaml
subscription:
  ackDeadlineSeconds: 600
  retryPolicy:
    minimumBackoff: 10s
    maximumBackoff: 600s
  deadLetterPolicy:
    deadLetterTopic: dead.letter
    maxDeliveryAttempts: 5
```
If a message fails 5 times, it goes to the dead letter topic.
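The retry delays follow a clamped exponential curve between the two bounds. Pub/Sub computes these server-side, but the shape is easy to illustrate (`nth_backoff` is an illustrative helper, not an API):

```python
def nth_backoff(attempt: int, minimum: float = 10.0, maximum: float = 600.0) -> float:
    """Exponential backoff clamped to the subscription's min/max bounds."""
    return min(maximum, minimum * (2 ** attempt))

# One delay per delivery attempt before the message is dead-lettered
delays = [nth_backoff(n) for n in range(5)]
assert delays[0] == 10.0
assert all(d <= 600.0 for d in delays)
```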
Dead Letter Handler Service
```python
from fastapi import FastAPI, Request
from google.cloud import firestore

app = FastAPI()

@app.post("/pubsub-push")
async def handle_dead_letter(request: Request):
    message = decode_message(request)  # same base64/JSON decode as above
    firestore_client.collection('failed_messages').add({
        'originalTopic': message.get('originalTopic'),
        'workflowId': message.get('workflowId'),
        'error': message.get('error'),
        'attemptCount': message.get('attemptCount'),
        'failedAt': firestore.SERVER_TIMESTAMP,
        'payload': message
    })
    await notify_admin(f"Failed message: {message.get('workflowId')}")
    return {"status": "logged"}
```
Admins can manually retry from the dashboard or trigger retry.failed.
Observability: Knowing What's Happening
The event log gives us everything:
Query: "What happened to workflow WF-123?"
```python
events = (
    firestore_client
    .collection('event_log')
    .where('caseId', '==', 'WF-123')   # field name matches log_event above
    .order_by('timestamp')
    .get()
)

for snapshot in events:
    event = snapshot.to_dict()
    print(f"{event['timestamp']}: {event['fromAgent']} → {event['toAgent']}")
    print(f"  Topic: {event['topic']}")
    print(f"  Status: {event['status']}")
```
Output:
```
2025-01-29 10:30:15: frontend → intake
  Topic: workflow.initiate
  Status: published
2025-01-29 10:31:03: intake → data
  Topic: data.fetch
  Status: published
2025-01-29 10:35:47: data → processing
  Topic: data.analyze
  Status: published
```
Every step. Every transition. Complete visibility.
Key Architectural Decisions
1. Event-Driven Over REST
Decision: Use Pub/Sub for all inter-service communication
Result: Complete decoupling, independent scaling, automatic retries
2. Scale to Zero
Decision: All services with minInstances=0
Result: ~99% cost reduction ($210 → $1.40/month)
3. Multi-Agent System
Decision: 4 specialized agents vs 1 general agent
Result: Better accuracy, lower token costs, easier maintenance
4. Firestore as State Store
Decision: Firestore for all state management
Result: Real-time updates, scalable, multi-tenant ready
5. Stateless Services
Decision: Services have no in-memory state
Result: Scale-to-zero possible, crash-resistant
6. Push Subscriptions
Decision: Pub/Sub push (not pull)
Result: Services wake on-demand, no polling overhead
Lessons Learned
What Worked
Event-driven is liberating. Services don't care who calls them or what happens next. They process a message, publish the next step, and shut down.
Scale-to-zero is amazing. The system handles 100 operations/month for $1.40. With always-on services, it would be $200+.
Specialized agents are better. One general agent tried to do too much. Four specialized agents are more accurate and cheaper.
Complete audit trail matters. When something breaks (and it will), the event log is gold. You can trace exactly what happened.
What Was Hard
Cold starts are real. 1-3 seconds feels slow coming from always-on services. You adjust your expectations for async workflows.
Message ordering is tricky. Pub/Sub doesn't guarantee order. Design your system to be idempotent and handle out-of-order messages.
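A common guard is to record processed message IDs and treat redelivery as a no-op. An in-memory sketch (in production the seen-set would live in Firestore or Redis, not process memory; `IdempotentProcessor` is an illustrative name):

```python
class IdempotentProcessor:
    """Drop duplicate deliveries by remembering processed message IDs."""
    def __init__(self):
        self.seen = set()
        self.processed = []

    def handle(self, message_id: str, payload: dict) -> bool:
        if message_id in self.seen:
            return False  # duplicate delivery: ack and do nothing
        self.seen.add(message_id)
        self.processed.append(payload)
        return True

proc = IdempotentProcessor()
assert proc.handle('m1', {'caseId': 'WF-123'}) is True
assert proc.handle('m1', {'caseId': 'WF-123'}) is False  # redelivery ignored
assert len(proc.processed) == 1
```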
Dead letter queues are critical. Don't skip them. External APIs fail, networks timeout, services crash. Plan for failure.
Local development is complex. Running 6 services + Pub/Sub emulator + Firestore emulator locally is painful. I use Cloud Run jobs for staging.
Performance Stats
Production Numbers (100 operations/month):
- Average processing time: 2-5 minutes per operation
- Cost per operation: $0.01-0.02
- Total monthly cost: $1.40
- Uptime: 99.9%
- Failed messages: 0.2% (handled by dead letter queue)
- P95 latency: 3.2 seconds (including cold starts)
Without Scale-to-Zero:
- Cost: $210/month (6 services × $35/month)
- 150x more expensive
The Bottom Line
Event-driven microservices with Pub/Sub and scale-to-zero gave me:
- ✅ 99% cost reduction - $1.40 vs $210/month
- ✅ Complete decoupling - Services don't know about each other
- ✅ Automatic retries - Failures handled gracefully
- ✅ Full audit trail - Every action logged
- ✅ Multi-agent AI - Specialized agents for better results
Is it more complex than a monolith? Yes.
Is it worth it? Absolutely.
When you need reliable, cost-efficient, scalable async workflows, event-driven microservices with Pub/Sub and scale-to-zero is hard to beat.
In the next post, I'll cover the frontend architecture: real-time updates, optimistic UI, and state management for an event-driven backend.