- Continuing with the set up Next is to start the scheduler. “The Airflow scheduler monitors all tasks and DAGs.Behind the scenes, it spins up a subprocess, which monitors and stays in sync with a folder for all DAG objects it may contain, and periodically (every minute or so) collects DAG parsing results and inspects active tasks to see whether they can be triggered.
- It’s a DAG definition file One thing to wrap your head around (it may not be very intuitive for everyone at first) is that this Airflow Python script is really just a configuration file specifying the DAG’s structure as code. The actual tasks defined here will run in a different context from the context of this script.
- Aug 18, 2018 DAGs In Airflow, a DAG– or a Directed Acyclic Graph – is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. For example, a simple DAG could consist of three tasks: A, B, and C. It could say that A has to run successfully before B can run, but C can run anytime.
Update your local configuration¶
Aug 17, 2020 Airflow comes with a very mature and stable scheduler that is responsible for parsing DAGs at regular intervals and updating the changes if any to the database. The scheduler keeps polling for tasks that are ready to run (dependencies have met and scheduling is possible) and queues them to the executor.
Open your airflow configuration file
~/airflow/airflow.cf
and make the following changes:Here we are replacing the default executor (
SequentialExecutor
) with the CeleryExecutor
so that we can run multiple DAGs in parallel.We also replace the default sqlite
database with our newly created airflow
database.Now we can initialize the database:
Let’s now start the web server locally:
Lucky live casino. we can head over to http://localhost:8080 now and you will see that there are a number of examples DAGS already there.
? Take some time to familiarise with the UI and get your local instance set up
Now let’s have a look at the connections (http://localhost:8080/admin/connection/) go to
admin>connections
. You should be able to see a number of connections available. For this tutorial, we will use some of the connections including mysql
.Commands¶
Let us go over some of the commands. Back on your command line:
we can list the DAG tasks in a tree view
we can tests the dags too, but we will need to set a date parameter so that this executes:
(note that you cannot use a future date or you will get an error)
By using the test commands these are not saved in the database.
Now let’s start the scheduler: Populous the beginning mac.
Behind the scenes, it monitors and stays in sync with a folder for all DAG objects it contains. The Airflow scheduler is designed to run as a service in an Airflow production environment.
Now with the schedule up and running we can trigger an instance:
This will be stored in the database and you can see the change of the status change straight away.
What would happen for example if we wanted to run or trigger the
tutorial
task? ?Let’s try from the CLI and see what happens.
In one of my previous posts, I described orchestration and coordination in the data context. At the end I promised to provide some code proofs to the theory and architecture described there. And that moment of truth is just coming.
The post is composed of 3 parts. The first describes the external trigger feature in Apache Airflow. The second one provides a code that will trigger the jobs based on a queue external to the orchestration framework. The final part shows assembled code.
External trigger
Apache Airflow DAG can be triggered at regular interval, with a classical CRON expression. But it can also be executed only on demand. In order to enable this feature, you must set the trigger property of your DAG to None. You can find an example in the following snippet that I will use later in the demo code:
To trigger a DAG without schedule you have different choices. Either you can use the CLI 'airflow trigger command or call the experimental API endpoint. For the sake of simplicity, in this introductory section I used the first version and its output looks like:
If you want a more programmatical way, you can also use trigger_dag method from airflow.api.common.experimental.trigger_dag.trigger_dag. Please notice however that as of this writing, this method is exposed in an experimental package and you should think twice before using it in your production code. Of course, do not forget to activate the DAG. If you keep it in pause, it won't trigger even if you say it to do so.
Triggering DAG
In Data pipelines: orchestration, choreography or both? I gave you an example of AWS Lambda triggering Airflow DAGs. Since I always try to keep the examples replayables at home, I will simulate here the event-driven behavior of AWS architecture with RabbitMQ and a static data producer.
But before I show you this simulation code, let's talk a little about 3 different external triggering methods in Apache Airflow:
- using HTTP API endpoint - it's good thanks to its universality. HTTP protocol is widely implemented in a lot of SDKs so making such calls shouldn't be a big deal. So you can even do multi-cloud projects with a central orchestration point deployed in one of the cloud providers. Another benefit of this approach is scalability.
You can then have as many producers as you want - you only need to ensure that your Apache Airflow instance won't get stuck because of too many calls of that type. Also, you should take special care for monitoring. Now you have 1 + x points to monitor where x is the number of producers. Another point to consider is security. After all, you're exposing your orchestration layer to the world - even if you limit the callability to your VPC, you will still have a risk that one of the producers makes some damages. - using a long-running local process calling Airflow CLI - long time I thought using it for this demo post but I started to look how to monitor and ensure the longevity of this process and found that it was not an easy task. It's a good candidate for testing though since you simply need to continuously poll your source and execute DAGs accordingly to received events.
- using TriggerDagRunOperator - when I found trigger_dag method described above, I immediately checked its users. One of them was TriggerDagRunOperator which according to the documentation, is responsible for 'Triggers a DAG run for a specified ``dag_id``' when a specific condition is met'. And it fits perfectly for what I'm trying to do in this post. The single problem is that the check condition will be executed in CRON manner and not right after a triggering event. So unlike 2 previously discussed solutions, this one is less reactive. The second drawback is scalability. Since the operator returns the single DAG to run, you will need to poll 1 event at a time. That's why if latency is your high priority, you should think about other alternatives. Also, if you have a single message queue with DAGs to execute, you will end up with quite complicated DAG with probably a lot of branches.
On the other side, this solution centralizes all your orchestration logic in a single place. I want to say that you're using the same abstraction of DAGs everywhere, so it can be much simpler to understand by newcomers in your team. Another good thing is that you have 1 place to monitor.
In the following sections I will present 2 approaches of triggering a DAG externally in Apache Airflow. The first will use TriggerDagRunOperator. I think that it may fit to several high latency (= few external triggers) cases where even regenerating the triggers should be simple. In another part I will show you an example with HTTP call. I will focus there particularly on the reprocessing part and show you that even if you reprocess externally triggered DAGs, you will still keep the same input parameters.
TriggerDagRunOperator example
To demonstrate the use of TriggerDagRunOperator I will use these 2 Docker images:
To push the messages to RabbitMQ I will use Pika library and the following code:
The code responsible for routing the messages and triggering appropriated DAGs looks like:
In the following image you can see how the routing DAG behaved after executing the code:
It works but as you can imagine, the frequency of publishing messages is much higher than consuming them. Hence, if you want to trigger the DAG in the response of the given event as soon as it happens, you may be a little bit deceived. That's why I will also try the solution with an external API call.
Aside from the scalability, there are some logical problems with this solution. First, our 'router' DAG is not idempotent - the input always changes because of non-deterministic character of RabbitMQ queue. So, if you have some problems in your logic and restart the pipeline, you won't see already processed messages again - unless you will never retry the router tasks and only reprocess triggered DAGs which in this context could be an acceptable trade-off.
![Dag Dag](https://image.slidesharecdn.com/fyber-airflowbestpracticeinproduction-190715105541/95/fyber-airflow-best-practices-in-production-13-638.jpg?cb=1563262562)
Another point to analyze related to replayability concerns externally triggered DAGs. Since they receive the parameters from an external source, will they keep the same parameters when they will be reprocessed? To check that, I cleaned the state of one of the executions of hello_world_a. The input parameters survived the retry, as shown in the following images:
So to sum-up, a good point here is that there is still a single place to monitor. On the other side this approach doesn't do what Apache Airflow is supposed to do since it simulates a streaming processing. Let's then check another solution, the one with external API.
External API call
As I explained before, the good point of an external API call is that it scales pretty well. If you have plenty of S3 objects created that must trigger different DAGs, you won't wait 1 minute before triggering every execution for them. On the other side, this distributed (coordinated) approach will require a little bit more monitoring effort because of multiple potential sources of failure.
![Airflow Dag Airflow Dag](https://kubernetes.io/images/blog/2018-05-25-Airflow-Kubernetes-Operator/2018-05-25-airflow.png)
To test external triggering you can make simple curl calls:
As you can see, all DAGs were correctly triggered - of course, I omit the authentication only for the sake of simplicity. Since these DAGs are triggered externally, I must also check whether the configuration passed as 'task_payload' parameter is kept between retried executions. In the following video you can see that it's the case:
Airflow Dag Picture
External triggers are a good way to simulate hybrid orchestration solution using external coordination system. They scale pretty good, help to write the processing logic more responsible than static CRON expressions. But there are not only benefits. As shown in this post, you will either have multiple extra places to monitor, gain some latency or longevity issues to overcome.