2017-04-04 2 views
0

이 문제에 도움이 필요합니다. 나는 스파우트가 데이터를 읽거나 Bolt에서 처리하기 위해 그것을 준비 할 책임이 있음을 읽었습니다. 그래서 스파우트에 파일을 열고 줄 단위로 읽을 코드를 썼습니다.스파우트 로직 오류

class SimSpout(storm.Spout): 
    # Not much to do here for such a basic spout 
    def initialize(self, conf, context): 
    ## Open the file with read only permit 
     self.f = open('data.txt', 'r') 
    ## Read the first line 
     self._conf = conf 
     self._context = context 
     storm.logInfo("Spout instance starting...") 
    # Process the next tuple 
    def nextTuple(self): 
     # check if it reach at the EOF to close it 
     for line in self.f.readlines(): 
     # Emit a random sentence 
     storm.logInfo("Emiting %s" % line) 
     storm.emit([line]) 

# Start the spout when it's invoked 
SimSpout().run() 

맞습니까?

답변

0

당신은 스톰에서 스톰의 책임하에 처리 할 다운 스트림 볼트에 대한 튜플을 방출합니다.

Spout의 nextTuple 책임은 수신 될 때마다 하나의 이벤트를 방출하는 것입니다. 코드에서 파일의 모든 행을 내보내고 있습니다. 단일 튜플이 단일 행인 경우. 당신은 파일 오프셋 유지하고 오프셋 라인을 읽고 방출한다, 업데이트, 오프셋 = 회신에 대한

class SimSpout(storm.Spout): 

    # Not much to do here for such a basic spout 
    def initialize(self, conf, context): 
    ## Open the file with read only permit 
    self.f = open('data.txt', 'r') 
    ## Read the first line 
    self._conf = conf 
    self._context = context 
    self._offset = 0 
    storm.logInfo("Spout instance starting...") 

# Process the next tuple 
def nextTuple(self): 
    # check if it reach at the EOF to close it 
    with open(self.f) as f: 
     f.readlines()[self._offset] 
     #Emit a random sentence 
     storm.logInfo("Emiting %s" % line) 
     storm.emit([line]) 
    self._offset = self._offset + 1 
+0

많은 감사합니다 아래, 나는이있어 같은 + 1 뭔가 오프셋 "개방 (self.f)와 as f : TypeError : 유니 코드로 강제 변환 : 문자열 또는 버퍼가 필요합니다. 파일을 찾았습니다. " – user3188912

+0

오류가 발생했습니다. 파일을 두 번 열었기 때문에 해결되었지만 다른 AttributeError가 있습니다. 'SimSpout'개체에 '_offset'속성이 없습니다. 해결을 위해 검색했지만 아무 도움도 찾지 못했습니다. – user3188912