2017-11-22 3 views
0

paho mqtt 클라이언트를 사용하여 여러 주제를 구독했습니다. 브로커에서 메시지를 받으면 메시지를 mysql 데이터베이스에 저장하려고합니다. DB에 삽입하기 전에 메시지를 모으고 싶습니다. 나는 임계 값이 1000 메시지라고 설정했습니다. 임계 값에 도달 한 경우에만 메시지를 DB에 모두 삽입해야합니다. cursor.execute() 다음에 row_count를 확인합니다. 그러나 그것은 1로 카운트를 보여줍니다. 그래서 대량 삽입은 일어나지 않습니다. 여기 MySQL의 커넥터 - 파이썬을 사용하여 반면 한 번에 하나의 쿼리를 실행할 수있는 실행 pymysql 모듈 내 샘플 코드Python : MQTT 브로커 메시지가 mysql 데이터베이스에 대량 삽입

//main.py 

#mysql database class 
db = MySQLDBClass() 

#mqtt client class where subscription,connection to broker,some callbacks 
mqttclient = MyMQTTClient() 
mqttclient.on_message = db.onMessage 
mqttclient.loop_forever() 

//MySQLDBClass.py 

def __init__(self): 

     self.insertcounter = 0 
     self.insertStatement = '' 
     self.bulkpayload = '' 
     self.maxInsert = 1000 

    def onMessage(self, client, userdata, msg): 

     if msg.topic.startswith("topic1/"): 
      self.bulkpayload += "(" + msg.payload.decode("utf-8") + "," + datetime + ")," 
     elif msg.topic.startswith("topic2/"): 
      self.insertStatement += "INSERT INTO mydatabase.table1 VALUES (" + msg.payload.decode("utf-8") + "," + datetime + ");" 
     elif msg.topic.startswith("topic3/") 
      self.insertStatement += "INSERT INTO mydatabase.table2 VALUES (" +msg.payload.decode("utf-8") + "," + datetime + ");" 
     elif msg.topic.startswith("messages"): 
      self.insertStatement += "INSERT INTO mydatabase.table3 VALUES ('" + msg.topic + "'," + msg.payload.decode("utf-8") + "," + datetime + ");" 
     else: 
      return # do not store in DB 

     self.insertcounter += 1 

     if (self.insertcounter > self.maxInsert): 
      if (self.bulkpayload != ''): 
       self.insertStatement += "INSERT INTO mydatabase.table4 VALUES" + self.bulkpayload + ";"  
       self.bulkpayload = '' 

      cursor.execute(self.insertStatement) 
      cursor.commit() 
      print (cursor.rowcount) #prints always count as one , expecting bulk count 
      self.insertcounter = 0 
      self.insertStatement = '' 

답변