2014-12-21 3 views
7

저는 Akka 지속성을 가지고 놀았으며 이해를 테스트하기 위해 다음 프로그램을 작성했습니다. 문제는이 프로그램을 실행할 때마다 결과가 달라진다는 것입니다. 정답은 49995000이지만 항상 그런 것은 아닙니다. 각 실행 사이에 업무 일지 디렉토리를 정리했지만 아무런 차이가 없습니다. 누가 잘못되었는지 알 수 있습니까? 이 프로그램은 1에서 n까지의 모든 숫자를 단순히 합합니다 (n은 아래 코드에서 9999입니다).확인 된 전송을 사용한 Akka 지속성이 일관성없는 결과를 나타냅니다.

정답은 : (n 개의 * (N + 1)) 49995000.

편집의 N = 9999/2 : 나는이어야한다 JDK 7보다 JDK 8에 더 일관되게 작동하는 것 같다 JDK 8 만 사용 하시겠습니까?

package io.github.ourkid.akka.aggregator.guaranteed 

import akka.actor.Actor 
import akka.actor.ActorPath 
import akka.actor.ActorSystem 
import akka.actor.Props 
import akka.actor.actorRef2Scala 
import akka.persistence.AtLeastOnceDelivery 
import akka.persistence.PersistentActor 

case class ExternalRequest(updateAmount : Int) 
case class CountCommand(deliveryId : Long, updateAmount : Int) 
case class Confirm(deliveryId : Long) 

sealed trait Evt 
case class CountEvent(updateAmount : Int) extends Evt 
case class ConfirmEvent(deliveryId : Long) extends Evt 

class TestGuaranteedDeliveryActor(counter : ActorPath) extends PersistentActor with AtLeastOnceDelivery { 

    override def persistenceId = "persistent-actor-ref-1" 

    override def receiveCommand : Receive = { 
    case ExternalRequest(updateAmount) => persist(CountEvent(updateAmount))(updateState) 
    case Confirm(deliveryId) => persist(ConfirmEvent(deliveryId)) (updateState) 
    } 

    override def receiveRecover : Receive = { 
    case evt : Evt => updateState(evt) 
    } 

    def updateState(evt:Evt) = evt match { 
    case CountEvent(updateAmount) => deliver(counter, id => CountCommand(id, updateAmount)) 
    case ConfirmEvent(deliveryId) => confirmDelivery(deliveryId) 
    } 
} 

class FactorialActor extends Actor { 
    var count = 0 
    def receive = { 
    case CountCommand(deliveryId : Long, updateAmount:Int) => { 
     count = count + updateAmount 
     sender() ! Confirm(deliveryId) 
    } 
    case "print" => println(count) 
    } 
} 

object GuaranteedDeliveryTest extends App { 
    val system = ActorSystem() 

    val factorial = system.actorOf(Props[FactorialActor]) 

    val delActor = system.actorOf(Props(classOf[TestGuaranteedDeliveryActor], factorial.path)) 

    import system.dispatcher 

    system.scheduler.schedule(0 seconds, 2 seconds) { factorial ! "print" } 

    for (i <- 1 to 9999) 
    delActor ! ExternalRequest(i) 



} 

SBT 파일

name := "akka_aggregator" 

organization := "io.github.ourkid" 

version := "0.0.1-SNAPSHOT" 

scalaVersion := "2.11.4" 

scalacOptions ++= Seq("-unchecked", "-deprecation") 

resolvers ++= Seq(
    "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" 
) 

val Akka = "2.3.7" 
val Spray = "1.3.2" 

libraryDependencies ++= Seq(
    // Core Akka 
    "com.typesafe.akka" %% "akka-actor" % Akka, 
    "com.typesafe.akka" %% "akka-cluster" % Akka, 
    "com.typesafe.akka" %% "akka-persistence-experimental" % Akka, 
    "org.iq80.leveldb" % "leveldb" % "0.7", 
    "org.fusesource.leveldbjni" % "leveldbjni-all" % "1.8", 

    // For future REST API 
    "io.spray" %% "spray-httpx" % Spray, 
    "io.spray" %% "spray-can" % Spray, 
    "io.spray" %% "spray-routing" % Spray, 
    "org.typelevel" %% "scodec-core" % "1.3.0", 

    // CSV reader  
    "net.sf.opencsv" % "opencsv" % "2.3", 

    // Logging 
    "com.typesafe.akka" %% "akka-slf4j" % Akka, 
    "ch.qos.logback" % "logback-classic" % "1.0.13", 

    // Testing 
    "org.scalatest" %% "scalatest" % "2.2.1" % "test", 
    "com.typesafe.akka" %% "akka-testkit" % Akka % "test", 
    "io.spray" %% "spray-testkit" % Spray % "test", 
    "org.scalacheck" %% "scalacheck" % "1.11.6" % "test" 
) 
fork := true 
mainClass in assembly := Some("io.github.ourkid.akka.aggregator.TestGuaranteedDeliveryActor") 

application.conf 파일

########################################## 
# Akka Persistence Reference Config File # 
########################################## 

akka { 

    # Loggers to register at boot time (akka.event.Logging$DefaultLogger logs 
    # to STDOUT) 
    loggers = ["akka.event.slf4j.Slf4jLogger"] 

    # Log level used by the configured loggers (see "loggers") as soon 
    # as they have been started; before that, see "stdout-loglevel" 
    # Options: OFF, ERROR, WARNING, INFO, DEBUG 
    loglevel = "DEBUG" 

    # Log level for the very basic logger activated during ActorSystem startup. 
    # This logger prints the log messages to stdout (System.out). 
    # Options: OFF, ERROR, WARNING, INFO, DEBUG 
    stdout-loglevel = "INFO" 

    # Filter of log events that is used by the LoggingAdapter before 
    # publishing log events to the eventStream. 
    logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" 

    # Protobuf serialization for persistent messages 
    actor { 

    serializers { 

     akka-persistence-snapshot = "akka.persistence.serialization.SnapshotSerializer" 
     akka-persistence-message = "akka.persistence.serialization.MessageSerializer" 
    } 

    serialization-bindings { 

     "akka.persistence.serialization.Snapshot" = akka-persistence-snapshot 
     "akka.persistence.serialization.Message" = akka-persistence-message 
    } 
    } 

    persistence { 

    journal { 

     # Maximum size of a persistent message batch written to the journal. 
     max-message-batch-size = 200 

     # Maximum size of a deletion batch written to the journal. 
     max-deletion-batch-size = 10000 

     # Path to the journal plugin to be used 
     plugin = "akka.persistence.journal.leveldb" 

     # In-memory journal plugin. 
     inmem { 

     # Class name of the plugin. 
     class = "akka.persistence.journal.inmem.InmemJournal" 

     # Dispatcher for the plugin actor. 
     plugin-dispatcher = "akka.actor.default-dispatcher" 
     } 

     # LevelDB journal plugin. 
     leveldb { 

     # Class name of the plugin. 
     class = "akka.persistence.journal.leveldb.LeveldbJournal" 

     # Dispatcher for the plugin actor. 
     plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" 

     # Dispatcher for message replay. 
     replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" 

     # Storage location of LevelDB files. 
     dir = "journal" 

     # Use fsync on write 
     fsync = on 

     # Verify checksum on read. 
     checksum = off 

     # Native LevelDB (via JNI) or LevelDB Java port 
     native = on 
     # native = off 
     } 

     # Shared LevelDB journal plugin (for testing only). 
     leveldb-shared { 

     # Class name of the plugin. 
     class = "akka.persistence.journal.leveldb.SharedLeveldbJournal" 

     # Dispatcher for the plugin actor. 
     plugin-dispatcher = "akka.actor.default-dispatcher" 

     # timeout for async journal operations 
     timeout = 10s 

     store { 

      # Dispatcher for shared store actor. 
      store-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" 

      # Dispatcher for message replay. 
      replay-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" 

      # Storage location of LevelDB files. 
      dir = "journal" 

      # Use fsync on write 
      fsync = on 

      # Verify checksum on read. 
      checksum = off 

      # Native LevelDB (via JNI) or LevelDB Java port 
      native = on 
     } 
     } 
    } 

    snapshot-store { 

     # Path to the snapshot store plugin to be used 
     plugin = "akka.persistence.snapshot-store.local" 

     # Local filesystem snapshot store plugin. 
     local { 

     # Class name of the plugin. 
     class = "akka.persistence.snapshot.local.LocalSnapshotStore" 

     # Dispatcher for the plugin actor. 
     plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" 

     # Dispatcher for streaming snapshot IO. 
     stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher" 

     # Storage location of snapshot files. 
     dir = "snapshots" 
     } 
    } 

    view { 

     # Automated incremental view update. 
     auto-update = on 

     # Interval between incremental updates 
     auto-update-interval = 5s 

     # Maximum number of messages to replay per incremental view update. Set to 
     # -1 for no upper limit. 
     auto-update-replay-max = -1 
    } 

    at-least-once-delivery { 
     # Interval between redelivery attempts 
     redeliver-interval = 5s 

     # Maximum number of unconfirmed messages that will be sent in one redelivery burst 
     redelivery-burst-limit = 10000 

     # After this number of delivery attempts a `ReliableRedelivery.UnconfirmedWarning` 
     # message will be sent to the actor. 
     warn-after-number-of-unconfirmed-attempts = 5 

     # Maximum number of unconfirmed messages that an actor with AtLeastOnceDelivery is 
     # allowed to hold in memory. 
     max-unconfirmed-messages = 100000 
    } 

    dispatchers { 
     default-plugin-dispatcher { 
     type = PinnedDispatcher 
     executor = "thread-pool-executor" 
     } 
     default-replay-dispatcher { 
     type = Dispatcher 
     executor = "fork-join-executor" 
     fork-join-executor { 
      parallelism-min = 2 
      parallelism-max = 8 
     } 
     } 
     default-stream-dispatcher { 
     type = Dispatcher 
     executor = "fork-join-executor" 
     fork-join-executor { 
      parallelism-min = 2 
      parallelism-max = 8 
     } 
     } 
    } 
    } 
} 

올바른 출력 :

18:02:36.684 [default-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 
18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 
18:02:36.684 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started 
0 
18:02:36.951 [default-akka.actor.default-dispatcher-14] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl] 
18:02:36.966 [default-akka.actor.default-dispatcher-3] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 
3974790 
24064453 
18:02:42.313 [default-akka.actor.default-dispatcher-11] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 
49995000 
49995000 
49995000 
49995000 

잘못된 실행 :

17:56:22.493 [default-akka.actor.default-dispatcher-4] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 
17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 
17:56:22.508 [default-akka.actor.default-dispatcher-4] DEBUG akka.event.EventStream - Default Loggers started 
0 
17:56:22.750 [default-akka.actor.default-dispatcher-2] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl] 
17:56:22.765 [default-akka.actor.default-dispatcher-7] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 
3727815 
22167811 
17:56:28.391 [default-akka.actor.default-dispatcher-3] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 
49995000 
51084018 
51084018 
52316760 
52316760 
52316760 
52316760 
52316760 

또 다른 잘못된 실행 : 당신은 AtLeastOnceDelivery 의미를 사용하고

17:59:12.122 [default-akka.actor.default-dispatcher-3] INFO akka.event.slf4j.Slf4jLogger - Slf4jLogger started 
17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - logger log1-Slf4jLogger started 
17:59:12.137 [default-akka.actor.default-dispatcher-3] DEBUG akka.event.EventStream - Default Loggers started 
0 
17:59:12.387 [default-akka.actor.default-dispatcher-7] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentImpl] 
17:59:12.402 [default-akka.actor.default-dispatcher-13] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.CountEvent] 
2982903 
17710176 
49347145 
17:59:18.204 [default-akka.actor.default-dispatcher-13] DEBUG a.s.Serialization(akka://default) - Using serializer[akka.serialization.JavaSerializer] for message [io.github.ourkid.akka.aggregator.guaranteed.ConfirmEvent] 
51704199 
51704199 
55107844 
55107844 
55107844 
55107844 

답변

10

. 이 here의 말처럼

주에서-최소 한 번 전달이 원래 메시지 항상 유지되지 않습니다 위해 를 보내고 대상이 중복 메시지가 나타날 수 있음을 의미한다. 즉 의미 보내 ActorRef 정상 의 것과 일치하지 않는 것을 의미 동작 :

그것이 AT-대부분 한번 동일한 송신기 - 수신기 쌍에 대한 전달 메시지 순서는 후 가능한 재송신으로 인해 보존되지되지

크래시 및 다시 시작 메시지가 전달됩니다. 새 액터 화신이 의미는 ActorPath가 나타내는 것과 유사하므로 액터 라이프 사이클을 참조하십시오. 따라서 메시지를 전달할 때 은 경로가 아닌 참조를 제공해야합니다. 메시지는 액터 선택과 함께 경로로 전송됩니다.

일부 숫자는 두 번 이상 수신 될 수 있습니다. FactorialActor 안에 중복 된 숫자를 무시하거나이 의미를 사용하지 마십시오.

관련 문제