2016-12-08 1 views
1

나는 다음과 같은 오류 받고 있어요 불일치 ``` 발에 G1 = GraphDSL.create을() {암시 빌더 => 수입 GraphDSL.Implicits._akka 스트림의 입구와 출구가

val in: Source[ByteString, Any] = Source.single(ByteString(digest)) 
    val flow: GraphStage[FlowShape[ByteString, ByteString]] = new ReadBlockStage(dataStore, blockingExecutionContext) 

    in ~> flow 

    SourceShape(flow.shape.out) 
} 

브로 sourceGraph : 소스 [ByteString, NOTUSED = Source.fromGraph (G1) ``

및 내 흐름은 다음과 같이 정의됩니다. class ReadBlockStage(dataStore: DataStore, implicit val exceutionContext: ExecutionContext) extends GraphStage[FlowShape[ByteString, ByteString]] with DefaultJsonProtocol { val in = Inlet[ByteString]("DigestSpec.in") val out = Outlet[ByteString]("BlockOut.out") override val shape = FlowShape.of(in, out) ... } 왜이 오류가 발생합니까? 흐름의 "out"포트는 Outlet [ByteString] 유형이고, My Source는 소스 [ByteString, NotUsed]입니다. 오류 메시지는 모양과 예상되는 모양이 동일하기 때문에 매우 혼란 스럽습니다.

답변

1

문제점을 파악했습니다. 각 그래프 요소에 대해 builder.add()를 수행하는 것을 잊었습니다.

관련 문제