I am really a newbie in this forum. But I have been playing with airflow, for sometime, for our company. Sorry if this question sounds really dumb.
I am writing a pipeline using bunch of BashOperators. Basically, for each Task, I want to simply call a REST api using 'curl'
This is what my pipeline looks like(very simplified version):
from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime
datetime_obj = datetime.datetime
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': datetime.timedelta(minutes=5),
}
current_datetime = datetime_obj.now(tz=tz.tzlocal())
dag = DAG(
'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))
curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'
t1 = BashOperator(
task_id='rest-api-1',
bash_command=curl_cmd,
dag=dag)
If you notice I am doing current_datetime= datetime_obj.now(tz=tz.tzlocal())
Instead what I want here is 'execution_date'
How do I use 'execution_date' directly and assign it to a variable in my python file?
I have having this general issue of accessing args. Any help will be genuinely appreciated.
Thanks
The BashOperator
's bash_command
argument is a template. You can access execution_date
in any template as a datetime
object using the execution_date
variable. In the template, you can use any jinja2
methods to manipulate it.
Using the following as your BashOperator
bash_command
string:
# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}
# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}
If you just want the string equivalent of the execution date, ds
will return a datestamp (YYYY-MM-DD), ds_nodash
returns same without dashes (YYYYMMDD), etc. More on macros
is available in the Api Docs.
Your final operator would look like:
command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)