Source code for datamasque.client.runs

import logging
import re

from datamasque.client.base import BaseClient
from datamasque.client.exceptions import (
    FailedToStartError,
    InvalidLibraryError,
    InvalidRulesetError,
    RunNotCancellableError,
)
from datamasque.client.models.runs import MaskingRunRequest, RunId, RunInfo, UnfinishedRun
from datamasque.client.models.status import MaskingRunStatus

logger = logging.getLogger(__name__)


class RunClient(BaseClient):
    """Masking-run and run-report API methods. Mixed into `DataMasqueClient`."""

    def get_run_log(self, run_id: RunId) -> str:
        """
        Returns the full log output of the specified run.

        Args:
            run_id: The ID of the run.

        Returns:
            The response body of the run-log endpoint, as text.
        """
        response = self.make_request("GET", f"api/runs/{run_id}/log/")
        return response.text

    def get_sdd_report(self, run_id: RunId) -> str:
        """
        Returns the sensitive-data-discovery report generated by the specified run.

        Args:
            run_id: The ID of the run.

        Returns:
            The response body of the SDD-report endpoint, as text.
        """
        response = self.make_request("GET", f"api/runs/{run_id}/sdd-report/")
        return response.text

    def get_run_report(self, run_id: RunId) -> str:
        """
        Retrieves the run report for the specified run.

        Args:
            run_id: The ID of the run.

        Returns:
            The run report content, as text.
        """
        response = self.make_request("GET", f"api/runs/{run_id}/run-report/")
        return response.text

    def get_db_discovery_result_report(self, run_id: RunId, include_selection_column: bool = True) -> str:
        """
        Returns the database-discovery result report for the specified run as CSV.

        When `include_selection_column` is true (the default),
        the CSV includes a `selected` column suitable for feeding back into ruleset generation.

        Args:
            run_id: The ID of the run.
            include_selection_column: Whether the report should contain the `selected` column.

        Returns:
            The report content, as text.
        """
        url = f"api/runs/{run_id}/db-discovery-results/report/"
        # The query parameter is only sent when opting out;
        # no parameters at all are sent for the default case.
        params = None if include_selection_column else {"include_selection_column": "false"}
        response = self.make_request("GET", url, params=params)
        return response.text

    def get_unfinished_runs(self) -> dict[str, UnfinishedRun]:
        """
        Queries the DM instance for unfinished runs, and returns them organised by connection name.

        One request is made per unfinished status (queued, running, validating, cancelling).
        Each returned run is stored under its source connection name and,
        when it has one, under its destination connection name as well.
        If several runs share a connection name, the one processed last wins.

        Returns:
            A mapping of connection name to the `UnfinishedRun` that uses that connection.
        """
        unfinished_statuses = (
            MaskingRunStatus.queued,
            MaskingRunStatus.running,
            MaskingRunStatus.validating,
            MaskingRunStatus.cancelling,
        )

        unfinished_runs = {}
        for status in unfinished_statuses:
            # NOTE(review): `limit` is 1, so presumably at most one run per status is returned
            # and any further runs in the same status are not reported. Confirm this is intended.
            response = self.make_request(
                "GET",
                "api/runs/",
                params={
                    "connection_ruleset_name": "",
                    "ruleset_name": "",
                    "run_status": status.value,
                    "limit": 1,
                    "offset": 0,
                },
            )
            data = response.json()
            for run in data.get("results", []):
                unfinished_run = UnfinishedRun.model_validate(run)
                unfinished_runs[unfinished_run.source_connection.name] = unfinished_run
                if unfinished_run.destination_connection is not None:
                    unfinished_runs[unfinished_run.destination_connection.name] = unfinished_run

        return unfinished_runs

    def start_masking_run(self, run_info: MaskingRunRequest) -> RunId:
        """
        Starts a masking run with the given configuration and returns its run ID.

        Args:
            run_info: A `MaskingRunRequest` describing the run configuration.

        Returns:
            The ID of the newly started run.

        Raises:
            InvalidRulesetError: the run failed to start because the ruleset is invalid.
            InvalidLibraryError: the run failed to start because a referenced library is invalid.
            FailedToStartError: the run failed to start for any other reason,
                including failure responses whose body is not valid JSON.
        """
        data = run_info.model_dump(exclude_none=True, mode="json")
        # Status checking is disabled so that failure responses can be inspected
        # and mapped to specific exception types below.
        response = self.make_request("POST", "/api/runs/", data=data, require_status_check=False)

        if response.status_code == 201:
            run_data = response.json() if response.content else {}
            logger.info(
                "Run %s started successfully using ruleset %s",
                run_data["id"],
                run_data["name"],
            )
            return RunId(run_data["id"])

        try:
            run_data = response.json() if response.content else {}
        except ValueError:
            # The failure body is not JSON (for example, an HTML error page).
            # Treat it as carrying no structured error details
            # so that the documented FailedToStartError is raised rather than a decode error.
            run_data = {}

        if isinstance(run_data, dict) and "ruleset" in run_data:
            logger.error("Run failed to start: %s", run_data)
            try:
                errors = run_data["ruleset"][0]
            except (TypeError, IndexError, KeyError):
                pass  # fall through to generic FailedToStartError below
            else:
                if errors.lower().startswith("cannot start run"):
                    # Attempt to parse the library name out from a string like:
                    # `Library "abc" is invalid.`
                    # Trailing space is deliberate
                    # to match the end of the library name and the start of the error description that follows.
                    if matches := re.search(r'library "(.*)" ', errors, flags=re.IGNORECASE):
                        raise InvalidLibraryError(
                            f'Run failed to start due to invalid library named "{matches.group(1)}".',
                            response=response,
                        )
                    elif "library" in errors.lower():
                        raise InvalidLibraryError(
                            "Run failed to start due to invalid library.",
                            response=response,
                        )
                # NOTE(review): assumes every other `ruleset` field error means an invalid ruleset;
                # confirm this raise is not meant to sit inside the "cannot start run" branch.
                raise InvalidRulesetError(
                    f'Run failed to start due to invalid ruleset named "{data.get("name")}".',
                    response=response,
                )

        raise FailedToStartError(
            f'Run failed to start using ruleset named "{data.get("name")}" '
            f"(server responded with status {response.status_code}: {response.text}).",
            response=response,
        )

    def get_run_info(self, run_id: RunId) -> RunInfo:
        """
        Returns the full run record for the specified run ID.

        Args:
            run_id: The ID of the run.

        Returns:
            The `RunInfo` validated from the JSON body of the response.
        """
        response = self.make_request("GET", f"/api/runs/{run_id}/")
        return RunInfo.model_validate(response.json())

    def cancel_run(self, run_id: RunId) -> RunInfo:
        """
        Requests cancellation of the specified run and returns the updated run record.

        On success the run transitions to the `cancelling` status;
        callers can poll `get_run_info` to observe the final `cancelled` status.

        Args:
            run_id: The ID of the run to cancel.

        Returns:
            The updated `RunInfo` for the run, with `status` set to `cancelling`.

        Raises:
            RunNotCancellableError: the run is not in a state that can be cancelled
                (typically because it is already finished, failed, or cancelling).
            DataMasqueApiError: any other non-2xx response.
        """
        # Status checking is deferred so that a 400 can be mapped to a dedicated exception first.
        response = self.make_request(
            "POST",
            f"/api/runs/{run_id}/cancel/",
            require_status_check=False,
        )
        if response.status_code == 400:
            # The admin server returns 400 with a `RunLifecycleError` payload
            # when the run cannot be cancelled in its current state.
            # NOTE(review): unlike the errors raised by `start_masking_run`,
            # the response is not attached here; consider passing `response=response`
            # if `RunNotCancellableError` accepts it.
            raise RunNotCancellableError(f"Run {run_id} cannot be cancelled in its current state.")
        self._raise_for_status(response)
        return RunInfo.model_validate(response.json())