2017-04-20 2 views
0

Spark에서 RDD를 테스트 할 수 있는지 여부는 확실하지 않습니다.Spark RDD 테스트 방법

나는 ROCK을 조롱하는 것이 좋은 생각이 아니라고 말하는 기사를 보았습니다. RDD 테스트를위한 다른 방법이나 모범 사례가 있습니까

+1

Holden의 [spark-test-base] (https://github.com/holdenk/spark-testing-base)를 보았습니까? – Pushkr

답변

0

Spark RDD/응용 프로그램을 테스트하는 두 가지 방법이 있습니다. 다음과 같이 그들은 :

단위 테스트 :

import org.apache.spark.SparkContext 
import org.apache.spark.rdd.RDD 

class WordCount { 
    def get(url: String, sc: SparkContext): RDD[(String, Int)] = { 
    val lines = sc.textFile(url) lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) 
    } 
} 

지금 방법 1은 다음대로 테스트하는 예를를 들어

import org.scalatest.{ BeforeAndAfterAll, FunSuite } 
import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 

class WordCountTest extends FunSuite with BeforeAndAfterAll { 
    private var sparkConf: SparkConf = _ 
    private var sc: SparkContext = _ 

    override def beforeAll() { 
    sparkConf = new SparkConf().setAppName("unit-testing").setMaster("local") 
    sc = new SparkContext(sparkConf) 
    } 

    private val wordCount = new WordCount 

    test("get word count rdd") { 
    val result = wordCount.get("file.txt", sc) 
    assert(result.take(10).length === 10) 
    } 

    override def afterAll() { 
    sc.stop() 
    } 
} 

방법 1에서 우리는 RDD를 조롱하지 않습니다. 우리는 단지 우리의 WordCount 클래스의 행동을 점검하고 있습니다. 하지만 여기서 우리는 SparkContext의 생성과 파기를 우리 스스로 관리해야합니다.

방법 2 :

import org.scalatest.FunSuite 
import com.holdenkarau.spark.testing.SharedSparkContext 

class WordCountTest extends FunSuite with SharedSparkContext { 
    private val wordCount = new WordCount 

    test("get word count rdd") { 
    val result = wordCount.get("file.txt", sc) 
    assert(result.take(10).length === 10) 
    } 
} 

또는

import org.scalatest.FunSuite 
import com.holdenkarau.spark.testing.SharedSparkContext 
import com.holdenkarau.spark.testing.RDDComparisons 

class WordCountTest extends FunSuite with SharedSparkContext { 
    private val wordCount = new WordCount 

    test("get word count rdd with comparison") { 
    val expected = sc.textFile("file.txt") 
        .flatMap(_.split(" ")) 
        .map((_, 1)) 
        .reduceByKey(_ + _) 

    val result = wordCount.get("file.txt", sc) 

    assert(RDDComparisons.compare(expected, result).isEmpty) 
    } 
} 

를 들어 당신이 여분의 코드를 작성하지 않으려는 경우에 따라서, 당신은 다음과 같이 spark-testing-base을 사용할 수 있습니다 Spark RDD 테스트에 대한 자세한 내용은 다음을 참조하십시오. - KnolX: Unit Testing of Spark Applications

+0

광산은 단지 작은 프로그램이므로 Method : 1을 사용하려고합니다. 그러나 방법 1에서 보여준 (himanshu)는 RDD를 비교하지 않습니다. 당신은 그 RDD에 대한 액션을 수행하고 나서 Integer 값과 동일한 것을 시도합니다. 내가 원하는 것은 2 RDD의 ... 2를 비교하는 것입니다. RDD [myClass] === RDD [myClass] – AJm

+0

RDD를 비교하기 위해서는 방법 2에서 언급 한'RDDComparisons'을 사용해야합니다. – himanshuIIITian

+0

Some이 개발 한 커스텀 라이브러리를 사용하여 개발 중이며 아파치와 같은 큰 우산 밑에서는 사용되지 않습니다. 생산 준비도되지 않았을 수도 있습니다. – AJm

1

감사합니다. 너는이 현저한 질문을 거기 밖으로 퍼트려있다. 어떤 이유로 스파크의 경우 모든 사람들이 분석에 너무 몰두하여 지난 15 년 간 발생한 훌륭한 소프트웨어 엔지니어링 관행을 잊어 버리게됩니다. 그렇기 때문에 우리는 테스트와 지속적인 통합 (DevOps와 같은 다른 것들 중에서도)을 우리 코스에서 논의하는 것이 중요합니다.

빠른 제외하고

내가 계속하기 전에 용어에, 나는 @himanshuIIITian가 인용 KnolX의 발표에 사소한 의견 차이를 표현해야합니다. A true 단위 테스트는 테스트의 모든 구성 요소를 완벽하게 제어 할 수 있음을 의미합니다. 데이터베이스, REST 호출, 파일 시스템 또는 심지어 시스템 시계와도 상호 작용할 수 없습니다. Gerard Mezaros가 xUnit Test Patterns에 넣은대로 모든 것을 "두 배로"해야합니다 (예 : 조롱, 스터브 등). 이것은 의미론처럼 보입니다. 그러나 그것은 정말로 중요합니다. 이 문제를 이해하지 못하면 지속적인 통합에서 간헐적 인 테스트 실패를 볼 수 있습니다.

우리는 여전히 단위 테스트

그래서 단위는 RDD 불가능 테스트, 이러한 이해를 제공 할 수 있습니다. 그러나 분석을 개발할 때 단위 테스트를위한 장소는 여전히 있습니다.

(참고 : 예제에서는 스칼라를 사용하지만 개념은 언어와 프레임 워크를 초월합니다.

rdd.map(foo).map(bar) 

여기 foobar 간단한 함수이다 :)

간단한 조작을 고려한다. 그것들은 일반적인 방법으로 단위 테스트를 할 수 있으며, 여러분이 소집 할 수있는만큼 많은 코너 케이스가 있어야합니다. 결국, 그들은 테스트 픽스쳐 또는 RDD인지 여부에 관계없이 입력을받는 위치를 왜 신경 씁니까?

이 테스트되지

스파크 셸을 잊지 마세요 자체을하지만, 이러한 초기 단계에서 당신은 또한 당신의 변환을 파악하기 위해 스파크 쉘에서 실험을해야하며, 특히 당신의 접근 방식의 결과. 예를 들어 toDebugString, explain, glom, show, printSchema 등과 같이 여러 가지 기능을 사용하여 물리적 및 논리적 쿼리 계획, 분할 전략 및 보존 및 데이터 상태를 검사 할 수 있습니다. 나는 그들을 탐험하게 할 것이다.

Spark 쉘 및 테스트에서 마스터를 local[2]으로 설정하여 작업 배포를 시작한 후에 만 ​​발생할 수있는 문제를 식별 할 수도 있습니다. 재미있는 물건 이제

스파크와

통합 테스트

.

위해

통합 테스트 당신은 당신의 도우미 기능의 품질과 RDD/DataFrame 변환 로직에 확신 후 스파크 에, (에 관계없이 빌드 도구 및 테스트 프레임 워크의) 몇 일을하는 것이 중요합니다 :

  • JVM 메모리를 늘리십시오.
  • 포크를 활성화하지만 병렬 실행을 비활성화합니다.
  • 테스트 프레임 워크를 사용하여 Spark 통합 테스트를 스위트에 축적하고 모든 테스트를 수행하기 전에 SparkContext을 초기화 한 다음 모든 테스트 후에 테스트 프레임 워크를 중지하십시오.

마지막으로 수행 할 수있는 방법은 여러 가지가 있습니다. 하나는 @Pushkr과 @himanshuIIITian에 의해 링크 된 KnolX 프리젠 테이션 모두에 의해 인용 된 spark-testing-base에서 얻을 수 있습니다.

차관 패턴

또 다른 방법은 Loan Pattern을 사용하는 것입니다. 예를 들어

(사용 ScalaTest) :

class MySpec extends WordSpec with Matchers with SparkContextSetup { 
    "My analytics" should { 
    "calculate the right thing" in withSparkContext { (sparkContext) => 
     val data = Seq(...) 
     val rdd = sparkContext.parallelize(data) 
     val total = rdd.map(...).filter(...).map(...).reduce(_ + _) 

     total shouldBe 1000 
    } 
    } 
} 

trait SparkContextSetup { 
    def withSparkContext(testMethod: (SparkContext) => Any) { 
    val conf = new SparkConf() 
     .setMaster("local") 
     .setAppName("Spark test") 
    val sparkContext = new SparkContext(conf) 
    try { 
     testMethod(sparkContext) 
    } 
    finally sparkContext.stop() 
    } 
} 

당신이 볼 수있는 바와 같이, 대출 패턴은 그것의 이후의 "대부"다음 시험에 SparkContext 등을 처분하는 higher-order 함수를 사용합니다 끝난.

고통 지향 프로그래밍 (감사합니다, 나단)

그것은 전적으로 취향의 문제입니다,하지만 난 반입 전에만큼 내가 할 수있는 자신까지 대출 패턴 및 와이어 물건을 사용하는 것을 선호 다른 프레임 워크. 가벼운 체재를 유지하는 것 외에도, 프레임 워크는 때로는 디버깅 테스트 실패를 추론하기가 어려운 "마술"을 많이 추가합니다. 따라서 Suffering-Oriented Programming 접근 방식을 사용합니다.이 프레임 워크를 사용하지 않을 때까지 새로운 프레임 워크를 추가하지 않아도됩니다. 그러나 다시 말하지만, 이것은 당신에게 달렸습니다.

지금 스파크 테스트 기반 정말 빛나는 한 곳은 HDFSClusterLikeYARNClusterLike 같은 하둡 기반 도우미와 함께입니다. 이러한 특성을 혼합하면 설치 통증을 크게 줄일 수 있습니다. 그것이 빛나는 다른 곳은 Scalacheck과 같은 속성과 생성기입니다. 그러나 필자는 분석과 테스트가 그 수준의 정교함에 도달 할 때까지 개인적으로 사용하지 않을 것입니다.

val sparkContext: SparkContext = ... 
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3")) 
val rdd: RDD[(String, String)] = sparkContext.parallelize(data) 
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]] 
val streamingContext = new StreamingContext(sparkContext, Seconds(1)) 
val dStream: InputDStream = streamingContext.queueStream(strings) 
strings += rdd 

:

마지막으로 스트리밍 스파크와

통합 테스트, 난 그냥 메모리 값으로 SparkStreaming 통합 테스트 설정은 다음과 같은 형태가 될 것이다 스 니펫 (snippet)을 제시 하 고 싶습니다 이것은 보이는 것보다 간단합니다. 실제로는 데이터 시퀀스를 대기열로 변환하여 DStream에 공급합니다. 대부분은 Spark API와 함께 작동하는 보일러 플레이트 설정입니다.

내 게시물 중 가장 길기 때문에 여기에 남겨 두겠습니다. 다른 사람들이 다른 모든 응용 프로그램 개발을 개선 한 동일한 민첩한 소프트웨어 엔지니어링 방법으로 분석의 품질을 향상시키는 데 도움이되기를 바랍니다.

그리고 뻔뻔한 플러그에 사과를하시면 Analytics with Apache Spark 코스를 확인하실 수 있습니다. 여기서 많은 아이디어를 얻을 수 있습니다. 곧 온라인 버전을 갖기를 바랍니다.