# Sections

Sprache

Teile dieser Dokumentation werden automatisch aus dem Sourcecode generiert und sind daher nur in englischer Sprache verfügbar.

Eine ETL Pipeline ist eine Verkettung mehrerer sog. "Sections". Dabei werden die einzelnen verarbeiteten Items von einer Section zur nächsten Section weitergegeben. Jede Section hat dabei eine Aufgabe und verarbeitet und verändert einzelne Items, bevor sie an die nächste Section weitergegeben werden. Dabei kann eine Section aus einem einzelnen Items mehrere machen aber auch Items ganz weglassen.

Die ETL Pipeline besteht aus drei Arten von Arbeiten:

  • "E" für "Extract": Daten werden von einer Quelle extrahiert und in die Pipeline gefüttert
  • "T" für "Transform": Daten werden nach gewissen Regeln vollautomatisch transformiert
  • "L" für "Load": Daten werden in den digitalen Lesesaal geladen / gespeichert resp. publiziert.

Um in einer Pipeline eine Section einzusetzen, wird ein JSON-Objekt definiert, das mindestens ein Key section hat, sowie weitere optionale Angaben.

  • section: Technischer Name der Section (wie z.B. AttributeMapper)
  • title (optional): Titel, der anstelle der section angezeigt wird und besser erklärt, was in diesem Schritt gemacht wird.
  • comment (optional): Beschreibung fürs Verständnis und die Dokumentation der Pipeline.
  • options (optional): Konfigurationsoptionen für die Section.

Beispiel:

{
  "section": "Map",
  "title": "Entstehungszeitraum",
  "comment": "Der Entstehungszeitraum wird anhand des Feldes 'Datum' generiert...",
  "options": []
}

# AttributeMapper

Expects an option "mapping" as a list of dicts. Each dict represents an attribute mapping and has to contain the keys "source" (the key of the source attribute) and "target" (the new key of the attribute).

Only attributes listed in the "mapping" option are kept, all others are discarded.

Example:

{
  "section": "AttributeMapper",
  "options": {
    "mapping": [
      {
        "source": "$TITLE",
        "target": "title"
      },
      {
        "source": "$ID",
        "target": "legacy_id"
      },
      {
        "source": "$data.signature",
        "target": "signature"
      },
      {
        "source": "$data.history",
        "target": "history",
        "default": ""
      },
      {
        "source": "$data.format",
        "target": "format",
        "omit_empty": true
      }
    ]
  }
}

When source does not exist on an item and no default is defined, the pipeline is aborted with an error. If a default value is defined, the target will be set to the default value in case the source does not exist on the item.

When omit_empty is set to true, the target key is removed from the item when the source key does not exist or the value is one of "", null, [], (), {}.

In order to create custom fields on the fly, use the custom flag in the mapping item. It's best to also define labels in the required languages, so that the item is properly displayed later on the website.

When defining custom fields, it is possible to set a protected flag (see the example below for reference). The protected flag has the effect that this particular custom field is not published to the website (Solr) and is only stored in the internal systems. A use case for this feature is to be able to display protected data, such as person identifiy information (PII) to authorized personell in the orders management interface whilst publishing the record as a whole to the public without this sepcific metadata field and its contents which may deserve protection. Custom fields are not protected by default.

Example:

{
  "section": "AttributeMapper",
  "options": {
    "mapping": [
      {
        "custom": true,
        "source": "$GESCHICHTE",
        "label_de": "Geschichte",
        "label_fr": "Histoire",
        "label_en": "History",
        "display_group": 2,
        "slug": "history"
      },
      {
        "custom": true,
        "protected": true,
        "source": "$SCHUTZFRISTNOTIZ",
        "slug": "schutzfristnotiz",
        "label_de": "Schutzfristnotiz"
      }
    ]
  }
}

# Options

  • cleanup - Cleanup fields

    • When true, fields that are not listed in the attribute mapping are removed from the item.
    • Type: Boolean
    • Default: true
  • mapping - Field mapping

    • The field mapping is a list of dicts, each mapping one field value.
    • Type: List of Options
      • comment - Comment

        • Comment for documentation purposes.
        • Type: Char
      • custom - Custom field

        • When true, the value is prepared to be imported as custom field. When used, target must not be set but slug instead.
        • Type: Boolean
        • Default: false
      • default - Default value

        • Default value which is set when source is not set in the item.
        • Type: JSON
      • display_group - ISAD(G) group number

        • Can be used in combination with custom=true in order to define in which ISAD(G) display group the custom value will be displayed in the default metadata view.
        • Type: Choice
        • Choices:
          • 1: Identity Statement ("Identifikation")
          • 2: Context ("Kontext")
          • 3: Content and Structure ("Inhalt und innere Ordnung")
          • 4: Conditions of Access and Use ("Zugangs- und Benutzungsbestimmungen")
          • 5: Allied Materials ("Sachverwandte Unterlagen")
      • label_de - German label

        • Type: Char
      • label_en - English label

        • Type: Char
      • label_fr - French label

        • Type: Char
      • omit_empty - Omit empty

        • When the value is missing in item and omit=true, the value is removed completely from the result.
        • Type: Boolean
      • protected - Protected custom value

        • In combination with custom=true, setting protected=true makes custom field values to be excluded in the data sent to the website / external systems.
        • Type: Boolean
      • slug - Customfield Slug

        • The slug is the identifier of the custom field. It can be used optionally in combination with custom=true in order to make the field identifiable in other configs such as for the Web.
        • Type: Char
      • source - Source fieldname or expression

        • The source of the value within the item can be configured with a simple field name (e.g. title) or with an expression (e.g. $title or $metadata.title).
        • Type: Char
      • target - Target fieldname

        • The target must be configured when mapping to a product field or when the attribute mapper is used for renaming / filtering data. When the target is used, it simply sets the value with this fieldname.
        • Type: Char
      • value - Value to map

        • The value option can be used instead of the source option for the ability to use expressions when mapping attributes.
        • Type: Expression

# Compress

The Compress section allows for compressing files using different compression methods. Currently supported methods are:

  • gzip: Compress a single file with gzip
  • zip: Create a zip archive with one or multiple files

# GZip Compression

The gzip operation compresses a single file using gzip compression and replacing the path in the item which is yielded further. The original file is not kept.

Example:

{
    "section": "Compress",
    "options": {
        "gzip": {
            "level": 7
        }
    }
}

# Zip Archive

This operation creates a zip archive containing one or multiple files. When the batch_size is set to 0, all files are written to a single zip archive.

Example:

{
    "section": "Compress",
    "options": {
        "zip": {
            "output_filename": "archive.zip",
            "level": 6
        }
    }
}

# Options

  • gzip - GZip Compression

    • Compress a single file using gzip
    • Type: Options
    • Default: null
      • file_path - Path field

        • Name of the field in the consumed item containing the path to the file on the disk.
        • Type: Expression
        • Default: {{ item.path }}
      • level - Compression level

        • The compression level for the gzip tool as a number from 1 (fast) to 9 (best).
        • Type: Integer
        • Default: 7
  • zip - Zip Archive

    • Create a zip archive with one or multiple files
    • Type: Options
    • Default: null
      • output_filename - Archive name required

        • Name of the resulting zip archive
        • Type: Expression
      • batch_size - Batch size

        • Amount of objects to be written in one file. Set it to 0 in order to write all items into one file.
        • Type: Integer
        • Default: 1000
      • file_path - Files path

        • Name of the field in the consumed item containing the path to the file on the disk.
        • Type: Expression
        • Default: {{ item.path }}
      • level - Compression level

        • ZIP compression level (0-9, where 0=no compression, 9=maximum)
        • Type: Integer
        • Default: 7

# CreateContainerRecordRelations

Creates or updates (based on their "identifier") container record relations for items on the pipeline.

Example:

{"section": "CreateContainerRecordRelations"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateContainers

Creates or updates (based on their "identifier") containers for items on the pipeline.

Example:

{"section": "CreateContainers"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateDeliveries

Creates or updates (based on their "identifier") deliveries for items on the pipeline.

{"section": "CreateDeliveries"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateDeliveryRecordRelations

Creates or updates (based on their "identifier") delivery record relations for items on the pipeline.

{"section": "CreateDeliveryRecordRelations"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateDescriptorRecordRelations

Creates or updates (based on their "identifier") descriptor record relations for items on the pipeline.

{"section": "CreateDescriptorRecordRelations"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateDescriptorRelations

Creates or updates (based on their "identifier") descriptor relations for items on the pipeline.

{"section": "CreateDescriptorRelations"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateDescriptors

Creates or updates (based on their "identifier") descriptors for items on the pipeline.

{"section": "CreateDescriptors"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateEnumEntries

Creates or updates enum entry objects. For updating, the unique combination of "enum" and "identifier" is used.

Configuration example:

{"section": "CreateEnumEntries"}

Accepts items in this form:

{
    "enum": "archival_type",
    "identifier": "pic",
    "order": 1,
    "label_de": "Bild",
    "label_en": "Picture",
    "label_fr": "Image",
    "label_it": null,
    "data": {"important": true},
},

Keys:

  • enum (required) contains the identifier of the enum, which usually matches the field name.
  • identifier (required) contains the identifier of the entry in the enum, which must be unique per enum.
  • order (optional) integer value for ordering.
  • label_* (optional) labels translated in each supported and enabled language, such as de, en, fr, it, rm.
  • data: (optional) JSON field for storing additional structured data in this record.

prune_enum Option: The prune_enum option makes it possible to delete and replace a whole set of enum entries with the same enum value without changing other existing enums.

This is done by removing all existing entries for an enum on the first appearance of an item in the pipeline with this enum name. Enum entries which have an enum name that does not appear in the pipeline are kept.

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false
  • prune_enum - Prune enum

    • Delete all enum entries of newly imported enums.
    • Type: Boolean
    • Default: false

# CreateExternalRepresentations

Creates or updates (based on their "identifier") external representations for items on the pipeline.

{"section": "CreateExternalRepresentations"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateIIIFSources

Creates or updates (based on their "id") file objects for items on the pipeline. Prunes all existing relations before re-creating them.

ℹ️ The name CreateIIIFSources has historic reasons. Nowadays, the section is used for all kinds of files.

{"section": "CreateIIIFSources"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateLocations

Creates or updates (based on their "identifier") locations for items on the pipeline.

Example:

{"section": "CreateLocations"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreatePackages

Creates or updates (based on their "identifier") packages for items on the pipeline.

Example:

{"section": "CreatePackages"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateRecordRelations

Creates or updates (based on their "identifier") record relations for items on the pipeline.

Example:

{"section": "CreateRecordRelations"}

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CreateRecords

Creates or updates (based on their "identifier") records for items on the pipeline.

{"section": "CreateRecords"}

The Record model supports custom field values. Thec stum fields are created dynamically. By default, when at least one custom field is mapped in a previous AttributeMapper, the existing values are cleared before updating an existing object. This behavior can be disabled with the cleanup_custom_values option.

In order to protect other pipelines from removing "our" values, we can claim ownership by setting a string in custom_value_owner. Only CreateRecords sections with the same owner configured are able to clear or overwrite existing values.

By default, the system protects pipelines from accidentally redefining values that already exist on an item and are owned by another owner. This behavior can be disabled with the force_overwite_custom_values option.

# Options

  • batch_size - Batch size

    • Amount of objects to be saved to the database in one batch.
    • Type: Integer
    • Default: 100
  • cleanup_custom_values - Cleanup custom values

    • Existing custom values on the stored record will be removed when they no longer exist in the processed item.
    • Type: Boolean
    • Default: true
  • custom_value_owner - Custom value owner

    • A string, indicating the ownership of custom values. Values by other owners will not be removed. Overwriting values of other owner raises an error. Requires slug to be defined for identifying values.
    • Type: Char
    • Default: null
  • force_overwite_custom_values - Override custome values

    • By default, overwriting custom values of other owners raises an error. By setting this option to false, the values are overwritten and the owner is set to custom_value_owner.
    • Type: Boolean
    • Default: false
  • prune - Prune objects

    • Delete all objects before creating new ones.
    • Type: Boolean
    • Default: false

# CsvFileReader

Accepts a list of strings denoting the absolute paths on the file system. Reads each CSV file in that list and returns a dict for each row. The dialect (delimiter, quote chars etc.) will be auto-detected if not specified.

Example:

{
    "section": "CsvFileReader",
    "options": {
        "columns": ["ID_NR"]
    }
}

# Options

  • columns - Columns

    • A list of column names to add to the pipeline. If empty, all columns are added.
    • Type: List of Char
    • Default: []
  • dialect - CSV Dialect

    • The CSV dialect to use. If not specified, will auto-detect from file content.
    • Type: Choice
    • Default: null
    • Choices:
      • excel: excel
      • excel-tab: excel-tab
      • unix: unix
  • items - Items

    • JSON Pointer to the list of items that will be passed on to the next section individually. When not defined, the whole document is passed on as one item.
    • Type: JSONPointer
    • Default: ''
  • path - Path

    • JSON Pointer adressing the path to the file to be loaded.
    • Type: JSONPointer
    • Default: JsonPointer('/path')
  • target - Target

    • JSON Pointer for the place where the result is inserted in the processed item. When not set, the processed item is replaced by the resulting item.
    • Type: JSONPointer
    • Default: null
  • total - Total count

    • When processing multiple items with data, such as HTTP responses, the pipeline statistics and progressbar keep being updated. When the first item/HTTP response contains information about the total amount of items to be processed, this information can be set using the total option. The total option accepts an expression, where the previous item / HTTP response can be accessed with item and the parsed content body can be accessed with document.
    • Type: Expression
    • Default: null

# Deduplicate

Deduplicate any items by the value of a specific field.

In some cases, we need to deduplicate items when they appear twice, identified by a specific fieldname. This section does exactly that.

Example:

{
  "section": "Deduplicate",
  "options": {
    "field": "identifier"
  }
}

# Options

  • field - Field name required
    • Name of the field of the unique value which is used for deduplicating items that have the very same value in this field.
    • Type: Char

# DefineMacro

Define global macros. The macros are available in every expression field. Redefining the same macro twice in another section will throw an error.

Example:

{
  "section": "DefineMacro",
  "options": {
    "macros": {
      "identity": {"args": ["x"], "body": "{{x}}"}
    }
  }
}

# Options

  • macros - Macros required
    • Type: Dict

# DeleteItems

Takes a list of objects expecting a key named identifier_field. All entries from the model model are deleted matching the list of identifiers. Yields the initial list again.

# Options

  • model - Model name required

    • Name of the model to delete from, all in lowercase characters.
    • Type: Choice
    • Choices:
      • record: Datensatz
      • recordrelation: Verknüpfung von Verzeichnungseinheiten
      • file: Files
      • descriptor: Deskriptor
      • descriptorrelation: Verknüpfung von Deskriptoren
      • descriptorrecordrelation: Verknüpfung von Deskriptoren und Verzeichnungseinheiten
      • delivery: Ablieferung
      • deliveryrecordrelation: Verknüpfung von Ablieferungen und Verzeichnungseinheiten
      • container: Behältnis
      • containerrecordrelation: Verknüpfung von Behältnissen und Verzeichnungseinheiten
      • enumentry: Enum Wert
      • package: Package
      • location: Standort
      • externalrepresentation: Externe Repräsentation
  • batch_size - Batch size

    • Number of items to delete in one batch.
    • Type: Integer
    • Default: 100
  • identifier_field - Identifier Feldname

    • Name of the field on the item in the pipeline, containing the value to look for in lookup_field.
    • Type: Char
    • Default: 'identifier'
  • lookup_field - Model field name

    • Name of the field on the model used to look up the values from identifier_field.
    • Type: Char
    • Default: 'identifier'

# ExcelReader

Reads data from an Excel file (.xlsx) and loads each row into the pipeline. By default, the first sheet of the Excel file is read.

The Excel file must have already been created by a previous section such as FileSource, HTTP, S3 or FTP. These sections provide a pipeline item containing the local path to the file, which is processed by this pipeline.

Example:

{
  "section": "ExcelReader",
  "options": {
    "columns": ["ID_NR"]
  }
}

Renaming columns:

In the ETL pipeline, the keys should be technical. Characters such as . or , can break certain section in the pipeline. In such cases, the columns can be simply renamed.

{
  "section": "ExcelReader",
  "options": {
    "columns": ["Inventarnr. verwandte Objekte"],
    "rename_columns": {"Inventarnr. verwandte Objekte": "inventarnr_verwandte_objekte"}
  }
}

Processing multiple sheets:

With the sheets option, it is possible to process multiple Excel sheets of data in the configured order. In order to be able to determine the source sheet later, a _sheet attribute is added to each item.

{
  "section": "ExcelReader",
  "options": {
    "sheets": [
      { "name": "Person" },
      { "name": "Committee" }
    ]
  }
}

Using lookups:

When processing a multi-sheet Excel, it is possible to build a lookup table from a sheet and use its value in another sheet. The order of the sheet definition is relevant, so that the lookup is built first.

Given we have an Excel with the sheets Person and Role, we could build a lookup from Role and use it in Person:

{
  "section": "ExcelReader",
  "options": {
    "sheets": [
      {
        "name": "Role",
        "build_lookup": {
          "lookup": "roles",
          "key": "{{ item.id }}"
        }
      },
      {
        "name": "Person",
        "retrieve_from_lookup": [
          {
            "key": "{{ item.role_id }}",
            "lookup": "roles",
            "target": "role_name",
            "value": "{{ item.name }}"
          }
        ]
      }
    ]
  }
}

Batch size:

The batch size is the number of rows that are read at once. This can be useful to reduce memory consumption when reading large excel files.

{
  "section": "ExcelReader",
  "options": {
    "columns": ["ID_NR"],
    "batch_size": 100
  }
}

# Options

  • batch_size - Batch size

    • The number of rows to read at once.
    • Type: Integer
    • Default: 100
  • columns - Column names

    • A list of column names to preserve. If empty, all columns are preserved.
    • Type: List of Char
    • Default: []
  • rename_columns - Rename columns

    • Mapping of columns to rename, where the key is the string name of the excel column and the value is the new name it should have.
    • Type: Dict
    • Default: {}
  • sheet - Sheet name

    • The name of the excel sheet to read from. When not set, the first sheet is read.
    • Type: Char
    • Default: null
  • sheets - Sheets

    • In order to load multiple sheets of the excel at once, this option can be used. The rows of the sheets are yielded as invidual items unless the sheet acts as lookup table. The key of this dict must match the title of the sheet in the Excel file.
    • Type: List of Options
    • Default: null
      • name - Sheet name required

        • The name of the excel sheet to read from.
        • Type: Char
      • build_lookup - Build lookup

        • Build a lookup table from the rows of this sheet. When a sheet acts as lookup table, its rows are not passed on to the next section.
        • Type: Options
        • Default: null
          • key - Key required

            • Expression used as key in the lookup. Provides item.
            • Type: Expression
          • lookup - Lookup name required

            • Name of the lookup which should be built. Must be unique within the section config.
            • Type: Slug
          • multivalued - Multivalued

            • If set to true, the lookup contains lists of items. Else, the lookup contains only one item and the key must be unique.
            • Type: Boolean
            • Default: false
      • columns - Column names

        • A list of column names to load into the item. By default, all columns are loaded.
        • Type: List of Char
        • Default: []
      • rename_columns - Rename columns

        • Mapping of columns to rename, where the key is the string name of the excel column and the value is the new name it should have.
        • Type: Dict of Char
        • Default: {}
      • retrieve_from_lookup - Retrieve from lookup

        • Retrieve an item from a previously built lookup and stored in an attribute in the current item.
        • Type: List of Options
        • Default: null
          • key - Key required

            • Expression used as key for looking up a value. Provides item.
            • Type: Expression
          • lookup - Lookup name required

            • Name of the lookup where the referenced value should be looked up.
            • Type: Slug
          • target - Target required

            • Name of the attribute to store the retrieved value in the current item.
            • Type: Slug
          • value - Value

            • Expression for extracting values from the item in the lookup table. If empty, the whole item is included. Provides item.
            • Type: Expression
            • Default: null
      • skip_head_rows - Skip head rows

        • Do not process the first n rows of the sheet. With that, additional heading rows can be skipped.
        • Type: Integer
        • Default: 0

# Expand

The Expand section is used for repeating an item.

With this section, in item can be repeated for each value of a list in a property of the items.

Example Pipeline definition:

{
  "section": "Expand",
  "options": {
    "fields": ["relation_target"]
  }
}

Input:

[
  {"relation_source": 1, "relation_target": [2, 3, 4]}
]

Output:

[
  {"relation_source": 1, "relation_target": 2},
  {"relation_source": 1, "relation_target": 3},
  {"relation_source": 1, "relation_target": 4}
]

When one of the fields is missing or the value is empty or falsy, the item will not be passed.

# Options

  • field - Field name

    • Name of the field that contains a list by which the items should be expanded. field and fields are mutual exclusive.
    • Type: Char
    • Default: null
  • fields - Field names

    • List of fieldnames to expand at the same time. Each field must have the same amount of values. field and fields are mutual exclusive.
    • Type: List of Char
    • Default: null
  • split - Split by character

    • Split the string value of the fields by this character.
    • Type: Char
    • Default: null
  • strip - Strip characters

    • Strips the characters defined in this field in all the values of the expanded lists.
    • Type: Char
    • Default: null
  • target - Target field name

    • Name of the field that the expanded field value should be stored in. When not defined the field is replaced inplace and the original value is dropped. target can not be combined with fields.
    • Type: Char
    • Default: null

# ExtractZip

The ExtractZip section allows to extract Zip files that were placed in the pipeline by a previous section such as FileSource.

It expects an item with a path to a ZIP file and yields an item with a path for each file in the ZIP after extraction.

Example item:

{
    "path": "/tmp/etl/zip/foo.txt"
}

# Options

# FTP

The FTP section can be used to interact with an FTP server, for instance for uploading files. It supports multiple operations.

The connection details are configured through options:

{
    "section": "FTP",
    "options": {
        "host": "ftp.example.org",
        "port": 21,
        "user": "ftpuser",
        "password": "secret",
        ...
    }
}

# Upload File

The upload_file operation reads a local, temporary file and uploads it to an FTP server.

Example:

Input:

[
    {
        "path": "/tmp/etl/json_writer/data.json",
        "filename": "data.json"
    }
]

Section configuration:

{
    "section": "FTP",
    "options": {
        "host": "ftp.example.org",
        "port": 21,
        "user": "ftpuser",
        "password": "secret",
        "upload_file": {
            "source": "{{ item.path }}",
            "directory": "data-dump/januar/{{ item.filename }}"
        }
    }
}

Pruning:

The prune option lets the section remove older files (configurable through keep_hours), so that the FTP server is not filled up when configuring the pipeline to run automatically / nightly. Pruning only works when directory is configured. Be aware that in this case the item variable is not available in the directory expression at pruning time, since pruning is not a per-item operation.

Delete:

By default, the temporary files on the disk are deleted after uploading in order to reduce disk size. You can disable this behavior with the delete option when you need the files later.

# Download File

The download_file operation downloads a file from an S3 bucket.

Example configuration:

{
    "section": "FTP",
    "options": {
        "host": "ftp.example.org",
        "port": 21,
        "user": "ftpuser",
        "password": "secret",
        "download_file": {
            "source": "{{ item.path }}",
        }
    }
}

# List FIles

The list_files operation recursively lists all files on the FTP server and adds an item to the pipeline for each file on the FTP server.

Example configuration:

{
    "section": "FTP",
    "options": {
        "host": "ftp.example.org",
        "port": 21,
        "user": "ftpuser",
        "password": "secret",
        "list_files": {}
    }
}

This will insert simple file representations into the pipeline:

{
    "path": "path/to/file.txt",
    "type": "file"
}

With the foreach option, you can process items in the pipeline inserted by a previous section. This enables the item variable in the expression. By default, the previous item is removed from the pipeline. With the target_path option, you can insert the result into the previous item. Be aware that the amount of inserted files is limited in order to avoid performance problems when there is a large amount of items in the FTP server.

Example:

{
    "section": "FTP",
    "options": {
        "host": "ftp.example.org",
        "port": 21,
        "user": "ftpuser",
        "password": "secret",
        "list_files": {
            "prefix": "{{ item.ftp_path }}",
            "foreach": {
                "target_path": "/ftp_file_paths",
                "paths_only": True,
            },
        },
    }
}

# Options

  • host - FTP Hostname required

  • password - FTP Password required

    • The password for connecting to the FTP server.
    • Type: Expression
  • user - FTP User required

    • The username for connecting to the FTP server.
    • Type: Expression
  • download_file - Download File

    • Download a file from the FTP server.
    • Type: Options
    • Default: null
      • source - Source Path required

        • The path to the file on the local temp directory (source).
        • Type: Expression
      • cleanup - Cleanup files

        • When enabled, the temporary downloaded file is automatically removed after passing on the item in the pipeline.
        • Type: Boolean
        • Default: true
  • list_files - List FIles

    • List files on the FTP server.
    • Type: Options
    • Default: null
      • foreach - Foreach

        • When using the foreach option, the listing is executed for each item that was previously in the pipeline. This enables the possibility that the item can be accessed in expression fields using the item variable name.
        • Type: Options
        • Default: null
          • max_results - MaxResults

            • Limits the maximum amount of results. This can only be used in combination with target_path.
            • Type: Integer
            • Default: 1000
            • Min: 1
          • paths_only - Paths only

            • Only return a list of paths as strings. This can only be used in combination with target_path.
            • Type: Boolean
            • Default: false
          • target_path - Target path

            • JSON path to the key on the previous item, where the list of results should be stored. When the option is omitted, the previous item is removed from the pipeline and each resulting item is added to the pipeline individually. When this option is used with a large object count, this can can cause performance problems; consider using the paths_only option.
            • Type: Expression
            • Default: null
      • prefix - Prefix Path

        • Path prefix as a relative path to the FTP users home directory, limiting the listed files to all files within this directory.
        • Type: Expression
        • Default: .
  • port - FTP Port

    • The port of the FTP server.
    • Type: Integer
    • Default: 21
  • upload_file - Upload File

    • Upload a file to the FTP server.
    • Type: Options
    • Default: null
      • delete - Delete

        • Delete the local file after uploading in order to reduce disk usage to a minimum.
        • Type: Boolean
        • Default: true
      • directory - Target Directory Path

        • The path to the directory, in which a new file is created on the FTP server. The filename of source will be used as the filename of the new object.
        • Type: Expression
        • Default: null
      • keep_hours - Keep hours

        • When pruning, keep files on the FTP server that are younger than the configured amount of hours.
        • Type: Integer
        • Default: 62
      • prune - Prune

        • Before uploading, remove existing files from the FTP server. Requires a directory to be configured for precaution.
        • Type: Boolean
        • Default: false
      • source - Source Path

        • The path to the file on the local temp directory (source).
        • Type: Expression
        • Default: {{ item.path }}
      • target - Target File Path

        • The path of the file on the FTP server.
        • Type: Expression
        • Default: null

# FileDeduplicator

The file deduplicator section has the job to reduce the amount of processed files by skipping duplicated files.

The use case is that when one specific record has multiple files, containing the original (which we assume to be a TIF) and lower quality preview copies (which we assume to be JPG), we only want to keep the TIF.

This is done by grouping file extensions in an ordered list. Extensions which are first in the list have higher priority.

For example, with a group of ["tif", "jpg", "png"] this means that when there are multiple files with the same stem (filename without extension), then tif is priorized over jpg and png, and the latter will be removed from the pipeline. The extensions are always compared with lowercase.

In order for files to be compared in a group, these rules apply:

  • They have to be referenced with the same record (record_identifier_field option).
  • They have to have the same stem (filename without extension).
  • They have to have an extension listed in the same extension group.

Example:

{
  "section": "FileDeduplicator",
  "options": {
    "filename_field": "DATEINAME",
    "record_identifier_field": "ID_NR_VRZNG_ENHT",
    "extension_groups": {
        "image": ["tif", "tiff", "jpg", "jpeg", "webp", "jp2", "j2k", "jpf", "png"],
        "audio": ["wav", "ogg", "mp3"]
    }
  }
}

# Options

  • extension_groups - Extension groups

    • A list of extension groups. Each extension group is a list of file extensions (without leading .). Each group represents a media type (e.g. image, audio, video) that should be deduplicated together.
    • Type: Dict of List of Char
    • Default: {'image': ['tif', 'tiff', 'jpg', 'jpeg', 'webp', 'jp2', 'j2k', 'jpf', 'png'], 'audio': ['wav', 'ogg', 'mp3']}
  • filename_field - Filename field name

    • The name of the field containing filenames or paths. The value of the field is used for identifying duplicates based on the filename stem and for identifying the extension for selecting the best version of the image.
    • Type: Char
    • Default: 'DATEINAME'
  • record_identifier_field - Record identifier field name

    • The name of the field containing the identifier of the record.
    • Type: Char
    • Default: 'ID_NR_VRZNG_ENHT'

# FileDownloader

The FileDownloader section has the purpose to download files from a source system and store them on a temporary volume for further processing by other sections and services.

The section works in combination with a file-downloader service, which runs in a separate. The source system must be configured properly in the file downloader section.

Source types:

  • HTTP / HTTPS (http): The file downloader service can download files from HTTP server.
  • S3 (s3): Download the files from an S3 service, supporting S3 authentication configured in the service.
  • Fedora Repository (fedora): Downloads an AIP package (jar-file) from a Fedora Repository service, supporting authentication configured in the service. The AIP archive is then ZIP-extracted and the files are processed later by the publisher section

Path/URL replacing:

In order to create an absolute path which works on the source system based on an URL or filesystem path, we often have to replace parts of the string. The section has a built-in functionality for doing that based on regular expressions.

The example below shows a configuration for this conversion:

  • input: file:///Q:/2018-00_P-S&N/STA_H_43_54/STA_H_43_54.xml
  • output: /2018-00_P-S&N/STA_H_43_54/STA_H_43_54.xml

Example:

{
  "section": "FileDownloader",
  "options": {
    "identifier_field": "ID_NR_DATEI",
    "source_path_field": "URL",
    "source_path_regexp_replace": [
      "file:///[A-Z]:(.*)",
      "\1"
    ]
  }
}

This section's output is meant to be consumed by a FilePublisher section later. The section especially yields the attributes public_path and temp_files.

Giving URLs as input:

The section can also be used to download files from a HTTP server. In this case, the source_type option must be set to http. The source_path_field have to point to a field containing the URL in the previous section's output.

The config_name can be omitted in this case.

The file might not have an identifier since it is not coming from database. To generate a unique identifier use Map section and generate_identifier method.

Example:

{
  "section": "FileDownloader",
  "options": {
    "source_type": "http",
    "identifier_field": "identifier",
    "source_path_field": "URL"
  }
}

# Options

  • source_path_field - Source path field name required

    • Name of the field containing a Path or URL with the path and name of the file on the source system. The path extracted from here is expected to be usable as path in the URL on the source system.
    • Type: Char
  • config_name - File downloader source config name

    • Name of the preconfigured file downloader source config which was preconfigured when installing the system. The available config names are different from installation to installation.
    • Type: Expression
    • Default: null
  • filename - Filename

    • When used, this filename is set after downloading the file. This is helpful when the URL does not contain a proper filename, which is necessary in order to make extension based file detection.
    • Type: Expression
    • Default: null
  • identifier_field - Identifier field name

    • Name of the field containing the unique identifier of the file on the source system.
    • Type: Char
    • Default: 'ID_NR_DATEI'
  • purge_on_start - Purge downloaded files on start

    • When enabled, the downloads directory is cleared on start, so that potential leftover from a previous, failed run are removed. When using multiple FileDownloader sections in a pipeline, this should maybe be disabled.
    • Type: Boolean
    • Default: true
  • source_path_regexp_replace - Source path regexp replace list

    • A search/replace regexp pattern, represented as a list of two strings, the first one containing a search pattern, the second a replace pattern.
    • Type: List
  • source_type - Source type

    • The type of the source system.
    • Type: Choice
    • Choices:
      • s3: S3 (Default)
      • http: HTTP / HTTPS
      • fedora: Fedora Repository
  • unzip - Unzip

    • When True (default), ZIP files are automatically extracted after download and a file listing of the contents is passed on. When set to False, ZIP files are not extracted.
    • Type: BooleanExpression
    • Default: {{ True }}
  • unzip_glob - Unzip glob

    • After extracting ZIPs, this glob is applied and only matching files are selected for processing. By default, the glob **/* is applied, returning all files. When the file is downloaded from a Fedora source, by default primaerdaten/**/ is used in order to maintain backwards compatibility with Fedora installation which contains packages ingested with a Scope solution.
    • Type: Expression
    • Default: null

# FilePublisher

The file publisher section has the goal to publish files to the public S3 bucket, so that it can be displayed on the web.

The pipeline must be configured in a way, so that only files of records are fed to this section, when the records are public (is_public) and their files can also be public (files_are_public).

In order for the file publisher section to work properly, a set of publisher services in separate containers are required. Depending on the file type, these sections have ressource intensive tasks to do, such as preparing the images for a IIIF viewer.

This section is expected to be used in combination with the FileDownloader section, which adds items to the pipeline containing paths to the files on the shared temporary volume. It especially requires the attributes public_path and temp_files on the items.

Pipeline version:

File publishing is in general a slower task compared to only processing metadata. Therefore, it is important to have a mechanism for identifying situations where the work is already done and does not need to be redone.

This is implemented with a "pipeline version" feature. As a pipeline version value we can define an arbitrary version number of the current configuration of the pipeline and the worker services. When we rerun the same pipeline, files will be skipped with SkipProcessedFiles section that were already processed with the currently configured pipeline version. The SkipProcessedFiles section removes the file from the pipeline, since it is already processed.

s3_public Option:

The s3_public option is set to True by default, so that all processed and published file objects are directly accessible from the S3 storage without any authentication by anyone. This is the default mode optimized for websites that are publicly accessible in the internet and only contain unprotected files.

When set to False, the files are no longer directly accessible and are served through the backend which will have the chance to verify whether a particular user is allowed to access the files. These settings are configured by setting ACL values on the file item.

Whitelist and Blacklist:

In this section, you can filter files using a whitelist and blacklist configuration which are lists of file extensions. If the whitelist is enabled, only files with the specified extensions will be processed. If the blacklist is enabled, files with the specified extensions will not be processed. If both are enabled, the blacklist will take priority over the whitelist.

For example, if you want to process only pdf files, you can set the whitelist to

{
  "section": "FilePublisher",
  "options": {
    "whitelist": ["pdf", "pdfa"]
  }
}

The file extensions are always compared in lowercase.

Example:

{
  "section": "FilePublisher",
  "options": {
    "pipeline_version": "17.2"
  }
}

# Only create thumbnails

With the thumbnail_only Option, it is possible to only produce thumbnails. For file types with thumbnail support (such as images, videos or PDFs) this has the impact that the thumbnail is generated but the Viewer button is not available for this file. Only one thumbnail per record is displayed, but it is no problem to generate multiple thumbnail files for the same record for convenience in the pipeline config. This is an expression field, so that the value can be evaluated based on the pipeline item and the file.

Example:

{
  "section": "FilePublisher",
  "options": {
    "thumbnail_only": "{{ True }}"
  }
}

Warning: this option is only working with the modern publisher service setup, since the classic publisher setup only respects this option in the image publisher. In the classic setup, the publisher_args servced the same purpose but only for images. The publisher_args option is therefore deprecated and should be replaced with the new thumbnail_only option.

Deprecated example with the old, classic file publishers:

{
  "section": "FilePublisher",
  "options": {
    "publisher_args": {
      "image": {
        "as_thumbnail": "{{ item.thumbnail_only }}"
      }
    }
  }
}

# Options

  • blacklist - Blacklist

    • If set, file extensions that are included in the list are not processed.
    • Type: List
  • package_processing - Package processing

    • If enabled, the section will process packages (e.g. AIPs). If disabled, package files are processed individually.
    • Type: Boolean
    • Default: false
  • pipeline_version - Pipeline version

    • The version of the pipeline configuration. Files that were already processed with this specific version are skipped in future runs.
    • Type: Expression
    • Default: null
  • publisher_args - Publisher args (Deprecated)

    • Additional arguments that are passed to the publisher services. The key is the media_type (e.g. image), the value is a dict where the key is the argument name and the value is either a raw value or an expression, which has access to item and file. Deprecated in the modern publisher services setup; use the thumbnail_only option instead.
    • Type: Dict of Dict of Expression
    • Default: {}
  • s3_public - S3 Public

    • When set to True, file objects stored in the S3 bucket are accessible directly from the S3 without any authenticated. Anyone can download these files.
    • Type: BooleanExpression
    • Default: {{ True }}
  • thumbnail_only - As thumbnail

    • Only publish thumbanils. The full format of the document is not published. File types without support for thumbnails are not published at all when enabled.
    • Type: BooleanExpression
    • Default: {{ False }}
  • whitelist - Whitelist

    • If set, only file extensions included in the list are processed.
    • Type: List

# FileSource

The FileSource section allows to manually upload a file for processing.

The file could be an excel file that is read and processed by the ExcelReader section. The section is not meant to be used for automatic / nightly running, since the user has to manually upload a file when the pipeline is executed.

The section yields items with paths:

[
    {
        "path": "/path/to/uploaded-file.xlsx"
    }
]

# Options

# Filter

Filter items by property values.

The filter section allows to drop items from the pipeline based on basic value comparison. Simple comparison operators can be used as well as and, or and not.

All expressions are represented as a simple list.

Keep items with a truthy field value:

{"keep": ["$is_public"]}

Using comparison operators:

{"keep": ["$is_public", "==", True]}
{"keep": ["$is_public", "!=", False]}
{"keep": ["$size", ">=", 10]}
{"keep": ["$title", "is not", None]}

Using and / or expressions:

{"keep": ["or",
          ["$schutzfrist_in_vergangenheit"],
          ["and"
            ["$status", "==", "Abgeschlossen"],
            ["$auf_dem_portal_sichtbar", "==", "Schutzfrist ignorierend"]]]}

Using not:

{"keep": ["and",
          ["$schutzfrist_in_vergangenheit"],
          ["not", ["$status", "==", "Abgeschlossen"]]]

# Options

  • keep - Keep conditions required
    • Contains a condition definition. All items for which the condition has a truthy result are kept in the pipeline. Items that do not meet the condition are dropped from the pipeline.
    • Type: JSON

# Group

The Group section groups items which have the same key. The items have to be in a certain order; as soon as a new key appears, the group is yielded to the next section. The key is defined as Expression.

Example:

{
    "section": "Group",
    "options": {
        "key": "{{ item.collection_identifier }"
    }
}

Input:

[
    {"collection_identifier": 27, "identifier": 4},
    {"collection_identifier": 27, "identifier": 7},
    {"collection_identifier": 31, "identifier": 12},
]

Output:

[
    {
        "items": [
            {"collection_identifier": 27, "identifier": 4},
            {"collection_identifier": 27, "identifier": 7},
        ]
    },
    {
        "items": [
            {"collection_identifier": 31, "identifier": 12},
        ]
    }
]

The section makes sure that the resulting items list is never empty.

# Options

  • key - Key required

    • Expression which should evaluate to a string and is used as grouping key.
    • Type: Expression
  • target - Target

    • The field name of the resulting item holding the list of grouped items.
    • Type: Char
    • Default: 'items'

# HTTP

The HTTP source section enables you to perform an HTTP request to any API endpoint and furnish the payload as a transient file for subsequent processing in a subsequent section.

Illustrative Example:

{
    "section": "HTTP",
    "options": {
        "url": "https://example.com/api/items",
        "params": {
            "query": "Test"
        }
    }
}

This action will introduce an item into the pipeline in the subsequent format:

{
    "path": "/tmp/etl/http/response-body-f34ds7fjlk",
    "status_code": "200",
    "reason": "OK",
    "headers": {
        "Content-Type": "application/json"
    }
}

Following this, the next section can access the file from the path and conduct suitable processing.

# Custom Request Headers

Custom request headers can be configured using the headers option:

{
    "section": "HTTP",
    "options": {
        "url": "https://example.com/api/items",
        "headers": {
            "Accept": "application/json"
        }
    }
}

# Basic Authentication

In situations where certain APIs mandate basic authentication, this can be accomplished by setting up a list of a username and password within the auth option:

{
    "section": "HTTP",
    "options": {
        "url": "https://example.com/api/items",
        "auth": ["api-user", "secret"]
    }
}

# OAuth

The section supports the OAuth Backend Application flow to obtain an access token that is used to access an API. This can be configured using the oauth option:

{
    "section": "HTTP",
    "options": {
        "url": "https://example.com/api/items",
        "oauth": {
            "client_id": "api-client-id",
            "client_secret": "api-client-secret",
            "token_url": "https://example.com/api/accesstoken"
        }
    }
}

# foreach

The section supports operating on previous items in the pipeline using the foreach option. It will call url for each item yielded by the previous section and stores the result of the request in target_path of the item.

Expressions can be used to build the url based on attributes of the item.

# Handling Response Codes

The section will trigger an exception when the response code falls outside the successful range, specifically when it is not a status code in the "2xx" range. In cases where the server responds with a server error (beginning with "5xx") or a client error (beginning with "4xx"), the section will terminate with an exception.

# Pagination Capability

The HTTP section offers support for paginating the requested API endpoint, provided that the response body is compatible. To enable pagination, the responding API must specify the content-type header as one of the supported content types (detailed below).

For each subsequent request, this section will generate and insert an item with a payload file path. It's important to note that this approach impacts the total number of requests in the pipeline run stats. This is because, in this scenario, it's impossible to predict the exact number of requests that will be made during pagination.

# Supported Content Types

In order to obtain the necessary information for pagination from the response body, the response body must be parsed and compatible with the supported content types. All pagination variants are compatible with these content types.

# JSON documents

When working with JSON payloads, you can select specific values using the JSON Pointer syntax. For more details, please refer to the JSON Pointer documentation (opens new window).

Content types:

  • application/json
  • application/ld+json
# XML documents

For XML documents, values can be extracted using XPath (opens new window).

Content types:

  • application/xml
  • text/xml

# Paginators

# Cursor-Based Paginator

The cursor_pagination method operates by expecting a cursor value in the response, which serves as a reference to the next page. In this method, the cursor should accurately represent the next page to be fetched.

Here's an example payload structure:

{
    "data": {
        "programmePage": {
        "cursor": "MSBnZWdlbiAxMDAgLSBTb21tZXJzcGVjaWFs",
        "edges": [
            {},
            {},
        ]
    }
}

To configure the paginator for this method, you can use the following settings:

{
    "section": "HTTP",
    "options": {
        "url": "http://api.server.com/items",
        "cursor_pagination": {
            "cursor_path": "/data/programmePage/cursor",
            "cursor_param": "cursor"
        }
    }
}

By specifying these configurations, the section will effectively manage pagination using the cursor values found in the response, ensuring the retrieval of subsequent pages of data.

# Next URL-Based Paginator

The next_pagination method is based on extracting a fully qualified URL from the response document to determine the URL for the next page. For instance, if the requested API provides a payload with a "next" URL like this:

{
    "pagination": {
        "next": "http://api.server.com/items?page=2"
    },
    "results": [{"item": 1}]
}

You can extract the next URL using a JSON Pointer expression, as demonstrated in the following configuration:

{
    "section": "HTTP",
    "options": {
        "url": "http://api.server.com/items",
        "next_pagination": {
            "next_url_path": "/pagination/next"
        }
    }
}

This approach allows the section to dynamically determine and follow the next page URL provided in the response, facilitating effective pagination.

# Offset-Based Paginator

The offset_pagination method facilitates pagination based on a specified starting index, referred to as the offset. In a scenario where a page contains, for instance, 50 items, the initial request would use a start parameter set to 0, and subsequent requests would increment the start value to 50, or alternatively, based on the API's indexing system, potentially 51.

To make this approach work effectively, the requested API must provide the following values in the response:

  • offset: Denoting the index of the first element in the current page.
  • limit: Indicating the maximum number of items on the current page.
  • total: Representing the total number of items across all pages.

The section performing the pagination will cease when any of these values is missing, or when the calculated offset for the next page exceeds the total items available.

Here is an example of a payload structure:

<root start="51" page_size="50" total="237">
    <items>
        ...
    </items>
</root>

To configure the paginator for this method, you can use the following settings:

{
    "section": "HTTP",
    "options": {
        "url": "http://api.server.com/items",
        "offset_pagination": {
            "offset_path": "/root/@start",
            "offset_param": "start",
            "limit_path": "/root/@page_size",
            "limit_param": "size",
            "total_path": "/root/@total"
        }
    }
}

This configuration allows the section to manage pagination effectively based on the offset values provided in the response, ensuring the retrieval of all relevant data.

# Page-Based Paginator

The page_pagination method operates by simply incrementing a page number in the request parameter, assuming that the first page is numbered as 1.

To determine when to stop, the configuration should include the items_path, which enables the paginator to identify whether items have been returned in the response or not.

Here's an example of a payload structure:

{
    "data": [
        {},
        {}
    ]
}

To configure the paginator for this method, you can use the following settings:

{
    "section": "HTTP",
    "options": {
        "url": "http://api.server.com/items",
        "page_pagination": {
            "page_param": "page",
            "items_path": "/data"
        }
    }
}

With these configurations, the section will effectively manage pagination by incrementing the page number and checking for the presence of items in the response, ensuring the retrieval of subsequent pages of data.

# Options

  • url - URL required

    • The URL which should be requested.
    • Type: Expression
  • auth - Authentication

    • List of username and password in plaintext, submitted as basic authentication. Environment variables can be referenced using $env. Example: $env.EXAMPLE_SECRET uses the value of the environment variable ETL_ENV_EXAMPLE_SECRET.
    • Type: List of EnvChar
    • Default: null
  • cleanup - Cleanup files

    • When enabled, the temporary payload file is automatically removed after passing on the item in the pipeline.
    • Type: Boolean
    • Default: true
  • cookies - Cookies

    • Dict of cookies to send with the request.
    • Type: Dict
    • Default: null
  • cursor_pagination - Cursor pagination

    • Type: Options
    • Default: null
      • cursor_param - cursor query param name required

        • The query param name to use for the cursor value in the next request.
        • Type: Char
      • cursor_path - Path to the cursor value required

        • The cursor value must contain the cursor for the current page.
        • Type: Char
  • foreach - Foreach

    • Type: Options
      • target_path - Path to store the result required
        • Path to the key on the item in the pipeline where the result will be stored. "" can be used to replace the entire item.
        • Type: Char
  • headers - Headers

    • Request headers to be sent.
    • Type: Dict of Expression
    • Default: null
  • json - JSON Payload

    • A JSON serializable Python object to send in the body of the Request.
    • Type: JSON
    • Default: null
  • method - HTTP Method

    • The HTTP method with which the requests should be made.
    • Type: Choice
    • Default: 'GET'
    • Choices:
      • GET: GET
  • next_pagination - Next pagination

    • Type: Options
    • Default: null
      • next_url_path - Next URL path required
        • Path to the URL of the next page in the response document.
        • Type: Char
  • oauth - Oauth

    • Type: Options
      • client_id - Client ID required

        • Environment variables can be referenced using $env. Example: $env.EXAMPLE_SECRET uses the value of the environment variable ETL_ENV_EXAMPLE_SECRET.
        • Type: EnvChar
      • client_secret - Client Secret required

        • Environment variables can be referenced using $env. Example: $env.EXAMPLE_SECRET uses the value of the environment variable ETL_ENV_EXAMPLE_SECRET.
        • Type: EnvChar
      • token_url - URL used to get an access token required

        • Type: Char
  • offset_pagination - Offset pagination

    • Type: Options
    • Default: null
      • limit_path - Path to the limit value required

        • The limit value must contain the maximum amount of items on one page.
        • Type: Char
      • offset_param - Offset query param name required

        • The query param name to use for the offset value in the next request.
        • Type: Char
      • offset_path - Path to the offset value required

        • The offset value must contain the index of the first element of the current page.
        • Type: Char
      • total_path - Path to the total value required

        • The offset value must contain total amount of items in all pages.
        • Type: Char
      • limit_param - Limit query param name

        • The query param name with which the limit is sent in the next request. This is optional. When null, the query param is omitted.
        • Type: Char
        • Default: null
  • page_pagination - Page pagination

    • Type: Options
    • Default: null
      • items_path - path to the items required

        • When there there are no items, the pagination is stopped.
        • Type: Char
      • page_param - page query param name required

        • The query param name to use for the page value in the next request.
        • Type: Char
  • params - Query Params

    • Request params to be sent in the query string.
    • Type: Dict of Expression
    • Default: null
  • raise_for_status - Raise for status

    • When enabled (default), an exception is thrown when the response code is 4xx or 5xx. This helps detecting problems and cancelling pipelines. In some cases, especially with foreach, we want to disable this behavior. This has the effect that the response status code is part of the resulting item and the problem has to be handled somehow in the pipeline.
    • Type: Boolean
    • Default: true
  • timeout - Timeout

    • How many seconds to wait for the server to send data before giving up as a float.
    • Type: Float
    • Default: null
  • verify - Verify TLS certificate

    • Control whether we verify the server's TLS certificate.
    • Type: Boolean
    • Default: true

# InvokePipeline

The InvokePipeline will invoke an existsing pipeline by passing the pipeline_id in the options. Multiple InvokePipeline sections can be chained after each other to process the same items with different pipelines. In the following example, two pipelines are invoked with the output of the CsvFileReader.

Example with input mode:

[
    { "section": "CsvFileReader" },
    {
        "section": "InvokePipeline",
        "options": {
            "pipeline_id": 2,
        }
    },
    {
        "section": "InvokePipeline",
        "options": {
            "pipeline_title": "Pipeline 3",
        }
    },
]

# Options

  • pipeline_id - Pipeline id

    • Type: Integer
  • pipeline_title - Pipeline title

    • Type: Char

# JSONReader

The JSONReader source section allows the read JSON files and yield the JSON items as a list.

This section expects the previous item to contain the path to a JSON file (by default under /path).

Example:

[
    {
        "path": "/path/to/file.json"
    }
]

Let's say we have a JSON file with the following structure:

{
    "groupSet": [
        {
            "offers": [
                {"name": "Offers 1"},
                {"name": "Offers 2"}
            ]
        },
        {
            "offers": [
                {"name": "Offers 3"},
                {"name": "Offers 4"}
            ]
        }
    ]
}

We can use the JSONReader section to read the offers items from the first element of the groupSet array:

{
    "section": "JSONReader",
    "options": {
        "items": "/groupSet/0/offers"
    }
}

JSON Pointer syntax can be used to specify the path to data in a JSON file, which is a string of tokens separated by /. For getting more information about the JSON Pointer syntax, please refer to the JSON Pointer documentation (opens new window).

Invalid data paths will result in an empty list being returned. It will always return a list, even if the data path points to a dictionary.

# Set statistics total

This section allows to configure the pipeline total. See the XMLReader documentation for details as it works similar for both sections.

# Options

  • items - Items

    • JSON Pointer to the list of items that will be passed on to the next section individually. When not defined, the whole document is passed on as one item.
    • Type: JSONPointer
    • Default: ''
  • path - Path

    • JSON Pointer adressing the path to the file to be loaded.
    • Type: JSONPointer
    • Default: JsonPointer('/path')
  • target - Target

    • JSON Pointer for the place where the result is inserted in the processed item. When not set, the processed item is replaced by the resulting item.
    • Type: JSONPointer
    • Default: null
  • total - Total count

    • When processing multiple items with data, such as HTTP responses, the pipeline statistics and progressbar keep being updated. When the first item/HTTP response contains information about the total amount of items to be processed, this information can be set using the total option. The total option accepts an expression, where the previous item / HTTP response can be accessed with item and the parsed content body can be accessed with document.
    • Type: Expression
    • Default: null

# JSONWriter

The JSONWriter section consumes items and writes them to a JSON-file as a list of items. The path to the file is yielded to the next section which may process it further.

In detail, the section writes all items consumed from the prior section into files on the temp file system of the ETL worker. It does not yield the consumed items to the next section but it yields only one item per written file, which contains the path to the file on the disk.

The written files are automatically removed after they are processed by the next section in order to keep disk usage low.

The items are written in batches (configurable through batch_size). The filename is also configurable and acts as a template, where these values are replaced:

  • $batch - the batch number, starting with 1.
  • $date - the date when the pipeline was executed in the form YYYMMMDD.
  • $time - the date and time in form YYYYMMDD-hhmmss; the date and time is the same for all written batch files for convenience, thus it does not resemble the time the file was written but the time the pipeline run was started.

Example configuration:

{
  "section": "JSONWriter",
  "options": {
    "batch_size": 200,
    "filename": "data-$date-$batch.json"
  }
},

Example output with two batches:

[
{"path": "/tmp/etl/json_writer/data-20221229-1.json", "size": 200},
{"path": "/tmp/etl/json_writer/data-20221229-2.json", "size": 173},
]

It is also possible to use simple item properties in the filename template. This has the effect that the items are grouped into multiple JSON output files.

Example configuration:

{
  "section": "JSONWriter",
  "options": {
    "batch_size": 200,
    "filename": "data-${archival_type}-${batch}.json"
  }
},

Only simple item properties such as strings, integers and booleans can be used in the filenames. Nested data structures such as lists and dicts are not supported.

WARNING

Be aware that when batch_size set to a non-zero value and $batch is missing in the filename template, the section will yield the same file path / filename multiple times with multiple batches of items. Depending on what is done with the files, the file may be overwritten on the target system and may result in missing data.

# Options

  • batch_size - Batch size

    • Amount of objects to be written in one file. Set it to 0 in order to write all items into one file.
    • Type: Integer
    • Default: 1000
  • filename - Filename

    • Filename template of the JSON file.
    • Type: Char
    • Default: 'data-$batch.json'
  • pretty - Pretty print

    • Pretty print items on multiple lines.
    • Type: Boolean
    • Default: false

# Logger

The logger section is used for logging data to the stdout of the worker process. This is useful for inspecting the behavior of a pipeline. The logger-section re-yields the all consumed items to the next section.

Example:

{
  "section": "Logger",
  "options": {
    "prefix": "Published",
    "fields": [
      "ID_NR_DATEI",
      "public_path"
    ],
    "oneline": true
  }
}

# Options

  • fields - List of field names

    • A list of fields to be logged for each item. This helps to reduce the amount of output for large items.
    • Type: List of Char
    • Default: null
  • limit - Limit

    • Only log the first n items consumed in order to not fill the log.
    • Type: Integer
    • Default: 100
  • oneline - One line

    • Flag for disabling pretty-printing items on multiple lines. Especially useful in combination with fields in order to make the output more compact.
    • Type: Boolean
    • Default: false
  • prefix - Prefix

    • Prefix for each log entry in order to identify which logger has produced the output. This is helpful when combining multiple loggers in the same pipeline.
    • Type: Char
    • Default: 'Logger'

# Map

The map section applies functions defined as operations on all items in the pipeline and acts as the "transform" part of the ETL pipeline.

The map section accepts a list of operations, where each operation consists of a dict with these keys:

  • function: string with the name of the function to apply
  • args: list of arguments for the function; string values with a $ prefix are replaced by the item-value identified by the name following the $. Nested values inside a dict can be accessed with dot notation. e.g.: $field.nested_key.double_nested_key. This notation also supports selecting list elements, for instance with $list.0 or $list.-1.
  • kwargs: optional list of keyword arguments for the function.
  • target: the name of the variable, in which the output is stored within the item.

There are many functions that can be used within the Map section. One special function is the condition function, which makes it possible to perform a condition on the item and store the boolean result in the item.

Examples:

In the next example, the string function split is applied on the value of the field ARCHIVALIENART in order to split the string by newline into a list of strings.

{
  "section": "Map",
  "options": {
    "operations": [
      {
        "args": [
          "$ARCHIVALIENART",
          "\r\n\r\n"
        ],
        "target": "archival_type",
        "function": "split"
      }
    ]
  }
}

In the next example, we are creating a datetime string and store it in the field _today on the item. The string can later be used for comparing dates.

{
  "section": "Map",
  "options": {
    "operations": [
      {
        "target": "_today",
        "function": "now"
      },
      {
        "args": [
          "$_today",
          "%Y-%m-%d"
        ],
        "target": "_today",
        "function": "strftime"
      }
    ]
  }
}

In the next example, the Map section is used to compare the datestring SCHUTZFRISTENDE to the current datestring stored in _today.

{
  "section": "Map",
  "options": {
    "operations": [
      {
        "args": [
          "and",
          [
            "$SCHUTZFRISTENDE",
            "is not",
            null
          ],
          [
            "$SCHUTZFRISTENDE",
            "<",
            "$_today"
          ]
        ],
        "target": "is_approval_required",
        "function": "condition"
      }
    ]
  }
}

# Map functions

Below, the functions usable in the Map section are listed and documented. Most functions are simply Python functions that are registered in the Map section, thus the documentation is extracted from the standard Python function and may be adapted to how the Map section functions when used.

# abs

Return the absolute value of the argument.

# all

Return True if bool(x) is True for all values x in the iterable.

If the iterable is empty, return True.

# any

Return True if bool(x) is True for any x in the iterable.

If the iterable is empty, return False.

# ascii

Return an ASCII-only representation of an object.

As repr(), return a string containing a printable representation of an object, but escape the non-ASCII characters in the string returned by repr() using \x, \u or \U escapes. This generates a string similar to that returned by repr() in Python 2.

# bool

bool(x) -> bool

Returns True when the argument x is true, False otherwise. The builtins True and False are the only two instances of the class bool. The class bool is a subclass of the class int, and cannot be subclassed.

# bytes

bytes(iterable_of_ints) -> bytes bytes(string, encoding[, errors]) -> bytes bytes(bytes_or_buffer) -> immutable copy of bytes_or_buffer bytes(int) -> bytes object of size given by the parameter initialized with null bytes bytes() -> empty bytes object

Construct an immutable array of bytes from:

  • an iterable yielding integers in range(256)
  • a text string encoded using the specified encoding
  • any object implementing the buffer API.
  • an integer

# capitalize

Return a capitalized version of the string.

More specifically, make the first character have upper case and the rest lower case.

# chr

Return a Unicode string of one character with ordinal i; 0 <= i <= 0x10ffff.

# condition

Apply a condition and store the boolean result.

The condition function accepts a condition in the same way the filter conditions can be defined in the filter section. The difference is that the result can be stored as boolean in a field when used in the Map section.

See also the separate chapter on conditions in the ETL pipeline.

Example:

{
  "section": "Map",
  "options": {
    "operations": [
      {
        "args": [
          "$ZUGAENGLICHKEIT_ID",
          "in",
          [10007, 10009]
        ],
        "target": "files_are_public",
        "function": "condition"
      }
    ]
  }
}

# count

S.count(sub[, start[, end]]) -> int

Return the number of non-overlapping occurrences of substring sub in string S[start:end]. Optional arguments start and end are interpreted as in slice notation.

# date_to_int

The function takes a date string that can be parsed into a date (using dateparse.parse_date) and converts it into an integer value. Eg 2022-06-19 will be converted to the integer 20220619.

Examples:

{
    "args": [
        "2022-06-19",
    ],
    "target": "output",
    "function": "date_to_int",
},
{
    "args": [
        "$field",
    ],
    "target": "other_output",
    "function": "date_to_int",
}

# datetime

datetime(year, month, day[, hour[, minute[, second[, microsecond[,tzinfo]]]]])

The year, month and day arguments are required. tzinfo may be None, or an instance of a tzinfo subclass. The remaining arguments may be ints.

# dict

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)

# endswith

S.endswith(suffix[, start[, end]]) -> bool

Return True if S ends with the specified suffix, False otherwise. With optional start, test S beginning at that position. With optional end, stop comparing S at that position. suffix can also be a tuple of strings to try.

# enumerate

Return an enumerate object.

iterable an object supporting iteration

The enumerate object yields pairs containing a count (from start, which defaults to zero) and a value yielded by the iterable argument.

enumerate is useful for obtaining an indexed list: (0, seq[0]), (1, seq[1]), (2, seq[2]), ...

# filter

The filter function can be used for removing items from a list.

Given there is a list of values:

{
    "input": ["Foo", "Bar", "Baz"],
}

with the filter function, we can define a condition which is applied for each list item and must evaluate to true in order for item to be kept:

{
    "function": "filter",
    "args": ["$input", ["$value", "!=", "Bar"]],
    "target": "output",
}

results in:

{
    "input": ["Foo", "Bar", "Baz"],
    "output": ["Foo", "Baz"],
}

The item in the list can be addressed with $value in the condition. See the Conditions for details on how to write conditions.

# filter_empty

The filter_empty function is used for removing empty/null values from a list.

Given there is a list of values:

{
    "input": ["Foo", null, "", 0, "Bar"],
},

with the filter_empty function is used:

"operations": [
    {
        "function": "filter_empty",
        "args": ["$input"],
        "target": "output",
    }
]

it will remove all falsy values such as null, empty string or 0:

{
    "input": ["Foo", None, "", "Bar"],
    "output": ["Foo", "Bar"],
}

# find

S.find(sub[, start[, end]]) -> int

Return the lowest index in S where substring sub is found, such that sub is contained within S[start:end]. Optional arguments start and end are interpreted as in slice notation.

Return -1 on failure.

# float

Convert a string or number to a floating point number, if possible.

# format

S.format(*args, **kwargs) -> str

Return a formatted version of S, using substitutions from args and kwargs. The substitutions are identified by braces ('{' and '}').

# format_map

S.format_map(mapping) -> str

Return a formatted version of S, using substitutions from mapping. The substitutions are identified by braces ('{' and '}').

# generate_identifier

The function takes a string and returns a hash value of the string. The hash value is truncated to 32 characters. Mainly used for generating an identifier for a string. This will always generate the same hash value for the same string.

# hash

Return the hash value for the given object.

Two objects that compare equal must also have the same hash value, but the reverse is not necessarily true.

# hex

Return the hexadecimal representation of an integer.

hex(12648430) '0xc0ffee'

# index

S.index(sub[, start[, end]]) -> int

Return the lowest index in S where substring sub is found, such that sub is contained within S[start:end]. Optional arguments start and end are interpreted as in slice notation.

Raises ValueError when the substring is not found.

# int

int([x]) -> integer int(x, base=10) -> integer

Convert a number or string to an integer, or return 0 if no arguments are given. If x is a number, return x.int(). For floating point numbers, this truncates towards zero.

If x is not a number or if base is given, then x must be a string, bytes, or bytearray instance representing an integer literal in the given base. The literal can be preceded by '+' or '-' and be surrounded by whitespace. The base defaults to 10. Valid bases are 0 and 2-36. Base 0 means to interpret the base from the string as an integer literal.

int('0b100', base=0) 4

# isalnum

Return True if the string is an alpha-numeric string, False otherwise.

A string is alpha-numeric if all characters in the string are alpha-numeric and there is at least one character in the string.

# isalpha

Return True if the string is an alphabetic string, False otherwise.

A string is alphabetic if all characters in the string are alphabetic and there is at least one character in the string.

# isascii

Return True if all characters in the string are ASCII, False otherwise.

ASCII characters have code points in the range U+0000-U+007F. Empty string is ASCII too.

# isdecimal

Return True if the string is a decimal string, False otherwise.

A string is a decimal string if all characters in the string are decimal and there is at least one character in the string.

# isdigit

Return True if the string is a digit string, False otherwise.

A string is a digit string if all characters in the string are digits and there is at least one character in the string.

# isidentifier

Return True if the string is a valid Python identifier, False otherwise.

Call keyword.iskeyword(s) to test whether string s is a reserved identifier, such as "def" or "class".

# islower

Return True if the string is a lowercase string, False otherwise.

A string is lowercase if all cased characters in the string are lowercase and there is at least one cased character in the string.

# isnumeric

Return True if the string is a numeric string, False otherwise.

A string is numeric if all characters in the string are numeric and there is at least one character in the string.

# isprintable

Return True if the string is printable, False otherwise.

A string is printable if all of its characters are considered printable in repr() or if it is empty.

# isspace

Return True if the string is a whitespace string, False otherwise.

A string is whitespace if all characters in the string are whitespace and there is at least one character in the string.

# istitle

Return True if the string is a title-cased string, False otherwise.

In a title-cased string, upper- and title-case characters may only follow uncased characters and lowercase characters only cased ones.

# isupper

Return True if the string is an uppercase string, False otherwise.

A string is uppercase if all cased characters in the string are uppercase and there is at least one cased character in the string.

# join

Join a list of items as strings into one string. The following kwargs can be used:

  • separator: The separator used to join the items. Defaults to -.
  • flatten: A boolean indicating whether the list should be flattened before joining. Defaults to false.

Example:

{
  "section": "Map",
  "options": {
    "operations": [
      {
        "function": "join",
        "args": ["$record_id", "$descriptor_id"],
        "kwargs": {"separator": "-"},
        "target": "identifier"
      }
    ]
  }
}

# len

Return the number of items in a container.

# list

Built-in mutable sequence.

If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.

# ljust

Return a left-justified string of length width.

Padding is done using the specified fill character (default is a space).

# lower

Return a copy of the string converted to lowercase.

# lstrip

Return a copy of the string with leading whitespace removed.

If chars is given and not None, remove characters in chars instead.

# max

max(iterable, *[, default=obj, key=func]) -> value max(arg1, arg2, *args, *[, key=func]) -> value

With a single iterable argument, return its biggest item. The default keyword-only argument specifies an object to return if the provided iterable is empty. With two or more arguments, return the largest argument.

# min

min(iterable, *[, default=obj, key=func]) -> value min(arg1, arg2, *args, *[, key=func]) -> value

With a single iterable argument, return its smallest item. The default keyword-only argument specifies an object to return if the provided iterable is empty. With two or more arguments, return the smallest argument.

# now

Returns new datetime object representing current time local to tz.

tz Timezone object.

If no tz is specified, uses local timezone.

# nullif

Returns None if the condition is true. Otherwise, the value is returned.

See also the separate chapter on conditions in the ETL pipeline.

Examples:

{
    "args": [
        "$input",
        ["$input", "==", "unknown"],
    ],
    "target": "output",
    "function": "nullif",
}
{
    "args": [
        "$input",
        ["and", ["$input", "is not", None], ["$input", "<", "0001-01-01"]],
    ],
    "target": "output",
    "function": "nullif",
}

# oct

Return the octal representation of an integer.

oct(342391) '0o1234567'

# ord

Return the Unicode code point for a one-character string.

# parse_datetime

Parse a string and return a datetime.datetime.

This function supports time zone offsets. When the input contains one,
the output uses a timezone with a fixed offset from UTC.

Raise ValueError if the input is well formatted but not a valid datetime.
Return None if the input isn't well formatted.

# partition

Partition the string into three parts using the given separator.

This will search for the separator in the string. If the separator is found, returns a 3-tuple containing the part before the separator, the separator itself, and the part after it.

If the separator is not found, returns a 3-tuple containing the original string and two empty strings.

# pick

When provided a list of keys, it picks all corresponding values in a list of dicts

Example 1:

{
    "section": "Map",
    "options": {
        "operations": [
            {
                "args": ["$people", "name"],
                "target": "list_of_names",
                "function": "pick"
            }
        ]
    }
}

item:

{
    "people": [{"name": "Hans"}, {"name": "Peter"}]
}

returns:

{
    "people": [{"name": "Hans"}, {"name": "Peter"}],
    "list_of_names": ["Hans","Peter"]
}

Example 2:

{
    "section": "Map",
    "options": {
        "operations": [
            {
                "args": ["$people", "name", "age"],
                "target": "list_of_names",
                "function": "pick"
            }
        ]
    }
}

item:

{
    "people": [{"name": "Hans", "age": 20}, {"name": "Peter", "age": 21}]
}

returns:

{
    "people": [{"name": "Hans", "age": 20}, {"name": "Peter", "age": 21}],
    "list_of_names": [("Hans", 20),("Peter", 21)]
}

# pow

Equivalent to baseexp with 2 arguments or baseexp % mod with 3 arguments

Some types, such as ints, are able to use a more efficient algorithm when invoked using the three argument form.

# range

range(stop) -> range object range(start, stop[, step]) -> range object

Return an object that produces a sequence of integers from start (inclusive) to stop (exclusive) by step. range(i, j) produces i, i+1, i+2, ..., j-1. start defaults to 0, and stop is omitted! range(4) produces 0, 1, 2, 3. These are exactly the valid indices for a list of 4 elements. When step is given, it specifies the increment (or decrement).

# removeprefix

Return a str with the given prefix string removed if present.

If the string starts with the prefix string, return string[len(prefix):]. Otherwise, return a copy of the original string.

# removesuffix

Return a str with the given suffix string removed if present.

If the string ends with the suffix string and that suffix is not empty, return string[:-len(suffix)]. Otherwise, return a copy of the original string.

# replace

Return a copy with all occurrences of substring old replaced by new.

count Maximum number of occurrences to replace. -1 (the default value) means replace all occurrences.

If the optional argument count is given, only the first count occurrences are replaced.

# reversed

Return a reverse iterator over the values of the given sequence.

# rfind

S.rfind(sub[, start[, end]]) -> int

Return the highest index in S where substring sub is found, such that sub is contained within S[start:end]. Optional arguments start and end are interpreted as in slice notation.

Return -1 on failure.

# rindex

S.rindex(sub[, start[, end]]) -> int

Return the highest index in S where substring sub is found, such that sub is contained within S[start:end]. Optional arguments start and end are interpreted as in slice notation.

Raises ValueError when the substring is not found.

# rjust

Return a right-justified string of length width.

Padding is done using the specified fill character (default is a space).

# round

Round a number to a given precision in decimal digits.

The return value is an integer if ndigits is omitted or None. Otherwise the return value has the same type as the number. ndigits may be negative.

# rpartition

Partition the string into three parts using the given separator.

This will search for the separator in the string, starting at the end. If the separator is found, returns a 3-tuple containing the part before the separator, the separator itself, and the part after it.

If the separator is not found, returns a 3-tuple containing two empty strings and the original string.

# rstrip

Return a copy of the string with trailing whitespace removed.

If chars is given and not None, remove characters in chars instead.

# set

set() -> new empty set object set(iterable) -> new set object

Build an unordered collection of unique elements.

# setif

The function sets the value in the target to a specific value when the condition is true.

The first argument is the resulting value (which may contain the value of another field when prefixed with $). The second argument is a condition. See the separate chapter on conditions in the documentation.

When the condition resolves to false, the target field is not updated. But if the field does not yet exist, it is automatically set to None in order to have a consistent set of fields over all items in the pipeline.

Examples:

{
    "args": [
        "A value",
        ["$input", "==", "A"],
    ],
    "target": "output",
    "function": "setif",
},
{
    "args": [
        "$otherField",
        ["$input", "!=" "A"],
    ],
    "target": "output",
    "function": "setif",
}

# setvalue

The function simply sets the value in the target to a specific value. The value argument is the resulting value (which may contain the value of another field when prefixed with $). Examples:

{
    "args": [
        "some value",
    ],
    "target": "some_output",
    "function": "setvalue",
},
{
    "args": [
        "$otherField",
    ],
    "target": "other_output",
    "function": "setvalue",
}

# slice

slice(stop) slice(start, stop[, step])

Create a slice object. This is used for extended slicing (e.g. a[0:10:2]).

# sorted

Return a new list containing all items from the iterable in ascending order.

A custom key function can be supplied to customize the sort order, and the reverse flag can be set to request the result in descending order.

# split

A more robust implementation of Python's str.split. If an exception is raised (e.g. when trying to split None), an empty list is returned.

# splitlines

Return a list of the lines in the string, breaking at line boundaries.

Line breaks are not included in the resulting list unless keepends is given and true.

# startswith

S.startswith(prefix[, start[, end]]) -> bool

Return True if S starts with the specified prefix, False otherwise. With optional start, test S beginning at that position. With optional end, stop comparing S at that position. prefix can also be a tuple of strings to try.

# str

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

# strftime

format -> strftime() style string.

# strip

Return a copy of the string with leading and trailing whitespace removed.

If chars is given and not None, remove characters in chars instead.

# strptime

string, format -> new datetime parsed from a string (like time.strptime()).

# sum

Return the sum of a 'start' value (default: 0) plus an iterable of numbers

When the iterable is empty, return the start value. This function is intended specifically for use with numeric values and may reject non-numeric types.

# swapcase

Convert uppercase characters to lowercase and lowercase characters to uppercase.

# timedelta

Difference between two datetime values.

timedelta(days=0, seconds=0, microseconds=0, milliseconds=0, minutes=0, hours=0, weeks=0)

All arguments are optional and default to 0. Arguments may be integers or floats, and may be positive or negative.

# title

Return a version of the string where each word is titlecased.

More specifically, words start with uppercased characters and all remaining cased characters have lower case.

# tuple

Built-in immutable sequence.

If no argument is given, the constructor returns an empty tuple. If iterable is specified the tuple is initialized from iterable's items.

If the argument is a tuple, the return value is the same object.

# upper

Return a copy of the string converted to uppercase.

# zfill

Pad a numeric string with zeros on the left, to fill a field of the given width.

The string is never truncated.

# zip

The zip function is used for combining multiple lists into one list.

Given I have some lists of data in an item.

{
    "ids": [7, 5, 9],
    "names": ["foo", "bar", "baz"]
}

with the zip function I can pair them together:

"operations": [
    {
        "function": "zip",
        "args": [["$ids", "$names"]],
        "target": "result"
    }
]

and then get a combined list:

{
    "result": [[7, "foo"], [5, "bar"], [9, "baz"]],
    ...
}

and I can also do the inverse operation with the same function by reducing a list level in the input:

"operations": [
    {
        "function": "zip",
        "args": ["$result"],
        "target": "result2"
    }
]

and get:

{
    "result2": [[7, 5, 9], ["foo", "bar", "baz"]],
    ...
}

# Options

  • operations - Operationen required
    • A list of operations, each consisting of at least a function, args and a target. See section documentation for details on how to define operations.
    • Type: List of JSON

# MissingSourceItems

Takes a list of objects representing the source objects expecting a key named identifier_field for every object. Yields a list of objects which are missing in the source but present in the system by comparing identifier_field in the source with identifier_field in the system. Yields the list of missing items.

# Options

  • model - Mode name required

    • Name of the model in lowercase.
    • Type: Choice
    • Choices:
      • record: Datensatz
      • recordrelation: Verknüpfung von Verzeichnungseinheiten
      • file: Files
      • descriptor: Deskriptor
      • descriptorrelation: Verknüpfung von Deskriptoren
      • descriptorrecordrelation: Verknüpfung von Deskriptoren und Verzeichnungseinheiten
      • delivery: Ablieferung
      • deliveryrecordrelation: Verknüpfung von Ablieferungen und Verzeichnungseinheiten
      • container: Behältnis
      • containerrecordrelation: Verknüpfung von Behältnissen und Verzeichnungseinheiten
      • enumentry: Enum Wert
      • package: Package
      • location: Standort
      • externalrepresentation: Externe Repräsentation
  • fields - Fields

    • A list of field names to yield.
    • Type: List of Char
    • Default: ['identifier']
  • filter - Filter

    • A JSON object containing the filters to apply.
    • Type: Dict
  • identifier_field - Identifier field name

    • The name of the filed on the item containing the identifier.
    • Type: Char
    • Default: 'ID_NR'
  • lookup_field - Lookup field name

    • Name of the field on the model used to look up the values from identifier_field.
    • Type: Char
    • Default: 'identifier'

# MyColexSource

This section fetches data from Oikos (opens new window).

Use endpoint to specify which data to fetch.

To authenticate against the MyColex API, an API Token needs to be provided or set as an environment variable (MYCOLEX_API_TOKEN).

Example:

{
  "section": "MyColexSource",
  "options": {
    "base_url": "https://mycolex.example.org",
    "endpoint": "inventory",
    "page_size": 100
  }
}

Example:

{
  "section": "QuerysetSource",
  "options": {
    "model": "record",
    "filter": {
      "is_public": true,
      "files_are_public": true
    }
  }
},
{
  "section": "MyColexSource",
  "options": {
    "base_url": "https://mycolex.example.org",
    "endpoint": "image",
    "page_size": 100,
    "filter": {
      "published": true
    },
    "filter_by_previous_in": {
      "previous_field": "identifier",
      "filter_field": "objects"
    }
  }
}

# Options

  • endpoint - Endpoint required

    • Which API endpoint to query data from (e.g. inventory, building, event, etc.).
    • Type: Char
  • api_token - API Token

    • The token used to authenticate in MyColex. This is usually configured as environment variable and can thus be omitted.
    • Type: Char
  • base_url - URL

    • The URL to MyColex. This is usually configured as environment variable and can thus be omitted.
    • Type: Char
  • filter - Filter

    • Filter parameters passed to the endpoint.
    • Type: JSON
  • filter_by_previous_in - Filter requested data by items in the pipeline

    • The filter_by_previous_in option allows to select rows that have a relation to items consumed by previous sections in the pipeline. When this field is configured, it must contain an object with two configurations, the filter_field name, where the relation is stored in the source system and the previous_field, which is the name of the field where the foreign identifier is stored in the consumed items. See the example for reference.
    • Type: Options
      • filter_field - Filter field name required

        • Name of the field in MyColex.
        • Type: Char
      • previous_field - Previous field name required

        • Name of the field in the pipeline item.
        • Type: Char
  • page_size - Elements per request

    • Amount of items that are loaded in one page. Usually the default is fine.
    • Type: Integer
    • Min: 1
    • Max: 1000

# PipelineSnippet

The PipelineSnippet will substitute all sections from an existsing pipeline by passing the pipeline_id in the options. In the following example, all sections from the pipeline with id 2 will be substituted in the pipeline.

Example with input mode:

[
    { "section": "CsvFileReader" },
    {
        "section": "PipelineSnippet",
        "options": {
            "pipeline_id": 2,
        }
    },
]

# Options

  • pipeline_id - Pipeline id

    • Type: Integer
  • pipeline_title - Pipeline title

    • Type: Char

# Printer

Print pipeline items to the pipeline output stream (log) in JSON, YAML or Python format.

# Options

  • condition - Condition

    • Only print the file content when the condition resolves to true.
    • Type: BooleanExpression
    • Default: {{ True }}
  • data - Data

    • Data to print.
    • Type: Expression
    • Default: {{ item }}
  • format - Output format

    • The output is converted first to this format.
    • Type: Choice
    • Default: 'json'
    • Choices:
      • python: As python data structure (pformat)
      • json: As JSON string
      • yaml: As YAML string
  • limit - Limit

    • Only print the first n items.
    • Type: Integer
    • Default: 100
  • prefix - Prefix

    • The first line of the logger output.
    • Type: Expression
    • Default: Printer ({{ index }})

# Publish

The publish section publishes objects of various models from the internal database to the external database.

In the internal database we have three groups of models:

  1. Models that are stored in a regular Postgres database in the internal and external system with the same model. They can be published with this Publish section.
  2. Records are stored in Postgres in internal, but are only stored in Solr in the external system. They cannot be published with the Publish section, but the Solr section which designed for this exact case.
  3. Models that are never published.

In the Publish section, we are publishing objects of models of type 1, expecting to have a 1:1 equivalent in the external.

Examples:

{
  "section": "Publish",
  "options": {
    "model": "file"
  }
}
{
  "section": "Publish",
  "options": {
    "model": "descriptorrecordrelation"
  }
}

# Options

  • model - Model name required

    • Name of the model to be published, with all lowercase characters.
    • Type: Choice
    • Choices:
      • record: Datensatz
      • recordrelation: Verknüpfung von Verzeichnungseinheiten
      • file: Files
      • descriptor: Deskriptor
      • descriptorrelation: Verknüpfung von Deskriptoren
      • descriptorrecordrelation: Verknüpfung von Deskriptoren und Verzeichnungseinheiten
      • delivery: Ablieferung
      • deliveryrecordrelation: Verknüpfung von Ablieferungen und Verzeichnungseinheiten
      • container: Behältnis
      • containerrecordrelation: Verknüpfung von Behältnissen und Verzeichnungseinheiten
      • enumentry: Enum Wert
      • package: Package
      • location: Standort
      • externalrepresentation: Externe Repräsentation
  • batch_size - Batch size

    • Amount of objects to be published in one batch.
    • Type: Integer
    • Default: 200
  • prune - Prune

    • Delete all objects of this model from the external database before starting to publish. Use with caution and only when we really have to replace all data.
    • Type: Boolean
    • Default: false

# QuerysetSource

The queryset source section loads objects from the database and yields the objects into the pipeline. It is possible to apply simple queries by using "filter" and "exclude" functions of the Django ORM.

This can be used in combination with transform sections when data must be changed within the database.

Example:

{
  "section": "QuerysetSource",
  "options": {
    "model": "record",
    "filter": {
      "is_public": true,
      "files_are_public": true
    }
  }
}

The filter and exclude values can be defined as Expressions and will be evaluated. In the expression, the current item cannot be accessed as this is a load-only section which does not process items but only adds new items.

# Options

  • model - Model name required

    • Name of the model from which we want to load data.
    • Type: Choice
    • Choices:
      • record: Datensatz
      • recordrelation: Verknüpfung von Verzeichnungseinheiten
      • file: Files
      • descriptor: Deskriptor
      • descriptorrelation: Verknüpfung von Deskriptoren
      • descriptorrecordrelation: Verknüpfung von Deskriptoren und Verzeichnungseinheiten
      • delivery: Ablieferung
      • deliveryrecordrelation: Verknüpfung von Ablieferungen und Verzeichnungseinheiten
      • container: Behältnis
      • containerrecordrelation: Verknüpfung von Behältnissen und Verzeichnungseinheiten
      • enumentry: Enum Wert
      • package: Package
      • location: Standort
      • externalrepresentation: Externe Repräsentation
  • batch_size - Batch size

    • The number of objects that are loaded from the database at once.
    • Type: Integer
    • Default: 100
  • distinct - Distinct

    • If checked, only distinct objects are yielded into the pipeline.
    • Type: Boolean
    • Default: false
  • exclude - Exclude

    • Objects passing this condition are not yielded into the pipeline. The condition is defined in a Django ORM compatible way. The values can be defined as expression and will be evaluated.
    • Type: Dict of Expression
    • Default: null
  • filter - Filter

    • Objects passing this condition are yielded into the pipeline. The condition is defined in a Django ORM compatible way. The values can be defined as expression and will be evaluated.
    • Type: Dict of Expression
    • Default: null

# RawSource

The raw source section allows to inject raw data into the pipeline. This can be helpful for testing pipelines when you want to work with a specific set of items.

Example:

{
  "section": "RawSource",
  "options": {
    "data": [
      {"key": "foo"},
      {"key": "bar"}
    ]
  }
}

# Options

  • data - Data required
    • Raw list of items to feed into the pipeline.
    • Type: JSON

# ResolveForeignKey

Resolves foreign keys for a given model and pops the result on the item.

The ResolveForeignKey section is used when having a foreign key identifier but we need the internal primary key of the object. The section looks up the object by the identifier field and replaces it with the pk so that later sections, such as the create-sections, can process the data.

When we cannot find an object with this identifier in our database, the behavior can be configured in the missing option:

  • raise (default): raise an exception and stop the pipeline execution. This can be used when we do not expect that it is missing and need to investigate such cases.
  • drop: remove the whole item from the pipeline and do not further process it. This can be used when importing n:n relations where both ends must exist.
  • nullify: set the value to null. This can be used when importing 1:1 relations which may or may not have the target imported as well.

Example:

{
  "section": "ResolveForeignKey",
  "options": {
    "model": "descriptor",
    "lookup": "identifier",
    "source": "ID_NR_1",
    "target": "from_descriptor_id",
    "missing": "drop"
  }
},
{
  "section": "ResolveForeignKey",
  "options": {
    "model": "descriptor",
    "lookup": "identifier",
    "source": "ID_NR_2",
    "target": "to_descriptor_id",
    "missing": "drop"
  }
},

# Options

  • lookup - Model field required

    • Name of the field on the model containing the identifier. Usually identifier.
    • Type: Char
  • model - Model name required

    • Name of the model where the lookup should be executed.
    • Type: Choice
    • Choices:
      • record: Datensatz
      • recordrelation: Verknüpfung von Verzeichnungseinheiten
      • file: Files
      • descriptor: Deskriptor
      • descriptorrelation: Verknüpfung von Deskriptoren
      • descriptorrecordrelation: Verknüpfung von Deskriptoren und Verzeichnungseinheiten
      • delivery: Ablieferung
      • deliveryrecordrelation: Verknüpfung von Ablieferungen und Verzeichnungseinheiten
      • container: Behältnis
      • containerrecordrelation: Verknüpfung von Behältnissen und Verzeichnungseinheiten
      • enumentry: Enum Wert
      • package: Package
      • location: Standort
      • externalrepresentation: Externe Repräsentation
  • source - Source field required

    • Name of the field in the source item to get the identifier from.
    • Type: Char
  • target - Target field required

    • Name of the field where the primary key should be stored in.
    • Type: Char
  • many - Many

    • If checked, the section will resolve multiple objects and store them in a list.
    • Type: Boolean
    • Default: false
  • missing - Missing behavior

    • Behavior, when the object is not found. This only applies when many=False.
    • Type: Choice
    • Default: 'raise'
    • Choices:
      • raise: Abort the pipeline run with an exception (default)
      • drop: Remove the item from the pipeline
      • nullify: Set the value to null
  • values - Field values to retrieve

    • A list of fields names for which values should be retrived. When there is only one field name, the value will be directly inserted in target. With multiple field names, a key/value object will be inserted.
    • Type: List
    • Default: ['id']

# S3

With the S3 section, it is possible to interact with an S3 object store from within the pipeline.

The connection details can either be provided directly or from a preconfigured S3 connection defined in an S3_CONFIGS environment variable. The connection configuration options support expressions, so that the details can be retrieved from a previous item, from a variable or from a store value.

Example connection configuration:

{
    "section": "S3",
    "options": {
        "s3_endpoint_url": "https://s3.example.org/",
        "s3_bucket": "{{ item.bucket_name }}",
        "s3_key": "{{ env.S3_EXAMPLE_KEY }}",
        "s3_secret": "{{ env.S3_EXAMPLE_SECRET }}",
        ...
    }
}

Example named connection configuration:

{
    "section": "S3",
    "options": {
        "s3_name": "example_source",
        ...
    }
}

# List objects

The list_objects operation lists objects in an S3 bucket. By default, an item representation for each S3 object is injected invididually into the pipeline.

Example:

{
    "section": "S3",
    "options": {
        "s3_name": "internal-s3",
        "list_objects": {}
    }
}

This lists all objects in the bucket and inserts them as individual items into the pipeline. An item usually looks like this:

{
    "Key": "foo/bar.txt",
    "LastModified": datetime.datetime(2024, 1, 1),
    "ETag": '"d41d8cd98f00b204e9800998ecf8427e"',
    "Size": 123,
    "StorageClass": "STANDARD",
},

If you have already items in the pipeline from a previous section and want to operate on these items, e.g. for extracting an S3 connction or a path prefix, you can use the foreach option:

Example:

{
    "section": "S3",
    "options": {
        "s3_name": "internal-s3",
        "list_objects": {
        "foreach": {},
        "prefix": "{{ item.prefix }}"
        }
    }
}

This will remove the previous item from the pipeline and add each S3 object item to the pipeline.

If you want to get a list of S3 object keys and insert them as a list in the previous item, you can do it with:

{
    "section": "S3",
    "options": {
        "s3_name": "internal-s3",
        "list_objects": {
            "foreach": {
                "target_path": "/items",
                "keys_only": true
            }
        }
    }
}

Be aware that when you do that, there is a max_results limit which defaults to 1000. This is because it can cause performance issues when you load too much data into one item.

# Download file

With the download_file operation, a file of a particular object can be downloaded from the S3. The object is identified by it's Key in the bucket. The operation operates on previous items in the pipeline and expects that there is a value Key defined on the item, containing the object key (path on S3). This is compatible with the output of the list_objects operation, so that the operations can be chained and they easily work together.

The section downloads the file and returns an item with the necessary infos:

{
    "filename": "data.json",
    "key": "path/on/s3/data.json",
    "path": "/tmp/etl/s3/e732f3-data.json"
}

The file is stored in the temporary directory of the ETL pipeline and the path is passed on in the path key.

Example with combined operations:

{
    "section": "S3",
    "options": {
        "s3_name": "test",
        "list_objects": {
            "prefix": "examples/json"
        }
    }
},
{
    "section": "S3",
    "options": {
        "s3_name": "test",
        "download_file": {}
    }
},
{
    "section": "JSONReader"
}

This pipeline will download all files below the path examples/json/* from the S3 and read the JSON content.

# Upload file

The S3Uploader section consumes items with paths to a temporary file on the disk (for instance produces with a JSONWriter section) and uploads those files to an S3 storage.

Example:

Input:

[
    {
        "path": "/tmp/etl/json_writer/data.json",
        "filename": "data.json"
    }
]

Section configuration:

{
    "section": "S3Uploader",
    "options": {
        "s3_name": "json-dump",
        "upload_file": {
            "path": "{{ item.filename }}",
            "key": "data-dump/januar/{{ item.filename }}"
        }
    }
}

The target key can either be defined with the key option as expression, or with the directory option. When the directory option is used, the key is generated by appending the filename of path to the `directory.

Access / ACL:

By default, the uploaded files are not publicly / anonymously accessible, because the default ACL setting is private. In order to make it available to everyone anonymously, the acl option can be set to public-read.

Pruning:

The prune option lets the section remove older files (configurable through keep_hours), so that the S3 bucket is not filled up when configuring the pipeline to run automatically / nightly. Pruning only works when directory is configured. Be aware that in this case the item variable is not available in the directory expression at pruning time, since pruning is not a per-item operation.

Delete:

By default, the temporary files on the disk are deleted after uploading in order to reduce disk size. You can disable this behavior with the delete option when you need the files later.

# Options

  • download_file - Download file

    • Download the file of an object.
    • Type: Options
    • Default: null
      • cleanup - Cleanup files

        • When enabled, the temporary downloaded file is automatically removed after passing on the item in the pipeline.
        • Type: Boolean
        • Default: true
      • key - Key

        • Key (path) of the S3 object.
        • Type: Expression
        • Default: {{ item.Key }}
  • list_objects - List objects

    • List objects on the S3 storage.
    • Type: Options
    • Default: null
      • foreach - Foreach

        • When using the foreach option, the listing is executed for each item that was previously in the pipeline. This enables the possibility that the item can be accessed in expression fields using the item variable name.
        • Type: Options
        • Default: null
          • keys_only - Keys only

            • Only return a list of keys as strings. This can only be used in combination with target_path.
            • Type: Boolean
            • Default: false
          • max_results - MaxResults

            • Limits the maximum amount of results. This can only be used in combination with target_path.
            • Type: Integer
            • Default: 1000
            • Min: 1
          • target_path - Target path

            • JSON path to the key on the previous item, where the list of results should be stored. When the option is omitted, the previous item is removed from the pipeline and each resulting item is added to the pipeline individually. When this option is used with a large object count, this can can cause performance problems; consider using the keys_only option.
            • Type: Expression
            • Default: null
      • prefix - Prefix

        • Limits the response to keys that begin with the specified prefix.
        • Type: Expression
        • Default: null
      • start_after - StartAfter

        • StartAfter is where you want Amazon S3 to start listing from. Amazon S3 starts listing after this specified key. StartAfter can be any key in the bucket.
        • Type: Expression
        • Default: null
  • s3_bucket - S3 Bucket Name

    • The name of the S3 bucket.
    • Type: Expression
    • Default: null
  • s3_endpoint_url - S3 Endpoint URL

    • The endpoint URL of the S3 service.
    • Type: Expression
    • Default: null
  • s3_key - S3 Access Key

    • The S3 access key for connecting to S3.
    • Type: Expression
    • Default: null
  • s3_name - S3 config name

    • The name of the preconfigured S3 configuration.
    • Type: Expression
    • Default: null
  • s3_secret - S3 Secret Key

    • The S3 secret key for connecting to S3.
    • Type: Expression
    • Default: null
  • upload_file - Upload file

    • Upload a file to an S3 bucket.
    • Type: Options
    • Default: null
      • acl - S3 ACL

        • S3 ACL setting, determining whether the file can be downloaded publicly or not.
        • Type: Choice
        • Default: 'private'
        • Choices:
          • private: private
          • public-read: public-read
      • delete - Delete

        • Delete the local file after uploading in order to reduce disk usage to a minimum.
        • Type: Boolean
        • Default: true
      • directory - Directory key

        • The key of the directory, in which a new file is created on the S3 storage. The filename of path will be used as the filename of the new object.
        • Type: Expression
        • Default: null
      • keep_hours - Keep hours

        • When pruning, keep files on the s3 that are younger than the configured amount of hours.
        • Type: Integer
        • Default: 62
      • key - Key

        • The key of the object on the S3 storage (target).
        • Type: Expression
        • Default: null
      • path - Path

        • The path to the file on the local temp directory (source).
        • Type: Expression
        • Default: {{ item.path }}
      • prune - Prune

        • Before uploading, remove existing files from the s3. Requires a path to be set for precaution.
        • Type: Boolean
        • Default: false

# S3Uploader

Deprecated

This section is deprecated in favor of the new, generic S3 section which also supports file uploading. The S3Uploader section will be removed in a future release.

The S3Uploader section uploads single files to an S3 bucket.

The section consumes items with a path to a temporary file (for instance produced by the JSONWriter section), reads the files from the temporary file system and uploads each file to an S3 bucket.

S3 configuration:

The uploader section uses preconfigured, named S3 targets. The reason is that the credentials (secrets) cannot be configured / exposed in the pipeline because of security reasons. The available S3 targets are configured through the enviornment variable S3_CONFIGS, where each preconfigured target has a name. The section needs to know the name of the target, therefore this has to be configured with the s3_name option.

Access / ACL:

By default, the uploaded files are not publicly / anonymously accessible, because the default ACL setting is private. In order to make it available to everyone anonymously, the acl option can be set to public-read.

Pruning:

The prune option lets the section remove older files (configurable through keep_hours), so that the S3 bucket is not filled up when configuring the pipeline to run automatically / nightly. Be aware that for the prune option to work, a path needs to be configured, so that the files are uploaded to a folder instead of the bucket root. This is in order to prevent from accidentally deleting a whole bucket.

Full example:

Input:

[
  {"path": "/tmp/etl/json_writer/data.json"}
]

Section configuration:

{
  "section": "S3Uploader",
  "options": {
    "s3_name": "json-dump",
    "path": "enums",
    "delete": true,
    "acl": "private",
    "prune": true,
    "keep_hours": 120
  }
}

# Options

  • s3_name - S3 config name required

    • The name of the preconfigured S3 configuration.
    • Type: Char
  • acl - S3 ACL

    • S3 ACL setting, determining whether the file can be downloaded publicly or not.
    • Type: Choice
    • Default: 'private'
    • Choices:
      • private: private
      • public-read: public-read
  • delete - Delete

    • Delete the local file after uploading in order to reduce disk usage to a minimum.
    • Type: Boolean
    • Default: true
  • keep_hours - Keep hours

    • When pruning, keep files on the s3 that are younger than the configured amount of hours.
    • Type: Integer
    • Default: 72
  • path - Path

    • The path (directory) on the s3 within the bucket.
    • Type: Char
    • Default: null
  • prune - Prune

    • Before uploading, remove existing files from the s3. Requires a path to be set for precaution.
    • Type: Boolean
    • Default: false

# SchedulePipelines

The SchedulePipelinesSection schedules runs for pipelines specified by either a list of IDs or titles.

Example with ids:

[
    {
        "section": "SchedulePipelines",
        "options": {
            "pipeline_ids": [1,2,3]
        }
    }
]

Example with titles:

[
    {
        "section": "SchedulePipelines",
        "options": {
            "pipeline_titles": [
                "pipeline_1",
                "pipeline_2",
                "pipeline_3"
            ]
        }
    }
]

# Options

  • pipeline_ids - Pipeline ID's

    • List of Pipeline ID's to be scheduled
    • Type: List of Integer
    • Default: []
  • pipeline_titles - Pipeline Titles

    • List of Pipeline titles to be scheduled
    • Type: List of Char
    • Default: []

# ScopeArchivSource

A section generic to fetch data from Scope Archiv using the Scope Archiv Connector.

The Scope Archiv Connector is a separate, optional service which reads preconfigured Oracle views from a Scope Archiv database and provides them as a REST API to the ETL pipeline. The ScopeArchivSource section handles the communication with the service.

Use table_name and columns to specify which data to fetch. order_columns defines the columns used to order the data, they should be unique together.

Take care when configuring order_columns: it should be one or more columns, which are indexed and are unique together. It must be indexed in order to have good enough performance on oracle and it must be unique together in order for the pagination to worker properly; otherwise data will be missing. In this context, the order is solely the order the items are loaded and processed in. Since items cannot have dependencies within the pipeline, this must be optimized for best performance.

Use a single column when querying "primary" data views, containing records which each have an own unique identifier. Use multiple columns on the other hand when reading from junction tables which usually do not have one column that is unique.

Example:

{
  "section": "ScopeArchivSource",
  "options": {
    "columns": [
      "ID_NR",
      "TITEL",
      "PARENT_ID_NR",
      "ZWEIG_POSITION",
      "SIGNATUR",
      "ANZEIGE_INTERNET"
    ],
    "page_size": 100,
    "filter_sql": "\"HIERARCHIE_PFAD\" NOT LIKE '00000796390000546102%'",
    "table_name": "public.mvk_dls_vrzng_enht_1",
    "order_columns": [
      "ID_NR"
    ]
  }
}

With the above example, the section acts as the first section in the pipeline. It can also be used for looking up specific items based on other items that are already in the pipeline, e.g. all files for specific records.

Example:

{
  "section": "QuerysetSource",
  "options": {
    "model": "record",
    "filter": {
      "is_public": true,
      "files_are_public": true
    }
  }
},
{
  "section": "ScopeArchivSource",
  "options": {
    "columns": [
      "PID",
      "ID_NR_VRZNG_ENHT"
    ],
    "page_size": 100,
    "table_name": "public.mvk_dls_ve_vrkng_fdra_1",
    "order_columns": [
      "PID",
      "ID_NR_VRZNG_ENHT"
    ],
    "filter_by_previous_in": {
      "column": "ID_NR_VRZNG_ENHT",
      "previous_field": "identifier"
    }
  }
}

# Options

  • columns - Columns required

    • A list of columns to be selected from the source.
    • Type: ListSerializer
  • order_columns - Order columns required

    • A list of one or more columns for ordering, which are indexed and unique together.
    • Type: ListSerializer
  • table_name - Table name required

    • The name of the table / oracle view to select from.
    • Type: Char
  • filter_by_previous_in - Filter requested data by items in the pipeline

    • The filter_by_previous_in option allows to select rows that have a relation to items consumed by previous sections in the pipeline. When this field is configured, it must contain an object with two configurations, the column name, where the relation is stored in the source system and the previous_field, which is the name of the field where the foreign identifier is stored in the consumed items. See the example for reference.
    • Type: Options
      • column - Filter field name required

        • Name of the column in the source system.
        • Type: Char
      • previous_field - Previous field name required

        • Name of the field in the pipeline item.
        • Type: Char
      • target_path - Path to store the result

        • If configured, the result is stored in the pipeline item under this JSON path.
        • Type: Char
        • Default: null
  • filter_sql - SQL Filter Klausel

    • Additional WHERE clause in raw SQL for when we want to already filter on the source for improving performance. The filter_sql statement is joined with the generated filter_by_previous_in clause with AND.
    • Type: Char
  • page_size - Page size

    • Amount of items that are loaded in one page. Usually the default is fine.
    • Type: Integer
    • Min: 1
    • Max: 1000
  • pagination - Pagination

    • Depending on how the table looks, different pagination implementation are more efficient. Can be cursor or offset. By default, the cursor pagination is used when we sort by only one column, the offset pagination for multiple columns. Usually it is best to not configure this option.
    • Type: Choice
    • Choices:
      • cursor: Cursor-based pagination
      • offset: Pagination with offset and limit

# SkipProcessedFiles

The SkipProcessedFiles section has the job to detect as early as possible when we are processing files that we have already processed and skip those files.

Importing files is very time consuming. That's why we want to skip files when we already have processed and imported them before. This makes it possible that pipelines can be aborted and restarted when needed and they can be configured in a way that they will skip items as long as they are already processed.

In order for being able to also make updates in the configuration (e.g. tile sizes of images) and re-process the files, there is a pipeline_version string configured, which is stored on each file. If the pipeline version string from the pipeline configuration matches with the one stored in the file, the file is skipped. When the configuration is changed and the goal is to rerun everything, the pipeline version configuration in the section is increased and the pipeline rerun. With this mechanism, we can successively update the images and always have data published.

Be aware that this is optimized for non-Fedora sources by default. In the Fedora case, you must set the fedora flag to true so that only the first part of the identifier is compared, which is equal to the PID in the fedora case. This is necessary because one package (PID) can result in multiple files.

Example:

{
  "section": "SkipProcessedFiles",
  "options": {
    "identifier_field": "ID_NR_DATEI",
    "pipeline_version": "1.0.0"
  }
}

# Options

  • pipeline_version - Pipeline version required

    • Pipeline version string. The skipping is based on this string. Can be changed when the pipeline is updated.
    • Type: Expression
  • batch_size - Batch size

    • Amount of items to be processed in one batch.
    • Type: Integer
    • Default: 50
  • fedora - Fedora mode

    • Set to true when the source is fedora, causing the identifiers to be compared by removing the path.
    • Type: Boolean
    • Default: false
  • identifier_field - Identifier field name

    • Name of the field containing the unique identifier of the file.
    • Type: Char
    • Default: 'ID_NR_DATEI'

# Slice

The slice section is used for only processing a certain slice of the items yielded by a previous section.

This is useful for testing pipeline configurations. It allows to limit the amount of items yielded but also to skip the first n items by setting start to n.

Example:

{
  "section": "Slice",
  "options": {
    "limit": 100,
    "start": 0
  }
}

# Options

  • limit - Limit required

    • The maximum amount of items to be yielded.
    • Type: Integer
  • start - Start

    • Skip the first n items and do not yield them.
    • Type: Integer
    • Default: 0

# Solr

The Solr section indexes records in the external, public Solr instance.

In the DLS, the Solr instance is installed in the external part and data indexed in Solr is available for the public in the search and used for displaying data in the web.

Solr is configured in a way that it only indexes records that have the is_public flag set to True.

The Solr section consumes items from the previous section, identifies them based on the identifier_field (which must match the value on the record in the identifier field) and publishes them.

It is also possible to prune the Solr, removing all data from Solr. This has the impact that the records can no longer be found in the web until indexing is fully completed.

Example:

{
  "section": "Solr",
  "options": {
    "prune": false,
    "batch_size": 500,
    "identifier_field": "identifier",
    "lookup_field": "identifier"
  }
}

# Options

  • batch_size - Batch size

    • Amount of items to be processed in one batch.
    • Type: Integer
    • Default: 200
  • identifier_field - Identifier field name

    • Name of the field in the consumed item that contains the unique identifier.
    • Type: Char
    • Default: 'identifier'
  • lookup_field - Lookup field name

    • Name of the field on the record used to look up the values in the identifier_field.
    • Type: Char
    • Default: 'identifier'
  • prune - Prune

    • Remove all data from Solr before starting to index.
    • Type: Boolean
    • Default: false

# Store

This section allows to read and write values to the store. The store is a key-value store that is persisted in the database and the values can be shared between all ETL pipelines over multiple pipeline runs. The store is organized in contexts. Each context has its own set of keys. The default context is default.

# Write operation

{
    "section": "Store",
    "options": {
        "write": {
            "await_success": true,
            "context": "default",
            "key": "my_key",
            "value": "my_value"
        }
    }
}

This will write the value my_value to the store with the key my_key in the default context. The context and key pair must be unique in the store. The await_success option can be used to wait for the whole pipeline to finish successfully before writing the value to the store. When this option is set to true (default), the value will be written to the store after the pipeline has finished successfully. In case of an error in any section of the pipeline, the value will not be written to the store.

When using the await_success option, the key-value pair will not be available in the pipeline for the next store sections. When using the await_success option, the key-value pair will not be available in the pipeline. Setting the await_success option to false will write the value to the store immediately.

The write options needs to have a previous section in the pipeline. So it can not be the first section.

# Read operation

The read option can be used to read a value from the store and set it to a key in the item.


{
    "section": "Store",
    "options": {
        "read": {
            "context": "default",
            "key": "my_key",
            "foreach": {
                "target_path": "/my_target_path"
            }
        }
    }
}

When the foreach option is used, the section will be executed for each item in the pipeline. The value from the store will be inserted into the item with the target_path key. The foreach option is optional and must be a valid JSON Pointer. When no foreach is specified, the item from the previous section will be replaced by a new item containing the store value.

# Delete operation

The delete option can be used to delete a value from the store.

{
    "section": "Store",
    "options": {
        "delete": {
            "context": "default",
            "key": "my_key"
        }
    }
}

# List operation

The list option can be used to read all values of a contet from the store as a new item in the pipeline.

{
    "section": "Store",
    "options": {
        "list": {
            "key": "my_key",
            "context": "default"
        }
    }
}

This will insert a new item in the pipeline which has a key value containing a mapping of the existing key/value pairs within the store context.

# Set context operation

This operation sets the store context for all future section. This impacts later Store sections but also sets the store context for expression fields in other sections. The default context is default.

To set the context to my_context:

{
    "section": "Store",
    "options": {
        "set_context": {
            "context": "my_context"
        }
    }
}

# Options

  • delete - Delete operation

    • Removes key/value pairs from the store.
    • Type: Options
    • Default: null
      • context - Store context

        • The store context in which to operate. When omitted, the default store context of a previous Store.set_context operation is used or the context default. It must contain only alphanumeric characters and underscores.
        • Type: Expression
      • key - Store key

        • The key for the identifying the key/value pair to delete. When omitted, all key/value pairs in the context are deleted. It must contain only alphanumeric characters and underscores.
        • Type: Expression
        • Default: null
      • remove_item - Remove pipeline item

        • When set to True, the pipeline item is removed from the pipeline after deleting from the store.
        • Type: Boolean
        • Default: false
  • list - List operation

    • Reads all key/value pairs within a context from the store.
    • Type: Options
    • Default: null
      • context - Store context
        • The store context in which to operate. When omitted, the default store context of a previous Store.set_context operation is used or the context default. It must contain only alphanumeric characters and underscores.
        • Type: Expression
  • read - Read operation

    • Reads all key/value pairs within a context from the store.
    • Type: Options
    • Default: null
      • key - Store key required

        • The key to read the value from. It must contain only alphanumeric characters and underscores.
        • Type: Expression
      • context - Store context

        • The store context in which to operate. When omitted, the default store context of a previous Store.set_context operation is used or the context default. It must contain only alphanumeric characters and underscores.
        • Type: Expression
      • foreach - Foreach

        • When used, the value is read and updated into each processed item in the pipeline. When omitted, a new item is inserted into the pipeline containing the value.
        • Type: Options
          • target_path - Path to store the result required
            • Path to the key on the item in the pipeline where the value will be stored. "" can be used to replace the entire item.
            • Type: Char
  • set_context - Set context operation

    • Sets the default store context for later sections in the pipeline.
    • Type: Options
    • Default: null
      • context - Store context required
        • The default store context to set for later sections.
        • Type: Char
  • write - Write operation

    • Writes a key/value pair into the store.
    • Type: Options
    • Default: null
      • key - Store key required

        • The key for writing the value. It must contain only alphanumeric characters and underscores.
        • Type: Expression
      • value - Store value required

      • await_success - Await success

        • When set to true, the value is not written until the pipeline terminates without errors.
        • Type: Boolean
        • Default: false
      • context - Store context

        • The store context in which to operate. When omitted, the default store context of a previous Store.set_context operation is used or the context default. It must contain only alphanumeric characters and underscores.
        • Type: Expression
      • remove_item - Remove pipeline item

        • When set to True, the pipeline item is removed from the pipeline after writing to the store.
        • Type: Boolean
        • Default: false

# TemplateWriter

The TemplateWriter section consumes items and writes them to the output_filename using the template configured through the template_filename. The template can have any content along jinja2 expressions to render the items. The items' context is called items. The path to the output file is yielded to the next section which may process it further.

In detail, the section writes all items consumed from the prior section into files on the temp file system of the ETL worker. It does not yield the consumed items to the next section but it yields only one item per written file, which contains the path to the file on the disk.

The written files are automatically removed after they are processed by the next section in order to keep disk usage low.

The items are written in batches (configurable through batch_size). The batch_size is also accessible for the output_filename as a context variable.

  • batch - the batch number, starting with 1.
  • date - the date when the pipeline was executed in the form YYYMMMDD.
  • time - the date and time in form YYYYMMDD-hhmmss; the date and time is the same for all written batch files for convenience, thus it does not resemble the time the file was written but the time the pipeline run was started.
  • items - The items accessible in the template for the current batch.

Example configuration:

{
  "section": "TemplateWriter",
  "options": {
    "batch_size": 200,
    "output_filename": "data-{{ date }}-{{ batch }}.xml",
    "template_filename": "template.xml"
  }
},

Example output with two batches:

[
  {"path": "/tmp/etl/template_writer/data-20221229-1.xml", "size": 200},
  {"path": "/tmp/etl/template_writer/data-20221229-2.xml", "size": 173}
]

WARNING

Be aware that when batch_size is set to a non-zero value and batch is missing in the output_filename template, the section will yield the same file path / filename multiple times with multiple batches of items. Depending on what is done with the files, the file may be overwritten on the target system and may result in missing data.

# Options

  • template_filename - Template filename required

  • batch_size - Batch size

    • Amount of objects to be written in one file. Set it to 0 in order to write all items into one file.
    • Type: Integer
    • Default: 1000
  • output_filename - Output filename

    • Filename template of the output file
    • Type: Expression
    • Default: data-{{ batch }}.xml

# Terminate

When the Terminate section's condition is truthy for the first item, the pipeline is terminated, so that no further items are processed by previous sections.

The Terminate section is similar to the Filter section, but the Filter section keeps processing items individually and may remove them from the pipeline. The Terminate section is a bit different: it can be used for performance optimization when the condition indicates that no later item must be processed and therefore it does not make sense to keep consuming data from the data source.

The condition field of the Terminate section uses the newer expression field syntax for defining conditions.

Example:

{
    "section": "Terminate",
    "options": {
        "condition": "{{ item.value > 2 }}"
    }
}

# Options

  • condition - Terminate condition required
    • When the first processed item has a thruthy condition, the pipeline is terminated.
    • Type: Expression

# TraverseTree

Traverse the tree structure of the records and yield the nested tree structure.

The tree structure is built by recursively traversing the tree and building the nested tree structure. The depth step determines how many levels to traverse in one iteration. The section will yield the nested tree structure for each depth step.

The calculation of the depth step is done by calculating the maximum depth of the tree and then subtracting the depth step until the depth is 0.

For example, if the maximum depth is 14, and the depth step is set to 10, the section will yield two tree structures.

  • One is generated from the depth 4 to 14,
  • and the other from the depth 1 to 4.

The reason for starting from the bottom of the tree is to optimize the number of the files generated. Middle depths mainly contain the most number of the nodes and records. By starting from the bottom, the section can yield the tree structure with the least number of the nodes.

The Record and Node objects are not serialized within the yielded tree structure.

The yielded data structure is as follows:

{
    "path": "0001",
    "record": Record<identifier=1>,
    "children": [
        {
            "path": "00010001",
            "record": Record<identifier=2>,
            "children": [
                {
                    "path": "000100010001",
                    "record": Record<identifier=3>,
                    "children": [],
                }
            ]
        },
    ],
}

The section yields only the public data. Protected data will not be included in the yielded tree structure.

# Options

  • depth_step - Depth step

    • The depth step to traverse the tree. Determines how many levels to traverse in one iteration.
    • Type: Integer
    • Default: null
  • max_depth - Last depth

    • The last depth to traverse. The section will traverse the tree until this depth.
    • Type: Integer
    • Default: null
  • min_depth - First depth

    • The first depth to traverse. The section will start traversing the tree from this depth.
    • Type: Integer
    • Default: null

# Tree

The Tree section builds a tree based on the record, so that the records can be displayed as tree in the web.

Create the tree (Node instances) based on records in the database. Starting with the root node, all immediate children nodes of this record are created. If the children have further children, they are recursively created as well.

Yield the data used to create the tree.

Example:

{
  "section": "Tree",
  "options": {
    "root_value": "-1"
  }
}

# Options

  • root_value - Root value
    • Identifier of the root record. There must be exactly one root record.
    • Type: Char
    • Default: '-1'

# TreeSorter

Sort items for building a tree.

The sorter makes sure that parents always are before their children in the pipeline.

The sorter also handles orphan nodes according to the configuration.

Options:

  • id_field: name of the ID field of a node.
  • parent_id_field: name of the parent ID field of a node.
  • root_value: parent_id_field value of the root node.
  • orphans: Handling of orphan nodes.

Orphan node options:

  • "drop": drop the records (default)
  • "raise": abort the pipeline run with an exception

# Options

  • id_field - ID field name

    • Type: Char
    • Default: 'ID_NR'
  • orphans - Orphan strategy

    • Type: Choice
    • Default: 'raise'
    • Choices:
      • drop: Drop the records (default)
      • raise: Abort the pipeline run with an exception
  • parent_id_field - Parent-ID field name

    • Type: Char
    • Default: 'PARENT_ID_NR'
  • root_value - Root value id

    • Type: Char
    • Default: '-1'

# Variables

Variables can be set during the execution time, so that they can be used in expression fields of other sections. Variables can either be set raw (using the variables option) or by rendering the values as expressions.

Simple example:

[
    {
        "section": "Variables",
        "options": {
            "variables": {
                "mapping": {
                    "key1": "Label 1",
                    "key2": "Label 2"
                }
            }
        }
    },
    {
        "section": "AttributeMapper",
        "options": {
            "mapping": [
                {
                    "value": "{{ var.mapping.get(item.key, item.key) }}",
                    "target": "label"
                }
            ]
        }
    }
]

# Options

  • expressions - Expression variables

    • The value of these variables are evaluated as expression before setting. item is only avaiable when mode is set to each.
    • Type: Dict of Expression
    • Default: {}
  • mode - Mode

    • The mode defines, when and how often the variables are set.
    • Type: Choice
    • Default: 'once'
    • Choices:
      • once: Variables are set only once when starting the pipeline.
      • each: The variables are set per each item.
  • variables - Raw variables

    • Dictionary of variables, where the value is set without processing (raw).
    • Type: Dict
    • Default: {}

# XMLReader

The XMLReader section allows to read XML files and load containing items into the pipeline.

This section expects the previous section to yield a dictionary with a path key, such as it is done by the HTTP section. The path key should point to a XML file. An example of a valid item is:

[
    {
        "path": "/path/to/file.xml"
    }
]

Lets say we have an XML file with the following structure:

<SearchDetailResponse xmlns="http://www.cmiag.ch/cdws/searchDetailResponse">
    <Verzeichnungseinheit>
        <ID>164494</ID>
        <Titel>Abteilung für die XYZ</Titel>
        <Verzeichnungsstufe>Serie</Verzeichnungsstufe>
        <Findmittel/>
    </Verzeichnungseinheit>
</SearchDetailResponse>

We can use the items option to specify the path to the items in the XML file. The path should be a valid JSON pointer. For getting more information about the JSON Pointer syntax, please refer to the JSON Pointer documentation (opens new window).

For example, if we want to yield the Verzeichnungseinheit items, we can use the following configuration:

{
    "section": "XMLReader",
    "options": {
        "items": "/SearchDetailResponse/Verzeichnungseinheit"
    }
}

This will yield the following items:

[
    {
        "ID": "164494",
        "Titel": "Abteilung für die XYZ",
        "Verzeichnungsstufe": "Serie",
        "Findmittel": null,
    }
]

# Making the child items a list

Lets say we have an XML file with the following structure:

<SearchDetailResponse xmlns="http://www.cmiag.ch/cdws/searchDetailResponse">
    <Verzeichnungseinheit>
        <ID>164494</ID>
        <Titel>Abteilung für die XYZ</Titel>
        <Verzeichnungsstufe>Serie</Verzeichnungsstufe>
        <Bilder>
            <Bild src="imageserver.ch/image123456.jpg">Bild 1</Bild>
        </Bilder>
        <Findmittel/>
    </Verzeichnungseinheit>
</SearchDetailResponse>

By default, the Bilder item will be a dictionary because there is only one Bilder item. Using with the following default configuration:

{
    "section": "XMLReader",
}

This will yield the following item:

[
    {
        "SearchDetailResponse": {
            "@xmlns": "http://www.cmiag.ch/cdws/Verzeichnungseinheit",
            "Verzeichnungseinheit":  {
                    "ID": "164494",
                    "Titel": "Abteilung für die XYZ",
                    "Verzeichnungsstufe": "Serie",
                    "Bilder": {
                        "Bild": {
                            "@src": "imageserver.ch/image123456.jpg",
                            "#text": "Bild 1"
                        }
                    },
                    "Findmittel": null,
                },
        }
    }
]

If we want to make the Bilder items as a list, we can use the ensure_list option. option:

{
    "section": "XMLReader",
    "options": {
        "ensure_list": ["Bilder"]
    }
}

This will yield the following item:

[
    {
        "SearchDetailResponse": {
            "@xmlns": "http://www.cmiag.ch/cdws/Verzeichnungseinheit",
            "Verzeichnungseinheit":  {
                    "ID": "164494",
                    "Titel": "Abteilung für die XYZ",
                    "Verzeichnungsstufe": "Serie",
                    "Bilder": [
                        {
                            "Bild": {
                                "@src": "imageserver.ch/image123456.jpg",
                                "#text": "Bild 1"
                            }
                        }
                    ],
                    "Findmittel": null,
                },
        }
    }
]

The ensure_list option can be used to ensure that the items are a list. You can only specify the name of the tag that should be a list. All the tags with the same name will be converted to a list.

# Using the path option

The path option can be used to specify the path to the XML file. The path determined by looking up "$" prefixed names in previous section. For example, if the previous section yields the following item:

[
    {
        "different_path_key": "/path/to/file.xml"
    }
]

We can use the following configuration:

{
    "section": "XMLReader",
    "options": {
        "path": "/different_path_key"
    }
}

# Keep the previous item data

By default, the previous item data is discarded and the item is replaced with the parsed result. If you want to keep the previous item data, you can use the target option. For example, we can use the following configuration:

{
    "section": "XMLReader",
    "options": {
        "target": "/xml_data"
    }
}

This will yield the following item:

[
    {
        "path": "/path/to/file.xml",
        "xml_data": [
            {
                "ID": "164494",
                "Titel": "Abteilung für die XYZ",
                "Verzeichnungsstufe": "Serie",
                "Findmittel": null,
            }
        ]
    }
]

# Set statistics total

The XMLReader section is often used in combination with a HTTP section, which supports pagination. In this case, the pipeline does not know from the beginning how many items it is about to process, causing the progressbar and section statistics to be updated for each page.

This can be prevented when the queried API responds the total amount of items at the first page. The total option can be used in the XMLReader section to tell the pipeline the overall total amount of items to be processed. The information can be gathered from the first response body or the response headers.

Examples:

Given we are processing a response body such as:

<Response total="123" pagesize="15">
    <items>...</items>
</Response>

We can extract the total with this configuration:

{
    "section": "XMLReader",
    "options": {
        "total": "{{ document.Response.total }}",
        "items": "/Response/items"
    }
}

When the response has a response header such as x-total-results, this configuration sets total:

{
    "section": "XMLReader",
    "options": {
        "total": "{{ item.headers['x-total-results'] }}"
    }
}

# Options

  • ensure_list - Ensure list

    • The tag names, that should contain lists.
    • Type: List
    • Default: []
  • items - Items

    • JSON Pointer to the list of items that will be passed on to the next section individually. When not defined, the whole document is passed on as one item.
    • Type: JSONPointer
    • Default: ''
  • path - Path

    • JSON Pointer adressing the path to the file to be loaded.
    • Type: JSONPointer
    • Default: JsonPointer('/path')
  • target - Target

    • JSON Pointer for the place where the result is inserted in the processed item. When not set, the processed item is replaced by the resulting item.
    • Type: JSONPointer
    • Default: null
  • total - Total count

    • When processing multiple items with data, such as HTTP responses, the pipeline statistics and progressbar keep being updated. When the first item/HTTP response contains information about the total amount of items to be processed, this information can be set using the total option. The total option accepts an expression, where the previous item / HTTP response can be accessed with item and the parsed content body can be accessed with document.
    • Type: Expression
    • Default: null