나는 다음과 같은 오류 받고 있어요 불일치 ``` 발에 G1 = GraphDSL.create을() {암시 빌더 => 수입 GraphDSL.Implicits._akka 스트림의 입구와 출구가
val in: Source[ByteString, Any] = Source.single(ByteString(digest))
val flow: GraphStage[FlowShape[ByteString, ByteString]] = new ReadBlockStage(dataStore, blockingExecutionContext)
in ~> flow
SourceShape(flow.shape.out)
}
브로 sourceGraph : 소스 [ByteString, NOTUSED = Source.fromGraph (G1) ``
및 내 흐름은 다음과 같이 정의됩니다. class ReadBlockStage(dataStore: DataStore, implicit val exceutionContext: ExecutionContext) extends GraphStage[FlowShape[ByteString, ByteString]] with DefaultJsonProtocol { val in = Inlet[ByteString]("DigestSpec.in") val out = Outlet[ByteString]("BlockOut.out") override val shape = FlowShape.of(in, out) ... }
왜이 오류가 발생합니까? 흐름의 "out"포트는 Outlet [ByteString] 유형이고, My Source는 소스 [ByteString, NotUsed]입니다. 오류 메시지는 모양과 예상되는 모양이 동일하기 때문에 매우 혼란 스럽습니다.