博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
一起谈.NET技术,NET下RabbitMQ实践 [示例篇]
阅读量:7112 次
发布时间:2019-06-28

本文共 5749 字,大约阅读时间需要 19 分钟。

      在中,介绍了在window环境下安装erlang,rabbitmq-server,以免配置用户,权限,虚拟机等内容。今天将会介绍如果使用rabbitmq进行简单的消息入队,出队操作,因为本文演示的环境要用到上文中配置的环境,所以要运行本文sample,请先按上一篇中完成相应环境配置。  

      首先,我们下载官方的.net客户端软件,链接:。下载并安装之后,将安装目录下的这两个DLL文件复制到我们示例项目中,并添加引用:

RabbitMQ.Client.dll //基于的发布订阅消息的功能类   
RabbitMQ.ServiceModel.dll //包括基于WCF方式发布订阅服务模型类

       如下图:          接着,我们创建两个类,一个是ProducerMQ.cs(用于产生消息),一个是CustmerMq.cs(用于消费消息),代码如下:  

       首先是ProducerMQ:

public
 
class
 ProducerMQ
{
    
public
 
static
  
void
 InitProducerMQ()
    {
        Uri uri 
=
 
new
 Uri(
"
amqp://10.0.4.85:5672/
"
);
        
string
 exchange 
=
 
"
ex1
"
;
        
string
 exchangeType 
=
 
"
direct
"
;
        
string
 routingKey 
=
 
"
m1
"
;
        
bool
 persistMode 
=
 
true
;
        ConnectionFactory cf 
=
 
new
 ConnectionFactory();
      
        cf.UserName 
=
 
"
daizhj
"
;
        cf.Password 
=
 
"
617595
"
;
        cf.VirtualHost 
=
 
"
dnt_mq
"
;
        cf.RequestedHeartbeat 
=
 
0
;
        cf.Endpoint 
=
 
new
 AmqpTcpEndpoint(uri);
        
using
 (IConnection conn 
=
 cf.CreateConnection())
        {
            
using
 (IModel ch 
=
 conn.CreateModel())
            {
                
if
 (exchangeType 
!=
 
null
)
                {
                    ch.ExchangeDeclare(exchange, exchangeType);
//
,true,true,false,false, true,null);
                    ch.QueueDeclare(
"
q1
"
true
);
//
true, true, true, false, false, null);
                    ch.QueueBind(
"
q1
"
"
ex1
"
"
m1
"
false
null
); 
                }
                IMapMessageBuilder b 
=
 
new
 MapMessageBuilder(ch);
                IDictionary target 
=
 b.Headers;
                target[
"
header
"
=
 
"
hello world
"
;
                IDictionary targetBody 
=
 b.Body;
                targetBody[
"
body
"
=
 
"
daizhj
"
;
                
if
 (persistMode)
                {
                    ((IBasicProperties)b.GetContentHeader()).DeliveryMode 
=
 
2
;
                }
             
                ch.BasicPublish(exchange, routingKey,
                                           (IBasicProperties)b.GetContentHeader(),
                                           b.GetContentBody());
            }
        }
    }
}

  下面对上面代码进行说明:

  1.  定义要链接的rabbitmq-server地址(基于amqp协议):

Uri uri = new Uri("amqp://10.0.4.85:5672/");

  2.  定义交换方式       

string
 exchange 
=
 
"
ex1
"
;
string
 exchangeType 
=
 
"
direct
"
;
string
 routingKey 
=
 
"
m1
"
;

  说明:rabbitmq交换方式分为三种,分别是:

  Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
  Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
  Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
  更多内容参见:  
  3. 是否对消息队列持久化保存       

bool
 persistMode 
=
 
true
;

  4. 使用ConnectionFactory创建连接,虽然创建时指定了多个server address,但每个connection只与一个物理的server进行连接。

       ConnectionFactory cf 
=
 
new
 ConnectionFactory();    
        
//
使用前文的配置环境信息  
        cf.UserName 
=
 
"
daizhj
"
        cf.Password 
=
 
"
617595
"
;
        cf.VirtualHost 
=
 
"
dnt_mq
"
;
        cf.RequestedHeartbeat 
=
 
0
;
        cf.Endpoint 
=
 
new
 AmqpTcpEndpoint(uri);

     5. 实例化IConnection对象,并设置交换方式:

 
using
 (IConnection conn 
=
 cf.CreateConnection())
            {
                
using
 (IModel ch 
=
 conn.CreateModel())
                {
                    
if
 (exchangeType 
!=
 
null
)
                    {
                        ch.ExchangeDeclare(exchange, exchangeType);
//
,true,true,false,false, true,null);
                        ch.QueueDeclare(
"
q1
"
true
);
//
true, true, true, false, false, null);
                        ch.QueueBind(
"
q1
"
"
ex1
"
"
m1
"
false
null
); 
                    }
        ....

  6. 构造消息实体对象并发布到消息队列上:     

  IMapMessageBuilder b 
=
 
new
 MapMessageBuilder(ch);
  IDictionary target 
=
 b.Headers;
  target[
"
header
"
=
 
"
hello world
"
;
  IDictionary targetBody 
=
 b.Body;
  targetBody[
"
body
"
=
 
"
daizhj
"
;
  
if
 (persistMode)
  {
    ((IBasicProperties)b.GetContentHeader()).DeliveryMode 
=
 
2
;
  }
  
//
简单发布方式
  ch.BasicPublish(exchange, routingKey,
          (IBasicProperties)b.GetContentHeader(),
          b.GetContentBody());

  这样就完成了单条消息的发布。 下面是CustmerMq.cs(用于消费消息)实例代码:    

public
 
class
 CustmerMq
    {
        
public
 
static
 
int
 InitCustmerMq()
        {
            
string
 exchange 
=
 
"
ex1
"
;
            
string
 exchangeType 
=
 
"
direct
"
;
            
string
 routingKey 
=
 
"
m1
"
;
            
string
 serverAddress 
=
 
"
10.0.4.85:5672
"
;
            ConnectionFactory cf 
=
 
new
 ConnectionFactory();
            cf.Address 
=
 serverAddress;
            cf.UserName 
=
 
"
daizhj
"
;
            cf.Password 
=
 
"
617595
"
;
            cf.VirtualHost 
=
 
"
dnt_mq
"
;
            cf.RequestedHeartbeat 
=
 
0
;

  可以看出上面的代码与 ProducerMQ的开头代码类似,下面使用ConnectionFactory来构造链接并接收队列消息:           

 
using
 (IConnection conn 
=
 cf.CreateConnection())
            {
                
using
 (IModel ch 
=
 conn.CreateModel())
                {
                    
//
普通使用方式BasicGet
                    
//
noAck = true,不需要回复,接收到消息后,queue上的消息就会清除
                    
//
noAck = false,需要回复,接收到消息后,queue上的消息不会被清除,直到调用channel.basicAck(deliveryTag, false); queue上的消息才会被清除 而且,在当前连接断开以前,其它客户端将不能收到此queue上的消息
                    BasicGetResult res 
=
 ch.BasicGet(
"
q1
"
false
/*
noAck
*/
);
                    
if
 (res 
!=
 
null
)
                    {
                        
bool
 t 
=
 res.Redelivered;
                        t 
=
 
true
;
                        Console.WriteLine(System.Text.UTF8Encoding.UTF8.GetString(res.Body));
                        ch.BasicAck(res.DeliveryTag, 
false
);
                    }
                    
else
                    {
                        Console.WriteLine(
"
No message!
"
);
                    }  

  上面代码比较简单,主要是使用BasicGetResult来进行简单的消息接收,并使用BasicAck方式来告之是否从队列中移除该条消息。这一点很重要,因为在某些应用场景下,比如从队列中获取消息并用它来操作数据库或日志文件时,如果出现操作失败时,则该条消息应该保留在队列中,只到操作成功时才从队列中移除。 

  当然上面操作只是用于单条数据操作,如果要遍历队列中所有消息,则需要使用如下方式:

while
 (
true
)
  {
      BasicGetResult res 
=
 ch.BasicGet(
"
q1
"
false
/*
noAck
*/
);
      
if
 (res 
!=
 
null
)
      {
          
try
          {
               
bool
 t 
=
 res.Redelivered;
                        t 
=
 
true
;
                        Console.WriteLine(System.Text.UTF8Encoding.UTF8.GetString(res.Body));
                        ch.BasicAck(res.DeliveryTag, 
false
);
          }
          
catch
 { }
      }
      
else
          
break
;
  }

  另外,在rabbitmq中,获取消息可以使用两种方式,一种是上面提到的主动获取,另一种是基于订阅模式,即让当前获取消息的线程阻塞,用于绑定到指定的队列上,当有新的消息入队之后,该阻塞线程会被运行,从队列中获取新入队的消息,形如:    

 
//
第二种取法QueueingBasicConsumer基于订阅模式
 QueueingBasicConsumer consumer 
=
 
new
 QueueingBasicConsumer(ch);
 ch.BasicConsume(
"
q1
"
false
null
, consumer);
 
while
 (
true
)
 {
     
try
     {
         BasicDeliverEventArgs e 
=
 (BasicDeliverEventArgs)consumer.Queue.Dequeue();
         IBasicProperties props 
=
 e.BasicProperties;
         
byte
[] body 
=
 e.Body;
         Console.WriteLine(System.Text.Encoding.UTF8.GetString(body));
         
//
ch.BasicAck(e.DeliveryTag, true);
         ProcessRemainMessage();                          
     }
     
catch
 (EndOfStreamException ex) 
     {
         
//
The consumer was removed, either through channel or connection closure, or through the action of IModel.BasicCancel(). 
         Console.WriteLine(ex.ToString());
         
break
;
     }
 }

  这样,就完成了一个简单的发布,消费消息的示例。在接下来的文章中,将会介绍如果基于WCF来发布RABBITMQ服务,敬请关注:) 

转载于:https://www.cnblogs.com/waw/archive/2011/09/01/2162752.html

你可能感兴趣的文章
DHCP租约时间工作原理
查看>>
Qt移动应用开发(六):QML与C++互动
查看>>
svn代码统计工具的金额
查看>>
2015第32周三
查看>>
Codeforces 56D Changing a String 编辑距离 记忆dp
查看>>
Scala 深入浅出实战经典 第62讲:Scala中上下文界定内幕中的隐式参数实战详解...
查看>>
Android应用Design Support Library完全使用实例
查看>>
中通打印助手-实现快递面单快速打印(免费使用)
查看>>
付款页面DEMO
查看>>
Swift - 使用Core Data进行数据持久化存储
查看>>
[转载]服务器和应用系统迁移方案
查看>>
类的专有方法(__init__)
查看>>
open()系统调用的实现
查看>>
java历史集合类对比
查看>>
Java实现字符全阵列阵列
查看>>
媒体类型和字符集
查看>>
iOS keyChain
查看>>
GIT在LINUX下的基本操作
查看>>
关于 android receiver
查看>>
Automysqlbackup: WARNING: Turning off multicore support, since pigz isn’t there.
查看>>