Process Development

The package s2gos_common provides a simple processor development framework that

  • supports registration of processes from Python functions,
  • supports progress reporting by subscriber callback URLs, and
  • provides a command-line interface (CLI) to query and execute the registered processes.

Processor packages developed using the provided CLI can later on be used to generate Docker images, Airflow DAGs, and optionally OGC Application Packages.

You will find the processor framework in the s2gos_common.process package. It comprises just a few handy top-level components:

  • class ProcessRegistry - to register your Python functions as processes in a central collection.
  • class JobContext - used inside your process implementations to report progress or check for client-side cancellation.
  • function get_cli() - generates a CLI for the processes in the registry.

Development Recipe

Using the framework is a simple 3-step process:

  1. Populate process registry with processes derived from your Python functions.
  2. Define a CLI instance from that process registry.
  3. Define an entry point script for the CLI instance, so you can run your package as an application.

The steps are explained in more detail in the following.

1. Populate process registry

First, you'll create a process registry object of type ProcessRegistry. Use the registry's process decorator to register your Python functions that should be exposed as processes. In my_package/processes.py:

from s2gos_common.process import JobContext, ProcessRegistry

registry = ProcessRegistry()


@registry.process(id="my-process-1")
def my_process_1(path: str, threshold: float = 0.5) -> str:
    ctx = JobContext.get()
    ...
    ctx.report_progress(progress=15, message="Initialized sources")
    ...


@registry.process(id="my-process-2")
def my_process_2(ctx: JobContext, path: str, factor: float = 1.0) -> str:
    ...

The ctx object of type JobContext can be used to report progress and to check for job cancellation. You can get the job context inside the function body via JobContext.get() or declare it as a function argument of type JobContext.
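
The following is a minimal sketch of how a process body might interleave work with progress reports and cancellation checks. It does not import the framework: RecordingContext is a hypothetical stand-in that only mirrors the report_progress() and is_cancelled() method names documented for JobContext, and count_lines is an invented example function.

```python
class RecordingContext:
    """Hypothetical stand-in for JobContext; it only records what is reported."""

    def __init__(self, cancel_after=None):
        self.events = []
        self.cancel_after = cancel_after

    def report_progress(self, progress=None, message=None):
        self.events.append((progress, message))

    def is_cancelled(self):
        # Pretend a client cancelled once `cancel_after` reports were made.
        return self.cancel_after is not None and len(self.events) >= self.cancel_after


def count_lines(ctx, chunks):
    """Process-style function: works through chunks and reports progress."""
    total = 0
    for index, chunk in enumerate(chunks, start=1):
        if ctx.is_cancelled():
            break  # stop early and keep the partial result
        total += chunk.count("\n")
        ctx.report_progress(
            progress=100 * index // len(chunks),
            message=f"chunk {index} done",
        )
    return total


chunks = ["a\nb\n", "c\n", "d\ne\nf\n", "g\n"]
ctx = RecordingContext()
result = count_lines(ctx, chunks)

cancelled_ctx = RecordingContext(cancel_after=2)
partial = count_lines(cancelled_ctx, chunks)
```

In a real process function the context would come from JobContext.get() or from a JobContext argument instead of being constructed by hand.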

Process inputs, such as the arguments path or factor above, can be further specified by pydantic.Field annotations. Field annotations for an argument can be provided via the input_fields dictionary passed to the process decorator, or preferably as part of the type declaration using the Python Annotated special form. An example for the latter is factor: Annotated[float, Field(title="Scaling factor", gt=0., le=10.)] = 1.0.
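
To illustrate why the Annotated form is convenient, here is a small standard-library sketch of how metadata attached via Annotated can be discovered from a function signature. FieldSpec is a hypothetical stand-in for the object returned by pydantic.Field, and collect_field_specs is an invented helper; how the framework itself inspects annotations is not shown here.

```python
from dataclasses import dataclass
from typing import Annotated, Optional, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class FieldSpec:
    """Hypothetical stand-in for the metadata produced by pydantic.Field."""

    title: str
    gt: Optional[float] = None
    le: Optional[float] = None


def scale(
    path: str,
    factor: Annotated[float, FieldSpec("Scaling factor", gt=0.0, le=10.0)] = 1.0,
) -> str:
    return f"{path} x {factor}"


def collect_field_specs(function):
    """Map argument names to (base type, metadata list) for Annotated arguments."""
    hints = get_type_hints(function, include_extras=True)
    specs = {}
    for name, hint in hints.items():
        if name != "return" and get_origin(hint) is Annotated:
            base_type, *metadata = get_args(hint)
            specs[name] = (base_type, metadata)
    return specs


specs = collect_field_specs(scale)
```

Because the metadata travels with the type declaration, the argument's constraints stay next to the argument instead of living in a separate dictionary.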

Should your process have many arguments, consider defining them in a dedicated pydantic model class derived from pydantic.BaseModel, passing it as a single parameter to your function, and passing inputs_arg=True to the process decorator. The generated process description will then report the class's fields as inputs rather than the model class as a single input. Conceptually:

from typing import Annotated

from pydantic import BaseModel, Field
from s2gos_common.process import JobContext, ProcessRegistry


class ArgsModel(BaseModel):
    # Required positional arguments
    arg1: Annotated[Type1, Field(..., **Specs1)]
    arg2: Annotated[Type2, Field(..., **Specs2)]
    # ...
    # Optional keyword arguments
    kwarg1: Annotated[Type1, Field(**Specs1)] = Default1
    kwarg2: Annotated[Type2, Field(**Specs2)] = Default2
    # ...


registry = ProcessRegistry()

@registry.process(inputs_arg=True)
def my_func(args: ArgsModel) -> MyResult:
    ...

2. Define CLI instance

In a second step you define an instance of a common process CLI and pass it a reference to your registry instance. In my_package/cli.py:

from s2gos_common.process.cli.cli import get_cli

# The CLI with a basic set of commands.
# The `cli` is a Typer application of type `typer.Typer`,
# so you can use the instance to register your own commands.
cli = get_cli("my_package.processes:registry")

You could also pass the imported registry directly, but a reference string defers importing the registry instance until it is needed. This makes the CLI much faster when it is only called with the --help option, because libraries that are not yet needed are not imported.
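
The deferred-import idea can be sketched with the standard library alone. The helper names resolve_ref and make_lazy_getter are hypothetical and not part of the framework; the sketch only shows how a "path.to.module:attribute" string might be resolved on first use.

```python
import importlib


def resolve_ref(ref: str):
    """Resolve a "path.to.module:attribute" reference by importing on demand."""
    module_name, sep, attr_name = ref.partition(":")
    if not sep or not module_name or not attr_name:
        raise ValueError(f"expected 'module:attribute', got {ref!r}")
    module = importlib.import_module(module_name)
    return getattr(module, attr_name)


def make_lazy_getter(ref: str):
    """Return a no-arg getter that resolves the reference once and caches it."""
    cache = []

    def getter():
        if not cache:
            cache.append(resolve_ref(ref))
        return cache[0]

    return getter


# Creating the getter does not resolve anything yet.
get_decoder = make_lazy_getter("json:JSONDecoder")
# The import and attribute lookup happen on the first call.
decoder_class = get_decoder()
```

A command such as --help never calls the getter, so the module behind the reference is never imported for it.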

3. Define entry point script

In a last step you expose the CLI as an entry point script of your package. In your pyproject.toml:

[project.scripts]
my-tool = "my_package.cli:cli"

After installing my_package in a Python environment using pip or pixi you can run your CLI as an executable and my-tool --help will output:

[Screenshot: process-cli.png]

Usage Example

Example project setup

An application example that can serve as a starting point is provided in the workspace s2gos-app-ex. Please check out its README.md to install and run it.

The application's primary user interface is its simple, generated CLI (you can extend it, if you like). For the above application example the CLI tool is named s2gos-app-ex.

Getting process information

Use the list-processes subcommand (lp for short) to list the published processes, and use get-process (gp for short) to get details such as the inputs of your process. The command s2gos-app-ex gp primes_between will give you the description of the published process primes_between, including its input specs:

{
  "title": "Prime Generator",
  "description": "Computes the list of prime numbers within an integer value range.",
  "id": "primes_between",
  "version": "0.0.0",
  "inputs": {
    "min_val": {
      "title": "Minimum value of search range",
      "minOccurs": 0,
      "schema": {
        "minimum": 0.0,
        "type": "integer",
        "default": 0
      }
    },
    "max_val": {
      "title": "Maximum value of search range",
      "minOccurs": 0,
      "schema": {
        "minimum": 0.0,
        "type": "integer",
        "default": 100
      }
    }
  },
  "outputs": {
    "return_value": {
      "title": "Return Value",
      "schema": {
        "type": "array",
        "items": {
          "type": "integer"
        }
      }
    }
  }
}
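
Such a description is plain JSON, so it can be post-processed with a few lines of Python. The sketch below embeds a shortened copy of the output above and lists each input with its type, default, and whether it is required. The helper summarize_inputs is hypothetical; the sketch assumes the OGC API - Processes convention that minOccurs defaults to 1, so an input with minOccurs 0 is optional.

```python
import json

# Shortened copy of the description shown above.
description_json = """
{
  "id": "primes_between",
  "inputs": {
    "min_val": {"title": "Minimum value of search range", "minOccurs": 0,
                "schema": {"minimum": 0.0, "type": "integer", "default": 0}},
    "max_val": {"title": "Maximum value of search range", "minOccurs": 0,
                "schema": {"minimum": 0.0, "type": "integer", "default": 100}}
  }
}
"""


def summarize_inputs(description):
    """Return one summary row per process input."""
    rows = []
    for name, spec in description.get("inputs", {}).items():
        schema = spec.get("schema", {})
        rows.append(
            {
                "name": name,
                "type": schema.get("type"),
                # Assumption: a missing minOccurs means the input is required.
                "required": spec.get("minOccurs", 1) > 0,
                "default": schema.get("default"),
            }
        )
    return rows


rows = summarize_inputs(json.loads(description_json))
for row in rows:
    kind = "required" if row["required"] else "optional"
    print(f'{row["name"]}: {row["type"]}, {kind}, default={row["default"]}')
```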

Executing a process

To execute your processes, see the help for the execute-process subcommand (ep for short):

[Screenshot: process-ep-cli.png]

Execution request files

For larger or complex sets of input parameters, it is recommended to use an execution request file in JSON or YAML format. The structure is simple, for example:

{
    "process_id": "primes_between",
    "inputs": {
      "min_val": 100,
      "max_val": 200
    }
}
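
A request file like this can also be generated from a script. The following sketch writes the request above to a temporary JSON file and reads it back with a basic sanity check. The function load_request and the file name request.json are hypothetical; how the file is handed to execute-process is described in that subcommand's help and is not reproduced here.

```python
import json
import tempfile
from pathlib import Path

request = {
    "process_id": "primes_between",
    "inputs": {"min_val": 100, "max_val": 200},
}


def load_request(path):
    """Read a JSON request file and check the two keys used in this example."""
    data = json.loads(Path(path).read_text(encoding="utf-8"))
    if not isinstance(data.get("process_id"), str):
        raise ValueError("process_id must be a string")
    if not isinstance(data.get("inputs", {}), dict):
        raise ValueError("inputs must be a key-value mapping")
    return data


with tempfile.TemporaryDirectory() as tmp_dir:
    request_path = Path(tmp_dir) / "request.json"
    request_path.write_text(json.dumps(request, indent=2), encoding="utf-8")
    loaded = load_request(request_path)
```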

The execution request file format in detail:

  • process_id: Process identifier
  • dotpath: Whether dots in input names should be used to create nested object values. Defaults to False.
  • inputs: Optional process inputs given as key-value mapping. Values may be of any JSON-serializable type accepted by the given process.
  • outputs: Optional process outputs given as key-value mapping. Values are of type s2gos_common.models.Output and should be supported by the given process.
  • subscriber: Optional object comprising callback URLs that are informed about process status changes while the processing takes place. The URLs are successUri, inProgressUri, and failedUri; none of them is required. See also s2gos_common.models.Subscriber.
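
To make the dotpath option more concrete, here is one possible reading of "dots in input names create nested object values" as a standalone sketch. The function expand_dotpaths and the input names range.min_val and range.max_val are hypothetical; the framework's actual expansion rules may differ.

```python
def expand_dotpaths(inputs):
    """Turn {"a.b": 1, "a.c": 2} into {"a": {"b": 1, "c": 2}}."""
    nested = {}
    for key, value in inputs.items():
        *parents, leaf = key.split(".")
        target = nested
        for part in parents:
            child = target.setdefault(part, {})
            if not isinstance(child, dict):
                raise ValueError(
                    f"{key!r} conflicts with a non-object value at {part!r}"
                )
            target = child
        target[leaf] = value
    return nested


flat_inputs = {"range.min_val": 100, "range.max_val": 200, "verbose": True}
nested_inputs = expand_dotpaths(flat_inputs)
```

Names without dots are passed through unchanged, so a request that never uses dotted names looks the same with or without the option.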

Framework API

s2gos_common.process.ProcessRegistry

Bases: Mapping[str, Process]

A registry for processes.

Processes are Python functions with extra metadata.

Represents a read-only mapping from process identifiers to Process instances.

process(function=None, /, *, id=None, version=None, title=None, description=None, input_fields=None, output_fields=None, inputs_arg=False)

A decorator that can be applied to a user function in order to register it as a process in this registry.

The decorator can be used with or without parameters.

Parameters:

  • function (Optional[Callable], default: None): The decorated function that is passed automatically since process() is a decorator function.
  • id (Optional[str], default: None): Optional process identifier. Must be unique within the registry. If not provided, the fully qualified function name will be used.
  • version (Optional[str], default: None): Optional version identifier. If not provided, "0.0.0" will be used.
  • title (Optional[str], default: None): Optional, short process title.
  • description (Optional[str], default: None): Optional, detailed description of the process. If not provided, the function's docstring, if any, will be used.
  • input_fields (Optional[dict[str, FieldInfo]], default: None): Optional mapping from function argument names to pydantic.Field annotations. The preferred way is to annotate the arguments directly as described in The Annotated Pattern.
  • output_fields (Optional[dict[str, FieldInfo]], default: None): Mapping from output names to pydantic.Field annotations. Required if you have multiple outputs returned as a dictionary. In this case, output names are the keys of your returned dictionary.
  • inputs_arg (str | bool, default: False): Specifies the use of an inputs argument. An inputs argument is a container for the actual process inputs. If specified, it must be the only function argument (besides an optional job context argument) and must be a subclass of pydantic.BaseModel. If inputs_arg is True, the only argument will be the inputs argument; if inputs_arg is a str, it must be the name of the only argument.
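
The signature process(function=None, /, *, ...) is what allows the decorator to be used with or without parameters. The toy class below sketches that pattern in isolation. MiniRegistry is not the framework's ProcessRegistry, and the default identifier format (module and qualified name joined by ":") is an assumption made only for this example.

```python
class MiniRegistry:
    """Toy registry illustrating a decorator usable with or without parameters."""

    def __init__(self):
        self._functions = {}

    def process(self, function=None, /, *, id=None):
        def register(fn):
            # Assumption: fall back to "<module>:<qualified name>" as identifier.
            process_id = id or f"{fn.__module__}:{fn.__qualname__}"
            if process_id in self._functions:
                raise ValueError(f"duplicate process id: {process_id}")
            self._functions[process_id] = fn
            return fn

        if function is not None:
            # Used as `@registry.process`: the function is passed directly.
            return register(function)
        # Used as `@registry.process(...)`: return the actual decorator.
        return register

    def ids(self):
        return list(self._functions)


registry = MiniRegistry()


@registry.process
def first(x: int) -> int:
    return x + 1


@registry.process(id="second")
def second(x: int) -> int:
    return x * 2
```

Making function positional-only and all options keyword-only keeps the two call forms unambiguous: a bare callable can only arrive through the first slot.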

s2gos_common.process.Process dataclass

A process comprises a process description and executable code in form of a Python function.

Instances of this class are managed by the ProcessRegistry.

Attributes:

  • function (Callable): The user's Python function.
  • signature (Signature): The signature of function.
  • job_ctx_arg (str | None): Name of the function argument of type JobContext, if any.
  • model_class (type[BaseModel]): Pydantic model class for the arguments of function.
  • description (ProcessDescription): Process description modelled after OGC API - Processes - Part 1: Core.

create(function, id=None, version=None, title=None, description=None, input_fields=None, output_fields=None, inputs_arg=False) classmethod

Create a new instance of this dataclass.

Called by the ProcessRegistry.process() decorator function. Not intended to be used by clients.

s2gos_common.process.JobContext

Bases: ABC

Report process progress and check for task cancellation.

A process function can retrieve the current job context

  1. via JobContext.get() from within a process function, or
  2. as a function argument of type JobContext.

get() classmethod

Get the current job context.

Returns the current job context that can be used by process functions to report job progress in percent or via messages and to check whether cancellation has been requested. This function is intended to be called from within a process function executed as a job. If called as a usual Python function (without a job serving as context), the returned context will have no-op methods only.

Returns:

  • JobContext: An instance of the current job context.
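
The behaviour described for get(), a real context inside a job and a no-op context otherwise, can be pictured with the standard contextvars module. This is a sketch of the general pattern under that assumption, not the framework's implementation; NoOpContext, ListContext, get_context, and run_as_job are hypothetical names.

```python
import contextvars

_current = contextvars.ContextVar("current_job_context", default=None)


class NoOpContext:
    """Fallback used when no job is active; every method does nothing."""

    def report_progress(self, progress=None, message=None):
        return None

    def is_cancelled(self):
        return False


class ListContext(NoOpContext):
    """Context that collects progress reports in a list."""

    def __init__(self):
        self.messages = []

    def report_progress(self, progress=None, message=None):
        self.messages.append((progress, message))


def get_context():
    """Return the active context, or a no-op context outside of a job."""
    ctx = _current.get()
    return ctx if ctx is not None else NoOpContext()


def run_as_job(function, ctx, *args, **kwargs):
    """Call `function` with `ctx` installed as the current context."""
    token = _current.set(ctx)
    try:
        return function(*args, **kwargs)
    finally:
        _current.reset(token)


def double(x):
    get_context().report_progress(progress=100, message="done")
    return 2 * x


plain_result = double(3)  # no job active: the report is silently dropped
job_ctx = ListContext()
job_result = run_as_job(double, job_ctx, 4)
```

With such a fallback, a process function stays callable as an ordinary Python function, for example in unit tests.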

report_progress(progress=None, message=None) abstractmethod

Report task progress.

Parameters:

  • progress (Optional[int], default: None): Progress in percent.
  • message (Optional[str], default: None): Detailed progress message.

Raises:

  • JobCancelledException: if an attempt has been made to cancel this job.

is_cancelled() abstractmethod

Test whether an attempt has been made to cancel this job. It may still be running though.

Returns:

  • bool: True if so, False otherwise.

check_cancelled() abstractmethod

Raise a JobCancelledException if an attempt has been made to cancel this job.

s2gos_common.process.JobCancelledException

Bases: Exception

Raised if a job's cancellation has been requested.
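
A process that writes files may want to clean up when cancellation is signalled by an exception. The sketch below shows one way to do that: remove the partial output, then re-raise so the caller still sees the cancellation. JobCancelled and StepLimitedContext are hypothetical stand-ins for the framework's exception and job context, and write_squares is an invented example function.

```python
import os
import tempfile


class JobCancelled(Exception):
    """Hypothetical stand-in for the framework's cancellation exception."""


class StepLimitedContext:
    """Hypothetical context that signals cancellation on the n-th check."""

    def __init__(self, cancel_at_call):
        self.cancel_at_call = cancel_at_call
        self.calls = 0

    def check_cancelled(self):
        self.calls += 1
        if self.calls >= self.cancel_at_call:
            raise JobCancelled()


def write_squares(ctx, path, count):
    """Write square numbers to `path`, removing the file if cancelled."""
    try:
        with open(path, "w", encoding="utf-8") as stream:
            for i in range(count):
                ctx.check_cancelled()
                stream.write(f"{i * i}\n")
    except JobCancelled:
        # The file is already closed here; drop the partial output and re-raise.
        os.remove(path)
        raise
    return path


with tempfile.TemporaryDirectory() as tmp_dir:
    target = os.path.join(tmp_dir, "squares.txt")
    try:
        write_squares(StepLimitedContext(cancel_at_call=3), target, 10)
        was_cancelled = False
    except JobCancelled:
        was_cancelled = True
    leftover_exists = os.path.exists(target)
```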

s2gos_common.process.get_cli(process_registry, **kwargs)

Get the CLI instance configured to use the process registry that is given either by

  • a reference of the form "path.to.module:attribute",
  • or process registry instance,
  • or as a no-arg process registry getter function.

The process registry is usually a singleton in your application.

The context object obj of the returned CLI object will be of type dict and will contain a process registry getter function using the key get_process_registry.

The function must be called before any CLI command or callback has been invoked. Otherwise, the provided get_process_registry getter will not be recognized and all commands that require the process registry will fail with an AssertionError.

Parameters:

  • process_registry (Union[str, ProcessRegistry, Callable[[], ProcessRegistry]], required): A registry reference string, or a registry instance, or a no-arg function that returns a registry instance.
  • kwargs (default: {}): Additional context values that will be registered in the

s2gos_common.process.ExecutionRequest

Bases: ProcessRequest

Process execution request as used by the CLI. Extends ProcessRequest to allow the process identifier to be part of the request.

Parameters:

  • process_id (required): Process identifier.
  • dotpath (default: False): Whether dots in input names should be used to create nested object values.
  • inputs (optional): Process inputs given as key-value mapping. Values may be of any JSON-serializable type accepted by the given process.
  • outputs (optional): Process outputs given as key-value mapping. Values are of type s2gos_common.models.Output supported by the given process.
  • subscriber (optional): Subscriber of type s2gos_common.models.Subscriber comprising callback URLs that are informed about process status changes while the processing takes place.