Every line of 'golang rabbitmq' code snippets is scanned for vulnerabilities by our powerful machine learning engine that combs millions of open source libraries, ensuring your Go code is secure.
92 func rabbitmqReceive(r *Rabbitmq) { 93 loop: 94 for { 95 deliveries, _ := r.sub.channel.Consume( 96 r.queue, 97 r.sub.tag, 98 false, 99 false, 100 false, 101 false, 102 nil, 103 ) 104 105 for d := range deliveries { 106 if r.handler.ReceiveMessage(d.Body) { 107 break loop 108 } 109 } 110 } 111 }
17 func main() { 18 conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") 19 failOnError(err, "Failed to connect to RabbitMQ") 20 defer conn.Close() 21 22 ch, err := conn.Channel() 23 failOnError(err, "Failed to open a channel") 24 defer ch.Close() 25 26 err = ch.ExchangeDeclare( 27 "logs_topic", // name 28 "topic", // type 29 true, // durable 30 false, // auto-deleted 31 false, // internal 32 false, // no-wait 33 nil, // arguments 34 ) 35 failOnError(err, "Failed to declare an exchange") 36 37 body := bodyFrom(os.Args) 38 err = ch.Publish( 39 "logs_topic", // exchange 40 severityFrom(os.Args), // routing key 41 false, // mandatory 42 false, // immediate 43 amqp.Publishing{ 44 ContentType: "text/plain", 45 Body: []byte(body), 46 }) 47 failOnError(err, "Failed to publish a message") 48 49 log.Printf(" [x] Sent %s", body) 50 }
911 func containerRabbitmq(containers []corev1.Container) corev1.Container { 912 for _, container := range containers { 913 if container.Name == "rabbitmq" { 914 return container 915 } 916 } 917 return corev1.Container{} 918 }
43 func main() { 44 var env envConfig 45 if err := envconfig.Process("", &env); err != nil { 46 log.Fatal("[ERROR] Failed to process env var: ", err) 47 os.Exit(1) 48 } 49 connStr := fmt.Sprintf("amqp://%s:%s@%s", env.Username, env.Password, env.Broker) 50 51 time.Sleep(1 * time.Minute) 52 53 conn, err := amqp.Dial(connStr) 54 failOnError(err, "Failed to connect to RabbitMQ") 55 defer conn.Close() 56 57 ch, err := conn.Channel() 58 failOnError(err, "Failed to open a channel") 59 defer ch.Close() 60 61 err = ch.ExchangeDeclare( 62 "logs", // name 63 "fanout", // type 64 true, // durable 65 false, // auto-deleted 66 false, // internal 67 false, // no-wait 68 nil, // arguments 69 ) 70 failOnError(err, "Failed to declare an exchange") 71 72 for i := 0; i < env.Count; i++ { 73 body := fmt.Sprintf(`{ "id": %d, "message": "Hello, World!" }`, i) 74 err = ch.Publish( 75 "logs", // exchange 76 "", // routing key 77 false, // mandatory 78 false, // immediate 79 amqp.Publishing{ 80 ContentType: "text/plain", 81 Body: []byte(body), 82 }) 83 failOnError(err, "Failed to publish a message") 84 log.Printf(" [x] Sent %s", body) 85 time.Sleep(50 * time.Millisecond) 86 } 87 }
19 func NewRabbitMq(queueName, exchage, key string) *RabbitMQ { 20 host := Conf.MQconf.Host 21 port := Conf.MQconf.Port 22 username := Conf.MQconf.Username 23 password := Conf.MQconf.Password 24 // MQURL 格式 amqp://账号:密码@rabbitmq服务器地址:端口号/vhost 25 MQurl := "amqp://" + username + ":" + password + "@" + host + ":" + port + "/" 26 27 rabbitMq := &RabbitMQ{ 28 Queuename: queueName, 29 Exchange: exchage, 30 Key: key, 31 MQurl: MQurl, 32 } 33 var err error 34 rabbitMq.conn, err = amqp.Dial(rabbitMq.MQurl) 35 rabbitMq.failOnErr(err, "连接MQ错误") 36 37 rabbitMq.channel, err = rabbitMq.conn.Channel() 38 rabbitMq.failOnErr(err, "获取channel失败") 39 40 return rabbitMq 41 }
101 func amqpInit() { 102 /** START Queue **/ 103 viper.BindEnv("queueDurable", "QUEUE_DURABLE") 104 viper.BindEnv("queueAutoDelete", "QUEUE_AUTO_DELETE") 105 viper.SetDefault("queueDurable", true) 106 viper.SetDefault("queueAutoDelete", false) 107 /** END Queue **/ 108 109 /** START Consume **/ 110 viper.BindEnv("consumer", "CONSUMER") 111 viper.BindEnv("consumerNoWait", "CONSUMER_NO_WAIT") 112 viper.SetDefault("consumer", "genesis") 113 viper.SetDefault("consumerNoWait", false) 114 /** END Consume **/ 115 116 /** START Publish **/ 117 viper.BindEnv("publishMandatory", "PUBLISH_MANDATORY") 118 viper.BindEnv("publishImmediate", "PUBLISH_IMMEDIATE") 119 viper.BindEnv("exchange", "EXCHANGE") 120 viper.SetDefault("exchange", "") 121 viper.SetDefault("publishMandatory", false) 122 viper.SetDefault("publishImmediate", false) 123 /** END Publish **/ 124 125 /** START AMQPEndpoint **/ 126 viper.BindEnv("queueProtocol", "QUEUE_PROTOCOL") 127 viper.BindEnv("queueUser", "QUEUE_USER") 128 viper.BindEnv("queuePassword", "QUEUE_PASSWORD") 129 viper.BindEnv("queueHost", "QUEUE_HOST") 130 viper.BindEnv("queuePort", "QUEUE_PORT") 131 viper.BindEnv("queueVHost", "QUEUE_VHOST") 132 133 viper.SetDefault("queueProtocol", "amqp") 134 viper.SetDefault("queueUser", "user") 135 viper.SetDefault("queuePassword", "password") 136 viper.SetDefault("queueHost", "localhost") 137 viper.SetDefault("queuePort", 5672) 138 viper.SetDefault("queueVHost", "/test") 139 /** END AMQPEndpoint **/ 140 }
47 func readRabbitBlock() { 48 scheme := "http" 49 host := "localhost:15672" 50 path := "/api/queues/TEST/AQUEUE/get" //"/api/queues/%2F/AQUEUE/get" 51 postData := fmt.Sprintf("{\"count\": \"10000\",\"encoding\": \"auto\", \"queue\": \"AQUEUE\", \"vhost\": \"%s\", \"arguments\": {}, \"requeue\": \"false\"}", *rabbitVHost) 52 fmt.Printf("postData=%s", postData) 53 req, _ := http.NewRequest("POST", "", bytes.NewReader([]byte(postData))) 54 req.URL = &url.URL{ 55 Scheme: scheme, 56 Host: host, 57 Opaque: path, 58 } 59 req.Header.Set("User-Agent", "http2amqp") //default 60 req.SetBasicAuth("guest", "guest") //user,pass 61 cli := &http.Client{} 62 resp, err := cli.Do(req) 63 if err != nil { 64 fmt.Printf("error=%v\n", err) 65 } else { 66 body, _ := ioutil.ReadAll(resp.Body) 67 fmt.Printf("body=%s", string(body[:10])) 68 } 69 }
146 func RabbitQueueConsumer(ch *amqp.Channel, config QueueConfig) <-chan amqp.Delivery { 147 msgs, err := ch.Consume( 148 config.Name, 149 config.Exchange, 150 config.AutoAck, 151 config.Exclusive, 152 config.Local, 153 config.Wait, 154 nil, 155 ) 156 157 if err != nil { 158 panic(err.Error()) 159 } 160 161 return msgs 162 }
18 func Init() { 19 if config.Config.RabbitMQ.Enable { 20 dial(config.Config.RabbitMQ.Addr) 21 go Consume(config.Config.RabbitMQ.Addr, config.Config.RabbitMQ.Queue) 22 } 23 }
14 func main() { 15 c := turnpike.NewClient() 16 err := c.Connect("ws://127.0.0.1:8080/ws", "http://localhost/") 17 if err != nil { 18 panic("Error connecting:" + err.Error()) 19 } 20 21 c.Subscribe("event:test", testHandler) 22 23 for { 24 c.Publish("event:test", "test") 25 <-time.After(time.Second) 26 } 27 }