2016-06-14 7 views
1

특정 시간 (예 : 10 초) 동안 최대 N 개의 요소를 저장하거나 가득 찬 경우 가장 오래된 값을 처리해야하는 대기열을 찾습니다.생존 시간이있는 메시지 대기열

아파치 컬렉션 (CircularFifoQueueJavaDoc)에서 비슷한 시간대를 발견했습니다. 전체 메시지 브로커가 작은 프로젝트의 오버 헤드처럼 보입니다.

어떻게 구현해야하는지 힌트를 주시겠습니까? 요소를 폴링하는 동안 이전 값을 필터링해야합니까?

답변

1

다음 대기열 구현에 익숙합니다. 코드는 Apaches CircularFifoQueue을 기반으로하며 약점만을 테스트합니다. 또한 구현은 스레드 안전하지 않음은 직렬화 가능하지 않음입니다.

실수를 저지른 경우 의견을 남겨주세요.

import java.util.AbstractCollection; 
import java.util.Arrays; 
import java.util.Collection; 
import java.util.Iterator; 
import java.util.NoSuchElementException; 
import java.util.Queue; 
import java.util.concurrent.TimeUnit; 

/** 
* TimedQueue is a first-in first-out queue with a fixed size that 
* replaces its oldest element if full. 
* <p> 
* The removal order of a {@link TimedQueue} is based on the 
* insertion order; elements are removed in the same order in which they 
* were added. The iteration order is the same as the removal order. 
* <p> 
* The {@link #add(Object)}, {@link #remove()}, {@link #peek()}, {@link #poll}, 
* {@link #offer(Object)} operations all perform in constant time. 
* All other operations perform in linear time or worse. 
* <p> 
* This queue prevents null objects from being added and it is not thread-safe and not serializable. 
* 
* The majority of this source code was copied from Apaches {@link CircularFifoQueue}: http://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/queue/CircularFifoQueue.html 
* 
* @version 1.0 
*/ 
public class TimedQueue<E> extends AbstractCollection<E> 
implements Queue<E> { 

/** Underlying storage array. */ 
private Item<E>[] elements; 

/** Array index of first (oldest) queue element. */ 
private int start = 0; 

/** 
* Index mod maxElements of the array position following the last queue 
* element. Queue elements start at elements[start] and "wrap around" 
* elements[maxElements-1], ending at elements[decrement(end)]. 
* For example, elements = {c,a,b}, start=1, end=1 corresponds to 
* the queue [a,b,c]. 
*/ 
private transient int end = 0; 

/** Flag to indicate if the queue is currently full. */ 
private transient boolean full = false; 

/** Capacity of the queue. */ 
private final int maxElements; 

private TimeUnit unit; 
private int delay; 

/** 
* Constructor that creates a queue with the default size of 32. 
*/ 
public TimedQueue() { 
    this(32); 
} 

/** 
* Constructor that creates a queue with the specified size. 
* 
* @param size the size of the queue (cannot be changed) 
* @throws IllegalArgumentException if the size is &lt; 1 
*/ 
public TimedQueue(final int size) { 
    this(size, 3, TimeUnit.SECONDS); 
} 

@SuppressWarnings("unchecked") 
public TimedQueue(final int size, int delay, TimeUnit unit) { 
    if (size <= 0) { 
     throw new IllegalArgumentException("The size must be greater than 0"); 
    } 
    elements = new Item[size]; 
    maxElements = elements.length; 
    this.unit = unit; 
    this.delay = delay; 
} 

/** 
* Constructor that creates a queue from the specified collection. 
* The collection size also sets the queue size. 
* 
* @param coll the collection to copy into the queue, may not be null 
* @throws NullPointerException if the collection is null 
*/ 
public TimedQueue(final Collection<? extends E> coll) { 
    this(coll.size()); 
    addAll(coll); 
} 

/** 
* Returns the number of elements stored in the queue. 
* 
* @return this queue's size 
*/ 
@Override 
public int size() { 
    int size = 0; 

    for(int i = 0; i < elements.length; i++) { 
     if(validElement(i) != null) { 
      size++; 
     } 
    } 

    return size; 
} 

/** 
* Returns true if this queue is empty; false otherwise. 
* 
* @return true if this queue is empty 
*/ 
@Override 
public boolean isEmpty() { 
    return size() == 0; 
} 

private boolean isAtFullCapacity() { 
    return size() == maxElements; 
} 

/** 
* Clears this queue. 
*/ 
@Override 
public void clear() { 
    full = false; 
    start = 0; 
    end = 0; 
    Arrays.fill(elements, null); 
} 

/** 
* Adds the given element to this queue. If the queue is full, the least recently added 
* element is discarded so that a new element can be inserted. 
* 
* @param element the element to add 
* @return true, always 
* @throws NullPointerException if the given element is null 
*/ 
@Override 
public boolean add(final E element) { 
    if (null == element) { 
     throw new NullPointerException("Attempted to add null object to queue"); 
    } 

    if (isAtFullCapacity()) { 
     remove(); 
    } 

    elements[end++] = new Item<E>(element); 

    if (end >= maxElements) { 
     end = 0; 
    } 

    if (end == start) { 
     full = true; 
    } 

    return true; 
} 

/** 
* Returns the element at the specified position in this queue. 
* 
* @param index the position of the element in the queue 
* @return the element at position {@code index} 
* @throws NoSuchElementException if the requested position is outside the range [0, size) 
*/ 
public E get(final int index) { 
    final int sz = size(); 
    if (sz == 0) { 
     throw new NoSuchElementException(
       String.format("The specified index (%1$d) is outside the available range because the queue is empty.", Integer.valueOf(index))); 
    } 
    if (index < 0 || index >= sz) { 
     throw new NoSuchElementException(
       String.format("The specified index (%1$d) is outside the available range [0, %2$d]", 
           Integer.valueOf(index), Integer.valueOf(sz-1))); 
    } 

    final int idx = (start + index) % maxElements; 
    return validElement(idx); 
} 

private E validElement(int idx) { 
    if(elements[idx] == null){ 
     return null; 
    } 
    long diff = System.currentTimeMillis() - elements[idx].getCreationTime(); 

    if(diff < unit.toMillis(delay)) { 
     return (E) elements[idx].getValue(); 
    } else { 
     elements[idx] = null; 
     return null; 
    } 
} 

//----------------------------------------------------------------------- 

/** 
* Adds the given element to this queue. If the queue is full, the least recently added 
* element is discarded so that a new element can be inserted. 
* 
* @param element the element to add 
* @return true, always 
* @throws NullPointerException if the given element is null 
*/ 
public boolean offer(E element) { 
    return add(element); 
} 

public E poll() { 
    if (isEmpty()) { 
     return null; 
    } 
    return remove(); 
} 

public E element() { 
    if (isEmpty()) { 
     throw new NoSuchElementException("queue is empty"); 
    } 
    return peek(); 
} 

public E peek() { 
    if (isEmpty()) { 
     return null; 
    } 
    return (E) elements[start].getValue(); 
} 

public E remove() { 
    if (isEmpty()) { 
     throw new NoSuchElementException("queue is empty"); 
    } 

    final E element = validElement(start); 
    if (null != element) { 
     elements[start++] = null; 

     if (start >= maxElements) { 
      start = 0; 
     } 
     full = false; 
    } 
    return element; 
} 

/** 
* Increments the internal index. 
* 
* @param index the index to increment 
* @return the updated index 
*/ 
private int increment(int index) { 
    index++; 
    if (index >= maxElements) { 
     index = 0; 
    } 
    return index; 
} 

/** 
* Decrements the internal index. 
* 
* @param index the index to decrement 
* @return the updated index 
*/ 
private int decrement(int index) { 
    index--; 
    if (index < 0) { 
     index = maxElements - 1; 
    } 
    return index; 
} 

/** 
* Returns an iterator over this queue's elements. 
* 
* @return an iterator over this queue's elements 
*/ 
@Override 
public Iterator<E> iterator() { 
    return new Iterator<E>() { 

     private int index = start; 
     private int lastReturnedIndex = -1; 
     private boolean isFirst = full; 

     public boolean hasNext() { 
      return (isFirst || index != end) && size() > 0; 
     } 

     public E next() { 
      if (!hasNext()) { 
       throw new NoSuchElementException(); 
      } 
      isFirst = false; 
      lastReturnedIndex = index; 
      index = increment(index); 
      if(validElement(lastReturnedIndex) == null) { 
       return next(); 
      } else { 
       return validElement(lastReturnedIndex); 
      } 
     } 

     public void remove() { 
      if (lastReturnedIndex == -1) { 
       throw new IllegalStateException(); 
      } 

      // First element can be removed quickly 
      if (lastReturnedIndex == start) { 
       TimedQueue.this.remove(); 
       lastReturnedIndex = -1; 
       return; 
      } 

      int pos = lastReturnedIndex + 1; 
      if (start < lastReturnedIndex && pos < end) { 
       // shift in one part 
       System.arraycopy(elements, pos, elements, lastReturnedIndex, end - pos); 
      } else { 
       // Other elements require us to shift the subsequent elements 
       while (pos != end) { 
        if (pos >= maxElements) { 
         elements[pos - 1] = elements[0]; 
         pos = 0; 
        } else { 
         elements[decrement(pos)] = elements[pos]; 
         pos = increment(pos); 
        } 
       } 
      } 

      lastReturnedIndex = -1; 
      end = decrement(end); 
      elements[end] = null; 
      full = false; 
      index = decrement(index); 
     } 

    }; 
} 

private static final class Item<E> { 
    private long creationTime; 
    private E in; 

    public Item(E in) { 
     this.in = in; 
     creationTime = System.currentTimeMillis(); 
    } 

    public E getValue() { 
     return in; 
    } 

    public long getCreationTime() { 
     return creationTime; 
    } 
} 
} 
1

부실 데이터 제거를위한 특별한 방법이있는 LinkedHashMap이라는 클래스가 있습니다. 문서에서 : 사실

보호 부울 위해서, removeEldestEntry (의 Map.Entry 장남)
반환하면이 맵이 제일 낡은 엔트리를 삭제하는 경우.

어떤 것이 목록 (대기열)에 추가 될 때마다 removeEldestEntry 메서드가 호출됩니다. true을 반환하면 가장 오래된 항목이 제거되어 새 항목을위한 공간을 만듭니다. 그렇지 않으면 아무 것도 제거되지 않습니다. 가장 오래된 항목의 타임 스탬프를 확인하고 만료 (예 : 10 초)의 임계 값보다 오래된 경우 true을 반환하는 자체 구현을 추가 할 수 있습니다. 구현 내용은 다음과 같습니다.

protected boolean removeEldestEntry(Map.Entry eldest) { 
    long currTimeMillis = System.currentTimeMillis(); 
    long entryTimeMillis = eldest.getValue().getTimeCreation(); 

    return (currTimeMillis - entryTimeMillis) > (1000*10*60); 
} 
+0

나는 그것을 점검하고 계속 게시 할 것입니다! – Markus

+0

이 답변은 훌륭합니다! –

+0

'removeEldestEntry' 메소드는 새로운 엔트리가 추가 될 경우에만 호출됩니다. 이전 값은 맵에 남아 있습니다. – Markus

1

생각해 보면 java.util.LinkedHashMap이 해결책입니다. 그것은 엔트리가 맵에 놓여질 때마다 호출되는 removeEldest() 메쏘드를 가지고 있습니다. 가장 오래된 항목을 제거해야하는지 나타내려면이 항목을 무시할 수 있습니다.

Javadoc가 좋은 예를 제공합니다 :지도는 100 개 이상의 항목이있는 경우

private static final int MAX_ENTRIES = 100; 

protected boolean removeEldestEntry(Map.Entry eldest) { 
    return size() > MAX_ENTRIES; 
} 

이는 낡은 엔트리를 제거합니다.

10 초 후에 적극적으로 항목을 제거하려면 나이를 확인하고 오래된 항목을 제거하려면 별도의 스레드가 필요합니다. 나는 이것이 당신이 원하는 바가 아니라는 것을 당신의 묘사로 판단하면서 추측하고 있습니다.

+0

이 답변은 코멘트 여야합니다. –

+0

이것은 좋은 아이디어 인 것 같습니다. 그러나 10 초보다 오래된 항목은 제거하고 싶습니다. 이미 그러한 "시간 기반 대기열"의 구현이 있습니까? – Markus