Thursday, October 13, 2022

Airflow Installation and Configuration (feat. Troubleshooting)

0. Prerequisites

  • Python
  • conda virtual environment
  • airflow
  • PostgreSQL
  • Redis

1. Create a System User

  • Create a new dedicated user for Airflow.
    ]# adduser airflow  # create the user
    ]# passwd airflow  # set the password
    ]# usermod -g cncity airflow  # change the primary group (cncity)
    ]# usermod -aG wheel airflow  # add a secondary group (wheel)
    
    ]# id airflow
    uid=2003(airflow) gid=2000(cncity) groups=2000(cncity),10(wheel)
    

2. Airflow Pre-configuration

2-1. Meta Database Setup

  • Set up the Airflow meta database on PostgreSQL.
    
    # run as the postgres account
    -- create the user
    create user airflowuser with encrypted password '##airflowuser_94RMJM';
    
    -- create the database
    create database airflow_celery owner postgres encoding 'utf-8' lc_collate 'C' lc_ctype 'ko_KR.UTF-8' template 'template0';
    
    -- grant the user all privileges on the database
    grant all privileges on database airflow_celery to airflowuser;
    
    # run as the airflowuser account
    -- create the schema
    create schema airflow;
    
    
  • Airflow meta database connection string (special characters in the password, such as #, must be percent-encoded in the SQLAlchemy URL)
    sql_alchemy_conn = "postgresql+psycopg2://airflowuser:%23%23airflowuser_94RMJM@cncity-ai-postgresql.cglzgjdidyrc.ap-northeast-2.rds.amazonaws.com/airflow_celery"
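Because the password contains #, which normally starts a URL fragment, the credentials in the SQLAlchemy URL must be percent-encoded. A minimal sketch with the Python standard library for generating the URL-safe form whenever the password changes:

```python
from urllib.parse import quote_plus

# Percent-encode the password so characters like '#' survive URL parsing.
password = "##airflowuser_94RMJM"
encoded = quote_plus(password)

conn = (
    "postgresql+psycopg2://airflowuser:"
    + encoded
    + "@cncity-ai-postgresql.cglzgjdidyrc.ap-northeast-2.rds.amazonaws.com/airflow_celery"
)
print(encoded)  # %23%23airflowuser_94RMJM
```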
    

2-2. Python Virtual Environment Setup

  • Set up a dedicated Anaconda virtual environment for Airflow.
    (base) ]$ conda create -n airflow python=3.10
    (base) ]$ conda activate airflow
    (airflow) ]$
    
    To make the airflow user use the airflow conda virtual env by default,
    add conda activate airflow at the bottom of ~/.bash_profile.
    
    (airflow) ]$ vi ~/.bash_profile
    
    # append
    conda activate airflow
    

2-3. Broker Setup

  • A broker is required only when using CeleryExecutor.
  • Redis and RabbitMQ are the usual broker options; Redis is used here.

Redis Installation

# Download the stable Redis release tarball from https://redis.io/download.
(airflow) ~]# cd /opt
(airflow) /opt]# mkdir /opt/redis

(airflow) /opt]# cd redis
(airflow) /opt/redis]# curl -O https://download.redis.io/releases/redis-6.2.6.tar.gz

# Extract the archive.
(airflow) /opt/redis]# tar -zxvf redis-6.2.6.tar.gz

# Build and install from the extracted sources with make.
(airflow) /opt/redis]# cd redis-6.2.6
(airflow) /opt/redis/redis-6.2.6 ]# make && make install

# Test (the server must be running before redis-cli can connect)
(airflow) /opt/redis/redis-6.2.6 ]# redis-server --daemonize yes
(airflow) /opt/redis/redis-6.2.6 ]# redis-cli PING
PONG

Redis Configuration

################################## NETWORK #####################################

# By default, if no "bind" configuration directive is specified, Redis listens
# for connections from all available network interfaces on the host machine.
# It is possible to listen to just one or multiple selected interfaces using
# the "bind" configuration directive, followed by one or more IP addresses.
# Each address can be prefixed by "-", which means that redis will not fail to
# start if the address is not available. Being not available only refers to
# addresses that do not correspond to any network interface. Addresses that
# are already in use will always fail, and unsupported protocols will always be
# silently skipped.
#
# Examples:
#
# bind 192.168.1.100 10.0.0.1     # listens on two specific IPv4 addresses
# bind 127.0.0.1 ::1              # listens on loopback IPv4 and IPv6
# bind * -::*                     # like the default, all available interfaces
#
# ~~~ WARNING ~~~ If the computer running Redis is directly exposed to the
# internet, binding to all the interfaces is dangerous and will expose the
# instance to everybody on the internet. So by default we uncomment the
# following bind directive, that will force Redis to listen only on the
# IPv4 and IPv6 (if available) loopback interface addresses (this means Redis
# will only be able to accept client connections from the same host that it is
# running on).
#
# IF YOU ARE SURE YOU WANT YOUR INSTANCE TO LISTEN TO ALL THE INTERFACES
# JUST COMMENT OUT THE FOLLOWING LINE.
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
#bind 127.0.0.1 -::1
bind 0.0.0.0

################################# GENERAL #####################################

# By default Redis does not run as a daemon. Use 'yes' if you need it.
# Note that Redis will write a pid file in /var/run/redis.pid when daemonized.
# When Redis is supervised by upstart or systemd, this parameter has no impact.
#daemonize no
daemonize yes

############################## MEMORY MANAGEMENT ################################

# Set a memory usage limit to the specified amount of bytes.
# When the memory limit is reached Redis will try to remove keys
# according to the eviction policy selected (see maxmemory-policy).
#
# If Redis can't remove keys according to the policy, or if the policy is
# set to 'noeviction', Redis will start to reply with errors to commands
# that would use more memory, like SET, LPUSH, and so on, and will continue
# to reply to read-only commands like GET.
#
# This option is usually useful when using Redis as an LRU or LFU cache, or to
# set a hard memory limit for an instance (using the 'noeviction' policy).
#
# WARNING: If you have replicas attached to an instance with maxmemory on,
# the size of the output buffers needed to feed the replicas are subtracted
# from the used memory count, so that network problems / resyncs will
# not trigger a loop where keys are evicted, and in turn the output
# buffer of replicas is full with DELs of keys evicted triggering the deletion
# of more keys, and so forth until the database is completely emptied.
#
# In short... if you have replicas attached it is suggested that you set a lower
# limit for maxmemory so that there is some free RAM on the system for replica
# output buffers (but this is not needed if the policy is 'noeviction').
#
# maxmemory <bytes>
maxmemory 500m

# MAXMEMORY POLICY: how Redis will select what to remove when maxmemory
# is reached. You can select one from the following behaviors:
#
# volatile-lru -> Evict using approximated LRU, only keys with an expire set.
# allkeys-lru -> Evict any key using approximated LRU.
# volatile-lfu -> Evict using approximated LFU, only keys with an expire set.
# allkeys-lfu -> Evict any key using approximated LFU.
# volatile-random -> Remove a random key having an expire set.
# allkeys-random -> Remove a random key, any key.
# volatile-ttl -> Remove the key with the nearest expire time (minor TTL)
# noeviction -> Don't evict anything, just return an error on write operations.
#
# LRU means Least Recently Used
# LFU means Least Frequently Used
#
# Both LRU, LFU and volatile-ttl are implemented using approximated
# randomized algorithms.
#
# Note: with any of the above policies, when there are no suitable keys for
# eviction, Redis will return an error on write operations that require
# more memory. These are usually commands that create new keys, add data or
# modify existing keys. A few examples are: SET, INCR, HSET, LPUSH, SUNIONSTORE,
# SORT (due to the STORE argument), and EXEC (if the transaction includes any
# command that requires memory).
#
# The default is:
#
# maxmemory-policy noeviction
maxmemory-policy allkeys-lru

2-4. Airflow Home Configuration

  • If AIRFLOW_HOME is not set, ~/airflow is used as AIRFLOW_HOME by default.
  • Change AIRFLOW_HOME by setting the value in ~/.bash_profile.
(airflow) ]$ vi ~/.bash_profile

AIRFLOW_HOME=/data/airflow
export AIRFLOW_HOME

3. Airflow Installation

3-1. Meta Database Initialization
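With the meta database and virtual environment prepared above, install Airflow and initialize the database schema. A sketch assuming an Airflow 2.x release installed via pip with the official constraints file (the version pins below are illustrative, not from this setup):

```shell
# Install Airflow with the extras used in this setup (version pins are examples)
(airflow) ]$ pip install "apache-airflow[celery,postgres,redis]==2.3.4" \
  --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.3.4/constraints-3.10.txt"

# Create the Airflow tables in the meta database configured in airflow.cfg
(airflow) ]$ airflow db init
```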

3-2. airflow.cfg Settings
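A sketch of the key airflow.cfg entries that tie this CeleryExecutor setup together, assuming Redis runs on the same host as Airflow (the broker host and database index are assumptions). Note that airflow.cfg values pass through ConfigParser interpolation, so each % in the percent-encoded password is escaped as %%; in newer Airflow 2.x releases sql_alchemy_conn lives in the [database] section instead of [core].

```ini
[core]
executor = CeleryExecutor
# %% escapes ConfigParser interpolation; the raw password is ##airflowuser_94RMJM
sql_alchemy_conn = postgresql+psycopg2://airflowuser:%%23%%23airflowuser_94RMJM@cncity-ai-postgresql.cglzgjdidyrc.ap-northeast-2.rds.amazonaws.com/airflow_celery

[celery]
broker_url = redis://localhost:6379/0
result_backend = db+postgresql://airflowuser:%%23%%23airflowuser_94RMJM@cncity-ai-postgresql.cglzgjdidyrc.ap-northeast-2.rds.amazonaws.com/airflow_celery
```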

3-9. User Creation

(airflow) ]$ airflow users create \
--username admin \
--password 'yourpwd' \
--firstname 'Your FirstName' \
--lastname 'Your Lastname' \
--role Admin \
--email 'aisolution.cncity@gmail.com'

(airflow) ]$ airflow users create \
--username airflowuser \
--password 'password' \
--firstname 'Your FName' \
--lastname 'Your LName' \
--role User \
--email 'email'

9. Troubleshooting

  • When Airflow CPU usage is high
    - Increase the intervals at which the scheduler triggers tasks and parses DAG files. Link
    [scheduler]
    
    # The scheduler constantly tries to trigger new tasks (look at the
    # scheduler section in the docs for more information). This defines
    # how often the scheduler should run (in seconds).
    #scheduler_heartbeat_sec = 5
    scheduler_heartbeat_sec = 60
    
    # Number of seconds after which a DAG file is parsed. The DAG file is parsed every
    # ``min_file_process_interval`` number of seconds. Updates to DAGs are reflected after
    # this interval. Keeping this number low will increase CPU usage.
    #min_file_process_interval = 0
    min_file_process_interval = 60
    
    # The scheduler can run multiple processes in parallel to parse dags.
    # This defines how many processes will run.
    
    # 1.10.14 이후
    #parsing_processes = 2
    parsing_processes = 3  # <NUMBER_OF_CORES_IN_MACHINE -1>
    
    # 1.10.14 이전
    #max_threads = 2
    max_threads = 3  # <NUMBER_OF_CORES_IN_MACHINE -1>
    
  • airflow.exceptions.AirflowException: Task received SIGTERM signal raised while a task is running
    - Airflow appears to treat long-running tasks as zombie or orphaned tasks and kills their processes.
    - Increase the zombie/orphaned task cleanup intervals so the process is not terminated mid-run.
    # How often (in seconds) should the scheduler check for orphaned tasks and SchedulerJobs
    #orphaned_tasks_check_interval = 300.0
    orphaned_tasks_check_interval = 3600.0
    
    # Local task jobs periodically heartbeat to the DB. If the job has
    # not heartbeat in this many seconds, the scheduler will mark the
    # associated task instance as failed and will re-schedule the task.
    #scheduler_zombie_task_threshold = 300
    scheduler_zombie_task_threshold = 3600
    
  • An Airflow container on AWS ECS (Elastic Container Service) exits with Sending Signals.SIGTERM to GPID 150
    - This happens on the scheduler container when the time allowed for the health-check response is too short.
    - Raise scheduler_health_check_threshold to allow more time for the health-check response.
    # If the last scheduler heartbeat happened more than scheduler_health_check_threshold
    # ago (in seconds), scheduler is considered unhealthy.
    # This is used by the health check in the "/health" endpoint
    #scheduler_health_check_threshold = 30
    scheduler_health_check_threshold = 300
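The <NUMBER_OF_CORES_IN_MACHINE - 1> rule of thumb for parsing_processes / max_threads in the first item above can be computed at deploy time. A small Python sketch (the helper name is hypothetical):

```python
import os

def recommended_parsing_processes(cores: int) -> int:
    # One process fewer than the core count, never below 1.
    return max(1, cores - 1)

# e.g. on a 4-core machine this prints 3
print(recommended_parsing_processes(os.cpu_count() or 1))
```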

 
