博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ实例教程:路由选择
阅读量:7260 次
发布时间:2019-06-29

本文共 3975 字,大约阅读时间需要 13 分钟。

  在前面的例子中,我们构建了一个简单的日志系统来日志消息通过广播传送到多个接受者。本文将介绍如何订阅消息的子集。比如,我们能够将关键的错误信息写到日志文件中,同时也能够在控制台打印所有的日志消息。

  消息绑定(Bindings)

  在前面的例子中,我们使用下面的代码方式再次绑定。

channel.queueBind(queueName, EXCHANGE_NAME, "");

  绑定表述的是交换机和队列之间的关系,可以理解为队列对交换机中的消息感兴趣。

  绑定的参数是 routingKey,为避免混淆,我们将basic_publish参数称为binding key,下面是使用关键字绑定binding的例子。

channel.queueBind(queueName, EXCHANGE_NAME, "black");

  

  它表示绑定关键字依赖于交换机类型,对于我们之前定义的 fanout 交换机,它会忽略绑定的关键字。即为所有订阅者发送消息。

  Direct 交换机(Direct exchange)

  在前面的例子中,我们将所有消息广播给了所有消费者,现在我们想在原有的基础上做些改进对日志消息进行过滤。比如我们只把严重错误的日志写入日志文件,这样能节省磁盘空间。

  fanout交换机的灵活性比较差,它只是盲目的广播所有消息,现在使用direct交换机代替它。direct交换机的算法非常简单-当绑定关键字与消息的路由关键字匹配时就会将消息传递给队列。

  如上图所示,X交换机绑定了两个队列Q1和Q2。Q1队列绑定了orange关键字,Q2队列绑定green和black关键字。路由中带有orange路由关键字的消息会转发给Q1,带有black或green关键字的消息会转发给Q2,其它的消息就会废弃。

  多重绑定(Multiple bindings)

  一个路由使用同一个绑定代码绑定到多个队列是合法的。上图中,X路由器使用black代码绑定到Q1和Q2。这种情况下,direct交换机会像fanout交换机一样,把消息广播给所有队列。

  提交日志

  我们继续使用日志系统作为我们的模型。我们使用direct交换机来代替fanout交换机,将日志等级severity作为路由代码,这样我们就能根据日志等级决定接收哪些消息。

  (1)创建direct交换机

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

  (2)准备发送消息

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

    

  (3)将日志等级划分为:info,warning和error,根据日志等级创建不同的绑定。

  消息订阅


  接收消息与之前中的例子一样,不同之处在于会对每个 Severity 创建一个新的绑定。

String queueName = channel.queueDeclare().getQueue();for(String severity : argv){      channel.queueBind(queueName, EXCHANGE_NAME, severity);}

  大总结

  EmitLogDirect.java

package com.favccxx.favrabbit;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class EmitLogDirect {	private static final String EXCHANGE_NAME = "direct_logs";	public static void main(String[] argv) throws Exception {		ConnectionFactory factory = new ConnectionFactory();		factory.setHost("localhost");		Connection connection = factory.newConnection();		Channel channel = connection.createChannel();		channel.exchangeDeclare(EXCHANGE_NAME, "direct");		String[] serveritys = {"debug", "info", "warning", "error"};		String[] messages = {"This is a DEBUG message!", "This is a INFO message!", "This is a WARNING message!", "This is a ERROR message!"};				for(int i=0; i

  控制台输出内容如下

 [x] Sent 'debug':'This is a DEBUG message!' [x] Sent 'info':'This is a INFO message!' [x] Sent 'warning':'This is a WARNING message!' [x] Sent 'error':'This is a ERROR message!'

  为了简化代码实现,我们使用了一个for循环来分别接收各种不同严重程度的日志信息。但在一个真实的应用环境中,我们可能需要3~4个接收器。

  ReceiveLogsDirect.java

package com.favccxx.favrabbit;import java.io.IOException;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;public class ReceiveLogsDirect {	private static final String EXCHANGE_NAME = "direct_logs";	public static void main(String[] argv) throws Exception {		ConnectionFactory factory = new ConnectionFactory();		factory.setHost("localhost");		Connection connection = factory.newConnection();		Channel channel = connection.createChannel();		channel.exchangeDeclare(EXCHANGE_NAME, "direct");		String queueName = channel.queueDeclare().getQueue();		String[] serveritys = { "debug", "info", "warning", "error" };		for (String severity : serveritys) {			channel.queueBind(queueName, EXCHANGE_NAME, severity);			Consumer consumer = new DefaultConsumer(channel) {				@Override				public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,						byte[] body) throws IOException {					String message = new String(body, "UTF-8");					System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");				}			};			channel.basicConsume(queueName, true, consumer);		}	}}

  控制台输出内容如下

 [x] Received 'debug':'This is a DEBUG message!' [x] Received 'info':'This is a INFO message!' [x] Received 'warning':'This is a WARNING message!' [x] Received 'error':'This is a ERROR message!'

转载地址:http://xaodm.baihongyu.com/

你可能感兴趣的文章
如何正确的阅读源代码?
查看>>
剖析APT攻击 绿盟NGTP构建下一代防御体系
查看>>
关于写异步代码测试用例的一些思考
查看>>
JavaScript 各种遍历方式详解
查看>>
斯诺登:我们需要共同让监视再回昂贵时代
查看>>
红星Linux操作系统的大部分代码处于被管控的状态
查看>>
大数据驱动社会治理的创新转向
查看>>
防骚扰诈骗的重要技术是依靠大数据?
查看>>
阳光电源西藏安装20MW太阳能光伏储能微电网电站
查看>>
首个获FDA批准的AI辅助心脏成像系统,是如何诞生的?
查看>>
《R语言编程艺术》——2.4 常用的向量运算
查看>>
大数据时代探究:关于“数据垄断”的几点思考
查看>>
传言是真的:苹果承认1970变砖漏洞,不过可以解决!
查看>>
比特币矿机开发公司Butterfly Labs接受和解
查看>>
全方位挖掘大数据获取超额收益
查看>>
从零开始写一个 redux(第二讲)
查看>>
数据库面试题
查看>>
内核必须懂(六): 使用kgdb调试内核
查看>>
专访阿里云MVP黄胜蓝:90 后 CTO花了6年,改变了你日常生活里的这件事
查看>>
WWW简介
查看>>