2017-02-23 1 views
1

배경

현재 내가 gatling을 사용하고있는 스트레스 테스트 도구의 기능 분석 세트를 작업 중입니다.Gatling 시나리오에서 여러 개의 추가 작업을 생성하는 작업

스크롤 검색과 함께 업데이트 검색 API 호출로 elasticsearch를로드하는 작업이 일부 포함됩니다.

나는

1 단계 달성하고자하는 것 : 실행 스크롤 개시를하고 추가로 스크롤 사용할 수있는 _scroll_id가 쿼리 저장

2 단계 : 실행에 스크롤 쿼리 반복하여 각 스크롤 쿼리의 일부로 반환 된 각 히트를 수정하고이를 elasticsearch로 다시 색인화하여 한 번의 스크롤 쿼리 작업에서 최대 1,000 개의 작업을 효과적으로 생성하고 결과를 샘플링합니다.

1 단계는 쉽습니다. 2 단계는 그렇게 많이하지 않습니다. 내가

나는 현재 JSON 형식의 결과를 구문 분석 ResponseTransformer을 통해이를 달성하기 위해 노력하고있어 시도했습니다 무엇

는 인덱스에 다른 exec(http(...).post(...) etc)을 시도 각각에 대해 스레드 떨어져 각각 수정 및 화재하게 변화는 탄성 검색으로 돌아 간다.

기본적으로 나는 그것에 대해 잘못 생각하고 있다고 생각합니다. gatling으로 샘플링 된 것은 물론, 인덱싱 스레드는 실행되지 않습니다.

여기 내 스크롤 쿼리 동작의 본체는 다음과 같습니다

... 

    val pool = Executors.newFixedThreadPool(parallelism) 

    val query = exec(http("Scroll Query") 
    .get(s"/_search/scroll") 
    .body(ElFileBody("queries/scrollquery.json")).asJSON // Do the scroll query 
    .check(jsonPath("$._scroll_id").saveAs("scroll_id")) // Get the scroll ID from the response 
    .transformResponse { case response if response.isReceived => 
     new ResponseWrapper(response) { 
     val responseJson = JSON.parseFull(response.body.string) 
     // Get the hits and 
     val hits = responseJson.get.asInstanceOf[Map[String, Any]]("hits").asInstanceOf[Map[String,Any]]("hits").asInstanceOf[List[Map[String, Any]]] 
     for (hit <- hits) { 
      val id = hit.get("_id").get.asInstanceOf[String] 
      val immutableSource = hit.get("_source").get.asInstanceOf[Map[String, Any]] 
      val source = collection.mutable.Map(immutableSource.toSeq: _*) // Make the map mutable 
      source("newfield") = "testvalue" // Make a modification 
      Thread.sleep(pause) // Pause to simulate topology throughput 
      pool.execute(new DocumentIndexer(index, doctype, id, source)) // Create a new thread that executes the index request 
     } 
     } 
    }) // Make some mods and re-index into elasticsearch 

    ... 

DocumentIndexer은 다음과 같습니다

class DocumentIndexer(index: String, doctype: String, id: String, source: scala.collection.mutable.Map[String, Any]) extends Runnable { 

    ... 

    val httpConf = http 
    .baseURL(s"http://$host:$port/${index}/${doctype}/${id}") 
    .acceptHeader("application/json") 
    .doNotTrackHeader("1") 
    .disableWarmUp 

    override def run() { 

    val json = new ObjectMapper().writeValueAsString(source) 

    exec(http(s"Index ${id}") 
     .post("/_update") 
     .body(StringBody(json)).asJSON) 

    } 

} 

질문

  1. 이 개틀링을 사용하는 경우에도 가능합니까?
  2. 달성하려는 목표를 어떻게 달성 할 수 있습니까?

도움/의견을 보내 주셔서 감사합니다!

답변

1

jsonPath을 사용하여 JSON 히트 배열을 추출하고 세션에 요소를 저장 한 다음 액션 체인에서 foreach를 사용하고 루프에서 인덱스 작업을 exec- 색인 생성을 수행 할 수 있습니다 따라서.

예 :
ScrollQuery

... 
    val query = exec(http("Scroll Query") 
    .get(s"/_search/scroll") 
    .body(ElFileBody("queries/scrollquery.json")).asJSON // Do the scroll query 
    .check(jsonPath("$._scroll_id").saveAs("scroll_id")) // Get the scroll ID from the response 
    .check(jsonPath("$.hits.hits[*]").ofType[Map[String,Any]].findAll.saveAs("hitsJson")) // Save a List of hit Maps into the session 
) 
... 

시뮬레이션

... 
    val scrollQueries = scenario("Enrichment Topologies").exec(ScrollQueryInitiator.query, repeat(numberOfPagesToScrollThrough, "scrollQueryCounter"){ 
     exec(ScrollQuery.query, pause(10 seconds).foreach("${hitsJson}", "hit"){ exec(HitProcessor.query) }) 
    }) 
... 

HitProcessor

... 
    def getBody(session: Session): String = { 
    val hit = session("hit").as[Map[String,Any]] 
    val id = hit("_id").asInstanceOf[String] 
    val source = mapAsScalaMap(hit("_source").asInstanceOf[java.util.LinkedHashMap[String,Any]]) 
    source.put("newfield", "testvalue") 
    val sourceJson = new ObjectMapper().writeValueAsString(mapAsJavaMap(source)) 
    val json = s"""{"doc":${sourceJson}}""" 
    json 
    } 

    def getId(session: Session): String = { 
    val hit = session("hit").as[Map[String,Any]] 
    val id = URLEncoder.encode(hit("_id").asInstanceOf[String], "UTF-8") 
    val uri = s"/${index}/${doctype}/${id}/_update" 
    uri 
    } 

    val query = exec(http(s"Index Item") 
    .post(session => getId(session)) 
    .body(StringBody(session => getBody(session))).asJSON) 
... 

면책 조항 :이 코드는 여전히 최적화가 필요합니다! 그리고 저는 아직 실제로 많은 스칼라를 배웠습니다. 더 나은 솔루션으로 의견을 말하십시오.

이 작업을 수행 한 후, 실제로 달성하고 싶은 것은 주어진 수의 색인 작업을 병렬 처리하는 것입니다. ie : 나는 1000 개의 히트를 되 돌린다. 나는 각각의 히트마다 인덱스 작업을 수행하고 싶다. 단지 그들을 반복하고 그들을 하나씩 수행하는 것이 아니라 동시에 10 개를 수행하고 싶다.

그러나 저는 이것이 별개의 질문이라고 생각합니다. 실제로 저는 그것을 제시 할 것입니다.

관련 문제