2014-05-14 1 views
0

Rxx를 사용하여 TCP 시간 서버를 생성하고 있습니다. 이 아이디어는 모든 연결된 클라이언트에 서버의 로컬 시간을 매 초마다 브로드 캐스팅하는 Tcp Server 일뿐입니다. 이 서버에 연결할 수 있으며 '틱'관측 가능 대상이 구독되어 있지만 클라이언트가 데이터를받지 못하고 있음을 알 수 있습니다. 내가 여기서 무엇을 놓치고 있니? 다음은 서버에 대한 코드입니다.Rxx를 사용하는 Tcp 시간 서버

class Program 
{ 
    static void Main(string[] args) 
    { 
     var ticks = Observable.Interval(TimeSpan.FromSeconds(1)) 
      .Select(_ => DateTime.Now.ToString()) 
      .Do(tick => Console.WriteLine("tick: {0}", tick)) 
      .Publish() 
      .RefCount(); 

     IPEndPoint serverAddress = new IPEndPoint(IPAddress.Loopback, 15007); 

     var listener = ObservableSocket.Accept(
       AddressFamily.InterNetwork, 
       SocketType.Stream, 
       ProtocolType.Tcp, 
       serverAddress, 
       20) 
       .Do(s => Console.WriteLine("connection accepted {0}", s.RemoteEndPoint)) 
       .Select(s => new StreamWriter(new NetworkStream(s, true))); 


     using (listener.Subscribe(
       client => ticks.Subscribe(
        tick => client.WriteLineAsync(tick), 
        (tex) => Console.WriteLine("ticks error: {0}", tex.Message), 
        () => Console.WriteLine("ticks completed") 
       ), 
       (ex) => Console.WriteLine("server error: {0}", ex.Message), 
       () => Console.WriteLine("server completed") 
      ) 
     ) 
     { 
      Console.WriteLine("Time server listening {0}", serverAddress); 
      Console.WriteLine("Press ENTER to stop..."); 
      Console.ReadLine(); 
     } 
    } 
} 

답변

0

여기는 작동하지 않는 Rxx 기반 솔루션이 있지만 꽤 좋지 않습니다. Rxx 전문가를 호소하는 것은 여전히 ​​나에게보다 우아한 해결책을 제시 할 수 있습니다.

class Program 
{ 
    static List<TcpClient> clients = new List<TcpClient>(); 

    static void Main(string[] args) 
    { 
     IPEndPoint serverAddress = new IPEndPoint(IPAddress.Loopback, 15007); 

     using(Listen(serverAddress).Subscribe(client => { lock(clients) { clients.Add(client); }})) 
     using (Ticks().Subscribe(
      tick => 
      { 
       lock(clients) 
       { 
        int i = 0; 
        while (i < clients.Count) 
        { 
         var client = clients[i]; 
         try 
         { 
          using(var writer = new StreamWriter(client.GetStream(), Encoding.ASCII, client.Client.SendBufferSize, true)) 
          { 
           writer.WriteLine(tick.ToString()); 
           i++; 
          } 
         } 
         catch(Exception ex) 
         { 
          Console.WriteLine("exception: {0}", ex.Message); 
          clients.Remove(client); 
          Console.WriteLine("client disconnected"); 
         } 
        } 
       } 
      }) 
     ) 
     { 
      Console.WriteLine("Time server listening {0}", serverAddress); 
      Console.WriteLine("Press ENTER to stop..."); 
      Console.ReadLine(); 
     } 
    } 

    static IObservable<TcpClient> Listen(IPEndPoint endpoint) 
    { 
     return Observable.Create<TcpClient>(
      observer => 
      { 
       TcpListener listener = new TcpListener(endpoint); 
       listener.Start(); 
       var subscription = Observable 
        .FromAsync(listener.AcceptTcpClientAsync) 
        .Retry() 
        .Repeat() 
        .Do(client => Console.WriteLine("connection accepted {0}", client.Client.RemoteEndPoint)) 
        .Subscribe(observer); 
       return new CompositeDisposable(subscription, 
        Disposable.Create(() => listener.Stop())); 
      }) 
      .Publish() 
      .RefCount(); 
    } 

    static IObservable<DateTime> Ticks() 
    { 
     return Observable.Interval(TimeSpan.FromSeconds(1)) 
      .Select(_ => DateTime.Now) 
      .Do(tick => Console.WriteLine("tick: {0}", tick)) 
      .Publish() 
      .RefCount(); 
    } 
}