Skip to content

Terry Configuration

Goal

Terry должен быть конфигурируемой платформой, где поведение pipeline задаётся декларативно, а не хардкодится под каждый source.

Current Persisted Shape

Текущий Terry baseline уже хранит source catalog в PostgreSQL:

  • terry.workspaces задаёт верхний ownership boundary;
  • terry.domains задаёт logical source owner внутри workspace;
  • terry.data_sources хранит source identity/policy и pointer на текущую активную config version.
  • terry.loader_configs хранит versioned runtime config и history/statistics по каждой версии.
  • data_sources.expiration_days хранит retention policy для raw payload artifacts этого источника.

На этом этапе persisted source-of-truth для universal loader runtime это связка data_sources + loader_configs.

Configuration Domains

Source configuration

Описывает:

  • тип source;
  • transport details;
  • auth/secrets references;
  • polling или trigger strategy;
  • формат входного payload.

В текущем коде source/runtime configuration вынесена в versioned loader_configs:

  • loader_settings
  • auth_settings
  • validator_settings
  • freshness_settings
  • processor_settings
  • fingerprint
  • incremental
  • schedule_settings
  • expiration_days

Для source.type=sdk текущий Terry runtime ожидает transport-specific блок source.sdk:

  • client идентифицирует биржу или SDK-backed provider;
  • operation выбирает Terry registry operation;
  • method остаётся legacy fallback для старого config shape, если operation не задан;
  • params передаёт per-operation runtime arguments;
  • SDK credentials по-прежнему берутся из environment, а не из Terry config.

Для binance:coins.snapshot runtime использует официальный Go SDK github.com/binance/binance-connector-go и прокидывает source.sdk.params.recv_window в Binance recvWindow, если параметр задан.

Current supported Terry SDK operations:

  • binance:coins.snapshot
  • kucoin:coins.snapshot
  • kucoin:loans.snapshot
  • bybit:coins.snapshot
  • bybit:loans.snapshot
  • gate:loans.snapshot

Для source.type=http Terry runtime ожидает transport-specific блок source.http.

Current HTTP fetcher capabilities:

  • common methods: GET, POST, PUT, PATCH, DELETE;
  • enabled/disabled query/header params;
  • raw JSON and raw text request body;
  • JSON response parsing via response.items_path;
  • raw/text response envelope for the existing JSON raw payload contract;
  • pagination modes: disabled, page, offset, cursor;
  • per-request timeout and retry;
  • SSRF protection for localhost/private/link-local/internal targets.

Detailed HTTP config contract: Terry HTTP Fetcher.

Current loader output contract для Terry:

  • raw payload не возвращается в workflow/activity output;
  • LoaderOutput.raw_data содержит file_uid, SHA-256 hash, content_type и requested_at для сырого файла, сохранённого в Hetzner S3;
  • для sdk и http fetchers это означает, что workflow result в Temporal CLI показывает raw artifact metadata вместо payload body;
  • fetcher должен вернуть JSON-compatible raw payload; HTTP text/raw responses оборачиваются в JSON envelope перед сохранением.

Current validation/freshness runtime contract:

  • RunIngestionWorkflow после загрузки raw payload запускает CheckFreshnessActivity.Execute;
  • validator_settings работает поверх JSON payload, считанного из raw storage;
  • freshness_settings использует observed_field, max_age / stale_after / ttl, optional time_formats и time_zone;
  • special-case observed_field="$fetched_at" использует LoaderOutput.fetched_at как observed timestamp, если upstream payload не содержит отдельного поля времени для freshness policy;
  • built-in validator rules:
  • schema
  • required
  • not_empty
  • type
  • equals
  • one_of
  • regex
  • length
  • numeric_range
  • time_parse
  • time_not_future
  • time_after
  • time_before
  • items_count
  • unique
  • expr
  • schema_ref читается из того же Hetzner S3 bucket/client, что и raw payload artifacts;
  • deterministic config/data defects завершаются non-retryable activity error, а transient raw storage read failures остаются retryable.

validator_settings

Top-level fields:

  • schema_ref: object key JSON Schema в том же Hetzner S3 bucket/client. Используется, когда нужно валидировать payload или его часть по формальной JSON Schema.
  • fail_fast: остановиться на первом failing error rule. Удобно для строгих pipeline, где нет смысла собирать весь список ошибок.
  • required_paths: обязательные dot-path, проверяются до rules[]. Это быстрый pre-check на наличие критичных полей.
  • rules: ordered список validation rules. Правила выполняются по порядку, поэтому сначала обычно ставят structural checks, потом более узкие value/time checks.

Common rule fields:

  • name: произвольное имя правила для логов и validation.results[].
  • type: какой именно встроенный валидатор будет использован.
  • path: где в JSON искать значение для проверки.
  • severity: error или warning, default error. error валит activity, warning только попадает в output.
  • config: настройки конкретного валидатора.

Supported rules[].type:

  • schema
  • использует top-level schema_ref
  • что делает: валидирует JSON-узел по полноценной JSON Schema.
  • когда использовать: когда payload уже описан формальным schema contract и нужна строгая structural validation.
  • required
  • без дополнительных настроек
  • что делает: проверяет, что поле вообще существует.
  • когда использовать: когда отсутствие поля уже само по себе дефект данных.
  • not_empty
  • без дополнительных настроек
  • что делает: проверяет, что строка не пустая, массив не пустой и объект не пустой.
  • когда использовать: когда поле может существовать, но пустое значение всё равно считается невалидным.
  • type
  • config.kind: string, number, integer, boolean, object, array, null
  • что делает: проверяет runtime type значения.
  • когда использовать: когда upstream может менять shape и нужно жёстко зафиксировать тип.
  • equals
  • config.value
  • что делает: проверяет точное равенство значения ожидаемому.
  • когда использовать: для fixed flags, константных статусов или version markers.
  • one_of
  • config.values
  • что делает: проверяет, что значение входит в список допустимых.
  • когда использовать: для enum-like полей.
  • regex
  • config.pattern
  • что делает: применяет regexp к строковому значению.
  • когда использовать: для кодов, идентификаторов, тикеров, e-mail-like строк и другого формата, который неудобно описывать через equals.
  • length
  • config.min, config.max
  • что делает: проверяет длину строки, массива или объекта.
  • когда использовать: когда важен минимум/максимум элементов или символов.
  • numeric_range
  • config.min, config.max
  • что делает: проверяет числовое значение на вхождение в диапазон.
  • когда использовать: для процентов, цен, лимитов, счётчиков и других numeric полей.
  • time_parse
  • config.formats, config.time_zone
  • что делает: проверяет, что timestamp можно распарсить.
  • когда использовать: если важно убедиться, что поле времени вообще валидно, но сравнение по времени пока не нужно.
  • time_not_future
  • config.formats, config.time_zone, config.max_skew
  • что делает: проверяет, что timestamp не лежит в будущем относительно момента проверки.
  • когда использовать: для observed/published/generated timestamps, которые не должны опережать текущее время.
  • time_after
  • ровно одно из config.value или config.other_path
  • optional config.inclusive, config.formats, config.time_zone
  • что делает: проверяет, что один timestamp позже другого.
  • когда использовать: например, updated_at должен быть позже created_at.
  • time_before
  • ровно одно из config.value или config.other_path
  • optional config.inclusive, config.formats, config.time_zone
  • что делает: проверяет, что один timestamp раньше другого.
  • когда использовать: например, start_at должен быть раньше end_at.
  • items_count
  • config.min, config.max
  • что делает: проверяет размер массива.
  • когда использовать: когда payload обязан содержать хотя бы один record или не должен превышать безопасный объём.
  • unique
  • config.item_path
  • что делает: проверяет, что внутри массива нет дублей по выбранному полю.
  • когда использовать: например, список валют, тикеров или product ids должен быть уникальным.
  • expr
  • config.expression
  • expression env: value, root, now
  • helper functions: has(path), matches(pattern, s), parseTime(value, formats, tz), duration(s)
  • что делает: выполняет bool expression для нестандартной логики, которую неудобно выразить built-in rule.
  • когда использовать: только для advanced cases, когда built-in rule types уже не хватает.

Path semantics:

  • object path example: data.items
  • array item path example: records[].id

Пример validator_settings:

{
  "required_paths": ["records"],
  "rules": [
    {
      "name": "records-array",
      "type": "type",
      "path": "records",
      "config": { "kind": "array" }
    },
    {
      "name": "records-present",
      "type": "items_count",
      "path": "records",
      "config": { "min": 1 }
    },
    {
      "name": "currency-unique",
      "type": "unique",
      "path": "records",
      "config": { "item_path": "currency" }
    }
  ]
}

Смысл примера:

  • records должен существовать;
  • records должен быть массивом;
  • в массиве должен быть хотя бы один элемент;
  • у элементов массива поле currency не должно повторяться.

freshness_settings

Fields:

  • observed_field: dot-path до timestamp в payload. Это поле, по которому система понимает, насколько данные свежие.
  • max_age: основной freshness threshold. Если now - observed_at > max_age, payload считается stale.
  • stale_after: fallback threshold, если max_age не задан. Нужен для совместимости или более мягкой конфигурации.
  • ttl: последний fallback threshold, если не заданы max_age и stale_after. Используется как самый слабый резервный способ задать freshness policy.
  • allow_stale: stale payload не валит activity, если true. В этом режиме результат будет успешным, но в output появится freshness.stale=true.
  • time_formats: ordered список layouts для timestamp parsing. Нужен, когда upstream отдаёт время не только в RFC3339.
  • time_zone: fallback IANA timezone для naive timestamps. Используется только если сама строка времени не содержит offset/timezone.

Special values:

  • observed_field="$fetched_at" использует LoaderOutput.fetched_at, а не payload field. Это полезно для источников, где в payload нет отдельного trustworthy timestamp, но важна свежесть самого факта загрузки.

Threshold precedence:

  1. max_age
  2. stale_after
  3. ttl

Behavior:

  • если threshold не задан, freshness check получает status="skipped";
  • если observed_field отсутствует или не парсится, activity завершается non-retryable freshness error;
  • если age превышает threshold и allow_stale=false, activity завершается non-retryable freshness error;
  • если age превышает threshold и allow_stale=true, activity проходит с freshness.stale=true;
  • если time_formats пустой, используются time.RFC3339 и time.RFC3339Nano.

Пример freshness_settings:

{
  "observed_field": "published_at",
  "max_age": 3600000000000,
  "allow_stale": false,
  "time_formats": ["2006-01-02 15:04:05", "RFC3339"],
  "time_zone": "UTC"
}

Смысл примера:

  • берем время из поля published_at;
  • считаем данные stale, если им больше 1 часа;
  • не пропускаем stale результат;
  • сначала пытаемся распарсить формат 2006-01-02 15:04:05, потом RFC3339;
  • если в строке нет timezone, используем UTC.

Для source-level throttling Terry runtime ожидает source.rate_limit:

  • requests_per_second, burst и optional window описывают limiter policy;
  • bucket_key позволяет нескольким data_source делить один limiter bucket;
  • если bucket_key не задан, runtime fallback-ит к auth.credentials_ref, а затем к data_source_id;
  • domain.rate_limit_rps остаётся domain metadata и не используется loader runtime как active throttling policy.

Job configuration

Описывает:

  • какой source использует job;
  • какой parser или extractor применяется;
  • какие validation rule включены;
  • какой набор transformation применяется;
  • какой sink является целевым;
  • какие notification policies привязаны к outcome.

Notification configuration

Описывает:

  • какие события порождают нотификацию;
  • какой notification template использовать;
  • в какие delivery channel отправлять;
  • dedup/rate-limit policy;
  • fallback channel при delivery errors.

Suggested Config Shape

На уровне документации разумно исходить из следующих logical blocks:

job
source
parser
validation
transformation
sink
notification
operations

Для текущего persisted catalog это можно читать так:

workspace
domain
data_source
  current_loader_config_id
loader_config(versioned)
  source
  auth
  validation
  freshness
  processing
  fingerprint
  incremental
  scheduling

Configuration Principles

  • config должен описывать intent, а не внутренние runtime классы;
  • secrets и credentials не хранятся inline в job config;
  • validation и transformation должны быть composable;
  • notification rules должны ссылаться на pipeline outcome, а не на transport-specific payload;
  • любой config должен поддерживать dry-run или validation mode до production запуска.

Дополнительно для текущей реализации:

  • runtime должен уметь однозначно загрузить config по data_source_id;
  • source_type и transport-specific source.type не должны расходиться;
  • ownership chain workspace -> domain -> data_source должна быть консистентной на уровне БД;
  • data_source принадлежит workspace транзитивно через domain, без отдельного workspace_id в строке источника.

Evolution Rules

  • новые поля должны добавляться backward-compatible способом;
  • deprecated keys должны иметь документированный migration path;
  • shared vocabulary из Terry должен использоваться в names и описаниях ключей.