actinia_parallel_plugin Internal logic

Carmen Tawalika edited this page Jan 18, 2022 · 13 revisions

Overview of Parallel Processing

Abbreviations:

  • PC: process chain (for actinia)

Existing Approaches

actinia-gdi, project specific

  • Parallelization via different actinia-core instances
  • Parallel steps are defined in a batch PC and sent to actinia-gdi
  • actinia-gdi takes care of forwarding partial steps to actinia-core
  • actinia-gdi starts jobs in parallel or waits for partial results, as defined in the PC
  • Advantage: Resources of the workers don't matter, because jobs are enqueued

Certain GRASS GIS Addons

  • Parallelization within GRASS GIS modules in one actinia instance
  • A complete PC is sent to actinia-core
  • If individual modules support it, their internal processing runs in parallel. A module only finishes when all of its parallel steps are done; only then does the PC proceed to the next module
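The barrier behaviour described above (a module only completes once all of its internal parallel steps are done) can be sketched with Python's concurrent.futures; the per-tile function and the tile list are hypothetical placeholders:

```python
from concurrent.futures import ThreadPoolExecutor

def compute_tile(tile_id):
    # placeholder for the module's per-tile work
    return tile_id * tile_id

def run_module_parallel(tiles, nprocs=4):
    """Run one module's internal steps in parallel and return
    only after every worker has finished (barrier semantics)."""
    with ThreadPoolExecutor(max_workers=nprocs) as pool:
        results = list(pool.map(compute_tile, tiles))
    # only now may the PC proceed to the next module
    return results
```

The `with` block acts as the barrier: leaving it guarantees all workers have completed before the next PC step starts.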

actinia-algebra

Overview of a sample process (global classification)

  1. tile globally
  2. calculate in parallel
  3. build classification model globally
  4. calculate in parallel
  5. merge globally

Possible ways to parallelize

from the workflow point of view

  • many actinias
  • within GRASS GIS

from the processing point of view

  1. spatially (tiling)
  2. temporally
  3. as independent jobs calculated in parallel

Combined

  • A*. Spatial (tiling) parallelization in actinia.
    • Easily possible for some steps, e.g. NDVI, since only a simple per-pixel calculation is performed.
    • For other steps, there may be hard tile boundaries. Possible solutions:
      • a tiny overlap between tiles
      • tiling with natural_tiles -> but this could take too long at global scale
  • B*. Temporal parallelization in GRASS GIS
    • safely possible for many t-modules
  • A*. Calculating independent jobs in parallel
  • B*. Impossible for some steps, e.g. the global model for classification

A*: actinia must know about it
B*: actinia treats it as a single job
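The "tiny overlap" workaround for hard tile boundaries can be sketched in one dimension; tile size and overlap values are made up for illustration:

```python
def tiles_with_overlap(extent, tile_size, overlap):
    """Split a 1-D extent (start, end) into tiles of tile_size,
    each expanded by `overlap` on both sides (clipped to the extent),
    so neighbouring tiles share a small buffer zone across the
    otherwise hard boundary."""
    start, end = extent
    tiles = []
    pos = start
    while pos < end:
        core_end = min(pos + tile_size, end)
        tiles.append((max(start, pos - overlap), min(end, core_end + overlap)))
        pos = core_end
    return tiles
```

After the parallel per-tile computation, the overlapping buffer would be discarded during the merge so each output pixel comes from exactly one tile's core region.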

Concept with actinia-gdi and SLURM in HPC

Components:

"from outside" -> actinia-gdi

  • actinia-gdi as working title
  • "from outside" a batch PC is formulated (or a PC template is used)
  • Geospatial data reprojection challenges?
    • Prototype: initially everything in one location; input data is reprojected during import

batch PC

describes,

  • what to calculate
  • which intermediate steps can run in parallel (independent jobs)
  • which intermediate steps have to be waited for
  • where to tile (spatially parallel)
  • and which steps can be calculated as parallel tiles
  • TODO: structure of batch PC:
    • extension of current actinia batch PC
      • enable cross-actinia processing
      • infinite recursion of parallelization potentially possible!?
      • intermediate steps - mapsets merge
      • possible draft see bottom of page
    • openEO process graph ?
      • no - this structure is too limited for the full GRASS GIS functionality
        • it is closely tied to raster data cubes, while GRASS GIS supports more data types
        • it allows only one output per process - in the overall job as well as within a process graph when used as input for another process - which is also too limited to fully support GRASS GIS functionality
    • OGC API processes ?
      • TODO: gain knowledge

actinia-gdi

  • PC is sent to actinia-gdi
  • actinia-gdi
    • identifies the necessary single steps from batch PC
    • recognizes which single steps can run in parallel
    • monitors whether the steps a single step depends on have already completed successfully
    • recognizes a tiling step and executes the steps, which should run spatially parallel, in a parallel loop
      • mapset name must be unique (initially only persistent possible)
    • Merge always after tiling loop (Control in tiling output)
      • detects a mapset merge step and merges all mapsets, which were previously spatially tiled, when defined in outputs.
        • Merge all new maps? -> Not in prototype as too many variations can occur
      • delete the merged mapsets
      • When the inputs are multiple previous outputs, the tiling step must first finish and be merged before a new tiling process is started (reuse_tiles=true)
  • Can this also work for ephemeral processing?
    • actinia-gdi would have to calculate everything persistently, export anyway and delete the rest.
    • Also in the persistent case the intermediate results have to be deleted
    • Prototype: first only for persistent processing
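The orchestration logic described above (start independent jobs in parallel, wait for the ones they depend on) can be sketched as a small round-based scheduler; the job names and the dependency mapping in the usage example are hypothetical:

```python
def run_batch(jobs, deps, execute):
    """Run jobs so that each starts only after all of its dependencies
    finished; jobs whose dependencies are satisfied in the same round
    are the ones actinia-gdi could submit in parallel.
    jobs: ordered list of job names
    deps: {job: set of required jobs}
    execute: callable invoked once per job"""
    done, rounds = set(), []
    pending = list(jobs)
    while pending:
        # every job whose dependencies are all done is ready now
        ready = [j for j in pending if deps.get(j, set()) <= done]
        if not ready:
            raise RuntimeError("unresolvable dependencies")
        for job in ready:
            execute(job)   # in reality: submit to the processing backend
            done.add(job)
            pending.remove(job)
        rounds.append(ready)
    return rounds
```

For a PC like "tile -> (ndvi, ndwi) -> merge" this yields three rounds, with ndvi and ndwi eligible for parallel execution in the middle round.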

actinia-gdi -> SLURM

  • actinia-gdi (e.g. in openshift) communicates via REST with SLURM https://slurm.schedmd.com/rest.html
  • intermediate steps are sent as single job to SLURM
    • sending parallel jobs at the same time
    • waiting for partial results for dependent jobs
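A rough sketch of submitting one intermediate step to slurmrestd over REST; the endpoint URL, API version path, and the exact job schema are assumptions that depend on the deployed SLURM version:

```python
import json
import urllib.request

SLURMRESTD = "http://slurm-rest:6820"   # hypothetical slurmrestd address
API = "/slurm/v0.0.36/job/submit"       # version-specific path, check your deployment

def build_submission(name, script, env=None):
    """Build the JSON body for a slurmrestd job submission.
    Field names follow the slurmrestd job-submit schema, but the
    exact schema varies between API versions."""
    return {
        "job": {
            "name": name,
            "environment": env or {"PATH": "/usr/bin:/bin"},
            "current_working_directory": "/tmp",
        },
        "script": script,
    }

def submit(name, script, token):
    """POST one batch step to slurmrestd (JWT auth)."""
    req = urllib.request.Request(
        SLURMRESTD + API,
        data=json.dumps(build_submission(name, script)).encode(),
        headers={
            "Content-Type": "application/json",
            "X-SLURM-USER-TOKEN": token,  # slurmrestd JWT header
        },
    )
    return urllib.request.urlopen(req)  # performs the actual submission
```

actinia-gdi would call `submit` once per independent job (parallel steps back-to-back) and hold back dependent jobs until the required results exist.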

SLURM -> compute actinia

  • SLURM communicates via shell script with compute nodes
  • we assume that necessary input/output data/folders are mounted in all nodes

Possibility 1:

  • Via script start container with service (actinia) and send PC via HTTP
  • The script is not allowed to start the service and then exit (SLURM would lose track of the running processes)
  • Polling in the script would be necessary
  • After processing ends, the script would have to shut down the container again
  • => Service feels wrong in this environment
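The polling the script would need could look roughly like this; the terminal states are an assumption based on actinia's resource status values, and `get_status` stands in for an HTTP call to the resource endpoint:

```python
import time

TERMINAL = {"finished", "error", "terminated"}  # assumed actinia end states

def poll_until_done(get_status, interval=10, timeout=3600):
    """Poll a status callable until the job reaches a terminal state,
    so the SLURM script only exits once processing is really over."""
    waited = 0
    while waited < timeout:
        status = get_status()
        if status in TERMINAL:
            return status
        time.sleep(interval)
        waited += interval
    raise TimeoutError("job did not finish in time")
```

This busy-waiting inside the SLURM script is exactly the awkwardness that makes Possibility 1 feel wrong.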

Possibility 2:

  • Via script start container with GRASS GIS and execute command
  • command needs to include a GRASS GIS script or actinia process chain
  • A GRASS GIS script (as opposed to single GRASS GIS commands) is necessary, because several commands can also form a single logical step
  • actinia container is needed instead of plain GRASS GIS container
    • TODO: which / how much functionality of actinia is needed here?
    • User permissions
    • Resource management
    • redis job status
    • ...
  • This command should allow communicating with actinia in the same way an HTTP request would

compute actinia -> actinia-gdi

  • actinia-gdi can be informed about the status of the job via webhook
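A minimal sketch of the webhook handling on the actinia-gdi side; the payload field names mirror actinia's resource response, but are illustrative assumptions here:

```python
import json

def handle_webhook(body, job_table):
    """Update the stored job status from an actinia webhook call.
    `body` is the JSON payload actinia POSTs when a resource changes
    state; the resource_id/status field names are assumptions."""
    payload = json.loads(body)
    resource_id = payload["resource_id"]
    job_table[resource_id] = payload["status"]
    # a scheduler would now check whether dependent jobs became ready
    return resource_id, payload["status"]
```

On each call, actinia-gdi can re-evaluate its dependency graph and submit any jobs whose prerequisites just finished.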

Possible draft of a batch PC (referenced above):

{
  "processing_host": "http://actinia-core-docker:8088/",
  "processing_platform_name": "example_name",
  "jobs": [
    {
      "list": [
        {
          "module": "stac_importer",
          "inputs":[
            {"param": "param1", "value": "value1"}
          ]
        }
      ],
      "parallel": "false",
      "version": "1"
    },
    {
      "list": [
        {
          "module": "actinia_tiling",
          "comment": "All jobs executed in parallel loop for each tile",
          "inputs":[
            // With this approach, also area size would be possible
            // {"param": "size", "value": "10000"},
            {"param": "num_tiles", "value": "10000"}
          ],
          "outputs":[
            {"param": "raster", "value": "ndvi,ndwi"}
          ],
          "jobs": [
            {
              "list": [
                {
                  "module": "g.region",
                  "inputs":[
                    {"param": "x", "value": "{{ tile_id }}"}
                  ]
                },
                {
                  "module": "r.mask",
                  "inputs":[
                    {"param": "x", "value": "y"}
                  ]
                }
              ],
              "parallel": "false"
            },
            {
              "list": [
                {
                  "module": "r.mapcalc",
                  "inputs":[
                    {"param": "x", "value": "ndvi"}
                  ]
                }
              ],
              "parallel": "true"
            },
            {
              "list": [
                {
                  "module": "r.mapcalc",
                  "inputs":[
                    {"param": "x", "value": "ndwi"}
                  ]
                }
              ],
              "parallel": "true"
            }
          ]
        }
      ],
      "parallel": "false",
      "version": "1"
    },
    {
        "list": [
          {
            "module": "actinia_tiling",
            "comment": "All jobs executed in parallel loop for each tile",
            "inputs":[
              // With this approach, also area size would be possible
              // {"param": "size", "value": "10000"},
              {"param": "num_tiles", "value": "10000"},
              // TODO: parameter or flag?
              {"param": "reuse_tiles", "value": "true"}
            ],
            "outputs":[
                {"param": "raster", "value": "agg1,agg2"}
              ],
            "jobs": [
              {
                "list": [
                  {
                    "module": "g.region",
                    "inputs":[
                      {"param": "x", "value": "{{ tile_id }}"}
                    ]
                  },
                  {
                    "module": "r.mask",
                    "inputs":[
                      {"param": "x", "value": "y"}
                    ]
                  }
                ],
                "parallel": "false"
              },
              {
                "list": [
                  {
                    "module": "t.aggregate",
                    "inputs":[
                      {"param": "x", "value": "red_nir,green_red"}
                    ]
                  }
                ],
                "parallel": "true"
              },
              {
                "list": [
                  {
                    "module": "t.aggregate",
                    "inputs":[
                      {"param": "x", "value": "blue,blue_nir"}
                    ]
                  }
                ],
                "parallel": "true"
              }
            ]
          }
        ],
        "parallel": "false",
        "version": "1"
      }
  ]
}