Skip to content

CheckFreshnessActivity.Execute

Purpose

Читает raw payload artifact, сохранённый LoaderActivity.Execute, валидирует JSON payload по validator_settings и проверяет его актуальность по freshness_settings.

Код: internal/worker/loader/check_freshness.go

Flow

flowchart TD
    Start([Start]) --> Validate[Validate request and loader_output.raw_data]
    Validate --> Valid{Valid?}
    Valid -- No --> Invalid[Return non-retryable validation error]
    Valid -- Yes --> Resolve[Resolve loader config]
    Resolve --> Read[Read raw payload from object storage]
    Read --> ReadOK{Read succeeded?}
    ReadOK -- No --> StorageError[Return retryable storage error]
    ReadOK -- Yes --> Parse[Parse payload as JSON]
    Parse --> ParseOK{JSON valid?}
    ParseOK -- No --> PayloadError[Return non-retryable payload error]
    ParseOK -- Yes --> Required[Evaluate required_paths]
    Required --> Rules[Evaluate validator rules]
    Rules --> ValidationOK{Validation passed?}
    ValidationOK -- No --> ValidationFailed[Return non-retryable validation failure]
    ValidationOK -- Yes --> FreshnessConfigured{Freshness threshold configured?}
    FreshnessConfigured -- No --> Skipped[Set freshness.status = skipped]
    FreshnessConfigured -- Yes --> Observe[Resolve observed_at from payload or $fetched_at]
    Observe --> FreshnessOK{Fresh enough or allow_stale?}
    FreshnessOK -- No --> FreshnessFailed[Return non-retryable freshness failure]
    FreshnessOK -- Yes --> Passed[Set freshness.status = passed]
    Skipped --> Output[Return IngestionOutput]
    Passed --> Output
    Output --> End([End])
    Invalid --> End
    StorageError --> End
    PayloadError --> End
    ValidationFailed --> End
    FreshnessFailed --> End

Input / Output

Input (loader.CheckFreshnessRequest): - data_source_id - loader_output

Output (loader.IngestionOutput): - metadata - fetched_at - source_type - raw_data - validation - freshness

validation возвращает machine-readable outcome: - status - passed - warnings - results[]

freshness возвращает: - status - stale - allow_stale - observed_at - checked_at - age - threshold - observed_field - message

Runtime Behavior

  • activity использует snapshot validator_settings и freshness_settings, полученный из LoaderActivity.Execute;
  • читает raw payload из Hetzner S3 по loader_output.raw_data.file_uid;
  • ожидает JSON payload;
  • запускает required_paths, затем rules[];
  • schema_ref читается из того же S3 bucket/client, что и raw payload artifacts;
  • observed_field="$fetched_at" использует loader_output.fetched_at вместо чтения timestamp из JSON payload; это fallback для источников, где payload не содержит собственного trustworthy observed timestamp;
  • freshness threshold precedence:
  • max_age
  • stale_after
  • ttl

Error Taxonomy

Non-retryable: - invalid validator config; - invalid freshness config; - validation failed; - freshness failed; - invalid expr rule contract.

Retryable: - transient raw payload read/storage errors.

Validator Contract

Top-level validator_settings fields:

  • schema_ref: object key в том же S3 bucket/client, что и raw payload artifacts.
  • fail_fast: если true, rule engine останавливается на первом failing error rule.
  • required_paths: список обязательных dot-path, проверяется до rules[].
  • rules: ordered список validation rules.

Common rule fields:

  • name: человекочитаемое имя правила.
  • type: тип built-in validator.
  • path: dot-path до значения или коллекции.
  • severity: error или warning; по умолчанию error.
  • config: type-specific settings.

Supported built-in rules:

  • schema
  • назначение: JSON Schema validation для узла по path.
  • настройки: использует top-level schema_ref; отдельных per-rule настроек нет.
  • простыми словами: проверяет структуру JSON по заранее описанной схеме.
  • required
  • назначение: проверяет, что значение по path существует.
  • настройки: нет.
  • простыми словами: поле обязано быть в payload.
  • not_empty
  • назначение: проверяет, что значение по path не пустое.
  • работает для string, array, map.
  • настройки: нет.
  • простыми словами: поле не должно быть пустой строкой, пустым массивом или пустым объектом.
  • type
  • назначение: проверяет runtime type значения.
  • настройки:
    • config.kind: string, number, integer, boolean, object, array, null.
  • простыми словами: поле должно быть именно того типа, который ожидается.
  • equals
  • назначение: проверяет точное равенство значения.
  • настройки:
    • config.value: ожидаемое значение.
  • простыми словами: значение должно быть ровно таким, как задано в конфиге.
  • one_of
  • назначение: проверяет, что значение входит в whitelist.
  • настройки:
    • config.values: массив допустимых значений.
  • простыми словами: значение должно быть одним из заранее разрешённых.
  • regex
  • назначение: проверяет string по regexp.
  • настройки:
    • config.pattern: regexp pattern.
  • простыми словами: строка должна подходить под шаблон.
  • length
  • назначение: проверяет длину string/array/map.
  • настройки:
    • config.min
    • config.max
  • простыми словами: строка или коллекция должна быть не короче и не длиннее заданных границ.
  • numeric_range
  • назначение: проверяет числовой диапазон.
  • настройки:
    • config.min
    • config.max
  • простыми словами: число должно лежать в допустимом диапазоне.
  • time_parse
  • назначение: проверяет, что timestamp по path парсится.
  • настройки:
    • config.formats: ordered список time layouts.
    • config.time_zone: fallback IANA timezone.
  • простыми словами: поле должно быть корректной датой/временем.
  • time_not_future
  • назначение: timestamp не должен быть в будущем относительно now.
  • настройки:
    • config.formats
    • config.time_zone
    • config.max_skew: допустимый положительный drift.
  • простыми словами: timestamp не должен опережать текущее время.
  • time_after
  • назначение: timestamp по path должен быть позже другого времени.
  • настройки:
    • ровно одно из:
    • config.value: фиксированное время
    • config.other_path: другой dot-path
    • optional:
    • config.inclusive
    • config.formats
    • config.time_zone
  • простыми словами: одно время должно быть позже другого.
  • time_before
  • назначение: timestamp по path должен быть раньше другого времени.
  • настройки:
    • ровно одно из:
    • config.value
    • config.other_path
    • optional:
    • config.inclusive
    • config.formats
    • config.time_zone
  • простыми словами: одно время должно быть раньше другого.
  • items_count
  • назначение: проверяет размер массива.
  • настройки:
    • config.min
    • config.max
  • простыми словами: массив должен содержать допустимое число элементов.
  • unique
  • назначение: проверяет уникальность значений внутри массива.
  • настройки:
    • config.item_path: путь внутри элемента массива, например currency или meta.id.
  • простыми словами: в массиве не должно быть дублей по выбранному полю.
  • expr
  • назначение: advanced bool expression rule.
  • настройки:
    • config.expression: expression string.
  • env:
    • value: значение по path
    • root: весь payload
    • now: текущее UTC время
  • helper functions:
    • has(path)
    • matches(pattern, s)
    • parseTime(value, formats, tz)
    • duration(s)
  • простыми словами: это escape hatch для сложной логики, когда обычных built-in правил уже недостаточно.

Path rules:

  • object path: data.items
  • array item path: records[].id
  • required_paths, rules[].path, unique.item_path и freshness_settings.observed_field используют один и тот же path syntax.

Freshness Contract

Top-level freshness_settings fields:

  • observed_field: dot-path до timestamp в payload.
  • max_age: основной freshness threshold.
  • stale_after: fallback threshold, если max_age не задан.
  • ttl: последний fallback threshold, если max_age и stale_after не заданы.
  • allow_stale: если true, stale payload не валит activity, а только помечает результат.
  • time_formats: ordered список layouts для парсинга observed timestamp.
  • time_zone: fallback IANA timezone для значений без offset.

Простыми словами:

  • observed_field говорит, из какого поля брать время данных;
  • max_age / stale_after / ttl задают максимально допустимый возраст;
  • allow_stale говорит, считать ли stale payload ошибкой или только warning-like outcome;
  • time_formats и time_zone нужны, чтобы правильно распарсить нестандартный timestamp из upstream.

Special observed field:

  • observed_field="$fetched_at" использует loader_output.fetched_at вместо payload field.

Freshness threshold precedence:

  1. max_age
  2. stale_after
  3. ttl

Freshness status semantics:

  • threshold не задан: status="skipped"
  • observed_field отсутствует или не парсится: non-retryable freshness failure
  • age <= threshold: status="passed", stale=false
  • age > threshold и allow_stale=true: status="passed", stale=true
  • age > threshold и allow_stale=false: non-retryable freshness failure

Time parsing semantics:

  • если time_formats пустой, используются time.RFC3339 и time.RFC3339Nano;
  • если timestamp уже содержит timezone/offset, он приоритетнее time_zone;
  • time_zone используется только как fallback для naive timestamps.

Callers