2016-06-28 1 views
0

일부 로그 유형 항목을 작성하기 위해 일괄 처리를 작성하고 있습니다. 그것은 내 API와 별도의 응용 프로그램이 될 것이고 오직이 작업을 수행 할 것입니다. 지금 당장은 logMsg가 전달되는 엔드 포인트가있는 Express API를 가지고 있습니다. 이것들은 많은 사용자들로부터 들어오고 각 사용자는 10 초마다 하나씩 보냅니다. 움직이는 것을 유지하기 위해 나는 logMsg에 간단한 작업을 몇 번하고 그것을 Postgres DB에 집어 넣었다.knex 쿼리 빌더 결과에서 동기화 처리

이제 활동 중에 소비 된 시간을보다 쉽게 ​​계산할 수 있도록 logEntries를 구축하기 위해 이러한 메시지를 반복하고 싶습니다. 들어올 때마다 각 logMsg에는 활동 (또는 하나도 없음), 사용자 및 시간 소인이 있습니다. LogEntry에는 user, activity, startTime 및 endTime이 있습니다. 그렇게하면 API 응용 프로그램이 사용자 또는 사용자 그룹의 활동별로 활동을하거나하지 않는 시간을 더 쉽게 계산할 수 있습니다.

그래서 일부 처리되지 않은 포인트를 잡고 처리중인 것으로 표시 한 다음 처리 한 다음 완료로 표시하는 배치 작업을 설정하고 싶습니다. 내가 곤경에 처하게되는 곳은 상황을 정돈하는 것입니다.

let activityLogJob = new CronJob('30 * * * * *', function() { 

    return knex 
    .raw() //grab a chunk of rows and mark as In Progress 
    .then((processingRows) => { 
     _.forEach(logMsgs, (msg) => { 
      console.log('message id:', msg.id); 
      return knex.select() //latest log entry for msg.user 
       .then((logEntry) => { 
        if (logEntry) { 
         if (logEntry.activity_id !== msg.activity_id) { 
          console.log('new activity started. end old one'); 
          return knex.transaction((trx) => { 
           trx 
           .update() //logEntry.endTime = msg.startTime 
           .then((update) => { 
            trx 
            .insert(); //create new logEntry for new activity 
           }); 
          }); 
         } 
        } else { 
         console.log('first log entry for user'); 
         knex 
         .insert(); //create new logEntry since we don't have one for the user 
        } 
       }) 
       .then((result) => { 
        console.log('finished msg :', msg.id); 
        knex 
        .update(); //set msg.status = LOGGED 
       }) 
       .catch((err) => { 
        logger.error("ERROR: ", err); 
       }); 
     });  
    }); 

}, null, true); 

그래서 내가 무엇을 위의 코드를 참조하면 잘 모든 행을 잡고 있다는 것입니다, 그러나 실제로 처리로 전환 될 때 이상한 가져옵니다. 로그 문은 다음과 같이 표시됩니다.

message 1 
first log entry for user 
message 2 
first log entry for user 
... 
message n 
first log entry for user 

finished message 1 
finished message 2 
... 
finished message n 

그래서 여기에 두 가지 문제가 있다고 생각합니다.
1. 기존 LogEntry를 찾지 않고 매번 새로운 LogEntry를 만듭니다. 그것은 모든 메시지가 처리 될 때까지 삽입이 일어나지 않는 것처럼 느껴집니다.
2. 나는 내 문제는 내가 아직 약속 주위에 제대로 내 머리를 포장 아니에요 꽤 확신 더

message 1 
first entry for user 
finished message 1 
message 2 
first entry for user 
finished message 2 
... 

처럼되고 로그 문을 기대했을 것이다.

답변

0

다른 약속의 콜백 안에 중첩 된 모든 약속을 반환해야합니다. 그렇지 않으면 중첩 된 약속의 결과가 체인을 통과하지 않습니다. 메시지가 동기화되는 데 도움이됩니다.

return knex 
.raw() //grab a chunk of rows and mark as In Progress 
.then((processingRows) => { 
    _.forEach(logMsgs, (msg) => { 
     console.log('message id:', msg.id); 
     return knex.select() //latest log entry for msg.user 
      .then((logEntry) => { 
       if (logEntry) { 
        if (logEntry.activity_id !== msg.activity_id) { 
         console.log('new activity started. end old one'); 
         return knex.transaction((trx) => { 
          return trx ##### 
          .update() //logEntry.endTime = msg.startTime 
          .then((update) => { 
           return trx ##### 
           .insert(); //create new logEntry for new activity 
          }); 
         }); 
        } 
       } else { 
        console.log('first log entry for user'); 
        return knex ##### 
        .insert(); //create new logEntry since we don't have one for the user 
       } 
      }) 
      .then((result) => { 
       console.log('finished msg :', msg.id); 
       return knex ##### 
       .update(); //set msg.status = LOGGED 
      }) 
      .catch((err) => { 
       logger.error("ERROR: ", err); 
      }); 
    });  
});