Posts Hướng dẫn Airflow HA
Post
Cancel

Hướng dẫn Airflow HA

Trong bài này mình sẽ hướng dẫn cài đặt Airflow và cài HA cho nó. môi trường sử dụng là máy ảo virtualbox

tạo 2 máy ảo với địa chỉ cố định, add user và cập quyền ssh từ host
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
VAGRANT_COMMAND = ARGV[0]

Vagrant.configure("2") do |config|

    if VAGRANT_COMMAND == "ssh"
      config.ssh.username = 'vagrant'
    end
    config.vm.box = "ubuntu/bionic64" # Chọn box bạn muốn sử dụng

    # Khởi tạo máy ảo thứ nhất
    config.vm.define "machine1" do |machine1|
    machine1.vm.network "private_network", ip: "192.168.56.2"
    machine1.vm.provider "virtualbox" do |vb|
          vb.memory = "2048" # 2GB RAM
          vb.cpus = 1       # 1 core CPU
        end

    machine1.vm.provision "shell", inline: <<-SHELL
          adduser airflow
          sudo su - airflow -c $'\
          whoami && \
          mkdir .ssh && \
          echo "ssh-rsa xxxx" > .ssh/authorized_keys && \
          chmod 700 .ssh && \
          chmod 600 .ssh/authorized_keys && \
          file_path=".ssh/authorized_keys" && \
          echo "cat file $file_path after make change" && \
          cat $file_path '
        SHELL
    end
end

Khái niệm

  • cho phép xây dựng flow các task
  • cung cấp ui thuận tiện cho việc theo dõi và quản lí tập trung các task
  • hạn chế: không sử dụng để truyền dữ liệu lớn giữa các task. không sử dụng với các task vô hạn (như streaming)

Các thành phần của airflow

scheduler

executor -> worker

executor

các loại executor

  • local: local executor, sequential executor
  • remote: celery executor, kubernetes executor

nên dùng celery executor vì nó có thể scale được số lượng các worker thông qua celery backend (rabbitMQ, redis). executor như kiểu một cách thức để task được giao từ scheduler tới worker (đứng giữa nơi lập lịch và nơi thực thi task)

ngoài ra: metadata db, dag dir, web server

Cài airflow

Cài airflow từ pip
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
sudo apt update
sudo apt install python3-pip -y

export AIRFLOW_HOME=~/airflow

AIRFLOW_VERSION=2.5.1
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

echo $CONSTRAINT_URL

python3 -m pip install --upgrade pip (khi lỗi setup tools)
pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

hoặc đơn giản hơn (xong lỗi nhiều :v)
python3.8 -m pip install --upgrade pip
python3.8 -m pip install apache-airflow==2.5.1

Khởi tạo db
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
python3 -m airflow db init 

# lỗi ModuleNotFoundError: No module named 'apt_pkg'
sudo apt-get install python-apt

# lúc này mới tạo ra cái thưu mục AIRFLOW_HOME
# mặc định sql_alchemy_conn = sqlite:////home/airflow/airflow2/airflow.db

# sửa db sang mysql rồi chạy lại db init
CREATE DATABASE airflow_db CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
CREATE USER 'airflow_user' IDENTIFIED BY 'airflow_pass';
GRANT ALL PRIVILEGES ON airflow_db.* TO 'airflow_user';
FLUSH PRIVILEGES;

vi airflow/airflow.cfg
mysql+mysqldb://airflow_db:airflow_pass@192.168.56.1:3306/airflow_db  # lỗi jh ấy
mysql+mysqlconnector://airflow_user:airflow_pass@192.168.56.1:3306/airflow_db
Chạy lên các thành phần webserver , scheduler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email spiderman@superhero.org

# sau đó nhập mật khẩu

airflow webserver --port 8080 [-D, -h]

airflow scheduler [-D, -h]

# kill daemon
kill $(ps -ef | grep "gunicorn" | awk '{print $2}')

Cài rabitmq
1
2
3
4
5
6
7
8
sudo apt update && sudo apt upgrade -y
sudo apt install rabbitmq-server -y
sudo systemctl enable rabbitmq-server
sudo systemctl start rabbitmq-server
sudo systemctl status rabbitmq-server
sudo rabbitmq-plugins enable rabbitmq_management
sudo rabbitmqctl add_user airflow airflow
sudo rabbitmqctl set_user_tags airflow administrator
Đổi sang CeleryExecutor

tuy nhiên kiểu jh chạy cách kia cũng lỗi thôi The scheduler does not appear to be running. Last heartbeat was received 14 minutes ago.

đổi sang chuẩn executor khác xem sao

1
executor = CeleryExecutor

đổi celery broker + ressult backend

1
2
3
4
5
6
7
8
9
broker_url = redis://redis:6379/0
broker_url = amqp://airflow:airflow@192.168.56.10:5672/
result_backend = db+mysql://airflow_user:airflow_pass@192.168.56.1:3306/airflow_db

# set mysql+mysqlconnector lỗi
# cài thêm celery nếu chưa có 
pip install 'apache-airflow[celery]'


Chạy flower để view job

Monitoring Tasks

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# flower
# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
# it ``airflow celery flower``. This defines the IP that Celery Flower runs on
flower_host = 0.0.0.0

# The root URL for Flower
# Example: flower_url_prefix = /flower
flower_url_prefix = /flower

# This defines the port that Celery Flower runs on
flower_port = 5556

# lỗi amqp.exceptions.NotAllowed: Connection.open: (530) NOT_ALLOWED - access to vhost '/' refused for user 'airflow'

sudo rabbitmqctl list_permissions -p / 
sudo rabbitmqctl set_permissions -p "/" "airflow" ".*" ".*" ".*"

lỗi mấy cái linh ta linh tinh vì chưa cài cái

1
2
3
sudo apt-get install python3.8-dev
# Duy bảo mỗi lần cài phải đi theo bộ python3-pip python3.10-dev python3.10-venv

job khi chạy sẽ được đẩy vào queue

trên ui của airflow sẽ thấy task bị queue mà k chạy

tuy nhiên bên flower chưa hiện thị task do mình chưa bật worker

Chạy worker
1
2
3
4
airflow celery worker

# chú ý: worker có thể được gắn với 1 queue
airflow celery worker -q spark,quark

Mô hình HA

Web server. chạy trên con thứ 2 với config i hệt: vẫn lên được, chứng tỏ cứ chạy 2 con cùng lúc ha ip là dc. chứng tỏ webserver chỉ cần db là được ? (đồng bộ qua db)

Scheduler. chạy trên con chứ 2 mọi thứ vẫn bthg ở con 1 (flower, rabitMQ, worker trên con 1 vẫn nhận)

Web server, Scheduler trên con thứ 2
1
2
3
4
5
6
ImportError: No module named 'mysql'
pip install mysql-connector-python-rf

sqlalchemy.exc.NotSupportedError: (mysql.connector.errors.NotSupportedError) Authentication plugin 'caching_sha2_password' is not supported
pip install mysql-connector-python

1
2
3
4
5
6
7
8
9
10
ModuleNotFoundError: No module named 'MySQLdb'
pip install mysqlclient
# vẫn lỗi 
sudo apt-get install python3.8-dev
# vẫn lỗi
sudo apt-get install python3-dev default-libmysqlclient-dev build-essential pkg-config

# cài thêm celery nếu chưa có 
pip install 'apache-airflow[celery]'

Message queue, mysql có cách HA riêng

Vì Scheduler lúc 2 con chạy job sẽ bị x2, nên bắt buộc chỉ có 1 con chạy thui -> cần HA Scheduler

1
2
3
4
5
6
7
8
pip3 install git+https://github.com/teamclairvoyant/airflow-scheduler-failover-controller.git@v1.0.8
scheduler_failover_controller init  # append them config HA vào airflow.cfg
scheduler_failover_controller start

# mấy cái lệnh chạy, stop có trong file config
# chú ý config scheduler_nodes_in_cluster List of potential nodes that can act as Schedulers (Comma Separated List)
# chuyển log về debug sẽ thấy cái scheduler_failover_controller check bằng 
Running Command: ps -eaf | grep 'airflow scheduler'
lỗi không thể chạy lên
1
2
3
4
5
6
7
8
9
10
11
12
13
in configuration.py:     


def get_sql_alchemy_conn(self):
        return self.get_config("core", "SQL_ALCHEMY_CONN")


but maybe airflow.cfg, the section is [database]. so that engine can be null

add sql_alchemy_conn both [database], [core] in airflow.cfg can fix this issue

https://github.com/teamclairvoyant/airflow-scheduler-failover-controller/issues/43

Note

  • xcom (cross-communications) để truyền thông tin giữa các task trong dag
  • get_pty=True để kill được task trên ui
  • do_xcom_push=False để tránh trường hợp xcom lưu output> 65kb dẫn đến task fail(sử dụng print hoặc return function)

Ref

lotus doc - hiephm

git ha

doc

phân loại executor

bài viết gốc - longpt233

This post is licensed under CC BY 4.0 by the author.