
AI Agent Workflow System (LangGraph + Tools + Memory)



AI Agent Workflow System

LangGraph + Tools + Memory | Autonomous Multi-Step Reasoning

Expert | 16+ Hours | GPU Recommended

Project Overview

Problem Statement

A single prompt-response exchange with an LLM cannot handle complex, multi-step tasks that require planning, tool usage, and state management. AI agents combine LLM reasoning with external tools, memory, and workflow orchestration to solve such tasks autonomously.

Objectives

  • Build a multi-agent system with LangGraph workflow orchestration
  • Implement tool integration (web search, code execution, API calls)
  • Create persistent memory for long-running conversations
  • Add human-in-the-loop approval for critical actions
  • Deploy with monitoring and safety guardrails

Component        | Technology
Agent Framework  | LangGraph + LangChain
LLM Backend      | GPT-4 / Claude 3
Vector Store     | ChromaDB / Pinecone
Tool System      | LangChain Tools + Custom
Memory           | PostgreSQL + Redis
Workflow Engine  | LangGraph State Machine
Monitoring       | LangSmith + LangFuse
Safety           | Guardrails AI

Architecture Diagram

+-------------------------------------------------------------------+
|              AI Agent Workflow Architecture                       |
+-------------------------------------------------------------------+
|  +--------------+    +--------------+    +------------------+     |
|  | User Input   |--->| Agent Router |--->| Planning Agent   |     |
|  | (API/Chat)   |    | (LangGraph)  |    | (Task Decompose) |     |
|  +--------------+    +--------------+    +--------+---------+     |
|                                                  |               |
|                                                  v               |
|  +--------------+    +--------------+    +------------------+     |
|  |  Tool        |<---|  Executor    |<---|  Action Plan     |     |
|  |  Registry    |    |  (LangGraph) |    |  (Step Sequence) |     |
|  +--------------+    +--------------+    +------------------+     |
|        |                                                   |     |
|        v                                                   v     |
|  +--------------+    +--------------+    +------------------+     |
|  |  Memory      |    |  Human-in-   |    |  Response        |     |
|  |  Store       |    |  the-Loop    |    |  Generator       |     |
|  +--------------+    +--------------+    +------------------+     |
+-------------------------------------------------------------------+

Step-by-Step Implementation

Step 1: Environment Setup

mkdir ai-agent-system && cd ai-agent-system
pip install langgraph langchain langchain-openai
pip install chromadb psycopg2-binary redis
pip install fastapi uvicorn
pip install langsmith guardrails-ai
pip install python-dotenv pydantic

Step 2: Agent State and Graph Definition

Define the agent state machine using LangGraph for multi-step reasoning workflows.

# src/agent/state.py
from typing import TypedDict, Annotated, List, Dict, Optional, Any
from langgraph.graph import END, StateGraph
from langgraph.graph.message import add_messages
from langchain_core.messages import BaseMessage, HumanMessage, AIMessage


class AgentState(TypedDict):
    messages: Annotated[List[BaseMessage], add_messages]
    current_task: str
    task_plan: List[str]
    completed_steps: List[str]
    tool_results: Dict[str, Any]
    memory: Dict[str, Any]
    iteration: int
    max_iterations: int
    human_approval_needed: bool


class AgentWorkflow:
    def __init__(self, llm, tools: list):
        self.llm = llm
        self.tools = {tool.name: tool for tool in tools}
        self.graph = self._build_graph()

    def _build_graph(self) -> StateGraph:
        workflow = StateGraph(AgentState)

        # Add nodes
        workflow.add_node("planner", self._plan_step)
        workflow.add_node("executor", self._execute_step)
        workflow.add_node("evaluator", self._evaluate_progress)
        workflow.add_node("human_review", self._human_review)
        workflow.add_node("responder", self._generate_response)

        # Add edges
        workflow.set_entry_point("planner")
        workflow.add_conditional_edges("planner", self._route_after_plan)
        workflow.add_conditional_edges("executor", self._route_after_execute)
        workflow.add_conditional_edges("evaluator", self._route_after_evaluate)
        workflow.add_edge("human_review", "executor")
        workflow.add_edge("responder", END)

        return workflow.compile()

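    def _plan_step(self, state: AgentState) -> dict:
        """Generate a plan for the current task."""
        # Put the system prompt first; some chat providers reject a system
        # message that appears after the conversation turns.
        messages = [
            ("system", "You are a planning agent. Break the task into clear, executable steps."),
        ] + list(state["messages"])
        response = self.llm.invoke(messages)
        # Keep one plan step per non-empty line of the model's reply.
        steps = [line.strip() for line in response.content.split("\n") if line.strip()]
        return {
            "task_plan": steps,
            "iteration": state.get("iteration", 0) + 1,
        }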

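    def _execute_step(self, state: AgentState) -> dict:
        """Execute the current step using available tools."""
        from langchain_core.messages import ToolMessage

        plan = state.get("task_plan", [])
        completed = state.get("completed_steps", [])

        if not plan:
            return {"current_task": "no_more_steps"}

        current_step = plan[0]
        remaining_plan = plan[1:]

        # Ask the model to carry out the step; the system prompt goes first.
        messages = [
            ("system", f"Execute this step: {current_step}. Use tools if needed."),
        ] + list(state["messages"])
        response = self.llm.bind_tools(list(self.tools.values())).invoke(messages)

        # Run every tool the model requested and answer each call with a
        # ToolMessage, so the next model call sees a complete exchange.
        new_messages = [response]
        tool_results = dict(state.get("tool_results", {}))
        for call in getattr(response, "tool_calls", None) or []:
            selected = self.tools.get(call["name"])
            try:
                output = selected.invoke(call["args"]) if selected else f"Error: unknown tool {call['name']}"
            except Exception as exc:
                output = f"Error: {exc}"
            tool_results[call["id"]] = output
            new_messages.append(ToolMessage(content=str(output), tool_call_id=call["id"]))

        return {
            "task_plan": remaining_plan,
            "completed_steps": completed + [current_step],
            "tool_results": tool_results,
            "messages": new_messages,
        }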

    def _evaluate_progress(self, state: AgentState) -> dict:
        """Decide whether the task is finished or should continue."""
        completed = state.get("completed_steps", [])
        plan = state.get("task_plan", [])
        iteration = state.get("iteration", 0)
        max_iter = state.get("max_iterations", 10)

        # Stop when the plan is exhausted or the budget is spent. Completed
        # steps count against the budget because the loop returns to the
        # executor, so `iteration` (incremented by the planner) stays fixed.
        if not plan or iteration >= max_iter or len(completed) >= max_iter:
            return {"current_task": "complete"}
        return {"current_task": "continue"}

    def _human_review(self, state: AgentState) -> dict:
        """Placeholder for human-in-the-loop review."""
        return {}

    def _generate_response(self, state: AgentState) -> dict:
        """Generate final response summarizing completed work."""
        completed = state.get("completed_steps", [])
        return {
            "messages": [AIMessage(content=f"Task completed. Steps executed: {len(completed)}")],
        }

    def _route_after_plan(self, state: AgentState) -> str:
        if state.get("human_approval_needed"):
            return "human_review"
        return "executor"

    def _route_after_execute(self, state: AgentState) -> str:
        return "evaluator"

    def _route_after_evaluate(self, state: AgentState) -> str:
        if state["current_task"] == "complete":
            return "responder"
        # Continue with the next pending step. Returning to the planner here
        # would replace the remaining plan with a fresh one on every pass.
        return "executor"

    def run(self, query: str, max_iterations: int = 10) -> dict:
        initial_state = {
            "messages": [HumanMessage(content=query)],
            "current_task": query,
            "task_plan": [],
            "completed_steps": [],
            "tool_results": {},
            "memory": {},
            "iteration": 0,
            "max_iterations": max_iterations,
            "human_approval_needed": False,
        }
        return self.graph.invoke(initial_state)
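
The control flow that the graph encodes (take a step, execute it, check the budget, repeat) can be hard to see through the framework calls. The following is a minimal plain-Python sketch of that loop, not the LangGraph implementation itself; `run_plan_loop` and the `execute` callback are hypothetical names introduced here for illustration.

```python
from typing import Callable, Dict, List


def run_plan_loop(
    plan: List[str],
    execute: Callable[[str], str],
    max_iterations: int = 10,
) -> Dict[str, object]:
    """Run plan steps in order until the plan is empty or the cap is hit."""
    remaining = list(plan)
    completed: List[str] = []
    results: Dict[str, str] = {}
    iteration = 0
    while remaining and iteration < max_iterations:
        step = remaining.pop(0)        # executor: take the next pending step
        results[step] = execute(step)  # hypothetical stand-in for LLM + tools
        completed.append(step)
        iteration += 1                 # evaluator: count the work done so far
    return {
        "completed_steps": completed,
        "task_plan": remaining,
        "tool_results": results,
        "stopped_early": bool(remaining),
    }


outcome = run_plan_loop(
    ["search", "summarize", "answer"],
    execute=lambda step: step.upper(),
    max_iterations=2,
)
print(outcome["completed_steps"])  # ['search', 'summarize']
print(outcome["stopped_early"])    # True
```

With a budget of two iterations the third step is left in `task_plan`, which is the same condition the evaluator node uses to hand off to the responder.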

Step 3: Tool Integration

Create custom tools for the agent to interact with external systems.

# src/tools/registry.py
from langchain_core.tools import tool
from typing import Optional
import json
import os
import requests


@tool
def web_search(query: str) -> str:
    """Search the web for information using a search API."""
    # Placeholder endpoint; replace with a real search API.
    # The key is read from the environment because Python does not expand
    # "$VAR" inside string literals.
    api_key = os.environ.get("SEARCH_API_KEY", "")
    response = requests.get(
        "https://api.search.example.com/search",
        params={"q": query, "num": 5},
        headers={"Authorization": f"Bearer {api_key}"},
        timeout=10,
    )
    response.raise_for_status()
    results = response.json()
    return json.dumps(results.get("results", []), indent=2)


@tool
def execute_python(code: str) -> str:
    """Execute Python code and return the output. Use for calculations and data processing."""
    try:
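        # WARNING: exec() with empty builtins is not a sandbox, and it also
        # disables print() and import. In production, run untrusted code in
        # an isolated process or container.
        local_vars = {}
        exec(code, {"__builtins__": {}}, local_vars)
        # The snippet must assign its answer to a variable named `result`.
        return str(local_vars.get("result", "Code executed successfully"))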
    except Exception as e:
        return f"Error: {str(e)}"


@tool
def query_database(sql: str) -> str:
    """Query the database using SQL. Read-only access only."""
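    import sqlite3
    # Open the file in read-only mode so write statements fail in the driver
    # instead of relying on the docstring alone.
    conn = sqlite3.connect("file:data.db?mode=ro", uri=True)
    try:
        results = conn.execute(sql).fetchmany(100)
    finally:
        conn.close()
    return json.dumps(results)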


@tool
def api_call(url: str, method: str = "GET", payload: Optional[str] = None) -> str:
    """Make an HTTP API call."""
    headers = {"Content-Type": "application/json"}
    data = json.loads(payload) if payload else None
    verb = method.upper()
    # requests.request covers every HTTP verb. GET requests carry no body,
    # and the timeout keeps a stalled endpoint from blocking the agent.
    resp = requests.request(
        verb,
        url,
        headers=headers,
        json=None if verb == "GET" else data,
        timeout=30,
    )
    return f"Status: {resp.status_code}\nResponse: {resp.text[:1000]}"


@tool
def save_to_memory(key: str, value: str) -> str:
    """Save a key-value pair to persistent memory for future reference."""
    # In production, save to Redis/PostgreSQL
    return f"Saved {key} = {value} to memory"


DEFAULT_TOOLS = [web_search, execute_python, query_database, api_call, save_to_memory]
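
Registering tools only makes them available to the model; each call the model requests still has to be looked up by name, run, and answered. The sketch below shows that dispatch step under two assumptions: tool calls arrive as dicts with `name`, `args`, and `id` keys (the shape LangChain chat models expose on `AIMessage.tool_calls`), and plain Python functions stand in for the `@tool` objects. `dispatch_tool_calls` is a hypothetical helper, not part of LangChain.

```python
from typing import Any, Callable, Dict, List


def dispatch_tool_calls(
    tool_calls: List[Dict[str, Any]],
    registry: Dict[str, Callable[..., Any]],
) -> List[Dict[str, str]]:
    """Run each requested tool and return one result record per call."""
    outputs = []
    for call in tool_calls:
        fn = registry.get(call["name"])
        if fn is None:
            # A hallucinated tool name becomes an error the model can read.
            content = f"Error: unknown tool {call['name']!r}"
        else:
            try:
                content = str(fn(**call["args"]))
            except Exception as exc:  # report failures instead of crashing
                content = f"Error: {exc}"
        outputs.append(
            {"tool_call_id": call["id"], "name": call["name"], "content": content}
        )
    return outputs


# Plain functions standing in for registered tools.
registry = {"add": lambda a, b: a + b}
calls = [
    {"name": "add", "args": {"a": 2, "b": 3}, "id": "call_1"},
    {"name": "missing", "args": {}, "id": "call_2"},
]
outputs = dispatch_tool_calls(calls, registry)
for record in outputs:
    print(record["tool_call_id"], record["content"])
```

Returning errors as ordinary tool output, rather than raising, lets the agent see the failure and choose another action.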

Step 4: Memory System

# src/memory/memory_store.py
import redis
import json
from typing import Dict, List, Optional
from datetime import timedelta
import psycopg2
from dataclasses import dataclass


@dataclass
class MemoryConfig:
    redis_url: str = "redis://localhost:6379"
    pg_url: str = "postgresql://localhost:5432/agent_memory"
    short_term_ttl: int = 3600  # 1 hour
    long_term_ttl: int = 86400 * 30  # 30 days


class MemoryStore:
    def __init__(self, config: MemoryConfig = None):
        self.config = config or MemoryConfig()
        self.redis = redis.from_url(self.config.redis_url, decode_responses=True)
        self._init_pg()

    def _init_pg(self):
        self.pg = psycopg2.connect(self.config.pg_url)
        # psycopg2 connections have no execute(); statements run on a cursor.
        with self.pg.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS agent_memories (
                    id SERIAL PRIMARY KEY,
                    agent_id VARCHAR(255),
                    session_id VARCHAR(255),
                    memory_type VARCHAR(50),
                    key VARCHAR(255),
                    value TEXT,
                    metadata JSONB DEFAULT '{}',
                    created_at TIMESTAMP DEFAULT NOW(),
                    expires_at TIMESTAMP
                )
            """)
        self.pg.commit()

    def save_short_term(self, session_id: str, key: str, value: str):
        self.redis.setex(
            f"stm:{session_id}:{key}",
            self.config.short_term_ttl,
            value,
        )

    def get_short_term(self, session_id: str, key: str) -> Optional[str]:
        return self.redis.get(f"stm:{session_id}:{key}")

    def save_long_term(self, agent_id: str, key: str, value: str, metadata: Dict = None):
        # Run the insert on a cursor; the connection object only manages transactions.
        with self.pg.cursor() as cur:
            cur.execute(
                "INSERT INTO agent_memories (agent_id, memory_type, key, value, metadata) VALUES (%s, %s, %s, %s, %s)",
                (agent_id, "long_term", key, value, json.dumps(metadata or {})),
            )
        self.pg.commit()

    def query_memories(self, agent_id: str, query: str, limit: int = 10) -> List[Dict]:
        # Case-insensitive substring match on the stored value, newest first.
        with self.pg.cursor() as cur:
            cur.execute(
                "SELECT key, value, metadata FROM agent_memories WHERE agent_id = %s AND value ILIKE %s ORDER BY created_at DESC LIMIT %s",
                (agent_id, f"%{query}%", limit),
            )
            rows = cur.fetchall()
        return [{"key": r[0], "value": r[1], "metadata": r[2]} for r in rows]

    def get_conversation_history(self, session_id: str, limit: int = 50) -> List[Dict]:
        """Return up to `limit` turns, newest first (entries are added with LPUSH)."""
        entries = self.redis.lrange(f"conversation:{session_id}", 0, limit - 1)
        return [json.loads(entry) for entry in entries]

    def add_to_conversation(self, session_id: str, role: str, content: str):
        entry = json.dumps({"role": role, "content": content})
        self.redis.lpush(f"conversation:{session_id}", entry)
        self.redis.ltrim(f"conversation:{session_id}", 0, 999)
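
The short-term half of this design depends on two Redis behaviours: keys that expire after a TTL, and a capped list that keeps the newest conversation turns. For local experiments without Redis, the same semantics can be approximated in memory. This is a sketch under that assumption; `InMemoryShortTermStore` and its injectable `clock` parameter are hypothetical and not part of the project code.

```python
import time
from collections import deque
from typing import Callable, Deque, Dict, List, Optional, Tuple


class InMemoryShortTermStore:
    """Approximates SETEX/GET and LPUSH/LTRIM/LRANGE with plain Python."""

    def __init__(
        self,
        ttl_seconds: float = 3600,
        max_turns: int = 1000,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.ttl = ttl_seconds
        self.max_turns = max_turns
        self.clock = clock
        self._values: Dict[str, Tuple[float, str]] = {}
        self._turns: Dict[str, Deque[Dict[str, str]]] = {}

    def save(self, session_id: str, key: str, value: str) -> None:
        # Store the value together with its expiry time.
        self._values[f"stm:{session_id}:{key}"] = (self.clock() + self.ttl, value)

    def get(self, session_id: str, key: str) -> Optional[str]:
        full_key = f"stm:{session_id}:{key}"
        entry = self._values.get(full_key)
        if entry is None:
            return None
        expires_at, value = entry
        if self.clock() >= expires_at:
            del self._values[full_key]  # lazily drop expired entries
            return None
        return value

    def add_turn(self, session_id: str, role: str, content: str) -> None:
        turns = self._turns.setdefault(session_id, deque(maxlen=self.max_turns))
        # Newest first; a full deque drops the oldest turn from the far end.
        turns.appendleft({"role": role, "content": content})

    def history(self, session_id: str, limit: int = 50) -> List[Dict[str, str]]:
        return list(self._turns.get(session_id, ()))[:limit]


now = [0.0]  # fake clock so expiry can be shown without sleeping
store = InMemoryShortTermStore(ttl_seconds=10, clock=lambda: now[0])
store.save("s1", "topic", "billing")
fresh = store.get("s1", "topic")
now[0] = 11.0
expired = store.get("s1", "topic")
store.add_turn("s1", "user", "hi")
store.add_turn("s1", "assistant", "hello")
recent = store.history("s1", limit=1)
print(fresh, expired, recent)
```

Passing the clock in as a parameter keeps the expiry logic testable; production code would rely on Redis to expire keys.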

Step 5: Safety Guardrails

# src/safety/guardrails.py
from typing import Dict, List, Tuple
from dataclasses import dataclass
import re


@dataclass
class SafetyConfig:
    blocked_patterns: List[str] = None
    max_tool_calls_per_step: int = 5
    require_approval_tools: List[str] = None
    max_iterations: int = 20

    def __post_init__(self):
        if self.blocked_patterns is None:
            self.blocked_patterns = [
                r"rm\s+-rf", r"DROP\s+TABLE", r"DELETE\s+FROM",
                r"password", r"secret", r"api_key",
            ]
        if self.require_approval_tools is None:
            self.require_approval_tools = ["execute_python", "api_call", "query_database"]


class SafetyGuardrail:
    def __init__(self, config: SafetyConfig = None):
        self.config = config or SafetyConfig()
        self.violations = []

    def check_action(self, action: Dict) -> Tuple[bool, str]:
        """Check if an action is safe to execute."""
        tool_name = action.get("tool", "")
        # Tool input may be a dict of arguments; match patterns against its text form.
        tool_input = str(action.get("input", ""))

        # Check blocked patterns
        for pattern in self.config.blocked_patterns:
            if re.search(pattern, tool_input, re.IGNORECASE):
                return False, f"Blocked: matches pattern {pattern}"

        # Check if approval needed
        if tool_name in self.config.require_approval_tools:
            return False, f"Approval required for tool: {tool_name}"

        return True, "Approved"

    def check_iteration_limit(self, iteration: int) -> bool:
        return iteration < self.config.max_iterations

    def log_violation(self, action: Dict, reason: str):
        self.violations.append({
            "action": action,
            "reason": reason,
        })

ℹ️

Always implement tool execution limits and human-in-the-loop approval for high-risk actions (database writes, API calls with side effects, code execution).
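
One way to combine the two controls in this note is a gate that runs before any tool executes. The sketch below is an assumption-laden illustration: `gate_tool_calls`, the `approve` callback, and the `HIGH_RISK_TOOLS` set are hypothetical names, and actions use the same `{"tool": ..., "input": ...}` shape as `check_action` above.

```python
from typing import Callable, Dict, List, Tuple

# Tools that need a human decision; mirrors require_approval_tools above.
HIGH_RISK_TOOLS = {"execute_python", "api_call", "query_database"}

Call = Dict[str, str]


def gate_tool_calls(
    calls: List[Call],
    approve: Callable[[Call], bool],
    max_calls: int = 5,
) -> Tuple[List[Call], List[Tuple[Call, str]]]:
    """Split requested calls into allowed calls and (call, reason) rejections."""
    allowed: List[Call] = []
    rejected: List[Tuple[Call, str]] = []
    for index, call in enumerate(calls):
        if index >= max_calls:
            # Execution limit: anything past the per-step budget is dropped.
            rejected.append((call, "tool-call limit reached"))
        elif call["tool"] in HIGH_RISK_TOOLS and not approve(call):
            # Approval gate: a human (or policy) callback decides.
            rejected.append((call, "approval denied"))
        else:
            allowed.append(call)
    return allowed, rejected


calls = [
    {"tool": "web_search", "input": "langgraph checkpoints"},
    {"tool": "api_call", "input": "POST https://example.com/orders"},
    {"tool": "web_search", "input": "redis ttl"},
]
# Deny every high-risk call and allow at most two calls in this step.
allowed, rejected = gate_tool_calls(calls, approve=lambda call: False, max_calls=2)
print([c["tool"] for c in allowed])
print([reason for _, reason in rejected])
```

In a real deployment the `approve` callback would block on a reviewer's decision, for example through a queue or a chat prompt.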

💡

Use LangSmith or LangFuse for tracing agent execution. Understanding why an agent took a specific action is critical for debugging and improvement.
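
Before wiring in a hosted tracer, it can help to see what a trace needs to capture: which node ran, how long it took, and which state keys it changed. The decorator below is a standard-library stand-in for that idea and does not use the LangSmith or LangFuse APIs; `traced` and `TRACE` are hypothetical names.

```python
import functools
import time
from typing import Any, Callable, Dict, List

TRACE: List[Dict[str, Any]] = []  # in-memory trace log for this sketch


def traced(node_name: str) -> Callable:
    """Record the node name, duration, and updated keys for each call."""

    def decorator(fn: Callable[[Dict[str, Any]], Dict[str, Any]]) -> Callable:
        @functools.wraps(fn)
        def wrapper(state: Dict[str, Any]) -> Dict[str, Any]:
            started = time.perf_counter()
            update = fn(state)
            TRACE.append(
                {
                    "node": node_name,
                    "seconds": time.perf_counter() - started,
                    "updated_keys": sorted(update),
                }
            )
            return update

        return wrapper

    return decorator


@traced("evaluator")
def evaluate(state: Dict[str, Any]) -> Dict[str, Any]:
    # Same decision rule as the evaluator node: an empty plan means done.
    done = not state.get("task_plan")
    return {"current_task": "complete" if done else "continue"}


result = evaluate({"task_plan": []})
print(TRACE[0]["node"], TRACE[0]["updated_keys"], result)
```

Each record answers "what did this node change and how long did it take", which is the minimum needed to reconstruct why a run went the way it did.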

Performance Metrics

Metric                  | Target | Description
Task Completion Rate    | > 85%  | Successful task execution
Avg Steps to Completion | < 8    | Efficiency metric
Tool Call Accuracy      | > 90%  | Correct tool selection
Response Latency        | < 30s  | Full task completion
Human Intervention Rate | < 15%  | When HITL is enabled
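
These targets only mean something once each metric has a concrete definition. The sketch below computes them from per-run records under assumed definitions (for example, average steps counts only completed runs, and intervention rate is the share of runs with at least one human intervention). `RunRecord` and `summarize` are hypothetical names, and the sample values are made up for illustration.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class RunRecord:
    completed: bool
    steps: int
    tool_calls: int
    correct_tool_calls: int
    latency_seconds: float
    human_interventions: int


def summarize(runs: List[RunRecord]) -> Dict[str, Optional[float]]:
    """Aggregate per-run records into the metrics listed in the table."""
    if not runs:
        raise ValueError("at least one run is required")
    total = len(runs)
    finished = [r for r in runs if r.completed]
    total_calls = sum(r.tool_calls for r in runs)
    return {
        "task_completion_rate": len(finished) / total,
        # Only completed runs have a meaningful "steps to completion".
        "avg_steps_to_completion": (
            sum(r.steps for r in finished) / len(finished) if finished else None
        ),
        "tool_call_accuracy": (
            sum(r.correct_tool_calls for r in runs) / total_calls
            if total_calls
            else None
        ),
        "avg_latency_seconds": sum(r.latency_seconds for r in runs) / total,
        "human_intervention_rate": sum(r.human_interventions > 0 for r in runs) / total,
    }


# Illustrative records, not measured results.
runs = [
    RunRecord(True, 4, 5, 5, 12.0, 0),
    RunRecord(False, 10, 5, 3, 40.0, 1),
]
summary = summarize(runs)
for name, value in summary.items():
    print(f"{name}: {value}")
```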

Interview Talking Points

  1. LangGraph: Provides state machine-based workflow orchestration that enables complex multi-step agent reasoning with clear control flow.
  2. Tool Use: LLMs select tools based on descriptions. Clear tool descriptions are critical for reliable tool selection.
  3. Memory Architecture: Short-term (conversation context) + long-term (knowledge base) memory enables context-aware reasoning.
  4. Safety: Guardrails prevent harmful actions. Implement defense-in-depth with pattern matching, approval gates, and sandboxed execution.
  5. Evaluation: Agent evaluation requires task-level metrics (completion rate, efficiency) rather than just token-level metrics.
  6. Human-in-the-Loop: Critical for production deployments. Agents should escalate uncertain decisions to humans.

⚠️

LLM-based agents can hallucinate tool calls or create infinite loops. Always set iteration limits and implement fallback mechanisms.
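
An iteration limit stops a loop eventually; spotting the loop early saves the wasted calls in between. A simple heuristic, sketched here as an assumption rather than an established technique from this project, is to flag a run when the same tool is requested with identical arguments several times. `is_stuck` and `next_action` are hypothetical helpers.

```python
import json
from collections import Counter
from typing import Any, Dict, List


def is_stuck(history: List[Dict[str, Any]], max_repeats: int = 3) -> bool:
    """Return True once any (tool, arguments) pair occurs max_repeats times."""
    counts: Counter = Counter()
    for call in history:
        # Sort keys so argument order does not hide a repeated call.
        signature = (call["name"], json.dumps(call["args"], sort_keys=True))
        counts[signature] += 1
        if counts[signature] >= max_repeats:
            return True
    return False


def next_action(history: List[Dict[str, Any]]) -> str:
    """Fallback policy: stop and escalate instead of repeating the call."""
    return "escalate_to_human" if is_stuck(history) else "continue"


looping = [{"name": "web_search", "args": {"query": "weather"}}] * 3
varied = [
    {"name": "web_search", "args": {"query": "weather"}},
    {"name": "web_search", "args": {"query": "news"}},
    {"name": "api_call", "args": {"url": "https://example.com"}},
]
print(next_action(looping))  # escalate_to_human
print(next_action(varied))   # continue
```

The check is deliberately strict about exact repeats; near-duplicate calls with slightly different arguments would need a fuzzier comparison.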

ℹ️

For production agent systems, consider frameworks like AutoGen or CrewAI for multi-agent collaboration scenarios.
