2016-12-14 2 views
2

나는 scala + Akka에서 손을 잡고 있으며 내결함성을 계산하려고합니다. 감독관으로부터 메시지를 받고 DB에 데이터를 삽입하는 배우가 있습니다. 감독자는 액터에 오류가 발생하면 액터를 다시 시작합니다.Akka 결함 허용 접근

DB에 연결 문제가있는 경우 postRestart()에서 연결 문자열을 변경하려고합니다. 이제는 하나의 DB에 연결 문제가있을 때마다 액터가 다시 시작되고 다른 DB에 데이터를 삽입하기 시작합니다.

이 방법으로 충분합니까? 권장되는 접근 방식은 무엇입니까?

관리자 :

class SocialSupervisor extends Actor { 

    override val supervisorStrategy=OneForOneStrategy(loggingEnabled = false){ 
    case (e:Exception)=>Restart 
    } 

    val post_ref=context.actorOf(Props[Post]) 
    def receive={ 
      case Get_Feed(feed)=>{ 
       //get data from feed 
       post_ref!Post_Message(posted_by,post) 
      } 
    } 
} 

배우 :

내가 더 "결함 허용"만드는 당신은 당신의 코드를 만들 수있는 몇 가지 개선이 있다고 생각
class Post extends Actor{ 
    val config1=ConfigFactory.load() 
    var config=config1.getConfig("MyApp.db") 

    override def postRestart(reason: Throwable) { 
     config=config1.getConfig("MyApp.backup_db") 
     super.postRestart(reason) 
    } 

    def insert_data(commented_by:String,comment:String){ 
      val connection_string=config.getString("url") 
       val username=config.getString("username") 
       val password=config.getString("password") 
       //DB operations 
    } 

    def receive={ 
     case Post_Message(posted_by,message)=>{ 
     insert_data(posted_by, message) 
     } 
    } 
} 

답변

1

. 이 & 어떤 ActorSystem의 독립적 인 테스트를 사용할 수 있도록

모듈화

당신은 아마 배우의 나머지 부분에서 insert_data 기능을 분리해야한다. 귀하의 배우 그들에 약간의 코드를 가지고 있어야하고, receive 방법은 기본적으로 외부 함수에 디스패처해야한다 :

object Post { 
    def insert_data(conn : Connection)(commented_by : String, comment : String) = { 
    ... 
    } 
} 

당신도 한 단계 더 가서 Connection 종속성을 제거 할 수있다. 당신의 배우의 관점에서 삽입은 유효한 행 업데이트의 수 a PostMessage에 소요 반환하는 함수에 불과하다 :

object Post { 
    //returns an Int because Statement.executeUpdate returns an Int 
    type DBInserter : Post_Message => Int 

당신은 이제 데이터베이스 연결에 삽입 할 수 있습니다 전에 :

def insertIntoLiveDB(connFactory :() => Connection) : DBInserter = 
    (postMessage : Post_Message) => { 
     val sqlStr = s"INSERT INTO .." 
     connFactory().createStatement() executeUpdate sqlStr 
    } 
    } 

또는 테스트 목적으로 삽입을하지 않을 함수를 쓰기는 :

//does no inserting 
    val neverInsert : DBInserter = (postMessage : Post_Message) => 0 
} 

는 이제 배우가 사실상 로직이 없습니다

:

class Post(inserter : Post.DBInserter) extends Actor { 

    def receive = { 
    case pm : Post_Message => inserter(pm) 
    } 

} 

내결함성

은 지금까지 응용 프로그램 내에서 "장애"의 가장 큰 소스는 데이터베이스에 Connection하여 경우에 명시 네트워크입니다. 연결 실패시 자동으로 새로 고치는 방법이 필요합니다. 우리는 그렇게 할 공장 기능을 사용할 수 있습니다

def basicConnFactory(timeoutInSecs : Int = 10) = { 

    //setup initial connection, not specified in question 
    var conn : Connection = ??? 

() => { 
    if(conn isValid timeoutInSecs) 
     conn 
    else { 
     conn = ??? //setup backup connection 
     conn 
    } 
    } 
} 

지금 연결의 유효 기간은 각각 삽입 테스트하고 문제가있는 경우 다시 설정. connection pool ...

을 활용하여 생산 요구 사항은 더 엄격한 질수록

import Post.queryLiveDB 
val post_ref = 
    context actorOf (Props[Post], insertIntoLiveDB(basicConnFactory())) 

, 당신은 공장을 ammend 수있는이 공장은 액터를 만드는 데 사용할 수 있습니다