2016-09-27 5 views
0

akka 스케줄러 사용법에 대한 몇 가지 예를 찾고 있습니다. 필자는 일부 데이터베이스에서 데이터를 검색하기 위해 하나의 액터 (dataProducer라고 부름) 구현을했습니다. 5 초 간격으로 dataProducer 액터를 극대화 할 하나의 스케줄러 액터를 작성하고 싶습니다. 또한 데이터 검색이 스케줄러 간격보다 많은 시간이 걸리는 경우 케이스를 처리하는 방법. Scheduler 액터의 scheduleOnce 메소드가 이것을 처리합니까?akka 스케줄러를 사용한 데이터 푸시

여기 내 스케줄러 배우

import java.util.concurrent.{Executors, TimeUnit} 
import akka.actor.{Actor, Cancellable, Props} 
import scala.concurrent.ExecutionContext 

class SchedulerActor(interval: Long) extends Actor with LogF{ 

    implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(100)) 

    private var scheduler: Cancellable = _ 

    override def preStart(): Unit = { 
    import scala.concurrent.duration._ 
    scheduler = context.system.scheduler.schedule(
     initialDelay = 0 seconds, 
     interval = FiniteDuration(interval, TimeUnit.SECONDS), 
     receiver = self, 
     message = FetchData 
    ) 
    } 

    override def postStop(): Unit = { 
    scheduler.cancel() 
    } 

    def receive = { 
    case FetchData => 
     logger.debug("Fetch Data") 
     sender() ! "Data Fetched!!!" //here I'll call dataProducer API 
     true 
    case unknown => 
     throw new RuntimeException("ERROR: Received unknown message [" + unknown + "], can't handle it") 
    } 

} 

object SchedulerActor { 

    def props(interval: Long): Props = Props(new SchedulerActor(interval)) 
} 

sealed trait FetchDataMessage 
case object FetchData extends FetchDataMessage 
+0

의미있는 답변을 얻으려면 지금까지 시도한 내용을 보여 드리는 것이 좋습니다. – hasumedic

+0

내 질문이 업데이트되었습니다. – Abhay

답변

0

scheduleOnce 일부 지연 후 조각을 실행하는 데 도움이 스케줄러. 서로 다른 상태를 가지며 상태를 전환하여 다른 종류의 메시지를 받아들이고 그에 따라 작동하십시오. 하지만 시간 초과가 발생하면 scheduleOnce가 timeoutState로 연결됩니다.

ScheduleOnce는 배우가 타임 아웃이 발생했음을 알 수 있도록 도와줍니다.

데이터 검색에 스케줄러 간격보다 많은 시간이 필요한 경우 어떻게 처리합니까?

데이터를 지정된 시간에 배우 상태 이상 소요 timeoutState을 변경하고 타임 아웃 상태에서 가져 오는 경우는 배우로 수행해야 무슨 말을. 재 시도하거나 다른 소스를 시도 할 수 있습니다.

나는 극 오초와 scheduleOnce에 대한 결과 상태의 대기에서

간격 오초에서 dataProducer 배우가 dataProducer하고 모든 일이 다시 반복 요청이 지연됩니다 스케줄러 배우를 쓰고 싶습니다 .

수행 방법을 이해하려면이 코드를 확인하십시오.

import akka.actor.{Actor, Cancellable} 
import stackoverflow.DBUtils.Entity 

import scala.concurrent.Future 
import scala.concurrent.duration._ 
import akka.pattern.pipe 


object DBPollActor { 
    case class Result(results: List[Entity]) 
    case object Schedule 
    case object Timeup 
    case object FetchData 
} 

object DBUtils { 
    case class Entity(name: String) 

    def doDBOperation: Future[List[Entity]] = { 
    Future.successful(List(Entity(name = "foo"))) 
    } 

} 

class DBPollActor(timeout: Int) extends Actor { 

    import DBPollActor._ 

    implicit val ex = context.system.dispatcher 

    var schedulerOpt: Option[Cancellable] = None 

    @scala.throws[Exception](classOf[Exception]) 
    override def preStart(): Unit = { 
    super.preStart() 
    self ! FetchData 
    schedulerOpt = Some(context.system.scheduler.scheduleOnce(timeout seconds) { 
     context become timeoutState 
     self ! Timeup 
    }) 
    } 

    override def receive: Receive = { 
    case [email protected] => 
     context become startState 
     self forward msg 
    } 

    def startState: Receive = { 
    case FetchData => 
     schedulerOpt.map(_.cancel()) 
     context become resultState 
     DBUtils.doDBOperation.map(Result) pipeTo self 
     schedulerOpt = Some(context.system.scheduler.scheduleOnce(timeout seconds) { 
     context become timeoutState 
     self ! Timeup 
     }) 
    } 

    def timeoutState: Receive = { 
    case Timeup => 
     schedulerOpt.map(_.cancel()) 
     //Timeout happened do something or repeat 
    } 

    def resultState: Receive = { 
    case [email protected](list) => 
     schedulerOpt.map(_.cancel()) 
     //Result available consume the result and repeat or doSomething different 
    context become resultState 
     DBUtils.doDBOperation.map(Result) pipeTo self 
     schedulerOpt = Some(context.system.scheduler.scheduleOnce(timeout seconds) { 
     context become timeoutState 
     self ! Timeup 
     }) 

    case ex: Exception => 
     schedulerOpt.map(_.cancel()) 
     //future failed exit or retry 
    } 
} 
관련 문제