2012-02-07 2 views
9

Netty 라이브러리 (GitHub의 버전 4)를 사용하고 있습니다. 스칼라에서 잘 작동하지만, 비동기 대기를 위해 연속 전달 스타일을 사용할 수 있기를 바라고 있습니다. netty/NIO 리스너에서 스칼라 연속 사용하기

은 전통적 인 Netty와 당신이 뭔가를 (예를 들어 비동기 작업을 연결) 할 것 :

//client is a ClientBootstrap 
val future:ChannelFuture = client.connect(remoteAddr); 
future.addListener(new ChannelFutureListener { 
    def operationComplete (f:ChannelFuture) = { 
     //here goes the code that happens when the connection is made 
    } 
}) 

당신이 라이브러리 (내가하는) 당신은 기본적으로 사용자를 허용하는 세 가지 간단한 옵션이 있습니다를 구현하는 경우

  1. 연결 메소드에서 ChannelFuture를 반환하고 사용자가 처리하도록하십시오. 이것은 netty에서 많은 추상화를 제공하지 않습니다.
  2. ChannelFutureListener를 연결 메소드의 매개 변수로 가져 와서 Listener로 ChannelFuture에 추가하십시오.
  3. 이 연결 방법의 매개 변수로 콜백 함수 객체를 가지고 사용자가 만드는 ChannelFutureListener 내에서 그 전화를 내가 무엇

(이 다소 Node.js를 같은 콜백 중심의 스타일로 만들 것입니다) 하려고하는 것은 네 번째 옵션입니다. 그것이 위의 수에 포함되지 않은 이유는 간단하지 않기 때문입니다.

나는 도서관의 사용이 다소 차단 라이브러리처럼 만들기 위해 스칼라 구분의 연속 요청을 사용하고자하지만, 뒤에서 비 차단됩니다

class MyLibraryClient { 
    def connect(remoteAddr:SocketAddress) = { 
     shift { retrn: (Unit => Unit) => { 
       val future:ChannelFuture = client.connect(remoteAddr); 
       future.addListener(new ChannelFutureListener { 
        def operationComplete(f:ChannelFuture) = { 
         retrn(); 
        } 
       }); 
      } 
     } 
    } 
} 

다른 읽기/쓰기 작업이 구현되는 상상 같은 패션. 사용자의 코드를 더 같이 할 수있는이 존재의 목적 : 즉

reset { 
    val conn = new MyLibraryClient(); 
    conn.connect(new InetSocketAddress("127.0.0.1", 1337)); 
    println("This will happen after the connection is finished"); 
} 

, 프로그램은 간단한 차단 스타일의 프로그램을 같이하지만 무대 뒤에서 어떤 차단 또는 스레딩이되지 않습니다 .

내가 겪고있는 문제는 구분 된 연속 입력이 어떻게 작동하는지 완전히 이해하지 못한다는 점입니다. 위의 방법으로 구현하려고하면 컴파일러에서 내 operationComplete 구현이 실제로 Unit 대신 Unit @scala.util.continuations.cpsParam[Unit,Unit => Unit]을 반환한다고 불평합니다. scala의 CPS에 일종의 "잡았다"가 있다는 점에서 shift 메서드의 반환 유형에 @suspendable이라는 주석을 달아야하는데, reset까지 호출 스택이 전달되지만 조정할 방법이없는 것 같습니다. 구분 된 연속성에 대한 개념이없는 선재 Java 라이브러리가있는 것입니다.

Swarm이 연속성을 직렬화하고 다른 곳에서 계산되도록 네트워크를 통해 잼 할 수 있다면, 기존 Java 클래스에서 연속성을 호출 할 수 있어야합니다. 그러나 나는 그것이 어떻게 행해질 수 있는지 알 수 없다. 이 일을하기 위해 스칼라에서 netty의 전체 부분을 다시 작성해야합니까?

+0

하지만 난 당신의 아이디어에 대해 제안한다. 왜 그랬는지 말해 줄 게요. 그러나 사용자가 귀하의 도서관의 비동기 성질을 "알지 못하도록"만들면 청취자 코드에서 "확인"할 수있는 권한이 있다는 것을 알려줍니다. 사실 그는 청취자에게 코드를 작성하는 것조차 알지 못합니다. 청취자에서 차단 호출을하면 모든 종류의 문제가 발생할 수 있습니다. 대부분의 시간에 볼 수있는 문제는 다른 IO 작업을 "느리게"하여 처리량을 제한한다는 것입니다. –

+1

좋은 지적이 있지만 동의하지 않습니다. 나는 나의 라이브러리 사용자가 나와 만 있다면, 아마도 '리셋'이 무엇을하는지 이해해야 할 것이고 따라서 콜이 비 블로킹 (non-blocking)하다는 것을 이해할 것이다. 이것은 A) 구분 된 연속을 더 잘 이해하고 B) 본질적으로 콜백 중심의 코드를보다 명확한 방식으로 작성하는 실험입니다. – Jeremy

답변

4

내가 처음 시작할 때 Scala's continuations의 설명이 매우 유용하다는 것을 알았습니다. 특히 그가 shift[A, B, C]reset[B, C]을 설명하는 부분에주의하십시오. nulloperationComplete의 마지막 문장으로 추가하면 도움이됩니다.

Btw 내부에 shift이 포함될 수 있으면 reset 안에 retrn()을 호출해야합니다.

편집 : 여기에 동작하는 예제 가능한 출력

import scala.util.continuations._ 
import java.util.concurrent.Executors 

object Test { 

    val execService = Executors.newFixedThreadPool(2) 

    def main(args: Array[String]): Unit = { 
    reset { 
     val conn = new MyLibraryClient(); 
     conn.connect("127.0.0.1"); 
     println("This will happen after the connection is finished"); 
    } 
    println("Outside reset"); 
    } 
} 

class ChannelFuture { 
    def addListener(listener: ChannelFutureListener): Unit = { 
    val future = this 
    Test.execService.submit(new Runnable { 
     def run(): Unit = { 
     listener.operationComplete(future) 
     } 
    }) 
    } 
} 

trait ChannelFutureListener { 
    def operationComplete(f: ChannelFuture): Unit 
} 

class MyLibraryClient { 
    def connect(remoteAddr: String): [email protected][Unit] = { 
    shift { 
     retrn: (Unit => Unit) => { 
     val future: ChannelFuture = new ChannelFuture() 
     future.addListener(new ChannelFutureListener { 
      def operationComplete(f: ChannelFuture): Unit = { 
      println("operationComplete starts") 
      retrn(); 
      null 
      } 
     }); 
     } 
    } 
    } 
} 

이다 : 나는 하우투는 스칼라 물건을 수정 모르는

Outside reset 
operationComplete starts 
This will happen after the connection is finished 
+0

사실 이것은 컴파일러를 행복하게 만들고 심지어 제대로 작동하는 것처럼 보입니다. 익명의'ChannelFutureListener' 밖에서'shift'를 움직이고 클로저를 사용하여'operationComplete' 내부에서 연속을 호출하는 것이 중요합니다. 나는 이것이 왜 효과가 있고 다른 방법으로는 이해가되는지 모르지만, 나는 그것을 받아 들일 것이다. 감사! – Jeremy

+0

그리고 그것은 스칼라의 연속에 대한 아주 좋은 읽을 거리입니다. 그들은 continuation에 대한 scala-lang.org 페이지에서 쓸모없는 예제를 제거하고 링크 된 기사로 대체해야합니다. – Jeremy

+0

는 @Jeremy 그래, 그 기사는 @Jeremy가 BTW, 코드와 내 사이의 차이가 나는 명시 적으로 몇 가지 방법의 반환 유형을 주석 있다는 것입니다 :) – shams

관련 문제