dflow package

Subpackages

Submodules

dflow.argo_objects module

class dflow.argo_objects.ArgoObjectDict(d)

Bases: UserDict

Generate ArgoObjectDict and ArgoObjectList on initialization rather than on __getattr__; otherwise modifying a.b.c would not take effect

recover()
class dflow.argo_objects.ArgoObjectList(li)

Bases: UserList

recover()
class dflow.argo_objects.ArgoParameter(par)

Bases: ArgoObjectDict

class dflow.argo_objects.ArgoStep(step, workflow)

Bases: ArgoObjectDict

delete_pod()
get_duration() timedelta
get_pod()
get_script()
handle_big_parameters(io)
handle_io(io)
modify_output_artifact(name: str, s3: S3Artifact) None

Modify output artifact of an Argo step

Parameters:
  • name – artifact name

  • s3 – replace the artifact with an S3 object

modify_output_parameter(name: str, value: Any) None

Modify output parameter of an Argo step

Parameters:
  • name – parameter name

  • value – new value
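
A minimal sketch of modifying the outputs of a finished step and reusing it (the workflow ID, step key and S3 key below are hypothetical):

    from dflow import S3Artifact, Workflow

    old_wf = Workflow(id="hello-abcde")          # hypothetical ID of a finished workflow
    step = old_wf.query_step(key="prepare")[0]   # hypothetical step key
    step.modify_output_parameter("msg", "new value")
    step.modify_output_artifact("out_art", S3Artifact(key="upload/new-data"))  # hypothetical key

    new_wf = Workflow(name="hello-again")
    # ... add steps to new_wf ...
    new_wf.submit(reuse_step=[step])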

replay()
retry()
set_script(script)
class dflow.argo_objects.ArgoWorkflow(d)

Bases: ArgoObjectDict

get_duration() timedelta
get_step(name: str | List[str] | None = None, key: str | List[str] | None = None, phase: str | List[str] | None = None, id: str | List[str] | None = None, type: str | List[str] | None = None) List[ArgoStep]
dflow.argo_objects.fnva(data, hval_init, fnv_prime, fnv_size)

Alternative FNV hash algorithm used in FNV-1a.

dflow.argo_objects.get_duration(status) timedelta
dflow.argo_objects.get_hash(node_name)
dflow.argo_objects.get_pod_name(wf_name, node_name, template_name, node_id)
dflow.argo_objects.match(n, names)

dflow.code_gen module

class dflow.code_gen.CodeGenerator(graph)

Bases: object

generate()
get_kwargs(template, cls)
get_var_name(name)
render_dag(var_name, template)
render_python_op_template(var_name, template)
render_script_op_template(var_name, template)
render_steps(var_name, template)
dflow.code_gen.gen_code(graph)

dflow.common module

class dflow.common.CustomArtifact

Bases: ABC

abstract download(name: str, path: str)
static from_urn(urn)
abstract get_urn() str
redirect = None
render(template, name: str)
class dflow.common.CustomHandler(context)

Bases: BaseHandler

flatten(obj, data)

Flatten obj into a json-friendly form and write result to data.

Parameters:
  • obj (object) – The object to be serialized.

  • data (dict) – A partially filled dictionary which will contain the json-friendly representation of obj once this method has finished.

restore(obj)

Restore an object of the registered type from the json-friendly representation obj and return it.

class dflow.common.CustomPickler

Bases: object

dumps(obj, **kwargs)
loads(s, **kwargs)
class dflow.common.HTTPArtifact(url)

Bases: object

download(path='.')
class dflow.common.LineageClient

Bases: ABC

abstract get_artifact_metadata(urn: str) object
abstract register_artifact(namespace: str, name: str, uri: str, **kwargs) str
abstract register_task(task_name: str, input_urns: Dict[str, str | List[str]], output_uris: Dict[str, str], workflow_urn: str) Dict[str, str]
abstract register_workflow(workflow_name: str) str
class dflow.common.LocalArtifact(local_path)

Bases: object

sub_path(path: str) Any
class dflow.common.S3Artifact(key: str | None = None, path_list: str | list | None = None, urn: str = '', debug_s3: bool = False, *args, **kwargs)

Bases: object

S3 artifact

Parameters:

key – key of the s3 artifact

download(**kwargs)
evalable_repr(imports)
classmethod from_dict(d)
oss()
sub_path(path: str) Any
to_dict()
dflow.common.import_func(s)

dflow.config module

dflow.config.boolize(s)
dflow.config.nullable(s)
dflow.config.set_config(**kwargs) None

Set global configurations

Parameters:
  • host – host of Argo server

  • namespace – k8s namespace

  • token – token for authentication, necessary for reused workflows

  • k8s_config_file – location of the kube config file if it is used for authentication

  • k8s_api_server – address of the Kubernetes API server, necessary for reused workflows

  • private_key_host_path – path of private key on the Kubernetes nodes

  • save_path_as_parameter – save catalog of artifacts as parameters

  • catalog_dir_name – catalog directory name for artifacts

  • archive_mode – “tar” for archiving with tar, None for no archive

  • util_image – image for util step

  • util_image_pull_policy – image pull policy for util step

  • extender_image – image for dflow extender

  • extender_image_pull_policy – image pull policy for dflow extender

  • dispatcher_image – image for dpdispatcher

  • dispatcher_image_pull_policy – image pull policy for dpdispatcher

  • save_keys_in_global_outputs – save keys of steps in global outputs

  • mode – “default” for normal, “debug” for debugging locally

  • lineage – lineage client, None by default

  • http_headers – HTTP headers for requesting Argo server

  • workflow_annotations – default annotations for workflows

  • overwrite_reused_artifact – overwrite reused artifact
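
A minimal configuration sketch (placeholder addresses and token; only the keyword arguments to be changed need to be passed):

    from dflow.config import set_config

    set_config(
        host="https://127.0.0.1:2746",            # placeholder Argo server address
        k8s_api_server="https://127.0.0.1:6443",  # placeholder Kubernetes API server
        token="<token>",                          # placeholder token
        namespace="argo",
    )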

dflow.config.set_s3_config(**kwargs) None

Set S3 configurations

Parameters:
  • endpoint – endpoint for S3 storage

  • console – console address for S3 storage

  • access_key – access key for S3 storage

  • secret_key – secret key for S3 storage

  • secure – secure or not

  • bucket_name – name of S3 bucket

  • repo_key – key of artifact repository

  • repo – artifact repository, parsed from repo_key

  • repo_type – s3 or oss, parsed from repo_key

  • repo_prefix – prefix of artifact repository, parsed from repo_key

  • prefix – prefix of storage key

  • storage_client – client for plugin storage backend

  • extra_prefixes – extra prefixes ignored by auto-prefixing
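
A minimal configuration sketch (placeholder endpoint and credentials):

    from dflow.config import set_s3_config

    set_s3_config(
        endpoint="127.0.0.1:9000",   # placeholder MinIO/S3 endpoint
        access_key="admin",          # placeholder credentials
        secret_key="password",
        secure=False,
        bucket_name="my-artifacts",  # placeholder bucket
    )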

dflow.config.split_headers(s)

dflow.context module

class dflow.context.Context

Bases: ABC

abstract render(template: OPTemplate) OPTemplate

Render the original template and return a new template

dflow.context_syntax module

class dflow.context_syntax.Range_Context

Bases: object

Local context for range.

get_current_range_param_name()
property in_context: bool

Whether it is in the context environment or not.

to_in_context()

Switch into the context environment.

to_out_context()

Switch out of the context environment.

dflow.dag module

class dflow.dag.DAG(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, tasks: List[Task] | None = None, memoize_key: str | None = None, annotations: Dict[str, str] | None = None, parallelism: int | None = None)

Bases: OPTemplate

Parameters:
  • name – the name of the dag

  • inputs – inputs in the template

  • outputs – outputs in the template

  • tasks – a list of tasks

  • memoize_key – memoized key of the dag

  • annotations – annotations for the OP template

  • parallelism – maximum number of running pods for the OP template

add(task: Task | List[Task]) None

Add a task or a list of tasks to the dag

Parameters:

task – a task or a list of tasks to be added to the dag
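
A minimal sketch of building and submitting a DAG of two tasks (placeholder image; assumes a configured Argo server):

    from dflow import DAG, ShellOPTemplate, Task, Workflow

    # a trivial OP template just for illustration
    hello = ShellOPTemplate(name="hello", image="alpine:latest", script="echo hello")

    dag = DAG(name="two-tasks")
    task0 = Task(name="task0", template=hello)
    task1 = Task(name="task1", template=hello, dependencies=[task0])
    dag.add([task0, task1])

    wf = Workflow(name="dag-example", dag=dag)
    wf.submit()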

add_slices(slices, layer=0)
convert_to_argo(memoize_prefix=None, memoize_configmap='dflow', context=None)
convert_to_graph()
copy()
classmethod from_dict(d, templates)
classmethod from_graph(graph, templates)
resolve(pool, futures)
run(workflow_id=None, context=None)

dflow.executor module

class dflow.executor.ContainerExecutor(docker: str | None = None, singularity: str | None = None, podman: str | None = None, image_pull_policy: str | None = None)

Bases: Executor

render(template)

Render the original template and return a new template; do not modify self in this method, so that the executor remains reusable

class dflow.executor.Executor

Bases: ABC

abstract render(template: OPTemplate) OPTemplate

Render the original template and return a new template; do not modify self in this method, so that the executor remains reusable

class dflow.executor.RemoteExecutor(host: str, port: int = 22, username: str = 'root', password: str | None = None, private_key_file: PathLike | None = None, workdir: str = '~/dflow/workflows/{{workflow.name}}/{{pod.name}}', command: str | List[str] | None = None, remote_command: str | List[str] | None = None, image: str | None = None, image_pull_policy: str | None = None, map_tmp_dir: bool = True, docker_executable: str | None = None, singularity_executable: str | None = None, podman_executable: str | None = None, action_retries: int = -1)

Bases: Executor

download(src, dst)
execute(cmd)
get_script(template)
mkdir_and_download(path)
render(template)

Render the original template and return a new template; do not modify self in this method, so that the executor remains reusable

run(image, remote_command)
upload(src, dst)
upload_if_exists(path)
dflow.executor.render_script_with_tmp_root(template, tmp_root)
dflow.executor.run_script(image, cmd, docker=None, singularity=None, podman=None, image_pull_policy=None, host_mounts=None, cpu=None, memory=None, args='', envs=None)

dflow.io module

class dflow.io.ArgoVar(expr=None, is_str=True)

Bases: object

is_str = True
class dflow.io.AutonamedDict(*args, **kwargs)

Bases: UserDict

convert_to_graph()
set_step(step)
set_template(template)
class dflow.io.Expression(expr)

Bases: object

eval(scope)
class dflow.io.IfExpression(_if: str | ArgoVar, _then: str | ArgoVar, _else: str | ArgoVar)

Bases: ArgoVar, Expression

class dflow.io.InputArtifact(path: str | None = None, name: str | None = None, step=None, template=None, optional: bool = False, type: Any | None = None, source: str | InputArtifact | OutputArtifact | S3Artifact | None = None, mode: int | None = None, sub_path: str | None = None, archive: str = 'default', save_as_parameter: bool = False, **kwargs)

Bases: ArgoVar

Input artifact for OP template

Parameters:
  • path – path where the input artifact is placed in the container

  • name – name of the input artifact

  • optional – optional artifact or not

  • type – artifact type

  • source – default source

  • archive – whether the artifact is regarded as an archived file

convert_to_argo()
convert_to_graph()
classmethod from_dict(d)
classmethod from_graph(graph)
get_path_list_parameter()
get_urn_parameter()
sub_path(path)
class dflow.io.InputArtifacts(*args, **kwargs)

Bases: AutonamedDict

set_template(template)
class dflow.io.InputParameter(name: str | None = None, step=None, template=None, type: Any | None = None, save_as_artifact: bool = False, path: str | None = None, source: S3Artifact | InputArtifact | OutputArtifact | None = None, **kwargs)

Bases: ArgoVar

Input parameter for OP template

Parameters:
  • name – name of the input parameter

  • type – parameter type

  • value – default value

convert_to_argo()
convert_to_graph()
classmethod from_dict(d)
classmethod from_graph(graph)
class dflow.io.InputParameters(*args, **kwargs)

Bases: AutonamedDict

class dflow.io.Inputs(parameters: Dict[str, InputParameter] | None = None, artifacts: Dict[str, InputArtifact] | None = None, step=None, template=None)

Bases: object

Inputs for OP template

Parameters:
  • parameters – input parameters

  • artifacts – input artifacts

convert_to_argo()
convert_to_graph()
classmethod from_dict(d)
classmethod from_graph(graph)
set_step(step)
set_template(template)
class dflow.io.ObjectDict(dict=None, /, **kwargs)

Bases: UserDict

class dflow.io.OutputArtifact(path: str | None = None, _from: InputArtifact | OutputArtifact | None = None, name: str | None = None, step=None, template=None, type: Any | None = None, save: List[PVC | S3Artifact] | None = None, archive: str = 'default', global_name: str | None = None, from_expression: IfExpression | str | None = None, optional: bool = False)

Bases: ArgoVar

Output artifact for OP template

Parameters:
  • path – path of the output artifact in the container

  • _from – the artifact is from another artifact

  • name – name of the output artifact

  • type – artifact type

  • save – place to store the output artifact instead of default storage, can be a list

  • archive – compress format of the artifact, None for no compression

  • global_name – global name of the artifact within the workflow

  • from_expression – the artifact is from an expression

convert_to_argo()
convert_to_graph()
classmethod from_dict(d)
classmethod from_graph(graph)
get_path_list_parameter()
get_urn_parameter()
handle_path_list()
handle_urn()
pvc(size='1Gi', storage_class=None, access_modes=None)
sub_path(path)
class dflow.io.OutputArtifacts(*args, **kwargs)

Bases: AutonamedDict

set_template(template)
class dflow.io.OutputParameter(value_from_path: str | None = None, value_from_parameter: InputParameter | OutputParameter | None = None, name: str | None = None, step=None, template=None, type: Any | None = None, global_name: str | None = None, value_from_expression: IfExpression | str | None = None, save_as_artifact: bool = False, save_both: bool = False, **kwargs)

Bases: ArgoVar

Output parameter for OP template

Parameters:
  • value_from_path – the value is read from file generated in the container

  • value_from_parameter – the value is from another parameter

  • name – name of the output parameter

  • type – parameter type

  • default – default value

  • global_name – global name of the parameter within the workflow

  • value_from_expression – the value is from an expression

  • value – specify value directly

convert_to_argo()
convert_to_argo_artifact()
convert_to_argo_parameter()
convert_to_graph()
expr_as_artifact()
expr_as_parameter()
classmethod from_dict(d)
classmethod from_graph(graph)
repr_as_artifact()
repr_as_parameter()
class dflow.io.OutputParameters(*args, **kwargs)

Bases: AutonamedDict

class dflow.io.Outputs(parameters: Dict[str, OutputParameter] | None = None, artifacts: Dict[str, OutputArtifact] | None = None, step=None, template=None)

Bases: object

Outputs for OP template

Parameters:
  • parameters – output parameters

  • artifacts – output artifacts

convert_to_argo()
convert_to_graph()
classmethod from_dict(d)
classmethod from_graph(graph)
set_step(step)
set_template(template)
class dflow.io.PVC(name: str, subpath: str, size: str = '1Gi', storage_class: str | None = None, access_modes: List[str] | None = None)

Bases: object

dflow.io.convert_value_to_str(value)
dflow.io.if_expression(_if: str | ArgoVar, _then: str | ArgoVar, _else: str | ArgoVar) IfExpression

Return an if expression in Argo

Parameters:
  • _if – a bool expression, which may be a comparison of two Argo parameters

  • _then – value returned if the condition is satisfied

  • _else – value returned if the condition is not satisfied
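
A hedged sketch of a conditional output artifact built with if_expression (names are hypothetical; assumes a Steps template steps that declares an output artifact "chosen", and a sub-step choose_step whose template outputs a parameter is_head and artifacts head and tail):

    from dflow import if_expression

    steps.outputs.artifacts["chosen"].from_expression = if_expression(
        _if=choose_step.outputs.parameters["is_head"] == "1",
        _then=choose_step.outputs.artifacts["head"],
        _else=choose_step.outputs.artifacts["tail"],
    )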

dflow.io.to_expr(var)
dflow.io.type_to_str(type)

dflow.main module

dflow.main.format_print_table(t: List[List[str]])
dflow.main.format_time_delta(td: timedelta) str
dflow.main.main()
dflow.main.main_parser()
dflow.main.parse_args(args: List[str] | None = None)

Commandline options argument parsing.

Parameters:

args (List[str]) – list of command-line arguments, mainly intended for testing; with the default None, arguments are taken from sys.argv

dflow.op_template module

class dflow.op_template.ContainerOPTemplate(command: str | List[str] = '', args: List[str] | None = None, **kwargs)

Bases: ScriptOPTemplate

classmethod from_dict(d)
class dflow.op_template.OPTemplate(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, memoize_key: str | None = None, pvcs: List[PVC] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None)

Bases: object

copy()
deepcopy()
classmethod from_dict(d)
handle_key(memoize_prefix=None, memoize_configmap='dflow')
class dflow.op_template.PythonScriptOPTemplate(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, memoize_key: str | None = None, pvcs: List[PVC] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, node_selector: Dict[str, str] | None = None, tolerations: List[object] | None = None, affinity: object | None = None, image: str | None = None, command: str | List[str] | None = None, script: str | None = None, volumes: List[object] | None = None, mounts: List[object] | None = None, init_progress: str = '0/1', timeout: str | None = None, retry_strategy: object | None = None, image_pull_policy: str | None = None, requests: Dict[str, str] | None = None, limits: Dict[str, str] | None = None, envs: Dict[str, str] | None = None, init_containers: List[object] | None = None, sidecars: List[object] | None = None, script_rendered: bool = False)

Bases: ScriptOPTemplate

Python script OP template

Parameters:
  • name – the name of the OP template

  • inputs – input parameters and input artifacts

  • outputs – output parameters and output artifacts

  • image – image the template uses

  • command – command to run the script

  • script – python script

  • volumes – volumes the template uses

  • mounts – volumes the template mounts

  • init_progress – a str representing the initial progress

  • timeout – timeout of the OP template

  • retry_strategy – retry strategy of the OP template

  • memoize_key – memoized key of the OP template

  • pvcs – PVCs that need to be declared

  • image_pull_policy – Always, IfNotPresent, Never

  • annotations – annotations for the OP template

  • labels – labels for the OP template

  • node_selector – node selector when scheduling the pod

  • tolerations – tolerations of taints when scheduling the pod

  • affinity – affinity when scheduling the pod

  • requests – a dict of resource requests

  • limits – a dict of resource limits

  • envs – environment variables

  • init_containers – init containers before the template runs

  • sidecars – sidecar containers
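
A minimal template sketch (placeholder image; the script writes an output parameter to the file declared with value_from_path):

    from dflow import OutputParameter, PythonScriptOPTemplate

    templ = PythonScriptOPTemplate(
        name="py-hello",
        image="python:3.9",   # placeholder image
        script="with open('/tmp/msg.txt', 'w') as f:\n    f.write('Hello dflow!')\n",
    )
    templ.outputs.parameters = {"msg": OutputParameter(value_from_path="/tmp/msg.txt")}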

class dflow.op_template.ScriptOPTemplate(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, memoize_key: str | None = None, pvcs: List[PVC] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, node_selector: Dict[str, str] | None = None, tolerations: List[object] | None = None, affinity: object | None = None, image: str | None = None, command: str | List[str] | None = None, script: str | None = None, volumes: List[object] | None = None, mounts: List[object] | None = None, init_progress: str = '0/1', timeout: str | None = None, retry_strategy: object | None = None, resource: object | None = None, image_pull_policy: str | None = None, requests: Dict[str, str] | None = None, limits: Dict[str, str] | None = None, envs: Dict[str, str | Secret | object] | None = None, init_containers: List[object] | None = None, sidecars: List[object] | None = None, script_rendered: bool = False)

Bases: OPTemplate

Script OP template

Parameters:
  • name – the name of the OP template

  • inputs – input parameters and input artifacts

  • outputs – output parameters and output artifacts

  • image – image the template uses

  • command – command to run the script

  • script – script

  • volumes – volumes the template uses

  • mounts – volumes the template mounts

  • init_progress – a str representing the initial progress

  • timeout – timeout of the OP template

  • retry_strategy – retry strategy of the OP template

  • memoize_key – memoized key of the OP template

  • pvcs – PVCs that need to be declared

  • image_pull_policy – Always, IfNotPresent, Never

  • annotations – annotations for the OP template

  • labels – labels for the OP template

  • node_selector – node selector when scheduling the pod

  • tolerations – tolerations of taints when scheduling the pod

  • affinity – affinity when scheduling the pod

  • requests – a dict of resource requests

  • limits – a dict of resource limits

  • envs – environment variables

convert_to_argo(memoize_prefix=None, memoize_configmap='dflow')
convert_to_graph()
classmethod from_dict(d)
classmethod from_graph(graph)
class dflow.op_template.Secret(value=None, name=None, key='secret')

Bases: object

class dflow.op_template.ShellOPTemplate(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, memoize_key: str | None = None, pvcs: List[PVC] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, node_selector: Dict[str, str] | None = None, tolerations: List[object] | None = None, affinity: object | None = None, image: str | None = None, command: str | List[str] | None = None, script: str | None = None, volumes: List[object] | None = None, mounts: List[object] | None = None, init_progress: str = '0/1', timeout: str | None = None, retry_strategy: object | None = None, image_pull_policy: str | None = None, requests: Dict[str, str] | None = None, limits: Dict[str, str] | None = None, envs: Dict[str, str] | None = None, init_containers: List[object] | None = None, sidecars: List[object] | None = None, script_rendered: bool = False)

Bases: ScriptOPTemplate

Shell script OP template

Parameters:
  • name – the name of the OP template

  • inputs – input parameters and input artifacts

  • outputs – output parameters and output artifacts

  • image – image the template uses

  • command – command to run the script

  • script – shell script

  • volumes – volumes the template uses

  • mounts – volumes the template mounts

  • init_progress – a str representing the initial progress

  • timeout – timeout of the OP template

  • retry_strategy – retry strategy of the OP template

  • memoize_key – memoized key of the OP template

  • pvcs – PVCs that need to be declared

  • image_pull_policy – Always, IfNotPresent, Never

  • annotations – annotations for the OP template

  • labels – labels for the OP template

  • node_selector – node selector when scheduling the pod

  • tolerations – tolerations of taints when scheduling the pod

  • affinity – affinity when scheduling the pod

  • requests – a dict of resource requests

  • limits – a dict of resource limits

  • envs – environment variables

  • init_containers – init containers before the template runs

  • sidecars – sidecar containers
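
A minimal template sketch (placeholder image; input parameters are referenced in the script as {{inputs.parameters.xxx}}):

    from dflow import InputParameter, OutputArtifact, ShellOPTemplate

    templ = ShellOPTemplate(
        name="sh-hello",
        image="alpine:latest",   # placeholder image
        script="echo {{inputs.parameters.msg}} > /tmp/msg.txt",
    )
    templ.inputs.parameters = {"msg": InputParameter(value="Hello dflow!")}
    templ.outputs.artifacts = {"msg_art": OutputArtifact(path="/tmp/msg.txt")}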

dflow.op_template.get_k8s_client(k8s_api_server=None, token=None, k8s_config_file=None)

dflow.resource module

class dflow.resource.Resource

Bases: ABC

Parameters:
  • action – action on the Kubernetes resource

  • success_condition – expression representing success

  • failure_condition – expression representing failure

action: str | None = None
failure_condition: str | None = None
abstract get_manifest(template: OPTemplate) OPTemplate

The method to get the manifest (str)

success_condition: str | None = None

dflow.slurm module

class dflow.slurm.SlurmJob(header='', node_selector=None, prepare=None, results=None, map_tmp_dir=True, workdir='.', remote_command=None, docker_executable=None, singularity_executable=None, podman_executable=None)

Bases: Resource

get_manifest(template)

The method to get the manifest (str)

class dflow.slurm.SlurmJobTemplate(header: str = '', node_selector: Dict[str, str] | None = None, prepare_image: str | None = None, prepare_image_pull_policy: str | None = None, collect_image: str | None = None, collect_image_pull_policy: str | None = None, workdir: str = 'dflow/workflows/{{workflow.name}}/{{pod.name}}', remote_command: str | List[str] | None = None, docker_executable: str | None = None, singularity_executable: str | None = None, podman_executable: str | None = None)

Bases: Executor

Slurm job template

Parameters:
  • header – header for Slurm job

  • node_selector – node selector

  • prepare_image – image for preparing data

  • prepare_image_pull_policy – image pull policy for preparing data

  • collect_image – image for collecting results

  • collect_image_pull_policy – image pull policy for collecting results

  • workdir – remote working directory

  • remote_command – command for running the script remotely

  • docker_executable – docker executable to run remotely

  • singularity_executable – singularity executable to run remotely

  • podman_executable – podman executable to run remotely

render(template)

Render the original template and return a new template; do not modify self in this method, so that the executor remains reusable

class dflow.slurm.SlurmRemoteExecutor(host: str, port: int = 22, username: str = 'root', password: str | None = None, private_key_file: PathLike | None = None, workdir: str = '~/dflow/workflows/{{workflow.name}}/{{pod.name}}', command: str | List[str] | None = None, remote_command: str | List[str] | None = None, image: str | None = None, image_pull_policy: str | None = None, map_tmp_dir: bool = True, docker_executable: str | None = None, singularity_executable: str | None = None, podman_executable: str | None = None, action_retries: int = -1, header: str = '', interval: int = 3, pvc: PVC | None = None)

Bases: RemoteExecutor

Slurm remote executor

Parameters:
  • host – remote host

  • port – SSH port

  • username – username

  • password – password for SSH

  • private_key_file – private key file for SSH

  • workdir – remote working directory

  • command – command for the executor

  • remote_command – command for running the script remotely

  • image – image for the executor

  • image_pull_policy – image pull policy for the executor

  • map_tmp_dir – map /tmp to ./tmp

  • docker_executable – docker executable to run remotely

  • singularity_executable – singularity executable to run remotely

  • podman_executable – podman executable to run remotely

  • action_retries – retries for actions (upload, execute commands, download), -1 for infinity

  • header – header for Slurm job

  • interval – query interval for Slurm
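
A hedged sketch of attaching the executor to a step (placeholder host, credentials and Slurm header; templ is an OP template defined elsewhere):

    from dflow import Step
    from dflow.slurm import SlurmRemoteExecutor

    executor = SlurmRemoteExecutor(
        host="login.hpc.example.org",   # placeholder login node
        username="user",
        password="password",            # or private_key_file=...
        header="#!/bin/bash\n#SBATCH -N 1\n#SBATCH -p cpu\n",  # placeholder Slurm header
    )
    step = Step(name="hpc-step", template=templ, executor=executor)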

render(template)

Render the original template and return a new template; do not modify self in this method, so that the executor remains reusable

run(image, remote_command)

dflow.step module

class dflow.step.ArgoConcat(param)

Bases: object

class dflow.step.ArgoEnumerate(**kwargs)

Bases: ArgoVar

class dflow.step.ArgoLen(param)

Bases: ArgoVar

class dflow.step.ArgoRange(end, start=0, step=1)

Bases: ArgoVar

class dflow.step.ArgoSequence(count, start, end, format)

Bases: object

convert_to_argo()
evalable_repr(imports)
classmethod from_dict(d)
to_dict()
class dflow.step.ArgoSum(param)

Bases: object

class dflow.step.Step(name: str, template: OPTemplate, parameters: Dict[str, Any] | None = None, artifacts: Dict[str, S3Artifact | InputArtifact | OutputArtifact | None] | None = None, when: str | None = None, with_param: str | list | InputParameter | OutputParameter | None = None, continue_on_failed: bool = False, continue_on_error: bool = False, continue_on_num_success: int | None = None, continue_on_success_ratio: float | None = None, with_sequence: object | None = None, key: str | None = None, executor: Executor | None = None, use_resource: Resource | None = None, util_image: str | None = None, util_image_pull_policy: str | None = None, util_command: str | List[str] | None = None, parallelism: int | None = None, slices: Slices | None = None)

Bases: object

Parameters:
  • name – the name of the step

  • template – OP template the step uses

  • parameters – input parameters passed to the step as arguments

  • artifacts – input artifacts passed to the step as arguments

  • when – conditional step if the condition is satisfied

  • with_param – generate parallel steps with respect to a list as a parameter

  • continue_on_failed – continue if the step fails

  • continue_on_error – continue if the step encounters an error

  • continue_on_num_success – continue if the number of successful parallel steps is greater than a certain number

  • continue_on_success_ratio – continue if the success ratio of the generated parallel steps is greater than a certain ratio

  • with_sequence – generate parallel steps with respect to a sequence

  • key – the key of the step

  • executor – define the executor to execute the script

  • use_resource – use k8s resource

  • util_image – image for utility step

  • util_image_pull_policy – image pull policy for utility step

  • util_command – command for utility step

  • parallelism – parallelism for sliced step

  • slices – override slices of OP template
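
A minimal sketch of a step that passes a parameter and an artifact to its template (templ is an OP template defined elsewhere; the file name is hypothetical):

    from dflow import Step, upload_artifact

    art = upload_artifact("data.txt")   # hypothetical local file
    step = Step(
        name="hello",
        template=templ,                 # an OP template defined elsewhere
        parameters={"msg": "Hello dflow!"},
        artifacts={"inp_art": art},     # must match an input artifact name of the template
        key="hello-0",                  # optional key, allows querying and reusing this step
    )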

convert_to_argo(context=None)
convert_to_graph()
exec(scope, parameters, item=None, context=None)
exec_pod(scope, parameters, item=None)

directory structure:

step-xxxxx
|- inputs
|  |- parameters
|  |- artifacts
|- outputs
|  |- parameters
|  |- artifacts
|- script
|- workdir

exec_steps(scope, parameters, item=None, context=None)
exec_with_config(scope, parameters, item, conf, s3_conf, cwd, context=None)
classmethod from_dict(d, templates)
classmethod from_graph(graph, templates)
handle_sub_path_slices_of_artifact_list(slices, artifacts)
load_output_artifacts(stepdir, artifacts)
load_output_parameters(stepdir, parameters)
prepare_argo_arguments(context=None)
record_input_artifacts(stepdir, artifacts, item, scope, ignore_nonexist=False)
record_input_parameters(stepdir, parameters)
record_output_artifacts(stepdir, artifacts)
record_output_parameters(stepdir, parameters)
render_by_executor(context=None)
run(scope, context=None)
run_with_config(scope, context, conf, s3_conf, cwd)
set_artifacts(artifacts)
set_parameters(parameters)
dflow.step.add_slices(templ: OPTemplate, slices: Slices, layer=0)
dflow.step.argo_concat(param: ArgoVar) ArgoConcat

Return the concatenation of a list of lists which is an Argo parameter

Parameters:

param – the Argo parameter which is a list of lists

dflow.step.argo_enumerate(*args, **kwargs) ArgoVar

Return the enumeration of a list which is an Argo parameter

Parameters:

param – the Argo parameter which is a list

dflow.step.argo_len(param: ArgoVar | S3Artifact) ArgoVar

Return the length of a list which is an Argo parameter

Parameters:

param – the Argo parameter which is a list

dflow.step.argo_range(*args) ArgoVar

Return a str representing a range of integers in Argo. It receives 1-3 arguments, similar to the built-in range function in Python. Each argument can be an Argo parameter.
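
A hedged sketch of generating parallel steps over a range whose length is itself an Argo parameter (prev_step and templ are hypothetical; the template can refer to the current element as "{{item}}"):

    from dflow import Step, argo_len, argo_range

    parallel = Step(
        name="process",
        template=templ,                   # hypothetical OP template
        parameters={"index": "{{item}}"},
        with_param=argo_range(argo_len(prev_step.outputs.parameters["task_list"])),
    )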

dflow.step.argo_sequence(count: int | ArgoVar | None = None, start: int | ArgoVar | None = None, end: int | ArgoVar | None = None, format: str | None = None) object

Return a numeric sequence in Argo

Parameters:
  • count – number of elements in the sequence (default: 0), not to be used with end, can be an Argo parameter

  • start – number at which to start the sequence (default: 0), can be an Argo parameter

  • end – number at which to end the sequence (default: 0), not to be used with count, can be an Argo parameter

  • format – a printf format string to format the value in the sequence

dflow.step.argo_sum(param: ArgoVar) ArgoSum

Return the sum of a list of integers which is an Argo parameter

Parameters:

param – the Argo parameter which is a list of integers

dflow.step.backup(path)
dflow.step.download_artifact_debug(artifact, path)
dflow.step.download_with_lock(download, path)
dflow.step.eval_expr(expr)
dflow.step.get_var(expr, scope)
dflow.step.render_expr(expr, scope)
dflow.step.render_item(expr, item)
dflow.step.render_script(script, parameters, workflow_id=None, step_id=None)
dflow.step.replace_argo_func(expr)

dflow.steps module

class dflow.steps.Steps(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, steps: List[Step | List[Step]] | None = None, memoize_key: str | None = None, annotations: Dict[str, str] | None = None, parallelism: int | None = None)

Bases: OPTemplate

Parameters:
  • name – the name of the steps

  • inputs – inputs in the template

  • outputs – outputs in the template

  • steps – a sequential list of steps

  • memoize_key – memoized key of the steps

  • annotations – annotations for the OP template

  • parallelism – maximum number of running pods for the OP template

add(step: Step | List[Step]) None

Add a step or a list of parallel steps to the steps

Parameters:

step – a step or a list of parallel steps to be added to the entrypoint of the workflow

add_slices(slices, layer=0)
convert_to_argo(memoize_prefix=None, memoize_configmap='dflow', context=None)
convert_to_graph()
copy()
classmethod from_dict(d, templates)
classmethod from_graph(graph, templates)
run(workflow_id=None, context=None)

dflow.task module

class dflow.task.Task(name: str, template: OPTemplate, dependencies: List[Task | str] | None = None, **kwargs)

Bases: Step

Parameters:
  • name – the name of the task

  • template – OP template the task uses

  • parameters – input parameters passed to the task as arguments

  • artifacts – input artifacts passed to the task as arguments

  • when – conditional task if the condition is satisfied

  • with_param – generate parallel tasks with respect to a list as a parameter

  • continue_on_failed – continue if the task fails

  • continue_on_num_success – continue if the number of successful parallel tasks is greater than a certain number

  • continue_on_success_ratio – continue if the success ratio of the generated parallel tasks is greater than a certain ratio

  • with_sequence – generate parallel tasks with respect to a sequence

  • key – the key of the task

  • executor – define the executor to execute the script

  • use_resource – use k8s resource

  • util_image – image for utility step

  • util_image_pull_policy – image pull policy for utility step

  • util_command – command for utility step

  • dependencies – extra dependencies of the task

convert_to_argo(context=None)
convert_to_graph()
classmethod from_dict(d, templates)
set_artifacts(artifacts)
set_parameters(parameters)

dflow.util_ops module

class dflow.util_ops.CheckNumSuccess(name='check-num-success', image=None, image_pull_policy=None)

Bases: ShellOPTemplate

class dflow.util_ops.CheckSuccessRatio(name='check-success-ratio', image=None, image_pull_policy=None)

Bases: ShellOPTemplate

class dflow.util_ops.InitArtifactForSlices(template, image, command, image_pull_policy, key, sliced_output_artifact=None, sliced_input_artifact=None, sum_var=None, concat_var=None, auto_loop_artifacts=None, group_size=None, format='%d', post_script='', tmp_root='/tmp')

Bases: PythonScriptOPTemplate

render_script()

dflow.utils module

class dflow.utils.ArtifactDict

Bases: dict

class dflow.utils.ArtifactList(iterable=(), /)

Bases: list

class dflow.utils.ArtifactPosixPath(*args, **kwargs)

Bases: PosixPath

class dflow.utils.ArtifactSet

Bases: set

class dflow.utils.ArtifactStr

Bases: str

class dflow.utils.ArtifactWindowsPath(*args, **kwargs)

Bases: WindowsPath

class dflow.utils.MinioClient(endpoint: str | None = None, access_key: str | None = None, secret_key: str | None = None, secure: bool | None = None, bucket_name: str | None = None, **kwargs)

Bases: StorageClient

copy(src: str, dst: str) None
download(key: str, path: str) None
get_md5(key: str) str
list(prefix: str, recursive: bool = False) List[str]
upload(key: str, path: str) None
class dflow.utils.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

Bases: ProcessPoolExecutor

class dflow.utils.StorageClient

Bases: ABC

abstract copy(src: str, dst: str) None
abstract download(key: str, path: str) None
abstract get_md5(key: str) str
abstract list(prefix: str, recursive: bool = False) List[str]
abstract upload(key: str, path: str) None
class dflow.utils.Variable(expr)

Bases: object

evalable_repr(imports)
dflow.utils.append_item(catalog, item)
dflow.utils.assemble_path_object(art_path, remove=False)
dflow.utils.assemble_path_object_from_catalog(catalog, art_path=None)
dflow.utils.catalog_of_artifact(art, **kwargs) List[dict]
dflow.utils.catalog_of_local_artifact(art_path, remove=False)
dflow.utils.convert_dflow_list(dflow_list)
dflow.utils.copy_artifact(src, dst, sort=False) S3Artifact

Copy an artifact to another on server side

Parameters:
  • src – source artifact

  • dst – destination artifact

  • sort – append the path list of dst after that of src

dflow.utils.copy_file(src, dst, func=<function try_link>)
dflow.utils.copy_s3(src_key: str, dst_key: str, recursive: bool = True, ignore_catalog: bool = False, **kwargs) None
dflow.utils.dict2list(d: dict)
dflow.utils.download_artifact(artifact, extract: bool = True, sub_path: str | None = None, slice: int | None = None, path: PathLike = '.', remove_catalog: bool = True, **kwargs) List[str]

Download an artifact from Argo to local

Parameters:
  • artifact – artifact to be downloaded

  • extract – extract files if the artifact is compressed

  • sub_path – download a subdir of an artifact

  • slice – download a slice of an artifact

  • path – local path

  • endpoint – endpoint for Minio

  • access_key – access key for Minio

  • secret_key – secret key for Minio

  • secure – secure or not for Minio

  • bucket_name – bucket name for Minio

  • skip_exists – skip files with the same MD5
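
A minimal sketch of downloading an output artifact of a finished step (the workflow ID, step name and artifact name are hypothetical):

    from dflow import Workflow, download_artifact

    wf = Workflow(id="hello-abcde")          # hypothetical ID of a finished workflow
    step = wf.query_step(name="hello")[0]    # hypothetical step name
    files = download_artifact(step.outputs.artifacts["out_art"], path="./results")
    print(files)                             # list of downloaded paths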

dflow.utils.download_s3(key: str, path: PathLike = '.', recursive: bool = True, skip_exists: bool = False, keep_dir: bool = False, **kwargs) str
dflow.utils.evalable_repr(obj, imports)
dflow.utils.expand(d: dict) list | dict
dflow.utils.find_subclass(pkg, cls)
dflow.utils.flatten(d: list | dict) dict
dflow.utils.force_move(src, dst)
dflow.utils.get_key(artifact, raise_error=True)
dflow.utils.get_md5(f)
dflow.utils.linktree(src, dst, func=<built-in function symlink>)
dflow.utils.merge_dir(src, dst, func=<function force_move>)
dflow.utils.path_list_of_artifact(art, **kwargs) List[str]
dflow.utils.path_object_of_artifact(art, **kwargs) list | dict
dflow.utils.randstr(length: int = 5) str
dflow.utils.remove_empty_dir_tag(path)
dflow.utils.run_command(cmd: List[str] | str, raise_error: bool = True, input: str | None = None, try_bash: bool = False, login: bool = True, interactive: bool = True, shell: bool = False, print_oe: bool = False, **kwargs) Tuple[int, str, str]

Run shell command in subprocess

Parameters:

cmd: list of str, or str

Command to execute

raise_error: bool

Whether to raise an error if the command failed

input: str, optional

Input string for the command

try_bash: bool

Try to use bash if bash exists, otherwise use sh

login: bool

Login mode of bash when try_bash=True

interactive: bool

Alias of login

shell: bool

Use shell for subprocess.Popen

print_oe: bool

Print stdout and stderr at the same time

**kwargs:

Arguments in subprocess.Popen

Raises:

AssertionError:

Raised if the command failed to execute and raise_error is set to True

Returns:

return_code: int

The return code of the command

out: str

stdout content of the executed command

err: str

stderr content of the executed command
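
A minimal usage sketch:

    from dflow.utils import run_command

    code, out, err = run_command("echo hello && pwd", try_bash=True)
    print(code)   # 0 on success
    print(out)    # captured stdout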

dflow.utils.set_directory(dirname: PathLike, mkdir: bool = False)

Set the current working directory within a context

Parameters:
  • dirname (os.PathLike) – The directory path to change to

  • mkdir (bool) – Whether to make the directory if dirname does not exist

Yields:

path (Path) – The absolute path of the changed working directory

Examples

>>> with set_directory("some_path"):
...    do_something()
dflow.utils.set_key(artifact, key)
dflow.utils.subclass_or_none(m, cls)
dflow.utils.upload_artifact(path: PathLike | List[PathLike] | Set[PathLike] | Dict[str, PathLike] | list | dict, archive: str = 'default', namespace: str | None = None, dataset_name: str | None = None, **kwargs) S3Artifact

Upload an artifact from local to Argo

Parameters:
  • path – local path

  • archive – compress format of the artifact, None for no compression

  • endpoint – endpoint for Minio

  • access_key – access key for Minio

  • secret_key – secret key for Minio

  • secure – secure or not for Minio

  • bucket_name – bucket name for Minio
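
A minimal sketch of uploading local files and passing them to a step (file names are hypothetical; templ is an OP template defined elsewhere):

    from dflow import Step, upload_artifact

    art = upload_artifact(["inputs/a.txt", "inputs/b.txt"])   # hypothetical local files
    step = Step(name="consume", template=templ,
                artifacts={"inp_art": art})                   # must match an input artifact of the template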

dflow.utils.upload_s3(path: ~os.PathLike, key: str | None = None, prefix: str | None = None, debug_func=<built-in function symlink>, **kwargs) str

dflow.workflow module

class dflow.workflow.DockerSecret(registry, username, password, name=None)

Bases: object

class dflow.workflow.Workflow(name: str = 'workflow', steps: Steps | None = None, dag: DAG | None = None, namespace: str | None = None, id: str | None = None, uid: str | None = None, host: str | None = None, token: str | None = None, k8s_config_file: PathLike | None = None, k8s_api_server: str | None = None, context: Context | Executor | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, parallelism: int | None = None, pod_gc_strategy: str | None = None, image_pull_secrets: str | DockerSecret | List[str | DockerSecret] | None = None, artifact_repo_key: str | None = None, parameters: Dict[str, Any] | None = None)

Bases: object

Parameters:
  • name – the name of the workflow

  • steps – steps used as the entrypoint of the workflow; if not provided, an empty Steps will be used

  • dag – dag used as the entrypoint of the workflow

  • namespace – k8s namespace

  • id – workflow ID in Argo; you can provide it to track an existing workflow

  • host – URL of the Argo server, will override global config

  • token – request the Argo server with the token, will override global config

  • k8s_config_file – Kubernetes configuration file for accessing API server, will override global config

  • k8s_api_server – URL of the Kubernetes API server, will override global config

  • context – context for the workflow

  • annotations – annotations for the workflow

  • parallelism – maximum number of running pods for the workflow

  • pod_gc_strategy

    pod GC provides the ability to delete pods automatically without deleting the workflow; the pod GC strategy must be one of the following:

    • OnPodCompletion - delete pods immediately when the pod is completed (including errors/failures)

    • OnPodSuccess - delete pods immediately when pod is successful

    • OnWorkflowCompletion - delete pods when workflow is completed

    • OnWorkflowSuccess - delete pods when workflow is successful

  • image_pull_secrets – secrets for image registries

  • artifact_repo_key – use artifact repository reference by key

  • parameters – global input parameters
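
A minimal end-to-end sketch (placeholder image; assumes a configured Argo server, see dflow.config.set_config):

    import time

    from dflow import ShellOPTemplate, Step, Workflow

    templ = ShellOPTemplate(name="hello", image="alpine:latest",   # placeholder image
                            script="echo Hello dflow!")
    wf = Workflow(name="hello-wf")
    wf.add(Step(name="hello", template=templ))
    wf.submit()

    while wf.query_status() in ["Pending", "Running"]:
        time.sleep(1)
    print(wf.query_status())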

add(step: Step | List[Step] | Task | List[Task]) None

Add a step or a list of parallel steps to the workflow

Parameters:
  • step – a step or a list of parallel steps to be added to the entrypoint of the workflow

convert_to_argo(reuse_step=None)
deduplicate_templates()
delete() None

Delete the workflow

classmethod from_dict(d)
classmethod from_graph(graph)
classmethod from_graph_json(j, **kwargs)
classmethod from_graph_yaml(y, **kwargs)
classmethod from_json(s)
classmethod from_yaml(s)
get_graph_templates(template, graph_templates=None)
get_k8s_core_v1_api()
handle_reused_artifact(step, name, art, group_key)
handle_reused_artifact_with_copy(step, name, art, group_key)
handle_reused_step(step, global_parameters, global_artifacts)
handle_template(template, memoize_prefix=None, memoize_configmap='dflow')
query(fields: List[str] | None = None, retry: int = 3) ArgoWorkflow

Query the workflow from Argo. If fields is not provided, full information of all steps will be returned [O(n)]

Parameters:

fields – fields of the workflow to be returned

Returns:

an ArgoWorkflow object

query_global_outputs() ArgoWorkflow

Query the global outputs of the workflow from Argo. The function is O(1)

Parameters:

key – filter by key of step

Returns:

a list of steps

query_keys_of_steps() List[str]

Query the keys of the existing steps of the workflow from Argo. This function will try to get the keys from the global outputs, which is O(1). If that fails, it will fall back to querying full steps

Returns:

a list of keys

query_status() str

Query the status of the workflow from Argo. The function is O(1)

Returns:

Pending, Running, Succeeded, Failed, Error, etc

query_step(name: str | List[str] | None = None, key: str | List[str] | None = None, phase: str | List[str] | None = None, id: str | List[str] | None = None, type: str | List[str] | None = None) List[ArgoStep]

Query the existing steps of the workflow from Argo. This function will query full steps from the server [O(n)], then filter them with the conditions given in the arguments. If you want to call this function multiple times in succession, it is recommended to call query once and then call get_step repeatedly, e.g.

info = wf.query()
step1 = info.get_step(key="step1")
step2 = info.get_step(key="step2")

Parameters:
  • name – filter by name of step, support regex

  • key – filter by key of step

  • phase – filter by phase of step

  • id – filter by id of step

  • type – filter by type of step

Returns:

a list of steps

query_step_by_key(key: str | List[str], name: str | List[str] | None = None, phase: str | List[str] | None = None, id: str | List[str] | None = None, type: str | List[str] | None = None) List[ArgoStep]

Query the existing steps of the workflow from Argo by key. This function will try to get the key-to-ID map from the global outputs, then query each step by ID, which is O(m) where m is the number of requested keys. If that fails, it will fall back to querying full steps

Parameters:

key – filter by key of step

Returns:

a list of steps

resubmit() None

Resubmit the workflow

resume() None

Resume the workflow

retry() None

Retry the workflow

retry_steps(step_ids)
stop() None

Stop the workflow

submit(reuse_step: List[ArgoStep] | None = None) ArgoWorkflow

Submit the workflow

Parameters:

reuse_step – a list of steps to be reused in the workflow

suspend() None

Suspend the workflow

terminate() None

Terminate the workflow

to_dict()
to_graph()
to_graph_json(**kwargs)
to_graph_yaml(**kwargs)
to_json(**kwargs)
to_yaml(**kwargs)
wait(interval=1)
dflow.workflow.get_argo_api_client(host=None, token=None)
dflow.workflow.parse_repo(repo_key=None, namespace=None, **kwargs)
dflow.workflow.query_archived_workflows(labels: Dict[str, str] | None = None, id: str | None = None) List[ArgoWorkflow]
dflow.workflow.query_workflows(labels: Dict[str, str] | None = None, fields: List[str] | None = None) List[ArgoWorkflow]

Module contents

class dflow.ArgoStep(step, workflow)

Bases: ArgoObjectDict

delete_pod()
get_duration() timedelta
get_pod()
get_script()
handle_big_parameters(io)
handle_io(io)
modify_output_artifact(name: str, s3: S3Artifact) None

Modify output artifact of an Argo step

Parameters:
  • name – artifact name

  • s3 – replace the artifact with an S3 object

modify_output_parameter(name: str, value: Any) None

Modify output parameter of an Argo step

Parameters:
  • name – parameter name

  • value – new value

replay()
retry()
set_script(script)
class dflow.ArgoWorkflow(d)

Bases: ArgoObjectDict

get_duration() timedelta
get_step(name: str | List[str] | None = None, key: str | List[str] | None = None, phase: str | List[str] | None = None, id: str | List[str] | None = None, type: str | List[str] | None = None) List[ArgoStep]
class dflow.AutonamedDict(*args, **kwargs)

Bases: UserDict

convert_to_graph()
set_step(step)
set_template(template)
class dflow.ContainerExecutor(docker: str | None = None, singularity: str | None = None, podman: str | None = None, image_pull_policy: str | None = None)

Bases: Executor

render(template)

Render the original template and return a new template; do not modify self in this method, so that the executor remains reusable

class dflow.Context

Bases: ABC

abstract render(template: OPTemplate) OPTemplate

Render the original template and return a new template

class dflow.CustomArtifact

Bases: ABC

abstract download(name: str, path: str)
static from_urn(urn)
abstract get_urn() str
redirect = None
render(template, name: str)
class dflow.DAG(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, tasks: List[Task] | None = None, memoize_key: str | None = None, annotations: Dict[str, str] | None = None, parallelism: int | None = None)

Bases: OPTemplate

Parameters:
  • name – the name of the dag

  • inputs – inputs in the template

  • outputs – outputs in the template

  • tasks – a list of tasks

  • memoize_key – memoized key of the dag

  • annotations – annotations for the OP template

  • parallelism – maximum number of running pods for the OP template

add(task: Task | List[Task]) None

Add a task or a list of tasks to the dag

Parameters:

task – a task or a list of tasks to be added to the dag

add_slices(slices, layer=0)
convert_to_argo(memoize_prefix=None, memoize_configmap='dflow', context=None)
convert_to_graph()
copy()
classmethod from_dict(d, templates)
classmethod from_graph(graph, templates)
resolve(pool, futures)
run(workflow_id=None, context=None)
class dflow.DockerSecret(registry, username, password, name=None)

Bases: object

class dflow.Executor

Bases: ABC

abstract render(template: OPTemplate) OPTemplate

Render the original template and return a new template; do not modify self in this method, so that the executor remains reusable

class dflow.IfExpression(_if: str | ArgoVar, _then: str | ArgoVar, _else: str | ArgoVar)

Bases: ArgoVar, Expression

class dflow.InputArtifact(path: str | None = None, name: str | None = None, step=None, template=None, optional: bool = False, type: Any | None = None, source: str | InputArtifact | OutputArtifact | S3Artifact | None = None, mode: int | None = None, sub_path: str | None = None, archive: str = 'default', save_as_parameter: bool = False, **kwargs)

Bases: ArgoVar

Input artifact for OP template

Parameters:
  • path – path where the input artifact is placed in the container

  • name – name of the input artifact

  • optional – optional artifact or not

  • type – artifact type

  • source – default source

  • archive – whether the artifact is regarded as an archived file

convert_to_argo()
convert_to_graph()
classmethod from_dict(d)
classmethod from_graph(graph)
get_path_list_parameter()
get_urn_parameter()
sub_path(path)
class dflow.InputParameter(name: str | None = None, step=None, template=None, type: Any | None = None, save_as_artifact: bool = False, path: str | None = None, source: S3Artifact | InputArtifact | OutputArtifact | None = None, **kwargs)

Bases: ArgoVar

Input parameter for OP template

Parameters:
  • name – name of the input parameter

  • type – parameter type

  • value – default value

convert_to_argo()
convert_to_graph()
classmethod from_dict(d)
classmethod from_graph(graph)
class dflow.Inputs(parameters: Dict[str, InputParameter] | None = None, artifacts: Dict[str, InputArtifact] | None = None, step=None, template=None)

Bases: object

Inputs for OP template

Parameters:
  • parameters – input parameters

  • artifacts – input artifacts

convert_to_argo()
convert_to_graph()
classmethod from_dict(d)
classmethod from_graph(graph)
set_step(step)
set_template(template)
class dflow.LineageClient

Bases: ABC

abstract get_artifact_metadata(urn: str) object
abstract register_artifact(namespace: str, name: str, uri: str, **kwargs) str
abstract register_task(task_name: str, input_urns: Dict[str, str | List[str]], output_uris: Dict[str, str], workflow_urn: str) Dict[str, str]
abstract register_workflow(workflow_name: str) str
class dflow.LocalArtifact(local_path)

Bases: object

sub_path(path: str) Any
class dflow.OPTemplate(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, memoize_key: str | None = None, pvcs: List[PVC] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None)

Bases: object

copy()
deepcopy()
classmethod from_dict(d)
handle_key(memoize_prefix=None, memoize_configmap='dflow')
class dflow.OutputArtifact(path: str | None = None, _from: InputArtifact | OutputArtifact | None = None, name: str | None = None, step=None, template=None, type: Any | None = None, save: List[PVC | S3Artifact] | None = None, archive: str = 'default', global_name: str | None = None, from_expression: IfExpression | str | None = None, optional: bool = False)

Bases: ArgoVar

Output artifact for OP template

Parameters:
  • path – path of the output artifact in the container

  • _from – the artifact is from another artifact

  • name – name of the output artifact

  • type – artifact type

  • save – place to store the output artifact instead of default storage, can be a list

  • archive – compress format of the artifact, None for no compression

  • global_name – global name of the artifact within the workflow

  • from_expression – the artifact is from an expression

convert_to_argo()
convert_to_graph()
classmethod from_dict(d)
classmethod from_graph(graph)
get_path_list_parameter()
get_urn_parameter()
handle_path_list()
handle_urn()
pvc(size='1Gi', storage_class=None, access_modes=None)
sub_path(path)
class dflow.OutputParameter(value_from_path: str | None = None, value_from_parameter: InputParameter | OutputParameter | None = None, name: str | None = None, step=None, template=None, type: Any | None = None, global_name: str | None = None, value_from_expression: IfExpression | str | None = None, save_as_artifact: bool = False, save_both: bool = False, **kwargs)

Bases: ArgoVar

Output parameter for OP template

Parameters:
  • value_from_path – the value is read from file generated in the container

  • value_from_parameter – the value is from another parameter

  • name – name of the output parameter

  • type – parameter type

  • default – default value

  • global_name – global name of the parameter within the workflow

  • value_from_expression – the value is from an expression

  • value – specify value directly

convert_to_argo()
convert_to_argo_artifact()
convert_to_argo_parameter()
convert_to_graph()
expr_as_artifact()
expr_as_parameter()
classmethod from_dict(d)
classmethod from_graph(graph)
repr_as_artifact()
repr_as_parameter()
class dflow.Outputs(parameters: Dict[str, OutputParameter] | None = None, artifacts: Dict[str, OutputArtifact] | None = None, step=None, template=None)

Bases: object

Outputs for OP template

Parameters:
  • parameters – output parameters

  • artifacts – output artifacts

convert_to_argo()
convert_to_graph()
classmethod from_dict(d)
classmethod from_graph(graph)
set_step(step)
set_template(template)
class dflow.PythonScriptOPTemplate(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, memoize_key: str | None = None, pvcs: List[PVC] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, node_selector: Dict[str, str] | None = None, tolerations: List[object] | None = None, affinity: object | None = None, image: str | None = None, command: str | List[str] | None = None, script: str | None = None, volumes: List[object] | None = None, mounts: List[object] | None = None, init_progress: str = '0/1', timeout: str | None = None, retry_strategy: object | None = None, image_pull_policy: str | None = None, requests: Dict[str, str] | None = None, limits: Dict[str, str] | None = None, envs: Dict[str, str] | None = None, init_containers: List[object] | None = None, sidecars: List[object] | None = None, script_rendered: bool = False)

Bases: ScriptOPTemplate

Python script OP template

Parameters:
  • name – the name of the OP template

  • inputs – input parameters and input artifacts

  • outputs – output parameters and output artifacts

  • image – image the template uses

  • command – command to run the script

  • script – python script

  • volumes – volumes the template uses

  • mounts – volumes the template mounts

  • init_progress – a str representing the initial progress

  • timeout – timeout of the OP template

  • retry_strategy – retry strategy of the OP template

  • memoize_key – memoized key of the OP template

  • pvcs – PVCs that need to be declared

  • image_pull_policy – Always, IfNotPresent, Never

  • annotations – annotations for the OP template

  • labels – labels for the OP template

  • node_selector – node selector when scheduling the pod

  • tolerations – tolerations of taints when scheduling the pod

  • affinity – affinity when scheduling the pod

  • requests – a dict of resource requests

  • limits – a dict of resource limits

  • envs – environment variables

  • init_containers – init containers before the template runs

  • sidecars – sidecar containers

class dflow.RemoteExecutor(host: str, port: int = 22, username: str = 'root', password: str | None = None, private_key_file: PathLike | None = None, workdir: str = '~/dflow/workflows/{{workflow.name}}/{{pod.name}}', command: str | List[str] | None = None, remote_command: str | List[str] | None = None, image: str | None = None, image_pull_policy: str | None = None, map_tmp_dir: bool = True, docker_executable: str | None = None, singularity_executable: str | None = None, podman_executable: str | None = None, action_retries: int = -1)

Bases: Executor

download(src, dst)
execute(cmd)
get_script(template)
mkdir_and_download(path)
render(template)

Render the original template and return a new template; do not modify self in this method, so that the executor remains reusable

run(image, remote_command)
upload(src, dst)
upload_if_exists(path)
class dflow.Resource

Bases: ABC

Parameters:
  • action – action on the Kubernetes resource

  • success_condition – expression representing success

  • failure_condition – expression representing failure

action: str | None = None
failure_condition: str | None = None
abstract get_manifest(template: OPTemplate) OPTemplate

The method to get the manifest (str)

success_condition: str | None = None
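
Resource is intended to be subclassed; a hedged sketch (the Job manifest below is purely illustrative, not a resource shipped with dflow):

    import yaml

    from dflow import Resource

    class MyJob(Resource):
        action = "create"
        success_condition = "status.succeeded > 0"
        failure_condition = "status.failed > 0"

        def get_manifest(self, template):
            # Return the manifest (a YAML string) of the Kubernetes resource to act on;
            # fields of the given OP template, e.g. its image, may be interpolated here.
            manifest = {
                "apiVersion": "batch/v1",
                "kind": "Job",
                "spec": {
                    "template": {
                        "spec": {
                            "containers": [{"name": "main", "image": template.image}],
                            "restartPolicy": "Never",
                        },
                    },
                },
            }
            return yaml.dump(manifest)

A resource like this is typically attached to a step via Step(..., use_resource=MyJob()).
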
class dflow.S3Artifact(key: str | None = None, path_list: str | list | None = None, urn: str = '', debug_s3: bool = False, *args, **kwargs)

Bases: object

S3 artifact

Parameters:

key – key of the s3 artifact

download(**kwargs)
evalable_repr(imports)
classmethod from_dict(d)
oss()
sub_path(path: str) Any
to_dict()
class dflow.Secret(value=None, name=None, key='secret')

Bases: object

class dflow.ShellOPTemplate(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, memoize_key: str | None = None, pvcs: List[PVC] | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, node_selector: Dict[str, str] | None = None, tolerations: List[object] | None = None, affinity: object | None = None, image: str | None = None, command: str | List[str] | None = None, script: str | None = None, volumes: List[object] | None = None, mounts: List[object] | None = None, init_progress: str = '0/1', timeout: str | None = None, retry_strategy: object | None = None, image_pull_policy: str | None = None, requests: Dict[str, str] | None = None, limits: Dict[str, str] | None = None, envs: Dict[str, str] | None = None, init_containers: List[object] | None = None, sidecars: List[object] | None = None, script_rendered: bool = False)

Bases: ScriptOPTemplate

Shell script OP template

Parameters:
  • name – the name of the OP template

  • inputs – input parameters and input artifacts

  • outputs – output parameters and output artifacts

  • image – image the template uses

  • command – command to run the script

  • script – shell script

  • volumes – volumes the template uses

  • mounts – volumes the template mounts

  • init_progress – a str representing the initial progress

  • timeout – timeout of the OP template

  • retry_strategy – retry strategy of the OP template

  • memoize_key – memoized key of the OP template

  • pvcs – PVCs need to be declared

  • image_pull_policy – Always, IfNotPresent, Never

  • annotations – annotations for the OP template

  • labels – labels for the OP template

  • node_selector – node selector when scheduling the pod

  • tolerations – tolerations of taints when scheduling the pod

  • affinity – affinity when scheduling the pod

  • requests – a dict of resource requests

  • limits – a dict of resource limits

  • envs – environment variables

  • init_containers – init containers before the template runs

  • sidecars – sidecar containers
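
A minimal sketch (the image, paths and names are placeholders):

    from dflow import (InputArtifact, InputParameter, OutputArtifact,
                       OutputParameter, ShellOPTemplate)

    templ = ShellOPTemplate(
        name="hello-shell",
        image="alpine:3.15",  # placeholder image
        script="cp /tmp/foo.txt /tmp/bar.txt && "
               "echo {{inputs.parameters.msg}} > /tmp/msg.txt",
    )
    templ.inputs.parameters = {"msg": InputParameter()}
    templ.inputs.artifacts = {"foo": InputArtifact(path="/tmp/foo.txt")}
    templ.outputs.parameters = {"msg": OutputParameter(value_from_path="/tmp/msg.txt")}
    templ.outputs.artifacts = {"bar": OutputArtifact(path="/tmp/bar.txt")}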

class dflow.SlurmJob(header='', node_selector=None, prepare=None, results=None, map_tmp_dir=True, workdir='.', remote_command=None, docker_executable=None, singularity_executable=None, podman_executable=None)

Bases: Resource

get_manifest(template)

The method to get the manifest (str)

class dflow.SlurmJobTemplate(header: str = '', node_selector: Dict[str, str] | None = None, prepare_image: str | None = None, prepare_image_pull_policy: str | None = None, collect_image: str | None = None, collect_image_pull_policy: str | None = None, workdir: str = 'dflow/workflows/{{workflow.name}}/{{pod.name}}', remote_command: str | List[str] | None = None, docker_executable: str | None = None, singularity_executable: str | None = None, podman_executable: str | None = None)

Bases: Executor

Slurm job template

Parameters:
  • header – header for Slurm job

  • node_selector – node selector

  • prepare_image – image for preparing data

  • prepare_image_pull_policy – image pull policy for preparing data

  • collect_image – image for collecting results

  • collect_image_pull_policy – image pull policy for collecting results

  • workdir – remote working directory

  • remote_command – command for running the script remotely

  • docker_executable – docker executable to run remotely

  • singularity_executable – singularity executable to run remotely

  • podman_executable – podman executable to run remotely

render(template)

Render the original template and return a new template. Do not modify self in this method, so that the executor remains reusable.
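
A hedged sketch of using SlurmJobTemplate as the executor of a step (the Slurm header, node selector, image and names are placeholders):

    from dflow import ShellOPTemplate, SlurmJobTemplate, Step

    templ = ShellOPTemplate(name="compute", image="alpine:3.15", script="echo hello")

    executor = SlurmJobTemplate(
        header="#!/bin/sh\n#SBATCH --nodes=1\n#SBATCH --partition=cpu",  # placeholder header
        node_selector={"kubernetes.io/hostname": "slurm-node"},          # placeholder selector
    )
    step = Step(name="slurm-step", template=templ, executor=executor)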

class dflow.SlurmRemoteExecutor(host: str, port: int = 22, username: str = 'root', password: str | None = None, private_key_file: PathLike | None = None, workdir: str = '~/dflow/workflows/{{workflow.name}}/{{pod.name}}', command: str | List[str] | None = None, remote_command: str | List[str] | None = None, image: str | None = None, image_pull_policy: str | None = None, map_tmp_dir: bool = True, docker_executable: str | None = None, singularity_executable: str | None = None, podman_executable: str | None = None, action_retries: int = -1, header: str = '', interval: int = 3, pvc: PVC | None = None)

Bases: RemoteExecutor

Slurm remote executor

Parameters:
  • host – remote host

  • port – SSH port

  • username – username

  • password – password for SSH

  • private_key_file – private key file for SSH

  • workdir – remote working directory

  • command – command for the executor

  • remote_command – command for running the script remotely

  • image – image for the executor

  • image_pull_policy – image pull policy for the executor

  • map_tmp_dir – map /tmp to ./tmp

  • docker_executable – docker executable to run remotely

  • singularity_executable – singularity executable to run remotely

  • podman_executable – podman executable to run remotely

  • action_retries – retries for actions (upload, execute commands, download), -1 for infinity

  • header – header for Slurm job

  • interval – query interval for Slurm

render(template)

Render the original template and return a new template. Do not modify self in this method, so that the executor remains reusable.

run(image, remote_command)
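
A hedged sketch of running a step's script on a remote Slurm cluster over SSH (host, credentials, image and the Slurm header are placeholders):

    from dflow import ShellOPTemplate, SlurmRemoteExecutor, Step

    templ = ShellOPTemplate(name="compute", image="alpine:3.15", script="echo hello")

    executor = SlurmRemoteExecutor(
        host="login.hpc.example.org",      # placeholder host
        username="myuser",                 # placeholder credentials
        password="mypassword",
        header="#!/bin/bash\n#SBATCH -N 1\n#SBATCH --partition=cpu",
    )
    step = Step(name="remote-step", template=templ, executor=executor)
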
class dflow.Step(name: str, template: OPTemplate, parameters: Dict[str, Any] | None = None, artifacts: Dict[str, S3Artifact | InputArtifact | OutputArtifact | None] | None = None, when: str | None = None, with_param: str | list | InputParameter | OutputParameter | None = None, continue_on_failed: bool = False, continue_on_error: bool = False, continue_on_num_success: int | None = None, continue_on_success_ratio: float | None = None, with_sequence: object | None = None, key: str | None = None, executor: Executor | None = None, use_resource: Resource | None = None, util_image: str | None = None, util_image_pull_policy: str | None = None, util_command: str | List[str] | None = None, parallelism: int | None = None, slices: Slices | None = None)

Bases: object

Parameters:
  • name – the name of the step

  • template – OP template the step uses

  • parameters – input parameters passed to the step as arguments

  • artifacts – input artifacts passed to the step as arguments

  • when – conditional step if the condition is satisfied

  • with_param – generate parallel steps with respect to a list as a parameter

  • continue_on_failed – continue if the step fails

  • continue_on_error – continue if the step meets error

  • continue_on_num_success – continue if the number of successful parallel steps is greater than a certain number

  • continue_on_success_ratio – continue if the success ratio of the generated parallel steps is greater than a certain ratio

  • with_sequence – generate parallel steps with respect to a sequence

  • key – the key of the step

  • executor – define the executor to execute the script

  • use_resource – use k8s resource

  • util_image – image for utility step

  • util_image_pull_policy – image pull policy for utility step

  • util_command – command for utility step

  • parallelism – parallelism for sliced step

  • slices – override slices of OP template

convert_to_argo(context=None)
convert_to_graph()
exec(scope, parameters, item=None, context=None)
exec_pod(scope, parameters, item=None)

directory structure:

step-xxxxx
|- inputs
   |- parameters
   |- artifacts
|- outputs
   |- parameters
   |- artifacts
|- script
|- workdir

exec_steps(scope, parameters, item=None, context=None)
exec_with_config(scope, parameters, item, conf, s3_conf, cwd, context=None)
classmethod from_dict(d, templates)
classmethod from_graph(graph, templates)
handle_sub_path_slices_of_artifact_list(slices, artifacts)
load_output_artifacts(stepdir, artifacts)
load_output_parameters(stepdir, parameters)
prepare_argo_arguments(context=None)
record_input_artifacts(stepdir, artifacts, item, scope, ignore_nonexist=False)
record_input_parameters(stepdir, parameters)
record_output_artifacts(stepdir, artifacts)
record_output_parameters(stepdir, parameters)
render_by_executor(context=None)
run(scope, context=None)
run_with_config(scope, context, conf, s3_conf, cwd)
set_artifacts(artifacts)
set_parameters(parameters)
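
Two hedged sketches of constructing steps from an OP template (the template, image and parameter names are placeholders):

    from dflow import InputParameter, ShellOPTemplate, Step, argo_range

    templ = ShellOPTemplate(
        name="echo",
        image="alpine:3.15",
        script="echo {{inputs.parameters.msg}}",
    )
    templ.inputs.parameters = {"msg": InputParameter()}

    # a single step with a literal argument
    hello = Step(name="hello", template=templ, parameters={"msg": "HelloWorld!"})

    # a sliced step fanned out over {{item}} generated by argo_range
    parallel = Step(
        name="hello-parallel",
        template=templ,
        parameters={"msg": "{{item}}"},
        with_param=argo_range(3),
        key="hello-{{item}}",
    )
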
class dflow.Steps(name: str | None = None, inputs: Inputs | None = None, outputs: Outputs | None = None, steps: List[Step | List[Step]] | None = None, memoize_key: str | None = None, annotations: Dict[str, str] | None = None, parallelism: int | None = None)

Bases: OPTemplate

Parameters:
  • name – the name of the steps

  • inputs – inputs in the template

  • outputs – outputs in the template

  • steps – a sequential list of steps

  • memoize_key – memoized key of the steps

  • annotations – annotations for the OP template

  • parallelism – maximum number of running pods for the OP template

add(step: Step | List[Step]) None

Add a step or a list of parallel steps to the steps

Parameters:

step – a step or a list of parallel steps to be added to the entrypoint of the workflow

add_slices(slices, layer=0)
convert_to_argo(memoize_prefix=None, memoize_configmap='dflow', context=None)
convert_to_graph()
copy()
classmethod from_dict(d, templates)
classmethod from_graph(graph, templates)
run(workflow_id=None, context=None)
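
A hedged sketch of nesting: a Steps object is itself an OP template and can be referenced by another step (the inner template, image and names are placeholders):

    from dflow import InputParameter, ShellOPTemplate, Step, Steps

    templ = ShellOPTemplate(
        name="echo",
        image="alpine:3.15",
        script="echo {{inputs.parameters.msg}}",
    )
    templ.inputs.parameters = {"msg": InputParameter()}

    steps = Steps(name="outer-steps")
    steps.inputs.parameters["msg"] = InputParameter()

    # the inner step forwards the parameter of the enclosing Steps
    steps.add(Step(
        name="inner",
        template=templ,
        parameters={"msg": steps.inputs.parameters["msg"]},
    ))

    # the whole Steps can then be used as the template of a bigger step
    big = Step(name="big", template=steps, parameters={"msg": "hello"})
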
class dflow.Task(name: str, template: OPTemplate, dependencies: List[Task | str] | None = None, **kwargs)

Bases: Step

Parameters:
  • name – the name of the task

  • template – OP template the task uses

  • parameters – input parameters passed to the task as arguments

  • artifacts – input artifacts passed to the task as arguments

  • when – conditional task if the condition is satisfied

  • with_param – generate parallel tasks with respect to a list as a parameter

  • continue_on_failed – continue if the task fails

  • continue_on_num_success – continue if the number of successful parallel tasks is greater than a certain number

  • continue_on_success_ratio – continue if the success ratio of the generated parallel tasks is greater than a certain ratio

  • with_sequence – generate parallel tasks with respect to a sequence

  • key – the key of the task

  • executor – define the executor to execute the script

  • use_resource – use k8s resource

  • util_image – image for utility step

  • util_image_pull_policy – image pull policy for utility step

  • util_command – command for utility step

  • dependencies – extra dependencies of the task

convert_to_argo(context=None)
convert_to_graph()
classmethod from_dict(d, templates)
set_artifacts(artifacts)
set_parameters(parameters)
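
Tasks are the DAG counterpart of steps; a hedged sketch of a diamond-shaped DAG (the template, image and names are placeholders; DAG is the OP template that holds tasks, cf. the dag argument of Workflow):

    from dflow import DAG, ShellOPTemplate, Task

    templ = ShellOPTemplate(name="noop", image="alpine:3.15", script="echo hello")

    dag = DAG(name="diamond")
    a = Task(name="a", template=templ)
    b = Task(name="b", template=templ, dependencies=[a])
    c = Task(name="c", template=templ, dependencies=[a])
    d = Task(name="d", template=templ, dependencies=[b, c])
    for t in [a, b, c, d]:
        dag.add(t)
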
class dflow.Workflow(name: str = 'workflow', steps: Steps | None = None, dag: DAG | None = None, namespace: str | None = None, id: str | None = None, uid: str | None = None, host: str | None = None, token: str | None = None, k8s_config_file: PathLike | None = None, k8s_api_server: str | None = None, context: Context | Executor | None = None, annotations: Dict[str, str] | None = None, labels: Dict[str, str] | None = None, parallelism: int | None = None, pod_gc_strategy: str | None = None, image_pull_secrets: str | DockerSecret | List[str | DockerSecret] | None = None, artifact_repo_key: str | None = None, parameters: Dict[str, Any] | None = None)

Bases: object

Parameters:
  • name – the name of the workflow

  • steps – steps used as the entrypoint of the workflow; if not provided, an empty Steps will be used

  • dag – dag used as the entrypoint of the workflow

  • namespace – k8s namespace

  • id – workflow ID in Argo, you can provide it to track an existing workflow

  • host – URL of the Argo server, will override global config

  • token – request the Argo server with the token, will override global config

  • k8s_config_file – Kubernetes configuration file for accessing API server, will override global config

  • k8s_api_server – URL of the Kubernetes API server, will override global config

  • context – context for the workflow

  • annotations – annotations for the workflow

  • parallelism – maximum number of running pods for the workflow

  • pod_gc_strategy

    pod GC provides the ability to delete pods automatically without deleting the workflow; the pod GC strategy must be one of the following:

    • OnPodCompletion - delete pods immediately when the pod is completed (including errors/failures)

    • OnPodSuccess - delete pods immediately when pod is successful

    • OnWorkflowCompletion - delete pods when workflow is completed

    • OnWorkflowSuccess - delete pods when workflow is successful

  • image_pull_secrets – secrets for image registries

  • artifact_repo_key – use artifact repository reference by key

  • parameters – global input parameters

add(step: Step | List[Step] | Task | List[Task]) None

Add a step or a list of parallel steps to the workflow

Parameters:
step – a step or a list of parallel steps to be added to the entrypoint of the workflow

convert_to_argo(reuse_step=None)
deduplicate_templates()
delete() None

Delete the workflow

classmethod from_dict(d)
classmethod from_graph(graph)
classmethod from_graph_json(j, **kwargs)
classmethod from_graph_yaml(y, **kwargs)
classmethod from_json(s)
classmethod from_yaml(s)
get_graph_templates(template, graph_templates=None)
get_k8s_core_v1_api()
handle_reused_artifact(step, name, art, group_key)
handle_reused_artifact_with_copy(step, name, art, group_key)
handle_reused_step(step, global_parameters, global_artifacts)
handle_template(template, memoize_prefix=None, memoize_configmap='dflow')
query(fields: List[str] | None = None, retry: int = 3) ArgoWorkflow

Query the workflow from Argo. If fields is not provided, full information of all steps will be returned [O(n)].

Parameters:

fields – fields of the workflow to be returned

Returns:

an ArgoWorkflow object

query_global_outputs() ArgoWorkflow

Query the global outputs of the workflow from Argo. The function is O(1).

Returns:

an ArgoWorkflow object

query_keys_of_steps() List[str]

Query the keys of existing steps of the workflow from Argo. This function will try to get the keys from the global outputs, which is O(1). If that fails, it will fall back to querying full steps.

Returns:

a list of keys

query_status() str

Query the status of the workflow from Argo. The function is O(1).

Returns:

Pending, Running, Succeeded, Failed, Error, etc

query_step(name: str | List[str] | None = None, key: str | List[str] | None = None, phase: str | List[str] | None = None, id: str | List[str] | None = None, type: str | List[str] | None = None) List[ArgoStep]

Query the existing steps of the workflow from Argo. This function will query full steps from the server [O(n)], then filter them with the conditions given in the arguments. If you want to call this function multiple times successively, it is recommended to call query once and then call get_step repeatedly, e.g. info = wf.query(); step1 = info.get_step(key="step1"); step2 = info.get_step(key="step2").

Parameters:
  • name – filter by name of step, support regex

  • key – filter by key of step

  • phase – filter by phase of step

  • id – filter by id of step

  • type – filter by type of step

Returns:

a list of steps

query_step_by_key(key: str | List[str], name: str | List[str] | None = None, phase: str | List[str] | None = None, id: str | List[str] | None = None, type: str | List[str] | None = None) List[ArgoStep]

Query the existing steps of the workflow from Argo by key. This function will try to get the key-to-ID map from the global outputs and then query each step by ID, which is O(m), where m is the number of requested keys. If that fails, it will fall back to querying full steps.

Parameters:

key – filter by key of step

Returns:

a list of steps

resubmit() None

Resubmit the workflow

resume() None

Resume the workflow

retry() None

Retry the workflow

retry_steps(step_ids)
stop() None

Stop the workflow

submit(reuse_step: List[ArgoStep] | None = None) ArgoWorkflow

Submit the workflow

Parameters:

reuse_step – a list of steps to be reused in the workflow

suspend() None

Suspend the workflow

terminate() None

Terminate the workflow

to_dict()
to_graph()
to_graph_json(**kwargs)
to_graph_yaml(**kwargs)
to_json(**kwargs)
to_yaml(**kwargs)
wait(interval=1)
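
A hedged end-to-end sketch: build, submit and monitor a workflow (the image, names and paths are placeholders):

    import time

    from dflow import (InputParameter, OutputParameter, ShellOPTemplate,
                       Step, Workflow)

    templ = ShellOPTemplate(
        name="hello",
        image="alpine:3.15",
        script="echo {{inputs.parameters.msg}} > /tmp/msg.txt",
    )
    templ.inputs.parameters = {"msg": InputParameter()}
    templ.outputs.parameters = {"msg": OutputParameter(value_from_path="/tmp/msg.txt")}

    wf = Workflow(name="hello-workflow")
    wf.add(Step(name="hello", template=templ, parameters={"msg": "HelloWorld!"}))
    wf.submit()

    # poll until the workflow finishes, then inspect the step
    while wf.query_status() in ["Pending", "Running"]:
        time.sleep(1)
    step = wf.query_step(name="hello")[0]
    print(step.phase, step.outputs.parameters["msg"].value)
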
dflow.argo_concat(param: ArgoVar) ArgoConcat

Return the concatenation of a list of lists which is an Argo parameter

Parameters:

param – the Argo parameter which is a list of lists

dflow.argo_enumerate(*args, **kwargs) ArgoVar

Return the enumeration of a list which is an Argo parameter

Parameters:

param – the Argo parameter which is a list

dflow.argo_len(param: ArgoVar | S3Artifact) ArgoVar

Return the length of a list which is an Argo parameter

Parameters:

param – the Argo parameter which is a list

dflow.argo_range(*args) ArgoVar

Return a str representing a range of integers in Argo. It receives 1-3 arguments, similar to the built-in function range in Python. Each argument can be an Argo parameter.

dflow.argo_sequence(count: int | ArgoVar | None = None, start: int | ArgoVar | None = None, end: int | ArgoVar | None = None, format: str | None = None) object

Return a numeric sequence in Argo

Parameters:
  • count – number of elements in the sequence (default: 0), not to be used with end, can be an Argo parameter

  • start – number at which to start the sequence (default: 0), can be an Argo parameter

  • end – number at which to end the sequence (default: 0), not to be used with count, can be an Argo parameter

  • format – a printf format string to format the value in the sequence
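
A hedged sketch combining these helpers: fan a step out with with_sequence, sizing the sequence by the length of a list parameter produced upstream (the upstream step, template and parameter names are assumptions):

    from dflow import Step, argo_len, argo_sequence

    # `gen` is an assumed upstream step whose output parameter "items" is a list;
    # `templ` is an assumed OP template with an input parameter "order".
    process = Step(
        name="process",
        template=templ,
        parameters={"order": "{{item}}"},
        with_sequence=argo_sequence(
            count=argo_len(gen.outputs.parameters["items"]),
            format="task-%03d",
        ),
    )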

dflow.argo_sum(param: ArgoVar) ArgoSum

Return the sum of a list of integers which is an Argo parameter

Parameters:

param – the Argo parameter which is a list of integers

dflow.copy_artifact(src, dst, sort=False) S3Artifact

Copy an artifact to another on server side

Parameters:
  • src – source artifact

  • dst – destination artifact

  • sort – append the path list of dst after that of src

dflow.copy_s3(src_key: str, dst_key: str, recursive: bool = True, ignore_catalog: bool = False, **kwargs) None
dflow.download_artifact(artifact, extract: bool = True, sub_path: str | None = None, slice: int | None = None, path: PathLike = '.', remove_catalog: bool = True, **kwargs) List[str]

Download an artifact from Argo to local

Parameters:
  • artifact – artifact to be downloaded

  • extract – extract files if the artifact is compressed

  • sub_path – download a subdir of an artifact

  • slice – download a slice of an artifact

  • path – local path

  • endpoint – endpoint for Minio

  • access_key – access key for Minio

  • secret_key – secret key for Minio

  • secure – secure or not for Minio

  • bucket_name – bucket name for Minio

  • skip_exists – skip files with the same MD5
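
A hedged usage sketch (wf is an assumed submitted Workflow, and "hello" and "bar" are assumed step and artifact names):

    from dflow import download_artifact

    step = wf.query_step(name="hello")[0]
    files = download_artifact(step.outputs.artifacts["bar"], path="./results")
    print(files)  # local paths of the downloaded files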

dflow.download_s3(key: str, path: PathLike = '.', recursive: bool = True, skip_exists: bool = False, keep_dir: bool = False, **kwargs) str
dflow.gen_code(graph)
dflow.if_expression(_if: str | ArgoVar, _then: str | ArgoVar, _else: str | ArgoVar) IfExpression

Return an if expression in Argo

Parameters:
  • _if – a bool expression, which may be a comparison of two Argo parameters

  • _then – value returned if the condition is satisfied

  • _else – value returned if the condition is not satisfied
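
A hedged sketch of the typical pattern: expose one of two step outputs as a conditional output of an enclosing template (steps and check are assumed objects, the parameter names are placeholders, and the output parameter "msg" is assumed to be declared on steps):

    from dflow import if_expression

    steps.outputs.parameters["msg"].value_from_expression = if_expression(
        _if=check.outputs.parameters["is_head"],
        _then=check.outputs.parameters["msg1"],
        _else=check.outputs.parameters["msg2"],
    )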

dflow.path_list_of_artifact(art, **kwargs) List[str]
dflow.path_object_of_artifact(art, **kwargs) list | dict
dflow.query_archived_workflows(labels: Dict[str, str] | None = None, id: str | None = None) List[ArgoWorkflow]
dflow.query_workflows(labels: Dict[str, str] | None = None, fields: List[str] | None = None) List[ArgoWorkflow]
dflow.randstr(length: int = 5) str
dflow.set_config(**kwargs) None

Set global configurations

Parameters:
  • host – host of Argo server

  • namespace – k8s namespace

  • token – token for authentication, necessary for reused workflows

  • k8s_config_file – location of the kube config file if it is used for authentication

  • k8s_api_server – address of the Kubernetes API server, necessary for reused workflows

  • private_key_host_path – path of private key on the Kubernetes nodes

  • save_path_as_parameter – save catalog of artifacts as parameters

  • catalog_dir_name – catalog directory name for artifacts

  • archive_mode – “tar” for archiving with tar, None for no archive

  • util_image – image for util step

  • util_image_pull_policy – image pull policy for util step

  • extender_image – image for dflow extender

  • extender_image_pull_policy – image pull policy for dflow extender

  • dispatcher_image – image for dpdispatcher

  • dispatcher_image_pull_policy – image pull policy for dpdispatcher

  • save_keys_in_global_outputs – save keys of steps in global outputs

  • mode – “default” for normal, “debug” for debugging locally

  • lineage – lineage client, None by default

  • http_headers – HTTP headers for requesting Argo server

  • workflow_annotations – default annotations for workflows

  • overwrite_reused_artifact – overwrite reused artifact
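
A hedged sketch of a typical setup (the addresses and the token are placeholders):

    from dflow import set_config

    set_config(
        host="https://127.0.0.1:2746",             # Argo server
        k8s_api_server="https://127.0.0.1:6443",   # Kubernetes API server
        token="<argo-token>",
    )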

dflow.set_s3_config(**kwargs) None

Set S3 configurations

Parameters:
  • endpoint – endpoint for S3 storage

  • console – console address for S3 storage

  • access_key – access key for S3 storage

  • secret_key – secret key for S3 storage

  • secure – secure or not

  • bucket_name – name of S3 bucket

  • repo_key – key of artifact repository

  • repo – artifact repository, parsed from repo_key

  • repo_type – s3 or oss, parsed from repo_key

  • repo_prefix – prefix of artifact repository, parsed from repo_key

  • prefix – prefix of storage key

  • storage_client – client for plugin storage backend

  • extra_prefixes – extra prefixes ignored by auto-prefixing
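
A hedged sketch (the endpoint, credentials and bucket name are placeholders):

    from dflow import set_s3_config

    set_s3_config(
        endpoint="127.0.0.1:9000",
        access_key="admin",
        secret_key="password",
        bucket_name="my-artifacts",
    )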

dflow.upload_artifact(path: PathLike | List[PathLike] | Set[PathLike] | Dict[str, PathLike] | list | dict, archive: str = 'default', namespace: str | None = None, dataset_name: str | None = None, **kwargs) S3Artifact

Upload an artifact from local to Argo

Parameters:
  • path – local path

  • archive – compress format of the artifact, None for no compression

  • endpoint – endpoint for Minio

  • access_key – access key for Minio

  • secret_key – secret key for Minio

  • secure – secure or not for Minio

  • bucket_name – bucket name for Minio
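
A hedged sketch of uploading local files and feeding them to a step (the paths, image and names are placeholders):

    from dflow import InputArtifact, ShellOPTemplate, Step, upload_artifact

    art = upload_artifact(["data/foo.txt", "data/bar.txt"])

    templ = ShellOPTemplate(name="consume", image="alpine:3.15", script="ls /tmp/inp")
    templ.inputs.artifacts = {"inp": InputArtifact(path="/tmp/inp")}

    step = Step(name="consume", template=templ, artifacts={"inp": art})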

dflow.upload_s3(path: PathLike, key: str | None = None, prefix: str | None = None, debug_func=os.symlink, **kwargs) str