CheckFreshnessActivity.Execute
Purpose
Читает raw payload artifact, сохранённый LoaderActivity.Execute, валидирует JSON payload по validator_settings и проверяет его актуальность по freshness_settings.
Код: internal/worker/loader/check_freshness.go
Flow
flowchart TD
Start([Start]) --> Validate[Validate request and loader_output.raw_data]
Validate --> Valid{Valid?}
Valid -- No --> Invalid[Return non-retryable validation error]
Valid -- Yes --> Resolve[Resolve loader config]
Resolve --> Read[Read raw payload from object storage]
Read --> ReadOK{Read succeeded?}
ReadOK -- No --> StorageError[Return retryable storage error]
ReadOK -- Yes --> Parse[Parse payload as JSON]
Parse --> ParseOK{JSON valid?}
ParseOK -- No --> PayloadError[Return non-retryable payload error]
ParseOK -- Yes --> Required[Evaluate required_paths]
Required --> Rules[Evaluate validator rules]
Rules --> ValidationOK{Validation passed?}
ValidationOK -- No --> ValidationFailed[Return non-retryable validation failure]
ValidationOK -- Yes --> FreshnessConfigured{Freshness threshold configured?}
FreshnessConfigured -- No --> Skipped[Set freshness.status = skipped]
FreshnessConfigured -- Yes --> Observe[Resolve observed_at from payload or $fetched_at]
Observe --> FreshnessOK{Fresh enough or allow_stale?}
FreshnessOK -- No --> FreshnessFailed[Return non-retryable freshness failure]
FreshnessOK -- Yes --> Passed[Set freshness.status = passed]
Skipped --> Output[Return IngestionOutput]
Passed --> Output
Output --> End([End])
Invalid --> End
StorageError --> End
PayloadError --> End
ValidationFailed --> End
FreshnessFailed --> End
Input / Output
Input (loader.CheckFreshnessRequest):
- data_source_id
- loader_output
Output (loader.IngestionOutput):
- metadata
- fetched_at
- source_type
- raw_data
- validation
- freshness
validation возвращает machine-readable outcome:
- status
- passed
- warnings
- results[]
freshness возвращает:
- status
- stale
- allow_stale
- observed_at
- checked_at
- age
- threshold
- observed_field
- message
Runtime Behavior
- activity использует snapshot
validator_settingsиfreshness_settings, полученный изLoaderActivity.Execute; - читает raw payload из Hetzner S3 по
loader_output.raw_data.file_uid; - ожидает JSON payload;
- запускает
required_paths, затемrules[]; schema_refчитается из того же S3 bucket/client, что и raw payload artifacts;observed_field="$fetched_at"используетloader_output.fetched_atвместо чтения timestamp из JSON payload; это fallback для источников, где payload не содержит собственного trustworthy observed timestamp;- freshness threshold precedence:
max_agestale_afterttl
Error Taxonomy
Non-retryable:
- invalid validator config;
- invalid freshness config;
- validation failed;
- freshness failed;
- invalid expr rule contract.
Retryable: - transient raw payload read/storage errors.
Validator Contract
Top-level validator_settings fields:
schema_ref: object key в том же S3 bucket/client, что и raw payload artifacts.fail_fast: еслиtrue, rule engine останавливается на первом failingerrorrule.required_paths: список обязательных dot-path, проверяется доrules[].rules: ordered список validation rules.
Common rule fields:
name: человекочитаемое имя правила.type: тип built-in validator.path: dot-path до значения или коллекции.severity:errorилиwarning; по умолчаниюerror.config: type-specific settings.
Supported built-in rules:
schema- назначение: JSON Schema validation для узла по
path. - настройки: использует top-level
schema_ref; отдельных per-rule настроек нет. - простыми словами: проверяет структуру JSON по заранее описанной схеме.
required- назначение: проверяет, что значение по
pathсуществует. - настройки: нет.
- простыми словами: поле обязано быть в payload.
not_empty- назначение: проверяет, что значение по
pathне пустое. - работает для string, array, map.
- настройки: нет.
- простыми словами: поле не должно быть пустой строкой, пустым массивом или пустым объектом.
type- назначение: проверяет runtime type значения.
- настройки:
config.kind:string,number,integer,boolean,object,array,null.
- простыми словами: поле должно быть именно того типа, который ожидается.
equals- назначение: проверяет точное равенство значения.
- настройки:
config.value: ожидаемое значение.
- простыми словами: значение должно быть ровно таким, как задано в конфиге.
one_of- назначение: проверяет, что значение входит в whitelist.
- настройки:
config.values: массив допустимых значений.
- простыми словами: значение должно быть одним из заранее разрешённых.
regex- назначение: проверяет string по regexp.
- настройки:
config.pattern: regexp pattern.
- простыми словами: строка должна подходить под шаблон.
length- назначение: проверяет длину string/array/map.
- настройки:
config.minconfig.max
- простыми словами: строка или коллекция должна быть не короче и не длиннее заданных границ.
numeric_range- назначение: проверяет числовой диапазон.
- настройки:
config.minconfig.max
- простыми словами: число должно лежать в допустимом диапазоне.
time_parse- назначение: проверяет, что timestamp по
pathпарсится. - настройки:
config.formats: ordered список time layouts.config.time_zone: fallback IANA timezone.
- простыми словами: поле должно быть корректной датой/временем.
time_not_future- назначение: timestamp не должен быть в будущем относительно
now. - настройки:
config.formatsconfig.time_zoneconfig.max_skew: допустимый положительный drift.
- простыми словами: timestamp не должен опережать текущее время.
time_after- назначение: timestamp по
pathдолжен быть позже другого времени. - настройки:
- ровно одно из:
config.value: фиксированное времяconfig.other_path: другой dot-path- optional:
config.inclusiveconfig.formatsconfig.time_zone
- простыми словами: одно время должно быть позже другого.
time_before- назначение: timestamp по
pathдолжен быть раньше другого времени. - настройки:
- ровно одно из:
config.valueconfig.other_path- optional:
config.inclusiveconfig.formatsconfig.time_zone
- простыми словами: одно время должно быть раньше другого.
items_count- назначение: проверяет размер массива.
- настройки:
config.minconfig.max
- простыми словами: массив должен содержать допустимое число элементов.
unique- назначение: проверяет уникальность значений внутри массива.
- настройки:
config.item_path: путь внутри элемента массива, напримерcurrencyилиmeta.id.
- простыми словами: в массиве не должно быть дублей по выбранному полю.
expr- назначение: advanced bool expression rule.
- настройки:
config.expression: expression string.
- env:
value: значение поpathroot: весь payloadnow: текущее UTC время
- helper functions:
has(path)matches(pattern, s)parseTime(value, formats, tz)duration(s)
- простыми словами: это escape hatch для сложной логики, когда обычных built-in правил уже недостаточно.
Path rules:
- object path:
data.items - array item path:
records[].id required_paths,rules[].path,unique.item_pathиfreshness_settings.observed_fieldиспользуют один и тот же path syntax.
Freshness Contract
Top-level freshness_settings fields:
observed_field: dot-path до timestamp в payload.max_age: основной freshness threshold.stale_after: fallback threshold, еслиmax_ageне задан.ttl: последний fallback threshold, еслиmax_ageиstale_afterне заданы.allow_stale: еслиtrue, stale payload не валит activity, а только помечает результат.time_formats: ordered список layouts для парсинга observed timestamp.time_zone: fallback IANA timezone для значений без offset.
Простыми словами:
observed_fieldговорит, из какого поля брать время данных;max_age/stale_after/ttlзадают максимально допустимый возраст;allow_staleговорит, считать ли stale payload ошибкой или только warning-like outcome;time_formatsиtime_zoneнужны, чтобы правильно распарсить нестандартный timestamp из upstream.
Special observed field:
observed_field="$fetched_at"используетloader_output.fetched_atвместо payload field.
Freshness threshold precedence:
max_agestale_afterttl
Freshness status semantics:
- threshold не задан:
status="skipped" observed_fieldотсутствует или не парсится: non-retryable freshness failure- age <= threshold:
status="passed",stale=false - age > threshold и
allow_stale=true:status="passed",stale=true - age > threshold и
allow_stale=false: non-retryable freshness failure
Time parsing semantics:
- если
time_formatsпустой, используютсяtime.RFC3339иtime.RFC3339Nano; - если timestamp уже содержит timezone/offset, он приоритетнее
time_zone; time_zoneиспользуется только как fallback для naive timestamps.