tasks

Galaxy Task Management with Celery

Learning Questions

Learning Objectives

Processing requests on the server

Avoid Doing Work in Web Threads

Web servers are a terrible place to do work. Traditional Python WSGI servers are meant for processing requests that take less a minute - they are meant for long running tasks.

This request/response cycle is inappropriate for deleting all the files in a history, submitted 10,000 batch jobs for a collection, building a zip file for a library folder.

Zoom in on Backend

Infrastructure including Celery

Celery Overview

Downsides of Celery

Adds more complexity to deploying Galaxy. Celery needs to be available to Galaxy at runtime, production Galaxy instances need a broker and a backend.

Gravity + Celery

.code[``` $ galaxy Registered galaxy config: /home/nate/work/galaxy/config/galaxy.yml Creating or updating service gunicorn Creating or updating service celery Creating or updating service celery-beat celery: added process group 2022-01-20 14:44:24,619 INFO spawned: ‘celery’ with pid 291651 celery-beat: added process group 2022-01-20 14:44:24,620 INFO spawned: ‘celery-beat’ with pid 291652 gunicorn: added process group 2022-01-20 14:44:24,622 INFO spawned: ‘gunicorn’ with pid 291653 celery STARTING celery-beat STARTING gunicorn STARTING ==> /home/nate/work/galaxy/database/gravity/log/gunicorn.log <== …log output follows…


![Celery tasks being registered at startup](https://jmchilton.github.io/galaxy-architecture/_images/celery_tasks_list.png)

## Declaring a Task

- Placed in `galaxy.celery.tasks`.
- We've placed a layer around Celery to mirror what we're with API endpoints.
  - Typed functions with Pydantic inputs implicitly mapped.
  - Implicit type based dependency injection from Galaxy's DI container (using Lagom)
  - Feels a lot like writing an API endpoint.

## A Simple Task

.code[```
@galaxy_task(
    ignore_result=True,
    action="setting up export history job"
)
def export_history(
    model_store_manager: ModelStoreManager,
    request: SetupHistoryExportJob,
):
    model_store_manager.setup_history_export_job(request)
```]

## The `galaxy_task` Decorator

.code[```python
@galaxy_task(
    ignore_result=True,
    action="setting up export history job"
)
def export_history(
    model_store_manager: ModelStoreManager,
    request: SetupHistoryExportJob,
):
    model_store_manager.setup_history_export_job(request)
```]

- `galaxy_task` is a wrapper around Celery's `task` decorator
- Wrap a simple function to turn it into a task.
- Ensure all inputs are JSON serializable or components in Galaxy's dependency injection container

## Celery and Pydantic

The `request` argument to `export_history` is a Pydantic model type named
`SetupHistoryExportJob`. These are mostly defined in `galaxy.schema.tasks`.

.code[```python
from pydantic import BaseModel

class SetupHistoryExportJob(BaseModel):
    history_id: int
    job_id: int
    store_directory: str
    include_files: bool
    include_hidden: bool
    include_deleted: bool
```]

## Celery and Pydantic - Implementation

- Custom JSON encoding and decoding to adapt Celery to Pydantic.
- Implemented in `galaxy.celery._serialization`.
- Inject `__type__` and `__class__` attributes into JSON description.
- `@galaxy_task` decorator sets Celery `serializer` attribute.

## Celery and Dependency Injection

.code[```
@galaxy_task(
    ignore_result=True,
    action="setting up export history job"
)
def export_history(
    model_store_manager: ModelStoreManager,
    request: SetupHistoryExportJob,
):
    model_store_manager.setup_history_export_job(request)
```]

- The type declaration on `model_store_manager` of `ModelStoreManager` causes
  the Galaxy manager object of this class to be passed to the function when the
  task is running.
- Client does not need to have any knowledge of this class.

## Executing Tasks from Galaxy

See `lib/galaxy/tools/imp_exp/__init__.py`:

.code[```python
from galaxy.schema.tasks import SetupHistoryExportJob

...

    request = SetupHistoryExportJob(
        history_id=history.id,
        job_id=self.job_id,
        store_directory=store_directory,
        include_files=True,
        include_hidden=include_hidden,
        include_deleted=include_deleted,
    )
    export_history.delay(request=request)

```]

The delay method is created implicitly from the `galaxy_task` decorator.

## Best Practices

- Place tasks in `galaxy.celery.tasks`.
- Keep the tasks as thin as possible (ideally simply delegate inputs
  to a manager or another Galaxy component independent of Celery).
- Ensure required/injected Galaxy components as small and decomposed as
  possible.
- Place new request definition argument types in `galaxy.schema.tasks`.

## Existing Tasks Success Stories



## PDF Export Problems

- We added PDF export of Galaxy Markdown using weasyprint
- Generation of PDF took too long, feature was quite unstable

## Short Term Storage (STS)

- A Galaxy component for managing user downloadable files that only
  need to exist for a little time.
- Traditionally, these kind of files have required a lot of
  hacking to do well in Galaxy (tracking transient request-like stuff in data model, etc..)
- Not just unoptimized by default, but unusable
- Required customizing nginx routes, special web server plugins, etc...

https://github.com/galaxyproject/galaxy/pull/13691

.code[```
class GeneratePdfDownload(BaseModel):
    short_term_storage_request_id: str
    basic_markdown: str
    document_type: PdfDocumentType
```]

## Robust PDF Export

.code[```python
from galaxy.managers.markdown_util import generate_branded_pdf

@galaxy_task(
    action="preparing Galaxy Markdown PDF for download"
)
def prepare_pdf_download(
    request: GeneratePdfDownload,
    config: GalaxyAppConfiguration,
    short_term_storage_monitor: ShortTermStorageMonitor,
):
    generate_branded_pdf(
        request,
        config,
        short_term_storage_monitor,
    )
```]

## Exporting Histories, Invocations, Libraries

.code[```
@galaxy_task(
    action="generate and stage a workflow invocation store for download"
)
def prepare_invocation_download(
    model_store_manager: ModelStoreManager,
    request: GenerateInvocationDownload,
):
    model_store_manager.prepare_invocation_download(
        request
    )
```]

https://github.com/galaxyproject/galaxy/pull/12533

## Optimized Uploads

- Decomposed job handling, precursor to migrating more job components to Celery
- Converting uploads to tasks signficantly sped up running Galaxy tests
  - API tests went from 2.5 hours to 50 minutes
  - Amazing speed up for small jobs
- Exploring task composition

https://github.com/galaxyproject/galaxy/pull/13655

## Uploads - Task Composition

See `lib/galaxy/tools/execute.py`

.code[```
async_result = (
    setup_fetch_data.s(job_id, raw_tool_source=raw_tool_source)
    | fetch_data.s(job_id=job_id).set(queue="galaxy.external")
    | set_job_metadata.s(
        extended_metadata_collection="extended" in tool.app.config.metadata_strategy,
        job_id=job_id,
    ).set(
        queue="galaxy.external",
        link_error=finish_job.si(job_id=job_id, raw_tool_source=raw_tool_source)
    )
    | finish_job.si(job_id=job_id, raw_tool_source=raw_tool_source)
)()
```]

## Batch Operations

Task-based operations enable the most expensive of the new history's batch operations.

- Changing datatypes
- Purging datasets

https://github.com/galaxyproject/galaxy/pull/14042

## Future Work

- *Migrating tool submission to tasks*
- Workflow scheduling
- Importing shared histories

https://github.com/galaxyproject/galaxy/issues/11721

## Key Takeaways
- Web servers are inappropriate for long-running work
- Celery handles async task execution
- Gravity manages Celery processes
- Tasks use typed functions with Pydantic
- DI works in tasks just like controllers