DAG: cv202303_cleaning

schedule: 0 15 * * *


Task Instance: getMongoDB


Task Instance Details

Dependencies Blocking Task From Getting Scheduled

Dependency: Trigger Rule
Reason: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 1, 'upstream_failed': 0, 'done': 1}, upstream_task_ids={'getDrowToken'}

Dependency: Dagrun Running
Reason: Task instance's dagrun was not in the 'running' state but in the state 'failed'.

Dependency: Task Instance State
Reason: Task is in the 'upstream_failed' state which is not a valid state for execution. The task must be cleared in order to be run.
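The last reason is the actionable one: the run will not proceed until the failed tasks are cleared. In practice this is done from the UI (Clear) or with the `airflow clear` CLI; below is a minimal programmatic sketch, assuming the Airflow 1.10.x API that the /admin/ log URLs on this page suggest.

# Hedged sketch: clear the 2024-11-14T15:00:00 run of cv202303_cleaning so
# that getDrowToken (failed) and getMongoDB (upstream_failed) are re-queued.
# Assumes Airflow 1.10.x, run inside the Airflow environment.
from datetime import datetime

from airflow.models import DagBag

dag = DagBag().get_dag("cv202303_cleaning")

# Clearing the whole DAG over the failed execution date resets both task
# instances and flips the failed DagRun back so the scheduler retries it.
dag.clear(
    start_date=datetime(2024, 11, 14, 15, 0),
    end_date=datetime(2024, 11, 14, 15, 0),
)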
Attribute: python_callable
import json

import numpy as np
import pandas as pd
import requests
from pandas import json_normalize  # on older pandas: from pandas.io.json import json_normalize
from sqlalchemy import create_engine

# dRoW_api_end_url and conn_string are module-level settings defined
# elsewhere in the DAG file.

def getMongoDB(**context):
    # Token pushed to XCom by the upstream getDrowToken task.
    token = context.get("ti").xcom_pull(key="token")
    response = requests.get(
        url=f"{dRoW_api_end_url}/api/module/document-export/airflow/workflow/66175ed0dac217ba1408964b?export_type=0",
        headers={
            "x-access-token": f"Bearer {token}",
            "ICWPxAccessKey": "nd@201907ICWP_[1AG:4UdI){n=b~",
        },
    )

    RISC_Data = json.loads(response.text)

    # Map export field names to the target SQL column names.
    Mapping = {
        "Date of Inspection": "a01a_inspection_date",
        # "SupD signature time": "f2_checked_by_supd_on_date",
    }

    db = create_engine(conn_string)
    with db.connect() as conn:
        df = pd.DataFrame()
        for x in RISC_Data:
            # Flatten the nested record, keep only the mapped fields,
            # and rename them to the SQL column names.
            df_nested_list = json_normalize(x['data'])
            df2 = df_nested_list.reindex(columns=Mapping.keys())
            df2.rename(columns=Mapping, inplace=True)

            df2["a2_daily_or_weekly"] = "daily"

            # Pull the check/sign dates from the most recent
            # "B : RSS(IOW) Check & Sign" entry in the approval log, if any.
            if len(x['ApproveLogSummary']) > 0:
                request_data = [data for data in x['ApproveLogSummary']
                                if data.get('statusName') == "B : RSS(IOW) Check & Sign"]
                if len(request_data) > 0 and 'from' in request_data[-1]:
                    df2['f2_checked_by_supd_on_date'] = request_data[-1]['from']
                else:
                    df2['f2_checked_by_supd_on_date'] = None
                if len(request_data) > 0 and 'to' in request_data[-1]:
                    df2['d4_submission_date'] = request_data[-1]['to']
                else:
                    df2['d4_submission_date'] = None
            else:
                df2['f2_checked_by_supd_on_date'] = None
                df2['d4_submission_date'] = None

            # Report name is the date part (YYYY-MM-DD) of the inspection date.
            df2["report_name"] = df2["a01a_inspection_date"].astype(str).str[:10]

            # A "Z : END" entry in the approval log marks a completed report.
            if len([data for data in x['ApproveLogSummary']
                    if data.get('statusName') == "Z : END"]) > 0:
                df2['report_complete_or_incomplete'] = 'complete'
            else:
                df2['report_complete_or_incomplete'] = 'incomplete'

            # Count items marked 'UA' (non-conforming) among the Daily entries.
            if 'data' in x and isinstance(x['data'], dict):
                if 'Daily' in x['data']:
                    total_report = 0
                    total_x = 0
                    for item in x['data']['Daily']:
                        if any(item[item_key] == 'UA' for item_key in item):
                            total_x += 1
                        total_report += 1
                    df2['nc_report_item'] = total_x
                    df2['total_report_item'] = total_report

            # Completion time in days, rounded to 2 d.p. The .bool() calls
            # assume df2 has exactly one row, which holds because each export
            # record is normalised into a single row.
            if (not df2['f2_checked_by_supd_on_date'].isnull().bool()
                    and not df2['a01a_inspection_date'].isnull().bool()):
                df2['complete_time_in_days'] = (
                    (df2['f2_checked_by_supd_on_date'].astype('datetime64[ns]')
                     - df2['a01a_inspection_date'].astype('datetime64[ns]'))
                    / np.timedelta64(1, 'h') / 24
                ).round(2)
                if (df2['complete_time_in_days'].isnull().bool()
                        or df2['complete_time_in_days'].lt(0).bool()):
                    df2['complete_time_in_days'] = 0
            else:
                df2['complete_time_in_days'] = 0

            # DataFrame.append works on the old pandas this DAG runs on;
            # newer pandas would use pd.concat([df, df2]).
            df = df.append(df2)

        # Normalise the date columns and replace the target table wholesale.
        df['a01a_inspection_date'] = df['a01a_inspection_date'].apply(pd.to_datetime)
        df['d4_submission_date'] = df['d4_submission_date'].apply(pd.to_datetime)
        df['f2_checked_by_supd_on_date'] = df['f2_checked_by_supd_on_date'].apply(pd.to_datetime)
        df.to_sql('cleansing_cv202303', con=conn, if_exists='replace', index=False)
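
This callable only works once the upstream task has pushed the token into XCom. A minimal sketch of what getDrowToken presumably does follows; the login endpoint, payload, and response key are purely hypothetical, since only the XCom key "token" and the task id are confirmed by this page.

# Hypothetical sketch of the upstream getDrowToken task; endpoint and
# credentials are assumptions, not taken from this page.
import requests

def getDrowToken(**context):
    # Assumed auth endpoint and placeholder credentials.
    response = requests.post(
        url=f"{dRoW_api_end_url}/api/auth/login",
        json={"username": "<user>", "password": "<password>"},
    )
    token = response.json()["token"]  # assumed response field
    # Push under the key that getMongoDB pulls.
    context["ti"].xcom_push(key="token", value=token)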
Task Instance Attributes
Attribute Value
dag_id cv202303_cleaning
duration None
end_date 2024-11-15 15:09:45.445117+00:00
execution_date 2024-11-14T15:00:00+00:00
executor_config {}
generate_command <function TaskInstance.generate_command at 0x7f6cbcea2320>
hostname
is_premature False
job_id None
key ('cv202303_cleaning', 'getMongoDB', <Pendulum [2024-11-14T15:00:00+00:00]>, 1)
log <Logger airflow.task (INFO)>
log_filepath /usr/local/airflow/logs/cv202303_cleaning/getMongoDB/2024-11-14T15:00:00+00:00.log
log_url http://localhost:8080/admin/airflow/log?execution_date=2024-11-14T15%3A00%3A00%2B00%3A00&task_id=getMongoDB&dag_id=cv202303_cleaning
logger <Logger airflow.task (INFO)>
mark_success_url http://localhost:8080/success?task_id=getMongoDB&dag_id=cv202303_cleaning&execution_date=2024-11-14T15%3A00%3A00%2B00%3A00&upstream=false&downstream=false
max_tries 1
metadata MetaData(bind=None)
next_try_number 1
operator None
pid None
pool default_pool
prev_attempted_tries 0
previous_execution_date_success 2024-11-13 15:00:00+00:00
previous_start_date_success 2024-11-14 15:01:40.600516+00:00
previous_ti <TaskInstance: cv202303_cleaning.getMongoDB 2024-11-13 15:00:00+00:00 [success]>
previous_ti_success <TaskInstance: cv202303_cleaning.getMongoDB 2024-11-13 15:00:00+00:00 [success]>
priority_weight 2
queue default
queued_dttm None
raw False
run_as_user None
start_date 2024-11-15 15:09:45.445097+00:00
state upstream_failed
task <Task(PythonOperator): getMongoDB>
task_id getMongoDB
test_mode False
try_number 1
unixname airflow
Task Attributes
Attribute Value
dag <DAG: cv202303_cleaning>
dag_id cv202303_cleaning
depends_on_past False
deps {<TIDep(Not In Retry Period)>, <TIDep(Trigger Rule)>, <TIDep(Previous Dagrun State)>}
do_xcom_push True
downstream_list [<Task(PythonOperator): getMongoDB2>]
downstream_task_ids {'getMongoDB2'}
email None
email_on_failure True
email_on_retry True
end_date None
execution_timeout None
executor_config {}
extra_links []
global_operator_extra_link_dict {}
inlets []
lineage_data None
log <Logger airflow.task.operators (INFO)>
logger <Logger airflow.task.operators (INFO)>
max_retry_delay None
on_failure_callback None
on_retry_callback None
on_success_callback None
op_args []
op_kwargs {'name': 'Dylan'}
operator_extra_link_dict {}
operator_extra_links ()
outlets []
owner airflow
params {}
pool default_pool
priority_weight 1
priority_weight_total 2
provide_context True
queue default
resources None
retries 1
retry_delay 0:05:00
retry_exponential_backoff False
run_as_user None
schedule_interval 0 15 * * *
shallow_copy_attrs ('python_callable', 'op_kwargs')
sla None
start_date 2023-01-17T00:00:00+00:00
subdag None
task_concurrency None
task_id getMongoDB
task_type PythonOperator
template_ext []
template_fields ('templates_dict', 'op_args', 'op_kwargs')
templates_dict None
trigger_rule all_success
ui_color #ffefeb
ui_fgcolor #000
upstream_list [<Task(PythonOperator): getDrowToken>]
upstream_task_ids {'getDrowToken'}
wait_for_downstream False
weight_rule downstream
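
For reference, the attributes above pin down how the DAG file plausibly wires these tasks together. A sketch of that reconstruction follows, using Airflow 1.10-style imports; the getDrowToken and getMongoDB callables are the ones shown earlier on this page, while getMongoDB2's body is not shown here.

# Hedged reconstruction of the DAG definition from the Task Attributes table;
# values in comments point at the attributes they come from.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

default_args = {
    "owner": "airflow",                   # owner
    "retries": 1,                         # retries
    "retry_delay": timedelta(minutes=5),  # retry_delay 0:05:00
    "email_on_failure": True,             # email_on_failure
    "email_on_retry": True,               # email_on_retry
}

dag = DAG(
    dag_id="cv202303_cleaning",
    schedule_interval="0 15 * * *",        # schedule_interval
    start_date=datetime(2023, 1, 17),      # start_date
    default_args=default_args,
)

get_token = PythonOperator(
    task_id="getDrowToken",
    python_callable=getDrowToken,          # sketched above
    provide_context=True,
    dag=dag,
)

get_mongo = PythonOperator(
    task_id="getMongoDB",
    python_callable=getMongoDB,            # shown in full above
    op_kwargs={"name": "Dylan"},           # op_kwargs
    provide_context=True,                  # provide_context
    dag=dag,
)

get_mongo2 = PythonOperator(
    task_id="getMongoDB2",
    python_callable=getMongoDB2,           # callable not shown on this page
    provide_context=True,
    dag=dag,
)

# upstream_task_ids {'getDrowToken'}; downstream_task_ids {'getMongoDB2'}
get_token >> get_mongo >> get_mongo2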