本文共 3595 字,大约阅读时间需要 11 分钟。
在对远程的MSMQ访问的时候,发现一个奇怪的现象
static string path = @"FormatName:DIRECT=TCP:sha-db\private$\mt1";
//check is existed and create
if (MessageQueue.Exists(path) == false) { MessageQueue.Create(path,false); }就如下代码,会报错,队列路径非法类似的错误,google了很多,都没有个解决方法。后来查到篇个回帖
说 MessageQueue.Exists 文档中说:Exists方法不支持 前缀。Exists 方法不能验证远程队列。参考:
文中还提供了其他可以实现的方法。
虽然Exists方法会失败,但是不影响它发送消息,只要路径写对,照样可以发送出去。
---------------------------------------------------------------------------------
还有一个奇怪的现象。
使用for语句插入2000条消息,在消息队列的窗口最多只能看到1000条,当你取出一条后,才能看到后一条。
估计是设计使然。
---------------------------------------------------------------------------------
下面的方法是一些示例代码,主要是多线程发消息和去消息
多线程
//watch.Restart();
//Thread t1 = new Thread(send); //Thread t2 = new Thread(send); //Thread t3 = new Thread(send); //Thread t4 = new Thread(send); //t1.Start(); //t2.Start(); //t3.Start(); //t4.Start();//t4.Join();
//t1.Join(); //t2.Join(); //t3.Join(); //t4.Join(); //watch.Stop(); //SaveFile(@"c:\msmqlog.txt", "多线程发送" + j + "条消息,每条大小约40k,时间:" + watch.Elapsed.ToString());
委托异步执行
// Action a = new Action(() =>
// { // for (int i = 0; i < 5000; i++) // { // Console.WriteLine(i); // queue.Send(s, transactionType); // } // queue.Close(); // });//watch.Restart();
//IAsyncResult ia= a.BeginInvoke((ar) => { // SaveFile(@"c:\msmqlog.txt", "异步发送" + j + "条消息,每条大小约40k,时间:" + watch.Elapsed.ToString()); // }, null); // a.EndInvoke(ia);同步的读取消息
//int index = 0;
//MessageEnumerator me = queue.GetMessageEnumerator2(); //watch.Restart(); //while (me.MoveNext()) //{ // Console.Write(me.Current.Id + "-"); // Console.WriteLine(++index); // me.RemoveCurrent(); // me.Reset(); //} //watch.Stop(); //SaveFile(@"c:\msmqlog.txt", "删除" + index + "条消息,每条大小约40k,时间:" + watch.Elapsed.ToString());
多线程读取消息
Thread t1 = new Thread(() => receive("1"));//使用拉姆达表达式往线程中传参数
Thread t2 = new Thread(() => receive("2")); Thread t3 = new Thread(() => receive("3")); Thread t4 = new Thread(() => receive("4"));t1.Start();
t2.Start(); t3.Start(); t4.Start();t1.Join();
t2.Join(); t3.Join(); t4.Join();Console.WriteLine();
}
//GetMessageEnumerator2()方式取消息,
//在并发情况下,可能游标所指定的消息被其他线程删除,抛异常。
static public void GetMessage(string i)
{ MessageQueue queue = new System.Messaging.MessageQueue(path); MessageEnumerator me = queue.GetMessageEnumerator2(); int index = 0; while (me.MoveNext()) { try { Console.Write(me.Current.Id + "-"); Console.WriteLine(++index); string s = me.Current.Id + "-"; SaveFile(@"c:\" + i + ".txt", s + index); me.RemoveCurrent(); me.Reset(); } catch (Exception ex) { SaveFile(@"c:\" + i + ".txt", ex.Message); } } }//Receive方式取消息
static public void receive(string i)
{ MessageQueue queue = new System.Messaging.MessageQueue(path);while (true)
{ try { Message me= queue.Receive(new TimeSpan(0,0,1)); me.Formatter = new System.Messaging.XmlMessageFormatter((new Type[] { typeof(string) })); Console.WriteLine(me.Id + "-"); SaveFile(@"c:\" + i + ".txt", me.Id); } catch (MessageQueueException ex1) { if (ex1.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) { Console.WriteLine("MessageQueueErrorCode.IOTimeout"); SaveFile(@"c:\" + i + ".txt", "MessageQueueErrorCode.IOTimeout"); Thread.Sleep(5000); } } catch (Exception ex) { Console.WriteLine(ex.Message); SaveFile(@"c:\" + i + ".txt", ex.Message); } } }static public void send()
{ string s = ReadFile(@"c:\pp.txt"); MessageQueueTransactionType transactionType = MessageQueueTransactionType.None; MessageQueue queue = new System.Messaging.MessageQueue(path); //int j = 10000; for (int i = 0; i < 1250; i++) { Console.WriteLine(i); queue.Send(s, transactionType); } queue.Close(); }转载地址:http://osuna.baihongyu.com/