티스토리 뷰

개발/Apache Airflow

Hello, Apache Airflow

Jaeyeon Baek 2021. 6. 16. 22:51

Airflow는 Airbnb에서 2014년 10월 시작된 오픈소스 프로젝트입니다. 오픈소스로 2015년 6월 발표되었으며 2016년 3월에 Apache Software Foundation에 인큐베이팅되었어요. 그리고 2019년 1월 아파치 재단은 Apache Airflow를 탑티어 프로젝트로 발표했습니다. 자, 그럼 Airflow가 무엇인지 천천히 살펴보도록 하겠습니다.

airflow는 workflow를 구축하고 실행할 수 있는 하나의 플랫폼입니다. 워크플로라는 단어가 모든 것을 설명해주는데요, airflow에서 워크플로는 DAG(Directed Acyclic Graph)으로 표시되며 세부사항으로는 개별 작업을 포함해서 종속성 및 데이터 흐름을 정렬하게 됩니다. 이게 다 무슨 소리인가 싶을 수 있는데요. DAG부터 알아보면 작업(task)을 방향이 있는 그래프로 순서대로 처리한다"로 이야기할 수 있습니다. 아래 그림처럼 순서를 보장하며 작업이 진행됩니다. 녹색 네모가 작업(task)을 나타냅니다. ( 혹시 작업이라는 표현이 익숙하지 않으신가요? 그럼 우선 함수로 생각하셔도 됩니다 )

https://airflow.apache.org/docs/apache-airflow/stable/concepts/overview.html



위에 그림을 좀 더 살펴보면 ingest task가 실행되고 끝나면 analyze가 실행됩니다. 이후 check_integrity가 실행되는데 여기서 분기가 발생합니다. 꼭 프로그래밍 같죠? 에러 여부에 따라 분기되고 나서 다음 task가 실행됩니다. 그리고 최종적으로 report가 실행되었네요.

A라는 일이 끝나면 B가 시작되고, B가 끝나면 C가 시작되고... 이 패턴에서 혹시 뭔가 번뜩이며 떠올랐다면 아마도 event-driven architecture(EDA) 일 겁니다. 작업이 끝났다는 이벤트를 보내고 해당 이벤트에 대해서 다른 작업이 시작되는 비동기 통신 방식을 기반으로 한 아키텍처입니다. airflow는 이와 같은 아키텍처를 아주 쉽게 조립해서 사용할 수 있도록 구축된 플랫폼입니다. 그리고 이런 방식으로 작업이 순차적으로 진행되는 아주 좋은 예시는 Extract Transform Load(ETL), Extract Load Transform(ELT)가 될 겁니다. 추출이 끝나면 변환하고, 변환이 끝나면 적재하는 방식처럼 말이죠. task는 작은 독립적인 단위로 동작을 하는데 task 간의 커뮤니케이션도 가능해서 앞에서 처리된 task의 결과(output)를 다음 task의 인자로 넘겨주는 것도 가능합니다. 온라인 대부분의 예제는 그 부분까지는 잘 다루지 않지만요.

airflow는 내부 구성을 살펴보면 스케줄러가 존재하고 비동기 메시지 큐로 Celery를 기본 채택하고 있으며 브로커로 redis, 저장소로 postgres가 사용됩니다. 그리고 사용자가 이런 작업들을 관리할 수 있도록 웹서버를 제공합니다. 이런 개별 서비스가 모여서 구축된 플랫폼이 바로 airflow인 것이죠.

사실 이런 따분한 내부 구성 이야기는 천천히 공부해도 늦지 않습니다. 일단 airflow를 설치하고 사용해보면서 감을 잡는 게 중요합니다. 어디 사용하면 좋을지는 airflow에 감이 잡히면 자연스럽게 떠오르게 될 테니까요. 다음 글에서 설치부터 튜토리얼, 각 서비스 소개 그리고 튜닝까지 여러 가지 내용을 하나씩 다뤄보도록 하겠습니다. 아마 최종적으로는 airflow가 동작하는 인프라(kubernetes, 혹은 클라우드 풀 매니지드 에어플로)에 대한 이야기로 마무리될 듯싶네요!

 

'개발 > Apache Airflow' 카테고리의 다른 글

airflow 파라미터 튜닝  (4) 2021.07.05
docker-compose로 Airflow 한방에 설치하기  (0) 2021.06.21
Hello, Apache Airflow  (4) 2021.06.16
댓글
  • 프로필사진 BlogIcon 은유  포스트에서 말씀하시는 task간 커뮤니케이션은 어떤걸 말씀하시는걸까요? x-com과 같은 변수 공유방식인지 아니면 상태만 주고받는 정도인지
    아니면 말그대로 task에 output 예를들어 추출한 데이터프레임과 같은 output들을 주고받는것인지 궁금합니다.
    그리고 만약 후자라면 그 주고받는 방식은 파일시스템 이외에 airflow내에서 또 어떤 방식이 있는지도 궁금합니다
    2021.07.06 11:09 신고
  • 프로필사진 BlogIcon Jaeyeon Baek 은유님. 질문 감사합니다.

    관련해서는 별도의 포스트로 다룰 예정이었는데 여기 글에 고급 질문이 올라올지는 예상 못했네요ㅎㅎ

    airflow에는 x-com 방식밖에 존재하지 않는것 같습니다. 다만 직접 x-com 타입을 써가며 구현하느냐, 일종의 하이레벨 모듈을 사용하느냐 차이 정도 같아요. 본문에서 표현하고자 했던건 아래 수준입니다. (pseudo code)

    def foo():
    ....return {'hello': [1,2,3,4]}
    def bar(foo: dict):
    ....print(foo.get('hello')) # [1,2,3,4]

    task_1 = PythonOperator(
    ....python_callable=foo
    )
    task_2 = PythonOperator(
    ....python_callable=bar
    ....op_kwargs={'foo': task_1.output}
    )
    task_1 >> task_2

    이때 실제 내부적인 통신은 x-com 이죠. :)
    2021.07.06 12:44 신고
  • 프로필사진 BlogIcon 은유  고급 질문이라니 가당치않습니다!! ㅎㅎ
    아무튼 정말 친절하고 빠른답변 감사합니다.
    저도 태스크간 output을 주고받는 것에 x-com을 고민했었으나 XCOM의 max size는 48kb고 단순히 task간의 통신을 위한 메모 정도로 설계되어있는 정도라서 유실가능성이 있는 한 output을 주고받는건 안되겠다고 판단해서 보류했습니다.
    혹시 제가 서칭하다가 xcom이외에 다른 것이 있나해서 한번 여쭤봤던 거였습니다!
    다시한번 답변 감사합니다.
    2021.07.06 13:55 신고
  • 프로필사진 BlogIcon Jaeyeon Baek 네 그런데 실제 xcom을 사용해보면 48kb 보다 훨씬 큰 데이터도 전송이 가능해서 여러모로 쓸모가 많은 것 같더라고요. (좋은 사용법 여부는 논외)

    나중에 코드 레벨로 따라가서 어떤 이유로 명시된 제한 사이즈보다 크게 output을 사용할 수 있는지 찾아볼 예정입니다! :)

    감사합니다!
    2021.07.06 14:58 신고
댓글쓰기 폼