Diwash Bhandari
Software Developer | AI Enthusiast
Versatile Software Developer with over 4 years of experience in designing, developing, and testing high-quality software solutions.
About Me
My professional profile and what I bring to the table
Versatile Software Developer with over 4 years of experience in designing, developing, and testing high-quality software solutions. Strong expertise in implementing robust testing strategies to ensure code reliability, maintainability, and performance. Adept at collaborating with cross-functional teams to deliver user-focused, scalable applications that align with business objectives. Skilled in translating complex requirements into effective technical solutions, focusing on clarity, visualization, and continuous improvement. Known for delivering clean, efficient code and driving innovation in fast-paced development environments.
5+
Years of Experience
Since 2020
10+
Projects Completed
And counting...
4+
Technologies
Always learning
Clean Code
Writing maintainable, scalable, and efficient code with proper testing strategies
Innovation
Leveraging cutting-edge technologies and AI to solve complex business problems
Collaboration
Working effectively with cross-functional teams to deliver user-focused solutions
Work Experience
A journey of growth and innovation in software development
Software Engineer
Codavatar Tech Pvt. Ltd.
June 2023 - Present
Kalopul Commercial Building, Kalopul, Kathmandu
Designed and developed scalable RESTful APIs using FastAPI and Starlette, enabling high-throughput data handling and improved backend responsiveness for SaaS platforms.
Architected modular microservices to support distributed systems, simplifying deployments and enhancing maintainability across enterprise-grade applications.
Implemented real-time features with WebSockets and asynchronous programming, improving user experience and enabling live collaboration in multi-tenant environments.
Built robust internal frameworks and developer tooling to streamline onboarding, enforce standards, and reduce integration overhead across teams.
Worked extensively with technologies like PostgreSQL, Redis, gRPC, and GraphQL to deliver reliable, secure, and high-performance backend systems.
Software Engineer
Chuchuro Firm
May 2022 - July 2023
Sinamangal, Kathmandu
Developed and maintained Python applications using Peewee ORM, Tornado Framework, RabbitMQ, and Meilisearch, enhancing performance and scalability.
Ensured high code quality by writing clean, maintainable, and testable Python code, and implemented rigorous testing with Pytest, improving software reliability.
Implemented RabbitMQ for asynchronous processing, optimizing system efficiency and throughput, and integrated Meilisearch to enhance search capabilities.
Contributed to QA efforts by combining manual and automated testing to effectively identify and resolve issues, leading to a more stable and user-friendly product.
Intern, Associate Software Engineer
Young Minds Creation (P) Ltd
December 2020 - April 2022
Young Minds Tower
Developed and maintained complex Laravel-based web applications, ensuring strong performance and scalability.
Wrote clean, maintainable, and testable PHP code, utilizing Laravel's built-in features to enhance application functionality and reliability.
Built and integrated RESTful APIs for seamless data exchange between systems, implementing security measures such as password hashing and encryption to protect data.
Extended application functionality by integrating third-party packages and libraries, contributing to a more versatile and feature-rich platform.
Graphic Designer
Pinches Artcore
May 2019 - January 2020
Created designs for various media, including print materials, digital platforms, and social media.
Designed logos, brochures, flyers, posters, and other marketing materials.
Worked with clients to understand their design needs and preferences.
Featured Projects
A selection of my recent work
Recent Articles
Exploring ideas in tech, development, and beyond
Docker Model Runner: The Game-Changer for Local AI Development — A Complete Developer’s Guide
How Docker Desktop 4.40+ transforms AI development with zero-setup local LLM inference and OpenAI-compatible APIs

Introduction: AI Development Just Got Simpler

If you’ve ever spent hours wrestling with Python virtual environments, CUDA installations, and model downloads just to run a simple AI model locally, Docker Model Runner is about to change your life. This new feature, integrated directly into Docker Desktop 4.40+, brings Large Language Model (LLM) inference into your development workflow with zero setup complexity.

The bottom line: Docker Model Runner provides OpenAI-compatible APIs for local AI models, eliminating the infrastructure headaches that have plagued AI development for years.

🚧 Beta Alert: Docker Model Runner is currently in beta (as of Docker Desktop 4.40+), representing Docker’s bold vision for the future of AI development. While in beta, it’s already production-capable and actively being adopted by forward-thinking development teams worldwide.

🌟 The Revolutionary Impact of Docker Model Runner

A Paradigm Shift in AI Development

Docker Model Runner isn’t just another tool: it’s a fundamental shift in how we approach AI development. As a beta feature that’s already reshaping development workflows, it represents Docker’s vision for democratizing AI and making machine learning as accessible as deploying a web server. This is bigger than convenience. Docker Model Runner is creating a new category of AI-first applications where local inference becomes the default, not the exception.

Industry Impact and Adoption Trends

🏢 Enterprise Benefits:
Cost Reduction: Eliminate expensive cloud API calls during development and testing
Data Privacy: Keep sensitive data on-premises without external API dependencies
Compliance: Meet strict regulatory requirements with fully local AI processing
Performance: Sub-second response times without network latency
Scalability: Independent scaling without per-request API costs

👥 Developer Community Impact:
Democratization: AI development accessible to developers without ML expertise
Innovation Acceleration: Rapid prototyping without infrastructure barriers
Open Source Enablement: Local models become first-class citizens in open source projects
Educational Access: Students and learners can experiment without cloud costs

🌍 Ecosystem Transformation:
Hybrid Workflows: Seamless switching between local and cloud models based on requirements
Model Distribution: OCI artifacts make AI models as distributable as container images
DevOps Integration: AI inference becomes part of standard CI/CD pipelines

🎯 Why Docker Model Runner Matters for Developers

The Old Way vs. The New Way

Before Docker Model Runner:
Install Python, manage virtual environments
Download and configure CUDA drivers
Manually download multi-gigabyte model files
Set up inference servers (Ollama, vLLM, etc.)
Deal with version conflicts and dependency hell

With Docker Model Runner:

docker model pull ai/smollm2
docker model run ai/smollm2 "Hello world"

Done.
🎉 Key Benefits That Matter ✅ Zero Infrastructure Setup — Runs natively on your machine without additional servers or VMs ✅ OpenAI-Compatible API — Drop-in replacement for OpenAI API calls in existing applications ✅ GPU Acceleration — Optimized performance on Apple Silicon (M1/M2/M3) and NVIDIA GPUs ✅ OCI Artifact Distribution — Models are distributed as standard container artifacts ✅ Host-Based Execution — Maximum performance without virtualization overhead ✅ Beta Innovation — Access cutting-edge features and influence the future of AI tooling Real-World Benefits by Developer Type 🎯 For Full-Stack Developers: // Before: Complex setup, API keys, rate limitsconst response = await openai.chat.completions.create({...})// After: Local, unlimited, privateconst response = await localAI.chat.completions.create({...}) 🤖 For AI/ML Engineers: Rapid model experimentation without cloud costs A/B testing different models locally Custom model fine-tuning workflows Offline development capabilities 🏢 For Enterprise Teams: Compliance-friendly AI development Reduced operational costs Enhanced data privacy and security Predictable performance characteristics 🎓 For Students and Researchers: Free access to powerful AI models Unlimited experimentation without budget constraints Reproducible research environments Learning without cloud complexity The Beta Advantage: Being at the Forefront Why Join the Beta Community: 🔬 Early Access to Innovation: Experience next-generation AI tooling before widespread adoption 🗣️ Community Influence: Your feedback directly shapes the future of Docker’s AI strategy 📈 Competitive Edge: Build applications with capabilities your competitors don’t have yet 🛠️ Learning Opportunity: Master emerging technologies while they’re still forming Beta Considerations: Features may change based on community feedback Documentation and tooling continue to evolve Active development means rapid improvements and new capabilities Early adopter advantages in understanding and implementing AI workflows 🚀 Getting Started: Prerequisites and Setup System Requirements Docker Desktop 4.40+ Required (4.41+ for Windows GPU support) macOS: Apple Silicon (M1/M2/M3) for optimal performance Windows: NVIDIA GPU recommended for acceleration Linux: Docker Engine with Model Runner plugin Enabling Docker Model Runner Method 1: Docker Desktop GUI Open Docker Desktop Settings Navigate to Features in development → Beta Enable “Docker Model Runner” Apply & Restart Docker Desktop Method 2: Command Line Interface # Enable Model Runnerdocker desktop enable model-runner# Enable with TCP support for host accessdocker desktop enable model-runner --tcp 12434# Verify installationdocker desktop status Method 3: Docker Engine (Linux) sudo apt-get updatesudo apt-get install docker-model-plugin 📋 Essential Commands Every Developer Should Know Model Management Pulling Models # Pull the latest version of a modeldocker model pull ai/smollm2# Pull specific model variantsdocker model pull ai/llama3.2:1bdocker model pull ai/qwen2.5:3b Listing Available Models # Show all locally available modelsdocker model ls Model Cleanup # Remove specific models to free spacedocker model rm ai/smollm2docker model rm ai/llama3.2:1b Running Models Interactive Inference # Quick one-shot inferencedocker model run ai/smollm2 "Explain Docker in one sentence"# Interactive conversation modedocker model run -it ai/smollm2 Model Inspection # Get detailed model informationdocker model inspect ai/smollm2 🔗 API Integration: OpenAI-Compatible Endpoints This is where 
Docker Model Runner truly shines. The OpenAI-compatible API means you can replace https://api.openai.com/v1 with your local endpoint and everything just works. Endpoint URLs From Docker Containers: http://model-runner.docker.internal/engines/llama.cpp/v1/ From Host Machine (TCP enabled): http://localhost:12434/engines/llama.cpp/v1/ Chat Completions API Examples cURL Example curl http://localhost:12434/engines/llama.cpp/v1/chat/completions \ -H "Content-Type: application/json" \ -d '{ "model": "ai/smollm2", "messages": [ { "role": "system", "content": "You are a helpful coding assistant specializing in containerization." }, { "role": "user", "content": "Write a Docker Compose file for a React app with PostgreSQL" } ], "temperature": 0.7, "max_tokens": 500 }' Python Integration import openai# Configure client for local Model Runnerclient = openai.OpenAI( base_url="http://model-runner.docker.internal/engines/llama.cpp/v1", api_key="not-needed" # Local inference doesn't require API key)def chat_with_local_model(prompt): response = client.chat.completions.create( model="ai/smollm2", messages=[ {"role": "system", "content": "You are a helpful DevOps assistant."}, {"role": "user", "content": prompt} ], temperature=0.7, max_tokens=200 ) return response.choices[0].message.content# Example usageresult = chat_with_local_model("Explain containerization benefits for microservices")print(result) Node.js Integration import OpenAI from 'openai';const openai = new OpenAI({ baseURL: 'http://model-runner.docker.internal/engines/llama.cpp/v1', apiKey: 'not-needed'});async function generateCode(prompt) { try { const completion = await openai.chat.completions.create({ model: 'ai/smollm2', messages: [ { role: 'system', content: 'You are an expert software architect.' }, { role: 'user', content: prompt } ], temperature: 0.8, max_tokens: 300 }); return completion.choices[0].message.content; } catch (error) { console.error('Error generating code:', error); throw error; }}// Usage exampleconst architectureAdvice = await generateCode( 'Best practices for designing scalable Docker microservices?');console.log(architectureAdvice); FastAPI Integration Example from fastapi import FastAPI, HTTPExceptionfrom pydantic import BaseModelimport openaiimport asyncioapp = FastAPI(title="Local AI API")# Configure local model clientlocal_client = openai.OpenAI( base_url="http://model-runner.docker.internal/engines/llama.cpp/v1", api_key="not-needed")class ChatRequest(BaseModel): message: str temperature: float = 0.7class ChatResponse(BaseModel): response: str model: str@app.post("/chat", response_model=ChatResponse)async def chat_endpoint(request: ChatRequest): try: response = local_client.chat.completions.create( model="ai/smollm2", messages=[ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": request.message} ], temperature=request.temperature, max_tokens=500 ) return ChatResponse( response=response.choices[0].message.content, model="ai/smollm2" ) except Exception as e: raise HTTPException(status_code=500, detail=str(e))@app.get("/models")async def list_models(): # This would typically call the local model runner API return {"models": ["ai/smollm2", "ai/llama3.2:1b"]} 🐳 Docker Compose Integration One of the most powerful features is the ability to integrate AI models directly into your Docker Compose workflows: Basic AI Service Integration version: '3.8'services: web-app: build: . 
ports: - "3000:3000" depends_on: - ai_service environment: - AI_BASE_URL=http://ai_service:8000 ai_service: provider: type: model options: model: ai/smollm2 # Optional configurations temperature: 0.7 max_tokens: 1000 database: image: postgres:15 environment: POSTGRES_DB: myapp POSTGRES_USER: user POSTGRES_PASSWORD: password Advanced Multi-Model Setup version: '3.8'services: api: build: ./api ports: - "8000:8000" environment: - CHAT_MODEL_URL=http://chat_model:8000 - CODE_MODEL_URL=http://code_model:8000 depends_on: - chat_model - code_model chat_model: provider: type: model options: model: ai/smollm2 context_length: 4096 code_model: provider: type: model options: model: ai/qwen2.5:3b temperature: 0.2 max_tokens: 2000 frontend: build: ./frontend ports: - "3000:3000" depends_on: - api 🛠️ Advanced API Endpoints and Model Management Docker Model Management API Docker Model Runner provides comprehensive REST APIs for model management: # Create/pull a modelPOST /models/createContent-Type: application/json{ "name": "ai/smollm2", "tag": "latest"}# List all modelsGET /models# Get specific model infoGET /models/ai/smollm2# Delete a modelDELETE /models/ai/smollm2 OpenAI-Compatible Endpoints # List available modelsGET /engines/llama.cpp/v1/models# Get specific model detailsGET /engines/llama.cpp/v1/models/ai/smollm2# Chat completions (primary endpoint)POST /engines/llama.cpp/v1/chat/completions# Legacy completionsPOST /engines/llama.cpp/v1/completions# Generate embeddingsPOST /engines/llama.cpp/v1/embeddings Embeddings API Example import openaiclient = openai.OpenAI( base_url="http://localhost:12434/engines/llama.cpp/v1", api_key="not-needed")# Generate embeddings for semantic searchdef get_embeddings(texts): response = client.embeddings.create( model="ai/smollm2", input=texts ) return [embedding.embedding for embedding in response.data]# Example usagedocuments = [ "Docker containers provide isolation and portability", "Kubernetes orchestrates containerized applications", "Model Runner simplifies AI development workflows"]embeddings = get_embeddings(documents)print(f"Generated {len(embeddings)} embeddings") 🏗️ Real-World Development Patterns Pattern 1: Local Development with Fallback import openaiimport osfrom typing import Optionalclass AIClient: def __init__(self): self.local_available = self._check_local_model() def _check_local_model(self) -> bool: try: client = openai.OpenAI( base_url="http://localhost:12434/engines/llama.cpp/v1", api_key="not-needed", timeout=5.0 ) client.models.list() return True except: return False def chat(self, messages, temperature=0.7): if self.local_available: return self._chat_local(messages, temperature) else: return self._chat_remote(messages, temperature) def _chat_local(self, messages, temperature): client = openai.OpenAI( base_url="http://localhost:12434/engines/llama.cpp/v1", api_key="not-needed" ) return client.chat.completions.create( model="ai/smollm2", messages=messages, temperature=temperature ) def _chat_remote(self, messages, temperature): client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY")) return client.chat.completions.create( model="gpt-3.5-turbo", messages=messages, temperature=temperature )# Usageai = AIClient()response = ai.chat([ {"role": "user", "content": "Explain Docker layers"}]) Pattern 2: Model-Specific Routing class MultiModelAI: def __init__(self): self.client = openai.OpenAI( base_url="http://localhost:12434/engines/llama.cpp/v1", api_key="not-needed" ) def chat_general(self, prompt): return self._complete("ai/smollm2", prompt, 
temperature=0.7) def generate_code(self, prompt): return self._complete("ai/qwen2.5:3b", prompt, temperature=0.2) def creative_writing(self, prompt): return self._complete("ai/llama3.2:1b", prompt, temperature=0.9) def _complete(self, model, prompt, temperature): return self.client.chat.completions.create( model=model, messages=[{"role": "user", "content": prompt}], temperature=temperature ) 🔧 Troubleshooting Common Issues Model Not Found Errors # Check available modelsdocker model ls# Pull the model if missingdocker model pull ai/smollm2# Verify model is runningdocker ps | grep model Connection Issues # Check Docker Model Runner statusdocker desktop status# Verify TCP port is enabled (for host access)docker desktop enable model-runner --tcp 12434# Test connectivitycurl http://localhost:12434/engines/llama.cpp/v1/models Performance Optimization # Monitor resource usagedocker stats# Check GPU utilization (if available)nvidia-smi # For NVIDIA GPUs# macOS Activity Monitor for Apple Silicon Memory Management # Implement connection pooling for high-throughput applicationsfrom openai import OpenAIimport threadingclass PooledAIClient: def __init__(self, pool_size=5): self._clients = [] self._lock = threading.Lock() for _ in range(pool_size): client = OpenAI( base_url="http://localhost:12434/engines/llama.cpp/v1", api_key="not-needed" ) self._clients.append(client) def get_client(self): with self._lock: if self._clients: return self._clients.pop() return None def return_client(self, client): with self._lock: self._clients.append(client) 🚀 Production Considerations Security Best Practices Network Isolation: Use internal Docker networks for model communication Resource Limits: Set appropriate memory and CPU limits Access Control: Implement authentication layers for external access # docker-compose.production.ymlversion: '3.8'services: ai_service: provider: type: model options: model: ai/smollm2 deploy: resources: limits: memory: 4G cpus: '2.0' networks: - ai_internalnetworks: ai_internal: internal: true Monitoring and Logging import loggingfrom datetime import datetime# Set up comprehensive logginglogging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')class MonitoredAIClient: def __init__(self): self.logger = logging.getLogger(__name__) self.client = OpenAI( base_url="http://localhost:12434/engines/llama.cpp/v1", api_key="not-needed" ) def chat_with_metrics(self, messages): start_time = datetime.now() try: response = self.client.chat.completions.create( model="ai/smollm2", messages=messages ) duration = (datetime.now() - start_time).total_seconds() self.logger.info(f"Chat completed in {duration:.2f}s") return response except Exception as e: self.logger.error(f"Chat failed: {str(e)}") raise 🎯 Conclusion: The Future of Local AI Development Docker Model Runner represents a paradigm shift in how we approach AI development. By eliminating infrastructure complexity and providing OpenAI-compatible APIs, it democratizes AI development and makes local inference accessible to every developer. Key Takeaways Zero Setup Complexity: No more wrestling with Python environments and CUDA installations OpenAI Compatibility: Drop-in replacement for existing OpenAI integrations Production Ready: Docker Compose integration and comprehensive API support Performance Optimized: GPU acceleration and host-based execution Developer Friendly: Familiar Docker commands and standard REST APIs What’s Next? 
The future of AI development is local-first, privacy-conscious, and infrastructure-agnostic. Docker Model Runner is leading this transformation by making AI models as easy to deploy as any other containerized application. Whether you’re building chatbots, code generation tools, or complex AI-powered applications, Docker Model Runner provides the foundation for reliable, scalable, and maintainable AI development workflows.

Ready to get started? Pull your first model today:

docker model pull ai/smollm2
docker model run ai/smollm2 "Hello, Docker Model Runner!"

The future of AI development is here, and it runs in Docker. 🚀

Have you tried Docker Model Runner in your projects? Share your experiences and use cases in the comments below!
Transform basic phone bots into intelligent conversational agents using Retrieval-Augmented Generation and modern Python architecture The Problem with Traditional Call Bots Picture this: You’ve built a call assistant that can transcribe speech and respond to customers, but it feels robotic and generic. When customers ask specific questions about your product, pricing, or policies, your bot either gives canned responses or completely misses the mark. Sound familiar? Traditional call bots suffer from a fundamental limitation: they lack access to dynamic, contextual information. They’re essentially expensive tape recorders with basic NLP capabilities. But what if your call assistant could instantly access your entire knowledge base, understand the context of the conversation, and generate intelligent, personalized responses in real-time? Enter Retrieval-Augmented Generation (RAG) — the game-changing architecture that’s revolutionizing how we build intelligent conversational AI. What is RAG and Why Does It Matter for Call Assistants? RAG is an AI architecture pattern that combines the power of large language models with external knowledge retrieval. Instead of relying solely on the model’s training data, RAG systems can dynamically fetch relevant information from external sources and use it to generate more accurate, contextual responses. For call assistants, this is transformative: • Dynamic Knowledge: Access up-to-date company information, product details, and policies • Contextual Responses: Generate replies based on specific customer queries and conversation history • Consistent Messaging: Ensure all responses align with approved company information • Scalable Intelligence: Add new knowledge without retraining models Architecture Overview: From Audio to Intelligent Response Let’s break down the complete architecture of a production-ready RAG-powered call assistant: ┌─────────────┐ ┌──────────────┐ ┌─────────────┐│ Customer │───▶│ Twilio │───▶│ FastAPI ││ Phone │ │ WebSocket │ │ Server │ └─────────────┘ └──────────────┘ └─────────────┘ │ ▼┌─────────────┐ ┌──────────────┐ ┌─────────────┐│ Response │◀───│ Speech-to- │───▶│ RAG System ││ Generation │ │ Text Service │ │ │└─────────────┘ └──────────────┘ └─────────────┘ │ │ ▼ ▼┌─────────────┐ ┌──────────────┐ ┌─────────────┐│ OpenAI │◀───│ Conversation │───▶│ ChromaDB ││ GPT │ │ Context │ │ Vector DB │└─────────────┘ └──────────────┘ └─────────────┘ This architecture ensures sub-1000ms response times while maintaining intelligent, context-aware conversations. Project Structure: Building for Scale and Maintainability Before diving into implementation, let’s establish a clean, scalable project structure that follows Python best practices: ├── app/│ ├── core/│ │ └── config.py # Centralized configuration│ ├── services/│ │ ├── rag_service.py # RAG implementation with ChromaDB│ │ ├── response_service.py # Intelligent response generation│ │ └── speech_service.py # Speech processing services│ └── utils/│ ├── data_loader.py # Knowledge base utilities│ └── audio_utils.py # Audio processing helpers├── tests/│ └── test_rag.py # Comprehensive testing suite├── main.py # FastAPI application entry point├── requirements.txt # Dependencies└── docker-compose.yml # Production deployment This modular structure separates concerns, making the codebase maintainable and testable. Each service has a single responsibility, and the configuration is centralized for easy management. 
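Before building the full system, it helps to see the core RAG loop in its simplest form: embed and store documents, retrieve the chunks most relevant to a query, and pass them to the language model as context. The sketch below is illustrative only, assuming an in-memory ChromaDB collection and a hosted OpenAI model; the collection name, sample documents, and gpt-4o-mini are placeholders rather than part of the call-assistant codebase described in the following steps, which swap in a persistent client, hybrid search, and conversation context.

```python
# Minimal retrieve -> augment -> generate loop (illustrative sketch).
# "demo_kb", the sample documents, and "gpt-4o-mini" are placeholders.
import chromadb
from openai import OpenAI

store = chromadb.Client()  # in-memory; the real service uses a PersistentClient
collection = store.create_collection("demo_kb")

# 1. Index a few knowledge-base snippets (Chroma embeds them with its default model)
collection.add(
    ids=["kb-1", "kb-2"],
    documents=[
        "CloudSync Pro costs $9.99/month for individuals and $19.99/month for teams.",
        "All customer data is protected with end-to-end encryption.",
    ],
)

def answer(question: str) -> str:
    # 2. Retrieve the chunks most similar to the caller's question
    hits = collection.query(query_texts=[question], n_results=2)
    context = "\n".join(hits["documents"][0])

    # 3. Generate a reply grounded in the retrieved context
    llm = OpenAI()  # assumes OPENAI_API_KEY is set in the environment
    completion = llm.chat.completions.create(
        model="gpt-4o-mini",  # placeholder model name
        messages=[
            {"role": "system", "content": f"Answer using only this context:\n{context}"},
            {"role": "user", "content": question},
        ],
    )
    return completion.choices[0].message.content

print(answer("How much does CloudSync Pro cost?"))
```

Everything that follows, from real-time audio handling to caching and conversation-context management, layers on top of this same loop.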
Step 1: Building the RAG Foundation Setting Up the Vector Database The heart of our RAG system is ChromaDB, a powerful vector database that enables semantic search across our knowledge base: # app/services/rag_service.pyimport chromadbfrom sentence_transformers import SentenceTransformerfrom chromadb.config import Settingsfrom typing import List, Dict, Anyfrom dataclasses import dataclassfrom datetime import datetime @dataclassclass DocumentChunk: """Structured document chunk with metadata""" content: str doc_id: str chunk_id: str category: str source: str created_at: datetime metadata: Dict[str, Any] class RAGService: def __init__(self, embedding_model_name: str = "all-MiniLM-L6-v2"): self.embedding_model = SentenceTransformer(embedding_model_name) # Initialize ChromaDB with persistence self.client = chromadb.PersistentClient( path="./data/chroma_db", settings=Settings( anonymized_telemetry=False, is_persistent=True ) ) # Create collections for different knowledge domains self.collections = { 'products': self._get_or_create_collection('products'), 'policies': self._get_or_create_collection('policies'), 'faqs': self._get_or_create_collection('faqs'), 'conversations': self._get_or_create_collection('conversations') } # Hybrid search weights self.keyword_weights = {'exact_match': 0.4, 'semantic': 0.6} Implementing Hybrid Search The magic happens in our hybrid search implementation, which combines semantic similarity with keyword matching for optimal retrieval: def hybrid_search(self, query: str, collections: List[str] = None, n_results: int = 5) -> Dict[str, Any]: """Advanced hybrid search combining semantic and keyword matching""" if collections is None: collections = ['products', 'policies', 'faqs'] all_results = [] query_embedding = self.embedding_model.encode([query])[0] for collection_name in collections: collection = self.collections[collection_name] # Semantic search using vector similarity semantic_results = collection.query( query_embeddings=[query_embedding.tolist()], n_results=n_results * 2, # Get more for filtering include=['documents', 'metadatas', 'distances'] ) # Process and score results for doc, metadata, distance in zip( semantic_results['documents'][0], semantic_results['metadatas'][0], semantic_results['distances'][0] ): # Calculate hybrid score semantic_score = 1 - distance # Convert distance to similarity keyword_score = self._keyword_similarity(query, doc) final_score = ( self.keyword_weights['semantic'] * semantic_score + self.keyword_weights['exact_match'] * keyword_score ) all_results.append({ 'content': doc, 'metadata': metadata, 'score': final_score, 'collection': collection_name }) # Sort by final score and return top results all_results.sort(key=lambda x: x['score'], reverse=True) return { 'results': all_results[:n_results], 'total_searched': len(all_results), 'query_analysis': self._analyze_query(query) } Step 2: Intelligent Response Generation Context-Aware Conversation Management Our response service maintains conversation context and adapts responses based on the call stage: # app/services/response_service.pyfrom enum import Enumfrom dataclasses import dataclassimport openai class CallStage(Enum): INTRODUCTION = "introduction" DISCOVERY = "discovery" PRESENTATION = "presentation" OBJECTION_HANDLING = "objection_handling" CLOSING = "closing" FOLLOWUP = "followup" @dataclassclass ConversationContext: """Comprehensive conversation state management""" call_id: str customer_profile: Dict[str, Any] discussed_topics: List[str] objections_raised: List[str] 
interests_expressed: List[str] call_stage: CallStage conversation_history: List[Dict[str, str]] sentiment_history: List[Dict[str, float]] last_rag_results: Optional[Dict[str, Any]] = None class ResponseService: def __init__(self, rag_service: RAGService): self.rag_service = rag_service self.openai_client = openai.OpenAI() # Stage-specific response templates self.response_templates = { 'introduction': """ You are a professional and friendly sales assistant. Keep the introduction concise, warm, and focus on understanding the customer's needs. """, 'discovery': """ Focus on understanding the customer's specific needs, pain points, and requirements. Ask thoughtful follow-up questions based on their responses. """, 'presentation': """ Present relevant solutions based on the customer's expressed needs. Use the retrieved company information to provide accurate, detailed responses. """ } Dynamic Response Generation The core of our intelligent response system combines RAG retrieval with contextual prompt engineering: async def generate_contextual_response(self, user_input: str, context: ConversationContext) -> Dict[str, Any]: """Generate intelligent, context-aware responses using RAG""" # Perform enhanced RAG retrieval rag_results = self.rag_service.hybrid_search( query=user_input, collections=self._select_relevant_collections(context), n_results=3 ) # Build comprehensive prompt with retrieved context system_prompt = self._build_system_prompt(context, rag_results) conversation_prompt = self._build_conversation_prompt( user_input, context, rag_results ) try: # Generate response using GPT with retrieved context response = await self._generate_with_retry( system_prompt=system_prompt, user_prompt=conversation_prompt, max_tokens=200, temperature=0.7 ) # Enhance response with confidence scoring enhanced_response = await self._enhance_response( response, context, rag_results ) return { 'response': enhanced_response['text'], 'confidence_score': enhanced_response['confidence'], 'sources_used': [r['metadata']['source'] for r in rag_results['results']], 'conversation_stage': context.call_stage.value, 'suggested_followups': enhanced_response['followups'] } except Exception as e: return self._generate_fallback_response(user_input, context) Step 3: Real-Time Voice Integration FastAPI WebSocket Handler Our main application handles real-time voice streams from Twilio using WebSocket connections: # main.pyfrom fastapi import FastAPI, WebSocket, WebSocketDisconnectimport asyncioimport jsonimport base64 @app.websocket("/media-stream")async def handle_media_stream(websocket: WebSocket): """Handle Twilio Media Stream WebSocket connection""" stream_sid = None audio_buffer = [] vad = VoiceActivityDetector() try: await websocket.accept() while True: message = await asyncio.wait_for( websocket.receive_text(), timeout=30.0 ) data = json.loads(message) if data["event"] == "start": stream_sid = data["start"]["streamSid"] await connection_manager.connect(websocket, stream_sid) elif data["event"] == "media" and stream_sid: # Process incoming audio payload = data["media"]["payload"] audio_chunk = base64.b64decode(payload) audio_buffer.append(audio_chunk) # Voice activity detection for speech segmentation if vad.detect_speech_end(audio_buffer): await process_audio_buffer(stream_sid, audio_buffer) audio_buffer = [] except WebSocketDisconnect: if stream_sid: connection_manager.disconnect(stream_sid) Audio Processing Pipeline The audio processing pipeline handles speech-to-text conversion with multiple provider fallbacks: # 
app/services/speech_service.pyclass SpeechRecognitionService: def __init__(self): self.google_client = speech.SpeechClient() if config.GOOGLE_CREDENTIALS_PATH else None async def transcribe_streaming(self, audio_chunks: List[bytes]) -> str: """Transcribe audio using Google Cloud Speech-to-Text with Whisper fallback""" if self.google_client: try: # Primary: Google Cloud Speech-to-Text return await self._transcribe_google(audio_chunks) except Exception as e: logger.warning(f"Google STT failed: {e}") # Fallback: OpenAI Whisper return await self._transcribe_whisper(b''.join(audio_chunks)) async def _transcribe_google(self, audio_chunks: List[bytes]) -> str: """Google Cloud Speech-to-Text implementation""" config_obj = speech.RecognitionConfig( encoding=speech.RecognitionConfig.AudioEncoding.MULAW, sample_rate_hertz=8000, language_code="en-US", enable_automatic_punctuation=True, model="phone_call" ) streaming_config = speech.StreamingRecognitionConfig( config=config_obj, interim_results=True, ) # Process streaming audio audio_generator = (speech.StreamingRecognizeRequest(audio_content=chunk) for chunk in audio_chunks) responses = self.google_client.streaming_recognize(streaming_config, audio_generator) for response in responses: for result in response.results: if result.is_final: return result.alternatives[0].transcript.strip() return "" Step 4: Data Loading and Knowledge Management Flexible Data Loading System Our data loading system supports multiple formats and provides easy knowledge base management: # app/utils/data_loader.pyclass DataLoader: def __init__(self, rag_service: RAGService): self.rag_service = rag_service async def load_sample_data(self): """Load sample company data into the RAG system""" # Sample product data products = [ { "content": "CloudSync Pro is our flagship cloud storage solution offering 1TB of secure storage with end-to-end encryption. Features include real-time sync across all devices, advanced sharing controls, and 99.9% uptime guarantee. Pricing starts at $9.99/month for individuals and $19.99/month for teams.", "category": "product", "source": "product_catalog", "metadata": {"product_name": "CloudSync Pro", "price_individual": 9.99, "price_team": 19.99} }, # ... more products ] # Convert to DocumentChunk objects and load await self._load_documents(products, "products") async def load_from_csv(self, file_path: str, collection_name: str): """Load documents from CSV file""" doc_chunks = [] with open(file_path, 'r', encoding='utf-8') as file: reader = csv.DictReader(file) for i, row in enumerate(reader): chunk = DocumentChunk( content=row.get('content', ''), doc_id=f"{collection_name}_{i}", chunk_id="0", category=row.get('category', 'general'), source=row.get('source', file_path), created_at=datetime.now(), metadata={k: v for k, v in row.items() if k not in ['content', 'category', 'source']} ) doc_chunks.append(chunk) await self.rag_service.add_document_batch(doc_chunks, collection_name) Step 5: Testing and Validation Comprehensive Testing Suite A robust testing framework ensures your RAG system performs optimally: # tests/test_rag.pyasync def test_rag_system(): """Test the RAG system with sample queries""" print("🚀 Initializing RAG System...") # Initialize and load data rag_service = RAGService() loader = DataLoader(rag_service) await loader.load_sample_data() # Test queries test_queries = [ "What cloud storage options do you have?", "How much does CloudSync Pro cost?", "Can I cancel my subscription?", "Is my data encrypted?", "Do you offer enterprise solutions?" 
] print("\n🔍 Testing RAG Search...") for query in test_queries: results = rag_service.hybrid_search(query, n_results=2) print(f"\n📝 Query: {query}") print(f" Found {len(results['results'])} results:") for i, result in enumerate(results['results']): print(f" {i+1}. Score: {result['score']:.3f} | {result['content'][:80]}...") print(f" Query Type: {results['query_analysis']['query_type']}") async def test_search_performance(): """Test search performance with multiple queries""" rag_service = RAGService() loader = DataLoader(rag_service) await loader.load_sample_data() queries = ["pricing information", "security features", "enterprise solutions"] * 20 start_time = time.time() for query in queries: results = rag_service.hybrid_search(query, n_results=3) total_time = time.time() - start_time avg_time = total_time / len(queries) print(f"📊 Processed {len(queries)} queries in {total_time:.2f} seconds") print(f"📊 Average query time: {avg_time*1000:.1f} ms") print(f"📊 Queries per second: {len(queries)/total_time:.1f}") Challenges and Best Practices Performance Optimization Challenge: Maintaining sub-1000ms response times while processing complex RAG queries. Solution: Implement several optimization strategies: 1. Batch Processing: Process multiple documents efficiently 2. Caching: Use Redis for frequent query results 3. Async Operations: Leverage Python’s asyncio for concurrent processing 4. Connection Pooling: Reuse database connections # Performance optimization exampleclass OptimizedRAGService(RAGService): def __init__(self): super().__init__() self.cache = redis.Redis(host='localhost', port=6379, db=0) self.cache_ttl = 3600 # 1 hour async def hybrid_search_cached(self, query: str, **kwargs) -> Dict[str, Any]: # Check cache first cache_key = f"rag_query:{hash(query + str(kwargs))}" cached_result = self.cache.get(cache_key) if cached_result: return json.loads(cached_result) # Perform search result = self.hybrid_search(query, **kwargs) # Cache result self.cache.setex(cache_key, self.cache_ttl, json.dumps(result)) return result Context Management Challenge: Maintaining conversation context across multiple turns while avoiding context window limits. Solution: Implement intelligent context pruning and summarization: def _manage_context_window(self, context: ConversationContext) -> ConversationContext: """Intelligently manage conversation context to stay within limits""" if len(context.conversation_history) > self.max_context_length: # Keep first few turns (introduction) and recent turns important_turns = context.conversation_history[:2] # Introduction recent_turns = context.conversation_history[-8:] # Recent context context.conversation_history = important_turns + recent_turns # Summarize older discussions if context.discussed_topics: context.discussed_topics = list(set(context.discussed_topics))[:10] return context Error Handling and Resilience Challenge: Ensuring system reliability when external services fail. 
Solution: Implement circuit breaker patterns and graceful degradation: class CircuitBreaker: def __init__(self, failure_threshold: int = 5, timeout: int = 60): self.failure_threshold = failure_threshold self.timeout = timeout self.failure_count = 0 self.last_failure_time = None self.state = "CLOSED" # CLOSED, OPEN, HALF_OPEN async def call(self, func, *args, **kwargs): if self.state == "OPEN": if time.time() - self.last_failure_time > self.timeout: self.state = "HALF_OPEN" else: raise Exception("Circuit breaker is open") try: result = await func(*args, **kwargs) self.reset() return result except Exception as e: self.record_failure() raise e def reset(self): self.failure_count = 0 self.state = "CLOSED" def record_failure(self): self.failure_count += 1 self.last_failure_time = time.time() if self.failure_count >= self.failure_threshold: self.state = "OPEN" Security Considerations Challenge: Protecting against prompt injection and ensuring data privacy. Solution: Implement input validation and content filtering: class SecurityValidator: def __init__(self): self.dangerous_patterns = [ r"ignore previous instructions", r"system prompt", r"act as.*different", r"pretend.*you are" ] def validate_input(self, user_input: str) -> bool: """Validate user input for potential security issues""" # Check for prompt injection patterns for pattern in self.dangerous_patterns: if re.search(pattern, user_input.lower()): logger.warning(f"Potential prompt injection detected: {user_input}") return False # Length validation if len(user_input) > 1000: logger.warning(f"Input too long: {len(user_input)} characters") return False return True def sanitize_response(self, response: str) -> str: """Sanitize generated responses""" # Remove any potential system information sanitized = re.sub(r'(api[_\s]?key|token|password)', '[REDACTED]', response, flags=re.IGNORECASE) return sanitized Production Deployment Docker Configuration Deploy your RAG-powered call assistant using Docker for consistent environments: # DockerfileFROM python:3.11-slim WORKDIR /app # Install system dependenciesRUN apt-get update && apt-get install -y \ gcc \ g++ \ portaudio19-dev \ && rm -rf /var/lib/apt/lists/* # Install Python dependenciesCOPY requirements.txt .RUN pip install --no-cache-dir -r requirements.txt # Copy application codeCOPY . . # Create data directoryRUN mkdir -p ./data EXPOSE 8000 # Health checkHEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \ CMD curl -f http://localhost:8000/health || exit 1 CMD ["python", "main.py"] Docker Compose for Multi-Service Setup # docker-compose.ymlversion: '3.8' services: app: build: . 
ports: - "8000:8000" environment: - REDIS_URL=redis://redis:6379 depends_on: - redis volumes: - ./data:/app/data - ./.env:/app/.env restart: unless-stopped redis: image: redis:7-alpine ports: - "6379:6379" volumes: - redis_data:/data restart: unless-stopped command: redis-server --appendonly yes volumes: redis_data: Environment Configuration # Production environment variablesexport OPENAI_API_KEY="your_production_key"export TWILIO_ACCOUNT_SID="your_twilio_sid"export TWILIO_AUTH_TOKEN="your_twilio_token"export REDIS_URL="redis://your-redis-server:6379"export SESSION_TIMEOUT="3600"export MAX_CONCURRENT_CALLS="500" Monitoring and Analytics Performance Metrics Implement comprehensive monitoring to track system performance: # app/utils/metrics.pyfrom prometheus_client import Counter, Histogram, Gaugeimport time # Metricsrag_queries_total = Counter('rag_queries_total', 'Total RAG queries processed')rag_query_duration = Histogram('rag_query_duration_seconds', 'RAG query processing time')active_calls = Gauge('active_calls_total', 'Number of active calls')response_confidence = Histogram('response_confidence_score', 'Response confidence scores') class MetricsCollector: def __init__(self): self.start_time = time.time() def record_rag_query(self, query_time: float, confidence: float): rag_queries_total.inc() rag_query_duration.observe(query_time) response_confidence.observe(confidence) def update_active_calls(self, count: int): active_calls.set(count) Logging Strategy # app/utils/logging_config.pyimport loggingimport jsonfrom datetime import datetime class StructuredLogger: def __init__(self, name: str): self.logger = logging.getLogger(name) def log_conversation_event(self, event_type: str, call_id: str, user_input: str = None, response: str = None, confidence: float = None, sources: List[str] = None): """Log structured conversation events""" log_data = { "timestamp": datetime.utcnow().isoformat(), "event_type": event_type, "call_id": call_id, "user_input": user_input, "response": response, "confidence_score": confidence, "sources_used": sources } self.logger.info(json.dumps(log_data)) Future Directions and Advanced Features Multi-Modal RAG Extend your system to handle images, documents, and other media types: class MultiModalRAGService(RAGService): def __init__(self): super().__init__() self.image_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32") self.image_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32") async def process_image_query(self, image_data: bytes, text_query: str) -> Dict[str, Any]: """Process queries that include images""" # Extract image features image_features = self._extract_image_features(image_data) # Combine with text search text_results = self.hybrid_search(text_query) image_results = self._search_by_image(image_features) # Merge and rank results return self._merge_multimodal_results(text_results, image_results) Real-Time Learning Implement systems that learn from successful conversations: class AdaptiveRAGService(RAGService): def __init__(self): super().__init__() self.feedback_collector = FeedbackCollector() async def learn_from_conversation(self, conversation_data: Dict[str, Any]): """Learn from successful conversation patterns""" if conversation_data['success_rating'] > 0.8: # Extract successful response patterns successful_patterns = self._extract_patterns(conversation_data) # Update knowledge base with successful interactions await self._update_conversation_collection(successful_patterns) # Fine-tune response templates await 
self._update_response_templates(successful_patterns) Advanced Analytics Implement conversation analytics for business insights: class ConversationAnalytics: def __init__(self): self.sentiment_analyzer = pipeline("sentiment-analysis") self.topic_extractor = pipeline("zero-shot-classification") async def analyze_conversation(self, conversation_history: List[Dict]) -> Dict[str, Any]: """Analyze conversation for business insights""" # Sentiment analysis sentiments = [self.sentiment_analyzer(turn['content']) for turn in conversation_history] # Topic extraction topics = self._extract_conversation_topics(conversation_history) # Intent classification intents = self._classify_customer_intents(conversation_history) return { "sentiment_progression": sentiments, "main_topics": topics, "customer_intents": intents, "conversion_probability": self._calculate_conversion_probability(conversation_history) } Conclusion: The Future of Intelligent Call Assistants Building RAG-powered call assistants represents a significant leap forward in conversational AI. By combining the power of large language models with dynamic knowledge retrieval, we can create systems that are not just responsive, but truly intelligent and contextually aware. Key Takeaways for Developers 1. Architecture Matters: A well-structured, modular codebase is essential for maintaining and scaling RAG systems 2. Performance is Critical: Sub-second response times require careful optimization of every component 3. Context is King: Intelligent conversation management makes the difference between a bot and an assistant 4. Reliability is Non-Negotiable: Implement robust error handling and fallback mechanisms 5. Security is Paramount: Protect against prompt injection and ensure data privacy Next Steps 1. Start Small: Begin with a focused knowledge domain and expand gradually 2. Measure Everything: Implement comprehensive monitoring and analytics from day one 3. Iterate Rapidly: Use feedback to continuously improve response quality 4. Plan for Scale: Design your architecture to handle growth in users and knowledge 5. Stay Current: Keep up with advances in RAG techniques and LLM capabilities The future of customer service lies in intelligent, context-aware AI assistants that can understand, learn, and adapt. By following the patterns and practices outlined in this guide, you’re well-equipped to build the next generation of conversational AI systems. Resources and Further Reading • ChromaDB Documentation: https://docs.trychroma.com/ • FastAPI Documentation: https://fastapi.tiangolo.com/ • Twilio Media Streams: https://www.twilio.com/docs/voice/media-streams • OpenAI API Documentation: https://platform.openai.com/docs Ready to build your own intelligent call assistant? Clone the complete project repository and start experimenting with RAG-powered conversations today. The future of customer service is intelligent, contextual, and just a few lines of code away.
Connecting FastAPI and PostgreSQL Across Separate Docker Compose Files: A Developer’s Journey
Ever found yourself staring at Docker containers that refuse to talk to each other? Let me share a story that might sound familiar. Last Tuesday, I was knee-deep in microservices architecture (because apparently, I enjoy making my life complicated), and I hit a wall that many of us have faced: I had my shiny FastAPI backend running in one Docker Compose setup, my trusty PostgreSQL database humming along in another, and they were acting like strangers at a party who refuse to make eye contact. If you’re reading this, chances are you’re in the same boat. Maybe you’re following a microservices pattern, or perhaps you just want to keep your database and API services cleanly separated. Whatever your reason, I’ve got your back. The Problem: When Containers Live in Isolation Here’s what I started with — sound familiar? Backend Setup (docker-compose-backend.yml): version: '3.8'services: fastapi: build: . ports: - "8000:8000" environment: - ENVIRONMENT=development Database Setup (docker-compose-database.yml): version: '3.8'services: postgres: image: postgres:15 environment: POSTGRES_DB: myapp POSTGRES_USER: developer POSTGRES_PASSWORD: secretpassword ports: - "5432:5432" Looks reasonable, right? Both services start up fine, but when FastAPI tries to connect to PostgreSQL, it’s like they’re speaking different languages. The containers are isolated in their own networks, blissfully unaware of each other’s existence. The Real-World Project: TaskMaster Instead of showing you another “Hello World” example, let me walk you through how I solved this problem while building TaskMaster — a task management API that I built for a client. This is a real application with authentication, database relationships, and all the complexity you’ll face in production. TaskMaster needed: User registration and JWT authentication Task creation with categories and priorities Real database relationships Production-ready error handling Health checks and monitoring Perfect for demonstrating Docker networking in action. Solution 1: Docker Networks — The Professional Approach After some coffee and Stack Overflow diving, I discovered Docker networks. Think of them as virtual meeting rooms where your containers can actually have conversations. Step 1: Create a Shared Network First, let’s create a network that both our services can join: docker network create taskmaster-network It’s that simple. Now we have a virtual space where our containers can find each other. Step 2: The Database Setup Here’s the production-ready database configuration I used for TaskMaster: # docker-compose-database.ymlversion: '3.8'services: taskmaster-db: image: postgres:15 container_name: taskmaster_postgres environment: POSTGRES_DB: taskmaster POSTGRES_USER: taskuser POSTGRES_PASSWORD: supersecretpassword POSTGRES_HOST_AUTH_METHOD: trust # Development only! ports: - "5432:5432" volumes: - taskmaster_data:/var/lib/postgresql/data - ./database/init.sql:/docker-entrypoint-initdb.d/init.sql networks: - taskmaster-network healthcheck: test: ["CMD-SHELL", "pg_isready -U taskuser -d taskmaster"] interval: 30s timeout: 10s retries: 3 restart: unless-stoppedvolumes: taskmaster_data:networks: taskmaster-network: external: true Step 3: The FastAPI Backend Configuration # docker-compose-backend.ymlversion: '3.8'services: taskmaster-api: build: context: . 
dockerfile: Dockerfile container_name: taskmaster_fastapi ports: - "8000:8000" environment: - DATABASE_URL=postgresql://taskuser:supersecretpassword@taskmaster_postgres:5432/taskmaster - SECRET_KEY=your-secret-jwt-key-change-in-production - ENVIRONMENT=development networks: - taskmaster-network restart: unless-stopped depends_on: taskmaster_postgres: condition: service_healthynetworks: taskmaster-network: external: true Step 4: The Real FastAPI Application Here’s the actual TaskMaster code — not a toy example, but production-ready code: Database Models (models.py): from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, ForeignKey, Enumfrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import relationshipfrom sqlalchemy.dialects.postgresql import UUIDfrom datetime import datetimeimport uuidimport enumBase = declarative_base()class Priority(enum.Enum): low = "low" medium = "medium" high = "high" urgent = "urgent"class Status(enum.Enum): pending = "pending" in_progress = "in_progress" completed = "completed" cancelled = "cancelled"class User(Base): __tablename__ = "users" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) username = Column(String(50), unique=True, nullable=False, index=True) email = Column(String(100), unique=True, nullable=False, index=True) full_name = Column(String(100)) hashed_password = Column(String(255), nullable=False) is_active = Column(Boolean, default=True) created_at = Column(DateTime, default=datetime.utcnow) tasks = relationship("Task", back_populates="owner", cascade="all, delete-orphan")class Task(Base): __tablename__ = "tasks" id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4) title = Column(String(200), nullable=False) description = Column(Text) priority = Column(Enum(Priority), default=Priority.medium) status = Column(Enum(Status), default=Status.pending) created_at = Column(DateTime, default=datetime.utcnow) updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow) due_date = Column(DateTime) owner_id = Column(UUID(as_uuid=True), ForeignKey("users.id"), nullable=False) owner = relationship("User", back_populates="tasks") Database Connection (database.py): from sqlalchemy import create_enginefrom sqlalchemy.ext.declarative import declarative_basefrom sqlalchemy.orm import sessionmakerimport osimport timeimport logging# Set up logginglogging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)DATABASE_URL = os.getenv( "DATABASE_URL", "postgresql://taskuser:supersecretpassword@localhost:5432/taskmaster")# Create engine with connection poolingengine = create_engine( DATABASE_URL, pool_size=10, max_overflow=20, pool_pre_ping=True, # Validates connections before use echo=True # Set to False in production)SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)Base = declarative_base()def get_database(): """Dependency to get database session""" db = SessionLocal() try: yield db except Exception as e: logger.error(f"Database error: {e}") db.rollback() raise finally: db.close()def wait_for_database(max_retries=30, delay=1): """Wait for database to be ready""" retries = 0 while retries < max_retries: try: # Try to create a connection connection = engine.connect() connection.close() logger.info("Database connection successful!") return True except Exception as e: retries += 1 logger.warning(f"Database connection failed (attempt {retries}/{max_retries}): {e}") time.sleep(delay) logger.error("Could not connect to database after 
maximum retries") return False Main FastAPI Application (main.py): from fastapi import FastAPI, Depends, HTTPException, statusfrom fastapi.security import HTTPBearer, HTTPAuthorizationCredentialsfrom sqlalchemy.orm import Sessionfrom pydantic import BaseModel, EmailStrfrom passlib.context import CryptContextfrom jose import JWTError, jwtfrom datetime import datetime, timedeltaimport loggingfrom database import get_database, wait_for_database, enginefrom models import Base, User, Task, Priority, Statuslogger = logging.getLogger(__name__)# Security setupSECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-change-in-production")ALGORITHM = "HS256"ACCESS_TOKEN_EXPIRE_MINUTES = 30pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")security = HTTPBearer()app = FastAPI( title="TaskMaster API", version="1.0.0", description="A production-ready task management API")# Pydantic modelsclass UserCreate(BaseModel): username: str email: EmailStr password: str full_name: str = Noneclass UserLogin(BaseModel): username: str password: strclass TaskCreate(BaseModel): title: str description: str = None priority: Priority = Priority.medium due_date: datetime = Noneclass TaskResponse(BaseModel): id: str title: str description: str priority: Priority status: Status created_at: datetime due_date: datetime = None class Config: from_attributes = True# Authentication utilitiesdef verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password)def get_password_hash(password): return pwd_context.hash(password)def create_access_token(data: dict, expires_delta: timedelta = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwtasync def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_database)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(credentials.credentials, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception except JWTError: raise credentials_exception user = db.query(User).filter(User.username == username).first() if user is None: raise credentials_exception return user@app.on_event("startup")async def startup_event(): """Initialize database connection on startup""" logger.info("Starting TaskMaster API...") if not wait_for_database(): raise Exception("Could not connect to database") # Create tables Base.metadata.create_all(bind=engine) logger.info("Database tables created successfully")@app.get("/")async def root(): return {"message": "Welcome to TaskMaster API! 
🚀", "status": "operational"}@app.get("/health")async def health_check(db: Session = Depends(get_database)): """Health check endpoint that tests database connection""" try: # Simple query to test connection result = db.execute("SELECT 1") return {"status": "healthy", "database": "connected", "timestamp": datetime.utcnow()} except Exception as e: logger.error(f"Health check failed: {e}") raise HTTPException(status_code=503, detail="Database connection failed")@app.post("/register")async def register(user: UserCreate, db: Session = Depends(get_database)): """Register a new user""" # Check if user exists if db.query(User).filter(User.username == user.username).first(): raise HTTPException(status_code=400, detail="Username already registered") if db.query(User).filter(User.email == user.email).first(): raise HTTPException(status_code=400, detail="Email already registered") # Create new user hashed_password = get_password_hash(user.password) db_user = User( username=user.username, email=user.email, full_name=user.full_name, hashed_password=hashed_password ) db.add(db_user) db.commit() db.refresh(db_user) return {"message": "User registered successfully", "user_id": str(db_user.id)}@app.post("/login")async def login(user: UserLogin, db: Session = Depends(get_database)): """Login user and return JWT token""" db_user = db.query(User).filter(User.username == user.username).first() if not db_user or not verify_password(user.password, db_user.hashed_password): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password" ) access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) access_token = create_access_token( data={"sub": db_user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"}@app.post("/tasks", response_model=TaskResponse)async def create_task( task: TaskCreate, current_user: User = Depends(get_current_user), db: Session = Depends(get_database)): """Create a new task""" db_task = Task( title=task.title, description=task.description, priority=task.priority, due_date=task.due_date, owner_id=current_user.id ) db.add(db_task) db.commit() db.refresh(db_task) return db_task@app.get("/tasks", response_model=list[TaskResponse])async def get_tasks( current_user: User = Depends(get_current_user), db: Session = Depends(get_database), status_filter: Status = None, priority_filter: Priority = None): """Get user's tasks with optional filtering""" query = db.query(Task).filter(Task.owner_id == current_user.id) if status_filter: query = query.filter(Task.status == status_filter) if priority_filter: query = query.filter(Task.priority == priority_filter) tasks = query.order_by(Task.created_at.desc()).all() return tasks@app.put("/tasks/{task_id}/status")async def update_task_status( task_id: str, new_status: Status, current_user: User = Depends(get_current_user), db: Session = Depends(get_database)): """Update task status""" task = db.query(Task).filter( Task.id == task_id, Task.owner_id == current_user.id ).first() if not task: raise HTTPException(status_code=404, detail="Task not found") task.status = new_status task.updated_at = datetime.utcnow() db.commit() return {"message": "Task status updated successfully"} Step 5: Testing Our Real TaskMaster API Now comes the moment of truth. 
Let’s start our services and test the real functionality:

# Start the database first
docker-compose -f docker-compose-database.yml up -d

# Wait for postgres to initialize (check the logs)
docker-compose -f docker-compose-database.yml logs -f taskmaster-db

# Once you see "database system is ready to accept connections"
# Start the backend
docker-compose -f docker-compose-backend.yml up -d

# Check the logs to see if connection worked
docker-compose -f docker-compose-backend.yml logs -f taskmaster-api

Testing the API endpoints:

# Health check
curl http://localhost:8000/health

# Register a new user
curl -X POST "http://localhost:8000/register" \
  -H "Content-Type: application/json" \
  -d '{
    "username": "testuser",
    "email": "test@example.com",
    "password": "testpassword123",
    "full_name": "Test User"
  }'

# Login to get JWT token
curl -X POST "http://localhost:8000/login" \
  -H "Content-Type: application/json" \
  -d '{
    "username": "testuser",
    "password": "testpassword123"
  }'

# Use the token to create a task (replace YOUR_TOKEN with the actual token)
curl -X POST "http://localhost:8000/tasks" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -d '{
    "title": "Test Docker connection",
    "description": "Make sure FastAPI can talk to PostgreSQL",
    "priority": "high"
  }'

# Get your tasks
curl -X GET "http://localhost:8000/tasks" \
  -H "Authorization: Bearer YOUR_TOKEN"

When I first got this working, I literally did a little victory dance. Seeing that JSON response with my task data coming from PostgreSQL running in a separate container was magical.

Expected successful response:

[
  {
    "id": "123e4567-e89b-12d3-a456-426614174000",
    "title": "Test Docker connection",
    "description": "Make sure FastAPI can talk to PostgreSQL",
    "priority": "high",
    "status": "pending",
    "created_at": "2024-03-15T10:30:00.000Z",
    "due_date": null
  }
]

Solution 2: The Host Network Approach (When Networks Feel Overkill)

Sometimes you just want something simple that works. If you’re developing locally and don’t need the full network isolation, you can connect through the host:

# docker-compose-backend.yml (Alternative approach)
version: '3.8'
services:
  taskmaster-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      # Connect via host machine
      - DATABASE_URL=postgresql://taskuser:supersecretpassword@host.docker.internal:5432/taskmaster
    extra_hosts:
      - "host.docker.internal:host-gateway"  # For Linux compatibility

This approach uses your host machine as a bridge between containers. It’s simpler but less secure for production.
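One caveat worth checking before you blame Docker: this only works if PostgreSQL is actually published on the host (for example, a "5432:5432" ports mapping in the database compose file). A quick, hedged sanity check, reusing the container names from this article and the same nc trick from the debugging section further down:

# Can the API container reach PostgreSQL through the host gateway?
docker exec -it taskmaster_fastapi nc -zv host.docker.internal 5432

# If that fails, confirm the database container publishes 5432 on the host
docker port taskmaster_postgres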
The Dockerfile That Actually Works

Here’s a Dockerfile that I’ve learned works reliably with database connections:

FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    postgresql-client \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements first (for better caching)
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]

The requirements.txt for TaskMaster:

fastapi==0.104.1
uvicorn[standard]==0.24.0
sqlalchemy==2.0.23
psycopg2-binary==2.9.9
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.6
pydantic[email]==2.5.0

Debugging: When Things Go Wrong (And They Will)

Here are the commands that have saved my sanity more times than I can count:

# Check if containers can see each other
docker exec -it taskmaster_fastapi ping taskmaster_postgres

# View container logs
docker-compose -f docker-compose-backend.yml logs -f taskmaster-api
docker-compose -f docker-compose-database.yml logs -f taskmaster-db

# Check network connectivity
docker network ls
docker network inspect taskmaster-network

# Connect to postgres directly to test
docker exec -it taskmaster_postgres psql -U taskuser -d taskmaster

# Check if FastAPI can reach postgres
docker exec -it taskmaster_fastapi nc -zv taskmaster_postgres 5432

# Test database connection from within container
docker exec -it taskmaster_fastapi python -c "
from database import wait_for_database
print('Database connection:', wait_for_database())"

Production Lessons Learned from TaskMaster

After running TaskMaster in production for 6 months, here are the real-world insights:

1. Monitoring is Critical

I added health checks everywhere and monitoring with Prometheus:

# Add to main.py (also add prometheus-client to requirements.txt)
import time
from fastapi import Request, Response  # needed for the middleware and /metrics response
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST

REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint'])
REQUEST_LATENCY = Histogram('http_request_duration_seconds', 'HTTP request latency')

@app.middleware("http")
async def add_metrics(request: Request, call_next):
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    REQUEST_COUNT.labels(method=request.method, endpoint=request.url.path).inc()
    REQUEST_LATENCY.observe(process_time)
    return response

@app.get("/metrics")
async def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

2. Database Backups Are Non-Negotiable

I learned this the hard way when a container crashed and I almost lost client data:

# Add to your database compose file
services:
  taskmaster-db:
    # ... existing config ...
    volumes:
      - taskmaster_data:/var/lib/postgresql/data
      - ./backups:/backups

  # Backup service
  db-backup:
    image: postgres:15
    container_name: taskmaster_backup
    environment:
      - PGPASSWORD=supersecretpassword
    volumes:
      - ./backups:/backups
      - /etc/localtime:/etc/localtime:ro
    networks:
      - taskmaster-network
    command: |
      bash -c "
      while true; do
        echo 'Creating backup...'
        # $$ keeps docker-compose from interpolating the $(...) before bash sees it
        pg_dump -h taskmaster_postgres -U taskuser taskmaster > /backups/taskmaster_$$(date +%Y%m%d_%H%M%S).sql
        find /backups -name '*.sql' -mtime +7 -delete  # Keep backups for 7 days
        sleep 86400  # Run daily
      done
      "
    depends_on:
      - taskmaster-db
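A backup is only as good as your ability to restore it, so rehearse the restore path too. A minimal sketch, assuming the container and credentials above; the dump filename is a placeholder for whatever pg_dump actually produced:

# Restore a dump into the running PostgreSQL container
# (replace the filename with a real file from ./backups)
docker exec -i taskmaster_postgres psql -U taskuser -d taskmaster < backups/taskmaster_20240315_020000.sql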
3. Load Testing Revealed Surprises

I used locust to load test TaskMaster and discovered bottlenecks I never expected:

# locustfile.py
from locust import HttpUser, task, between
import uuid

class TaskMasterUser(HttpUser):
    wait_time = between(1, 3)

    def on_start(self):
        self.register_and_login()

    def register_and_login(self):
        # Unique suffix per simulated user so registrations don't collide
        suffix = uuid.uuid4().hex[:8]
        # Register user
        user_data = {
            "username": f"loadtest_{suffix}",
            "email": f"loadtest_{suffix}@example.com",
            "password": "testpass123",
            "full_name": "Load Test User"
        }
        self.client.post("/register", json=user_data)
        # Login and get token
        login_response = self.client.post("/login", json={
            "username": user_data["username"],
            "password": "testpass123"
        })
        self.token = login_response.json()["access_token"]
        self.headers = {"Authorization": f"Bearer {self.token}"}

    @task(3)
    def get_tasks(self):
        self.client.get("/tasks", headers=self.headers)

    @task(1)
    def create_task(self):
        task_data = {
            "title": "Load test task",
            "description": "Testing system load",
            "priority": "medium"
        }
        self.client.post("/tasks", json=task_data, headers=self.headers)

    @task(1)
    def health_check(self):
        self.client.get("/health")

Run with:

locust -f locustfile.py --host=http://localhost:8000

This revealed that my connection pool was too small and JWT token validation was a bottleneck.

The Alternative: Single Compose File

After building TaskMaster with separate compose files, my client asked: “Why not just put everything in one file?” Good question. Here’s when I recommend each approach:

Use separate compose files when:
Different teams manage database and API
Different deployment schedules
Production scaling needs
Microservices architecture

Use single compose file when:
Small team (2–3 developers)
Simple application architecture
Development environment only
Quick prototyping

Here’s how TaskMaster would look as a single file:

# docker-compose-taskmaster-all.yml
version: '3.8'
services:
  taskmaster-db:
    image: postgres:15
    container_name: taskmaster_postgres
    environment:
      POSTGRES_DB: taskmaster
      POSTGRES_USER: taskuser
      POSTGRES_PASSWORD: supersecretpassword
    volumes:
      - taskmaster_data:/var/lib/postgresql/data
      - ./database/init.sql:/docker-entrypoint-initdb.d/init.sql
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U taskuser -d taskmaster"]
      interval: 30s
      timeout: 10s
      retries: 3
    networks:
      - taskmaster-network
  taskmaster-api:
    build: .
    container_name: taskmaster_fastapi
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://taskuser:supersecretpassword@taskmaster-db:5432/taskmaster
      - SECRET_KEY=your-secret-jwt-key-change-in-production
      - ENVIRONMENT=development
    depends_on:
      taskmaster-db:
        condition: service_healthy
    networks:
      - taskmaster-network
    restart: unless-stopped

  # Optional: Add Redis for caching/sessions
  redis:
    image: redis:7-alpine
    container_name: taskmaster_redis
    networks:
      - taskmaster-network
    command: redis-server --appendonly yes
    volumes:
      - redis_data:/data

volumes:
  taskmaster_data:
  redis_data:

networks:
  taskmaster-network:
    driver: bridge

Starting everything with one command:

docker-compose -f docker-compose-taskmaster-all.yml up -d

Production Architecture: What TaskMaster Looks Like Today

After all the iterations, here’s the production-ready setup I’m proud of:

TaskMaster Production Architecture

┌─────────────────┐      ┌──────────────────┐      ┌─────────────────┐
│  Load Balancer  │──────│   FastAPI API    │──────│   PostgreSQL    │
│     (Nginx)     │      │   (4 workers)    │      │    (Primary)    │
└─────────────────┘      └──────────────────┘      └─────────────────┘
                                  │                          │
                         ┌──────────────────┐      ┌─────────────────┐
                         │      Redis       │      │   PostgreSQL    │
                         │  (Cache/Queue)   │      │  (Read Replica) │
                         └──────────────────┘      └─────────────────┘

Each component runs in its own Docker Compose stack, connected through Docker networks. It’s served over 2 million API requests without a hitch.
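The Nginx piece of that diagram isn't shown elsewhere in this article, so here is a minimal reverse-proxy sketch of what the load balancer config might look like. It assumes the Nginx container joins taskmaster-network and that the API keeps the container name taskmaster_fastapi; both names are carried over from the compose files above, not taken from the production setup itself.

# nginx.conf (sketch only)
# Assumes Nginx is attached to taskmaster-network and can resolve taskmaster_fastapi
upstream taskmaster_api {
    server taskmaster_fastapi:8000;
}

server {
    listen 80;

    location / {
        proxy_pass http://taskmaster_api;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
    }
}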
Production Security Improvements:

# Production security improvements
services:
  taskmaster-db:
    environment:
      - POSTGRES_HOST_AUTH_METHOD=md5  # Not 'trust'
      - POSTGRES_INITDB_ARGS=--auth-host=md5
    command: |
      postgres
      -c log_statement=all
      -c log_min_duration_statement=1000
      -c max_connections=100
      -c shared_preload_libraries=pg_stat_statements
    deploy:
      resources:
        limits:
          cpus: '1.0'
          memory: 1G
        reservations:
          cpus: '0.5'
          memory: 512M

  taskmaster-api:
    deploy:
      resources:
        limits:
          cpus: '0.5'
          memory: 512M
        reservations:
          cpus: '0.25'
          memory: 256M

Common Pitfalls and How to Avoid Them

1. Container Name vs Service Name Confusion

# ❌ Wrong - using service name when you need container name
- DATABASE_URL=postgresql://user:pass@taskmaster-db:5432/db

# ✅ Correct - using container name for cross-compose communication
- DATABASE_URL=postgresql://user:pass@taskmaster_postgres:5432/db

2. Network Creation Timing

# ❌ Wrong - containers fail to find network
docker-compose -f docker-compose-backend.yml up -d
docker-compose -f docker-compose-database.yml up -d

# ✅ Correct - create network first
docker network create taskmaster-network
docker-compose -f docker-compose-database.yml up -d
docker-compose -f docker-compose-backend.yml up -d

3. Database Connection Timing

# ❌ Wrong - no retry logic
def connect_to_database():
    engine = create_engine(DATABASE_URL)
    return engine.connect()  # Fails if DB not ready

# ✅ Correct - with retry logic (as shown in our wait_for_database function)
def wait_for_database(max_retries=30, delay=1):
    for attempt in range(max_retries):
        try:
            connection = engine.connect()
            connection.close()
            return True
        except Exception as e:
            time.sleep(delay)
    return False

Troubleshooting Guide: When Things Break

Error: “Could not translate host name”

# Problem: Container can't resolve other container's name
# Solution: Check network configuration
docker network inspect taskmaster-network
docker exec -it taskmaster_fastapi nslookup taskmaster_postgres

Error: “Connection refused on port 5432”

# Problem: PostgreSQL not ready or wrong port
# Solution: Check database health and port mapping
docker-compose -f docker-compose-database.yml logs taskmaster-db
docker exec -it taskmaster_postgres pg_isready -U taskuser

Error: “Authentication failed”

# Problem: Wrong credentials or auth method
# Solution: Check environment variables and pg_hba.conf
docker exec -it taskmaster_postgres cat /var/lib/postgresql/data/pg_hba.conf

Performance Optimization Tips

1. Connection Pool Tuning

Based on TaskMaster’s production metrics:

# Optimized connection pool settings
engine = create_engine(
    DATABASE_URL,
    pool_size=20,        # Base connections
    max_overflow=30,     # Additional connections under load
    pool_pre_ping=True,  # Validate connections
    pool_recycle=3600,   # Recycle connections every hour
    echo=False           # Disable SQL logging in production
)

2. Database Indexing

-- Add these indexes for better TaskMaster performance
CREATE INDEX CONCURRENTLY idx_tasks_owner_status ON tasks(owner_id, status);
CREATE INDEX CONCURRENTLY idx_tasks_created_at ON tasks(created_at DESC);
CREATE INDEX CONCURRENTLY idx_users_username ON users(username);
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);

3. FastAPI Optimization

# Add to main.py for better performance
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware

app.add_middleware(GZipMiddleware, minimum_size=1000)
app.add_middleware(TrustedHostMiddleware, allowed_hosts=["*"])  # Configure for production

# Enable response caching for read-only endpoints
from functools import lru_cache

@lru_cache(maxsize=100)
def get_user_task_count(user_id: str, db: Session):
    # Note: lru_cache keys on all arguments, including the per-request Session,
    # so hits are rare unless the cache is keyed on user_id alone
    return db.query(Task).filter(Task.owner_id == user_id).count()
Environment-Specific Configurations

Development Environment

# docker-compose-dev.yml
version: '3.8'
services:
  taskmaster-api:
    build: .
    volumes:
      - .:/app  # Hot reload for development
    environment:
      - ENVIRONMENT=development
      - DEBUG=true
      - LOG_LEVEL=DEBUG
    command: ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]

Production Environment

# docker-compose-prod.yml
version: '3.8'
services:
  taskmaster-api:
    image: taskmaster-api:latest  # Pre-built image
    environment:
      - ENVIRONMENT=production
      - DEBUG=false
      - LOG_LEVEL=INFO
      - WORKERS=4
    command: ["gunicorn", "main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]
    deploy:
      replicas: 3
      resources:
        limits:
          memory: 512M
        reservations:
          memory: 256M

Testing Strategy

Integration Testing

# test_integration.py
import pytest
import requests
from testcontainers.compose import DockerCompose

@pytest.fixture(scope="session")
def docker_services():
    with DockerCompose(".", compose_file_name="docker-compose-test.yml") as compose:
        compose.wait_for("http://localhost:8001/health")
        yield compose

def test_full_user_workflow(docker_services):
    base_url = "http://localhost:8001"

    # Register user
    register_response = requests.post(f"{base_url}/register", json={
        "username": "testuser",
        "email": "test@example.com",
        "password": "testpass123",
        "full_name": "Test User"
    })
    assert register_response.status_code == 200

    # Login
    login_response = requests.post(f"{base_url}/login", json={
        "username": "testuser",
        "password": "testpass123"
    })
    assert login_response.status_code == 200
    token = login_response.json()["access_token"]

    # Create task
    headers = {"Authorization": f"Bearer {token}"}
    task_response = requests.post(
        f"{base_url}/tasks",
        json={"title": "Test Task", "description": "Integration test"},
        headers=headers
    )
    assert task_response.status_code == 200

    # Get tasks
    tasks_response = requests.get(f"{base_url}/tasks", headers=headers)
    assert tasks_response.status_code == 200
    assert len(tasks_response.json()) == 1

Docker Compose for Testing

# docker-compose-test.yml
version: '3.8'
services:
  test-db:
    image: postgres:15
    environment:
      POSTGRES_DB: taskmaster_test
      POSTGRES_USER: testuser
      POSTGRES_PASSWORD: testpass
    tmpfs:
      - /var/lib/postgresql/data  # In-memory for faster tests

  test-api:
    build: .
    ports:
      - "8001:8000"
    environment:
      - DATABASE_URL=postgresql://testuser:testpass@test-db:5432/taskmaster_test
      - SECRET_KEY=test-secret-key
    depends_on:
      - test-db

CI/CD Pipeline

GitHub Actions Workflow

# .github/workflows/ci.yml
name: TaskMaster CI/CD

on:
  push:
    branches: [ main, develop ]
  pull_request:
    branches: [ main ]

jobs:
  test:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Create Docker network
        run: docker network create taskmaster-network

      - name: Start test services
        run: |
          docker-compose -f docker-compose-test.yml up -d
          sleep 10  # Wait for services to be ready

      - name: Run tests
        run: |
          docker-compose -f docker-compose-test.yml exec -T test-api pytest tests/ -v

      - name: Run integration tests
        run: pytest test_integration.py -v

      - name: Clean up
        run: docker-compose -f docker-compose-test.yml down -v

  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'
    steps:
      - name: Deploy to production
        run: |
          # Your deployment script here
          echo "Deploying TaskMaster to production..."
Monitoring and Observability

Logging Configuration

# logging_config.py
import logging
import sys
from pythonjsonlogger import jsonlogger

def setup_logging():
    # Create a custom formatter
    formatter = jsonlogger.JsonFormatter(
        '%(asctime)s %(name)s %(levelname)s %(message)s'
    )

    # Configure root logger
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    root_logger.addHandler(console_handler)

    # Database query logging
    logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING)

Health Check Enhancement

@app.get("/health/detailed")
async def detailed_health_check(db: Session = Depends(get_database)):
    """Detailed health check with component status"""
    health_status = {
        "status": "healthy",
        "timestamp": datetime.utcnow(),
        "components": {}
    }

    # Database health
    try:
        db_start = time.time()
        result = db.execute(text("SELECT 1"))  # text() required by SQLAlchemy 2.x
        db_latency = (time.time() - db_start) * 1000  # ms
        health_status["components"]["database"] = {
            "status": "healthy",
            "latency_ms": round(db_latency, 2)
        }
    except Exception as e:
        health_status["status"] = "unhealthy"
        health_status["components"]["database"] = {
            "status": "unhealthy",
            "error": str(e)
        }

    # Memory usage (psutil must be added to requirements.txt)
    import psutil
    memory_usage = psutil.virtual_memory().percent
    health_status["components"]["memory"] = {
        "status": "healthy" if memory_usage < 80 else "warning",
        "usage_percent": memory_usage
    }

    return health_status

Security Best Practices

Environment Variables Management

# .env file (never commit to git)
DATABASE_URL=postgresql://taskuser:$(cat /run/secrets/db_password)@taskmaster_postgres:5432/taskmaster
SECRET_KEY=$(cat /run/secrets/jwt_secret)
ENVIRONMENT=production

# Docker secrets in production
docker secret create db_password db_password.txt
docker secret create jwt_secret jwt_secret.txt
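Keep in mind that docker secret create only works in Swarm mode; with plain Docker Compose, file-based secrets give a similar effect. A hedged sketch of how that wiring might look; the *_FILE variable names and the ./secrets paths are assumptions for illustration, and the application would need to be taught to read those files at startup:

# docker-compose snippet (sketch): file-based secrets without Swarm
services:
  taskmaster-api:
    build: .
    secrets:
      - db_password
      - jwt_secret
    environment:
      # Hypothetical convention: the app reads these paths instead of plain env vars
      - DB_PASSWORD_FILE=/run/secrets/db_password
      - SECRET_KEY_FILE=/run/secrets/jwt_secret

secrets:
  db_password:
    file: ./secrets/db_password.txt
  jwt_secret:
    file: ./secrets/jwt_secret.txt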
Production Dockerfile with Security

FROM python:3.11-slim

# Security updates
RUN apt-get update && apt-get upgrade -y \
    && apt-get install -y --no-install-recommends \
    gcc \
    postgresql-client \
    curl \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /app

# Non-root user
RUN groupadd -r appuser && useradd -r -g appuser -u 1001 appuser

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir --upgrade pip \
    && pip install --no-cache-dir -r requirements.txt

# Copy application
COPY --chown=appuser:appuser . .

# Switch to non-root user
USER appuser

# Security headers and configuration
ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PYTHONHASHSEED=random

EXPOSE 8000

HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]

Wrapping Up: Your Next Steps

Building TaskMaster taught me that connecting FastAPI and PostgreSQL across Docker Compose files isn’t just about the technical setup — it’s about building maintainable, scalable systems that can evolve with your needs.

Key Takeaways:

Docker Networks are Your Friend: They provide the cleanest, most production-ready way to connect containers across compose files.
Start Simple, Evolve Gradually: Begin with basic connectivity, then add monitoring, security, and scaling as needed.
Test Everything: From database connections to load testing, comprehensive testing saves you from production headaches.
Monitor From Day One: Health checks, logging, and metrics aren’t optional — they’re essential for maintaining reliable systems.
Security is Non-Negotiable: Use proper authentication, secure secrets management, and follow Docker security best practices.

What You Should Do Next:

If you’re just getting started:
Clone the TaskMaster repository and run it locally
Experiment with the Docker network setup
Try the API endpoints and see the database integration in action
Break things intentionally and learn how to debug them

If you’re ready for production:
Implement proper secrets management
Set up monitoring and alerting
Create automated backup strategies
Plan your scaling strategy
Load test your application

Remember: The best architecture is the one that works for your team and scales with your needs. TaskMaster started as a simple todo API and evolved into a production system serving thousands of users. Your journey will be unique, but the principles remain the same.

Resources and Further Reading:

TaskMaster GitHub Repository: Full source code with all examples
Docker Networks Documentation: Deep dive into container networking
FastAPI Production Guide: Official deployment recommendations
PostgreSQL Performance Tuning: Database optimization techniques
Container Security Best Practices: Securing your Docker deployments

Final Thoughts

After six months running TaskMaster in production, serving over 2 million API requests, and helping dozens of developers implement similar setups, I can confidently say that mastering Docker container communication is one of the most valuable skills you can develop as a backend developer.

The techniques you’ve learned here — Docker networks, health checks, proper error handling, and production considerations — will serve you well beyond just FastAPI and PostgreSQL. These patterns apply to any microservices architecture, any database technology, and any containerized application.

What’s your Docker networking story? Have you built something similar? What challenges did you face, and how did you solve them? I’d love to hear about your experiences and the creative solutions you’ve discovered.

Keep building, keep learning, and remember: every expert was once a beginner who refused to give up.

Happy coding! 🚀

Found this helpful? Follow me for more real-world development stories and practical tutorials. If you build something cool with TaskMaster or these Docker techniques, tag me — I love seeing what the community creates!

Connect with me:
GitHub: Follow for more open-source projects
LinkedIn: Professional updates and development insights
Twitter: Quick tips and development thoughts

Support the Project: If TaskMaster helped you build something awesome, consider:
⭐ Starring the GitHub repository
📝 Contributing improvements or bug fixes
💬 Sharing your use case in the discussions
📖 Writing about your own Docker journey

Until next time, keep containerizing! 🐳
Technical Skills & Tools
Frameworks, tools, and technologies I use to build solutions
Machine Learning & AI
LangChain, Cohere, ChromaDB, scikit-learn, NLTK, Pandas, NumPy, Matplotlib, Jupyter Notebook
Languages
Python, Go, JavaScript, PHP, SQL
Web Development
REST, gRPC, GraphQL, OpenAPI, FastAPI, React, Laravel
Cloud & DevOps
GCP, AWS, Docker, MongoDB, PostgreSQL, Git, GitHub, Linux
Frameworks & Tools
Starlette, WebSockets, Flask, Tornado, RabbitMQ, Meilisearch, Gel (EdgeDB)
Testing & Quality
Pytest, Unit Testing, Integration Testing, Code Quality, TDD
Education
Academic background and professional certifications that shaped my technical expertise.
Nepal Commerce Campus (NCC)
Bachelor in Information Management
Bachelor's Degree · 2017 - 2021
Focused on producing IT professionals with strong management and technical skills, and a results-driven, socially responsible mindset.
Ambition College
Management (Computer Science)
NEB · 2014 - 2016
Focused on computer science combined with business strategy, the program built skills in programming, databases, and leadership for tech-driven roles, a blend that supports innovative solutions to complex business challenges.
Get In Touch
Let's discuss your next project or opportunity