DAG: sd201903_icwp_safety_data

schedule: 0 15 * * *
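(i.e. the DAG runs once per day at 15:00 UTC)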


try:
    import json
    from datetime import datetime, timedelta

    import pandas as pd
    import requests
    from airflow import DAG
    # NOTE: on Airflow 2.x the preferred import path is airflow.operators.python
    from airflow.operators.python_operator import PythonOperator
    from sqlalchemy import create_engine

except Exception as e:
    print("Error importing DAG dependencies: {}".format(e))

# Base URL of the dRoW UAT API.
dRoW_api_end_url = "https://uat2.drow.cloud"


def getDrowToken(**context):
    """Authenticate against the dRoW API and push the JWT token to XCom."""
    response = requests.post(
        url=f"{dRoW_api_end_url}/api/auth/authenticate",
        data={
            "username": "icwp2@drow.cloud",
            "password": "dGVzdDAxQHRlc3QuY29t"
        }
    ).json()
    context["ti"].xcom_push(key="token", value=response["token"])


def getMongoDB(**context):
    """Pull the ICWP safety-data export from dRoW, flatten it and load it into Postgres."""
    token = context.get("ti").xcom_pull(key="token")
    response = requests.get(
        url=f"{dRoW_api_end_url}/api/module/document-export/airflow/workflow/66331ff38824b3535358a0f7?export_type=0",
        headers={
            "x-access-token": f"Bearer {token}",
            "ICWPxAccessKey": "5WSD21ICWP_[1AG:4UdI){n=b~"
        }
    )

    RISC_Data = json.loads(response.text)

    # Source field name -> warehouse column name (existing column spellings preserved).
    Mapping = {
        "Year - Month ": "year_month",
        "1. Monthly Workforce": "1_monthly_workforce",
        "1a.  Monthly Site Staff": "1a_monthly_site_staff",
        "3. Monthly Man-hours": "3_monthly_man_hours",
        "4. Cumulative Man-hours": "4_cumlateive_man_hour",
        "5. Cumulative Man-days": "5_cumlative_man_days",
        "6. Cumulative Number of Reportable Accident": "6_cumlative_number_of_reportable_accident",
        "7. Monthly Accident Rate per 100,000 man-hours": "7_monthly_accident_rate_per_100000_manhours",
        "8. Cumulative Accident Rate per 100,000 man-hours": "8_cumulative_accident_rate_per_100000_manhours",
        "9. Monthly Accident Rate per 1,000 workers": "9_monthly_accident_rate_per_1000_workers",
        "10. Cumulative Accident Rate per 1,000 workers": "10_cumulative_accident_rate_per_1000_workers",
    }
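
    # NOTE: connection details and credentials are hard-coded below; in a
    # hardened deployment these would typically come from an Airflow
    # Connection or Variable rather than from the DAG file.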

    # Host of the database server
    host                  = 'drowdatewarehouse.crlwwhgepgi7.ap-east-1.rds.amazonaws.com'
    # User name of the database server
    dbUserName            = 'dRowAdmin'
    # Password for the database user
    dbUserPassword        = 'drowsuper'
    # Name of the database
    database              = 'drowDateWareHouse'
    # Character set (not used in the Postgres connection string)
    charSet               = "utf8mb4"
    port                  = "5432"

    # SQLAlchemy connection string; 'postgresql://' is the current scheme name.
    conn_string = ('postgresql://' +
                   dbUserName + ':' +
                   dbUserPassword +
                   '@' + host + ':' + port +
                   '/' + database)

    db = create_engine(conn_string)
    with db.connect() as conn:
        frames = []
        for x in RISC_Data:
            # Flatten the nested record into a single-row DataFrame and keep
            # only the mapped columns, in a fixed order.
            df_nested_list = pd.json_normalize(x['data'])
            df2 = df_nested_list.reindex(columns=Mapping.keys())
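
            # Each repeating group below (reportable accidents, unsafe conditions,
            # near-misses, incident reports, LD/MD notices and safety convictions)
            # is expanded into one column per category plus a running total column.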

            if '2. Reportable Accident' in x['data'] and len(x['data']['2. Reportable Accident']) > 0:
                df2['2T Reportable Accident Total'] = sum(c.get('2.2 Reportable Accident No.', 0) for c in x['data']['2. Reportable Accident'])
                for c in x['data']['2. Reportable Accident']:
                    df2['2.1 Reportable Accident Cat' + '_' + c.get('2.1 Reportable Accident Cat', '')] = c.get('2.2 Reportable Accident No.', 0)
            else:
                df2['2T Reportable Accident Total'] = 0

            if '12. Unsafe conditions identified during inspections' in x['data'] and len(x['data']['12. Unsafe conditions identified during inspections']) > 0:
                df2['12T Unsafe Conditions Total'] = sum(c.get('12.2 Unsafe Conditions No.', 0) for c in x['data']['12. Unsafe conditions identified during inspections'])
                for c in x['data']['12. Unsafe conditions identified during inspections']:
                    df2['12.1 Unsafe Conditions Cat' + '_' + c.get('12.1 Unsafe Conditions Cat', '')] = c.get('12.2 Unsafe Conditions No.', 0)
            else:
                df2['12T Unsafe Conditions Total'] = 0

            if '13. Near-miss Reports' in x['data'] and len(x['data']['13. Near-miss Reports']) > 0:
                df2['13T Near Miss Total'] = sum(c.get('13.2 Near Miss No.', 0) for c in x['data']['13. Near-miss Reports'])
                for c in x['data']['13. Near-miss Reports']:
                    df2['13.1 Near Miss Cat' + '_' + c.get('13.1 Near Miss Cat', '')] = c.get('13.2 Near Miss No.', 0)
            else:
                df2['13T Near Miss Total'] = 0

            if '14. Incident Reports' in x['data'] and len(x['data']['14. Incident Reports']) > 0:
                df2['14T Incident Reports Total'] = sum(c.get('14.2 Incident Reports No.', 0) for c in x['data']['14. Incident Reports'])
                for c in x['data']['14. Incident Reports']:
                    df2['14.1 Incident Reports Cat' + '_' + c.get('14.1 Incident Reports Cat', '')] = c.get('14.2 Incident Reports No.', 0)
            else:
                df2['14T Incident Reports Total'] = 0
                
            if '15. LD/MD Improvement and Suspension Notice' in x['data'] and len(x['data']['15. LD/MD Improvement and Suspension Notice']) > 0:
                df2['15T LD/MD Improvement and Suspension Notice Total'] = sum(c.get('15.2 LD/MD Improvement and Suspension Notice No.', 0) for c in x['data']['15. LD/MD Improvement and Suspension Notice'])
                for c in x['data']['15. LD/MD Improvement and Suspension Notice']:
                    df2['15.1 LD/MD Improvement and Suspension Notice Cat' + '_' + c.get('15.1 LD/MD Improvement and Suspension Notice Cat', '')] = c.get('15.2 LD/MD Improvement and Suspension Notice No.', 0)
            else:
                df2['15T LD/MD Improvement and Suspension Notice Total'] = 0            

            if '16. Safety Convictions Records' in x['data'] and len(x['data']['16. Safety Convictions Records']) > 0:
                df2['16T Safety Convictions Total'] = sum(c.get('16.2 Safety Convictions No.', 0) for c in x['data']['16. Safety Convictions Records'])
                for c in x['data']['16. Safety Convictions Records']:
                    df2['16.1 Safety Convictions Cat' + '_' + c.get('16.1 Safety Convictions Cat', '')] = c.get('16.2 Safety Convictions No.', 0)
            else:
                df2['16T Safety Convictions Total'] = 0

            df2.rename(columns=Mapping, inplace=True)
            # Normalise column names into valid, consistent SQL identifiers.
            df2.columns = (df2.columns.str.replace(' ', '_', regex=False).str.replace('.', '', regex=False)
                           .str.replace('(', '_', regex=False).str.replace(')', '', regex=False)
                           .str.replace('%', 'percent', regex=False).str.replace('/', '_', regex=False))

            frames.append(df2)

        # Combine all monthly records and rebuild the warehouse table in one shot.
        df = pd.concat(frames, ignore_index=True)
        df['year_month'] = pd.to_datetime(df['year_month'])
        df.to_sql('icwp_safety_data_sd201903', con=conn, if_exists='replace', index=False)

# Runs once per day at 15:00 UTC.
with DAG(
        dag_id="sd201903_icwp_safety_data",
        schedule_interval="0 15 * * *",
        default_args={
            "owner": "airflow",
            "retries": 1,
            "retry_delay": timedelta(minutes=5),
            "start_date": datetime(2023, 1, 17)
        },
        catchup=False) as dag:
    
    get_mongo_db_task = PythonOperator(
        task_id="getMongoDB",
        python_callable=getMongoDB,
        provide_context=True,
    )

    get_drow_token_task = PythonOperator(
        task_id="getDrowToken",
        python_callable=getDrowToken,
        provide_context=True,
    )

    # Fetch the auth token first, then export and load the safety data.
    get_drow_token_task >> get_mongo_db_task