diff --git a/mcp-servers/dlt-mcp/.gitignore b/mcp-servers/dlt-mcp/.gitignore new file mode 100644 index 0000000000..a2adc6fd95 --- /dev/null +++ b/mcp-servers/dlt-mcp/.gitignore @@ -0,0 +1,48 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +*.egg-info/ +.installed.cfg +*.egg + +# Virtual environments +venv/ +env/ +ENV/ +.venv + +# IDE +.vscode/ +.idea/ +*.swp +*.swo +*~ + +# Testing +.pytest_cache/ +.coverage +htmlcov/ +.tox/ + +# dlt +.dlt/ + +# OS +.DS_Store +Thumbs.db + diff --git a/mcp-servers/dlt-mcp/LICENSE b/mcp-servers/dlt-mcp/LICENSE new file mode 100644 index 0000000000..d2e5365380 --- /dev/null +++ b/mcp-servers/dlt-mcp/LICENSE @@ -0,0 +1,22 @@ +MIT License + +Copyright (c) 2024 Continue.dev + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/mcp-servers/dlt-mcp/README.md b/mcp-servers/dlt-mcp/README.md new file mode 100644 index 0000000000..42066158b9 --- /dev/null +++ b/mcp-servers/dlt-mcp/README.md @@ -0,0 +1,435 @@ +# dlt MCP Server + +Model Context Protocol (MCP) server for [dlt](https://dlthub.com) (data load tool). This server enables AI agents to inspect, debug, and explore local dlt pipelines through safe, read-only tooling. + +## Features + +- **Pipeline Inspection**: View execution details, timing, file sizes, and rows loaded +- **Schema Metadata**: Retrieve table schemas, columns, data types, and constraints +- **Data Querying**: Execute read-only SQL queries on destination databases (DuckDB, PostgreSQL) +- **Error Analysis**: Analyze load errors with root cause explanations and suggested fixes +- **Pipeline Scaffolding**: Generate starter code for new dlt pipelines +- **Schema Evolution**: Review schema changes across pipeline runs + +## Prerequisites + +- Python 3.8 or higher +- dlt library installed (`pip install dlt`) +- A dlt pipeline project (or create one using the scaffold tool) + +## Installation + +### Using uvx (Recommended) + +```bash +# The server can be run directly with uvx +uvx dlt-mcp +``` + +### From Source + +```bash +# Clone or navigate to the mcp-servers/dlt-mcp directory +cd mcp-servers/dlt-mcp + +# Install in development mode +pip install -e . 
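+
+# The editable install also registers the dlt-mcp console script declared in pyproject.toml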
+ +# Or install dependencies only +pip install -r requirements.txt +``` + +## Configuration + +Add the dlt MCP server to your Continue configuration: + +### YAML Configuration + +```yaml +name: dlt MCP Server +version: 0.1.0 +schema: v1 +mcpServers: + - name: dlt-mcp + command: uvx + args: + - "dlt-mcp" +``` + +### JSON Configuration (Claude Desktop, Cursor, etc.) + +```json +{ + "mcpServers": { + "dlt-mcp": { + "command": "uvx", + "args": ["dlt-mcp"] + } + } +} +``` + +### With Custom Working Directory + +```yaml +mcpServers: + - name: dlt-mcp + command: uvx + args: + - "dlt-mcp" + env: + DLT_WORKING_DIR: "/path/to/your/pipelines" +``` + +## Available Tools + +### 1. `inspect_pipeline` + +Inspect dlt pipeline execution details including load info, timing, file sizes, and rows loaded. + +**Parameters:** +- `pipeline_name` (optional): Name of the pipeline to inspect. If omitted, auto-discovers pipelines. +- `working_dir` (optional): Directory to search for pipelines. Defaults to current working directory. + +**Example:** +```json +{ + "pipeline_name": "my_pipeline", + "working_dir": "/path/to/pipelines" +} +``` + +**Output:** +```json +{ + "pipeline": { + "name": "my_pipeline", + "destination": "duckdb", + "dataset_name": "my_pipeline_data" + }, + "loads": [ + { + "load_id": "1234567890", + "status": "completed", + "started_at": "2024-01-01T00:00:00Z", + "finished_at": "2024-01-01T00:05:00Z", + "duration_seconds": 300.0 + } + ], + "file_sizes": { + "loads/1234567890/users.parquet": 1024000 + }, + "rows_loaded": { + "users": 1000 + } +} +``` + +--- + +### 2. `get_schema` + +Retrieve pipeline schema metadata including tables, columns, data types, and constraints. + +**Parameters:** +- `pipeline_name` (optional): Name of the pipeline. If omitted, uses first available pipeline. +- `table_name` (optional): Specific table name. If omitted, returns schema for all tables. +- `include_hints` (optional): Include dlt hints and annotations. Defaults to false. + +**Example:** +```json +{ + "pipeline_name": "my_pipeline", + "table_name": "users", + "include_hints": true +} +``` + +**Output:** +```json +{ + "pipeline_name": "my_pipeline", + "tables": [ + { + "name": "users", + "resource_name": "users", + "columns": [ + { + "name": "id", + "data_type": "bigint", + "nullable": false, + "primary_key": true, + "hints": { + "unique": true + } + }, + { + "name": "email", + "data_type": "text", + "nullable": true, + "primary_key": false + } + ] + } + ], + "table_count": 1 +} +``` + +--- + +### 3. `query_destination` + +Execute a read-only SQL SELECT query on the pipeline's destination database. + +**Parameters:** +- `pipeline_name` (optional): Name of the pipeline. If omitted, uses first available pipeline. +- `query` (required): SQL SELECT query to execute. Only SELECT statements are allowed. +- `limit` (optional): Maximum number of rows to return. Defaults to 100. + +**Example:** +```json +{ + "pipeline_name": "my_pipeline", + "query": "SELECT * FROM users WHERE created_at > '2024-01-01' LIMIT 10", + "limit": 10 +} +``` + +**Output:** +```json +{ + "pipeline_name": "my_pipeline", + "query": "SELECT * FROM users WHERE created_at > '2024-01-01' LIMIT 10", + "columns": ["id", "name", "email", "created_at"], + "rows": [ + { + "id": 1, + "name": "John Doe", + "email": "john@example.com", + "created_at": "2024-01-15T10:00:00Z" + } + ], + "row_count": 1, + "limit_applied": 10 +} +``` + +**Safety:** Only SELECT statements are allowed. 
The tool validates queries to prevent INSERT, UPDATE, DELETE, DROP, and other mutation operations. + +--- + +### 4. `analyze_errors` + +Analyze pipeline load errors and explain root causes with suggested fixes. + +**Parameters:** +- `pipeline_name` (optional): Name of the pipeline. If omitted, uses first available pipeline. +- `last_n_runs` (optional): Number of recent runs to analyze. Defaults to 1. + +**Example:** +```json +{ + "pipeline_name": "my_pipeline", + "last_n_runs": 3 +} +``` + +**Output:** +```json +{ + "pipeline_name": "my_pipeline", + "errors": [ + { + "error_type": "TypeError", + "message": "Cannot cast string to integer", + "table_name": "users", + "load_id": "1234567890", + "suggested_fix": "Check data types in source data. Consider adding data type hints in your pipeline schema." + } + ], + "error_count": 1, + "last_n_runs_checked": 3 +} +``` + +--- + +### 5. `scaffold_pipeline` + +Generate scaffold code for a new dlt pipeline from a data source. + +**Parameters:** +- `source_type` (required): Type of data source. Options: `"rest_api"`, `"sql_database"`, `"filesystem"`. +- `destination` (required): Destination database. Options: `"duckdb"`, `"postgres"`, `"bigquery"`, `"snowflake"`. +- `pipeline_name` (required): Name for the new pipeline. + +**Example:** +```json +{ + "source_type": "rest_api", + "destination": "duckdb", + "pipeline_name": "my_new_pipeline" +} +``` + +**Output:** +```json +{ + "pipeline_name": "my_new_pipeline", + "source_type": "rest_api", + "destination": "duckdb", + "code": { + "main_file": "my_new_pipeline_pipeline.py", + "content": "# Generated pipeline code...", + "description": "REST API pipeline scaffold for my_new_pipeline" + }, + "file_structure": { + "files": [ + "my_new_pipeline_pipeline.py", + ".dlt/secrets.toml", + ".dlt/config.toml", + "requirements.txt" + ], + "directories": [".dlt/", "data/"] + }, + "next_steps": [ + "1. Create the pipeline file: my_new_pipeline_pipeline.py", + "2. Add API credentials to .dlt/secrets.toml", + "3. Configure destination credentials in .dlt/secrets.toml", + "4. Install dependencies: pip install dlt", + "5. Run the pipeline: python my_new_pipeline_pipeline.py" + ] +} +``` + +--- + +### 6. `review_schema_evolution` + +Review and summarize schema changes across pipeline runs. + +**Parameters:** +- `pipeline_name` (optional): Name of the pipeline. If omitted, uses first available pipeline. +- `compare_runs` (optional): Number of recent runs to compare. Defaults to 2. 
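+
+**Note:** If no stored schema history is available for the pipeline, the tool returns an empty `evolution` list along with the current table names and a message indicating that evolution tracking requires historical schema data.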
+ +**Example:** +```json +{ + "pipeline_name": "my_pipeline", + "compare_runs": 3 +} +``` + +**Output:** +```json +{ + "pipeline_name": "my_pipeline", + "evolution": [ + { + "table_name": "users", + "added_columns": [ + { + "name": "last_login", + "data_type": "timestamp", + "nullable": true, + "primary_key": false + } + ], + "removed_columns": [], + "modified_columns": [], + "type_changes": [] + } + ], + "compare_runs": 3, + "summary": { + "tables_changed": 1, + "total_added_columns": 1, + "total_removed_columns": 0, + "total_type_changes": 0 + } +} +``` + +## Supported Destinations + +The server supports querying data from the following destination databases: + +- **DuckDB**: Local file-based database (default for local development) +- **PostgreSQL**: Requires `psycopg2-binary` package +- **BigQuery**: (Schema inspection supported, querying requires additional setup) +- **Snowflake**: (Schema inspection supported, querying requires additional setup) + +## Safety Features + +- **Read-only queries**: SQL validation ensures only SELECT statements are executed +- **Query sanitization**: Dangerous keywords (INSERT, UPDATE, DELETE, DROP, etc.) are blocked +- **Connection limits**: Query results are limited by default (configurable) +- **Error handling**: Comprehensive error messages with suggested fixes + +## Development + +### Running Tests + +```bash +# Install test dependencies +pip install -e ".[dev]" + +# Run tests +pytest + +# Run with coverage +pytest --cov=dlt_mcp --cov-report=html +``` + +### Code Formatting + +```bash +# Format code with black +black src/ + +# Lint with ruff +ruff check src/ +``` + +## Troubleshooting + +### Pipeline Not Found + +If the server cannot find your pipeline: + +1. Ensure you're running the server from the correct working directory +2. Check that your pipeline has been run at least once (creates `.dlt/` directory) +3. Verify the pipeline name matches exactly (case-sensitive) + +### Destination Connection Issues + +If querying the destination fails: + +1. Verify database credentials are configured in `.dlt/secrets.toml` +2. For PostgreSQL, ensure `psycopg2-binary` is installed +3. Check network connectivity for remote databases +4. Verify the pipeline has successfully loaded data to the destination + +### Query Validation Errors + +If queries are rejected: + +1. Ensure queries start with `SELECT` +2. Remove any INSERT, UPDATE, DELETE, or other mutation statements +3. Check for SQL comments that might interfere with validation + +## License + +MIT License - see LICENSE file for details. + +## Contributing + +Contributions are welcome! Please open an issue or submit a pull request. 
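+
+## Creating a Test Pipeline
+
+The tools above assume at least one dlt pipeline has been run locally. The script below is a minimal sketch for creating one (the pipeline name, dataset name, table name, and sample rows are illustrative):
+
+```python
+import dlt
+
+# A pipeline that loads into a local DuckDB file
+pipeline = dlt.pipeline(
+    pipeline_name="demo_pipeline",
+    destination="duckdb",
+    dataset_name="demo_data",
+)
+
+# Run once with a couple of in-memory rows; dlt infers the schema
+load_info = pipeline.run(
+    [
+        {"id": 1, "email": "alice@example.com"},
+        {"id": 2, "email": "bob@example.com"},
+    ],
+    table_name="users",
+)
+print(load_info)
+```
+
+After a single run, `inspect_pipeline` can report the load, `get_schema` can describe the inferred `users` table, and `query_destination` can run SELECT queries against the resulting DuckDB file.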
+ +## Related Resources + +- [dlt Documentation](https://dlthub.com/docs) +- [Model Context Protocol](https://modelcontextprotocol.io) +- [Continue.dev Documentation](https://docs.continue.dev) + diff --git a/mcp-servers/dlt-mcp/pyproject.toml b/mcp-servers/dlt-mcp/pyproject.toml new file mode 100644 index 0000000000..c186d79e4a --- /dev/null +++ b/mcp-servers/dlt-mcp/pyproject.toml @@ -0,0 +1,65 @@ +[project] +name = "dlt-mcp" +version = "0.1.0" +description = "MCP server for dlt (data load tool) - inspect, debug, and explore local dlt pipelines" +readme = "README.md" +requires-python = ">=3.8" +license = {text = "MIT"} +authors = [ + {name = "Continue.dev", email = "support@continue.dev"} +] +keywords = ["mcp", "dlt", "data-pipeline", "model-context-protocol"] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", +] + +dependencies = [ + "mcp>=1.0.0", + "dlt>=0.5.0", + "duckdb>=0.9.0", + "psycopg2-binary>=2.9.0", + "pydantic>=2.0.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.0.0", + "pytest-mock>=3.10.0", + "pytest-asyncio>=0.21.0", + "black>=23.0.0", + "ruff>=0.1.0", +] + +[project.scripts] +dlt-mcp = "dlt_mcp.server:main" + +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.pytest.ini_options] +testpaths = ["tests"] +python_files = ["test_*.py"] +python_classes = ["Test*"] +python_functions = ["test_*"] +asyncio_mode = "auto" + +[tool.black] +line-length = 100 +target-version = ['py38'] + +[tool.ruff] +line-length = 100 +target-version = "py38" + diff --git a/mcp-servers/dlt-mcp/src/dlt_mcp/__init__.py b/mcp-servers/dlt-mcp/src/dlt_mcp/__init__.py new file mode 100644 index 0000000000..da72efad6d --- /dev/null +++ b/mcp-servers/dlt-mcp/src/dlt_mcp/__init__.py @@ -0,0 +1,4 @@ +"""dlt MCP Server - Model Context Protocol server for dlt pipelines.""" + +__version__ = "0.1.0" + diff --git a/mcp-servers/dlt-mcp/src/dlt_mcp/server.py b/mcp-servers/dlt-mcp/src/dlt_mcp/server.py new file mode 100644 index 0000000000..dced128f33 --- /dev/null +++ b/mcp-servers/dlt-mcp/src/dlt_mcp/server.py @@ -0,0 +1,218 @@ +"""MCP server entry point for dlt pipeline inspection.""" + +import asyncio +from mcp.server import Server +from mcp.server.stdio import stdio_server +from mcp.types import Tool, TextContent + +from .tools import ( + inspect_pipeline, + get_schema, + query_destination, + analyze_errors, + scaffold_pipeline, + review_schema_evolution, +) + + +# Create the MCP server +server = Server("dlt-mcp") + + +@server.list_tools() +async def list_tools() -> list[Tool]: + """List all available tools.""" + return [ + Tool( + name="inspect_pipeline", + description="Inspect dlt pipeline execution details including load info, timing, file sizes, and rows loaded", + inputSchema={ + "type": "object", + "properties": { + "pipeline_name": { + "type": "string", + "description": "Name of the pipeline to inspect. If omitted, auto-discovers pipelines in the current directory." + }, + "working_dir": { + "type": "string", + "description": "Directory to search for pipelines. Defaults to current working directory." 
+ } + } + } + ), + Tool( + name="get_schema", + description="Retrieve pipeline schema metadata including tables, columns, data types, and constraints", + inputSchema={ + "type": "object", + "properties": { + "pipeline_name": { + "type": "string", + "description": "Name of the pipeline. If omitted, uses the first available pipeline." + }, + "table_name": { + "type": "string", + "description": "Specific table name. If omitted, returns schema for all tables." + }, + "include_hints": { + "type": "boolean", + "description": "Include dlt hints and annotations in the schema. Defaults to false." + } + } + } + ), + Tool( + name="query_destination", + description="Execute a read-only SQL SELECT query on the pipeline's destination database (DuckDB, PostgreSQL)", + inputSchema={ + "type": "object", + "properties": { + "pipeline_name": { + "type": "string", + "description": "Name of the pipeline. If omitted, uses the first available pipeline." + }, + "query": { + "type": "string", + "description": "SQL SELECT query to execute. Only SELECT statements are allowed for safety." + }, + "limit": { + "type": "integer", + "description": "Maximum number of rows to return. Defaults to 100." + } + }, + "required": ["query"] + } + ), + Tool( + name="analyze_errors", + description="Analyze pipeline load errors and explain root causes with suggested fixes", + inputSchema={ + "type": "object", + "properties": { + "pipeline_name": { + "type": "string", + "description": "Name of the pipeline. If omitted, uses the first available pipeline." + }, + "last_n_runs": { + "type": "integer", + "description": "Number of recent runs to analyze. Defaults to 1." + } + } + } + ), + Tool( + name="scaffold_pipeline", + description="Generate scaffold code for a new dlt pipeline from a data source", + inputSchema={ + "type": "object", + "properties": { + "source_type": { + "type": "string", + "description": "Type of data source: 'rest_api', 'sql_database', 'filesystem'", + "enum": ["rest_api", "sql_database", "filesystem"] + }, + "destination": { + "type": "string", + "description": "Destination database: 'duckdb', 'postgres', 'bigquery', etc.", + "enum": ["duckdb", "postgres", "bigquery", "snowflake"] + }, + "pipeline_name": { + "type": "string", + "description": "Name for the new pipeline" + } + }, + "required": ["source_type", "destination", "pipeline_name"] + } + ), + Tool( + name="review_schema_evolution", + description="Review and summarize schema changes across pipeline runs", + inputSchema={ + "type": "object", + "properties": { + "pipeline_name": { + "type": "string", + "description": "Name of the pipeline. If omitted, uses the first available pipeline." + }, + "compare_runs": { + "type": "integer", + "description": "Number of recent runs to compare. Defaults to 2." 
+ } + } + } + ), + ] + + +@server.call_tool() +async def call_tool(name: str, arguments: dict) -> list[TextContent]: + """Handle tool calls.""" + try: + if name == "inspect_pipeline": + result = await inspect_pipeline( + pipeline_name=arguments.get("pipeline_name"), + working_dir=arguments.get("working_dir") + ) + elif name == "get_schema": + result = await get_schema( + pipeline_name=arguments.get("pipeline_name"), + table_name=arguments.get("table_name"), + include_hints=arguments.get("include_hints", False) + ) + elif name == "query_destination": + result = await query_destination( + pipeline_name=arguments.get("pipeline_name"), + query=arguments.get("query"), + limit=arguments.get("limit", 100) + ) + elif name == "analyze_errors": + result = await analyze_errors( + pipeline_name=arguments.get("pipeline_name"), + last_n_runs=arguments.get("last_n_runs", 1) + ) + elif name == "scaffold_pipeline": + result = await scaffold_pipeline( + source_type=arguments.get("source_type"), + destination=arguments.get("destination"), + pipeline_name=arguments.get("pipeline_name") + ) + elif name == "review_schema_evolution": + result = await review_schema_evolution( + pipeline_name=arguments.get("pipeline_name"), + compare_runs=arguments.get("compare_runs", 2) + ) + else: + raise ValueError(f"Unknown tool: {name}") + + # Return result as JSON text content + import json + return [TextContent( + type="text", + text=json.dumps(result, indent=2) + )] + + except Exception as e: + import json + error_result = { + "error": str(e), + "error_type": type(e).__name__ + } + return [TextContent( + type="text", + text=json.dumps(error_result, indent=2) + )] + + +async def main(): + """Main entry point for the MCP server.""" + async with stdio_server() as (read_stream, write_stream): + await server.run( + read_stream, + write_stream, + server.create_initialization_options() + ) + + +if __name__ == "__main__": + asyncio.run(main()) + diff --git a/mcp-servers/dlt-mcp/src/dlt_mcp/tools/__init__.py b/mcp-servers/dlt-mcp/src/dlt_mcp/tools/__init__.py new file mode 100644 index 0000000000..0b4db9b4fd --- /dev/null +++ b/mcp-servers/dlt-mcp/src/dlt_mcp/tools/__init__.py @@ -0,0 +1,18 @@ +"""Tool implementations for dlt MCP server.""" + +from .pipeline_inspect import inspect_pipeline +from .schema_metadata import get_schema +from .query_data import query_destination +from .error_analysis import analyze_errors +from .pipeline_scaffold import scaffold_pipeline +from .schema_evolution import review_schema_evolution + +__all__ = [ + "inspect_pipeline", + "get_schema", + "query_destination", + "analyze_errors", + "scaffold_pipeline", + "review_schema_evolution", +] + diff --git a/mcp-servers/dlt-mcp/src/dlt_mcp/tools/error_analysis.py b/mcp-servers/dlt-mcp/src/dlt_mcp/tools/error_analysis.py new file mode 100644 index 0000000000..dcc537f155 --- /dev/null +++ b/mcp-servers/dlt-mcp/src/dlt_mcp/tools/error_analysis.py @@ -0,0 +1,150 @@ +"""Tool for analyzing pipeline load errors.""" + +from typing import Optional, Dict, Any, List +from pathlib import Path +from dlt.pipeline import Pipeline + +from ..utils import find_pipeline, validate_pipeline_name +from ..types import PipelineError + + +async def analyze_errors( + pipeline_name: Optional[str] = None, + last_n_runs: int = 1 +) -> Dict[str, Any]: + """ + Analyze pipeline load errors and explain root causes. + + Args: + pipeline_name: Name of the pipeline. If None, uses first available. + last_n_runs: Number of recent runs to analyze. 
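+
+    Note:
+        Errors are collected from *.error and *error*.log files and from
+        failed load packages found under the pipeline working directory,
+        when available.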
+ + Returns: + Dictionary with error details, affected tables, and suggested fixes. + """ + # Validate inputs + validated_name = validate_pipeline_name(pipeline_name) if pipeline_name else None + + # Find the pipeline + pipeline = find_pipeline(validated_name) + if pipeline is None: + return { + "error": f"Pipeline not found: {pipeline_name or 'auto-discover'}", + "errors": [], + "error_count": 0 + } + + try: + errors: List[PipelineError] = [] + + # Try to get error information from pipeline state + try: + # Check for error logs in the pipeline directory + pipeline_dir = Path(pipeline.pipelines_dir) / pipeline.pipeline_name if hasattr(pipeline, 'pipelines_dir') else None + if pipeline_dir and pipeline_dir.exists(): + # Look for error logs or failed load packages + error_logs = list(pipeline_dir.rglob("*.error")) + error_logs.extend(list(pipeline_dir.rglob("*error*.log"))) + + for error_file in error_logs[:last_n_runs * 5]: # Limit search + try: + error_content = error_file.read_text() + # Parse error content + error_type = "LoadError" + message = error_content[:500] # First 500 chars + stack_trace = error_content if len(error_content) > 500 else None + + # Try to extract table name from path + table_name = None + if "table" in error_file.name.lower(): + parts = error_file.stem.split("_") + for part in parts: + if part not in ["error", "log", "load"]: + table_name = part + break + + errors.append(PipelineError( + error_type=error_type, + message=message, + table_name=table_name, + stack_trace=stack_trace, + suggested_fix=_suggest_fix(error_type, message) + )) + except Exception: + pass + except Exception: + pass + + # Check for failed loads + try: + if hasattr(pipeline, 'list_failed_loads'): + failed_loads = pipeline.list_failed_loads() + for load_id in failed_loads[:last_n_runs]: + try: + load_info = pipeline.get_load_package_info(load_id) + if load_info and hasattr(load_info, 'exception'): + errors.append(PipelineError( + error_type=type(load_info.exception).__name__, + message=str(load_info.exception), + load_id=load_id, + suggested_fix=_suggest_fix(type(load_info.exception).__name__, str(load_info.exception)) + )) + except Exception: + pass + except Exception: + pass + + # If no errors found, check for warnings + if not errors: + return { + "pipeline_name": pipeline.pipeline_name, + "errors": [], + "error_count": 0, + "status": "no_errors_found", + "message": f"No errors found in the last {last_n_runs} run(s)" + } + + return { + "pipeline_name": pipeline.pipeline_name, + "errors": [error.model_dump() for error in errors], + "error_count": len(errors), + "last_n_runs_checked": last_n_runs + } + + except Exception as e: + return { + "error": str(e), + "error_type": type(e).__name__, + "pipeline_name": pipeline.pipeline_name if pipeline else None, + "errors": [], + "error_count": 0 + } + + +def _suggest_fix(error_type: str, message: str) -> Optional[str]: + """Generate suggested fixes based on error type and message.""" + message_lower = message.lower() + + # Data type errors + if "type" in message_lower or "cast" in message_lower: + return "Check data types in source data. Consider adding data type hints in your pipeline schema." + + # Connection errors + if "connection" in message_lower or "connect" in message_lower: + return "Verify destination database credentials and network connectivity." + + # Permission errors + if "permission" in message_lower or "access" in message_lower: + return "Check database user permissions. Ensure the user has SELECT and INSERT privileges." 
+ + # Schema errors + if "schema" in message_lower or "column" in message_lower: + return "Review schema changes. The source data structure may have changed." + + # Memory errors + if "memory" in message_lower or "out of memory" in message_lower: + return "Consider processing data in smaller batches or increasing available memory." + + # Generic suggestion + return "Review the error message and check the pipeline configuration. Ensure source data matches expected schema." + diff --git a/mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_inspect.py b/mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_inspect.py new file mode 100644 index 0000000000..510e1e4943 --- /dev/null +++ b/mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_inspect.py @@ -0,0 +1,141 @@ +"""Tool for inspecting dlt pipeline execution details.""" + +import os +from typing import Optional, Dict, Any +from pathlib import Path +from dlt.pipeline import Pipeline +from dlt.pipeline.pipeline import LoadInfo + +from ..utils import find_pipeline, validate_pipeline_name, validate_working_dir +from ..types import LoadInfo as LoadInfoType + + +async def inspect_pipeline( + pipeline_name: Optional[str] = None, + working_dir: Optional[str] = None +) -> Dict[str, Any]: + """ + Inspect dlt pipeline execution details. + + Args: + pipeline_name: Name of the pipeline to inspect. If None, auto-discovers. + working_dir: Directory to search for pipelines. + + Returns: + Dictionary with load info, timing, file sizes, and rows loaded. + """ + # Validate inputs + validated_name = validate_pipeline_name(pipeline_name) if pipeline_name else None + validated_dir = validate_working_dir(working_dir) if working_dir else None + + # Find the pipeline + pipeline = find_pipeline(validated_name, validated_dir) + if pipeline is None: + return { + "error": f"Pipeline not found: {pipeline_name or 'auto-discover'}", + "available_pipelines": [] + } + + try: + # Get pipeline info + pipeline_info = { + "name": pipeline.pipeline_name, + "destination": pipeline.destination.destination_name if hasattr(pipeline, 'destination') and pipeline.destination else None, + "dataset_name": pipeline.dataset_name if hasattr(pipeline, 'dataset_name') else None, + } + + # Get load information + loads: list[LoadInfoType] = [] + file_sizes: Dict[str, int] = {} + rows_loaded: Dict[str, int] = {} + + # Try to get the latest load info + try: + # Access the pipeline's load history + if hasattr(pipeline, 'list_completed_loads'): + completed_loads = pipeline.list_completed_loads() + for load_id in completed_loads[:10]: # Get last 10 loads + try: + load_info = pipeline.get_load_package_info(load_id) + if load_info: + load_data = { + "load_id": load_id, + "status": "completed", + "started_at": str(load_info.started_at) if hasattr(load_info, 'started_at') else None, + "finished_at": str(load_info.finished_at) if hasattr(load_info, 'finished_at') else None, + } + + # Calculate duration + if load_data["started_at"] and load_data["finished_at"]: + from datetime import datetime + try: + start = datetime.fromisoformat(load_data["started_at"].replace('Z', '+00:00')) + finish = datetime.fromisoformat(load_data["finished_at"].replace('Z', '+00:00')) + duration = (finish - start).total_seconds() + load_data["duration_seconds"] = duration + except Exception: + pass + + # Get table information + tables = [] + if hasattr(load_info, 'schema_update'): + # Extract table info from schema updates + pass + + loads.append(LoadInfoType(**load_data)) + except Exception: + pass + except Exception: + # If we can't get load 
info, continue with basic info + pass + + # Get file sizes from the pipeline directory + try: + pipeline_dir = Path(pipeline.pipelines_dir) / pipeline.pipeline_name if hasattr(pipeline, 'pipelines_dir') else None + if pipeline_dir and pipeline_dir.exists(): + for file_path in pipeline_dir.rglob("*"): + if file_path.is_file(): + file_size = file_path.stat().st_size + relative_path = str(file_path.relative_to(pipeline_dir)) + file_sizes[relative_path] = file_size + except Exception: + pass + + # Try to get row counts from destination + try: + if hasattr(pipeline, 'destination') and pipeline.destination: + from ..utils.db_connector import get_destination_connection, execute_query + conn = get_destination_connection(pipeline) + if conn: + try: + # Get table names from schema + if hasattr(pipeline, 'schema') and pipeline.schema: + for table_name in pipeline.schema.tables.keys(): + try: + query = f"SELECT COUNT(*) as count FROM {table_name}" + result = execute_query(conn, query, limit=1) + if result["rows"]: + rows_loaded[table_name] = result["rows"][0].get("count", 0) + except Exception: + pass + finally: + if hasattr(conn, 'close'): + conn.close() + except Exception: + pass + + return { + "pipeline": pipeline_info, + "loads": [load.model_dump() for load in loads], + "file_sizes": file_sizes, + "rows_loaded": rows_loaded, + "latest_load": loads[0].model_dump() if loads else None + } + + except Exception as e: + return { + "error": str(e), + "error_type": type(e).__name__, + "pipeline_name": pipeline.pipeline_name if pipeline else None + } + diff --git a/mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_scaffold.py b/mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_scaffold.py new file mode 100644 index 0000000000..d59a92d008 --- /dev/null +++ b/mcp-servers/dlt-mcp/src/dlt_mcp/tools/pipeline_scaffold.py @@ -0,0 +1,268 @@ +"""Tool for scaffolding new dlt pipelines.""" + +from typing import Dict, Any + + +async def scaffold_pipeline( + source_type: str, + destination: str, + pipeline_name: str +) -> Dict[str, Any]: + """ + Generate scaffold code for a new dlt pipeline. + + Args: + source_type: Type of data source: "rest_api", "sql_database", "filesystem" + destination: Destination database: "duckdb", "postgres", "bigquery", etc. + pipeline_name: Name for the new pipeline. + + Returns: + Dictionary with generated code snippets, file structure, and next steps. + """ + # Validate inputs + valid_sources = ["rest_api", "sql_database", "filesystem"] + if source_type not in valid_sources: + return { + "error": f"Invalid source_type: {source_type}. Must be one of {valid_sources}", + "code": None + } + + valid_destinations = ["duckdb", "postgres", "bigquery", "snowflake"] + if destination not in valid_destinations: + return { + "error": f"Invalid destination: {destination}. 
Must be one of {valid_destinations}", + "code": None + } + + # Generate code based on source type + if source_type == "rest_api": + code = _scaffold_rest_api_pipeline(pipeline_name, destination) + elif source_type == "sql_database": + code = _scaffold_sql_database_pipeline(pipeline_name, destination) + elif source_type == "filesystem": + code = _scaffold_filesystem_pipeline(pipeline_name, destination) + else: + code = None + + return { + "pipeline_name": pipeline_name, + "source_type": source_type, + "destination": destination, + "code": code, + "file_structure": _get_file_structure(pipeline_name), + "next_steps": _get_next_steps(source_type, destination, pipeline_name) + } + + +def _scaffold_rest_api_pipeline(pipeline_name: str, destination: str) -> Dict[str, str]: + """Generate code for a REST API pipeline.""" + destination_config = _get_destination_config(destination) + + code = f'''""" +{destination_config["description"]} +""" + +import dlt +from dlt.sources.helpers import requests + +@dlt.source +def {pipeline_name}_source(api_key: str = dlt.secrets.value): + """ + Source function for {pipeline_name} API. + + Args: + api_key: API key for authentication (stored in secrets.toml) + """ + + @dlt.resource + def users(): + """Load users from API""" + url = "https://api.example.com/users" + headers = {{"Authorization": f"Bearer {{api_key}}"}} + + response = requests.get(url, headers=headers) + response.raise_for_status() + + yield response.json() + + return users() + + +if __name__ == "__main__": + # Configure pipeline + pipeline = dlt.pipeline( + pipeline_name="{pipeline_name}", + destination="{destination}", + dataset_name="{pipeline_name}_data" + ) + + # Run pipeline + load_info = pipeline.run({pipeline_name}_source()) + print(load_info) +''' + return { + "main_file": f"{pipeline_name}_pipeline.py", + "content": code, + "description": f"REST API pipeline scaffold for {pipeline_name}" + } + + +def _scaffold_sql_database_pipeline(pipeline_name: str, destination: str) -> Dict[str, str]: + """Generate code for a SQL database pipeline.""" + destination_config = _get_destination_config(destination) + + code = f'''""" +{destination_config["description"]} +""" + +import dlt +from dlt.sources.sql_database import sql_database + +@dlt.source +def {pipeline_name}_source(): + """ + Source function for {pipeline_name} SQL database. + """ + + # Configure source database connection + # Update connection string in secrets.toml + return sql_database( + credentials=dlt.secrets["source_db"], + schema="public", # Update schema name as needed + table_names=["users", "orders"] # Update table names as needed + ) + + +if __name__ == "__main__": + # Configure pipeline + pipeline = dlt.pipeline( + pipeline_name="{pipeline_name}", + destination="{destination}", + dataset_name="{pipeline_name}_data" + ) + + # Run pipeline + load_info = pipeline.run({pipeline_name}_source()) + print(load_info) +''' + return { + "main_file": f"{pipeline_name}_pipeline.py", + "content": code, + "description": f"SQL database pipeline scaffold for {pipeline_name}" + } + + +def _scaffold_filesystem_pipeline(pipeline_name: str, destination: str) -> Dict[str, str]: + """Generate code for a filesystem pipeline.""" + destination_config = _get_destination_config(destination) + + code = f'''""" +{destination_config["description"]} +""" + +import dlt +from dlt.sources.filesystem import filesystem + +@dlt.source +def {pipeline_name}_source(): + """ + Source function for {pipeline_name} filesystem. 
+ """ + + # Configure filesystem source + # Update path in secrets.toml or pass directly + return filesystem( + bucket_url="path/to/data", # Update path as needed + file_glob="*.csv" # Update file pattern as needed + ) + + +if __name__ == "__main__": + # Configure pipeline + pipeline = dlt.pipeline( + pipeline_name="{pipeline_name}", + destination="{destination}", + dataset_name="{pipeline_name}_data" + ) + + # Run pipeline + load_info = pipeline.run({pipeline_name}_source()) + print(load_info) +''' + return { + "main_file": f"{pipeline_name}_pipeline.py", + "content": code, + "description": f"Filesystem pipeline scaffold for {pipeline_name}" + } + + +def _get_destination_config(destination: str) -> Dict[str, str]: + """Get destination-specific configuration.""" + configs = { + "duckdb": { + "description": "Pipeline loads data to DuckDB (local file-based database)", + "credentials": "# DuckDB doesn't require credentials, uses local file" + }, + "postgres": { + "description": "Pipeline loads data to PostgreSQL database", + "credentials": '''# Add to .dlt/secrets.toml: +[destination.postgres.credentials] +host = "localhost" +port = 5432 +database = "your_database" +username = "your_username" +password = "your_password"''' + }, + "bigquery": { + "description": "Pipeline loads data to Google BigQuery", + "credentials": '''# Add to .dlt/secrets.toml: +[destination.bigquery.credentials] +project_id = "your-project-id" +private_key = "path/to/service-account-key.json"''' + }, + "snowflake": { + "description": "Pipeline loads data to Snowflake", + "credentials": '''# Add to .dlt/secrets.toml: +[destination.snowflake.credentials] +database = "your_database" +password = "your_password" +username = "your_username" +warehouse = "your_warehouse" +host = "your_account.snowflakecomputing.com"''' + } + } + return configs.get(destination, {"description": f"Pipeline loads data to {destination}", "credentials": ""}) + + +def _get_file_structure(pipeline_name: str) -> Dict[str, Any]: + """Get recommended file structure.""" + return { + "files": [ + f"{pipeline_name}_pipeline.py", + ".dlt/secrets.toml", + ".dlt/config.toml", + "requirements.txt" + ], + "directories": [ + ".dlt/", + "data/" # Optional: for local data files + ] + } + + +def _get_next_steps(source_type: str, destination: str, pipeline_name: str) -> list[str]: + """Get next steps for setting up the pipeline.""" + steps = [ + f"1. Create the pipeline file: {pipeline_name}_pipeline.py", + "2. Configure destination credentials in .dlt/secrets.toml", + "3. Install dependencies: pip install dlt", + f"4. Run the pipeline: python {pipeline_name}_pipeline.py" + ] + + if source_type == "rest_api": + steps.insert(1, "2. Add API credentials to .dlt/secrets.toml") + elif source_type == "sql_database": + steps.insert(1, "2. 
Configure source database connection in .dlt/secrets.toml") + + return steps + diff --git a/mcp-servers/dlt-mcp/src/dlt_mcp/tools/query_data.py b/mcp-servers/dlt-mcp/src/dlt_mcp/tools/query_data.py new file mode 100644 index 0000000000..510bc55a90 --- /dev/null +++ b/mcp-servers/dlt-mcp/src/dlt_mcp/tools/query_data.py @@ -0,0 +1,107 @@ +"""Tool for querying destination databases with read-only SQL.""" + +from typing import Optional, Dict, Any +from dlt.pipeline import Pipeline + +from ..utils import find_pipeline, validate_pipeline_name +from ..utils.db_connector import get_destination_connection, execute_query, validate_sql_query + + +async def query_destination( + pipeline_name: Optional[str] = None, + query: Optional[str] = None, + limit: int = 100 +) -> Dict[str, Any]: + """ + Query data from the pipeline's destination database. + + Args: + pipeline_name: Name of the pipeline. If None, uses first available. + query: SQL SELECT query to execute. + limit: Maximum number of rows to return. + + Returns: + Dictionary with query results, column names, and row count. + """ + if not query: + return { + "error": "Query parameter is required", + "columns": [], + "rows": [], + "row_count": 0 + } + + # Validate query is read-only + is_valid, error = validate_sql_query(query) + if not is_valid: + return { + "error": error, + "columns": [], + "rows": [], + "row_count": 0 + } + + # Validate inputs + validated_name = validate_pipeline_name(pipeline_name) if pipeline_name else None + + # Find the pipeline + pipeline = find_pipeline(validated_name) + if pipeline is None: + return { + "error": f"Pipeline not found: {pipeline_name or 'auto-discover'}", + "columns": [], + "rows": [], + "row_count": 0 + } + + try: + # Get database connection + conn = get_destination_connection(pipeline, limit=limit) + if conn is None: + return { + "error": "Could not connect to destination database", + "pipeline_name": pipeline.pipeline_name, + "destination": pipeline.destination.destination_name if hasattr(pipeline, 'destination') and pipeline.destination else None, + "columns": [], + "rows": [], + "row_count": 0 + } + + try: + # Execute query + result = execute_query(conn, query, limit=limit) + + return { + "pipeline_name": pipeline.pipeline_name, + "query": query, + "columns": result["columns"], + "rows": result["rows"], + "row_count": result["row_count"], + "limit_applied": limit + } + finally: + # Close connection + if hasattr(conn, 'close'): + conn.close() + + except ValueError as e: + # Query validation error + return { + "error": str(e), + "error_type": "ValidationError", + "query": query, + "columns": [], + "rows": [], + "row_count": 0 + } + except Exception as e: + return { + "error": str(e), + "error_type": type(e).__name__, + "pipeline_name": pipeline.pipeline_name if pipeline else None, + "query": query, + "columns": [], + "rows": [], + "row_count": 0 + } + diff --git a/mcp-servers/dlt-mcp/src/dlt_mcp/tools/schema_evolution.py b/mcp-servers/dlt-mcp/src/dlt_mcp/tools/schema_evolution.py new file mode 100644 index 0000000000..38e523e1f4 --- /dev/null +++ b/mcp-servers/dlt-mcp/src/dlt_mcp/tools/schema_evolution.py @@ -0,0 +1,157 @@ +"""Tool for reviewing schema evolution across pipeline runs.""" + +from typing import Optional, Dict, Any, List +from dlt.pipeline import Pipeline + +from ..utils import find_pipeline, validate_pipeline_name +from ..types import SchemaEvolution, SchemaColumn + + +async def review_schema_evolution( + pipeline_name: Optional[str] = None, + compare_runs: int = 2 +) -> Dict[str, Any]: + 
""" + Review and summarize schema changes across pipeline runs. + + Args: + pipeline_name: Name of the pipeline. If None, uses first available. + compare_runs: Number of recent runs to compare. + + Returns: + Dictionary with added/removed/modified columns, type changes, and migration suggestions. + """ + # Validate inputs + validated_name = validate_pipeline_name(pipeline_name) if pipeline_name else None + + # Find the pipeline + pipeline = find_pipeline(validated_name) + if pipeline is None: + return { + "error": f"Pipeline not found: {pipeline_name or 'auto-discover'}", + "evolution": [] + } + + try: + # Get schema from pipeline + if not hasattr(pipeline, 'schema') or pipeline.schema is None: + return { + "error": "Pipeline schema not available", + "pipeline_name": pipeline.pipeline_name, + "evolution": [] + } + + current_schema = pipeline.schema + evolution: List[SchemaEvolution] = [] + + # Try to get historical schema information + # This is a simplified version - in practice, you'd compare against stored schema versions + try: + # Get current tables and columns + for table_name, table_schema in current_schema.tables.items(): + current_columns = set(table_schema.get("columns", {}).keys()) + + # Try to get previous schema version + # In a real implementation, you'd load this from pipeline state/history + previous_columns = set() # Placeholder - would load from history + + # Calculate differences + added_columns = current_columns - previous_columns + removed_columns = previous_columns - current_columns + + # Build evolution info + added_cols: List[SchemaColumn] = [] + for col_name in added_columns: + col_schema = table_schema.get("columns", {}).get(col_name, {}) + added_cols.append(SchemaColumn( + name=col_name, + data_type=str(col_schema.get("data_type", "unknown")), + nullable=col_schema.get("nullable", True), + primary_key=col_schema.get("primary_key", False) + )) + + # Type changes (simplified - would compare actual types) + type_changes: List[Dict[str, Any]] = [] + + # Migration suggestions + migration_suggestions = _generate_migration_suggestions( + table_name, + added_cols, + list(removed_columns), + type_changes + ) + + evolution.append(SchemaEvolution( + table_name=table_name, + added_columns=added_cols, + removed_columns=list(removed_columns), + modified_columns=[], + type_changes=type_changes + )) + + except Exception: + # If we can't get evolution info, return current schema info + pass + + # If no evolution detected (no history available), return current state + if not evolution: + return { + "pipeline_name": pipeline.pipeline_name, + "evolution": [], + "message": "Schema evolution tracking requires historical schema data. 
Current schema is available via get_schema tool.", + "current_tables": list(current_schema.tables.keys()) + } + + return { + "pipeline_name": pipeline.pipeline_name, + "evolution": [evol.model_dump() for evol in evolution], + "compare_runs": compare_runs, + "summary": { + "tables_changed": len(evolution), + "total_added_columns": sum(len(evol.added_columns) for evol in evolution), + "total_removed_columns": sum(len(evol.removed_columns) for evol in evolution), + "total_type_changes": sum(len(evol.type_changes) for evol in evolution) + } + } + + except Exception as e: + return { + "error": str(e), + "error_type": type(e).__name__, + "pipeline_name": pipeline.pipeline_name if pipeline else None, + "evolution": [] + } + + +def _generate_migration_suggestions( + table_name: str, + added_columns: List[SchemaColumn], + removed_columns: List[str], + type_changes: List[Dict[str, Any]] +) -> List[str]: + """Generate migration suggestions based on schema changes.""" + suggestions: List[str] = [] + + if added_columns: + suggestions.append( + f"Table '{table_name}' has {len(added_columns)} new column(s). " + "Ensure downstream processes are updated to handle new columns." + ) + + if removed_columns: + suggestions.append( + f"Table '{table_name}' has {len(removed_columns)} removed column(s). " + "Verify that no downstream processes depend on these columns." + ) + + if type_changes: + suggestions.append( + f"Table '{table_name}' has {len(type_changes)} type change(s). " + "Review data compatibility and update any type-dependent logic." + ) + + if not suggestions: + suggestions.append(f"No schema changes detected for table '{table_name}'.") + + return suggestions + diff --git a/mcp-servers/dlt-mcp/src/dlt_mcp/tools/schema_metadata.py b/mcp-servers/dlt-mcp/src/dlt_mcp/tools/schema_metadata.py new file mode 100644 index 0000000000..9524d49e85 --- /dev/null +++ b/mcp-servers/dlt-mcp/src/dlt_mcp/tools/schema_metadata.py @@ -0,0 +1,111 @@ +"""Tool for retrieving pipeline schema metadata.""" + +from typing import Optional, Dict, Any, List +from dlt.pipeline import Pipeline + +from ..utils import find_pipeline, validate_pipeline_name +from ..types import SchemaTable, SchemaColumn + + +async def get_schema( + pipeline_name: Optional[str] = None, + table_name: Optional[str] = None, + include_hints: bool = False +) -> Dict[str, Any]: + """ + Retrieve pipeline schema metadata. + + Args: + pipeline_name: Name of the pipeline. If None, uses first available. + table_name: Specific table name. If None, returns all tables. + include_hints: Include dlt hints and annotations. + + Returns: + Dictionary with tables, columns, data types, and constraints. 
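+        When include_hints is true, column hints such as merge_key, unique,
+        foreign_key, and partition are included where present.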
+ """ + # Validate inputs + validated_name = validate_pipeline_name(pipeline_name) if pipeline_name else None + + # Find the pipeline + pipeline = find_pipeline(validated_name) + if pipeline is None: + return { + "error": f"Pipeline not found: {pipeline_name or 'auto-discover'}", + "tables": [] + } + + try: + # Get schema from pipeline + if not hasattr(pipeline, 'schema') or pipeline.schema is None: + return { + "error": "Pipeline schema not available", + "pipeline_name": pipeline.pipeline_name, + "tables": [] + } + + schema = pipeline.schema + tables: List[SchemaTable] = [] + + # Iterate through schema tables + for table_name_key, table_schema in schema.tables.items(): + # Filter by table_name if specified + if table_name and table_name_key != table_name: + continue + + columns: List[SchemaColumn] = [] + + # Extract column information + for column_name, column_schema in table_schema.get("columns", {}).items(): + # Get data type + data_type = column_schema.get("data_type", "unknown") + if isinstance(data_type, dict): + data_type = data_type.get("name", "unknown") + + # Get nullable + nullable = column_schema.get("nullable", True) + + # Get primary key + primary_key = column_schema.get("primary_key", False) + + # Get hints if requested + hints = None + if include_hints: + hints = { + "merge_key": column_schema.get("merge_key", False), + "unique": column_schema.get("unique", False), + "foreign_key": column_schema.get("foreign_key"), + "partition": column_schema.get("partition", False), + } + # Remove None values + hints = {k: v for k, v in hints.items() if v is not None and v is not False} + + columns.append(SchemaColumn( + name=column_name, + data_type=str(data_type), + nullable=nullable, + primary_key=primary_key, + hints=hints if include_hints and hints else None + )) + + # Get resource name if available + resource_name = table_schema.get("resource", table_name_key) + + tables.append(SchemaTable( + name=table_name_key, + columns=columns, + resource_name=resource_name + )) + + return { + "pipeline_name": pipeline.pipeline_name, + "tables": [table.model_dump() for table in tables], + "table_count": len(tables) + } + + except Exception as e: + return { + "error": str(e), + "error_type": type(e).__name__, + "pipeline_name": pipeline.pipeline_name if pipeline else None + } + diff --git a/mcp-servers/dlt-mcp/src/dlt_mcp/types.py b/mcp-servers/dlt-mcp/src/dlt_mcp/types.py new file mode 100644 index 0000000000..9eb6209789 --- /dev/null +++ b/mcp-servers/dlt-mcp/src/dlt_mcp/types.py @@ -0,0 +1,66 @@ +"""Type definitions for dlt MCP server.""" + +from typing import Optional, Dict, Any, List +from pydantic import BaseModel + + +class PipelineInfo(BaseModel): + """Information about a dlt pipeline.""" + + name: str + pipeline_dir: str + destination: Optional[str] = None + dataset_name: Optional[str] = None + + +class LoadInfo(BaseModel): + """Pipeline load execution information.""" + + load_id: str + status: str + started_at: Optional[str] = None + finished_at: Optional[str] = None + duration_seconds: Optional[float] = None + tables: List[Dict[str, Any]] = [] + file_sizes: Dict[str, int] = {} + rows_loaded: Dict[str, int] = {} + + +class SchemaColumn(BaseModel): + """Schema column definition.""" + + name: str + data_type: str + nullable: bool = True + primary_key: bool = False + hints: Optional[Dict[str, Any]] = None + + +class SchemaTable(BaseModel): + """Schema table definition.""" + + name: str + columns: List[SchemaColumn] + resource_name: Optional[str] = None + + +class PipelineError(BaseModel): 
+ """Pipeline error information.""" + + error_type: str + message: str + table_name: Optional[str] = None + load_id: Optional[str] = None + stack_trace: Optional[str] = None + suggested_fix: Optional[str] = None + + +class SchemaEvolution(BaseModel): + """Schema evolution information.""" + + table_name: str + added_columns: List[SchemaColumn] = [] + removed_columns: List[str] = [] + modified_columns: List[Dict[str, Any]] = [] + type_changes: List[Dict[str, Any]] = [] + diff --git a/mcp-servers/dlt-mcp/src/dlt_mcp/utils/__init__.py b/mcp-servers/dlt-mcp/src/dlt_mcp/utils/__init__.py new file mode 100644 index 0000000000..755a8ebb65 --- /dev/null +++ b/mcp-servers/dlt-mcp/src/dlt_mcp/utils/__init__.py @@ -0,0 +1,15 @@ +"""Utility modules for dlt MCP server.""" + +from .pipeline_discovery import discover_pipelines, find_pipeline +from .db_connector import get_destination_connection, validate_sql_query +from .validation import validate_pipeline_name, validate_working_dir + +__all__ = [ + "discover_pipelines", + "find_pipeline", + "get_destination_connection", + "validate_sql_query", + "validate_pipeline_name", + "validate_working_dir", +] + diff --git a/mcp-servers/dlt-mcp/src/dlt_mcp/utils/db_connector.py b/mcp-servers/dlt-mcp/src/dlt_mcp/utils/db_connector.py new file mode 100644 index 0000000000..b15ba7d35a --- /dev/null +++ b/mcp-servers/dlt-mcp/src/dlt_mcp/utils/db_connector.py @@ -0,0 +1,161 @@ +"""Database connector utilities for safe read-only queries.""" + +import re +from typing import Optional, Dict, Any, List, Tuple +from dlt.pipeline import Pipeline +import duckdb +import psycopg2 +from psycopg2.extras import RealDictCursor + + +def validate_sql_query(query: str) -> Tuple[bool, Optional[str]]: + """ + Validate that a SQL query is read-only (SELECT only). + + Args: + query: SQL query string to validate. + + Returns: + Tuple of (is_valid, error_message). is_valid is True if query is safe. + """ + # Remove comments and normalize whitespace + query_clean = re.sub(r'--.*?$', '', query, flags=re.MULTILINE) + query_clean = re.sub(r'/\*.*?\*/', '', query_clean, flags=re.DOTALL) + query_clean = ' '.join(query_clean.split()) + + # Convert to uppercase for checking + query_upper = query_clean.upper().strip() + + # Check for dangerous keywords + dangerous_keywords = [ + 'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE', 'ALTER', + 'TRUNCATE', 'GRANT', 'REVOKE', 'EXEC', 'EXECUTE', 'CALL' + ] + + for keyword in dangerous_keywords: + # Use word boundaries to avoid false positives + pattern = r'\b' + keyword + r'\b' + if re.search(pattern, query_upper): + return False, f"Query contains forbidden keyword: {keyword}. Only SELECT queries are allowed." + + # Must start with SELECT + if not query_upper.startswith('SELECT'): + return False, "Query must be a SELECT statement." + + return True, None + + +def get_destination_connection(pipeline: Pipeline, limit: int = 100) -> Optional[Any]: + """ + Get a connection to the pipeline's destination database. + + Args: + pipeline: dlt Pipeline object. + limit: Maximum number of rows to return (for safety). + + Returns: + Database connection object or None if connection fails. 
+ """ + if not hasattr(pipeline, 'destination') or pipeline.destination is None: + return None + + destination_name = pipeline.destination.destination_name + + try: + if destination_name == "duckdb": + # For DuckDB, get the database path from the pipeline + db_path = pipeline.pipeline_name + if hasattr(pipeline.destination, 'credentials'): + if hasattr(pipeline.destination.credentials, 'database'): + db_path = pipeline.destination.credentials.database + elif isinstance(pipeline.destination.credentials, str): + db_path = pipeline.destination.credentials + + # DuckDB can work with in-memory or file-based databases + conn = duckdb.connect(db_path, read_only=True) + return conn + + elif destination_name == "postgres": + # For PostgreSQL, get connection details + if hasattr(pipeline.destination, 'credentials'): + creds = pipeline.destination.credentials + conn = psycopg2.connect( + host=getattr(creds, 'host', 'localhost'), + port=getattr(creds, 'port', 5432), + database=getattr(creds, 'database', 'postgres'), + user=getattr(creds, 'username', 'postgres'), + password=getattr(creds, 'password', ''), + cursor_factory=RealDictCursor + ) + return conn + + # Add support for other destinations as needed + # BigQuery, Snowflake, etc. + + except Exception as e: + # Log error but don't raise - return None to indicate failure + print(f"Failed to connect to destination: {e}") + return None + + return None + + +def execute_query(connection: Any, query: str, limit: int = 100) -> Dict[str, Any]: + """ + Execute a read-only query on the database connection. + + Args: + connection: Database connection object. + query: SQL SELECT query. + limit: Maximum number of rows to return. + + Returns: + Dictionary with query results, column names, and row count. + """ + # Validate query first + is_valid, error = validate_sql_query(query) + if not is_valid: + raise ValueError(error) + + # Add LIMIT if not present and limit is specified + query_upper = query.upper() + if limit > 0 and 'LIMIT' not in query_upper: + query = f"{query.rstrip(';')} LIMIT {limit}" + + try: + if isinstance(connection, duckdb.DuckDBPyConnection): + # DuckDB + cursor = connection.execute(query) + columns = [desc[0] for desc in cursor.description] if cursor.description else [] + rows = cursor.fetchall() + # Convert to list of dicts + results = [dict(zip(columns, row)) for row in rows] + return { + "columns": columns, + "rows": results, + "row_count": len(results) + } + + elif hasattr(connection, 'cursor'): + # PostgreSQL or similar + cursor = connection.cursor() + cursor.execute(query) + rows = cursor.fetchall() + columns = [desc[0] for desc in cursor.description] if cursor.description else [] + # If using RealDictCursor, rows are already dicts + if rows and isinstance(rows[0], dict): + results = rows + else: + results = [dict(zip(columns, row)) for row in rows] + cursor.close() + return { + "columns": columns, + "rows": results, + "row_count": len(results) + } + + except Exception as e: + raise RuntimeError(f"Query execution failed: {str(e)}") + + raise ValueError("Unsupported database connection type") + diff --git a/mcp-servers/dlt-mcp/src/dlt_mcp/utils/pipeline_discovery.py b/mcp-servers/dlt-mcp/src/dlt_mcp/utils/pipeline_discovery.py new file mode 100644 index 0000000000..7c0363bf10 --- /dev/null +++ b/mcp-servers/dlt-mcp/src/dlt_mcp/utils/pipeline_discovery.py @@ -0,0 +1,146 @@ +"""Pipeline discovery utilities for locating local dlt pipelines.""" + +import os +from pathlib import Path +from typing import List, Optional +from dlt.pipeline import 
Pipeline + +from ..types import PipelineInfo + + +def discover_pipelines(working_dir: Optional[str] = None) -> List[PipelineInfo]: + """ + Discover all dlt pipelines in the given directory. + + Args: + working_dir: Directory to search for pipelines. Defaults to current working directory. + + Returns: + List of PipelineInfo objects for discovered pipelines. + """ + if working_dir is None: + working_dir = os.getcwd() + + working_path = Path(working_dir).resolve() + pipelines: List[PipelineInfo] = [] + + # Check if there's a .dlt directory in the working directory + dlt_dir = working_path / ".dlt" + if dlt_dir.exists() and dlt_dir.is_dir(): + # Look for pipeline state files + pipelines_dir = dlt_dir / "pipelines" + if pipelines_dir.exists(): + for pipeline_name_dir in pipelines_dir.iterdir(): + if pipeline_name_dir.is_dir(): + pipeline_name = pipeline_name_dir.name + try: + # Try to load the pipeline to get more info + pipeline = Pipeline(pipeline_name, pipelines_dir=str(pipelines_dir)) + pipelines.append( + PipelineInfo( + name=pipeline_name, + pipeline_dir=str(working_path), + destination=pipeline.destination.destination_name if hasattr(pipeline, 'destination') else None, + dataset_name=pipeline.dataset_name if hasattr(pipeline, 'dataset_name') else None, + ) + ) + except Exception: + # If we can't load it, still include basic info + pipelines.append( + PipelineInfo( + name=pipeline_name, + pipeline_dir=str(working_path), + ) + ) + + # Also check for pipelines in the user's home directory .dlt folder + # This is where dlt stores pipelines by default when not in a project directory + try: + home_dlt_dir = Path.home() / ".dlt" / "pipelines" + if home_dlt_dir.exists(): + for pipeline_name_dir in home_dlt_dir.iterdir(): + if pipeline_name_dir.is_dir(): + pipeline_name = pipeline_name_dir.name + # Check if we already found this pipeline + if not any(p.name == pipeline_name for p in pipelines): + try: + pipeline = Pipeline(pipeline_name) + pipelines.append( + PipelineInfo( + name=pipeline_name, + pipeline_dir=str(pipeline_name_dir), + destination=pipeline.destination.destination_name if hasattr(pipeline, 'destination') else None, + dataset_name=pipeline.dataset_name if hasattr(pipeline, 'dataset_name') else None, + ) + ) + except Exception: + pipelines.append( + PipelineInfo( + name=pipeline_name, + pipeline_dir=str(pipeline_name_dir), + ) + ) + except Exception: + # If we can't access the home dlt dir, continue + pass + + return pipelines + + +def find_pipeline(pipeline_name: Optional[str] = None, working_dir: Optional[str] = None) -> Optional[Pipeline]: + """ + Find and load a specific dlt pipeline. + + Args: + pipeline_name: Name of the pipeline to find. If None, tries to find the first available pipeline. + working_dir: Directory to search for pipelines. + + Returns: + Pipeline object if found, None otherwise. 
+ """ + if working_dir is None: + working_dir = os.getcwd() + + working_path = Path(working_dir).resolve() + + # First try in the working directory + dlt_dir = working_path / ".dlt" / "pipelines" + if dlt_dir.exists(): + if pipeline_name: + pipeline_dir = dlt_dir / pipeline_name + if pipeline_dir.exists(): + try: + return Pipeline(pipeline_name, pipelines_dir=str(dlt_dir)) + except Exception: + pass + else: + # Try to find any pipeline in this directory + for pipeline_name_dir in dlt_dir.iterdir(): + if pipeline_name_dir.is_dir(): + try: + return Pipeline(pipeline_name_dir.name, pipelines_dir=str(dlt_dir)) + except Exception: + continue + + # Try in the user's home directory .dlt folder + try: + home_dlt_dir = Path.home() / ".dlt" / "pipelines" + if home_dlt_dir.exists(): + if pipeline_name: + try: + return Pipeline(pipeline_name) + except Exception: + pass + else: + # Try to find any pipeline + for pipeline_name_dir in home_dlt_dir.iterdir(): + if pipeline_name_dir.is_dir(): + try: + return Pipeline(pipeline_name_dir.name) + except Exception: + continue + except Exception: + pass + + return None + diff --git a/mcp-servers/dlt-mcp/src/dlt_mcp/utils/validation.py b/mcp-servers/dlt-mcp/src/dlt_mcp/utils/validation.py new file mode 100644 index 0000000000..940ebaa02f --- /dev/null +++ b/mcp-servers/dlt-mcp/src/dlt_mcp/utils/validation.py @@ -0,0 +1,54 @@ +"""Validation utilities for input parameters.""" + +import os +from pathlib import Path +from typing import Optional + + +def validate_pipeline_name(pipeline_name: Optional[str]) -> Optional[str]: + """ + Validate pipeline name format. + + Args: + pipeline_name: Pipeline name to validate. + + Returns: + Validated pipeline name or None if invalid. + """ + if pipeline_name is None: + return None + + # Basic validation: no path separators, no empty string + if not pipeline_name or '/' in pipeline_name or '\\' in pipeline_name: + return None + + # Remove leading/trailing whitespace + pipeline_name = pipeline_name.strip() + if not pipeline_name: + return None + + return pipeline_name + + +def validate_working_dir(working_dir: Optional[str]) -> Optional[str]: + """ + Validate working directory path. + + Args: + working_dir: Directory path to validate. + + Returns: + Validated absolute path or None if invalid. 
+ """ + if working_dir is None: + return None + + try: + path = Path(working_dir).resolve() + if path.exists() and path.is_dir(): + return str(path) + except Exception: + pass + + return None + diff --git a/mcp-servers/dlt-mcp/tests/__init__.py b/mcp-servers/dlt-mcp/tests/__init__.py new file mode 100644 index 0000000000..728999e34a --- /dev/null +++ b/mcp-servers/dlt-mcp/tests/__init__.py @@ -0,0 +1,2 @@ +"""Tests for dlt MCP server.""" + diff --git a/mcp-servers/dlt-mcp/tests/conftest.py b/mcp-servers/dlt-mcp/tests/conftest.py new file mode 100644 index 0000000000..a8bbace80f --- /dev/null +++ b/mcp-servers/dlt-mcp/tests/conftest.py @@ -0,0 +1,56 @@ +"""Pytest fixtures for dlt MCP server tests.""" + +import pytest +import tempfile +import shutil +from pathlib import Path +from unittest.mock import Mock, MagicMock + + +@pytest.fixture +def temp_dir(): + """Create a temporary directory for tests.""" + temp_path = tempfile.mkdtemp() + yield Path(temp_path) + shutil.rmtree(temp_path) + + +@pytest.fixture +def mock_pipeline(): + """Create a mock dlt pipeline.""" + pipeline = Mock() + pipeline.pipeline_name = "test_pipeline" + pipeline.dataset_name = "test_dataset" + pipeline.destination = Mock() + pipeline.destination.destination_name = "duckdb" + pipeline.schema = Mock() + pipeline.schema.tables = { + "users": { + "columns": { + "id": {"data_type": "bigint", "nullable": False, "primary_key": True}, + "name": {"data_type": "text", "nullable": True}, + "email": {"data_type": "text", "nullable": True} + }, + "resource": "users" + } + } + pipeline.pipelines_dir = "/tmp/.dlt/pipelines" + return pipeline + + +@pytest.fixture +def mock_pipeline_with_loads(mock_pipeline): + """Create a mock pipeline with load information.""" + def list_completed_loads(): + return ["load_1", "load_2"] + + def get_load_package_info(load_id): + load_info = Mock() + load_info.started_at = "2024-01-01T00:00:00Z" + load_info.finished_at = "2024-01-01T00:05:00Z" + return load_info + + mock_pipeline.list_completed_loads = list_completed_loads + mock_pipeline.get_load_package_info = get_load_package_info + return mock_pipeline + diff --git a/mcp-servers/dlt-mcp/tests/test_db_connector.py b/mcp-servers/dlt-mcp/tests/test_db_connector.py new file mode 100644 index 0000000000..0b86c118da --- /dev/null +++ b/mcp-servers/dlt-mcp/tests/test_db_connector.py @@ -0,0 +1,82 @@ +"""Tests for database connector utilities.""" + +import pytest +from dlt_mcp.utils.db_connector import validate_sql_query, get_destination_connection +from unittest.mock import Mock, patch + + +def test_validate_sql_query_select(): + """Test validation of valid SELECT queries.""" + is_valid, error = validate_sql_query("SELECT * FROM users") + assert is_valid is True + assert error is None + + is_valid, error = validate_sql_query("SELECT id, name FROM users WHERE id > 10") + assert is_valid is True + assert error is None + + +def test_validate_sql_query_dangerous_keywords(): + """Test validation rejects dangerous SQL keywords.""" + dangerous_queries = [ + "DELETE FROM users", + "INSERT INTO users VALUES (1, 'test')", + "UPDATE users SET name = 'test'", + "DROP TABLE users", + "CREATE TABLE test", + "ALTER TABLE users ADD COLUMN test", + ] + + for query in dangerous_queries: + is_valid, error = validate_sql_query(query) + assert is_valid is False + assert error is not None + assert "forbidden" in error.lower() or "SELECT" in error + + +def test_validate_sql_query_with_comments(): + """Test validation handles SQL comments correctly.""" + query = "-- This is a 
comment\nSELECT * FROM users" + is_valid, error = validate_sql_query(query) + assert is_valid is True + + query = "/* Multi-line comment */ SELECT * FROM users" + is_valid, error = validate_sql_query(query) + assert is_valid is True + + +def test_validate_sql_query_case_insensitive(): + """Test validation is case-insensitive.""" + is_valid, error = validate_sql_query("select * from users") + assert is_valid is True + + is_valid, error = validate_sql_query("delete from users") + assert is_valid is False + + +def test_get_destination_connection_duckdb(mock_pipeline): + """Test getting DuckDB connection.""" + mock_pipeline.destination.destination_name = "duckdb" + with patch('dlt_mcp.utils.db_connector.duckdb') as mock_duckdb: + mock_conn = Mock() + mock_duckdb.connect.return_value = mock_conn + result = get_destination_connection(mock_pipeline) + assert result is not None + + +def test_get_destination_connection_postgres(mock_pipeline): + """Test getting PostgreSQL connection.""" + mock_pipeline.destination.destination_name = "postgres" + mock_pipeline.destination.credentials = Mock() + mock_pipeline.destination.credentials.host = "localhost" + mock_pipeline.destination.credentials.port = 5432 + mock_pipeline.destination.credentials.database = "test" + mock_pipeline.destination.credentials.username = "user" + mock_pipeline.destination.credentials.password = "pass" + + with patch('dlt_mcp.utils.db_connector.psycopg2') as mock_psycopg2: + mock_conn = Mock() + mock_psycopg2.connect.return_value = mock_conn + result = get_destination_connection(mock_pipeline) + assert result is not None + diff --git a/mcp-servers/dlt-mcp/tests/test_error_analysis.py b/mcp-servers/dlt-mcp/tests/test_error_analysis.py new file mode 100644 index 0000000000..c2cb179f02 --- /dev/null +++ b/mcp-servers/dlt-mcp/tests/test_error_analysis.py @@ -0,0 +1,55 @@ +"""Tests for error analysis tool.""" + +import pytest +from unittest.mock import patch, Mock +from dlt_mcp.tools.error_analysis import analyze_errors + + +@pytest.mark.asyncio +async def test_analyze_errors_pipeline_not_found(): + """Test analyze_errors when pipeline is not found.""" + with patch('dlt_mcp.tools.error_analysis.find_pipeline', return_value=None): + result = await analyze_errors(pipeline_name="nonexistent") + assert "error" in result + assert "not found" in result["error"].lower() + + +@pytest.mark.asyncio +async def test_analyze_errors_no_errors(mock_pipeline): + """Test analyze_errors when no errors are found.""" + with patch('dlt_mcp.tools.error_analysis.find_pipeline', return_value=mock_pipeline): + with patch('pathlib.Path.exists', return_value=False): + result = await analyze_errors(pipeline_name="test_pipeline") + assert "status" in result + assert result["status"] == "no_errors_found" + + +@pytest.mark.asyncio +async def test_analyze_errors_with_errors(mock_pipeline): + """Test analyze_errors when errors are found.""" + # Create a mock error file + mock_error_file = Mock() + mock_error_file.read_text.return_value = "Test error message" + mock_error_file.name = "test_error.log" + + with patch('dlt_mcp.tools.error_analysis.find_pipeline', return_value=mock_pipeline): + with patch('pathlib.Path.exists', return_value=True): + with patch('pathlib.Path.rglob', return_value=[mock_error_file]): + result = await analyze_errors(pipeline_name="test_pipeline") + assert "errors" in result + assert result["error_count"] > 0 + + +@pytest.mark.asyncio +async def test_analyze_errors_suggest_fix(): + """Test error fix suggestion generation.""" + from 
dlt_mcp.tools.error_analysis import _suggest_fix + + # Test connection error + suggestion = _suggest_fix("ConnectionError", "Connection failed") + assert "connection" in suggestion.lower() or "credentials" in suggestion.lower() + + # Test type error + suggestion = _suggest_fix("TypeError", "Cannot cast to int") + assert "type" in suggestion.lower() or "data" in suggestion.lower() + diff --git a/mcp-servers/dlt-mcp/tests/test_pipeline_inspect.py b/mcp-servers/dlt-mcp/tests/test_pipeline_inspect.py new file mode 100644 index 0000000000..8e38b13d53 --- /dev/null +++ b/mcp-servers/dlt-mcp/tests/test_pipeline_inspect.py @@ -0,0 +1,37 @@ +"""Tests for pipeline inspection tool.""" + +import pytest +from unittest.mock import patch, Mock +from dlt_mcp.tools.pipeline_inspect import inspect_pipeline + + +@pytest.mark.asyncio +async def test_inspect_pipeline_not_found(): + """Test inspect_pipeline when pipeline is not found.""" + with patch('dlt_mcp.tools.pipeline_inspect.find_pipeline', return_value=None): + result = await inspect_pipeline(pipeline_name="nonexistent") + assert "error" in result + assert "not found" in result["error"].lower() + + +@pytest.mark.asyncio +async def test_inspect_pipeline_success(mock_pipeline): + """Test successful pipeline inspection.""" + with patch('dlt_mcp.tools.pipeline_inspect.find_pipeline', return_value=mock_pipeline): + with patch('pathlib.Path.exists', return_value=True): + with patch('pathlib.Path.rglob', return_value=[]): + result = await inspect_pipeline(pipeline_name="test_pipeline") + assert "pipeline" in result + assert result["pipeline"]["name"] == "test_pipeline" + + +@pytest.mark.asyncio +async def test_inspect_pipeline_with_loads(mock_pipeline_with_loads): + """Test pipeline inspection with load information.""" + with patch('dlt_mcp.tools.pipeline_inspect.find_pipeline', return_value=mock_pipeline_with_loads): + with patch('pathlib.Path.exists', return_value=True): + with patch('pathlib.Path.rglob', return_value=[]): + result = await inspect_pipeline(pipeline_name="test_pipeline") + assert "pipeline" in result + assert "loads" in result + diff --git a/mcp-servers/dlt-mcp/tests/test_query_data.py b/mcp-servers/dlt-mcp/tests/test_query_data.py new file mode 100644 index 0000000000..afcbecc534 --- /dev/null +++ b/mcp-servers/dlt-mcp/tests/test_query_data.py @@ -0,0 +1,60 @@ +"""Tests for query destination tool.""" + +import pytest +from unittest.mock import patch, Mock +from dlt_mcp.tools.query_data import query_destination + + +@pytest.mark.asyncio +async def test_query_destination_no_query(): + """Test query_destination without query parameter.""" + result = await query_destination() + assert "error" in result + assert "required" in result["error"].lower() + + +@pytest.mark.asyncio +async def test_query_destination_invalid_query(): + """Test query_destination with invalid (non-SELECT) query.""" + result = await query_destination(query="DELETE FROM users") + assert "error" in result + assert "forbidden" in result["error"].lower() or "SELECT" in result["error"] + + +@pytest.mark.asyncio +async def test_query_destination_pipeline_not_found(): + """Test query_destination when pipeline is not found.""" + with patch('dlt_mcp.tools.query_data.find_pipeline', return_value=None): + result = await query_destination(query="SELECT * FROM users") + assert "error" in result + assert "not found" in result["error"].lower() + + +@pytest.mark.asyncio +async def test_query_destination_success(mock_pipeline): + """Test successful query execution.""" + mock_conn = Mock() + 
mock_result = { + "columns": ["id", "name"], + "rows": [{"id": 1, "name": "Test"}], + "row_count": 1 + } + + with patch('dlt_mcp.tools.query_data.find_pipeline', return_value=mock_pipeline): + with patch('dlt_mcp.tools.query_data.get_destination_connection', return_value=mock_conn): + with patch('dlt_mcp.tools.query_data.execute_query', return_value=mock_result): + result = await query_destination(query="SELECT * FROM users", limit=10) + assert "columns" in result + assert "rows" in result + assert result["row_count"] == 1 + + +@pytest.mark.asyncio +async def test_query_destination_connection_failure(mock_pipeline): + """Test query_destination when connection fails.""" + with patch('dlt_mcp.tools.query_data.find_pipeline', return_value=mock_pipeline): + with patch('dlt_mcp.tools.query_data.get_destination_connection', return_value=None): + result = await query_destination(query="SELECT * FROM users") + assert "error" in result + assert "connect" in result["error"].lower() + diff --git a/mcp-servers/dlt-mcp/tests/test_schema_metadata.py b/mcp-servers/dlt-mcp/tests/test_schema_metadata.py new file mode 100644 index 0000000000..4170b77652 --- /dev/null +++ b/mcp-servers/dlt-mcp/tests/test_schema_metadata.py @@ -0,0 +1,49 @@ +"""Tests for schema metadata tool.""" + +import pytest +from unittest.mock import patch +from dlt_mcp.tools.schema_metadata import get_schema + + +@pytest.mark.asyncio +async def test_get_schema_not_found(): + """Test get_schema when pipeline is not found.""" + with patch('dlt_mcp.tools.schema_metadata.find_pipeline', return_value=None): + result = await get_schema(pipeline_name="nonexistent") + assert "error" in result + assert "not found" in result["error"].lower() + + +@pytest.mark.asyncio +async def test_get_schema_success(mock_pipeline): + """Test successful schema retrieval.""" + with patch('dlt_mcp.tools.schema_metadata.find_pipeline', return_value=mock_pipeline): + result = await get_schema(pipeline_name="test_pipeline") + assert "tables" in result + assert len(result["tables"]) > 0 + assert result["tables"][0]["name"] == "users" + + +@pytest.mark.asyncio +async def test_get_schema_specific_table(mock_pipeline): + """Test schema retrieval for a specific table.""" + with patch('dlt_mcp.tools.schema_metadata.find_pipeline', return_value=mock_pipeline): + result = await get_schema(pipeline_name="test_pipeline", table_name="users") + assert "tables" in result + assert len(result["tables"]) == 1 + assert result["tables"][0]["name"] == "users" + + +@pytest.mark.asyncio +async def test_get_schema_with_hints(mock_pipeline): + """Test schema retrieval with hints included.""" + with patch('dlt_mcp.tools.schema_metadata.find_pipeline', return_value=mock_pipeline): + result = await get_schema(pipeline_name="test_pipeline", include_hints=True) + assert "tables" in result + # Check that hints are included in column definitions + if result["tables"]: + columns = result["tables"][0].get("columns", []) + if columns: + # Hints may be None if not present, which is valid + assert "hints" in columns[0] or columns[0].get("hints") is None + diff --git a/mcp-servers/dlt-mcp/tests/test_validation.py b/mcp-servers/dlt-mcp/tests/test_validation.py new file mode 100644 index 0000000000..fd52b68573 --- /dev/null +++ b/mcp-servers/dlt-mcp/tests/test_validation.py @@ -0,0 +1,48 @@ +"""Tests for validation utilities.""" + +import pytest +from dlt_mcp.utils.validation import validate_pipeline_name, validate_working_dir +from pathlib import Path +import tempfile +import os + + +def 
test_validate_pipeline_name_valid(): + """Test validation of valid pipeline names.""" + assert validate_pipeline_name("my_pipeline") == "my_pipeline" + assert validate_pipeline_name("pipeline-123") == "pipeline-123" + assert validate_pipeline_name(" valid_name ") == "valid_name" + + +def test_validate_pipeline_name_invalid(): + """Test validation of invalid pipeline names.""" + assert validate_pipeline_name("") is None + assert validate_pipeline_name("path/to/pipeline") is None + assert validate_pipeline_name("path\\to\\pipeline") is None + assert validate_pipeline_name(None) is None + + +def test_validate_working_dir_valid(): + """Test validation of valid working directories.""" + with tempfile.TemporaryDirectory() as temp_dir: + result = validate_working_dir(temp_dir) + assert result == str(Path(temp_dir).resolve()) + + +def test_validate_working_dir_invalid(): + """Test validation of invalid working directories.""" + assert validate_working_dir("/nonexistent/path") is None + assert validate_working_dir(None) is None + # Create a file, not a directory + with tempfile.NamedTemporaryFile(delete=False) as temp_file: + temp_file_path = temp_file.name + temp_file.close() # Close the file handle on Windows + try: + assert validate_working_dir(temp_file_path) is None + finally: + try: + os.unlink(temp_file_path) + except (OSError, PermissionError): + # On Windows, file might still be locked, ignore + pass +
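
The utility layer introduced above can also be exercised directly, outside the MCP transport, which is a convenient way to walk the discovery and query-safety path end to end. The sketch below is illustrative only, not part of the diff: it assumes the package is importable (for example after a local editable install) and that at least one dlt pipeline with a DuckDB or PostgreSQL destination exists under the current directory or `~/.dlt/pipelines`. All helper names match those defined in the files above.

```python
"""Illustrative sketch: exercising the dlt-mcp utility layer directly.

Assumptions (not guaranteed by the diff): the `dlt_mcp` package is installed
locally and a dlt pipeline has already been run on this machine.
"""

from dlt_mcp.utils.pipeline_discovery import discover_pipelines, find_pipeline
from dlt_mcp.utils.db_connector import (
    get_destination_connection,
    execute_query,
    validate_sql_query,
)
from dlt_mcp.utils.validation import validate_working_dir

# Resolve the working directory; falls back to the current directory if invalid.
working_dir = validate_working_dir(".")

# List pipelines found in ./.dlt/pipelines and ~/.dlt/pipelines.
for info in discover_pipelines(working_dir):
    print(info.name, info.destination)

# Load the first available pipeline (or a named one via pipeline_name=...).
pipeline = find_pipeline(working_dir=working_dir)
if pipeline is not None:
    query = "SELECT 1 AS ok"

    # validate_sql_query rejects anything that is not a plain SELECT.
    is_valid, error = validate_sql_query(query)

    # get_destination_connection currently supports DuckDB and PostgreSQL;
    # it returns None for other destinations or on connection failure.
    conn = get_destination_connection(pipeline)

    if is_valid and conn is not None:
        # execute_query appends a LIMIT clause when one is not present.
        result = execute_query(conn, query, limit=5)
        print(result["columns"], result["row_count"])
```

Because `get_destination_connection` returns `None` rather than raising on unsupported destinations, callers such as the sketch above need the explicit `None` check before querying; the `query_destination` tool follows the same pattern and surfaces the failure as an `error` field.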