Multi-Tenant Application Security Cheat Sheet¶
Introduction¶
Multi-tenant applications serve multiple customers (tenants) from a shared infrastructure, codebase, and often shared databases. This architecture is the foundation of modern SaaS platforms, offering cost efficiency and simplified operations.
However, multi-tenancy introduces critical security challenges: a single vulnerability can expose all tenants' data, misconfigurations can leak data across tenant boundaries, and resource contention can impact availability.
This cheat sheet provides best practices to secure multi-tenant applications, ensure tenant isolation, and prevent cross-tenant attacks.
Key Risks¶
- Cross-Tenant Data Leakage: Bugs or misconfigurations exposing one tenant's data to another.
- Tenant Impersonation: Attackers gaining access to another tenant's context or resources.
- Broken Tenant Isolation: Insufficient separation at database, cache, storage, or compute layers.
- Insecure Direct Object References (IDOR): Accessing resources by manipulating tenant/resource IDs.
- Noisy Neighbor Attacks: One tenant exhausting shared resources, impacting others (DoS).
- Privilege Escalation Across Tenants: Exploiting admin functions to access other tenants.
- Tenant Context Injection: Manipulating tenant identifiers in requests, tokens, or headers.
- Shared Resource Poisoning: Cache poisoning, queue injection, or storage pollution affecting other tenants.
- Insecure Tenant Onboarding/Offboarding: Incomplete provisioning or data retention after deletion.
- Audit & Compliance Gaps: Insufficient tenant-specific logging for regulatory requirements.
Best Practices¶
1. Tenant Identification & Context Management¶
- Establish tenant context early in the request lifecycle (middleware/interceptor).
- Use cryptographically secure, non-guessable tenant identifiers.
- Never trust client-supplied tenant IDs without validation.
- Bind tenant context to the authenticated user session.
- Propagate tenant context securely through all application layers.
Bad example: Trusting client-supplied tenant ID
# Dangerous: Tenant ID from request header without validation/query parameterization
def get_tenant_data(request):
tenant_id = request.headers.get("X-Tenant-ID") # Attacker can modify!
return db.execute("SELECT * FROM data WHERE tenant_id = :tid", {"tid": tenant_id})
Good example: Deriving tenant from authenticated session
from functools import wraps
from contextvars import ContextVar
from typing import Optional
# Thread-safe tenant context
current_tenant: ContextVar[Optional[str]] = ContextVar('current_tenant', default=None)
class TenantContext:
def __init__(self, tenant_id: str, user_id: str, roles: list):
self.tenant_id = tenant_id
self.user_id = user_id
self.roles = roles
self.is_validated = True
class TenantMiddleware:
"""Extract and validate tenant context from authenticated session."""
async def __call__(self, request, call_next):
# Get tenant from verified JWT claims - NOT from headers
token_claims = request.state.verified_claims # Set by auth middleware
if not token_claims or "tenant_id" not in token_claims:
return JSONResponse(status_code=401, content={"error": "Missing tenant context"})
tenant_id = token_claims["tenant_id"]
# Validate tenant exists and is active
tenant = await self.tenant_service.get_active_tenant(tenant_id)
if not tenant:
return JSONResponse(status_code=403, content={"error": "Invalid tenant"})
# Set tenant context for this request
ctx = TenantContext(
tenant_id=tenant_id,
user_id=token_claims["sub"],
roles=token_claims.get("roles", [])
)
token = current_tenant.set(ctx)
try:
response = await call_next(request)
return response
finally:
current_tenant.reset(token)
def require_tenant(func):
"""Decorator ensuring tenant context is present."""
@wraps(func)
async def wrapper(*args, **kwargs):
ctx = current_tenant.get()
if not ctx or not ctx.is_validated:
raise SecurityException("Tenant context required")
return await func(*args, **kwargs)
return wrapper
2. Database Isolation Strategies¶
Choose an isolation strategy based on security requirements, compliance needs, and operational complexity:
| Strategy | Isolation Level | Use Case |
|---|---|---|
| Separate Databases | Highest | Regulated industries, enterprise clients |
| Separate Schemas | High | Balance of isolation and manageability |
| Shared Tables (Row-Level) | Medium | Cost-sensitive, high tenant count |
| Hybrid | Variable | Different tiers for different customers |
Row-Level Security Implementation (PostgreSQL)
-- Enable RLS on tenant tables
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;
ALTER TABLE customers ENABLE ROW LEVEL SECURITY;
-- Create policy that restricts access to current tenant
CREATE POLICY tenant_isolation_policy ON orders
FOR ALL
USING (tenant_id = current_setting('app.current_tenant')::uuid);
CREATE POLICY tenant_isolation_policy ON customers
FOR ALL
USING (tenant_id = current_setting('app.current_tenant')::uuid);
-- Force RLS for table owners too (important!)
ALTER TABLE orders FORCE ROW LEVEL SECURITY;
ALTER TABLE customers FORCE ROW LEVEL SECURITY;
Application-Level Enforcement (Python/SQLAlchemy)
from sqlalchemy import event, Column, String
from sqlalchemy.orm import Session, Query
from sqlalchemy.ext.declarative import declared_attr
from contextlib import contextmanager
class TenantMixin:
"""Mixin that adds tenant_id to all models."""
@declared_attr
def tenant_id(cls):
return Column(String(36), nullable=False, index=True)
class TenantAwareSession(Session):
"""Session that automatically filters by tenant."""
def __init__(self, *args, tenant_id: str = None, **kwargs):
super().__init__(*args, **kwargs)
self._tenant_id = tenant_id
@property
def tenant_id(self):
if not self._tenant_id:
raise SecurityException("Tenant ID not set on session")
return self._tenant_id
# Automatically add tenant filter to all queries
@event.listens_for(Query, "before_compile", retval=True)
def add_tenant_filter(query):
tenant_id = current_tenant.get()
if not tenant_id:
raise SecurityException("No tenant context for query")
for desc in query.column_descriptions:
entity = desc.get('entity')
if entity and hasattr(entity, 'tenant_id'):
query = query.filter(entity.tenant_id == tenant_id.tenant_id)
return query
# Automatically set tenant_id on insert
@event.listens_for(TenantMixin, "before_insert", propagate=True)
def set_tenant_on_insert(mapper, connection, target):
ctx = current_tenant.get()
if not ctx:
raise SecurityException("Cannot insert without tenant context")
target.tenant_id = ctx.tenant_id
# Secure session factory
@contextmanager
def tenant_session(tenant_id: str):
"""Create a tenant-scoped database session."""
session = TenantAwareSession(bind=engine, tenant_id=tenant_id)
# Set PostgreSQL RLS context
session.execute(f"SELECT set_config('app.current_tenant', :tenant_id, true);")
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
3. Preventing Cross-Tenant Data Access (IDOR Prevention)¶
- Always validate that requested resources belong to the current tenant.
- Use composite keys (tenant_id + resource_id) for all lookups.
- Implement authorization checks at the data access layer, not just API layer.
- Avoid exposing sequential or guessable IDs.
Bad example: Direct object reference without tenant validation
# Dangerous: Only checks resource_id, not tenant ownership
@app.get("/api/documents/{document_id}")
async def get_document(document_id: str):
doc = db.query(Document).filter(Document.id == document_id).first()
if not doc:
raise HTTPException(404)
return doc # Could return another tenant's document!
Good example: Tenant-scoped resource access
from uuid import UUID
from typing import TypeVar, Generic, Type
T = TypeVar('T')
class TenantScopedRepository(Generic[T]):
"""Repository that enforces tenant isolation on all operations."""
def __init__(self, model: Type[T], session: Session):
self.model = model
self.session = session
@property
def tenant_id(self) -> str:
ctx = current_tenant.get()
if not ctx:
raise SecurityException("Tenant context required")
return ctx.tenant_id
def get_by_id(self, resource_id: UUID) -> Optional[T]:
"""Get resource only if it belongs to current tenant."""
return self.session.query(self.model).filter(
self.model.id == resource_id,
self.model.tenant_id == self.tenant_id # Always include tenant check
).first()
def list_all(self, limit: int = 100, offset: int = 0) -> list[T]:
"""List resources for current tenant only."""
return self.session.query(self.model).filter(
self.model.tenant_id == self.tenant_id
).limit(limit).offset(offset).all()
def create(self, **kwargs) -> T:
"""Create resource with tenant_id automatically set."""
if 'tenant_id' in kwargs and kwargs['tenant_id'] != self.tenant_id:
raise SecurityException("Cannot create resource for different tenant")
kwargs['tenant_id'] = self.tenant_id
instance = self.model(**kwargs)
self.session.add(instance)
return instance
def delete(self, resource_id: UUID) -> bool:
"""Delete resource only if it belongs to current tenant."""
result = self.session.query(self.model).filter(
self.model.id == resource_id,
self.model.tenant_id == self.tenant_id
).delete()
return result > 0
# Usage
@app.get("/api/documents/{document_id}")
@require_tenant
async def get_document(document_id: UUID, db: Session = Depends(get_db)):
repo = TenantScopedRepository(Document, db)
doc = repo.get_by_id(document_id)
if not doc:
raise HTTPException(404, "Document not found") # Don't reveal if it exists for other tenant
return doc
4. Cache & Session Isolation¶
- Prefix all cache keys with tenant identifier.
- Use separate cache namespaces or instances for sensitive tenants.
- Implement cache key validation to prevent injection.
- Set appropriate TTLs and validate tenant on cache retrieval.
Bad example: Shared cache without tenant isolation
# Dangerous: Cache key collision between tenants
def get_user_preferences(user_id: str):
cache_key = f"preferences:{user_id}" # Same key for different tenants!
cached = redis.get(cache_key)
if cached:
return json.loads(cached)
# ...
Good example: Tenant-isolated caching
import hashlib
import json
from typing import Optional, Any
from functools import wraps
class TenantAwareCache:
"""Cache implementation with tenant isolation."""
def __init__(self, redis_client):
self.redis = redis_client
self.default_ttl = 3600
def _build_key(self, tenant_id: str, key: str) -> str:
"""Build tenant-scoped cache key."""
# Validate key format to prevent injection
if not key or any(c in key for c in ['{', '}', '\n', '\r']):
raise ValueError("Invalid cache key format")
# Use hash of tenant_id to prevent key enumeration
tenant_hash = hashlib.sha256(tenant_id.encode()).hexdigest()[:16]
return f"t:{tenant_hash}:{key}"
def get(self, key: str, tenant_id: str = None) -> Optional[Any]:
"""Get cached value for current tenant."""
tenant_id = tenant_id or current_tenant.get().tenant_id
full_key = self._build_key(tenant_id, key)
cached = self.redis.get(full_key)
if cached:
data = json.loads(cached)
# Verify tenant_id in cached data matches (defense in depth)
if data.get("_tenant_id") != tenant_id:
self.redis.delete(full_key) # Purge potentially poisoned entry
return None
return data.get("value")
return None
def set(self, key: str, value: Any, ttl: int = None, tenant_id: str = None):
"""Set cached value for current tenant."""
tenant_id = tenant_id or current_tenant.get().tenant_id
full_key = self._build_key(tenant_id, key)
# Include tenant_id in cached data for verification
data = {
"_tenant_id": tenant_id,
"value": value
}
self.redis.setex(full_key, ttl or self.default_ttl, json.dumps(data))
def invalidate_tenant(self, tenant_id: str):
"""Invalidate all cache entries for a tenant."""
tenant_hash = hashlib.sha256(tenant_id.encode()).hexdigest()[:16]
pattern = f"t:{tenant_hash}:*"
cursor = 0
while True:
cursor, keys = self.redis.scan(cursor, match=pattern, count=1000)
if keys:
self.redis.delete(*keys)
if cursor == 0:
break
def tenant_cached(key_template: str, ttl: int = 3600):
"""Decorator for tenant-aware caching."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
cache = get_tenant_cache()
cache_key = key_template.format(**kwargs)
cached = cache.get(cache_key)
if cached is not None:
return cached
result = await func(*args, **kwargs)
cache.set(cache_key, result, ttl=ttl)
return result
return wrapper
return decorator
# Usage
@tenant_cached("user_prefs:{user_id}", ttl=1800)
async def get_user_preferences(user_id: str):
# This is automatically cached per-tenant
return await db.fetch_preferences(user_id)
5. API Security & Rate Limiting¶
- Implement per-tenant rate limiting and quotas.
- Apply tenant-specific API throttling.
- Validate tenant context on every API request.
- Use separate API keys per tenant.
- Implement tenant-aware request signing for B2B APIs.
Tenant-Aware Rate Limiting
import time
from dataclasses import dataclass
from enum import Enum
class TenantTier(Enum):
FREE = "free"
STARTER = "starter"
BUSINESS = "business"
ENTERPRISE = "enterprise"
@dataclass
class RateLimitConfig:
requests_per_minute: int
requests_per_day: int
burst_size: int
TIER_LIMITS = {
TenantTier.FREE: RateLimitConfig(60, 1000, 10),
TenantTier.STARTER: RateLimitConfig(300, 10000, 50),
TenantTier.BUSINESS: RateLimitConfig(1000, 100000, 100),
TenantTier.ENTERPRISE: RateLimitConfig(5000, 1000000, 500),
}
class TenantRateLimiter:
"""Per-tenant rate limiting with tier support."""
def __init__(self, redis_client):
self.redis = redis_client
async def check_rate_limit(self, tenant_id: str, tenant_tier: TenantTier) -> dict:
"""Check and update rate limit for tenant."""
config = TIER_LIMITS[tenant_tier]
now = time.time()
minute_key = f"rl:{tenant_id}:min:{int(now // 60)}"
day_key = f"rl:{tenant_id}:day:{int(now // 86400)}"
pipe = self.redis.pipeline()
# Increment counters
pipe.incr(minute_key)
pipe.expire(minute_key, 60)
pipe.incr(day_key)
pipe.expire(day_key, 86400)
results = pipe.execute()
minute_count = results[0]
day_count = results[2]
# Check limits
if minute_count > config.requests_per_minute:
return {
"allowed": False,
"reason": "minute_limit_exceeded",
"retry_after": 60 - (now % 60),
"limit": config.requests_per_minute
}
if day_count > config.requests_per_day:
return {
"allowed": False,
"reason": "daily_limit_exceeded",
"retry_after": 86400 - (now % 86400),
"limit": config.requests_per_day
}
return {
"allowed": True,
"remaining_minute": config.requests_per_minute - minute_count,
"remaining_day": config.requests_per_day - day_count
}
class RateLimitMiddleware:
"""Middleware that enforces tenant rate limits."""
async def __call__(self, request, call_next):
ctx = current_tenant.get()
if not ctx:
return await call_next(request)
tenant = await self.tenant_service.get_tenant(ctx.tenant_id)
result = await self.rate_limiter.check_rate_limit(
ctx.tenant_id,
tenant.tier
)
if not result["allowed"]:
return JSONResponse(
status_code=429,
content={"error": "Rate limit exceeded", "details": result},
headers={
"Retry-After": str(int(result["retry_after"])),
"X-RateLimit-Limit": str(result["limit"]),
"X-RateLimit-Remaining": "0"
}
)
response = await call_next(request)
# Add rate limit headers
response.headers["X-RateLimit-Remaining-Minute"] = str(result["remaining_minute"])
response.headers["X-RateLimit-Remaining-Day"] = str(result["remaining_day"])
return response
6. File Storage & Blob Isolation¶
- Use tenant-prefixed paths for all file storage.
- Implement storage access policies per tenant.
- Validate tenant ownership before serving files.
- Use signed URLs with tenant context embedded.
- Encrypt files at rest with tenant-specific keys (for high-security requirements).
Secure Multi-Tenant File Storage
import boto3
from botocore.config import Config
from datetime import datetime, timedelta
import hashlib
import hmac
class TenantFileStorage:
"""S3-based file storage with tenant isolation."""
def __init__(self, bucket_name: str, kms_key_id: str = None):
self.bucket = bucket_name
self.s3 = boto3.client('s3', config=Config(signature_version='s3v4'))
self.kms_key_id = kms_key_id
def _get_tenant_prefix(self, tenant_id: str) -> str:
"""Generate tenant-specific path prefix."""
# Use hashed prefix to prevent enumeration
tenant_hash = hashlib.sha256(tenant_id.encode()).hexdigest()[:12]
return f"tenants/{tenant_hash}"
def _build_key(self, tenant_id: str, file_path: str) -> str:
"""Build full S3 key with tenant isolation."""
# Sanitize file path to prevent traversal
safe_path = file_path.lstrip('/').replace('..', '')
return f"{self._get_tenant_prefix(tenant_id)}/{safe_path}"
async def upload_file(self, tenant_id: str, file_path: str,
content: bytes, content_type: str) -> dict:
"""Upload file for tenant."""
key = self._build_key(tenant_id, file_path)
extra_args = {
'ContentType': content_type,
'Metadata': {
'tenant-id': tenant_id,
'uploaded-at': datetime.utcnow().isoformat()
}
}
# Use tenant-specific KMS key if available
if self.kms_key_id:
extra_args['ServerSideEncryption'] = 'aws:kms'
extra_args['SSEKMSKeyId'] = self.kms_key_id
self.s3.put_object(
Bucket=self.bucket,
Key=key,
Body=content,
**extra_args
)
return {"key": key, "size": len(content)}
async def get_file(self, tenant_id: str, file_path: str) -> Optional[bytes]:
"""Get file only if it belongs to tenant."""
key = self._build_key(tenant_id, file_path)
try:
response = self.s3.get_object(Bucket=self.bucket, Key=key)
# Verify tenant ownership from metadata
metadata_tenant = response.get('Metadata', {}).get('tenant-id')
if metadata_tenant != tenant_id:
raise SecurityException("Tenant mismatch in file metadata")
return response['Body'].read()
except self.s3.exceptions.NoSuchKey:
return None
def generate_presigned_url(self, tenant_id: str, file_path: str,
expiration: int = 3600,
operation: str = 'get_object') -> str:
"""Generate presigned URL with tenant validation."""
key = self._build_key(tenant_id, file_path)
# Include tenant_id in the signed URL for validation
url = self.s3.generate_presigned_url(
ClientMethod=operation,
Params={
'Bucket': self.bucket,
'Key': key,
},
ExpiresIn=expiration
)
return url
async def delete_tenant_data(self, tenant_id: str):
"""Delete all files for a tenant (for offboarding)."""
prefix = self._get_tenant_prefix(tenant_id)
paginator = self.s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
objects = page.get('Contents', [])
if objects:
self.s3.delete_objects(
Bucket=self.bucket,
Delete={'Objects': [{'Key': obj['Key']} for obj in objects]}
)
7. Tenant Onboarding & Offboarding Security¶
- Implement secure tenant provisioning with isolated resources.
- Generate unique encryption keys per tenant where required.
- Ensure complete data deletion on tenant offboarding.
- Maintain audit trail of provisioning/deprovisioning.
- Implement data export for tenant portability.
Secure Tenant Lifecycle Management
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import secrets
class TenantStatus(Enum):
PROVISIONING = "provisioning"
ACTIVE = "active"
SUSPENDED = "suspended"
OFFBOARDING = "offboarding"
DELETED = "deleted"
@dataclass
class TenantProvisioningResult:
tenant_id: str
status: TenantStatus
api_key: str
database_schema: str
storage_prefix: str
class TenantLifecycleManager:
"""Manages secure tenant onboarding and offboarding."""
def __init__(self, db, cache, storage, audit_log):
self.db = db
self.cache = cache
self.storage = storage
self.audit = audit_log
async def provision_tenant(self, tenant_name: str, admin_email: str,
tier: TenantTier) -> TenantProvisioningResult:
"""Securely provision a new tenant."""
tenant_id = secrets.token_urlsafe(16)
await self.audit.log("tenant_provisioning_started", {
"tenant_id": tenant_id,
"tenant_name": tenant_name,
"tier": tier.value
})
try:
# 1. Create tenant record
tenant = await self.db.create_tenant(
id=tenant_id,
name=tenant_name,
status=TenantStatus.PROVISIONING,
tier=tier
)
# 2. Create isolated database schema (if using schema isolation)
schema_name = f"tenant_{tenant_id.replace('-', '_')}"
await self.db.execute(f"CREATE SCHEMA {schema_name}")
await self._apply_schema_migrations(schema_name)
# 3. Generate API credentials
api_key = secrets.token_urlsafe(32)
api_key_hash = hashlib.sha256(api_key.encode()).hexdigest()
await self.db.store_api_key(tenant_id, api_key_hash)
# 4. Create storage prefix
storage_prefix = self.storage._get_tenant_prefix(tenant_id)
# 5. Initialize tenant-specific encryption key (if required)
if tier in [TenantTier.BUSINESS, TenantTier.ENTERPRISE]:
await self._provision_tenant_kms_key(tenant_id)
# 6. Activate tenant
await self.db.update_tenant_status(tenant_id, TenantStatus.ACTIVE)
await self.audit.log("tenant_provisioning_completed", {
"tenant_id": tenant_id,
"schema": schema_name
})
return TenantProvisioningResult(
tenant_id=tenant_id,
status=TenantStatus.ACTIVE,
api_key=api_key, # Return only once, never stored in plain text
database_schema=schema_name,
storage_prefix=storage_prefix
)
except Exception as e:
await self.audit.log("tenant_provisioning_failed", {
"tenant_id": tenant_id,
"error": str(e)
})
await self._cleanup_failed_provisioning(tenant_id)
raise
async def offboard_tenant(self, tenant_id: str,
retain_days: int = 30) -> dict:
"""Securely offboard a tenant with data retention."""
await self.audit.log("tenant_offboarding_started", {"tenant_id": tenant_id})
# 1. Mark tenant as offboarding (prevents new operations)
await self.db.update_tenant_status(tenant_id, TenantStatus.OFFBOARDING)
# 2. Revoke all active sessions and API keys
await self._revoke_all_access(tenant_id)
# 3. Export data for compliance/portability
export_location = await self._export_tenant_data(tenant_id)
# 4. Schedule data deletion after retention period
deletion_date = datetime.utcnow() + timedelta(days=retain_days)
await self.db.schedule_tenant_deletion(tenant_id, deletion_date)
await self.audit.log("tenant_offboarding_completed", {
"tenant_id": tenant_id,
"export_location": export_location,
"scheduled_deletion": deletion_date.isoformat()
})
return {
"status": "offboarding_complete",
"data_export": export_location,
"final_deletion": deletion_date.isoformat()
}
async def execute_tenant_deletion(self, tenant_id: str):
"""Permanently delete all tenant data."""
await self.audit.log("tenant_deletion_started", {"tenant_id": tenant_id})
# 1. Delete database schema/data
schema_name = f"tenant_{tenant_id.replace('-', '_')}"
await self.db.execute(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE")
# For shared table model, delete rows
await self.db.execute(
"DELETE FROM shared_table WHERE tenant_id = :tid",
{"tid": tenant_id}
)
# 2. Delete cached data
await self.cache.invalidate_tenant(tenant_id)
# 3. Delete stored files
await self.storage.delete_tenant_data(tenant_id)
# 4. Delete encryption keys
await self._delete_tenant_kms_key(tenant_id)
# 5. Mark as deleted (keep minimal audit record)
await self.db.update_tenant_status(tenant_id, TenantStatus.DELETED)
await self.audit.log("tenant_deletion_completed", {"tenant_id": tenant_id})
8. Logging, Monitoring & Audit¶
- Include tenant context in all log entries.
- Implement tenant-isolated audit trails.
- Monitor for cross-tenant access attempts.
- Set up alerts for tenant isolation violations.
- Ensure compliance with tenant-specific retention policies.
Tenant-Aware Logging & Monitoring
import structlog
from typing import Any, Dict
from datetime import datetime
class TenantAwareLogger:
"""Logger that automatically includes tenant context."""
def __init__(self):
self.logger = structlog.get_logger()
def _enrich_with_tenant(self, event_data: dict) -> dict:
"""Add tenant context to log entry."""
ctx = current_tenant.get()
if ctx:
event_data["tenant_id"] = ctx.tenant_id
event_data["user_id"] = ctx.user_id
return event_data
def info(self, message: str, **kwargs):
self.logger.info(message, **self._enrich_with_tenant(kwargs))
def warning(self, message: str, **kwargs):
self.logger.warning(message, **self._enrich_with_tenant(kwargs))
def error(self, message: str, **kwargs):
self.logger.error(message, **self._enrich_with_tenant(kwargs))
def security_event(self, event_type: str, severity: str, **kwargs):
"""Log security-relevant events."""
self.logger.warning(
"security_event",
event_type=event_type,
severity=severity,
**self._enrich_with_tenant(kwargs)
)
class TenantAuditLog:
"""Immutable audit log with tenant isolation."""
def __init__(self, db):
self.db = db
async def log(self, action: str, details: Dict[str, Any],
tenant_id: str = None):
"""Record audit entry."""
ctx = current_tenant.get()
tenant_id = tenant_id or (ctx.tenant_id if ctx else "system")
entry = {
"id": secrets.token_urlsafe(16),
"tenant_id": tenant_id,
"user_id": ctx.user_id if ctx else None,
"action": action,
"details": details,
"timestamp": datetime.utcnow(),
"ip_address": get_client_ip(),
"user_agent": get_user_agent()
}
# Insert into append-only audit table
await self.db.execute("""
INSERT INTO audit_log
(id, tenant_id, user_id, action, details, timestamp, ip_address, user_agent)
VALUES (:id, :tenant_id, :user_id, :action, :details, :timestamp, :ip_address, :user_agent)
""", entry)
async def get_tenant_audit_trail(self, tenant_id: str,
start_date: datetime,
end_date: datetime) -> list:
"""Retrieve audit trail for a specific tenant."""
ctx = current_tenant.get()
# Ensure requester can only access their own audit logs
if ctx.tenant_id != tenant_id and "admin" not in ctx.roles:
raise SecurityException("Cannot access other tenant's audit logs")
return await self.db.fetch_all("""
SELECT * FROM audit_log
WHERE tenant_id = :tenant_id
AND timestamp BETWEEN :start AND :end
ORDER BY timestamp DESC
""", {"tenant_id": tenant_id, "start": start_date, "end": end_date})
class CrossTenantAccessMonitor:
"""Monitor and alert on potential cross-tenant access attempts."""
def __init__(self, alert_service):
self.alerts = alert_service
self.violation_counts = {}
async def check_access(self, requested_tenant: str,
resource_type: str, resource_id: str):
"""Check for cross-tenant access attempts."""
ctx = current_tenant.get()
if ctx.tenant_id != requested_tenant:
# Log violation
logger.security_event(
"cross_tenant_access_attempt",
severity="HIGH",
requested_tenant=requested_tenant,
resource_type=resource_type,
resource_id=resource_id
)
# Track violations per user
key = f"{ctx.user_id}:{ctx.tenant_id}"
self.violation_counts[key] = self.violation_counts.get(key, 0) + 1
# Alert on repeated attempts
if self.violation_counts[key] >= 3:
await self.alerts.send(
severity="CRITICAL",
message=f"Repeated cross-tenant access attempts detected",
details={
"user_id": ctx.user_id,
"tenant_id": ctx.tenant_id,
"attempts": self.violation_counts[key]
}
)
raise SecurityException("Access denied: resource belongs to different tenant")
Do's and Don'ts¶
Do:
- Derive tenant context from authenticated, verified tokens.
- Use database-level isolation (RLS, schemas) as defense in depth.
- Include tenant_id in all resource queries, cache keys, and storage paths.
- Implement per-tenant rate limiting and quotas.
- Log tenant context with every operation.
- Validate tenant ownership at the data access layer.
- Use separate encryption keys for high-security tenants.
- Implement complete data deletion for offboarding.
- Monitor and alert on cross-tenant access attempts.
Don't:
- Trust tenant IDs from client headers or request parameters.
- Use shared cache keys without tenant prefixes.
- Expose sequential or guessable tenant/resource IDs.
- Allow queries without tenant filters (even for admins without explicit override).
- Store tenant data without tenant_id columns.
- Share API keys or credentials across tenants.
- Skip tenant validation for "internal" services.
- Retain tenant data indefinitely after offboarding.
- Log sensitive tenant data in plain text.