12 changes: 9 additions & 3 deletions docker-compose.yml
@@ -12,7 +12,8 @@ services:
mem_limit: 6g
cpus: 2
environment:
- UNSTRUCTURED_MAX_WORKERS=2
- UNSTRUCTURED_MAX_WORKERS=1
- UNSTRUCTURED_MEMORY_FREE_MB=1024
- UNSTRUCTURED_TEMP_DIR=/tmp/unstructured_temp
- ENV=local
volumes:
@@ -120,11 +121,15 @@ services:
environment:
- MINIO_ACCESS_KEY=minioadmin
- MINIO_SECRET_KEY=minioadmin
- MINIO_BROWSER=on
- MINIO_CACHE=off
- MINIO_API_REQUESTS_MAX=100
command: server --console-address ":9001" /data
volumes:
- minio-data:/data
networks:
- redbox-app-network
volumes:
- ./data/objectstore:/data
mem_limit: 1g
restart: unless-stopped
healthcheck:
test: [ "CMD", "mc", "ready", "local" ]
@@ -180,6 +185,7 @@ networks:
driver: bridge

volumes:
minio-data:
opensearch-data:
redbox-app-data:
local_postgres_data: {}
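Note on the MinIO changes: object storage moves from the ./data/objectstore bind mount to the minio-data named volume, and the service gains a 1g memory limit plus extra environment tuning. A quick way to confirm the service still starts cleanly is to hit MinIO's documented liveness endpoint; the sketch below assumes the S3 API remains published on localhost:9000 (the console sits on 9001 per the command line).

# Quick smoke test for the MinIO service after the volume change.
# Assumption: the S3 API is published on localhost:9000.
import requests

def minio_is_live(base_url: str = "http://localhost:9000", timeout: float = 5.0) -> bool:
    # /minio/health/live is MinIO's documented liveness endpoint
    try:
        return requests.get(f"{base_url}/minio/health/live", timeout=timeout).status_code == 200
    except requests.RequestException:
        return False

if __name__ == "__main__":
    print("MinIO live:", minio_is_live())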
198 changes: 106 additions & 92 deletions redbox/redbox/loader/loaders.py
@@ -1,6 +1,5 @@
import logging
import time
import json
from collections.abc import Iterator
from datetime import UTC, datetime
from io import BytesIO
@@ -201,156 +200,171 @@ def _get_chunks(self, file_name: str, file_bytes: BytesIO) -> List[dict]:
is_large, page_count = is_large_pdf(file_name=file_name, filebytes=file_bytes)
is_tabular = file_name.endswith((".csv", ".xls", ".xlsx"))

if is_tabular and self.chunk_resolution == ChunkResolution.tabular:
# Carry out the special ingest process for tabular files, in addition to the standard ingest
elements = load_tabular_file(file_name=file_name, file_bytes=file_bytes)
logger.debug("Unstructured returned %d elements", len(elements))
return elements

is_pdf_image_heavy = False
if file_name.lower().endswith(".pdf"):
try:
is_pdf_image_heavy = _pdf_is_image_heavy(file_bytes)
except Exception:
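# if the heuristic fails, treat the PDF as text-based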
pass

if is_large and page_count > 0:
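# image-heavy PDFs get the smallest chunks; very long PDFs a medium size; otherwise the configured default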
pages_per_chunk = 10 if is_pdf_image_heavy else 25 if page_count > 150 else self.pages_per_pdf_chunk

logger.info(
"Large PDF with (%d pages) - splitting into chunks with %d pages", page_count, self.pages_per_pdf_chunk
)

elements: List[dict] = []
pdf_chunks = split_pdf(filebytes=file_bytes, pages_per_chunk=self.pages_per_pdf_chunk)
pdf_chunks = split_pdf(filebytes=file_bytes, pages_per_chunk=pages_per_chunk)

for idx, chunk in enumerate(pdf_chunks):
chunk.seek(0)
files = {"files": (file_name, chunk)}

try:
chunk_elements = self._post_files_with_fallback(
url=url, files=files, file_name=file_name, file_bytes=chunk
url=url,
files=files,
file_name=file_name,
file_bytes=chunk,
is_pdf_image_heavy=is_pdf_image_heavy,
)
except Exception as e:
msg = f"Chunk {idx + 1} failed: {e}"
logger.exception(msg)
raise ValueError(msg) from e
elements.extend(chunk_elements)
logger.debug("Unstructured returned %d elements", len(elements))
return elements

elif is_tabular and self.chunk_resolution == ChunkResolution.tabular:
# Carry out the special ingest process for tabular files, in addition to the standard ingest
elements = load_tabular_file(file_name=file_name, file_bytes=file_bytes)
logger.debug("Unstructured returned %d elements", len(elements))
return elements

try:
file_bytes.seek(0)
except Exception as e:
logger.warning("Unable to seek file %s before upload - %s", file_name, str(e))

file_bytes.seek(0)
files = {"files": (file_name, file_bytes)}
elements = self._post_files_with_fallback(url=url, files=files, file_name=file_name, file_bytes=file_bytes)

elements = self._post_files_with_fallback(
url=url,
files=files,
file_name=file_name,
file_bytes=file_bytes,
is_pdf_image_heavy=is_pdf_image_heavy,
)

if not elements:
raise ValueError("Unstructured failed to extract text for this file")
logger.debug("Unstructured returned %d elements", len(elements))
return elements

def _post_files_with_fallback(self, url: str, files: dict, file_name: str, file_bytes: BytesIO) -> List[dict]:
try:
file_bytes.seek(0)
except Exception as e:
logger.warning("Unable to seek file %s before upload - %s", file_name, e)
return elements

def _post_files_with_fallback(
self,
url: str,
files: dict,
file_name: str,
file_bytes: BytesIO,
is_pdf_image_heavy: bool,
) -> List[dict]:
# build default data payload
base_data = {
"chunking_strategy": "by_title",
"max_characters": self._max_chunk_size,
"combine_under_n_chars": self._min_chunk_size,
"overlap": self._overlap_chars,
"overlap_all": str(self._overlap_all_chunks).lower(),
"infer_table_structure": "true",
}

# detect if file is an image-heavy pdf
lower_name = file_name.lower()
is_pdf_image_heavy = False
if lower_name.endswith(".pdf"):
try:
is_pdf_image_heavy = _pdf_is_image_heavy(file_bytes)
except Exception:
is_pdf_image_heavy = False
candidate_payloads: list[dict] = []

logger.debug("file %s pdf_image_heavy=%s", file_name, is_pdf_image_heavy)
if not is_pdf_image_heavy:
candidate_payloads.append(
{
**base_data,
"strategy": "fast",
"infer_table_structure": "true",
}
)

candidate_data_payloads = []
candidate_payloads.append(
{
**base_data,
"strategy": "hi_res",
"hi_res_model_name": "yolox",
"pdf_image_dpi": 200,
"infer_table_structure": "true",
}
)

# try fast strategy first
if not is_pdf_image_heavy:
candidate_data_payloads.append({**base_data, "strategy": "fast"})
# then fallback 1: let Unstructured pick the strategy
candidate_data_payloads.append({**base_data})
# then fallback 2: conservative chunking
candidate_data_payloads.append({**base_data, "infer_table_structure": "false"})
else:
candidate_payloads.append(
{
**base_data,
"strategy": "hi_res",
"hi_res_model_name": "yolox",
"pdf_image_dpi": 150,
"infer_table_structure": "false",
"languages": "eng",
}
)

last_exc = None
for attempt, data in enumerate(candidate_data_payloads, start=1):

for attempt, data in enumerate(candidate_payloads, start=1):
for retry in range(self.max_retries):
try:
file_bytes.seek(0)

logger.info(
"calling Unstructured API - attempt %d.%d for %s with payload keys=%s",
attempt,
retry + 1,
file_name,
list(data.keys()),
{k: data[k] for k in ("strategy", "pdf_image_dpi", "infer_table_structure") if k in data},
)
resp = requests.post(url, files=files, data=data, timeout=self.request_timeout)
status = resp.status_code
text = resp.text or ""
try:
json_body = resp.json()
except Exception:
json_body = None

if status == 200:
try:
elements = resp.json()
except Exception as parse_exc:
logger.exception("Failed parsing Unstructured JSON - %s", parse_exc)
try:
elements = json.loads(resp.text)
except Exception as fallback_exc:
raise ValueError("Failed to parse Unstructured JSON response") from fallback_exc

resp = requests.post(
url,
files=files,
data=data,
timeout=self.request_timeout,
)

if resp.status_code == 200:
elements = resp.json()
if not isinstance(elements, list):
logger.warning("Unstructured responded with unexpected payload type - %s", type(elements))
raise ValueError("Unexpected payload from Unstructured")
return elements

detail_msg = ""
if isinstance(json_body, dict):
detail_msg = json_body.get("detail", "") or json_body.get("error", "")
if "fast strategy" in text.lower() or "fast strategy" in str(detail_msg).lower():
logger.warning(
"Unstructured server reported fast strategy unavailable so trying fallback payloads"
)
last_exc = ValueError(f"Unstructured error - {resp.status_code} {resp.text}")
break

if 400 <= status < 500:
logger.error("Unstructured returned client error %d - %s", status, resp.text)
last_exc = ValueError(f"Client error {status} - {resp.text}")
break

if status >= 500:
logger.warning(
"Server error %d from Unstructured, will retry - response was: %s", status, resp.text[:200]
)
last_exc = RequestException(f"Server error {status}")
_time.sleep((2**retry) * 0.5)
continue

last_exc = ValueError(f"Unexpected status {status} - {resp.text}")
break
# client errors: don't retry
if 400 <= resp.status_code < 500:
raise ValueError(f"Unexpected status {resp.status_code} - {resp.text}")

except RequestException as re:
logger.warning("RequestException communicating with Unstructured - %s", re)
last_exc = re
# server errors: retry with backoff
logger.warning(
"Unstructured error - %d, retrying (%d/%d)",
resp.status_code,
retry + 1,
self.max_retries,
)
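# exponential backoff before the next retry: 0.5s, 1s, 2s, ...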
_time.sleep((2**retry) * 0.5)
continue
else:
logger.debug("Exhausted retries for payload moving to next approach")
continue
continue

except RequestException as e:
last_exc = e
logger.warning("Request exception, retrying: %s", e)
_time.sleep((2**retry) * 0.5)

except Exception as e:
last_exc = e
logger.error("Unstructured failure: %s", e)
break

# if we reach this point, everything failed
logger.exception("All Unstructured requests failed for file %s. Last exception: %s", file_name, last_exc)
raise last_exc or RuntimeError("Unstructured requests failed without a recorded exception")
raise last_exc or RuntimeError("All Unstructured requests failed")

def lazy_load(self, file_name: str, file_bytes: BytesIO) -> Iterator[Document]:
"""A lazy loader that reads a file line by line.
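The new code paths hinge on the _pdf_is_image_heavy helper, which is referenced throughout the diff but not shown in this hunk. A plausible implementation is sketched below as an assumption, not the PR's actual code: it samples the first few pages with pypdf and flags the file when most sampled pages contain embedded images but little extractable text.

# Hypothetical sketch of the _pdf_is_image_heavy helper referenced above;
# the PR's real implementation is not visible in this diff.
from io import BytesIO
from itertools import islice

from pypdf import PdfReader

def _pdf_is_image_heavy(file_bytes: BytesIO, sample_pages: int = 5, threshold: float = 0.5) -> bool:
    """Return True when most sampled pages have images but little text."""
    file_bytes.seek(0)
    reader = PdfReader(file_bytes)
    sampled = list(islice(reader.pages, sample_pages))
    if not sampled:
        return False
    image_heavy = 0
    for page in sampled:
        has_images = len(page.images) > 0           # embedded images on the page
        text = (page.extract_text() or "").strip()  # extractable text layer
        if has_images and len(text) < 200:
            image_heavy += 1
    file_bytes.seek(0)  # leave the stream rewound for the caller
    return image_heavy / len(sampled) >= threshold

This matches how the loader uses the flag: detection happens once in _get_chunks, and the result then drives both the pages-per-chunk size passed to split_pdf and which Unstructured payloads _post_files_with_fallback attempts.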