2017-11-15 1 views
0

다음 rabbitmq에서 내부적으로 메시지를 저장하는 방법과 원래 형식으로 다시 검색하는 방법은 무엇입니까? 내가 파이썬을 다음과 같이 rabbitmq 메시지를 게시하려고

import findspark 
findspark.init("/home/spark/spark-2.2.0") 
from pyspark.sql import SparkSession 
import pika 
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
channel = connection.channel() 
channel.queue_declare(queue='task_queue_durable', durable=True) 
message="Hello_Hbase!" 
channel.basic_publish(exchange='',routing_key='queue1',body=message) 
print(" [x] Sent %r" % message) 
connection.close() 

을 script--,이 가입자 스크립트입니다. 내가 routing_queue='queue1'에서 메시지를 검색하고 다른 곳에서 그 메시지를 저장할하려는이 스크립트 ..에서
import findspark 
findspark.init("/home/spark/spark-2.2.0") 
from pyspark.sql import SparkSession 

import time 
import pika 
import happybase 
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
channel = connection.channel() 
channel.queue_declare(queue='queue1', durable=True) 
print(' [*] Waiting for messages. To exit press CTRL+C') 

connection = happybase.Connection(host='localhost', port=9090) 
table = connection.table('blogpost') 
print(connection.tables()) 

    def callback(ch, method, body): 
     print(" [x] Received %r" % body) 
     time.sleep(body.count('.')) 
     print(" [x] Done") 
     ch.basic_ack(delivery_tag=method.delivery_tag) 
     #return(body) 

channel.basic_qos(prefetch_count=1) 
body=channel.basic_consume(callback,queue='queue1') 
print(body) ## here it is giving some encrypted msg, how to retrieve in original form 
# here in the body,I am getting this - ctag1.587a9ab83301436195fc3f653c2f6db0 
table.put('1', {'post:status': body}) 
print("hbase insertion done") 
#channel.start_consuming() 

누군가가 나에게 원래의 형태로 rabbitmq 대기열에서 MSG를 검색하는 방법을 알려 수

?

+0

받은 메시지를 표시하면 도움이됩니다. –

+0

인쇄 (본문) 여기에 내가 이것을 얻고 있습니다 - ctag1.587a9ab83301436195fc3f653c2f6db0 – andy

답변

0

basic_consume 메시지 본문을 반환하지 않습니다.

이 매개 변수는 이미 올바르게 사용하고있는 body 매개 변수로 콜백 함수에서 액세스 할 수 있습니다. 데이터베이스 조작도해야합니다.

관련 문제