RecordRawDataActivity.Execute
Purpose
Saves the metadata of a raw payload artifact to terry.raw_data_files after validation/freshness checks and performs global deduplication by hash.
Code: internal/worker/loader/record_raw_data.go
Flow
flowchart TD
Start([Start]) --> Validate[Validate request and raw_data fields]
Validate --> Valid{Valid?}
Valid -- No --> Invalid[Return non-retryable config error]
Valid -- Yes --> Resolve[Resolve loader config]
Resolve --> ExpDays{expiration_days}
ExpDays -- "< 0" --> BadRetention[Return non-retryable config error]
ExpDays -- "null / 0 / > 0" --> Idempotency[Lookup raw_data_files by file_uid]
Idempotency --> Existing{Record exists?}
Existing -- Yes --> ExistingOutput[Return existing record metadata]
Existing -- No --> Duplicate[Lookup duplicate by hash_algorithm, hash, is_test_data, unprocessed]
Duplicate --> Found{Duplicate found?}
Found -- Yes --> DeleteNew[Delete new raw object]
DeleteNew --> DeleteOK{Deleted?}
DeleteOK -- No --> DeleteError[Return retryable delete error]
DeleteOK -- Yes --> Expired{Duplicate expiration_time expired?}
Expired -- Yes --> Extend[Extend duplicate expiration_time to now + 3h]
Expired -- No --> UseDuplicate[Use duplicate metadata]
Extend --> UseDuplicate
UseDuplicate --> DuplicateOutput[Return output with raw_data.is_new = false]
Found -- No --> ComputeExpiration[Compute expiration_time]
ComputeExpiration --> Insert[Insert terry.raw_data_files]
Insert --> InsertOK{Inserted?}
InsertOK -- Yes --> NewOutput[Return output with raw_data.is_new = true]
InsertOK -- UniqueConflict --> Duplicate
InsertOK -- DBError --> DBError[Return retryable DB error]
ExistingOutput --> End([End])
DuplicateOutput --> End
NewOutput --> End
Invalid --> End
BadRetention --> End
DeleteError --> End
DBError --> End
Input / Output
Input (loader.RecordRawDataRequest):
- data_source_id
- output
- is_test_data
Output (loader.IngestionOutput):
- returns the original ingestion output with an updated raw_data
- raw_data.is_new=true for a new file
- raw_data.is_new=false for a detected duplicate
- raw_data.expiration_time, is_test_data, is_processed_data, metadata
Behavior
- Looks up a duplicate globally by hash_algorithm, hash, is_test_data among records with is_processed_data=false.
- If a duplicate is found, deletes the new raw object from S3-compatible storage and returns the metadata of the old file.
- If the duplicate has expired, extends its expiration_time to now + 3h.
- If no duplicate is found, creates a new record in terry.raw_data_files.
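The duplicate path described above can be sketched in Go as follows. This is a minimal illustration, not the code in record_raw_data.go: the RawFile struct, its field names, and the IsDuplicate and ReuseDuplicate helpers are hypothetical, and treating "expired" as expiration_time <= now is an assumption.

```go
package main

import "time"

// duplicateTTL is the 3h extension applied to an expired duplicate.
const duplicateTTL = 3 * time.Hour

// RawFile is a hypothetical, trimmed view of a terry.raw_data_files row.
type RawFile struct {
	FileUID        string
	HashAlgorithm  string
	Hash           string
	IsTestData     bool
	IsProcessed    bool
	RequestedAt    time.Time
	ExpirationTime *time.Time // nil means the file never expires
}

// IsDuplicate reports whether existing is a dedupe match for incoming:
// same hash_algorithm, hash and is_test_data, and existing is still unprocessed.
func IsDuplicate(existing, incoming RawFile) bool {
	return !existing.IsProcessed &&
		existing.HashAlgorithm == incoming.HashAlgorithm &&
		existing.Hash == incoming.Hash &&
		existing.IsTestData == incoming.IsTestData
}

// ReuseDuplicate returns the duplicate's metadata, extending expiration_time
// to now + 3h when it has already passed. The caller's value is not modified.
// Assumption: "expired" means expiration_time <= now.
func ReuseDuplicate(dup RawFile, now time.Time) RawFile {
	if dup.ExpirationTime != nil && !dup.ExpirationTime.After(now) {
		extended := now.Add(duplicateTTL)
		dup.ExpirationTime = &extended
	}
	return dup
}
```

In the real activity the match would be a database query and the extension an UPDATE; the sketch only isolates the decision logic.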
Retention:
- data_sources.expiration_days = null -> expiration_time = null
- expiration_days = 0 -> expiration_time = requested_at + 3h
- expiration_days > 0 -> expiration_time = requested_at + expiration_days (in days)
- expiration_days < 0 -> non-retryable config error
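The retention rules can be expressed as a small pure function. The sketch below is illustrative only: RetentionOffset, ExpirationTime, and ErrNegativeExpirationDays are hypothetical names, and counting a day as a fixed 24 hours is an assumption the source does not confirm.

```go
package main

import (
	"errors"
	"time"
)

// zeroDaysTTL is the 3h window used when expiration_days = 0.
const zeroDaysTTL = 3 * time.Hour

// ErrNegativeExpirationDays stands in for the non-retryable config error
// (hypothetical name).
var ErrNegativeExpirationDays = errors.New("expiration_days must not be negative")

// RetentionOffset maps expiration_days to an offset from requested_at.
// hasExpiry is false when expiration_days is null, i.e. expiration_time = null.
func RetentionOffset(expirationDays *int) (offset time.Duration, hasExpiry bool, err error) {
	switch {
	case expirationDays == nil:
		return 0, false, nil
	case *expirationDays < 0:
		return 0, false, ErrNegativeExpirationDays
	case *expirationDays == 0:
		return zeroDaysTTL, true, nil
	default:
		// Assumption: one day is a fixed 24h, ignoring DST and calendar effects.
		return time.Duration(*expirationDays) * 24 * time.Hour, true, nil
	}
}

// ExpirationTime applies the offset to requested_at.
// A nil result with a nil error means the file never expires.
func ExpirationTime(requestedAt time.Time, expirationDays *int) (*time.Time, error) {
	offset, hasExpiry, err := RetentionOffset(expirationDays)
	if err != nil || !hasExpiry {
		return nil, err
	}
	t := requestedAt.Add(offset)
	return &t, nil
}
```

For example, expiration_days = 7 yields an offset of 168h, while a nil pointer yields no expiration at all.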
External Dependencies
- PostgreSQL table terry.raw_data_files
- Hetzner S3-compatible raw object storage
- Terry loader config resolver for expiration_days
Idempotency
The activity first looks up a record by file_uid. If the record already exists, it is returned without repeating the insert/delete side effects.
Concurrent inserts by hash are handled through the partial unique index on unprocessed raw files; the losing activity re-reads the winning duplicate and deletes its new raw object.
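A rough model of this behavior, using an in-memory map in place of PostgreSQL: the file_uid lookup short-circuits retries, and an insert that loses the race on the hash re-reads the winner and deletes its own object. Everything here (memStore, Record, Outcome, errUniqueConflict, the deleteObject callback) is hypothetical scaffolding; a real implementation would detect the unique violation from the database driver's error instead.

```go
package main

import (
	"errors"
	"sync"
)

// errUniqueConflict mimics a violation of the partial unique index (hypothetical).
var errUniqueConflict = errors.New("unique conflict on unprocessed hash")

// Outcome says which branch produced the returned record.
type Outcome int

const (
	Existing  Outcome = iota + 1 // found by file_uid, no side effects
	Inserted                     // new row written
	Duplicate                    // lost to another row with the same hash
)

type record struct{ FileUID, Hash string }

// memStore is an in-memory stand-in for terry.raw_data_files.
type memStore struct {
	mu     sync.Mutex
	byUID  map[string]record
	byHash map[string]record // plays the role of the partial unique index
}

func newMemStore() *memStore {
	return &memStore{byUID: map[string]record{}, byHash: map[string]record{}}
}

func (s *memStore) findByUID(uid string) (record, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	r, ok := s.byUID[uid]
	return r, ok
}

func (s *memStore) findByHash(hash string) (record, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	r, ok := s.byHash[hash]
	return r, ok
}

func (s *memStore) insert(r record) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, taken := s.byHash[r.Hash]; taken {
		return errUniqueConflict
	}
	s.byUID[r.FileUID] = r
	s.byHash[r.Hash] = r
	return nil
}

// Record sketches the idempotent insert: retries hit the file_uid lookup,
// and the loser of a hash race returns the winner after deleting its own object.
func Record(s *memStore, r record, deleteObject func(fileUID string) error) (record, Outcome, error) {
	if existing, ok := s.findByUID(r.FileUID); ok {
		return existing, Existing, nil
	}
	err := s.insert(r)
	if err == nil {
		return r, Inserted, nil
	}
	if !errors.Is(err, errUniqueConflict) {
		return record{}, 0, err
	}
	winner, ok := s.findByHash(r.Hash)
	if !ok {
		return record{}, 0, errors.New("conflict reported but no winning row found")
	}
	if err := deleteObject(r.FileUID); err != nil {
		return record{}, 0, err
	}
	return winner, Duplicate, nil
}
```

The sketch skips the duplicate lookup that precedes the insert in the flow and shows only the conflict fallback, since that is the part the unique index guarantees under concurrency.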
Error Taxonomy
- invalid request or negative expiration_days: non-retryable config error
- DB read/write errors: retryable
- duplicate raw object delete failure: retryable
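One way to encode this taxonomy is a dedicated error type for configuration problems, with everything else treated as retryable. The sketch below is an assumption about how such a classification could look; ConfigError, Retryable, WrapDB, and ErrObjectDelete are hypothetical names, and the actual activity may rely on its workflow framework's own error types instead.

```go
package main

import (
	"errors"
	"fmt"
)

// ConfigError marks failures that a retry cannot fix, such as an invalid
// request or a negative expiration_days (hypothetical type).
type ConfigError struct{ Reason string }

func (e *ConfigError) Error() string { return "config error: " + e.Reason }

// ErrObjectDelete stands in for a failed delete of the duplicate raw object.
var ErrObjectDelete = errors.New("raw object delete failed")

// WrapDB adds context to a database error while keeping the cause inspectable.
func WrapDB(op string, err error) error {
	return fmt.Errorf("raw_data_files %s: %w", op, err)
}

// Retryable reports whether the activity should be attempted again.
// Only errors that wrap a *ConfigError are treated as non-retryable.
func Retryable(err error) bool {
	if err == nil {
		return false
	}
	var cfg *ConfigError
	return !errors.As(err, &cfg)
}
```

Because Retryable uses errors.As, a ConfigError stays non-retryable even after it has been wrapped with additional context.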