Plan: Migrate connection_graph.py to NormalizedNativeWorkflow
Goal
Replace raw dict manipulation in connection_graph.py with gxformat2’s NormalizedNativeWorkflow / NormalizedNativeStep Pydantic models. This is the biggest remaining consumer of raw step_def.get(...) patterns in the workflow_state package.
Context
- Branch: wf_tool_state
- Worktree: /Users/jxc755/projects/worktrees/galaxy/branch/wf_tool_state
- File: lib/galaxy/tool_util/workflow_state/connection_graph.py
- Tests: test/unit/tool_util/workflow_state/ (run with source .venv/bin/activate && PYTHONPATH=lib python -m pytest test/unit/tool_util/workflow_state/ -x -q)
- IWC sweep tests: Set GALAXY_TEST_IWC_DIRECTORY to run against real IWC workflows
Prior art
Recent commits migrated validation_native.py and convert.py to a StepLike = Union[NormalizedNativeStep, NativeStepDict] pattern. The shared helpers live in _util.py:
- StepLike type alias
- step_tool_id(), step_tool_version(), step_tool_state(), step_input_connections(), step_as_dict()
- decode_double_encoded_values()
What NormalizedNativeStep provides
class NormalizedNativeStep(BaseModel):
    id: int
    type_: NativeStepType = Field(alias="type")  # enum with .tool, .subworkflow, .data_input, etc.
    tool_id: str | None
    tool_version: str | None
    tool_state: dict[str, Any]  # guaranteed parsed dict, never JSON string
    input_connections: dict[str, list[NativeInputConnection]]  # always-list (normalized)
    inputs: list[NativeStepInput]
    outputs: list[NativeStepOutput]
    workflow_outputs: list[NativeWorkflowOutput]
    subworkflow: NormalizedNativeWorkflow | None
    when: str | None
    # ... other fields
    # Properties: is_tool_step, is_subworkflow_step, is_input_step, is_pause_step
NativeInputConnection has typed fields: .id, .output_name, .input_subworkflow_step_id.
NativeWorkflowOutput has typed fields: .output_name, .label.
What changes
Dict access → model property mapping
| Current raw dict access | Model equivalent |
|---|---|
| step_def.get("type", "tool") | step.type_ (NativeStepType enum, compare with NativeStepType.tool, .subworkflow, etc.) |
| step_def.get("tool_id") | step.tool_id |
| step_def.get("tool_version") | step.tool_version |
| step_def.get("tool_state") + json.loads | step.tool_state (already parsed dict) |
| step_def.get("input_connections", {}) | step.input_connections (already dict with always-list values) |
| step_def.get("subworkflow") | step.subworkflow (typed NormalizedNativeWorkflow or None) |
| step_def.get("when") | step.when |
| step_def.get("workflow_outputs", []) | step.workflow_outputs (typed list) |
| isinstance(conn, dict) and "id" in conn | conn.id (typed NativeInputConnection) |
| conn.get("output_name", "output") | conn.output_name |
| conn.get("input_subworkflow_step_id") | conn.input_subworkflow_step_id |
| wo.get("output_name") | wo.output_name |
| wo.get("label") | wo.label |
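As a rough illustration of the tool_state row, the sketch below contrasts the two access styles. StepStandIn is a hypothetical dataclass standing in for NormalizedNativeStep (the real class is a Pydantic model in gxformat2), and both helper functions are invented for this example only.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Optional


# Hypothetical stand-in for NormalizedNativeStep, reduced to three fields.
@dataclass
class StepStandIn:
    type_: str = "tool"
    tool_id: Optional[str] = None
    tool_state: dict[str, Any] = field(default_factory=dict)


def tool_state_from_dict(step_def: dict) -> dict:
    """Raw-dict style: tool_state may be a JSON string or a parsed dict."""
    raw = step_def.get("tool_state") or {}
    return json.loads(raw) if isinstance(raw, str) else raw


def tool_state_from_model(step: StepStandIn) -> dict:
    """Model style: the attribute is assumed to already be a parsed dict."""
    return step.tool_state


raw_step = {"type": "tool", "tool_id": "cat1", "tool_state": '{"queries": []}'}
model_step = StepStandIn(type_="tool", tool_id="cat1", tool_state={"queries": []})
```

Both helpers return the same dict for these inputs; the model version simply has no parsing branch left to maintain.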
Functions to change
- build_workflow_graph(): change signature from workflow_dict: dict to workflow: Union[NormalizedNativeWorkflow, dict]. Normalize on entry with gxformat2.normalized.normalized_native() if given a dict.
- _get_tool_state(): delete entirely. The model guarantees step.tool_state is a parsed dict. Callers just use step.tool_state directly.
- _parse_connections(): simplify heavily. The model's input_connections is already dict[str, list[NativeInputConnection]] (always-list). No need for if not isinstance(conn_info, list): conn_info = [conn_info]. Just map NativeInputConnection → ConnectionRef.
- _resolve_input_step(): change step_def: NativeStepDict to step: NormalizedNativeStep. Use step.type_, step.tool_state.
- _resolve_tool_step(): same pattern. Use step.tool_id, step.tool_version, step.tool_state, step.when.
- _resolve_subworkflow_step(): use step.subworkflow (already typed NormalizedNativeWorkflow | None). The isinstance(subworkflow, dict) check becomes step.subworkflow is not None. The recursive call to build_workflow_graph passes the model directly.
- _build_subworkflow_output_map(): change to accept NormalizedNativeWorkflow. Use workflow.steps (typed dict of NormalizedNativeStep), step.workflow_outputs (typed list of NativeWorkflowOutput).
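The simplified _parse_connections() could look roughly like the sketch below. ConnectionStandIn is a hypothetical dataclass mirroring the three NativeInputConnection fields listed earlier, and the ConnectionRef field names are assumed for illustration; the real ConnectionRef in connection_graph.py may be shaped differently.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConnectionStandIn:  # hypothetical stand-in for NativeInputConnection
    id: int
    output_name: str = "output"
    input_subworkflow_step_id: Optional[int] = None


@dataclass(frozen=True)
class ConnectionRef:  # field names are assumptions, not the real definition
    source_step: int
    output_name: str
    input_subworkflow_step_id: Optional[int] = None


def parse_connections(
    input_connections: dict[str, list[ConnectionStandIn]],
) -> dict[str, list[ConnectionRef]]:
    """Map each typed connection to a ConnectionRef.

    Values are assumed to be lists already, so there is no scalar-to-list
    wrapping and no isinstance(conn, dict) check.
    """
    return {
        input_name: [
            ConnectionRef(c.id, c.output_name, c.input_subworkflow_step_id)
            for c in conns
        ]
        for input_name, conns in input_connections.items()
    }


connections = {
    "input1": [ConnectionStandIn(id=0), ConnectionStandIn(id=1, output_name="out_file1")]
}
refs = parse_connections(connections)
```

With the always-list guarantee, the function reduces to a single comprehension.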
Caller: connection_validation.py
connection_validation.py calls build_workflow_graph(workflow_dict, get_tool_info) on lines 102 and 112 with raw dicts. After migration, the Union signature means these calls still work. No changes needed in connection_validation.py if the Union approach is used.
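A minimal sketch of the normalize-on-entry idea, under stated assumptions: WorkflowStandIn stands in for NormalizedNativeWorkflow, and normalize_stand_in is a placeholder for the normalized_native() factory, whose real signature is not shown in this plan. The return value is simplified to a list of step keys.

```python
from dataclasses import dataclass, field
from typing import Any, Union


@dataclass
class WorkflowStandIn:  # hypothetical stand-in for NormalizedNativeWorkflow
    steps: dict[str, Any] = field(default_factory=dict)


def normalize_stand_in(workflow_dict: dict) -> WorkflowStandIn:
    # Placeholder for the real factory; it only copies the steps mapping.
    return WorkflowStandIn(steps=dict(workflow_dict.get("steps", {})))


def build_workflow_graph(workflow: Union[WorkflowStandIn, dict]) -> list[str]:
    """Accept either form, normalize once, then use only the typed form."""
    if isinstance(workflow, dict):
        workflow = normalize_stand_in(workflow)
    # The real function would resolve each step here; this sketch returns keys.
    return sorted(workflow.steps)
```

Existing callers that pass a raw dict and new callers that pass a model both reach the same code path after the first branch.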
Steps
- Add NormalizedNativeWorkflow, NormalizedNativeStep imports and normalized_native factory to connection_graph.py
- Change build_workflow_graph to accept Union[NormalizedNativeWorkflow, dict], normalize on entry
- Update the step loop to iterate over NormalizedNativeStep objects
- Rewrite _resolve_input_step, _resolve_tool_step, _resolve_subworkflow_step to accept NormalizedNativeStep
- Simplify _parse_connections to map NativeInputConnection → ConnectionRef
- Rewrite _build_subworkflow_output_map to use typed workflow/step models
- Delete _get_tool_state
- Remove import json if no longer needed (check _collect_inputs, which also does json.loads for container states; it may still need the import since tool_state values can still be double-encoded strings)
- Run tests: source .venv/bin/activate && PYTHONPATH=lib python -m pytest test/unit/tool_util/workflow_state/ -x -q
Important note on _collect_inputs
_collect_inputs (line 326) walks the tool_state for conditionals/repeats/sections and does json.loads on string values inside the state dict. Even though the outer tool_state is guaranteed to be a parsed dict by the model, inner values like conditional states and repeat instances can still be JSON-encoded strings (the double-encoding pattern). This is the same issue decode_double_encoded_values in _util.py handles. Consider using that helper on step.tool_state before passing it to _collect_inputs, which would let you remove the json.loads calls inside _collect_inputs.
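To make the double-encoding pattern concrete, here is a sketch of a recursive decoder. The name decode_nested_json is hypothetical and this is not the implementation of decode_double_encoded_values in _util.py; in particular, decoding only strings that start with { or [ is an assumption of this example.

```python
import json
from typing import Any


def decode_nested_json(value: Any) -> Any:
    """Recursively decode JSON-encoded containers nested inside a state dict."""
    if isinstance(value, str):
        stripped = value.strip()
        # Only attempt strings that look like JSON objects or arrays, so plain
        # text and numeric-looking parameter values are left untouched.
        if stripped[:1] in ("{", "["):
            try:
                return decode_nested_json(json.loads(stripped))
            except json.JSONDecodeError:
                return value
        return value
    if isinstance(value, dict):
        return {key: decode_nested_json(inner) for key, inner in value.items()}
    if isinstance(value, list):
        return [decode_nested_json(inner) for inner in value]
    return value


state = {"cond": '{"mode": "paired", "reads": "[1, 2]"}', "name": "sample", "count": "3"}
decoded = decode_nested_json(state)
```

Running a decoder like this once on step.tool_state would mean the walker only ever sees dicts and lists for conditionals and repeats, which is what would allow its own json.loads calls to go.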
Testing
- All 230 existing unit tests must pass
- If GALAXY_TEST_IWC_DIRECTORY is set, the sweep tests (480 parametrized) should also pass
- Connection validation tests are in test/unit/tool_util/workflow_state/test_connection_validation.py
Unresolved questions
- Should NativeStepType enum values be imported or compared as strings? Check what the enum exposes.
- Does _collect_inputs need refactoring too, or just the outer step-level functions? It works with tool_state internals rather than step-level fields, so it may be fine as-is with a decode_double_encoded_values call at the top.
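On the first question, the answer depends on whether the enum mixes in str. The sketch below uses two hypothetical enums (neither is the real NativeStepType) to show the difference in plain Python: a str-mixin member compares equal to its string value, while a plain Enum member does not.

```python
from enum import Enum


class StrStepType(str, Enum):  # hypothetical str-mixin enum
    tool = "tool"
    subworkflow = "subworkflow"


class PlainStepType(Enum):  # hypothetical plain enum
    tool = "tool"
    subworkflow = "subworkflow"


str_enum_equals_string = StrStepType.tool == "tool"
plain_enum_equals_string = PlainStepType.tool == "tool"
# Lookup by value works for both kinds and returns the member itself.
looked_up = PlainStepType("subworkflow")
```

Comparing against imported enum members works in both cases, so it is the safer default until the real definition has been checked.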