DAG: hy202308_itp_dashboard

schedule: 0 7,15 * * *  (i.e. daily at 07:00 and 15:00 UTC)


Task Instance: getInspectionData


Task Instance Details

Dependencies Blocking Task From Getting Scheduled
Dependency           Reason
Trigger Rule         Task's trigger rule 'all_success' requires all upstream tasks to have
                     succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1,
                     'successes': 0, 'skipped': 0, 'failed': 1, 'upstream_failed': 0,
                     'done': 1}, upstream_task_ids={'getDrowToken'}
Dagrun Running       Task instance's dagrun was not in the 'running' state but in the
                     state 'failed'.
Task Instance State  Task is in the 'upstream_failed' state, which is not a valid state
                     for execution. The task must be cleared in order to be run.
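
Two different problems are reported here. The trigger-rule dependency is structural: getInspectionData keeps the default trigger_rule='all_success', so the moment its single upstream task getDrowToken fails, the scheduler marks it upstream_failed instead of running it. A minimal sketch of how a task opts out of that default is below; the task_id and callable are hypothetical, and only the default-rule behavior is taken from this page. The other two dependencies describe the remedy: the failed run has to be cleared (via the UI, the `airflow clear` CLI, or programmatically as sketched after the Task Instance Attributes table) before either task can run again.

from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

# Hypothetical task, not part of this DAG: with trigger_rule ALL_DONE it
# would run once getDrowToken finishes, whether that upstream succeeded or
# failed, instead of being marked upstream_failed like getInspectionData.
notify_on_any_outcome = PythonOperator(
    task_id="notifyOnAnyOutcome",        # assumed task id
    python_callable=lambda **ctx: None,  # placeholder callable
    provide_context=True,
    trigger_rule=TriggerRule.ALL_DONE,
    dag=dag,                             # assumes the DAG object from this DAG file
)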
Attribute: python_callable
def getInspectionData(**context):
    # Token pushed to XCom by the upstream getDrowToken task; if that task
    # failed, this pull returns None and every request below goes out
    # without valid credentials.
    token = context.get("ti").xcom_pull(key="token")

    # mongo_conn = BaseHook.get_connection("mongo_conn")
    # mongo_uri = f"mongodb://{mongo_conn.host}:{mongo_conn.port}"
    # mongo_client = MongoClient(mongo_uri)
    # mongo_db = mongo_client[mongo_conn.schema]
    # workflow_collection = mongo_db['workflows']
    # workflow_records = mongo_db['workflowrecords']

    itp_response = requests.get(
        url=f"{dRoW_api_end_url}/api/module/document-export/airflow/workflow/6780da3a19a6ea5c197f5e8a?export_type=0",
        headers={
            "x-access-token": f"Bearer {token}",
            "ICWPxAccessKey": "5WSD21ICWP_[1AG:4UdI){n=b~"
        }
    )
    ITP_Data = json.loads(itp_response.text)
    risc_response = requests.get(
        url=f"{dRoW_api_end_url}/api/module/document-export/airflow/workflow/6732d16bf0fd5b8f573d79ef?export_type=0",
        headers={
            "x-access-token": f"Bearer {token}",
            "ICWPxAccessKey": "5WSD21ICWP_[1AG:4UdI){n=b~"
        }
    )
    RISC_Data = json.loads(risc_response.text)

    conn_string = getdrowPSQLConnectionString()
    db = create_engine(conn_string)

    # Open the connection as a context manager so it is closed even on error.
    with db.connect() as conn:
        itp_df = pd.DataFrame()
        for x in ITP_Data:
            print("Data ITP:", x)
            df_nested_list = json_normalize(x['data'])
            df_nested_list['ITP ID'] = x['_id']
            # NOTE: DataFrame.append was removed in pandas 2.0;
            # pd.concat([itp_df, df_nested_list]) is the replacement.
            itp_df = itp_df.append(df_nested_list)
        # print("ITP Dataframe:", itp_df.columns)

        df = pd.DataFrame()
        for x in RISC_Data:
            print("Data RISC:", x)
            df_nested_list = json_normalize(x['data'])
            df = df.append(df_nested_list)
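
The dump of the callable is cut off at this point. Everything above hinges on the token XCom produced by the upstream getDrowToken task, which is the task that actually failed. A minimal sketch of what that upstream callable presumably does is below; only the task id, the XCom key 'token', and the dRoW_api_end_url name come from this page, while the endpoint, payload, and response shape are assumptions:

import requests

def getDrowToken(**context):
    # Hypothetical login request: the real endpoint, credentials, and
    # response shape appear nowhere in this dump.
    response = requests.post(
        url=f"{dRoW_api_end_url}/api/auth/login",         # assumed endpoint
        json={"username": "<user>", "password": "<pw>"},  # assumed payload
    )
    response.raise_for_status()  # an exception here fails the task, which is
                                 # what left getInspectionData upstream_failed
    token = response.json().get("token")  # assumed response key

    # The one certainty: push under key='token', because the downstream
    # getInspectionData pulls xcom_pull(key='token').
    context["ti"].xcom_push(key="token", value=token)
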
Task Instance Attributes
Attribute Value
dag_id hy202308_itp_dashboard
duration None
end_date 2025-06-06 08:03:56.154074+00:00
execution_date 2025-06-06T07:57:20.070700+00:00
executor_config {}
generate_command <function TaskInstance.generate_command at 0x7f6cbcea2320>
hostname
is_premature False
job_id None
key ('hy202308_itp_dashboard', 'getInspectionData', <Pendulum [2025-06-06T07:57:20.070700+00:00]>, 1)
log <Logger airflow.task (INFO)>
log_filepath /usr/local/airflow/logs/hy202308_itp_dashboard/getInspectionData/2025-06-06T07:57:20.070700+00:00.log
log_url http://localhost:8080/admin/airflow/log?execution_date=2025-06-06T07%3A57%3A20.070700%2B00%3A00&task_id=getInspectionData&dag_id=hy202308_itp_dashboard
logger <Logger airflow.task (INFO)>
mark_success_url http://localhost:8080/success?task_id=getInspectionData&dag_id=hy202308_itp_dashboard&execution_date=2025-06-06T07%3A57%3A20.070700%2B00%3A00&upstream=false&downstream=false
max_tries 1
metadata MetaData(bind=None)
next_try_number 1
operator None
pid None
pool default_pool
prev_attempted_tries 0
previous_execution_date_success 2025-06-06 07:00:00+00:00
previous_start_date_success 2025-06-06 15:03:06.646702+00:00
previous_ti <TaskInstance: hy202308_itp_dashboard.getInspectionData 2025-06-06 07:00:00+00:00 [success]>
previous_ti_success <TaskInstance: hy202308_itp_dashboard.getInspectionData 2025-06-06 07:00:00+00:00 [success]>
priority_weight 1
queue default
queued_dttm None
raw False
run_as_user None
start_date 2025-06-06 08:03:56.154055+00:00
state upstream_failed
task <Task(PythonOperator): getInspectionData>
task_id getInspectionData
test_mode False
try_number 1
unixname airflow
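
state upstream_failed together with job_id None and duration None confirms this instance never started, and the dependency table says it must be cleared before it can run. A sketch of clearing this exact run programmatically, using the Airflow 1.10-era API implied by the /admin/airflow URLs above (the execution_date value is copied from this table):

from datetime import datetime, timezone

from airflow.models import DagBag

dag = DagBag().get_dag("hy202308_itp_dashboard")

# execution_date copied from the attribute table above.
execution_date = datetime(2025, 6, 6, 7, 57, 20, 70700, tzinfo=timezone.utc)

# Clearing the whole run resets the failed getDrowToken and this
# upstream_failed getInspectionData so the scheduler can re-run both.
dag.clear(start_date=execution_date, end_date=execution_date)
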
Task Attributes
Attribute Value
dag <DAG: hy202308_itp_dashboard>
dag_id hy202308_itp_dashboard
depends_on_past False
deps {<TIDep(Not In Retry Period)>, <TIDep(Trigger Rule)>, <TIDep(Previous Dagrun State)>}
do_xcom_push True
downstream_list []
downstream_task_ids set()
email None
email_on_failure True
email_on_retry True
end_date None
execution_timeout None
executor_config {}
extra_links []
global_operator_extra_link_dict {}
inlets []
lineage_data None
log <Logger airflow.task.operators (INFO)>
logger <Logger airflow.task.operators (INFO)>
max_retry_delay None
on_failure_callback None
on_retry_callback None
on_success_callback None
op_args []
op_kwargs {'name': 'Dylan'}
operator_extra_link_dict {}
operator_extra_links ()
outlets []
owner airflow
params {}
pool default_pool
priority_weight 1
priority_weight_total 1
provide_context True
queue default
resources None
retries 1
retry_delay 0:05:00
retry_exponential_backoff False
run_as_user None
schedule_interval 0 7,15 * * *
shallow_copy_attrs ('python_callable', 'op_kwargs')
sla None
start_date 2023-01-17T00:00:00+00:00
subdag None
task_concurrency None
task_id getInspectionData
task_type PythonOperator
template_ext []
template_fields ('templates_dict', 'op_args', 'op_kwargs')
templates_dict None
trigger_rule all_success
ui_color #ffefeb
ui_fgcolor #000
upstream_list [<Task(PythonOperator): getDrowToken>]
upstream_task_ids {'getDrowToken'}
wait_for_downstream False
weight_rule downstream
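
Read together, the two attribute tables pin down how this operator is declared. The sketch below is consistent with those values; every parameter value is taken from the tables, while the variable names and the upstream operator's variable are assumptions:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id="hy202308_itp_dashboard",
    schedule_interval="0 7,15 * * *",   # 07:00 and 15:00 UTC daily
    start_date=datetime(2023, 1, 17),
    default_args={"owner": "airflow"},  # placement in default_args is assumed
)

get_inspection_data = PythonOperator(
    task_id="getInspectionData",
    python_callable=getInspectionData,  # the callable dumped above
    provide_context=True,
    op_kwargs={"name": "Dylan"},
    retries=1,
    retry_delay=timedelta(minutes=5),   # retry_delay 0:05:00
    dag=dag,
)

# upstream_task_ids {'getDrowToken'} implies the dependency:
get_drow_token >> get_inspection_data   # get_drow_token: assumed variable name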