Source code for datamasque.client.models.runs
"""Typed request and response shapes for run-related API endpoints."""
import enum
from datetime import datetime
from typing import Any, NewType, Optional, Union
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
from datamasque.client.models.connection import ConnectionConfig, ConnectionId, unwrap_connection_id
from datamasque.client.models.ruleset import Ruleset, RulesetId, unwrap_ruleset_id
from datamasque.client.models.status import MaskingRunStatus
RunId = NewType("RunId", int)
[docs]
class MaskType(enum.Enum):
"""Type of a masking run."""
database = "database" # Also used for schema discovery.
file = "file"
file_data_discovery = "file_data_discovery"
[docs]
class MaskingRunOptions(BaseModel):
"""
Optional run-time overrides for `MaskingRunRequest.options`.
All fields optional; server applies defaults when omitted.
`run_secret`,
if supplied,
must be 16–256 characters and is used as the per-run encryption key;
the server auto-generates one when omitted.
"""
model_config = ConfigDict(extra="forbid")
batch_size: Optional[int] = None
dry_run: Optional[bool] = None
continue_on_failure: Optional[bool] = None
max_rows: Optional[int] = None
diagnostic_logging: Optional[bool] = None
run_secret: Optional[str] = Field(default=None, min_length=16, max_length=256)
disable_instance_secret: Optional[bool] = None
[docs]
class MaskingRunRequest(BaseModel):
"""
Request body for `POST /api/runs/`.
`connection`, `destination_connection`, and `ruleset` accept either the server-assigned ID
or the corresponding object returned by an earlier client call (e.g. a `ConnectionConfig`
or `Ruleset`); the object's `id` is extracted at construction time.
"""
model_config = ConfigDict(extra="forbid")
connection: Union[ConnectionId, ConnectionConfig]
ruleset: Union[RulesetId, Ruleset]
mask_type: MaskType = MaskType.database
destination_connection: Optional[Union[ConnectionId, ConnectionConfig]] = None
options: MaskingRunOptions = Field(default_factory=MaskingRunOptions)
name: Optional[str] = None
@field_validator("connection", "destination_connection", mode="before")
@classmethod
def _unwrap_connection(cls, value: Any) -> Any:
return unwrap_connection_id(value)
@field_validator("ruleset", mode="before")
@classmethod
def _unwrap_ruleset(cls, value: Any) -> Any:
return unwrap_ruleset_id(value)
[docs]
class RunConnectionRef(BaseModel):
"""A reference to a connection used in a run — just the ID and display name."""
model_config = ConfigDict(extra="allow")
id: Optional[ConnectionId] = None
name: str
def _collapse_flat_connection_fields(data: Any) -> Any:
"""
Collapse flat `*_connection` + `*_connection_name` pairs into nested `RunConnectionRef`s.
The admin server sends connections as two parallel fields
(`source_connection` holding the ID and `source_connection_name` holding the display name);
the client surfaces them as a single nested object.
Leaves the input alone if the fields are already in nested form
(i.e. the caller constructed the model directly).
"""
if not isinstance(data, dict):
return data
data = dict(data)
if "source_connection_name" in data and not isinstance(data.get("source_connection"), dict):
data["source_connection"] = {
"id": data.pop("source_connection", None),
"name": data.pop("source_connection_name"),
}
dest_name = data.get("destination_connection_name")
if dest_name and not isinstance(data.get("destination_connection"), dict):
data["destination_connection"] = {
"id": data.pop("destination_connection", None),
"name": data.pop("destination_connection_name"),
}
elif "destination_connection_name" in data:
# Empty string or None — let the Optional default apply.
data.pop("destination_connection_name", None)
data.pop("destination_connection", None)
return data
[docs]
class RunInfo(BaseModel):
"""Full record for a masking run."""
model_config = ConfigDict(extra="allow")
id: int
status: MaskingRunStatus
mask_type: MaskType
source_connection: RunConnectionRef
ruleset_name: str
name: Optional[str] = None
destination_connection: Optional[RunConnectionRef] = None
ruleset: Optional[RulesetId] = None
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
options: Optional[dict[str, Any]] = None
@model_validator(mode="before")
@classmethod
def _collapse_connection_fields(cls, data: Any) -> Any:
return _collapse_flat_connection_fields(data)
[docs]
class UnfinishedRun(BaseModel):
"""Represents a masking run that is queued, running, validating, or cancelling."""
model_config = ConfigDict(extra="allow")
id: int
source_connection: RunConnectionRef
ruleset_name: str
status: MaskingRunStatus
destination_connection: Optional[RunConnectionRef] = None
@model_validator(mode="before")
@classmethod
def _collapse_connection_fields(cls, data: Any) -> Any:
return _collapse_flat_connection_fields(data)
def __str__(self) -> str:
if self.destination_connection is not None:
connection_part = f'"{self.source_connection.name}", "{self.destination_connection.name}"'
else:
connection_part = f'"{self.source_connection.name}"'
return f'{connection_part}: Run ID {self.id} in status `{self.status.value}`, ruleset "{self.ruleset_name}"'