Terry Configuration
Goal
Terry должен быть конфигурируемой платформой, где поведение pipeline задаётся декларативно, а не хардкодится под каждый source.
Current Persisted Shape
Текущий Terry baseline уже хранит source catalog в PostgreSQL:
terry.workspacesзадаёт верхний ownership boundary;terry.domainsзадаёт logical source owner внутри workspace;terry.data_sourcesхранит source identity/policy и pointer на текущую активную config version.terry.loader_configsхранит versioned runtime config и history/statistics по каждой версии.data_sources.expiration_daysхранит retention policy для raw payload artifacts этого источника.
На этом этапе persisted source-of-truth для universal loader runtime это связка data_sources + loader_configs.
Configuration Domains
Source configuration
Описывает:
- тип source;
- transport details;
- auth/secrets references;
- polling или trigger strategy;
- формат входного payload.
В текущем коде source/runtime configuration вынесена в versioned loader_configs:
loader_settingsauth_settingsvalidator_settingsfreshness_settingsprocessor_settingsfingerprintincrementalschedule_settingsexpiration_days
Для source.type=sdk текущий Terry runtime ожидает transport-specific блок source.sdk:
clientидентифицирует биржу или SDK-backed provider;operationвыбирает Terry registry operation;methodостаётся legacy fallback для старого config shape, еслиoperationне задан;paramsпередаёт per-operation runtime arguments;- SDK credentials по-прежнему берутся из environment, а не из Terry config.
Для binance:coins.snapshot runtime использует официальный Go SDK github.com/binance/binance-connector-go и прокидывает source.sdk.params.recv_window в Binance recvWindow, если параметр задан.
Current supported Terry SDK operations:
binance:coins.snapshotkucoin:coins.snapshotkucoin:loans.snapshotbybit:coins.snapshotbybit:loans.snapshotgate:loans.snapshot
Для source.type=http Terry runtime ожидает transport-specific блок source.http.
Current HTTP fetcher capabilities:
- common methods:
GET,POST,PUT,PATCH,DELETE; - enabled/disabled query/header params;
- raw JSON and raw text request body;
- JSON response parsing via
response.items_path; - raw/text response envelope for the existing JSON raw payload contract;
- pagination modes:
disabled,page,offset,cursor; - per-request timeout and retry;
- SSRF protection for localhost/private/link-local/internal targets.
Detailed HTTP config contract: Terry HTTP Fetcher.
Current loader output contract для Terry:
- raw payload не возвращается в workflow/activity output;
LoaderOutput.raw_dataсодержитfile_uid, SHA-256 hash,content_typeиrequested_atдля сырого файла, сохранённого в Hetzner S3;- для
sdkиhttpfetchers это означает, что workflow result в Temporal CLI показывает raw artifact metadata вместо payload body; - fetcher должен вернуть JSON-compatible raw payload; HTTP text/raw responses оборачиваются в JSON envelope перед сохранением.
Current validation/freshness runtime contract:
RunIngestionWorkflowпосле загрузки raw payload запускаетCheckFreshnessActivity.Execute;validator_settingsработает поверх JSON payload, считанного из raw storage;freshness_settingsиспользуетobserved_field,max_age/stale_after/ttl, optionaltime_formatsиtime_zone;- special-case
observed_field="$fetched_at"используетLoaderOutput.fetched_atкак observed timestamp, если upstream payload не содержит отдельного поля времени для freshness policy; - built-in validator rules:
schemarequirednot_emptytypeequalsone_ofregexlengthnumeric_rangetime_parsetime_not_futuretime_aftertime_beforeitems_countuniqueexprschema_refчитается из того же Hetzner S3 bucket/client, что и raw payload artifacts;- deterministic config/data defects завершаются non-retryable activity error, а transient raw storage read failures остаются retryable.
validator_settings
Top-level fields:
schema_ref: object key JSON Schema в том же Hetzner S3 bucket/client. Используется, когда нужно валидировать payload или его часть по формальной JSON Schema.fail_fast: остановиться на первом failingerrorrule. Удобно для строгих pipeline, где нет смысла собирать весь список ошибок.required_paths: обязательные dot-path, проверяются доrules[]. Это быстрый pre-check на наличие критичных полей.rules: ordered список validation rules. Правила выполняются по порядку, поэтому сначала обычно ставят structural checks, потом более узкие value/time checks.
Common rule fields:
name: произвольное имя правила для логов иvalidation.results[].type: какой именно встроенный валидатор будет использован.path: где в JSON искать значение для проверки.severity:errorилиwarning, defaulterror.errorвалит activity,warningтолько попадает в output.config: настройки конкретного валидатора.
Supported rules[].type:
schema- использует top-level
schema_ref - что делает: валидирует JSON-узел по полноценной JSON Schema.
- когда использовать: когда payload уже описан формальным schema contract и нужна строгая structural validation.
required- без дополнительных настроек
- что делает: проверяет, что поле вообще существует.
- когда использовать: когда отсутствие поля уже само по себе дефект данных.
not_empty- без дополнительных настроек
- что делает: проверяет, что строка не пустая, массив не пустой и объект не пустой.
- когда использовать: когда поле может существовать, но пустое значение всё равно считается невалидным.
typeconfig.kind:string,number,integer,boolean,object,array,null- что делает: проверяет runtime type значения.
- когда использовать: когда upstream может менять shape и нужно жёстко зафиксировать тип.
equalsconfig.value- что делает: проверяет точное равенство значения ожидаемому.
- когда использовать: для fixed flags, константных статусов или version markers.
one_ofconfig.values- что делает: проверяет, что значение входит в список допустимых.
- когда использовать: для enum-like полей.
regexconfig.pattern- что делает: применяет regexp к строковому значению.
- когда использовать: для кодов, идентификаторов, тикеров, e-mail-like строк и другого формата, который неудобно описывать через
equals. lengthconfig.min,config.max- что делает: проверяет длину строки, массива или объекта.
- когда использовать: когда важен минимум/максимум элементов или символов.
numeric_rangeconfig.min,config.max- что делает: проверяет числовое значение на вхождение в диапазон.
- когда использовать: для процентов, цен, лимитов, счётчиков и других numeric полей.
time_parseconfig.formats,config.time_zone- что делает: проверяет, что timestamp можно распарсить.
- когда использовать: если важно убедиться, что поле времени вообще валидно, но сравнение по времени пока не нужно.
time_not_futureconfig.formats,config.time_zone,config.max_skew- что делает: проверяет, что timestamp не лежит в будущем относительно момента проверки.
- когда использовать: для observed/published/generated timestamps, которые не должны опережать текущее время.
time_after- ровно одно из
config.valueилиconfig.other_path - optional
config.inclusive,config.formats,config.time_zone - что делает: проверяет, что один timestamp позже другого.
- когда использовать: например,
updated_atдолжен быть позжеcreated_at. time_before- ровно одно из
config.valueилиconfig.other_path - optional
config.inclusive,config.formats,config.time_zone - что делает: проверяет, что один timestamp раньше другого.
- когда использовать: например,
start_atдолжен быть раньшеend_at. items_countconfig.min,config.max- что делает: проверяет размер массива.
- когда использовать: когда payload обязан содержать хотя бы один record или не должен превышать безопасный объём.
uniqueconfig.item_path- что делает: проверяет, что внутри массива нет дублей по выбранному полю.
- когда использовать: например, список валют, тикеров или product ids должен быть уникальным.
exprconfig.expression- expression env:
value,root,now - helper functions:
has(path),matches(pattern, s),parseTime(value, formats, tz),duration(s) - что делает: выполняет bool expression для нестандартной логики, которую неудобно выразить built-in rule.
- когда использовать: только для advanced cases, когда built-in rule types уже не хватает.
Path semantics:
- object path example:
data.items - array item path example:
records[].id
Пример validator_settings:
{
"required_paths": ["records"],
"rules": [
{
"name": "records-array",
"type": "type",
"path": "records",
"config": { "kind": "array" }
},
{
"name": "records-present",
"type": "items_count",
"path": "records",
"config": { "min": 1 }
},
{
"name": "currency-unique",
"type": "unique",
"path": "records",
"config": { "item_path": "currency" }
}
]
}
Смысл примера:
recordsдолжен существовать;recordsдолжен быть массивом;- в массиве должен быть хотя бы один элемент;
- у элементов массива поле
currencyне должно повторяться.
freshness_settings
Fields:
observed_field: dot-path до timestamp в payload. Это поле, по которому система понимает, насколько данные свежие.max_age: основной freshness threshold. Еслиnow - observed_at > max_age, payload считается stale.stale_after: fallback threshold, еслиmax_ageне задан. Нужен для совместимости или более мягкой конфигурации.ttl: последний fallback threshold, если не заданыmax_ageиstale_after. Используется как самый слабый резервный способ задать freshness policy.allow_stale: stale payload не валит activity, еслиtrue. В этом режиме результат будет успешным, но в output появитсяfreshness.stale=true.time_formats: ordered список layouts для timestamp parsing. Нужен, когда upstream отдаёт время не только в RFC3339.time_zone: fallback IANA timezone для naive timestamps. Используется только если сама строка времени не содержит offset/timezone.
Special values:
observed_field="$fetched_at"используетLoaderOutput.fetched_at, а не payload field. Это полезно для источников, где в payload нет отдельного trustworthy timestamp, но важна свежесть самого факта загрузки.
Threshold precedence:
max_agestale_afterttl
Behavior:
- если threshold не задан, freshness check получает
status="skipped"; - если
observed_fieldотсутствует или не парсится, activity завершается non-retryable freshness error; - если age превышает threshold и
allow_stale=false, activity завершается non-retryable freshness error; - если age превышает threshold и
allow_stale=true, activity проходит сfreshness.stale=true; - если
time_formatsпустой, используютсяtime.RFC3339иtime.RFC3339Nano.
Пример freshness_settings:
{
"observed_field": "published_at",
"max_age": 3600000000000,
"allow_stale": false,
"time_formats": ["2006-01-02 15:04:05", "RFC3339"],
"time_zone": "UTC"
}
Смысл примера:
- берем время из поля
published_at; - считаем данные stale, если им больше 1 часа;
- не пропускаем stale результат;
- сначала пытаемся распарсить формат
2006-01-02 15:04:05, потомRFC3339; - если в строке нет timezone, используем
UTC.
Для source-level throttling Terry runtime ожидает source.rate_limit:
requests_per_second,burstи optionalwindowописывают limiter policy;bucket_keyпозволяет несколькимdata_sourceделить один limiter bucket;- если
bucket_keyне задан, runtime fallback-ит кauth.credentials_ref, а затем кdata_source_id; domain.rate_limit_rpsостаётся domain metadata и не используется loader runtime как active throttling policy.
Job configuration
Описывает:
- какой
sourceиспользует job; - какой parser или extractor применяется;
- какие
validation ruleвключены; - какой набор
transformationприменяется; - какой
sinkявляется целевым; - какие notification policies привязаны к outcome.
Notification configuration
Описывает:
- какие события порождают нотификацию;
- какой
notification templateиспользовать; - в какие
delivery channelотправлять; - dedup/rate-limit policy;
- fallback channel при delivery errors.
Suggested Config Shape
На уровне документации разумно исходить из следующих logical blocks:
Для текущего persisted catalog это можно читать так:
workspace
domain
data_source
current_loader_config_id
loader_config(versioned)
source
auth
validation
freshness
processing
fingerprint
incremental
scheduling
Configuration Principles
- config должен описывать intent, а не внутренние runtime классы;
- secrets и credentials не хранятся inline в job config;
- validation и transformation должны быть composable;
- notification rules должны ссылаться на pipeline outcome, а не на transport-specific payload;
- любой config должен поддерживать dry-run или validation mode до production запуска.
Дополнительно для текущей реализации:
- runtime должен уметь однозначно загрузить config по
data_source_id; source_typeи transport-specificsource.typeне должны расходиться;- ownership chain
workspace -> domain -> data_sourceдолжна быть консистентной на уровне БД; data_sourceпринадлежитworkspaceтранзитивно черезdomain, без отдельногоworkspace_idв строке источника.
Evolution Rules
- новые поля должны добавляться backward-compatible способом;
- deprecated keys должны иметь документированный migration path;
- shared vocabulary из Terry должен использоваться в names и описаниях ключей.