본문 바로가기

Data & AI

[Airflow] Docker Compose 기반 Airflow 구성과 주식 메타데이터 추출과 적재

 

Airflow는 DAG라는 작업 단위를 통해 Data Processing에 필요한 작업을 수행하고 스케줄링, 시각화해주는 도구다. 

 

Airflow 관련 내용은 별도로 정리를 할 예정이다. 


회사에서 Airflow를 구축할 때는 VM을 할당받고 그 위에 직접 설치를 했었다. 

이번 FDP 프로젝트때문에 별도로 Airflow를 설치하려고 공부하다 보니 Docker Compose를 통해 손쉽게 구현이 가능했다. 


Docker Compose는 공통 서비스에 사용될 Docker Container들의 관리와 배포 관련 설정들을 손쉽게 이용할 수 있다. 

 

나는 기본 사용중이던 Mysql Container가 있어서 Airflow Container들을 추가로 구현하기로 했다. 

 

Airflow 구축 후 주식 메타정보(PER, PBR 등 재무정보)를 가져와 DB에 적재할 예정이다. 

기본적으로 Airflow 문서에서 docker compose yaml 파일을 제공하고 compose.yaml파일만으로 배포가 가능하지만 

 

Dag에 필요한 라이브러리들이 있다면 개별로 Dockerfile을 만들어 base image를 만들고

그 이미지를 기반으로 container 들을 형성해주기로 했다. 

상장된 주식들 코드를 가져오기 위해 finance-datareader, mysql접속을 위한 pymysql 등 필요한 모듈을 정의해주었다. 

 

# Dockerfile
FROM apache/airflow:latest
COPY requirements.txt /requirements.txt
RUN pip install --user --upgrade pip
RUN pip install -r /requirements.txt

 

Docker build를 위한 명령 수행 

docker build . --tag extending_airflow:latest

apache/airflow image위에 requirements에 정의된 python module들을 설치하도록 했다.
 
이렇게 Dockerfile을 작성하고 build 하는데 --tag 옵션으로 이미지 이름을 명시해주었다. 

 

extending_airflow 이름으로 만들어진 이미지


이제 이 image를 기반으로 docker compose를 구성하도록 했다. 


docker compose 파일은 airflow 공식문서에서 소개하고 있다. 
https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html

 

기존 docker compose 파일에서 compose 되는 기본 이미지를 

생성했던 extending_airflow image를 사용하도록 변경했다. 

 

# docker-compose.yaml
version: '3'

...
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-extending_airflow:latest}
    
...

 

docker compose up 명령 수행

docker compose up

 

airflow 이름으로 형성된 docker compose containers


airflow에 필요한 container들이 기동되고 mysql과 연동된 dag를 테스트했는데 

로그를 보니 실패한 것으로 보여 원인을 파악하다가

container간 network 설정을 깜빡 했다는 걸 알았다. 

 

airflow compose들은 하나의 네트워크로 기본적으로 묶이게 된다. 

 

"airflow_default" 네트워크로 묶이고 있어서 mysql을 재실행하면서 해당 네트워크에 포함시켜주었다. 

 

docker run --network airflow_detault --name mysql-volume-container -e MYSQL_ROOT_PASSWORD='root' -d -p 3306:3306 -v mysql-volume:/var/lib/mysql mysql:latest

 

참고로 mysql db의 volume을 항상 같은곳에 지정해주어서 container를 재기동하여도 데이터의 손실 없이 DB를 올리고 내릴 수 있다. 

 

네트워크를 지정하고 container를 올리고 나서는 `docker network instpect airflow_default` 명령을 통해서 

airflow_default 네트워크들이 내부적으로 어떤 ip로 통신하고 있는지 확인하고

airflow dag가 mysql을 접속할 때 airflow_default의 mysql container가 할당받은 해당 ip로 통신하도록 수정해준다. 

 

docker network inspect airflow_default
[
    {
        "Name": "airflow_default",
	# ...

                "Name": "mysql",
                "IPv4Address": "172.21.0.6/16",
                "IPv6Address": ""
            },

    # ...

 

이번에 추가한 Dag는 네이버 증권에서 재무정보를 표시할때 조회하는 API를 호출하여 재무 데이터를 호출하는 작업을 수행한다.
(chrome devtools network 탭에서 호출하는 정보를 확인할 수 있다. 네이버 증권을 여러번 호출하는 구조는 아니다.)

 

Dag의 작업 순서를 간단하게 설명하면

 

1. finance-datareader 라이브러리를 통해 상장된 주식의 종목코드번호를 가져온다.

 

2. 가져온 코드들에 해당하는 재무정보를 가져온다.

 

3. 추출 데이터 전처리 : 가져온 데이터를 적재하기 위해 데이터 전처리를 수행한다. 

4. stock_meta이름으로 로 설계된 테이블에 데이터를 삽입한다. 

 

Dag Grid탭의 모습, Dag실행에 13분 정도가 소요된다.

 

해당 데이터는 분기별 업데이트가 필요하기 때문에 90일 주기로 스케줄링 되도록 되어있다. 

 

성공적으로 데이터가 적재되었는지 확인해보았다. 

 

2022년 2분기 재무데이터 기준 저 PER, PBR, EVEBITDA

이제 퀀트 투자에 기본이 되는 기초재무정보수집이 완료되었다. 

 

사실 기초적인 데이터로도 시장 이상의 수익 창출은 가능하다. 

 

떠오르는 투자 전력의 알고리즘을 백테스트하며 검증하고 시각화 하기 위한 기초 데이터 수집이 완료된 셈이다. 

 

기초적인 데이터 외에도 투자에 적용할 수 있는 많은 데이터들을 수집하고 처리하고 적재하여

의미 있는 정보로 가공하는 내용을 꾸준히 연구할 예정이다.