Major Update:
- Minor fix: Set correct names in comments at top of file - New FileCore Class: - Writing data to output file - BufferCore: - Check for "EAGAIN" on write and retry write - FunctionCore: - Add new Output method that references LocalIO directly - SelectableCore: - New method SetAutomanage to specify auto re-open parameters - Re-open timer implemented to slow re-open events - Only call ProcessBuffer() if data received on socket - Force processing input data when no input marker set - Use new Output method to simplify code - Bug fix: Read correctly from buffer on multiple reads/writes on FD - Check for "EAGAIN" on write to FD and retry write
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* CSelectableCore.cpp
|
||||
* SelectableCore.cpp
|
||||
*
|
||||
* Created on: 20 May 2016
|
||||
* Author: wentzelc
|
||||
@@ -60,7 +60,7 @@ CSelectableCore::~CSelectableCore()
|
||||
}
|
||||
//---------------------------------------------------------------------------
|
||||
|
||||
THandle * CSelectableCore::CreateHandle( const char * HandleName, bool CreateLocalIO, bool AutoManage )
|
||||
THandle * CSelectableCore::CreateHandle( const char * HandleName, bool CreateLocalIO )
|
||||
{
|
||||
THandle ** Handle = NULL;
|
||||
|
||||
@@ -73,8 +73,7 @@ THandle * CSelectableCore::CreateHandle( const char * HandleName, bool CreateLo
|
||||
if (!*Handle)
|
||||
{
|
||||
// Create File handle at end of list
|
||||
*Handle = (THandle*)malloc( sizeof(THandle) );
|
||||
memset( *Handle, 0, sizeof(THandle) );
|
||||
*Handle = (THandle*)calloc( 1, sizeof(THandle) );
|
||||
|
||||
// Set name
|
||||
if (HandleName) {
|
||||
@@ -94,9 +93,6 @@ THandle * CSelectableCore::CreateHandle( const char * HandleName, bool CreateLo
|
||||
(*Handle)->LocalIO = AddLocalIO( HandleName );
|
||||
}
|
||||
|
||||
// Set other params
|
||||
(*Handle)->Auto = AutoManage;
|
||||
|
||||
return *Handle;
|
||||
}
|
||||
//---------------------------------------------------------------------------
|
||||
@@ -238,6 +234,20 @@ bool CSelectableCore::ClearHandle( THandle * Handle )
|
||||
}
|
||||
//---------------------------------------------------------------------------
|
||||
|
||||
bool CSelectableCore::SetAutoManage( THandle * Handle, bool AutoManage, int ReopenTime )
|
||||
{
|
||||
// Validate
|
||||
if (!Handle) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Set params
|
||||
Handle->Auto = AutoManage;
|
||||
Handle->ReopenTimeout = ReopenTime;
|
||||
return true;
|
||||
}
|
||||
//---------------------------------------------------------------------------
|
||||
|
||||
bool CSelectableCore::SetBuffers( THandle * Handle, int InBufSize, int OutBufSize, int InTimeout, const char * InMarker, int InMarkerLen )
|
||||
{
|
||||
// Validate
|
||||
@@ -473,7 +483,7 @@ int CSelectableCore::OpenRemoteClientSocket( THandle * Handle )
|
||||
|
||||
// Create Remote Client Handle
|
||||
sprintf( ClientName, "%s-%d", Handle->Name, ClientFD );
|
||||
*RemoteClient = CreateHandle( ClientName, false, false );
|
||||
*RemoteClient = CreateHandle( ClientName, false );
|
||||
SetSocketHandle( *RemoteClient, ctRemoteClient, ClientAddress, 0, Handle->KeepAlive );
|
||||
|
||||
// Copy Parent Buffer setup
|
||||
@@ -560,7 +570,7 @@ int CSelectableCore::OpenClientSocket( THandle * Handle )
|
||||
(setsockopt( Handle->FD, SOL_TCP, TCP_KEEPINTVL, &TCPint_opt, sizeof(TCPint_opt)) == -1) ))
|
||||
{
|
||||
// Log Event
|
||||
LogMessage( DebugLevel, dlMedium, "%s: Handle %s - Could not set KeepAlive options [%s:%d] (%s)", Name, Handle->Name, Handle->Address, Handle->PortNo, strerror(errno) );
|
||||
LogMessage( DebugLevel, dlMedium, "%s: Handle '%s' - Could not set KeepAlive options [%s:%d] (%s)", Name, Handle->Name, Handle->Address, Handle->PortNo, strerror(errno) );
|
||||
|
||||
// Set State
|
||||
close( Handle->FD );
|
||||
@@ -578,7 +588,7 @@ int CSelectableCore::OpenClientSocket( THandle * Handle )
|
||||
|
||||
if (!connect( Handle->FD, (struct sockaddr *)&address, addr_len ))
|
||||
{
|
||||
LogMessage( DebugLevel, dlMedium, "%s: Handle %s - Client connected [%s:%d]", Name, Handle->Name, Handle->Address, Handle->PortNo );
|
||||
LogMessage( DebugLevel, dlMedium, "%s: Handle '%s' - Client connected [%s:%d]", Name, Handle->Name, Handle->Address, Handle->PortNo );
|
||||
|
||||
// Add to Select Lists
|
||||
if (Select) {
|
||||
@@ -615,8 +625,13 @@ int CSelectableCore::OpenClientSocket( THandle * Handle )
|
||||
|
||||
// Close socket
|
||||
close( Handle->FD );
|
||||
Handle->FD = -1;
|
||||
Handle->State = csFailed;
|
||||
|
||||
// Start re-open timer
|
||||
SetStartTime( &(Handle->ReopenStart) );
|
||||
|
||||
// Reset Handle
|
||||
Handle->FD = -1;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
@@ -686,6 +701,9 @@ bool CSelectableCore::Close( THandle * Handle, bool CloseChildren )
|
||||
Fail = (close( Handle->FD ))? true : false;
|
||||
Handle->State = ((Fail)? csFailed : csClosed);
|
||||
|
||||
// Start re-open timer
|
||||
SetStartTime( &(Handle->ReopenStart) );
|
||||
|
||||
// Show action
|
||||
switch (Handle->Type)
|
||||
{
|
||||
@@ -787,17 +805,16 @@ bool CSelectableCore::Read( THandle * Handle )
|
||||
}
|
||||
|
||||
// Read File directly into buffer
|
||||
if (!Handle->InBuffer || !(BytesRead = Handle->InBuffer->ReadFromFD( Handle->FD ))) {
|
||||
return false;
|
||||
if (Handle->InBuffer && (BytesRead = Handle->InBuffer->ReadFromFD( Handle->FD )))
|
||||
{
|
||||
// Process Buffer
|
||||
ProcessBuffer( Handle, false );
|
||||
}
|
||||
|
||||
// Process Buffer
|
||||
ProcessBuffer( Handle, false );
|
||||
|
||||
// Reset timer
|
||||
SetStartTime( &(Handle->InStart) );
|
||||
|
||||
return true;
|
||||
return (bool)BytesRead;
|
||||
}
|
||||
//---------------------------------------------------------------------------
|
||||
|
||||
@@ -874,8 +891,6 @@ bool CSelectableCore::ProcessBuffer( THandle * Handle, bool Force )
|
||||
int Pos = 0;
|
||||
int Len = 0;
|
||||
char * Data = NULL;
|
||||
TLocalIO * LocalIO = NULL;
|
||||
TLinkedIO * Output = NULL;
|
||||
|
||||
// Check if buffered data
|
||||
if (!Handle || !Handle->InBuffer || !Handle->InBuffer->Len()) {
|
||||
@@ -883,7 +898,7 @@ bool CSelectableCore::ProcessBuffer( THandle * Handle, bool Force )
|
||||
}
|
||||
|
||||
// Check if forced processed
|
||||
if (Force)
|
||||
if (Force || !Handle->InMarkerLen)
|
||||
{
|
||||
// Show Packet
|
||||
Len = Handle->InBuffer->Peek( &Data );
|
||||
@@ -891,18 +906,9 @@ bool CSelectableCore::ProcessBuffer( THandle * Handle, bool Force )
|
||||
|
||||
// Write buffer to Outputs
|
||||
if (Handle->Type == ctRemoteClient) {
|
||||
LocalIO = Handle->Parent->LocalIO;
|
||||
Output( Handle->Parent->LocalIO, Data, Len );
|
||||
} else {
|
||||
LocalIO = Handle->LocalIO;
|
||||
}
|
||||
if (LocalIO) {
|
||||
ShowOutput( DebugLevel, dlHigh, OutputDisplay, Data, Len, "%s: LocalIO '%s' - OUT:", Name, LocalIO->Name );
|
||||
|
||||
Output = LocalIO->FirstOutput;
|
||||
while (Output) {
|
||||
Output->Function->Input( Output->IOName, Data, Len );
|
||||
Output = Output->Next;
|
||||
}
|
||||
Output( Handle->LocalIO, Data, Len );
|
||||
}
|
||||
|
||||
// Clear processed bytes from buffer
|
||||
@@ -919,18 +925,9 @@ bool CSelectableCore::ProcessBuffer( THandle * Handle, bool Force )
|
||||
|
||||
// Write buffer to Outputs
|
||||
if (Handle->Type == ctRemoteClient) {
|
||||
LocalIO = Handle->Parent->LocalIO;
|
||||
Output( Handle->Parent->LocalIO, Data, Len );
|
||||
} else {
|
||||
LocalIO = Handle->LocalIO;
|
||||
}
|
||||
if (LocalIO) {
|
||||
ShowOutput( DebugLevel, dlHigh, OutputDisplay, Data, Len, "%s: LocalIO '%s' - OUT:", Name, LocalIO->Name );
|
||||
|
||||
Output = LocalIO->FirstOutput;
|
||||
while (Output) {
|
||||
Output->Function->Input( Output->IOName, Data, Len );
|
||||
Output = Output->Next;
|
||||
}
|
||||
Output( Handle->LocalIO, Data, Len );
|
||||
}
|
||||
|
||||
// Clear processed bytes from buffer
|
||||
@@ -957,15 +954,15 @@ int CSelectableCore::ReadFromFD( int FD, char * Data, int MaxLen )
|
||||
while (DataRemain)
|
||||
{
|
||||
// Read from file descriptor
|
||||
BytesRead = read( FD, Data, DataRemain );
|
||||
if (BytesRead <= 0)
|
||||
BytesRead = read( FD, &Data[TotalRead], DataRemain );
|
||||
if ((BytesRead <= 0)) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Update Data Pointers
|
||||
TotalRead += BytesRead;
|
||||
DataRemain -= BytesRead;
|
||||
}
|
||||
|
||||
return TotalRead;
|
||||
}
|
||||
//---------------------------------------------------------------------------
|
||||
@@ -986,16 +983,15 @@ int CSelectableCore::WriteToFD( int FD, const char * Data, int Len )
|
||||
while (DataRemain)
|
||||
{
|
||||
// Read from file descriptor
|
||||
BytesWritten = write( FD, Data, DataRemain );
|
||||
//write( FD, "\n", 1 );
|
||||
if (BytesWritten <= 0)
|
||||
BytesWritten = write( FD, &Data[TotalWritten], DataRemain );
|
||||
if ((BytesWritten <= 0) && (errno != EAGAIN)) {
|
||||
break;
|
||||
}
|
||||
|
||||
// Update Data Pointers
|
||||
TotalWritten += BytesWritten;
|
||||
DataRemain -= BytesWritten;
|
||||
}
|
||||
|
||||
return TotalWritten;
|
||||
}
|
||||
//---------------------------------------------------------------------------
|
||||
@@ -1106,8 +1102,16 @@ bool CSelectableCore::Process()
|
||||
// Auto manage handles
|
||||
if (Handle->Auto && ((Handle->State == csNone) || (Handle->State == csFailed) || (Handle->State == csClosed)))
|
||||
{
|
||||
// Complete opening process
|
||||
Open( Handle );
|
||||
// Check duration since last PortIn
|
||||
if (Timeout( Handle->ReopenStart, Handle->ReopenTimeout ))
|
||||
{
|
||||
// Complete opening process
|
||||
if (Open( Handle ) == -1)
|
||||
{
|
||||
// Reset Timer
|
||||
SetStartTime( &(Handle->ReopenStart) );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Check Input buffers
|
||||
|
||||
Reference in New Issue
Block a user