2013-10-09 2 views
1

우리가 사용하는 과제에 대해 우리가하려는 일 중 하나는 유니 프로트 레코드를 수집하기 위해 한 줄씩 유니 코드 데이터베이스 파일을 구문 분석하는 것입니다.무엇 goroutines의 거대한 오버 헤드를 만들 수 있습니까?

나는 너무 많은 코드를 공유하는 것을 선호하지 않지만, 48 초 내에 (2.5GB) 파일을 올바르게 구문 분석하는 작업 코드 스 니펫이 있습니다 (시간 이동 패키지를 사용하여 측정). 반복적으로 파일을 구문 분석하고 레코드 끝 신호에 도달 할 때까지 레코드에 줄을 추가합니다 (전체 레코드). 레코드의 메타 데이터가 만들어집니다. 그런 다음 레코드 문자열이 무효화되고 새 레코드가 줄 단위로 수집됩니다. 그렇다면 나는 일하는 것을 시도 할 것이라고 생각했다.

나는 stackoverflow에서 팁을 얻었고, 원래 코드에 메타 데이터 생성과 관련된 모든 것을 처리하는 함수를 추가했다.

그래서, 코드가

  1. 레코드 정지 신호가 발견되면,
  2. 파일을 반복하고 기록에 줄을 추가,
  3. 을 빈 레코드를 작성하고있다 (지금은 전체가 레코드) - 이동 루틴에 메타 데이터를 생성하기 위해 제공
  4. null 레코드 문자열을 입력하고 2)에서 계속하십시오.

또한 각 루틴을 끝내기를 기다렸는지 확인하기 위해 sync.WaitGroup()을 추가했습니다. goroutine이 각 레코드에서 작동하는 동안 구문 분석을 계속하면서 실제로 데이터베이스 파일을 구문 분석하는 데 소요되는 시간을 줄일 수 있다고 생각했습니다. 그러나이 코드는 20 분 이상 실행되어 무언가가 잘못되었거나 오버 헤드가 미친 듯합니다. 어떤 제안?

package main 

import (
    "bufio" 
    "crypto/sha1" 
    "fmt" 
    "io" 
    "log" 
    "os" 
    "strings" 
    "sync" 
    "time" 
) 

type producer struct { 
    parser uniprot 
} 

type unit struct { 
    tag string 
} 

type uniprot struct { 
    filenames  []string 
    recordUnits chan unit 
    recordStrings map[string]string 
} 

func main() { 
    p := producer{parser: uniprot{}} 
    p.parser.recordUnits = make(chan unit, 1000000) 
    p.parser.recordStrings = make(map[string]string) 
    p.parser.collectRecords(os.Args[1]) 
} 

func (u *uniprot) collectRecords(name string) { 
    fmt.Println("file to open ", name) 
    t0 := time.Now() 
    wg := new(sync.WaitGroup) 
    record := []string{} 
    file, err := os.Open(name) 
    errorCheck(err) 
    scanner := bufio.NewScanner(file) 
    for scanner.Scan() { //Scan the file 
     retText := scanner.Text() 
     if strings.HasPrefix(retText, "//") { 
      wg.Add(1) 
      go u.handleRecord(record, wg) 
      record = []string{} 
     } else { 
      record = append(record, retText) 
     } 
    } 
    file.Close() 
    wg.Wait() 
    t1 := time.Now() 
    fmt.Println(t1.Sub(t0)) 
} 

func (u *uniprot) handleRecord(record []string, wg *sync.WaitGroup) { 
    defer wg.Done() 
    recString := strings.Join(record, "\n") 
    t := hashfunc(recString) 
    u.recordUnits <- unit{tag: t} 
    u.recordStrings[t] = recString 
} 

func hashfunc(record string) (hashtag string) { 
    hash := sha1.New() 
    io.WriteString(hash, record) 
    hashtag = string(hash.Sum(nil)) 
    return 
} 

func errorCheck(err error) { 
    if err != nil { 
     log.Fatal(err) 
    } 
} 
+1

어떻게 'goroutine에 제공합니까'구현 되었습니까? 이것은 쉽게 많은 바이트를 복사 할 수 있습니다. 일반적으로, 최소한 구현을하는 것이 좋다. 코드가 실제로 병렬로 실행되고 있습니까 (GOMAXPROCS> 1)? 모든 메타 데이터 작업에 대한 goroutine이 있습니까? 아니면 하나입니까? – nemo

+0

코드를 추가했습니다. 나는 내 코드가 병렬로 실행되고 있고, 각각의 메타 데이터 처리 (코드 참조)에 대해 이동 루틴이 만들어져 있다고 가정한다. – stian

+0

대용량 채널 버퍼는 채널에서 뭔가를 가져오고 있지 않기 때문에 모든 것이 다 빠르다. – stian

답변

3

우선 : 코드가 스레드로부터 안전하지 않습니다. 주로 해시 맵 에 동시에 액세스하고 있기 때문입니다. 이들은 동시성에있어 안전하지 않으며 잠글 필요가 있습니다. 코드에 오류가 라인 : 당신이 GOMAXPROCS> 1 이동을 실행하는 경우에 날려 버리겠다이 같이

u.recordStrings[t] = recString 

, 나는 당신이 그 일을하지 않을 것을 가정하고있다. 병렬 처리를 실현하려면 응용 프로그램을 GOMAXPROCS=2 이상으로 실행해야합니다. 기본값은 1입니다. 따라서 두 개의 CPU 또는 CPU 코어에서 동시에 예약 할 수없는 단일 OS 스레드에서 코드가 실행됩니다. 예 :

$ GOMAXPROCS=2 go run udb.go uniprot_sprot_viruses.dat 

마지막으로 채널에서 값을 가져 오지 않으면 프로그램이 종료되지 않습니다. goroutines 수가 제한을 초과하면 교착 상태가 발생합니다. 76MiB file of data으로 테스트했는데 파일 크기가 약 2.5GB라고했습니다. 16347 개의 항목이 있습니다. 선형 성장을 가정하면 파일이 1e6을 초과하므로 채널에 슬롯이 충분하지 않으며 프로그램 이 교착 상태가되어 결국 (비참하게)에서 실패하지 않는 골 루틴을 누적하는 동안 아무런 결과도 얻지 못합니다.

그래서 해결책은 채널의 값을 가져 오는 go 루틴을 추가하고 을 사용하여이를 처리해야합니다.

참고 : 성능이 걱정되면 문자열을 항상 복사하므로 사용하지 마십시오. 대신 []byte을 사용하십시오.

관련 문제