DAG: nd201905_kpi

schedule: 0 7,15 * * *


Task Instance: writeKpiToPostgres


Task Instance Details

Dependencies Blocking Task From Getting Scheduled
Dependency            Reason
Task Instance State   Task is in the 'success' state which is not a valid state for execution. The task must be cleared in order to be run.
Dagrun Running        Task instance's dagrun was not in the 'running' state but in the state 'success'.
Attribute: python_callable
# Module-level imports required by this callable (defined in the DAG file)
import pandas as pd
from sqlalchemy import create_engine, text


def writeKpiToPostgres(**context):
    ti = context["ti"]

    # --- Pull KPI values from previous tasks ---
    elapsed_time = ti.xcom_pull(task_ids="getProgrammeData")  # KPI 1 (days)

    contractKPIs = ti.xcom_pull(task_ids="getContractData")   # KPI 2–4
    riskKPIs = ti.xcom_pull(task_ids="getRiskRegisterData")   # KPI 9–10

    kpi1 = float(elapsed_time or 0)
    kpi2 = float(contractKPIs.get("kpi2_ratio_pwdd") or 0)
    kpi3 = float(contractKPIs.get("kpi3_estimate_vs_forecast") or 0)
    kpi4 = float(contractKPIs.get("kpi4_pain_share") or 0)

    kpi9 = float(riskKPIs.get("kpi9_open_gt_3m") or 0)
    kpi10 = float(riskKPIs.get("kpi10_open_gt_6m") or 0)

    # --- DB connection ---
    host           = 'drowdatewarehouse.crlwwhgepgi7.ap-east-1.rds.amazonaws.com'
    dbUserName     = 'dRowAdmin'
    dbUserPassword = 'drowsuper'
    database       = 'drowDateWareHouse'
    port           = "5432"

    # SQLAlchemy 1.4+ rejects the bare 'postgres://' scheme; 'postgresql://' works on all versions
    conn_string = (
        'postgresql://'
        + dbUserName + ':' + dbUserPassword
        + '@' + host + ':' + port
        + '/' + database
    )

    db = create_engine(conn_string)

    delete_table_sql_query = """DROP TABLE IF EXISTS nd201905_kpi"""

    create_table_sql_query = """
    CREATE TABLE IF NOT EXISTS nd201905_kpi (
        id SERIAL PRIMARY KEY,
        kpi_no INT NOT NULL,
        kpi_name VARCHAR(50) NOT NULL,
        subject TEXT,
        description TEXT,
        value VARCHAR(100) NOT NULL,
        unit VARCHAR(50),
        condition VARCHAR(10),
        action TEXT,
        created_at TIMESTAMP DEFAULT NOW(),
        updated_at TIMESTAMP DEFAULT NOW()
    );
    """

    # --- Build KPI rows ---
    rows = []

    # KPI 1
    cond, action = evaluate_kpi_condition(1, kpi1)
    rows.append({
        "kpi_no": 1,
        "kpi_name": "KPI No. 1",
        "subject": "Programme",
        "description": "No. of days between the acceptance of programme and the reporting date",
        "value": str(kpi1),
        "unit": "days",
        "condition": cond,
        "action": action,
    })

    # KPI 2
    cond, action = evaluate_kpi_condition(2, kpi2)
    rows.append({
        "kpi_no": 2,
        "kpi_name": "KPI No. 2",
        "subject": "Prices",
        "description": "PWDD % for target cost contract",
        "value": f"{kpi2:.2f}",
        "unit": "%",
        "condition": cond,
        "action": action,
    })

    # KPI 3
    cond, action = evaluate_kpi_condition(3, kpi3)
    rows.append({
        "kpi_no": 3,
        "kpi_name": "KPI No. 3",
        "subject": "Prices",
        "description": "Difference of total of the Prices between PM and Contractor",
        "value": f"{kpi3:.2f}",
        "unit": "%",
        "condition": cond,
        "action": action,
    })

    # KPI 4
    cond, action = evaluate_kpi_condition(4, kpi4)
    rows.append({
        "kpi_no": 4,
        "kpi_name": "KPI No. 4",
        "subject": "Prices",
        "description": "% of Pain Share / total of the Prices (adjusted by fee percentage)",
        "value": f"{kpi4:.2f}",
        "unit": "%",
        "condition": cond,
        "action": action,
    })

    # KPI 9
    cond, action = evaluate_kpi_condition(9, kpi9)
    rows.append({
        "kpi_no": 9,
        "kpi_name": "KPI No. 9",
        "subject": "Early Warning",
        "description": "No. of Early Warnings with Elapse time > 3 months",
        "value": str(int(kpi9)),
        "unit": "no.",
        "condition": cond,
        "action": action,
    })

    # KPI 10
    cond, action = evaluate_kpi_condition(10, kpi10)
    rows.append({
        "kpi_no": 10,
        "kpi_name": "KPI No. 10",
        "subject": "Early Warning",
        "description": "No. of Early Warnings with Elapse time > 6 months",
        "value": str(int(kpi10)),
        "unit": "no.",
        "condition": cond,
        "action": action,
    })

    kpi_df = pd.DataFrame(rows)

    print(f"check dataframe: {kpi_df}")

    # --- Drop, create, insert ---
    with db.begin() as conn:
        # text() makes the raw SQL strings executable on both SQLAlchemy 1.x and 2.x
        conn.execute(text(delete_table_sql_query))
        conn.execute(text(create_table_sql_query))

        kpi_df.to_sql(
            "nd201905_kpi",
            con=conn,
            if_exists="append",
            index=False,
        )

    print("nd201905_kpi table refreshed with KPI 1,2,3,4,9,10.")
Task Instance Attributes
Attribute Value
dag_id nd201905_kpi
duration 0.141048
end_date 2025-12-11 06:17:23.058325+00:00
execution_date 2025-12-11T06:14:17.830625+00:00
executor_config {}
generate_command <function TaskInstance.generate_command at 0x7fb2f39d0320>
hostname a7c46ba165e9
is_premature False
job_id 839
key ('nd201905_kpi', 'writeKpiToPostgres', <Pendulum [2025-12-11T06:14:17.830625+00:00]>, 2)
log <Logger airflow.task (INFO)>
log_filepath /usr/local/airflow/logs/nd201905_kpi/writeKpiToPostgres/2025-12-11T06:14:17.830625+00:00.log
log_url http://localhost:8080/admin/airflow/log?execution_date=2025-12-11T06%3A14%3A17.830625%2B00%3A00&task_id=writeKpiToPostgres&dag_id=nd201905_kpi
logger <Logger airflow.task (INFO)>
mark_success_url http://localhost:8080/success?task_id=writeKpiToPostgres&dag_id=nd201905_kpi&execution_date=2025-12-11T06%3A14%3A17.830625%2B00%3A00&upstream=false&downstream=false
max_tries 1
metadata MetaData(bind=None)
next_try_number 2
operator PythonOperator
pid 753125
pool default_pool
prev_attempted_tries 1
previous_execution_date_success None
previous_start_date_success None
previous_ti <TaskInstance: nd201905_kpi.writeKpiToPostgres 2025-12-11 06:02:21.601192+00:00 [failed]>
previous_ti_success None
priority_weight 1
queue default
queued_dttm 2025-12-11 06:17:21.003202+00:00
raw False
run_as_user None
start_date 2025-12-11 06:17:22.917277+00:00
state success
task <Task(PythonOperator): writeKpiToPostgres>
task_id writeKpiToPostgres
test_mode False
try_number 2
unixname airflow
Task Attributes
Attribute Value
dag <DAG: nd201905_kpi>
dag_id nd201905_kpi
depends_on_past False
deps {<TIDep(Trigger Rule)>, <TIDep(Previous Dagrun State)>, <TIDep(Not In Retry Period)>}
do_xcom_push True
downstream_list []
downstream_task_ids set()
email None
email_on_failure True
email_on_retry True
end_date None
execution_timeout None
executor_config {}
extra_links []
global_operator_extra_link_dict {}
inlets []
lineage_data None
log <Logger airflow.task.operators (INFO)>
logger <Logger airflow.task.operators (INFO)>
max_retry_delay None
on_failure_callback None
on_retry_callback None
on_success_callback None
op_args []
op_kwargs {}
operator_extra_link_dict {}
operator_extra_links ()
outlets []
owner airflow
params {}
pool default_pool
priority_weight 1
priority_weight_total 1
provide_context True
queue default
resources None
retries 1
retry_delay 0:05:00
retry_exponential_backoff False
run_as_user None
schedule_interval 0 7,15 * * *
shallow_copy_attrs ('python_callable', 'op_kwargs')
sla None
start_date 2023-01-17T00:00:00+00:00
subdag None
task_concurrency None
task_id writeKpiToPostgres
task_type PythonOperator
template_ext []
template_fields ('templates_dict', 'op_args', 'op_kwargs')
templates_dict None
trigger_rule all_success
ui_color #ffefeb
ui_fgcolor #000
upstream_list [<Task(PythonOperator): getRiskRegisterData>]
upstream_task_ids {'getRiskRegisterData'}
wait_for_downstream False
weight_rule downstream
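
For context, the attributes above (PythonOperator, provide_context True, retries 1, retry_delay 0:05:00, start_date 2023-01-17, schedule_interval 0 7,15 * * *, upstream task getRiskRegisterData) are consistent with a DAG definition along the following lines. The actual DAG file is not shown on this page, so this is only a sketch: the upstream callables are placeholders and catchup is an assumption.

# Sketch of a DAG definition consistent with the attributes above (Airflow 1.10-style).
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


# Placeholder callables; the real ones push the KPI inputs via XCom.
def getProgrammeData(**context): ...
def getContractData(**context): ...
def getRiskRegisterData(**context): ...


default_args = {
    "owner": "airflow",                   # owner airflow above
    "retries": 1,                         # retries 1 above
    "retry_delay": timedelta(minutes=5),  # retry_delay 0:05:00 above
}

with DAG(
    dag_id="nd201905_kpi",
    schedule_interval="0 7,15 * * *",
    start_date=datetime(2023, 1, 17),
    default_args=default_args,
    catchup=False,                        # assumption; not listed above
) as dag:
    get_programme = PythonOperator(
        task_id="getProgrammeData",
        python_callable=getProgrammeData,
        provide_context=True,
    )
    get_contract = PythonOperator(
        task_id="getContractData",
        python_callable=getContractData,
        provide_context=True,
    )
    get_risk = PythonOperator(
        task_id="getRiskRegisterData",
        python_callable=getRiskRegisterData,
        provide_context=True,
    )
    write_kpi = PythonOperator(
        task_id="writeKpiToPostgres",
        python_callable=writeKpiToPostgres,  # the callable shown above
        provide_context=True,
    )

    # upstream_task_ids lists only getRiskRegisterData; the XCom pulls imply
    # the other extract tasks run earlier in the same DAG run.
    [get_programme, get_contract] >> get_risk >> write_kpi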