How can I use the XCom return_value from one task in a downstream task using SQLExecuteQueryOperator?

I am trying to dynamically generate a SQL statement with the SQLExecuteQueryOperator based on an XCom value from a previous task. The problem is that the XCom value is not rendered into the SQL statement. Instead, Airflow attempts to execute the query select * from table where value = {{ti.xcom_pull(task_ids='return_bq_value')}}, which fails because the template expression reaches the database unrendered. Is it possible with this operator to have the XCom value rendered into the query, so that select * from table where value = 1 is executed?

Note: The BashOperator task is only there to confirm that the value is correct, and it does render 1.

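from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook

# default_args and sql_dir are defined elsewhere in the DAG file.
with DAG(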
    "xcomtest",
    default_args=default_args,
    template_searchpath=[sql_dir],
) as dag:

    @task 
    def return_bq_value():
        hook = BigQueryHook(gcp_conn_id="gcs_default",
                            use_legacy_sql=False,
                            location='US'
                            )
        result = hook.get_records(sql="""SELECT 1""")
        return result

    echo_bq_value = BashOperator(
        task_id="echo_bq_value",
        bash_command="echo '{{ ti.xcom_pull(task_ids='return_bq_value') }}'",
    )

    execute_query = SQLExecuteQueryOperator(
        task_id="execute_query",
        conn_id="default_conn",
        sql="select * from table where value = {{ params.value }}",
        params = {
            "value": "{{ti.xcom_pull(task_ids='return_bq_value')}}",
        },
    )

    return_bq_value() >> echo_bq_value >> execute_query
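
A likely explanation for the behaviour described above is that Jinja renders the sql string in a single pass: {{ params.value }} is replaced by whatever string is stored in params, and that string is not rendered a second time. The sketch below reproduces this with plain jinja2, outside Airflow. FakeTaskInstance is a hypothetical stand-in for Airflow's task instance and is assumed to return 1; it is not part of any Airflow API.

```python
from jinja2 import Environment


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance (illustration only)."""

    def xcom_pull(self, task_ids):
        # Assumed value for the sketch; a real task would return the stored XCom.
        return 1


env = Environment()

# Same shape as the params dict in the DAG: the value is itself a template string.
params = {"value": "{{ ti.xcom_pull(task_ids='return_bq_value') }}"}
context = {"ti": FakeTaskInstance(), "params": params}

# Indirect: the template string stored in params is inserted verbatim, not re-rendered.
indirect = env.from_string(
    "select * from table where value = {{ params.value }}"
).render(**context)

# Direct: the expression sits in the template itself, so it is evaluated.
direct = env.from_string(
    "select * from table where value = {{ ti.xcom_pull(task_ids='return_bq_value') }}"
).render(**context)

print(indirect)
print(direct)
```

Applied to the DAG above, this suggests writing the ti.xcom_pull(...) expression directly in the sql argument of execute_query instead of routing it through params, since sql is a templated field of SQLExecuteQueryOperator while the values in params are handed to Jinja as plain strings. Because get_records returns a list of rows, the pulled value may also need indexing (for example [0][0]) if it arrives as a nested list.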
