2014-06-10 3 views
0

Flume이 ScribeSource를 사용하여 Scribe로 작업하려고하는데, 다음과 같은 Exception 및 flume이 몇 분 안에 데이터를 수신하지 못하게합니다 (대략 1 메가 바이트).Flume ScribeSource, 슬리 프 프레임 크기 설정 방법

Flume에서 Frame Size를 설정할 수있는 방법이 없으므로 Scribe 트래픽을 허용 할 수 있습니다. 제약

2014-06-10 19:40:40,405 WARN org.apache.thrift.server.THsHaServer: Exception while invoking! 
org.apache.thrift.transport.TTransportException: Frame size (23757404) larger than max length (16384000)! 
    at org.apache.thrift.transport.TFramedTransport.readFrame(TFramedTransport.java:137) 
    at org.apache.thrift.transport.TFramedTransport.read(TFramedTransport.java:101) 
    at org.apache.thrift.transport.TTransport.readAll(TTransport.java:84) 
    at org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:378) 
    at org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:297) 
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:204) 
    at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27) 
    at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.invoke(AbstractNonblockingServer.java:478) 
    at org.apache.thrift.server.Invocation.run(Invocation.java:18) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:724) 

답변

0

이것은 기본값 16메가바이트로 작성되는()를 사용 TFrameTransport.Factory 수로 인해 스크라이브 소스시키는 ... 스크라이브 측을 변경하지 않는 것이다.

ScribeSource이 TFramedTransport.Factory (thriftFrameSize)을 사용하는 경우, 다음이 해결 될 것 ...

//Added to accept custom frame size 
private int thriftFrameSize = DEFAULT_THRIFT_FRAME_SIZE; 

private SourceCounter sourceCounter; 

@Override 
public void configure(Context context) { 
    port = context.getInteger("port", port); 

    workers = context.getInteger("workerThreads", DEFAULT_WORKERS); 
    if (workers <= 0) { 
    workers = DEFAULT_WORKERS; 
    } 

    if (sourceCounter == null) { 
    sourceCounter = new SourceCounter(getName()); 
    } 

    thriftFrameSize = context.getInteger("thriftFrameSize", DEFAULT_THRIFT_FRAME_SIZE); 
    if (thriftFrameSize <= 0) { 
    thriftFrameSize = DEFAULT_THRIFT_FRAME_SIZE; 
    } 
} 

private class Startup extends Thread { 

    public void run() { 
    try { 
     Scribe.Processor processor = new Scribe.Processor(new Receiver()); 
     TNonblockingServerTransport transport = new TNonblockingServerSocket(port); 
     THsHaServer.Args args = new THsHaServer.Args(transport); 

     args.workerThreads(workers); 
     args.processor(processor); 
     args.transportFactory(new TFramedTransport.Factory(thriftFrameSize)); 
관련 문제