DAG: nd201905_kpi

schedule: 0 7,15 * * *


Task Instance: getContractData


Task Instance Details

Dependencies Blocking Task From Getting Scheduled
Dependency: Task Instance State
Reason: Task is in the 'success' state which is not a valid state for execution. The task must be cleared in order to be run.

Dependency: Trigger Rule
Reason: Task's trigger rule 'all_success' requires all upstream tasks to have succeeded, but found 1 non-success(es). upstream_tasks_state={'total': 1, 'successes': 0, 'skipped': 0, 'failed': 0, 'upstream_failed': 0, 'done': 0}, upstream_task_ids={'getProgrammeData'}

Dependency: Dagrun Running
Reason: Task instance's dagrun was not in the 'running' state but in the state 'success'.
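
All three reasons trace back to the run having already completed: the task instance and its dagrun are both in 'success', and the single upstream task (getProgrammeData) has no fresh success for a new run. Re-running requires clearing the task instance first, via the UI's Clear button or programmatically. A minimal sketch, assuming Airflow 1.10's DAG.clear and the execution_date shown in the attributes below; the exact date values would need to match the run being cleared:

from datetime import datetime

from airflow.models import DagBag

dag = DagBag().get_dag("nd201905_kpi")
# Hypothetical run to clear; substitute the real execution_date of the run.
run_date = datetime(2025, 12, 11, 2, 20, 43)
dag.clear(start_date=run_date, end_date=run_date)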
Attribute: python_callable
import requests  # module-level import, shown here so the callable is self-contained

# `dRoW_api_end_url` (the dRoW API base URL) is assumed to be defined
# elsewhere in the DAG module.

def getContractData(**context):
    print("going to get Contract data now!")
    ti = context.get("ti")
    token = ti.xcom_pull(key="token")

    response = requests.get(
        url=f"{dRoW_api_end_url}/api/sheets/63fd68e49f48080c646e7f32?with_records=true&fields=",
        headers={"x-access-token": f"Bearer {token}"}
    )

    sheet = response.json()
    records = sheet["record"]

    # Column names in dRoW – confirm spelling with your sheet headers
    col_id = "id"
    col_cumulative = "Cumulative PWDD"
    col_forecast_pwdd = "Forecast of the final Prices for the Work Done to Date (PWDD)"

    col_latest_est_contractor = "Latest Estimate by the Contractor"
    col_latest_forecast_total = "Latest Forecast Total of the Prices"

    col_total_pain = "Total Pain"
    col_contractor_pain_share = "Contractor Pain Share"  # might be 0.5 or "50%"
    col_forecast_final_total = "Forecast of the final total of the Prices"

    latest_id = -1
    latest_map = None  # will hold the col_map for the max id row

    def to_float(val):
        """Convert dRoW string/number/percent to float safely."""
        if val in (None, "", "NA"):
            return None
        if isinstance(val, (int, float)):
            return float(val)

        s = str(val).strip()
        if s.endswith("%"):
            try:
                return float(s[:-1].replace(",", "").strip()) / 100.0
            except ValueError:
                return None

        try:
            return float(s.replace(",", ""))
        except ValueError:
            return None
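    # Example behavior of the helper (follows from the rules above):
    #   to_float("1,234.5") -> 1234.5
    #   to_float("50%")     -> 0.5
    #   to_float("NA")      -> None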

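    # 1) Find the record with the largest numeric id (treated as the latest row)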
    for record in records:
        col_map = {v["colName"]: v.get("value") for v in record.get("values", [])}

        raw_id = col_map.get(col_id)
        if raw_id is None or raw_id == "":
            continue

        try:
            numeric_id = int(raw_id)
        except ValueError:
            continue

        if numeric_id > latest_id:
            latest_id = numeric_id
            latest_map = col_map

    if latest_map is None:
        print("No valid contract record found (no numeric id).")
        return None

    print(f"Using contract record with largest id = {latest_id}")

    # 2) Extract values from that latest row
    # ----- KPI 2 -----
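    # KPI2: cumulative PWDD as a percentage of the forecast final PWDD.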
    cumulative_pwdd = to_float(latest_map.get(col_cumulative)) or 0
    forecast_pwdd = to_float(latest_map.get(col_forecast_pwdd)) or 0

    if forecast_pwdd != 0:
        kpi2 = round((cumulative_pwdd / forecast_pwdd) * 100, 2)
    else:
        kpi2 = 0

    # ----- KPI 3 -----
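    # KPI3: symmetric percentage difference between the contractor's latest
    # estimate and the latest forecast total: (a - b) / mean(a, b) * 100.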
    latest_est_contractor = to_float(latest_map.get(col_latest_est_contractor)) or 0
    latest_forecast_total = to_float(latest_map.get(col_latest_forecast_total)) or 0

    avg = (latest_est_contractor + latest_forecast_total) / 2.0
    print(f"check average: {avg}")

    if avg != 0:
        kpi3 = round(((latest_est_contractor - latest_forecast_total) / avg) * 100, 2)
    else:
        kpi3 = 0

    # ----- KPI 4 -----
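    # KPI4: contractor's portion of the pain as a percentage of the forecast
    # final total. to_float() already normalizes a "50%" share to 0.5, so
    # total_pain * contractor_pain_share is in the same units as the total.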
    total_pain = to_float(latest_map.get(col_total_pain)) or 0
    contractor_pain_share = to_float(latest_map.get(col_contractor_pain_share)) or 0
    forecast_final_total = to_float(latest_map.get(col_forecast_final_total)) or 0

    if forecast_final_total != 0:
        kpi4 = round(((total_pain * contractor_pain_share) / forecast_final_total) * 100, 2)
    else:
        kpi4 = 0


    print("KPI2 (PWDD ratio %) =", kpi2)
    print("KPI3 (Estimate vs Forecast %) =", kpi3)
    print("KPI4 (Pain share %) =", kpi4)

    kpi_result = {
        "kpi2_ratio_pwdd": kpi2,
        "kpi3_estimate_vs_forecast": kpi3,
        "kpi4_pain_share": kpi4,
    }

    # If you want to push explicitly:
    ti.xcom_push(key="CONTRACT_KPIS", value=kpi_result)

    return kpi_result
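
For reference, a minimal local-test sketch: it stubs the TaskInstance XCom interface so the callable can be exercised outside a scheduler. The stub class and the dummy token are assumptions for illustration, not part of the DAG.

class _StubTI:
    """Hypothetical stand-in for Airflow's TaskInstance XCom interface."""
    def __init__(self):
        self._store = {"token": "dummy-token"}  # assumed token for testing

    def xcom_pull(self, key):
        return self._store.get(key)

    def xcom_push(self, key, value):
        self._store[key] = value

if __name__ == "__main__":
    # Note: this hits the real dRoW endpoint, so run only with a valid token.
    print(getContractData(ti=_StubTI()))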
Task Instance Attributes
Attribute Value
dag_id nd201905_kpi
duration 3.332975
end_date 2025-12-11 02:21:32.452264+00:00
execution_date 2025-12-11T02:20:43.348295+00:00
executor_config {}
generate_command <function TaskInstance.generate_command at 0x7ffa72f03320>
hostname a7c46ba165e9
is_premature False
job_id 710
key ('nd201905_kpi', 'getContractData', <Pendulum [2025-12-11T02:20:43.348295+00:00]>, 2)
log <Logger airflow.task (INFO)>
log_filepath /usr/local/airflow/logs/nd201905_kpi/getContractData/2025-12-11T02:20:43.348295+00:00.log
log_url http://localhost:8080/admin/airflow/log?execution_date=2025-12-11T02%3A20%3A43.348295%2B00%3A00&task_id=getContractData&dag_id=nd201905_kpi
logger <Logger airflow.task (INFO)>
mark_success_url http://localhost:8080/success?task_id=getContractData&dag_id=nd201905_kpi&execution_date=2025-12-11T02%3A20%3A43.348295%2B00%3A00&upstream=false&downstream=false
max_tries 1
metadata MetaData(bind=None)
next_try_number 2
operator PythonOperator
pid 614060
pool default_pool
prev_attempted_tries 1
previous_execution_date_success 2025-12-11 01:51:07.889826+00:00
previous_start_date_success 2025-12-11 01:52:07.252373+00:00
previous_ti <TaskInstance: nd201905_kpi.getContractData 2025-12-11 01:51:07.889826+00:00 [success]>
previous_ti_success <TaskInstance: nd201905_kpi.getContractData 2025-12-11 01:51:07.889826+00:00 [success]>
priority_weight 1
queue default
queued_dttm 2025-12-11 02:21:27.458475+00:00
raw False
run_as_user None
start_date 2025-12-11 02:21:29.119289+00:00
state success
task <Task(PythonOperator): getContractData>
task_id getContractData
test_mode False
try_number 2
unixname airflow
Task Attributes
Attribute Value
dag <DAG: nd201905_kpi>
dag_id nd201905_kpi
depends_on_past False
deps {<TIDep(Previous Dagrun State)>, <TIDep(Not In Retry Period)>, <TIDep(Trigger Rule)>}
do_xcom_push True
downstream_list [<Task(PythonOperator): getRiskRegisterData>]
downstream_task_ids {'getRiskRegisterData'}
email None
email_on_failure True
email_on_retry True
end_date None
execution_timeout None
executor_config {}
extra_links []
global_operator_extra_link_dict {}
inlets []
lineage_data None
log <Logger airflow.task.operators (INFO)>
logger <Logger airflow.task.operators (INFO)>
max_retry_delay None
on_failure_callback None
on_retry_callback None
on_success_callback None
op_args []
op_kwargs {}
operator_extra_link_dict {}
operator_extra_links ()
outlets []
owner airflow
params {}
pool default_pool
priority_weight 1
priority_weight_total 5
provide_context True
queue default
resources None
retries 1
retry_delay 0:05:00
retry_exponential_backoff False
run_as_user None
schedule_interval 0 7,15 * * *
shallow_copy_attrs ('python_callable', 'op_kwargs')
sla None
start_date 2023-01-17T00:00:00+00:00
subdag None
task_concurrency None
task_id getContractData
task_type PythonOperator
template_ext []
template_fields ('templates_dict', 'op_args', 'op_kwargs')
templates_dict None
trigger_rule all_success
ui_color #ffefeb
ui_fgcolor #000
upstream_list [<Task(PythonOperator): getProgrammeData>]
upstream_task_ids {'getProgrammeData'}
wait_for_downstream False
weight_rule downstream
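
For context, a minimal sketch of how this task is likely wired, reconstructed from the attributes above (PythonOperator, provide_context=True, retries=1, the 0 7,15 * * * schedule, and the upstream/downstream task ids). The actual DAG file may differ; getProgrammeData and getRiskRegisterData callables are assumed to be defined alongside getContractData.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.10 path

dag = DAG(
    dag_id="nd201905_kpi",
    schedule_interval="0 7,15 * * *",
    start_date=datetime(2023, 1, 17),
    default_args={"retries": 1, "retry_delay": timedelta(minutes=5)},
)

get_programme_data = PythonOperator(
    task_id="getProgrammeData",
    python_callable=getProgrammeData,  # assumed callable, defined elsewhere
    provide_context=True,
    dag=dag,
)

get_contract_data = PythonOperator(
    task_id="getContractData",
    python_callable=getContractData,
    provide_context=True,
    dag=dag,
)

get_risk_register_data = PythonOperator(
    task_id="getRiskRegisterData",
    python_callable=getRiskRegisterData,  # assumed callable, defined elsewhere
    provide_context=True,
    dag=dag,
)

# Matches upstream_task_ids / downstream_task_ids above; with the default
# 'all_success' trigger rule, getContractData waits on getProgrammeData.
get_programme_data >> get_contract_data >> get_risk_register_data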