import logging
import re
from datamasque.client.base import BaseClient
from datamasque.client.exceptions import (
FailedToStartError,
InvalidLibraryError,
InvalidRulesetError,
RunNotCancellableError,
)
from datamasque.client.models.runs import MaskingRunRequest, RunId, RunInfo, UnfinishedRun
from datamasque.client.models.status import MaskingRunStatus
# Module-level logger; `RunClient.start_masking_run` logs run-start successes and failures here.
logger = logging.getLogger(__name__)


class RunClient(BaseClient):
    """Masking-run and run-report API methods. Mixed into `DataMasqueClient`."""

    def get_run_log(self, run_id: RunId) -> str:
        """Returns the full log output of the specified run."""
        response = self.make_request("GET", f"api/runs/{run_id}/log/")
        return response.text

    def get_sdd_report(self, run_id: RunId) -> str:
        """Returns the sensitive-data-discovery report generated by the specified run."""
        response = self.make_request("GET", f"api/runs/{run_id}/sdd-report/")
        return response.text

    def get_run_report(self, run_id: RunId) -> str:
        """
        Retrieves the run report for the specified run.

        Args:
            run_id: The ID of the run.

        Returns:
            str: The run report content.
        """
        response = self.make_request("GET", f"api/runs/{run_id}/run-report/")
        return response.text

    def get_db_discovery_result_report(self, run_id: RunId, include_selection_column: bool = True) -> str:
        """
        Returns the database-discovery result report for the specified run as CSV.

        When `include_selection_column` is true (the default),
        the CSV includes a `selected` column suitable for feeding back into ruleset generation.
        """
        url = f"api/runs/{run_id}/db-discovery-results/report/"
        # The flag is only sent when it is false; the true case omits the query parameter entirely.
        params = None if include_selection_column else {"include_selection_column": "false"}
        response = self.make_request("GET", url, params=params)
        return response.text

    def get_unfinished_runs(self) -> dict[str, UnfinishedRun]:
        """
        Queries the DM instance for unfinished runs, and returns them organised by connection name.

        Runs in the `queued`, `running`, `validating` and `cancelling` statuses are fetched,
        paging through each status query until no further results are available.
        Each run is keyed by its source connection name and, when it has one,
        by its destination connection name as well;
        if several runs share a connection name, the one fetched last wins.
        """
        page_size = 100
        unfinished_runs: dict[str, UnfinishedRun] = {}
        for status in (
            MaskingRunStatus.queued,
            MaskingRunStatus.running,
            MaskingRunStatus.validating,
            MaskingRunStatus.cancelling,
        ):
            offset = 0
            while True:
                response = self.make_request(
                    "GET",
                    "api/runs/",
                    params={
                        "connection_ruleset_name": "",
                        "ruleset_name": "",
                        "run_status": status.value,
                        "limit": page_size,
                        "offset": offset,
                    },
                )
                data = response.json()
                results = data.get("results", [])
                for run in results:
                    unfinished_run = UnfinishedRun.model_validate(run)
                    unfinished_runs[unfinished_run.source_connection.name] = unfinished_run
                    if unfinished_run.destination_connection is not None:
                        unfinished_runs[unfinished_run.destination_connection.name] = unfinished_run

                # Prefer the server's `next` link when the response includes one;
                # otherwise assume another page exists only if this page came back full.
                if "next" in data:
                    has_more = bool(data["next"])
                else:
                    has_more = len(results) >= page_size
                if not results or not has_more:
                    break
                offset += len(results)
        return unfinished_runs

    @staticmethod
    def _first_ruleset_error(ruleset_errors):
        """
        Extracts the first error message from the `ruleset` field of a failed run-start response.

        Accepts either a list whose first element is a string, or a bare string.
        Returns None for any other shape (empty list, non-string entries, etc.)
        so the caller can fall back to a generic `FailedToStartError`.
        """
        if isinstance(ruleset_errors, list):
            if not ruleset_errors:
                return None
            ruleset_errors = ruleset_errors[0]
        return ruleset_errors if isinstance(ruleset_errors, str) else None

    def start_masking_run(self, run_info: MaskingRunRequest) -> RunId:
        """
        Starts a masking run with the given configuration and returns its run ID.

        Args:
            run_info: A `MaskingRunRequest` describing the run configuration.

        Returns:
            The ID of the newly created run.

        Raises:
            InvalidRulesetError: the run failed to start because the ruleset is invalid.
            InvalidLibraryError: the run failed to start because a referenced library is invalid.
            FailedToStartError: the run failed to start for any other reason,
                including when the error response body is empty or is not valid JSON.
        """
        data = run_info.model_dump(exclude_none=True, mode="json")
        response = self.make_request("POST", "/api/runs/", data=data, require_status_check=False)

        # Error responses are not guaranteed to be JSON (e.g. a plain-text error page);
        # treat an undecodable body like an empty one so a `FailedToStartError` is still raised.
        run_data = {}
        if response.content:
            try:
                run_data = response.json()
            except ValueError:
                run_data = {}

        if response.status_code == 201:
            logger.info(
                "Run %s started successfully using ruleset %s",
                run_data["id"],
                run_data.get("name"),
            )
            return RunId(run_data["id"])

        if isinstance(run_data, dict) and "ruleset" in run_data:
            logger.error("Run failed to start: %s", run_data)
            errors = self._first_ruleset_error(run_data["ruleset"])
            if errors is not None:
                if errors.lower().startswith("cannot start run"):
                    # Attempt to parse the library name out from a string like:
                    # `Library "abc" is invalid.`
                    # The trailing space is deliberate: it marks the end of the library name
                    # and the start of the error description that follows.
                    # The lazy `.*?` stops at the first such closing quote,
                    # so quoted text later in the description is not swallowed into the name.
                    if matches := re.search(r'library "(.*?)" ', errors, flags=re.IGNORECASE):
                        raise InvalidLibraryError(
                            f'Run failed to start due to invalid library named "{matches.group(1)}".',
                            response=response,
                        )
                    if "library" in errors.lower():
                        raise InvalidLibraryError(
                            "Run failed to start due to invalid library.",
                            response=response,
                        )
                raise InvalidRulesetError(
                    f'Run failed to start due to invalid ruleset named "{data.get("name")}".',
                    response=response,
                )

        raise FailedToStartError(
            f'Run failed to start using ruleset named "{data.get("name")}" '
            f"(server responded with status {response.status_code}: {response.text}).",
            response=response,
        )

    def get_run_info(self, run_id: RunId) -> RunInfo:
        """Returns the full run record for the specified run ID."""
        response = self.make_request("GET", f"/api/runs/{run_id}/")
        return RunInfo.model_validate(response.json())

    def cancel_run(self, run_id: RunId) -> RunInfo:
        """
        Requests cancellation of the specified run and returns the updated run record.

        On success the run transitions to the `cancelling` status;
        callers can poll `get_run_info` to observe the final `cancelled` status.

        Args:
            run_id: The ID of the run to cancel.

        Returns:
            The updated `RunInfo` for the run,
            with `status` set to `cancelling`.

        Raises:
            RunNotCancellableError: the run is not in a state that can be cancelled
                (typically because it is already finished, failed, or cancelling).
            DataMasqueApiError: any other non-2xx response.
        """
        response = self.make_request(
            "POST",
            f"/api/runs/{run_id}/cancel/",
            require_status_check=False,
        )
        if response.status_code == 400:
            # The admin server returns 400 with a `RunLifecycleError` payload
            # when the run cannot be cancelled in its current state.
            raise RunNotCancellableError(f"Run {run_id} cannot be cancelled in its current state.")
        self._raise_for_status(response)
        return RunInfo.model_validate(response.json())