# DAG: hy202308_itp
# Schedule: 0 15 * * * (every day at 15:00)
try:

    from datetime import timedelta
    from airflow import DAG
    
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.http_operator import SimpleHttpOperator
    from datetime import datetime
    from pandas.io.json import json_normalize
    from airflow.operators.postgres_operator import PostgresOperator
    from pandas.tseries.offsets import CustomBusinessDay

    import pandas as pd
    import json
    import requests
    import numpy as np

    import psycopg2
    from sqlalchemy import create_engine

except Exception as e:
    print("Error {} ".format(e))

# Base URL of the dRoW API; the request URLs in this file are built from it.
dRoW_api_end_url = "https://drow.cloud"

def getDrowToken(**context):
    """Authenticate against the dRoW API and publish the token via XCom.

    Posts the account credentials to ``/api/auth/authenticate`` and pushes the
    ``token`` field of the JSON response to XCom under the key ``"token"``.

    Args:
        **context: Airflow task context; the task instance is read from
            ``context["ti"]``.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On connection failure or timeout.
        ValueError: If the response body is not valid JSON.
        KeyError: If the JSON response has no ``token`` field.
    """
    # SECURITY(review): credentials are hard-coded in source; they should be
    # moved to an Airflow Connection/Variable or a secrets backend.
    response = requests.post(
        url=f"{dRoW_api_end_url}/api/auth/authenticate",
        data={
            "username": "keexiansuen@drow.cloud",
            "password": "c3UxMTk5a3ghIQ==",
        },
        # Without a timeout, requests waits indefinitely and the task would
        # occupy a worker slot forever if the API stops responding.
        timeout=60,
    )
    # Fail with the real HTTP error instead of a misleading KeyError on the
    # error payload.
    response.raise_for_status()

    token = response.json()["token"]
    context["ti"].xcom_push(key="token", value=token)


def getdrowPSQLConnectionString():
    """Build the connection URL for the dRoW data-warehouse PostgreSQL server.

    Returns:
        str: A URL of the form
        ``postgresql://<user>:<password>@<host>:<port>/<database>``.
    """
    # SECURITY(review): database credentials are hard-coded in source; they
    # should be moved to an Airflow Connection or a secrets backend.
    host = 'drowdatewarehouse.crlwwhgepgi7.ap-east-1.rds.amazonaws.com'

    # User name of the database server
    dbUserName = 'dRowAdmin'

    # Password for the database user
    dbUserPassword = 'drowsuper'

    # Name of the database
    database = 'drowDateWareHouse'

    port = "5432"

    # Use the "postgresql" scheme: SQLAlchemy 1.4+ no longer accepts the
    # legacy "postgres://" alias, while "postgresql://" is understood by every
    # SQLAlchemy version (and by libpq).
    return f"postgresql://{dbUserName}:{dbUserPassword}@{host}:{port}/{database}"

def getMongoDB(**context):
    """Export the ITP workflow from dRoW and load its hold points into PostgreSQL.

    Pulls the API token from XCom (key ``"token"``), downloads the workflow
    export, and expands each record's ``ITP Sequence (Hold Points Only)`` list
    into one row per sequence entry. Every row also carries the record's
    ``Type``, ``Method Statement Submission No.``, ``ITP Type Identifier``,
    ``Labour Type`` and ``Description``. Column names are sanitised and the
    result replaces the table ``itp_hy202308``.

    If the export yields no rows the table is left untouched.

    Args:
        **context: Airflow task context; the task instance is read from
            ``context["ti"]``.

    Raises:
        requests.HTTPError: If the export endpoint answers with 4xx/5xx.
        requests.RequestException: On connection failure or timeout.
        KeyError: If a record lacks one of the expected fields.
    """
    token = context.get("ti").xcom_pull(key="token")
    response = requests.get(
        url=f"{dRoW_api_end_url}/api/module/document-export/airflow/workflow/6780da3a19a6ea5c197f5e8a?export_type=0",
        headers={
            "x-access-token": f"Bearer {token}",
            "ICWPxAccessKey": "5WSD21ICWP_[1AG:4UdI){n=b~"
        },
        # Bound the wait so a stalled export cannot hang the worker forever.
        timeout=300,
    )
    # Surface HTTP failures directly instead of failing later while parsing
    # an error payload.
    response.raise_for_status()
    records = json.loads(response.text)

    # Record-level fields copied onto every hold-point row.
    record_fields = [
        'Type',
        'Method Statement Submission No.',
        'ITP Type Identifier',
        'Labour Type',
        'Description',
    ]

    # Collect the per-sequence frames and concatenate once at the end:
    # DataFrame.append in a loop is quadratic and no longer exists in
    # pandas 2.0.
    frames = []
    for record in records:
        flat = json_normalize(record['data'])
        sequences = flat['ITP Sequence (Hold Points Only)'][0]
        if not isinstance(sequences, list):
            # A null/absent sequence list contributes no rows, same as [].
            continue
        for sequence in sequences:
            row = pd.DataFrame([sequence])
            for field in record_fields:
                # Series assignment aligns on index 0, so list-valued cells
                # are copied as a single value.
                row[field] = flat[field]
            frames.append(row)

    if not frames:
        # if_exists='replace' would wipe the existing table; keep the last
        # good load instead.
        print("No ITP hold-point rows returned; table itp_hy202308 left unchanged")
        return

    df = pd.concat(frames, sort=False)
    df.columns = [_sanitize_column_name(column) for column in df.columns]

    db = create_engine(getdrowPSQLConnectionString())
    try:
        # The context manager closes the connection; no explicit close needed.
        with db.connect() as conn:
            df.to_sql('itp_hy202308', con=conn, if_exists='replace', index=False)
    finally:
        # Release pooled connections held by this per-run engine.
        db.dispose()


# Character substitutions applied to dRoW field labels to obtain column names.
_COLUMN_CHAR_MAP = str.maketrans({
    ' ': '_',
    '.': '',
    '(': '_',
    ')': '',
    '%': 'percent',
    '/': '_',
})


def _sanitize_column_name(name):
    """Return *name* with spaces and punctuation mapped per _COLUMN_CHAR_MAP."""
    # A single translate pass gives the same result as applying the six
    # replacements one after another, because no replacement text contains a
    # character that another rule rewrites.
    return str(name).translate(_COLUMN_CHAR_MAP)

# Cron "0 15 * * *": one run per day at 15:00. The run first fetches a dRoW
# API token, then exports the ITP workflow into the data warehouse.
with DAG(
        dag_id="hy202308_itp",
        schedule_interval="0 15 * * *",
        default_args={
            "owner": "airflow",
            "retries": 1,
            "retry_delay": timedelta(minutes=5),
            "start_date": datetime(2023, 1, 17)
        },
        catchup=False) as dag:

    # Task variables get their own names so they do not rebind (shadow) the
    # module-level callables getDrowToken / getMongoDB. Task ids are unchanged.
    # provide_context=True is kept so the callables receive the task context
    # (including "ti") as keyword arguments.
    get_drow_token_task = PythonOperator(
        task_id="getDrowToken",
        python_callable=getDrowToken,
        provide_context=True,
    )

    get_mongo_db_task = PythonOperator(
        task_id="getMongoDB",
        python_callable=getMongoDB,
        provide_context=True,
    )

    # The export needs the token pushed to XCom by the authentication task.
    get_drow_token_task >> get_mongo_db_task