Important Update:

- SelectableCore:
  - Finish UDP implementation
    - Change UDPsock to UDPserver/client/remote
    - Add OpenUDPserver/client/remote methods
    - Add ReadFrom/WriteToUDP() methods (cannot write to FD)
    - ReadFrom/WriteToFD():
      - return negative bytes on error
      - small delay between consecutive reads/writes
  - Set address correctly on resolve (for UDP or TCP)
  - Improve ReadFrom/WriteToFD() methods (improved timing & error)
  - Improve error reporting on Input/OutputHandle() methods
- CharBufferCore:
  - Standardise ReadFrom/WriteToFD() method
  - Bug fix: Peek/PeekCopy error if Data param not passed
- FileCore
  - Standardise ReadFrom/WriteToFD() method
This commit is contained in:
Charl Wentzel
2018-11-19 19:52:39 +02:00
parent 1e74b9cd60
commit 278198171d
4 changed files with 420 additions and 129 deletions

View File

@@ -182,7 +182,7 @@ int CRollingBuffer::Peek( char ** Data, int PeekPos, int MaxLen )
// Validate // Validate
if (!BufSize || !Data || !MaxLen || (PeekPos < 0) || (PeekPos > BufLen)) { if (!BufSize || !Data || !MaxLen || (PeekPos < 0) || (PeekPos > BufLen)) {
*Data = NULL; if (Data) *Data = NULL;
return 0; return 0;
} }
@@ -227,7 +227,7 @@ int CRollingBuffer::PeekCopy( char ** Data, int PeekPos, int MaxLen )
// Check if any data // Check if any data
if (!BufSize || !Data || !MaxLen || (PeekPos < 0) || (PeekPos > BufLen)) { if (!BufSize || !Data || !MaxLen || (PeekPos < 0) || (PeekPos > BufLen)) {
if (*Data) (*Data)[0] = 0; if (Data && *Data) (*Data)[0] = 0;
return 0; return 0;
} }
@@ -410,6 +410,7 @@ int CRollingBuffer::ReadFromFD( int Handle, int MaxRead, bool Overwrite )
int TotalRead = 0; int TotalRead = 0;
int BufRemain = 0; int BufRemain = 0;
int DataRemain = 0; int DataRemain = 0;
bool Error = false;
// Check if buffer created // Check if buffer created
if (!BufSize) { if (!BufSize) {
@@ -426,6 +427,8 @@ int CRollingBuffer::ReadFromFD( int Handle, int MaxRead, bool Overwrite )
BufRemain = BufSize - BufEnd; BufRemain = BufSize - BufEnd;
BytesRead = read( Handle, &Buffer[BufEnd], ((BufRemain > DataRemain)? DataRemain : BufRemain) ); BytesRead = read( Handle, &Buffer[BufEnd], ((BufRemain > DataRemain)? DataRemain : BufRemain) );
if (BytesRead <= 0) { if (BytesRead <= 0) {
Error = true;
errno = (!BytesRead)? 0 : errno; // No error if no bytes written
break; break;
} }
@@ -443,8 +446,12 @@ int CRollingBuffer::ReadFromFD( int Handle, int MaxRead, bool Overwrite )
BufLen = BufSize; BufLen = BufSize;
BufStart = BufEnd; BufStart = BufEnd;
} }
if (DataRemain) {
usleep( 500 );
}
} }
return TotalRead; return (Error)? -TotalRead : TotalRead; // Report negative total on error
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
@@ -454,8 +461,9 @@ int CRollingBuffer::WriteToFD( int Handle, int MaxLen )
int BytesWritten = 0; int BytesWritten = 0;
int TotalWritten = 0; int TotalWritten = 0;
int BufRemain = 0; int BufRemain = 0;
int DataRemain = 0; int DataRemain = (MaxLen == -1)? BufLen : MaxLen;;
int ReadPos = 0; int ReadPos = BufStart;
bool Error = false;
// Check if buffer created // Check if buffer created
if (!BufSize) { if (!BufSize) {
@@ -463,14 +471,14 @@ int CRollingBuffer::WriteToFD( int Handle, int MaxLen )
} }
// Read Data into buffer // Read Data into buffer
ReadPos = BufStart;
DataRemain = (MaxLen == -1)? BufLen : MaxLen;
while (DataRemain) while (DataRemain)
{ {
// Read from file descriptor // Read from file descriptor
BufRemain = BufSize - ReadPos; BufRemain = BufSize - ReadPos;
BytesWritten = write( Handle, &Buffer[ReadPos], ((BufRemain > DataRemain)? DataRemain : BufRemain) ); BytesWritten = write( Handle, &Buffer[ReadPos], ((BufRemain > DataRemain)? DataRemain : BufRemain) );
if (BytesWritten <= 0) { if (BytesWritten <= 0) {
Error = true;
errno = (!BytesWritten)? 0 : errno; // No error if no bytes written
break; break;
} }
@@ -483,9 +491,13 @@ int CRollingBuffer::WriteToFD( int Handle, int MaxLen )
if (ReadPos >= BufSize) { if (ReadPos >= BufSize) {
ReadPos = 0; // Rolling over end of buffer, start at beginning ReadPos = 0; // Rolling over end of buffer, start at beginning
} }
if (DataRemain) {
usleep( 500 );
}
} }
return TotalWritten; return (Error)? -TotalWritten : TotalWritten; // Report negative total on error
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
@@ -534,7 +546,7 @@ int CShiftBuffer::Peek( char ** Data, int PeekPos, int MaxLen )
// Validate // Validate
if (!BufSize || !Data || !MaxLen || (PeekPos < 0) || (PeekPos > BufLen)) { if (!BufSize || !Data || !MaxLen || (PeekPos < 0) || (PeekPos > BufLen)) {
*Data = NULL; if (Data) *Data = NULL;
return 0; return 0;
} }
@@ -563,7 +575,7 @@ int CShiftBuffer::PeekCopy( char ** Data, int PeekPos, int MaxLen )
// Check if any data // Check if any data
if (!BufSize || !Data || !MaxLen || (PeekPos < 0) || (PeekPos > BufLen)) { if (!BufSize || !Data || !MaxLen || (PeekPos < 0) || (PeekPos > BufLen)) {
if (*Data) (*Data)[0] = 0; if (Data && *Data) (*Data)[0] = 0;
return 0; return 0;
} }
@@ -727,7 +739,8 @@ int CShiftBuffer::ReadFromFD( int Handle, int MaxRead, bool Overwrite )
{ {
int BytesRead = 0; int BytesRead = 0;
int TotalRead = 0; int TotalRead = 0;
int DataRemain = 0; int DataRemain = MaxRead;
bool Error = false;
// Check if buffer created // Check if buffer created
if (!BufSize) { if (!BufSize) {
@@ -743,6 +756,8 @@ int CShiftBuffer::ReadFromFD( int Handle, int MaxRead, bool Overwrite )
// Read from file descriptor // Read from file descriptor
BytesRead = read( Handle, &Buffer[BufLen], DataRemain ); BytesRead = read( Handle, &Buffer[BufLen], DataRemain );
if (BytesRead <= 0) { if (BytesRead <= 0) {
Error = true;
errno = (!BytesRead)? 0 : errno; // No error if no bytes written
break; break;
} }
@@ -753,8 +768,12 @@ int CShiftBuffer::ReadFromFD( int Handle, int MaxRead, bool Overwrite )
// Zero terminate // Zero terminate
Buffer[BufLen] = 0; Buffer[BufLen] = 0;
if (DataRemain) {
usleep( 500 );
}
} }
return TotalRead; return (Error)? -TotalRead : TotalRead; // Report negative total on error
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
@@ -764,8 +783,9 @@ int CShiftBuffer::WriteToFD( int Handle, int MaxLen )
int BytesWritten = 0; int BytesWritten = 0;
int TotalWritten = 0; int TotalWritten = 0;
int BufRemain = 0; int BufRemain = 0;
int DataRemain = 0; int DataRemain = ((MaxLen == -1) || (MaxLen > BufLen))? BufLen : MaxLen;
int ReadPos = 0; int ReadPos = 0;
bool Error = false;
// Check if buffer created // Check if buffer created
if (!BufSize) { if (!BufSize) {
@@ -773,14 +793,14 @@ int CShiftBuffer::WriteToFD( int Handle, int MaxLen )
} }
// Read Data into buffer // Read Data into buffer
ReadPos = 0;
DataRemain = ((MaxLen == -1) || (MaxLen > BufLen))? BufLen : MaxLen;
while (DataRemain) while (DataRemain)
{ {
// Read from file descriptor // Read from file descriptor
BufRemain = BufSize - ReadPos; BufRemain = BufSize - ReadPos;
BytesWritten = write( Handle, &Buffer[ReadPos], ((BufRemain > DataRemain)? DataRemain : BufRemain) ); BytesWritten = write( Handle, &Buffer[ReadPos], ((BufRemain > DataRemain)? DataRemain : BufRemain) );
if (BytesWritten <= 0) { if (BytesWritten <= 0) {
Error = true;
errno = (!BytesWritten)? 0 : errno; // No error if no bytes written
break; break;
} }
@@ -793,9 +813,13 @@ int CShiftBuffer::WriteToFD( int Handle, int MaxLen )
if (ReadPos >= BufSize) { if (ReadPos >= BufSize) {
ReadPos = 0; // Rolling over end of buffer, start at beginning ReadPos = 0; // Rolling over end of buffer, start at beginning
} }
if (DataRemain) {
usleep( 500 );
}
} }
return TotalWritten; return (Error)? -TotalWritten : TotalWritten; // Report negative total on error
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------

View File

@@ -185,7 +185,8 @@ int CFileCore::ReadFromFD( int FD, char * Data, int MaxLen )
{ {
int BytesRead = 0; int BytesRead = 0;
int TotalRead = 0; int TotalRead = 0;
int DataRemain = 0; int DataRemain = MaxLen;
bool Error = false;
// Check if buffer created // Check if buffer created
if ((FD == -1) || !Data) { if ((FD == -1) || !Data) {
@@ -193,20 +194,26 @@ int CFileCore::ReadFromFD( int FD, char * Data, int MaxLen )
} }
// Read Data into buffer // Read Data into buffer
DataRemain = (MaxLen == -1)? strlen(Data) : MaxLen;
while (DataRemain) while (DataRemain)
{ {
// Read from file descriptor // Read from file descriptor
BytesRead = read( FD, &Data[TotalRead], DataRemain ); BytesRead = read( FD, &Data[TotalRead], DataRemain );
if (BytesRead <= 0) if (BytesRead <= 0) {
Error = true;
errno = (!BytesRead)? 0 : errno; // No error if no bytes written
break; break;
}
// Update Data Pointers // Update Data Pointers
TotalRead += BytesRead; TotalRead += BytesRead;
DataRemain -= BytesRead; DataRemain -= BytesRead;
if (DataRemain) {
usleep( 500 );
}
} }
return TotalRead; return (Error)? -TotalRead : TotalRead; // Report negative total on error
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
@@ -214,28 +221,34 @@ int CFileCore::WriteToFD( int FD, const char * Data, int Len )
{ {
int BytesWritten = 0; int BytesWritten = 0;
int TotalWritten = 0; int TotalWritten = 0;
int DataRemain = 0; int DataRemain = (Len != -1)? Len : (Data)? strlen(Data) : 0;
bool Error = false;
// Check if buffer created // Check if buffer created
if ((FD == -1) || !Data) { if ((FD == -1) || !DataRemain) {
return 0; return 0;
} }
// Read Data into buffer // Read Data into buffer
DataRemain = (Len == -1)? strlen(Data) : Len;
while (DataRemain) while (DataRemain)
{ {
// Read from file descriptor // Read from file descriptor
BytesWritten = write( FD, &Data[TotalWritten], DataRemain ); BytesWritten = write( FD, &Data[TotalWritten], DataRemain );
if ((BytesWritten <= 0) && (errno != EAGAIN)) if ((BytesWritten <= 0) && (errno != EAGAIN)) {
Error = true;
errno = (!BytesWritten)? 0 : errno; // No error if no bytes written
break; break;
}
// Update Data Pointers // Update Data Pointers
TotalWritten += BytesWritten; TotalWritten += BytesWritten;
DataRemain -= BytesWritten; DataRemain -= BytesWritten;
}
return TotalWritten; if (DataRemain) {
usleep( 500 );
}
}
return (Error)? -TotalWritten : TotalWritten; // Report negative total on error
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------

View File

@@ -170,7 +170,7 @@ bool CSelectableCore::LoadConfigData()
} }
SetUnixHandle( Handle, ctUNIXclient, Address ); SetUnixHandle( Handle, ctUNIXclient, Address );
} }
else if (!strcasecmp( Type, "UDP" )) else if (!strcasecmp( Type, "UDPserver" ))
{ {
if ((Name = (char*)TempMember->GetMemStr( "Socket/Name", NULL ))) { if ((Name = (char*)TempMember->GetMemStr( "Socket/Name", NULL ))) {
sprintf( Path, "Address/%s/Address", Name ); sprintf( Path, "Address/%s/Address", Name );
@@ -183,7 +183,22 @@ bool CSelectableCore::LoadConfigData()
Port = (char*)TempMember->GetMemStr( "Socket/Port", "0", true ); // Get default Port value Port = (char*)TempMember->GetMemStr( "Socket/Port", "0", true ); // Get default Port value
} }
Delay = TempMember->GetMemInt( "Socket/ResolveDelay", 0, true ); Delay = TempMember->GetMemInt( "Socket/ResolveDelay", 0, true );
SetSocketHandle( Handle, ctUDPsock, Address, strlcase(Port), Delay ); SetSocketHandle( Handle, ctUDPserver, Address, strlcase(Port), Delay );
}
else if (!strcasecmp( Type, "UDPclient" ))
{
if ((Name = (char*)TempMember->GetMemStr( "Socket/Name", NULL ))) {
sprintf( Path, "Address/%s/Address", Name );
Address = (char*)DataTree->GetMemStr( Path, NULL, true ); // Get AddressList Address value
sprintf( Path, "Address/%s/Port", Name );
Port = (char*)DataTree->GetMemStr( Path, "0", true ); // Get AddressList Port value
}
else {
Address = (char*)TempMember->GetMemStr( "Socket/Address", NULL, true ); // Get default Address value
Port = (char*)TempMember->GetMemStr( "Socket/Port", "0", true ); // Get default Port value
}
Delay = TempMember->GetMemInt( "Socket/ResolveDelay", 0, true );
SetSocketHandle( Handle, ctUDPclient, Address, strlcase(Port), Delay );
} }
else if (!strcasecmp( Type, "TCPserver" )) else if (!strcasecmp( Type, "TCPserver" ))
{ {
@@ -465,7 +480,7 @@ bool CSelectableCore::SetSocketHandle( THandle * Handle, EConnectType Type, con
{ {
// Validate // Validate
if (!Handle || !HostName || !PortName || if (!Handle || !HostName || !PortName ||
!((Type == ctUDPsock) || (Type == ctTCPserver) || (Type == ctTCPclient) || (Type == ctTCPremote)) || !((Type == ctUDPserver) || (Type == ctUDPclient) || (Type == ctUDPremote) || (Type == ctTCPserver) || (Type == ctTCPclient) || (Type == ctTCPremote)) ||
!((Handle->Type == ctNone) || (Handle->Type == Type)) ) { !((Handle->Type == ctNone) || (Handle->Type == Type)) ) {
return false; return false;
} }
@@ -1098,8 +1113,13 @@ bool CSelectableCore::ResolveAddress( THandle * Handle, bool DelayResolve )
// Set address specification // Set address specification
memset( &hints, 0, sizeof hints ); memset( &hints, 0, sizeof hints );
hints.ai_family = AF_UNSPEC; // use AF_INET6 to force IPv6 if ((Handle->Type == ctTCPserver) || (Handle->Type == ctTCPclient)) {
hints.ai_socktype = SOCK_STREAM; hints.ai_family = AF_UNSPEC; // use AF_INET6 to force IPv6
hints.ai_socktype = SOCK_STREAM;
} else {
hints.ai_family = AF_UNSPEC; // use AF_INET6 to force IPv6
hints.ai_socktype = SOCK_DGRAM;
}
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - Resolving Host name [%s:%s]...", if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - Resolving Host name [%s:%s]...",
Name, Handle->Name, Handle->HostName, Handle->PortName ); Name, Handle->Name, Handle->HostName, Handle->PortName );
@@ -1145,18 +1165,10 @@ bool CSelectableCore::ResolveAddress( THandle * Handle, bool DelayResolve )
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
int CSelectableCore::OpenUDPsocket( THandle * Handle, bool DelayResolve ) int CSelectableCore::OpenUDPserverSocket( THandle * Handle, bool DelayResolve )
{ {
// Socket options
struct linger ServerLinger_opt;
ServerLinger_opt.l_onoff = 1;
ServerLinger_opt.l_linger = 5;
int Reuse_opt = 1;
int KeepAlive_opt = 1;
// Validate Handle // Validate Handle
if (Handle->Type != ctUDPsock) { if (Handle->Type != ctUDPserver) {
return false; return false;
} }
@@ -1168,25 +1180,13 @@ int CSelectableCore::OpenUDPsocket( THandle * Handle, bool DelayResolve )
if ((Handle->FD = socket( Handle->AddressInfo->ai_family, Handle->AddressInfo->ai_socktype, Handle->AddressInfo->ai_protocol )) < 0) if ((Handle->FD = socket( Handle->AddressInfo->ai_family, Handle->AddressInfo->ai_socktype, Handle->AddressInfo->ai_protocol )) < 0)
{ {
// Log Event // Log Event
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - Failed to create UDP Server socket [%s:%s] (%s)", Name, Handle->Name, Handle->HostName, Handle->PortName, strerror(errno) ); if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - Failed to create UDP socket [%s:%s] (%s)", Name, Handle->Name, Handle->HostName, Handle->PortName, strerror(errno) );
// Set state // Set state
ChangeState( Handle, csFailed ); ChangeState( Handle, csFailed );
return -1; return -1;
}; };
// Configure connection
if ((setsockopt( Handle->FD, SOL_SOCKET, SO_LINGER, &ServerLinger_opt, sizeof(ServerLinger_opt)) == -1) ||
(setsockopt( Handle->FD, SOL_SOCKET, SO_REUSEADDR, &Reuse_opt, sizeof(Reuse_opt)) == -1))
{
// Log Event
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - Could not set socket options [%s:%s] (%s)", Name, Handle->Name, Handle->HostName, Handle->PortName, strerror(errno) );
// Set state
ChangeState( Handle, csFailed );
return -1;
}
// Set non-blocking flag // Set non-blocking flag
int flags = fcntl( Handle->FD, F_GETFL, 0 ); int flags = fcntl( Handle->FD, F_GETFL, 0 );
fcntl( Handle->FD, F_SETFL, flags | O_NONBLOCK ); fcntl( Handle->FD, F_SETFL, flags | O_NONBLOCK );
@@ -1195,21 +1195,7 @@ int CSelectableCore::OpenUDPsocket( THandle * Handle, bool DelayResolve )
if (bind( Handle->FD, Handle->AddressInfo->ai_addr, Handle->AddressInfo->ai_addrlen ) < 0) if (bind( Handle->FD, Handle->AddressInfo->ai_addr, Handle->AddressInfo->ai_addrlen ) < 0)
{ {
// Log Event // Log Event
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - Failed to bind UDP Server socket [%s:%s] (%s)", Name, Handle->Name, Handle->HostName, Handle->PortName, strerror(errno) ); if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - Failed to bind UDP socket [%s:%s] (%s)", Name, Handle->Name, Handle->HostName, Handle->PortName, strerror(errno) );
// Set state
close( Handle->FD );
Handle->FD = -1;
ChangeState( Handle, csFailed );
Handle->AddressFailed = true;
return -1;
};
// Create que for 5 connections
if (listen( Handle->FD, 5 ) < 0)
{
// Log Event
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - Failed to listen on UDP Server socket [%s:%s] (%s)", Name, Handle->Name, Handle->HostName, Handle->PortName, strerror(errno) );
// Set state // Set state
close( Handle->FD ); close( Handle->FD );
@@ -1220,7 +1206,7 @@ int CSelectableCore::OpenUDPsocket( THandle * Handle, bool DelayResolve )
}; };
// Log Event // Log Event
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - UDP Server binded and listening [%s:%s]", Name, Handle->Name, Handle->HostName, Handle->PortName ); if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - UDP socket binded and listening [%s:%s]", Name, Handle->Name, Handle->HostName, Handle->PortName );
// Add to Select Lists // Add to Select Lists
if (Selector) { if (Selector) {
@@ -1233,6 +1219,101 @@ int CSelectableCore::OpenUDPsocket( THandle * Handle, bool DelayResolve )
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
int CSelectableCore::OpenUDPremoteSocket( THandle * Handle, char * ClientAddress, char * ClientPort )
{
THandle ** RemoteClient;
int ClientCount;
char ClientName[100];
// Validate
if (!Handle || (Handle->Type != ctUDPserver)) {
return -1;
}
// Check if Remote client already exists
ClientCount = 1;
RemoteClient = &FirstHandle;
while (*RemoteClient && (strcmp((*RemoteClient)->HostName, ClientAddress) || strcmp((*RemoteClient)->PortName, ClientPort))) {
RemoteClient = &((*RemoteClient)->Next);
ClientCount++;
}
if (*RemoteClient) {
return (*RemoteClient)->FD;
}
// Create Remote Client Handle
sprintf( ClientName, "%s-%d", Handle->Name, ClientCount );
*RemoteClient = CreateHandle( ClientName, false );
if (!SetSocketHandle( *RemoteClient, ctUDPremote, ClientAddress, ClientPort, 0 )) {
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - UDP Server failed to configure Remote UDP Client connection (%s)", Name, Handle->Name, strerror(errno) );
return -1;
}
// Copy Parent Buffer setup
SetInBuffer( *RemoteClient, ((Handle->InBuffer)? Handle->InBuffer->Size() : 0),
Handle->InTimeout, Handle->InMarker, Handle->InMarkerLen );
SetOutBuffer( *RemoteClient, ((Handle->OutBuffer)? Handle->OutBuffer->Size() : 0) );
// Set Key parameters
(*RemoteClient)->FD = Handle->FD;
(*RemoteClient)->Parent = Handle;
(*RemoteClient)->State = csOpen;
// Reset Timer
SetStartTime( &((*RemoteClient)->LastAction) );
// Log Event
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - UDP Server accepted Remote UDP Client connection [%s:%s]", Name, Handle->Name, ClientAddress, ClientPort );
// Add to Select Lists
if (Selector) {
Selector->Add( (*RemoteClient)->FD, true, true, *RemoteClient, this );
}
return (*RemoteClient)->FD;
}
//---------------------------------------------------------------------------
int CSelectableCore::OpenUDPclientSocket( THandle * Handle, bool DelayResolve )
{
// Check state
if (Handle->State == csOpen) {
// Already open
return Handle->FD;
}
// Resolve IP Address
if (!ResolveAddress( Handle, DelayResolve ))
return -1;
// Create File descriptor
if ((Handle->FD = socket( Handle->AddressInfo->ai_family, Handle->AddressInfo->ai_socktype, Handle->AddressInfo->ai_protocol )) < 0)
{
// Log Event
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - Failed to create UDP Client socket [%s:%s] (%s)", Name, Handle->Name, Handle->HostName, Handle->PortName, strerror(errno) );
// Set Status
ChangeState( Handle, csFailed );
return -1;
};
// Set Non blocking open
int flags = fcntl( Handle->FD, F_GETFL, 0 );
fcntl( Handle->FD, F_SETFL, O_NONBLOCK|flags );
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - UDP Client ready [%s:%s]", Name, Handle->Name, Handle->HostName, Handle->PortName );
// Add to Select Lists
if (Selector) {
Selector->Add( Handle->FD, true, true, Handle, this );
}
// Set status
ChangeState( Handle, csOpen );
return Handle->FD;
}
//---------------------------------------------------------------------------
int CSelectableCore::OpenTCPserverSocket( THandle * Handle, bool DelayResolve ) int CSelectableCore::OpenTCPserverSocket( THandle * Handle, bool DelayResolve )
{ {
// Socket options // Socket options
@@ -1567,11 +1648,11 @@ int CSelectableCore::Open( THandle * Handle, bool DelayResolve )
case ctUNIXclient : case ctUNIXclient :
FD = OpenUNIXclientSocket( Handle ); FD = OpenUNIXclientSocket( Handle );
break; break;
case ctUNIXremote : case ctUDPserver :
FD = OpenUNIXremoteSocket( Handle ); FD = OpenUDPserverSocket( Handle, DelayResolve );
break; break;
case ctUDPsock : case ctUDPclient :
FD = OpenUDPsocket( Handle, DelayResolve ); FD = OpenUDPclientSocket( Handle, DelayResolve );
break; break;
case ctTCPserver : case ctTCPserver :
FD = OpenTCPserverSocket( Handle, DelayResolve ); FD = OpenTCPserverSocket( Handle, DelayResolve );
@@ -1579,9 +1660,6 @@ int CSelectableCore::Open( THandle * Handle, bool DelayResolve )
case ctTCPclient : case ctTCPclient :
FD = OpenTCPclientSocket( Handle, DelayResolve ); FD = OpenTCPclientSocket( Handle, DelayResolve );
break; break;
case ctTCPremote :
FD = OpenTCPremoteSocket( Handle );
break;
default: default:
FD = -1; FD = -1;
} }
@@ -1604,7 +1682,7 @@ bool CSelectableCore::Close( THandle * Handle, bool QuickReopen )
return false; return false;
// Close Children // Close Children
if ((Handle->Type == ctTCPserver) || (Handle->Type == ctUNIXserver)) if ((Handle->Type == ctTCPserver) || (Handle->Type == ctUDPserver) || (Handle->Type == ctUNIXserver))
{ {
ChildHandle = FirstHandle; ChildHandle = FirstHandle;
while (ChildHandle) while (ChildHandle)
@@ -1625,7 +1703,11 @@ bool CSelectableCore::Close( THandle * Handle, bool QuickReopen )
} }
// Close Handle // Close Handle
Fail = (close( Handle->FD ))? true : false; if (Handle->Type == ctUDPremote) {
Fail = false;
} else {
Fail = (close( Handle->FD ))? true : false;
}
ChangeState( Handle, ((Fail)? csFailed : csClosed) ); ChangeState( Handle, ((Fail)? csFailed : csClosed) );
// Start timer (for re-open) // Start timer (for re-open)
@@ -1657,15 +1739,23 @@ bool CSelectableCore::Close( THandle * Handle, bool QuickReopen )
break; break;
case ctUNIXclient: case ctUNIXclient:
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - UNIX Client connection %s [%s]", Name, Handle->Name, ((Fail)? "failed" : "closed"), Handle->Path ); if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - UNIX Client %s [%s]", Name, Handle->Name, ((Fail)? "failed" : "closed"), Handle->Path );
break; break;
case ctUNIXremote: case ctUNIXremote:
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - UNIX Remote Client connection %s [%s]", Name, Handle->Name, ((Fail)? "failed" : "closed"), Handle->Path ); if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - UNIX Remote Client %s [%s]", Name, Handle->Name, ((Fail)? "failed" : "closed"), Handle->Path );
break; break;
case ctUDPsock: case ctUDPserver:
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - UDP Socket connection %s [%s:%s]", Name, Handle->Name, ((Fail)? "failed" : "closed"), Handle->HostName, Handle->PortName ); if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - UDP Server %s [%s:%s]", Name, Handle->Name, ((Fail)? "failed" : "closed"), Handle->HostName, Handle->PortName );
break;
case ctUDPclient:
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - UDP Client %s [%s:%s]", Name, Handle->Name, ((Fail)? "failed" : "closed"), Handle->HostName, Handle->PortName );
break;
case ctUDPremote:
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - UDP Remote Client %s [%s:%s]", Name, Handle->Name, ((Fail)? "failed" : "closed"), Handle->HostName, Handle->PortName );
break; break;
case ctTCPserver: case ctTCPserver:
@@ -1673,11 +1763,11 @@ bool CSelectableCore::Close( THandle * Handle, bool QuickReopen )
break; break;
case ctTCPremote: case ctTCPremote:
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - TCP Remote Client connection %s [%s]", Name, Handle->Name, ((Fail)? "failed" : "closed"), Handle->HostName ); if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - TCP Remote Client %s [%s]", Name, Handle->Name, ((Fail)? "failed" : "closed"), Handle->HostName );
break; break;
case ctTCPclient: case ctTCPclient:
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - TCP Client connection %s [%s:%s]", Name, Handle->Name, ((Fail)? "failed" : "closed"), Handle->HostName, Handle->PortName ); if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - TCP Client %s [%s:%s]", Name, Handle->Name, ((Fail)? "failed" : "closed"), Handle->HostName, Handle->PortName );
break; break;
case ctNone: case ctNone:
@@ -1704,6 +1794,10 @@ bool CSelectableCore::Read( THandle * Handle )
int BytesRead = 0; int BytesRead = 0;
int BytesWaiting = -1; int BytesWaiting = -1;
char * UDPbuffer = NULL;
char UDPaddress[50] = "";
char UDPport[20] = "";
// Validate // Validate
if (!Handle || (Handle->State == csNone) || (Handle->State == csFailed) || (Handle->State == csClosed)) { if (!Handle || (Handle->State == csNone) || (Handle->State == csFailed) || (Handle->State == csClosed)) {
return false; return false;
@@ -1774,6 +1868,37 @@ bool CSelectableCore::Read( THandle * Handle )
return false; return false;
} }
} }
else if ((Handle->Type == ctUDPserver) || (Handle->Type == ctUDPclient) || (Handle->Type == ctUDPremote))
{
// Check if anything to read
ioctl( Handle->FD, FIONREAD, &BytesWaiting );
if (BytesWaiting < 0)
{
if (Log) Log->Message( LogLevel, dlMedium, "%s: Handle '%s' - Data waiting error (%s)", Name, Handle->Name, strerror(errno) );
// Close Handle
Close( Handle, false );
return false;
}
// Read incoming message and address
errno = 0;
UDPbuffer = (char*)malloc( BytesWaiting+1 );
BytesRead = ReadFromUDP( Handle, (char*)UDPaddress, (char*)UDPport, UDPbuffer, BytesWaiting );
if (!errno && (Handle->Type == ctUDPserver))
{
// Create/Find Incoming client
ClientFD = OpenUDPremoteSocket( Handle, UDPaddress, UDPport );
if (ClientFD == -1) {
return false;
}
}
// Reset Timer
SetStartTime( &(Handle->LastAction) );
}
else if (Handle->Type == ctSerial) else if (Handle->Type == ctSerial)
{ {
// Check if anything to read // Check if anything to read
@@ -1811,10 +1936,31 @@ bool CSelectableCore::Read( THandle * Handle )
} }
// Read File directly into buffer // Read File directly into buffer
if (Handle->InBuffer && (BytesRead = Handle->InBuffer->ReadFromFD( Handle->FD, BytesWaiting ))) if (Handle->InBuffer)
{ {
// Process Buffer if ((Handle->Type == ctUDPserver) || (Handle->Type == ctUDPclient) || (Handle->Type == ctUDPremote)) {
ProcessInputBuffer( Handle, false ); if (BytesRead) {
Handle->InBuffer->Push( true, UDPbuffer, abs(BytesRead) );
}
if (UDPbuffer) free( UDPbuffer );
}
else {
errno = 0;
BytesRead = Handle->InBuffer->ReadFromFD( Handle->FD, BytesWaiting );
}
// Report failure
if (errno) {
if (Log) Log->Message( LogLevel, dlHigh, "%s: Handle '%s' - Error reading data [%d/%d] (%s)", Name, Handle->Name, -BytesRead, BytesWaiting, strerror(errno) );
}
else if (BytesRead < BytesWaiting) {
if (Log) Log->Message( LogLevel, dlHigh, "%s: Handle '%s' - Incomplete data read [%d/%d]", Name, Handle->Name, BytesRead, BytesWaiting );
}
if (BytesRead != 0) {
// Process Buffer
ProcessInputBuffer( Handle, false );
}
} }
// Reset timer // Reset timer
@@ -1878,28 +2024,41 @@ bool CSelectableCore::Write( THandle * Handle )
{ {
if (Handle->OutBuffer) if (Handle->OutBuffer)
{ {
// Write to FD directly from output buffer // Write directly to handle / socket
if ((BytesWritten = Handle->OutBuffer->WriteToFD( Handle->FD ))) errno = 0;
{ if ((Handle->Type == ctUDPclient)|| (Handle->Type == ctUDPremote)) {
if (LogLevel >= dlHigh) { Len = Handle->OutBuffer->Peek( &Data );
// Show event BytesWritten = WriteToUDP( Handle, Data, Len, true );
Len = Handle->OutBuffer->Peek( &Data ); }
if (Log) Log->Output( LogLevel, dlHigh, LogOutput, Data, Len, "%s: Handle '%s' - OUT:", Name, Handle->Name ); else {
} BytesWritten = Handle->OutBuffer->WriteToFD( Handle->FD );
// Update Buffer
Handle->OutBuffer->Clear( BytesWritten );
// Reset Timer
SetStartTime( &(Handle->LastAction) );
} }
// Check if Buffer emtpy if (Log) Log->Output( LogLevel, dlHigh, LogOutput, Data, Len, "%s: Handle '%s' - OUT:", Name, Handle->Name );
if (!Handle->OutBuffer->Len()) {
// Add to Select Write list // Report failure
if (Selector) { if (errno) {
Selector->Remove( Handle->FD, false, true ); if (Log) Log->Message( LogLevel, dlHigh, "%s: Handle '%s' - Error sending data [%d/%d] (%s)", Name, Handle->Name, -BytesWritten, Len, strerror(errno) );
}
else if (BytesWritten < Len) {
if (Log) Log->Message( LogLevel, dlHigh, "%s: Handle '%s' - Incomplete data sent [%d/%d]", Name, Handle->Name, BytesWritten, Len );
}
if (BytesWritten != 0)
{
// Update Buffer
Handle->OutBuffer->Clear( (BytesWritten > 0)? BytesWritten : -BytesWritten ); // negative value reported if error occurred
// Check if Buffer empty
if (!Handle->OutBuffer->Len()) {
// Add to Select Write list
if (Selector) {
Selector->Remove( Handle->FD, false, true );
}
} }
// Reset timeout
SetStartTime( &(Handle->LastAction) );
} }
} }
else else
@@ -1972,28 +2131,34 @@ int CSelectableCore::ReadFromFD( int FD, char * Data, int MaxLen )
{ {
int BytesRead = 0; int BytesRead = 0;
int TotalRead = 0; int TotalRead = 0;
int DataRemain = 0; int DataRemain = MaxLen;
bool Error = false;
// Check if buffer created // Check if buffer created
if ((FD == -1) || !Data) { if ((FD == -1) || (MaxLen < 1)) {
return 0; return 0;
} }
// Read Data into buffer // Read Data into buffer
DataRemain = (MaxLen == -1)? strlen(Data) : MaxLen;
while (DataRemain) while (DataRemain)
{ {
// Read from file descriptor // Read from file descriptor
BytesRead = read( FD, &Data[TotalRead], DataRemain ); BytesRead = read( FD, &Data[TotalRead], DataRemain );
if ((BytesRead <= 0)) { if ((BytesRead < 0)) {
Error = true;
errno = (!BytesRead)? 0 : errno; // No error if no bytes written
break; break;
} }
// Update Data Pointers // Update Data Pointers
TotalRead += BytesRead; TotalRead += BytesRead;
DataRemain -= BytesRead; DataRemain -= BytesRead;
if (DataRemain) {
usleep( 500 );
}
} }
return TotalRead; return (Error)? -TotalRead : TotalRead; // Report negative total on error
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
@@ -2001,28 +2166,81 @@ int CSelectableCore::WriteToFD( int FD, const char * Data, int Len, bool Force )
{ {
int BytesWritten = 0; int BytesWritten = 0;
int TotalWritten = 0; int TotalWritten = 0;
int DataRemain = 0; int DataRemain = (Len != -1)? Len : (Data)? strlen(Data) : 0;
bool Error = false;
// Check if buffer created // Check if buffer created
if ((FD == -1) || !Data) { if ((FD == -1) || !DataRemain) {
return 0; return 0;
} }
// Read Data into buffer // Read Data into buffer
DataRemain = (Len == -1)? strlen(Data) : Len;
while (DataRemain) while (DataRemain)
{ {
// Read from file descriptor // Read from file descriptor
BytesWritten = write( FD, &Data[TotalWritten], DataRemain ); BytesWritten = write( FD, &Data[TotalWritten], DataRemain );
if ((BytesWritten <= 0) && (!Force || (errno != EAGAIN))) { if ((BytesWritten <= 0) && (!Force || (errno != EAGAIN))) {
Error = true;
errno = (!BytesWritten)? 0 : errno; // No error if no bytes written
break; break;
} }
// Update Data Pointers // Update Data Pointers
TotalWritten += BytesWritten; TotalWritten += BytesWritten;
DataRemain -= BytesWritten; DataRemain -= BytesWritten;
if (DataRemain) {
usleep( 500 );
}
} }
return TotalWritten; return (Error)? -TotalWritten : TotalWritten; // Report negative total on error
}
//---------------------------------------------------------------------------
int CSelectableCore::ReadFromUDP( THandle * Handle, char * RemoteAddr, char * RemotePort, char * Data, int MaxLen )
{
int BytesRead = 0;
struct sockaddr_in Addr;
socklen_t AddrLen = sizeof(Addr);
// Check if buffer created
if (!Handle || (Handle->FD == -1) || !RemoteAddr || !RemotePort || (MaxLen < 1)) {
return 0;
}
// Get datagram
Addr.sin_family = AF_UNSPEC; // use AF_INET6 to force IPv6
Addr.sin_port = ((struct sockaddr_in *)Handle->AddressInfo->ai_addr)->sin_port;
Addr.sin_addr.s_addr = ((struct sockaddr_in *)Handle->AddressInfo->ai_addr)->sin_addr.s_addr;
BytesRead = recvfrom( Handle->FD, Data, MaxLen, 0, (struct sockaddr *)&Addr, &AddrLen );
if (BytesRead >= 0) {
// Decode address
strcpy( RemoteAddr, inet_ntoa(Addr.sin_addr) );
sprintf( RemotePort, "%d", ntohs(Addr.sin_port) );
}
return (BytesRead < 0)? 0 : BytesRead; // Report zero on error
}
//---------------------------------------------------------------------------
int CSelectableCore::WriteToUDP( THandle * Handle, const char * Data, int Len, bool Force )
{
int BytesWritten = 0;
int DataLen = (Len != -1)? Len : (Data)? strlen(Data) : 0;
// Check if buffer created
if (!Handle || (Handle->FD == -1) || !DataLen) {
return 0;
}
// Set Options
BytesWritten = sendto( Handle->FD, Data, DataLen, MSG_NOSIGNAL /*| ((!Force)? MSG_DONTWAIT : 0)*/,
Handle->AddressInfo->ai_addr, Handle->AddressInfo->ai_addrlen );
return (BytesWritten < 0)? 0 : BytesWritten; // Report zero on error
} }
//--------------------------------------------------------------------------- //---------------------------------------------------------------------------
@@ -2082,6 +2300,7 @@ int CSelectableCore::OutputHandle( THandle * Handle, const char * Data, int Len
{ {
THandle * ChildHandle = NULL; THandle * ChildHandle = NULL;
int BytesWritten = 0; int BytesWritten = 0;
int DataLen = (Len != -1)? Len : (Data)? strlen(Data) : 0;
if ((Handle->State != csOpen)) if ((Handle->State != csOpen))
{ {
@@ -2146,14 +2365,29 @@ int CSelectableCore::OutputHandle( THandle * Handle, const char * Data, int Len
} }
else else
{ {
// Show event // Write directly to handle / socket
errno = 0;
if ((Handle->Type == ctUDPclient)|| (Handle->Type == ctUDPremote)) {
BytesWritten = WriteToUDP( ChildHandle, Data, Len, true );
}
else {
BytesWritten = WriteToFD( ChildHandle->FD, Data, Len, true );
}
if (Log) Log->Output( LogLevel, dlHigh, LogOutput, Data, Len, "%s: Handle '%s' - OUT:", Name, ChildHandle->Name ); if (Log) Log->Output( LogLevel, dlHigh, LogOutput, Data, Len, "%s: Handle '%s' - OUT:", Name, ChildHandle->Name );
// Write directly to handle // Report failure
BytesWritten = WriteToFD( ChildHandle->FD, Data, Len, true ); if (errno) {
if (Log) Log->Message( LogLevel, dlHigh, "%s: Handle '%s' - Error sending data [%d/%d] (%s)", Name, ChildHandle->Name, -BytesWritten, DataLen, strerror(errno) );
}
else if (BytesWritten < DataLen) {
if (Log) Log->Message( LogLevel, dlHigh, "%s: Handle '%s' - Incomplete data sent [%d/%d]", Name, ChildHandle->Name, BytesWritten, DataLen );
}
// Reset Timer if (BytesWritten != 0) {
SetStartTime( &(Handle->LastAction) ); // Reset timeout
SetStartTime( &(ChildHandle->LastAction) );
}
} }
} }
// Next // Next
@@ -2178,13 +2412,27 @@ int CSelectableCore::OutputHandle( THandle * Handle, const char * Data, int Len
} }
else else
{ {
// Show event // Write directly to handle / socket
errno = 0;
if ((Handle->Type == ctUDPclient)|| (Handle->Type == ctUDPremote)) {
BytesWritten = WriteToUDP( Handle, Data, Len, true );
}
else {
BytesWritten = WriteToFD( Handle->FD, Data, Len, true );
}
if (Log) Log->Output( LogLevel, dlHigh, LogOutput, Data, Len, "%s: Handle '%s' - OUT:", Name, Handle->Name ); if (Log) Log->Output( LogLevel, dlHigh, LogOutput, Data, Len, "%s: Handle '%s' - OUT:", Name, Handle->Name );
// Write directly to handle // Report failure
if ((BytesWritten = WriteToFD( Handle->FD, Data, Len, true ))) if (errno) {
{ if (Log) Log->Message( LogLevel, dlHigh, "%s: Handle '%s' - Error sending data [%d/%d] (%s)", Name, Handle->Name, -BytesWritten, DataLen, strerror(errno) );
// Reset Timer }
else if (BytesWritten < DataLen) {
if (Log) Log->Message( LogLevel, dlHigh, "%s: Handle '%s' - Incomplete data sent [%d/%d]", Name, Handle->Name, BytesWritten, DataLen );
}
if (BytesWritten != 0) {
// Reset timeout
SetStartTime( &(Handle->LastAction) ); SetStartTime( &(Handle->LastAction) );
} }
} }

View File

@@ -18,8 +18,9 @@
// Types required for connections // Types required for connections
typedef enum { ctNone = 0, ctSerial = 1, ctLinePrinter = 2, ctForkPipe = 3, ctUNIXserver = 4, ctUNIXclient = 5, ctUNIXremote = 6, typedef enum { ctNone = 0, ctSerial = 1, ctLinePrinter = 2, ctForkPipe = 3, ctUNIXserver = 4, ctUNIXclient = 5, ctUNIXremote = 6,
ctUDPsock = 7, ctTCPserver = 8, ctTCPremote = 9, ctTCPclient = 10 } EConnectType; ctUDPserver = 7, ctUDPclient = 8, ctUDPremote = 9, ctTCPserver = 10, ctTCPremote = 11, ctTCPclient = 12 } EConnectType;
const char ConnectTypeName[][20] = { "None", "Serial", "LinePrinter", "ForkPipe", "UNIXserver", "UNIXclient", "UNIXremote", "UDPsock", "TCPserver", "TCPremote", "TCPclient" }; const char ConnectTypeName[][20] = { "None", "Serial", "LinePrinter", "ForkPipe", "UNIXserver", "UNIXclient", "UNIXremote",
"UDPserver", "UDPclient", "UDPremote", "TCPserver", "TCPremote", "TCPclient" };
typedef enum { csNone = 0, csOpenRequest = 1, csWaitingtoOpen = 2, csOpen = 3, csDataWaiting = 4, csClosed = 5, csFailed = 6 } EConnectState; typedef enum { csNone = 0, csOpenRequest = 1, csWaitingtoOpen = 2, csOpen = 3, csDataWaiting = 4, csClosed = 5, csFailed = 6 } EConnectState;
const char ConnectStateName[][15] = { "None", "OpenRequest", "WaitingToOpen", "Open", "DataWaiting", "Closed", "Failed" }; const char ConnectStateName[][15] = { "None", "OpenRequest", "WaitingToOpen", "Open", "DataWaiting", "Closed", "Failed" };
@@ -218,7 +219,9 @@ protected:
// Socket Operations // Socket Operations
bool ResolveAddress( THandle * Handle, bool DelayResolve ); bool ResolveAddress( THandle * Handle, bool DelayResolve );
int OpenUDPsocket( THandle * Handle, bool DelayResolve ); int OpenUDPserverSocket( THandle * Handle, bool DelayResolve );
int OpenUDPremoteSocket( THandle * Handle, char * RemoteAddr, char * RemotePort );
int OpenUDPclientSocket( THandle * Handle, bool DelayResolve );
int OpenTCPserverSocket( THandle * Handle, bool DelayResolve ); int OpenTCPserverSocket( THandle * Handle, bool DelayResolve );
int OpenTCPremoteSocket( THandle * Handle ); int OpenTCPremoteSocket( THandle * Handle );
int OpenTCPclientSocket( THandle * Handle, bool DelayResolve ); int OpenTCPclientSocket( THandle * Handle, bool DelayResolve );
@@ -227,6 +230,9 @@ protected:
int ReadFromFD( int FD, char * Data, int MaxLen ); int ReadFromFD( int FD, char * Data, int MaxLen );
int WriteToFD( int FD, const char * Data, int Len, bool Force ); int WriteToFD( int FD, const char * Data, int Len, bool Force );
int ReadFromUDP( THandle * Handle, char * RemoteAddr, char * RemotePort, char * Data, int MaxLen );
int WriteToUDP( THandle * Handle, const char * Data, int Len, bool Force );
// Buffer operations // Buffer operations
virtual bool ProcessInputBuffer( THandle * Handle, bool Force ); virtual bool ProcessInputBuffer( THandle * Handle, bool Force );