# Source code for haive.mcp.utils.extract_mcp_github_repos

#!/usr/bin/env python3
"""Enhanced MCP Repository Extractor with README Processing.

This script:
1. Extracts repository URLs from awesome-mcp-servers
2. Downloads and processes README files
3. Converts to LangChain Documents with metadata
4. Organizes resources for agent access
"""

import asyncio
import hashlib
import json
import os
import re
from collections.abc import Callable
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Any

import aiohttp
import yaml
from langchain_core.documents import Document
from pydantic import BaseModel, ConfigDict, Field, field_validator
from rich.console import Console
from rich.progress import BarColumn, Progress, SpinnerColumn, TextColumn
from rich.table import Table

# Shared Rich console; the status messages and report tables in this module
# are all printed through it.
console = Console()


class MCPCategory(str, Enum):
    """Category labels an MCP server can be filed under.

    Every member's value is its human-readable label. The ``str`` mix-in
    makes members compare equal to that label text.
    """

    FILE_SYSTEMS = "File Systems"
    SANDBOX_VIRTUALIZATION = "Sandbox & Virtualization"
    VERSION_CONTROL = "Version Control"
    CLOUD_STORAGE = "Cloud Storage"
    DATABASES = "Databases"
    COMMUNICATION = "Communication"
    MONITORING = "Monitoring"
    SEARCH_WEB = "Search & Web"
    LOCATION_SERVICES = "Location Services"
    MARKETING = "Marketing"
    NOTE_TAKING = "Note Taking"
    CLOUD_PLATFORMS = "Cloud Platforms"
    WORKFLOW_AUTOMATION = "Workflow Automation"
    SYSTEM_AUTOMATION = "System Automation"
    SOCIAL_MEDIA = "Social Media"
    GAMING = "Gaming"
    FINANCE = "Finance"
    RESEARCH_DATA = "Research & Data"
    AI_SERVICES = "AI Services"
    DEVELOPMENT_TOOLS = "Development Tools"
    DATA_VISUALIZATION = "Data Visualization"
    IDENTITY = "Identity"
    AGGREGATORS = "Aggregators"
    LANGUAGE_TRANSLATION = "Language & Translation"
    SECURITY = "Security"
    IOT = "IoT"
    ART_LITERATURE = "Art & Literature"
    OTHER = "Other"
class MCPLanguage(str, Enum):
    """Implementation languages recognised for an MCP server.

    Values are display labels; the ``str`` mix-in lets members compare equal
    to those labels.
    """

    PYTHON = "Python"
    TYPESCRIPT_JAVASCRIPT = "TypeScript/JavaScript"
    GO = "Go"
    RUST = "Rust"
    CSHARP = "C#"
    JAVA = "Java"
    C_CPP = "C/C++"
    OTHER = "Other"
class MCPPlatform(str, Enum):
    """Operating-system platforms an MCP server can declare support for.

    Values are display labels; members compare equal to them through the
    ``str`` mix-in.
    """

    MACOS = "macOS"
    WINDOWS = "Windows"
    LINUX = "Linux"
    CROSS_PLATFORM = "Cross-Platform"
class MCPScope(str, Enum):
    """Deployment scope of an MCP server.

    Values are lowercase identifiers; members compare equal to them through
    the ``str`` mix-in.
    """

    CLOUD = "cloud"
    LOCAL = "local"
    EMBEDDED = "embedded"
class MCPServerMetadata(BaseModel):
    """Descriptive record for a single MCP server repository.

    The model rejects unknown fields, re-validates on attribute assignment
    and is configured with ``use_enum_values=True`` for its enum-typed
    fields.
    """

    model_config = ConfigDict(
        extra="forbid",
        validate_assignment=True,
        use_enum_values=True,
    )

    # Identity of the repository.
    name: str = Field(..., description="Server name")
    owner: str = Field(..., description="GitHub owner/organization")
    repo_name: str = Field(..., description="Repository name")
    repo_url: str = Field(..., description="Full GitHub repository URL")

    # Classification.
    description: str | None = Field(None, description="Server description")
    category: MCPCategory = Field(MCPCategory.OTHER, description="Server category")
    languages: list[MCPLanguage] = Field(
        default_factory=list, description="Programming languages used"
    )
    is_official: bool = Field(
        False, description="Whether this is an official implementation"
    )
    platforms: list[MCPPlatform] = Field(
        default_factory=list, description="Supported platforms"
    )
    scopes: list[MCPScope] = Field(
        default_factory=list, description="Server scopes (cloud/local/embedded)"
    )

    # Optional details; all default to None.
    stars: int | None = Field(None, description="GitHub stars count")
    last_updated: datetime | None = Field(None, description="Last update timestamp")
    license: str | None = Field(None, description="License type")
    readme_url: str | None = Field(None, description="README URL")
    api_base_url: str | None = Field(None, description="GitHub API base URL")

    @field_validator("repo_url")
    @classmethod
    def validate_repo_url(cls, v: str) -> str:
        """Accept only URLs that start with an http(s) github.com prefix.

        Args:
            v: Candidate repository URL.

        Returns:
            The URL unchanged when it is accepted.

        Raises:
            ValueError: If the URL does not start with
                ``https://github.com/`` or ``http://github.com/``.
        """
        accepted_prefixes = ("https://github.com/", "http://github.com/")
        if v.startswith(accepted_prefixes):
            return v
        raise ValueError("Invalid GitHub repository URL")

    def get_unique_id(self) -> str:
        """Return the ``owner/repo_name`` identifier of this server."""
        return f"{self.owner}/{self.repo_name}"

    def to_langchain_metadata(self) -> dict[str, Any]:
        """Build the metadata dictionary used for a LangChain ``Document``.

        Returns:
            A dictionary with the repository URL under ``"source"`` and the
            remaining descriptive fields. ``last_updated`` is rendered as an
            ISO-8601 string, or ``None`` when unset. ``readme_url`` and
            ``api_base_url`` are not included.
        """
        updated = self.last_updated
        updated_iso = updated.isoformat() if updated else None
        return {
            "source": self.repo_url,
            "server_name": self.name,
            "owner": self.owner,
            "repo_name": self.repo_name,
            "category": self.category,
            "languages": list(self.languages),
            "is_official": self.is_official,
            "platforms": list(self.platforms),
            "scopes": list(self.scopes),
            "stars": self.stars,
            "last_updated": updated_iso,
            "license": self.license,
            "description": self.description,
        }
class MCPServerDocument(BaseModel):
    """A server's metadata bundled with its README text.

    Unknown fields are rejected and attribute assignments are re-validated.
    """

    model_config = ConfigDict(extra="forbid", validate_assignment=True)

    metadata: MCPServerMetadata = Field(..., description="Server metadata")
    readme_content: str | None = Field(None, description="README content")
    # Defaults to the local time at which the instance is created.
    extracted_at: datetime = Field(
        default_factory=datetime.now, description="Extraction timestamp"
    )
    content_hash: str | None = Field(None, description="SHA256 hash of README content")

    def compute_content_hash(self) -> str:
        """Return the SHA-256 hex digest of the README text.

        Returns:
            The hex digest of ``readme_content`` encoded with the default
            ``str.encode()`` encoding, or an empty string when there is no
            README content.
        """
        if not self.readme_content:
            return ""
        digest = hashlib.sha256(self.readme_content.encode())
        return digest.hexdigest()

    def to_langchain_document(self) -> Document:
        """Convert this record into a LangChain ``Document``.

        Returns:
            A ``Document`` whose page content is the README text, or a short
            placeholder naming the server when no README is available. Its
            metadata is the server metadata plus ``extracted_at``,
            ``content_hash`` and a fixed ``document_type``.
        """
        doc_metadata = {
            **self.metadata.to_langchain_metadata(),
            "extracted_at": self.extracted_at.isoformat(),
            # Use the stored hash when present; otherwise compute it now.
            "content_hash": self.content_hash or self.compute_content_hash(),
            "document_type": "mcp_server_readme",
        }
        if self.readme_content:
            body = self.readme_content
        else:
            body = f"# {self.metadata.name}\n\nNo README content available."
        return Document(page_content=body, metadata=doc_metadata)
class ExtractionStats(BaseModel):
    """Counters and timing collected during one extraction run."""

    # Headline counts.
    total_found: int = Field(default=0)
    successfully_extracted: int = Field(default=0)
    failed_extractions: int = Field(default=0)

    # Count-per-label breakdowns.
    categories: dict[str, int] = Field(default_factory=dict)
    languages: dict[str, int] = Field(default_factory=dict)

    # Run duration; None until it has been recorded.
    extraction_duration: float | None = Field(default=None)
class MCPRepositoryExtractor:
    """Harvest MCP server repositories listed in an awesome-list README.

    The extractor parses the list's README for GitHub links, downloads each
    repository's own README plus basic GitHub API metadata, and writes the
    results under ``output_dir`` as raw Markdown, per-server JSON, one
    combined JSON file, a JSON summary and a YAML overview.
    """

    # Raw README of the awesome list that is parsed for repository links.
    _LIST_README_URL = (
        "https://raw.githubusercontent.com/TensorBlock/awesome-mcp-servers/main/README.md"
    )
    # Markdown link to GitHub: [label](https://github.com/owner/rest-of-path).
    _REPO_LINK = re.compile(r"\[([^\]]+)\]\(https://github\.com/([^/]+)/([^)]+)\)")
    # Markers that flag a line as describing an official implementation.
    _OFFICIAL_MARK = re.compile(r"⭐|🎖️")
    # Dash separating a link (and its emoji badges) from the description text.
    _DESCRIPTION_SEPARATOR = re.compile(r"(?:^|\s)[-–—]\s+")
    # README spellings and branches probed, in this order, when downloading.
    _README_NAMES = ("README.md", "readme.md", "README.MD", "Readme.md")
    _BRANCHES = ("main", "master")
    # Repositories processed concurrently, and the pause between batches.
    _BATCH_SIZE = 10
    _BATCH_PAUSE_SECONDS = 1

    def __init__(self, output_dir: str = "agent_resources/mcp_servers"):
        """Create the output directory tree and the emoji lookup tables.

        Args:
            output_dir: Root directory for all generated files. It is created
                (including parents) when missing, together with the
                ``documents``, ``metadata`` and ``raw_readmes``
                subdirectories.
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Create subdirectories
        self.docs_dir = self.output_dir / "documents"
        self.metadata_dir = self.output_dir / "metadata"
        self.raw_dir = self.output_dir / "raw_readmes"
        for subdir in (self.docs_dir, self.metadata_dir, self.raw_dir):
            subdir.mkdir(exist_ok=True)

        self.source_url = "https://github.com/TensorBlock/awesome-mcp-servers"
        # Set by extract_all() for the duration of a run; None otherwise.
        self.session: aiohttp.ClientSession | None = None

        # Category mappings from emoji to enum
        self.category_mappings = {
            "📂": MCPCategory.FILE_SYSTEMS,
            "📦": MCPCategory.SANDBOX_VIRTUALIZATION,
            "🔄": MCPCategory.VERSION_CONTROL,
            "☁️": MCPCategory.CLOUD_STORAGE,
            "🗄️": MCPCategory.DATABASES,
            "💬": MCPCategory.COMMUNICATION,
            "📈": MCPCategory.MONITORING,
            "🔍": MCPCategory.SEARCH_WEB,
            "🗺️": MCPCategory.LOCATION_SERVICES,
            "🎯": MCPCategory.MARKETING,
            "📝": MCPCategory.NOTE_TAKING,
            "⚡": MCPCategory.CLOUD_PLATFORMS,
            "⚙️": MCPCategory.WORKFLOW_AUTOMATION,
            "🤖": MCPCategory.SYSTEM_AUTOMATION,
            "📱": MCPCategory.SOCIAL_MEDIA,
            "🎮": MCPCategory.GAMING,
            "💹": MCPCategory.FINANCE,
            "🧬": MCPCategory.RESEARCH_DATA,
            "🤝": MCPCategory.AI_SERVICES,
            "💻": MCPCategory.DEVELOPMENT_TOOLS,
            "📊": MCPCategory.DATA_VISUALIZATION,
            "🆔": MCPCategory.IDENTITY,
            "🔗": MCPCategory.AGGREGATORS,
            "🌐": MCPCategory.LANGUAGE_TRANSLATION,
            "🔒": MCPCategory.SECURITY,
            "🔌": MCPCategory.IOT,
            "🧑‍🎨": MCPCategory.ART_LITERATURE,
        }

        # Language indicators
        self.language_indicators = {
            "🐍": MCPLanguage.PYTHON,
            "📇": MCPLanguage.TYPESCRIPT_JAVASCRIPT,
            "🏎️": MCPLanguage.GO,
            "🦀": MCPLanguage.RUST,
            "#️⃣": MCPLanguage.CSHARP,
            "☕": MCPLanguage.JAVA,
            "🌊": MCPLanguage.C_CPP,
        }

        # Platform indicators
        self.platform_indicators = {
            "🍎": MCPPlatform.MACOS,
            "🪟": MCPPlatform.WINDOWS,
            "🐧": MCPPlatform.LINUX,
        }

        # Scope indicators
        self.scope_indicators = {
            "☁️": MCPScope.CLOUD,
            "🏠": MCPScope.LOCAL,
            "📟": MCPScope.EMBEDDED,
        }

        self.stats = ExtractionStats()

    # ------------------------------------------------------------------
    # Parsing helpers
    # ------------------------------------------------------------------

    def _match_category(self, line: str) -> MCPCategory | None:
        """Return the category whose emoji starts ``line``, if any.

        Leading Markdown heading markers (``#``) are ignored so that both
        ``📂 File Systems`` and ``## 📂 File Systems`` are recognised.
        """
        heading = line.strip().lstrip("#").lstrip()
        for emoji, category in self.category_mappings.items():
            if heading.startswith(emoji):
                return category
        return None

    @classmethod
    def _extract_description(cls, tail: str) -> str | None:
        """Return the text following the first separator dash in ``tail``.

        ``tail`` is the part of the line after a repository link. Returns
        ``None`` when there is no separator or nothing follows it.
        """
        parts = cls._DESCRIPTION_SEPARATOR.split(tail, maxsplit=1)
        if len(parts) < 2:
            return None
        return parts[1].strip() or None

    def _parse_entries(
        self, line: str, category: MCPCategory
    ) -> list[MCPServerMetadata]:
        """Build one metadata record per GitHub link found on ``line``."""
        links = list(self._REPO_LINK.finditer(line))
        if not links:
            return []

        # Badges are detected per line, so every link on it shares them.
        is_official = bool(self._OFFICIAL_MARK.search(line))
        languages = [
            lang for mark, lang in self.language_indicators.items() if mark in line
        ]
        platforms = [
            plat for mark, plat in self.platform_indicators.items() if mark in line
        ]
        scopes = [
            scope for mark, scope in self.scope_indicators.items() if mark in line
        ]

        entries: list[MCPServerMetadata] = []
        for index, link in enumerate(links):
            name, owner, path = link.groups()
            # Keep only the repository segment: drop sub-paths such as
            # "/tree/main/src/x", anchors, query strings and a ".git" suffix.
            # Otherwise the API URL, raw URL and output file names break.
            repo_name = re.split(r"[/#?]", path, maxsplit=1)[0].removesuffix(".git")
            if not repo_name:
                continue
            # Build the URL from the cleaned name so it carries no anchor.
            repo_url = f"https://github.com/{owner}/{repo_name}"

            # Stop the description at the next link, if any.
            next_index = index + 1
            tail_end = links[next_index].start() if next_index < len(links) else len(line)
            description = self._extract_description(line[link.end():tail_end])

            entries.append(
                MCPServerMetadata(
                    name=name.strip(),
                    owner=owner,
                    repo_name=repo_name,
                    repo_url=repo_url,
                    description=description,
                    category=category,
                    languages=languages or [MCPLanguage.OTHER],
                    is_official=is_official,
                    platforms=platforms or [MCPPlatform.CROSS_PLATFORM],
                    scopes=scopes or [MCPScope.LOCAL],
                    stars=None,
                    last_updated=None,
                    license=None,
                    readme_url=None,
                    api_base_url=None,
                )
            )
        return entries

    @staticmethod
    def _file_stem(metadata: MCPServerMetadata) -> str:
        """Return the ``owner_repo`` file stem, with any ``/`` made path-safe."""
        return f"{metadata.owner}_{metadata.repo_name}".replace("/", "_")

    # ------------------------------------------------------------------
    # Network steps
    # ------------------------------------------------------------------

    async def extract_repositories_from_readme(self) -> list[MCPServerMetadata]:
        """Parse the awesome-mcp-servers README into metadata records.

        Lines are scanned in order. A line starting with a category emoji
        (optionally behind ``#`` heading markers) switches the current
        category, and every GitHub link on a line yields one record tagged
        with that category and with the emoji badges present on the line.

        Returns:
            The parsed records. On any error (no session, HTTP failure,
            validation failure) the error is printed and an empty list is
            returned.
        """
        try:
            if not self.session:
                raise RuntimeError("Session not initialized")

            async with self.session.get(self._LIST_README_URL) as response:
                # Fail on 4xx/5xx rather than parsing an error page.
                response.raise_for_status()
                readme_content = await response.text()

            repositories: list[MCPServerMetadata] = []
            current_category = MCPCategory.OTHER
            for line in readme_content.split("\n"):
                current_category = self._match_category(line) or current_category
                repositories.extend(self._parse_entries(line, current_category))

            self.stats.total_found = len(repositories)
            return repositories

        except Exception as e:
            console.print(f"[red]Error extracting repositories: {e}[/red]")
            return []

    async def fetch_readme_content(self, metadata: MCPServerMetadata) -> str | None:
        """Download the repository's README from raw.githubusercontent.com.

        Each README spelling is tried on the ``main`` branch and then on
        ``master``. The first HTTP 200 wins and its URL is stored in
        ``metadata.readme_url``.

        Args:
            metadata: Record identifying the repository; updated in place.

        Returns:
            The README text, or ``None`` when no candidate exists, no session
            is open, or a request fails (the failure is printed).
        """
        if not self.session:
            return None

        try:
            for readme_name in self._README_NAMES:
                for branch in self._BRANCHES:
                    raw_url = (
                        f"https://raw.githubusercontent.com/{metadata.owner}/"
                        f"{metadata.repo_name}/{branch}/{readme_name}"
                    )
                    async with self.session.get(raw_url) as response:
                        if response.status == 200:
                            content = await response.text()
                            metadata.readme_url = raw_url
                            return content
            return None

        except Exception as e:
            console.print(
                f"[yellow]Error fetching README for {metadata.get_unique_id()}: {e}[/yellow]"
            )
            return None

    async def fetch_github_metadata(self, metadata: MCPServerMetadata) -> None:
        """Fill stars, license and last-update time from the GitHub API.

        When the ``GITHUB_TOKEN`` environment variable is set it is sent as
        an ``Authorization`` header. Non-200 responses leave ``metadata``
        untouched; errors are printed and swallowed.

        Args:
            metadata: Record to update in place.
        """
        if not self.session:
            return

        try:
            api_url = (
                f"https://api.github.com/repos/{metadata.owner}/{metadata.repo_name}"
            )
            headers = {
                "Accept": "application/vnd.github.v3+json",
                "User-Agent": "MCP-Repository-Extractor",
            }

            # Add GitHub token if available
            github_token = os.getenv("GITHUB_TOKEN")
            if github_token:
                headers["Authorization"] = f"token {github_token}"

            async with self.session.get(api_url, headers=headers) as response:
                if response.status != 200:
                    return
                data = await response.json()

            metadata.stars = data.get("stargazers_count")
            # "license" may be absent or null in the payload.
            license_info = data.get("license") or {}
            metadata.license = license_info.get("name")
            updated_at = data.get("updated_at")
            if updated_at:
                # Rewrite a trailing "Z" as an explicit UTC offset before
                # handing the string to fromisoformat().
                metadata.last_updated = datetime.fromisoformat(
                    updated_at.replace("Z", "+00:00")
                )
            metadata.api_base_url = api_url

        except Exception as e:
            console.print(
                f"[yellow]Error fetching GitHub metadata for {metadata.get_unique_id()}: {e}[/yellow]"
            )

    async def process_repository(
        self, metadata: MCPServerMetadata
    ) -> MCPServerDocument | None:
        """Fetch README and GitHub metadata for one repository.

        Args:
            metadata: Record for the repository; enriched in place.

        Returns:
            The assembled document with its content hash set, or ``None``
            when processing raised (the error is printed).
        """
        try:
            readme_content = await self.fetch_readme_content(metadata)
            await self.fetch_github_metadata(metadata)

            document = MCPServerDocument(
                metadata=metadata,
                readme_content=readme_content,
                content_hash=None,
            )
            document.content_hash = document.compute_content_hash()
            return document

        except Exception as e:
            console.print(
                f"[red]Error processing {metadata.get_unique_id()}: {e}[/red]"
            )
            return None

    # ------------------------------------------------------------------
    # Persistence and reporting
    # ------------------------------------------------------------------

    def save_documents(self, documents: list[MCPServerDocument]) -> None:
        """Write the documents to disk in several formats.

        Produces, under ``output_dir``: one raw ``.md`` per README, one
        ``.json`` per document, ``all_mcp_documents.json``,
        ``metadata/extraction_summary.json`` (including the current
        ``self.stats``) and ``metadata/servers_overview.yaml``.

        Args:
            documents: Documents to persist.
        """
        for doc in documents:
            stem = self._file_stem(doc.metadata)

            # Save raw README
            if doc.readme_content:
                raw_path = self.raw_dir / f"{stem}.md"
                raw_path.write_text(doc.readme_content, encoding="utf-8")

            # Save individual document as JSON
            doc_path = self.docs_dir / f"{stem}.json"
            doc_path.write_text(doc.model_dump_json(indent=2), encoding="utf-8")

        # Save all documents as a single JSON file
        all_docs_path = self.output_dir / "all_mcp_documents.json"
        all_docs_data = [doc.model_dump() for doc in documents]
        all_docs_path.write_text(
            json.dumps(all_docs_data, indent=2, default=str), encoding="utf-8"
        )

        # Save metadata summary
        metadata_summary = {
            "extraction_timestamp": datetime.now().isoformat(),
            "total_documents": len(documents),
            "stats": self.stats.model_dump(),
            "servers": [
                {
                    "id": doc.metadata.get_unique_id(),
                    "name": doc.metadata.name,
                    "category": doc.metadata.category,
                    "languages": doc.metadata.languages,
                    "url": doc.metadata.repo_url,
                }
                for doc in documents
            ],
        }
        summary_path = self.metadata_dir / "extraction_summary.json"
        summary_path.write_text(
            json.dumps(metadata_summary, indent=2, default=str), encoding="utf-8"
        )

        # Save as YAML for easy reading
        yaml_path = self.metadata_dir / "servers_overview.yaml"
        yaml_data = {
            "mcp_servers": [
                {
                    "name": doc.metadata.name,
                    "repo": doc.metadata.get_unique_id(),
                    "category": doc.metadata.category,
                    "description": doc.metadata.description,
                }
                for doc in documents
            ]
        }
        yaml_path.write_text(
            yaml.dump(yaml_data, default_flow_style=False), encoding="utf-8"
        )

    def _update_statistics(self, documents: list[MCPServerDocument]) -> None:
        """Recompute success/failure counts and per-label breakdowns.

        The breakdowns are rebuilt from scratch on every call, so calling
        this more than once does not double count.
        """
        with_readme = sum(1 for doc in documents if doc.readme_content)
        self.stats.successfully_extracted = with_readme
        self.stats.failed_extractions = len(documents) - with_readme

        categories: dict[str, int] = {}
        languages: dict[str, int] = {}
        for doc in documents:
            category = doc.metadata.category
            categories[category] = categories.get(category, 0) + 1
            for lang in doc.metadata.languages:
                languages[lang] = languages.get(lang, 0) + 1
        self.stats.categories = categories
        self.stats.languages = languages

    @staticmethod
    def _print_breakdown(title: str, label: str, counts: dict[str, int]) -> None:
        """Print a two-column table of ``counts``, largest count first."""
        table = Table(title=title, show_header=True)
        table.add_column(label, style="cyan")
        table.add_column("Count", style="green")
        for key, count in sorted(counts.items(), key=lambda x: x[1], reverse=True):
            table.add_row(key, str(count))
        console.print("\n", table)

    def generate_statistics_report(self, documents: list[MCPServerDocument]) -> None:
        """Refresh ``self.stats`` from ``documents`` and print three tables.

        A document counts as successfully extracted when it has README
        content. The tables show the headline counts, the per-category
        breakdown and the per-language breakdown.

        Args:
            documents: Documents produced by the run.
        """
        self._update_statistics(documents)

        table = Table(title="MCP Repository Extraction Statistics", show_header=True)
        table.add_column("Metric", style="cyan")
        table.add_column("Value", style="green")
        table.add_row("Total Repositories Found", str(self.stats.total_found))
        table.add_row("Successfully Extracted", str(self.stats.successfully_extracted))
        table.add_row("Failed Extractions", str(self.stats.failed_extractions))
        console.print("\n", table)

        self._print_breakdown(
            "Repositories by Category", "Category", self.stats.categories
        )
        self._print_breakdown(
            "Repositories by Language", "Language", self.stats.languages
        )

    # ------------------------------------------------------------------
    # Orchestration
    # ------------------------------------------------------------------

    async def extract_all(self) -> list[MCPServerDocument]:
        """Run the full pipeline: parse, fetch, save and report.

        Repositories are processed concurrently in batches of
        ``_BATCH_SIZE``, with a ``_BATCH_PAUSE_SECONDS`` pause between
        batches to reduce the chance of rate limiting.

        Returns:
            The documents that were processed without raising.
        """
        start_time = datetime.now()

        console.print("[bold blue]MCP Repository Extractor[/bold blue]")
        console.print(f"Output directory: {self.output_dir}")

        documents: list[MCPServerDocument] = []
        try:
            async with aiohttp.ClientSession() as session:
                self.session = session

                # Step 1: Extract repository information
                console.print(
                    "\n[yellow]Step 1: Extracting repository information...[/yellow]"
                )
                repositories = await self.extract_repositories_from_readme()
                console.print(f"[green]Found {len(repositories)} repositories[/green]")

                # Step 2: Process repositories
                console.print("\n[yellow]Step 2: Processing repositories...[/yellow]")

                with Progress(
                    SpinnerColumn(),
                    TextColumn("[progress.description]{task.description}"),
                    BarColumn(),
                    TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
                    console=console,
                ) as progress:
                    task = progress.add_task(
                        "Processing repositories...", total=len(repositories)
                    )

                    batch_size = self._BATCH_SIZE
                    for i in range(0, len(repositories), batch_size):
                        batch = repositories[i : i + batch_size]
                        batch_results = await asyncio.gather(
                            *(self.process_repository(repo) for repo in batch)
                        )
                        documents.extend(doc for doc in batch_results if doc)
                        progress.update(task, advance=len(batch))

                        # Pause between batches, but not after the last one.
                        if i + batch_size < len(repositories):
                            await asyncio.sleep(self._BATCH_PAUSE_SECONDS)
        finally:
            # Do not keep a reference to a session that is now closed.
            self.session = None

        # Compute the stats before saving so that extraction_summary.json
        # contains real counts and a duration instead of zeros and null.
        self._update_statistics(documents)
        self.stats.extraction_duration = (datetime.now() - start_time).total_seconds()

        # Step 3: Save documents
        console.print("\n[yellow]Step 3: Saving documents...[/yellow]")
        self.save_documents(documents)

        # Final duration, including the time spent saving.
        duration = (datetime.now() - start_time).total_seconds()
        self.stats.extraction_duration = duration

        # Generate statistics
        self.generate_statistics_report(documents)

        console.print("\n[green]✓ Extraction complete![/green]")
        console.print(f"Duration: {duration:.2f} seconds")
        console.print(f"Output directory: {self.output_dir}")

        return documents
def create_agent_loader(
    output_dir: str = "agent_resources/mcp_servers",
) -> Callable[..., list[Document]]:
    """Create a loader function for agents to access saved MCP documents.

    Args:
        output_dir: Directory that contains ``all_mcp_documents.json``.

    Returns:
        A function ``load_mcp_documents(category=None, language=None,
        search_query=None)`` that reads the combined JSON file on each call
        and returns the matching entries as LangChain ``Document`` objects.
    """
    all_docs_path = Path(output_dir) / "all_mcp_documents.json"

    def load_mcp_documents(
        category: MCPCategory | None = None,
        language: MCPLanguage | None = None,
        search_query: str | None = None,
    ) -> list[Document]:
        """Load MCP server documents with optional filtering.

        Args:
            category: Keep only servers in this category.
            language: Keep only servers that list this language.
            search_query: Keep only servers whose name, description or
                README contains this text (case-insensitive). An empty
                string disables the filter.

        Returns:
            The matching documents, or an empty list when the combined JSON
            file does not exist.
        """
        if not all_docs_path.exists():
            return []

        with open(all_docs_path, encoding="utf-8") as f:
            docs_data = json.load(f)

        # Lower-case the query once instead of once per document.
        needle = search_query.lower() if search_query else None

        documents: list[Document] = []
        for data in docs_data:
            # Timestamps are stored as strings; turn them back into datetimes.
            if "extracted_at" in data:
                data["extracted_at"] = datetime.fromisoformat(data["extracted_at"])
            if (
                "metadata" in data
                and "last_updated" in data["metadata"]
                and data["metadata"]["last_updated"]
            ):
                data["metadata"]["last_updated"] = datetime.fromisoformat(
                    data["metadata"]["last_updated"]
                )

            doc = MCPServerDocument(**data)

            # Apply filters
            if category and doc.metadata.category != category:
                continue
            if language and language not in doc.metadata.languages:
                continue
            if needle is not None:
                search_text = (
                    f"{doc.metadata.name} {doc.metadata.description or ''} "
                    f"{doc.readme_content or ''}"
                ).lower()
                if needle not in search_text:
                    continue

            documents.append(doc.to_langchain_document())

        return documents

    return load_mcp_documents
async def main():
    """Run a full extraction with default settings, then demo the loader.

    After the extraction finishes, a loader is created with its default
    output directory and used to count the database-category documents.
    """
    await MCPRepositoryExtractor().extract_all()

    # Create a loader for agents
    load_documents = create_agent_loader()

    # Example usage
    console.print("\n[blue]Example usage:[/blue]")
    console.print("Loading all database-related MCP servers:")
    database_docs = load_documents(category=MCPCategory.DATABASES)
    console.print(f"Found {len(database_docs)} database servers")
# Script entry point: run the async main() to completion.
if __name__ == "__main__":
    asyncio.run(main())