diff --git a/docker-compose.yml b/docker-compose.yml
index 2b6c2986f..c6e79f1a9 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -12,7 +12,8 @@ services:
     mem_limit: 6g
     cpus: 2
     environment:
-      - UNSTRUCTURED_MAX_WORKERS=2
+      - UNSTRUCTURED_MAX_WORKERS=1
+      - UNSTRUCTURED_MEMORY_FREE_MB=1024
       - UNSTRUCTURED_TEMP_DIR=/tmp/unstructured_temp
      - ENV=local
     volumes:
@@ -120,11 +121,15 @@ services:
     environment:
       - MINIO_ACCESS_KEY=minioadmin
       - MINIO_SECRET_KEY=minioadmin
+      - MINIO_BROWSER=on
+      - MINIO_CACHE=off
+      - MINIO_API_REQUESTS_MAX=100
     command: server --console-address ":9001" /data
+    volumes:
+      - minio-data:/data
     networks:
       - redbox-app-network
-    volumes:
-      - ./data/objectstore:/data
+    mem_limit: 1g
     restart: unless-stopped
     healthcheck:
       test: [ "CMD", "mc", "ready", "local" ]
@@ -180,6 +185,7 @@ networks:
     driver: bridge
 
 volumes:
+  minio-data:
   opensearch-data:
   redbox-app-data:
   local_postgres_data: {}
diff --git a/redbox/redbox/loader/loaders.py b/redbox/redbox/loader/loaders.py
index f25cfdef6..91274aafb 100644
--- a/redbox/redbox/loader/loaders.py
+++ b/redbox/redbox/loader/loaders.py
@@ -1,6 +1,5 @@
 import logging
 import time
-import json
 from collections.abc import Iterator
 from datetime import UTC, datetime
 from io import BytesIO
@@ -201,51 +200,74 @@ def _get_chunks(self, file_name: str, file_bytes: BytesIO) -> List[dict]:
         is_large, page_count = is_large_pdf(file_name=file_name, filebytes=file_bytes)
         is_tabular = file_name.endswith((".csv", ".xls", ".xlsx"))
 
+        if is_tabular and self.chunk_resolution == ChunkResolution.tabular:
+            # Carry out the special ingest process for tabular files
+            elements = load_tabular_file(file_name=file_name, file_bytes=file_bytes)
+            logger.debug("Unstructured returned %d elements", len(elements))
+            return elements
+
+        is_pdf_image_heavy = False
+        if file_name.lower().endswith(".pdf"):
+            try:
+                is_pdf_image_heavy = _pdf_is_image_heavy(file_bytes)
+            except Exception:
+                pass
+
         if is_large and page_count > 0:
+            pages_per_chunk = 10 if is_pdf_image_heavy else 25 if page_count > 150 else self.pages_per_pdf_chunk
+
             logger.info(
-                "Large PDF with (%d pages) - splitting into chunks with %d pages", page_count, self.pages_per_pdf_chunk
+                "Large PDF (%d pages) - splitting into chunks of %d pages", page_count, pages_per_chunk
             )
+
             elements: List[dict] = []
-            pdf_chunks = split_pdf(filebytes=file_bytes, pages_per_chunk=self.pages_per_pdf_chunk)
+            pdf_chunks = split_pdf(filebytes=file_bytes, pages_per_chunk=pages_per_chunk)
+
             for idx, chunk in enumerate(pdf_chunks):
                 chunk.seek(0)
                 files = {"files": (file_name, chunk)}
+
                 try:
                     chunk_elements = self._post_files_with_fallback(
-                        url=url, files=files, file_name=file_name, file_bytes=chunk
+                        url=url,
+                        files=files,
+                        file_name=file_name,
+                        file_bytes=chunk,
+                        is_pdf_image_heavy=is_pdf_image_heavy,
                     )
                 except Exception as e:
                     msg = f"Chunk {idx + 1} failed: {e}"
                     logger.exception(msg)
                     raise ValueError(msg)
                 elements.extend(chunk_elements)
-            logger.debug("Unstructured returned %d elements", len(elements))
-            return elements
 
-        elif is_tabular and self.chunk_resolution == ChunkResolution.tabular:
-            # Carry out the special ingest process for tabular files - will be carried out in addition to
-            elements = load_tabular_file(file_name=file_name, file_bytes=file_bytes)
             logger.debug("Unstructured returned %d elements", len(elements))
             return elements
 
-        try:
-            file_bytes.seek(0)
-        except Exception as e:
-            logger.warning("Unable to seek file %s before upload - %s", file_name, str(e))
-
+        file_bytes.seek(0)
         files = {"files": (file_name, file_bytes)}
-        elements = self._post_files_with_fallback(url=url, files=files, file_name=file_name, file_bytes=file_bytes)
+
+        elements = self._post_files_with_fallback(
+            url=url,
+            files=files,
+            file_name=file_name,
+            file_bytes=file_bytes,
+            is_pdf_image_heavy=is_pdf_image_heavy,
+        )
+
         if not elements:
             raise ValueError("Unstructured failed to extract text for this file")
 
-        logger.debug("Unstructured returned %d elements", len(elements))
-        return elements
+        return elements
 
-    def _post_files_with_fallback(self, url: str, files: dict, file_name: str, file_bytes: BytesIO) -> List[dict]:
-        try:
-            file_bytes.seek(0)
-        except Exception as e:
-            logger.warning("Unable to seek file %s before upload - %s", file_name, e)
-
+    def _post_files_with_fallback(
+        self,
+        url: str,
+        files: dict,
+        file_name: str,
+        file_bytes: BytesIO,
+        is_pdf_image_heavy: bool,
+    ) -> List[dict]:
         # build default data payload
         base_data = {
             "chunking_strategy": "by_title",
@@ -253,104 +275,96 @@ def _post_files_with_fallback(self, url: str, files: dict, file_name: str, file_
             "combine_under_n_chars": self._min_chunk_size,
             "overlap": self._overlap_chars,
             "overlap_all": str(self._overlap_all_chunks).lower(),
-            "infer_table_structure": "true",
         }
 
-        # detect if file is an image-heavy pdf
-        lower_name = file_name.lower()
-        is_pdf_image_heavy = False
-        if lower_name.endswith(".pdf"):
-            try:
-                is_pdf_image_heavy = _pdf_is_image_heavy(file_bytes)
-            except Exception:
-                is_pdf_image_heavy = False
-
-        logger.debug("file %s pdf_image_heavy=%s", file_name, is_pdf_image_heavy)
-
-        candidate_data_payloads = []
-
-        # try fast strategy first
-        if not is_pdf_image_heavy:
-            candidate_data_payloads.append({**base_data, "strategy": "fast"})
-        # then fallback 1 let unstructured pick the strategy
-        candidate_data_payloads.append({**base_data})
-        # then fallback 2 conservative chunking
-        candidate_data_payloads.append({**base_data, "infer_table_structure": "false"})
+        candidate_payloads: list[dict] = []
+
+        if not is_pdf_image_heavy:
+            # try the cheap fast strategy first, then escalate to hi_res
+            candidate_payloads.append(
+                {
+                    **base_data,
+                    "strategy": "fast",
+                    "infer_table_structure": "true",
+                }
+            )
+            candidate_payloads.append(
+                {
+                    **base_data,
+                    "strategy": "hi_res",
+                    "hi_res_model_name": "yolox",
+                    "pdf_image_dpi": 200,
+                    "infer_table_structure": "true",
+                }
+            )
+        else:
+            # image-heavy PDFs go straight to hi_res at reduced DPI
+            candidate_payloads.append(
+                {
+                    **base_data,
+                    "strategy": "hi_res",
+                    "hi_res_model_name": "yolox",
+                    "pdf_image_dpi": 150,
+                    "infer_table_structure": "false",
+                    "languages": "eng",
+                }
+            )
 
         last_exc = None
-        for attempt, data in enumerate(candidate_data_payloads, start=1):
+
+        for attempt, data in enumerate(candidate_payloads, start=1):
             for retry in range(self.max_retries):
                 try:
                     file_bytes.seek(0)
+
                     logger.info(
-                        "calling Unstructured API - attempt %d.%d for %s with payload keys=%s",
+                        "calling Unstructured API - attempt %d.%d for %s with payload=%s",
                         attempt,
                         retry + 1,
                         file_name,
-                        list(data.keys()),
+                        {k: data[k] for k in ("strategy", "pdf_image_dpi", "infer_table_structure") if k in data},
                     )
-                    resp = requests.post(url, files=files, data=data, timeout=self.request_timeout)
-                    status = resp.status_code
-                    text = resp.text or ""
-                    try:
-                        json_body = resp.json()
-                    except Exception:
-                        json_body = None
-
-                    if status == 200:
-                        try:
-                            elements = resp.json()
-                        except Exception as parse_exc:
-                            logger.exception("Failed parsing Unstructured JSON - %s", parse_exc)
-                            try:
-                                elements = json.loads(resp.text)
-                            except Exception as fallback_exc:
-                                raise ValueError("Failed to parse Unstructured JSON response") from fallback_exc
+
+                    resp = requests.post(
+                        url,
+                        files=files,
+                        data=data,
+                        timeout=self.request_timeout,
+                    )
+
+                    if resp.status_code == 200:
+                        elements = resp.json()
                         if not isinstance(elements, list):
                             logger.warning("Unstructured responded with unexpected payload type - %s", type(elements))
                             raise ValueError("Unexpected payload from Unstructured")
                         return elements
 
-                    detail_msg = ""
-                    if isinstance(json_body, dict):
-                        detail_msg = json_body.get("detail", "") or json_body.get("error", "")
-                    if "fast strategy" in text.lower() or "fast strategy" in str(detail_msg).lower():
-                        logger.warning(
-                            "Unstructured server reported fast strategy unavailable so trying fallback payloads"
-                        )
-                        last_exc = ValueError(f"Unstructured error - {resp.status_code} {resp.text}")
-                        break
-
-                    if 400 <= status < 500:
-                        logger.error("Unstructured returned client error %d - %s", status, resp.text)
-                        last_exc = ValueError(f"Client error {status} - {resp.text}")
-                        break
-
-                    if status >= 500:
-                        logger.warning(
-                            "Server error %d from Unstructured, will retry - response was: %s", status, resp.text[:200]
-                        )
-                        last_exc = RequestException(f"Server error {status}")
-                        _time.sleep((2**retry) * 0.5)
-                        continue
-
-                    last_exc = ValueError(f"Unexpected status {status} - {resp.text}")
-                    break
+                    # client errors are not retried - raise and move to the next payload
+                    if 400 <= resp.status_code < 500:
+                        raise ValueError(f"Unexpected status {resp.status_code} - {resp.text}")
 
-                except RequestException as re:
-                    logger.warning("RequestException communicating with Unstructured - %s", re)
-                    last_exc = re
+                    # server errors are retried with exponential backoff
+                    last_exc = RequestException(f"Server error {resp.status_code}")
+                    logger.warning(
+                        "Unstructured error - %d, retrying (%d/%d)",
+                        resp.status_code,
+                        retry + 1,
+                        self.max_retries,
+                    )
                     _time.sleep((2**retry) * 0.5)
-                    continue
-                else:
-                    logger.debug("Exhausted retries for payload moving to next approach")
-                    continue
-                continue
+
+                except RequestException as e:
+                    last_exc = e
+                    logger.warning("Request exception, retrying: %s", e)
+                    _time.sleep((2**retry) * 0.5)
+
+                except Exception as e:
+                    last_exc = e
+                    logger.error("Unstructured failure: %s", e)
+                    break
 
         # if we're at this point everything failed
         logger.exception("All Unstructured requests failed for file %s. Last exception: %s", file_name, last_exc)
-        raise last_exc or RuntimeError("Unstructured requests failed without a recorded exception")
+        raise last_exc or RuntimeError("All Unstructured requests failed")
 
     def lazy_load(self, file_name: str, file_bytes: BytesIO) -> Iterator[Document]:
         """A lazy loader that reads a file line by line.
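Note for reviewers: the patch routes PDFs through `_pdf_is_image_heavy`, but its definition sits outside these hunks. The sketch below is one plausible shape for such a heuristic using `pypdf`; the function name, the 50% threshold, and the reliance on `page.images` are illustrative assumptions, not the project's actual implementation.

```python
from io import BytesIO

from pypdf import PdfReader


def pdf_is_image_heavy(file_bytes: BytesIO, image_page_ratio: float = 0.5) -> bool:
    """Hypothetical heuristic: treat a PDF as image-heavy when at least
    image_page_ratio of its pages embed one or more images."""
    file_bytes.seek(0)
    reader = PdfReader(file_bytes)
    if not reader.pages:
        return False
    pages_with_images = sum(1 for page in reader.pages if page.images)
    return pages_with_images / len(reader.pages) >= image_page_ratio
```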
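For testing the new behaviour outside the service, here is a minimal standalone sketch of the fallback ladder that `_post_files_with_fallback` now implements, assuming a locally running unstructured-api container; the URL, the `MAX_RETRIES` value, and the `partition_with_fallback` name are illustrative. The form fields match what the diff sends: non-image-heavy files try `fast` first and escalate to `hi_res` (yolox, 200 DPI), image-heavy PDFs go straight to `hi_res` at 150 DPI, 4xx responses skip retries, and 5xx responses back off exponentially.

```python
import time
from io import BytesIO

import requests

# Assumed local unstructured-api endpoint; adjust host/port to your compose setup.
UNSTRUCTURED_URL = "http://localhost:8000/general/v0/general"
MAX_RETRIES = 3


def partition_with_fallback(file_name: str, file_bytes: BytesIO, is_pdf_image_heavy: bool) -> list[dict]:
    base = {"chunking_strategy": "by_title"}
    if is_pdf_image_heavy:
        # Image-heavy PDFs skip "fast" entirely: hi_res at reduced DPI, no table inference.
        payloads = [{**base, "strategy": "hi_res", "hi_res_model_name": "yolox", "pdf_image_dpi": 150}]
    else:
        # Text-first files try the cheap "fast" strategy, then escalate to hi_res.
        payloads = [
            {**base, "strategy": "fast", "infer_table_structure": "true"},
            {**base, "strategy": "hi_res", "hi_res_model_name": "yolox", "pdf_image_dpi": 200},
        ]

    last_exc: Exception | None = None
    for data in payloads:
        for retry in range(MAX_RETRIES):
            file_bytes.seek(0)
            try:
                resp = requests.post(
                    UNSTRUCTURED_URL,
                    files={"files": (file_name, file_bytes)},
                    data=data,
                    timeout=120,
                )
                if resp.status_code == 200:
                    return resp.json()
                if 400 <= resp.status_code < 500:
                    # Client errors are not retried; fall through to the next payload.
                    last_exc = ValueError(f"client error {resp.status_code}")
                    break
                # Server errors back off exponentially: 0.5s, 1s, 2s, ...
                last_exc = ValueError(f"server error {resp.status_code}")
                time.sleep((2**retry) * 0.5)
            except requests.RequestException as exc:
                last_exc = exc
                time.sleep((2**retry) * 0.5)
    raise last_exc or RuntimeError("all Unstructured requests failed")
```

The ladder deliberately never retries 4xx responses: a request the server has rejected as malformed will not succeed on resubmission, so the only useful move is a different payload.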