2012-03-28 1 views
2

Apache HTTPClient 4를 사용하여 기본 수준 액세스로 트위터의 스트리밍 API에 연결합니다. 그것은 처음에 완벽하게 잘 작동하지만 데이터를 검색 몇 분 후에는이 오류와 함께 밖으로 보석금 : 나는이 문제를 직면하고있는 이유아파치 httpclient를 사용하여 트위터의 스트리밍 API를 점차적으로 처리 하시겠습니까?

2012-03-28 16:17:00,040 DEBUG org.apache.http.impl.conn.SingleClientConnManager: Get connection for route HttpRoute[{tls}->http://myproxy:80->https://stream.twitter.com:443] 
2012-03-28 16:17:00,040 WARN com.cloudera.flume.core.connector.DirectDriver: Exception in source: TestTwitterSource 
java.lang.IllegalStateException: Invalid use of SingleClientConnManager: connection still allocated. 
    at org.apache.http.impl.conn.SingleClientConnManager.getConnection(SingleClientConnManager.java:216) 
Make sure to release the connection before allocating another one. 
    at org.apache.http.impl.conn.SingleClientConnManager$1.getConnection(SingleClientConnManager.java:190) 

이해. flume 소스로 flume 클러스터에서이 HttpClient를 사용하려고합니다. 코드는 다음과 같습니다 : 나는 StringBuffer를에 응답 스트림에서 30,000 문자를 버퍼링 한 후 수신 된 데이터로이를 반환하려고

public Event next() throws IOException, InterruptedException { 

    try { 

     HttpHost target = new HttpHost("stream.twitter.com", 443, "https"); 
     new BasicHttpContext(); 
     HttpPost httpPost = new HttpPost("/1/statuses/filter.json"); 
     StringEntity postEntity = new StringEntity("track=birthday", 
       "UTF-8"); 
     postEntity.setContentType("application/x-www-form-urlencoded"); 
     httpPost.setEntity(postEntity); 
     HttpResponse response = httpClient.execute(target, httpPost, 
       new BasicHttpContext()); 
     BufferedReader reader = new BufferedReader(new InputStreamReader(
       response.getEntity().getContent())); 
     String line = null; 
     StringBuffer buffer = new StringBuffer(); 
     while ((line = reader.readLine()) != null) { 
      buffer.append(line); 
      if(buffer.length()>30000) break; 
     } 
     return new EventImpl(buffer.toString().getBytes()); 
    } catch (IOException ie) { 
     throw ie; 
    } 

} 

. 나는 분명히 연결을 닫지 않는다. -하지만 나는 그것을 닫고 싶지 않다. 나는 추측한다. 트위터의 dev에 가이드 그것은 읽기이 here에 대해 이야기 :

Some HTTP client libraries only return the response body after the connection has been closed by the server. These clients will not work for accessing the Streaming API. You must use an HTTP client that will return response data incrementally. Most robust HTTP client libraries will provide this functionality. The Apache HttpClient will handle this use case, for example.

그것은 분명 HttpClient를 점진적으로 응답 데이터를 반환합니다 있음을 알려줍니다. 예제와 튜토리얼을 살펴 보았지만이 작업을 수행하는 데는 아무 것도 발견하지 못했습니다. 여러분이 httpclient (아파치가 아닐 경우)를 사용하고 점차적으로 트위터의 스트리밍 API를 읽은 경우,이 묘기를 어떻게 달성했는지 알려주십시오. 그렇지 않은 사람들은 자유롭게 답변에 기여하십시오. TIA.

UPDATE는 I 이러는 시도 : 1) I는 플룸 소스의 개방 방법에 스트림 핸들을 획득 옮겼다. 2) 간단한 inpustream을 사용하고 데이터를 바이트 버퍼로 읽어들입니다. 그래서 여기 메소드 본문이 지금 모습입니다 :

 byte[] buffer = new byte[30000]; 

     while (true) { 
      int count = instream.read(buffer); 
      if (count == -1) 
       continue; 
      else 
       break; 
     } 
     return new EventImpl(buffer); 

이 어느 정도 작동 - 나는 트윗을 얻을 수는, 그들은 잘 목적지에 기록되고있다. 문제는 instream.read (버퍼) 반환 값입니다. 스트림에 데이터가없고 버퍼의 기본값이 \ u0000 바이트와 30,000 인 경우에도이 값은 대상에 기록됩니다. 그래서 대상 파일은 다음과 같습니다. "짹짹 .. 짹짹 .. 딱지 .. \ u0000 \ u0000 \ u0000 \ u0000 \ u0000 \ u0000 \ u0000 ... 짹짹 .. 짹짹 ...". 나는이 카운트가 결코 끝나지 않는 스트림이라는 -1 코즈를 리턴하지 않을 것이라는 것을 이해합니다. 버퍼가 읽기 명령의 새로운 내용을 가지고 있는지 어떻게 알 수 있습니까?

+0

#close 메소드로 발생하는 I/O 예외를 잡으려고 시도 했습니까? 그에 따라 아래에서 내 대답을 업데이트했습니다. – oleg

+0

또한, \ u0000 \ u0000 ... 바이트/null 바이트는 스트림에 없습니다. - 30k 문자로 버퍼를 인스턴스화하면 이들이 기본 바이트가되고 스트림 내용이 30k 문자보다 작 으면 나머지 문자는 빈 바이트입니다. – Jay

답변

0

그것은 flume 문제였다. Flume은 크기가 32kb 인 이벤트를 전송하도록 최적화되어 있습니다. 32kb를 넘는 모든 것, Flume bails out. 해결 방법은 이벤트 크기를 32KB보다 크게 조정하는 것입니다. 그래서 적어도 20,000자를 버퍼하도록 코드를 변경했습니다. 그것은 일종의 작품이지만, 어리석은 증거는 아닙니다. 버퍼 길이가 32kb를 초과하더라도 여전히 실패 할 수 있습니다. 그러나 한 시간 만에 실패하지 않았습니다. 트위터가 공개 스트림에서 많은 데이터를 보내지 않는다는 사실과 관련이 있다고 생각합니다.

while ((line = reader.readLine()) != null) { 
      buffer.append(line); 
      if(buffer.length()>20000) break; 
     } 
0

문제는 코드가 연결을 유출하고 있다는 것입니다. 콘텐츠 스트림을 닫거나 요청을 중단했는지 확인하십시오.

InputStream instream = response.getEntity().getContent(); 
    try { 
     BufferedReader reader = new BufferedReader(
       new InputStreamReader(instream)); 
     String line = null; 
     StringBuffer buffer = new StringBuffer(); 
     while ((line = reader.readLine()) != null) { 
      buffer.append(line); 
      if (buffer.length()>30000) { 
       httpPost.abort(); 
       // connection will not be re-used 
       break; 
      } 
     } 
     return new EventImpl(buffer.toString().getBytes()); 
    } finally { 
     // if request is not aborted the connection can be re-used 
     try { 
      instream.close(); 
     } catch (IOException ex) { 
      // log or ignore 
     } 
    } 
+0

작동 안함. Flume은 스트림이 닫혔다 고 불평합니다 - 처리를 시작하기 전에 예외가 발생합니다. – Jay

+0

예외는 #close() 메서드에 의해 throw되며 안전하게 무시할 수 있습니다. – oleg

관련 문제