2014-10-22 2 views
0

단순히 오해의 결과 인 경우 사과드립니다. 나는 주변을 검색하고 문서를 읽고 나를 위해 작동하는 해결책을 제시 할 수 없었다.트리 구조에서 재귀 적으로 작업 호출

나는 각 노드가 임의의 수의 자식을 가질 수있는 트리 구조를 가지고 있습니다. 모든 노드에서 새 셀러리 작업이 인스턴스화되어 모든 자식을 살펴보고 새 작업을 인스턴스화하여 인스턴스를 만듭니다. 이 방법을 사용하는 이유는 셀러리의 멀티 스레드 특성을보다 잘 활용하기 위해서입니다. 재귀 적으로 전체 트리를 만드는 단일 작업 만이 단일 스레드를 이용하는 것처럼 보입니다.

내 코드를 이렇게하는 방식으로 설정할 수 있지만, 전체 구조가 만들어 질 때까지 실행할 수없는 일부 종속성이있는 문제가 있습니다.

@app.task 
def initial_task(tree_data): 
    jobs = [] 
    for node in tree_data: 
     jobs.append(recursive_task.s(node)) 
    job = group(jobs) 
    result = job.apply_async() 

    # Block execution until group is finished 
    while not result.ready(): 
     time.sleep(0.5) 

    ... do dependent stuff ... 

@app.task 
def recursive_task(node, parent=None): 
    # Create node object 
    node_obj = Node(node.name, parent=parent) 

    jobs = [] 
    for child in node.children: 
     jobs.append(recursive_task.s(child, node_obj)) 
    job = group(jobs) 
    result = job.apply_async() 

    return node_obj 

의 I는이 문제가 모든 아이들의 하위 작업이 작업의 첫 번째 그룹의 완료를 차단하지 않고 내가 할 것을 강제하는 방법을 모르는 것입니다 : 코드는 같이 보입니다 경우. 이 문제에 대한 도움은 매우 감사하겠습니다.

자식을 만들 때 node_obj의 ID가 필요하므로 단순히 트리를 반복하고 작업을 연결할 수 없습니다.

업데이트 : 코드를 약간 변경하여 결과가 변경되었습니다. 다음 코드는 (손자을 포함, 증손 등) 모든 아이가 최상위 노드의 직접 자식 원인 :

@app.task 
def initial_task(tree_data): 
    def _recursive_link_task(task_set, children): 
     for child in children: 
      task_set.link(create_node.s(child)) 

      if child.children: 
       _recursive_link_task(task_set, child.children) 


    for node in tree_data: 
     s = create_node.s(None, node) 
     if node.children: 
      _recursive_link_task(s, node.children) 
     s.apply_async() 

@app.task 
def create_node(parent, node): 
    node_obj = Node(node.name, parent=parent) 
    return (node_obj,) 
나는의 위의 코드와 함께 좀 더 행운이있을 수 있습니다 것으로 예상했다

코드 -하지만 모든 후속 작업에 전달되는 초기 노드 객체 일 뿐이므로이 트리 구조를 생성하려고 시도한 것은 아닙니다.

답변

1

chords을 사용하면 일련의 작업 결과에 종속적 인 작업을 실행할 수 있습니다.

정확하게 재귀 작업을 호출하는 방법을 이해하지 못했기 때문에 reference 병합 정렬 예제를 구현했습니다.

참고 작업 내에서 get을 호출하면 예외가 발생하므로 셀러리 3.2.0 이상에서는 작동하지 않습니다.

from celery import Celery, chord 
app = Celery('tasks', backend='amqp', broker='amqp://') 
app.conf.CELERY_RESULT_BACKEND = 'amqp' 


def mergesort(list_obj): 
    '''normal mergesort 
    ''' 
    if len(list_obj) <= 1: 
     return list_obj 
    middle = len(list_obj)/2 
    left, right = list_obj[:middle], list_obj[middle:] 
    return list(merge(list(mergesort(left)), list(mergesort(right)))) 

def merge(left, right): 
    '''normal merge 
    ''' 
    while 1: 
     if left == []: 
      for j in right: 
       yield j 
      break 
     elif right == []: 
      for j in left: 
       yield j 
      break 
     elif left[0] < right[0]: 
      yield left.pop(0) 
     else: 
      yield right.pop(0) 

def merge2(left_r, right_r): 
    '''celery merge 
    ''' 
    left =left_r.get() 
    right = right_r.get() 
    while 1: 
     if left == []: 
      for j in right: 
       yield j 
      break 
     elif right == []: 
      for j in left: 
       yield j 
      break 
     elif left[0] < right[0]: 
      yield left.pop(0) 
     else: 
      yield right.pop(0) 
@app.task 
def merge_c(in_list): 
    '''celery merge 
    ''' 
    #unpack 
    print '*'*21 + str(in_list) 
    left, right = in_list 
    return list(merge2(left, right)) 

@app.task 
def same_object(l_obj): 
    '''helper function to convert list to `result` 
    ''' 
    return l_obj 

@app.task 
def mergesort_c(list_obj): 
    '''celery mergesort 
    ''' 
    if len(list_obj) <= 1: 
     # make sure that you return a `result` object for merge 
     return same_object.delay(list_obj) 
    middle = len(list_obj)/2 
    left, right = list_obj[:middle], list_obj[middle:] 
    # finish mergesort (left) and mergesort(right) and merge them 
    res = chord([mergesort_c.s(left), mergesort_c.s(right)])(merge_c.s()) 
    return res 

if __name__ == '__main__': 
    l = [2,1, 3] 
    #normal mergesort 
    print mergesort(l) #[1, 2, 3, 3, 5] 
    # with celery 
    res = mergesort_c(l) 
    print res.get() 
+0

감사합니다. 내가 기회를 얻으면 필자가 작성한 코드로 작업에 적용 할 수 있는지 알게 될 것입니다. 나는 연쇄 작업을 시도했다. 아마도 화음이 더 나은 결과를 제공 할 것이다. – djbp

+0

행운을 비네. 그 사이에 나는 새로운 샐러리로 비난되지 않을 응답에 종사 할 것이다 :) – srj

관련 문제