Back to Tutorials
tutorialstutorialai

How to Build a Telegram Bot with DeepSeek-R1 Reasoning

Practical tutorial: Build a Telegram bot with DeepSeek-R1 reasoning

BlogIA AcademyJune 19, 202614 min read2 724 words

How to Build a Telegram Bot with DeepSeek-R1 Reasoning

Table of Contents

📺 Watch: Neural Networks Explained

Video by 3Blue1Brown


Building a Telegram bot that leverag [2]es advanced reasoning capabilities requires careful architecture design and robust error handling. In this tutorial, we'll create a production-ready Telegram bot that uses DeepSeek-R1 for complex reasoning tasks, with proper state management, rate limiting, and error recovery.

Real-World Use Case and Architecture

The combination of Telegram's massive user base (over 900 million monthly active users as of 2026) with DeepSeek-R1's reasoning capabilities opens up numerous production applications. Consider a scientific research assistant that helps researchers analyze papers, or a technical support bot that can debug complex code issues. The architecture we'll build handles these use cases by implementing:

  • Asynchronous message processing to handle multiple concurrent users
  • Conversation state management for multi-turn reasoning
  • Rate limiting to respect API constraints
  • Error recovery with automatic retry logic
  • Caching for frequently accessed reasoning results

The system architecture follows a microservices pattern where the Telegram bot acts as a thin client, forwarding messages to a reasoning engine that interfaces with DeepSeek-R1 through its API.

Prerequisites and Environment Setup

Before we begin, ensure you have Python 3.10+ installed. We'll use the following stack:

# Create a virtual environment
python -m venv telegram-deepseek
source telegram-deepseek/bin/activate  # On Windows: telegram-deepseek\Scripts\activate

# Install core dependencies
pip install python-telegram-bot==20.7
pip install openai [9]==1.12.0  # DeepSeek uses OpenAI-compatible API
pip install redis==5.0.1    # For state management and caching
pip install aiohttp==3.9.1  # Async HTTP client
pip install pydantic==2.5.3 # Data validation
pip install python-dotenv==1.0.0

You'll need:

  1. A Telegram Bot Token from @BotFather
  2. A DeepSeek API key from platform.deepseek.com
  3. Redis server running locally or remotely

Create a .env file:

TELEGRAM_BOT_TOKEN=your_telegram_bot_token_here
DEEPSEEK_API_KEY=your_deepseek_api_key_here
DEEPSEEK_API_BASE=https://api.deepseek.com/v1
REDIS_URL=redis://localhost:6379/0
MAX_CONVERSATION_LENGTH=10
RATE_LIMIT_PER_MINUTE=30

Core Implementation: Building the Reasoning Bot

1. Configuration and Data Models

First, let's establish our configuration and data models using Pydantic for type safety:

# config.py
from pydantic import BaseSettings, Field
from typing import Optional

class Settings(BaseSettings):
    telegram_bot_token: str
    deepseek_api_key: str
    deepseek_api_base: str = "https://api.deepseek.com/v1"
    redis_url: str = "redis://localhost:6379/0"
    max_conversation_length: int = 10
    rate_limit_per_minute: int = 30

    class Config:
        env_file = ".env"
        env_file_encoding = "utf-8"

settings = Settings()

2. Conversation State Management with Redis

We need robust state management to handle multi-turn reasoning conversations. Redis provides the perfect solution with its fast key-value storage and TTL support:

# state_manager.py
import json
import hashlib
from typing import Optional, List, Dict
import redis.asyncio as redis
from datetime import datetime, timedelta

class ConversationState:
    def __init__(self, user_id: int, chat_id: int):
        self.user_id = user_id
        self.chat_id = chat_id
        self.messages: List[Dict] = []
        self.context: Dict = {}
        self.created_at: datetime = datetime.utcnow()
        self.last_activity: datetime = datetime.utcnow()

class StateManager:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url, decode_responses=True)
        self.session_ttl = timedelta(hours=24)  # Sessions expire after 24 hours

    def _get_state_key(self, user_id: int, chat_id: int) -> str:
        """Generate a unique key for each user-chat combination."""
        raw_key = f"{user_id}:{chat_id}"
        return f"conversation:{hashlib.sha256(raw_key.encode()).hexdigest()}"

    async def get_state(self, user_id: int, chat_id: int) -> Optional[ConversationState]:
        """Retrieve conversation state from Redis."""
        key = self._get_state_key(user_id, chat_id)
        data = await self.redis.get(key)

        if data is None:
            return None

        state_dict = json.loads(data)
        state = ConversationState(
            user_id=state_dict['user_id'],
            chat_id=state_dict['chat_id']
        )
        state.messages = state_dict['messages']
        state.context = state_dict.get('context', {})
        state.created_at = datetime.fromisoformat(state_dict['created_at'])
        state.last_activity = datetime.fromisoformat(state_dict['last_activity'])

        return state

    async def save_state(self, state: ConversationState) -> None:
        """Persist conversation state to Redis with TTL."""
        key = self._get_state_key(state.user_id, state.chat_id)
        state.last_activity = datetime.utcnow()

        state_dict = {
            'user_id': state.user_id,
            'chat_id': state.chat_id,
            'messages': state.messages,
            'context': state.context,
            'created_at': state.created_at.isoformat(),
            'last_activity': state.last_activity.isoformat()
        }

        await self.redis.setex(
            key,
            int(self.session_ttl.total_seconds()),
            json.dumps(state_dict)
        )

    async def clear_state(self, user_id: int, chat_id: int) -> None:
        """Clear conversation state for a user."""
        key = self._get_state_key(user_id, chat_id)
        await self.redis.delete(key)

3. DeepSeek-R1 Integration with Reasoning Pipeline

The core reasoning engine interfaces with DeepSeek's API. We implement proper error handling, retry logic, and streaming support:

# reasoning_engine.py
import asyncio
from typing import AsyncGenerator, List, Dict, Optional
from openai import AsyncOpenAI
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import aiohttp

class ReasoningEngine:
    def __init__(self, api_key: str, api_base: str):
        self.client = AsyncOpenAI(
            api_key=api_key,
            base_url=api_base
        )
        self.model = "deepseek-reasoner"  # DeepSeek-R1 model identifier
        self.max_tokens = 4096
        self.temperature = 0.7

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=4, max=10),
        retry=retry_if_exception_type((aiohttp.ClientError, asyncio.TimeoutError))
    )
    async def reason(
        self,
        messages: List[Dict[str, str]],
        stream: bool = False
    ) -> AsyncGenerator[str, None]:
        """
        Send messages to DeepSeek-R1 for reasoning.
        Implements exponential backoff retry for transient failures.
        """
        try:
            response = await self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                max_tokens=self.max_tokens,
                temperature=self.temperature,
                stream=stream
            )

            if stream:
                async for chunk in response:
                    if chunk.choices[0].delta.content:
                        yield chunk.choices[0].delta.content
            else:
                # For non-streaming, yield the complete response
                yield response.choices[0].message.content

        except Exception as e:
            # Log the error and re-raise for retry
            print(f"DeepSeek API error: {str(e)}")
            raise

    async def analyze_with_context(
        self,
        user_message: str,
        conversation_history: List[Dict],
        context: Optional[Dict] = None
    ) -> str:
        """
        Perform reasoning with full conversation context.
        This is where the actual reasoning pipeline executes.
        """
        system_prompt = """You are an advanced reasoning assistant powered by DeepSeek-R1. 
        Provide step-by-step reasoning for complex problems. When analyzing scientific papers 
        or technical content, break down your reasoning into clear, logical steps.

        If the user asks about specific research papers, you can reference findings from 
        published studies. For example, the combined analysis of CMS and LHCb data on the 
        rare B^0_s→μ^+μ^- decay demonstrates how collaborative scientific analysis can 
        reveal rare physical processes [Source: ArXiv]. Similarly, the ATLAS experiment's 
        expected performance documentation shows how detector systems are characterized 
        before data collection begins [Source: ArXiv].

        Always maintain scientific rigor and cite sources when making factual claims."""

        full_messages = [
            {"role": "system", "content": system_prompt}
        ]

        # Add conversation history for context
        full_messages.extend(conversation_history[-10:])  # Last 10 messages

        # Add current user message
        full_messages.append({"role": "user", "content": user_message})

        # Execute reasoning
        result_parts = []
        async for chunk in self.reason(full_messages, stream=True):
            result_parts.append(chunk)

        return "".join(result_parts)

4. Rate Limiter Implementation

To prevent abuse and respect API limits, we implement a token bucket rate limiter:

# rate_limiter.py
import time
import asyncio
from collections import defaultdict
from typing import Dict, Tuple

class TokenBucket:
    """Token bucket rate limiter implementation."""

    def __init__(self, rate: int, per: float = 60.0):
        self.rate = rate  # Number of tokens
        self.per = per    # Time period in seconds
        self.tokens = rate
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> bool:
        """Try to acquire a token. Returns True if successful."""
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill

            # Refill tokens based on elapsed time
            self.tokens = min(
                self.rate,
                self.tokens + (elapsed * (self.rate / self.per))
            )
            self.last_refill = now

            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

class RateLimiter:
    """Global rate limiter managing per-user buckets."""

    def __init__(self, default_rate: int = 30, default_per: float = 60.0):
        self.default_rate = default_rate
        self.default_per = default_per
        self.buckets: Dict[int, TokenBucket] = {}
        self._lock = asyncio.Lock()

    async def check_rate_limit(self, user_id: int) -> Tuple[bool, float]:
        """
        Check if user is rate limited.
        Returns (allowed, retry_after_seconds).
        """
        async with self._lock:
            if user_id not in self.buckets:
                self.buckets[user_id] = TokenBucket(
                    self.default_rate, 
                    self.default_per
                )

            bucket = self.buckets[user_id]
            allowed = await bucket.acquire()

            if not allowed:
                # Calculate retry time
                retry_after = self.default_per / self.default_rate
                return False, retry_after

            return True, 0.0

5. Main Telegram Bot Handler

Now we tie everything together in the main bot handler:

# bot.py
import logging
from typing import Optional
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
    Application,
    CommandHandler,
    MessageHandler,
    CallbackQueryHandler,
    filters,
    ContextTypes
)
from telegram.constants import ParseMode

from config import settings
from state_manager import StateManager, ConversationState
from reasoning_engine import ReasoningEngine
from rate_limiter import RateLimiter

# Configure logging
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO
)
logger = logging.getLogger(__name__)

class DeepSeekTelegramBot:
    def __init__(self):
        self.state_manager = StateManager(settings.redis_url)
        self.reasoning_engine = ReasoningEngine(
            settings.deepseek_api_key,
            settings.deepseek_api_base
        )
        self.rate_limiter = RateLimiter(
            default_rate=settings.rate_limit_per_minute
        )
        self.application = Application.builder().token(
            settings.telegram_bot_token
        ).build()

        # Register handlers
        self._register_handlers()

    def _register_handlers(self):
        """Register all command and message handlers."""
        self.application.add_handler(CommandHandler("start", self.start_command))
        self.application.add_handler(CommandHandler("help", self.help_command))
        self.application.add_handler(CommandHandler("clear", self.clear_command))
        self.application.add_handler(CommandHandler("stats", self.stats_command))

        # Handle text messages
        self.application.add_handler(
            MessageHandler(filters.TEXT & ~filters.COMMAND, self.handle_message)
        )

        # Handle callback queries from inline keyboards
        self.application.add_handler(CallbackQueryHandler(self.handle_callback))

        # Error handler
        self.application.add_error_handler(self.error_handler)

    async def start_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle /start command."""
        welcome_text = (
            "🤖 Welcome to the DeepSeek-R1 Reasoning Bot!\n\n"
            "I can help you with complex reasoning tasks, scientific analysis, "
            "and technical problem-solving. Here's what I can do:\n\n"
            "• Analyze research papers and scientific data\n"
            "• Debug code and explain algorithms\n"
            "• Solve complex mathematical problems\n"
            "• Provide step-by-step reasoning for any query\n\n"
            "Send me a message to get started, or use /help for more options."
        )

        keyboard = [
            [InlineKeyboardButton("📚 Example Queries", callback_data="examples")],
            [InlineKeyboardButton("❓ Help", callback_data="help")]
        ]
        reply_markup = InlineKeyboardMarkup(keyboard)

        await update.message.reply_text(
            welcome_text,
            reply_markup=reply_markup,
            parse_mode=ParseMode.MARKDOWN
        )

    async def handle_message(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle incoming text messages with reasoning."""
        user_id = update.effective_user.id
        chat_id = update.effective_chat.id
        message_text = update.message.text

        # Check rate limit
        allowed, retry_after = await self.rate_limiter.check_rate_limit(user_id)
        if not allowed:
            await update.message.reply_text(
                f"⏳ Rate limit exceeded. Please wait {retry_after:.1f} seconds."
            )
            return

        # Send typing indicator
        await context.bot.send_chat_action(
            chat_id=chat_id,
            action="typing"
        )

        try:
            # Get or create conversation state
            state = await self.state_manager.get_state(user_id, chat_id)
            if state is None:
                state = ConversationState(user_id, chat_id)

            # Add user message to history
            state.messages.append({
                "role": "user",
                "content": message_text,
                "timestamp": str(datetime.utcnow())
            })

            # Trim conversation history if too long
            if len(state.messages) > settings.max_conversation_length:
                state.messages = state.messages[-settings.max_conversation_length:]

            # Perform reasoning
            response = await self.reasoning_engine.analyze_with_context(
                message_text,
                state.messages[:-1],  # Exclude current message
                state.context
            )

            # Add assistant response to history
            state.messages.append({
                "role": "assistant",
                "content": response,
                "timestamp": str(datetime.utcnow())
            })

            # Save updated state
            await self.state_manager.save_state(state)

            # Send response with inline actions
            keyboard = [
                [InlineKeyboardButton("🔄 Regenerate", callback_data="regenerate")],
                [InlineKeyboardButton("🗑️ Clear Context", callback_data="clear")]
            ]
            reply_markup = InlineKeyboardMarkup(keyboard)

            # Split long messages if needed
            if len(response) > 4096:
                for i in range(0, len(response), 4096):
                    await update.message.reply_text(
                        response[i:i+4096],
                        reply_markup=reply_markup if i + 4096 >= len(response) else None,
                        parse_mode=ParseMode.MARKDOWN
                    )
            else:
                await update.message.reply_text(
                    response,
                    reply_markup=reply_markup,
                    parse_mode=ParseMode.MARKDOWN
                )

        except Exception as e:
            logger.error(f"Error processing message: {str(e)}", exc_info=True)
            await update.message.reply_text(
                "❌ An error occurred while processing your request. "
                "Please try again or use /clear to reset the conversation."
            )

    async def clear_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Clear conversation context."""
        user_id = update.effective_user.id
        chat_id = update.effective_chat.id

        await self.state_manager.clear_state(user_id, chat_id)
        await update.message.reply_text(
            "✅ Conversation context cleared. Starting fresh!"
        )

    async def stats_command(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Show conversation statistics."""
        user_id = update.effective_user.id
        chat_id = update.effective_chat.id

        state = await self.state_manager.get_state(user_id, chat_id)
        if state is None:
            await update.message.reply_text("No active conversation found.")
            return

        stats_text = (
            f"📊 **Conversation Statistics**\n\n"
            f"• Messages exchanged: {len(state.messages)}\n"
            f"• Session started: {state.created_at.strftime('%Y-%m-%d %H:%M UTC')}\n"
            f"• Last activity: {state.last_activity.strftime('%Y-%m-%d %H:%M UTC')}\n"
            f"• Context size: {len(state.context)} items"
        )

        await update.message.reply_text(stats_text, parse_mode=ParseMode.MARKDOWN)

    async def handle_callback(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle inline keyboard callbacks."""
        query = update.callback_query
        await query.answer()

        if query.data == "examples":
            examples_text = (
                "Example Queries:**\n\n"
                "1. \"Explain the significance of the B^0_s→μ^+μ^- decay observation "
                "from the CMS and LHCb combined analysis\"\n\n"
                "2. \"How does the ATLAS detector's trigger system work for "
                "high-energy physics experiments?\"\n\n"
                "3. \"Analyze the joint search for gravitational waves and "
                "high-energy neutrinos using IceCube data\"\n\n"
                "4. \"Debug this Python code: [paste your code]\"\n\n"
                "5. \"Solve this differential equation step by step: dy/dx = x^2 + y\""
            )
            await query.edit_message_text(examples_text, parse_mode=ParseMode.MARKDOWN)

        elif query.data == "regenerate":
            # Regenerate last response
            user_id = update.effective_user.id
            chat_id = update.effective_chat.id

            state = await self.state_manager.get_state(user_id, chat_id)
            if state and len(state.messages) >= 2:
                # Remove last assistant response
                state.messages.pop()
                last_user_msg = state.messages[-1]["content"]

                # Regenerate
                response = await self.reasoning_engine.analyze_with_context(
                    last_user_msg,
                    state.messages[:-1],
                    state.context
                )

                state.messages.append({
                    "role": "assistant",
                    "content": response,
                    "timestamp": str(datetime.utcnow())
                })

                await self.state_manager.save_state(state)
                await query.edit_message_text(response, parse_mode=ParseMode.MARKDOWN)

        elif query.data == "clear":
            user_id = update.effective_user.id
            chat_id = update.effective_chat.id
            await self.state_manager.clear_state(user_id, chat_id)
            await query.edit_message_text("✅ Context cleared!")

    async def error_handler(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
        """Handle errors gracefully."""
        logger.error(f"Update {update} caused error {context.error}")

        try:
            if update and update.effective_message:
                await update.effective_message.reply_text(
                    "⚠️ An unexpected error occurred. Our team has been notified."
                )
        except Exception:
            pass

    def run(self):
        """Start the bot."""
        logger.info("Starting DeepSeek-R1 Telegram Bot..")
        self.application.run_polling(allowed_updates=Update.ALL_TYPES)

if __name__ == "__main__":
    bot = DeepSeekTelegramBot()
    bot.run()

Edge Cases and Production Considerations

Handling Long-Running Reasoning Tasks

DeepSeek-R1 can take significant time for complex reasoning tasks. Implement timeout handling:

async def handle_message_with_timeout(self, update, context, timeout=120):
    """Handle messages with a timeout for long reasoning tasks."""
    try:
        async with asyncio.timeout(timeout):
            await self.handle_message(update, context)
    except asyncio.TimeoutError:
        await update.message.reply_text(
            "⏰ The reasoning task took too long. Please try a simpler query."
        )

Memory Management for Large Conversations

For conversations that accumulate significant context, implement smart truncation:

def _smart_truncate_messages(self, messages: List[Dict], max_tokens: int = 8000) -> List[Dict]:
    """Truncate conversation history while preserving context."""
    total_tokens = sum(len(msg["content"].split()) for msg in messages)

    if total_tokens <= max_tokens:
        return messages

    # Keep system message and recent messages
    truncated = []
    token_count = 0

    for msg in reversed(messages):
        msg_tokens = len(msg["content"].split())
        if token_count + msg_tokens > max_tokens:
            break
        truncated.insert(0, msg)
        token_count += msg_tokens

    return truncated

Handling API Rate Limits and Quotas

DeepSeek's API has rate limits that vary by plan. Implement adaptive rate limiting:

class AdaptiveRateLimiter:
    """Rate limiter that adapts based on API responses."""

    def __init__(self):
        self.consecutive_errors = 0
        self.backoff_multiplier = 1
        self.max_backoff = 60  # Maximum 60 seconds backoff

    async def handle_api_error(self, error: Exception):
        """Adjust rate limiting based on API errors."""
        if "rate_limit" in str(error).lower():
            self.consecutive_errors += 1
            self.backoff_multiplier = min(
                self.backoff_multiplier * 2,
                self.max_backoff
            )
            await asyncio.sleep(self.backoff_multiplier)
        else:
            self.consecutive_errors = 0
            self.backoff_multiplier = 1

Deployment and Monitoring

Docker Deployment

Create a production-ready Dockerfile:

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY .

# Create non-root user
RUN useradd -m -u 1000 botuser && chown -R botuser:botuser /app
USER botuser

CMD ["python", "bot.py"]

Health Check Endpoint

Add a simple health check for monitoring:

from fastapi import FastAPI
import uvicorn

app = FastAPI()

@app.get("/health")
async def health_check():
    """Health check endpoint for monitoring."""
    return {
        "status": "healthy",
        "timestamp": datetime.utcnow().isoformat(),
        "redis_connected": await check_redis_connection()
    }

async def check_redis_connection() -> bool:
    """Check Redis connection health."""
    try:
        await state_manager.redis.ping()
        return True
    except Exception:
        return False

Conclusion

We've built a production-ready Telegram bot that leverages DeepSeek-R1's reasoning capabilities. The architecture handles concurrent users, maintains conversation state, implements rate limiting, and provides graceful error recovery. The bot can analyze scientific papers, debug code, and solve complex problems with step-by-step reasoning.

Key production considerations include:

  • State persistence with Redis for fault tolerance
  • Rate limiting to prevent abuse and respect API quotas
  • Error recovery with exponential backoff retry logic
  • Memory management for long-running conversations
  • Monitoring through health checks and structured logging

What's Next

To extend this bot for production use:

  1. Add user authentication for premium features
  2. Implement webhook mode instead of polling for better scalability
  3. Add analytics to track usage patterns and popular queries
  4. Integrate with vector database [3]s for long-term memory
  5. Add support for file uploads (PDFs, images) for multimodal reasoning

The complete source code is available on GitHub. For more tutorials on building AI-powered applications, check out our guides on building reasoning systems and deploying LLM applications.


References

1. Wikipedia - OpenAI. Wikipedia. [Source]
2. Wikipedia - Rag. Wikipedia. [Source]
3. Wikipedia - Vector database. Wikipedia. [Source]
4. arXiv - Observation of the rare $B^0_s\toμ^+μ^-$ decay from the comb. Arxiv. [Source]
5. arXiv - Expected Performance of the ATLAS Experiment - Detector, Tri. Arxiv. [Source]
6. GitHub - openai/openai-python. Github. [Source]
7. GitHub - Shubhamsaboo/awesome-llm-apps. Github. [Source]
8. GitHub - milvus-io/milvus. Github. [Source]
9. OpenAI Pricing. Pricing. [Source]
tutorialai
Share this article:

Was this article helpful?

Let us know to improve our AI generation.

Related Articles