DAG: 6wsd21_cleaning

schedule: 0 7 * * *

Task Instance: getMongoDB

Task Instance Details

Dependencies Blocking Task From Getting Scheduled

Dependency: Trigger Rule
Reason: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 1, 'done': 1}, upstream_task_ids={'getDrowToken'}

Dependency: Dagrun Running
Reason: Task instance's dagrun was not in the 'running' state but in the state 'failed'.

Dependency: Task Instance State
Reason: Task is in the 'upstream_failed' state, which is not a valid state for execution. The task must be cleared in order to be run.
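
Because getMongoDB never ran (its upstream getDrowToken failed and the default 'all_success' trigger rule blocks it), re-running it means clearing the failed task instances so the scheduler picks both up again. A minimal sketch of doing this programmatically, assuming an Airflow 1.10-style environment where DagBag and DAG.clear are available; the execution date comes from the attribute table below:

from airflow.models import DagBag
from airflow.utils import timezone

# Execution date of the blocked run, from the task instance attributes.
execution_date = timezone.datetime(2025, 5, 17, 7, 0, 0)

dag = DagBag().get_dag("6wsd21_cleaning")
# Clearing resets getDrowToken (failed) and getMongoDB (upstream_failed)
# so the scheduler can run the whole chain again.
dag.clear(start_date=execution_date, end_date=execution_date)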
Attribute: python_callable
import json

import numpy as np
import pandas as pd
import requests
from pandas import json_normalize  # on older pandas: from pandas.io.json import json_normalize
from sqlalchemy import create_engine

# dRoW_api_end_url is defined elsewhere in the DAG file; it is not shown in this dump.

def getMongoDB(**context):
    # Pull the auth token that the upstream getDrowToken task pushed to XCom.
    token = context.get("ti").xcom_pull(key="token")
    response = requests.get(
        url=f"{dRoW_api_end_url}/api/module/document-export/airflow/workflow/62bb4cea0e814e2e260a9795?export_type=0",
        headers={
            "x-access-token": f"Bearer {token}",
            "ICWPxAccessKey": "nd@201907ICWP_[1AG:4UdI){n=b~",
        },
    )
    RISC_Data = json.loads(response.text)

    # Source-field -> warehouse-column mapping. f2_checked_by_supd_on_date and
    # d4_submission_date are derived from the approval log below, not mapped here.
    Mapping = {
        "Date of Inspection": "a01a_inspection_date",
        "Daily Cleaning or Weekly Cleaning?": "a2_daily_or_weekly",
    }

    # Data-warehouse connection details.
    host = 'drowdatewarehouse.crlwwhgepgi7.ap-east-1.rds.amazonaws.com'
    dbUserName = 'dRowAdmin'
    dbUserPassword = 'drowsuper'
    database = 'drowDateWareHouse'
    port = "5432"

    # 'postgresql://' is the canonical scheme; SQLAlchemy 1.4+ rejects 'postgres://'.
    conn_string = (f'postgresql://{dbUserName}:{dbUserPassword}'
                   f'@{host}:{port}/{database}')

    db = create_engine(conn_string)
    with db.connect() as conn:
        df = pd.DataFrame()
        for x in RISC_Data:
            # json_normalize on a single record yields a one-row frame, which is
            # what makes the .bool() reductions further down safe.
            df_nested_list = json_normalize(x['data'])
            df2 = df_nested_list.reindex(columns=Mapping.keys())

            # Derive the checked/submitted dates from the latest "B : RSS check"
            # entry in the approval log, if any.
            if len(x['ApproveLogSummary']) > 0:
                request_data = [data for data in x['ApproveLogSummary']
                                if data.get('statusName') == "B : RSS check"]
                if len(request_data) > 0 and 'from' in request_data[-1]:
                    df2['f2_checked_by_supd_on_date'] = request_data[-1]['from']
                else:
                    df2['f2_checked_by_supd_on_date'] = None
                if len(request_data) > 0 and 'to' in request_data[-1]:
                    df2['d4_submission_date'] = request_data[-1]['to']
                else:
                    df2['d4_submission_date'] = None
            else:
                df2['f2_checked_by_supd_on_date'] = None
                df2['d4_submission_date'] = None

            df2["report_name"] = (df2["Date of Inspection"].astype(str).str[:10]
                                  + df2["Daily Cleaning or Weekly Cleaning?"])

            # A report counts as complete once the log reaches "Z : END" or
            # "C : Contractor Acknowledge and Archive".
            if any(data.get('statusName') in ("Z : END", "C : Contractor Acknowledge and Archive")
                   for data in x['ApproveLogSummary']):
                df2['report_complete_or_incomplete'] = 'complete'
            else:
                df2['report_complete_or_incomplete'] = 'incomplete'

            if 'data' in x and isinstance(x['data'], dict):
                # Count checklist items and unsatisfactory ("nc") items; the
                # items sit under 'Weekly' or 'Daily' depending on report type.
                section = ('Weekly'
                           if x['data']["Daily Cleaning or Weekly Cleaning?"] == "Weekly Cleaning"
                           else 'Daily')
                total_report = 0
                total_x = 0
                for item in x['data'][section]:
                    for item_key in item:
                        total_report += 1
                        if item_key.endswith('Condition') and item[item_key] == 'Unsatisfactory':
                            total_x += 1
                df2['nc_report_item'] = total_x
                df2['total_report_item'] = total_report

            # Turnaround in days between inspection and SupD check, clamped to 0
            # when either date is missing or the difference is negative.
            if (not df2['f2_checked_by_supd_on_date'].isnull().bool()
                    and not df2['Date of Inspection'].isnull().bool()):
                df2['complete_time_in_days'] = (
                    (df2['f2_checked_by_supd_on_date'].astype('datetime64[ns]')
                     - df2['Date of Inspection'].astype('datetime64[ns]'))
                    / np.timedelta64(1, 'h') / 24
                ).round(2)
                if df2['complete_time_in_days'].isnull().bool() or df2['complete_time_in_days'].lt(0).bool():
                    df2['complete_time_in_days'] = 0
            else:
                df2['complete_time_in_days'] = 0

            df2.rename(columns=Mapping, inplace=True)
            # DataFrame.append was removed in pandas 2.0; pd.concat is equivalent here.
            df = pd.concat([df, df2])

        df['a01a_inspection_date'] = df['a01a_inspection_date'].apply(pd.to_datetime)
        df['d4_submission_date'] = df['d4_submission_date'].apply(pd.to_datetime)
        df['f2_checked_by_supd_on_date'] = df['f2_checked_by_supd_on_date'].apply(pd.to_datetime)
        df.to_sql('cleansing_6wsd21', con=conn, if_exists='replace', index=False)
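
The callable assumes the upstream getDrowToken task has already pushed an API token to XCom under key="token". That task's source is not part of this dump, so the following is a hypothetical sketch only; the login endpoint, payload, and response shape are placeholders, not the real dRoW API:

def getDrowToken(**context):
    # Hypothetical endpoint and payload; the real call is not shown here.
    response = requests.post(
        url=f"{dRoW_api_end_url}/api/auth/login",
        json={"username": "<user>", "password": "<password>"},
    )
    token = json.loads(response.text)["token"]
    # Push under key="token" so getMongoDB's xcom_pull(key="token") finds it.
    context["ti"].xcom_push(key="token", value=token)
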
Task Instance Attributes
Attribute Value
dag_id 6wsd21_cleaning
duration None
end_date 2025-05-18 07:06:44.792145+00:00
execution_date 2025-05-17T07:00:00+00:00
executor_config {}
generate_command <function TaskInstance.generate_command at 0x7f6cbcea2320>
hostname
is_premature False
job_id None
key ('6wsd21_cleaning', 'getMongoDB', <Pendulum [2025-05-17T07:00:00+00:00]>, 1)
log <Logger airflow.task (INFO)>
log_filepath /usr/local/airflow/logs/6wsd21_cleaning/getMongoDB/2025-05-17T07:00:00+00:00.log
log_url http://localhost:8080/admin/airflow/log?execution_date=2025-05-17T07%3A00%3A00%2B00%3A00&task_id=getMongoDB&dag_id=6wsd21_cleaning
logger <Logger airflow.task (INFO)>
mark_success_url http://localhost:8080/success?task_id=getMongoDB&dag_id=6wsd21_cleaning&execution_date=2025-05-17T07%3A00%3A00%2B00%3A00&upstream=false&downstream=false
max_tries 1
metadata MetaData(bind=None)
next_try_number 1
operator None
pid None
pool default_pool
prev_attempted_tries 0
previous_execution_date_success 2025-05-16 07:00:00+00:00
previous_start_date_success 2025-05-17 07:02:52.519552+00:00
previous_ti <TaskInstance: 6wsd21_cleaning.getMongoDB 2025-05-16 07:00:00+00:00 [success]>
previous_ti_success <TaskInstance: 6wsd21_cleaning.getMongoDB 2025-05-16 07:00:00+00:00 [success]>
priority_weight 1
queue default
queued_dttm None
raw False
run_as_user None
start_date 2025-05-18 07:06:44.792133+00:00
state upstream_failed
task <Task(PythonOperator): getMongoDB>
task_id getMongoDB
test_mode False
try_number 1
unixname airflow
Task Attributes
Attribute Value
dag <DAG: 6wsd21_cleaning>
dag_id 6wsd21_cleaning
depends_on_past False
deps {<TIDep(Not In Retry Period)>, <TIDep(Trigger Rule)>, <TIDep(Previous Dagrun State)>}
do_xcom_push True
downstream_list []
downstream_task_ids set()
email None
email_on_failure True
email_on_retry True
end_date None
execution_timeout None
executor_config {}
extra_links []
global_operator_extra_link_dict {}
inlets []
lineage_data None
log <Logger airflow.task.operators (INFO)>
logger <Logger airflow.task.operators (INFO)>
max_retry_delay None
on_failure_callback None
on_retry_callback None
on_success_callback None
op_args []
op_kwargs {'name': 'Dylan'}
operator_extra_link_dict {}
operator_extra_links ()
outlets []
owner airflow
params {}
pool default_pool
priority_weight 1
priority_weight_total 1
provide_context True
queue default
resources None
retries 1
retry_delay 0:05:00
retry_exponential_backoff False
run_as_user None
schedule_interval 0 7 * * *
shallow_copy_attrs ('python_callable', 'op_kwargs')
sla None
start_date 2023-01-17T00:00:00+00:00
subdag None
task_concurrency None
task_id getMongoDB
task_type PythonOperator
template_ext []
template_fields ('templates_dict', 'op_args', 'op_kwargs')
templates_dict None
trigger_rule all_success
ui_color #ffefeb
ui_fgcolor #000
upstream_list [<Task(PythonOperator): getDrowToken>]
upstream_task_ids {'getDrowToken'}
wait_for_downstream False
weight_rule downstream
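
Read together, the two attribute tables pin down how this task is wired into the DAG. A sketch of the definition they imply, using only values shown above; the task variable names are illustrative, the 'all_success' trigger rule is the PythonOperator default and so needs no explicit argument, and getDrowToken's own op_kwargs are not shown in this dump and are omitted:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.10-style import

dag = DAG(
    dag_id="6wsd21_cleaning",
    schedule_interval="0 7 * * *",  # daily at 07:00 UTC
    start_date=datetime(2023, 1, 17),
    default_args={
        "owner": "airflow",
        "retries": 1,
        "retry_delay": timedelta(minutes=5),  # retry_delay 0:05:00 above
    },
)

getDrowToken_task = PythonOperator(
    task_id="getDrowToken",
    python_callable=getDrowToken,
    provide_context=True,
    dag=dag,
)

getMongoDB_task = PythonOperator(
    task_id="getMongoDB",
    python_callable=getMongoDB,
    provide_context=True,          # provide_context True (pre-Airflow-2 style)
    op_kwargs={"name": "Dylan"},   # from the table above; unused by the callable
    dag=dag,
)

getDrowToken_task >> getMongoDB_task  # upstream_task_ids {'getDrowToken'}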