Quellcode durchsuchen

Zero MQ Add to VCS

DESKTOP-Kang vor 6 Jahren
Ursprung
Commit
efd3a029da

+ 158 - 0
Dev/OHV/VehicleControlSystem/ControlLayer/MQ/ZmqManager.cs

@@ -0,0 +1,158 @@
+using GSG.NET.Concurrent;
+using GSG.NET.Logging;
+using GSG.NET.ObjectBase;
+using NetMQ;
+using NetMQ.Monitoring;
+using NetMQ.Sockets;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace VehicleControlSystem.ControlLayer.MQ
+{
+    public class ZmqManager : SingletonBase<ZmqManager>, IDisposable
+    {
+        Logger logger = Logger.GetLogger();
+
+        SubscriberSocket sub = null;
+        RequestSocket req = null;
+
+        NetMQPoller poller = null;
+        NetMQMonitor monitor = null;
+
+        ThreadCancel threadCancel = new ThreadCancel();
+
+        private bool isReqConnected;
+
+        public bool IsReqConnected
+        {
+            get { return isReqConnected; }
+            set { isReqConnected = value; }
+        }
+
+
+        private ZmqManager()
+        {
+            NetMQ.NetMQConfig.Cleanup();
+        }
+
+        public void Init()
+        {
+
+        }
+
+        public void CrateClientSocket()
+        {
+            //pub = new PublisherSocket();
+            //pub.Bind( "tcp://127.0.0.1:5565" );
+
+            sub = new SubscriberSocket();
+            sub.Connect( "tcp://127.0.0.1:5565" );
+            sub.Connect( "tcp://127.0.0.1:5566" );
+            sub.Subscribe( "" ); //All
+
+            sub.ReceiveReady += Sub_ReceiveReady;
+
+            req = new RequestSocket();
+            this.monitor = new NetMQMonitor( req, "inproc://rep.inproc", SocketEvents.Disconnected | SocketEvents.Connected );
+            this.monitor.Connected += ( s, a ) => { this.IsReqConnected = true; };
+            this.monitor.Disconnected += ( s, a ) => { this.IsReqConnected = false; };
+
+            this.monitor.StartAsync();
+
+            req.Connect( "tcp://127.0.0.1:5567" );
+
+            this.poller = new NetMQPoller { this.sub };
+            this.poller.RunAsync();
+
+            //this.threadCancel.AddGo( Th_SubPoller );
+        }
+
+        public void Dispose()
+        {
+            this.threadCancel.Cancel();
+
+            this.monitor.Stop();
+            this.monitor.Dispose();
+
+            this.poller.Stop();
+            this.poller.Dispose();
+
+            this.sub.Dispose();
+            this.req.Dispose();
+        }
+
+        private void Sub_ReceiveReady( object sender, NetMQ.NetMQSocketEventArgs e )
+        {
+            logger.I( e.Socket.ReceiveMultipartStrings() );
+        }
+
+        void Th_SubPoller()
+        {
+            while ( !this.threadCancel.Canceled )
+            {
+                LockUtils.Wait( 1000 );
+                //this.pub.SendMoreFrame( "1000" ).SendFrame( "Test" );
+                LockUtils.Wait( 100 );
+
+                NetMQMessage msg = new NetMQMessage();
+                if ( this.sub.TryReceiveMultipartMessage( TimeSpan.FromSeconds( 1 ), ref msg ) )
+                {
+                    var m = msg;
+                }
+            }
+        }
+
+        public string Request( string topic )
+        {
+            //if ( this.req.TrySendFrame( topic ) )
+            //{
+            //    return string.Empty;
+            //}
+            if ( this.IsReqConnected )
+            {
+
+            }
+            if ( this.req.HasOut )
+            {
+                this.req.SendFrame( topic );
+            }
+
+            string outStr = string.Empty;
+            if ( this.req.TryReceiveFrameString( TimeSpan.FromSeconds( 5 ), out outStr ) )
+            {
+                var s = outStr;
+            }
+            else
+            {
+                this.req.Disconnect( "tcp://127.0.0.1:5567" );
+                this.req.Dispose();
+                this.req = null;
+                LockUtils.Wait( 100 );
+                this.req = new RequestSocket();
+                this.req.Connect( "tcp://127.0.0.1:5567" );
+                return string.Empty;
+            }
+
+            //var msg = this.req.ReceiveFrameString();
+
+            //if ( this.req.HasIn )
+            //{
+            //    if ( this.req.TryReceiveFrameString( TimeSpan.FromSeconds( 5 ), out outStr ) )
+            //    {
+            //        var s = outStr;
+            //    }
+            //}
+
+            //if ( this.req.Poll( TimeSpan.FromSeconds( 5 ) ) )
+            //{
+            //    return this.req.ReceiveFrameString();
+            //}
+            //else
+            return string.Empty;
+        }
+
+    }
+}

+ 4 - 0
Dev/OHV/VehicleControlSystem/VehicleControlSystem.csproj

@@ -70,6 +70,9 @@
       <SpecificVersion>False</SpecificVersion>
       <HintPath>..\Assambly\ExcelMapper\GSG.NET.Excel.dll</HintPath>
     </Reference>
+    <Reference Include="NetMQ">
+      <HintPath>..\Assambly\NetMq\NetMQ.dll</HintPath>
+    </Reference>
     <Reference Include="Newtonsoft.Json, Version=6.0.0.0, Culture=neutral, PublicKeyToken=30ad4fe6b2a6aeed, processorArchitecture=MSIL">
       <SpecificVersion>False</SpecificVersion>
       <HintPath>..\Assambly\Newtonsoft.Json.dll</HintPath>
@@ -137,6 +140,7 @@
     <Compile Include="ControlLayer\IO\Lib\MOTION_EziSERVO2_DEFINE.cs" />
     <Compile Include="ControlLayer\IO\QueueObjects.cs" />
     <Compile Include="ControlLayer\Motion\GSIDrive.cs" />
+    <Compile Include="ControlLayer\MQ\ZmqManager.cs" />
     <Compile Include="ControlLayer\Serial\BatteryTabos\Advantech\Advantech.cs" />
     <Compile Include="ControlLayer\Serial\BatteryTabos\Advantech\AdvCan.cs" />
     <Compile Include="ControlLayer\Serial\BatteryTabos\Advantech\AdvCANIO.cs" />

+ 1 - 1
Dev/OHVDriveLogger/OHVDriveLogger/OHVDriveLogger/FormMain.cs

@@ -75,7 +75,7 @@ namespace OHVDriveLogger
 
         private void button1_Click( object sender, EventArgs e )
         {
-            this.zmp.Request("10");
+            this.zmp.Request("Hello");
         }
     }
 }

+ 68 - 11
Dev/OHVDriveLogger/OHVDriveLogger/OHVDriveLogger/ZmqManager.cs

@@ -7,6 +7,7 @@ using System.Collections.Generic;
 using System.Linq;
 using System.Text;
 using System.Threading.Tasks;
+using NetMQ.Monitoring;
 
 namespace OHVDriveLogger
 {
@@ -17,12 +18,20 @@ namespace OHVDriveLogger
         SubscriberSocket sub = null;
         RequestSocket req = null;
 
-        PublisherSocket pub = null;
-
         NetMQPoller poller = null;
+        NetMQMonitor monitor = null;
 
         ThreadCancel threadCancel = new ThreadCancel();
 
+        private bool isReqConnected;
+
+        public bool IsReqConnected
+        {
+            get { return isReqConnected; }
+            set { isReqConnected = value; }
+        }
+
+
         public ZmqManager()
         {
             NetMQ.NetMQConfig.Cleanup();
@@ -35,11 +44,18 @@ namespace OHVDriveLogger
 
             sub = new SubscriberSocket();
             sub.Connect( "tcp://127.0.0.1:5565" );
-            sub.Subscribe(""); //All
+            sub.Connect( "tcp://127.0.0.1:5566" );
+            sub.Subscribe( "" ); //All
 
             sub.ReceiveReady += Sub_ReceiveReady;
 
             req = new RequestSocket();
+            this.monitor = new NetMQMonitor( req, "inproc://rep.inproc", SocketEvents.Disconnected | SocketEvents.Connected );
+            this.monitor.Connected += ( s, a ) => { this.IsReqConnected = true; };
+            this.monitor.Disconnected += ( s, a ) => { this.IsReqConnected = false; };
+
+            this.monitor.StartAsync();
+
             req.Connect( "tcp://127.0.0.1:5567" );
 
             this.poller = new NetMQPoller { this.sub };
@@ -50,12 +66,16 @@ namespace OHVDriveLogger
 
         public void Dispose()
         {
+            this.threadCancel.Cancel();
+
+            this.monitor.Stop();
+            this.monitor.Dispose();
+
+            this.poller.Stop();
             this.poller.Dispose();
-            //this.pub.Dispose();
+
             this.sub.Dispose();
             this.req.Dispose();
-
-            this.threadCancel.Cancel();
         }
 
         private void Sub_ReceiveReady( object sender, NetMQ.NetMQSocketEventArgs e )
@@ -68,7 +88,7 @@ namespace OHVDriveLogger
             while ( !this.threadCancel.Canceled )
             {
                 LockUtils.Wait( 1000 );
-                this.pub.SendMoreFrame( "1000" ).SendFrame( "Test" );
+                //this.pub.SendMoreFrame( "1000" ).SendFrame( "Test" );
                 LockUtils.Wait( 100 );
 
                 NetMQMessage msg = new NetMQMessage();
@@ -79,15 +99,52 @@ namespace OHVDriveLogger
             }
         }
 
-        public string Request(string topic )
+        public string Request( string topic )
         {
-            this.req.SendFrame( topic );
+            //if ( this.req.TrySendFrame( topic ) )
+            //{
+            //    return string.Empty;
+            //}
+            if ( this.IsReqConnected )
+            {
+
+            }
+            if ( this.req.HasOut )
+            {
+                this.req.SendFrame( topic );
+            }
 
-            if ( this.req.Poll( TimeSpan.FromSeconds( 5 ) ) )
+            string outStr = string.Empty;
+            if ( this.req.TryReceiveFrameString( TimeSpan.FromSeconds( 5 ), out outStr ) )
             {
-                return this.req.ReceiveFrameString();
+                var s = outStr;
             }
             else
+            {
+                this.req.Disconnect( "tcp://127.0.0.1:5567" );
+                this.req.Dispose();
+                this.req = null;
+                LockUtils.Wait( 100 );
+                this.req = new RequestSocket();
+                this.req.Connect( "tcp://127.0.0.1:5567" );
+                return string.Empty;
+            }
+
+            //var msg = this.req.ReceiveFrameString();
+
+            //if ( this.req.HasIn )
+            //{
+            //    if ( this.req.TryReceiveFrameString( TimeSpan.FromSeconds( 5 ), out outStr ) )
+            //    {
+            //        var s = outStr;
+            //    }
+            //}
+
+            //if ( this.req.Poll( TimeSpan.FromSeconds( 5 ) ) )
+            //{
+            //    return this.req.ReceiveFrameString();
+            //}
+            //else
                 return string.Empty;
         }
     }