2016-08-31 2 views
3

나는 어떤 pyspark 라이브러리를 포함하는 luigi python 태스크를 가지고있다. 이제는 spark-submes와 함께 mesos에이 작업을 제출하고 싶습니다. 그것을 실행하려면 어떻게해야합니까?spark-submit과 pyspark로 luigi 작업을 실행하는 방법

from pyspark.sql import functions as F 
from pyspark import SparkContext 

class myClass(SparkSubmitTask): 
# date = luigi.DateParameter() 

    def __init__(self, date): 
    self.date = date # date is datetime.date.today().isoformat() 

    def output(self): 

    def input(self): 

    def run(self): 
    # Some functions are using pyspark libs 

if __name__ == "__main__": 
    luigi.run() 

루이지없이, 나는 다음과 같은 명령 줄로이 작업 submmitting 해요 : : 다음은 내 코드의 골격이다

/opt/spark/bin/spark-submit --master mesos://host:port --deploy-mode cluster --total-executor-cores 1 --driver-cores 1 --executor-memory 1G --driver-memory 1G my_module.py 

하는 것은 이제 문제는 내가-불꽃 제출 루이지 작업을 할 수있는 방법입니다 my_module.py 먼저 완료하는 데 필요한 작업이있는 경우

luigi --module my_module myClass --local-scheduler --date 2016-01 

하나 개 더 질문은 내가 더 많은 일을하거나 현재 - 명령과 동일하게 설정해야합니까입니다 : 루이지 명령 줄 등 포함 선?

정말 어떤 힌트 나 제안에 감사드립니다. 매우 감사합니다.

답변

4

루이지 (Luigi)에는 몇 가지 템플릿 작업이 있습니다. 그들 중 하나는 PySparkTask라고 불렀습니다. 이 클래스에서 상속하여 속성을 재정의 할 수 있습니다.

https://github.com/spotify/luigi/blob/master/luigi/contrib/spark.py.

나는 그것을 테스트하지만, 루이지 내 경험을 바탕으로하지 않은 내가 이것을 시도 할 것이다 :

import my_module 


class MyPySparkTask(PySparkTask): 
    date = luigi.DateParameter() 

    @property 
    def name(self): 
     return self.__class__.__name__ 

    @property 
    def master(self): 
     return 'mesos://host:port' 

    @property 
    def deploy_mode(self): 
     return 'cluster' 

    @property 
    def total_executor_cores(self): 
     return 1 

    @property 
    def driver_cores(self): 
     return 1 

    @property 
    def executor-memory(self): 
     return 1G 

    @property 
    def driver-memory(self): 
     return 1G 

    def main(self, sc, *args): 
     my_module.run(sc) 

    def self.app_options(): 
     return [date] 

그럼 당신은 그것을 실행할 수 있습니다 루이지 --module task_module MyPySparkTask --local-스케줄러 - -date 2016-01

다른 PySparkTasks 그들에게 기본 값을하기 위해 client.cfg 파일의 속성을 설정하는 옵션도 있습니다 :

[spark] 
master: mesos://host:port 
deploy_mode: cluster 
total_executor_cores: 1 
driver_cores: 1 
executor-memory: 1G 
driver-memory: 1G 
+0

안녕하세요 님 un12, 정말 고마워요. 이미 SparkSubmitTask에서 클래스를 상속 받았지만 대신이 클래스를 사용할 수 있습니까? MyPySparkTask를 사용하면 client.cfg에서 spark 명령을 설정하고 다음 명령을 실행할 필요가 있다는 것을 의미합니다. "luigi --module task_module MyPySparkTask --local-scheduler --date 2016-01"평상시처럼? – zuhakasa

+0

1. SparkSubmitTask 대신 위의 예제에서 MyPySparkTask와 같은 PySparkTask를 상속 받아 'main'메소드에서 로직을 구현해야합니다. 2. 구성은 SparkSubmitTask와 동일하며 client.cfg에서 정의 했습니까? 아니면 속성을 무시하여 정의 했습니까? 3. 실행은 SparkSubmitTask를 실행하는 것과 비슷합니다. – ayun12

+0

도움을 주셔서 대단히 감사드립니다. 한 가지 물어보고 싶은 것은 main 메소드의 코드에서 my_module.run (sc)을 호출하는 것입니다. 그러나, 내가 아는 한, luigi.Task my_module 함수가 task_module.py에서 상속받은 메소드에는 메소드 실행시 매개 변수 sc가 없습니다. 따라서 task_module.py에서이 메서드를 재정의해야한다는 것을 의미합니까? – zuhakasa

관련 문제