Dashboard

Plan Workflow Extraction Vue Conversion Api

API design spec for replacing the legacy Mako workflow extraction UI with a typed FastAPI endpoint and Vue frontend.

Raw
Revised:
2026-02-12
Revision:
1
Parent Plan:
Plan - Workflow Extraction Vue Conversion
Related Issues:
Issue 17506

Workflow Extraction API Design

Executive Summary

This document specifies the API surface for replacing Galaxy’s last non-data-display Mako template (build_from_current_history.mako) with a Vue.js frontend backed by a typed FastAPI endpoint. The scope covers:

  1. A new read endpoint (GET /api/histories/{id}/extraction_summary) returning structured data about jobs and datasets available for extraction
  2. A new typed Pydantic request model for the existing POST /api/workflows extraction path (from_history_id)
  3. All Pydantic response/request models with field-level documentation
  4. Edge case handling for the three job types: real Job, FakeJob, DatasetCollectionCreationJob

The existing Mako flow is a two-phase controller method: phase 1 renders the form (equivalent to the new GET), phase 2 submits the extraction (equivalent to the existing POST). This design preserves that separation while adding type safety and enabling a Vue SPA frontend.


Table of Contents

  1. Current System Analysis
  2. API Design: Extraction Summary Endpoint
  3. API Design: Workflow Creation from History
  4. Pydantic Models: Complete Specification
  5. Data Flow
  6. Edge Cases and Special Types
  7. ID Encoding Strategy
  8. Service Layer Design
  9. Integration with Existing Infrastructure
  10. Performance Considerations
  11. Backwards Compatibility
  12. Unresolved Questions

1. Current System Analysis

1.1 The Mako Controller (WorkflowController.build_from_current_history)

The legacy controller in lib/galaxy/webapps/galaxy/controllers/workflow.py serves dual duty:

Phase 1 (GET - render form):

jobs_dict, warnings = summarize(trans, history)
# Template receives: jobs (dict[Job, list[(output_name, HDA/HDCA)]]), warnings (set[str]), history

Phase 2 (POST - create workflow):

stored_workflow = extract_workflow(trans, user, history, job_ids, dataset_ids, ...)

1.2 The summarize() Function

galaxy.workflow.extract.summarize() returns:

(jobs, warnings) where:
  jobs: dict[Union[Job, FakeJob, DatasetCollectionCreationJob], list[tuple[Optional[str], Union[HDA, HDCA]]]]
  warnings: set[str]

The dict keys are heterogeneous - three distinct types with different attributes:

Typeidis_fakenametool_iddisabled_why
Job (model)int (DB ID)N/AN/AstrN/A
FakeJob"fake_{dataset.id}"True"Import from History" / "Import from Library" / NoneN/AN/A
DatasetCollectionCreationJob"fake_{hdca.id}"True"Dataset Collection Creation"N/A"Dataset collection created in a way not compatible with workflows"

The dict values are lists of (output_name, content) tuples where output_name is None for fake jobs and a string like "out_file1" for real jobs.

1.3 The Mako Template’s Rendering Logic

For each (job, datasets) pair, the template:

  1. Looks up the tool via app.toolbox.get_tool(job.tool_id, job.tool_version) (real jobs only)
  2. Checks tool.is_workflow_compatible to determine if job can be included
  3. Checks tool.version != job.tool_version for version warnings
  4. Renders each dataset with state coloring, hid, name
  5. For incompatible/fake jobs, renders “treat as input” checkboxes with editable name fields
  6. For compatible jobs, renders “include in workflow” checkbox (pre-checked if any output is non-deleted)

1.4 What is_workflow_compatible Means

A tool is NOT workflow compatible when:

  • tool.has_multiple_pages == True
  • tool.tool_type.startswith("data_source") (external data source tools)
  • Tool XML has workflow_compatible="False"

1.5 The Submission Payload (Current)

The Mako form submits to POST /api/workflows (via the controller, eventually routed to the API). Current parameter types:

ParameterSourceType in MakoType at API
from_history_idURL paramencoded strdecoded to int
workflow_nametext inputstrstr
job_idscheckbox valuesencoded str[]decoded to int[]
dataset_idscheckbox valuesint[] (HIDs)int[] (HIDs)
dataset_collection_idscheckbox valuesint[] (HIDs)int[] (HIDs)
dataset_namestext inputsstr[]str[]
dataset_collection_namestext inputsstr[]str[]

Critical distinction: job_ids are encoded database IDs, but dataset_ids/dataset_collection_ids are raw HIDs (history item numbers). This is because extract_steps() works with HIDs to build the step graph via hid_to_output_pair.


2. API Design: Extraction Summary Endpoint

2.1 Endpoint Specification

GET /api/histories/{history_id}/extraction_summary

Rationale for placement on histories router: The data is fundamentally about a history’s contents analyzed for extraction eligibility. The history ID is the primary key. This follows the existing pattern of history sub-resources (e.g., /api/histories/{id}/contents).

Alternative considered: /api/workflows/extraction_summary?history_id=X - rejected because the resource is history-centric data, not workflow-centric.

2.2 Request

ParameterLocationTypeRequiredDescription
history_idpathDecodedDatabaseIdFieldyesEncoded history ID

No query parameters for initial implementation. See Performance Considerations for future pagination.

2.3 Response

HTTP 200 with WorkflowExtractionSummary body. See Section 4.1 for full model.

2.4 Error Responses

StatusConditionBody
403User cannot access history{"err_msg": "Cannot access history {id}", "err_code": 403006}
404History does not exist{"err_msg": "History {id} not found", "err_code": 404001}

2.5 Why Not Reuse Existing Endpoints?

The extraction summary requires a very specific view of history data that no existing endpoint provides:

  • GET /api/histories/{id}/contents returns datasets but not their creating jobs
  • GET /api/jobs returns jobs but not grouped with their outputs
  • Neither provides tool compatibility analysis, version warnings, or the fake-job abstraction

The summarize() function in galaxy.workflow.extract already does exactly this aggregation. The new endpoint wraps it in a typed API response.


3. API Design: Workflow Creation from History

3.1 Current State

POST /api/workflows currently accepts an untyped dict payload. When from_history_id is present, it branches into extraction mode. The parameters are parsed from raw dict access:

if "from_history_id" in payload:
    from_history_id = payload.get("from_history_id")
    job_ids = [self.decode_id(_) for _ in payload.get("job_ids", [])]
    dataset_ids = payload.get("dataset_ids", [])
    ...

3.2 Proposed: Typed Request Model

Add a Pydantic model WorkflowExtractionPayload to formalize the extraction submission. See Section 4.2 for full model.

This model should be used alongside the existing dict-based dispatch in the create method. The endpoint already handles multiple creation modes (from archive, from shared workflow, from path, etc.) - extraction is one branch.

Recommended approach: Create the typed model as documentation and validation, but integrate it into the existing create method’s branching logic rather than creating a separate endpoint. This avoids breaking existing clients.

3.3 Response

The existing response format from POST /api/workflows when using from_history_id:

{
    "id": "abc123def456",
    "name": "Workflow constructed from history 'My History'",
    "create_time": "2026-02-12T10:00:00",
    "update_time": "2026-02-12T10:00:00",
    "published": false,
    "importable": false,
    "deleted": false,
    "hidden": false,
    "latest_workflow_uuid": "550e8400-e29b-41d4-a716-446655440000",
    "url": "/api/workflows/abc123def456"
}

This matches StoredWorkflow.to_dict() with dict_collection_visible_keys plus url and latest_workflow_uuid. No changes needed to this response.


4. Pydantic Models: Complete Specification

4.1 Response Models

File: lib/galaxy/schema/workflow_extraction.py (new)

from enum import Enum
from typing import (
    List,
    Optional,
)

from pydantic import (
    BaseModel,
    Field,
)

from galaxy.schema.fields import EncodedDatabaseIdField


class HistoryContentType(str, Enum):
    """Type discriminator for history items."""
    DATASET = "dataset"
    DATASET_COLLECTION = "dataset_collection"


class ExtractionDatasetState(str, Enum):
    """Subset of DatasetState relevant to extraction display."""
    NEW = "new"
    UPLOAD = "upload"
    QUEUED = "queued"
    RUNNING = "running"
    OK = "ok"
    EMPTY = "empty"
    ERROR = "error"
    PAUSED = "paused"
    SETTING_METADATA = "setting_metadata"
    FAILED_METADATA = "failed_metadata"
    DEFERRED = "deferred"
    DISCARDED = "discarded"


class ExtractionJobType(str, Enum):
    """Discriminator for the three job archetypes in extraction."""
    TOOL = "tool"
    INPUT_DATASET = "input_dataset"
    COLLECTION_CREATION = "collection_creation"

4.1.1 ExtractionOutputDataset

Represents a single dataset or dataset collection produced by a job.

class ExtractionOutputDataset(BaseModel):
    """A history item (dataset or collection) that is an output of a job.

    In the extraction UI, these are displayed in the right column opposite
    their creating job/tool. For fake jobs, these can be marked as workflow
    inputs by the user.
    """
    id: EncodedDatabaseIdField = Field(
        ...,
        title="ID",
        description="Encoded database ID of the HDA or HDCA.",
    )
    hid: int = Field(
        ...,
        title="History Item ID",
        description=(
            "Sequential display number within the history. "
            "Used as the key for dataset_ids/dataset_collection_ids "
            "in the extraction submission payload."
        ),
    )
    name: str = Field(
        ...,
        title="Display Name",
        description="Human-readable name. For HDAs, from datatype.display_name(). For HDCAs, from get_display_name().",
    )
    state: str = Field(
        ...,
        title="State",
        description=(
            "Current processing state. One of: new, upload, queued, running, ok, "
            "empty, error, paused, setting_metadata, failed_metadata, deferred, discarded. "
            "Used for state-based coloring in the UI."
        ),
    )
    deleted: bool = Field(
        ...,
        title="Deleted",
        description="Whether this item has been deleted from the history.",
    )
    history_content_type: HistoryContentType = Field(
        ...,
        title="Content Type",
        description=(
            "Discriminator: 'dataset' for HDA, 'dataset_collection' for HDCA. "
            "Determines which submission field (dataset_ids vs dataset_collection_ids) "
            "this item's HID should be added to when marked as a workflow input."
        ),
    )
    collection_type: Optional[str] = Field(
        None,
        title="Collection Type",
        description=(
            "For dataset_collection items only. The collection structure type "
            "e.g., 'list', 'paired', 'list:paired'. Required by extract_steps() "
            "to create properly typed data_collection_input workflow steps. "
            "None for regular datasets."
        ),
    )

Design note on collection_type: This field is absent from the original plan but is required. When a user marks a collection as a workflow input, extract_steps() needs the collection_type to create a properly typed data_collection_input step. The WorkflowSummary tracks this in self.collection_types[hid]. Without it, the client would need a separate API call to determine collection type. Including it here avoids that round trip.

4.1.2 ExtractionToolInfo

Extracted tool metadata for a job. Separated from the job model for clarity - this represents the tool as resolved by the toolbox at request time, not the tool as it was when the job ran.

class ExtractionToolInfo(BaseModel):
    """Tool metadata resolved from the toolbox for a job's tool_id/tool_version.

    This represents the *current* state of the tool in the toolbox, which may
    differ from the version that originally ran the job. When the versions differ,
    version_warning is populated.
    """
    tool_id: str = Field(
        ...,
        title="Tool ID",
        description="Tool identifier as stored on the job (e.g., 'cat1', 'toolshed.g2.bx.psu.edu/repos/...').",
    )
    tool_version: Optional[str] = Field(
        None,
        title="Job Tool Version",
        description="Tool version string from the job record.",
    )
    tool_name: str = Field(
        ...,
        title="Tool Name",
        description="Human-readable tool name from the toolbox (e.g., 'Concatenate datasets').",
    )
    is_workflow_compatible: bool = Field(
        ...,
        title="Workflow Compatible",
        description=(
            "Whether this tool can be included in workflows. False for: "
            "multi-page tools, data_source tools, tools with workflow_compatible=False XML attribute."
        ),
    )
    version_warning: Optional[str] = Field(
        None,
        title="Version Warning",
        description=(
            "Present when the current toolbox version differs from the version used "
            "to run the job. Format: 'Dataset was created with tool version \"X\", "
            "but workflow extraction will use version \"Y\".'"
        ),
    )

4.1.3 ExtractionJob

The central model representing one row in the extraction table.

class ExtractionJob(BaseModel):
    """A job available for workflow extraction, with its outputs.

    This is the core unit of the extraction UI. Each ExtractionJob corresponds
    to one row in the extraction table: the left column shows the tool/job info,
    the right column shows the output datasets.

    There are three archetypes:
    - TOOL: A real Galaxy Job with a tool_id. Can be included in the workflow
      if the tool is workflow-compatible.
    - INPUT_DATASET: A FakeJob representing a dataset with no creating job
      (uploaded, imported from history/library). Can be marked as a workflow input.
    - COLLECTION_CREATION: A DatasetCollectionCreationJob for collections created
      outside normal tool execution. Can be marked as a workflow input.
    """
    id: str = Field(
        ...,
        title="Job ID",
        description=(
            "For real jobs: encoded database ID. "
            "For fake jobs: string like 'fake_12345' (not an encoded ID). "
            "Used as the value for job_ids[] in the extraction submission. "
            "The heterogeneous format is preserved for compatibility with extract_workflow()."
        ),
    )
    job_type: ExtractionJobType = Field(
        ...,
        title="Job Type",
        description=(
            "Discriminator for the three job archetypes. "
            "'tool' = real Job, 'input_dataset' = FakeJob, "
            "'collection_creation' = DatasetCollectionCreationJob."
        ),
    )
    tool_info: Optional[ExtractionToolInfo] = Field(
        None,
        title="Tool Info",
        description=(
            "Present only for job_type='tool'. Contains resolved tool metadata "
            "from the toolbox. None for fake jobs."
        ),
    )
    display_name: str = Field(
        ...,
        title="Display Name",
        description=(
            "Human-readable label for the job row. "
            "For tools: tool name from toolbox (e.g., 'Concatenate datasets'). "
            "For input datasets: source description ('Import from History', "
            "'Import from Library', or 'Input Dataset'). "
            "For collection creation: 'Dataset Collection Creation'."
        ),
    )
    is_selectable: bool = Field(
        ...,
        title="Selectable",
        description=(
            "Whether the user can check this job for inclusion in the workflow. "
            "True only for real jobs with workflow-compatible tools. "
            "False for fake jobs and incompatible tools."
        ),
    )
    disabled_reason: Optional[str] = Field(
        None,
        title="Disabled Reason",
        description=(
            "Human-readable explanation when is_selectable=False. "
            "Examples: 'This tool cannot be used in workflows', "
            "'Dataset collection created in a way not compatible with workflows', "
            "'Tool not found in toolbox'."
        ),
    )
    can_be_input: bool = Field(
        ...,
        title="Can Be Input",
        description=(
            "Whether outputs of this job can be marked as workflow inputs. "
            "True for fake jobs (input_dataset, collection_creation). "
            "False for real tool jobs."
        ),
    )
    outputs: List[ExtractionOutputDataset] = Field(
        default_factory=list,
        title="Outputs",
        description="Datasets/collections created by this job, ordered by HID.",
    )
    has_non_deleted_outputs: bool = Field(
        ...,
        title="Has Non-Deleted Outputs",
        description=(
            "True if at least one output is not deleted. "
            "Used to determine default checkbox state: "
            "jobs with all-deleted outputs are unchecked by default."
        ),
    )

Design decisions:

  1. job_type discriminator instead of is_fake boolean: The original plan used is_fake: bool which loses information about which kind of fake job it is. ExtractionJobType with three values gives the client precise knowledge for rendering different UI treatments.

  2. tool_info as optional sub-object instead of flat fields: The original plan mixed tool fields (tool_id, tool_version, is_workflow_compatible) at the job level, creating confusion about which fields are meaningful for fake jobs. Nesting tool info makes the optionality explicit.

  3. is_selectable + can_be_input instead of just is_workflow_compatible: These are the two UI behaviors the client needs. A job is either selectable (checkbox to include the tool step) or its outputs can be marked as inputs (checkbox per output). These are mutually exclusive: is_selectable = real compatible tool job, can_be_input = fake job.

  4. id as plain str instead of EncodedDatabaseIdField: Fake job IDs are strings like "fake_123" which don’t conform to Galaxy’s ID encoding scheme. Using str accommodates both encoded real job IDs and fake job ID strings.

4.1.4 WorkflowExtractionSummary

Top-level response model.

class WorkflowExtractionSummary(BaseModel):
    """Complete extraction summary for a history.

    Contains all data needed to render the workflow extraction UI:
    the list of jobs with their outputs, global warnings, and
    a suggested default workflow name.
    """
    history_id: EncodedDatabaseIdField = Field(
        ...,
        title="History ID",
        description="Encoded database ID of the analyzed history.",
    )
    history_name: str = Field(
        ...,
        title="History Name",
        description="Display name of the history.",
    )
    jobs: List[ExtractionJob] = Field(
        default_factory=list,
        title="Jobs",
        description=(
            "Jobs available for extraction, ordered by the HID of their "
            "first output dataset. This matches the display order in the history panel."
        ),
    )
    warnings: List[str] = Field(
        default_factory=list,
        title="Warnings",
        description=(
            "Global warnings about the extraction. Currently the only warning is "
            "'Some datasets still queued or running were ignored' when the history "
            "contains non-terminal datasets."
        ),
    )
    default_workflow_name: str = Field(
        ...,
        title="Default Workflow Name",
        description="Suggested workflow name. Format: \"Workflow constructed from history '{history_name}'\".",
    )

4.2 Request Models

4.2.1 WorkflowExtractionPayload

Typed model for the extraction submission. This formalizes what is currently parsed from a raw dict.

class WorkflowExtractionPayload(BaseModel):
    """Payload for creating a workflow by extracting from a history.

    Submitted to POST /api/workflows. The from_history_id field triggers
    extraction mode (vs. import, copy, etc.).

    IMPORTANT: dataset_ids and dataset_collection_ids are HIDs (history item
    display numbers), NOT encoded database IDs. This is because the extraction
    engine (extract_steps) uses HIDs to build the workflow step graph via
    hid_to_output_pair mappings.
    """
    from_history_id: EncodedDatabaseIdField = Field(
        ...,
        title="History ID",
        description="Encoded database ID of the history to extract from.",
    )
    workflow_name: str = Field(
        ...,
        title="Workflow Name",
        description="Name for the created workflow.",
        min_length=1,
    )
    job_ids: List[EncodedDatabaseIdField] = Field(
        default_factory=list,
        title="Job IDs",
        description=(
            "Encoded database IDs of real jobs to include as tool steps. "
            "These are the IDs from ExtractionJob.id where job_type='tool'. "
            "Decoded to integers before passing to extract_workflow()."
        ),
    )
    dataset_ids: List[int] = Field(
        default_factory=list,
        title="Dataset HIDs",
        description=(
            "History item display numbers (HIDs) of datasets to include as "
            "workflow inputs. These are ExtractionOutputDataset.hid values "
            "where history_content_type='dataset'. NOT encoded database IDs."
        ),
    )
    dataset_collection_ids: List[int] = Field(
        default_factory=list,
        title="Dataset Collection HIDs",
        description=(
            "History item display numbers (HIDs) of collections to include as "
            "workflow inputs. These are ExtractionOutputDataset.hid values "
            "where history_content_type='dataset_collection'. NOT encoded database IDs."
        ),
    )
    dataset_names: Optional[List[str]] = Field(
        None,
        title="Dataset Input Names",
        description=(
            "Custom names for dataset inputs, parallel to dataset_ids. "
            "If provided, must be same length as dataset_ids. "
            "Used as labels for data_input workflow steps."
        ),
    )
    dataset_collection_names: Optional[List[str]] = Field(
        None,
        title="Dataset Collection Input Names",
        description=(
            "Custom names for collection inputs, parallel to dataset_collection_ids. "
            "If provided, must be same length as dataset_collection_ids. "
            "Used as labels for data_collection_input workflow steps."
        ),
    )

Why dataset_ids are HIDs, not encoded IDs: This is the most surprising aspect of the extraction API. The reason is that extract_steps() builds a hid_to_output_pair dict that maps HIDs to workflow steps. Input datasets are identified by their HID in the history, and tool steps reference their inputs by the HID of the input dataset. This is a fundamental design choice in the extraction engine that predates the API layer. Changing it would require rewriting extract_steps().


5. Data Flow

5.1 Summary Endpoint Flow

Client: GET /api/histories/{encoded_id}/extraction_summary


FastAPIHistories.extraction_summary(history_id: DecodedDatabaseIdField)
  │  history_id is auto-decoded to int

HistoriesService.get_extraction_summary(trans, history_id)

  ├─ history = self.manager.get_accessible(history_id, trans.user, ...)
  │    └─ raises ObjectNotFound or ItemAccessibilityException

  ├─ jobs_dict, warnings = summarize(trans, history)
  │    └─ WorkflowSummary.__init__(trans, history)
  │         ├─ Iterates history.visible_contents
  │         ├─ For HDAs: finds creating_job_associations or creates FakeJob
  │         ├─ For HDCAs: finds creating_job_associations or creates DatasetCollectionCreationJob
  │         ├─ Tracks hid mappings for copied datasets
  │         └─ Filters out non-ready datasets (adds warning)

  ├─ For each (job, datasets) pair:
  │    ├─ Determine job_type (tool / input_dataset / collection_creation)
  │    ├─ If tool: resolve from toolbox, check is_workflow_compatible, check version
  │    ├─ Build ExtractionOutputDataset for each output
  │    │    └─ Include collection_type from WorkflowSummary.collection_types
  │    └─ Build ExtractionJob

  └─ Return WorkflowExtractionSummary


Client receives JSON response

5.2 Extraction Submission Flow

Client: POST /api/workflows
  Body: { from_history_id, workflow_name, job_ids, dataset_ids, ... }


WorkflowsAPIController.create(trans, payload)
  │  Detects "from_history_id" in payload

  ├─ Decode from_history_id → int
  ├─ Decode each job_id → int
  ├─ dataset_ids remain as int[] (HIDs)
  ├─ dataset_collection_ids remain as int[] (HIDs)

  ├─ history = history_manager.get_accessible(...)

  ├─ stored_workflow = extract_workflow(trans, user, history, ...)
  │    ├─ extract_steps(trans, history, job_ids, dataset_ids, ...)
  │    │    ├─ Creates WorkflowSummary for the history
  │    │    ├─ For each dataset_id (HID): creates data_input WorkflowStep
  │    │    ├─ For each dataset_collection_id (HID): creates data_collection_input WorkflowStep
  │    │    │    └─ Looks up collection_type from WorkflowSummary.collection_types[hid]
  │    │    ├─ For each job_id: creates tool WorkflowStep
  │    │    │    ├─ Calls step_inputs(trans, job) → (tool_inputs, associations)
  │    │    │    │    └─ associations = [(input_hid, param_name), ...]
  │    │    │    └─ Creates WorkflowStepConnections via hid_to_output_pair lookups
  │    │    └─ Returns ordered step list
  │    │
  │    ├─ Creates Workflow model, attaches steps
  │    ├─ attach_ordered_steps() - establishes step ordering
  │    ├─ order_workflow_steps_with_levels() - calculates canvas positions
  │    └─ Creates and persists StoredWorkflow

  └─ Returns { id, name, url, ... }

5.3 Client-Side Data Mapping

When the Vue component submits the form, it must correctly map the ExtractionJob/ExtractionOutputDataset data to the submission payload:

User selections in UI → WorkflowExtractionPayload:

For each selected ExtractionJob where job_type = "tool":
  → job_ids.push(job.id)     // encoded database ID

For each ExtractionOutputDataset marked as input:
  if history_content_type == "dataset":
    → dataset_ids.push(output.hid)        // HID, not encoded ID
    → dataset_names.push(customName)
  if history_content_type == "dataset_collection":
    → dataset_collection_ids.push(output.hid)    // HID, not encoded ID
    → dataset_collection_names.push(customName)

6. Edge Cases and Special Types

6.1 FakeJob (Input Datasets)

Trigger: An HDA exists in the history with no creating_job_associations. Common cases:

  • Uploaded datasets
  • Datasets imported from another history (copied_from_history_dataset_association)
  • Datasets imported from a data library (copied_from_library_dataset_dataset_association)

API representation:

{
    "id": "fake_12345",
    "job_type": "input_dataset",
    "tool_info": null,
    "display_name": "Import from History",
    "is_selectable": false,
    "disabled_reason": null,
    "can_be_input": true,
    "outputs": [
        {
            "id": "abc123def456",
            "hid": 1,
            "name": "input.fastq",
            "state": "ok",
            "deleted": false,
            "history_content_type": "dataset",
            "collection_type": null
        }
    ],
    "has_non_deleted_outputs": true
}

Name resolution: FakeJob._guess_name_from_dataset() returns:

  • "Import from History" if dataset.copied_from_history_dataset_association exists
  • "Import from Library" if dataset.copied_from_library_dataset_dataset_association exists
  • None otherwise (displayed as "Input Dataset" in the API)

6.2 DatasetCollectionCreationJob

Trigger: An HDCA exists with no creating_job_associations and either:

  • It’s a non-implicit collection (e.g., user-constructed list)
  • It’s implicit but the creating job can’t be traced through its elements

API representation:

{
    "id": "fake_67890",
    "job_type": "collection_creation",
    "tool_info": null,
    "display_name": "Dataset Collection Creation",
    "is_selectable": false,
    "disabled_reason": "Dataset collection created in a way not compatible with workflows",
    "can_be_input": true,
    "outputs": [
        {
            "id": "xyz789",
            "hid": 3,
            "name": "My Collection",
            "state": "ok",
            "deleted": false,
            "history_content_type": "dataset_collection",
            "collection_type": "list"
        }
    ],
    "has_non_deleted_outputs": true
}

6.3 Real Job with Incompatible Tool

Trigger: A real Job exists but tool.is_workflow_compatible == False.

{
    "id": "encoded_real_job_id",
    "job_type": "tool",
    "tool_info": {
        "tool_id": "upload1",
        "tool_version": "1.0.0",
        "tool_name": "Upload File",
        "is_workflow_compatible": false,
        "version_warning": null
    },
    "display_name": "Upload File",
    "is_selectable": false,
    "disabled_reason": "This tool cannot be used in workflows",
    "can_be_input": false,
    "outputs": [...],
    "has_non_deleted_outputs": true
}

Note: Incompatible real jobs have can_be_input: false. The Mako template treats these the same as compatible jobs (no input checkbox). Only fake jobs get the “treat as input” UI. This is correct behavior - if a real tool ran, its outputs depend on a tool execution, not raw input.

6.4 Real Job with Missing Tool

Trigger: trans.app.toolbox.get_tool(job.tool_id, tool_version=job.tool_version) returns None. This happens when a tool has been uninstalled from the toolbox.

{
    "id": "encoded_real_job_id",
    "job_type": "tool",
    "tool_info": null,
    "display_name": "Unknown Tool",
    "is_selectable": false,
    "disabled_reason": "Tool not found in toolbox",
    "can_be_input": false,
    "outputs": [...],
    "has_non_deleted_outputs": true
}

Design note: tool_info is null when the tool can’t be resolved. This is distinct from fake jobs (where tool_info is also null) - the job_type discriminator tells the client why.

6.5 Real Job with Version Mismatch

{
    "id": "encoded_real_job_id",
    "job_type": "tool",
    "tool_info": {
        "tool_id": "cat1",
        "tool_version": "1.0.0",
        "tool_name": "Concatenate datasets",
        "is_workflow_compatible": true,
        "version_warning": "Dataset was created with tool version \"1.0.0\", but workflow extraction will use version \"2.0.0\"."
    },
    "display_name": "Concatenate datasets",
    "is_selectable": true,
    "disabled_reason": null,
    "can_be_input": false,
    "outputs": [...],
    "has_non_deleted_outputs": true
}

6.6 Non-Ready Datasets

Datasets in states new, running, or queued are excluded from the summary entirely. They don’t appear in any job’s outputs. Instead, a global warning is added:

{
    "warnings": ["Some datasets still queued or running were ignored"],
    "jobs": [...]
}

This filtering happens inside WorkflowSummary.__check_state() before any jobs are built.

6.7 Copied Datasets

When a dataset is copied from another history (or a library), the WorkflowSummary traces back through the copy chain to find the original:

  • hda.copied_from_history_dataset_association chain for HDAs
  • hdca.copied_from_history_dataset_collection_association chain for HDCAs

The original’s creating job is used. The HID in the current history (not the source) is tracked in hda_hid_in_history / hdca_hid_in_history.

6.8 Implicit Collection Mapping

When a tool produces implicit output collections (e.g., running a tool over a list), multiple jobs may produce the same logical output. The WorkflowSummary designates one “representative” job and maps all related job IDs to it via job_id2representative_job. The API only returns the representative job, avoiding duplicates.

6.9 Empty History

{
    "history_id": "abc123",
    "history_name": "Unnamed history",
    "jobs": [],
    "warnings": [],
    "default_workflow_name": "Workflow constructed from history 'Unnamed history'"
}

Valid response. The client should show a message like “No tools have been run in this history.”


7. ID Encoding Strategy

This is the most critical aspect of the API design because the extraction system uses three different ID spaces:

7.1 ID Types in the API

FieldID SpaceFormatExampleWhy
history_id (path/response)Encoded DB IDhex string"f2db41e1fa331b3e"Standard Galaxy pattern
ExtractionJob.id (tool)Encoded DB IDhex string"f2db41e1fa331b3e"Real job, standard encoding
ExtractionJob.id (fake)Synthetic stringfake_\d+"fake_12345"No DB record to encode
ExtractionOutputDataset.idEncoded DB IDhex string"a1b2c3d4e5f6"HDA/HDCA DB ID
ExtractionOutputDataset.hidHistory item numberplain int3Display order in history
job_ids (submission)Encoded DB IDshex strings["f2db41e1fa331b3e"]Decoded to int by API
dataset_ids (submission)HIDsplain ints[1, 3]Used directly by extract_steps
dataset_collection_ids (submission)HIDsplain ints[2]Used directly by extract_steps

7.2 Why ExtractionJob.id Cannot Be EncodedDatabaseIdField

The EncodedDatabaseIdField type has validators that enforce the hex-encoded format. Fake job IDs like "fake_12345" would fail validation. Options considered:

  1. Use str for all job IDs (chosen): Simple, accommodates both formats. Client treats job IDs as opaque strings for the job_ids submission field.

  2. Encode fake job IDs differently: Could encode the underlying dataset ID and prefix with a type marker. Adds complexity with no benefit since fake job IDs are never submitted in job_ids (only real job IDs are).

  3. Use a union type: Union[EncodedDatabaseIdField, str] with validation. Over-engineered.

7.3 Client Guidance

The client should:

  • Display hid to the user (it’s the number they see in the history panel)
  • Use id (encoded DB ID) for any operations that need to reference the specific dataset (e.g., preview links)
  • Use hid for the dataset_ids/dataset_collection_ids submission fields
  • Use ExtractionJob.id directly for the job_ids submission field (it’s already encoded for real jobs)

8. Service Layer Design

8.1 Method Signature

# In HistoriesService:

def get_extraction_summary(
    self,
    trans: ProvidesHistoryContext,
    history_id: DecodedDatabaseIdField,
) -> WorkflowExtractionSummary:

8.2 Implementation Structure

def get_extraction_summary(self, trans, history_id):
    # 1. Access check (reuse existing pattern)
    history = self.manager.get_accessible(history_id, trans.user, current_history=trans.history)

    # 2. Call existing summarize()
    jobs_dict, warnings = summarize(trans, history)

    # 3. Transform each (job, datasets) pair
    extraction_jobs = []
    for job, datasets in jobs_dict.items():
        extraction_job = self._build_extraction_job(trans, job, datasets)
        extraction_jobs.append(extraction_job)

    # 4. Sort by first output HID (matches Mako display order)
    extraction_jobs.sort(key=lambda j: j.outputs[0].hid if j.outputs else 0)

    # 5. Build response
    return WorkflowExtractionSummary(
        history_id=trans.security.encode_id(history.id),
        history_name=history.name,
        jobs=extraction_jobs,
        warnings=list(warnings),
        default_workflow_name=f"Workflow constructed from history '{history.name}'",
    )

8.3 Job Transformation Helper

def _build_extraction_job(self, trans, job, datasets):
    """Transform a summarize() job entry into an ExtractionJob."""

    is_fake = isinstance(job, (FakeJob, DatasetCollectionCreationJob))

    # Determine job type
    if isinstance(job, FakeJob):
        job_type = ExtractionJobType.INPUT_DATASET
    elif isinstance(job, DatasetCollectionCreationJob):
        job_type = ExtractionJobType.COLLECTION_CREATION
    else:
        job_type = ExtractionJobType.TOOL

    # Resolve tool info (real jobs only)
    tool_info = None
    display_name = "Unknown"
    is_selectable = False
    disabled_reason = None
    can_be_input = is_fake

    if job_type == ExtractionJobType.INPUT_DATASET:
        display_name = job.name or "Input Dataset"

    elif job_type == ExtractionJobType.COLLECTION_CREATION:
        display_name = job.name  # "Dataset Collection Creation"
        disabled_reason = job.disabled_why

    elif job_type == ExtractionJobType.TOOL:
        tool = trans.app.toolbox.get_tool(job.tool_id, tool_version=job.tool_version)
        if tool is None:
            display_name = "Unknown Tool"
            disabled_reason = "Tool not found in toolbox"
        else:
            display_name = tool.name
            is_selectable = tool.is_workflow_compatible
            if not tool.is_workflow_compatible:
                disabled_reason = "This tool cannot be used in workflows"

            version_warning = None
            if tool.version != job.tool_version:
                version_warning = (
                    f'Dataset was created with tool version "{job.tool_version}", '
                    f'but workflow extraction will use version "{tool.version}".'
                )

            tool_info = ExtractionToolInfo(
                tool_id=job.tool_id,
                tool_version=job.tool_version,
                tool_name=tool.name,
                is_workflow_compatible=tool.is_workflow_compatible,
                version_warning=version_warning,
            )

    # Build output list
    outputs = self._build_extraction_outputs(trans, datasets)
    has_non_deleted = any(not o.deleted for o in outputs)

    # Encode job ID
    if is_fake:
        job_id = str(job.id)  # "fake_12345"
    else:
        job_id = trans.security.encode_id(job.id)

    return ExtractionJob(
        id=job_id,
        job_type=job_type,
        tool_info=tool_info,
        display_name=display_name,
        is_selectable=is_selectable,
        disabled_reason=disabled_reason,
        can_be_input=can_be_input,
        outputs=outputs,
        has_non_deleted_outputs=has_non_deleted,
    )

8.4 Output Transformation Helper

def _build_extraction_outputs(self, trans, datasets):
    """Transform summarize() dataset tuples into ExtractionOutputDataset list."""
    outputs = []
    for _output_name, data in datasets:
        # Determine collection_type if applicable
        collection_type = None
        if hasattr(data, 'collection') and data.collection:
            collection_type = data.collection.collection_type

        outputs.append(ExtractionOutputDataset(
            id=trans.security.encode_id(data.id),
            hid=data.hid,
            name=data.display_name() if hasattr(data, 'display_name') else data.name,
            state=data.state or "queued",
            deleted=data.deleted,
            history_content_type=data.history_content_type,
            collection_type=collection_type,
        ))
    return outputs

9. Integration with Existing Infrastructure

9.1 Endpoint Registration

The endpoint is added as a method on the existing FastAPIHistories CBV class:

@router.get(
    "/api/histories/{history_id}/extraction_summary",
    summary="Get workflow extraction summary for a history.",
    responses={
        403: {"description": "Not authorized to access this history"},
        404: {"description": "History not found"},
    },
)
def extraction_summary(
    self,
    history_id: HistoryIDPathParam,
    trans: ProvidesHistoryContext = DependsOnTrans,
) -> WorkflowExtractionSummary:
    return self.service.get_extraction_summary(trans, history_id)

No manual router registration needed - Galaxy auto-discovers routers.

9.2 Schema Integration

The new file lib/galaxy/schema/workflow_extraction.py follows existing patterns:

  • Uses EncodedDatabaseIdField from galaxy.schema.fields
  • Uses BaseModel (not Model) since these aren’t tied to ORM classes
  • Uses Field(...) with title and description for OpenAPI docs

9.3 Import Path for FakeJob / DatasetCollectionCreationJob

from galaxy.workflow.extract import (
    DatasetCollectionCreationJob,
    FakeJob,
    summarize,
)

Both classes are defined at module level in galaxy.workflow.extract and can be imported directly for isinstance() checks.

9.4 OpenAPI Schema Generation

The response model generates OpenAPI 3.0 schema automatically. Example generated schema fragment:

WorkflowExtractionSummary:
  type: object
  required: [history_id, history_name, default_workflow_name]
  properties:
    history_id:
      type: string
      example: "0123456789ABCDEF"
    history_name:
      type: string
    jobs:
      type: array
      items:
        $ref: '#/components/schemas/ExtractionJob'
    warnings:
      type: array
      items:
        type: string
    default_workflow_name:
      type: string

10. Performance Considerations

10.1 Current Performance Profile

The summarize() function iterates history.visible_contents, which lazy-loads from the DB. For each content item, it:

  1. Checks creating_job_associations (N+1 query risk)
  2. Follows copy chains (copied_from_*) to trace originals
  3. For real jobs, the service additionally calls toolbox.get_tool() per job

Expected bottlenecks:

  • Histories with 100+ items: many DB queries for job associations
  • Histories with many copied datasets: chain-following is sequential
  • Large toolboxes: get_tool() does a dict lookup, should be O(1)

10.2 No Pagination Initially

The Mako template loads everything at once. The Vue component should match this behavior initially. Adding pagination would require changes to summarize() which currently returns a complete dict.

10.3 Future Optimization Path

If performance becomes an issue:

  1. Eager-load job associations: Modify the visible_contents query to join-load creating_job_associations (eliminates N+1)
  2. Cache toolbox lookups: The toolbox is already cached in memory; no concern here
  3. Paginate: Add limit/offset query params and modify summarize() to accept bounds. Would require tracking total count separately.

10.4 Response Size Estimate

For a history with 50 jobs averaging 2 outputs each:

  • ~50 ExtractionJob objects × ~200 bytes each = ~10KB
  • ~100 ExtractionOutputDataset objects × ~150 bytes each = ~15KB
  • Total: ~25KB JSON response

For 500 jobs: ~250KB. Acceptable without pagination.


11. Backwards Compatibility

11.1 No Breaking Changes

The new GET endpoint is additive. The existing POST /api/workflows extraction path is unchanged. No existing clients are affected.

11.2 URL Redirect

The legacy URL /workflow/build_from_current_history?history_id=X should redirect to /workflows/extract?history_id=X. This can be done in the legacy controller before removing it, or via a route-level redirect.

11.3 Typed Payload Migration

The WorkflowExtractionPayload model can be introduced without breaking existing clients:

  1. Add the model to the schema
  2. In the create method, validate the payload against the model when from_history_id is present
  3. Existing dict-based clients will still work because Pydantic parses dicts

11.4 Client TypeScript Types

The TypeScript interfaces should be generated from the OpenAPI schema rather than manually maintained. Galaxy uses openapi-typescript for this. After the backend is implemented, run the schema generator to produce typed client code.


12. Unresolved Questions

  1. Should incompatible real tool jobs allow “treat as input”? The Mako template does NOT allow this - only fake jobs get the input checkbox. But a case could be made that outputs of incompatible tools (e.g., data_source tools) could serve as workflow inputs. The current plan preserves Mako behavior.

  2. Should the extraction summary include the output_name from summarize()? Currently discarded (stored as _output_name in the output builder). This is the output port name on the tool (e.g., "out_file1"). Not needed for the UI but could be useful for debugging or advanced features.

  3. Should WorkflowExtractionPayload be a formal discriminated union branch of the workflow creation payload? Currently the POST /api/workflows endpoint uses a raw dict with branching logic. A cleaner design would be a discriminated union, but that’s a larger refactor.

  4. How to handle DatasetCollectionCreationJob with from_jobs? When a collection creation job tracks its source jobs via set_jobs(), should the API expose that relationship? Currently not exposed.

  5. Should the API include tool parameter details for real jobs? The Mako template doesn’t show tool parameters, but exposing them could enable a “preview extraction” feature. Would require calling step_inputs() for each job, which is expensive.

  6. Accessibility in the new API: The API itself is accessible by design (structured data). But the Vue component needs to use proper ARIA labels for checkboxes. The API should include enough context (display_name, disabled_reason) for the client to generate good ARIA labels.

  7. Large history performance: Should summarize() be modified to accept bounds? Or should the API response be cached? The Mako template worked fine for histories with hundreds of items, so this may not be a real problem.

Incoming References (2)

  • Pr 21935 Workflow Extraction Vue Conversion related note — Mako to Vue conversion of workflow extraction UI with new HID-keyed FastAPI extraction summary and extract workflow endpoints replacing legacy controller
  • Pr 22706 Workflow Extraction By Ids related note — ID-based workflow extraction endpoint selecting implicit collection jobs by encoded id instead of HID inference for map-over steps