
RecordRawDataActivity.Execute

Purpose

Stores the metadata of a raw payload artifact in terry.raw_data_files after validation and freshness checks, and performs a global dedupe by hash.

Code: internal/worker/loader/record_raw_data.go

Flow

flowchart TD
    Start([Start]) --> Validate[Validate request and raw_data fields]
    Validate --> Valid{Valid?}
    Valid -- No --> Invalid[Return non-retryable config error]
    Valid -- Yes --> Resolve[Resolve loader config]
    Resolve --> ExpDays{expiration_days}
    ExpDays -- "< 0" --> BadRetention[Return non-retryable config error]
    ExpDays -- "null / 0 / > 0" --> Idempotency[Lookup raw_data_files by file_uid]
    Idempotency --> Existing{Record exists?}
    Existing -- Yes --> ExistingOutput[Return existing record metadata]
    Existing -- No --> Duplicate[Lookup duplicate by hash_algorithm, hash, is_test_data, unprocessed]
    Duplicate --> Found{Duplicate found?}
    Found -- Yes --> DeleteNew[Delete new raw object]
    DeleteNew --> DeleteOK{Deleted?}
    DeleteOK -- No --> DeleteError[Return retryable delete error]
    DeleteOK -- Yes --> Expired{Duplicate expiration_time expired?}
    Expired -- Yes --> Extend[Extend duplicate expiration_time to now + 3h]
    Expired -- No --> UseDuplicate[Use duplicate metadata]
    Extend --> UseDuplicate
    UseDuplicate --> DuplicateOutput[Return output with raw_data.is_new = false]
    Found -- No --> ComputeExpiration[Compute expiration_time]
    ComputeExpiration --> Insert[Insert terry.raw_data_files]
    Insert --> InsertOK{Inserted?}
    InsertOK -- Yes --> NewOutput[Return output with raw_data.is_new = true]
    InsertOK -- UniqueConflict --> Duplicate
    InsertOK -- DBError --> DBError[Return retryable DB error]
    ExistingOutput --> End([End])
    DuplicateOutput --> End
    NewOutput --> End
    Invalid --> End
    BadRetention --> End
    DeleteError --> End
    DBError --> End

Input / Output

Input (loader.RecordRawDataRequest):

  • data_source_id
  • output
  • is_test_data

Output (loader.IngestionOutput):

  • the original ingestion output, returned with raw_data updated
  • raw_data.is_new = true for a newly recorded file
  • raw_data.is_new = false when an existing duplicate was found
  • populated raw_data fields: expiration_time, is_test_data, is_processed_data, metadata
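A possible Go shape for these payloads, with a validation step matching the first node of the flowchart, is sketched below. The field list comes from this page, but the Go field names, types, JSON tags and the Validate method are assumptions and are not taken from the loader package. In particular, file_uid, hash_algorithm and hash are assumed to live under raw_data because the idempotency and dedupe lookups use them.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// RawData is a hypothetical shape for the raw_data part of the output.
type RawData struct {
	FileUID         string          `json:"file_uid"`
	HashAlgorithm   string          `json:"hash_algorithm"`
	Hash            string          `json:"hash"`
	IsNew           bool            `json:"is_new"`
	ExpirationTime  *time.Time      `json:"expiration_time"` // null = never expires
	IsTestData      bool            `json:"is_test_data"`
	IsProcessedData bool            `json:"is_processed_data"`
	Metadata        json.RawMessage `json:"metadata,omitempty"`
}

// IngestionOutput is a hypothetical stand-in for loader.IngestionOutput.
type IngestionOutput struct {
	RawData RawData `json:"raw_data"`
}

// RecordRawDataRequest is a hypothetical stand-in for loader.RecordRawDataRequest.
type RecordRawDataRequest struct {
	DataSourceID string          `json:"data_source_id"`
	Output       IngestionOutput `json:"output"`
	IsTestData   bool            `json:"is_test_data"`
}

// Validate rejects requests that cannot be looked up or deduplicated.
func (r RecordRawDataRequest) Validate() error {
	raw := r.Output.RawData
	switch {
	case r.DataSourceID == "":
		return errors.New("data_source_id is required")
	case raw.FileUID == "":
		return errors.New("raw_data.file_uid is required")
	case raw.HashAlgorithm == "" || raw.Hash == "":
		return errors.New("raw_data.hash_algorithm and raw_data.hash are required")
	}
	return nil
}

func main() {
	req := RecordRawDataRequest{
		DataSourceID: "ds-1",
		Output: IngestionOutput{RawData: RawData{
			FileUID: "f-1", HashAlgorithm: "sha256", Hash: "abc",
		}},
	}
	if err := req.Validate(); err != nil {
		fmt.Println("invalid:", err)
		return
	}
	out := req.Output
	out.RawData.IsNew = true // as for a freshly inserted file
	out.RawData.IsTestData = req.IsTestData
	b, err := json.Marshal(out)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
}
```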

Behavior

  • Looks up duplicates globally by hash_algorithm, hash, and is_test_data among rows with is_processed_data = false.
  • If a duplicate is found, deletes the new raw object from S3-compatible storage and returns the metadata of the existing file.
  • If the duplicate has expired, extends its expiration_time to now + 3h.
  • If no duplicate is found, inserts a new row into terry.raw_data_files.

Retention (data_sources.expiration_days -> expiration_time):

  • null -> expiration_time = null
  • 0 -> requested_at + 3h
  • > 0 -> requested_at + expiration_days days
  • < 0 -> non-retryable config error

External Dependencies

  • PostgreSQL table terry.raw_data_files
  • Hetzner S3-compatible raw object storage
  • Terry loader config resolver for expiration_days

Idempotency

The activity first looks up the record by file_uid. If the record already exists, it is returned without repeating the insert or delete side effects.

Concurrent inserts of the same hash are resolved by the partial unique index on unprocessed raw files. The activity that loses the race gets a unique conflict, re-reads the winning row as its duplicate, and deletes its own new raw object.

Error Taxonomy

  • invalid request or negative expiration_days: non-retryable config error
  • DB read/write errors: retryable
  • failure to delete the new raw object after a duplicate is found: retryable

Callers