서론

혼자 짜투리 시간에 뚝딱뚝딱 뭔가를 만들어보고있는 중인데, Daily로 돌아가야 하는 코드가 있었다. 그래서 이걸 pipeline에 실어서 돌리고자 한다.

가장 유명한 airflow/kubeflow(그냥 내가 자주들어본) airflow는 내 간단한 코드를 돌리기엔 너무 무겁다. kubeflow는 kube도 어려운데.. kubeflow를…? 그래도 airflow, kube 써보고싶다 ㅠㅠ 언제쯤 리얼서비스에서 써볼 수 있을까? 얼른 분발해야겠다!

아무튼 “무겁다” 라고 생각하던 와중에, 이 블로그 를 읽게 되었다. 해당 블로그에서는 많은 pipeline 도구들을 비교분석해주었다. 위에 글을 읽고나니 luigi라는 새로운 도구를 알게되었다. 루이지의 주 목적은 오래 실행되는 배치 process의 처리이다. 나와같이 airflow는 좀 무거운데 라고 생각할 때 쓰면 좋다고 해서 루이지를 써보면 어떨까 하고 생각했다. 대신, luigi는 cron을 사용해 airflow에 비해 확장성이 낮다고한다.
어차피 내가 개발하고 있는 것도 개발 맞나..? 확장성이 크게 필요하지 않기떄문에 이렇게보나 저렇게 보나 일단은 루이지를 써보는게 좋을 것 같다. 가장 좋은 교과서는 공식문서 이므로, 공식 문서를 보고 따라가도록 하겠다.

개념

들어가기에 앞서 루이지가 뭔지 간략하게 알아보고 들어가자. 슈퍼마리오 초록색
공식 문서에 의하면 아래와 같이 정의하고 있다.

The purpose of Luigi is to address all the plumbing typically associated with long-running batch processes.

루이지의 워크플로우 빌드에는 Target, Task 2가지의 주 클래스가 있다. Target은 File, DataBase와 같은 check point를 의미한다. 그래서 LocalTarget, HdfsTarget, S3Target 같은 것들이 존재한다.
Task는 말 그대로 Task를 의미한다.

There are two fundamental building blocks of Luigi - the Task class and the Target class. Both are abstract classes and expect a few methods to be implemented. In addition to those two concepts, the Parameter class is an important concept that governs how a Task is run.

루이지의 작동원리로는 이 글 에서 잘 설명해주고있다. 정말 간단하다.

Luigi는 이렇게 태스크의 요청에 기반한 실행 단위 그래프(the execution graph)를 생성하는 top-down 접근법을 갖고 있습니다. 그리고나서 top-down 방식의 처음부터 끝까지(혹은 실패할 때까지) 실행을 하게 되는거죠.



내가 하려는 자동화는 매일 아침마다 특정 웹페이지들을 스크랩하는 것이다. 무지성으로 스크랩하진않고, 글의 날짜를 체크하여 최신 글이 있을 때만 스크랩한다. 자 그럼 사용해보자.

설치

우선은 pip를 이용해 간단하게 설치할 수 있다.

$ pip install luigi 

예제 실행

우선 예제를 하나 돌려보도록하자. 공식 문서에 나와있는 예제 코드를 보면
output, requires, run으로 이루어져있는 것을 알 수 있다. 입/출력 데이터와 실제 작업이 실행되는 부분으로 나뉘어진 것이다. 간단하다.
이 코드는 전체 코드의 일부이기때문에 깃 허브 예제 로 가서 확인하는 것이 좋다.
또한,, 애초에 돌아가지도 않는다. 깃허브로 바로 가는 것을 추천한다.

class AggregateArtists(luigi.Task):
    date_interval = luigi.DateIntervalParameter()

    def output(self):
        return luigi.LocalTarget("data/artist_streams_%s.tsv" % self.date_interval)

    def requires(self):
        return [Streams(date) for date in self.date_interval]

    def run(self):
        artist_count = defaultdict(int)
        for input in self.input():
            with input.open('r') as in_file:
                for line in in_file:
                    timestamp, artist, track = line.strip().split()
                    artist_count[artist] += 1

        with self.output().open('w') as out_file:
            for artist, count in artist_count.iteritems():
                print >> out_file, artist, count


python path를 꼭 등록해줘야 한다. 아래 명령어를 이용해 실행해 볼 수 있는데, local-scheduler이기때문에 테스트용으로만 쓰는 것이 좋다. 예시가 10년전 예시라니..환경변수는 OS에 따라 적절하게 적용해주면 된다.

Note that top_artists needs to be in your PYTHONPATH, or else this can produce an error (ImportError: No module named top_artists). Add the current working directory to the command PYTHONPATH with:

$ PYTHONPATH='.' luigi --module top_artists AggregateArtists --local-scheduler --date-interval 2012-06

실행하고나면 아래와 같이 요약해서 보여준다.

===== Luigi Execution Summary =====

Scheduled 29 tasks of which:
* 28 complete ones were encountered:
    - 28 Streams(date=2022-02-01...2022-02-28)
* 1 ran successfully:
    - 1 AggregateArtists(date_interval=2022-02)

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====

굉장히 사용법이 간단한 것 같다.

luigi Web

루이지 웹은 굉장히 실행이 간단하다. 웹페이지를 실행한 뒤 우리가 아까 위에서 실행한 명령어를 ‘–local-scheduler’ 만 제거한 뒤 진행하면 된다. 그럼 아래와 같이 뜬다 :)

luigid --port 8082

2022-02-14-luigi-img-1

Slack 연결

이번 프로젝트는 기본적으로 TB급 데이터들이었기때문에 처리하는데 10시간 이상이 걸리는 경우가 다반사였다. 그래서 중간에 파이프라인이 뻑났는데도 모르고 헤헤호호 하면 그 후에 위지원이 고통받았다. 그리하여 슬랙을 alert로 이용하였다. 아주 유용했다.
검색하다가 그나마 최신인 flugi-monitor 을 발견했다. 사용법은 깃헙 주소 에서 확인 할 수 있다. 해당 라이브러리를 사용하면 아래와 같이 사용이 가능하다고 한다.

2022-02-14-luigi-img-2
위 예제를 기반으로 아래와 같이 코드를 작성했다.

if __name__ == "__main__":
    with monitor(slack_url=<Slack API WebHook URL>):
        luigi.run(main_task_cls=AggregateArtists, cmdline_args=["--date-interval=2022-01"])


실행해본 결과 아래와 같이 잘 되는 것을 알 수 있다 야호!
2022-02-14-luigi-img-3

종속성 추가

아래 함수를 이용하면 종속성을 추가할 수 있다. 그래서 1번 Task가 완료되어야 2번 Task가 진행된다.
이 때, 1번 Task의 output 함수의 결괏값이 자동으로 2번 Task의 input값이 된다 :) 이걸 나중에 알아서 대체 self.input()이 어디서 온다는 거야!!! 했더랬다..ㅎㅎ;;

def requires(self):


내 코드에 종속성을 추가한 뒤 코드를 실행했을 때 다음과 같은 에러가 발생했다. 이 글 에 의하면

RuntimeError: Unfulfilled dependency at run time: collect_kakao_data__99914b932b

REFERENCES