diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index c8981a00bd83f..9d1e84f7eb793 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -113,6 +113,7 @@ attr attrs au auditability +auditable Auth auth authenticator @@ -1355,6 +1356,7 @@ responder reStructuredText resultset resumable +retryable Reusability rfc rmse diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_agentic.py b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_agentic.py new file mode 100644 index 0000000000000..88f2113eada99 --- /dev/null +++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_agentic.py @@ -0,0 +1,271 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Multi-query synthesis -- an agentic survey analysis pattern. + +Demonstrates how Dynamic Task Mapping turns a multi-dimensional research +question into a fan-out / fan-in pipeline that is observable, retryable, +and auditable at each step. + +**Question:** *"What does a typical Airflow deployment look like for +practitioners who actively use AI tools in their workflow?"* + +This question cannot be answered with a single SQL query. 
It requires +querying four independent dimensions -- executor type, deployment method, +cloud provider, and Airflow version -- all filtered to respondents who use +AI tools to write Airflow code. The results are then synthesized by a +second LLM call into a single narrative characterization. + +``example_llm_survey_agentic`` (manual trigger): + +.. code-block:: text + + decompose_question (@task) + → generate_sql (LLMSQLQueryOperator, mapped ×4) + → wrap_query (@task, mapped ×4) + → run_query (AnalyticsOperator, mapped ×4) + → collect_results (@task) + → synthesize_answer (LLMOperator) + → result_confirmation (ApprovalOperator) + +**What this makes visible that an agent harness hides:** + +* Each sub-query is a named, logged task instance -- not a hidden tool call. +* If the cloud-provider query fails, only that mapped instance retries; + the other three results are preserved in XCom. +* The synthesis step's inputs are fully auditable XCom values -- not an + opaque continuation of an LLM reasoning loop. + +Before running: + +1. Create an LLM connection named ``pydanticai_default`` (or the value of + ``LLM_CONN_ID``) for your chosen model provider. +2. Place the cleaned survey CSV at the path set by ``SURVEY_CSV_PATH``. 
+""" + +from __future__ import annotations + +import datetime +import json +import os + +from airflow.providers.common.ai.operators.llm import LLMOperator +from airflow.providers.common.ai.operators.llm_sql import LLMSQLQueryOperator +from airflow.providers.common.compat.sdk import dag, task +from airflow.providers.common.sql.config import DataSourceConfig +from airflow.providers.common.sql.operators.analytics import AnalyticsOperator +from airflow.providers.standard.operators.hitl import ApprovalOperator + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +LLM_CONN_ID = "pydanticai_default" + +SURVEY_CSV_PATH = os.environ.get( + "SURVEY_CSV_PATH", + "/opt/airflow/data/airflow-user-survey-2025.csv", +) +SURVEY_CSV_URI = f"file://{SURVEY_CSV_PATH}" + +# Schema context for LLMSQLQueryOperator. +# All column names must be quoted in SQL because they contain spaces and punctuation. +SURVEY_SCHEMA = """ +Table: survey +Key columns (quote all names in SQL): + "How important is Airflow to your business?" TEXT + "Which version of Airflow do you currently use?" TEXT + "CeleryExecutor" TEXT + "KubernetesExecutor" TEXT + "LocalExecutor" TEXT + "How do you deploy Airflow?" TEXT + "What best describes your current occupation?" TEXT + "What industry do you currently work in?" TEXT + "How many years of experience do you have with Airflow?" TEXT + "Which of the following is your company's primary cloud provider for Airflow?" TEXT + "How many people work at your company?" TEXT + "How many people at your company directly work on data?" TEXT + "How many people at your company use Airflow?" TEXT + "How likely are you to recommend Apache Airflow?" TEXT + "Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in writing Airflow code?" 
TEXT +""" + +survey_datasource = DataSourceConfig( + conn_id="", # No connection needed for local file-based sources + table_name="survey", + uri=SURVEY_CSV_URI, + format="csv", +) + +# Dimension labels -- order must match the sub-questions returned by decompose_question. +DIMENSION_KEYS = ["executor", "deployment", "cloud", "airflow_version"] + +SQL_SYSTEM_PROMPT = """\ +You are a SQL analysis agent working on the table "survey". +Always quote all table and column names with double quotes. + +For the AI usage filter column +"Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in writing Airflow code?": +- Treat affirmative free text (yes, sometimes, occasionally, rarely, often, regularly) as AI users. +- Treat explicit negatives (no, never) as non-users. +- Exclude blank, NULL, and ambiguous responses from the filtered set.""" + +SYNTHESIS_SYSTEM_PROMPT = """\ +You are a data analyst summarizing survey results about Apache Airflow practitioners. +Write in plain, concise language suitable for a technical audience. +Focus on patterns and proportions rather than raw counts.""" + + +# --------------------------------------------------------------------------- +# DAG: Agentic multi-query synthesis +# --------------------------------------------------------------------------- + + +# [START example_llm_survey_agentic] +@dag +def example_llm_survey_agentic(): + """ + Fan-out across four survey dimensions, then synthesize into a single narrative. + + Task graph:: + + decompose_question (@task) + → generate_sql (LLMSQLQueryOperator ×4, via Dynamic Task Mapping) + → wrap_query (@task ×4) + → run_query (AnalyticsOperator ×4, via Dynamic Task Mapping) + → collect_results (@task) + → synthesize_answer (LLMOperator) + → result_confirmation (ApprovalOperator) + """ + + # ------------------------------------------------------------------ + # Step 1: Decompose the high-level question into sub-questions, + # one per dimension. 
Each string becomes one mapped task instance + # in the next step. + # ------------------------------------------------------------------ + @task + def decompose_question() -> list[str]: + return [ + """\ +Among respondents who use AI/LLM tools to write Airflow code, +what executor types (CeleryExecutor, KubernetesExecutor, LocalExecutor) +are most commonly enabled? Count an executor as enabled only if the +column value is clearly affirmative. Treat blank, NULL, and negative +values as not enabled. Return the count per executor type.""", + """\ +Among respondents who use AI/LLM tools to write Airflow code, +how do they deploy Airflow? Return the count per deployment method.""", + """\ +Among respondents who use AI/LLM tools to write Airflow code, +which cloud providers are most commonly used for Airflow? +Return the count per cloud provider.""", + """\ +Among respondents who use AI/LLM tools to write Airflow code, +what version of Airflow are they currently running? +Return the count per version.""", + ] + + sub_questions = decompose_question() + + # ------------------------------------------------------------------ + # Step 2: Generate SQL for each sub-question in parallel. + # LLMSQLQueryOperator is expanded over the sub-question list -- + # four mapped instances, each translating one natural-language + # question into validated SQL. + # ------------------------------------------------------------------ + generate_sql = LLMSQLQueryOperator.partial( + task_id="generate_sql", + llm_conn_id=LLM_CONN_ID, + datasource_config=survey_datasource, + schema_context=SURVEY_SCHEMA, + system_prompt=SQL_SYSTEM_PROMPT, + ).expand(prompt=sub_questions) + + # ------------------------------------------------------------------ + # Step 3: Wrap each SQL string into a single-element list. + # AnalyticsOperator expects queries: list[str]; this step bridges + # the scalar output of LLMSQLQueryOperator to that interface. 
+ # ------------------------------------------------------------------ + @task + def wrap_query(sql: str) -> list[str]: + return [sql] + + wrapped_queries = wrap_query.expand(sql=generate_sql.output) + + # ------------------------------------------------------------------ + # Step 4: Execute each SQL against the survey CSV via DataFusion. + # Four mapped instances run in parallel. If one fails, only that + # instance retries -- the other three hold their XCom results. + # ------------------------------------------------------------------ + run_query = AnalyticsOperator.partial( + task_id="run_query", + datasource_configs=[survey_datasource], + result_output_format="json", + ).expand(queries=wrapped_queries) + + # ------------------------------------------------------------------ + # Step 5: Collect all four JSON results and label them by dimension. + # The default trigger rule (all_success) ensures synthesis only runs + # when the complete picture is available. + # ------------------------------------------------------------------ + @task + def collect_results(results: list[str]) -> dict: + # Airflow preserves index order for mapped task outputs, so zip is safe here: + # results[i] corresponds to the mapped instance at index i, which matches + # the sub-question at DIMENSION_KEYS[i]. + labeled: dict[str, list] = {} + for key, raw in zip(DIMENSION_KEYS, results): + items = json.loads(raw) + data = [row for item in items for row in item["data"]] + labeled[key] = data + return labeled + + collected = collect_results(run_query.output) + + # ------------------------------------------------------------------ + # Step 6: Synthesize the four labeled result sets into a narrative. + # This is the second LLM call -- the first four generated SQL, + # this one interprets the results. Inputs are fully visible in XCom. 
+ # ------------------------------------------------------------------ + synthesize_answer = LLMOperator( + task_id="synthesize_answer", + llm_conn_id=LLM_CONN_ID, + system_prompt=SYNTHESIS_SYSTEM_PROMPT, + prompt="""\ +Given these four independent survey query results about practitioners +who use AI tools to write Airflow code, write a 2-3 sentence +characterization of what a typical Airflow deployment looks like for +this group. + +Results: {{ ti.xcom_pull(task_ids='collect_results') }}""", + ) + collected >> synthesize_answer + + # ------------------------------------------------------------------ + # Step 7: Human reviews the synthesized narrative before the DAG ends. + # ------------------------------------------------------------------ + result_confirmation = ApprovalOperator( # noqa: F841 + task_id="result_confirmation", + subject="Review the synthesized survey analysis", + body=synthesize_answer.output, + response_timeout=datetime.timedelta(hours=1), + ) + + +# [END example_llm_survey_agentic] + +example_llm_survey_agentic() diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_analysis.py b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_analysis.py new file mode 100644 index 0000000000000..c4c2af93aefd1 --- /dev/null +++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_llm_survey_analysis.py @@ -0,0 +1,369 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Natural language analysis of a survey CSV -- interactive and scheduled variants. + +Both DAGs query the `Airflow Community Survey 2025 +`__ CSV using +:class:`~airflow.providers.common.ai.operators.llm_sql.LLMSQLQueryOperator` +and :class:`~airflow.providers.common.sql.operators.analytics.AnalyticsOperator`. + +**example_llm_survey_interactive** (five tasks, manual trigger) adds +human-in-the-loop review at both ends of the pipeline: HITLEntryOperator, +LLMSQLQueryOperator, AnalyticsOperator, a ``@task`` extraction step, and +ApprovalOperator. + +**example_llm_survey_scheduled** (seven tasks, runs monthly) downloads the CSV, +validates its schema, generates and executes SQL, then emails or logs the result. +No human review steps -- suitable for recurring reporting or dashboards. + +Before running either DAG: + +1. Create an LLM connection named ``pydanticai_default`` (or the value of + ``LLM_CONN_ID`` below) for your chosen model provider. +2. Place the survey CSV at the path set by the ``SURVEY_CSV_PATH`` + environment variable, or update ``SURVEY_CSV_PATH`` below. + A cleaned copy of the 2025 survey CSV (duplicate columns renamed, embedded + newlines removed) is required -- Apache DataFusion is strict about these. 
+""" + +from __future__ import annotations + +import csv as csv_mod +import datetime +import json +import os + +from airflow.providers.common.ai.operators.llm_schema_compare import LLMSchemaCompareOperator +from airflow.providers.common.ai.operators.llm_sql import LLMSQLQueryOperator +from airflow.providers.common.compat.sdk import dag, task +from airflow.providers.common.sql.config import DataSourceConfig +from airflow.providers.common.sql.operators.analytics import AnalyticsOperator +from airflow.providers.http.operators.http import HttpOperator +from airflow.providers.standard.operators.hitl import ApprovalOperator, HITLEntryOperator +from airflow.sdk import Param + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +# LLM provider connection (OpenAI, Anthropic, Vertex AI, etc.) +LLM_CONN_ID = "pydanticai_default" + +# HTTP connection pointing at https://airflow.apache.org (scheduled DAG only). +# Create a connection with host=https://airflow.apache.org, no auth required. +AIRFLOW_WEBSITE_CONN_ID = "airflow_website" + +# Endpoint path for the survey CSV download, relative to the HTTP connection base URL. +SURVEY_CSV_ENDPOINT = "/survey/airflow-user-survey-2025.csv" + +# Path to the survey CSV. Set the SURVEY_CSV_PATH environment variable to +# override -- no code change needed when moving between environments. +SURVEY_CSV_PATH = os.environ.get( + "SURVEY_CSV_PATH", + "/opt/airflow/data/airflow-user-survey-2025.csv", +) +SURVEY_CSV_URI = f"file://{SURVEY_CSV_PATH}" + +# Path where the reference schema CSV is written at runtime (scheduled DAG only). +REFERENCE_CSV_PATH = os.environ.get( + "REFERENCE_CSV_PATH", + "/opt/airflow/data/airflow-user-survey-2025-reference.csv", +) +REFERENCE_CSV_URI = f"file://{REFERENCE_CSV_PATH}" + +# SMTP connection for the result notification step (scheduled DAG only). 
+# Set to None to skip email and log the result instead. +SMTP_CONN_ID = os.environ.get("SMTP_CONN_ID", None) +NOTIFY_EMAIL = os.environ.get("NOTIFY_EMAIL", None) + +# Default question for the interactive DAG -- the human can edit it in the first HITL step. +INTERACTIVE_PROMPT = ( + "How does AI tool usage for writing Airflow code compare between Airflow 3 users and Airflow 2 users?" +) + +# Fixed question for the scheduled DAG -- runs unattended on every trigger. +SCHEDULED_PROMPT = "What is the breakdown of respondents by Airflow version currently in use?" + +# Schema context for LLMSQLQueryOperator. +# Lists the analytically relevant columns from the 2025 survey CSV (168 total). +# All column names must be quoted in SQL because they contain spaces and +# punctuation. +SURVEY_SCHEMA = """ +Table: survey +Key columns (quote all names in SQL): + "How important is Airflow to your business?" TEXT + "Which version of Airflow do you currently use?" TEXT + "CeleryExecutor" TEXT + "KubernetesExecutor" TEXT + "LocalExecutor" TEXT + "How do you deploy Airflow?" TEXT + "What best describes your current occupation?" TEXT + "What industry do you currently work in?" TEXT + "What city do you currently reside in?" TEXT + "How many years of experience do you have with Airflow?" TEXT + "Which of the following is your company's primary cloud provider for Airflow?" TEXT + "How many people work at your company?" TEXT + "How many people at your company directly work on data?" TEXT + "How many people at your company use Airflow?" TEXT + "How likely are you to recommend Apache Airflow?" TEXT + "Are you using AI/LLM (ChatGPT/Cursor/Claude etc) to assist you in writing Airflow code?" 
TEXT +""" + +survey_datasource = DataSourceConfig( + conn_id="", # No connection needed for local file-based sources + table_name="survey", + uri=SURVEY_CSV_URI, + format="csv", +) + +reference_datasource = DataSourceConfig( + conn_id="", # No connection needed for local file-based sources + table_name="survey_reference", + uri=REFERENCE_CSV_URI, + format="csv", +) + + +# --------------------------------------------------------------------------- +# DAG 1: Interactive survey question example +# --------------------------------------------------------------------------- + + +# [START example_llm_survey_interactive] +@dag +def example_llm_survey_interactive(): + """ + Ask a natural language question about the survey with human review at each end. + + Task graph:: + + prompt_confirmation (HITLEntryOperator) + → generate_sql (LLMSQLQueryOperator) + → run_query (AnalyticsOperator) + → extract_data (@task) + → result_confirmation (ApprovalOperator) + + The first HITL step lets the analyst review and optionally reword the + question before it reaches the LLM. The final HITL step presents the + query result for approval or rejection. + """ + + # ------------------------------------------------------------------ + # Step 1: Prompt confirmation -- review or edit the question. + # ------------------------------------------------------------------ + prompt_confirmation = HITLEntryOperator( + task_id="prompt_confirmation", + subject="Review the survey analysis question", + params={ + "prompt": Param( + INTERACTIVE_PROMPT, + type="string", + description="The natural language question to answer via SQL", + ) + }, + response_timeout=datetime.timedelta(hours=1), + ) + + # ------------------------------------------------------------------ + # Step 2: SQL generation -- LLM translates the confirmed question. 
+ # ------------------------------------------------------------------ + generate_sql = LLMSQLQueryOperator( + task_id="generate_sql", + prompt="{{ ti.xcom_pull(task_ids='prompt_confirmation')['params_input']['prompt'] }}", + llm_conn_id=LLM_CONN_ID, + datasource_config=survey_datasource, + schema_context=SURVEY_SCHEMA, + ) + + # ------------------------------------------------------------------ + # Step 3: SQL execution via Apache DataFusion. + # ------------------------------------------------------------------ + run_query = AnalyticsOperator( + task_id="run_query", + datasource_configs=[survey_datasource], + queries=["{{ ti.xcom_pull(task_ids='generate_sql') }}"], + result_output_format="json", + ) + + # ------------------------------------------------------------------ + # Step 4: Extract data rows from the JSON result. + # AnalyticsOperator returns [{"query": "...", "data": [...]}, ...] + # This step strips the query field so only the rows reach the reviewer. + # ------------------------------------------------------------------ + @task + def extract_data(raw: str) -> str: + results = json.loads(raw) + data = [row for item in results for row in item["data"]] + return json.dumps(data, indent=2) + + result_data = extract_data(run_query.output) + + # ------------------------------------------------------------------ + # Step 5: Result confirmation -- approve or reject the query result. 
+ # ------------------------------------------------------------------ + result_confirmation = ApprovalOperator( # noqa: F841 + task_id="result_confirmation", + subject="Review the survey query result", + body=result_data, + response_timeout=datetime.timedelta(hours=1), + ) + + prompt_confirmation >> generate_sql >> run_query + + +# [END example_llm_survey_interactive] + +example_llm_survey_interactive() + + +# --------------------------------------------------------------------------- +# DAG 2: Scheduled survey question example +# --------------------------------------------------------------------------- + + +# [START example_llm_survey_scheduled] +@dag(schedule="@monthly", start_date=datetime.datetime(2025, 1, 1), catchup=False) +def example_llm_survey_scheduled(): + """ + Download, validate, query, and report on the survey CSV on a schedule. + + Task graph:: + + download_survey (HttpOperator) + → prepare_csv (@task) + → check_schema (LLMSchemaCompareOperator) + → generate_sql (LLMSQLQueryOperator) + → run_query (AnalyticsOperator) + → extract_data (@task) + → send_result (@task) + + No human review steps -- suitable for recurring reporting or dashboards. + Change ``schedule`` to any cron expression or Airflow timetable to adjust + the run frequency. + + Prerequisites: + + - HTTP connection ``airflow_website`` pointing at ``https://airflow.apache.org``. + - Set ``SMTP_CONN_ID`` and ``NOTIFY_EMAIL`` environment variables to enable + email delivery of results; otherwise results are logged to the task log. + """ + # ------------------------------------------------------------------ + # Step 1: Download the survey CSV from the Airflow website. 
+ # ------------------------------------------------------------------ + download_survey = HttpOperator( + task_id="download_survey", + http_conn_id=AIRFLOW_WEBSITE_CONN_ID, + endpoint=SURVEY_CSV_ENDPOINT, + method="GET", + response_filter=lambda r: r.text, + log_response=False, + ) + + # ------------------------------------------------------------------ + # Step 2: Write the downloaded CSV to disk and generate a reference + # schema file for the schema comparison step. + # ------------------------------------------------------------------ + @task + def prepare_csv(csv_text: str) -> None: + os.makedirs(os.path.dirname(SURVEY_CSV_PATH), exist_ok=True) + with open(SURVEY_CSV_PATH, "w", encoding="utf-8") as f: + f.write(csv_text) + + # Write a single-row reference CSV from the schema context so + # LLMSchemaCompareOperator has a structured baseline to compare against. + os.makedirs(os.path.dirname(REFERENCE_CSV_PATH), exist_ok=True) + columns = [line.split('"')[1] for line in SURVEY_SCHEMA.strip().splitlines() if '"' in line] + with open(REFERENCE_CSV_PATH, "w", newline="", encoding="utf-8") as ref: + csv_mod.writer(ref).writerow(columns) + + csv_ready = prepare_csv(download_survey.output) + + # ------------------------------------------------------------------ + # Step 3: Validate the downloaded CSV schema against the reference. + # Raises if critical columns are missing or renamed. + # ------------------------------------------------------------------ + check_schema = LLMSchemaCompareOperator( + task_id="check_schema", + prompt="""\ +Compare the survey CSV schema against the reference schema. +Flag any missing or renamed columns that would break the downstream SQL queries.""", + llm_conn_id=LLM_CONN_ID, + data_sources=[survey_datasource, reference_datasource], + context_strategy="basic", + ) + csv_ready >> check_schema + + # ------------------------------------------------------------------ + # Step 4: SQL generation -- LLM translates the fixed question. 
+ # ------------------------------------------------------------------ + generate_sql = LLMSQLQueryOperator( + task_id="generate_sql", + prompt=SCHEDULED_PROMPT, + llm_conn_id=LLM_CONN_ID, + datasource_config=survey_datasource, + schema_context=SURVEY_SCHEMA, + ) + check_schema >> generate_sql + + # ------------------------------------------------------------------ + # Step 5: SQL execution via Apache DataFusion. + # ------------------------------------------------------------------ + run_query = AnalyticsOperator( + task_id="run_query", + datasource_configs=[survey_datasource], + queries=["{{ ti.xcom_pull(task_ids='generate_sql') }}"], + result_output_format="json", + ) + + # ------------------------------------------------------------------ + # Step 6: Extract data rows from the JSON result. + # AnalyticsOperator returns [{"query": "...", "data": [...]}, ...] + # ------------------------------------------------------------------ + @task + def extract_data(raw: str) -> str: + results = json.loads(raw) + data = [row for item in results for row in item["data"]] + return json.dumps(data, indent=2) + + result_data = extract_data(run_query.output) + + # ------------------------------------------------------------------ + # Step 7: Send result via email if SMTP is configured, otherwise log. + # Set the SMTP_CONN_ID and NOTIFY_EMAIL environment variables to enable + # email delivery. + # ------------------------------------------------------------------ + @task + def send_result(data: str) -> None: + if SMTP_CONN_ID and NOTIFY_EMAIL: + from airflow.providers.smtp.hooks.smtp import SmtpHook + + with SmtpHook(smtp_conn_id=SMTP_CONN_ID) as hook: + hook.send_email_smtp( + to=NOTIFY_EMAIL, + subject=f"Airflow Survey Analysis: {SCHEDULED_PROMPT}", + html_content=f"
<pre>{data}</pre>
", + ) + else: + print(f"Survey analysis result:\n{data}") + + generate_sql >> run_query >> result_data >> send_result(result_data) + + +# [END example_llm_survey_scheduled] + +example_llm_survey_scheduled()