DAG: cv202211_payment_excel ROOT: getDrowToken

schedule: 0 0,4,8,11,16 * * *


cv202211_payment_excel

Toggle wrap
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
try:

    from datetime import timedelta
    from airflow import DAG
    
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.http_operator import SimpleHttpOperator
    from datetime import datetime
    from pandas.io.json import json_normalize
    from airflow.operators.postgres_operator import PostgresOperator

    import pandas as pd
    import json
    import requests
    import numpy as np
    import re

    import psycopg2
    from sqlalchemy import create_engine
    # print("All Dag moudules are sucessfully imported")

except Exception as e:
    print("Error {} ".format(e))

dRoW_api_end_url = "https://drow.cloud"

def getDrowToken(**context):
    response = requests.post(
        url=f"{dRoW_api_end_url}/api/auth/authenticate",
        data={
            "username": "dylanlam@drow.cloud",
            "password": "dGVzdDAxQHRlc3QuY29t"
        }
    ).json()
    context["ti"].xcom_push(key="token", value=response['token'])
    # return 'DLLM{}'.format(response)

def getMongoDB(**context):
    token = context.get("ti").xcom_pull(key="token")
    response = requests.get(
        url=f"{dRoW_api_end_url}/api/module/document-export/airflow/workflow/66ff882a373b4986e8a2f09b?export_type=0",
        headers={
        "x-access-token": f"Bearer {token}",
        }
    )
    Data = json.loads(response.text)

    # Mapping= {
    #         "Original Doc No.": "Original_Doc_No",
    #         "NEC Doc Type": "NEC_Doc_Type",
    #         "NEC Event No.": "NEC_Event_No",
    #         "Doc Ver.": "Doc_Ver",
    #         "Doc Date": "Doc_Date",
    #         "Subject": "Subject",
    #         "From": "From",
    #         "To": "To",
    #         "CE Amount": "CE_PMI_Amount",
    #         "CE Increase / Decrease": "CE_Increase_Decrease",
    #         "Quotation Status": "Quotation_Status",
    #         "NEC Clause": "NEC_Clause",
    #         "Receive Date": "Receive_Date"
    # }
    host                  = 'drowdatewarehouse.crlwwhgepgi7.ap-east-1.rds.amazonaws.com'  
    # User name of the database server
    dbUserName            = 'dRowAdmin'  
    # Password for the database user
    dbUserPassword        = 'drowsuper'  
    # Name of the database 
    database              = 'drowDateWareHouse'
    # Character set
    charSet               = "utf8mb4"  
    port                  = "5432"

    #create_engine('mysql+mysqldb://root:password@localhost:3306/mydbname', echo = False)
    conn_string = ('postgres://' +
                           dbUserName + ':' + 
                           dbUserPassword +
                           '@' + host + ':' + port +
                           '/' + database)
    db = create_engine(conn_string)
    conn = db.connect()

    df = pd.DataFrame()
    with conn as conn:
        for x in Data:
            try:
                if len(x['data'].keys()) == 0:
                    continue
                df_nested_list = json_normalize(x['data'])
                df = df.append(df_nested_list)
            except Exception as e:
                print(f"Error processing data: {e}")  # Add logging
            continue

        df.columns = df.columns.str.replace(' ', '_').str.replace('.', '').str.replace('(', '_').str.replace(')', '').str.replace('%', 'percent').str.replace('/', '_')
        df.drop(['Payment_Summary', "Contractor's_Application", "PM's_Assessment"], axis=1, inplace=True)
        df.to_sql('cv202211_payment_excel', con=conn, if_exists='replace', index= False)

# */2 * * * * Execute every two minute 
with DAG(
        dag_id="cv202211_payment_excel",
        schedule_interval="0 0,4,8,11,16 * * *",
        default_args={
            "owner": "airflow",
            "retries": 1,
            "retry_delay": timedelta(minutes=5),
            "start_date": datetime(2022, 10, 24)
        },
        catchup=False) as f:
    
    getDataAndSendToPSQL = PythonOperator(
        task_id="getDataAndSendToPSQL",
        python_callable=getMongoDB,
        op_kwargs={"name": "Dylan"},
        provide_context=True,
    )

    getDrowToken = PythonOperator(
        task_id="getDrowToken",
        python_callable=getDrowToken,
        provide_context=True,
        # op_kwargs={"name": "Dylan"}
    )

getDrowToken >> getDataAndSendToPSQL