2016-10-04 2 views
2

나는 sample of mapWithState function on Databricks website을 따르고 있습니다.스파크 스트리밍에서 mapWithState로 시간 초과 지정

trackstatefunction에 대한 코드는 다음과 같다 :

def trackStateFunc(batchTime: Time, key: String, value: Option[Int], state: State[Long]): Option[(String, Long)] = { 
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L) 
    val output = (key, sum) 
    state.update(sum) 
    Some(output) 
} 

가 나는 상태 타이밍 아웃됩니다 (state.isTimingout()==true)을 다음 함수가 다시 예외가 발생할 수 있습니다 물리게를 업데이트 할 경우에 질문을했다. 샘플에 대해 사실입니까?

답변

3

상태가 시간 초과 (state.isTimingout() == true) 인 경우 예외가 발생할 수있는 상태를 다시 업데이트합니다.

네, 맞습니다. 명시 적 제한 시간을 mapWithState으로 설정하고 state.update을 호출하면 제한 시간이 경과하면 상태를 업데이트 할 수 없으므로 예외가 throw되는 마지막 시간 초과 반복이 발생합니다. 즉 (이미 제거 된 경우

국가는 업데이트 할 수 없습니다 (즉, 제거()는 이미 호출 된) 또는 때문에 시간 초과로 제거 될 것입니다 :이 명시 적으로 in the documentation 적혀있다 , isTimingOut()이 참). 타임 아웃이 발생하면 valueNone해야하기 때문에 당신은뿐만 아니라 일치하는 패턴을 사용할 수 있습니다,

def trackStateFunc(batchTime: Time, 
        key: String, 
        value: Option[Int], 
        state: State[Long]): Option[(String, Long)] = { 
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L) 
    val output = (key, sum) 
    if (!state.isTimingOut) state.update(sum) 
    Some(output) 
} 

을 또는 : 당신의 예에서


는 추가 검사는 순서에

def trackStateFunc(batchTime: Time, 
        key: String, 
        value: Option[Int], 
        state: State[Long]): Option[(String, Long)] = { 
    value match { 
    case Some(v) => 
     val sum = v.toLong + state.getOption.getOrElse(0L) 
     state.update(sum) 
     Some((key, sum)) 
    case _ if state.isTimingOut() => (key, state.getOption.getOrElse(0L)) 
    } 
} 

스테이트 풀 스트리밍에 대한 자세한 내용은 this blog post (면책 조항 : 저는 저자입니다.)

+0

안녕하세요 @ Yuval, 특정 키가 초과되면 모든 상태가 사라 졌나요? 당신은 처음부터 시작해야합니까? – marios

+1

@marios 예, 제한 시간이 지나면 키가 삭제로 표시됩니다. –

+0

시간 초과 후 상태를 유지해야 할 필요가 있다면 직접 해봐야 할 것 같습니까? 감사합니다 Yuval! – marios

관련 문제