2016-10-03 2 views
0

RabbitMQ pub/sub tutorial을 아래의 더미 테스트로 변환했습니다. 어떻게 든 예상대로 작동하지 않습니다.RabbitMQ pub/sub 구현이 작동하지 않습니다.

amqpURL은 유효한 AMQP 서비스 (예 : RabbitMQ) URL입니다. 내가 대기열 예제와 함께 그것을 테스트하고 작동합니다. 어떻게 든 그것은 "교환"에 실패합니다

나는 TestDummy가 "[Hello World]"를 기록 할 것으로 기대합니다. 어떻게 든 그것은 일어나지 않고있다. 보내는 절반 만 예상대로 작동합니다.

무엇이 잘못 되었습니까?

import (
    "fmt" 
    "log" 
    "testing" 

    "github.com/streadway/amqp" 
) 

func TestDummy(t *testing.T) { 
    done := exchangeReceive() 
    exchangeSend("Hello World") 
    <-done 
} 

func exchangeSend(msg string) { 
    failOnError := func(err error, msg string) { 
     if err != nil { 
      log.Fatalf("%s: %s", msg, err) 
      panic(fmt.Sprintf("%s: %s", msg, err)) 
     } 
    } 

    log.Printf("exchangeSend: connect %s", amqpURL) 
    conn, err := amqp.Dial(amqpURL) 
    failOnError(err, "Failed to connect to RabbitMQ") 
    defer conn.Close() 

    ch, err := conn.Channel() 
    failOnError(err, "Failed to open a channel") 
    defer ch.Close() 

    err = ch.ExchangeDeclare(
     "logs", // name 
     "fanout", // type 
     true,  // durable 
     false, // auto-deleted 
     false, // internal 
     false, // no-wait 
     nil,  // arguments 
    ) 
    failOnError(err, "Failed to declare an exchange") 

    body := []byte(msg) 
    err = ch.Publish(
     "logs", // exchange 
     "",  // routing key 
     false, // mandatory 
     false, // immediate 
     amqp.Publishing{ 
      ContentType: "text/plain", 
      Body:  []byte(body), 
     }) 
    failOnError(err, "Failed to publish a message") 

    log.Printf(" [x] Sent %s", body) 
} 

func exchangeReceive() <-chan bool { 

    done := make(chan bool) 

    failOnError := func(err error, msg string) { 
     if err != nil { 
      log.Fatalf("%s: %s", msg, err) 
      panic(fmt.Sprintf("%s: %s", msg, err)) 
     } 
    } 

    log.Printf("exchangeReceive: connect %s", amqpURL) 
    conn, err := amqp.Dial(amqpURL) 
    failOnError(err, "Failed to connect to RabbitMQ") 
    defer conn.Close() 

    ch, err := conn.Channel() 
    failOnError(err, "Failed to open a channel") 
    defer ch.Close() 

    err = ch.ExchangeDeclare(
     "logs", // name 
     "fanout", // type 
     true,  // durable 
     false, // auto-deleted 
     false, // internal 
     false, // no-wait 
     nil,  // arguments 
    ) 
    failOnError(err, "Failed to declare an exchange") 

    q, err := ch.QueueDeclare(
     "", // name 
     false, // durable 
     false, // delete when usused 
     true, // exclusive 
     false, // no-wait 
     nil, // arguments 
    ) 
    failOnError(err, "Failed to declare a queue") 

    err = ch.QueueBind(
     q.Name, // queue name 
     "",  // routing key 
     "logs", // exchange 
     false, 
     nil) 
    failOnError(err, "Failed to bind a queue") 

    msgs, err := ch.Consume(
     q.Name, // queue 
     "",  // consumer 
     true, // auto-ack 
     false, // exclusive 
     false, // no-local 
     false, // no-wait 
     nil, // args 
    ) 
    failOnError(err, "Failed to register a consumer") 

    go func() { 
     for d := range msgs { 
      log.Printf(" [x] %s", d.Body) 
      done <- true 
     } 
    }() 

    log.Printf(" [*] Waiting for logs. To exit press CTRL+C") 

    return done 
} 

답변

0

여기에 몇 가지 어리석은 실수. exchangeRecieve가 끝나면 연기 명령이 실행되고 연결이 닫힙니다. 그래서 재 작성이 실패하는 것입니다.

나는 이런 식으로 내 코드를 변경했습니다 그것은 일 :

import (
    "fmt" 
    "os" 
    "testing" 
    "time" 

    "github.com/streadway/amqp" 
) 

func TestDummy(t *testing.T) { 
    amqpURL := os.Getenv("CLOUDAMQP_URL") 
    t.Logf(" [*] amqpURL: %s", amqpURL) 

    results1 := exchangeReceive(t, "consumer 1", amqpURL) 
    results2 := exchangeReceive(t, "consumer 2", amqpURL) 
    time.Sleep(50 * time.Millisecond) 

    exchangeSend(t, amqpURL, "Hello World") 
    if want, have := "Hello World", <-results1; want != have { 
     t.Errorf("expected %#v, got %#v", want, have) 
    } 
    if want, have := "Hello World", <-results2; want != have { 
     t.Errorf("expected %#v, got %#v", want, have) 
    } 
} 

func exchangeReceive(t *testing.T, name, amqpURL string) <-chan string { 

    out := make(chan string) 

    failOnError := func(err error, msg string) { 
     if err != nil { 
      t.Fatalf("%s: %s", msg, err) 
      panic(fmt.Sprintf("%s: %s", msg, err)) 
     } 
    } 

    conn, err := amqp.Dial(amqpURL) 
    failOnError(err, "Failed to connect to RabbitMQ") 

    ch, err := conn.Channel() 
    failOnError(err, "Failed to open a channel") 

    err = ch.ExchangeDeclare(
     "logs", // name 
     "fanout", // type 
     true,  // durable 
     false, // auto-deleted 
     false, // internal 
     false, // no-wait 
     nil,  // arguments 
    ) 
    failOnError(err, "Failed to declare an exchange") 

    q, err := ch.QueueDeclare(
     "", // name 
     false, // durable 
     false, // delete when usused 
     true, // exclusive 
     false, // no-wait 
     nil, // arguments 
    ) 
    failOnError(err, "Failed to declare a queue") 

    err = ch.QueueBind(
     q.Name, // queue name 
     "",  // routing key 
     "logs", // exchange 
     false, 
     nil) 
    failOnError(err, "Failed to bind a queue") 

    msgs, err := ch.Consume(
     q.Name, // queue 
     "",  // consumer 
     true, // auto-ack 
     false, // exclusive 
     false, // no-local 
     false, // no-wait 
     nil, // args 
    ) 
    failOnError(err, "Failed to register a consumer") 

    go func() { 
     for d := range msgs { 
      t.Logf(" [x] %s received: %s", name, d.Body) 
      out <- string(d.Body) 
     } 
    }() 

    t.Logf(" [*] %s ready to receive", name) 
    return out 
} 

func exchangeSend(t *testing.T, amqpURL, msg string) { 
    failOnError := func(err error, msg string) { 
     if err != nil { 
      t.Fatalf("%s: %s", msg, err) 
      panic(fmt.Sprintf("%s: %s", msg, err)) 
     } 
    } 

    conn, err := amqp.Dial(amqpURL) 
    failOnError(err, "Failed to connect to RabbitMQ") 
    defer conn.Close() 

    ch, err := conn.Channel() 
    failOnError(err, "Failed to open a channel") 
    defer ch.Close() 

    err = ch.ExchangeDeclare(
     "logs", // name 
     "fanout", // type 
     true,  // durable 
     false, // auto-deleted 
     false, // internal 
     false, // no-wait 
     nil,  // arguments 
    ) 
    failOnError(err, "Failed to declare an exchange") 

    body := []byte(msg) 
    err = ch.Publish(
     "logs", // exchange 
     "",  // routing key 
     false, // mandatory 
     false, // immediate 
     amqp.Publishing{ 
      ContentType: "text/plain", 
      Body:  []byte(body), 
     }) 
    failOnError(err, "Failed to publish a message") 

    t.Logf(" [x] Sent %s", body) 
} 
관련 문제