2017-11-28 8 views
1

그래서 Brian Goetz의 JCIP을 읽고 volatile 동작을 실험하기 위해 다음 코드를 작성했습니다. 휘발성이 예상대로 작동하지 않습니다.

public class StatefulObject { 

    private static final int NUMBER_OF_THREADS = 10; 

    private volatile State state; 

    public StatefulObject() { 
     state = new State(); 
    } 

    public State getState() { 
     return state; 
    } 

    public void setState(State state) { 
     this.state = state; 
    } 

    public static class State { 
     private volatile AtomicInteger counter; 

     public State() { 
      counter = new AtomicInteger(); 
     } 

     public AtomicInteger getCounter() { 
      return counter; 
     } 

     public void setCounter(AtomicInteger counter) { 
      this.counter = counter; 
     } 
    } 

    public static void main(String[] args) throws InterruptedException { 
     StatefulObject object = new StatefulObject(); 

     ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_THREADS); 

     AtomicInteger oldCounter = new AtomicInteger(); 
     AtomicInteger newCounter = new AtomicInteger(); 

     object.getState().setCounter(oldCounter); 

     ConcurrentMap<Integer, Long> lastSeen = new ConcurrentHashMap<>(); 
     ConcurrentMap<Integer, Long> firstSeen = new ConcurrentHashMap<>(); 
     lastSeen.put(oldCounter.hashCode(), 0L); 
     firstSeen.put(newCounter.hashCode(), Long.MAX_VALUE); 

     List<Future> futures = IntStream.range(0, NUMBER_OF_THREADS) 
      .mapToObj(num -> executorService.submit(() -> { 
       for (int i = 0; i < 1000; i++) { 
        object.getState().getCounter().incrementAndGet(); 
        lastSeen.computeIfPresent(object.getState().getCounter().hashCode(), (key, oldValue) -> Math.max(oldValue, System.nanoTime())); 
        firstSeen.computeIfPresent(object.getState().getCounter().hashCode(), (key, oldValue) -> Math.min(oldValue, System.nanoTime())); 
       } 
      })).collect(Collectors.toList()); 

     executorService.shutdown(); 

     object.getState().setCounter(newCounter); 

     futures.forEach(future -> { 
      try { 
       future.get(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } catch (ExecutionException e) { 
       e.printStackTrace(); 
      } 
     }); 

     System.out.printf("Counter: %s\n", object.getState().getCounter().get()); 
     long lastSeenOld = lastSeen.get(oldCounter.hashCode()); 
     long firstSeenNew = firstSeen.get(newCounter.hashCode()); 
     System.out.printf("Last seen old counter: %s\n", lastSeenOld); 
     System.out.printf("First seen new counter: %s\n", firstSeenNew); 
     System.out.printf("Old was seen after the new: %s\n", lastSeenOld > firstSeenNew); 
     System.out.printf("Old was seen %s nanoseconds after the new\n", lastSeenOld - firstSeenNew); 
    } 
} 

그래서 나는 newCounter가 항상 먼저 oldCounter (나는 어느 것도 부실 카운터를 참조하지 않도록 모든 스레드 업데이트를 통지 할 것으로 예상) 마지막으로 목격 된 후에 만 ​​볼 것으로 기대하고있다. 이 동작을 관찰하기 위해 두 개의 맵을 사용합니다. 그러나 놀랍게도 저는 계속 이런 식으로 결과를 얻습니다 :

Counter: 9917 
Last seen old counter: 695372684800871 
First seen new counter: 695372684441226 
Old was seen after the update: true 
Old was seen 359645 nanoseconds after the new 

내가 잘못한 점을 설명해 주시겠습니까?

미리 감사드립니다.

+0

:

난 그냥 위에서 언급 한 문제를 생략하는 코드에서 몇 가지를 변경했습니다. – nos

+0

@nos 따라서 휘발성 카운터 필드를 업데이트 한 후에도 이전 카운터를 참조하지 않으므로 모든 스레드에서 업데이트를 볼 것으로 예상됩니다. – alxg2112

답변

1

관찰 된 이유는 Java의 버그가 아니지만 코드에 하나의 버그가 있습니다. 귀하의 코드에서 lastseenfirstSeen 맵에 대해 computeIfPresent이 원자 적으로 실행되는 것을 보장 할 수는 없습니다 (Javadocs 참조, computeIfPresent은 원자가 아님). 이것이 의미하는 바는 object.getState().getCounter()을 얻고 실제로지도를 업데이트하는 사이에 시간차가 있다는 것입니다.

이 틈에있는 스레드 A (나노 시간을 얻기 전에 이미 카운터 참조 - 이전)와 스레드 B가 object.getState().getCounter()이되기 직전에 newCounter을 설정하면 발생합니다. 따라서이 정확한 순간 카운터 참조가 업데이트되면 스레드 A는 이전 카운터 키를 업데이트하고 스레드 B는 새 카운터를 업데이트합니다. 스레드 B가 스레드 A (실제로는 CPU 스케줄링이 무엇인지 알 수없는 분리 된 스레드이기 때문에 발생할 수 있음) 이전에 나노 시간이 걸린 경우 완벽하게 관찰 할 수 있습니다.

제 생각은 분명합니다. 한 가지 더 명확히하기 위해 State 클래스에서 AtomicInteger counter을 휘발성으로 선언했습니다. 이것은 AtomicInteger가 본질적으로 휘발성이기 때문에 필요하지 않습니다."비 휘발성"원자 **가 없습니다. 당신이 oldCounter 마지막 ​​보였다 후에 만 ​​newCounter을 기대할 이유도 정확하게 설명한다면 도움이 될

import java.util.Collections; 
import java.util.List; 
import java.util.concurrent.*; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.stream.Collectors; 
import java.util.stream.IntStream; 

public class StatefulObject { 

    private static final int NUMBER_OF_THREADS = 10; 

    private volatile State state; 

    public StatefulObject() { 
     state = new State(); 
    } 

    public State getState() { 
     return state; 
    } 

    public void setState(State state) { 
     this.state = state; 
    } 

    public static class State { 
     private volatile AtomicInteger counter; 

     public State() { 
      counter = new AtomicInteger(); 
     } 

     public AtomicInteger getCounter() { 
      return counter; 
     } 

     public void setCounter(AtomicInteger counter) { 
      this.counter = counter; 
     } 
    } 

    public static void main(String[] args) throws InterruptedException { 
     StatefulObject object = new StatefulObject(); 

     ExecutorService executorService = Executors.newFixedThreadPool(NUMBER_OF_THREADS); 

     AtomicInteger oldCounter = new AtomicInteger(); 
     AtomicInteger newCounter = new AtomicInteger(); 

     object.getState().setCounter(oldCounter); 

     List<Long> oldList = new CopyOnWriteArrayList<>(); 
     List<Long> newList = new CopyOnWriteArrayList<>(); 

     List<Future> futures = IntStream.range(0, NUMBER_OF_THREADS) 
      .mapToObj(num -> executorService.submit(() -> { 
       for (int i = 0; i < 1000; i++) { 
        long l = System.nanoTime(); 
        object.getState().getCounter().incrementAndGet(); 
        if (object.getState().getCounter().equals(oldCounter)) { 
         oldList.add(l); 
        } else { 
         newList.add(l); 
        } 
       } 
      })).collect(Collectors.toList()); 

     executorService.shutdown(); 

     object.getState().setCounter(newCounter); 

     futures.forEach(future -> { 
      try { 
       future.get(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } catch (ExecutionException e) { 
       e.printStackTrace(); 
      } 
     }); 

     System.out.printf("Counter: %s\n", object.getState().getCounter().get()); 
     Collections.sort(oldList); 
     Collections.sort(newList); 
     long lastSeenOld = oldList.get(oldList.size() - 1); 
     long firstSeenNew = newList.get(0); 
     System.out.printf("Last seen old counter: %s\n", lastSeenOld); 
     System.out.printf("First seen new counter: %s\n", firstSeenNew); 
     System.out.printf("Old was seen after the new: %s\n", lastSeenOld > firstSeenNew); 
     System.out.printf("Old was seen %s nanoseconds after the new\n", lastSeenOld - firstSeenNew); 
    } 
} 
+0

고마움, 당신의 대답과 토마스 '하나 내 물건을 분명히했다 :) – alxg2112

0

표시되는 내용은 휘발성의 영향이 아니지만 ConcurrentMap<> lastSeen에서의 동기화 효과입니다.

10 개의 스레드가 모두 거의 동시에 시작되었다고 가정합시다. 각각은 object.getState().getCounter().incrementAndGet();을 거의 병렬로 수행하므로 oldCounter이 증가합니다.

그런 다음 스레드는 lastSeen.computeIfPresent(object.getState().getCounter().hashCode(), (key, oldValue) -> Math.max(oldValue, System.nanoTime()));을 실행하려고합니다. 즉, 그들은 모두 object.getState().getCounter().hashCode()을 병렬로 평가하며, 각각은 oldCounter의 동일한 해시 코드를 얻은 다음 ConcurrentHashMap.computeIfPresent(Integer, ..)을 동일한 해시 값으로 호출합니다.

모든 스레드가 동일한 키의 값을 업데이트하려고 시도하므로 ConcurrentHashMap은 이러한 업데이트를 동기화해야합니다. 첫 번째 스레드가 메인 스레드가 object.getState().setCounter(newCounter);을 실행 lastSeen을 업데이트되는 시간 동안

, 여러 스레드가 여전히 lastSeen를 업데이트 기다리는 동안 첫 번째 스레드가, newCounter에 대한 firstSeen 실행할 수 있도록.


더 나은 결과를 얻으려면 정보 수집 단계와 분석 단계를 분리하는 것이 좋습니다.

예를 들어, 스레드는 카운터 해시 코드와 업데이트 타임 스탬프를 모든 계산이 완료된 후에 분석하는 배열로 캡처 할 수 있습니다.

+0

명확한 설명을 해주셔서 감사합니다! – alxg2112

관련 문제