2016-06-29 3 views
2

날짜와 정수가 포함 된 초대형 csv 파일이 있습니다. 각 파일 레코드에 대해 Ecto 레코드를 만들어야합니다. 문제는 이웃 레코드의 날짜 사이의 최소 시간차를 기준으로 정수를 조작해야한다는 것입니다. 나는 한 점까지 스트림을 처리하려고 시도했다. 변수에 바인딩 한 다음 두 개의 다른 계산에 사용했다. 그러나 두 번째 계산은 빈 스트림을 얻습니다. 스트림에 대한 모든 액세스는 내가 가진 것을 제거합니다. 스트림을 재사용/fork/clone/dup/something 할 수있는 방법이 있습니까? RX 스트림에는 이러한 개념이 있습니다. 나는 한 가지 사슬에서이 일을하는 어떤 방법을 생각해 보았지만 공허 해졌다.엘릭서 파일 스트림 재사용/포크

def do_something(path) do 
    {:ok, file} = File.open(path) 
    stream = file 
    |> IO.stream(:line) 
    |> Stream.map(&String.split(&1, ",")) 

    dates = stream_to_dates(stream) # stream 
    factor = dates_to_factor(dates) # float 
    values = stream_to_values(stream, factor) # stream 

    Stream.zip(dates, values) 
end 

그때 요인을 날짜를 계산하고, 수,하지만 바로 그 후, 스트림 및 날짜가 모두 빈 스트림, 따라서 값입니다 따라서, 비어 : 여기에 내가하려고했던 흐름은 기본적으로 우편은 ... 비어

답변

1

당신은 주어진 스트림에서 쌍의 스트림을 만들 Stream.transform을 사용할 수 있습니다 그리고

def pairs(stream) do 
    Stream.transform(stream, nil, fn(x, last) -> 
    # The first element is the list of values to return at this point, 
    # the second one is the new accumulator 
    {[{last, x}], x} 
    end) 
    # Drop the first pair of {nil, something} 
    |> Stream.drop(1) 
end 

:

iex(1)> 1..1000 |> Stream2.pairs |> Enum.take(5) 
[{1, 2}, {2, 3}, {3, 4}, {4, 5}, {5, 6}] 

|> Stream.map(&String.split(&1, ",")) 뒤에 쌍을 사용하여 인접한 레코드 쌍을 얻을 수 있어야합니다. 더 큰 덩어리가 필요하면 함수를 일반화 할 수 있습니다.