DAG: nd201905_kpi

schedule: 0 7,15 * * *
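
The expression fires at minute 0 of hours 7 and 15, i.e. twice a day at 07:00 and 15:00 (UTC by default in Airflow). A quick way to sanity-check a cron schedule, sketched here with the croniter package that Airflow itself depends on (the anchor date is arbitrary):

from datetime import datetime
from croniter import croniter

# "0 7,15 * * *" -> 07:00 and 15:00 every day
it = croniter("0 7,15 * * *", datetime(2025, 12, 11))
for _ in range(3):
    print(it.get_next(datetime))
# -> 2025-12-11 07:00:00, 2025-12-11 15:00:00, 2025-12-12 07:00:00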


Task Instance: getContractData


Task Instance Details

Dependencies Blocking Task From Getting Scheduled
Dependency           Reason
Task Instance State  Task is in the 'success' state which is not a valid state for execution. The task must be cleared in order to be run.
Dagrun Running       Task instance's dagrun was not in the 'running' state but in the state 'success'.
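
Both reasons say the same thing: the previous run finished in 'success', so the task instance (and its dagrun) must be cleared before it can execute again. A minimal sketch of clearing it programmatically on an Airflow 1.10-style deployment, equivalent to running "airflow clear -t getContractData nd201905_kpi" from the CLI (the DAG and task ids come from the attribute tables below; the date window is an assumption):

from datetime import datetime
from airflow.models import DagBag

# Narrow the DAG down to just this task, then clear its task instances
# so the scheduler can run them again.
dag = DagBag().get_dag("nd201905_kpi")
sub = dag.sub_dag(task_regex="getContractData", include_upstream=False)
sub.clear(
    start_date=datetime(2025, 12, 11),  # assumption: clear only this day's runs
    end_date=datetime(2025, 12, 12),
)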
Attribute: python_callable
import requests  # module-level import in the DAG file; dRoW_api_end_url is likewise defined at module level


def getContractData(**context):
    print("going to get Contract data now!")
    ti = context.get("ti")
    # "token" was pushed to XCom by an earlier task in this DAG
    token = ti.xcom_pull(key="token")

    response = requests.get(
        url=f"{dRoW_api_end_url}/api/sheets/63fd68e49f48080c646e7f32?with_records=true&fields=",
        headers={"x-access-token": f"Bearer {token}"}
    )

    sheet = response.json()
    records = sheet["record"]

    # Column names in dRoW – confirm spelling with your sheet headers
    col_id = "id"
    col_cumulative = "Cumulative PWDD"
    col_forecast_pwdd = "Forecast of the final Prices for the Work Done to Date (PWDD)"

    col_latest_est_contractor = "Latest Estimate by the Contractor"
    col_latest_forecast_total = "Latest Forecast Total of the Prices"

    col_total_pain = "Total Pain"
    col_contractor_pain_share = "Contractor Pain Share"  # might be 0.5 or "50%"
    col_forecast_final_total = "Forecast of the final total of the Prices"

    latest_id = -1
    latest_map = None  # will hold the col_map for the max id row

    def to_float(val):
        """Convert dRoW string/number/percent to float safely."""
        if val in (None, "", "NA"):
            return None
        if isinstance(val, (int, float)):
            return float(val)

        s = str(val).strip()
        if s.endswith("%"):
            try:
                return float(s[:-1].replace(",", "").strip()) / 100.0
            except ValueError:
                return None

        try:
            return float(s.replace(",", ""))
        except ValueError:
            return None

    # 1) Find the record with the largest numeric id (the latest row)
    for record in records:
        col_map = {v["colName"]: v.get("value") for v in record.get("values", [])}

        raw_id = col_map.get(col_id)
        if raw_id is None or raw_id == "":
            continue

        try:
            numeric_id = int(raw_id)
        except ValueError:
            continue

        if numeric_id > latest_id:
            latest_id = numeric_id
            latest_map = col_map

    if latest_map is None:
        print("No valid contract record found (no numeric id).")
        return None

    print(f"Using contract record with largest id = {latest_id}")

    # 2) Extract values from that latest row
    # ----- KPI 2: cumulative PWDD as a percentage of the forecast final PWDD -----
    cumulative_pwdd = to_float(latest_map.get(col_cumulative)) or 0
    forecast_pwdd = to_float(latest_map.get(col_forecast_pwdd)) or 0

    if forecast_pwdd != 0:
        kpi2 = round((cumulative_pwdd / forecast_pwdd) * 100, 2)
    else:
        kpi2 = 0

    # ----- KPI 3: gap between the contractor's estimate and the forecast total, as a percentage of their mean -----
    latest_est_contractor = to_float(latest_map.get(col_latest_est_contractor)) or 0
    latest_forecast_total = to_float(latest_map.get(col_latest_forecast_total)) or 0

    avg = (latest_est_contractor + latest_forecast_total) / 2.0
    print(f"check average: {avg}")

    if avg != 0:
        kpi3 = round(((latest_est_contractor - latest_forecast_total) / avg) * 100, 2)
    else:
        kpi3 = 0

    # ----- KPI 4: contractor's share of total pain as a percentage of the forecast final total -----
    total_pain = to_float(latest_map.get(col_total_pain)) or 0
    contractor_pain_share = to_float(latest_map.get(col_contractor_pain_share)) or 0
    forecast_final_total = to_float(latest_map.get(col_forecast_final_total)) or 0

    if forecast_final_total != 0:
        kpi4 = round(((total_pain * contractor_pain_share) / forecast_final_total) * 100, 2)
    else:
        kpi4 = 0


    print("KPI2 (PWDD ratio %) =", kpi2)
    print("KPI3 (Estimate vs Forecast %) =", kpi3)
    print("KPI4 (Pain share %) =", kpi4)

    kpi_result = {
        "kpi2_ratio_pwdd": kpi2,
        "kpi3_estimate_vs_forecast": kpi3,
        "kpi4_pain_share": kpi4,
    }

    # Push under an explicit key; the return value below is also pushed
    # automatically because do_xcom_push is enabled on this task.
    ti.xcom_push(key="CONTRACT_KPIS", value=kpi_result)

    return kpi_result
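
For context, a minimal sketch of how this callable is presumably wired into the DAG, reconstructed from the Task Attributes below. The operator type, retries, retry_delay, provide_context, schedule_interval and start_date are taken from those tables; the surrounding variable names are assumptions:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator  # Airflow 1.10-style import

dag = DAG(
    dag_id="nd201905_kpi",
    schedule_interval="0 7,15 * * *",
    start_date=datetime(2023, 1, 17),
)

get_contract_data = PythonOperator(
    task_id="getContractData",
    python_callable=getContractData,
    provide_context=True,             # pass ti/execution context into **context
    retries=1,
    retry_delay=timedelta(minutes=5),
    dag=dag,
)

# Per upstream_task_ids / downstream_task_ids below:
# getProgrammeData >> getContractData >> getRiskRegisterData

A downstream task can then read the result with ti.xcom_pull(task_ids="getContractData", key="CONTRACT_KPIS"), or pull the plain return value (key defaults to "return_value") since do_xcom_push is enabled.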
Task Instance Attributes
Attribute Value
dag_id nd201905_kpi
duration 0.318778
end_date 2025-12-11 03:56:13.415958+00:00
execution_date 2025-12-11T03:54:29.510399+00:00
executor_config {}
generate_command <function TaskInstance.generate_command at 0x7fb2f39d0320>
hostname 406be32ba58f
is_premature False
job_id 735
key ('nd201905_kpi', 'getContractData', <Pendulum [2025-12-11T03:54:29.510399+00:00]>, 2)
log <Logger airflow.task (INFO)>
log_filepath /usr/local/airflow/logs/nd201905_kpi/getContractData/2025-12-11T03:54:29.510399+00:00.log
log_url http://localhost:8080/admin/airflow/log?execution_date=2025-12-11T03%3A54%3A29.510399%2B00%3A00&task_id=getContractData&dag_id=nd201905_kpi
logger <Logger airflow.task (INFO)>
mark_success_url http://localhost:8080/success?task_id=getContractData&dag_id=nd201905_kpi&execution_date=2025-12-11T03%3A54%3A29.510399%2B00%3A00&upstream=false&downstream=false
max_tries 1
metadata MetaData(bind=None)
next_try_number 2
operator PythonOperator
pid 709140
pool default_pool
prev_attempted_tries 1
previous_execution_date_success 2025-12-11 03:48:51.052009+00:00
previous_start_date_success 2025-12-11 03:50:20.556340+00:00
previous_ti <TaskInstance: nd201905_kpi.getContractData 2025-12-11 03:48:51.052009+00:00 [success]>
previous_ti_success <TaskInstance: nd201905_kpi.getContractData 2025-12-11 03:48:51.052009+00:00 [success]>
priority_weight 1
queue default
queued_dttm 2025-12-11 03:56:11.253201+00:00
raw False
run_as_user None
start_date 2025-12-11 03:56:13.097180+00:00
state success
task <Task(PythonOperator): getContractData>
task_id getContractData
test_mode False
try_number 2
unixname airflow
Task Attributes
Attribute Value
dag <DAG: nd201905_kpi>
dag_id nd201905_kpi
depends_on_past False
deps {<TIDep(Trigger Rule)>, <TIDep(Previous Dagrun State)>, <TIDep(Not In Retry Period)>}
do_xcom_push True
downstream_list [<Task(PythonOperator): getRiskRegisterData>]
downstream_task_ids {'getRiskRegisterData'}
email None
email_on_failure True
email_on_retry True
end_date None
execution_timeout None
executor_config {}
extra_links []
global_operator_extra_link_dict {}
inlets []
lineage_data None
log <Logger airflow.task.operators (INFO)>
logger <Logger airflow.task.operators (INFO)>
max_retry_delay None
on_failure_callback None
on_retry_callback None
on_success_callback None
op_args []
op_kwargs {}
operator_extra_link_dict {}
operator_extra_links ()
outlets []
owner airflow
params {}
pool default_pool
priority_weight 1
priority_weight_total 3
provide_context True
queue default
resources None
retries 1
retry_delay 0:05:00
retry_exponential_backoff False
run_as_user None
schedule_interval 0 7,15 * * *
shallow_copy_attrs ('python_callable', 'op_kwargs')
sla None
start_date 2023-01-17T00:00:00+00:00
subdag None
task_concurrency None
task_id getContractData
task_type PythonOperator
template_ext []
template_fields ('templates_dict', 'op_args', 'op_kwargs')
templates_dict None
trigger_rule all_success
ui_color #ffefeb
ui_fgcolor #000
upstream_list [<Task(PythonOperator): getProgrammeData>]
upstream_task_ids {'getProgrammeData'}
wait_for_downstream False
weight_rule downstream