Dependencies that blocked the task instance from being scheduled:

Dependency | Reason |
---|---|
Dagrun Running | Task instance's dagrun was not in the 'running' state but in the state 'failed'. |
Trigger Rule | Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 1, 'upstream_failed': 0, 'done': 1}, upstream_task_ids={'getDrowToken'} |
Task Instance State | Task is in the 'upstream_failed' state which is not a valid state for execution. The task must be cleared in order to be run. |
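This is the default `all_success` propagation: `getDrowToken` failed, so `getMongoDB` was marked `upstream_failed` instead of running. For reference, the wiring behind these messages typically looks like the sketch below in an Airflow 1.10-style DAG file; the `dag_id`, schedule, retries, and `op_kwargs` come from the attribute tables further down, while the rest is an assumption (the two callables are the `getMongoDB` shown next and a `getDrowToken` like the sketch after it):

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='cv202303_safety_walk',
    schedule_interval='0 15 * * *',                  # from the task attributes below
    start_date=datetime(2023, 1, 17),
    default_args={'retries': 1, 'retry_delay': timedelta(minutes=5)},
)

get_token = PythonOperator(
    task_id='getDrowToken',
    python_callable=getDrowToken,    # assumed: logs in and pushes 'token' to XCom
    provide_context=True,
    dag=dag,
)

get_data = PythonOperator(
    task_id='getMongoDB',
    python_callable=getMongoDB,
    op_kwargs={'name': 'Dylan'},     # from the task attributes below
    provide_context=True,
    # trigger_rule defaults to 'all_success': when getDrowToken fails,
    # this task is marked 'upstream_failed' and never executes.
    dag=dag,
)

get_token >> get_data
```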
The `getMongoDB` callable itself:

```python
import json

import numpy as np
import pandas as pd
import requests
from pandas import json_normalize  # on older pandas: from pandas.io.json import json_normalize
from sqlalchemy import create_engine


def getMongoDB(**context):
    # Pull the auth token that the upstream getDrowToken task pushed to XCom.
    token = context.get("ti").xcom_pull(key="token")
    # dRoW_api_end_url is defined elsewhere in the DAG file.
    response = requests.get(
        url=f"{dRoW_api_end_url}/api/module/document-export/airflow/workflow/667a4df5af0b2f37bdf6e00d?export_type=0",
        headers={
            "x-access-token": f"Bearer {token}",
            "ICWPxAccessKey": "nd@201907ICWP_[1AG:4UdI){n=b~"
        })
    RISC_Data = json.loads(response.text)
    # Map the export's field names onto warehouse column names.
    Mapping = {
        "A3. Date Time": "a3_date_time",
        "A1. No. of Walk": "a1_no_of_walk",
    }
    saftey_cats = [
        "1. General",
        "2. Flammable Liquids / Gases",
        "3. Hazardous Substances",
        "4. Electricity",
        "5. Fire Precaution",
        "6. Working Area",
        "7. Lifting Operation",
        "8. Material Hoist",
        "9. Confined Spaces",
        "10. Noise",
        "11. Gas Welding and Cutting Equipment",
        "12. Electricity–arc Welding",
        "13. Mechanical Plant and Equipment",
        "14. Tunnel",
        "15. Formwork",
        "16. Hoarding",
        "17. Working at Height",
        "18. Abrasive Wheels",
        "19. Excavations",
        "20. Slings and other Lifting Gears",
        "21. Compressed Air/ Pneumatic Air Tools",
        "22. Protection of the Public",
        "23. Prevention of Mosquito Breed",
        "24. Work Over Water",
        "25. Welfare Facilities",
        "26. Others"
    ]
    # Postgres data-warehouse connection details.
    host = 'drowdatewarehouse.crlwwhgepgi7.ap-east-1.rds.amazonaws.com'
    dbUserName = 'dRowAdmin'
    dbUserPassword = 'drowsuper'
    database = 'drowDateWareHouse'
    charSet = "utf8mb4"  # unused (MySQL charset; the warehouse is Postgres)
    port = "5432"
    # SQLAlchemy connection URL for the Postgres warehouse.
    conn_string = ('postgresql://' +
                   dbUserName + ':' +
                   dbUserPassword +
                   '@' + host + ':' + port +
                   '/' + database)
    db = create_engine(conn_string)
    conn = db.connect()
    df = pd.DataFrame()
    with conn:
        for x in RISC_Data:
            # Normalize the nested JSON data into a flat, single-row DataFrame.
            df_nested_list = json_normalize(x['data'])
            df2 = df_nested_list.reindex(columns=Mapping.keys())
            df2.rename(columns=Mapping, inplace=True)
            if len(x['ApproveLogSummary']) > 0:
                # Take the sign-off dates from the latest "B : RSS Check/Agree Report" entry.
                request_data = [data for data in x['ApproveLogSummary']
                                if data.get('statusName') == "B : RSS Check/Agree Report"]
                if len(request_data) > 0 and 'from' in request_data[-1]:
                    df2['sup_rep_signed_date'] = request_data[-1]['from']
                else:
                    df2['sup_rep_signed_date'] = None
                if len(request_data) > 0 and 'to' in request_data[-1]:
                    df2['contractor_rep_signed_date'] = request_data[-1]['to']
                else:
                    df2['contractor_rep_signed_date'] = None
            else:
                df2['sup_rep_signed_date'] = None
                df2['contractor_rep_signed_date'] = None
            if x['data']['A1. No. of Walk'] is not None:
                df2["report_name"] = x['data']['A1. No. of Walk']
            else:
                df2["report_name"] = None
            if len(x['data']['Summary']) > 0:
                # Count rectifications completed after the agreed due date.
                total_late_retification = 0
                for summaryData in x['data']['Summary']:
                    if (summaryData.get("Agreed Due Date for Completion")
                            and summaryData.get("Date Completion")
                            and pd.to_datetime(summaryData["Agreed Due Date for Completion"])
                            < pd.to_datetime(summaryData["Date Completion"])):
                        total_late_retification += 1
                df2['total_late_retification'] = total_late_retification
            else:
                df2['total_late_retification'] = 0
            # df2 has a single row here, so Series.bool() is valid.
            if (not df2['contractor_rep_signed_date'].isnull().bool()
                    and not df2['a3_date_time'].isnull().bool()):
                # Days between the walk date and the contractor sign-off, to 2 d.p.
                df2['days_complete'] = (((df2['contractor_rep_signed_date'].astype('datetime64[ns]') -
                                          df2['a3_date_time'].astype('datetime64[ns]'))
                                         / np.timedelta64(1, 'h')) / 24).round(2)
                if df2['days_complete'].isnull().bool() or df2['days_complete'].lt(0).bool():
                    df2['days_complete'] = 0
            else:
                df2['days_complete'] = None
            # Expand to one row per safety category with its complete/incomplete counts.
            df4 = pd.DataFrame()
            for saftey_cat in saftey_cats:
                df3 = df2.copy()
                complete = 0
                incomplete = 0
                prefix = str(saftey_cat)[0:3].strip()   # e.g. '1.' or '26.'
                key = prefix + ' Checklist'
                result_key = prefix + ' Result'
                if not df2['sup_rep_signed_date'].isnull().bool():
                    if len(x['data'][key]) > 0:
                        for record in x['data'][key]:
                            if record[result_key] != 'N/A':
                                complete += 1
                else:
                    if len(x['data'][key]) > 0:
                        for record in x['data'][key]:
                            if record[result_key] != 'N/A':
                                incomplete += 1
                df3['saftey_cat'] = saftey_cat
                df3['saftey_cat_complete'] = complete
                df3['saftey_cat_incomplete'] = incomplete
                df4 = df4.append(df3)
            df2 = df2.append(df4)
            df = df.append(df2)
        df['sup_rep_signed_date'] = df['sup_rep_signed_date'].apply(pd.to_datetime)
        df['contractor_rep_signed_date'] = df['contractor_rep_signed_date'].apply(pd.to_datetime)
        df['a3_date_time'] = df['a3_date_time'].apply(pd.to_datetime)
        # regex=False so '.' and '(' are replaced literally, not as regex metacharacters.
        df.columns = (df.columns.str.replace(' ', '_', regex=False)
                      .str.replace('.', '', regex=False)
                      .str.replace('(', '_', regex=False)
                      .str.replace(')', '', regex=False)
                      .str.replace('%', 'percent', regex=False)
                      .str.replace('/', '_', regex=False))
        df.to_sql('safety_walk_cv202303', con=conn, if_exists='replace', index=False)
```
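`getMongoDB` pulls the token with `xcom_pull(key="token")`, so the upstream `getDrowToken` must push it under that key; the dump shows that upstream task failed, which is what cascaded into `upstream_failed` here. A hedged sketch of what that upstream callable plausibly looks like (the endpoint path, payload, and response shape are assumptions; only the task name and XCom key come from the logs):

```python
import requests

dRoW_api_end_url = "https://example.invalid"  # placeholder; the real base URL lives in the DAG file

def getDrowToken(**context):
    # Hypothetical login call: the real endpoint and credentials are not in the dump.
    response = requests.post(
        url=f"{dRoW_api_end_url}/api/auth/login",
        json={"username": "<user>", "password": "<password>"},
    )
    token = response.json()["token"]  # assumed response shape
    # Push under the key that getMongoDB later reads with xcom_pull(key="token").
    context["ti"].xcom_push(key="token", value=token)
```

Any unhandled exception in this callable (a timeout, a non-JSON error page, a missing token field) fails the task, and with the downstream task's `trigger_rule='all_success'`, `getMongoDB` is then marked `upstream_failed`.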
TaskInstance attributes at the time of failure:

Attribute | Value |
---|---|
dag_id | cv202303_safety_walk |
duration | None |
end_date | 2024-11-18 15:09:22.287485+00:00 |
execution_date | 2024-11-17T15:00:00+00:00 |
executor_config | {} |
generate_command | <function TaskInstance.generate_command at 0x7f152f9bf320> |
hostname | |
is_premature | False |
job_id | None |
key | ('cv202303_safety_walk', 'getMongoDB', <Pendulum [2024-11-17T15:00:00+00:00]>, 1) |
log | <Logger airflow.task (INFO)> |
log_filepath | /usr/local/airflow/logs/cv202303_safety_walk/getMongoDB/2024-11-17T15:00:00+00:00.log |
log_url | http://localhost:8080/admin/airflow/log?execution_date=2024-11-17T15%3A00%3A00%2B00%3A00&task_id=getMongoDB&dag_id=cv202303_safety_walk |
logger | <Logger airflow.task (INFO)> |
mark_success_url | http://localhost:8080/success?task_id=getMongoDB&dag_id=cv202303_safety_walk&execution_date=2024-11-17T15%3A00%3A00%2B00%3A00&upstream=false&downstream=false |
max_tries | 1 |
metadata | MetaData(bind=None) |
next_try_number | 1 |
operator | None |
pid | None |
pool | default_pool |
prev_attempted_tries | 0 |
previous_execution_date_success | 2024-11-13 15:00:00+00:00 |
previous_start_date_success | 2024-11-14 15:01:19.603745+00:00 |
previous_ti | <TaskInstance: cv202303_safety_walk.getMongoDB 2024-11-16 15:00:00+00:00 [upstream_failed]> |
previous_ti_success | <TaskInstance: cv202303_safety_walk.getMongoDB 2024-11-13 15:00:00+00:00 [success]> |
priority_weight | 1 |
queue | default |
queued_dttm | None |
raw | False |
run_as_user | None |
start_date | 2024-11-18 15:09:22.287465+00:00 |
state | upstream_failed |
task | <Task(PythonOperator): getMongoDB> |
task_id | getMongoDB |
test_mode | False |
try_number | 1 |
unixname | airflow |
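The instance is stuck in `upstream_failed`, and per the dependency table at the top it must be cleared before it can run again. The usual route is the Clear button in the UI or the `airflow clear` CLI; a minimal programmatic sketch (assuming it runs inside the scheduler environment) would be:

```python
import pendulum
from airflow.models import DagBag

# Load the DAG definition and clear the failed run so the scheduler re-queues it.
dag = DagBag().get_dag('cv202303_safety_walk')
execution_date = pendulum.parse('2024-11-17T15:00:00+00:00')

# DAG.clear resets the task instances in this window back to a schedulable
# state; clearing getDrowToken also resets its downstream getMongoDB.
dag.clear(start_date=execution_date, end_date=execution_date)
```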
Task (PythonOperator) attributes from the DAG definition:

Attribute | Value |
---|---|
dag | <DAG: cv202303_safety_walk> |
dag_id | cv202303_safety_walk |
depends_on_past | False |
deps | {<TIDep(Not In Retry Period)>, <TIDep(Trigger Rule)>, <TIDep(Previous Dagrun State)>} |
do_xcom_push | True |
downstream_list | [] |
downstream_task_ids | set() |
email | None |
email_on_failure | True |
email_on_retry | True |
end_date | None |
execution_timeout | None |
executor_config | {} |
extra_links | [] |
global_operator_extra_link_dict | {} |
inlets | [] |
lineage_data | None |
log | <Logger airflow.task.operators (INFO)> |
logger | <Logger airflow.task.operators (INFO)> |
max_retry_delay | None |
on_failure_callback | None |
on_retry_callback | None |
on_success_callback | None |
op_args | [] |
op_kwargs | {'name': 'Dylan'} |
operator_extra_link_dict | {} |
operator_extra_links | () |
outlets | [] |
owner | airflow |
params | {} |
pool | default_pool |
priority_weight | 1 |
priority_weight_total | 1 |
provide_context | True |
queue | default |
resources | None |
retries | 1 |
retry_delay | 0:05:00 |
retry_exponential_backoff | False |
run_as_user | None |
schedule_interval | 0 15 * * * |
shallow_copy_attrs | ('python_callable', 'op_kwargs') |
sla | None |
start_date | 2023-01-17T00:00:00+00:00 |
subdag | None |
task_concurrency | None |
task_id | getMongoDB |
task_type | PythonOperator |
template_ext | [] |
template_fields | ('templates_dict', 'op_args', 'op_kwargs') |
templates_dict | None |
trigger_rule | all_success |
ui_color | #ffefeb |
ui_fgcolor | #000 |
upstream_list | [<Task(PythonOperator): getDrowToken>] |
upstream_task_ids | {'getDrowToken'} |
wait_for_downstream | False |
weight_rule | downstream |
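One detail worth flagging in this table: `op_kwargs` is `{'name': 'Dylan'}`, yet `getMongoDB` declares only `**context`. With `provide_context=True` (the Airflow 1.10-style API this dump reflects), the operator merges `op_kwargs` into the context dict before invoking the callable, so the unused `name` key is silently absorbed by `**context`. A standalone illustration of that call pattern (plain Python, not the actual operator code):

```python
def getMongoDB(**context):
    # 'name' from op_kwargs arrives alongside the Airflow context keys.
    print(context.get("name"))  # -> Dylan
    print(context.get("ti"))    # -> the TaskInstance used for xcom_pull

# Roughly what PythonOperator does at execute time (simplified):
airflow_context = {"ti": "<TaskInstance: ...>", "ds": "2024-11-17"}
op_kwargs = {"name": "Dylan"}
getMongoDB(**{**airflow_context, **op_kwargs})
```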