EnsureRunIngestionSchedule
Purpose
Создает или обновляет Temporal schedule для периодического запуска RunIngestionWorkflow.
Код: internal/worker/activities/run_ingestion_schedule.go
Flow
flowchart TD
Start([Start]) --> Normalize[Normalize input defaults]
Normalize --> Validate[Validate schedule_id, cron, timezone, task_queue, data_source_id]
Validate --> Valid{Valid?}
Valid -- No --> Invalid[Return validation error]
Valid -- Yes --> Describe[Describe Temporal schedule]
Describe --> Exists{Schedule exists?}
Exists -- No --> Create[Create schedule for RunIngestionWorkflow]
Create --> Created[Return created = true]
Exists -- Yes --> Preserve{preserve_paused_state?}
Preserve -- Yes --> ReadPaused[Read current paused state]
Preserve -- No --> UseInputPaused[Use requested paused state]
ReadPaused --> BuildUpdate[Build deterministic schedule update]
UseInputPaused --> BuildUpdate
BuildUpdate --> Update[Update existing schedule]
Update --> Updated[Return updated = true]
Created --> End([End])
Updated --> End
Invalid --> End
Input / Output
Input (models.EnsureRunIngestionScheduleInput):
- schedule_id, cron, timezone, task_queue
- data_source_id
- name (optional)
- metadata (optional)
- paused
- preserve_paused_state (optional)
Output (models.EnsureRunIngestionScheduleResult):
- schedule_id
- created, updated, paused
External Dependencies
- Temporal Schedule API (через SDK client)
Idempotency
- Если schedule не существует: create.
- Если существует: deterministic update.
- При
preserve_paused_state=trueactivity сохраняет текущий paused state существующего schedule.
Error Taxonomy
run ingestion schedule validation failed: missing data_source_id- ошибки подключения Temporal client
describe/create/update scheduleошибки
Callers
Observability
- Диагностика через статус workflow run,
temporal schedule list/describeи worker startup logs.