1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153 | try:
import json
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
import requests
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from pandas.io.json import json_normalize
from sqlalchemy import create_engine
except Exception as e:
print("Error {} ".format(e))
# Base URL of the dRoW API; the request functions in this file append their
# endpoint paths to it.
dRoW_api_end_url = "https://drow.cloud"
def getDrowToken(**context):
    """Authenticate against the dRoW API and publish the token via XCom.

    Posts the service-account credentials to ``/api/auth/authenticate`` and
    pushes the ``token`` field of the JSON reply to XCom under the key
    ``"token"``.

    Args:
        **context: Airflow task context; ``context["ti"]`` is used to push
            the XCom value.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.Timeout: If the API does not answer within the timeout.
        KeyError: If the JSON reply carries no ``token`` field.
    """
    # NOTE(review): these credentials are hard-coded in source control; they
    # should be moved to an Airflow Connection/Variable or a secrets backend.
    response = requests.post(
        url=f"{dRoW_api_end_url}/api/auth/authenticate",
        data={
            "username": "keexiansuen@drow.cloud",
            "password": "c3UxMTk5a3ghIQ==",
        },
        # Without a timeout requests can block forever and the task never
        # fails or retries.
        timeout=60,
    )
    # Surface an HTTP failure directly instead of a confusing KeyError when
    # the error payload has no "token" field.
    response.raise_for_status()
    context["ti"].xcom_push(key="token", value=response.json()["token"])
# Exported field name -> warehouse column name for the scalar monthly figures.
# Keys must match the exported field names exactly (note the trailing space
# in "Year - Month "). Values are the column names written to the table, so
# their spelling is intentionally left untouched.
_SAFETY_COLUMN_MAPPING = {
    "Year - Month ": "year_month",
    "1. Monthly Workforce": "1_monthly_workforce",
    "1a. Monthly Site Staff": "1a_monthly_site_staff",
    "3. Monthly Man-hours": "3_monthly_man_hours",
    "4. Cumulative Man-hours": "4_cumlateive_man_hour",
    "5. Cumulative Man-days": "5_cumlative_man_days",
    "6. Cumulative Number of Reportable Accident": "6_cumlative_number_of_reportable_accident",
    "7. Monthly Accident Rate per 100,000 man-hours": "7_monthly_accident_rate_per_100000_manhours",
    "8. Cumulative Accident Rate per 100,000 man-hours": "8_cumulative_accident_rate_per_100000_manhours",
    "9. Monthly Accident Rate per 1,000 workers": "9_monthly_accident_rate_per_1000_workers",
    "10. Cumulative Accident Rate per 1,000 workers": "10_cumulative_accident_rate_per_1000_workers",
}

# Repeating sections of a record, processed in this order. Each entry is
# (section field, total column, category field, count field).
_SAFETY_CATEGORY_SECTIONS = (
    (
        '2. Reportable Accident',
        '2T Reportable Accident Total',
        '2.1 Reportable Accident Cat',
        '2.2 Reportable Accident No.',
    ),
    (
        '12. Unsafe conditions identified during inspections',
        '12T Unsafe Conditions Total',
        '12.1 Unsafe Conditions Cat',
        '12.2 Unsafe Conditions No.',
    ),
    (
        '13. Near-miss Reports',
        '13T Near Miss Total',
        '13.1 Near Miss Cat',
        '13.2 Near Miss No.',
    ),
    (
        '14. Incident Reports',
        '14T Incident Reports Total',
        '14.1 Incident Reports Cat',
        '14.2 Incident Reports No.',
    ),
    (
        '15. LD/MD Improvement and Suspension Notice',
        '15T LD/MD Improvement and Suspension Notice Total',
        '15.1 LD/MD Improvement and Suspension Notice Cat',
        '15.2 LD/MD Improvement and Suspension Notice No.',
    ),
    (
        '16. Safety Convictions Records',
        '16T Safety Convictions Total',
        '16.1 Safety Convictions Cat',
        '16.2 Safety Convictions No.',
    ),
)

# Literal substitutions applied, in order, to every column name.
_COLUMN_NAME_SUBSTITUTIONS = (
    (' ', '_'),
    ('.', ''),
    ('(', '_'),
    (')', ''),
    ('%', 'percent'),
    ('/', '_'),
)


def _add_section_columns(frame, data, section_key, total_column, cat_key, count_key):
    """Add one section's total column and per-category columns to ``frame``."""
    # A missing, None or empty section is treated as "no entries".
    entries = data.get(section_key) or []
    if not entries:
        frame[total_column] = 0
        return
    frame[total_column] = sum(
        entry[count_key] for entry in entries if entry.get(count_key) is not None
    )
    for entry in entries:
        # Column name is "<category field>_<category value>".
        frame[cat_key + '_' + entry.get(cat_key, '')] = entry.get(count_key, 0)


def _normalise_column_names(columns):
    """Return ``columns`` with the literal name substitutions applied."""
    for old, new in _COLUMN_NAME_SUBSTITUTIONS:
        # regex=False: '.' and '(' must be taken literally, not as patterns.
        columns = columns.str.replace(old, new, regex=False)
    return columns


def getMongoDB(**context):
    """Export the safety-data workflow from dRoW and load it into Postgres.

    Pulls the API token from XCom (key ``"token"``), downloads the workflow
    export, builds one row per exported record (scalar monthly figures plus a
    total column and per-category columns for each repeating section), and
    replaces the table ``icwp_safety_data_cv202303`` with the result.

    Args:
        **context: Airflow task context; ``context["ti"]`` is used to pull
            the token from XCom.

    Raises:
        requests.HTTPError: If the export endpoint answers with a 4xx/5xx
            status.
        requests.Timeout: If the export endpoint does not answer in time.
        ValueError: If the export contains no records; the existing table is
            left untouched in that case.
    """
    token = context.get("ti").xcom_pull(key="token")
    response = requests.get(
        url=f"{dRoW_api_end_url}/api/module/document-export/airflow/workflow/66175b52dac217ba1401741e?export_type=0",
        headers={
            "x-access-token": f"Bearer {token}",
            "ICWPxAccessKey": "5WSD21ICWP_[1AG:4UdI){n=b~"
        },
        # Generous limit for the export, but never wait forever.
        timeout=300,
    )
    # Fail on an HTTP error rather than trying to parse an error payload.
    response.raise_for_status()
    records = json.loads(response.text)

    frames = []
    for record in records:
        data = record['data']
        # Keep only the mapped scalar fields; absent ones become NaN columns.
        frame = json_normalize(data).reindex(columns=list(_SAFETY_COLUMN_MAPPING))
        for section in _SAFETY_CATEGORY_SECTIONS:
            _add_section_columns(frame, data, *section)
        frame = frame.rename(columns=_SAFETY_COLUMN_MAPPING)
        frame.columns = _normalise_column_names(frame.columns)
        frames.append(frame)

    if not frames:
        # Fail with a clear message before touching the database, so an empty
        # export never replaces the existing table.
        raise ValueError("dRoW export returned no records; table left unchanged")

    # Concatenate once: DataFrame.append copied the whole frame on every
    # iteration and no longer exists in pandas 2.x.
    df = pd.concat(frames)
    df['year_month'] = df['year_month'].apply(pd.to_datetime)

    # NOTE(review): database credentials are hard-coded in source control;
    # they should be moved to an Airflow Connection or a secrets backend.
    host = 'drowdatewarehouse.crlwwhgepgi7.ap-east-1.rds.amazonaws.com'
    dbUserName = 'dRowAdmin'
    dbUserPassword = 'drowsuper'
    database = 'drowDateWareHouse'
    port = "5432"
    # "postgresql://" is accepted by every SQLAlchemy version; the legacy
    # "postgres://" alias is rejected from SQLAlchemy 1.4 onwards.
    conn_string = f"postgresql://{dbUserName}:{dbUserPassword}@{host}:{port}/{database}"
    db = create_engine(conn_string)
    with db.connect() as conn:
        df.to_sql('icwp_safety_data_cv202303', con=conn, if_exists='replace', index=False)
# DAG "cv202303_icwp_safety_data": runs once a day at 15:00 (cron
# "0 15 * * *"), without backfilling past runs (catchup=False). It first
# fetches a dRoW API token, then exports the safety data and loads it into
# the warehouse.
with DAG(
    dag_id="cv202303_icwp_safety_data",
    schedule_interval="0 15 * * *",
    default_args={
        "owner": "airflow",
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        "start_date": datetime(2023, 1, 17),
    },
    catchup=False,
) as f:
    # The operators get their own names so they do not overwrite the
    # module-level callables they wrap; the task_ids are unchanged.
    token_task = PythonOperator(
        task_id="getDrowToken",
        python_callable=getDrowToken,
        provide_context=True,
    )
    export_task = PythonOperator(
        task_id="getMongoDB",
        python_callable=getMongoDB,
        # Passed to the callable as an extra keyword argument.
        op_kwargs={"name": "Dylan"},
        provide_context=True,
    )
    # The export needs the token pushed to XCom by the authentication task.
    token_task >> export_task
|