try:
from airflow import DAG
from datetime import timedelta
from datetime import datetime
from airflow import settings
from airflow.models import Connection
from airflow.operators.python_operator import PythonOperator
except Exception as e:
print("Error {} ".format(e))
# Postgres RDS Connection
conns = [
{
"conn_id": "postgres_rds",
"conn_type": "postgres",
"host": "drowdatewarehouse.crlwwhgepgi7.ap-east-1.rds.amazonaws.com",
"schema": "drowDateWareHouse",
"login": "dRowAdmin",
"password": "drowsuper",
"port": 5432
}
]
def create_connections():
session = settings.Session()
# Check existing connections and add if not present
for conn in conns:
if not session.query(Connection).filter(Connection.conn_id == conn["conn_id"]).first():
new_conn = Connection(
conn_id=conn["conn_id"],
conn_type=conn["conn_type"],
host=conn["host"],
schema=conn["schema"],
login=conn["login"],
password=conn["password"],
port=conn["port"]
)
session.add(new_conn)
print(f"Connection {conn['conn_id']} added.")
else:
print(f"Connection {conn['conn_id']} already exists.")
session.commit()
# Execute the function once everyday
with DAG(
dag_id='manage_connections_dag',
schedule_interval='@daily',
default_args={
"owner": "airflow",
"retries": 1,
"retry_delay": timedelta(minutes=5),
"start_date": datetime(2023, 1, 17)
},
catchup=False) as f:
createConn = PythonOperator(
task_id='create_connections_task',
python_callable=create_connections,
op_kwargs={"name": "Dylan"},
)
createConn