PublisherTask 및 SubscriberTask 유형의 여러 객체가있는 프로그램이 있습니다. 주어진 가입자는 하나 이상의 게시자에 가입 할 수 있습니다.
은 그래서 지금 당신이 내 코드를 자세히 조사 할 수있는 기회를 했어 ....Java ConcurrentLinkedQueue의 요소에서 peek()을 수행 할 수 있습니까
abstract class Publication {
// some published information
}
class ConcretePublicationA extends Publication {
}
class ConcretePublicationB extends Publication {
}
abstract class Subscription {
private final long id;
private final Subscriber s;
// PLUS some other members relating to the subscription
protected Subscription(long id, Subscriber s){
this.id = id;
this.s =s;
}
public Subscriber getSubscriber() {
return this.s;
}
}
class ConcreteSubscriptionA extends Subscription {
protected ConcreteSubscriptionA(long id, Subscriber s) {
super(id, s);
// TODO Auto-generated constructor stub
}
}
class ConcreteSubscriptionB extends Subscription {
protected ConcreteSubscriptionB(long id, Subscriber s) {
super(id, s);
// TODO Auto-generated constructor stub
}
}
interface Subscriber {
public void update(Publication pub);
}
interface Publisher {
public Subscription subscribe(Subscriber subscriber);
}
abstract class PublisherTask implements Runnable, Publisher {
private final ConcurrentHashMap<Long, Subscription> subscribers =
new ConcurrentHashMap<Long, Subscription>();
Long subscriptionId = 0L;
@Override
public void run() {
/*obviously this is a different variable in a real program*/
boolean some_condition = true;
while(some_condition) {
// do some work
Publication pub = /* new ConcretePublication(....) */ null;
for (Subscription s : subscribers.values()) {
s.getSubscriber().update(pub);
}
}
}
@Override
public Subscription subscribe(Subscriber subscriber) {
Subscription sub;
synchronized(subscriptionId) {
/* the lines below are in a function in the sub-class,
* but for brevity I'm showing them here
*/
sub = new ConcreteSubscriptionA(++subscriptionId, subscriber);
subscribers.put(subscriptionId, sub);
}
return sub ;
}
}
abstract class SubscriberTask implements Runnable, Subscriber {
protected ConcurrentLinkedQueue<Publication> newPublications =
new ConcurrentLinkedQueue<Publication>();
@Override
public void run() {
/*obviously this is a different variable in a real program*/
boolean some_condition = true;
while(some_condition) {
// do some work
Publication pub = newPublications.peek();
/* the lines below are in a function in the sub-class,
* but for brevity I'm showing them here
*/
{
if (pub instanceof ConcretePublicationA) {
// Do something with the published data
} else if (pub instanceof ConcretePublicationB) {
// Do something with the published data
}
}
}
}
@Override
public void update(Publication pub) {
/* My question relates to this method:
* Bascially to avoid memory issues I would like existing
* unprocessed publications **Of Tth Same Type As The New One**
* to be discarded
*/
Publication existing = null;
do {
//This won't work coz peek() only looks at the head of the queue
existing = newPublications.peek();
if ((existing != null) && (existing.getClass().equals(pub))) {
newPublications.remove(existing);
}
} while (existing != null);
newPublications.add(pub);
}
좋아, 나는 몇 가지 코드를 게시 할 예정입니다 내 문제를 설명합니다. 다음 질문을하고 싶습니다 :
위의 업데이트 방법에서 ConcurrentLinkedQueue의 모든 요소를 들여다보고 주어진 유형의 요소를 제거 할 수 있습니까?
또한 수업을 개선하고 상호 작용하는 방법을 생각하면 알려 주시기 바랍니다.
감사합니다.