Manager.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Linq;
  5. using System.Text;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using GSG.NET.Concurrent;
  9. using GSG.NET.LINQ;
  10. using GSG.NET.Logging;
  11. using GSG.NET.Quartz;
  12. using GSG.NET.TCP;
  13. using GSG.NET.Utils;
  14. namespace OHVConnector
  15. {
  16. public partial class Manager
  17. {
  // Frame delimiter bytes. ETX is the terminator ReadSocket reads up to;
  // STX is not referenced anywhere in this part of the class.
  18. const byte STX = 0x02;
  19. const byte ETX = 0x03;
  // Starting value of ctrl_sysbyte; _OnDicontd restores it on every disconnect.
  20. const long INIT_CTRL_SYSBYTE = 0x40000000;
  // Message counters. In this part of the class they are only reset (in _OnDicontd);
  // the code that incremented them is commented out in Send.
  21. long sysbyte = 0;
  22. long ctrl_sysbyte = INIT_CTRL_SYSBYTE;
  23. static Logger logger = Logger.GetLogger();
  // Event queue: producers enqueue QueueObject items, _ThPullQueue drains it and raises the callbacks.
  24. TsQueue<QueueObject> qQ = new TsQueue<QueueObject>();
  // Outbound queue: Send enqueues, _ThWriteTcp drains it and writes to the socket.
  25. TsQueue<OCSMessage> qqW = new TsQueue<OCSMessage>();
  // Timer whose OnTimeout event is wired to _OnTimeout (reported there as a T3 timeout);
  // all timers are stopped on Disconnect and on connection loss.
  26. TimerTemplate<long, OCSMessage> quzT3 = new TimerTemplate<long, OCSMessage>();
  27. TsMap<long, SyncObject> ddReq = new TsMap<long, SyncObject>(); //for synchronous communication.
  // TCP connector used for connecting, reading and writing.
  28. TcpConnector h = new TcpConnector();
  29. Thread _TQ;//pumping queue thread
  30. Thread _TW;//write
  31. Thread _TR;//read
  32. Thread _TLINK;//Linktest
  // Value passed as TcpComm.Active by TcpConnect; set by the constructor (true) and by Connect.
  33. bool ModeActive { get; set; }
  // null: Connect has never been called; true: Connect was called; false: first-call setup done or Disconnect was called.
  34. bool? inited;
  35. public Config Config { get; set; }
  36. #region Properties
  /// <summary>
  /// Gets whether the underlying TCP connector currently reports a connection.
  /// </summary>
  public bool Connected
  {
      get
      {
          bool linked = h.Connected;
          return linked;
      }
  }
  /// <summary>
  /// Indicates whether a connection has been requested.
  /// <para>true: Connect has been called (connecting or connected).</para>
  /// <para>false: initial state, or Disconnect has been called.</para>
  /// </summary>
  public bool Connecting
  {
      // GetValueOrDefault() yields false while inited is still null (never connected).
      get { return inited.GetValueOrDefault(); }
  }
  50. #endregion
  51. #region Constactor
  /// <summary>
  /// Creates a manager that defaults to active mode with a fresh <see cref="Config"/>,
  /// and subscribes to the timer-timeout and TCP-state-changed events.
  /// </summary>
  public Manager()
  {
      // Event wiring: timer expiry goes to _OnTimeout, connector state text goes to _OnLog.
      h.OnTcpStateChanged += _OnLog;
      quzT3.OnTimeout += _OnTimeout;

      Config = new Config();
      ModeActive = true;
  }
  59. #endregion
  60. #region Connection Method
  /// <summary>
  /// Requests a connection: starts the writer, reader and link-test threads.
  /// The event-pump thread is started on the first call only.
  /// </summary>
  /// <param name="active">
  /// Stored in ModeActive and later passed as TcpComm.Active by TcpConnect.
  /// </param>
  /// <remarks>
  /// Calling this while a connection is already requested trips the
  /// "Already connecting" assertion.
  /// NOTE(review): assumes Assert.IsFalse throws on failure - confirm.
  /// </remarks>
  public void Connect(bool active)
  {
      if (!inited.HasValue)// runs once, on the very first call
      {
          // The pump thread is not killed by Disconnect, so it is created only here.
          _TQ = ThreadUtils.Invoke(_ThPullQueue);
          inited = false;
      }

      // Check the precondition BEFORE mutating any state: a rejected call must not
      // change the mode that the already-running reader thread uses to reconnect.
      Assert.IsFalse(inited.Value, "Already connecting");

      ModeActive = active;
      inited = true;

      // Log the request before spawning the workers (same order as Disconnect),
      // so it precedes anything the new threads log.
      _OnLog("HSMS CONNECT REQ " + Config);

      _TW = ThreadUtils.Invoke(_ThWriteTcp);
      _TR = ThreadUtils.Invoke(_ThReadTcp);
      _TLINK = ThreadUtils.Invoke(_ThLinkQuz);
  }
  /// <summary>
  /// Cancels a previously requested connection: stops all timers, kills the
  /// link-test, writer and reader threads, and closes the socket.
  /// Does nothing when no connection has been requested.
  /// </summary>
  public void Disconnect()
  {
      if (!Connecting)
          return;

      _OnLog("HSMS DISCONNECT REQ " + Config);
      inited = false;
      quzT3.StopAll();

      ThreadUtils.Kill(_TLINK);
      ThreadUtils.Kill(_TW);

      // 2017-07-20 patch: stop listening before killing the reader so Disconnect
      // does not block while a passive-mode connection attempt is still listening.
      h.StopListen();
      ThreadUtils.Kill(_TR);

      // When Kill is used, the socket must be closed after it.
      h.CloseSocket();
  }
  /// <summary>
  /// Resets per-connection state after the link is lost and queues a
  /// "not communicating" event for the pump thread.
  /// </summary>
  /// <param name="e">The error that ended the connection; forwarded as the event argument.</param>
  void _OnDicontd(Exception e)
  {
      _OnLog("HSMS DISCONNECTED");

      // Both counters go back to their initial values.
      ctrl_sysbyte = INIT_CTRL_SYSBYTE;
      sysbyte = 0;

      quzT3.StopAll();
      ddReq.Clear();

      var notice = new QoNotComm { Arg0 = e };
      qQ.Enqueue(notice);
  }
  /// <summary>
  /// Logs that the connection is up and queues a "communicating" event
  /// for the pump thread.
  /// </summary>
  void _OnContd()
  {
      _OnLog("HSMS CONNECTED");

      var notice = new QoComm();
      qQ.Enqueue(notice);
  }
  /// <summary>
  /// Makes one connection attempt using the current Config and mode.
  /// On success raises the connected event and switches the socket to an
  /// infinite receive timeout; on failure in active mode logs a T5 timeout.
  /// </summary>
  void TcpConnect()
  {
      var comm = new TcpComm
      {
          Active = ModeActive,
          RetryCnt = 1, // T5 has to be handled.
          Ip = Config.IpAddress,
          PortNo = Config.Port,
          T5 = Config.T5,
          T6 = Config.T6,
      };
      h.Connect(comm);

      if (h.Connected)
      {
          _OnContd();
          ChgTcpTimeout(true);
          // Disabled: in active mode a control message used to be sent here
          // (SendCtrlMsg(1), "always HSMS Active").
          return;
      }

      // Only the active side reports the failed attempt.
      if (ModeActive)
          _OnLog("T5 TIMEOUT " + Config.ID);
  }
  128. #endregion
  /// <summary>
  /// Publishes a log line. The text is queued as a QoLog item so that the pump
  /// thread (_ThPullQueue) raises OnLog with it.
  /// </summary>
  /// <param name="obj">Log text; also receives the connector's TCP-state-changed notifications.</param>
  /// <remarks>
  /// This used to be an empty generated stub, which silently discarded every log
  /// message even though _ThPullQueue already dispatches QoLog items to OnLog.
  /// NOTE(review): assumes QoLog has a parameterless constructor and a settable
  /// Arg0, like the other Qo* types used in this class - confirm.
  /// </remarks>
  private void _OnLog(string obj)
  {
      qQ.Enqueue(new QoLog { Arg0 = obj });
  }
  /// <summary>
  /// Timer-expiry callback: queues a timeout event carrying the message that
  /// was attached to the timer.
  /// </summary>
  /// <param name="id">Timer id; only used in the warning when no message is attached.</param>
  /// <param name="msg">Message attached to the expired timer; may be null.</param>
  private void _OnTimeout(long id, OCSMessage msg)
  {
      if (msg != null)
      {
          qQ.Enqueue(new QoTimeout { Arg0 = msg });
          return;
      }

      // Nothing to report without the attached message.
      logger.W("T3 [{0}] attachment is null", id);
  }
  143. #region Thread Method
  // Monitor object the link-test thread waits on; ReadSocket notifies it whenever data arrives.
  readonly object lockLink = new object();

  /// <summary>
  /// Link-test thread body. Waits on lockLink - with the Config.TLink timeout
  /// when Config.LinkOn is set, otherwise without a timeout - and loops until
  /// the thread is aborted. The action to take when the wait ends without a
  /// notification is currently disabled.
  /// </summary>
  void _ThLinkQuz()
  {
      while (true)
      {
          try
          {
              bool waked;
              if (!Config.LinkOn)
                  waked = LockUtils.Wait(lockLink);
              else
                  waked = LockUtils.Wait(Config.TLink, lockLink);

              // waked == true means we were notified (presumably: the wait is reset on
              // every received packet), so there is nothing to do in that case.
              // The thread runs regardless of connection state, so act only when connected.
              if (!waked && Connected)
              {
                  //SendCtrlMsg(5);
              }
          }
          catch (ThreadAbortException)
          {
              break;
          }
          catch (Exception e)
          {
              logger.E(e);
          }
      }
  }
  /// <summary>
  /// Writer thread body: logs its thread id, then repeatedly takes the next
  /// message from the outbound queue and writes it, until the thread is aborted.
  /// Any other exception is logged and the loop continues.
  /// </summary>
  void _ThWriteTcp()
  {
      logger.I("Write {0}", ThreadUtils.GetCurrThreadID());
      while (true)
      {
          try
          {
              // Disabled: an optional per-message delay (AfterMillis) and the split
              // between control and normal message writers used to live here.
              TcpWriteMsg(qqW.Dequeue());
          }
          catch (ThreadAbortException)
          {
              break;
          }
          catch (Exception e)
          {
              logger.E(e);
          }
      }
  }
  /// <summary>
  /// Reader thread body: while disconnected it keeps attempting to connect,
  /// while connected it reads from the socket. Socket failures are routed to
  /// TcpError; a thread abort is treated as a disconnect request and ends the loop.
  /// </summary>
  void _ThReadTcp()
  {
      while (true)
      {
          try
          {
              if (h.Connected)
                  ReadSocket();
              else
                  TcpConnect();
          }
          catch (IOException e)
          {
              TcpError(e);
          }
          catch (ObjectDisposedException e)
          {
              TcpError(e);
          }
          catch (ThreadAbortException)
          {
              // Report the abort as a disconnect through the normal error path, then stop.
              _OnLog("DISCONNECT REQUEST APPLIED " + Config);
              TcpError(new IOException("DISCONNECT REQUEST"));
              break;
          }
          catch (Exception e)
          {
              logger.E(e);
          }
      }
  }
  /// <summary>
  /// Event-pump thread body: takes items from the event queue and invokes the
  /// callback that matches the item's type. Runs until the thread is aborted;
  /// any other exception is logged and the loop continues.
  /// </summary>
  void _ThPullQueue()
  {
      // Unconditional loop (no while-condition) so that queued items are always drained.
      for (; ; )
      {
          try
          {
              var item = qQ.Dequeue();

              // The type tests are kept in their original order.
              if (item is QoRecdUnk)
              {
                  DelegateUtils.Invoke(OnRecdUnk, item.Arg0, item.Arg1);
                  // Disabled: automatic S9Fy reply when AutoS9Fy is set.
                  continue;
              }
              if (item is QoComm)
              {
                  DelegateUtils.Invoke(OnContd, Config.ID);
                  continue;
              }
              if (item is QoNotComm)
              {
                  DelegateUtils.Invoke(OnDiscontd, Config.ID, item.Arg0);
                  continue;
              }
              if (item is QoLog)
              {
                  DelegateUtils.Invoke(OnLog, Config.ID, item.Arg0);
                  continue;
              }
              if (item is QoRecd)
              {
                  DelegateUtils.Invoke(OnRecd, item.Arg0);
                  continue;
              }
              if (item is QoTimeout)
              {
                  DelegateUtils.Invoke(OnT3Timeout, item.Arg0);
                  // Disabled: automatic S9F9 reply when AutoS9Fy is set.
                  continue;
              }
              if (item is QoSent)
              {
                  DelegateUtils.Invoke(OnSent, item.Arg0);
                  continue;
              }

              Assert.Fail("Unk Object {0}", item);
          }
          catch (ThreadAbortException)
          {
              break;
          }
          catch (Exception e)
          {
              logger.E(e);
          }
      }
  }
  281. #endregion
  282. #region Read Method
  /// <summary>
  /// Reads from the socket up to the next ETX byte, then restores the infinite
  /// receive timeout and notifies the link-test thread.
  /// </summary>
  /// <remarks>
  /// The bytes that were read are currently discarded: the length check, the
  /// header/body split, decoding, and the hand-off to _OnRecd are all disabled.
  /// </remarks>
  void ReadSocket()
  {
      h.ReadUntil(ETX);

      // Something was received: go back to waiting without a receive timeout...
      ChgTcpTimeout(true);
      // ...and notify the link-test thread.
      LockUtils.NotifyAll(lockLink);
  }
  /// <summary>
  /// Handler for a decoded inbound message. Currently a no-op: the whole
  /// implementation is disabled, and the only call visible in this part of the
  /// class (in ReadSocket) is commented out as well.
  /// </summary>
  /// <param name="recd">The received message (unused).</param>
  /// <remarks>
  /// The disabled implementation did the following.
  /// <para>
  /// Control messages: logged the message unless it was a link test and
  /// Config.HideLogLink was set; threw IOException("HSMS SEPARATE MESSAGE") on a
  /// separate request; replied to select and link-test requests; raised the
  /// connected event on a select request or select reply; logged a warning for
  /// any other control type.
  /// </para>
  /// <para>
  /// Data messages: for a secondary message with a running T3 timer, stopped the
  /// timer and copied the correlation from the stored primary; matched the message
  /// against the known message definitions; flagged a device-id mismatch against
  /// Config.DeviceID; if a synchronous request was waiting on the same system byte
  /// (ddReq), handed the message to it, notified it, and returned; otherwise queued
  /// QoRecd for well-known or abort messages and QoRecdUnk (after logging the
  /// device-id mismatch, unknown stream or unknown function) for everything else,
  /// and finally sent the automatic reply.
  /// </para>
  /// </remarks>
  void _OnRecd(OCSMessage recd)
  {
      // Intentionally empty; see remarks.
  }
  /// <summary>
  /// Common handling for a socket failure: logs the error text, closes the
  /// socket, runs the disconnected handling, then waits about a second.
  /// </summary>
  /// <param name="e">The exception that caused the failure.</param>
  void TcpError(Exception e)
  {
      // Build the message before the socket is closed.
      string reason = TcpUtils.GetTcpErrMsg(h.IPClient, e);
      _OnLog(reason);

      h.CloseSocket();
      _OnDicontd(e);

      // Wait briefly.
      LockUtils.Wait(1000);
  }
  378. #endregion
  379. #region Write Method
  /// <summary>
  /// Queues a "sent" event for the message and then writes its serialized
  /// bytes to the socket.
  /// </summary>
  /// <param name="msg">The message to transmit.</param>
  void TcpWriteMsg(OCSMessage msg)
  {
      // The sent event is queued before the write is attempted.
      var sentNotice = new QoSent { Arg0 = msg };
      qQ.Enqueue(sentNotice);

      var buffer = msg.ToMemoryBuffer();
      h.WriteFlush(buffer.ToBytes);
  }
  /// <summary>
  /// Writer for data messages. Currently a no-op: the implementation is disabled.
  /// </summary>
  /// <param name="nm">The message to write (unused).</param>
  /// <remarks>
  /// The disabled implementation encoded the message, started a T3 timer
  /// (Config.T3 seconds, keyed by system byte) for primary messages with the W-bit,
  /// built a buffer of big-endian length + header + body, queued a QoSent event,
  /// and wrote the buffer to the socket.
  /// </remarks>
  void TcpWriteNormalMsg(OCSMessage nm)
  {
      // Intentionally empty; see remarks.
  }
  /// <summary>
  /// Writer for control messages. Currently a no-op: the implementation is disabled.
  /// </summary>
  /// <param name="ctrl">The control message to write (unused).</param>
  /// <remarks>
  /// The disabled implementation built a 16-byte buffer (big-endian length 10
  /// followed by the header), logged the message unless it was a link test and
  /// Config.HideLogLink was set, and wrote the buffer to the socket.
  /// </remarks>
  void TcpWriteCtrlMsg(OCSMessage ctrl)
  {
      // Intentionally empty; see remarks.
  }
  /// <summary>
  /// Sends a control message of the given type. Currently a no-op.
  /// </summary>
  /// <param name="stype">Control message type (unused).</param>
  /// <remarks>
  /// Disabled implementation: Send(new OCSMessage { SType = stype }).
  /// </remarks>
  void SendCtrlMsg(int stype)
  {
      // Intentionally empty; see remarks.
  }
  /// <summary>
  /// Sends a message, optionally after a delay.
  /// </summary>
  /// <param name="msg">The message to send.</param>
  /// <param name="after">
  /// Delay handed to TimerUtils.Once; zero or negative sends immediately.
  /// NOTE(review): unit is presumably milliseconds - confirm against TimerUtils.
  /// </param>
  public void Send(OCSMessage msg, int after)
  {
      if (after <= 0)
      {
          Send(msg);
          return;
      }

      TimerUtils.Once(after, Send, msg);
  }
  /// <summary>
  /// Queues a message for the writer thread. When not connected, the message is
  /// dropped and the failure is logged instead.
  /// </summary>
  /// <param name="msg">The message to send.</param>
  /// <remarks>
  /// Disabled logic formerly ran before queueing: stamping the message with
  /// Config.ID; for select / link-test requests, assigning the next control system
  /// byte and switching to the T6 receive timeout; for data messages, setting the
  /// device id and assigning the next system byte to primaries that needed one.
  /// </remarks>
  public void Send(OCSMessage msg)
  {
      if (Connected)
      {
          qqW.Enqueue(msg);
          return;
      }

      _OnLog("Send fail not connected" + msg.LogFormat());
  }
  443. #endregion
  444. #region HelpMothed
  /// <summary>
  /// Adjusts the socket receive timeout. Does nothing while disconnected.
  /// </summary>
  /// <param name="infinite">
  /// true: switch to an infinite receive timeout (skipped if already infinite).
  /// false: switch to Config.T6 seconds, but only when Config.LinkOn is set.
  /// </param>
  void ChgTcpTimeout(bool infinite)
  {
      if (!h.Connected)
          return;

      if (infinite)
      {
          // Avoid a redundant change when the timeout is already infinite.
          if (h.Socket.ReceiveTimeout != Timeout.Infinite)
              h.ChangeRecvTimeout(Timeout.Infinite);
          return;
      }

      if (Config.LinkOn)
          h.ChangeRecvTimeout(Config.T6 * ConstUtils.ONE_SECOND);
  }
  461. #endregion
  462. }
  463. }