1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122 | try:
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.http_operator import SimpleHttpOperator
from datetime import datetime
from pandas.io.json import json_normalize
from airflow.operators.postgres_operator import PostgresOperator
from pandas.tseries.offsets import CustomBusinessDay
import pandas as pd
import json
import requests
import numpy as np
import psycopg2
from sqlalchemy import create_engine
except Exception as e:
print("Error {} ".format(e))
dRoW_api_end_url = "https://drow.cloud"
def getDrowToken(**context):
response = requests.post(
url=f"{dRoW_api_end_url}/api/auth/authenticate",
data={
"username": "keexiansuen@drow.cloud",
"password": "c3UxMTk5a3ghIQ==",
}
).json()
context["ti"].xcom_push(key="token", value=response['token'])
def getdrowPSQLConnectionString():
host = 'drowdatewarehouse.crlwwhgepgi7.ap-east-1.rds.amazonaws.com'
# User name of the database server
dbUserName = 'dRowAdmin'
# Password for the database user
dbUserPassword = 'drowsuper'
# Name of the database
database = 'drowDateWareHouse'
# Character set
charSet = "utf8mb4"
port = "5432"
conn_string = ('postgres://' +
dbUserName + ':' +
dbUserPassword +
'@' + host + ':' + port +
'/' + database)
return conn_string
def getMongoDB(**context):
token = context.get("ti").xcom_pull(key="token")
response = requests.get(
url=f"{dRoW_api_end_url}/api/module/document-export/airflow/workflow/6780da3a19a6ea5c197f5e8a?export_type=0",
headers={
"x-access-token": f"Bearer {token}",
"ICWPxAccessKey": "5WSD21ICWP_[1AG:4UdI){n=b~"
}
)
RISC_Data = json.loads(response.text)
conn_string = getdrowPSQLConnectionString()
db = create_engine(conn_string)
conn = db.connect()
with conn as conn:
df = pd.DataFrame()
for x in RISC_Data:
df_nested_list = json_normalize(x['data'])
for sequence in df_nested_list['ITP Sequence (Hold Points Only)'][0]:
df2 = pd.DataFrame([sequence])
df2['Type'] = df_nested_list['Type']
df2['Method Statement Submission No.'] = df_nested_list['Method Statement Submission No.']
df2['ITP Type Identifier'] = df_nested_list['ITP Type Identifier']
df2['Labour Type'] = df_nested_list['Labour Type']
df2['Description'] = df_nested_list['Description']
df = df.append(df2)
df.columns = df.columns.str.replace(' ', '_').str.replace('.', '', regex=False).str.replace('(', '_', regex=False).str.replace(')', '', regex=False).str.replace('%', 'percent', regex=False).str.replace('/', '_', regex=False)
df.to_sql('itp_hy202308', con=conn, if_exists='replace', index= False)
conn.close()
# */2 * * * * Execute every two minute
with DAG(
dag_id="hy202308_itp",
schedule_interval="0 15 * * *",
default_args={
"owner": "airflow",
"retries": 1,
"retry_delay": timedelta(minutes=5),
"start_date": datetime(2023, 1, 17)
},
catchup=False) as f:
getMongoDB = PythonOperator(
task_id="getMongoDB",
python_callable=getMongoDB,
op_kwargs={"name": "Dylan"},
provide_context=True,
)
getDrowToken = PythonOperator(
task_id="getDrowToken",
python_callable=getDrowToken,
provide_context=True,
# op_kwargs={"name": "Dylan"}
)
getDrowToken >> getMongoDB
|