2015-01-21 1 views
0

기본 아이디어는 하나의 셀러리 태스크가 메소드를 n 번 호출하고 작은 태스크를 그룹화하여 태스크의 다른 값으로 그룹화하는 것입니다 매번 매개 변수.셀러리 - 하나의 태스크가 다른 매개 변수로 n 번 더 작은 태스크를 호출합니다.

첫 번째 작업은 매번 메서드를 호출하는 for 루프이며, group은 더 작은 작업을 차례로 수행합니다. 첫 번째 작업은 for 루프의 진행으로 웹 페이지를 업데이트합니다.

이 작업을 수행하는 가장 좋은 방법은 무엇입니까?

나는 단지 작업을 지연하는 것을 포함하여 몇 가지 방법을 시도했지만, 작업자 중 한 명은 첫 번째 작업에 잠겨 있고 작은 작업은 할당되지 않은 것으로 나타났습니다. 처리되다.

chains과 작동하지 않습니다.

프리 플레칭을 사용하지 않으려면 -Ofair 플래그를 사용하고 있습니다.하지만 결과가 매우 느리게 나타납니다.

celeryTasks.py

@app.task() 
def sweepSubnets(subnets): 
    ... 
    for subnet in subnets: 
     print "subnet:{0}".format(subnet) 
     sweep.runSweep(subnet, finished) 
     finished += 1 
     percent = (float((float(finished)/ float(noSubnets))) * 100) 
     current_task.update_state(state='PROGRESS', 
     meta={'process_percent': percent, 'subnet' : subnet}) 

    results = sweep.getResults() 
    return results 

@app.task() 
def ping(ip): 
    result = os.system("ping -W 1 -c 1 " + ip + " >/dev/null") 
    return (ip,result) 

sweep.py 여기에 비슷한 문제가 건너 올 수있는 사람들을위한

def runSweep(self, subnet, number): 
    if self.checkSubnet(subnet): 
     print "Pinging {0}".format(subnet) 
     netAdd, nodeAdd, poolSize = self.splitSubnet(subnet) 
     pingResults = self.queuePings(netAdd, nodeAdd, poolSize) 
     activeResults = self.getActiveResults(pingResults) 

     # Adds a tuple to the results list (subnet, active hosts, total hosts) 
     self.results.append({"subnet":subnet, "activeNo":len(activeResults), "totalNo":len(pingResults), "active":activeResults, "total":pingResults, "number":number}) 
    else: 
     self.results.append({"subnet":subnet, "activeNo":0, "totalNo":0, "active":[], "total":[], "number":number}) 

def queuePings(self, netAdd, nodeAdd, poolSize): 
    from celeryTasks import ping 

    ipToPing = [] 

    # For each address, puts the address on the job queue 
    for i in range(1, poolSize): 
     # Checks if the node address is over 254 and increases the network address if so 
     nodeAdd = int(nodeAdd) + 1 
     if int(nodeAdd) > 255: 
      nodeAdd = 0 
      netAdd = netAdd.split(".") 
      netAdd = netAdd[0] + "." + netAdd[1] + "." + str(int(netAdd[2]) + 1) 

     ipToPing.append("{0}.{1}".format(netAdd, nodeAdd)) 

    job = group(ping.s(ip) for ip in ipToPing) 

    result = job.apply_async() 
    print "Getting Results" 
    return result.get() 
+0

정확히 무엇을하려고하는지 코드를 보면 도움이 될 수 있습니다. – user2097159

+0

@ user2097159 코드 – DJDMorrison

+0

을 추가했습니다. 그래서 내가 보았던 한 가지 문제가 있습니다. 이것은 아마도 작업 결과를 기다리는 작업 결과를 기다리는 작업이 아니라는 것입니다. 내가 당신의 코드를 올바르게 이해하고 있다면. 이것은 잠금 조건을 매우 쉽게 유발할 수있는 나쁜 생각입니다. – user2097159

답변

0

내가 무슨 짓을했는지 :

나는 모든를 실행하는 그룹을 사용 이전과 마찬가지로 ping하지만 그룹의 ID를 저장하여 큰 작업을 수행하는 대신 나중에 결과를 복원 할 수 있도록했습니다.

job = group(ping.s(ip) for ip in ipToPing) 
result = job.apply_async() 
while not result.ready(): 
    time.sleep(0.1) 
result.save() 
return result.id 

그런 다음 더 큰 작업이 완료되면 복원하고 작업 한 그룹 ID 목록이 있습니다.

for job in jobList: 
    jobIDs = GroupResult.restore(job) 
    results = jobIDs.get() 
관련 문제