LLM Orchestration
LLM orchestration frameworks enable developers to chain together multiple components — prompts, models, tools, and retrievers — into cohesive AI applications with complex workflows.
LangChain Fundamentals
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
# Basic chain: prompt template -> chat model -> StrOutputParser.
prompt = ChatPromptTemplate.from_template("Explain {topic} in {style} style.")
llm = ChatOpenAI(temperature=0, model="gpt-4")
parser = StrOutputParser()

# The `|` operator composes the three steps into a single runnable.
chain = prompt | llm | parser

# Both template variables ({topic} and {style}) are supplied in one mapping.
result = chain.invoke(dict(topic="quantum computing", style="simple"))
# RAG chain with retrieval
from langchain_community.vectorstores import FAISS
from langchain_openai import OpenAIEmbeddings
# NOTE: `docs` is not defined in this snippet; it must already hold the
# documents to index before this line runs.
vectorstore = FAISS.from_documents(documents=docs, embedding=OpenAIEmbeddings())
retriever = vectorstore.as_retriever()

# Prompt with two variables: {context} (retrieved text) and {question}.
rag_prompt = ChatPromptTemplate.from_template(
    "Answer based on context:\n"
    "Context: {context}\n"
    "Question: {question}\n"
    "Answer:"
)
def format_docs(docs):
    """Join the ``page_content`` of each document, separated by a blank line.

    Args:
        docs: Iterable of objects exposing a ``page_content`` string attribute.

    Returns:
        A single string; empty when ``docs`` is empty.
    """
    separator = "\n\n"
    return separator.join(doc.page_content for doc in docs)
# Input stage: the raw question is sent to the retriever (whose results are
# flattened by format_docs) and is also passed through unchanged.
rag_inputs = {
    "context": retriever | format_docs,
    "question": RunnablePassthrough(),
}

# Inputs -> prompt -> model -> output parser.
rag_chain = rag_inputs | rag_prompt | llm | parser

result = rag_chain.invoke("What is machine learning?")
LlamaIndex Orchestration
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core import Settings
from llama_index.llms.openai import OpenAI
from llama_index.embeddings.openai import OpenAIEmbedding
# Configure LlamaIndex: set the default LLM and embedding model on Settings.
Settings.llm = OpenAI(model="gpt-4", temperature=0)
Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-small")

# Load the documents found under ./data.
documents = SimpleDirectoryReader("./data").load_data()

# Split the documents into nodes (chunk_size=512, chunk_overlap=50).
# Node parsers expose `get_nodes_from_documents`; they have no
# `get_documents` method, so the previous call raised AttributeError.
node_parser = SentenceSplitter(chunk_size=512, chunk_overlap=50)
nodes = node_parser.get_nodes_from_documents(documents)

# Build a vector index over the parsed nodes.
index = VectorStoreIndex(nodes)

# Create a query engine that retrieves the 5 most similar nodes and uses the
# "compact" response mode.
query_engine = index.as_query_engine(
    similarity_top_k=5,
    response_mode="compact",
)

# Query, then show the answer and the nodes it was built from.
response = query_engine.query("What are the main findings?")
print(response)
print(response.source_nodes)
# Custom query engine
from typing import Any

from llama_index.core.query_engine import CustomQueryEngine
from llama_index.core.retrievers import BaseRetriever
class RAGFusionQueryEngine(CustomQueryEngine):
    """Query engine that fuses the results of several retrievers.

    Every retriever is queried with the same string, the ranked result lists
    are merged with reciprocal rank fusion (RRF), and the text of the
    ``top_k`` best nodes is handed to the LLM as context.

    ``CustomQueryEngine`` is a pydantic model, so the engine's state is
    declared as fields and populated through the base-class initialiser
    instead of being assigned as ad-hoc attributes.
    """

    retrievers: list  # objects exposing ``retrieve(query_str)``
    llm: Any  # object exposing ``complete(prompt)``
    top_k: int = 5  # number of fused nodes placed in the prompt

    def __init__(self, retrievers, llm, top_k=5):
        """Create the engine.

        Args:
            retrievers: Retrievers to query; each must provide ``retrieve``.
            llm: Model used to generate the answer; must provide ``complete``.
            top_k: How many fused nodes to include in the context.
        """
        # Positional arguments are kept for callers; pydantic itself only
        # accepts keyword arguments.
        super().__init__(retrievers=retrievers, llm=llm, top_k=top_k)

    def rrf_fusion(self, ranked_lists, k=60):
        """Merge ranked result lists with reciprocal rank fusion.

        Each node scores ``sum(1 / (k + rank))`` over the lists it appears in,
        with ranks starting at 1. Nodes are identified by ``node.node_id``.

        Args:
            ranked_lists: One best-first list of retrieved nodes per retriever.
            k: RRF smoothing constant.

        Returns:
            ``(node_with_score, fused_score)`` tuples, best first. For a node
            returned by several retrievers the first occurrence is kept.
        """
        fused_scores = {}
        first_hit = {}
        for ranked in ranked_lists:
            for rank, hit in enumerate(ranked, start=1):
                node_id = hit.node.node_id
                fused_scores[node_id] = fused_scores.get(node_id, 0.0) + 1.0 / (k + rank)
                first_hit.setdefault(node_id, hit)
        # sorted() is stable, so ties keep first-seen order.
        ordered = sorted(fused_scores.items(), key=lambda item: item[1], reverse=True)
        return [(first_hit[node_id], score) for node_id, score in ordered]

    def custom_query(self, query_str: str):
        """Answer ``query_str`` using context fused from all retrievers.

        Returns:
            Whatever ``self.llm.complete`` returns for the assembled prompt.
        """
        # Keep one list per retriever: RRF needs each retriever's own ranking,
        # which is lost if the results are flattened into a single list.
        ranked_lists = [retriever.retrieve(query_str) for retriever in self.retrievers]
        scored = self.rrf_fusion(ranked_lists)
        context = "\n".join(hit.text for hit, _ in scored[: self.top_k])
        response = self.llm.complete(f"Context: {context}\nQuery: {query_str}")
        return response
Multi-Model Orchestration
from langchain_core.runnables import RunnableParallel
class MultiModelOrchestrator:
    """Route queries to one of several chat models, or fan out to all of them.

    ``self.models`` holds three models keyed by role: "fast", "powerful" and
    "creative".
    """

    # Maps the router's complexity label to a key of ``self.models``. The
    # labels (simple/medium/complex) are not model keys themselves, so they
    # must be translated before the lookup.
    COMPLEXITY_TO_MODEL = {
        "simple": "fast",
        "medium": "powerful",
        "complex": "powerful",
    }
    # Model key used when the router's reply contains no known label.
    DEFAULT_MODEL = "powerful"
    # Characters stripped from each word of the router's reply.
    _LABEL_PUNCTUATION = ".,:;!?\"'`*()[]"

    def __init__(self):
        self.models = {
            "fast": ChatOpenAI(model="gpt-3.5-turbo", temperature=0),
            "powerful": ChatOpenAI(model="gpt-4", temperature=0),
            "creative": ChatOpenAI(model="gpt-4", temperature=0.7),
        }

    def route_by_complexity(self, query: str) -> ChatOpenAI:
        """Pick a model for ``query`` based on its classified complexity.

        The "fast" model classifies the query as simple, medium or complex;
        the label is then mapped through ``COMPLEXITY_TO_MODEL``.

        Args:
            query: The user query to classify.

        Returns:
            The chat model selected for the query. Falls back to
            ``DEFAULT_MODEL`` when the reply contains no known label.
        """
        router_prompt = ChatPromptTemplate.from_template(
            "Classify complexity: simple, medium, complex\n"
            "Query: {query}\n"
            "Complexity:"
        )
        router_chain = router_prompt | self.models["fast"] | StrOutputParser()
        reply = router_chain.invoke({"query": query}).strip().lower()

        # Match whole words so replies such as "Complexity: simple." resolve
        # to "simple" rather than tripping on the "complex" in "complexity".
        words = (token.strip(self._LABEL_PUNCTUATION) for token in reply.split())
        label = next((word for word in words if word in self.COMPLEXITY_TO_MODEL), None)
        model_key = self.COMPLEXITY_TO_MODEL.get(label, self.DEFAULT_MODEL)
        return self.models[model_key]

    def parallel_analysis(self, query: str) -> dict:
        """Run summary, in-depth and creative prompts on ``query`` in parallel.

        Args:
            query: Text substituted into each prompt's ``{query}`` variable.

        Returns:
            The result of invoking a ``RunnableParallel`` with the branches
            "summary", "analysis" and "creative".
        """

        def branch(model_key, template):
            # The prompt must come first: it turns the input mapping into
            # messages, which the model then consumes.
            return (
                ChatPromptTemplate.from_template(template)
                | self.models[model_key]
                | StrOutputParser()
            )

        analysis_chain = RunnableParallel(
            summary=branch("fast", "Summarize: {query}"),
            analysis=branch("powerful", "Analyze in depth: {query}"),
            creative=branch("creative", "Creative response: {query}"),
        )
        return analysis_chain.invoke({"query": query})
Workflow Patterns
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
# State passed between the workflow's nodes.
WorkflowState = TypedDict(
    "WorkflowState",
    {
        "query": str,  # the question being answered
        "retrieved_docs": list,  # documents written by the retrieve step
        "analysis": str,  # text written by the analyze step
        "answer": str,  # text written by the answer step
    },
)
def retrieve(state: WorkflowState) -> dict:
    """Fetch documents for the query held in ``state``.

    Args:
        state: Workflow state; only ``state["query"]`` is read.

    Returns:
        A partial state update containing just ``"retrieved_docs"``.
    """
    # `invoke` is the Runnable entry point for retrievers; it replaces the
    # deprecated `get_relevant_documents`.
    docs = retriever.invoke(state["query"])
    return {"retrieved_docs": docs}
def analyze(state: WorkflowState) -> WorkflowState:
    """Ask the LLM to analyze the text of the retrieved documents.

    Args:
        state: Workflow state; only ``state["retrieved_docs"]`` is read.

    Returns:
        A dict containing just ``"analysis"``, set to the reply's ``content``.
    """
    page_texts = (doc.page_content for doc in state["retrieved_docs"])
    context = "\n".join(page_texts)
    reply = llm.invoke(f"Analyze: {context}")
    return {"analysis": reply.content}
def answer(state: WorkflowState) -> WorkflowState:
    """Produce the final answer from the query and the prior analysis.

    Args:
        state: Workflow state; ``state["query"]`` and ``state["analysis"]``
            are read.

    Returns:
        A dict containing just ``"answer"``, set to the reply's ``content``.
    """
    query, analysis = state["query"], state["analysis"]
    reply = llm.invoke(f"Query: {query}\nAnalysis: {analysis}\nAnswer:")
    return {"answer": reply.content}
# Build workflow: three nodes wired in a line, retrieve -> analyze -> answer.
workflow = StateGraph(WorkflowState)
for node_name, node_fn in (
    ("retrieve", retrieve),
    ("analyze", analyze),
    ("answer", answer),
):
    workflow.add_node(node_name, node_fn)

workflow.set_entry_point("retrieve")
for source, target in (
    ("retrieve", "analyze"),
    ("analyze", "answer"),
    ("answer", END),
):
    workflow.add_edge(source, target)

app = workflow.compile()

# Every state key is supplied up front; all but the query start empty.
initial_state = {
    "query": "What are the trends?",
    "retrieved_docs": [],
    "analysis": "",
    "answer": "",
}
result = app.invoke(initial_state)
Key Takeaways
- LangChain excels at chaining prompts, models, and tools
- LlamaIndex specializes in data indexing and retrieval
- LangGraph enables complex stateful workflows with cycles
- Multi-model routing optimizes cost vs. quality
- Observability is crucial for debugging orchestration pipelines