Skip to content

RunIngestionWorkflow

Purpose

Запускает Terry universal loader для одного persisted data_source_id, валидирует raw payload, проверяет freshness, записывает raw-file metadata в БД и определяет post-record outcome.

Код: internal/worker/workflows/run_ingestion.go

Input / Output

Input (models.RunIngestionInput): - data_source_id - name (optional) - metadata (optional) - is_test_data

Output (models.RunIngestionResult): - output.metadata - output.fetched_at - output.source_type - output.raw_data - output.validation - output.freshness - output.status - output.has_processor - output.processor_requested - output.new_raw_data_notification_requested - output.temporary_structured_data - output.structured_data - output.notifications - output.failure_message

Raw payload data не возвращается в workflow output. output.raw_data содержит сведения о raw payload artifact, который LoaderActivity.Execute сохраняет в Hetzner S3: - file_uid - hash - hash_algorithm - content_type - requested_at - expiration_time - is_new - is_test_data - is_processed_data - metadata

output.validation содержит machine-readable validation outcome: - status - passed - warnings - results

output.freshness содержит machine-readable freshness outcome: - status - stale - allow_stale - observed_at - checked_at - age - threshold - observed_field - message

Post-record status semantics: - status="success": raw stage завершен, обработчика нет, success metrics обновлены - status="requesting_transform": обработчик есть, workflow сообщает следующий шаг через processor_requested=true, success metrics пока не обновляются - status="failed": reserved для output contract; текущие workflow failures возвращаются ошибкой

Called Activities / Children

Retry / Failure Behavior

  • Activity timeout: 10m
  • Retry: max_attempts=3, backoff 5s -> 1m
  • Raw object key генерируется workflow-ом один раз и повторно используется activity retries.
  • Workflow завершается ошибкой при invalid data_source_id, неактивном source/config, ошибке fetcher/middleware, ошибке записи raw payload в Hetzner S3, validation failure, freshness failure, ошибке записи raw-file metadata или ошибке finalize stage.
  • При ошибке после того, как workflow знает loader_config_id, он best-effort вызывает UpdateSourcePerformanceActivity.Execute с outcome=failure и затем возвращает исходную ошибку.
  • При status=success workflow вызывает UpdateSourcePerformanceActivity.Execute с outcome=success и завершает run.
  • При status=requesting_transform workflow завершает run с processor_requested=true; StoreStructuredDataActivity.Execute возвращает skipped=waiting_for_processor, если temporary structured file еще не появился.
  • Notification policies/subscribers пока не реализованы; BuildPolicySubscriberNotificationsActivity.Execute возвращает notifications.status=skipped.

Manual Run

docker compose exec -T temporal-admin-tools temporal workflow start \
  --address temporal:7233 --namespace default --task-queue default \
  --type RunIngestionWorkflow \
  --workflow-id run-ingestion-manual-$(date +%s) \
  --input '{"data_source_id":"<uuid>","name":"manual-run"}'
  • Tables: terry.workspaces, terry.domains, terry.data_sources, terry.loader_configs, terry.raw_data_files, terry.structured_data_documents
  • Metrics: terry.loader_configs.total_runs, successful_runs, failed_runs, failed_runs_after_success, period failed counters, last_run_at, last_success_at, last_failure_at, last_failure_start_date
  • Логи: ошибки resolve config / fetcher / middleware / raw storage / validation / freshness / raw metadata / finalize / performance metrics в worker