Diwash Bhandari

Software Developer | AI Enthusiast

Versatile Software Developer with over 4 years of experience in designing, developing, and testing high-quality software solutions.

Kathmandu, Nepal

About Me

My professional profile and what I bring to the table

Versatile Software Developer with over 4 years of experience in designing, developing, and testing high-quality software solutions. Strong expertise in implementing robust testing strategies to ensure code reliability, maintainability, and performance. Adept at collaborating across cross-functional teams to deliver user-focused, scalable applications that align with business objectives. Skilled in translating complex requirements into effective technical solutions, focusing on clarity, visualization, and continuous improvement. Known for delivering clean, efficient code and driving innovation in fast-paced development environments.

5+

Years of Experience

Since 2020

10+

Projects Completed

And counting...

4+

Technologies

Always learning

Clean Code

Writing maintainable, scalable, and efficient code with proper testing strategies

Innovation

Leveraging cutting-edge technologies and AI to solve complex business problems

Collaboration

Working effectively with cross-functional teams to deliver user-focused solutions

Work Experience

A journey of growth and innovation in software development

Software Engineer

Codavatar Tech Pvt. Ltd.

June 2023 - Present

Kalopul Commercial Building, Kalopul, Kathmandu

  • Designed and developed scalable RESTful APIs using FastAPI and Starlette, enabling high-throughput data handling and improved backend responsiveness for SaaS platforms.

  • Architected modular microservices to support distributed systems, simplifying deployments and enhancing maintainability across enterprise-grade applications.

  • Implemented real-time features with WebSockets and asynchronous programming, improving user experience and enabling live collaboration in multi-tenant environments.

  • Built robust internal frameworks and developer tooling to streamline onboarding, enforce standards, and reduce integration overhead across teams.

  • Worked extensively with technologies like PostgreSQL, Redis, gRPC, and GraphQL to deliver reliable, secure, and high-performance backend systems.

Software Engineer

Chuchuro Firm

May 2022 - July 2023

Sinamangal, Kathmandu

  • Developed and maintained Python applications using Peewee ORM, Tornado Framework, RabbitMQ, and Meilisearch, enhancing performance and scalability.

  • Ensured high code quality by writing clean, maintainable, and testable Python code, and implemented rigorous testing with Pytest, improving software reliability.

  • Implemented RabbitMQ for asynchronous processing, optimizing system efficiency and throughput, and integrated Meilisearch to enhance search capabilities.

  • Contributed to QA efforts by combining manual and automated testing to effectively identify and resolve issues, leading to a more stable and user-friendly product.

Intern, Associate Software Engineer

Young Minds Creation (P) Ltd

December 2020 - April 2022

Young Minds Tower

  • Developed and maintained complex Laravel-based web applications, ensuring strong performance and scalability.

  • Wrote clean, maintainable, and testable PHP code, utilizing Laravel's built-in features to enhance application functionality and reliability.

  • Built and integrated RESTful APIs for seamless data exchange between systems, implementing security measures such as password hashing and encryption to protect data.

  • Extended application functionality by integrating third-party packages and libraries, contributing to a more versatile and feature-rich platform.

Graphic Designer

Pinches Artcore

May 2019 - Jan 2020

  • Created designs for various mediums, such as print materials, digital platforms, and social media.

  • Designed logos, brochures, flyers, posters, and other marketing materials.

  • Worked with clients to understand their design needs and preferences.

Featured Projects

A selection of my recent work

ResearchGen

AI-Powered Research Assistant Application Generator. Generates personalized research assistant applications using AI with flexible API key management and multiple AI provider support.

FastAPI
OpenAI GPT
Google Gemini

IntelliDocs AI

RAG-powered chatbot for customer support. Provides accurate, context-aware responses. Built with FastAPI and hexagonal architecture for maintainability and testability.

FastAPI
LangChain
ChromaDB

ResumeCraft

Collaborative resume builder using Starlette and WebSockets for real-time editing.

Starlette
WebSockets
JavaScript

Bash-based CI/CD pipeline

Lightweight CI/CD tool written in Bash that automates pull, test, Docker build, deploy, and notify steps.

Bash
Docker
CI/CD

SMS Spam Classifier

Real-time SMS spam detection app using FastAPI, Streamlit UI, and a Naive Bayes model.

FastAPI
Streamlit
Machine Learning

Disposable Email Checker

Fast, scalable API for detecting disposable emails with asynchronous validation and real-time stats.

Starlette
AsyncIO
API

Site Monitor App

Asynchronous Python-based tool for real-time website monitoring and uptime tracking.

Python
AsyncIO
Monitoring

News Classification

Machine learning model to classify news articles into categories using text analysis and ML pipelines.

Scikit-learn
NLP
Python

Stock Price Prediction

Forecasting stock prices using historical data and machine learning regression models.

Machine Learning
Pandas
Time Series

Flight Fare Prediction

Trained machine learning model to predict airline ticket prices from historical flight data.

Regression
Scikit-learn
Pandas

Recent Articles

Exploring ideas in tech, development, and beyond

How Anthropic’s Model Context Protocol is revolutionizing the way AI systems connect to the world

The AI landscape is evolving rapidly, but one problem has persisted: how do we let AI models interact safely and effectively with external tools, databases, and APIs? Enter the Model Context Protocol (MCP), Anthropic’s open standard for exactly that.

If you’ve ever wanted to build an AI agent that can query your database, read files, call APIs, or interact with your existing infrastructure, MCP is the missing piece. And with FastMCP, building these integrations in Python takes very little code.

Let’s look at what MCP is, how it works, and how you can start building AI integrations today.

The Problem MCP Solves

Imagine you’re building an AI application. You want Claude to help analyze your company’s data, but that data lives in databases, spreadsheets, APIs, and various other systems. How do you give Claude access without:

  • Compromising security?
  • Writing custom integration code for every use case?
  • Maintaining dozens of different connection patterns?

Traditional approaches have been fragmented. Every AI application had its own way of connecting to external systems, leading to duplicated effort and security concerns.

MCP provides a universal answer: a standardized protocol that defines how AI applications connect to data sources and tools, similar to how USB-C standardized device connections.

Understanding MCP Architecture

MCP uses a clean three-part architecture:

    ┌─────────────────┐         ┌─────────────────┐         ┌─────────────────┐
    │                 │         │                 │         │                 │
    │    MCP Host     │◄───────►│   MCP Client    │◄───────►│   MCP Server    │
    │  (Claude App)   │         │   (Protocol)    │         │  (Your Tools)   │
    │                 │         │                 │         │                 │
    └─────────────────┘         └─────────────────┘         └─────────────────┘

  • MCP Hosts are AI applications (like Claude Desktop, IDEs, or custom apps) that need to access external context.
  • MCP Clients maintain connections to servers and handle the protocol communication.
  • MCP Servers are lightweight programs you build that expose specific capabilities to the AI.

This separation of concerns pays off: you write a server once, and any MCP-compatible host can use it, with no vendor lock-in.

What Can MCP Servers Expose?

An MCP server can provide three types of capabilities:

1. Resources

Think of resources as readable data sources: files, database queries, API responses, configuration data. They’re like GET endpoints that the AI can request.

2. Tools

Tools are functions the AI can call. These are executable operations: writing files, sending emails, running calculations, updating databases. This is where the real power lies.

3. Prompts

Reusable prompt templates that can be invoked with parameters. They are well suited to standardizing common AI interactions across your organization.

Enter FastMCP: MCP Made Simple

Building MCP servers from scratch involves handling JSON-RPC protocol details, managing connections, and writing boilerplate. FastMCP abstracts all of this away behind a decorator-based API that feels like FastAPI.

Your First MCP Server in 10 Lines

    from fastmcp import FastMCP

    mcp = FastMCP("My First Server")

    @mcp.tool()
    def add_numbers(a: int, b: int) -> int:
        """Add two numbers together"""
        return a + b

    if __name__ == "__main__":
        mcp.run()

That’s it. You now have a working MCP server that exposes a calculator tool to any AI application. The @mcp.tool() decorator handles:

  • Protocol serialization/deserialization
  • Type validation
  • Error handling
  • Documentation generation

Building Real-World Tools

Let’s build something more practical: a database query tool.
    import time

    from fastmcp import FastMCP
    from pydantic import BaseModel

    mcp = FastMCP("Database Tools")

    class QueryResult(BaseModel):
        rows: list[dict]
        count: int
        execution_time: float

    @mcp.tool()
    async def query_database(
        sql: str,
        limit: int = 100
    ) -> QueryResult:
        """
        Execute a SQL query against the database

        Args:
            sql: SQL query to execute
            limit: Maximum number of rows to return
        """
        start = time.time()

        # Execute your query (with proper security!).
        # `db` stands in for your own async database client.
        results = await db.execute(sql, limit=limit)

        return QueryResult(
            rows=results,
            count=len(results),
            execution_time=time.time() - start
        )

Notice how we’re using:

  • Async/await for non-blocking I/O
  • Pydantic models for structured responses
  • Type hints for automatic validation
  • Docstrings that become tool descriptions

The AI can now query your database naturally: “Show me the top 10 customers by revenue this month.”

Exposing Resources

Resources let the AI read data on demand. Here’s a file system resource:

    import os

    @mcp.resource("file://logs/{date}")
    def get_logs(date: str) -> str:
        """Get application logs for a specific date"""
        log_path = f"/var/log/app_{date}.log"
        if not os.path.exists(log_path):
            raise ValueError(f"No logs found for {date}")

        with open(log_path) as f:
            return f.read()

The URI template file://logs/{date} makes this discoverable. The AI can now request file://logs/2025-11-02 and get those specific logs.

Dynamic Configuration Resources

    import json

    @mcp.resource("config://{environment}/{service}")
    def get_service_config(environment: str, service: str) -> str:
        """
        Get configuration for a service in a specific environment

        Args:
            environment: prod, staging, or dev
            service: Service name (api, frontend, worker, etc.)
        """
        # `load_config` stands in for your own configuration loader.
        config = load_config(environment, service)
        return json.dumps(config, indent=2)

Now the AI can explore your configuration by requesting different environment/service combinations.
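To make the URI template idea concrete, here is a minimal standalone sketch of how a template such as config://{environment}/{service} could be matched against a requested URI. It is not FastMCP’s actual routing code; template_to_regex and match_uri are hypothetical helper names used only for illustration, and the sketch assumes each placeholder matches exactly one path segment.

```python
from __future__ import annotations

import re


def template_to_regex(template: str) -> re.Pattern:
    """Turn 'config://{environment}/{service}' into a compiled regex.

    Each {name} placeholder becomes a named group matching one path segment.
    """
    pattern = ""
    # re.split with a capturing group alternates literal text and placeholder names.
    for index, part in enumerate(re.split(r"\{(\w+)\}", template)):
        if index % 2 == 0:
            pattern += re.escape(part)        # literal text between placeholders
        else:
            pattern += f"(?P<{part}>[^/]+)"   # placeholder: one segment, no slashes
    return re.compile(f"^{pattern}$")


def match_uri(template: str, uri: str) -> dict | None:
    """Return the extracted parameters, or None if the URI does not fit the template."""
    match = template_to_regex(template).match(uri)
    return match.groupdict() if match else None


print(match_uri("config://{environment}/{service}", "config://prod/api"))
print(match_uri("file://logs/{date}", "file://logs/2025-11-02"))
print(match_uri("file://logs/{date}", "file://other/2025-11-02"))
```

The extracted dictionary maps directly onto the keyword arguments of the decorated function, which is why the placeholder names in the template and the parameter names in the function signature have to agree.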
Creating Smart Prompts

Prompts are reusable templates that structure AI interactions:

    from pathlib import Path

    FENCE = "`" * 3  # Markdown code fence used inside the prompt text

    @mcp.prompt()
    def code_review(
        file_path: str,
        focus_areas: list[str] | None = None
    ) -> list[dict]:
        """
        Generate a code review prompt

        Args:
            file_path: Path to the code file
            focus_areas: Specific aspects to review (security, performance, etc.)
        """
        code = Path(file_path).read_text()
        language = Path(file_path).suffix[1:]
        focus = ", ".join(focus_areas) if focus_areas else "general code quality"

        return [{
            "role": "user",
            "content": (
                f"Review this {language} code focusing on {focus}:\n\n"
                f"{FENCE}{language}\n{code}\n{FENCE}\n\n"
                "Please provide:\n"
                "1. Overall assessment\n"
                "2. Specific issues found\n"
                "3. Recommendations for improvement\n"
                "4. Security considerations"
            )
        }]

Teams can now invoke consistent code reviews across all their projects with code_review(file_path="src/auth.py", focus_areas=["security", "error handling"]).

Advanced Patterns

Managing Server Lifecycle

Real applications need database connections, caches, and other resources:

    import json
    import sys
    from contextlib import asynccontextmanager

    from fastmcp import Context, FastMCP

    @asynccontextmanager
    async def app_lifespan(server: FastMCP):
        """Manage server lifecycle"""
        # Startup (connect_database and Redis stand in for your own clients)
        db = await connect_database()
        cache = await Redis.connect()
        # Log to stderr: on the stdio transport, stdout carries protocol messages
        print("✓ Server ready", file=sys.stderr)
        try:
            yield {"db": db, "cache": cache}
        finally:
            # Cleanup runs even if the server stops with an error
            await db.close()
            await cache.close()
            print("✓ Server shutdown", file=sys.stderr)

    mcp = FastMCP("My Server", lifespan=app_lifespan)

    @mcp.tool()
    async def fetch_user(user_id: int, ctx: Context) -> dict:
        """Fetch user with caching"""
        # The value yielded by the lifespan is exposed on the request context
        resources = ctx.request_context.lifespan_context
        db = resources["db"]
        cache = resources["cache"]

        # Try cache first
        if cached := await cache.get(f"user:{user_id}"):
            return json.loads(cached)

        # Query database
        user = await db.fetch_one(
            "SELECT * FROM users WHERE id = ?",
            user_id
        )

        # Cache for next time
        await cache.set(
            f"user:{user_id}",
            json.dumps(user),
            expire=300
        )
        return user

Robust Error Handling

    import httpx
    from fastmcp.exceptions import ToolError

    @mcp.tool()
    async def fetch_api_data(url: str) -> dict:
        """Fetch data from external API"""
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(url, timeout=10.0)
                response.raise_for_status()
                return response.json()
        except httpx.HTTPStatusError as e:
            raise ToolError(
                f"API returned {e.response.status_code}: {e.response.text}"
            )
        except httpx.TimeoutException:
            raise ToolError(f"Request to {url} timed out")
        except Exception as e:
            raise ToolError(f"Unexpected error: {str(e)}")

Proper error handling ensures the AI gets actionable feedback when things go wrong.

Progress Reporting for Long Operations

    from fastmcp import Context

    @mcp.tool()
    async def process_large_dataset(
        file_path: str,
        ctx: Context
    ) -> dict:
        """Process a large dataset with progress updates"""
        # count_rows, open_file, and process_row stand in for your own helpers
        total_rows = count_rows(file_path)
        processed = 0
        errors = 0

        async with open_file(file_path) as f:
            async for row in f:
                try:
                    await process_row(row)
                    processed += 1
                except Exception:
                    errors += 1

                # Update progress every 100 rows handled
                handled = processed + errors
                if handled % 100 == 0:
                    await ctx.report_progress(progress=handled, total=total_rows)
                    await ctx.info(
                        f"Processed {processed:,}/{total_rows:,} rows ({errors} errors)"
                    )

        return {
            "total": total_rows,
            "processed": processed,
            "errors": errors
        }

A Complete Example: File System Server

Let’s build a practical server that gives AI safe access to the file system:

    from pathlib import Path

    from fastmcp import FastMCP
    from fastmcp.exceptions import ToolError

    mcp = FastMCP("File System Server")

    # Define safe base directory
    BASE_DIR = Path("/safe/workspace").resolve()
    FENCE = "`" * 3  # Markdown code fence used inside the prompt text

    def validate_path(path: str) -> Path:
        """Ensure path is within allowed directory"""
        full_path = (BASE_DIR / path).resolve()
        # Compare path components, not string prefixes, so that a sibling
        # such as /safe/workspace-other is rejected
        if not full_path.is_relative_to(BASE_DIR):
            raise ToolError("Access denied: path outside workspace")
        return full_path

    def read_text_file(path: str) -> str:
        """Plain helper shared by the tool, resource, and prompt below"""
        file_path = validate_path(path)
        if not file_path.exists():
            raise ToolError(f"File not found: {path}")
        if not file_path.is_file():
            raise ToolError(f"Not a file: {path}")
        return file_path.read_text()

    @mcp.tool()
    def read_file(path: str) -> str:
        """
        Read contents of a file

        Args:
            path: Relative path from workspace root
        """
        return read_text_file(path)

    @mcp.tool()
    def write_file(path: str, content: str) -> str:
        """
        Write content to a file

        Args:
            path: Relative path from workspace root
            content: Content to write
        """
        file_path = validate_path(path)
        file_path.parent.mkdir(parents=True, exist_ok=True)
        file_path.write_text(content)
        return f"✓ Wrote {len(content)} characters to {path}"

    @mcp.tool()
    def list_directory(path: str = ".") -> list[dict]:
        """
        List contents of a directory

        Args:
            path: Relative path from workspace root
        """
        dir_path = validate_path(path)
        if not dir_path.is_dir():
            raise ToolError(f"Not a directory: {path}")

        items = []
        for item in sorted(dir_path.iterdir()):
            stat = item.stat()
            items.append({
                "name": item.name,
                "type": "directory" if item.is_dir() else "file",
                "size": stat.st_size if item.is_file() else None,
                "modified": stat.st_mtime
            })
        return items

    @mcp.tool()
    def search_files(pattern: str, path: str = ".") -> list[str]:
        """
        Search for files matching a pattern

        Args:
            pattern: Glob pattern (e.g., "*.py", "**/*.json")
            path: Directory to search from
        """
        dir_path = validate_path(path)
        matches = [
            str(p.relative_to(BASE_DIR))
            for p in dir_path.glob(pattern)
            if p.is_file()
        ]
        return sorted(matches)

    @mcp.resource("file://{path}")
    def get_file_resource(path: str) -> str:
        """Expose files as readable resources"""
        return read_text_file(path)

    @mcp.prompt()
    def review_file(path: str, focus: str = "code quality") -> list[dict]:
        """Generate a file review prompt"""
        content = read_text_file(path)
        extension = Path(path).suffix[1:]
        return [{
            "role": "user",
            "content": (
                f"Review this {extension} file focusing on {focus}:\n\n"
                f"**File:** {path}\n\n"
                f"{FENCE}{extension}\n{content}\n{FENCE}\n\n"
                "Provide detailed feedback."
            )
        }]

    if __name__ == "__main__":
        mcp.run()

This server gives Claude controlled access to a workspace while rejecting directory traversal attempts such as ../../etc/passwd.
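Why the containment check deserves care can be seen in a small standalone sketch. It compares a plain string-prefix test with a component-wise test on a few sample inputs. The helper names and the sample paths are made up for illustration, and the sketch normalizes paths as strings only, so unlike Path.resolve() it does not follow symlinks.

```python
import posixpath
from pathlib import PurePosixPath

BASE = "/safe/workspace"


def normalize(user_path: str) -> str:
    """Join a user-supplied relative path onto BASE and collapse '..' segments."""
    return posixpath.normpath(posixpath.join(BASE, user_path))


def allowed_by_prefix(user_path: str) -> bool:
    """Naive check: raw string prefix, which also accepts sibling directories."""
    return normalize(user_path).startswith(BASE)


def allowed_by_components(user_path: str) -> bool:
    """Component-wise check: every leading path component must match BASE."""
    return PurePosixPath(normalize(user_path)).is_relative_to(BASE)


samples = ["notes/todo.txt", "../../etc/passwd", "../workspace-secrets/key.pem"]
for sample in samples:
    print(f"{sample!r}: prefix={allowed_by_prefix(sample)}, "
          f"components={allowed_by_components(sample)}")
```

The third sample normalizes to /safe/workspace-secrets/key.pem, which shares the string prefix /safe/workspace but lies outside the workspace. Only the component-wise check rejects it, which is the reason to prefer is_relative_to (Python 3.9+) over startswith for this kind of guard.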
Testing Your MCP Servers

FastMCP servers can be tested in memory by passing the server object to FastMCP’s Client:

    import pytest
    from fastmcp import Client

    @pytest.mark.asyncio
    async def test_read_file():
        # Create test file
        test_file = BASE_DIR / "test.txt"
        test_file.write_text("Hello, MCP!")

        # Test reading (assumes a recent FastMCP 2.x release, where the
        # deserialized return value is available as result.data)
        async with Client(mcp) as client:
            result = await client.call_tool("read_file", {"path": "test.txt"})
        assert result.data == "Hello, MCP!"

        # Cleanup
        test_file.unlink()

    @pytest.mark.asyncio
    async def test_list_directory():
        async with Client(mcp) as client:
            result = await client.call_tool("list_directory", {"path": "."})
        assert isinstance(result.data, list)
        assert all("name" in item for item in result.data)

How Clients Connect to Your Server

While FastMCP focuses on building servers, understanding client connections completes the picture:

    import asyncio

    from mcp import ClientSession, StdioServerParameters
    from mcp.client.stdio import stdio_client

    # Configure server
    server_params = StdioServerParameters(
        command="python",
        args=["file_system_server.py"],
        env={"WORKSPACE": "/safe/workspace"}
    )

    async def main():
        # Connect and use
        async with stdio_client(server_params) as (read, write):
            async with ClientSession(read, write) as session:
                await session.initialize()

                # List available tools
                tools = await session.list_tools()
                print(f"Available tools: {[t.name for t in tools.tools]}")

                # Call a tool
                result = await session.call_tool(
                    "read_file",
                    {"path": "README.md"}
                )
                print(result.content)

    asyncio.run(main())

Best Practices for Production

1. Security First

    # ✓ Good: Validate and sanitize inputs
    def query_db(sql: str) -> list:
        statement = sql.strip().upper()
        forbidden = ["DROP", "DELETE", "UPDATE", "INSERT", "ALTER"]
        if not statement.startswith("SELECT") or any(k in statement for k in forbidden):
            raise ToolError("Only SELECT queries allowed")
        return execute_query(sql)

    # ✗ Bad: Direct execution without validation
    def query_db(sql: str) -> list:
        return execute_query(sql)  # SQL injection risk!

2. Comprehensive Documentation

    @mcp.tool()
    def analyze_data(
        data: list[dict],
        metric: str,
        groupby: str | None = None
    ) -> dict:
        """
        Analyze dataset and compute metrics

        Args:
            data: List of data records to analyze
            metric: Metric to compute (sum, avg, count, min, max)
            groupby: Optional field to group results by

        Returns:
            Analysis results with computed metrics

        Examples:
            - analyze_data(sales, "sum", "region")
            - analyze_data(users, "count", "signup_date")
        """
        ...  # Implementation

3. Idempotent Operations

    @mcp.tool()
    def create_user(email: str, name: str) -> dict:
        """Create user (idempotent - safe to retry)"""
        # Check if exists first
        if user := db.get_user_by_email(email):
            return {"user_id": user.id, "created": False}

        # Create new user
        user = db.create_user(email=email, name=name)
        return {"user_id": user.id, "created": True}

4. Rate Limiting and Quotas

    import time
    from collections import defaultdict

    from fastmcp.exceptions import ToolError

    call_counts = defaultdict(list)

    @mcp.tool()
    async def call_expensive_api(query: str) -> dict:
        """API call with rate limiting"""
        now = time.time()

        # Clean old entries
        call_counts[query] = [
            t for t in call_counts[query]
            if now - t < 60
        ]

        # Check limit
        if len(call_counts[query]) >= 10:
            raise ToolError("Rate limit: max 10 calls per minute")

        call_counts[query].append(now)
        return await external_api.call(query)

5. Observability

    import logging
    import uuid

    logger = logging.getLogger(__name__)

    @mcp.tool()
    async def important_operation(data: dict) -> dict:
        """Operation with comprehensive logging"""
        operation_id = uuid.uuid4().hex[:8]

        logger.info(
            f"[{operation_id}] Starting operation",
            extra={"data_size": len(data)}
        )
        try:
            # `process` stands in for your own business logic
            result = await process(data)
            logger.info(
                f"[{operation_id}] Operation completed",
                extra={"result_size": len(result)}
            )
            return result
        except Exception as e:
            logger.error(
                f"[{operation_id}] Operation failed: {e}",
                exc_info=True
            )
            raise

Real-World Use Cases

Customer Support Agent

Build an MCP server that connects to your CRM, ticketing system, and knowledge base.
The AI can now:

  • Look up customer history
  • Search documentation
  • Create and update tickets
  • Access order information

Data Analysis Assistant

Expose your data warehouse through MCP. Analysts can ask questions in natural language:

  • “What were our top-selling products last quarter?”
  • “Show me customer churn by segment”
  • “Compare revenue growth year-over-year”

DevOps Copilot

Connect to your infrastructure:

  • Query logs and metrics
  • Check service health
  • Deploy applications
  • Manage configurations

Code Assistant

Give AI access to your codebase:

  • Search for implementations
  • Analyze dependencies
  • Generate documentation
  • Suggest refactorings

The Future of AI Integration

MCP represents a fundamental shift in how we build AI applications. Instead of proprietary integrations, we now have an open standard that:

  • Works across platforms: write once, use everywhere
  • Supports security: controlled, auditable access
  • Scales cleanly: add capabilities without added complexity
  • Promotes reuse: share servers across teams and projects

FastMCP makes this vision accessible. You can go from idea to working integration in minutes, not days.

Getting Started

Ready to build your first MCP server? Here’s how:

    # Install FastMCP
    pip install fastmcp

    # Create your server
    cat > my_server.py << 'EOF'
    from fastmcp import FastMCP

    mcp = FastMCP("My Server")

    @mcp.tool()
    def greet(name: str) -> str:
        """Greet someone"""
        return f"Hello, {name}!"

    if __name__ == "__main__":
        mcp.run()
    EOF

    # Run it
    python my_server.py

Then configure Claude Desktop to use your server, and you’re off to the races.
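The Claude Desktop step can be sketched as follows. Claude Desktop reads its MCP servers from a claude_desktop_config.json file with a top-level mcpServers object; the small script below only builds and prints such an entry. The server name "my-server" and the script path are placeholders, and where the config file lives depends on your operating system, so check the Claude Desktop documentation before writing to it.

```python
import json


def build_claude_desktop_config(server_name: str, script_path: str) -> dict:
    """Build an mcpServers entry that launches a Python MCP server over stdio."""
    return {
        "mcpServers": {
            server_name: {
                "command": "python",
                # Use an absolute path: Claude Desktop does not start the
                # server from your project directory.
                "args": [script_path],
            }
        }
    }


# Placeholder name and path, for illustration only
config = build_claude_desktop_config("my-server", "/absolute/path/to/my_server.py")
print(json.dumps(config, indent=2))
```

Merge the printed mcpServers entry into your existing config file rather than overwriting it, then restart Claude Desktop so it picks up the new server.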
Key Takeaways

  • MCP standardizes AI-to-tool communication: no more custom integrations for every use case
  • FastMCP makes server development elegant: decorator-based API, automatic serialization, type safety
  • Three primitives cover everything: Resources (data), Tools (actions), Prompts (templates)
  • Security and testing are first-class concerns: proper error handling, validation, and in-memory testing
  • The ecosystem is just beginning: as an open protocol, MCP will only grow more capable

The future of AI isn’t just smarter models; it’s smarter integration. MCP and FastMCP give you the building blocks to create AI systems that are powerful, secure, and maintainable.

Now go build something amazing.

Resources

  • MCP Documentation
  • FastMCP GitHub
  • Claude Desktop MCP Guide
  • Example MCP Servers

Have questions or built something cool with MCP? Share in the comments below!

Docker Model Runner: The Game-Changer for Local AI Development - A Complete Developer’s Guide

How Docker Desktop 4.40+ transforms AI development with near-zero-setup local LLM inference and OpenAI-compatible APIs

Introduction: AI Development Just Got Simpler

If you’ve ever spent hours wrestling with Python virtual environments, CUDA installations, and model downloads just to run a simple AI model locally, Docker Model Runner is built for you. This feature, integrated directly into Docker Desktop 4.40+, brings Large Language Model (LLM) inference into your development workflow with almost no setup.

The bottom line: Docker Model Runner provides OpenAI-compatible APIs for local AI models, removing much of the infrastructure work that has slowed AI development for years.

🚧 Beta Alert: Docker Model Runner is currently in beta (as of Docker Desktop 4.40+). Even in beta, development teams are already adopting it.

🌟 The Revolutionary Impact of Docker Model Runner

A Paradigm Shift in AI Development

Docker Model Runner isn’t just another tool; it changes how we approach AI development. It represents Docker’s vision for making machine learning as accessible as deploying a web server.

This is bigger than convenience. Docker Model Runner enables a category of AI-first applications where local inference becomes the default, not the exception.
Industry Impact and Adoption Trends

🏢 Enterprise Benefits:

  • Cost Reduction: Eliminate expensive cloud API calls during development and testing
  • Data Privacy: Keep sensitive data on-premises without external API dependencies
  • Compliance: Meet strict regulatory requirements with fully local AI processing
  • Performance: No network round-trips to a remote API
  • Scalability: Independent scaling without per-request API costs

👥 Developer Community Impact:

  • Democratization: AI development accessible to developers without ML expertise
  • Innovation Acceleration: Rapid prototyping without infrastructure barriers
  • Open Source Enablement: Local models become first-class citizens in open source projects
  • Educational Access: Students and learners can experiment without cloud costs

🌍 Ecosystem Transformation:

  • Hybrid Workflows: Seamless switching between local and cloud models based on requirements
  • Model Distribution: OCI artifacts make AI models as distributable as container images
  • DevOps Integration: AI inference becomes part of standard CI/CD pipelines

🎯 Why Docker Model Runner Matters for Developers

The Old Way vs. The New Way

Before Docker Model Runner:

  • Install Python, manage virtual environments
  • Download and configure CUDA drivers
  • Manually download multi-gigabyte model files
  • Set up inference servers (Ollama, vLLM, etc.)
  • Deal with version conflicts and dependency hell

With Docker Model Runner:

    docker model pull ai/smollm2
    docker model run ai/smollm2 "Hello world"

Done.
🎉 Key Benefits That Matter

  ✅ Zero Infrastructure Setup: runs natively on your machine without additional servers or VMs
  ✅ OpenAI-Compatible API: drop-in replacement for OpenAI API calls in existing applications
  ✅ GPU Acceleration: optimized performance on Apple Silicon (M1/M2/M3) and NVIDIA GPUs
  ✅ OCI Artifact Distribution: models are distributed as standard container artifacts
  ✅ Host-Based Execution: maximum performance without virtualization overhead
  ✅ Beta Innovation: access new features early and influence the future of AI tooling

Real-World Benefits by Developer Type

🎯 For Full-Stack Developers:

    // Before: Complex setup, API keys, rate limits
    const response = await openai.chat.completions.create({...})

    // After: Local, unlimited, private
    const response = await localAI.chat.completions.create({...})

🤖 For AI/ML Engineers:

  • Rapid model experimentation without cloud costs
  • A/B testing different models locally
  • Custom model fine-tuning workflows
  • Offline development capabilities

🏢 For Enterprise Teams:

  • Compliance-friendly AI development
  • Reduced operational costs
  • Enhanced data privacy and security
  • Predictable performance characteristics

🎓 For Students and Researchers:

  • Free access to powerful AI models
  • Unlimited experimentation without budget constraints
  • Reproducible research environments
  • Learning without cloud complexity

The Beta Advantage: Being at the Forefront

Why Join the Beta Community:

  • 🔬 Early Access to Innovation: Experience next-generation AI tooling before widespread adoption
  • 🗣️ Community Influence: Your feedback directly shapes the future of Docker’s AI strategy
  • 📈 Competitive Edge: Build applications with capabilities your competitors don’t have yet
  • 🛠️ Learning Opportunity: Master emerging technologies while they’re still forming

Beta Considerations:

  • Features may change based on community feedback
  • Documentation and tooling continue to evolve
  • Active development means rapid improvements and new capabilities
  • Early adopter advantages in understanding and implementing AI workflows

🚀 Getting Started: Prerequisites and Setup

System Requirements

  • Docker Desktop 4.40+ required (4.41+ for Windows GPU support)
  • macOS: Apple Silicon (M1/M2/M3) for optimal performance
  • Windows: NVIDIA GPU recommended for acceleration
  • Linux: Docker Engine with Model Runner plugin

Enabling Docker Model Runner

Method 1: Docker Desktop GUI

  1. Open Docker Desktop Settings
  2. Navigate to Features in development → Beta
  3. Enable “Docker Model Runner”
  4. Apply & Restart Docker Desktop

Method 2: Command Line Interface

    # Enable Model Runner
    docker desktop enable model-runner

    # Enable with TCP support for host access
    docker desktop enable model-runner --tcp 12434

    # Verify installation
    docker desktop status

Method 3: Docker Engine (Linux)

    sudo apt-get update
    sudo apt-get install docker-model-plugin

📋 Essential Commands Every Developer Should Know

Model Management

Pulling Models

    # Pull the latest version of a model
    docker model pull ai/smollm2

    # Pull specific model variants
    docker model pull ai/llama3.2:1b
    docker model pull ai/qwen2.5:3b

Listing Available Models

    # Show all locally available models
    docker model ls

Model Cleanup

    # Remove specific models to free space
    docker model rm ai/smollm2
    docker model rm ai/llama3.2:1b

Running Models

Interactive Inference

    # Quick one-shot inference
    docker model run ai/smollm2 "Explain Docker in one sentence"

    # Interactive conversation mode (omit the prompt)
    docker model run ai/smollm2

Model Inspection

    # Get detailed model information
    docker model inspect ai/smollm2

🔗 API Integration: OpenAI-Compatible Endpoints

This is where Docker Model Runner is most useful. Because the API is OpenAI-compatible, you can replace https://api.openai.com/v1 with your local endpoint, and existing client code keeps working for the endpoints Model Runner supports.
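To show what swapping the base URL amounts to on the wire, here is a minimal sketch that uses only the Python standard library to build the same chat-completion request for either backend. It assumes host-side TCP access on port 12434 (as enabled above) and the /engines/llama.cpp/v1 path prefix this guide uses for the local endpoint; build_chat_request is a hypothetical helper, and the actual network call is left commented out because it needs Model Runner running.

```python
import json
import urllib.request

OPENAI_BASE_URL = "https://api.openai.com/v1"
# Assumption: Model Runner enabled with `--tcp 12434` on the host
LOCAL_BASE_URL = "http://localhost:12434/engines/llama.cpp/v1"


def build_chat_request(base_url: str, model: str, prompt: str,
                       api_key: str = "not-needed") -> urllib.request.Request:
    """Build an OpenAI-style chat completion request for the given base URL."""
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
    }
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/chat/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )


# Only the base URL (and model name) changes between cloud and local inference
request = build_chat_request(LOCAL_BASE_URL, "ai/smollm2", "Explain Docker in one sentence")
print(request.get_method(), request.full_url)

# To send it (requires Model Runner to be running and the model pulled):
# with urllib.request.urlopen(request) as response:
#     print(json.load(response)["choices"][0]["message"]["content"])
```

The request body and headers are identical for both backends, which is why the official OpenAI client libraries work once their base URL points at the local endpoint.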
Endpoint URLs From Docker Containers: http://model-runner.docker.internal/engines/llama.cpp/v1/ From Host Machine (TCP enabled): http://localhost:12434/engines/llama.cpp/v1/ Chat Completions API Examples cURL Example curl http://localhost:12434/engines/llama.cpp/v1/chat/completions \ -H "Content-Type: application/json" \ -d '{ "model": "ai/smollm2", "messages": [ { "role": "system", "content": "You are a helpful coding assistant specializing in containerization." }, { "role": "user", "content": "Write a Docker Compose file for a React app with PostgreSQL" } ], "temperature": 0.7, "max_tokens": 500 }' Python Integration import openai# Configure client for local Model Runnerclient = openai.OpenAI( base_url="http://model-runner.docker.internal/engines/llama.cpp/v1", api_key="not-needed" # Local inference doesn't require API key)def chat_with_local_model(prompt): response = client.chat.completions.create( model="ai/smollm2", messages=[ {"role": "system", "content": "You are a helpful DevOps assistant."}, {"role": "user", "content": prompt} ], temperature=0.7, max_tokens=200 ) return response.choices[0].message.content# Example usageresult = chat_with_local_model("Explain containerization benefits for microservices")print(result) Node.js Integration import OpenAI from 'openai';const openai = new OpenAI({ baseURL: 'http://model-runner.docker.internal/engines/llama.cpp/v1', apiKey: 'not-needed'});async function generateCode(prompt) { try { const completion = await openai.chat.completions.create({ model: 'ai/smollm2', messages: [ { role: 'system', content: 'You are an expert software architect.' 
}, { role: 'user', content: prompt } ], temperature: 0.8, max_tokens: 300 }); return completion.choices[0].message.content; } catch (error) { console.error('Error generating code:', error); throw error; }}// Usage exampleconst architectureAdvice = await generateCode( 'Best practices for designing scalable Docker microservices?');console.log(architectureAdvice); FastAPI Integration Example from fastapi import FastAPI, HTTPExceptionfrom pydantic import BaseModelimport openaiimport asyncioapp = FastAPI(title="Local AI API")# Configure local model clientlocal_client = openai.OpenAI( base_url="http://model-runner.docker.internal/engines/llama.cpp/v1", api_key="not-needed")class ChatRequest(BaseModel): message: str temperature: float = 0.7class ChatResponse(BaseModel): response: str model: str@app.post("/chat", response_model=ChatResponse)async def chat_endpoint(request: ChatRequest): try: response = local_client.chat.completions.create( model="ai/smollm2", messages=[ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": request.message} ], temperature=request.temperature, max_tokens=500 ) return ChatResponse( response=response.choices[0].message.content, model="ai/smollm2" ) except Exception as e: raise HTTPException(status_code=500, detail=str(e))@app.get("/models")async def list_models(): # This would typically call the local model runner API return {"models": ["ai/smollm2", "ai/llama3.2:1b"]} 🐳 Docker Compose Integration One of the most powerful features is the ability to integrate AI models directly into your Docker Compose workflows: Basic AI Service Integration version: '3.8'services: web-app: build: . 
    ports:
      - "3000:3000"
    depends_on:
      - ai_service
    environment:
      - AI_BASE_URL=http://ai_service:8000
  ai_service:
    provider:
      type: model
      options:
        model: ai/smollm2
        # Optional configurations
        temperature: 0.7
        max_tokens: 1000
  database:
    image: postgres:15
    environment:
      POSTGRES_DB: myapp
      POSTGRES_USER: user
      POSTGRES_PASSWORD: password

Advanced Multi-Model Setup

version: '3.8'
services:
  api:
    build: ./api
    ports:
      - "8000:8000"
    environment:
      - CHAT_MODEL_URL=http://chat_model:8000
      - CODE_MODEL_URL=http://code_model:8000
    depends_on:
      - chat_model
      - code_model
  chat_model:
    provider:
      type: model
      options:
        model: ai/smollm2
        context_length: 4096
  code_model:
    provider:
      type: model
      options:
        model: ai/qwen2.5:3b
        temperature: 0.2
        max_tokens: 2000
  frontend:
    build: ./frontend
    ports:
      - "3000:3000"
    depends_on:
      - api

🛠️ Advanced API Endpoints and Model Management

Docker Model Management API

Docker Model Runner provides comprehensive REST APIs for model management:

# Create/pull a model
POST /models/create
Content-Type: application/json

{
  "name": "ai/smollm2",
  "tag": "latest"
}

# List all models
GET /models

# Get specific model info
GET /models/ai/smollm2

# Delete a model
DELETE /models/ai/smollm2

OpenAI-Compatible Endpoints

# List available models
GET /engines/llama.cpp/v1/models

# Get specific model details
GET /engines/llama.cpp/v1/models/ai/smollm2

# Chat completions (primary endpoint)
POST /engines/llama.cpp/v1/chat/completions

# Legacy completions
POST /engines/llama.cpp/v1/completions

# Generate embeddings
POST /engines/llama.cpp/v1/embeddings

Embeddings API Example

import openai

client = openai.OpenAI(
    base_url="http://localhost:12434/engines/llama.cpp/v1",
    api_key="not-needed"
)

# Generate embeddings for semantic search
def get_embeddings(texts):
    response = client.embeddings.create(
        model="ai/smollm2",
        input=texts
    )
    return [embedding.embedding for embedding in response.data]

# Example usage
documents = [
    "Docker containers provide isolation and portability",
    "Kubernetes orchestrates containerized applications",
    "Model Runner simplifies AI development workflows"
]
embeddings = get_embeddings(documents)
print(f"Generated {len(embeddings)} embeddings")

🏗️ Real-World Development Patterns

Pattern 1: Local Development with Fallback

import openai
import os

class AIClient:
    def __init__(self):
        self.local_available = self._check_local_model()

    def _check_local_model(self) -> bool:
        try:
            client = openai.OpenAI(
                base_url="http://localhost:12434/engines/llama.cpp/v1",
                api_key="not-needed",
                timeout=5.0
            )
            client.models.list()
            return True
        except Exception:
            # Any failure (connection refused, timeout, ...) means "not available"
            return False

    def chat(self, messages, temperature=0.7):
        if self.local_available:
            return self._chat_local(messages, temperature)
        else:
            return self._chat_remote(messages, temperature)

    def _chat_local(self, messages, temperature):
        client = openai.OpenAI(
            base_url="http://localhost:12434/engines/llama.cpp/v1",
            api_key="not-needed"
        )
        return client.chat.completions.create(
            model="ai/smollm2",
            messages=messages,
            temperature=temperature
        )

    def _chat_remote(self, messages, temperature):
        client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        return client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages,
            temperature=temperature
        )

# Usage
ai = AIClient()
response = ai.chat([
    {"role": "user", "content": "Explain Docker layers"}
])

Pattern 2: Model-Specific Routing

class MultiModelAI:
    def __init__(self):
        self.client = openai.OpenAI(
            base_url="http://localhost:12434/engines/llama.cpp/v1",
            api_key="not-needed"
        )

    def chat_general(self, prompt):
        return self._complete("ai/smollm2", prompt, temperature=0.7)

    def generate_code(self, prompt):
        return self._complete("ai/qwen2.5:3b", prompt, temperature=0.2)

    def creative_writing(self, prompt):
        return self._complete("ai/llama3.2:1b", prompt, temperature=0.9)

    def _complete(self, model, prompt, temperature):
        return self.client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            temperature=temperature
        )

🔧 Troubleshooting Common Issues

Model Not Found Errors

# Check available models
docker model ls

# Pull the model if missing
docker model pull ai/smollm2

# Verify model is running
docker ps | grep model

Connection Issues

# Check Docker Model Runner status
docker desktop status

# Verify TCP port is enabled (for host access)
docker desktop enable model-runner --tcp 12434

# Test connectivity
curl http://localhost:12434/engines/llama.cpp/v1/models

Performance Optimization

# Monitor resource usage
docker stats

# Check GPU utilization (if available)
nvidia-smi  # For NVIDIA GPUs
# macOS Activity Monitor for Apple Silicon

Memory Management

# Implement connection pooling for high-throughput applications
from openai import OpenAI
import threading

class PooledAIClient:
    def __init__(self, pool_size=5):
        self._clients = []
        self._lock = threading.Lock()
        for _ in range(pool_size):
            client = OpenAI(
                base_url="http://localhost:12434/engines/llama.cpp/v1",
                api_key="not-needed"
            )
            self._clients.append(client)

    def get_client(self):
        # Returns None when the pool is exhausted; callers must handle that case
        with self._lock:
            if self._clients:
                return self._clients.pop()
            return None

    def return_client(self, client):
        with self._lock:
            self._clients.append(client)

🚀 Production Considerations

Security Best Practices

• Network Isolation: Use internal Docker networks for model communication
• Resource Limits: Set appropriate memory and CPU limits
• Access Control: Implement authentication layers for external access

# docker-compose.production.yml
version: '3.8'
services:
  ai_service:
    provider:
      type: model
      options:
        model: ai/smollm2
    deploy:
      resources:
        limits:
          memory: 4G
          cpus: '2.0'
    networks:
      - ai_internal
networks:
  ai_internal:
    internal: true

Monitoring and Logging

import logging
from datetime import datetime
from openai import OpenAI

# Set up comprehensive logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

class MonitoredAIClient:
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.client = OpenAI(
            base_url="http://localhost:12434/engines/llama.cpp/v1",
            api_key="not-needed"
        )

    def chat_with_metrics(self, messages):
        start_time = datetime.now()
        try:
            response = self.client.chat.completions.create(
                model="ai/smollm2",
                messages=messages
            )
            duration = (datetime.now() - start_time).total_seconds()
            self.logger.info(f"Chat completed in {duration:.2f}s")
            return response
        except Exception as e:
            self.logger.error(f"Chat failed: {str(e)}")
            raise

🎯 Conclusion: The Future of Local AI Development

Docker Model Runner represents a paradigm shift in how we approach AI development. By eliminating infrastructure complexity and providing OpenAI-compatible APIs, it democratizes AI development and makes local inference accessible to every developer.

Key Takeaways

• Zero Setup Complexity: No more wrestling with Python environments and CUDA installations
• OpenAI Compatibility: Drop-in replacement for existing OpenAI integrations
• Production Ready: Docker Compose integration and comprehensive API support
• Performance Optimized: GPU acceleration and host-based execution
• Developer Friendly: Familiar Docker commands and standard REST APIs

What’s Next?

The future of AI development is local-first, privacy-conscious, and infrastructure-agnostic. Docker Model Runner is leading this transformation by making AI models as easy to deploy as any other containerized application.

Whether you’re building chatbots, code generation tools, or complex AI-powered applications, Docker Model Runner provides the foundation for reliable, scalable, and maintainable AI development workflows.

Ready to get started? Pull your first model today:

docker model pull ai/smollm2
docker model run ai/smollm2 "Hello, Docker Model Runner!"

The future of AI development is here, and it runs in Docker. 🚀

Have you tried Docker Model Runner in your projects? Share your experiences and use cases in the comments below!
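For a quick check of the chat completions endpoint without installing the `openai` package, a stdlib-only sketch might look like the following. It assumes host-side TCP access is enabled on port 12434 and that `ai/smollm2` has already been pulled; `build_chat_request` and `ask` are illustrative helper names, not part of Docker Model Runner.

```python
import json
import urllib.request

# Assumption: host-side TCP access is enabled on port 12434 (the host URL listed above).
BASE_URL = "http://localhost:12434/engines/llama.cpp/v1"


def build_chat_request(prompt: str, model: str = "ai/smollm2") -> urllib.request.Request:
    """Build (but do not send) an OpenAI-style chat completion request."""
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
    }
    return urllib.request.Request(
        url=f"{BASE_URL}/chat/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def ask(prompt: str, timeout: float = 60.0) -> str:
    """Send the request and return the text of the first choice."""
    with urllib.request.urlopen(build_chat_request(prompt), timeout=timeout) as resp:
        body = json.load(resp)
    return body["choices"][0]["message"]["content"]
```

Calling `ask("Hello, Docker Model Runner!")` should return the model's reply as a string once the runner is reachable; building the request separately keeps the payload easy to inspect without any network access.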

Transform basic phone bots into intelligent conversational agents using Retrieval-Augmented Generation and modern Python architecture

The Problem with Traditional Call Bots

Picture this: You’ve built a call assistant that can transcribe speech and respond to customers, but it feels robotic and generic. When customers ask specific questions about your product, pricing, or policies, your bot either gives canned responses or completely misses the mark. Sound familiar?

Traditional call bots suffer from a fundamental limitation: they lack access to dynamic, contextual information. They’re essentially expensive tape recorders with basic NLP capabilities. But what if your call assistant could instantly access your entire knowledge base, understand the context of the conversation, and generate intelligent, personalized responses in real-time?

Enter Retrieval-Augmented Generation (RAG): the game-changing architecture that’s revolutionizing how we build intelligent conversational AI.

What is RAG and Why Does It Matter for Call Assistants?

RAG is an AI architecture pattern that combines the power of large language models with external knowledge retrieval. Instead of relying solely on the model’s training data, RAG systems can dynamically fetch relevant information from external sources and use it to generate more accurate, contextual responses.
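The retrieve-then-generate idea can be sketched in a few lines of plain Python. This is a toy illustration only: the knowledge base entries are made-up sample data, word overlap stands in for real vector search, and `retrieve` and `build_prompt` are hypothetical names rather than anything from the project described here.

```python
from typing import List, Set

# Made-up sample knowledge base; a real system would index company documents.
KNOWLEDGE_BASE = [
    "CloudSync Pro pricing starts at $9.99/month for individuals.",
    "Subscriptions can be cancelled at any time from the account page.",
    "All files are protected with end-to-end encryption.",
]


def _tokens(text: str) -> Set[str]:
    """Lowercase the text and split it into words, stripping edge punctuation."""
    return {w.strip(".,?!$/") for w in text.lower().split()} - {""}


def retrieve(query: str, docs: List[str], k: int = 1) -> List[str]:
    """Return the k documents that share the most words with the query."""
    q = _tokens(query)
    ranked = sorted(docs, key=lambda d: len(q & _tokens(d)), reverse=True)
    return ranked[:k]


def build_prompt(query: str, docs: List[str]) -> str:
    """Augment the user question with retrieved context before generation."""
    context = "\n".join(f"- {d}" for d in retrieve(query, docs))
    return f"Answer using only this context:\n{context}\n\nQuestion: {query}"
```

The string returned by `build_prompt` is what would be sent to the language model, so the answer is grounded in the retrieved text rather than in the model's training data alone.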
For call assistants, this is transformative:

• Dynamic Knowledge: Access up-to-date company information, product details, and policies
• Contextual Responses: Generate replies based on specific customer queries and conversation history
• Consistent Messaging: Ensure all responses align with approved company information
• Scalable Intelligence: Add new knowledge without retraining models

Architecture Overview: From Audio to Intelligent Response

Let’s break down the complete architecture of a production-ready RAG-powered call assistant:

┌─────────────┐    ┌──────────────┐    ┌─────────────┐
│  Customer   │───▶│    Twilio    │───▶│   FastAPI   │
│    Phone    │    │  WebSocket   │    │   Server    │
└─────────────┘    └──────────────┘    └─────────────┘
                                              │
                                              ▼
┌─────────────┐    ┌──────────────┐    ┌─────────────┐
│  Response   │◀───│  Speech-to-  │───▶│ RAG System  │
│ Generation  │    │ Text Service │    │             │
└─────────────┘    └──────────────┘    └─────────────┘
       │                                      │
       ▼                                      ▼
┌─────────────┐    ┌──────────────┐    ┌─────────────┐
│   OpenAI    │◀───│ Conversation │───▶│  ChromaDB   │
│     GPT     │    │   Context    │    │  Vector DB  │
└─────────────┘    └──────────────┘    └─────────────┘

This architecture ensures sub-1000ms response times while maintaining intelligent, context-aware conversations.
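A response-time target like this is easier to keep when each stage is timed separately. The helper below is one hedged way to do that; `LatencyBudget` and the stage names are hypothetical and not part of the project, and only the 1000 ms figure comes from the text above.

```python
import time
from contextlib import contextmanager
from typing import Dict, Iterator

BUDGET_MS = 1000.0  # target taken from the text; everything else is illustrative


class LatencyBudget:
    """Record how long each pipeline stage takes and compare the sum to a budget."""

    def __init__(self, budget_ms: float = BUDGET_MS) -> None:
        self.budget_ms = budget_ms
        self.stages: Dict[str, float] = {}

    @contextmanager
    def stage(self, name: str) -> Iterator[None]:
        # Time the enclosed block and store the duration in milliseconds.
        start = time.perf_counter()
        try:
            yield
        finally:
            self.stages[name] = (time.perf_counter() - start) * 1000.0

    @property
    def total_ms(self) -> float:
        return sum(self.stages.values())

    def within_budget(self) -> bool:
        return self.total_ms <= self.budget_ms
```

Wrapping each step, for example `with budget.stage("speech_to_text"): ...` followed by `with budget.stage("rag_search"): ...`, makes it visible which stage consumes the budget when `within_budget()` returns False.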
Project Structure: Building for Scale and Maintainability

Before diving into implementation, let’s establish a clean, scalable project structure that follows Python best practices:

├── app/
│   ├── core/
│   │   └── config.py              # Centralized configuration
│   ├── services/
│   │   ├── rag_service.py         # RAG implementation with ChromaDB
│   │   ├── response_service.py    # Intelligent response generation
│   │   └── speech_service.py      # Speech processing services
│   └── utils/
│       ├── data_loader.py         # Knowledge base utilities
│       └── audio_utils.py         # Audio processing helpers
├── tests/
│   └── test_rag.py                # Comprehensive testing suite
├── main.py                        # FastAPI application entry point
├── requirements.txt               # Dependencies
└── docker-compose.yml             # Production deployment

This modular structure separates concerns, making the codebase maintainable and testable. Each service has a single responsibility, and the configuration is centralized for easy management.

Step 1: Building the RAG Foundation

Setting Up the Vector Database

The heart of our RAG system is ChromaDB, a powerful vector database that enables semantic search across our knowledge base:

# app/services/rag_service.py
import chromadb
from sentence_transformers import SentenceTransformer
from chromadb.config import Settings
from typing import List, Dict, Any
from dataclasses import dataclass
from datetime import datetime

@dataclass
class DocumentChunk:
    """Structured document chunk with metadata"""
    content: str
    doc_id: str
    chunk_id: str
    category: str
    source: str
    created_at: datetime
    metadata: Dict[str, Any]

class RAGService:
    def __init__(self, embedding_model_name: str = "all-MiniLM-L6-v2"):
        self.embedding_model = SentenceTransformer(embedding_model_name)

        # Initialize ChromaDB with persistence
        self.client = chromadb.PersistentClient(
            path="./data/chroma_db",
            settings=Settings(
                anonymized_telemetry=False,
                is_persistent=True
            )
        )

        # Create collections for different knowledge domains
        self.collections = {
            'products': self._get_or_create_collection('products'),
            'policies':
                self._get_or_create_collection('policies'),
            'faqs': self._get_or_create_collection('faqs'),
            'conversations': self._get_or_create_collection('conversations')
        }

        # Hybrid search weights
        self.keyword_weights = {'exact_match': 0.4, 'semantic': 0.6}

Implementing Hybrid Search

The magic happens in our hybrid search implementation, which combines semantic similarity with keyword matching for optimal retrieval:

    def hybrid_search(self, query: str, collections: List[str] = None,
                      n_results: int = 5) -> Dict[str, Any]:
        """Advanced hybrid search combining semantic and keyword matching"""
        if collections is None:
            collections = ['products', 'policies', 'faqs']

        all_results = []
        query_embedding = self.embedding_model.encode([query])[0]

        for collection_name in collections:
            collection = self.collections[collection_name]

            # Semantic search using vector similarity
            semantic_results = collection.query(
                query_embeddings=[query_embedding.tolist()],
                n_results=n_results * 2,  # Get more for filtering
                include=['documents', 'metadatas', 'distances']
            )

            # Process and score results
            for doc, metadata, distance in zip(
                semantic_results['documents'][0],
                semantic_results['metadatas'][0],
                semantic_results['distances'][0]
            ):
                # Calculate hybrid score
                # Convert distance to similarity. This is meaningful for cosine
                # distance; ChromaDB's default space is squared L2, where the
                # value can go negative.
                semantic_score = 1 - distance
                keyword_score = self._keyword_similarity(query, doc)
                final_score = (
                    self.keyword_weights['semantic'] * semantic_score +
                    self.keyword_weights['exact_match'] * keyword_score
                )
                all_results.append({
                    'content': doc,
                    'metadata': metadata,
                    'score': final_score,
                    'collection': collection_name
                })

        # Sort by final score and return top results
        all_results.sort(key=lambda x: x['score'], reverse=True)
        return {
            'results': all_results[:n_results],
            'total_searched': len(all_results),
            'query_analysis': self._analyze_query(query)
        }

Step 2: Intelligent Response Generation

Context-Aware Conversation Management

Our response service maintains conversation context and adapts responses based on the call stage:

# app/services/response_service.py
from enum import Enum
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import openai

from app.services.rag_service import RAGService

class CallStage(Enum):
    INTRODUCTION = "introduction"
    DISCOVERY = "discovery"
    PRESENTATION = "presentation"
    OBJECTION_HANDLING = "objection_handling"
    CLOSING = "closing"
    FOLLOWUP = "followup"

@dataclass
class ConversationContext:
    """Comprehensive conversation state management"""
    call_id: str
    customer_profile: Dict[str, Any]
    discussed_topics: List[str]
    objections_raised: List[str]
    interests_expressed: List[str]
    call_stage: CallStage
    conversation_history: List[Dict[str, str]]
    sentiment_history: List[Dict[str, float]]
    last_rag_results: Optional[Dict[str, Any]] = None

class ResponseService:
    def __init__(self, rag_service: RAGService):
        self.rag_service = rag_service
        self.openai_client = openai.OpenAI()

        # Stage-specific response templates
        self.response_templates = {
            'introduction': """
                You are a professional and friendly sales assistant.
                Keep the introduction concise, warm, and focus on understanding the customer's needs.
                """,
            'discovery': """
                Focus on understanding the customer's specific needs, pain points, and requirements.
                Ask thoughtful follow-up questions based on their responses.
                """,
            'presentation': """
                Present relevant solutions based on the customer's expressed needs.
                Use the retrieved company information to provide accurate, detailed responses.
                """
        }

Dynamic Response Generation

The core of our intelligent response system combines RAG retrieval with contextual prompt engineering:

    async def generate_contextual_response(self, user_input: str,
                                           context: ConversationContext) -> Dict[str, Any]:
        """Generate intelligent, context-aware responses using RAG"""
        # Perform enhanced RAG retrieval
        rag_results = self.rag_service.hybrid_search(
            query=user_input,
            collections=self._select_relevant_collections(context),
            n_results=3
        )

        # Build comprehensive prompt with retrieved context
        system_prompt = self._build_system_prompt(context, rag_results)
        conversation_prompt = self._build_conversation_prompt(
            user_input, context, rag_results
        )

        try:
            # Generate response using GPT with retrieved context
            response = await self._generate_with_retry(
                system_prompt=system_prompt,
                user_prompt=conversation_prompt,
                max_tokens=200,
                temperature=0.7
            )

            # Enhance response with confidence scoring
            enhanced_response = await self._enhance_response(
                response, context, rag_results
            )

            return {
                'response': enhanced_response['text'],
                'confidence_score': enhanced_response['confidence'],
                'sources_used': [r['metadata']['source'] for r in rag_results['results']],
                'conversation_stage': context.call_stage.value,
                'suggested_followups': enhanced_response['followups']
            }
        except Exception:
            # Fall back when generation or enhancement fails
            return self._generate_fallback_response(user_input, context)

Step 3: Real-Time Voice Integration

FastAPI WebSocket Handler

Our main application handles real-time voice streams from Twilio using WebSocket connections:

# main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
import json
import base64

# VoiceActivityDetector, connection_manager and process_audio_buffer are
# project helpers that are not shown in this article.
app = FastAPI()

@app.websocket("/media-stream")
async def handle_media_stream(websocket: WebSocket):
    """Handle Twilio Media Stream WebSocket connection"""
    stream_sid = None
    audio_buffer = []
    vad = VoiceActivityDetector()

    try:
        await websocket.accept()

        while True:
            message = await asyncio.wait_for(
                websocket.receive_text(), timeout=30.0
            )
            data = json.loads(message)

            if data["event"] == "start":
                stream_sid = data["start"]["streamSid"]
                await connection_manager.connect(websocket, stream_sid)

            elif data["event"] == "media" and stream_sid:
                # Process incoming audio
                payload = data["media"]["payload"]
                audio_chunk = base64.b64decode(payload)
                audio_buffer.append(audio_chunk)

                # Voice activity detection for speech segmentation
                if vad.detect_speech_end(audio_buffer):
                    await process_audio_buffer(stream_sid, audio_buffer)
                    audio_buffer = []

    except WebSocketDisconnect:
        if stream_sid:
            connection_manager.disconnect(stream_sid)

Audio Processing Pipeline

The audio processing pipeline handles speech-to-text conversion with multiple provider fallbacks:

# app/services/speech_service.py
import logging
from typing import List

from google.cloud import speech

# `config` refers to the project's centralized settings (app/core/config.py)
logger = logging.getLogger(__name__)

class SpeechRecognitionService:
    def __init__(self):
        self.google_client = speech.SpeechClient() if config.GOOGLE_CREDENTIALS_PATH else None

    async def transcribe_streaming(self, audio_chunks: List[bytes]) -> str:
        """Transcribe audio using Google Cloud Speech-to-Text with Whisper fallback"""
        if self.google_client:
            try:
                # Primary: Google Cloud Speech-to-Text
                return await self._transcribe_google(audio_chunks)
            except Exception as e:
                logger.warning(f"Google STT failed: {e}")

        # Fallback: OpenAI Whisper
        return await self._transcribe_whisper(b''.join(audio_chunks))

    async def _transcribe_google(self, audio_chunks: List[bytes]) -> str:
        """Google Cloud Speech-to-Text implementation"""
        config_obj = speech.RecognitionConfig(
            encoding=speech.RecognitionConfig.AudioEncoding.MULAW,
            sample_rate_hertz=8000,
            language_code="en-US",
            enable_automatic_punctuation=True,
            model="phone_call"
        )
        streaming_config = speech.StreamingRecognitionConfig(
            config=config_obj,
            interim_results=True,
        )

        # Process streaming audio
        audio_generator = (speech.StreamingRecognizeRequest(audio_content=chunk)
                           for chunk in audio_chunks)
        responses = self.google_client.streaming_recognize(streaming_config, audio_generator)

        for response in responses:
            for result in response.results:
                if result.is_final:
                    return result.alternatives[0].transcript.strip()
        return ""

Step 4: Data Loading and Knowledge Management

Flexible Data Loading System

Our data loading system supports multiple formats and provides easy knowledge base management:

# app/utils/data_loader.py
import csv
from datetime import datetime

from app.services.rag_service import DocumentChunk, RAGService

class DataLoader:
    def __init__(self, rag_service: RAGService):
        self.rag_service = rag_service

    async def load_sample_data(self):
        """Load sample company data into the RAG system"""
        # Sample product data
        products = [
            {
                "content": "CloudSync Pro is our flagship cloud storage solution offering 1TB of secure storage with end-to-end encryption. Features include real-time sync across all devices, advanced sharing controls, and 99.9% uptime guarantee. Pricing starts at $9.99/month for individuals and $19.99/month for teams.",
                "category": "product",
                "source": "product_catalog",
                "metadata": {"product_name": "CloudSync Pro", "price_individual": 9.99, "price_team": 19.99}
            },
            # ... more products
        ]

        # Convert to DocumentChunk objects and load
        await self._load_documents(products, "products")

    async def load_from_csv(self, file_path: str, collection_name: str):
        """Load documents from CSV file"""
        doc_chunks = []
        with open(file_path, 'r', encoding='utf-8') as file:
            reader = csv.DictReader(file)
            for i, row in enumerate(reader):
                chunk = DocumentChunk(
                    content=row.get('content', ''),
                    doc_id=f"{collection_name}_{i}",
                    chunk_id="0",
                    category=row.get('category', 'general'),
                    source=row.get('source', file_path),
                    created_at=datetime.now(),
                    metadata={k: v for k, v in row.items()
                              if k not in ['content', 'category', 'source']}
                )
                doc_chunks.append(chunk)

        await self.rag_service.add_document_batch(doc_chunks, collection_name)

Step 5: Testing and Validation

Comprehensive Testing Suite

A robust testing framework ensures your RAG system performs optimally:

# tests/test_rag.py
import time

from app.services.rag_service import RAGService
from app.utils.data_loader import DataLoader

async def test_rag_system():
    """Test the RAG system with sample queries"""
    print("🚀 Initializing RAG System...")

    # Initialize and load data
    rag_service = RAGService()
    loader = DataLoader(rag_service)
    await loader.load_sample_data()

    # Test queries
    test_queries = [
        "What cloud storage options do you have?",
        "How much does CloudSync Pro cost?",
        "Can I cancel my subscription?",
        "Is my data encrypted?",
        "Do you offer enterprise solutions?"
    ]

    print("\n🔍 Testing RAG Search...")
    for query in test_queries:
        results = rag_service.hybrid_search(query, n_results=2)
        print(f"\n📝 Query: {query}")
        print(f"   Found {len(results['results'])} results:")
        for i, result in enumerate(results['results']):
            print(f"   {i+1}. Score: {result['score']:.3f} | {result['content'][:80]}...")
        print(f"   Query Type: {results['query_analysis']['query_type']}")

async def test_search_performance():
    """Test search performance with multiple queries"""
    rag_service = RAGService()
    loader = DataLoader(rag_service)
    await loader.load_sample_data()

    queries = ["pricing information", "security features", "enterprise solutions"] * 20

    start_time = time.time()
    for query in queries:
        results = rag_service.hybrid_search(query, n_results=3)
    total_time = time.time() - start_time
    avg_time = total_time / len(queries)

    print(f"📊 Processed {len(queries)} queries in {total_time:.2f} seconds")
    print(f"📊 Average query time: {avg_time*1000:.1f} ms")
    print(f"📊 Queries per second: {len(queries)/total_time:.1f}")

Challenges and Best Practices

Performance Optimization

Challenge: Maintaining sub-1000ms response times while processing complex RAG queries.

Solution: Implement several optimization strategies:

1. Batch Processing: Process multiple documents efficiently
2. Caching: Use Redis for frequent query results
3. Async Operations: Leverage Python’s asyncio for concurrent processing
4. Connection Pooling: Reuse database connections

# Performance optimization example
import hashlib
import json

import redis

class OptimizedRAGService(RAGService):
    def __init__(self):
        super().__init__()
        self.cache = redis.Redis(host='localhost', port=6379, db=0)
        self.cache_ttl = 3600  # 1 hour

    async def hybrid_search_cached(self, query: str, **kwargs) -> Dict[str, Any]:
        # Check cache first. A SHA-256 digest is used because the built-in hash()
        # of a string changes between Python processes, which would defeat a
        # shared Redis cache.
        raw_key = query + str(sorted(kwargs.items()))
        cache_key = f"rag_query:{hashlib.sha256(raw_key.encode('utf-8')).hexdigest()}"
        cached_result = self.cache.get(cache_key)
        if cached_result:
            return json.loads(cached_result)

        # Perform search
        result = self.hybrid_search(query, **kwargs)

        # Cache result
        self.cache.setex(cache_key, self.cache_ttl, json.dumps(result))
        return result

Context Management

Challenge: Maintaining conversation context across multiple turns while avoiding context window limits.

Solution: Implement intelligent context pruning and summarization:

    def _manage_context_window(self, context: ConversationContext) -> ConversationContext:
        """Intelligently manage conversation context to stay within limits"""
        if len(context.conversation_history) > self.max_context_length:
            # Keep first few turns (introduction) and recent turns
            important_turns = context.conversation_history[:2]  # Introduction
            recent_turns = context.conversation_history[-8:]    # Recent context
            context.conversation_history = important_turns + recent_turns

            # Summarize older discussions
            if context.discussed_topics:
                context.discussed_topics = list(set(context.discussed_topics))[:10]

        return context

Error Handling and Resilience

Challenge: Ensuring system reliability when external services fail.
Solution: Implement circuit breaker patterns and graceful degradation:

import time

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = "CLOSED"  # CLOSED, OPEN, HALF_OPEN

    async def call(self, func, *args, **kwargs):
        if self.state == "OPEN":
            # After the timeout, let one trial call through (HALF_OPEN)
            if time.time() - self.last_failure_time > self.timeout:
                self.state = "HALF_OPEN"
            else:
                raise Exception("Circuit breaker is open")

        try:
            result = await func(*args, **kwargs)
            self.reset()
            return result
        except Exception:
            self.record_failure()
            raise

    def reset(self):
        self.failure_count = 0
        self.state = "CLOSED"

    def record_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.time()
        if self.failure_count >= self.failure_threshold:
            self.state = "OPEN"

Security Considerations

Challenge: Protecting against prompt injection and ensuring data privacy.

Solution: Implement input validation and content filtering:

import logging
import re

logger = logging.getLogger(__name__)

class SecurityValidator:
    def __init__(self):
        self.dangerous_patterns = [
            r"ignore previous instructions",
            r"system prompt",
            r"act as.*different",
            r"pretend.*you are"
        ]

    def validate_input(self, user_input: str) -> bool:
        """Validate user input for potential security issues"""
        # Check for prompt injection patterns
        for pattern in self.dangerous_patterns:
            if re.search(pattern, user_input.lower()):
                logger.warning(f"Potential prompt injection detected: {user_input}")
                return False

        # Length validation
        if len(user_input) > 1000:
            logger.warning(f"Input too long: {len(user_input)} characters")
            return False

        return True

    def sanitize_response(self, response: str) -> str:
        """Sanitize generated responses"""
        # Redact sensitive keywords (this masks the keyword itself, not any value that follows it)
        sanitized = re.sub(r'(api[_\s]?key|token|password)', '[REDACTED]',
                           response, flags=re.IGNORECASE)
        return sanitized

Production Deployment

Docker Configuration

Deploy your RAG-powered call assistant using Docker for
consistent environments:

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies (curl is needed by the HEALTHCHECK below)
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    curl \
    portaudio19-dev \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create data directory
RUN mkdir -p ./data

EXPOSE 8000

# Health check
HEALTHCHECK --interval=30s --timeout=30s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

CMD ["python", "main.py"]

Docker Compose for Multi-Service Setup

# docker-compose.yml
version: '3.8'

services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - REDIS_URL=redis://redis:6379
    depends_on:
      - redis
    volumes:
      - ./data:/app/data
      - ./.env:/app/.env
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    restart: unless-stopped
    command: redis-server --appendonly yes

volumes:
  redis_data:

Environment Configuration

# Production environment variables
export OPENAI_API_KEY="your_production_key"
export TWILIO_ACCOUNT_SID="your_twilio_sid"
export TWILIO_AUTH_TOKEN="your_twilio_token"
export REDIS_URL="redis://your-redis-server:6379"
export SESSION_TIMEOUT="3600"
export MAX_CONCURRENT_CALLS="500"

Monitoring and Analytics

Performance Metrics

Implement comprehensive monitoring to track system performance:

# app/utils/metrics.py
from prometheus_client import Counter, Histogram, Gauge
import time

# Metrics
rag_queries_total = Counter('rag_queries_total', 'Total RAG queries processed')
rag_query_duration = Histogram('rag_query_duration_seconds', 'RAG query processing time')
active_calls = Gauge('active_calls_total', 'Number of active calls')
response_confidence = Histogram('response_confidence_score', 'Response confidence scores')

class MetricsCollector:
    def __init__(self):
        self.start_time = time.time()

    def record_rag_query(self, query_time: float, confidence: float):
        rag_queries_total.inc()
        rag_query_duration.observe(query_time)
        response_confidence.observe(confidence)

    def update_active_calls(self, count: int):
        active_calls.set(count)

Logging Strategy

```python
# app/utils/logging_config.py
import json
import logging
from datetime import datetime, timezone
from typing import List, Optional


class StructuredLogger:
    def __init__(self, name: str):
        self.logger = logging.getLogger(name)

    def log_conversation_event(
        self,
        event_type: str,
        call_id: str,
        user_input: Optional[str] = None,
        response: Optional[str] = None,
        confidence: Optional[float] = None,
        sources: Optional[List[str]] = None,
    ):
        """Log structured conversation events"""
        log_data = {
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event_type": event_type,
            "call_id": call_id,
            "user_input": user_input,
            "response": response,
            "confidence_score": confidence,
            "sources_used": sources,
        }
        # One JSON object per log line keeps events easy to parse downstream
        self.logger.info(json.dumps(log_data))
```

Future Directions and Advanced Features

Multi-Modal RAG

Extend your system to handle images, documents, and other media types:

```python
from typing import Any, Dict

from transformers import CLIPModel, CLIPProcessor


# RAGService is the base class this service extends; the _-prefixed helpers are not shown here
class MultiModalRAGService(RAGService):
    def __init__(self):
        super().__init__()
        self.image_processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
        self.image_model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")

    async def process_image_query(self, image_data: bytes, text_query: str) -> Dict[str, Any]:
        """Process queries that include images"""
        # Extract image features
        image_features = self._extract_image_features(image_data)

        # Combine with text search
        text_results = self.hybrid_search(text_query)
        image_results = self._search_by_image(image_features)

        # Merge and rank results
        return self._merge_multimodal_results(text_results, image_results)
```

Real-Time Learning

Implement systems that learn from successful conversations:

```python
from typing import Any, Dict


class AdaptiveRAGService(RAGService):
    def __init__(self):
        super().__init__()
        self.feedback_collector = FeedbackCollector()

    async def learn_from_conversation(self, conversation_data: Dict[str, Any]):
        """Learn from successful conversation patterns"""
        # Only conversations rated above 0.8 feed back into the system
        if conversation_data['success_rating'] > 0.8:
            # Extract successful response patterns
            successful_patterns = self._extract_patterns(conversation_data)

            # Update knowledge base with successful interactions
            await self._update_conversation_collection(successful_patterns)

            # Fine-tune response templates
            await self._update_response_templates(successful_patterns)
```

Advanced Analytics

Implement conversation analytics for business insights:

```python
from typing import Any, Dict, List

from transformers import pipeline


class ConversationAnalytics:
    def __init__(self):
        self.sentiment_analyzer = pipeline("sentiment-analysis")
        self.topic_extractor = pipeline("zero-shot-classification")

    async def analyze_conversation(self, conversation_history: List[Dict]) -> Dict[str, Any]:
        """Analyze conversation for business insights"""
        # Sentiment analysis, one result per conversation turn
        sentiments = [self.sentiment_analyzer(turn['content']) for turn in conversation_history]

        # Topic extraction
        topics = self._extract_conversation_topics(conversation_history)

        # Intent classification
        intents = self._classify_customer_intents(conversation_history)

        return {
            "sentiment_progression": sentiments,
            "main_topics": topics,
            "customer_intents": intents,
            "conversion_probability": self._calculate_conversion_probability(conversation_history),
        }
```

Conclusion: The Future of Intelligent Call Assistants

Building RAG-powered call assistants represents a significant leap forward in conversational AI. By combining large language models with dynamic knowledge retrieval, we can create systems that are not just responsive but also contextually aware.

Key Takeaways for Developers

1. Architecture Matters: A well-structured, modular codebase is essential for maintaining and scaling RAG systems
2. Performance is Critical: Sub-second response times require careful optimization of every component
3. Context is King: Intelligent conversation management makes the difference between a bot and an assistant
4. Reliability is Non-Negotiable: Implement robust error handling and fallback mechanisms
5. Security is Paramount: Protect against prompt injection and ensure data privacy

Next Steps

1. Start Small: Begin with a focused knowledge domain and expand gradually
2. Measure Everything: Implement comprehensive monitoring and analytics from day one
3. Iterate Rapidly: Use feedback to continuously improve response quality
4. Plan for Scale: Design your architecture to handle growth in users and knowledge
5. Stay Current: Keep up with advances in RAG techniques and LLM capabilities

The future of customer service lies in intelligent, context-aware AI assistants that can understand, learn, and adapt. By following the patterns and practices outlined in this guide, you’re well-equipped to build the next generation of conversational AI systems.

Resources and Further Reading

• ChromaDB Documentation: https://docs.trychroma.com/
• FastAPI Documentation: https://fastapi.tiangolo.com/
• Twilio Media Streams: https://www.twilio.com/docs/voice/media-streams
• OpenAI API Documentation: https://platform.openai.com/docs

Ready to build your own intelligent call assistant? Clone the complete project repository and start experimenting with RAG-powered conversations today. The future of customer service is intelligent, contextual, and just a few lines of code away.
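
As one concrete illustration of the fourth takeaway (robust error handling and fallback mechanisms), here is a minimal sketch of a timeout-and-retry wrapper around a response pipeline. The names `answer_with_fallback`, `FALLBACK_REPLY`, and `flaky_pipeline` are hypothetical and not part of the project's code, and the timeout, retry count, and backoff values are placeholder assumptions to tune for your own latency budget.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical canned reply used when the primary pipeline cannot answer in time.
FALLBACK_REPLY = "I'm having trouble finding that. Let me connect you with a colleague."


async def answer_with_fallback(
    generate: Callable[[str], Awaitable[str]],
    query: str,
    timeout_s: float = 1.0,
    retries: int = 1,
) -> str:
    """Try the primary pipeline; retry on failure, then return a canned reply."""
    for attempt in range(retries + 1):
        try:
            # Bound each attempt so a slow retrieval or LLM call cannot stall the call.
            return await asyncio.wait_for(generate(query), timeout=timeout_s)
        except (asyncio.TimeoutError, ConnectionError):
            # Brief linear backoff before the next attempt; skipped after the last one.
            if attempt < retries:
                await asyncio.sleep(0.1 * (attempt + 1))
    return FALLBACK_REPLY


async def flaky_pipeline(query: str) -> str:
    """Stand-in for a RAG pipeline whose vector store is unreachable."""
    raise ConnectionError("vector store unreachable")


# Both attempts fail, so the caller hears the fallback reply instead of silence.
print(asyncio.run(answer_with_fallback(flaky_pipeline, "What are your hours?")))
```

Catching only timeouts and connection errors is a deliberate choice in this sketch: programming errors still surface instead of being hidden behind the fallback reply.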

Technical Skills & Tools

Frameworks, tools, and technologies I use to build solutions

Machine Learning & AI

LangChain, Cohere, ChromaDB, scikit-learn, NLTK, Pandas, NumPy, Matplotlib, Jupyter Notebook

Languages

Python, Go, JavaScript, PHP, SQL

Web Development

REST, gRPC, GraphQL, OpenAPI, FastAPI, React, Laravel

Cloud & DevOps

GCP, AWS, Docker, MongoDB, PostgreSQL, Git, GitHub, Linux

Frameworks & Tools

Starlette, WebSockets, Flask, Tornado, RabbitMQ, Meilisearch, Gel (EdgeDB)

Testing & Quality

Pytest, Unit Testing, Integration Testing, Code Quality, TDD

Education

Academic background and professional certifications that shaped my technical expertise.

Nepal Commerce Campus (NCC)

Bachelor in Information Management

Bachelor Degree · 2017 - 2021

Focused on producing IT professionals with strong management and technical skills, and a results-driven, socially responsible mindset.

Ambition College

Management (Computer Science)

NEB · 2014 - 2016

A computer science-focused program that combines business strategy with technical expertise. It equips students with skills in programming, databases, and leadership for tech-driven roles, preparing them to solve complex business challenges.

Get In Touch

Let's discuss your next project or opportunity