diff --git a/api/CHANGELOG.md b/api/CHANGELOG.md index 392b369206..61139fa810 100644 --- a/api/CHANGELOG.md +++ b/api/CHANGELOG.md @@ -2,12 +2,16 @@ All notable changes to the **Prowler API** are documented in this file. -## [1.29.0] (Prowler UNRELEASED) +## [1.29.0] (Prowler v5.28.0) ### 🚀 Added - `okta` provider support [(#11184)](https://github.com/prowler-cloud/prowler/pull/11184) +### 🔄 Changed + +- Scan finding ingestion: bulk-resolve `Resource`/`ResourceTag` rows, replace per-mapping `SELECT FOR UPDATE` with deferred `ResourceTagMapping.bulk_create(ignore_conflicts=True)`, wrap each micro-batch in a single `rls_transaction`, and raise `SCAN_DB_BATCH_SIZE` to 1000. Cuts wall-clock ~13x and `COMMIT` count by ~99% on large batches [(#11249)](https://github.com/prowler-cloud/prowler/pull/11249) + --- ## [1.28.0] (Prowler v5.27.0) diff --git a/api/src/backend/tasks/jobs/scan.py b/api/src/backend/tasks/jobs/scan.py index d4bc2afba3..a772173a7f 100644 --- a/api/src/backend/tasks/jobs/scan.py +++ b/api/src/backend/tasks/jobs/scan.py @@ -42,7 +42,6 @@ SET_CONFIG_QUERY, psycopg_connection, rls_transaction, - update_objects_in_batches, ) from api.exceptions import ProviderConnectionError from api.models import ( @@ -59,6 +58,7 @@ ResourceFindingMapping, ResourceScanSummary, ResourceTag, + ResourceTagMapping, Scan, ScanCategorySummary, ScanGroupSummary, @@ -97,8 +97,16 @@ ) # Controls how many findings we process per micro-batch before flushing to DB writes FINDINGS_MICRO_BATCH_SIZE = env.int("DJANGO_FINDINGS_MICRO_BATCH_SIZE", default=3000) -# Controls how many rows each ORM bulk_create/bulk_update call sends to Postgres -SCAN_DB_BATCH_SIZE = env.int("DJANGO_SCAN_DB_BATCH_SIZE", default=500) +# Controls how many rows each ORM bulk_create/bulk_update call sends to Postgres. +SCAN_DB_BATCH_SIZE = env.int("DJANGO_SCAN_DB_BATCH_SIZE", default=1000) +# Throttle scan progress persistence: minimum progress delta (fraction 0-1) +# between two persisted progress updates. +PROGRESS_THROTTLE_DELTA = env.float("DJANGO_SCAN_PROGRESS_THROTTLE_DELTA", default=0.01) +# Throttle scan progress persistence: maximum seconds without persisting progress +# regardless of delta (so slow checks still show progress in the UI). +PROGRESS_THROTTLE_SECONDS = env.float( + "DJANGO_SCAN_PROGRESS_THROTTLE_SECONDS", default=10.0 +) ATTACK_SURFACE_PROVIDER_COMPATIBILITY = { "internet-exposed": None, # Compatible with all providers @@ -528,16 +536,26 @@ def _process_finding_micro_batch( """ # Accumulate objects for bulk operations findings_to_create = [] - mappings_to_create = [] dirty_resources = {} + resources_with_new_tag_mappings: set[str] = set() resource_denormalized_data = [] # (finding_instance, resource_instance) pairs + tag_mappings_to_create: list[ResourceTagMapping] = [] skipped_findings_count = 0 # Track findings skipped due to UID length - # Prefetch last statuses for all findings in this batch - # TEMPORARY WORKAROUND: Filter out UIDs > 300 chars to avoid query errors - finding_uids = [ - f.uid for f in findings_batch if f is not None and len(f.uid) <= 300 - ] + # Separate findings into those persistable (uid <= 300) and over-limit. + # Resources/tags ARE still resolved for over-limit findings to preserve the + # original behavior (resources are persisted even when their finding is dropped). + non_null_findings = [f for f in findings_batch if f is not None] + persistable_findings = [f for f in non_null_findings if len(f.uid) <= 300] + skipped_findings_count = len(non_null_findings) - len(persistable_findings) + none_count = len(findings_batch) - len(non_null_findings) + if none_count: + logger.error( + f"{none_count} None finding(s) detected on scan {scan_instance.id}." + ) + + # Prefetch last statuses for all persistable findings in this batch (read replica) + finding_uids = [f.uid for f in persistable_findings] with rls_transaction(tenant_id, using=READ_REPLICA_ALIAS): last_statuses = { item["uid"]: (item["status"], item["first_seen_at"]) @@ -548,281 +566,411 @@ def _process_finding_micro_batch( .order_by("uid", "-inserted_at") .distinct("uid") } - # Update cache for uid, data in last_statuses.items(): if uid not in last_status_cache: last_status_cache[uid] = data - # Process each finding in the batch - for finding in findings_batch: - if finding is None: - logger.error(f"None finding detected on scan {scan_instance.id}.") - continue + # All DB writes for this micro-batch run inside ONE rls_transaction, + # with deadlock-retry at micro-batch granularity instead of per-finding. + for attempt in range(CELERY_DEADLOCK_ATTEMPTS): + try: + with rls_transaction(tenant_id): + # 1) Pre-resolve Resources in bulk + # Collect all uids referenced by this batch that are not in cache yet. + # NOTE: we intentionally include empty-string uids here. The SDK + # explicitly emits findings with `resource_uid=""` for some flows + # (IaC scans, some Azure/GCP/K8s checks). The original + # `get_or_create` behavior was to create/share a Resource with + # uid="" for these findings rather than dropping them. Preserve + # that behavior; do NOT filter by truthiness. + batch_resource_uids: set[str] = set() + for f in non_null_findings: + if f.resource_uid not in resource_cache: + batch_resource_uids.add(f.resource_uid) + + if batch_resource_uids: + existing_resources = { + r.uid: r + for r in Resource.objects.filter( + tenant_id=tenant_id, + provider_id=provider_instance.id, + uid__in=batch_resource_uids, + ) + } + missing_uids = batch_resource_uids - existing_resources.keys() + if missing_uids: + # Build defaults from the first finding referencing each uid. + first_finding_per_uid: dict[str, ProwlerFinding] = {} + for f in non_null_findings: + if f.resource_uid in missing_uids: + first_finding_per_uid.setdefault(f.resource_uid, f) + resources_to_create = [] + for uid in missing_uids: + f = first_finding_per_uid[uid] + check_metadata = f.get_metadata() + group = check_metadata.get("resourcegroup") or None + resources_to_create.append( + Resource( + tenant_id=tenant_id, + provider=provider_instance, + uid=uid, + region=f.region, + service=f.service_name, + type=f.resource_type, + name=f.resource_name, + groups=[group] if group else None, + ) + ) + Resource.objects.bulk_create( + resources_to_create, + batch_size=SCAN_DB_BATCH_SIZE, + ignore_conflicts=True, + unique_fields=["tenant_id", "provider_id", "uid"], + ) + # Re-fetch to obtain instances we just created AND any + # created concurrently by another scan against the same provider. + existing_resources.update( + { + r.uid: r + for r in Resource.objects.filter( + tenant_id=tenant_id, + provider_id=provider_instance.id, + uid__in=missing_uids, + ) + } + ) + for uid, r in existing_resources.items(): + resource_cache[uid] = r + resource_failed_findings_cache.setdefault(uid, 0) + + # 2) Pre-resolve ResourceTags in bulk + batch_tag_kv: set[tuple[str, str]] = set() + for f in non_null_findings: + for k, v in f.resource_tags.items(): + if (k, v) not in tag_cache: + batch_tag_kv.add((k, v)) + + if batch_tag_kv: + keys_to_query = {k for k, _ in batch_tag_kv} + existing_tags = { + (t.key, t.value): t + for t in ResourceTag.objects.filter( + tenant_id=tenant_id, key__in=keys_to_query + ) + if (t.key, t.value) in batch_tag_kv + } + missing_kv = batch_tag_kv - existing_tags.keys() + if missing_kv: + ResourceTag.objects.bulk_create( + [ + ResourceTag(tenant_id=tenant_id, key=k, value=v) + for k, v in missing_kv + ], + batch_size=SCAN_DB_BATCH_SIZE, + ignore_conflicts=True, + unique_fields=["tenant_id", "key", "value"], + ) + existing_tags.update( + { + (t.key, t.value): t + for t in ResourceTag.objects.filter( + tenant_id=tenant_id, + key__in={k for k, _ in missing_kv}, + ) + if (t.key, t.value) in missing_kv + } + ) + tag_cache.update(existing_tags) - # Process resource with deadlock retry - for attempt in range(CELERY_DEADLOCK_ATTEMPTS): - try: - with rls_transaction(tenant_id): + # 3) Per-finding in-memory processing + for finding in non_null_findings: resource_uid = finding.resource_uid - if resource_uid not in resource_cache: - check_metadata = finding.get_metadata() - group = check_metadata.get("resourcegroup") or None - resource_instance, _ = Resource.objects.get_or_create( - tenant_id=tenant_id, - provider=provider_instance, - uid=resource_uid, - defaults={ - "region": finding.region, - "service": finding.service_name, - "type": finding.resource_type, - "name": finding.resource_name, - "groups": [group] if group else None, - }, + resource_instance = resource_cache.get(resource_uid) + if resource_instance is None: + # Should be unreachable after the pre-resolve step. Defensive log. + logger.error( + f"Resource {resource_uid} missing from cache after pre-resolve " + f"on scan {scan_instance.id}; skipping finding." ) - resource_cache[resource_uid] = resource_instance - resource_failed_findings_cache[resource_uid] = 0 - else: - resource_instance = resource_cache[resource_uid] - break - except (OperationalError, IntegrityError) as db_err: - if attempt < CELERY_DEADLOCK_ATTEMPTS - 1: - logger.warning( - f"{'Deadlock error' if isinstance(db_err, OperationalError) else 'Integrity error'} " - f"detected when processing resource {resource_uid} on scan {scan_instance.id}. Retrying..." - ) - time.sleep(0.1 * (2**attempt)) - continue - else: - raise db_err - - # Track resource field changes (defer save) - updated = False - check_metadata = finding.get_metadata() - group = check_metadata.get("resourcegroup") or None - if finding.region and resource_instance.region != finding.region: - resource_instance.region = finding.region - updated = True - if resource_instance.service != finding.service_name: - resource_instance.service = finding.service_name - updated = True - if resource_instance.type != finding.resource_type: - resource_instance.type = finding.resource_type - updated = True - if resource_instance.metadata != finding.resource_metadata: - resource_instance.metadata = json.dumps( - finding.resource_metadata, cls=CustomEncoder - ) - updated = True - if resource_instance.details != finding.resource_details: - resource_instance.details = finding.resource_details - updated = True - if resource_instance.partition != finding.partition: - resource_instance.partition = finding.partition - updated = True - if group and ( - not resource_instance.groups or group not in resource_instance.groups - ): - resource_instance.groups = (resource_instance.groups or []) + [group] - updated = True + continue - if updated: - dirty_resources[resource_uid] = resource_instance + # Detect resource field changes (defer save until end-of-batch bulk_update). + check_metadata = finding.get_metadata() + group = check_metadata.get("resourcegroup") or None + updated = False + if finding.region and resource_instance.region != finding.region: + resource_instance.region = finding.region + updated = True + if resource_instance.service != finding.service_name: + resource_instance.service = finding.service_name + updated = True + if resource_instance.type != finding.resource_type: + resource_instance.type = finding.resource_type + updated = True + if resource_instance.metadata != finding.resource_metadata: + resource_instance.metadata = json.dumps( + finding.resource_metadata, cls=CustomEncoder + ) + updated = True + if resource_instance.details != finding.resource_details: + resource_instance.details = finding.resource_details + updated = True + if resource_instance.partition != finding.partition: + resource_instance.partition = finding.partition + updated = True + if group and ( + not resource_instance.groups + or group not in resource_instance.groups + ): + resource_instance.groups = (resource_instance.groups or []) + [ + group + ] + updated = True + + if updated: + dirty_resources[resource_uid] = resource_instance + + # Accumulate ResourceTagMapping rows; bulk_create at end of block. + for k, v in finding.resource_tags.items(): + tag_instance = tag_cache.get((k, v)) + if tag_instance is None: + # Should not happen after pre-resolve; skip defensively. + continue + tag_mappings_to_create.append( + ResourceTagMapping( + tenant_id=tenant_id, + resource=resource_instance, + tag=tag_instance, + ) + ) - # Process tags - tags = [] - with rls_transaction(tenant_id): - for key, value in finding.resource_tags.items(): - tag_key = (key, value) - if tag_key not in tag_cache: - tag_instance, _ = ResourceTag.objects.get_or_create( - tenant_id=tenant_id, key=key, value=value + unique_resources.add( + (resource_instance.uid, resource_instance.region) ) - tag_cache[tag_key] = tag_instance - else: - tag_instance = tag_cache[tag_key] - tags.append(tag_instance) - resource_instance.upsert_or_delete_tags(tags=tags) - unique_resources.add((resource_instance.uid, resource_instance.region)) + # TEMPORARY WORKAROUND: Skip findings with UID > 300 chars + # TODO: Remove this after implementing text field migration for finding.uid + if len(finding.uid) > 300: + logger.warning( + f"Skipping finding with UID exceeding 300 characters. " + f"Length: {len(finding.uid)}, " + f"Check: {finding.check_id}, " + f"Resource: {finding.resource_name}, " + f"UID: {finding.uid}" + ) + continue - # Prepare finding data - finding_uid = finding.uid + finding_uid = finding.uid + last_status, last_first_seen_at = last_status_cache.get( + finding_uid, (None, None) + ) - # TEMPORARY WORKAROUND: Skip findings with UID > 300 chars - # TODO: Remove this after implementing text field migration for finding.uid - if len(finding_uid) > 300: - skipped_findings_count += 1 - logger.warning( - f"Skipping finding with UID exceeding 300 characters. " - f"Length: {len(finding_uid)}, " - f"Check: {finding.check_id}, " - f"Resource: {finding.resource_name}, " - f"UID: {finding_uid}" - ) - continue + status = FindingStatus[finding.status] + delta = _create_finding_delta(last_status, status) + + if not last_first_seen_at: + last_first_seen_at = datetime.now(tz=timezone.utc) + + # Determine if finding should be muted and why + # Priority: mutelist processor (highest) > manual mute rules + is_muted = False + muted_reason = None + if finding.muted: + is_muted = True + muted_reason = "Muted by mutelist" + elif finding_uid in mute_rules_cache: + is_muted = True + muted_reason = mute_rules_cache[finding_uid] + + if status == FindingStatus.FAIL and not is_muted: + resource_failed_findings_cache[resource_uid] += 1 + + check_metadata["compliance"] = finding.compliance + finding_instance = Finding( + tenant_id=tenant_id, + uid=finding_uid, + delta=delta, + check_metadata=check_metadata, + status=status, + status_extended=finding.status_extended, + severity=finding.severity, + impact=finding.severity, + raw_result=finding.raw, + check_id=finding.check_id, + scan=scan_instance, + first_seen_at=last_first_seen_at, + muted=is_muted, + muted_at=datetime.now(tz=timezone.utc) if is_muted else None, + muted_reason=muted_reason, + compliance=finding.compliance, + categories=check_metadata.get("categories", []) or [], + resource_groups=check_metadata.get("resourcegroup") or None, + # Denormalized resource arrays populated directly on insert + # (was previously a separate bulk_update; saves a CASE WHEN + # over thousands of rows per micro-batch). + resource_regions=[resource_instance.region] + if resource_instance.region + else [], + resource_services=[resource_instance.service] + if resource_instance.service + else [], + resource_types=[resource_instance.type] + if resource_instance.type + else [], + ) + findings_to_create.append(finding_instance) + resource_denormalized_data.append( + (finding_instance, resource_instance) + ) - last_status, last_first_seen_at = last_status_cache.get( - finding_uid, (None, None) - ) + scan_resource_cache.add( + ( + str(resource_instance.id), + resource_instance.service, + resource_instance.region, + resource_instance.type, + ) + ) - status = FindingStatus[finding.status] - delta = _create_finding_delta(last_status, status) - - if not last_first_seen_at: - last_first_seen_at = datetime.now(tz=timezone.utc) - - # Determine if finding should be muted and why - # Priority: mutelist processor (highest) > manual mute rules - is_muted = False - muted_reason = None - - # Check mutelist processor first (highest priority) - if finding.muted: - is_muted = True - muted_reason = "Muted by mutelist" - # If not muted by mutelist, check manual mute rules - elif finding_uid in mute_rules_cache: - is_muted = True - muted_reason = mute_rules_cache[finding_uid] - - # Increment failed_findings_count cache if needed - if status == FindingStatus.FAIL and not is_muted: - resource_failed_findings_cache[resource_uid] += 1 - - # Create finding object (don't save yet) - check_metadata = finding.get_metadata() - check_metadata["compliance"] = finding.compliance - finding_instance = Finding( - tenant_id=tenant_id, - uid=finding_uid, - delta=delta, - check_metadata=check_metadata, - status=status, - status_extended=finding.status_extended, - severity=finding.severity, - impact=finding.severity, - raw_result=finding.raw, - check_id=finding.check_id, - scan=scan_instance, - first_seen_at=last_first_seen_at, - muted=is_muted, - muted_at=datetime.now(tz=timezone.utc) if is_muted else None, - muted_reason=muted_reason, - compliance=finding.compliance, - categories=check_metadata.get("categories", []) or [], - resource_groups=check_metadata.get("resourcegroup") or None, - ) - findings_to_create.append(finding_instance) - resource_denormalized_data.append((finding_instance, resource_instance)) - - # Track for scan summary - scan_resource_cache.add( - ( - str(resource_instance.id), - resource_instance.service, - resource_instance.region, - resource_instance.type, - ) - ) + aggregate_category_counts( + categories=check_metadata.get("categories", []) or [], + severity=finding.severity.value, + status=status.value, + delta=delta.value if delta else None, + muted=is_muted, + cache=scan_categories_cache, + ) - # Track categories with counts for ScanCategorySummary by (category, severity) - aggregate_category_counts( - categories=check_metadata.get("categories", []) or [], - severity=finding.severity.value, - status=status.value, - delta=delta.value if delta else None, - muted=is_muted, - cache=scan_categories_cache, - ) + aggregate_resource_group_counts( + resource_group=check_metadata.get("resourcegroup") or None, + severity=finding.severity.value, + status=status.value, + delta=delta.value if delta else None, + muted=is_muted, + resource_uid=resource_instance.uid if resource_instance else "", + cache=scan_resource_groups_cache, + group_resources_cache=group_resources_cache, + ) - # Track resource groups with counts for ScanGroupSummary - aggregate_resource_group_counts( - resource_group=check_metadata.get("resourcegroup") or None, - severity=finding.severity.value, - status=status.value, - delta=delta.value if delta else None, - muted=is_muted, - resource_uid=resource_instance.uid if resource_instance else "", - cache=scan_resource_groups_cache, - group_resources_cache=group_resources_cache, - ) + # 4) Bulk create ResourceTagMappings + # Replaces the original per-resource `upsert_or_delete_tags` + # (which did one `update_or_create` + SELECT FOR UPDATE per mapping). + if tag_mappings_to_create: + # Pre-SELECT existing pairs: `bulk_create(ignore_conflicts=True)` + # does not populate `pk`, so we cannot tell new vs existing from + # the result; we need that to bump `updated_at` only on resources + # that actually gain a mapping. + candidate_resource_ids = { + m.resource_id for m in tag_mappings_to_create + } + candidate_tag_ids = {m.tag_id for m in tag_mappings_to_create} + existing_pairs = set( + ResourceTagMapping.objects.filter( + tenant_id=tenant_id, + resource_id__in=candidate_resource_ids, + tag_id__in=candidate_tag_ids, + ).values_list("resource_id", "tag_id") + ) + resource_uid_by_id = { + str(r.id): uid for uid, r in resource_cache.items() + } + for m in tag_mappings_to_create: + if (m.resource_id, m.tag_id) not in existing_pairs: + uid = resource_uid_by_id.get(str(m.resource_id)) + if uid is not None: + resources_with_new_tag_mappings.add(uid) + + ResourceTagMapping.objects.bulk_create( + tag_mappings_to_create, + batch_size=SCAN_DB_BATCH_SIZE, + ignore_conflicts=True, + unique_fields=["tenant_id", "resource_id", "tag_id"], + ) - # Bulk operations within single transaction - with rls_transaction(tenant_id): - # Bulk create findings - if findings_to_create: - Finding.objects.bulk_create( - findings_to_create, batch_size=SCAN_DB_BATCH_SIZE - ) + # 5) Bulk create Findings + if findings_to_create: + Finding.objects.bulk_create( + findings_to_create, batch_size=SCAN_DB_BATCH_SIZE + ) - # Bulk create resource-finding mappings - for finding_instance, resource_instance in resource_denormalized_data: - mappings_to_create.append( - ResourceFindingMapping( - tenant_id=tenant_id, - resource=resource_instance, - finding=finding_instance, - ) - ) + # 6) Bulk create ResourceFindingMapping rows + mappings_to_create = [ + ResourceFindingMapping( + tenant_id=tenant_id, + resource=resource_instance, + finding=finding_instance, + ) + for finding_instance, resource_instance in resource_denormalized_data + ] + if mappings_to_create: + created_mappings = ResourceFindingMapping.objects.bulk_create( + mappings_to_create, + batch_size=SCAN_DB_BATCH_SIZE, + ignore_conflicts=True, + unique_fields=["tenant_id", "resource_id", "finding_id"], + ) + inserted = sum(1 for m in created_mappings if m.pk) + if inserted != len(mappings_to_create): + logger.error( + f"scan {scan_instance.id}: expected " + f"{len(mappings_to_create)} ResourceFindingMapping rows, " + f"inserted {inserted}. Rolling back micro-batch." + ) - if mappings_to_create: - created_mappings = ResourceFindingMapping.objects.bulk_create( - mappings_to_create, - batch_size=SCAN_DB_BATCH_SIZE, - ignore_conflicts=True, - unique_fields=["tenant_id", "resource_id", "finding_id"], - ) - inserted = sum(1 for m in created_mappings if m.pk) - if inserted != len(mappings_to_create): - logger.error( - f"scan {scan_instance.id}: expected " - f"{len(mappings_to_create)} ResourceFindingMapping rows, " - f"inserted {inserted}. Rolling back micro-batch." + # 7) Bulk update Resources + # Union of: + # - resources whose fields changed (dirty_resources) + # - resources that got new tag mappings (need updated_at bump, + # preserving the original `self.save(update_fields=["updated_at"])` + # behavior of `upsert_or_delete_tags`) + all_resource_uids_to_touch = ( + set(dirty_resources.keys()) | resources_with_new_tag_mappings ) - - # Update finding denormalized arrays - findings_to_update = [] - for finding_instance, resource_instance in resource_denormalized_data: - if not finding_instance.resource_regions: - finding_instance.resource_regions = [] - if not finding_instance.resource_services: - finding_instance.resource_services = [] - if not finding_instance.resource_types: - finding_instance.resource_types = [] - - if resource_instance.region not in finding_instance.resource_regions: - finding_instance.resource_regions.append(resource_instance.region) - if resource_instance.service not in finding_instance.resource_services: - finding_instance.resource_services.append(resource_instance.service) - if resource_instance.type not in finding_instance.resource_types: - finding_instance.resource_types.append(resource_instance.type) - - findings_to_update.append(finding_instance) - - if findings_to_update: - Finding.objects.bulk_update( - findings_to_update, - ["resource_regions", "resource_services", "resource_types"], - batch_size=SCAN_DB_BATCH_SIZE, - ) - - # Bulk update dirty resources - if dirty_resources: - update_objects_in_batches( - tenant_id=tenant_id, - model=Resource, - objects=list(dirty_resources.values()), - fields=[ - "metadata", - "details", - "partition", - "region", - "service", - "type", - "groups", - ], - batch_size=1000, - ) + if all_resource_uids_to_touch: + now_utc = datetime.now(tz=timezone.utc) + resources_to_bulk_update = [] + for uid in all_resource_uids_to_touch: + # Use the instance from dirty_resources if present (has mutated + # fields), otherwise the cached one (for updated_at bump only). + r = dirty_resources.get(uid) or resource_cache.get(uid) + if r is None: + continue + # Manually bump updated_at since bulk_update bypasses auto_now. + r.updated_at = now_utc + resources_to_bulk_update.append(r) + if resources_to_bulk_update: + Resource.objects.bulk_update( + resources_to_bulk_update, + [ + "metadata", + "details", + "partition", + "region", + "service", + "type", + "groups", + "updated_at", + ], + batch_size=1000, + ) + # Successful execution: leave deadlock retry loop. + break + except (OperationalError, IntegrityError) as db_err: + if attempt < CELERY_DEADLOCK_ATTEMPTS - 1: + logger.warning( + f"{'Deadlock error' if isinstance(db_err, OperationalError) else 'Integrity error'} " + f"on micro-batch for scan {scan_instance.id}. Retrying (attempt {attempt + 1})..." + ) + time.sleep(0.1 * (2**attempt)) + # Clear accumulators that we appended to inside the failed transaction + # so the retry produces consistent results. + findings_to_create.clear() + resource_denormalized_data.clear() + tag_mappings_to_create.clear() + dirty_resources.clear() + resources_with_new_tag_mappings.clear() + continue + raise # Log skipped findings summary if skipped_findings_count > 0: @@ -873,7 +1021,7 @@ def perform_prowler_scan( scan_instance = Scan.objects.get(pk=scan_id) scan_instance.state = StateChoices.EXECUTING scan_instance.started_at = datetime.now(tz=timezone.utc) - scan_instance.save() + scan_instance.save(update_fields=["state", "started_at", "updated_at"]) # Find the mutelist processor if it exists with rls_transaction(tenant_id, using=READ_REPLICA_ALIAS): @@ -918,7 +1066,13 @@ def perform_prowler_scan( provider_instance.connection_last_checked_at = datetime.now( tz=timezone.utc ) - provider_instance.save() + provider_instance.save( + update_fields=[ + "connected", + "connection_last_checked_at", + "updated_at", + ] + ) # If the provider is not connected, raise an exception outside the transaction. # If raised within the transaction, the transaction will be rolled back and the provider will not be marked @@ -933,6 +1087,13 @@ def perform_prowler_scan( last_status_cache = {} resource_failed_findings_cache = defaultdict(int) + # Throttle scan_instance progress writes to avoid hammering the writer: + # only persist when progress moves by at least `PROGRESS_THROTTLE_DELTA` + # OR `PROGRESS_THROTTLE_SECONDS` have elapsed. The final progress (1.0) + # always persists in the `finally` block below. + last_persisted_progress = -1.0 + last_persisted_progress_at = 0.0 + for progress, findings in prowler_scan.scan(): # Process findings in micro-batches findings_list = list(findings) @@ -959,10 +1120,20 @@ def perform_prowler_scan( group_resources_cache=group_resources_cache, ) - # Update scan progress - with rls_transaction(tenant_id): - scan_instance.progress = progress - scan_instance.save() + # Throttled progress save (the final save in the `finally` block + # below always runs regardless of throttle). + now = time.time() + progress_delta = progress - last_persisted_progress + elapsed = now - last_persisted_progress_at + if ( + progress_delta >= PROGRESS_THROTTLE_DELTA + or elapsed >= PROGRESS_THROTTLE_SECONDS + ): + with rls_transaction(tenant_id): + scan_instance.progress = progress + scan_instance.save(update_fields=["progress", "updated_at"]) + last_persisted_progress = progress + last_persisted_progress_at = now scan_instance.state = StateChoices.COMPLETED @@ -976,13 +1147,16 @@ def perform_prowler_scan( resources_to_update.append(resource_instance) if resources_to_update: - update_objects_in_batches( - tenant_id=tenant_id, - model=Resource, - objects=resources_to_update, - fields=["failed_findings_count"], - batch_size=1000, - ) + # Single rls_transaction wrapping the bulk_update (previously + # `update_objects_in_batches` opened one rls_transaction per + # chunk; for tenants with many resources this collapsed N + # BEGINs/COMMITs into 1). + with rls_transaction(tenant_id): + Resource.objects.bulk_update( + resources_to_update, + ["failed_findings_count"], + batch_size=SCAN_DB_BATCH_SIZE, + ) except Exception as e: logger.error(f"Error performing scan {scan_id}: {e}") @@ -994,7 +1168,16 @@ def perform_prowler_scan( scan_instance.duration = time.time() - start_time scan_instance.completed_at = datetime.now(tz=timezone.utc) scan_instance.unique_resource_count = len(unique_resources) - scan_instance.save() + scan_instance.save( + update_fields=[ + "state", + "duration", + "completed_at", + "unique_resource_count", + "progress", + "updated_at", + ] + ) if exception is not None: raise exception