2009-12-30 2 views
0

PublisherTask 및 SubscriberTask 유형의 여러 객체가있는 프로그램이 있습니다. 주어진 가입자는 하나 이상의 게시자에 가입 할 수 있습니다.
은 그래서 지금 당신이 내 코드를 자세히 조사 할 수있는 기회를 했어 ....Java ConcurrentLinkedQueue의 요소에서 peek()을 수행 할 수 있습니까

abstract class Publication { 
    // some published information 
} 

class ConcretePublicationA extends Publication { 

} 

class ConcretePublicationB extends Publication { 

} 

abstract class Subscription { 
    private final long id; 
    private final Subscriber s; 
    // PLUS some other members relating to the subscription 

    protected Subscription(long id, Subscriber s){ 
     this.id = id; 
     this.s =s; 
    } 

    public Subscriber getSubscriber() { 
     return this.s; 
    } 


} 

class ConcreteSubscriptionA extends Subscription { 

    protected ConcreteSubscriptionA(long id, Subscriber s) { 
     super(id, s); 
     // TODO Auto-generated constructor stub 
    } 

} 

class ConcreteSubscriptionB extends Subscription { 

    protected ConcreteSubscriptionB(long id, Subscriber s) { 
     super(id, s); 
     // TODO Auto-generated constructor stub 
    } 

} 

interface Subscriber { 
    public void update(Publication pub); 
} 

interface Publisher { 
    public Subscription subscribe(Subscriber subscriber); 
} 

abstract class PublisherTask implements Runnable, Publisher { 
    private final ConcurrentHashMap<Long, Subscription> subscribers = 
     new ConcurrentHashMap<Long, Subscription>(); 
    Long subscriptionId = 0L; 

    @Override 
    public void run() { 
     /*obviously this is a different variable in a real program*/ 
     boolean some_condition = true; 

     while(some_condition) { 
      // do some work 
      Publication pub = /* new ConcretePublication(....) */ null; 

      for (Subscription s : subscribers.values()) { 
       s.getSubscriber().update(pub); 
      } 
     } 

    } 

    @Override 
    public Subscription subscribe(Subscriber subscriber) { 
     Subscription sub; 

     synchronized(subscriptionId) { 
      /* the lines below are in a function in the sub-class, 
      * but for brevity I'm showing them here 
      */ 
      sub = new ConcreteSubscriptionA(++subscriptionId, subscriber); 
        subscribers.put(subscriptionId, sub); 
     } 
     return sub ; 
    } 

} 


abstract class SubscriberTask implements Runnable, Subscriber { 

    protected ConcurrentLinkedQueue<Publication> newPublications = 
     new ConcurrentLinkedQueue<Publication>(); 

    @Override 
    public void run() { 
     /*obviously this is a different variable in a real program*/ 
     boolean some_condition = true; 

     while(some_condition) { 
      // do some work 
      Publication pub = newPublications.peek(); 

     /* the lines below are in a function in the sub-class, 
     * but for brevity I'm showing them here 
     */ 
     { 
      if (pub instanceof ConcretePublicationA) { 
       // Do something with the published data 
      } else if (pub instanceof ConcretePublicationB) { 
       // Do something with the published data 
      } 
     } 
     } 
    } 

    @Override 
    public void update(Publication pub) { 

     /* My question relates to this method: 
      * Bascially to avoid memory issues I would like existing 
      * unprocessed publications **Of Tth Same Type As The New One** 
      * to be discarded 
      */ 
     Publication existing = null; 

     do { 
      //This won't work coz peek() only looks at the head of the queue 
        existing = newPublications.peek(); 

      if ((existing != null) && (existing.getClass().equals(pub))) { 
       newPublications.remove(existing); 
      } 
     } while (existing != null); 
     newPublications.add(pub); 
    } 

좋아, 나는 몇 가지 코드를 게시 할 예정입니다 내 문제를 설명합니다. 다음 질문을하고 싶습니다 :
위의 업데이트 방법에서 ConcurrentLinkedQueue의 모든 요소를 ​​들여다보고 주어진 유형의 요소를 제거 할 수 있습니까?
또한 수업을 개선하고 상호 작용하는 방법을 생각하면 알려 주시기 바랍니다.
감사합니다.

답변

0

이전에이 문제를 해결했습니다. 내가 해결 한 방법은 모든 새로운 게시 객체에 카운터를 효과적으로 포함시키는 것입니다 (사용자가 수행중인 작업에 대해 클래스 유형마다 카운터가 필요함). 소비 된 개체 내의 카운터가 현재 값보다 작 으면 큐에서 게시 개체를 읽을 때 큐에 나중에 개체가 있기 때문에 버려집니다.

그러나 새로운 출판물을 충분히 빨리 추가하면 계속해서 사실을 처리하지 않으므로 cpu 출판물을 굶주 리지 않도록 조심해야합니다. 부실 (나는 10 분마다 하나씩 천천히 덧붙이는 것을 알았다. 그리고 나의 작업은 두 분씩 움직이면서 실행되었다. 오래된 작업은 새로운 것이 있으면 그 자리에 아무 쓸모가 없다).

A1, A2, B1, A3, A4, B2를 추가하면 A1을 대기열의 맨 위에두고 같은 유형이 아니므로 B2를 추가하면 B1을 제거하지 않습니다. 귀하의 의견이 당신이 원하는 것을 의미하지 않는 것 같습니다.

당신이 좋아하는 몇 가지 헬퍼 코드를 넣어 내 아이디어를 구현할 수

: 검사 할 때 여전히 수 처리해야하는지 여부를 생성자에서하는 추가 할 때

class ValidCounter { 
    private Map<Class, AtomicLong> counters = new ConcurrentHashMap<Class, AtomicLong>(); 
    public int getAtomicLongFor(Class clz) { 
     AtomicLong ans = counters.get(clz); 
     if (ans == null) { 
      counters.put(clz, new AtomicLong(0)); 
      return 0; 
     } 
     return ans.get(); 
    } 
} 

사용은

long myValidCounter = validCounterInstance.getAtomicLongFor(getClass()).incrementAndGet(); 

사용

long currValidCounter = validCounterInstance.getAtomicLongFor(getClass()).get(); 
if (pub.getValidCounter() >= currValidCounter) { 
    // still valid so process 
} else { 
    // superceeded, so ignore 
} 

기아 문제를 기억하십시오. 얼마나 많은 또는 얼마나 오래 당신이 물건을 버렸는지 신경 쓰고 그냥 오래된 것을 가져가는 것.

나는 또한 당신이 여러 소비자를 가지고있는 경우에만 엿보기를 사용하지 않고 그냥 설문 조사를 사용하거나 스레드 문제 (여러 번 처리 된 항목)를 얻는 것이 좋습니다.

1

예, 반복기를 사용하여 ConcurrentLinkedQueue의 모든 요소를 ​​들여다 볼 수 있습니다.

Iterator<Publication> itr = newPublications.iterator(); 
while (itr.hasNext()) { 
    Publication existing = itr.next(); 
    if (existing.getClass().equals(pub)) { 
     itr.remove(); 
    } 
} 

반복자의 구축시에 존재하고, 당신은 외부의 주위에 고정 할 수 있습니다 (다만 보증되지는 않았다), 구축 후의 모든 변경을 반영 수 있으므로 ConcurrentLinkedQueue를 보장 의해 반환 된 반복자는 요소를 통과하기 때문에

:

newPublications 

일반적으로 이것은 매우 효율적으로 보이지 않으므로 중복 발행물을 정리하기위한 대체 솔루션을 조사해야합니다.

관련 문제