ZmqManager.cs 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. using GSG.NET.Concurrent;
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Text;
  6. using System.Threading.Tasks;
  7. using ZeroMQ;
  8. namespace OHVDriveSimulator
  9. {
  10. public class ZmqManager
  11. {
  12. ZContext ctx = new ZContext();
  13. ZSocket rep = null;
  14. ZSocket pub = null;
  15. ZSocket sub = null;
  16. ThreadCancel threadCancel = new ThreadCancel();
  17. public ZmqManager()
  18. {
  19. CreateZeroMQ();
  20. }
  21. void CreateZeroMQ()
  22. {
  23. this.pub = new ZSocket( ctx, ZSocketType.PUB );
  24. //this.pub.Bind( "tcp://127.0.0.1:5566" );
  25. this.pub.Bind( "tcp://127.0.0.1:5565" );
  26. this.sub = new ZSocket( ctx, ZSocketType.SUB );
  27. this.sub.Connect( "tcp://127.0.0.1:5565" );
  28. this.sub.SubscribeAll();
  29. this.rep = new ZSocket( ctx, ZSocketType.REP );
  30. rep.Bind( "tcp://127.0.0.1:5567" );
  31. this.threadCancel.AddGo( Th_Respens );
  32. this.threadCancel.AddGo( Th_Publish );
  33. }
  34. void Th_Respens()
  35. {
  36. while ( !this.threadCancel.Canceled )
  37. {
  38. ZMessage message;
  39. ZError error;
  40. if ( null != (message = this.sub.ReceiveMessage(out error ) ))
  41. {
  42. using ( message )
  43. {
  44. Console.WriteLine( $"{message[0].ReadString()} / {message[1].ReadString()}" );
  45. }
  46. }
  47. else
  48. {
  49. if ( error == ZError.ETERM )
  50. return; // Interrupted
  51. throw new ZException( error );
  52. }
  53. //if ( null != ( message = this.rep.ReceiveMessage( out error ) ) )
  54. //{
  55. // using ( message )
  56. // {
  57. // Console.WriteLine( $"{message}" );
  58. // rep.Send( new ZFrame( "rep" ) );
  59. // }
  60. //}
  61. //else
  62. //{
  63. // if ( error == ZError.ETERM )
  64. // return; // Interrupted
  65. // throw new ZException( error );
  66. //}
  67. }
  68. }
  69. void Th_Publish()
  70. {
  71. while ( !this.threadCancel.Canceled )
  72. {
  73. LockUtils.Wait( 1000 );
  74. var msg = new ZMessage();
  75. msg.Add( new ZFrame( "1000" ) );
  76. msg.Add( new ZFrame( "Test" ) );
  77. this.pub.Send( msg );
  78. //this.pub.SendMore( new ZFrame( "1000" ) );
  79. //this.pub.Send( new ZFrame( "Test Publish" ) );
  80. }
  81. }
  82. void Dispese()
  83. {
  84. this.threadCancel.Cancel();
  85. }
  86. }
  87. }