Skip to content

EnsureRunIngestionSchedule

Purpose

Создает или обновляет Temporal schedule для периодического запуска RunIngestionWorkflow.

Код: internal/worker/activities/run_ingestion_schedule.go

Flow

flowchart TD
    Start([Start]) --> Normalize[Normalize input defaults]
    Normalize --> Validate[Validate schedule_id, cron, timezone, task_queue, data_source_id]
    Validate --> Valid{Valid?}
    Valid -- No --> Invalid[Return validation error]
    Valid -- Yes --> Describe[Describe Temporal schedule]
    Describe --> Exists{Schedule exists?}
    Exists -- No --> Create[Create schedule for RunIngestionWorkflow]
    Create --> Created[Return created = true]
    Exists -- Yes --> Preserve{preserve_paused_state?}
    Preserve -- Yes --> ReadPaused[Read current paused state]
    Preserve -- No --> UseInputPaused[Use requested paused state]
    ReadPaused --> BuildUpdate[Build deterministic schedule update]
    UseInputPaused --> BuildUpdate
    BuildUpdate --> Update[Update existing schedule]
    Update --> Updated[Return updated = true]
    Created --> End([End])
    Updated --> End
    Invalid --> End

Input / Output

Input (models.EnsureRunIngestionScheduleInput): - schedule_id, cron, timezone, task_queue - data_source_id - name (optional) - metadata (optional) - paused - preserve_paused_state (optional)

Output (models.EnsureRunIngestionScheduleResult): - schedule_id - created, updated, paused

External Dependencies

  • Temporal Schedule API (через SDK client)

Idempotency

  • Если schedule не существует: create.
  • Если существует: deterministic update.
  • При preserve_paused_state=true activity сохраняет текущий paused state существующего schedule.

Error Taxonomy

  • run ingestion schedule validation failed: missing data_source_id
  • ошибки подключения Temporal client
  • describe/create/update schedule ошибки

Callers

Observability

  • Диагностика через статус workflow run, temporal schedule list/describe и worker startup logs.