배경
현재 내가 gatling을 사용하고있는 스트레스 테스트 도구의 기능 분석 세트를 작업 중입니다.Gatling 시나리오에서 여러 개의 추가 작업을 생성하는 작업
스크롤 검색과 함께 업데이트 검색 API 호출로 elasticsearch를로드하는 작업이 일부 포함됩니다.
나는
1 단계 달성하고자하는 것 : 실행 스크롤 개시를하고 추가로 스크롤 사용할 수있는 _scroll_id
가 쿼리 저장
2 단계 : 실행에 스크롤 쿼리 반복하여 각 스크롤 쿼리의 일부로 반환 된 각 히트를 수정하고이를 elasticsearch로 다시 색인화하여 한 번의 스크롤 쿼리 작업에서 최대 1,000 개의 작업을 효과적으로 생성하고 결과를 샘플링합니다.
1 단계는 쉽습니다. 2 단계는 그렇게 많이하지 않습니다. 내가
나는 현재 JSON 형식의 결과를 구문 분석 ResponseTransformer을 통해이를 달성하기 위해 노력하고있어 시도했습니다 무엇
는 인덱스에 다른 exec(http(...).post(...) etc)
을 시도 각각에 대해 스레드 떨어져 각각 수정 및 화재하게 변화는 탄성 검색으로 돌아 간다.
기본적으로 나는 그것에 대해 잘못 생각하고 있다고 생각합니다. gatling으로 샘플링 된 것은 물론, 인덱싱 스레드는 실행되지 않습니다.
여기 내 스크롤 쿼리 동작의 본체는 다음과 같습니다
...
val pool = Executors.newFixedThreadPool(parallelism)
val query = exec(http("Scroll Query")
.get(s"/_search/scroll")
.body(ElFileBody("queries/scrollquery.json")).asJSON // Do the scroll query
.check(jsonPath("$._scroll_id").saveAs("scroll_id")) // Get the scroll ID from the response
.transformResponse { case response if response.isReceived =>
new ResponseWrapper(response) {
val responseJson = JSON.parseFull(response.body.string)
// Get the hits and
val hits = responseJson.get.asInstanceOf[Map[String, Any]]("hits").asInstanceOf[Map[String,Any]]("hits").asInstanceOf[List[Map[String, Any]]]
for (hit <- hits) {
val id = hit.get("_id").get.asInstanceOf[String]
val immutableSource = hit.get("_source").get.asInstanceOf[Map[String, Any]]
val source = collection.mutable.Map(immutableSource.toSeq: _*) // Make the map mutable
source("newfield") = "testvalue" // Make a modification
Thread.sleep(pause) // Pause to simulate topology throughput
pool.execute(new DocumentIndexer(index, doctype, id, source)) // Create a new thread that executes the index request
}
}
}) // Make some mods and re-index into elasticsearch
...
DocumentIndexer은 다음과 같습니다
class DocumentIndexer(index: String, doctype: String, id: String, source: scala.collection.mutable.Map[String, Any]) extends Runnable {
...
val httpConf = http
.baseURL(s"http://$host:$port/${index}/${doctype}/${id}")
.acceptHeader("application/json")
.doNotTrackHeader("1")
.disableWarmUp
override def run() {
val json = new ObjectMapper().writeValueAsString(source)
exec(http(s"Index ${id}")
.post("/_update")
.body(StringBody(json)).asJSON)
}
}
질문
- 이 개틀링을 사용하는 경우에도 가능합니까?
- 달성하려는 목표를 어떻게 달성 할 수 있습니까?
도움/의견을 보내 주셔서 감사합니다!