DAG: manage_connections_dag

schedule: @daily


manage_connections_dag

Toggle wrap
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
# NOTE(review): every import this module needs sits inside a single
# try/except that only prints the error. If any import fails, execution
# continues and the first use of a missing name (e.g. ``DAG`` further down)
# raises NameError instead, hiding the original import error. Consider
# importing without the guard so the real failure is reported.
try:
    from airflow import DAG
    from datetime import timedelta
    from datetime import datetime

    from airflow import settings
    from airflow.models import Connection
    # NOTE(review): on Airflow 2.x this module path is believed to be a
    # deprecated alias of ``airflow.operators.python`` -- confirm against the
    # deployed Airflow version before changing it.
    from airflow.operators.python_operator import PythonOperator
except Exception as e:
    # The failure is printed only; it is not re-raised.
    print("Error {} ".format(e))

# Connection definitions consumed by create_connections(); one mapping per
# Airflow connection, keyed by the Connection constructor's argument names.
#
# NOTE(review): the host, login and password of the Postgres RDS instance are
# committed in plain text here. They should be supplied from outside the
# repository (environment / secrets store) and the exposed password rotated.
conns = [
    dict(
        conn_id="postgres_rds",
        conn_type="postgres",
        host="drowdatewarehouse.crlwwhgepgi7.ap-east-1.rds.amazonaws.com",
        schema="drowDateWareHouse",
        login="dRowAdmin",
        password="drowsuper",
        port=5432,
    ),
]

def create_connections():
    """Add every entry of ``conns`` that is not yet stored as a Connection.

    For each mapping in the module-level ``conns`` list, the metadata
    database is queried for a ``Connection`` with the same ``conn_id``. If
    none is found, a new ``Connection`` built from the mapping is added to
    the session. All additions are committed together after the loop.

    Raises:
        Exception: any error raised while querying, adding or committing is
            re-raised after the session has been rolled back.
    """
    session = settings.Session()
    try:
        for conn in conns:
            existing = (
                session.query(Connection)
                .filter(Connection.conn_id == conn["conn_id"])
                .first()
            )
            if existing is not None:
                print(f"Connection {conn['conn_id']} already exists.")
                continue

            session.add(
                Connection(
                    conn_id=conn["conn_id"],
                    conn_type=conn["conn_type"],
                    host=conn["host"],
                    schema=conn["schema"],
                    login=conn["login"],
                    password=conn["password"],
                    port=conn["port"],
                )
            )
            print(f"Connection {conn['conn_id']} added.")
        session.commit()
    except Exception:
        # Discard pending additions so a failed run leaves nothing
        # half-applied, then let the task fail with the original error.
        session.rollback()
        raise
    finally:
        # Always close the session so it is not left open after the task.
        session.close()

# Run create_connections once every day.
with DAG(
    dag_id='manage_connections_dag',
    schedule_interval='@daily',
    default_args={
        "owner": "airflow",
        "retries": 1,  # one retry, five minutes after a failure
        "retry_delay": timedelta(minutes=5),
        "start_date": datetime(2023, 1, 17),
    },
    # Do not backfill the daily runs between start_date and today.
    catchup=False,
) as f:

    # create_connections() takes no parameters, so the operator is given no
    # op_args / op_kwargs to forward to it.
    createConn = PythonOperator(
        task_id='create_connections_task',
        python_callable=create_connections,
    )