博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ 原文译05--Topics
阅读量:4969 次
发布时间:2019-06-12

本文共 4838 字,大约阅读时间需要 16 分钟。

在之前的系统中,我们改进了我们的日志系统,我们使用direct 交换机代替fanout交换机,可以实现选择性的接受日志。

 虽然使用direct 交换机改进了我们的系统,但是对于多种条件的判断,依然存在问题。如我们不仅仅想要根据日志的级别来订阅日志,同时也希望可以通过发出日志的源(即日志的生产者)来订阅。你也许已经通过unix 的工具syslog知道了这个概念,它同时通过级别(info/warn/crit...)和源(auth/cron/kern...).这种方式给了我们很大的灵活性--我们可以同时监听来自cron的critical 级别的错误消息和来自kern的所有消息。

为了在我们的系统中实现这种功能,我们需要学习更为复杂的交换机类型 --topic

Topic 交换机

发送到topic的交换机不能是任意routing_key,而必须是一系列使用"."。这些单词可以是任意的单子,单通常情况是是跟当前消息有关的一些功能,例如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".你可以指定任意多个字符,单上线是255字节。

绑定的key必须也是同样个格式,topic交换机背后的原理类似于把带有特定key的消息发送到所有跟这个key相匹配的队列上去,然而对于bindingKey这里有两个特殊的情况:

1:"*"可以代表一个单词(是单词不是字符--即"."分割的单词)

2:"?"可以代表0个或多个单词

例如:

在这个例子中,我们将要发送描述动物的消息,消息的routingkey将会有3部分组成,routingkey的第一个单词描述的是速度,第二个单词描述的是颜色,第三个单词是特殊描述: "<speed>.<colour>.<species>"。

我们创建了三个绑定,Q1使用bingingkey"*.orange.*",Q2使用"*.*.rabbit" 和"lazy.#"

绑定总结如下:

Q1所有的orange动物感兴趣

Q2想要所有的关于rabbits的消息,和所有的敢于layz动物的消息。

一个带有"quick.orange.rabbit"routingkey的消息会两个队列都接收,"lazy.orange.elephant"的消息同业也会被转发到两个队列上。而"quick.orange.fox"的消息仅仅进入Q1,"lazy.brown.fox"的消息只能被Q2接收。"lazy.pink.rabbit"的消息将再次被转发到Q2上,即使它匹配到了队列2的两个bingdingKey。"quick.brown.fox"的消息不会匹配到任何绑定,所有消息将会被忽略。

如果我们打破我们的约定发送带有4个单词的消息将会发生什么,例如"orange" 或者"quick.orange.male.rabbit"?答案是这些routingkey将不会匹配到任何的bingdingkey,因此将会被忽略

但是"lazy.orange.male.rabbit",因为匹配到了"lazy.#"bingdingkey,所有可以被Q2接收,即使其有4个单词。

Topic 交换机:Topic 交换机功能非常强大,可以完成像其他交换机那样工作。

当队列的bingdingkey使用"#" 时,它将会接收所有的消息,不论routingkey是什么--像fanout交换机那样

当特殊的"*"和"#"都没在bindingkey中使用时,它的行为就像direct交换机那样工作。

汇总

我没将要在我们的日志系统中使用"topic"交换机,我们将定我们的日志系统的routingkey有2个单词:"<facility>.<severity>"

代码和之前的案例基本上是一样的。

 EmitLogTopic.cs:

using System;using System.Linq;using RabbitMQ.Client;using System.Text;class EmitLogTopic{    public static void Main(string[] args)    {        var factory = new ConnectionFactory() { HostName = "localhost" };        using(var connection = factory.CreateConnection())        using(var channel = connection.CreateModel())        {            channel.ExchangeDeclare(exchange: "topic_logs",                                    type: "topic");            var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";            var message = (args.Length > 1)                          ? string.Join(" ", args.Skip( 1 ).ToArray())                          : "Hello World!";            var body = Encoding.UTF8.GetBytes(message);            channel.BasicPublish(exchange: "topic_logs",                                 routingKey: routingKey,                                 basicProperties: null,                                 body: body);            Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);        }    }}
View Code

ReceiveLogsTopic.cs:

using System;using RabbitMQ.Client;using RabbitMQ.Client.Events;using System.Text;class ReceiveLogsTopic{    public static void Main(string[] args)    {        var factory = new ConnectionFactory() { HostName = "localhost" };        using(var connection = factory.CreateConnection())        using(var channel = connection.CreateModel())        {            channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");            var queueName = channel.QueueDeclare().QueueName;            if(args.Length < 1)            {                Console.Error.WriteLine("Usage: {0} [binding_key...]",                                        Environment.GetCommandLineArgs()[0]);                Console.WriteLine(" Press [enter] to exit.");                Console.ReadLine();                Environment.ExitCode = 1;                return;            }            foreach(var bindingKey in args)            {                channel.QueueBind(queue: queueName,                                  exchange: "topic_logs",                                  routingKey: bindingKey);            }            Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");            var consumer = new EventingBasicConsumer(channel);            consumer.Received += (model, ea) =>            {                var body = ea.Body;                var message = Encoding.UTF8.GetString(body);                var routingKey = ea.RoutingKey;                Console.WriteLine(" [x] Received '{0}':'{1}'",                                  routingKey,                                  message);            };            channel.BasicConsume(queue: queueName,                                 noAck: true,                                 consumer: consumer);            Console.WriteLine(" Press [enter] to exit.");            Console.ReadLine();        }    }}
View Code

运行下面的案例:

接收所有的日志:

ReceiveLogsTopic.exe "#"

接收所有的"kern"源的日志:

ReceiveLogsTopic.exe "kern.*"

或者仅仅接受"critical"的日志:

ReceiveLogsTopic.exe "*.critical"

你可以创建多个绑定:

ReceiveLogsTopic.exe "kern.*" "*.critical"

发送一个routingkey是 "kern.critical"的消息

EmitLogTopic.exe "kern.critical" "A critical kernel error"

 

转载于:https://www.cnblogs.com/grayguo/p/5581323.html

你可能感兴趣的文章
MySQL数据迁移到SQL Server
查看>>
复杂链表的复制(python)
查看>>
添加日期选择控件
查看>>
jquery.cookie.js操作cookie
查看>>
javascript遍历数组
查看>>
bzoj4765: 普通计算姬 (分块 && BIT)
查看>>
thinkphp5-----模板中函数的使用
查看>>
POJ-3211 Washing Clothes[01背包问题]
查看>>
Codeforces 518 D Ilya and Escalator
查看>>
[BZOJ4832][Lydsy1704月赛]抵制克苏恩
查看>>
Cheatsheet: 2012 07.11 ~ 07.18
查看>>
Cheatsheet: 2013 08.01 ~ 08.13
查看>>
XCode5无法设置Deployment Target的解决办法
查看>>
关于charles连接手机的配置
查看>>
关于xUtils框架的个人理解
查看>>
SpringMVC案例1——对User表进行CRUD操作
查看>>
用jquery制作一个简单的导航栏
查看>>
MFC中ClistCtrl的=NM_CUSTOMDRAW消息
查看>>
如何去激励自己
查看>>
Crontab定时任务
查看>>