Source code for sktime_mcp.tools.data_tools

"""
Data loading tools for sktime MCP.

Provides tools for loading data from various sources.
"""

import logging
from typing import Any

from sktime_mcp.runtime.executor import get_executor

logger = logging.getLogger(__name__)


[docs] def load_data_source_tool(config: dict[str, Any]) -> dict[str, Any]: """ Load data from any source (pandas, SQL, file, etc.). Args: config: Data source configuration { "type": "pandas" | "sql" | "file" | "url", ... (type-specific configuration) } Returns: Dictionary with: - success: bool - data_handle: str (handle ID for the loaded data) - metadata: dict (information about the data) - validation: dict (validation results) Examples: # Pandas DataFrame >>> load_data_source_tool({ ... "type": "pandas", ... "data": {"date": [...], "value": [...]}, ... "time_column": "date", ... "target_column": "value" ... }) # SQL Database >>> load_data_source_tool({ ... "type": "sql", ... "connection_string": "postgresql://user:pass@host:5432/db", ... "query": "SELECT date, value FROM sales", ... "time_column": "date", ... "target_column": "value" ... }) # CSV File >>> load_data_source_tool({ ... "type": "file", ... "path": "/path/to/data.csv", ... "time_column": "date", ... "target_column": "value" ... }) """ executor = get_executor() return executor.load_data_source(config)
def list_data_sources_tool() -> dict[str, Any]: """ List all available data source types. Returns: Dictionary with: - success: bool - sources: list of available source types - descriptions: dict with descriptions for each source type """ from sktime_mcp.data import DataSourceRegistry sources = DataSourceRegistry.list_adapters() # Get descriptions for each source descriptions = {} for source_type in sources: info = DataSourceRegistry.get_adapter_info(source_type) descriptions[source_type] = { "class": info["class"], "description": info["docstring"].split("\n")[0] if info["docstring"] else "", } return { "success": True, "sources": sources, "descriptions": descriptions, }
[docs] def release_data_handle_tool(data_handle: str) -> dict[str, Any]: """ Release a data handle and free memory. Args: data_handle: Data handle to release Returns: Dictionary with success status """ executor = get_executor() return executor.release_data_handle(data_handle)
[docs] def load_data_source_async_tool( config: dict[str, Any], ) -> dict[str, Any]: """ Load data from any source in the background (non-blocking). Schedules the data loading as a background job and returns immediately with a job_id. Use check_job_status to monitor progress and retrieve the data_handle when done. Args: config: Data source configuration (same as load_data_source) Returns: Dictionary with: - success: bool - job_id: Job ID for tracking progress - message: Information about the job Example: >>> load_data_source_async_tool({ ... "type": "file", ... "path": "/path/to/large_data.csv", ... "time_column": "date", ... "target_column": "value" ... }) { "success": True, "job_id": "abc-123-def-456", "message": "Data loading job started..." } """ import asyncio from sktime_mcp.runtime.jobs import get_job_manager executor = get_executor() job_manager = get_job_manager() source_type = config.get("type", "unknown") # create a background job for data loading job_id = job_manager.create_job( job_type="data_loading", estimator_handle="", dataset_name=source_type, total_steps=3, # load, validate, format ) # schedule on event loop try: loop = asyncio.get_event_loop() except RuntimeError: loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) coro = executor.load_data_source_async(config, job_id) asyncio.run_coroutine_threadsafe(coro, loop) return { "success": True, "job_id": job_id, "message": ( f"Data loading job started for source type '{source_type}'. " f"Use check_job_status('{job_id}') to monitor progress." ), "source_type": source_type, }