Source code for sktime_mcp.tools.job_tools

"""
MCP tools for job management.

Provides tools for checking job status, listing jobs, and cancelling jobs.
"""

import logging
from typing import Any

from sktime_mcp.runtime.jobs import JobStatus, get_job_manager

logger = logging.getLogger(__name__)


[docs] def check_job_status_tool(job_id: str) -> dict[str, Any]: """ Check the status of a background job. Args: job_id: Job ID to check Returns: Dictionary with job status and progress information """ job_manager = get_job_manager() job = job_manager.get_job(job_id) if job is None: return { "success": False, "error": f"Job '{job_id}' not found", } return { "success": True, **job.to_dict(), }
[docs] def list_jobs_tool( status: str | None = None, limit: int = 20, ) -> dict[str, Any]: """ List all background jobs. Args: status: Filter by status (pending, running, completed, failed, cancelled) limit: Maximum number of jobs to return Returns: Dictionary with list of jobs """ job_manager = get_job_manager() # Convert status string to enum status_filter = None if status is not None: if not isinstance(status, str): return { "success": False, "error": ( f"Invalid status type '{type(status).__name__}'. " "Expected one of: pending, running, completed, failed, cancelled" ), } try: status_filter = JobStatus(status.lower()) except ValueError: return { "success": False, "error": f"Invalid status '{status}'. Valid values: pending, running, completed, failed, cancelled", } if limit < 1: return { "success": False, "error": "limit must be a positive integer.", } jobs = job_manager.list_jobs(status=status_filter, limit=limit) return { "success": True, "count": len(jobs), "jobs": [job.to_dict() for job in jobs], }
[docs] def cancel_job_tool(job_id: str, delete: bool = False) -> dict[str, Any]: """ Cancel a running/pending job, and optionally remove its record. Args: job_id: Job ID to cancel delete: Also remove the job record after cancelling (default: False). For jobs that are already completed/failed, set delete=True to remove them. Returns: Dictionary with success status and message """ job_manager = get_job_manager() job = job_manager.get_job(job_id) if job is None: return {"success": False, "error": f"Job '{job_id}' not found"} if job.status in (JobStatus.PENDING, JobStatus.RUNNING): job_manager.cancel_job(job_id) msg = f"Job '{job_id}' cancelled" if delete: job_manager.delete_job(job_id) msg += " and removed" return {"success": True, "message": msg} if delete: job_manager.delete_job(job_id) return {"success": True, "message": f"Job '{job_id}' removed"} return { "success": False, "error": (f"Job is already '{job.status.value}'. Use delete=true to remove the record."), }
def cleanup_old_jobs_tool(max_age_hours: int = 24) -> dict[str, Any]: """ Clean up old jobs. Args: max_age_hours: Maximum age in hours (default: 24) Returns: Dictionary with number of jobs removed """ job_manager = get_job_manager() count = job_manager.cleanup_old_jobs(max_age_hours) return { "success": True, "message": f"Removed {count} old job(s)", "count": count, }