2012-05-15 3 views
2
내가이 예를 기초로 수신을 사용하여 .NET에서 간단한 채팅을 구현하고있어

을 잃지 않을 것입니다 간단한 채팅을 구현하는 방법 : 이 https://blogs.claritycon.com/blog/2011/04/roll-your-own-mvc-3-long-polling-chat-site/닷넷 반응성 확장, 메시지

LongPolling를 사용하는 방법있다 새 메시지를 기다립니다 와서 :

public static void CheckForMessagesAsync(Action<List<MessageInfo>> onMessages) 
{ 
    var queued = ThreadPool.QueueUserWorkItem(new WaitCallback(parm => 
    { 
     var msgs = new List<MessageInfo>(); 
     var wait = new AutoResetEvent(false); 
     using (var subscriber = _messages.Subscribe(msg => 
             { 
              msgs.Add(msg); 
              wait.Set(); 
             })) 
     { 
      // Wait for the max seconds for a new msg 
      wait.WaitOne(TimeSpan.FromSeconds(MaxWaitSeconds)); 
     } 

     ((Action<List<MessageInfo>>)parm)(msgs); 
    }), onMessages); 

    if (!queued) 
     onMessages(new List<MessageInfo>()); 
} 

내가 메시지를 해제하고 관찰자를 폐기하고 다시 연결 사이에 나타나는 손실이 방법을 사용. 메시지를 잃지 않도록이 메커니즘을 올바르게 구현하는 방법은 무엇입니까?

답변

0

해결책을 찾았습니다. 나는 그것이 세상에서 가장 아름답다는 것을 모르지만 그것은 효과가있다. 나는 개인 재산을 창조, 각각의 사용자는 여러 세션을 가질 수 있습니다

private ConcurrentDictionary<long, ChatServerUserSessions> _chatServerUserSessionInfoDictionary = new ConcurrentDictionary<long, ChatServerUserSessions>(); 

및 세션 클래스 :

public class ChatServerUserSessions 
{ 
    public long UserId { get; set; } 

    public string UserName { get; set; } 

    public ConcurrentDictionary<string, ChatServerUserSessionInfo> Sessions { get; set; } 

    public object Lock { get; set; } 
} 

및 각 세션에 대해 나는 클래스 생성 :

public class ChatServerUserSessionInfo : IObservable<ChatServerRoomActivityInfoBase>, IDisposable 
{ 
    public string SessionId { get; set; } 

    public List<long> SubscribedRoomIds { get; set; } 

    public DateTime SubscriptionTicketInvalidationDate { get; set; } 

    public Queue<ChatServerRoomActivityInfoBase> MessagesQueue { get; set; } 

    private IDisposable subscription; 
    private List<IObserver<ChatServerRoomActivityInfoBase>> observers; 
    private ChatServerUserSessions parentUserSessions; 

    public ChatServerUserSessionInfo(string sessionId, DateTime subscriptionTicketInvalidationDate, Subject<ChatServerRoomActivityInfoBase> chatServerRoomActivity, ChatServerUserSessions parentUserSessions) 
    { 
     this.SessionId = sessionId; 
     this.SubscribedRoomIds = new List<long>(); 
     this.SubscriptionTicketInvalidationDate = subscriptionTicketInvalidationDate; 
     this.MessagesQueue = new Queue<ChatServerRoomActivityInfoBase>(); 
     this.parentUserSessions = parentUserSessions; 

     subscription = chatServerRoomActivity.Subscribe(activity => 
     { 
      lock (parentUserSessions.Lock) 
      { 
       if (this.SubscribedRoomIds.Contains(activity.RoomId)) 
       { 
        this.MessagesQueue.Enqueue(activity); 

        foreach (var observer in observers) 
        { 
         observer.OnNext(activity); 
        } 
       } 
      } 
     }); 

     observers = new List<IObserver<ChatServerRoomActivityInfoBase>>(); 
    } 

    ~ChatServerUserSessionInfo() 
    { 
     Dispose(); 
    } 

    public void Dispose() 
    { 
     if (subscription != null) 
     { 
      subscription.Dispose(); 
      subscription = null; 
     } 

     this.observers = null; 

     GC.SuppressFinalize(this); 
    } 

    public IDisposable Subscribe(IObserver<ChatServerRoomActivityInfoBase> observer) 
    { 
     lock (parentUserSessions.Lock) 
     { 
      this.observers.Add(observer); 
      return (IDisposable)new Subscription(this, observer); 
     } 
    } 

    private void Unsubscribe(IObserver<ChatServerRoomActivityInfoBase> observer) 
    { 
     lock (parentUserSessions.Lock) 
     { 
      if (this.observers == null) 
      { 
       return; 
      } 

      this.observers.Remove(observer); 
     } 
    } 

    private class Subscription : IDisposable 
    { 
     private ChatServerUserSessionInfo subject; 
     private IObserver<ChatServerRoomActivityInfoBase> observer; 

     public Subscription(ChatServerUserSessionInfo subject, IObserver<ChatServerRoomActivityInfoBase> observer) 
     { 
      this.subject = subject; 
      this.observer = observer; 
     } 

     public void Dispose() 
     { 
      IObserver<ChatServerRoomActivityInfoBase> observer = Interlocked.Exchange<IObserver<ChatServerRoomActivityInfoBase>>(ref this.observer, (IObserver<ChatServerRoomActivityInfoBase>)null); 
      if (observer == null) 
      { 
       return; 
      } 

      this.subject.Unsubscribe(observer); 
      this.subject = (ChatServerUserSessionInfo)null; 
     } 
    } 
} 

각 사용자 세션을 자신의 MessageQueue를 가지고 있으며 글로벌 채팅룸 활동 과목에 등록되어 있습니다. ChatRoomActivityMessages는 각 세션마다 유지됩니다. 다음은 메시지를 가져 오는 방법입니다.

public void CheckForChatRoomsActivityAsync(long userId, string userName, Action<List<ChatServerRoomActivityInfoBase>> onChatRoomActivity) 
    { 
     var sessionId = GetCurrentSessionId(); 
     var chatServerUserSessions = GetChatServerUserSessions(userId, userName); 

     lock (chatServerUserSessions.Lock) 
     { 
      var userSession = GetChatServerUserSessionInfo(sessionId, chatServerUserSessions); 
      ProlongSubscriptions(userSession); 

      if (userSession.MessagesQueue.Count > 0) 
      { 
       var activities = new List<ChatServerRoomActivityInfoBase>(); 
       while (userSession.MessagesQueue.Count > 0) 
       { 
        activities.Add(userSession.MessagesQueue.Dequeue()); 
       } 

       onChatRoomActivity(activities); 
      } 
      else 
      { 
       var queued = ThreadPool.QueueUserWorkItem(new WaitCallback(parm => 
       { 
        var activities = new List<ChatServerRoomActivityInfoBase>(); 
        var wait = new AutoResetEvent(false); 

        using (var subscriber = userSession.Subscribe(activity => 
        { 
         lock (chatServerUserSessions.Lock) 
         { 
          activities.Add(activity); 
          userSession.MessagesQueue.Dequeue(); 

          wait.Set(); 
         } 
        })) 
        { 
         wait.WaitOne(TimeSpan.FromSeconds(CheckForActivityMaxWaitSeconds)); 
        } 

        ((Action<List<ChatServerRoomActivityInfoBase>>)parm)(activities); 
       }), onChatRoomActivity); 

       if (!queued) 
       { 
        onChatRoomActivity(new List<ChatServerRoomActivityInfoBase>()); 
       } 
      } 
     } 
    }