using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using GSG.NET.Concurrent;
using GSG.NET.LINQ;
using GSG.NET.Logging;
using GSG.NET.Quartz;
using GSG.NET.TCP;
using GSG.NET.Utils;

namespace OHVConnector
{
    /// <summary>
    /// TCP connection manager: owns the connector, the worker threads
    /// (event pump, writer, reader, link test) and the queues between them.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the collection/timer fields below are declared without generic
    /// type arguments in this copy of the file, yet the code uses typed members of the
    /// dequeued items (e.g. <c>qo.Arg0</c>) - confirm the declarations against the
    /// original source.
    /// </remarks>
    public partial class Manager
    {
        const byte STX = 0x02;
        const byte ETX = 0x03; // frame terminator passed to ReadUntil in ReadSocket
        const long INIT_CTRL_SYSBYTE = 0x40000000;

        long sysbyte = 0;
        long ctrl_sysbyte = INIT_CTRL_SYSBYTE;

        static Logger logger = Logger.GetLogger();

        TsQueue qQ = new TsQueue();   // events consumed by _ThPullQueue
        TsQueue qqW = new TsQueue();  // outgoing messages consumed by _ThWriteTcp
        TimerTemplate quzT3 = new TimerTemplate();
        TsMap ddReq = new TsMap(); // for synchronous (request/reply) communication.
        TcpConnector h = new TcpConnector();

        Thread _TQ;    // pumping queue thread
        Thread _TW;    // write
        Thread _TR;    // read
        Thread _TLINK; // Linktest

        bool ModeActive { get; set; }

        // null: Connect never called; true: Connect called; false: first-call init done or Disconnect called.
        bool? inited;

        public Config Config { get; set; }

        #region Properties
        /// <summary>Whether the underlying TCP connector currently reports a connection.</summary>
        public bool Connected
        {
            get { return h.Connected; }
        }

        /// <summary>
        /// Whether a connection attempt is in effect.
        /// After Connect has been called (connecting or connected): true.
        /// Initially, or after Disconnect has been called: false.
        /// </summary>
        public bool Connecting
        {
            get { return inited.GetValueOrDefault(); }
        }
        #endregion

        #region Constructor
        /// <summary>
        /// Creates a manager in active mode with a default <see cref="Config"/> and
        /// subscribes to the T3 timer and TCP state-change notifications.
        /// </summary>
        public Manager()
        {
            ModeActive = true;
            Config = new Config();
            quzT3.OnTimeout += _OnTimeout;
            h.OnTcpStateChanged += _OnLog;
        }
        #endregion

        #region Connection Method
        /// <summary>
        /// Starts the worker threads that establish and service the connection.
        /// </summary>
        /// <param name="active">Stored in ModeActive and passed to the connector as TcpComm.Active.</param>
        /// <remarks>
        /// The event pump thread is started only on the very first call. Calling this
        /// while already connecting trips the "Already connecting" assertion.
        /// </remarks>
        public void Connect(bool active)
        {
            ModeActive = active;

            if (!inited.HasValue) // runs only once, on the first call.
            {
                _TQ = ThreadUtils.Invoke(_ThPullQueue);
                inited = false;
            }

            Assert.IsFalse(inited.Value, "Already connecting");
            inited = true;

            _TW = ThreadUtils.Invoke(_ThWriteTcp);
            _TR = ThreadUtils.Invoke(_ThReadTcp);
            _TLINK = ThreadUtils.Invoke(_ThLinkQuz);

            _OnLog("HSMS CONNECT REQ " + Config);
        }

        /// <summary>
        /// Stops the T3 timers, kills the link/write/read threads and closes the socket.
        /// Does nothing unless <see cref="Connecting"/> is true.
        /// </summary>
        public void Disconnect()
        {
            //if (inited.HasValue && inited.Value)
            if (!Connecting)
                return;

            _OnLog("HSMS DISCONNECT REQ " + Config);
            inited = false;
            quzT3.StopAll();
            ThreadUtils.Kill(_TLINK);
            ThreadUtils.Kill(_TW);
            h.StopListen(); // 2017-07-20 patch: keeps Disconnect from blocking while still trying to connect (passive listening).
            ThreadUtils.Kill(_TR);
            //h.StopListen(); 20170720
            h.CloseSocket(); // When Kill is used, this must come after it.
        }

        /// <summary>
        /// Resets per-connection state (system bytes, T3 timers, pending sync requests)
        /// and queues a "not communicating" event carrying <paramref name="e"/>.
        /// </summary>
        void _OnDicontd(Exception e)
        {
            _OnLog("HSMS DISCONNECTED");
            sysbyte = 0;
            ctrl_sysbyte = INIT_CTRL_SYSBYTE;
            quzT3.StopAll();
            ddReq.Clear();
            qQ.Enqueue(new QoNotComm { Arg0 = e });
        }

        /// <summary>Queues a "communicating" event for the event pump.</summary>
        void _OnContd()
        {
            _OnLog("HSMS CONNECTED");
            qQ.Enqueue(new QoComm());
        }

        /// <summary>
        /// Performs one connection attempt using the current Config. On success raises
        /// the connected event and sets the receive timeout to infinite.
        /// </summary>
        void TcpConnect()
        {
            var comm = new TcpComm
            {
                Active = ModeActive,
                RetryCnt = 1, // T5 must be handled.
                Ip = Config.IpAddress,
                PortNo = Config.Port,
                T5 = Config.T5,
                T6 = Config.T6,//Config.TcpRecdTimeout,
            };
            h.Connect(comm);

            if (!h.Connected)
            {
                // Only the active side reports the failed attempt.
                if (ModeActive)
                    _OnLog("T5 TIMEOUT " + Config.ID);
                return;
            }

            _OnContd();
            ChgTcpTimeout(true);
            //if (ModeActive)
            //SendCtrlMsg(1);// always HSMS Active
        }
        #endregion

        /// <summary>Log sink used throughout this class and as the TCP state-change handler.</summary>
        /// <remarks>
        /// NOTE(review): currently a no-op. Nothing in this file produces QoLog items even
        /// though _ThPullQueue dispatches them to OnLog - confirm whether messages should be
        /// enqueued here.
        /// </remarks>
        private void _OnLog(string obj)
        {
            //throw new NotImplementedException();
        }

        /// <summary>
        /// T3 timer callback: queues a timeout event for the message attached to the timer.
        /// </summary>
        /// <param name="id">Timer id; only used in the warning when no message is attached.</param>
        /// <param name="msg">Message attached to the expired timer.</param>
        private void _OnTimeout(long id, OCSMessage msg)
        {
            if (null == msg)
            {
                logger.W("T3 [{0}] attachment is null", id);
                return;
            }

            qQ.Enqueue(new QoTimeout { Arg0 = msg });
            //_OnLog("T3 TIMEOUT {0}".format(msg.LogHeader));
        }

        #region Thread Method
        // Monitor object: ReadSocket notifies it on every received frame, _ThLinkQuz waits on it.
        readonly object lockLink = new object();

        /// <summary>
        /// Link-test thread body. Waits on lockLink (with the Config.TLink timeout when
        /// Config.LinkOn is set, otherwise without one) and loops again whenever it is woken
        /// by a notification.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the link-test send is commented out, so an expired wait currently
        /// does nothing.
        /// </remarks>
        void _ThLinkQuz()
        {
            for (; ; )
            {
                try
                {
                    bool waked = Config.LinkOn
                        ? LockUtils.Wait(Config.TLink, lockLink)
                        : LockUtils.Wait(lockLink);

                    if (waked)
                        continue; // notified: the wait is reset every time a packet is received.

                    if (Connected) // This thread runs regardless of connection state, so act only when connected.
                    {
                        //SendCtrlMsg(5);
                    }
                }
                catch (ThreadAbortException)
                {
                    break;
                }
                catch (Exception e)
                {
                    logger.E(e);
                }
            }
        }

        /// <summary>
        /// Writer thread body: dequeues outgoing messages from qqW and writes them to the
        /// socket. Errors other than thread abort are logged and the loop continues.
        /// </summary>
        void _ThWriteTcp()
        {
            logger.I("Write {0}", ThreadUtils.GetCurrThreadID());
            for (; ; )
            {
                try
                {
                    var outgoing = qqW.Dequeue();
                    this.TcpWriteMsg(outgoing);
                    //v.IsRecd = false;
                    //if (v.AfterMillis > 0)
                    //    LockUtils.Wait(v.AfterMillis);
                    //if (v.CtrlMsg)
                    //    TcpWriteCtrlMsg(v);
                    //else
                    //    TcpWriteNormalMsg(v);
                }
                catch (ThreadAbortException)
                {
                    break;
                }
                catch (Exception e)
                {
                    logger.E(e);
                }
            }
        }

        /// <summary>
        /// Reader thread body: (re)connects while disconnected, otherwise reads from the
        /// socket. I/O and disposed-socket errors go through TcpError; a thread abort is
        /// reported as a disconnect and ends the loop.
        /// </summary>
        void _ThReadTcp()
        {
            for (; ; )
            {
                try
                {
                    if (!h.Connected)
                    {
                        TcpConnect();
                        continue;
                    }

                    ReadSocket();
                }
                catch (ObjectDisposedException e)
                {
                    TcpError(e);
                }
                catch (IOException e)
                {
                    TcpError(e);
                }
                catch (ThreadAbortException)
                {
                    _OnLog("DISCONNECT REQUEST APPLIED " + Config);
                    TcpError(new IOException("DISCONNECT REQUEST"));
                    break;
                }
                catch (Exception e)
                {
                    logger.E(e);
                }
            }
        }

        /// <summary>
        /// Event pump thread body: dequeues queued events from qQ and invokes the matching
        /// handler through DelegateUtils.Invoke. An unrecognised item fails an assertion.
        /// </summary>
        void _ThPullQueue()
        {
            for (; ; ) // "while" is deliberately not used, so that queued data gets drained.
            {
                try
                {
                    var qo = this.qQ.Dequeue();

                    if (qo is QoRecdUnk)
                    {
                        DelegateUtils.Invoke(OnRecdUnk, qo.Arg0, qo.Arg1);
                        //if (AutoS9Fy)
                        //{
                        //    var v = qo.Arg0 as SFMessage;
                        //    Send(v.S9Fy);
                        //}
                    }
                    else if (qo is QoComm)
                    {
                        DelegateUtils.Invoke(OnContd, Config.ID);
                    }
                    else if (qo is QoNotComm)
                    {
                        DelegateUtils.Invoke(OnDiscontd, Config.ID, qo.Arg0);
                    }
                    else if (qo is QoLog)
                    {
                        DelegateUtils.Invoke(OnLog, Config.ID, qo.Arg0);
                    }
                    else if (qo is QoRecd)
                    {
                        DelegateUtils.Invoke(OnRecd, qo.Arg0);
                    }
                    else if (qo is QoTimeout)
                    {
                        DelegateUtils.Invoke(OnT3Timeout, qo.Arg0);
                        //if (AutoS9Fy)
                        //{
                        //    var v = qo.Arg0 as SFMessage;
                        //    Send(MessageSupport.MakeS9FX(9, v));
                        //}
                    }
                    else if (qo is QoSent)
                    {
                        DelegateUtils.Invoke(OnSent, qo.Arg0);
                    }
                    else
                    {
                        Assert.Fail("Unk Object {0}", qo);
                    }
                }
                catch (ThreadAbortException)
                {
                    break;
                }
                catch (Exception e)
                {
                    logger.E(e);
                }
            }
        }
        #endregion

        #region Read Method
        /// <summary>
        /// Reads from the socket up to the ETX byte, then resets the receive timeout to
        /// infinite and notifies the link-test thread.
        /// </summary>
        /// <remarks>
        /// NOTE(review): the bytes read are not decoded or dispatched yet; the decoding
        /// path below is commented out and _OnRecd is never called.
        /// </remarks>
        void ReadSocket()
        {
            var recByte = h.ReadUntil(ETX);
            //if (!len.FwBtw(10, MAX_SIZE))
            //    throw new IOException("HSMS ABNORMAL LENGTH:" + len);
            //var head = h.ReadBytes(10);
            //var body = h.ReadBytes(len - 10);
            ChgTcpTimeout(true);           // whenever something is received
            LockUtils.NotifyAll(lockLink); // notify the Linktest thread
            //var v = new OCSMessage { Header = head, Body = body, IsRecd = true };
            //v.Decoding();
            //_OnRecd(v);
        }

        /// <summary>
        /// Handler for a decoded received message. The whole implementation is currently
        /// commented out, so this method does nothing.
        /// </summary>
        void _OnRecd(OCSMessage recd)
        {
            //recd.Id = Config.ID;
            //if (recd.CtrlMsg)
            //{
            //    bool skip = recd.CtrlLinkTest && Config.HideLogLink;
            //    if (!skip)
            //        _OnLog(recd.LogFormat());
            //    if (recd.CtrlSeparate)
            //        throw new IOException("HSMS SEPARATE MESSAGE");
            //    else if (recd.CtrlSelectReq)
            //    {
            //        RepCtrlMsg(recd);
            //        _OnContd();
            //    }
            //    else if (recd.CtrlLinkReq)
            //        RepCtrlMsg(recd);
            //    else if (recd.CtrlLinkRep)
            //    { }
            //    else if (recd.CtrlSelectRep)
            //        _OnContd();
            //    else
            //        logger.W(recd.LogFormatAll());
            //}
            //else
            //{
            //    //Stop T3, Correlation Mapping
            //    if (recd.IsSecondary && quzT3.HasId(recd.Systembyte))
            //    {
            //        var keep = quzT3.Stop(recd.Systembyte);
            //        recd.Correlation = keep.Correlation;
            //    }
            //    //CopyNames
            //    var compared = new List();
            //    this.HML.FindWellKnownMsg(recd, out compared);
            //    bool well = recd.UnkCode == 0;
            //    bool syncRsp = recd.IsSecondary && ddReq.ContainsKey(recd.Systembyte);
            //    if (recd.DeviceId != Config.DeviceID)//Chk unk device id
            //        recd.UnkCode = 1;
            //    //Handle request/reply for every message, unknown or well-known.
            //    if (syncRsp)
            //    {
            //        var so = ddReq[recd.Systembyte];
            //        so.Expect = recd;
            //        so.Notify();
            //        _OnLog(recd.LogFormat());
            //        return;
            //    }
            //    if (1 == recd.UnkCode)//Chk unk device id
            //    {
            //        string w = "DeviceID mismatch Config:{0} {1}".format(Config.DeviceID, recd.LogHeader);
            //        _OnLog(w);
            //        logger.W(w);
            //        qQ.Enqueue(new QoRecdUnk { Arg0 = recd, Arg1 = compared });
            //    }
            //    else if (well || recd.IsAbortMessage)//abort messages are also treated as well-known, with no extra handling.
            //    {
            //        ////prevent a disconnect on time change
            //        //if (recd.SxFx.Equals(MSG_TIME_CHG_S2F31) || recd.SxFx.Equals(MSG_TIME_CHG_S2F18))
            //        //    ChgTcpTimeout(true);
            //        qQ.Enqueue(new QoRecd { Arg0 = recd });
            //    }
            //    else
            //    {
            //        //Additional logging: unknown stream / function
            //        if (!HML.HasStream(recd.Stream))//Unknown stream
            //            _OnLog("UNK STREAM {0}".format(recd.LogHeader));
            //        else if (!HML.HasSxFy(recd.SxFx))//Unknown function
            //            _OnLog("UNK FUNCTION {0}".format(recd.LogHeader));
            //        //Log when the message is unknown. (Requested by Dongwoo Fine-Chem, 2013-04 -> moved to RecdUnk Arg2.)
            //        qQ.Enqueue(new QoRecdUnk { Arg0 = recd, Arg1 = compared });
            //    }
            //    AutoRepMsg(recd);
            //}
        }

        /// <summary>
        /// Common handling for a broken connection: logs the TCP error, closes the socket,
        /// raises the disconnected event and then pauses before the caller's loop continues.
        /// </summary>
        void TcpError(Exception e)
        {
            _OnLog(TcpUtils.GetTcpErrMsg(h.IPClient, e));
            h.CloseSocket();
            _OnDicontd(e);
            LockUtils.Wait(1000); // wait briefly.
        }
        #endregion

        #region Write Method
        /// <summary>
        /// Queues a "sent" event for <paramref name="msg"/> and then writes its serialized
        /// bytes to the socket. The event is queued before the write is attempted.
        /// </summary>
        void TcpWriteMsg(OCSMessage msg)
        {
            qQ.Enqueue(new QoSent { Arg0 = msg });
            var payload = msg.ToMemoryBuffer().ToBytes;
            this.h.WriteFlush(payload);
        }

        /// <summary>Disabled: the implementation is commented out and kept for reference.</summary>
        void TcpWriteNormalMsg(OCSMessage nm)
        {
            //nm.Encoding();
            //if (nm.IsPrimary && nm.IsWbit)
            //    quzT3.StartOnce(Config.T3 * ConstUtils.ONE_SECOND, nm.Systembyte, nm);
            //int len = nm.Header.Length + nm.Body.Length + 10;
            //var mb = new MemoryBuffer(len);
            //mb.AppendBeInt(nm.Length);
            //mb.Append(nm.Header);
            //mb.Append(nm.Body);
            //qQ.Enqueue(new QoSent { Arg0 = nm });
            //h.WriteFlush(mb.ToBytes);
        }

        /// <summary>Disabled: the implementation is commented out and kept for reference.</summary>
        void TcpWriteCtrlMsg(OCSMessage ctrl)
        {
            //var mb = new MemoryBuffer(16);
            //mb.AppendBeInt(10);
            //mb.Append(ctrl.Header);
            //bool skip = ctrl.CtrlLinkTest && Config.HideLogLink;
            //if (!skip)
            //    _OnLog(ctrl.LogFormat());
            //h.WriteFlush(mb.ToBytes);
        }

        /// <summary>Disabled: the implementation is commented out and kept for reference.</summary>
        void SendCtrlMsg(int stype)
        {
            //Send(new OCSMessage { SType = stype });
        }

        /// <summary>
        /// Sends <paramref name="msg"/>, either immediately or through a one-shot timer.
        /// </summary>
        /// <param name="msg">Message to send.</param>
        /// <param name="after">
        /// Delay handed to TimerUtils.Once when greater than zero (presumably milliseconds -
        /// TODO confirm); zero or negative sends immediately.
        /// </param>
        public void Send(OCSMessage msg, int after)
        {
            if (after > 0)
            {
                TimerUtils.Once(after, Send, msg);
                return;
            }

            Send(msg);
        }

        /// <summary>
        /// Queues <paramref name="msg"/> for the writer thread. When not connected the
        /// message is dropped and only a log call is made.
        /// </summary>
        public void Send(OCSMessage msg)
        {
            //msg.Id = Config.ID;
            if (!Connected)
            {
                _OnLog("Send fail not connected" + msg.LogFormat());
                return;
            }

            //if (msg.CtrlMsg)
            //{
            //    if (msg.CtrlSelectReq || msg.CtrlLinkReq)
            //    {
            //        msg.Systembyte = Interlocked.Increment(ref ctrl_sysbyte);
            //        ChgTcpTimeout(false);//select, linktest req
            //    }
            //}
            //else
            //{
            //    msg.DeviceId = msg.SessID.HasValue ? msg.SessID.Value : Config.DeviceID;
            //    if (msg.IsPrimary && msg.NeedSetSysbyte)
            //        msg.Systembyte = Interlocked.Increment(ref sysbyte);
            //}
            qqW.Enqueue(msg);
        }
        #endregion

        #region Helper Methods
        /// <summary>
        /// Switches the socket receive timeout. Does nothing while disconnected.
        /// </summary>
        /// <param name="infinite">
        /// true: make the receive timeout infinite unless it already is;
        /// false: apply the T6 timeout (Config.T6 * ConstUtils.ONE_SECOND), but only when
        /// Config.LinkOn is set.
        /// </param>
        void ChgTcpTimeout(bool infinite)
        {
            if (!h.Connected)
                return;

            if (infinite)
            {
                // System.Net.Sockets.Socket.ReceiveTimeout reports an infinite timeout as 0
                // (a value of -1 is normalized to 0 when set), so comparing against
                // Timeout.Infinite alone never detects "already infinite".
                // NOTE(review): assumes h.Socket is a System.Net.Sockets.Socket - confirm.
                var current = h.Socket.ReceiveTimeout;
                bool alreadyInfinite = current == Timeout.Infinite || current == 0;
                if (!alreadyInfinite)
                    h.ChangeRecvTimeout(Timeout.Infinite);
            }
            else if (Config.LinkOn)
            {
                h.ChangeRecvTimeout(Config.T6 * ConstUtils.ONE_SECOND);
            }
        }
        #endregion
    }
}