"""Core MCP Downloader implementation.
This module provides the main GeneralMCPDownloader class that orchestrates
the downloading, installation, and configuration of MCP servers from various sources.
Example:
Basic usage:
.. code-block:: python
downloader = GeneralMCPDownloader()
result = await downloader.download_servers(["filesystem", "github"])
Auto-discovery:
.. code-block:: python
result = await downloader.auto_discover_and_download(limit=10)
Custom configuration:
.. code-block:: python
downloader = GeneralMCPDownloader(
config_file="my_config.yaml",
install_dir="/custom/path"
)
Classes:
GeneralMCPDownloader: Main downloader orchestrator
DownloadResult: Result of download operations
ServerStatus: Status tracking for servers
Version: 1.0.0
Author: Haive MCP Team
"""
import asyncio
import json
import logging
import shutil
import time
from datetime import datetime
from pathlib import Path
from typing import Any
from pydantic import BaseModel, Field
from haive.mcp.downloader.config import (
DiscoveryConfig,
DownloaderConfig,
InstallationMethod,
ServerConfig,
ServerTemplate,
load_config,
save_config,
)
from haive.mcp.downloader.discovery import ServerDiscovery
from haive.mcp.downloader.installers import (
BinaryInstaller,
CurlInstaller,
DockerInstaller,
GitInstaller,
MCPInstaller,
NPMInstaller,
PipInstaller,
)
logger = logging.getLogger(__name__)
[docs]
class ServerStatus(BaseModel):
"""Status information for an MCP server.
Attributes:
name: Server name
status: Current status (installed, failed, pending)
last_check: Timestamp of last status check
last_success: Timestamp of last successful operation
install_result: Result of installation attempt
health_status: Health check status
error: Error message if failed
Example:
Creating status:
.. code-block:: python
status = ServerStatus(
name="filesystem",
status="installed",
last_check=datetime.now(),
last_success=datetime.now()
)
"""
name: str = Field(..., description="Server name")
status: str = Field(default="pending", description="Current status")
last_check: datetime | None = Field(None, description="Last check time")
last_success: datetime | None = Field(None, description="Last success time")
install_result: dict[str, Any] | None = Field(None, description="Install result")
health_status: str | None = Field(None, description="Health status")
error: str | None = Field(None, description="Error message")
[docs]
class DownloadResult(BaseModel):
"""Result of a download operation.
Attributes:
total: Total servers attempted
successful: Number of successful installations
failed: Number of failed installations
success_rate: Percentage success rate
successful_servers: List of successful server details
failed_servers: List of failed server details
config_file: Path to generated configuration
duration: Operation duration in seconds
Example:
Checking results:
.. code-block:: python
if result.success_rate > 80:
print(f"Good success rate: {result.success_rate}%")
"""
total: int = Field(..., description="Total servers")
successful: int = Field(..., description="Successful installs")
failed: int = Field(..., description="Failed installs")
success_rate: float = Field(..., description="Success percentage")
successful_servers: list[dict[str, Any]] = Field(default_factory=list)
failed_servers: list[dict[str, Any]] = Field(default_factory=list)
config_file: str | None = Field(None, description="Config file path")
duration: float | None = Field(None, description="Duration in seconds")
[docs]
class GeneralMCPDownloader:
"""General MCP Server Downloader with configurable patterns and installers.
This is the main orchestrator class that manages the downloading, installation,
and configuration of MCP servers from various sources using a plugin-based
architecture.
Attributes:
config: Downloader configuration
installers: List of available installers
discovery: Server discovery instance
status_tracker: Server status tracking
Example:
Creating and using downloader:
.. code-block:: python
downloader = GeneralMCPDownloader()
# Download specific servers
result = await downloader.download_servers(["filesystem", "github"])
# Auto-discover and install
result = await downloader.auto_discover_and_download(limit=10)
# Check server health
health = await downloader.check_server_health()
Note:
The downloader automatically creates necessary directories and
default configuration if not provided.
"""
[docs]
def __init__(self, config_file: str | None = None, install_dir: str | None = None):
"""Initialize the General MCP Downloader.
Args:
config_file: Path to YAML configuration file. If not provided,
creates a default configuration.
install_dir: Directory for server installations. Defaults to
~/.mcp/servers if not specified.
Example:
Initialization:
.. code-block:: python
# Use defaults
downloader = GeneralMCPDownloader()
# Custom paths
downloader = GeneralMCPDownloader(
config_file="/path/to/config.yaml",
install_dir="/custom/install/dir"
)
"""
# Load or create configuration
if config_file and Path(config_file).exists():
self.config = load_config(Path(config_file))
logger.info(f"Loaded configuration from {config_file}")
else:
self.config = self._create_default_config()
if config_file:
save_config(self.config, Path(config_file))
logger.info(f"Created default configuration at {config_file}")
# Override install directory if provided
if install_dir:
self.config.install_dir = Path(install_dir)
# Ensure directories exist
self._ensure_directories()
# Initialize components
self.installers = self._initialize_installers()
self.discovery = ServerDiscovery(self.config.discovery)
self.status_tracker: dict[str, ServerStatus] = {}
# Load existing status
self._load_status()
logger.info(
f"Initialized downloader with {len(self.config.templates)} templates "
f"and {len(self.config.servers)} servers"
)
def _create_default_config(self) -> DownloaderConfig:
"""Create default configuration with common templates.
Returns:
Default DownloaderConfig instance
Note:
This creates a sensible default configuration that covers
the most common MCP server patterns.
"""
# Default templates
templates = [
ServerTemplate(
name="npm_official",
installation_method=InstallationMethod.NPM,
command_pattern="@modelcontextprotocol/server-{service}",
capabilities=["tools"],
category="official",
prerequisites=["node", "npm"],
health_check=None, # Add missing health_check parameter
),
ServerTemplate(
name="npm_community",
installation_method=InstallationMethod.NPM,
command_pattern="{package}",
capabilities=["tools"],
category="community",
prerequisites=["node", "npm"],
health_check=None, # Add missing health_check parameter
),
ServerTemplate(
name="git_repo",
installation_method=InstallationMethod.GIT,
command_pattern="python server.py",
post_install=["pip install -r requirements.txt"],
capabilities=["tools"],
category="development",
health_check=None, # Add missing health_check parameter
prerequisites=["git", "python"],
),
ServerTemplate(
name="docker_image",
installation_method=InstallationMethod.DOCKER,
command_pattern="{image}",
capabilities=["tools"],
category="containerized",
prerequisites=["docker"],
health_check=None, # Add missing health_check parameter
),
ServerTemplate(
name="pypi_package",
installation_method=InstallationMethod.PIP,
command_pattern="{package}",
capabilities=["tools"],
category="python",
prerequisites=["python", "pip"],
health_check=None, # Add missing health_check parameter
),
]
# Default servers
servers = [
ServerConfig(
name="filesystem",
template="npm_official",
source="npm",
variables={"service": "filesystem"},
tags={"official", "file-operations", "core"},
version=None, # Add missing version parameter
),
ServerConfig(
name="github",
template="npm_official",
source="npm",
variables={"service": "github"},
tags={"official", "git", "development"},
version=None, # Add missing version parameter
),
ServerConfig(
name="sqlite",
template="npm_official",
source="npm",
variables={"service": "sqlite"},
tags={"official", "database", "sql"},
version=None, # Add missing version parameter
),
]
# Discovery configuration
discovery = DiscoveryConfig(
sources=[
"https://raw.githubusercontent.com/modelcontextprotocol/servers/main/README.md",
"https://api.github.com/search/repositories?q=mcp+server+in:name&sort=stars",
"https://registry.npmjs.org/-/v1/search?text=mcp+server",
],
patterns={
"npm": [
"@modelcontextprotocol/server-*",
"mcp-server-*",
"*-mcp-server",
],
"pypi": ["mcp-*", "*-mcp", "mcp-server-*"],
"docker": ["mcp/*", "modelcontextprotocol/*"],
"github": ["*mcp*server*", "mcp-*"],
},
)
return DownloaderConfig(
templates=templates, servers=servers, discovery=discovery
)
def _ensure_directories(self) -> None:
"""Ensure all required directories exist.
Creates:
- Installation directory
- Configuration directory
- Log directory
- Backup directory (if enabled)
"""
directories = [
self.config.install_dir,
self.config.config_dir,
self.config.log_dir,
]
if self.config.backup_enabled:
directories.append(self.config.backup_dir)
for directory in directories:
directory.mkdir(parents=True, exist_ok=True)
logger.debug(f"Ensured directory exists: {directory}")
def _initialize_installers(self) -> list[MCPInstaller]:
"""Initialize all available installers.
Returns:
List of installer instances
Note:
New installers can be added here as they are developed.
"""
return [
NPMInstaller(),
PipInstaller(),
GitInstaller(),
DockerInstaller(),
BinaryInstaller(),
CurlInstaller(),
]
def _load_status(self) -> None:
"""Load server status from persistent storage.
Loads the status tracking information from the status file if it
exists.
"""
status_file = self.config.config_dir / "server_status.json"
if status_file.exists():
try:
with open(status_file) as f:
data = json.load(f)
for name, status_data in data.items():
# Convert timestamps
if status_data.get("last_check"):
status_data["last_check"] = datetime.fromisoformat(
status_data["last_check"]
)
if status_data.get("last_success"):
status_data["last_success"] = datetime.fromisoformat(
status_data["last_success"]
)
self.status_tracker[name] = ServerStatus(**status_data)
logger.info(f"Loaded status for {len(self.status_tracker)} servers")
except Exception as e:
logger.exception(f"Error loading status: {e}")
def _save_status(self) -> None:
"""Save server status to persistent storage.
Saves the current status tracking information to a JSON file for
persistence across runs.
"""
status_file = self.config.config_dir / "server_status.json"
try:
data = {}
for name, status in self.status_tracker.items():
status_dict = status.model_dump(mode="json")
# Convert timestamps to ISO format
if status.last_check:
status_dict["last_check"] = status.last_check.isoformat()
if status.last_success:
status_dict["last_success"] = status.last_success.isoformat()
data[name] = status_dict
with open(status_file, "w") as f:
json.dump(data, f, indent=2)
logger.debug(f"Saved status for {len(data)} servers")
except Exception as e:
logger.exception(f"Error saving status: {e}")
@property
def templates(self) -> dict[str, ServerTemplate]:
"""Get templates as a dictionary.
Returns:
Dict mapping template names to ServerTemplate objects
Example:
Accessing templates:
.. code-block:: python
npm_template = downloader.templates["npm_official"]
print(f"Command: {npm_template.command_pattern}")
"""
return {t.name: t for t in self.config.templates}
@property
def servers(self) -> list[ServerConfig]:
"""Get list of server configurations.
Returns:
List of ServerConfig objects
Example:
Listing servers:
.. code-block:: python
for server in downloader.servers:
print(f"{server.name}: {server.template}")
"""
return self.config.servers
[docs]
async def download_servers(
self,
server_names: list[str] | None = None,
categories: list[str] | None = None,
tags: set[str] | None = None,
max_concurrent: int | None = None,
) -> DownloadResult:
"""Download and install MCP servers.
This is the main method for downloading servers. It supports filtering
by name, category, or tags, and handles concurrent downloads with
retry logic.
Args:
server_names: Specific server names to download. If None,
downloads all enabled servers.
categories: Filter servers by category (e.g., "official", "community")
tags: Filter servers by tags (e.g., {"database", "file-operations"})
max_concurrent: Maximum concurrent downloads. Uses config default if None.
Returns:
DownloadResult with details of the operation
Example:
Download specific servers:
.. code-block:: python
result = await downloader.download_servers(["filesystem", "github"])
print(f"Success rate: {result.success_rate}%")
Download by category:
.. code-block:: python
result = await downloader.download_servers(categories=["official"])
Download by tags:
.. code-block:: python
result = await downloader.download_servers(tags={"database", "sql"})
Raises:
ValueError: If no servers match the criteria
"""
start_time = time.time()
# Filter servers to download
servers_to_download = self._filter_servers(server_names, categories, tags)
if not servers_to_download:
raise ValueError("No servers match the download criteria")
logger.info(f"Downloading {len(servers_to_download)} servers...")
# Use configured max concurrent if not specified
if max_concurrent is None:
max_concurrent = self.config.max_concurrent
# Create semaphore for concurrent downloads
semaphore = asyncio.Semaphore(max_concurrent)
# Download servers concurrently
tasks = []
for server in servers_to_download:
task = asyncio.create_task(
self._download_server_with_semaphore(semaphore, server)
)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
successful_servers = []
failed_servers = []
for i, result in enumerate(results):
server = servers_to_download[i]
if isinstance(result, Exception):
failed_servers.append({"server": server.name, "error": str(result)})
self._update_server_status(server.name, "failed", error=str(result))
elif isinstance(result, dict) and result.get("success"):
successful_servers.append({"server": server.name, "result": result})
self._update_server_status(server.name, "installed", result=result)
else:
error = (
result.get("error", "Unknown error")
if isinstance(result, dict)
else "Unknown error"
)
failed_servers.append({"server": server.name, "error": error})
self._update_server_status(server.name, "failed", error=error)
# Save configuration
config_file = await self._save_server_config(successful_servers)
# Save status
self._save_status()
# Create result
total = len(servers_to_download)
successful = len(successful_servers)
failed = len(failed_servers)
result = DownloadResult(
total=total,
successful=successful,
failed=failed,
success_rate=(successful / total * 100) if total > 0 else 0,
successful_servers=successful_servers,
failed_servers=failed_servers,
config_file=str(config_file),
duration=time.time() - start_time,
)
logger.info(
f"Download complete: {successful}/{total} successful ({result.success_rate:.1f}%)"
)
return result
def _filter_servers(
self,
server_names: list[str] | None = None,
categories: list[str] | None = None,
tags: set[str] | None = None,
) -> list[ServerConfig]:
"""Filter servers based on criteria.
Args:
server_names: Specific server names
categories: Categories to include
tags: Tags to filter by
Returns:
List of filtered ServerConfig objects
"""
filtered = []
for server in self.config.servers:
# Skip disabled servers
if not server.enabled:
continue
# Filter by name
if server_names and server.name not in server_names:
continue
# Filter by category
if categories:
template = self.templates.get(server.template)
if not template or template.category not in categories:
continue
# Filter by tags
if tags and not server.tags.intersection(tags):
continue
filtered.append(server)
return filtered
async def _download_server_with_semaphore(
self, semaphore: asyncio.Semaphore, server: ServerConfig
) -> dict[str, Any]:
"""Download a server with concurrency control.
Args:
semaphore: Asyncio semaphore for concurrency control
server: Server configuration
Returns:
Download result dictionary
"""
async with semaphore:
return await self._download_server(server)
async def _download_server(self, server: ServerConfig) -> dict[str, Any]:
"""Download and install a single MCP server.
Args:
server: Server configuration
Returns:
Result dictionary with success status and details
"""
template = self.templates.get(server.template)
if not template:
return {"success": False, "error": f"Template {server.template} not found"}
logger.info(f"Installing {server.name} using template {server.template}")
# Find appropriate installer
installer = None
for inst in self.installers:
if await inst.can_handle(server, template):
installer = inst
break
if not installer:
return {
"success": False,
"error": f"No installer found for {template.installation_method}",
}
# Attempt installation with retries
last_error = None
for attempt in range(self.config.retry_attempts):
if attempt > 0:
logger.info(f"Retry attempt {attempt + 1} for {server.name}")
await asyncio.sleep(self.config.retry_delay)
try:
result = await installer.install(
server, template, self.config.install_dir
)
if result.get("success"):
# Verify installation if health check enabled
if self.config.health_check_enabled:
verified = await self._verify_installation(
server, template, installer
)
result["verified"] = verified
return result
last_error = result.get("error", "Installation failed")
except Exception as e:
last_error = str(e)
logger.exception(f"Error installing {server.name}: {e}")
return {"success": False, "error": last_error}
async def _verify_installation(
self, server: ServerConfig, template: ServerTemplate, installer: MCPInstaller
) -> bool:
"""Verify server installation with health check.
Args:
server: Server configuration
template: Server template
installer: Installer that was used
Returns:
True if verification successful
"""
try:
# Use installer's verify method
verified = await asyncio.wait_for(
installer.verify(server, template, self.config.install_dir),
timeout=self.config.health_check_timeout,
)
if verified:
logger.info(f"✓ Verified installation of {server.name}")
else:
logger.warning(f"⚠ Verification failed for {server.name}")
return verified
except TimeoutError:
logger.warning(f"Health check timed out for {server.name}")
return False
except Exception as e:
logger.exception(f"Error verifying {server.name}: {e}")
return False
def _update_server_status(
self,
name: str,
status: str,
result: dict | None = None,
error: str | None = None,
) -> None:
"""Update server status in tracker.
Args:
name: Server name
status: New status
result: Installation result if applicable
error: Error message if failed
"""
if name not in self.status_tracker:
self.status_tracker[name] = ServerStatus(
name=name,
last_check=None,
last_success=None,
install_result=None,
health_status=None,
error=None,
)
server_status = self.status_tracker[name]
server_status.status = status
server_status.last_check = datetime.now()
if status == "installed" and result:
server_status.last_success = datetime.now()
server_status.install_result = result
server_status.error = None
elif error:
server_status.error = error
async def _save_server_config(
self, successful_servers: list[dict[str, Any]]
) -> Path:
"""Save successful server configurations.
Args:
successful_servers: List of successfully installed servers
Returns:
Path to saved configuration file
"""
config = {
"mcpServers": {},
"generated_at": datetime.now().isoformat(),
"install_dir": str(self.config.install_dir),
"downloader_version": "1.0.0",
}
for server_info in successful_servers:
server_name = server_info["server"]
server_result = server_info["result"]
# Find server config
server_config = next(
(s for s in self.config.servers if s.name == server_name), None
)
if not server_config:
continue
# Find template
template = self.templates.get(server_config.template)
if not template:
continue
# Create MCP server configuration
mcp_config = {
"command": server_result.get("command", template.command_pattern),
"args": template.args_pattern,
"env": {**template.env_vars, **server_config.env_vars},
}
# Add transport-specific fields
if server_result.get("method") == "docker":
mcp_config["transport"] = "docker"
else:
mcp_config["transport"] = "stdio"
config["mcpServers"][server_name] = mcp_config
# Save configuration
config_path = self.config.config_dir / "mcp_servers_config.json"
# Backup existing config if enabled
if self.config.backup_enabled and config_path.exists():
backup_path = (
self.config.backup_dir / f"mcp_servers_config_{int(time.time())}.json"
)
shutil.copy2(config_path, backup_path)
logger.debug(f"Backed up config to {backup_path}")
with open(config_path, "w") as f:
json.dump(config, f, indent=2)
logger.info(f"Saved configuration to {config_path}")
return config_path
[docs]
async def auto_discover_and_download(
self, limit: int | None = None, auto_install: bool = True
) -> DownloadResult:
"""Auto-discover servers from registries and optionally download them.
This method discovers MCP servers from configured sources and can
automatically install them.
Args:
limit: Maximum number of servers to discover per source
auto_install: Whether to automatically install discovered servers
Returns:
DownloadResult with discovery and installation details
Example:
Auto-discover and install:
.. code-block:: python
result = await downloader.auto_discover_and_download(limit=20)
print(f"Discovered and installed {result.successful} servers")
Discover only:
.. code-block:: python
result = await downloader.auto_discover_and_download(
limit=50,
auto_install=False
)
"""
logger.info("Starting auto-discovery of MCP servers...")
# Discover servers
discovered_servers = await self.discovery.discover_all(limit_per_source=limit)
logger.info(f"Discovered {len(discovered_servers)} unique servers")
# Convert discovered servers to ServerConfig
new_servers = []
for server_data in discovered_servers:
# Skip if already configured
if any(s.name == server_data["name"] for s in self.config.servers):
continue
# Determine template based on source/type
template_name = self.discovery.determine_template(server_data)
if template_name not in self.templates:
logger.warning(f"No template for {server_data['name']}, skipping")
continue
server = ServerConfig(
name=server_data["name"],
template=template_name,
source=server_data.get("source", ""),
variables=server_data.get("variables", {}),
tags=set(server_data.get("tags", [])),
enabled=True,
version=server_data.get("version"), # Add missing version parameter
)
new_servers.append(server)
self.config.servers.append(server)
logger.info(f"Added {len(new_servers)} new servers to configuration")
if auto_install and new_servers:
# Download newly discovered servers
server_names = [s.name for s in new_servers]
return await self.download_servers(server_names)
# Return discovery-only result
return DownloadResult(
total=len(new_servers),
successful=len(new_servers),
failed=0,
success_rate=100.0,
successful_servers=[{"server": s.name} for s in new_servers],
failed_servers=[],
config_file=None, # Add missing config_file parameter
duration=0,
)
[docs]
async def check_server_health(
self, server_names: list[str] | None = None
) -> dict[str, Any]:
"""Check health status of installed servers.
Args:
server_names: Specific servers to check. If None, checks all installed.
Returns:
Dict with health check results
Example:
Checking health:
.. code-block:: python
health = await downloader.check_server_health()
for server, status in health["servers"].items():
print(f"{server}: {status}")
"""
servers_to_check = []
if server_names:
servers_to_check = [
s for s in self.config.servers if s.name in server_names
]
else:
# Check all installed servers
servers_to_check = [
s
for s in self.config.servers
if self.status_tracker.get(
s.name,
ServerStatus(
name=s.name,
last_check=None,
last_success=None,
install_result=None,
health_status=None,
error=None,
),
).status
== "installed"
]
health_results = {}
for server in servers_to_check:
template = self.templates.get(server.template)
if not template:
continue
# Find installer
installer = None
for inst in self.installers:
if await inst.can_handle(server, template):
installer = inst
break
if installer:
try:
is_healthy = await installer.verify(
server, template, self.config.install_dir
)
health_results[server.name] = (
"healthy" if is_healthy else "unhealthy"
)
# Update status
if server.name in self.status_tracker:
self.status_tracker[server.name].health_status = health_results[
server.name
]
self.status_tracker[server.name].last_check = datetime.now()
except Exception as e:
health_results[server.name] = f"error: {e!s}"
logger.exception(f"Health check failed for {server.name}: {e}")
else:
health_results[server.name] = "no_installer"
# Save updated status
self._save_status()
# Calculate summary
healthy = sum(1 for status in health_results.values() if status == "healthy")
total = len(health_results)
return {
"servers": health_results,
"summary": {
"total_checked": total,
"healthy": healthy,
"unhealthy": total - healthy,
"health_rate": (healthy / total * 100) if total > 0 else 0,
},
}
[docs]
def add_custom_server(self, server: ServerConfig) -> None:
"""Add a custom server configuration.
Args:
server: ServerConfig to add
Example:
Adding custom server:
.. code-block:: python
custom = ServerConfig(
name="my-custom-mcp",
template="git_repo",
source="https://github.com/user/my-mcp.git",
variables={"owner": "user", "repo": "my-mcp"}
)
downloader.add_custom_server(custom)
"""
# Check for duplicates
if any(s.name == server.name for s in self.config.servers):
logger.warning(f"Server {server.name} already exists, updating")
self.config.servers = [
s if s.name != server.name else server for s in self.config.servers
]
else:
self.config.servers.append(server)
logger.info(f"Added custom server: {server.name}")
[docs]
def add_custom_template(self, template: ServerTemplate) -> None:
"""Add a custom template.
Args:
template: ServerTemplate to add
Example:
Adding custom template:
.. code-block:: python
template = ServerTemplate(
name="rust_binary",
installation_method=InstallationMethod.BINARY,
command_pattern="./{service}-mcp",
category="rust"
)
downloader.add_custom_template(template)
"""
# Check for duplicates
existing_names = [t.name for t in self.config.templates]
if template.name in existing_names:
logger.warning(f"Template {template.name} already exists, updating")
self.config.templates = [
t if t.name != template.name else template
for t in self.config.templates
]
else:
self.config.templates.append(template)
logger.info(f"Added custom template: {template.name}")
[docs]
def get_server_status(self, server_name: str) -> ServerStatus | None:
"""Get status for a specific server.
Args:
server_name: Name of the server
Returns:
ServerStatus if found, None otherwise
Example:
Checking status:
.. code-block:: python
status = downloader.get_server_status("filesystem")
if status and status.status == "installed":
print(f"Last success: {status.last_success}")
"""
return self.status_tracker.get(server_name)
[docs]
def get_all_status(self) -> dict[str, ServerStatus]:
"""Get status for all servers.
Returns:
Dict mapping server names to ServerStatus objects
"""
return self.status_tracker.copy()
[docs]
def save_configuration(self, config_file: Path) -> None:
"""Save current configuration to file.
Args:
config_file: Path to save configuration
Example:
Saving config:
.. code-block:: python
downloader.save_configuration(Path("my_config.yaml"))
"""
save_config(self.config, config_file)
logger.info(f"Saved configuration to {config_file}")