Rework Airflow page to include guidance on Kubernetes without kedro-airflow-k8s #4499

Open

astrojuanlu opened this issue Feb 19, 2025 · 7 comments · May be fixed by #4529

Labels: Component: Documentation 📄 Issue/PR for markdown and API documentation

@astrojuanlu (Member)

Description

The maintainers of the kedro-airflow-k8s plugin have mentioned that they'd rather move users towards kedro-airflow.

@astrojuanlu astrojuanlu added the Component: Documentation 📄 Issue/PR for markdown and API documentation label Feb 19, 2025
@DimedS (Member) commented Feb 19, 2025

I think it makes sense to guide our users on how to run Kedro projects on Airflow with execution in separate containers on Kubernetes. Currently, the only available guidance points to the now-deprecated kedro-airflow-k8s plugin.

It seems worthwhile to do a small spike to explore the best approach. We might need to modify the kedro-airflow plugin or, at the very least, update the documentation to explain how to use kedro-airflow with kedro-docker and how to adjust the DAG to work with Kubernetes.
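
For reference, a minimal sketch of what that combined workflow could look like (assuming a Kedro project with both plugins installed; the image name and registry below are placeholders, and flag names may differ between plugin versions):

# Build a container image for the project with kedro-docker
kedro docker init
kedro docker build --image my-registry.example.com/my-kedro-project:latest

# Push the image somewhere the Kubernetes cluster can pull from
docker push my-registry.example.com/my-kedro-project:latest

# Generate the Airflow DAG with kedro-airflow, then adjust it to run each task in a container
kedro airflow create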

This will become even more important when we implement node grouping in Airflow (issue #962), as it will make running these groups in separate environments more convenient.

@astrojuanlu (Member, Author)

It's a good point. I'll reframe the scope of this issue.

@astrojuanlu astrojuanlu changed the title Remove mention of kedro-airflow-k8s or mark it as deprecated Rework Airflow page to include guidance on Kubernetes without kedro-airflow-k8s Feb 20, 2025
@DimedS (Member) commented Feb 24, 2025

I successfully executed the spaceflights-pandas starter project on Google Cloud Composer (Airflow on Kubernetes) with minor modifications to the DAG generated by kedro-airflow. Additionally, I used the Dockerfile generated by kedro-docker.
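
For context, the GCP-specific steps roughly follow this pattern (a sketch only; the registry path, environment name, and DAG filename are placeholders rather than the exact values I used):

# Allow Docker to push to Artifact Registry, then push the image built with kedro-docker
gcloud auth configure-docker europe-west4-docker.pkg.dev
docker push europe-west4-docker.pkg.dev/MY_PROJECT/MY_REPO/sf-pandas:latest

# Upload the (modified) DAG generated by kedro-airflow to the Cloud Composer environment
gcloud composer environments storage dags import \
    --environment MY_COMPOSER_ENV \
    --location europe-west4 \
    --source path/to/sf_pandas_dag.py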

Original Kedro-Airflow DAG

The original Airflow DAG generated using kedro airflow create looked like this:

from __future__ import annotations

from datetime import datetime, timedelta
from pathlib import Path

from airflow import DAG
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

from kedro.framework.session import KedroSession
from kedro.framework.project import configure_project


class KedroOperator(BaseOperator):
    @apply_defaults
    def __init__(
        self,
        package_name: str,
        pipeline_name: str,
        node_name: str | list[str],
        project_path: str | Path,
        env: str,
        conf_source: str,
        *args, **kwargs
    ) -> None:
        super().__init__(*args, **kwargs)
        self.package_name = package_name
        self.pipeline_name = pipeline_name
        self.node_name = node_name
        self.project_path = project_path
        self.env = env
        self.conf_source = conf_source

    def execute(self, context):
        configure_project(self.package_name)
        with KedroSession.create(self.project_path, env=self.env, conf_source=self.conf_source) as session:
            if isinstance(self.node_name, str):
                self.node_name = [self.node_name]
            session.run(self.pipeline_name, node_names=self.node_name)

# Kedro settings required to run your pipeline
env = "local"
pipeline_name = "__default__"
project_path = Path.cwd()
package_name = "sf_pandas"
conf_source = "" or Path.cwd() / "conf"


# Using a DAG context manager, you don't have to specify the dag property of each task
with DAG(
    dag_id="sf-pandas",
    start_date=datetime(2023,1,1),
    max_active_runs=3,
    # https://airflow.apache.org/docs/stable/scheduler.html#dag-runs
    schedule_interval="@once",
    catchup=False,
    # Default settings applied to all tasks
    default_args=dict(
        owner="airflow",
        depends_on_past=False,
        email_on_failure=False,
        email_on_retry=False,
        retries=1,
        retry_delay=timedelta(minutes=5)
    )
) as dag:
    tasks = {
        "preprocess-companies-node": KedroOperator(
            task_id="preprocess-companies-node",
            package_name=package_name,
            pipeline_name=pipeline_name,
            node_name="preprocess_companies_node",
            project_path=project_path,
            env=env,
            conf_source=conf_source,
        ),
        "preprocess-shuttles-node": KedroOperator(
            task_id="preprocess-shuttles-node",
            package_name=package_name,
            pipeline_name=pipeline_name,
            node_name="preprocess_shuttles_node",
            project_path=project_path,
            env=env,
            conf_source=conf_source,
        ),
        "create-model-input-table-node": KedroOperator(
            task_id="create-model-input-table-node",
            package_name=package_name,
            pipeline_name=pipeline_name,
            node_name="create_model_input_table_node",
            project_path=project_path,
            env=env,
            conf_source=conf_source,
        ),
        "split-data-node": KedroOperator(
            task_id="split-data-node",
            package_name=package_name,
            pipeline_name=pipeline_name,
            node_name="split_data_node",
            project_path=project_path,
            env=env,
            conf_source=conf_source,
        ),
        "train-model-node": KedroOperator(
            task_id="train-model-node",
            package_name=package_name,
            pipeline_name=pipeline_name,
            node_name="train_model_node",
            project_path=project_path,
            env=env,
            conf_source=conf_source,
        ),
        "evaluate-model-node": KedroOperator(
            task_id="evaluate-model-node",
            package_name=package_name,
            pipeline_name=pipeline_name,
            node_name="evaluate_model_node",
            project_path=project_path,
            env=env,
            conf_source=conf_source,
        ),
    }

    tasks["preprocess-companies-node"] >> tasks["create-model-input-table-node"]
    tasks["preprocess-shuttles-node"] >> tasks["create-model-input-table-node"]
    tasks["create-model-input-table-node"] >> tasks["split-data-node"]
    tasks["split-data-node"] >> tasks["evaluate-model-node"]
    tasks["split-data-node"] >> tasks["train-model-node"]
    tasks["train-model-node"] >> tasks["evaluate-model-node"]

Improvements & Proposed Changes to kedro-airflow

I made two key modifications that I suggest incorporating into the kedro-airflow plugin:

  1. Simplified task creation

    • The original DAG redundantly specified the same parameters for each task. I refactored it so that tasks are created dynamically with a single-line function call.
  2. Replaced KedroOperator with KubernetesPodOperator

    • Instead of executing the installed Kedro package, I ran each node inside a dedicated Docker container using KubernetesPodOperator.
    • This approach enables isolated execution of nodes within separate containers, improving scalability and reproducibility.
    • I propose adding an option in kedro-airflow to generate DAGs using KubernetesPodOperator when requested by the user.

Optimized DAG with KubernetesPodOperator

The modified DAG is significantly shorter and more maintainable:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.utils.dates import days_ago

# GCP & Kubernetes settings
GCP_PROJECT_ID = "my_project_id"
GCP_REGION = "europe-west4"
NAMESPACE = "composer"  # Cloud Composer namespace

# Docker image for Kedro execution
DOCKER_IMAGE = "europe-west4-docker.pkg.dev/my_docker_name:latest"

# Kedro settings
ENV = "base"
PIPELINE_NAME = "__default__"

# Default Airflow arguments
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Define DAG
with DAG(
    dag_id="sf-pandas-cloud-composer",
    start_date=days_ago(1),
    schedule_interval="@once",
    catchup=False,
    default_args=default_args,
) as dag:

    def create_kedro_task(node_name: str):
        return KubernetesPodOperator(
            task_id=node_name,
            name=node_name,
            namespace=NAMESPACE,
            image=DOCKER_IMAGE,
            cmds=["kedro"],
            arguments=["run", f"--pipeline={PIPELINE_NAME}", f"--nodes={node_name}"],
            get_logs=True,
            is_delete_operator_pod=True,  # Cleanup after execution
            in_cluster=False,  # Cloud Composer runs inside GKE
            do_xcom_push=False,
            image_pull_policy="Always",
        )

    # Define tasks dynamically
    preprocess_companies = create_kedro_task("preprocess_companies_node")
    preprocess_shuttles = create_kedro_task("preprocess_shuttles_node")
    create_model_input_table = create_kedro_task("create_model_input_table_node")
    split_data = create_kedro_task("split_data_node")
    train_model = create_kedro_task("train_model_node")
    evaluate_model = create_kedro_task("evaluate_model_node")

    # Define task dependencies
    preprocess_companies >> create_model_input_table
    preprocess_shuttles >> create_model_input_table
    create_model_input_table >> split_data
    split_data >> [train_model, evaluate_model]
    train_model >> evaluate_model
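
For clarity, each KubernetesPodOperator task above effectively runs the packaged project inside its own container with a command equivalent to the following (shown here for preprocess_companies_node):

kedro run --pipeline=__default__ --nodes=preprocess_companies_node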

What do you think, @astrojuanlu, @ankatiyar?

@astrojuanlu (Member, Author)

If I understand correctly, the implicit grouping strategy here is by pipeline, correct, @DimedS? This is consistent with what users have been telling us.

For the record, I agree with everything you propose 👍🏼

@ankatiyar (Contributor)

This is cool @DimedS :D
Some questions:

  • Should we have both options? Something like --operator=kedro/kubernetes?
  • Should the Dockerfile be generated by kedro-airflow too to simplify the workflow, or should we document the use of Kedro + kedro-docker + kedro-airflow?

@DimedS (Member) commented Feb 26, 2025

Thanks, @astrojuanlu! I didn't modify the grouping strategies in this PR; that should be addressed in future PRs.

Thanks, @ankatiyar!

Something like --operator=kedro/kubernetes

I think that sounds great! Kedro should remain the default option, as it is now.
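
To make this concrete, the interface could look something like the sketch below — purely illustrative, since the flag is not implemented yet and its name and values are open for discussion:

# current behaviour and proposed default: KedroOperator-based DAG
kedro airflow create --operator=kedro

# proposed option: generate a DAG whose tasks use KubernetesPodOperator
kedro airflow create --operator=kubernetes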

Should the Dockerfile be generated by kedro-airflow too to simplify the workflow, or should we document the use of Kedro + kedro-docker + kedro-airflow?

Based on my experience with deployment plugins, it's best to keep them simple. Instead of adding complexity, I think it's better to document the process, explaining that users can easily generate a Docker image for their project using kedro-docker.

@DimedS (Member) commented Feb 27, 2025

I created PR #4529 to close the current issue and update the documentation on manually modifying DAGs to use KubernetesPodOperator, while also removing references to the outdated kedro-airflow-k8s plugin.

As a follow-up, I opened issue #1025 to enhance the kedro-airflow plugin by adding an option for selecting the operator and simplifying the DAG structure, as discussed above.

If we have the resources, we can start working on this after completing the current sprint changes for the kedro-airflow plugin in issue #962.

@DimedS DimedS moved this to In Review in Kedro 🔶 Feb 27, 2025
@DimedS DimedS self-assigned this Feb 27, 2025