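# Airflow DAG: cv202211_payment_excel
# Authenticates against the dRoW API, exports the documents of one workflow,
# flattens the nested JSON with pandas, and loads the result into the
# "cv202211_payment_excel" table of the drowDateWareHouse Postgres database.
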
try:
    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.http_operator import SimpleHttpOperator
    from airflow.operators.postgres_operator import PostgresOperator

    import pandas as pd
    # pandas.io.json.json_normalize is deprecated; use the top-level import instead
    from pandas import json_normalize
    import json
    import requests
    import numpy as np
    import re
    import psycopg2
    from sqlalchemy import create_engine
    # print("All DAG modules were successfully imported")
except Exception as e:
    print("Error {} ".format(e))
dRoW_api_end_url = "https://drow.cloud"
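
# Task callable 1: authenticate against the dRoW API and share the returned
# token with downstream tasks via XCom. The /api/auth/authenticate endpoint is
# expected to return a JSON body containing a "token" field, as used below.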
def getDrowToken(**context):
    response = requests.post(
        url=f"{dRoW_api_end_url}/api/auth/authenticate",
        data={
            "username": "dylanlam@drow.cloud",
            "password": "dGVzdDAxQHRlc3QuY29t"
        }
    ).json()
    # Push the token to XCom so getMongoDB can authenticate its request
    context["ti"].xcom_push(key="token", value=response['token'])
    # return 'DLLM{}'.format(response)
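
# Task callable 2: pull the exported workflow documents from dRoW, flatten the
# nested JSON with pandas, and load the result into the Postgres warehouse.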
def getMongoDB(**context):
    token = context.get("ti").xcom_pull(key="token")
    response = requests.get(
        url=f"{dRoW_api_end_url}/api/module/document-export/airflow/workflow/66ff882a373b4986e8a2f09b?export_type=0",
        headers={
            "x-access-token": f"Bearer {token}",
        }
    )
    Data = json.loads(response.text)
    # Mapping = {
    #     "Original Doc No.": "Original_Doc_No",
    #     "NEC Doc Type": "NEC_Doc_Type",
    #     "NEC Event No.": "NEC_Event_No",
    #     "Doc Ver.": "Doc_Ver",
    #     "Doc Date": "Doc_Date",
    #     "Subject": "Subject",
    #     "From": "From",
    #     "To": "To",
    #     "CE Amount": "CE_PMI_Amount",
    #     "CE Increase / Decrease": "CE_Increase_Decrease",
    #     "Quotation Status": "Quotation_Status",
    #     "NEC Clause": "NEC_Clause",
    #     "Receive Date": "Receive_Date"
    # }
    # Connection details for the Postgres data warehouse
    host = 'drowdatewarehouse.crlwwhgepgi7.ap-east-1.rds.amazonaws.com'
    # User name of the database server
    dbUserName = 'dRowAdmin'
    # Password for the database user
    dbUserPassword = 'drowsuper'
    # Name of the database
    database = 'drowDateWareHouse'
    # Character set (not used by the PostgreSQL connection string below)
    charSet = "utf8mb4"
    port = "5432"
    # create_engine('mysql+mysqldb://root:password@localhost:3306/mydbname', echo=False)
    # SQLAlchemy 1.4+ requires the "postgresql://" scheme; "postgres://" is no longer accepted
    conn_string = ('postgresql://' +
                   dbUserName + ':' +
                   dbUserPassword +
                   '@' + host + ':' + port +
                   '/' + database)
    db = create_engine(conn_string)
    frames = []
    with db.connect() as conn:
        for x in Data:
            try:
                # Skip records whose payload is empty
                if len(x['data'].keys()) == 0:
                    continue
                # Flatten the nested JSON payload into a single-row frame
                df_nested_list = json_normalize(x['data'])
                frames.append(df_nested_list)
            except Exception as e:
                print(f"Error processing data: {e}")
                continue
        # DataFrame.append was removed in pandas 2.0; concatenate the collected frames instead
        df = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()
        # Sanitise column names so they are valid Postgres identifiers
        df.columns = (df.columns
                      .str.replace(' ', '_', regex=False)
                      .str.replace('.', '', regex=False)
                      .str.replace('(', '_', regex=False)
                      .str.replace(')', '', regex=False)
                      .str.replace('%', 'percent', regex=False)
                      .str.replace('/', '_', regex=False))
        df.drop(['Payment_Summary', "Contractor's_Application", "PM's_Assessment"], axis=1, inplace=True)
        df.to_sql('cv202211_payment_excel', con=conn, if_exists='replace', index=False)
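
# DAG definition: fetch an API token first, then run the extract-and-load task.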
# Schedule "0 0,4,8,11,16 * * *" runs daily at 00:00, 04:00, 08:00, 11:00 and 16:00 (UTC by default).
with DAG(
    dag_id="cv202211_payment_excel",
    schedule_interval="0 0,4,8,11,16 * * *",
    default_args={
        "owner": "airflow",
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
        "start_date": datetime(2022, 10, 24)
    },
    catchup=False) as dag:

    getDataAndSendToPSQL = PythonOperator(
        task_id="getDataAndSendToPSQL",
        python_callable=getMongoDB,
        op_kwargs={"name": "Dylan"},
        provide_context=True,
    )

    # Named getDrowTokenTask so the operator does not shadow the getDrowToken callable above
    getDrowTokenTask = PythonOperator(
        task_id="getDrowToken",
        python_callable=getDrowToken,
        provide_context=True,
        # op_kwargs={"name": "Dylan"}
    )

    getDrowTokenTask >> getDataAndSendToPSQL
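
if __name__ == "__main__":
    # Optional local smoke test; the Airflow scheduler imports this module and
    # never executes this block. _FakeTI is a hypothetical stand-in for
    # Airflow's TaskInstance that implements only the XCom calls used above;
    # it is an assumption for local testing, not part of the production DAG.
    # Note: this makes a live call to the dRoW authentication endpoint.
    # getMongoDB is deliberately not invoked here because it writes to the
    # warehouse table.
    class _FakeTI:
        def __init__(self):
            self._xcom = {}

        def xcom_push(self, key, value):
            self._xcom[key] = value

        def xcom_pull(self, key):
            return self._xcom.get(key)

    ti = _FakeTI()
    getDrowToken(ti=ti)
    print("token received:", bool(ti.xcom_pull(key="token")))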