FireProx Real-Time Listeners (onSnapshot)¶
This notebook demonstrates Firestore's real-time listener functionality in FireProx, enabling live updates when data changes.
What are Real-Time Listeners?¶
Real-time listeners (via on_snapshot()) let you receive live updates whenever documents change in Firestore:
- Instant notifications when data changes
- Efficient updates - only changed documents are transmitted
- Change detection - know what was ADDED, MODIFIED, or REMOVED
- Threading-based - runs on background threads
Key Features¶
- Document-level listeners - Watch individual documents
- Collection-level listeners - Watch entire collections
- Query-level listeners - Watch filtered result sets
- Unsubscribe support - Clean up listeners when done
- Works with sync and async - Uses threading for both
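Because every listener must eventually be cleaned up, a small context-manager wrapper is a convenient pattern. The helper below is hypothetical (not part of FireProx); it works with any watch object exposing `unsubscribe()`, as returned by `on_snapshot()`. A stand-in watch class is used here so the sketch runs without Firestore.

```python
from contextlib import contextmanager

@contextmanager
def listening(watch):
    """Hypothetical helper (not part of FireProx): guarantees unsubscribe.

    `watch` is any object exposing unsubscribe(), as returned by on_snapshot().
    """
    try:
        yield watch
    finally:
        watch.unsubscribe()  # Always runs, even if the block raises

# Demo with a stand-in watch object (no Firestore needed)
class FakeWatch:
    def __init__(self):
        self.active = True
    def unsubscribe(self):
        self.active = False

w = FakeWatch()
with listening(w):
    assert w.active      # Listener is live inside the block
assert not w.active      # Unsubscribed automatically on exit
```

In real code the `with` target would be `user.on_snapshot(callback)` instead of `FakeWatch()`.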
Threading Pattern¶
⚠️ Important: Listeners use threading (not async/await), even for async FireObjects. Use threading.Event(), Queue, or Lock for synchronization.
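The synchronization pattern looks like this in plain Python. A worker thread stands in for the Firestore listener machinery here, so the sketch runs without a client:

```python
import threading

received = []
done = threading.Event()

def callback(data):
    """Runs on a background thread, like a Firestore snapshot callback."""
    received.append(data)
    done.set()  # Signal the main thread that an update arrived

# A plain thread stands in for the Firestore listener delivering a snapshot
worker = threading.Thread(target=callback, args=({'status': 'online'},))
worker.start()

# Main thread blocks until the callback fires (or the timeout expires)
if done.wait(timeout=5):
    print(f"Update received: {received[0]}")
worker.join()
```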
Setup¶
Import modules and initialize the demo client.
import threading
import time
from queue import Queue
from fire_prox import FireProx
from fire_prox.testing import demo_client
# Create FireProx client
client = demo_client()
db = FireProx(client)
print("✅ FireProx client initialized")
✅ FireProx client initialized
Feature 1: Basic Document Listener¶
Set up a listener on a document and receive updates when it changes.
# Create a test document
users = db.collection('demo_users')
user = users.new()
user.name = 'Alice'
user.status = 'online'
user.messages = 0
user.save()
print(f"📄 Created user: {user.id}")
# Set up listener with threading.Event for synchronization
callback_done = threading.Event()
updates_received = []
def on_user_change(doc_snapshot, changes, read_time):
    """Callback invoked when document changes."""
    for doc in doc_snapshot:
        data = doc.to_dict()
        updates_received.append(data)
        print(f"🔔 Update received: {data}")
    # Signal that we received the update
    callback_done.set()
# Start listening
watch = user.on_snapshot(on_user_change)
print("👂 Listener started")
# Wait for initial snapshot
callback_done.wait(timeout=5)
print(f"✅ Initial snapshot received: {len(updates_received)} update(s)")
# Modify the document
callback_done.clear() # Reset event
user.messages = 5
user.save()
print("📝 Modified document (messages = 5)")
# Wait for update notification
callback_done.wait(timeout=5)
print(f"✅ Modification detected: {len(updates_received)} total updates")
# Stop listening
watch.unsubscribe()
print("🛑 Listener stopped")
# Verify we got both updates
assert len(updates_received) >= 2
assert updates_received[0]['messages'] == 0 # Initial
assert updates_received[1]['messages'] == 5 # Modified
print("\n✅ Document listener works!")
📄 Created user: kgIkOwjbBM2aunKEFx9X
👂 Listener started
🔔 Update received: {'status': 'online', 'messages': 0, 'name': 'Alice'}
✅ Initial snapshot received: 1 update(s)
🔔 Update received: {'status': 'online', 'messages': 5, 'name': 'Alice'}
📝 Modified document (messages = 5)
✅ Modification detected: 2 total updates
🛑 Listener stopped
✅ Document listener works!
Feature 2: Multiple Updates¶
Listeners keep up with rapid successive changes. (Firestore may coalesce writes that land very close together into a single snapshot; the small delays used below keep each update distinct.)
# Create counter document
counters = db.collection('demo_counters')
counter = counters.new()
counter.value = 0
counter.save()
print(f"🔢 Created counter: {counter.id}")
# Track all updates
all_values = []
lock = threading.Lock()
def on_counter_change(doc_snapshot, changes, read_time):
    for doc in doc_snapshot:
        value = doc.to_dict()['value']
        with lock:
            all_values.append(value)
        print(f"  Counter = {value}")
# Start listening
watch = counter.on_snapshot(on_counter_change)
time.sleep(0.5) # Wait for initial snapshot
print("\n🚀 Performing rapid updates...")
# Perform rapid updates
for i in range(1, 6):
    counter.value = i
    counter.save()
    time.sleep(0.1)  # Small delay between updates
# Wait for all updates to arrive
time.sleep(2)
watch.unsubscribe()
print(f"\n📊 Received {len(all_values)} total updates")
print(f" Values: {all_values}")
print("\n✅ All rapid updates were detected!")
🔢 Created counter: miCgOAUIPNtwcexxXQxK
  Counter = 0

🚀 Performing rapid updates...
  Counter = 1
  Counter = 2
  Counter = 3
  Counter = 4
  Counter = 5

📊 Received 6 total updates
  Values: [0, 1, 2, 3, 4, 5]

✅ All rapid updates were detected!
Part 2: Collection-Level Listeners¶
Watch entire collections and detect ADDED, MODIFIED, and REMOVED events.
Feature 3: Collection Changes (ADDED, MODIFIED, REMOVED)¶
Collection listeners receive detailed change information.
# Create collection to monitor
tasks = db.collection('demo_tasks')
# Track changes
added_tasks = []
modified_tasks = []
removed_tasks = []
def on_tasks_change(col_snapshot, changes, read_time):
    """Process collection changes."""
    for change in changes:
        doc_id = change.document.id
        doc_data = change.document.to_dict()
        if change.type.name == 'ADDED':
            added_tasks.append(doc_id)
            print(f"  ✅ ADDED: {doc_id} - {doc_data.get('title')}")
        elif change.type.name == 'MODIFIED':
            modified_tasks.append(doc_id)
            print(f"  📝 MODIFIED: {doc_id} - {doc_data.get('title')}")
        elif change.type.name == 'REMOVED':
            removed_tasks.append(doc_id)
            print(f"  🗑️ REMOVED: {doc_id}")
# Start listening to collection
print("👂 Starting collection listener...\n")
watch = tasks.on_snapshot(on_tasks_change)
time.sleep(0.5) # Wait for listener to initialize
# Add some tasks
print("📝 Adding tasks...")
task1 = tasks.new()
task1.title = 'Write code'
task1.done = False
task1.save()
task1_id = task1.id
time.sleep(0.5)
task2 = tasks.new()
task2.title = 'Review PR'
task2.done = False
task2.save()
task2_id = task2.id
time.sleep(0.5)
# Modify a task
print("\n✏️ Modifying task...")
task1.done = True
task1.save()
time.sleep(0.5)
# Delete a task
print("\n🗑️ Deleting task...")
task2.delete()
# Wait for all changes
time.sleep(1)
watch.unsubscribe()
# Verify all change types were detected
print("\n📊 Summary:")
print(f" ADDED: {len(added_tasks)} task(s)")
print(f" MODIFIED: {len(modified_tasks)} task(s)")
print(f" REMOVED: {len(removed_tasks)} task(s)")
assert task1_id in added_tasks
assert task2_id in added_tasks
assert task1_id in modified_tasks
assert task2_id in removed_tasks
print("\n✅ All change types detected!")
👂 Starting collection listener...

📝 Adding tasks...
  ✅ ADDED: r6hktarndkZmoFXtCnsF - Write code
  ✅ ADDED: cxfHBGBbxA5tVP1iGNrr - Review PR

✏️ Modifying task...
  📝 MODIFIED: r6hktarndkZmoFXtCnsF - Write code

🗑️ Deleting task...
  🗑️ REMOVED: cxfHBGBbxA5tVP1iGNrr

📊 Summary:
  ADDED: 2 task(s)
  MODIFIED: 1 task(s)
  REMOVED: 1 task(s)

✅ All change types detected!
Part 3: Query-Level Listeners¶
Watch filtered query results and detect when documents enter or leave the result set.
Feature 4: Query Filtering¶
Query listeners only trigger for documents matching the query criteria.
# Create collection with mixed data
products = db.collection('demo_products')
# Add some products
product1 = products.new()
product1.name = 'Laptop'
product1.price = 1200
product1.in_stock = True
product1.save()
product2 = products.new()
product2.name = 'Mouse'
product2.price = 25
product2.in_stock = True
product2.save()
product3 = products.new()
product3.name = 'Monitor'
product3.price = 300
product3.in_stock = False # Out of stock
product3.save()
print("📦 Created 3 products\n")
# Track in-stock products
in_stock_products = []
def on_stock_change(query_snapshot, changes, read_time):
    """Monitor in-stock products only."""
    for change in changes:
        name = change.document.to_dict()['name']
        if change.type.name == 'ADDED':
            in_stock_products.append(name)
            print(f"  ✅ Now in stock: {name}")
        elif change.type.name == 'REMOVED':
            if name in in_stock_products:
                in_stock_products.remove(name)
                print(f"  ❌ Out of stock: {name}")
# Listen to in-stock products ONLY
print("👂 Listening to in-stock products (in_stock == True)...\n")
in_stock_query = products.where('in_stock', '==', True)
watch = in_stock_query.on_snapshot(on_stock_change)
# Wait for initial snapshot (should see Laptop and Mouse, NOT Monitor)
time.sleep(1)
print(f"\n📊 Currently in stock: {in_stock_products}")
assert 'Laptop' in in_stock_products
assert 'Mouse' in in_stock_products
assert 'Monitor' not in in_stock_products # Out of stock
# Bring Monitor back in stock
print("\n📦 Restocking Monitor...")
product3.in_stock = True
product3.save()
time.sleep(1)
print(f"📊 Currently in stock: {in_stock_products}")
assert 'Monitor' in in_stock_products # Now in stock!
# Take Laptop out of stock
print("\n📦 Laptop out of stock...")
product1.in_stock = False
product1.save()
time.sleep(1)
print(f"📊 Currently in stock: {in_stock_products}")
assert 'Laptop' not in in_stock_products # No longer in stock
watch.unsubscribe()
print("\n✅ Query filtering works perfectly!")
📦 Created 3 products

👂 Listening to in-stock products (in_stock == True)...
  ✅ Now in stock: Laptop
  ✅ Now in stock: Mouse

📊 Currently in stock: ['Laptop', 'Mouse']

📦 Restocking Monitor...
  ✅ Now in stock: Monitor
📊 Currently in stock: ['Laptop', 'Mouse', 'Monitor']

📦 Laptop out of stock...
  ❌ Out of stock: Laptop
📊 Currently in stock: ['Mouse', 'Monitor']

✅ Query filtering works perfectly!
Feature 5: Documents Entering/Leaving Query¶
Listeners detect when documents start or stop matching query criteria.
# Monitor high-value orders (> $100)
orders = db.collection('demo_orders')
high_value_orders = []
def on_high_value_change(query_snapshot, changes, read_time):
    """Track high-value orders."""
    for change in changes:
        order_id = change.document.id
        amount = change.document.to_dict()['amount']
        if change.type.name == 'ADDED':
            high_value_orders.append(order_id)
            print(f"  🎉 New high-value order: {order_id} (${amount})")
        elif change.type.name == 'REMOVED':
            if order_id in high_value_orders:
                high_value_orders.remove(order_id)
                print(f"  ⬇️ Order no longer high-value: {order_id} (${amount})")
# Listen to orders > $100
print("👂 Listening to high-value orders (amount > 100)...\n")
high_value_query = orders.where('amount', '>', 100)
watch = high_value_query.on_snapshot(on_high_value_change)
time.sleep(0.5)
# Create a normal order
print("📝 Creating normal order ($50)...")
order1 = orders.new()
order1.amount = 50
order1.save()
order1_id = order1.id
time.sleep(0.5)
assert order1_id not in high_value_orders # Not high-value yet
# Increase order to high-value
print("\n💰 Increasing order to $150...")
order1.amount = 150
order1.save()
time.sleep(0.5)
assert order1_id in high_value_orders # Now high-value!
# Decrease order below threshold
print("\n📉 Decreasing order to $75...")
order1.amount = 75
order1.save()
time.sleep(0.5)
assert order1_id not in high_value_orders # No longer high-value
watch.unsubscribe()
print("\n✅ Documents entering/leaving query detected!")
👂 Listening to high-value orders (amount > 100)...

📝 Creating normal order ($50)...

💰 Increasing order to $150...
  🎉 New high-value order: rWaUZrMzig007kYSzqcr ($150)

📉 Decreasing order to $75...
  ⬇️ Order no longer high-value: rWaUZrMzig007kYSzqcr ($150)

✅ Documents entering/leaving query detected!
Part 4: Thread-Safety Patterns¶
Feature 6: Using Queue for Thread-Safe Data Transfer¶
Use queue.Queue to safely pass data from callback threads to the main thread.
from queue import Empty
# Create notification system
notifications = db.collection('demo_notifications')
# Thread-safe queue
notification_queue = Queue()
def on_notification(col_snapshot, changes, read_time):
    """Callback runs on a background thread."""
    for change in changes:
        if change.type.name == 'ADDED':
            data = change.document.to_dict()
            # Put data in queue (thread-safe)
            notification_queue.put(data)
# Start listener
watch = notifications.on_snapshot(on_notification)
time.sleep(0.5)
print("🔔 Notification system active\n")
# Main thread creates notifications
notif1 = notifications.new()
notif1.message = 'New follower!'
notif1.priority = 'low'
notif1.save()
notif2 = notifications.new()
notif2.message = 'Payment received!'
notif2.priority = 'high'
notif2.save()
# Main thread processes notifications from queue
print("📬 Processing notifications...")
processed = 0
while processed < 2:
    try:
        # Get from queue (thread-safe)
        data = notification_queue.get(timeout=2)
        print(f"  📨 {data['message']} (priority: {data['priority']})")
        processed += 1
    except Empty:
        break
watch.unsubscribe()
print(f"\n✅ Processed {processed} notifications using Queue!")
🔔 Notification system active

📬 Processing notifications...
  📨 New follower! (priority: low)
  📨 Payment received! (priority: high)

✅ Processed 2 notifications using Queue!
Feature 7: Multiple Listeners on Same Document¶
You can have multiple listeners watching the same data.
# Create sensor document
sensors = db.collection('demo_sensors')
sensor = sensors.new()
sensor.temperature = 20.0
sensor.humidity = 50.0
sensor.save()
print(f"🌡️ Created sensor: {sensor.id}\n")
# Listener 1: Temperature monitoring
temp_alerts = []
def monitor_temperature(doc_snapshot, changes, read_time):
    for doc in doc_snapshot:
        temp = doc.to_dict()['temperature']
        if temp > 25:
            temp_alerts.append(temp)
            print(f"  🔥 Temperature alert: {temp}°C")
# Listener 2: Humidity monitoring
humidity_alerts = []
def monitor_humidity(doc_snapshot, changes, read_time):
    for doc in doc_snapshot:
        humidity = doc.to_dict()['humidity']
        if humidity > 70:
            humidity_alerts.append(humidity)
            print(f"  💧 Humidity alert: {humidity}%")
# Start both listeners
print("👂 Starting temperature and humidity monitors...\n")
watch1 = sensor.on_snapshot(monitor_temperature)
watch2 = sensor.on_snapshot(monitor_humidity)
time.sleep(0.5)
# Trigger temperature alert
print("🌡️ Increasing temperature to 30°C...")
sensor.temperature = 30.0
sensor.save()
time.sleep(0.5)
# Trigger humidity alert
print("\n💧 Increasing humidity to 80%...")
sensor.humidity = 80.0
sensor.save()
time.sleep(0.5)
# Stop both listeners
watch1.unsubscribe()
watch2.unsubscribe()
print("\n📊 Results:")
print(f" Temperature alerts: {len(temp_alerts)}")
print(f" Humidity alerts: {len(humidity_alerts)}")
assert len(temp_alerts) > 0
assert len(humidity_alerts) > 0
print("\n✅ Multiple independent listeners work!")
🌡️ Created sensor: gYQri7nSQSvML0vzrkcE

👂 Starting temperature and humidity monitors...

🌡️ Increasing temperature to 30°C...
  🔥 Temperature alert: 30.0°C

💧 Increasing humidity to 80%...
  🔥 Temperature alert: 30.0°C
  💧 Humidity alert: 80.0%

📊 Results:
  Temperature alerts: 2
  Humidity alerts: 1

✅ Multiple independent listeners work!
Feature 8: State Validation¶
Listeners validate object state - cannot listen to DETACHED or DELETED objects.
# Recreate users collection reference for this cell
users = db.collection('demo_users')
def dummy_callback(snapshot, changes, read_time):
    pass
# Test 1: DETACHED object
print("🧪 Test 1: Cannot listen to DETACHED object")
detached = users.new() # Not saved yet
try:
    watch = detached.on_snapshot(dummy_callback)
    print("  ❌ Should have raised ValueError")
except ValueError as e:
    print(f"  ✅ Correctly raised ValueError: {str(e)[:50]}...")
# Test 2: DELETED object
print("\n🧪 Test 2: Cannot listen to DELETED object")
temp_user = users.new()
temp_user.name = 'Temporary'
temp_user.save()
temp_user.delete() # Now DELETED
try:
    watch = temp_user.on_snapshot(dummy_callback)
    print("  ❌ Should have raised RuntimeError")
except RuntimeError as e:
    print(f"  ✅ Correctly raised RuntimeError: {str(e)[:50]}...")
print("\n✅ State validation works!")
🧪 Test 1: Cannot listen to DETACHED object
  ✅ Correctly raised ValueError: Cannot on_snapshot() on a DETACHED FireObject (no ...

🧪 Test 2: Cannot listen to DELETED object
  ✅ Correctly raised RuntimeError: Cannot on_snapshot() on a DELETED FireObject...

✅ State validation works!
Part 5: Async Object Listeners¶
Even async FireObjects support on_snapshot() via their internal sync client. The listener pattern remains threading-based (not async/await).
Feature 9: Async Document Listener¶
AsyncFireObject instances can use on_snapshot() just like sync objects.
# Note: on_snapshot() works with async objects too!
# Even AsyncFireObject instances support real-time listeners.
# The listener pattern is threading-based (not async/await),
# using the internal _sync_doc_ref that AsyncFireObject maintains.
# Example code (conceptual - requires proper async setup):
'''
async_db = AsyncFireProx(async_client)
async_users = async_db.collection('users')
async_user = async_users.new()
async_user.name = 'Bob'
await async_user.save()
# Set up listener (uses threading internally)
def on_change(doc_snapshot, changes, read_time):
    for doc in doc_snapshot:
        print(f"Async user updated: {doc.to_dict()}")
# This works! AsyncFireObject uses _sync_doc_ref internally
watch = async_user.on_snapshot(on_change)
# Modify using async API
async_user.score = 100
await async_user.save()
# Clean up
watch.unsubscribe()
'''
print("✅ AsyncFireObject supports on_snapshot() via internal sync client")
print(" The listener runs on a background thread, even for async objects")
print(" This is the standard Firestore pattern for Python real-time listeners")
✅ AsyncFireObject supports on_snapshot() via internal sync client
   The listener runs on a background thread, even for async objects
   This is the standard Firestore pattern for Python real-time listeners
Feature 10: Async Collection Listener¶
AsyncFireCollection also supports listeners using its internal sync client.
# AsyncFireCollection also supports on_snapshot()
# The collection listener runs on a background thread using the internal sync client
# Example code (conceptual):
'''
async_products = async_db.collection('products')
def on_products_change(col_snapshot, changes, read_time):
    for change in changes:
        name = change.document.to_dict()['name']
        if change.type.name == 'ADDED':
            print(f"Product added: {name}")
# Listen to async collection (uses threading internally)
watch = async_products.on_snapshot(on_products_change)
# Add products using async API
product = async_products.new()
product.name = 'Keyboard'
await product.save() # Listener will detect the new product
watch.unsubscribe()
'''
print("✅ AsyncFireCollection supports on_snapshot() for collection-level listeners")
print(" Uses _sync_client to create a sync collection reference internally")
print(" Callbacks run on background threads, not in the async event loop")
✅ AsyncFireCollection supports on_snapshot() for collection-level listeners
   Uses _sync_client to create a sync collection reference internally
   Callbacks run on background threads, not in the async event loop
Feature 11: Async Query Listener¶
AsyncFireQuery supports filtering with real-time listeners.
# AsyncFireQuery also supports on_snapshot() for filtered listeners
# Example code (conceptual):
'''
async_articles = async_db.collection('articles')
def on_published_change(query_snapshot, changes, read_time):
    for change in changes:
        title = change.document.to_dict()['title']
        if change.type.name == 'ADDED':
            print(f"Published: {title}")
# Listen to published articles only
published_query = async_articles.where('status', '==', 'published')
watch = published_query.on_snapshot(on_published_change)
# Create and publish article using async API
article = async_articles.new()
article.title = 'Getting Started'
article.status = 'published'
await article.save() # Listener will detect this!
watch.unsubscribe()
'''
print("✅ AsyncFireQuery supports on_snapshot() for query-level listeners")
print(" Filters are applied server-side, only matching documents trigger callbacks")
print(" Detects documents entering/leaving the query result set")
✅ AsyncFireQuery supports on_snapshot() for query-level listeners
   Filters are applied server-side, only matching documents trigger callbacks
   Detects documents entering/leaving the query result set
Summary¶
This demo showcased all on_snapshot features:
✅ Core Features (Demonstrated)¶
- Document listeners - Watch individual documents for changes
- Collection listeners - Monitor all documents in a collection with ADDED/MODIFIED/REMOVED events
- Query listeners - Watch filtered result sets, detect documents entering/leaving queries
- Multiple rapid updates - All changes are detected, even rapid successive modifications
- Thread-safe patterns - threading.Event, queue.Queue, threading.Lock
- Multiple listeners - Independent listeners on same document
- State validation - Cannot listen to DETACHED or DELETED objects
✅ Async Support (Conceptual Examples)¶
- AsyncFireObject listeners - Uses internal _sync_doc_ref for threading-based listeners
- AsyncFireCollection listeners - Uses internal _sync_client for collection monitoring
- AsyncFireQuery listeners - Supports filtered async queries with real-time updates
Important: Even for async objects, listeners are threading-based (not async/await). This is the standard Firestore pattern in Python - callbacks run on background threads.
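If the rest of your application lives in asyncio, you can hand data from the threaded callback back to the event loop with `loop.call_soon_threadsafe` and an `asyncio.Queue`. The sketch below simulates the threaded callback with a plain thread, so it runs without Firestore; in real code `on_change` would be the function you pass to `on_snapshot()`:

```python
import asyncio
import threading

async def main():
    loop = asyncio.get_running_loop()
    updates = asyncio.Queue()

    def on_change(data):
        """Threaded snapshot callback: hop back onto the event loop safely."""
        # asyncio objects are not thread-safe; call_soon_threadsafe is the
        # supported way to schedule work on the loop from another thread.
        loop.call_soon_threadsafe(updates.put_nowait, data)

    # A plain thread stands in for the Firestore listener delivering a snapshot
    threading.Thread(target=on_change, args=({'messages': 5},)).start()

    # Async code consumes updates without touching thread primitives
    data = await asyncio.wait_for(updates.get(), timeout=5)
    print(f"Got update in coroutine: {data}")
    return data

result = asyncio.run(main())
```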
💡 When to Use on_snapshot¶
Use listeners when:
- Building real-time dashboards
- Implementing live notifications
- Creating collaborative features
- Monitoring system state
- Syncing UI with database
Don't use listeners when:
- One-time data fetch is sufficient
- Polling is acceptable
- Data changes infrequently
- Client is offline often
🚀 Performance Notes¶
- Efficient: Only changed documents are transmitted
- Scalable: Listeners stream changes over gRPC rather than polling
- Reliable: Automatic reconnection on network failures
- Low overhead: ~200 bytes per listener
📚 Learn More¶
- Implementation Report: docs/SNAPSHOTS_IMPLEMENTATION_REPORT.md
- API Reference: See BaseFireObject, BaseFireCollection, and FireQuery docstrings
- Tests: tests/test_snapshots.py (13 comprehensive tests)
- Version: 0.8.0
⚠️ Important Notes¶
- Threading: Callbacks run on background threads, not the main thread or async event loop
- Sync-only mechanism: Even async objects use threaded listeners (not async/await)
- Cleanup: Always call watch.unsubscribe() when done
- Thread-safety: Use Queue, Lock, or Event for synchronization
- Non-blocking: Don't block in callbacks - offload heavy work to queues
- Async objects: AsyncFireObject, AsyncFireCollection, and AsyncFireQuery all support listeners via internal sync clients