| Dependency | Reason |
|---|---|
| Task Instance State | Task is in the 'success' state which is not a valid state for execution. The task must be cleared in order to be run. |
| Dagrun Running | Task instance's dagrun was not in the 'running' state but in the state 'success'. |
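
The rows above are the dependency/reason pairs Airflow's Task Instance Details page derives from `TaskInstance.get_failed_dep_statuses()`. Below is a minimal sketch of reproducing them programmatically, assuming an Airflow 1.10.x deployment (the `/admin/` log URLs further down point at 1.x). In practice, the fix the reason column asks for is simply the UI's "Clear" button, or `airflow clear nd201905_kpi -t writeKpiToPostgres` on the CLI.

```python
# Sketch only, assuming Airflow 1.10.x; dag_id, task_id and execution_date
# are taken from the attribute tables below.
from airflow.models import DagBag, TaskInstance
from airflow.utils import timezone

dag = DagBag().get_dag("nd201905_kpi")
task = dag.get_task("writeKpiToPostgres")
ti = TaskInstance(task, timezone.parse("2025-12-11T15:00:00+00:00"))

# Yields the same dependency/reason pairs as the table above:
for dep in ti.get_failed_dep_statuses():
    print(dep.dep_name, "->", dep.reason)
```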
```python
import pandas as pd
from sqlalchemy import create_engine, text

# evaluate_kpi_condition(kpi_no, value) -> (condition, action) is defined
# elsewhere in the DAG file; a sketch of its assumed shape follows the block.


def writeKpiToPostgres(**context):
    ti = context["ti"]
    # --- Pull KPI values from previous tasks ---
    # xcom_pull returns whatever the upstream callable returned, or None if
    # the task pushed nothing; `or {}` / `or 0` guard against missing pushes
    # and missing dict keys.
    elapsed_time = ti.xcom_pull(task_ids="getProgrammeData")        # KPI 1 (days)
    contractKPIs = ti.xcom_pull(task_ids="getContractData") or {}   # KPI 2–4
    riskKPIs = ti.xcom_pull(task_ids="getRiskRegisterData") or {}   # KPI 9–10
    kpi1 = float(elapsed_time or 0)
    kpi2 = float(contractKPIs.get("kpi2_ratio_pwdd") or 0)
    kpi3 = float(contractKPIs.get("kpi3_estimate_vs_forecast") or 0)
    kpi4 = float(contractKPIs.get("kpi4_pain_share") or 0)
    kpi9 = float(riskKPIs.get("kpi9_open_gt_3m") or 0)
    kpi10 = float(riskKPIs.get("kpi10_open_gt_6m") or 0)
    # --- DB connection ---
    # NB: credentials are hardcoded here as in the original DAG; in practice
    # they belong in an Airflow Connection.
    host = 'drowdatewarehouse.crlwwhgepgi7.ap-east-1.rds.amazonaws.com'
    dbUserName = 'dRowAdmin'
    dbUserPassword = 'drowsuper'
    database = 'drowDateWareHouse'
    port = '5432'
    # SQLAlchemy 1.4+ only accepts the 'postgresql' dialect name; the legacy
    # 'postgres://' scheme raises NoSuchModuleError.
    conn_string = f'postgresql://{dbUserName}:{dbUserPassword}@{host}:{port}/{database}'
    db = create_engine(conn_string)
    delete_table_sql_query = """DROP TABLE IF EXISTS nd201905_kpi"""
    create_table_sql_query = """
        CREATE TABLE IF NOT EXISTS nd201905_kpi (
            id SERIAL PRIMARY KEY,
            kpi_no INT NOT NULL,
            kpi_name VARCHAR(50) NOT NULL,
            subject TEXT,
            description TEXT,
            value VARCHAR(100) NOT NULL,
            unit VARCHAR(50),
            condition VARCHAR(10),
            action TEXT,
            created_at TIMESTAMP DEFAULT NOW(),
            updated_at TIMESTAMP DEFAULT NOW()
        );
    """
    # --- Build KPI rows (spec-driven; behaviour matches the original six
    # copy-pasted blocks, including the per-KPI value formatting) ---
    kpi_specs = [
        (1, kpi1, "Programme",
         "No. of days between the acceptance of programme and the reporting date",
         "days", lambda v: str(v)),
        (2, kpi2, "Prices",
         "PWDD % for target cost contract",
         "%", lambda v: f"{v:.2f}"),
        (3, kpi3, "Prices",
         "Difference of total of the Prices between PM and Contractor",
         "%", lambda v: f"{v:.2f}"),
        (4, kpi4, "Prices",
         "% of Pain Share / total of the Prices (adjusted by fee percentage)",
         "%", lambda v: f"{v:.2f}"),
        (9, kpi9, "Early Warning",
         "No. of Early Warnings with Elapse time > 3 months",
         "no.", lambda v: str(int(v))),
        (10, kpi10, "Early Warning",
         "No. of Early Warnings with Elapse time > 6 months",
         "no.", lambda v: str(int(v))),
    ]
    rows = []
    for kpi_no, value, subject, description, unit, fmt in kpi_specs:
        cond, action = evaluate_kpi_condition(kpi_no, value)
        rows.append({
            "kpi_no": kpi_no,
            "kpi_name": f"KPI No. {kpi_no}",
            "subject": subject,
            "description": description,
            "value": fmt(value),
            "unit": unit,
            "condition": cond,
            "action": action,
        })
    kpi_df = pd.DataFrame(rows)
    print(f"check dataframe: {kpi_df}")
    # --- Drop, create, insert (one transaction) ---
    with db.begin() as conn:
        # text() is required for raw SQL strings on SQLAlchemy 1.4+
        conn.execute(text(delete_table_sql_query))
        conn.execute(text(create_table_sql_query))
        kpi_df.to_sql(
            "nd201905_kpi",
            con=conn,
            if_exists="append",
            index=False,
        )
    print("nd201905_kpi table refreshed with KPI 1,2,3,4,9,10.")
```
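
`evaluate_kpi_condition` is not shown in the listing. Here is a hypothetical sketch of its assumed shape, inferred from the call sites above: it returns a `(condition, action)` pair, where `condition` must fit the `VARCHAR(10)` column. The threshold values are placeholders, not the project's real limits.

```python
# Hypothetical helper: the real thresholds and action wording live elsewhere
# in the DAG file. Shape inferred from the call sites above.
def evaluate_kpi_condition(kpi_no, value):
    # Placeholder upper limits per KPI number -- assumptions, not real limits.
    thresholds = {1: 30.0, 2: 100.0, 3: 5.0, 4: 10.0, 9: 0.0, 10: 0.0}
    limit = thresholds.get(kpi_no)
    if limit is None or value <= limit:
        return "Normal", "No action required"
    return "Alert", f"Review KPI No. {kpi_no}: value {value} exceeds limit {limit}"
```

Note the drop-and-recreate pattern in the task itself: each run replaces the whole `nd201905_kpi` table rather than upserting, which keeps the load idempotent at the cost of losing history between runs.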
| Attribute | Value |
|---|---|
| dag_id | nd201905_kpi |
| duration | 1.604655 |
| end_date | 2025-12-12 07:05:07.988127+00:00 |
| execution_date | 2025-12-11T15:00:00+00:00 |
| executor_config | {} |
| generate_command | <function TaskInstance.generate_command at 0x7fb2f39d0320> |
| hostname | 406be32ba58f |
| is_premature | False |
| job_id | 1507 |
| key | ('nd201905_kpi', 'writeKpiToPostgres', <Pendulum [2025-12-11T15:00:00+00:00]>, 2) |
| log | <Logger airflow.task (INFO)> |
| log_filepath | /usr/local/airflow/logs/nd201905_kpi/writeKpiToPostgres/2025-12-11T15:00:00+00:00.log |
| log_url | http://localhost:8080/admin/airflow/log?execution_date=2025-12-11T15%3A00%3A00%2B00%3A00&task_id=writeKpiToPostgres&dag_id=nd201905_kpi |
| logger | <Logger airflow.task (INFO)> |
| mark_success_url | http://localhost:8080/success?task_id=writeKpiToPostgres&dag_id=nd201905_kpi&execution_date=2025-12-11T15%3A00%3A00%2B00%3A00&upstream=false&downstream=false |
| max_tries | 1 |
| metadata | MetaData(bind=None) |
| next_try_number | 2 |
| operator | PythonOperator |
| pid | 1671798 |
| pool | default_pool |
| prev_attempted_tries | 1 |
| previous_execution_date_success | 2025-12-11 07:05:38.846668+00:00 |
| previous_start_date_success | 2025-12-11 07:08:07.774326+00:00 |
| previous_ti | <TaskInstance: nd201905_kpi.writeKpiToPostgres 2025-12-11 07:05:38.846668+00:00 [success]> |
| previous_ti_success | <TaskInstance: nd201905_kpi.writeKpiToPostgres 2025-12-11 07:05:38.846668+00:00 [success]> |
| priority_weight | 1 |
| queue | default |
| queued_dttm | 2025-12-12 07:05:01.304546+00:00 |
| raw | False |
| run_as_user | None |
| start_date | 2025-12-12 07:05:06.383472+00:00 |
| state | success |
| task | <Task(PythonOperator): writeKpiToPostgres> |
| task_id | writeKpiToPostgres |
| test_mode | False |
| try_number | 2 |
| unixname | airflow |

| Attribute | Value |
|---|---|
| dag | <DAG: nd201905_kpi> |
| dag_id | nd201905_kpi |
| depends_on_past | False |
| deps | {<TIDep(Trigger Rule)>, <TIDep(Previous Dagrun State)>, <TIDep(Not In Retry Period)>} |
| do_xcom_push | True |
| downstream_list | [] |
| downstream_task_ids | set() |
| email | None |
| email_on_failure | True |
| email_on_retry | True |
| end_date | None |
| execution_timeout | None |
| executor_config | {} |
| extra_links | [] |
| global_operator_extra_link_dict | {} |
| inlets | [] |
| lineage_data | None |
| log | <Logger airflow.task.operators (INFO)> |
| logger | <Logger airflow.task.operators (INFO)> |
| max_retry_delay | None |
| on_failure_callback | None |
| on_retry_callback | None |
| on_success_callback | None |
| op_args | [] |
| op_kwargs | {} |
| operator_extra_link_dict | {} |
| operator_extra_links | () |
| outlets | [] |
| owner | airflow |
| params | {} |
| pool | default_pool |
| priority_weight | 1 |
| priority_weight_total | 1 |
| provide_context | True |
| queue | default |
| resources | None |
| retries | 1 |
| retry_delay | 0:05:00 |
| retry_exponential_backoff | False |
| run_as_user | None |
| schedule_interval | 0 7,15 * * * |
| shallow_copy_attrs | ('python_callable', 'op_kwargs') |
| sla | None |
| start_date | 2023-01-17T00:00:00+00:00 |
| subdag | None |
| task_concurrency | None |
| task_id | writeKpiToPostgres |
| task_type | PythonOperator |
| template_ext | [] |
| template_fields | ('templates_dict', 'op_args', 'op_kwargs') |
| templates_dict | None |
| trigger_rule | all_success |
| ui_color | #ffefeb |
| ui_fgcolor | #000 |
| upstream_list | [<Task(PythonOperator): getRiskRegisterData>] |
| upstream_task_ids | {'getRiskRegisterData'} |
| wait_for_downstream | False |
| weight_rule | downstream |
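
For reference, the operator attributes above pin down how the task is presumably declared. The following is a sketch under those values (Airflow 1.10-style API, given `provide_context=True`); the variable name for the upstream task is made up.

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id="nd201905_kpi",
    start_date=datetime(2023, 1, 17),
    schedule_interval="0 7,15 * * *",  # 07:00 and 15:00 UTC
    default_args={
        "owner": "airflow",
        "email_on_failure": True,
        "email_on_retry": True,
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
)

write_kpi = PythonOperator(
    task_id="writeKpiToPostgres",
    python_callable=writeKpiToPostgres,
    provide_context=True,  # Airflow 1.x: inject ti, execution_date, ... into **context
    dag=dag,
)

# upstream_task_ids = {'getRiskRegisterData'}
get_risk_register_data >> write_kpi  # hypothetical variable for the upstream task
```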