2010-07-08 3 views
3

원격 FTP 서버에서 큰 파일 (100MB 이상)을 스트리밍하는 WCF 서비스가 있습니다. WCF 클라이언트에 파일을 스트리밍으로WCF 스트림을 반환하는 동안 파일로 스트리밍 하시겠습니까?

[ServiceContract] 
public interface IStreamService 
{ 
    [OperationContract] 
    Stream GetDataFromFtp(); 
} 

public class StreamService : IStreamService 
{ 
    public Stream GetDataFromFtp() 
    { 
     Stream ftpStream = Retr(...); 
     return ftpStream; 
    } 
} 

, 나는 미래의 요청이 원격 FTP 다시 모든 길을 갈 필요가 없도록, 로컬 캐시로 스트리밍 할 - 나는 그냥 서비스를 제공 할 수 오프 디스크.

반환하기 전에 전체 100MB 파일 스트림을 메모리에 버퍼링하지 않으면이 문제가 발생합니다.

나는 각각의 읽기 쓰기를 수행하는 간단한 캡처 스트림 래퍼 사용하여 시도 :

public class CapturingStreamWrapper : Stream 
{ 
    private readonly Stream stream; 
    private readonly Stream captureStream; 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     int readBytes = stream.Read(buffer, offset, count); 
     captureStream.Write(buffer, offset, readBytes); 

     return readBytes; 
    } 
} 

public class StreamService : IStreamService 
{ 
    public Stream GetDataFromFtp() 
    { 
     Stream ftpStream = Retr(...); 
     Stream cacheStream = File.OpenWrite(...); 
     return new CapturingStreamWrapper(ftpStream, cacheStream); 
    } 
} 

을하지만이 작동하지 않았다.

또한 오류 처리 기능이 제공되지 않습니다. 클라이언트 전송에 실패 (예 : 트랜잭션 캐시) 된 경우 절반의 기록 된 파일을 삭제하기위한 캐치 블록이 필요합니다. WCF 라이프 사이클에서 스트림을 호출하거나 정리할 때를 모르기 때문에 이것이 어떻게 작동하는지 잘 모르겠습니다.

클라이언트로 다시 스트리밍 할 때 어떤 파일을 어떻게 스트리밍 할 수 있습니까?

답변

3

필자는 두 개의 서로 연결된 스트림 클래스를 작성했습니다. 큰 코드를 붙여 넣기에 대한 사과 :

/// <summary> 
/// A stream that, as it reads, makes those bytes available on an ouput 
/// stream. Thread safe. 
/// </summary> 
public class CacheStream : Stream 
{ 
    private readonly Stream stream; 

    public CacheStream(Stream stream) 
    { 
     if (stream == null) throw new ArgumentNullException("stream"); 
     this.stream = stream; 
     OutputStream = new CacheOutputStream(this); 
    } 

    public event EventHandler<BytesReadEventArgs> BytesRead = delegate { }; 
    public event EventHandler Closing = delegate { }; 

    public Stream OutputStream { get; private set; } 

    public override void Flush() 
    { 
     stream.Flush(); 
    } 

    public override long Seek(long offset, SeekOrigin origin) 
    { 
     throw new InvalidOperationException("Cannot seek in CachingStream."); 
    } 

    public override void SetLength(long value) 
    { 
     stream.SetLength(value); 
    } 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     int numberOfBytesRead = stream.Read(buffer, offset, count); 

     if (numberOfBytesRead > 0) 
      PipeToOutputStream(buffer, offset, numberOfBytesRead); 

     return numberOfBytesRead; 
    } 

    private void PipeToOutputStream(byte[] buffer, int offset, int numberOfBytesRead) 
    { 
     var tmp = new byte[numberOfBytesRead]; 
     Array.Copy(buffer, offset, tmp, 0, numberOfBytesRead); 
     BytesRead(this, new BytesReadEventArgs(tmp)); 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     throw new InvalidOperationException("Cannot write in CachingStream."); 
    } 

    public override bool CanRead 
    { 
     get { return stream.CanRead; } 
    } 

    public override bool CanSeek 
    { 
     get { return false; } 
    } 

    public override bool CanWrite 
    { 
     get { return false; } 
    } 

    public override long Length 
    { 
     get { return stream.Length; } 
    } 

    public override long Position 
    { 
     get { return stream.Position; } 
     set { throw new InvalidOperationException("Cannot set position in CachingStream."); } 
    } 

    public override void Close() 
    { 
     Closing(this, EventArgs.Empty); 
     base.Close(); 
    } 

    protected override void Dispose(bool disposing) 
    { 
     base.Dispose(disposing); 
     OutputStream.Dispose(); 
    } 
} 

그리고

/// <summary> 
/// Output portion of CacheStream. Streams bytes from a queue of buffers. 
/// Thread safe. 
/// </summary> 
public class CacheOutputStream : Stream 
{ 
    private volatile int position; 
    private volatile int length; 
    private volatile bool sourceIsClosed; 

    // No Deque<T> in the BCL yet, but LinkedList is more or less the same. 
    private readonly LinkedList<byte[]> buffers = new LinkedList<byte[]>(); 

    public CacheOutputStream(CacheStream stream) 
    { 
     if (stream == null) throw new ArgumentNullException("stream"); 

     stream.BytesRead += (o, e) => AddToQueue(e.Buffer); 
     stream.Closing += (o, e) => sourceIsClosed = true; 
    } 

    private void AddToQueue(byte[] buffer) 
    { 
     if (buffer.Length == 0) 
      return; 

     lock (buffers) 
     { 
      buffers.AddLast(buffer); 
      length += buffer.Length; 
     } 
    } 

    public override int Read(byte[] buffer, int offset, int count) 
    { 
     if (buffer == null) throw new ArgumentNullException("buffer"); 

     bool noMoreBuffersAvailable = HasNoMoreBuffersAvailable(); 

     // Guard clause - closed and nothing more to write. 
     if (noMoreBuffersAvailable && sourceIsClosed) 
      return 0; 

     if (noMoreBuffersAvailable) 
     { 
      // Not closed yet! Block infinitely until we get closed or have some data. 
      while (HasNoMoreBuffersAvailable()) 
      { 
       if (sourceIsClosed) 
        return 0; 

       Thread.Sleep(TimeSpan.FromMilliseconds(50)); 
      } 
     } 

     byte[] currentBuffer = GetCurrentBuffer(); 
     int numberOfBytesRead = DoRead(buffer, count, currentBuffer, offset); 

     PutLeftoverBytesAtFrontOfQueue(currentBuffer, numberOfBytesRead); 

     return numberOfBytesRead; 
    } 

    // Check if caller didn't have enough space to fit the buffer. 
    // Put the remaining bytes at the front of the queue. 
    private void PutLeftoverBytesAtFrontOfQueue(byte[] currentBuffer, int numberOfBytesRead) 
    { 
     if (currentBuffer == null) throw new ArgumentNullException("currentBuffer"); 

     if (numberOfBytesRead == currentBuffer.Length) 
      return; // Clean read! 

     var remainingBuffer = new byte[currentBuffer.Length - numberOfBytesRead]; 
     Array.Copy(currentBuffer, numberOfBytesRead, remainingBuffer, 0, remainingBuffer.Length); 

     lock (buffers) 
      buffers.AddFirst(remainingBuffer); 
    } 

    private int DoRead(byte[] buffer, int count, byte[] currentBuffer, int offset) 
    { 
     int maxNumberOfBytesWeCanWrite = Math.Min(count, currentBuffer.Length); 

     Array.Copy(currentBuffer, 0, buffer, offset, maxNumberOfBytesWeCanWrite); 
     position += maxNumberOfBytesWeCanWrite; 

     return maxNumberOfBytesWeCanWrite; 
    } 

    private byte[] GetCurrentBuffer() 
    { 
     byte[] currentBuffer; 

     lock (buffers) 
     { 
      currentBuffer = buffers.First.Value; 
      buffers.RemoveFirst(); 
     } 

     return currentBuffer; 
    } 

    private bool HasNoMoreBuffersAvailable() 
    { 
     lock (buffers) 
      return buffers.Count == 0; 
    } 

    public override void Flush() { } 

    public override long Seek(long offset, SeekOrigin origin) 
    { 
     throw new InvalidOperationException("Cannot seek in CachingStream."); 
    } 

    public override void SetLength(long value) 
    { 
     throw new InvalidOperationException("Cannot set length in CachingStream."); 
    } 

    public override void Write(byte[] buffer, int offset, int count) 
    { 
     throw new InvalidOperationException("Cannot write in a CachingStream."); 
    } 

    public override bool CanRead 
    { 
     get { return true; } 
    } 

    public override bool CanSeek 
    { 
     get { return false; } 
    } 

    public override bool CanWrite 
    { 
     get { return false; } 
    } 

    public override long Length 
    { 
     get { return length; } 
    } 

    public override long Position 
    { 
     get { return position; } 
     set { throw new InvalidOperationException("Cannot set position in CachingStream."); } 
    } 
} 
+0

난 당신이'너무 BytesReadEventArgs' 한 않습니다 ..이 스핀을 줄 막했다? 이게 너에게 어떻게 작용하는거야? – LamonteCristo

+0

안녕하세요 리차드 지금은 꽤 오래된 것 같습니다.하지만 WCF 구현에서 스트리밍 전송 모드를 사용하고 있는지 확인하고 싶습니다. – user919426

관련 문제