RunIngestionWorkflow
Purpose
Запускает Terry universal loader для одного persisted data_source_id, валидирует raw payload, проверяет freshness, записывает raw-file metadata в БД и определяет post-record outcome.
Код: internal/worker/workflows/run_ingestion.go
Input / Output
Input (models.RunIngestionInput):
- data_source_id
- name (optional)
- metadata (optional)
- is_test_data
Output (models.RunIngestionResult):
- output.metadata
- output.fetched_at
- output.source_type
- output.raw_data
- output.validation
- output.freshness
- output.status
- output.has_processor
- output.processor_requested
- output.new_raw_data_notification_requested
- output.temporary_structured_data
- output.structured_data
- output.notifications
- output.failure_message
Raw payload data не возвращается в workflow output. output.raw_data содержит сведения о raw payload artifact, который LoaderActivity.Execute сохраняет в Hetzner S3:
- file_uid
- hash
- hash_algorithm
- content_type
- requested_at
- expiration_time
- is_new
- is_test_data
- is_processed_data
- metadata
output.validation содержит machine-readable validation outcome:
- status
- passed
- warnings
- results
output.freshness содержит machine-readable freshness outcome:
- status
- stale
- allow_stale
- observed_at
- checked_at
- age
- threshold
- observed_field
- message
Post-record status semantics:
- status="success": raw stage завершен, обработчика нет, success metrics обновлены
- status="requesting_transform": обработчик есть, workflow сообщает следующий шаг через processor_requested=true, success metrics пока не обновляются
- status="failed": reserved для output contract; текущие workflow failures возвращаются ошибкой
Called Activities / Children
- Activity:
LoaderActivity.Execute - Activity:
CheckFreshnessActivity.Execute - Activity:
RecordRawDataActivity.Execute - Activity:
FinalizeRawDataActivity.Execute - Activity:
StoreStructuredDataActivity.Execute - Activity:
BuildPolicySubscriberNotificationsActivity.Execute - Activity:
UpdateSourcePerformanceActivity.Execute - Child workflows: нет
Retry / Failure Behavior
- Activity timeout:
10m - Retry:
max_attempts=3, backoff5s -> 1m - Raw object key генерируется workflow-ом один раз и повторно используется activity retries.
- Workflow завершается ошибкой при invalid
data_source_id, неактивном source/config, ошибке fetcher/middleware, ошибке записи raw payload в Hetzner S3, validation failure, freshness failure, ошибке записи raw-file metadata или ошибке finalize stage. - При ошибке после того, как workflow знает
loader_config_id, он best-effort вызываетUpdateSourcePerformanceActivity.Executeсoutcome=failureи затем возвращает исходную ошибку. - При
status=successworkflow вызываетUpdateSourcePerformanceActivity.Executeсoutcome=successи завершает run. - При
status=requesting_transformworkflow завершает run сprocessor_requested=true;StoreStructuredDataActivity.Executeвозвращаетskipped=waiting_for_processor, если temporary structured file еще не появился. - Notification policies/subscribers пока не реализованы;
BuildPolicySubscriberNotificationsActivity.Executeвозвращаетnotifications.status=skipped.
Manual Run
docker compose exec -T temporal-admin-tools temporal workflow start \
--address temporal:7233 --namespace default --task-queue default \
--type RunIngestionWorkflow \
--workflow-id run-ingestion-manual-$(date +%s) \
--input '{"data_source_id":"<uuid>","name":"manual-run"}'
Related Tables / Logs
- Tables:
terry.workspaces,terry.domains,terry.data_sources,terry.loader_configs,terry.raw_data_files,terry.structured_data_documents - Metrics:
terry.loader_configs.total_runs,successful_runs,failed_runs,failed_runs_after_success, period failed counters,last_run_at,last_success_at,last_failure_at,last_failure_start_date - Логи: ошибки resolve config / fetcher / middleware / raw storage / validation / freshness / raw metadata / finalize / performance metrics в
worker