AI/ML

Implementing RAG Pipelines with AWS Bedrock

Building production-ready AI applications with retrieval-augmented generation

MSCLOUDTECH Team
Jan 31, 2026
14 min read

Large Language Models are powerful, but they have limitations: knowledge cutoffs, hallucinations, and no access to your private data. Retrieval-Augmented Generation (RAG) addresses these problems by grounding LLM responses in your actual data. This post covers how to build production RAG systems with AWS Bedrock.

What is RAG?

RAG combines two capabilities:

  1. Retrieval: Finding relevant documents from your knowledge base
  2. Generation: Using an LLM to synthesize an answer based on retrieved context

Instead of relying solely on the LLM's training data, RAG retrieves relevant documents and includes them in the prompt. The LLM then generates responses grounded in your actual content.

RAG Architecture Overview

A production RAG system consists of:

  1. Document ingestion: Processing and chunking source documents
  2. Embedding generation: Converting chunks to vector representations
  3. Vector storage: Storing embeddings for similarity search
  4. Query processing: Converting user queries to embeddings and retrieving matches
  5. Response generation: Passing retrieved context to the LLM
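
The five stages can be sketched end-to-end in a few lines. This is a toy illustration of the flow only: `toy_embed` and `similarity` are stand-ins for a real embedding model and cosine similarity, and `build_prompt` mirrors the grounding step covered later.

```python
def toy_embed(text):
    # Stand-in for a real embedding model: the set of lowercase words.
    return set(text.lower().split())

def similarity(a, b):
    # Jaccard overlap standing in for cosine similarity on real vectors.
    union = a | b
    return len(a & b) / len(union) if union else 0.0

def retrieve(query, corpus, k=2):
    # Rank documents by similarity to the query "embedding".
    q = toy_embed(query)
    ranked = sorted(corpus, key=lambda d: similarity(q, toy_embed(d)), reverse=True)
    return ranked[:k]

def build_prompt(query, docs):
    # Ground the LLM by pasting retrieved documents into the prompt.
    context = "\n\n".join(docs)
    return f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"
```

Swap in a real embedding model and vector index and the same shape scales to production.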

Document Processing Pipeline

Chunking Strategy

How you chunk documents significantly impacts retrieval quality:

from langchain.text_splitter import RecursiveCharacterTextSplitter

# Chunking configuration
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,
    chunk_overlap=200,
    separators=["\n\n", "\n", ". ", " ", ""],
    length_function=len
)

# Process documents
def process_document(doc):
    chunks = text_splitter.split_text(doc.content)

    return [
        {
            "content": chunk,
            "metadata": {
                "source": doc.source,
                "title": doc.title,
                "section": detect_section(chunk),
                "chunk_index": i
            }
        }
        for i, chunk in enumerate(chunks)
    ]
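
The `detect_section` helper above is left undefined; one plausible heuristic, assuming source documents use Markdown-style headings, is to label each chunk with the first heading it contains:

```python
import re

def detect_section(chunk, default="body"):
    # Hypothetical helper: infer a section label from the first
    # Markdown-style heading found in the chunk, if any.
    match = re.search(r"^#{1,6}\s+(.+)$", chunk, re.MULTILINE)
    return match.group(1).strip() if match else default
```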

Metadata Enrichment

Rich metadata enables filtering and improves retrieval:

{
  "content": "The show runs from March 15 to June 30...",
  "metadata": {
    "source": "shows/phantom-of-the-opera",
    "title": "The Phantom of the Opera",
    "category": "show_info",
    "venue": "Her Majesty's Theatre",
    "last_updated": "2026-01-15",
    "content_type": "schedule"
  }
}
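
Metadata like this pays off at query time. A small (hypothetical) factory can turn metadata requirements into a predicate suitable for filtering search results:

```python
def make_metadata_filter(**required):
    # Build a predicate that keeps only documents whose metadata
    # matches every given key/value pair.
    def filter_fn(doc):
        return all(doc["metadata"].get(k) == v for k, v in required.items())
    return filter_fn
```

For example, `make_metadata_filter(category="show_info", content_type="schedule")` restricts retrieval to schedule content.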

Embedding with AWS Bedrock

Bedrock provides embedding models through a simple API:

import boto3
import json

bedrock = boto3.client('bedrock-runtime')

def get_embedding(text):
    response = bedrock.invoke_model(
        modelId='amazon.titan-embed-text-v2:0',
        body=json.dumps({
            "inputText": text,
            "dimensions": 1024,  # Configurable
            "normalize": True
        })
    )

    result = json.loads(response['body'].read())
    return result['embedding']

# Process texts in batches (each Titan call embeds a single text)
def batch_embed(texts, batch_size=25):
    embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        batch_embeddings = [get_embedding(t) for t in batch]
        embeddings.extend(batch_embeddings)
    return embeddings

Vector Storage with FAISS

FAISS provides fast similarity search for millions of vectors:

import faiss
import numpy as np

class VectorStore:
    def __init__(self, dimension=1024):
        # IVF index for scalability
        quantizer = faiss.IndexFlatIP(dimension)
        self.index = faiss.IndexIVFFlat(
            quantizer, dimension, 100,  # 100 clusters
            faiss.METRIC_INNER_PRODUCT
        )
        self.documents = []

    def add(self, embeddings, documents):
        vectors = np.array(embeddings).astype('float32')

        if not self.index.is_trained:
            self.index.train(vectors)

        self.index.add(vectors)
        self.documents.extend(documents)

    def search(self, query_embedding, k=5, filter_fn=None):
        query = np.array([query_embedding]).astype('float32')
        scores, indices = self.index.search(query, k * 2)  # Over-fetch for filtering

        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx == -1:
                continue
            doc = self.documents[idx]
            if filter_fn and not filter_fn(doc):
                continue
            results.append({
                "document": doc,
                "score": float(score)
            })
            if len(results) >= k:
                break

        return results

Query Processing

Query Expansion

Improve retrieval by expanding the user's query:

async def expand_query(query):
    """Use LLM to generate alternative phrasings"""
    prompt = f"""Given this user question, generate 3 alternative
phrasings that might help find relevant information:

Question: {query}

Alternative phrasings (one per line):"""

    response = await invoke_bedrock(prompt)
    alternatives = response.strip().split('\n')

    return [query] + alternatives[:3]

Hybrid Search

Combine semantic and keyword search for better results:

def hybrid_search(query, k=5):
    # Semantic search
    query_embedding = get_embedding(query)
    semantic_results = vector_store.search(query_embedding, k=k)

    # Keyword search (BM25 or Elasticsearch)
    keyword_results = keyword_search(query, k=k)

    # Reciprocal Rank Fusion
    combined = reciprocal_rank_fusion(
        [semantic_results, keyword_results],
        k=60  # RRF constant
    )

    return combined[:k]
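
The `reciprocal_rank_fusion` call above can be implemented in a few lines. This sketch assumes each result carries an `"id"` key; items appearing high in multiple ranked lists accumulate the largest fused score:

```python
from collections import defaultdict

def reciprocal_rank_fusion(result_lists, k=60):
    # Each item is assumed to be a dict with an "id" key.
    # Score = sum over lists of 1 / (k + rank), rank starting at 1.
    scores = defaultdict(float)
    by_id = {}
    for results in result_lists:
        for rank, item in enumerate(results, start=1):
            scores[item["id"]] += 1.0 / (k + rank)
            by_id[item["id"]] = item
    ranked_ids = sorted(scores, key=scores.get, reverse=True)
    return [by_id[i] for i in ranked_ids]
```

The constant k=60 dampens the influence of top ranks, which is why RRF is robust even when the two retrievers' scores are on different scales.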

Response Generation with Bedrock Claude

Craft prompts that ground the LLM in retrieved context:

def generate_response(query, retrieved_docs):
    context = "\n\n".join([
        f"[Source: {doc['metadata']['title']}]\n{doc['content']}"
        for doc in retrieved_docs
    ])

    prompt = f"""You are a helpful assistant for a theatre company.
Answer the user's question based ONLY on the provided context.
If the context doesn't contain enough information to answer, say so.

Context:
{context}

User question: {query}

Instructions:
- Answer based only on the provided context
- Cite sources when possible
- If unsure, say "I don't have enough information"
- Be concise but complete

Answer:"""

    response = bedrock.invoke_model(
        modelId='anthropic.claude-3-sonnet-20240229-v1:0',
        body=json.dumps({
            "anthropic_version": "bedrock-2023-05-31",
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 1000,
            "temperature": 0.1  # Low temperature for factual responses
        })
    )

    return json.loads(response['body'].read())['content'][0]['text']

Conversation Memory

Maintain context across multi-turn conversations:

class ConversationRAG:
    def __init__(self, session_id):
        self.session_id = session_id
        self.history = []

    async def query(self, user_message):
        # Contextualize query with conversation history
        contextualized = await self.contextualize_query(user_message)

        # Retrieve relevant documents
        docs = hybrid_search(contextualized, k=5)

        # Generate response with history
        response = await self.generate_with_history(
            user_message, docs, self.history
        )

        # Update history
        self.history.append({"role": "user", "content": user_message})
        self.history.append({"role": "assistant", "content": response})

        # Trim history to prevent context overflow
        if len(self.history) > 10:
            self.history = self.history[-10:]

        return response

    async def contextualize_query(self, query):
        """Rewrite query to be standalone using history"""
        if not self.history:
            return query

        prompt = f"""Given the conversation history and new question,
rewrite the question to be standalone (no pronouns like "it", "they"):

History:
{self.format_history()}

New question: {query}

Standalone question:"""

        return await invoke_bedrock(prompt)
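
The `format_history` helper is assumed above; a minimal module-level sketch (as a method it would read from `self.history`) renders recent turns as plain "Role: text" lines:

```python
def format_history(history, max_turns=6):
    # Render the most recent messages as "Role: text" lines,
    # capped to keep the contextualization prompt small.
    recent = history[-max_turns:]
    return "\n".join(f"{m['role'].capitalize()}: {m['content']}" for m in recent)
```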

Evaluation and Monitoring

Retrieval Quality Metrics

def evaluate_retrieval(test_set):
    metrics = {
        "precision_at_k": [],
        "recall_at_k": [],
        "mrr": []  # Mean Reciprocal Rank
    }

    for query, expected_docs in test_set:
        retrieved = search(query, k=5)
        retrieved_ids = {d['id'] for d in retrieved}

        # Precision: relevant docs in top k
        precision = len(retrieved_ids & expected_docs) / len(retrieved)
        metrics["precision_at_k"].append(precision)

        # Recall: fraction of relevant docs found
        recall = len(retrieved_ids & expected_docs) / len(expected_docs)
        metrics["recall_at_k"].append(recall)

        # MRR: rank of first relevant doc
        for i, doc in enumerate(retrieved):
            if doc['id'] in expected_docs:
                metrics["mrr"].append(1 / (i + 1))
                break
        else:
            metrics["mrr"].append(0)

    return {k: np.mean(v) for k, v in metrics.items()}

Response Quality Monitoring

# Log every interaction for analysis
async def log_interaction(query, retrieved, response, feedback=None):
    await dynamodb.put_item(
        TableName='RAGInteractions',
        Item={
            'interaction_id': str(uuid4()),
            'timestamp': datetime.utcnow().isoformat(),
            'query': query,
            'retrieved_doc_ids': [d['id'] for d in retrieved],
            'retrieval_scores': [d['score'] for d in retrieved],
            'response': response,
            'response_latency_ms': ...,
            'user_feedback': feedback,
            'flagged_for_review': response_contains_uncertainty(response)
        }
    )
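
The `response_contains_uncertainty` check is assumed above; a simple phrase-matching sketch (the marker list is illustrative and should be tuned to your prompt's refusal wording) looks like:

```python
UNCERTAINTY_MARKERS = (
    "i don't have enough information",
    "i'm not sure",
    "the context doesn't",
)

def response_contains_uncertainty(response):
    # Flag responses that admit uncertainty so a human can review
    # whether the knowledge base has a coverage gap.
    lowered = response.lower()
    return any(marker in lowered for marker in UNCERTAINTY_MARKERS)
```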

Production Considerations

Scaling the Vector Store

  • Under 1M vectors: FAISS in Lambda with EFS storage
  • 1M-100M vectors: Amazon OpenSearch with k-NN plugin
  • 100M+ vectors: Dedicated vector database (Pinecone, Weaviate)

Keeping Knowledge Fresh

# Incremental update pipeline
EventBridge Schedule (hourly)
  -> Lambda: Check for updated documents
  -> Lambda: Re-embed changed chunks
  -> Lambda: Update vector store
  -> CloudWatch: Log update metrics
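
Inside the "check for updated documents" step, a content hash per chunk lets the pipeline re-embed only what changed. This sketch assumes chunks shaped like the `process_document` output above, keyed by source and position:

```python
import hashlib

def chunk_key(chunk):
    # Stable identifier: source document plus position within it.
    m = chunk["metadata"]
    return f"{m['source']}#{m['chunk_index']}"

def changed_chunks(chunks, stored_hashes):
    # Return only the chunks whose content hash differs from what the
    # vector store last saw; only those need re-embedding.
    updates = []
    for chunk in chunks:
        digest = hashlib.sha256(chunk["content"].encode()).hexdigest()
        if stored_hashes.get(chunk_key(chunk)) != digest:
            updates.append(chunk)
    return updates
```

Persist the hashes alongside the vectors (DynamoDB works well) so each hourly run compares against the last successful update.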

Key Takeaways

  1. Chunk thoughtfully: Chunk size and overlap significantly impact retrieval quality
  2. Enrich metadata: Good metadata enables filtering and improves relevance
  3. Use hybrid search: Combine semantic and keyword search for best results
  4. Ground responses: Prompt the LLM to use only retrieved context
  5. Monitor continuously: Track retrieval and response quality metrics

RAG transforms LLMs from impressive but unreliable tools into grounded, trustworthy assistants. With AWS Bedrock handling the infrastructure, you can focus on the application logic that delivers value to your users.

Topics Covered

Bedrock · RAG · LangChain · FAISS · Claude · AI · Serverless
