Manager.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Linq;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using GSG.NET.Concurrent;
  9. using GSG.NET.LINQ;
  10. using GSG.NET.Logging;
  11. using GSG.NET.Quartz;
  12. using GSG.NET.TCP;
  13. using GSG.NET.Utils;
  14. using GSG.NET.Extensions;
  15. namespace OHVConnector
  16. {
  17. public partial class Manager
  18. {
  19. const byte STX = 0x02;
  20. const byte ETX = 0x03;
  21. const long INIT_CTRL_SYSBYTE = 0x40000000;
  22. long sysbyte = 0;
  23. long ctrl_sysbyte = INIT_CTRL_SYSBYTE;
  24. static Logger logger = Logger.GetLogger();
  25. TsQueue<QueueObject> qQ = new TsQueue<QueueObject>();
  26. TsQueue<OCSMessage> qqW = new TsQueue<OCSMessage>();
  27. TimerTemplate<byte, OCSMessage> quzT3 = new TimerTemplate<byte, OCSMessage>();
  28. TsMap<long, SyncObject> ddReq = new TsMap<long, SyncObject>(); //sync 통신을 위함.
  29. TcpConnector h = new TcpConnector();
  30. Thread _TQ;//pumping queue thread
  31. Thread _TW;//write
  32. Thread _TR;//read
  33. Thread _TLINK;//Linktest
  34. bool ModeActive { get; set; }
  35. bool? inited;
  36. public Config Config { get; set; }
  37. #region Properties
  38. public bool Connected
  39. {
  40. get { return h.Connected; }
  41. }
  42. /// <summary>
  43. /// 연결 시도 가능여부 체크
  44. /// <para>Connect 호출 or 연결중: true</para>
  45. /// <para>최초 or Disconnect 호출: false</para>
  46. /// </summary>
  47. public bool Connecting
  48. {
  49. get { return inited.HasValue ? inited.Value : false; }
  50. }
  51. #endregion
  52. #region Constactor
  53. public Manager()
  54. {
  55. ModeActive = true;
  56. Config = new Config();
  57. quzT3.OnTimeout += _OnTimeout;
  58. h.OnTcpStateChanged += _OnLog;
  59. }
  60. #endregion
  61. #region Connection Method
  62. public void Connect(bool active = false)
  63. {
  64. ModeActive = active;
  65. if (!inited.HasValue)//최초 한번 실행.
  66. {
  67. _TQ = ThreadUtils.Invoke(_ThPullQueue);
  68. inited = false;
  69. }
  70. Assert.IsFalse(inited.Value, "Already connecting");
  71. inited = true;
  72. _TW = ThreadUtils.Invoke(_ThWriteTcp);
  73. _TR = ThreadUtils.Invoke(_ThReadTcp);
  74. _TLINK = ThreadUtils.Invoke(_ThLinkQuz);
  75. _OnLog("OHV CONNECT REQ " + Config);
  76. }
  77. public void Disconnect()
  78. {
  79. //if (inited.HasValue && inited.Value)
  80. if (Connecting)
  81. {
  82. _OnLog("HSMS DISCONNECT REQ " + Config);
  83. inited = false;
  84. quzT3.StopAll();
  85. ThreadUtils.Kill(_TLINK);
  86. ThreadUtils.Kill(_TW);
  87. h.StopListen();
  88. ThreadUtils.Kill(_TR);
  89. h.CloseSocket();//Kill을 사용할 경우 뒤에 존재해야 한다.
  90. }
  91. }
  92. void _OnDicontd(Exception e)
  93. {
  94. _OnLog("OHV DISCONNECTED");
  95. sysbyte = 0;
  96. ctrl_sysbyte = INIT_CTRL_SYSBYTE;
  97. quzT3.StopAll();
  98. ddReq.Clear();
  99. qQ.Enqueue(new QoNotComm { Arg0 = e });
  100. }
  101. void _OnContd()
  102. {
  103. _OnLog("OHV CONNECTED");
  104. qQ.Enqueue(new QoComm());
  105. }
  106. void TcpConnect()
  107. {
  108. h.Connect(new TcpComm
  109. {
  110. Active = ModeActive,
  111. RetryCnt = 1, //T5를 처리해야 함.
  112. Ip = Config.IpAddress,
  113. PortNo = Config.Port,
  114. T5 = Config.T5,
  115. T6 = Config.T6,//Config.TcpRecdTimeout,
  116. });
  117. if (!h.Connected)
  118. {
  119. if (ModeActive)
  120. _OnLog("T5 TIMEOUT " + Config.ID);
  121. return;
  122. }
  123. _OnContd();
  124. ChgTcpTimeout(true);
  125. //if (ModeActive)
  126. //SendCtrlMsg(1);//무조건 HSMS Active
  127. }
  128. #endregion
  129. private void _OnLog(string obj)
  130. {
  131. qQ.Enqueue(new QoLog { Arg0 = obj });
  132. }
  133. private void _OnTimeout(byte id, OCSMessage msg)
  134. {
  135. if (null == msg)
  136. {
  137. logger.W("T3 [{0}] attachment is null", id);
  138. return;
  139. }
  140. qQ.Enqueue(new QoTimeout { Arg0 = msg });
  141. //_OnLog("T3 TIMEOUT {0}".format(msg.LogHeader));
  142. }
  143. #region Thread Method
  144. readonly object lockLink = new object();
  145. void _ThLinkQuz()
  146. {
  147. for (; ; )
  148. {
  149. try
  150. {
  151. bool waked;
  152. if (Config.LinkOn)
  153. waked = LockUtils.Wait(Config.TLink, lockLink);
  154. else
  155. waked = LockUtils.Wait(lockLink);
  156. if (waked)
  157. continue;//notify: 패킷을 수신할때마다 reset함.
  158. if (Connected)//연결여부와 상관없이 thread가 기동되므로 연결시에만.
  159. {
  160. //SendCtrlMsg(5);
  161. }
  162. }
  163. catch (ThreadAbortException)
  164. {
  165. break;
  166. }
  167. catch (Exception e)
  168. {
  169. logger.E(e);
  170. }
  171. }
  172. }
  173. void _ThWriteTcp()
  174. {
  175. logger.I("Write {0}", ThreadUtils.GetCurrThreadID());
  176. for (; ; )
  177. {
  178. try
  179. {
  180. var v = qqW.Dequeue();
  181. this.TcpWriteMsg(v);
  182. //v.IsRecd = false;
  183. //if (v.AfterMillis > 0)
  184. // LockUtils.Wait(v.AfterMillis);
  185. //if (v.CtrlMsg)
  186. // TcpWriteCtrlMsg(v);
  187. //else
  188. // TcpWriteNormalMsg(v);
  189. }
  190. catch (ThreadAbortException)
  191. {
  192. break;
  193. }
  194. catch (Exception e)
  195. {
  196. logger.E(e);
  197. }
  198. }
  199. }
  200. void _ThReadTcp()
  201. {
  202. for (; ; )
  203. {
  204. try
  205. {
  206. if (!h.Connected)
  207. {
  208. TcpConnect();
  209. continue;
  210. }
  211. ReadSocket();
  212. }
  213. catch (ObjectDisposedException e)
  214. {
  215. TcpError(e);
  216. }
  217. catch (IOException e)
  218. {
  219. TcpError(e);
  220. }
  221. catch (ThreadAbortException)
  222. {
  223. _OnLog("DISCONNECT REQUEST APPLIED " + Config);
  224. TcpError(new IOException("DISCONNECT REQUEST"));
  225. break;
  226. }
  227. catch (Exception e)
  228. {
  229. logger.E(e);
  230. }
  231. }
  232. }
  233. void _ThPullQueue()
  234. {
  235. for (; ; )
  236. {
  237. try
  238. {
  239. var qo = this.qQ.Dequeue();
  240. if (qo is QoRecdUnk)
  241. {
  242. DelegateUtils.Invoke(OnRecdUnk, qo.Arg0, qo.Arg1);
  243. //if (AutoS9Fy)
  244. //{
  245. // var v = qo.Arg0 as SFMessage;
  246. // Send(v.S9Fy);
  247. //}
  248. }
  249. else if (qo is QoComm)
  250. DelegateUtils.Invoke(OnContd, Config.ID);
  251. else if (qo is QoNotComm)
  252. DelegateUtils.Invoke(OnDiscontd, Config.ID, qo.Arg0);
  253. else if (qo is QoLog)
  254. DelegateUtils.Invoke(OnLog, Config.ID, qo.Arg0);
  255. else if (qo is QoRecd)
  256. DelegateUtils.Invoke(OnRecd, qo.Arg0);
  257. else if (qo is QoTimeout)
  258. {
  259. DelegateUtils.Invoke(OnT3Timeout, qo.Arg0);
  260. //if (AutoS9Fy)
  261. //{
  262. // var v = qo.Arg0 as SFMessage;
  263. // Send(MessageSupport.MakeS9FX(9, v));
  264. //}
  265. }
  266. else if (qo is QoSent)
  267. DelegateUtils.Invoke(OnSent, qo.Arg0);
  268. else
  269. Assert.Fail("Unk Object {0}", qo);
  270. }
  271. catch (ThreadAbortException)
  272. {
  273. break;
  274. }
  275. catch (Exception e)
  276. {
  277. logger.E(e);
  278. }
  279. }
  280. }
  281. #endregion
  282. #region Read Method
  283. void ReadSocket()
  284. {
  285. h.ReadByte(); //STX
  286. string revID = string.Empty;
  287. string sendID = string.Empty;
  288. if (ModeActive)
  289. {
  290. revID = h.ReadAscii(2);
  291. sendID = h.ReadAscii(5);
  292. }
  293. else
  294. {
  295. revID = h.ReadAscii(5);
  296. sendID = h.ReadAscii(2);
  297. }
  298. if (!this.Config.ID.Equals(revID))
  299. OnLog(this.Config.ID, $"RevID Not Equals");
  300. var ocsMeg = new OCSMessage();
  301. ocsMeg.RevID = revID;
  302. ocsMeg.SendID = sendID;
  303. ocsMeg.Kind = h.ReadAscii(1).ToEnum<eKind>(eKind.Unknown);
  304. ocsMeg.Tag = h.ReadAscii(4);
  305. ocsMeg.SubCode = h.ReadAscii(3);
  306. //CheckSum 을 해야 하나??
  307. ocsMeg.CheckSum = h.ReadByte();
  308. h.ReadUntil(ETX);
  309. //Todo: 응답으로 온건지 그냥 보낸건지 분류가 필요. = CheckSum 을 저장 했다가 이용하자.
  310. //if (!len.FwBtw(10, MAX_SIZE))
  311. // throw new IOException("HSMS ABNORMAL LENGTH:" + len);
  312. //var head = h.ReadBytes(10);
  313. //var body = h.ReadBytes(len - 10);
  314. ChgTcpTimeout(true);//무언가 받으면
  315. LockUtils.NotifyAll(lockLink);//Linktest thread 변환의 notify
  316. //var v = new OCSMessage { Header = head, Body = body, IsRecd = true };
  317. //v.Decoding();
  318. _OnRecd(ocsMeg);
  319. }
  320. void _OnRecd(OCSMessage recd)
  321. {
  322. //Alive Check Reply
  323. if (recd.Kind == eKind.A && !ModeActive) //자동으로 응답을 보낸다. OCS 가 Active 상태
  324. {
  325. var reply = new OCSMessage()
  326. {
  327. Id = this.Config.ID,
  328. RevID = recd.SendID,
  329. SendID = this.Config.ID,
  330. Kind = eKind.A,
  331. Tag = recd.Tag,
  332. SubCode = recd.SubCode,
  333. };
  334. Reply(reply);
  335. return;
  336. }
  337. if ( recd.Kind == eKind.C )
  338. {
  339. this.qQ.Enqueue( new QoRecd { Arg0 = recd } );
  340. return;
  341. }
  342. //Send 한 Message 의 Reply 로 판단.
  343. if (this.quzT3.HasId(recd.CheckSum))
  344. {
  345. //Send 목록에서 삭제한다.
  346. this.quzT3.Stop(recd.CheckSum);
  347. this._OnLog($"[Received] - Reply - {recd.LogFormat()}");
  348. return;
  349. }
  350. this.qQ.Enqueue(new QoRecd { Arg0 = recd });
  351. }
  352. void TcpError(Exception e)
  353. {
  354. _OnLog(TcpUtils.GetTcpErrMsg(h.IPClient, e));
  355. h.CloseSocket();
  356. _OnDicontd(e);
  357. LockUtils.Wait(1000);//잠시대기.
  358. }
  359. #endregion
  360. #region Write Method
  361. void TcpWriteMsg(OCSMessage msg)
  362. {
  363. qQ.Enqueue(new QoSent { Arg0 = msg });
  364. this.h.WriteFlush(msg.ToMemoryBuffer().ToBytes);
  365. }
  366. void TcpWriteNormalMsg(OCSMessage nm)
  367. {
  368. //nm.Encoding();
  369. //if (nm.IsPrimary && nm.IsWbit)
  370. // quzT3.StartOnce(Config.T3 * ConstUtils.ONE_SECOND, nm.Systembyte, nm);
  371. //int len = nm.Header.Length + nm.Body.Length + 10;
  372. //var mb = new MemoryBuffer(len);
  373. //mb.AppendBeInt(nm.Length);
  374. //mb.Append(nm.Header);
  375. //mb.Append(nm.Body);
  376. //qQ.Enqueue(new QoSent { Arg0 = nm });
  377. //h.WriteFlush(mb.ToBytes);
  378. }
  379. void TcpWriteCtrlMsg(OCSMessage ctrl)
  380. {
  381. //var mb = new MemoryBuffer(16);
  382. //mb.AppendBeInt(10);
  383. //mb.Append(ctrl.Header);
  384. //bool skip = ctrl.CtrlLinkTest && Config.HideLogLink;
  385. //if (!skip)
  386. // _OnLog(ctrl.LogFormat());
  387. //h.WriteFlush(mb.ToBytes);
  388. }
  389. void SendCtrlMsg(int stype)
  390. {
  391. //Send(new OCSMessage { SType = stype });
  392. }
  393. public void Send(OCSMessage msg, int after)
  394. {
  395. if (after > 0)
  396. TimerUtils.Once(after, Send, msg);
  397. else
  398. Send(msg);
  399. }
  400. public void Send(OCSMessage msg)
  401. {
  402. //msg.Id = Config.ID;
  403. if (!Connected)
  404. {
  405. _OnLog("Send fail not connected" + msg.LogFormat());
  406. return;
  407. }
  408. msg.RevID = Config.HostID;
  409. msg.SendID = Config.ID;
  410. if ( msg.Kind == eKind.C ) // Control Message 는 페어로 응답이 오지 않는다.
  411. {
  412. qqW.Enqueue( msg );
  413. }
  414. if (this.quzT3.HasId(msg.GetCheckSum()))
  415. {
  416. _OnLog("quzT3 Has ID" + msg.LogFormat());
  417. return;
  418. }
  419. this.quzT3.StartOnce(Config.T3 * ConstUtils.ONE_SECOND, msg.GetCheckSum(), msg);
  420. //if (msg.CtrlMsg)
  421. //{
  422. // if (msg.CtrlSelectReq || msg.CtrlLinkReq)
  423. // {
  424. // msg.Systembyte = Interlocked.Increment(ref ctrl_sysbyte);
  425. // ChgTcpTimeout(false);//select, linktest req
  426. // }
  427. //}
  428. //else
  429. //{
  430. // msg.DeviceId = msg.SessID.HasValue ? msg.SessID.Value : Config.DeviceID;
  431. // if (msg.IsPrimary && msg.NeedSetSysbyte)
  432. // msg.Systembyte = Interlocked.Increment(ref sysbyte);
  433. //}
  434. qqW.Enqueue(msg);
  435. }
  436. /// <summary>
  437. /// 응답을 보낼 때 사용.
  438. /// </summary>
  439. /// <param name="msg"></param>
  440. public void Reply(OCSMessage msg)
  441. {
  442. if (!Connected)
  443. {
  444. _OnLog("Reply fail not connected" + msg.LogFormat());
  445. return;
  446. }
  447. msg.RevID = Config.HostID;
  448. msg.SendID = Config.ID;
  449. qqW.Enqueue(msg);
  450. }
  451. #endregion
  452. #region HelpMothed
  453. void ChgTcpTimeout(bool infinite)
  454. {
  455. if (h.Connected)
  456. {
  457. if (infinite)
  458. {
  459. if (h.Socket.ReceiveTimeout != Timeout.Infinite)
  460. h.ChangeRecvTimeout(Timeout.Infinite);
  461. }
  462. else
  463. {
  464. if (Config.LinkOn)
  465. h.ChangeRecvTimeout(Config.T6 * ConstUtils.ONE_SECOND);
  466. }
  467. }
  468. }
  469. #endregion
  470. }
  471. }