해결책을 찾았습니다. 나는 그것이 세상에서 가장 아름답다는 것을 모르지만 그것은 효과가있다. 나는 개인 재산을 창조, 각각의 사용자는 여러 세션을 가질 수 있습니다
private ConcurrentDictionary<long, ChatServerUserSessions> _chatServerUserSessionInfoDictionary = new ConcurrentDictionary<long, ChatServerUserSessions>();
및 세션 클래스 :
public class ChatServerUserSessions
{
public long UserId { get; set; }
public string UserName { get; set; }
public ConcurrentDictionary<string, ChatServerUserSessionInfo> Sessions { get; set; }
public object Lock { get; set; }
}
및 각 세션에 대해 나는 클래스 생성 :
public class ChatServerUserSessionInfo : IObservable<ChatServerRoomActivityInfoBase>, IDisposable
{
public string SessionId { get; set; }
public List<long> SubscribedRoomIds { get; set; }
public DateTime SubscriptionTicketInvalidationDate { get; set; }
public Queue<ChatServerRoomActivityInfoBase> MessagesQueue { get; set; }
private IDisposable subscription;
private List<IObserver<ChatServerRoomActivityInfoBase>> observers;
private ChatServerUserSessions parentUserSessions;
public ChatServerUserSessionInfo(string sessionId, DateTime subscriptionTicketInvalidationDate, Subject<ChatServerRoomActivityInfoBase> chatServerRoomActivity, ChatServerUserSessions parentUserSessions)
{
this.SessionId = sessionId;
this.SubscribedRoomIds = new List<long>();
this.SubscriptionTicketInvalidationDate = subscriptionTicketInvalidationDate;
this.MessagesQueue = new Queue<ChatServerRoomActivityInfoBase>();
this.parentUserSessions = parentUserSessions;
subscription = chatServerRoomActivity.Subscribe(activity =>
{
lock (parentUserSessions.Lock)
{
if (this.SubscribedRoomIds.Contains(activity.RoomId))
{
this.MessagesQueue.Enqueue(activity);
foreach (var observer in observers)
{
observer.OnNext(activity);
}
}
}
});
observers = new List<IObserver<ChatServerRoomActivityInfoBase>>();
}
~ChatServerUserSessionInfo()
{
Dispose();
}
public void Dispose()
{
if (subscription != null)
{
subscription.Dispose();
subscription = null;
}
this.observers = null;
GC.SuppressFinalize(this);
}
public IDisposable Subscribe(IObserver<ChatServerRoomActivityInfoBase> observer)
{
lock (parentUserSessions.Lock)
{
this.observers.Add(observer);
return (IDisposable)new Subscription(this, observer);
}
}
private void Unsubscribe(IObserver<ChatServerRoomActivityInfoBase> observer)
{
lock (parentUserSessions.Lock)
{
if (this.observers == null)
{
return;
}
this.observers.Remove(observer);
}
}
private class Subscription : IDisposable
{
private ChatServerUserSessionInfo subject;
private IObserver<ChatServerRoomActivityInfoBase> observer;
public Subscription(ChatServerUserSessionInfo subject, IObserver<ChatServerRoomActivityInfoBase> observer)
{
this.subject = subject;
this.observer = observer;
}
public void Dispose()
{
IObserver<ChatServerRoomActivityInfoBase> observer = Interlocked.Exchange<IObserver<ChatServerRoomActivityInfoBase>>(ref this.observer, (IObserver<ChatServerRoomActivityInfoBase>)null);
if (observer == null)
{
return;
}
this.subject.Unsubscribe(observer);
this.subject = (ChatServerUserSessionInfo)null;
}
}
}
각 사용자 세션을 자신의 MessageQueue를 가지고 있으며 글로벌 채팅룸 활동 과목에 등록되어 있습니다. ChatRoomActivityMessages는 각 세션마다 유지됩니다. 다음은 메시지를 가져 오는 방법입니다.
public void CheckForChatRoomsActivityAsync(long userId, string userName, Action<List<ChatServerRoomActivityInfoBase>> onChatRoomActivity)
{
var sessionId = GetCurrentSessionId();
var chatServerUserSessions = GetChatServerUserSessions(userId, userName);
lock (chatServerUserSessions.Lock)
{
var userSession = GetChatServerUserSessionInfo(sessionId, chatServerUserSessions);
ProlongSubscriptions(userSession);
if (userSession.MessagesQueue.Count > 0)
{
var activities = new List<ChatServerRoomActivityInfoBase>();
while (userSession.MessagesQueue.Count > 0)
{
activities.Add(userSession.MessagesQueue.Dequeue());
}
onChatRoomActivity(activities);
}
else
{
var queued = ThreadPool.QueueUserWorkItem(new WaitCallback(parm =>
{
var activities = new List<ChatServerRoomActivityInfoBase>();
var wait = new AutoResetEvent(false);
using (var subscriber = userSession.Subscribe(activity =>
{
lock (chatServerUserSessions.Lock)
{
activities.Add(activity);
userSession.MessagesQueue.Dequeue();
wait.Set();
}
}))
{
wait.WaitOne(TimeSpan.FromSeconds(CheckForActivityMaxWaitSeconds));
}
((Action<List<ChatServerRoomActivityInfoBase>>)parm)(activities);
}), onChatRoomActivity);
if (!queued)
{
onChatRoomActivity(new List<ChatServerRoomActivityInfoBase>());
}
}
}
}