How to Build a Knowledge Graph from Documents with LLMs
Practical tutorial: Build a knowledge graph from documents with LLMs
Knowledge graphs have become the backbone of enterprise AI systems, enabling semantic search, recommendation engines, and reasoning over complex relationships. While traditional knowledge graph construction required extensive manual ontology engineering, large language models (LLMs) now automate the extraction of entities and relationships from unstructured text. In this tutorial, you'll build a production-ready pipeline that ingests documents, extracts structured knowledge using LLMs, and stores the results in a queryable graph database.
Why Knowledge Graphs Matter in Production
Consider a legal firm processing thousands of contract documents. A vector database alone can retrieve similar clauses, but it cannot answer "Which contracts mention both Force Majeure and Indemnification clauses signed after 2023?" Knowledge graphs capture these explicit relationships—entities like "Contract A" connected to "Force Majeure Clause" via a "contains" relationship, with temporal attributes. According to a 2024 survey by Gartner, organizations using knowledge graphs for data integration report 40% faster query times on complex multi-hop questions compared to traditional relational databases.
The architecture we'll build handles three critical production concerns: entity disambiguation (resolving "Apple" the fruit vs. "Apple" the company), relationship extraction (identifying meaningful connections beyond co-occurrence), and scalability (processing thousands of documents without exceeding LLM API rate limits).
Prerequisites and Environment Setup
Before writing code, ensure your environment has the following dependencies. We'll use Python 3.11+ with these core libraries:
# Create a virtual environment
python -m venv kg_env
source kg_env/bin/activate # On Windows: kg_env\Scripts\activate
# Install core dependencies
pip install langchain==0.3.0 openai==1.55.0 neo4j==5.26.0 \
python-dotenv==1.0.1 pydantic==2.9.0 tiktoken==0.8.0 \
tenacity==9.0.0 networkx==3.4.0 \
langchain-openai rapidfuzz aiolimiter
# For document parsing
pip install pypdf==5.1.0 python-docx==1.1.2 beautifulsoup4==4.12.3
# For performance monitoring
pip install loguru==0.7.3
You'll need:
- An OpenAI API key (or compatible LLM provider) with access to GPT-4 or GPT-4o-mini
- A Neo4j database instance (local Docker or AuraDB free tier)
- Sample documents (we'll use public SEC filings or Wikipedia articles)
Set up your environment variables:
echo "OPENAI_API_KEY=sk-your-key-here" > .env
echo "NEO4J_URI=bolt://localhost:7687" >> .env
echo "NEO4J_USER=neo4j" >> .env
echo "NEO4J_PASSWORD=your-password" >> .env
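Missing settings are easier to diagnose at startup than halfway through a run. The following is a minimal sketch, assuming the four variable names from the .env file above; the helper names `missing_settings` and `require_settings` are illustrative and not part of any library.

```python
import os

# Variable names taken from the .env file above.
REQUIRED_VARS = ("OPENAI_API_KEY", "NEO4J_URI", "NEO4J_USER", "NEO4J_PASSWORD")


def missing_settings(env=None):
    """Return the names of required variables that are unset or blank."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not (env.get(name) or "").strip()]


def require_settings(env=None):
    """Raise early with a readable message instead of failing mid-pipeline."""
    missing = missing_settings(env)
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
```

Calling `require_settings()` right after `load_dotenv()` would surface a blank or absent key before any API call is made.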
Designing the Extraction Pipeline
Our pipeline consists of four stages: document chunking, entity-relationship extraction, disambiguation, and graph insertion. The critical design decision is how to structure the LLM prompt to produce consistent, parseable output.
Chunking Strategy
Documents must be split into chunks that fit within the LLM's context window while preserving semantic boundaries. We'll use LangChain's recursive character text splitter with overlap to maintain context across chunk boundaries:
from typing import Any, Dict, List

from langchain.text_splitter import RecursiveCharacterTextSplitter


def chunk_document(text: str, chunk_size: int = 4000, chunk_overlap: int = 200) -> List[Dict[str, Any]]:
    """
    Split document into overlapping chunks for processing.

    Args:
        text: Raw document text
        chunk_size: Target characters per chunk (length_function=len counts characters, not tokens)
        chunk_overlap: Overlap in characters to maintain entity continuity
    Returns:
        List of chunk dictionaries with metadata
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,  # Character-based for simplicity; use tiktoken for token accuracy
        separators=["\n\n", "\n", ". ", " ", ""]
    )
    chunks = splitter.split_text(text)
    return [{"text": chunk, "index": i} for i, chunk in enumerate(chunks)]
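Edge case: When chunking legal or technical documents, preserve section headers. The splitter's separators parameter tries paragraph breaks first, then line breaks, then sentence boundaries, so mid-sentence splits that confuse entity extraction happen only when a single sentence exceeds chunk_size. It does not repeat a section header on later chunks of the same section, so header context has to be carried over separately if you need it.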
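One way to keep headers attached is to split on header lines before chunking. This is a minimal sketch using only the standard library; the header pattern (lines such as "1. Definitions" or "Section 2: Term") is an assumption about the document layout, and `split_by_sections` is a hypothetical helper, not a LangChain function. Note that the pattern would also treat numbered list items as headers.

```python
import re
from typing import Dict, List

# Assumed header convention: "1. Definitions" or "Section 2: Term" on its own line.
HEADER_RE = re.compile(r"^(?:\d+\.[ \t]+\S.*|Section[ \t]+\d+.*)$", re.MULTILINE)


def split_by_sections(text: str) -> List[Dict[str, str]]:
    """Split text at header lines, keeping each header with its body."""
    matches = list(HEADER_RE.finditer(text))
    if not matches:
        return [{"header": "", "body": text.strip()}]
    sections = []
    # Text before the first header is kept as a header-less section.
    preamble = text[: matches[0].start()].strip()
    if preamble:
        sections.append({"header": "", "body": preamble})
    for current, following in zip(matches, matches[1:] + [None]):
        end = following.start() if following else len(text)
        body = text[current.end():end].strip()
        sections.append({"header": current.group().strip(), "body": body})
    return sections
```

Each section body could then be passed to chunk_document, with the header prefixed to every resulting chunk so the extraction prompt sees which section the text came from.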
Structured Extraction with LLM
We'll define a Pydantic model for our knowledge graph schema and use LangChain's structured output parser to enforce JSON consistency:
from typing import List, Optional

from langchain.output_parsers import PydanticOutputParser
from langchain.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field


class Entity(BaseModel):
    """Represents a named entity in the knowledge graph."""
    name: str = Field(description="Canonical name of the entity")
    type: str = Field(description="Entity type (e.g., Person, Organization, Location, Concept)")
    description: Optional[str] = Field(default=None, description="Brief description for disambiguation")


class Relationship(BaseModel):
    """Represents a directed relationship between two entities."""
    source: str = Field(description="Name of the source entity")
    target: str = Field(description="Name of the target entity")
    relation: str = Field(description="Type of relationship (e.g., 'founded_by', 'located_in')")
    confidence: float = Field(ge=0.0, le=1.0, description="Extraction confidence score")


class KnowledgeExtraction(BaseModel):
    """Container for extracted entities and relationships from a text chunk."""
    entities: List[Entity] = Field(description="Entities found in this chunk")
    relationships: List[Relationship] = Field(description="Relationships found in this chunk")


def create_extraction_chain(llm_model: str = "gpt-4o-mini"):
    """
    Build the extraction chain with structured output parsing.
    Using GPT-4o-mini for cost efficiency; switch to gpt-4 for complex documents.
    """
    llm = ChatOpenAI(
        model=llm_model,
        temperature=0.1,  # Low temperature for consistent extraction
        max_tokens=2000
    )
    parser = PydanticOutputParser(pydantic_object=KnowledgeExtraction)
    prompt = ChatPromptTemplate.from_messages([
        ("system", """You are a knowledge graph extraction specialist. Extract entities and their relationships from the given text.
Guidelines:
- Extract only explicitly stated information; do not infer relationships
- Use consistent entity names (e.g., always "Apple Inc." not "Apple" or "Apple Company")
- For relationships, use standard predicates: 'founded_by', 'acquired', 'located_in', 'employs', 'produces', 'part_of', 'contains'
- Assign confidence scores: 1.0 for unambiguous statements, lower values (e.g., 0.7) when the wording is ambiguous
- Skip generic entities like "the company" or "the document"
{format_instructions}"""),
        ("human", "Text: {text}")
    ]).partial(
        # Fill {format_instructions} once so callers only need to pass {text}
        format_instructions=parser.get_format_instructions()
    )
    chain = prompt | llm | parser
    return chain
Production consideration: The prompt explicitly restricts relationship types to a controlled vocabulary. In a real deployment, you'd maintain a dynamic ontology in a configuration file or database, updating it as new relationship types emerge from your data.
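A controlled vocabulary can also be enforced after extraction, which catches predicates the model emits despite the prompt. The sketch below is illustrative only: the `ONTOLOGY` mapping and its aliases are hypothetical, and in a deployment the mapping would presumably be loaded from the configuration file or database mentioned above.

```python
from typing import Dict, Optional, Set

# Hypothetical ontology: canonical predicate -> accepted aliases.
ONTOLOGY: Dict[str, Set[str]] = {
    "founded_by": {"founded by", "cofounded_by", "established_by"},
    "acquired": {"bought", "purchased", "acquires"},
    "located_in": {"headquartered_in", "based_in"},
}


def normalize_relation(raw: str, ontology: Dict[str, Set[str]] = ONTOLOGY) -> Optional[str]:
    """Map a raw predicate to its canonical form, or None if it is unknown."""
    cleaned = raw.strip().lower().replace("-", "_").replace(" ", "_")
    for canonical, aliases in ontology.items():
        normalized_aliases = {a.lower().replace(" ", "_") for a in aliases}
        if cleaned == canonical or cleaned in normalized_aliases:
            return canonical
    return None
```

Relationships whose predicate maps to `None` can be dropped or logged for review, which gives a natural signal for when the ontology needs a new entry.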
Handling Disambiguation and Deduplication
Raw LLM output often contains duplicate entities ("Microsoft" vs. "Microsoft Corporation") or ambiguous references ("Paris" the city vs. "Paris" the person). We'll implement a two-stage deduplication pipeline:
from typing import Dict, List, Set, Tuple

from rapidfuzz import fuzz, process


class EntityResolver:
    """
    Resolve duplicate and ambiguous entities using fuzzy matching and context.
    """

    def __init__(self, threshold: float = 0.85):
        self.threshold = threshold
        self.entity_registry: Dict[str, Entity] = {}  # canonical_name -> Entity

    def resolve_entity(self, entity: Entity) -> str:
        """
        Return canonical name for an entity, merging if similar exists.
        """
        # Check for exact match first
        if entity.name in self.entity_registry:
            return entity.name
        # Fuzzy match against existing entities
        existing_names = list(self.entity_registry.keys())
        if existing_names:
            # rapidfuzz returns a (match, score, index) tuple; scores range 0-100
            best_match, score, _ = process.extractOne(
                entity.name,
                existing_names,
                scorer=fuzz.token_sort_ratio
            )
            if score >= self.threshold * 100:
                # Merge descriptions if available
                existing = self.entity_registry[best_match]
                if entity.description and not existing.description:
                    existing.description = entity.description
                return best_match
        # Register new entity
        self.entity_registry[entity.name] = entity
        return entity.name

    def merge_relationships(self, relationships: List[Relationship]) -> List[Relationship]:
        """
        Deduplicate relationships after entity resolution.
        """
        seen: Set[Tuple[str, str, str]] = set()
        merged = []
        for rel in relationships:
            # Resolve entity names
            source = self.resolve_entity(Entity(name=rel.source, type=""))
            target = self.resolve_entity(Entity(name=rel.target, type=""))
            key = (source, target, rel.relation)
            if key not in seen:
                seen.add(key)
                merged.append(Relationship(
                    source=source,
                    target=target,
                    relation=rel.relation,
                    confidence=rel.confidence
                ))
            # Could implement confidence averaging for duplicates
        return merged
Edge case: Fuzzy matching can incorrectly merge distinct entities with similar names (e.g., "Apple Bank" and "Apple Inc."). The description field helps disambiguate—we could extend the resolver to compare entity types before merging.
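The type comparison suggested above could look like the following. This is a sketch under stated assumptions: it uses the standard library's `difflib.SequenceMatcher` instead of rapidfuzz so that it runs without extra dependencies, which means its similarity scores will differ from `fuzz.token_sort_ratio`, and `should_merge` is a hypothetical helper rather than part of the resolver.

```python
from difflib import SequenceMatcher


def should_merge(name_a: str, type_a: str, name_b: str, type_b: str,
                 threshold: float = 0.85) -> bool:
    """Merge only when types agree (or one is unknown) and names are similar."""
    # Two known but different types are never merged, whatever the names.
    if type_a and type_b and type_a.lower() != type_b.lower():
        return False
    similarity = SequenceMatcher(None, name_a.lower(), name_b.lower()).ratio()
    return similarity >= threshold
```

An empty type is treated as unknown and does not block a merge, which matters here because merge_relationships creates placeholder entities with `type=""`.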
Graph Insertion with Neo4j
With resolved entities and relationships, we insert into Neo4j using batch operations for performance:
from typing import List, Optional

from loguru import logger
from neo4j import GraphDatabase


class KnowledgeGraphStore:
    """
    Manages insertion and querying of knowledge graph data in Neo4j.
    """

    def __init__(self, uri: str, user: str, password: str):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))
        self._create_constraints()

    def _create_constraints(self):
        """Create uniqueness constraints for entity names."""
        with self.driver.session() as session:
            session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (e:Entity) REQUIRE e.name IS UNIQUE")
            session.run("CREATE INDEX IF NOT EXISTS FOR (e:Entity) ON (e.type)")

    def insert_extraction(self, extraction: KnowledgeExtraction, document_id: str):
        """
        Insert entities and relationships from a single extraction.
        Uses MERGE to avoid duplicates.
        """
        with self.driver.session() as session:
            # Insert entities, one MERGE statement per entity
            for entity in extraction.entities:
                session.run(
                    """
                    MERGE (e:Entity {name: $name})
                    SET e.type = $type,
                        e.description = COALESCE($description, e.description)
                    """,
                    name=entity.name,
                    type=entity.type,
                    description=entity.description
                )
            # Insert relationships; the predicate is stored in the "type" property
            for rel in extraction.relationships:
                session.run(
                    """
                    MATCH (source:Entity {name: $source})
                    MATCH (target:Entity {name: $target})
                    MERGE (source)-[r:RELATES {type: $relation}]->(target)
                    SET r.confidence = $confidence,
                        r.document_id = $document_id
                    """,
                    source=rel.source,
                    target=rel.target,
                    relation=rel.relation,
                    confidence=rel.confidence,
                    document_id=document_id
                )
        logger.info(f"Inserted {len(extraction.entities)} entities and {len(extraction.relationships)} relationships")

    def query_graph(self, cypher_query: str, params: Optional[dict] = None) -> List[dict]:
        """Execute arbitrary Cypher query for downstream applications."""
        with self.driver.session() as session:
            result = session.run(cypher_query, params or {})
            return [record.data() for record in result]

    def close(self):
        self.driver.close()
Production consideration: For high-throughput scenarios, use Neo4j's UNWIND batch operations instead of individual MERGE statements. The above code prioritizes readability; a production version would batch 100-500 operations per transaction.
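As a rough illustration of the UNWIND approach, the sketch below sends one query per batch of entity rows instead of one per entity. The helper names `batched` and `insert_entities_batched` are hypothetical, the batch size of 200 is an arbitrary value inside the range mentioned above, and `session` is assumed to be a Neo4j driver session (or anything with a compatible `run` method).

```python
from typing import Dict, Iterator, List

# Same MERGE logic as the per-entity query, applied to a list parameter.
UNWIND_ENTITIES = """
UNWIND $rows AS row
MERGE (e:Entity {name: row.name})
SET e.type = row.type,
    e.description = coalesce(row.description, e.description)
"""


def batched(rows: List[Dict], size: int) -> Iterator[List[Dict]]:
    """Yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def insert_entities_batched(session, rows: List[Dict], batch_size: int = 200) -> int:
    """Send one UNWIND query per batch; returns the number of queries issued."""
    queries = 0
    for batch in batched(rows, batch_size):
        session.run(UNWIND_ENTITIES, rows=batch)
        queries += 1
    return queries
```

Rows can be built from the Pydantic models with `entity.model_dump()`, so 1,000 entities would need five round trips rather than a thousand.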
Complete Pipeline Orchestration
Now we tie everything together into a pipeline with retry logic and batched processing:
import os
from typing import Any, Dict, List

from dotenv import load_dotenv
from loguru import logger
from tenacity import retry, stop_after_attempt, wait_exponential

load_dotenv()


class KnowledgeGraphPipeline:
    """
    End-to-end pipeline for building knowledge graphs from documents.
    """

    def __init__(self):
        self.extraction_chain = create_extraction_chain()
        self.resolver = EntityResolver()
        self.graph_store = KnowledgeGraphStore(
            uri=os.getenv("NEO4J_URI"),
            user=os.getenv("NEO4J_USER"),
            password=os.getenv("NEO4J_PASSWORD")
        )

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        reraise=True
    )
    def process_chunk(self, chunk: Dict[str, Any], document_id: str) -> KnowledgeExtraction:
        """
        Process a single chunk with retry logic for API failures.
        """
        try:
            extraction = self.extraction_chain.invoke({"text": chunk["text"]})
            # Resolve entities across chunks
            for entity in extraction.entities:
                entity.name = self.resolver.resolve_entity(entity)
            extraction.relationships = self.resolver.merge_relationships(extraction.relationships)
            return extraction
        except Exception as e:
            logger.error(f"Failed to process chunk {chunk['index']}: {e}")
            raise

    def process_document(self, text: str, document_id: str, batch_size: int = 5):
        """
        Process a full document, chunking and extracting in batches.
        """
        chunks = chunk_document(text)
        logger.info(f"Processing {len(chunks)} chunks for document {document_id}")
        # Chunks run sequentially; batches only group the progress logging.
        # Add a pause between batches if you hit API rate limits.
        for i in range(0, len(chunks), batch_size):
            batch = chunks[i:i+batch_size]
            for chunk in batch:
                extraction = self.process_chunk(chunk, document_id)
                self.graph_store.insert_extraction(extraction, document_id)
            logger.info(f"Completed batch {i//batch_size + 1}/{(len(chunks)-1)//batch_size + 1}")

    def query(self, question: str) -> List[dict]:
        """
        Example query: Find entities related to a given concept.
        """
        # r.type holds the predicate; type(r) would always return "RELATES"
        cypher = """
        MATCH (e:Entity)-[r]-(related)
        WHERE e.name CONTAINS $concept
        RETURN e.name AS entity, r.type AS relation, related.name AS related_entity
        LIMIT 50
        """
        return self.graph_store.query_graph(cypher, {"concept": question})

    def close(self):
        self.graph_store.close()


# Usage example
if __name__ == "__main__":
    pipeline = KnowledgeGraphPipeline()
    # Sample document (in production, load from PDF or web)
    sample_text = """
    Apple Inc. was founded by Steve Jobs, Steve Wozniak, and Ronald Wayne in 1976.
    The company is headquartered in Cupertino, California. In 2018, Apple acquired
    the music recognition company Shazam. Tim Cook serves as the CEO of Apple.
    """
    pipeline.process_document(sample_text, "doc_001")
    # Query the graph (the match is undirected, so no arrow is printed)
    results = pipeline.query("Apple")
    for r in results:
        print(f"{r['entity']} --[{r['relation']}]-- {r['related_entity']}")
    pipeline.close()
Edge Cases and Production Hardening
1. LLM Hallucination Detection
LLMs may invent entities or relationships not present in the source text. Implement a validation layer:
def validate_extraction(extraction: KnowledgeExtraction, source_text: str) -> KnowledgeExtraction:
    """
    Filter out entities and relationships not explicitly mentioned in source text.
    Uses simple substring matching; for production, use semantic similarity.
    """
    text_lower = source_text.lower()  # lowercase once instead of per comparison
    valid_entities = []
    for entity in extraction.entities:
        if entity.name.lower() in text_lower:
            valid_entities.append(entity)
    valid_relationships = []
    for rel in extraction.relationships:
        if (rel.source.lower() in text_lower and
                rel.target.lower() in text_lower):
            valid_relationships.append(rel)
    return KnowledgeExtraction(entities=valid_entities, relationships=valid_relationships)
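Plain substring matching has a known blind spot: "Apple" is found inside "Pineapple". A small step short of semantic similarity is to require whole-word matches. The sketch below assumes this is acceptable for the entity names in your corpus; `mentions` is a hypothetical helper that could replace the `in` checks in the validator.

```python
import re


def mentions(name: str, source_text: str) -> bool:
    """True if `name` occurs in `source_text` as a whole word or phrase."""
    name = name.strip()
    if not name:
        return False
    # Lookarounds reject matches that sit inside a longer word.
    pattern = r"(?<!\w)" + re.escape(name) + r"(?!\w)"
    return re.search(pattern, source_text, flags=re.IGNORECASE) is not None
```

This still misses paraphrases and abbreviations, so it tightens the filter without replacing a similarity-based check.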
2. Token Budget Management
Large documents can exceed API token limits. Implement a token counter:
from typing import List

import tiktoken


def count_tokens(text: str, model: str = "gpt-4o-mini") -> int:
    """Count tokens for a given text and model."""
    encoding = tiktoken.encoding_for_model(model)
    return len(encoding.encode(text))


def adaptive_chunking(text: str, max_tokens: int = 3000) -> List[str]:
    """
    Chunk text based on token count rather than character count.
    """
    encoding = tiktoken.encoding_for_model("gpt-4o-mini")
    tokens = encoding.encode(text)
    chunks = []
    for i in range(0, len(tokens), max_tokens):
        chunk_tokens = tokens[i:i + max_tokens]
        chunks.append(encoding.decode(chunk_tokens))
    return chunks
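The token-based chunker above cuts at hard boundaries, so an entity mentioned right at a cut loses its context. A possible refinement is to let consecutive windows share a few tokens, mirroring the chunk_overlap idea from the character-based splitter. This sketch works on any sequence of token ids and does not call tiktoken itself; `window_tokens` is a hypothetical helper.

```python
from typing import List, Sequence


def window_tokens(tokens: Sequence[int], max_tokens: int, overlap: int) -> List[List[int]]:
    """Slice a token sequence into windows that share `overlap` tokens."""
    if max_tokens <= 0 or not 0 <= overlap < max_tokens:
        raise ValueError("need max_tokens > 0 and 0 <= overlap < max_tokens")
    step = max_tokens - overlap
    windows = []
    for start in range(0, len(tokens), step):
        windows.append(list(tokens[start:start + max_tokens]))
        if start + max_tokens >= len(tokens):
            break  # this window already reaches the end of the sequence
    return windows
```

With tiktoken, the input would be `encoding.encode(text)` and each window would be turned back into text with `encoding.decode(window)`.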
3. Concurrent Processing with Rate Limits
For processing thousands of documents, use asyncio with rate limiting:
import asyncio
from typing import Any, Dict

from aiolimiter import AsyncLimiter


class AsyncKnowledgeGraphPipeline:
    def __init__(self, rpm: int = 500):
        self.limiter = AsyncLimiter(rpm, 60)  # at most `rpm` requests per 60 seconds
        # .. other initialization

    async def process_chunk_async(self, chunk: Dict[str, Any], document_id: str):
        async with self.limiter:
            # Async extraction logic
            pass
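To show how the pieces of an async run fit together, here is a self-contained sketch using only the standard library. It swaps the rate limiter for an `asyncio.Semaphore`, which caps the number of calls in flight rather than requests per minute, so it is a different guarantee from AsyncLimiter. `process_all` and `fake_extract` are hypothetical names, and `fake_extract` merely stands in for an async LLM call.

```python
import asyncio
from typing import Awaitable, Callable, List


async def process_all(chunks: List[str],
                      extract: Callable[[str], Awaitable[dict]],
                      max_in_flight: int = 5) -> List[dict]:
    """Run `extract` over all chunks with at most `max_in_flight` at a time."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def guarded(chunk: str) -> dict:
        async with semaphore:
            return await extract(chunk)

    # gather returns results in the same order as the input chunks
    return await asyncio.gather(*(guarded(c) for c in chunks))


async def fake_extract(chunk: str) -> dict:
    """Hypothetical stand-in for an async LLM call."""
    await asyncio.sleep(0.01)
    return {"chunk": chunk, "entities": []}
```

A run would be started with `asyncio.run(process_all(chunks, fake_extract))`; in the real pipeline the `async with self.limiter:` block would wrap the extraction call in the same position as the semaphore does here.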
Querying the Knowledge Graph
Once built, the knowledge graph enables powerful queries. Here are three production use cases:
Multi-hop Reasoning
// Find all people who founded companies that Apple acquired
// (the pipeline stores every edge as :RELATES with the predicate in r.type)
MATCH (company:Entity)-[:RELATES {type: "founded_by"}]->(person:Entity {type: "Person"})
MATCH (apple:Entity {name: "Apple Inc."})-[:RELATES {type: "acquired"}]->(company)
RETURN person.name AS founder, company.name AS acquired_company
Temporal Queries
// Find relationships extracted from a set of recent documents
// (the schema stores no dates, so recency is approximated via $recent_document_ids)
MATCH (e:Entity)-[r:RELATES]->(target)
WHERE r.document_id IN $recent_document_ids
RETURN e.name, r.type, target.name
Graph Analytics
import networkx as nx


def build_networkx_graph(neo4j_driver):
    """Export Neo4j graph to NetworkX for centrality analysis."""
    G = nx.DiGraph()
    with neo4j_driver.session() as session:
        result = session.run("MATCH (s)-[r]->(t) RETURN s.name, r.type, t.name")
        for record in result:
            G.add_edge(record["s.name"], record["t.name"], relation=record["r.type"])
    # Calculate PageRank
    pagerank = nx.pagerank(G)
    top_entities = sorted(pagerank.items(), key=lambda x: x[1], reverse=True)[:10]
    return top_entities
What's Next
You've built a production-ready knowledge graph pipeline that transforms unstructured documents into a queryable graph structure. The system handles entity disambiguation, relationship extraction, and scalable insertion into Neo4j.
To extend this project:
- Add incremental updates: Track document versions and update only changed entities using Neo4j's temporal features
- Implement feedback loops: Allow human reviewers to correct extraction errors, feeding corrections back into the LLM prompt
- Explore graph neural networks: Use the constructed graph for link prediction (predicting missing relationships) using libraries like PyTorch Geometric
For further reading, explore our guides on vector search optimization and LLM fine-tuning strategies. The complete source code for this tutorial is available on GitHub (search for "knowledge-graph-pipeline").
Remember: knowledge graphs are only as good as the extraction quality. Invest in prompt engineering and validation—your downstream applications depend on it.