0

트위터 스트리밍 API에서 응답 데이터를 읽는 방법 - POST 상태/필터? 연결을 설정했으며 200 개의 상태 코드가 수신되었지만 짹짹을 읽는 방법을 모르겠습니다. 나는 그들이 올 때 짹짹이나 인쇄하고 싶다.Playframework 및 Twitter 스트리밍 API

ws.url(url) 
.sign(OAuthCalculator(consumerKey, requestToken)) 
.withMethod("POST") 
.stream() 
.map { response => 
    if(response.headers.status == 200) 
    println(response.body) 
} 

편집 : 나는 stream()를 호출이 솔루션

ws.url(url) 
.sign(OAuthCalculator(consumerKey, requestToken)) 
.withMethod("POST") 
.stream() 
.map { response => 
    if(response.headers.status == 200){ 
    response.body 
     .scan("")((acc, curr) => if (acc.contains("\r\n")) curr.utf8String else acc + curr.utf8String) 
     .filter(_.contains("\r\n")) 
     .map(json => Try(parse(json).extract[Tweet])) 
     .runForeach { 
     case Success(tweet) => 
      println("-----") 
      println(tweet.text) 
     case Failure(e) => 
      println("-----") 
      println(e.getStackTrace) 
     } 
    } 
} 

답변

4

스트리밍 WS 요청에 대한 응답 본문은 Akka Streams Source 바이트입니다. Twitter API 응답은 개행 문자로 구분되기 때문에 (보통) Framing.delimiter을 사용하여 바이트 덩어리로 분할하고 청크를 JSON으로 구문 분석 한 다음 원하는 내용으로 처리 할 수 ​​있습니다. 이런 식으로 뭔가 작업을해야합니다 :

import akka.stream.scaladsl.Framing 
import scala.util.{Success, Try} 
import akka.util.ByteString 
import play.api.libs.json.{JsSuccess, Json, Reads} 
import play.api.libs.oauth.{ConsumerKey, OAuthCalculator, RequestToken} 

case class Tweet(id: Long, text: String) 
object Tweet { 
    implicit val reads: Reads[Tweet] = Json.reads[Tweet] 
} 

def twitter = Action.async { implicit request => 
    ws.url("https://stream.twitter.com/1.1/statuses/filter.json?track=Rio2016") 
     .sign(OAuthCalculator(consumerKey, requestToken)) 
     .withMethod("POST") 
     .stream().flatMap { response => 
    response.body 
     // Split up the byte stream into delimited chunks. Note 
     // that the chunks are quite big 
     .via(Framing.delimiter(ByteString.fromString("\n"), 20000)) 
     // Parse the chunks into JSON, and then to a Tweet. 
     // A better parsing strategy would be to account for all 
     // the different possible responses, but here we just 
     // collect those that match a Tweet. 
     .map(bytes => Try(Json.parse(bytes.toArray).validate[Tweet])) 
     .collect { 
     case Success(JsSuccess(tweet, _)) => tweet.text 
     } 
     // Print out each chunk 
     .runForeach(println).map { _ => 
     Ok("done") 
    } 
    } 
} 

참고 : 당신이 당신의 컨트롤러에 암시 Materializer를 주입해야 스트림을 구체화 할 수 있습니다.

+0

설명해 주셔서 감사합니다. – mkovacek

+0

나중에 연결을 닫을 수 있습니까? 다른 단어를 추적하기위한 여러 요청을 계획하고 있으며 특정 미래의 특정 연결을 닫고 싶습니다. – mkovacek

+1

Akka 문서에서 [Dynamic Stream Handling] (http://doc.akka.io/docs/akka/2.4.8/scala/stream/stream-dynamic.html)을보십시오. 하나의 아이디어 : 공유 된 kill 스위치를 만든 다음'source.via (killSwitch.flow)'를 사용하여 스트림에 추가하십시오. killswitch에서'shutdown()'을 실행하면 연결을 닫아야합니다. – Mikesname

3

를 발견 한 것은 다시 Future[StreamedResponse]을 제공합니다. 그 때 ByteString 청크를 변환하기 위해 akka 관용구를 사용해야합니다. 뭔가 같은 : 내가 위의 코드를 테스트하지 않았다 (하지만이 https://www.playframework.com/documentation/2.5.x/ScalaWS의 스트리밍 응답 섹션 플러스 http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-flows-and-basics.html에서 싱크 설명 떨어져 기반으로)이 각을 인쇄 할 수 있습니다

val stream = ws.url(url) 
    .sign(OAuthCalculator(consumerKey, requestToken)) 
    .withMethod("POST") 
    .stream() 

stream flatMap { res => 
    res.body.runWith(Sink.foreach[ByteString] { bytes => 
    println(bytes.utf8String) 
    }) 
} 

노트 청크 자체의 라인에, 그리고 나는 지저귐 API가 청크 당 완전한 json blob을 돌려 주는지 확신 할 수 없다. 청크를 인쇄하기 전에 누적하려면 Sink.fold을 사용해야 할 수도 있습니다.

관련 문제