Process Development
The package s2gos_common provides a simple processor development framework that
- supports registration of processes from Python functions,
- supports progress reporting by subscriber callback URLs, and
- provides a command-line interface (CLI) to query and execute the registered processes.
Processor packages developed using the provided CLI can later on be used to generate Docker images, Airflow DAGs, and optionally OGC Application Packages.
You'll find the processor framework in the s2gos_common.process package. It comprises just a few handy top-level components:
- class ProcessRegistry: to register your Python functions as processes in a central collection.
- class JobContext: used inside your process implementations to report progress or check for client-side cancellation.
- function get_cli(): generates a CLI for the processes in the registry.
Development Recipe
Using the framework is simple; it is a three-step process:
- Populate a process registry with processes derived from your Python functions.
- Define a CLI instance from that process registry.
- Define an entry point script for the CLI instance, so you can run your package as an application.
The steps are explained in more detail below.
1. Populate process registry
First, you'll create a process registry object of type ProcessRegistry. Use the registry's process decorator to register the Python functions that you want to expose as processes. In my_package/processes.py:
```python
from s2gos_common.process import JobContext, ProcessRegistry

registry = ProcessRegistry()


@registry.process(id="my-process-1")
def my_process_1(path: str, threshold: float = 0.5) -> str:
    # Retrieve the current job context from within the function body
    ctx = JobContext.get()
    ...
    ctx.report_progress(progress=15, message="Initialized sources")
    ...


@registry.process(id="my-process-2")
def my_process_2(ctx: JobContext, path: str, factor: float = 1.0) -> str:
    # Here the job context is declared as an argument of type JobContext
    ...
```
The ctx object of type JobContext can be used to report progress and to check for job cancellation. You can get the job context inside the function body via JobContext.get() or declare it as a function argument of type JobContext.
Process inputs, such as the arguments path or factor above, can be further specified by pydantic.Field annotations. Field annotations for an argument can be provided via the input_fields dictionary passed to the process decorator or, preferably, as part of the type declaration using the Python Annotated special form. An example of the latter is factor: Annotated[float, Field(title="Scaling factor", gt=0., le=10.)] = 1.0.
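 
The Annotated form works because the metadata stays attached to the type hint and can be read back at runtime. The following stdlib-only sketch shows how a framework could collect that metadata from a function signature. FieldSpec is a hypothetical stand-in for pydantic.Field, and collect_input_specs and my_scaling_process are hypothetical names; this is not how s2gos_common is necessarily implemented.

```python
import inspect
from dataclasses import dataclass
from typing import Annotated, Any, Optional, get_args, get_origin, get_type_hints


@dataclass(frozen=True)
class FieldSpec:
    """Hypothetical stand-in for pydantic.Field (title and bounds only)."""
    title: Optional[str] = None
    gt: Optional[float] = None
    le: Optional[float] = None


def collect_input_specs(func) -> dict[str, dict[str, Any]]:
    """Collect type, default and FieldSpec metadata for each parameter of func."""
    # include_extras=True keeps Annotated[...] instead of stripping it to the base type
    hints = get_type_hints(func, include_extras=True)
    specs = {}
    for name, param in inspect.signature(func).parameters.items():
        hint = hints.get(name, Any)
        field = None
        if get_origin(hint) is Annotated:
            # get_args() returns (base_type, *metadata)
            hint, *metadata = get_args(hint)
            field = next((m for m in metadata if isinstance(m, FieldSpec)), None)
        has_default = param.default is not inspect.Parameter.empty
        specs[name] = {
            "type": hint,
            "required": not has_default,
            "default": param.default if has_default else None,
            "field": field,
        }
    return specs


def my_scaling_process(
    path: str,
    factor: Annotated[float, FieldSpec(title="Scaling factor", gt=0.0, le=10.0)] = 1.0,
) -> str:
    return f"{path} scaled by {factor}"


specs = collect_input_specs(my_scaling_process)
print(specs["factor"]["field"].title)  # Scaling factor
```

Keeping the metadata next to the type hint means the function signature alone carries everything needed to describe an input, which is why the Annotated form is preferable to a separate input_fields dictionary.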
Should your process have many arguments, you may consider defining them in a dedicated pydantic model derived from pydantic.BaseModel, passing it as the single parameter to your function, and passing inputs_arg=True to the @process decorator. The generated process description will then report the model's fields as inputs rather than the model class as a single input. Conceptually:
```python
from typing import Annotated

from pydantic import BaseModel, Field

from s2gos_common.process import JobContext, ProcessRegistry

# Type1, Specs1, Default1, MyResult, etc. are placeholders


class ArgsModel(BaseModel):
    # Required arguments (no default value)
    arg1: Annotated[Type1, Field(**Specs1)]
    arg2: Annotated[Type2, Field(**Specs2)]
    # ...
    # Optional arguments (with default value)
    kwarg1: Annotated[Type3, Field(**Specs3)] = Default1
    kwarg2: Annotated[Type4, Field(**Specs4)] = Default2
    # ...


registry = ProcessRegistry()


@registry.process(inputs_arg=True)
def my_func(args: ArgsModel) -> MyResult:
    ...
```
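To illustrate what inputs_arg=True changes in the reported inputs, here is a stdlib-only sketch. It uses a dataclass in place of a pydantic BaseModel, ignores the optional job context argument, and describe_inputs is a hypothetical helper; the actual s2gos_common behavior may differ in detail.

```python
import dataclasses
import inspect
from typing import get_type_hints


@dataclasses.dataclass
class ArgsModel:
    # Required argument (no default)
    path: str
    # Optional argument (with default)
    factor: float = 1.0


def my_func(args: ArgsModel) -> str:
    return f"{args.path} scaled by {args.factor}"


def describe_inputs(func, inputs_arg: bool = False) -> dict[str, str]:
    """Map input names to type names, flattening a single model argument if inputs_arg is set."""
    hints = get_type_hints(func)
    params = list(inspect.signature(func).parameters)
    if inputs_arg:
        # The model must be the only argument; its fields become the inputs
        if len(params) != 1:
            raise TypeError("an inputs argument must be the only function argument")
        model = hints[params[0]]
        model_hints = get_type_hints(model)
        return {f.name: model_hints[f.name].__name__ for f in dataclasses.fields(model)}
    # Otherwise every function argument is reported as one input
    return {name: hints[name].__name__ for name in params}


print(describe_inputs(my_func))  # {'args': 'ArgsModel'}
print(describe_inputs(my_func, inputs_arg=True))  # {'path': 'str', 'factor': 'float'}
```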
2. Define CLI instance
In a second step, you define an instance of a common process CLI and pass it a reference to your registry instance. In my_package/cli.py:
```python
from s2gos_common.process.cli.cli import get_cli

# The CLI with a basic set of commands.
# The `cli` is a Typer application of type `typer.Typer`,
# so you can use the instance to register your own commands.
cli = get_cli("my_package.processes:registry")
```
You could also pass the imported registry directly, but using a reference string defers importing the registry instance until it is needed. This makes the CLI much faster when it is just called with the --help option, because libraries that are not yet needed are not imported.
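The idea behind such a reference string can be sketched with importlib: the "module:attribute" string is parsed immediately, but the import only happens when the getter is first called. lazy_ref is a hypothetical helper for illustration; get_cli's actual resolution logic may differ.

```python
import importlib
from typing import Any, Callable


def lazy_ref(ref: str) -> Callable[[], Any]:
    """Return a getter that imports and resolves "module.path:attribute" on first call."""
    module_name, sep, attr_path = ref.partition(":")
    if not sep or not module_name or not attr_path:
        raise ValueError(f"expected 'module.path:attribute', got {ref!r}")
    cache: dict[str, Any] = {}

    def getter() -> Any:
        if "value" not in cache:
            # The import happens here, not when lazy_ref() is called
            obj: Any = importlib.import_module(module_name)
            for name in attr_path.split("."):
                obj = getattr(obj, name)
            cache["value"] = obj
        return cache["value"]

    return getter


get_join = lazy_ref("os.path:join")  # nothing is imported yet
print(get_join()("a", "b"))  # prints a/b on POSIX systems
```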
3. Define entry point script
In a last step, you expose the CLI as an entry point script of your package. In your pyproject.toml:
```toml
[project.scripts]
my-tool = "my_package.cli:cli"
```
After installing my_package in a Python environment using pip or pixi, you can run your CLI as an executable; for example, my-tool --help prints the usage and the available commands.
Usage Example
Example project setup
An application example that can serve as a starting point is provided in the workspace s2gos-app-ex. Please check out its README.md to install and run it.
The application's primary user interface is its simple, generated CLI (you can extend it, if you like). For the above application example, the CLI tool is named s2gos-app-ex.
Getting process information
Use the list-processes (or short lp) subcommand to list the published processes, and use get-process (or short gp) to get details, such as the inputs, of your process. The command s2gos-app-ex gp primes_between will give you the input specs of the published process primes_between:
```json
{
  "title": "Prime Generator",
  "description": "Computes the list of prime numbers within an integer value range.",
  "id": "primes_between",
  "version": "0.0.0",
  "inputs": {
    "min_val": {
      "title": "Minimum value of search range",
      "minOccurs": 0,
      "schema": {
        "minimum": 0.0,
        "type": "integer",
        "default": 0
      }
    },
    "max_val": {
      "title": "Maximum value of search range",
      "minOccurs": 0,
      "schema": {
        "minimum": 0.0,
        "type": "integer",
        "default": 100
      }
    }
  },
  "outputs": {
    "return_value": {
      "title": "Return Value",
      "schema": {
        "type": "array",
        "items": {
          "type": "integer"
        }
      }
    }
  }
}
```
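Since the process description is plain JSON, a client can use it to complete and check inputs before submitting a request. The stdlib-only sketch below fills in schema defaults and checks "minimum" constraints for the primes_between description shown above. apply_defaults is a hypothetical helper that handles only these two schema keywords, not full JSON Schema validation; it treats a missing minOccurs as 1 (required), as in OGC API - Processes.

```python
import json

DESCRIPTION = json.loads("""
{
  "id": "primes_between",
  "inputs": {
    "min_val": {"minOccurs": 0, "schema": {"minimum": 0.0, "type": "integer", "default": 0}},
    "max_val": {"minOccurs": 0, "schema": {"minimum": 0.0, "type": "integer", "default": 100}}
  }
}
""")


def apply_defaults(description: dict, inputs: dict) -> dict:
    """Return a copy of inputs completed with schema defaults and checked against 'minimum'."""
    result = dict(inputs)
    for name, spec in description["inputs"].items():
        schema = spec.get("schema", {})
        if name not in result:
            if "default" in schema:
                result[name] = schema["default"]
            elif spec.get("minOccurs", 1) > 0:
                # No default and minOccurs > 0: the input is required
                raise ValueError(f"missing required input {name!r}")
            else:
                continue
        if "minimum" in schema and result[name] < schema["minimum"]:
            raise ValueError(f"{name} must be >= {schema['minimum']}")
    return result


print(apply_defaults(DESCRIPTION, {"max_val": 200}))  # {'max_val': 200, 'min_val': 0}
```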
Executing a process
To execute your processes, use the execute-process (or short ep) subcommand; run s2gos-app-ex ep --help to see its options.
Execution request files
For larger or complex sets of input parameters, it is recommended to use an execution request file in JSON or YAML format. The structure is simple, for example:
```json
{
  "process_id": "primes_between",
  "inputs": {
    "min_val": 100,
    "max_val": 200
  }
}
```
The execution request file format in detail:
- process_id: Process identifier.
- dotpath: Whether dots in input names should be used to create nested object values. Defaults to False.
- inputs: Optional process inputs given as key-value mapping. Values may be of any JSON-serializable type accepted by the given process.
- outputs: Optional process outputs given as key-value mapping. Values are of type Output (s2gos_common.models.Output) and should be supported by the given process.
- subscriber: Optional object comprising callback URLs that are informed about process status changes while the processing takes place. The URLs are successUri, inProgressUri, and failedUri, and none of them is required. See also Subscriber (s2gos_common.models.Subscriber).
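The dotpath option can be pictured as expanding flat input names into nested objects. The sketch below loads a JSON request and applies such an expansion when dotpath is true. The process ID "my-process", the bbox.* and threshold inputs, and the helper expand_dotpath are hypothetical, and the exact nesting semantics are an assumption inferred from the option's description above.

```python
import json
from typing import Any


def expand_dotpath(inputs: dict[str, Any]) -> dict[str, Any]:
    """Turn keys like "a.b.c" into nested objects {"a": {"b": {"c": ...}}}."""
    result: dict[str, Any] = {}
    for key, value in inputs.items():
        *parents, leaf = key.split(".")
        node = result
        for part in parents:
            # Create intermediate objects on demand
            node = node.setdefault(part, {})
            if not isinstance(node, dict):
                raise ValueError(f"conflicting value for {part!r} in {key!r}")
        node[leaf] = value
    return result


request = json.loads("""
{
  "process_id": "my-process",
  "dotpath": true,
  "inputs": {"bbox.west": 10.0, "bbox.east": 11.5, "threshold": 0.5}
}
""")
inputs = request.get("inputs", {})
if request.get("dotpath", False):
    inputs = expand_dotpath(inputs)
print(inputs)  # {'bbox': {'west': 10.0, 'east': 11.5}, 'threshold': 0.5}
```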
Framework API
s2gos_common.process.ProcessRegistry
Bases: Mapping[str, Process]
A registry for processes.
Processes are Python functions with extra metadata.
Represents a read-only mapping from process identifiers to Process instances.
process(function=None, /, *, id=None, version=None, title=None, description=None, input_fields=None, output_fields=None, inputs_arg=False)
A decorator that can be applied to a user function in order to register it as a process in this registry.
The decorator can be used with or without parameters.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
function | Optional[Callable] | The decorated function, passed automatically when process is used as a decorator. | None |
id | Optional[str] | Optional process identifier. Must be unique within the registry. If not provided, the fully qualified function name will be used. | None |
version | Optional[str] | Optional version identifier. If not provided, … | None |
title | Optional[str] | Optional, short process title. | None |
description | Optional[str] | Optional, detailed description of the process. If not provided, the function's docstring, if any, will be used. | None |
input_fields | Optional[dict[str, FieldInfo]] | Optional mapping from function argument names to FieldInfo instances. | None |
output_fields | Optional[dict[str, FieldInfo]] | Optional mapping from output names to FieldInfo instances. | None |
inputs_arg | str \| bool | Specifies the use of an inputs argument. An inputs argument is a container for the actual process inputs. If specified, it must be the only function argument (besides an optional job context argument) and its type must be a subclass of pydantic.BaseModel. | False |
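To make the Mapping base and the "with or without parameters" decorator behavior concrete, here is a much reduced, stdlib-only sketch of such a registry. SimpleRegistry and SimpleProcess are hypothetical names; the sketch omits everything the real ProcessRegistry derives from signatures and pydantic fields, and returning the undecorated function is a choice of this sketch only.

```python
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Callable, Iterator, Optional


@dataclass(frozen=True)
class SimpleProcess:
    """Hypothetical, much reduced stand-in for a registered process."""
    id: str
    function: Callable
    title: Optional[str] = None


class SimpleRegistry(Mapping):
    """Read-only mapping from process IDs to processes, populated via a decorator."""

    def __init__(self) -> None:
        self._processes: dict[str, SimpleProcess] = {}

    def process(self, function: Optional[Callable] = None, /, *,
                id: Optional[str] = None, title: Optional[str] = None):
        def register(func: Callable) -> Callable:
            # Fall back to the fully qualified function name as process ID
            process_id = id or f"{func.__module__}.{func.__qualname__}"
            if process_id in self._processes:
                raise ValueError(f"process {process_id!r} is already registered")
            self._processes[process_id] = SimpleProcess(process_id, func, title)
            return func  # in this sketch, the function stays callable as usual

        # Used as @registry.process (no parentheses): the function is passed directly
        if function is not None:
            return register(function)
        # Used as @registry.process(...): return the actual decorator
        return register

    def __getitem__(self, process_id: str) -> SimpleProcess:
        return self._processes[process_id]

    def __iter__(self) -> Iterator[str]:
        return iter(self._processes)

    def __len__(self) -> int:
        return len(self._processes)


registry = SimpleRegistry()


@registry.process
def add(a: int, b: int) -> int:
    return a + b


@registry.process(id="multiply", title="Multiply")
def mul(a: int, b: int) -> int:
    return a * b


print(list(registry))  # ['__main__.add', 'multiply'] when run as a script
```

Because only __getitem__, __iter__ and __len__ are defined, the mapping is read-only from the outside; processes can only be added through the decorator.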
s2gos_common.process.Process
dataclass
A process comprises a process description and executable code in the form of a Python function.
Instances of this class are managed by the ProcessRegistry.
Attributes:
Name | Type | Description |
---|---|---|
function | Callable | The user's Python function. |
signature | Signature | The signature of function. |
job_ctx_arg | str \| None | Name of function's job context argument, if any. |
model_class | type[BaseModel] | Pydantic model class for the arguments of function. |
description | ProcessDescription | Process description modelled after OGC API - Processes - Part 1: Core. |
create(function, id=None, version=None, title=None, description=None, input_fields=None, output_fields=None, inputs_arg=False)
classmethod
Create a new instance of this dataclass. Called by the ProcessRegistry.process() decorator function. Not intended to be used by clients.
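The signature and job_ctx_arg attributes suggest that a process separates a job context argument from the actual inputs by inspecting the function signature. A stdlib-only sketch of that idea follows; JobContext here is a hypothetical placeholder class, split_params is a hypothetical helper, and rejecting a second context argument is an assumption of this sketch.

```python
import inspect
from typing import Optional, get_type_hints


class JobContext:
    """Hypothetical placeholder for s2gos_common.process.JobContext."""


def split_params(func) -> tuple[Optional[str], list[str]]:
    """Return (job context argument name or None, names of the actual process inputs)."""
    hints = get_type_hints(func)
    job_ctx_arg = None
    inputs = []
    for name in inspect.signature(func).parameters:
        hint = hints.get(name)
        # An argument annotated with the job context type is not a process input
        if isinstance(hint, type) and issubclass(hint, JobContext):
            if job_ctx_arg is not None:
                raise TypeError("this sketch allows only one job context argument")
            job_ctx_arg = name
        else:
            inputs.append(name)
    return job_ctx_arg, inputs


def my_process_2(ctx: JobContext, path: str, factor: float = 1.0) -> str:
    return f"{path} scaled by {factor}"


print(split_params(my_process_2))  # ('ctx', ['path', 'factor'])
```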
s2gos_common.process.JobContext
Bases: ABC
Report process progress and check for task cancellation.
A process function can retrieve the current job context
- via JobContext.get() from within a process function, or
- as a function argument of type JobContext.
get()
classmethod
Get the current job context.
Returns the current job context that can be used by process functions to report job progress in percent or via messages and to check whether cancellation has been requested. This function is intended to be called from within a process function executed as a job. If called as a usual Python function (without a job serving as context), the returned context will have no-op methods only.
Returns:
Type | Description |
---|---|
JobContext | An instance of the current job context. |
report_progress(progress=None, message=None)
abstractmethod
Report task progress.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
progress | Optional[int] | Progress in percent. | None |
message | Optional[str] | Detailed progress message. | None |
Raises:
Type | Description |
---|---|
JobCancelledException | If an attempt has been made to cancel this job. |
is_cancelled()
abstractmethod
Test whether an attempt has been made to cancel this job. It may still be running though.
Returns:
Type | Description |
---|---|
bool | True if an attempt has been made to cancel this job, otherwise False. |
check_cancelled()
abstractmethod
Raise a JobCancelledException if an attempt has been made to cancel this job.
s2gos_common.process.JobCancelledException
Bases: Exception
Raised if a job's cancellation has been requested.
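The behavior of JobContext.get(), which returns a context with no-op methods when no job is active, can be sketched with a contextvars.ContextVar. Tracking the current job in a context variable is an illustrative assumption, not necessarily how s2gos_common implements it; RecordingJobContext, _current_job and count_to are hypothetical names, and the base class here is concrete rather than abstract for brevity.

```python
import contextvars
from typing import Optional


class JobCancelledException(Exception):
    """Raised if a job's cancellation has been requested."""


class JobContext:
    """Sketch of a job context; this default instance has no-op methods only."""

    def report_progress(self, progress: Optional[int] = None, message: Optional[str] = None) -> None:
        pass

    def is_cancelled(self) -> bool:
        return False

    def check_cancelled(self) -> None:
        if self.is_cancelled():
            raise JobCancelledException()

    @classmethod
    def get(cls) -> "JobContext":
        # Outside of a job, the context variable yields its no-op default
        return _current_job.get()


class RecordingJobContext(JobContext):
    """Hypothetical job context that records progress and supports cancellation."""

    def __init__(self) -> None:
        self.reports: list[tuple[Optional[int], Optional[str]]] = []
        self.cancel_requested = False

    def report_progress(self, progress=None, message=None) -> None:
        # Like the documented report_progress, raise if cancellation was requested
        self.check_cancelled()
        self.reports.append((progress, message))

    def is_cancelled(self) -> bool:
        return self.cancel_requested


_current_job: contextvars.ContextVar[JobContext] = contextvars.ContextVar(
    "current_job", default=JobContext()
)


def count_to(n: int) -> int:
    ctx = JobContext.get()
    for i in range(1, n + 1):
        ctx.report_progress(progress=100 * i // n, message=f"step {i}")
    return n


count_to(2)  # no job active: progress reports are no-ops

job = RecordingJobContext()
token = _current_job.set(job)  # simulate running count_to() as a job
try:
    count_to(4)
finally:
    _current_job.reset(token)
print(job.reports[-1])  # (100, 'step 4')
```

Using a context variable keeps the process function free of extra parameters while still giving each concurrently running job its own context.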
s2gos_common.process.get_cli(process_registry, **kwargs)
Get the CLI instance configured to use the process registry that is given either by
- a reference of the form "path.to.module:attribute",
- a process registry instance, or
- a no-arg process registry getter function.
The process registry is usually a singleton in your application.
The context object obj of the returned CLI object will be of type dict and will contain a process registry getter function using the key get_process_registry.
The function must be called before any CLI command or callback has been invoked. Otherwise, the provided get_process_registry getter will not be recognized and all commands that require the process registry will fail with an AssertionError.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
process_registry | Union[str, ProcessRegistry, Callable[[], ProcessRegistry]] | A registry reference string, or a registry instance, or a no-arg function that returns a registry instance. | required |
kwargs | | Additional context values that will be registered in the CLI's context object obj. | {} |
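Accepting three kinds of registry sources is easiest to handle by normalizing all of them into a single no-arg getter, which can then be stored in the context dict under get_process_registry. The stdlib-only sketch below shows that normalization; Registry and as_registry_getter are hypothetical names, and get_cli's actual code may differ.

```python
import importlib
from typing import Any, Callable, Union


class Registry:
    """Hypothetical placeholder for a process registry class."""


RegistrySource = Union[str, Registry, Callable[[], Registry]]


def as_registry_getter(source: RegistrySource) -> Callable[[], Registry]:
    """Normalize a reference string, an instance, or a getter into a no-arg getter."""
    if isinstance(source, Registry):
        return lambda: source
    if isinstance(source, str):
        module_name, _, attr = source.partition(":")

        def import_registry() -> Registry:
            # Deferred import: only runs when a command actually needs the registry
            return getattr(importlib.import_module(module_name), attr)

        return import_registry
    if callable(source):
        return source
    raise TypeError(f"unsupported registry source: {source!r}")


my_registry = Registry()
obj: dict[str, Any] = {"get_process_registry": as_registry_getter(my_registry)}
```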
s2gos_common.process.ExecutionRequest
Bases: ProcessRequest
Process execution request as used by the CLI. Extends ProcessRequest to allow the process identifier to be part of the request.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
process_id | | Process identifier. | required |
dotpath | | Whether dots in input names should be used to create nested object values. Defaults to False. | required |
inputs | | Optional process inputs given as key-value mapping. Values may be of any JSON-serializable type accepted by the given process. | required |
outputs | | Optional process outputs given as key-value mapping. Values are of type Output (s2gos_common.models.Output) supported by the given process. | required |
subscriber | | Optional subscriber of type Subscriber (s2gos_common.models.Subscriber) comprising callback URLs that are informed about process status changes while the processing takes place. | required |