Simple LLM scenario example based on the Airflow survey data #65172
vikramkoka wants to merge 9 commits into main from
Conversation
Here is a simple example for the common.ai provider based on public data, which happens to be the Airflow 2025 Survey data. The goal is to demonstrate an interactive LLM use case, which can be used to try out exploratory data analysis, and a scheduled LLM use case, which is more representative of expected usage. Both can serve as starting points for developers: the data extraction and LLM prompts can be replaced with other integrations pulling other data sets.
This example demonstrates how to answer a more complex question that requires LLM synthesis across multiple queries on the survey data.
Pull request overview
Adds new common.ai provider example DAGs showcasing LLM-driven analysis of the Apache Airflow 2025 survey CSV, demonstrating interactive (HITL) and scheduled/agentic (mapped fan-out/fan-in) patterns.
Changes:
- Introduces an interactive + scheduled survey analysis example using LLM-to-SQL generation and DataFusion analytics execution.
- Introduces an “agentic” example using Dynamic Task Mapping to decompose a question into multiple SQL queries and synthesize results.
- Extends the docs spelling wordlist with terms used by the new examples.
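The mapped fan-out/fan-in decomposition described above can be sketched in plain Python. Every function body here is a hypothetical stand-in: in the real DAG an LLM decomposes the question into SQL, Airflow's Dynamic Task Mapping fans the queries out, and a final task synthesizes the results.

```python
# Plain-Python sketch of the fan-out/fan-in pattern in the agentic example.
# All names and bodies are illustrative stand-ins, not the provider's API.

def decompose_question(question: str) -> list[str]:
    # Fan-out: an LLM would emit one SQL query per sub-question.
    return [f"{question} [sub-query {i}]" for i in range(3)]

def run_query(sub_query: str) -> dict:
    # Each mapped task instance runs one query and returns its rows.
    return {"query": sub_query, "rows": [f"result of {sub_query}"]}

def synthesize(results: list[dict]) -> str:
    # Fan-in: an LLM would merge the partial results into one answer.
    return "; ".join(row for r in results for row in r["rows"])

subs = decompose_question("Which executor do survey respondents use most?")
answer = synthesize([run_query(s) for s in subs])
print(answer)
```

In the actual DAG the middle step would be expressed with `expand` over the list of generated queries rather than a list comprehension.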
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_analysis.py | New interactive + scheduled survey analysis example DAGs. |
| providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_agentic.py | New mapped multi-query (“agentic”) survey analysis example DAG. |
| docs/spelling_wordlist.txt | Adds “auditable” and “retryable” to satisfy spellcheck. |
def prepare_csv(csv_text: str) -> None:
    os.makedirs(os.path.dirname(SURVEY_CSV_PATH), exist_ok=True)
    with open(SURVEY_CSV_PATH, "w", encoding="utf-8") as f:
    )
else:
    print(f"Survey analysis result:\n{data}")
The data comes from a CSV the user placed on disk, not from untrusted input. HTML-escaping survey results inside a <pre> block is over-engineering for an example DAG.
os.makedirs(os.path.dirname(SURVEY_CSV_PATH), exist_ok=True)
with open(SURVEY_CSV_PATH, "w", encoding="utf-8") as f:
    f.write(csv_text)

# Write a single-row reference CSV from the schema context so
# LLMSchemaCompareOperator has a structured baseline to compare against.
os.makedirs(os.path.dirname(REFERENCE_CSV_PATH), exist_ok=True)
Same as above -- the default always has a directory. Not worth guarding in an example DAG.
from airflow.providers.common.sql.config import DataSourceConfig
from airflow.providers.common.sql.operators.analytics import AnalyticsOperator
from airflow.providers.standard.operators.hitl import ApprovalOperator
common.sql is a declared dependency of common.ai. Installing common.ai always brings common.sql with it -- no guard needed.
# ------------------------------------------------------------------
@task
def extract_data(raw: str) -> str:
    results = json.loads(raw)
    data = [row for item in results for row in item["data"]]
Example DAGs should be self-contained and copy-pasteable. Extracting a shared helper makes them harder to use as starting points. The duplication is intentional.
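The fan-in flattening inside `extract_data` is pure Python and easy to verify on its own. A minimal sketch, assuming `raw` is a JSON string shaped like the mapped tasks' XCom output (a list of objects each carrying a `data` list):

```python
import json

def extract_data(raw: str) -> str:
    # Flatten the per-query result objects into one list of rows,
    # mirroring the list comprehension in the example DAG.
    results = json.loads(raw)
    data = [row for item in results for row in item["data"]]
    return json.dumps(data)

raw = json.dumps([{"data": [1, 2]}, {"data": [3]}])
print(extract_data(raw))  # flattened: [1, 2, 3]
```

Because the helper is a handful of lines, duplicating it per example DAG (as the reviewer argues) costs little and keeps each file copy-pasteable.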
from airflow.providers.common.ai.operators.llm_schema_compare import LLMSchemaCompareOperator
from airflow.providers.common.ai.operators.llm_sql import LLMSQLQueryOperator
from airflow.providers.common.compat.sdk import dag, task
from airflow.providers.common.sql.config import DataSourceConfig
from airflow.providers.common.sql.operators.analytics import AnalyticsOperator
from airflow.providers.http.operators.http import HttpOperator
from airflow.providers.standard.operators.hitl import ApprovalOperator, HITLEntryOperator
from airflow.sdk import Param
common.sql is a dependency of common.ai, so it's always available. The HttpOperator try/except guard was intentionally removed -- example DAGs should fail clearly on missing deps rather than silently hiding functionality.
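The two import styles under discussion can be contrasted with a hypothetical optional dependency (`some_missing_optional_dep` is a made-up module name, not a real package):

```python
# Guarded style (what the example DAGs no longer do): a missing optional
# dependency is swallowed and functionality is silently disabled.
try:
    import some_missing_optional_dep  # hypothetical, does not exist
    HAS_DEP = True
except ImportError:
    HAS_DEP = False

# Direct style (what the example DAGs do now): a plain top-level
# `import airflow.providers...` raises ImportError at DAG-parse time,
# so a missing dependency surfaces immediately instead of hiding features.
print(HAS_DEP)
```

For example DAGs meant as starting points, the direct style gives users an unambiguous error to act on rather than a DAG that quietly does less.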
# [START example_llm_survey_scheduled]
@dag(schedule="@monthly", start_date=datetime.datetime(2025, 1, 1))
os.makedirs(os.path.dirname(SURVEY_CSV_PATH), exist_ok=True)
with open(SURVEY_CSV_PATH, "w", encoding="utf-8") as f:
    f.write(csv_text)

# Write a single-row reference CSV from the schema context so
# LLMSchemaCompareOperator has a structured baseline to compare against.
os.makedirs(os.path.dirname(REFERENCE_CSV_PATH), exist_ok=True)
The default path is /opt/airflow/data/airflow-user-survey-2025.csv which always has a directory component. Setting it to a bare filename is a config error -- no need to guard against it in example code.
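The reviewer's reasoning is easy to sanity-check: `os.path.dirname` on the default path quoted above always yields a non-empty directory for `os.makedirs(..., exist_ok=True)` to create, and only a bare filename (a misconfiguration) would not.

```python
import os

# The default from the review comment above.
SURVEY_CSV_PATH = "/opt/airflow/data/airflow-user-survey-2025.csv"

# dirname of the default has a real directory component, so the
# makedirs call in prepare_csv never receives an empty string.
parent = os.path.dirname(SURVEY_CSV_PATH)
print(parent)  # /opt/airflow/data

# A bare filename, by contrast, yields an empty string -- which is why
# the reviewer calls that a config error rather than something to guard.
print(os.path.dirname("survey.csv"))
```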
Was generative AI tooling used to co-author this PR?
{pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.