Ayesha shirur


Wed, 18 Jan 2023

```python
# get_sf_conn, execute_query, config_file and dag are defined elsewhere in this DAG file.
from airflow.exceptions import AirflowException
from airflow.operators.python_operator import PythonOperator  # in Airflow 2.x: airflow.operators.python


# Fetches the task_id of the object and loads the object into the Snowflake table
def sql_call_proc(**kwargs):
    sf_curr = get_sf_conn(config_file)
    print('******kwargs******')
    print(kwargs)
    object_key = kwargs['dag_run'].conf.get('obj_key')
    bucket = kwargs['dag_run'].conf.get('bucket')
    dataset_mapping = {
        'epsilon/pub_click_detail_load': 'SF_Ingest_pub_click_detail_load',
        'epsilon/pub_click_detail_load_missing_extra': 'SF_Ingest_pub_click_detail_load_missing_extra',
        'epsilon/pub_common_link_framework_load': 'SF_Ingest_pub_common_link_framework_load',
        'adobe_campaign/pub_marketing_mrw_stg_click_detail_harmony_load': 'SF_Ingest_pub_marketing_mrw_stg_click_detail_harmony_load',
    }
    print('****Object key to be loaded****', object_key)
    if object_key is not None:
        object_name = object_key.replace('run_id%3D', 'run_id=')
        print('***Object key***', object_name)
        obj_path = bucket + '/' + object_name
        object_path = 's3://' + obj_path.replace('_SUCCESS', '')
        print('======object_path======', object_path)
        batch_id = object_path.split('/')[5].split('=')[1]
        print('======batch_id=======', batch_id)
        # dataset = object_name.rsplit('/', 2)[0]
        dataset = "/".join(object_name.split("/")[0:2])
        prefix = "/".join(object_name.split("/")[0:3])
        print('****Dataset****', dataset)
        print('**Prefix****', prefix)
        ctrl_query = "select TASK_ID from UTIL.IT.DATA_INGESTION_CTRL where STAGE_FULLPATH_TXT='" + dataset + "/' and GROUP_ID='mdp-dppe-mkteml-snowflake-ingestion';"
        print('****Control table query to be executed******')
        print(ctrl_query)
        res = execute_query(sf_curr, ctrl_query)
        if len(res) > 0:
            task_id = res[0][0]
            print('***Task Id for the dataset***', task_id)
            sp_query = "CALL UTIL.IT.SP_MDP_MKT_EMAIL_DATA_INGESTION('" + dataset_mapping[dataset] + "', '" + batch_id + "', '" + task_id + "', '" + object_path + "')"
            print('****Stored procedure query****')
            print(sp_query)
            sp_res = execute_query(sf_curr, sp_query)
            print('****Result****')
            for row in sp_res:
                for ele in row:
                    print(ele)
                    if ele.find('SP-FAILED') == -1:
                        print('Data Ingestion Success..!!')
                    else:
                        raise AirflowException("Data Ingestion Failure!! Please check logs of the Stored Procedure for details.")
            return sp_res
        else:
            raise AirflowException("Data Ingestion Failure!! Invalid object placed for dataset ingestion.")
    else:
        raise AirflowException("Data Ingestion Failure!! Object not found for Data Ingestion.")


# Stored procedure task to load data
t1 = PythonOperator(
    task_id='mdp-dppe-mkteml-data-load',
    python_callable=sql_call_proc,
    provide_context=True,  # Airflow 1.x style; context is passed automatically in Airflow 2.x
    xcom_push=True,
    dag=dag,
)
```
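The task relies on two helpers, `get_sf_conn` and `execute_query`, that are not shown above. Below is a minimal sketch of what they might look like, assuming the `snowflake-connector-python` package and a JSON `config_file` holding the connection credentials; the config key names are illustrative assumptions, not the actual project layout.

```python
import json

import snowflake.connector


def get_sf_conn(config_file):
    """Open a Snowflake connection from a JSON config file and return a cursor.

    The config keys (account, user, password, warehouse, database) are
    assumptions; adjust them to match the credential store actually in use.
    """
    with open(config_file) as f:
        cfg = json.load(f)
    conn = snowflake.connector.connect(
        account=cfg['account'],
        user=cfg['user'],
        password=cfg['password'],
        warehouse=cfg.get('warehouse'),
        database=cfg.get('database'),
    )
    return conn.cursor()


def execute_query(cursor, query):
    """Run a SQL statement and return all result rows as a list of tuples."""
    cursor.execute(query)
    return cursor.fetchall()
```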

### Example Airflow DAG
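The snippet above references a `dag` object and expects `obj_key` and `bucket` to arrive in `dag_run.conf`, so the DAG is meant to be triggered externally rather than scheduled. A minimal DAG definition that the `t1` task could attach to is sketched below; the DAG id, owner, and start date are illustrative assumptions.

```python
from datetime import datetime

from airflow import DAG

default_args = {
    'owner': 'mdp-dppe',  # illustrative owner
    'retries': 0,
}

# Triggered externally (for example by an S3 event handler) with a conf payload like
# {"obj_key": "<key of the _SUCCESS object>", "bucket": "<bucket name>"}.
dag = DAG(
    dag_id='mdp_dppe_mkteml_snowflake_ingestion',  # illustrative name
    default_args=default_args,
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,  # run only when triggered with a conf payload
    catchup=False,
)
```

A manual run can then be started with the Airflow CLI by passing the conf JSON, for example `airflow trigger_dag <dag_id> -c '{"obj_key": "...", "bucket": "..."}'` on Airflow 1.x (or `airflow dags trigger --conf ...` on 2.x).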
