JPA/EclipseLink 및 MySQL을 데이터베이스로 사용하는 응용 프로그램에서 동시 처리 문제가 발생합니다. 여기에 조언이 필요해.
파일에 문자열 블록을 생성하는 클라이언트가 있습니다. 각 블록에는 몇 줄부터 최대 100 줄 또는 200 줄까지 포함될 수 있습니다. 데이터를 생성 한 직후에 클라이언트는 각 블록이 적절히 분석되고 db에 저장된 서버로 블록 집합을 보내기 시작합니다. 서버는 JPA/EclipseLink를 사용하여 MySQL과 통신합니다.
클라이언트가 REST를 통해 동기식으로 블록 집합을 보내면 데이터 구문 분석 및 DB에 저장하는 것과 관련하여 모든 것이 예상대로 작동합니다. 그러나 클라이언트가 병렬로 데이터를 보낼 때 (내 경우 병렬 스레드 15 개) 일부 데이터 (여러 줄)는 처음으로 db에만 저장하는 동안 건너 뜁니다. 동일한 데이터가 다시 병렬로 전송되면 (두 번째, 세 번째 등) 예상대로 작동합니다 (저장하는 동안 줄을 건너 뛰지 않음).
병렬 전송의 경우 테스트 목적으로 모든 행을 동시 맵에 저장했습니다. 맵에서 건너 뛴 줄을 보지 못했기 때문에 순전히 JPA 및/또는 MySQL 문제입니다. JPA와 함께 낙관적/비관적 잠금을 사용하려고했지만 도움이되지 않았습니다.
다른 사람이 이런 상황에 빠지면 어떻게 될까요? 지금은 내가 루트 문제를 발견 생각하지만 난 아직 해결책을 찾지 못한
JPA/EclipseLink MySQL 동시 처리 문제
@Entity
@Table(name = "PROJECT")
@NamedQueries({
@NamedQuery(name = "Project.findByKey", query = "SELECT p FROM Project p WHERE p.key = :P_KEY",
hints = {
@QueryHint(name = QueryHints.QUERY_RESULTS_CACHE, value = HintValues.TRUE),
@QueryHint(name = QueryHints.QUERY_RESULTS_CACHE_SIZE, value = "1000"),
@QueryHint(name = QueryHints.QUERY_RESULTS_CACHE_IGNORE_NULL, value = HintValues.TRUE),
@QueryHint(name = QueryHints.BIND_PARAMETERS, value = HintValues.TRUE)
}
)
})
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType
@CascadeOnDelete
@OptimisticLocking
public class Project implements Serializable {
@Id
@TableGenerator(name = "p-table-gen", table = "SEQ", pkColumnName = "SEQ_NAME", valueColumnName = "SEQ_COUNT", pkColumnValue = "PROJECT_SEQ", allocationSize = 50)
@GeneratedValue(strategy = GenerationType.TABLE, generator = "p-table-gen")
private Long id;
@Column(name = "DURATION")
private Long duration;
@Column(name = "PROGRAM")
private String program;
@Embedded
@AttributeOverrides(
{
@AttributeOverride(name = "parentHash", column = @Column(name = "PARENT_CMD_HASH")),
@AttributeOverride(name = "hash", column = @Column(name = "CMD_HASH")),
@AttributeOverride(name = "pathHash", column = @Column(name = "PATH_HASH"))
}
)
ProjectKey key = new ProjectKey();
@BatchFetch(BatchFetchType.EXISTS)
@OneToMany(mappedBy="project", cascade = CascadeType.ALL, fetch = FetchType.LAZY, orphanRemoval = true)
@XmlTransient
@CascadeOnDelete
private List<Task> tasks = new Vector<Task>();
@BatchFetch(BatchFetchType.EXISTS)
@OneToOne(mappedBy="project", cascade = CascadeType.ALL, fetch = FetchType.LAZY, orphanRemoval = true)
@XmlTransient
@CascadeOnDelete
private Environment environment;
@Version
@Column(name = "VERSION")
private Long version;
}
@Entity
@Table(name = "TASK")
@NamedQueries({
@NamedQuery(name = "Task.findTaskByBuildAndPathName",
query = "SELECT t FROM Task t WHERE t.operation = 'W' AND t.key.pathName = :PATH_NAME ORDER BY t.startTime",
hints = {
@QueryHint(name = QueryHints.BIND_PARAMETERS, value = HintValues.TRUE)
}
)
})
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType
@CascadeOnDelete
@OptimisticLocking
public class Task implements Serializable, Constants {
private static final long serialVersionUID = 1L;
@Id
@TableGenerator(name = "t-table-gen", table = "SEQ", pkColumnName = "SEQ_NAME", valueColumnName = "SEQ_COUNT", pkColumnValue = "TASK_SEQ", allocationSize = 200)
@GeneratedValue(strategy = GenerationType.TABLE, generator = "t-table-gen")
private Long id;
@Column(name = "HASH")
private String hash;
@Column(name = "OPERATION")
private String operation;
@Column(name = "PROCESS_ID")
private BigInteger processId;
@Embedded
@AttributeOverride(name = "nanoseconds", column = @Column(name = "START_TIME"))
private Moment startTime;
@Column(name = "THREAD_ID")
private BigInteger threadId;
@Column(name = "THROUGHPUT")
private Float throughput;
@Column(name = "PROCESSED_BYTES")
private Float processedBytes;
@BatchFetch(BatchFetchType.JOIN)
@ManyToOne(cascade = { CascadeType.MERGE, CascadeType.REMOVE}, fetch=FetchType.EAGER)
@JoinColumn(name = "PATH_STATE_ID", insertable = false, updatable = true)
@CascadeOnDelete
private TaskKey key;
@ManyToOne
@JoinColumn(name = "PROJECT_ID")
@XmlTransient
@CascadeOnDelete
private Project project;
@Version
@Column(name = "VERSION")
private Long version;
}
@Entity
@Index(name="IDX_TS_SIZE_TIME_INDEX", columnNames={"SIZE","TIME"})
@Table(name = "TASK_STATE")
@NamedQueries({
@NamedQuery(name = "TaskState.findByKey", query = "SELECT ts FROM TaskState ts WHERE ts.key = :TS_KEY",
hints = {
@QueryHint(name = QueryHints.QUERY_RESULTS_CACHE, value = HintValues.TRUE),
@QueryHint(name = QueryHints.QUERY_RESULTS_CACHE_SIZE, value = "2000"),
@QueryHint(name = QueryHints.QUERY_RESULTS_CACHE_IGNORE_NULL, value="true"),
@QueryHint(name = QueryHints.BIND_PARAMETERS, value = HintValues.TRUE)
}
)
})
@XmlRootElement
@XmlAccessorType(XmlAccessType.FIELD)
@XmlType
public class TaskState implements Serializable {
@Id
@TableGenerator(name = "ts-table-gen", table = "SEQ", pkColumnName = "SEQ_NAME", valueColumnName = "SEQ_COUNT", pkColumnValue = "TASK_STATE_SEQ", allocationSize = 50)
@GeneratedValue(strategy = GenerationType.TABLE, generator = "ts-table-gen")
private Long id;
@Lob
@Basic(fetch = FetchType.LAZY)
private byte[] artifact;
@Column(name = "MODE")
private String mode;
@Embedded
@AttributeOverrides({
@AttributeOverride(name = "nanoseconds", column = @Column(name = "TIME")),
@AttributeOverride(name = "size", column = @Column(name = "SIZE")),
@AttributeOverride(name = "pathName", column = @Column(name = "NAME")),
@AttributeOverride(name = "fileHash", column = @Column(name = "HASH"))
})
TaskStateKey key = new TaskStateKey();
@BatchFetch(BatchFetchType.EXISTS)
@OneToMany(mappedBy = "taskState", orphanRemoval = true, cascade = { CascadeType.REMOVE})
@XmlTransient
@CascadeOnDelete
private List<Task> tasks = new Vector<Task>();
}
업데이트
: 여기
1. 키가 있으면 taskState를 찾거나 반환하십시오. 그렇지 않은 경우
2. 새 taskState 지속/삽입 삽입 작업을 수행하려면 중복 입력 오류가 발생합니다. 따라서 성공적으로 완료 될 때까지 삽입 작업을 다시 시도합니다.) 새로 생성 된 pathState를 반환합니다.
pathState의 현재 삽입 작업에서 중복 항목 오류가 발생할 때마다 건너 뛴 줄을 보았습니다. 두 번째 또는 세 번째로도 pathState를 유지하는 데 성공했습니다. 나는 SERIALIZABLE 격리 수준을 데이터베이스에서 사용할 수 있지만이 경우 내 경우에는 감당할 수없는 비효율적이며 SERIALIZABLE 격리가 내 문제를 해결할 수 있을지는 의문입니다.
엔티티의 모습을 보여 주시면 도움이됩니다. @Version 필드가 있습니까? – Rick
정확하게 '건너 뛴'것을 정의 할 수 있습니까? 한 블록/라인이란 무엇입니까? 어떻게 보내고 있으며 건너 뛴 데이터에 대해 로그에 무엇이 표시됩니까? 데이터가 중복되거나 엔티티를 반복합니까? – Chris
내가 말했듯이 클라이언트는 서버에 블록 집합을 보내고 각 블록에는 여러 줄의 문자열이 들어 있습니다. 데이터베이스에 저장하는 동안 일부 줄이 일부 블록에 저장되지 않았습니다. 각 행을 수백 개의 문자가 포함될 수있는 긴 문자열로 간주하십시오. – CacheCort