2014-07-11 2 views
0

JPA/EclipseLink 및 MySQL을 데이터베이스로 사용하는 응용 프로그램에서 동시 처리 문제가 발생합니다. 여기에 조언이 필요해.

파일에 문자열 블록을 생성하는 클라이언트가 있습니다. 각 블록에는 몇 줄부터 최대 100 줄 또는 200 줄까지 포함될 수 있습니다. 데이터를 생성 한 직후에 클라이언트는 각 블록이 적절히 분석되고 db에 저장된 서버로 블록 집합을 보내기 시작합니다. 서버는 JPA/EclipseLink를 사용하여 MySQL과 통신합니다.

클라이언트가 REST를 통해 동기식으로 블록 집합을 보내면 데이터 구문 분석 및 DB에 저장하는 것과 관련하여 모든 것이 예상대로 작동합니다. 그러나 클라이언트가 병렬로 데이터를 보낼 때 (내 경우 병렬 스레드 15 개) 일부 데이터 (여러 줄)는 처음으로 db에만 저장하는 동안 건너 뜁니다. 동일한 데이터가 다시 병렬로 전송되면 (두 번째, 세 번째 등) 예상대로 작동합니다 (저장하는 동안 줄을 건너 뛰지 않음).

병렬 전송의 경우 테스트 목적으로 모든 행을 동시 맵에 저장했습니다. 맵에서 건너 뛴 줄을 보지 못했기 때문에 순전히 JPA 및/또는 MySQL 문제입니다. JPA와 함께 낙관적/비관적 잠금을 사용하려고했지만 도움이되지 않았습니다.

다른 사람이 이런 상황에 빠지면 어떻게 될까요? 지금은 내가 루트 문제를 발견 생각하지만 난 아직 해결책을 찾지 못한
JPA/EclipseLink MySQL 동시 처리 문제

@Entity 
@Table(name = "PROJECT") 
@NamedQueries({ 
     @NamedQuery(name = "Project.findByKey", query = "SELECT p FROM Project p WHERE p.key = :P_KEY", 
       hints = { 
         @QueryHint(name = QueryHints.QUERY_RESULTS_CACHE, value = HintValues.TRUE), 
         @QueryHint(name = QueryHints.QUERY_RESULTS_CACHE_SIZE, value = "1000"), 
         @QueryHint(name = QueryHints.QUERY_RESULTS_CACHE_IGNORE_NULL, value = HintValues.TRUE), 
         @QueryHint(name = QueryHints.BIND_PARAMETERS, value = HintValues.TRUE) 
       } 
     ) 
}) 
@XmlRootElement 
@XmlAccessorType(XmlAccessType.FIELD) 
@XmlType 
@CascadeOnDelete 
@OptimisticLocking 
public class Project implements Serializable { 

    @Id 
    @TableGenerator(name = "p-table-gen", table = "SEQ", pkColumnName = "SEQ_NAME", valueColumnName = "SEQ_COUNT", pkColumnValue = "PROJECT_SEQ", allocationSize = 50) 
    @GeneratedValue(strategy = GenerationType.TABLE, generator = "p-table-gen") 
    private Long id; 

    @Column(name = "DURATION") 
    private Long duration; 

    @Column(name = "PROGRAM") 
    private String program; 

    @Embedded 
    @AttributeOverrides(
     { 
      @AttributeOverride(name = "parentHash", column = @Column(name = "PARENT_CMD_HASH")), 
      @AttributeOverride(name = "hash", column = @Column(name = "CMD_HASH")), 
      @AttributeOverride(name = "pathHash", column = @Column(name = "PATH_HASH")) 
     } 
    ) 
    ProjectKey key = new ProjectKey(); 

    @BatchFetch(BatchFetchType.EXISTS) 
    @OneToMany(mappedBy="project", cascade = CascadeType.ALL, fetch = FetchType.LAZY, orphanRemoval = true) 
    @XmlTransient 
    @CascadeOnDelete 
    private List<Task> tasks = new Vector<Task>(); 

    @BatchFetch(BatchFetchType.EXISTS) 
    @OneToOne(mappedBy="project", cascade = CascadeType.ALL, fetch = FetchType.LAZY, orphanRemoval = true) 
    @XmlTransient 
    @CascadeOnDelete 
    private Environment environment; 

    @Version 
    @Column(name = "VERSION") 
    private Long version; 
} 



@Entity 
@Table(name = "TASK") 
@NamedQueries({ 
     @NamedQuery(name = "Task.findTaskByBuildAndPathName", 
       query = "SELECT t FROM Task t WHERE t.operation = 'W' AND t.key.pathName = :PATH_NAME ORDER BY t.startTime", 
       hints = { 
         @QueryHint(name = QueryHints.BIND_PARAMETERS, value = HintValues.TRUE) 
       } 
     ) 
}) 
@XmlRootElement 
@XmlAccessorType(XmlAccessType.FIELD) 
@XmlType 
@CascadeOnDelete 
@OptimisticLocking 
public class Task implements Serializable, Constants { 
    private static final long serialVersionUID = 1L; 

    @Id 
    @TableGenerator(name = "t-table-gen", table = "SEQ", pkColumnName = "SEQ_NAME", valueColumnName = "SEQ_COUNT", pkColumnValue = "TASK_SEQ", allocationSize = 200) 
    @GeneratedValue(strategy = GenerationType.TABLE, generator = "t-table-gen") 
    private Long id; 

    @Column(name = "HASH") 
    private String hash; 

    @Column(name = "OPERATION") 
    private String operation; 

    @Column(name = "PROCESS_ID") 
    private BigInteger processId; 

    @Embedded 
    @AttributeOverride(name = "nanoseconds", column = @Column(name = "START_TIME")) 
    private Moment startTime; 

    @Column(name = "THREAD_ID") 
    private BigInteger threadId; 

    @Column(name = "THROUGHPUT") 
    private Float throughput; 

    @Column(name = "PROCESSED_BYTES") 
    private Float processedBytes; 

    @BatchFetch(BatchFetchType.JOIN) 
    @ManyToOne(cascade = { CascadeType.MERGE, CascadeType.REMOVE}, fetch=FetchType.EAGER) 
    @JoinColumn(name = "PATH_STATE_ID", insertable = false, updatable = true) 
    @CascadeOnDelete 
    private TaskKey key; 

    @ManyToOne 
    @JoinColumn(name = "PROJECT_ID") 
    @XmlTransient 
    @CascadeOnDelete 
    private Project project; 

    @Version 
    @Column(name = "VERSION") 
    private Long version; 
} 



@Entity 
@Index(name="IDX_TS_SIZE_TIME_INDEX", columnNames={"SIZE","TIME"}) 
@Table(name = "TASK_STATE") 
@NamedQueries({ 
     @NamedQuery(name = "TaskState.findByKey", query = "SELECT ts FROM TaskState ts WHERE ts.key = :TS_KEY", 
       hints = { 
         @QueryHint(name = QueryHints.QUERY_RESULTS_CACHE, value = HintValues.TRUE), 
         @QueryHint(name = QueryHints.QUERY_RESULTS_CACHE_SIZE, value = "2000"), 
         @QueryHint(name = QueryHints.QUERY_RESULTS_CACHE_IGNORE_NULL, value="true"), 
         @QueryHint(name = QueryHints.BIND_PARAMETERS, value = HintValues.TRUE) 
       } 
     ) 
}) 
@XmlRootElement 
@XmlAccessorType(XmlAccessType.FIELD) 
@XmlType 
public class TaskState implements Serializable { 

    @Id 
    @TableGenerator(name = "ts-table-gen", table = "SEQ", pkColumnName = "SEQ_NAME", valueColumnName = "SEQ_COUNT", pkColumnValue = "TASK_STATE_SEQ", allocationSize = 50) 
    @GeneratedValue(strategy = GenerationType.TABLE, generator = "ts-table-gen") 
    private Long id; 

    @Lob 
    @Basic(fetch = FetchType.LAZY) 
    private byte[] artifact; 

    @Column(name = "MODE") 
    private String mode; 

    @Embedded 
    @AttributeOverrides({ 
      @AttributeOverride(name = "nanoseconds", column = @Column(name = "TIME")), 
      @AttributeOverride(name = "size", column = @Column(name = "SIZE")), 
      @AttributeOverride(name = "pathName", column = @Column(name = "NAME")), 
      @AttributeOverride(name = "fileHash", column = @Column(name = "HASH")) 
    }) 
    TaskStateKey key = new TaskStateKey(); 

    @BatchFetch(BatchFetchType.EXISTS) 
    @OneToMany(mappedBy = "taskState", orphanRemoval = true, cascade = { CascadeType.REMOVE}) 
    @XmlTransient 
    @CascadeOnDelete 
    private List<Task> tasks = new Vector<Task>(); 
} 

업데이트
: 여기

내 세 개의 관련 엔티티 클래스입니다. 처음에는 데이터베이스에서 프로젝트를 만들고 관련 taskStates를 찾으려고합니다. 처음에는 taskStates가 없으므로 하나씩 만듭니다. 따라서 taskState에 관한 작업의 결과는 다음과 같습니다.

1. 키가 있으면 taskState를 찾거나 반환하십시오. 그렇지 않은 경우

2. 새 taskState 지속/삽입 삽입 작업을 수행하려면 중복 입력 오류가 발생합니다. 따라서 성공적으로 완료 될 때까지 삽입 작업을 다시 시도합니다.) 새로 생성 된 pathState를 반환합니다.

pathState의 현재 삽입 작업에서 중복 항목 오류가 발생할 때마다 건너 뛴 줄을 보았습니다. 두 번째 또는 세 번째로도 pathState를 유지하는 데 성공했습니다. 나는 SERIALIZABLE 격리 수준을 데이터베이스에서 사용할 수 있지만이 경우 내 경우에는 감당할 수없는 비효율적이며 SERIALIZABLE 격리가 내 문제를 해결할 수 있을지는 의문입니다.

+0

엔티티의 모습을 보여 주시면 도움이됩니다. @Version 필드가 있습니까? – Rick

+0

정확하게 '건너 뛴'것을 정의 할 수 있습니까? 한 블록/라인이란 무엇입니까? 어떻게 보내고 있으며 건너 뛴 데이터에 대해 로그에 무엇이 표시됩니까? 데이터가 중복되거나 엔티티를 반복합니까? – Chris

+0

내가 말했듯이 클라이언트는 서버에 블록 집합을 보내고 각 블록에는 여러 줄의 문자열이 들어 있습니다. 데이터베이스에 저장하는 동안 일부 줄이 일부 블록에 저장되지 않았습니다. 각 행을 수백 개의 문자가 포함될 수있는 긴 문자열로 간주하십시오. – CacheCort

답변

0

위의 문제에 대한 해결책을 찾았습니다. TASK_STATE ORM 클래스를 약간 변경했습니다. 특히, 클래스에서 작업 목록을 제거하고 예상대로 작동하기 시작했습니다. 이 관계는 많은 동시 스레드에서 몇 줄 건너 뛰기로 이어지고있는 것 같습니다.

0

TaskState의 ID 생성 전략을 TABLE에서 IDENTITY로 변경하십시오. 이렇게하면 MySQL이 삽입 된 행의 ID를 자체적으로 설정할 수 있습니다.

+0

아니, 도움이되지 않았다. 실제로 테이블에서 AUTO_INCREMENT를 사용하고있었습니다. – CacheCort

+0

'@GeneratedValue (strategy = GenerationType.TABLE, generator = "p-table-gen")'는 JPA가 MySQL의 AUTO_INCREMENT 동작을 건너 뛰고 id 속성에 값을 할당하도록합니다. –