7 #include "PlusConfigure.h" 8 #include "igtlCommandMessage.h" 9 #include "igtlCommon.h" 10 #include "igtlMessageHeader.h" 11 #include "igtlOSUtil.h" 12 #include "igtlServerSocket.h" 13 #include "vtkMultiThreader.h" 17 #include "vtkIGSIORecursiveCriticalSection.h" 18 #include "vtkXMLUtilities.h" 28 , DataReceiverActive(std::make_pair(false, false))
29 , DataReceiverThreadId(-1)
30 , Threader(vtkSmartPointer<vtkMultiThreader>::New())
31 , Mutex(vtkSmartPointer<vtkIGSIORecursiveCriticalSection>::New())
32 , SocketMutex(vtkSmartPointer<vtkIGSIORecursiveCriticalSection>::New())
33 , ClientSocket(
igtl::ClientSocket::New())
34 , LastGeneratedCommandId(0)
37 , ServerIGTLVersion(IGTL_HEADER_VERSION_1)
50 const double retryDelaySec = 1.0;
52 double startTimeSec = vtkIGSIOAccurateTimer::GetSystemTime();
53 while (errorCode != 0)
56 if (vtkIGSIOAccurateTimer::GetSystemTime() - startTimeSec > timeoutSec)
61 vtkIGSIOAccurateTimer::DelayWithEventProcessing(retryDelaySec);
66 LOG_ERROR(
"Cannot connect to the server.");
69 LOG_TRACE(
"Client successfully connected to server.");
86 igsioLockGuard<vtkIGSIORecursiveCriticalSection> socketGuard(this->
SocketMutex);
97 vtkIGSIOAccurateTimer::Delay(0.2);
109 vtkSmartPointer<vtkXMLDataElement> cmdConfig = vtkSmartPointer<vtkXMLDataElement>::New();
111 std::ostringstream xmlStr;
112 vtkXMLUtilities::FlattenElement(cmdConfig, xmlStr);
115 std::ostringstream commandUidStringStream;
118 igtlUint32 commandUid;
119 if (command->
GetId())
121 commandUid = command->
GetId();
125 if (igtl::IGTLProtocolToHeaderLookup(this->GetServerIGTLVersion()) < IGTL_HEADER_VERSION_2)
128 commandUid = vtkIGSIOAccurateTimer::GetUniversalTime();
139 std::ostringstream deviceNameSs;
140 if (igtl::IGTLProtocolToHeaderLookup(this->GetServerIGTLVersion()) >= IGTL_HEADER_VERSION_2)
143 deviceNameSs <<
"PlusClient_" << PLUSLIB_VERSION;
148 commandUidStringStream << commandUid;
153 igtl::MessageBase::Pointer
message;
154 if (igtl::IGTLProtocolToHeaderLookup(this->GetServerIGTLVersion()) < IGTL_HEADER_VERSION_2)
156 igtl::StringMessage::Pointer strMsg = dynamic_cast<igtl::StringMessage*>(this->
IgtlMessageFactory->CreateSendMessage(
"STRING", igtl::IGTLProtocolToHeaderLookup(this->GetServerIGTLVersion())).GetPointer());
157 strMsg->SetDeviceName(deviceNameSs.str().c_str());
158 std::string xmlString = xmlStr.str();
159 strMsg->SetString(xmlString.c_str());
165 igtl::CommandMessage::Pointer cmdMsg = dynamic_cast<igtl::CommandMessage*>(this->
IgtlMessageFactory->CreateSendMessage(
"COMMAND", igtl::IGTLProtocolToHeaderLookup(this->GetServerIGTLVersion())).GetPointer());
166 cmdMsg->SetDeviceName(deviceNameSs.str().c_str());
167 cmdMsg->SetCommandId(commandUid);
168 cmdMsg->SetCommandName(command->GetName());
169 cmdMsg->SetCommandContent(xmlStr.str().c_str());
175 LOG_DEBUG(
"Sending message: " << xmlStr.str());
178 igsioLockGuard<vtkIGSIORecursiveCriticalSection> socketGuard(this->
SocketMutex);
183 LOG_ERROR(
"OpenIGTLink client couldn't send command to server.");
194 igsioLockGuard<vtkIGSIORecursiveCriticalSection> socketGuard(this->
SocketMutex);
195 success = this->
ClientSocket->Send(packedMessage->GetBufferPointer(), packedMessage->GetBufferSize());
199 LOG_ERROR(
"OpenIGTLink client couldn't send message to server.");
207 std::string& outContent, igtl::MessageBase::MetaDataMap& outParameters,
208 std::string& outCommandName,
double timeoutSec)
210 double startTimeSec = vtkIGSIOAccurateTimer::GetSystemTime();
215 igsioLockGuard<vtkIGSIORecursiveCriticalSection> updateMutexGuardedLock(this->
Mutex);
219 if (
typeid(*message) ==
typeid(igtl::StringMessage))
222 igtl::StringMessage::Pointer strMsg = dynamic_cast<igtl::StringMessage*>(
message.GetPointer());
228 LOG_ERROR(
"Failed to get UID from command device name.");
232 vtkSmartPointer<vtkXMLDataElement> cmdElement = vtkSmartPointer<vtkXMLDataElement>::Take(vtkXMLUtilities::ReadElementFromString(strMsg->GetString()));
233 if (cmdElement == NULL)
235 LOG_ERROR(
"Unable to parse command reply as XML. Skipping.");
238 if (cmdElement->GetAttribute(
"Status") == NULL)
240 LOG_ERROR(
"No status returned. Skipping.");
244 if (cmdElement->GetAttribute(
"Message") == NULL)
246 LOG_ERROR(
"No message returned. Skipping.");
249 outContent = cmdElement->GetAttribute(
"Message");
251 else if (
typeid(*
message) ==
typeid(igtl::RTSCommandMessage))
254 igtl::RTSCommandMessage::Pointer rtsCommandMsg = dynamic_cast<igtl::RTSCommandMessage*>(
message.GetPointer());
256 vtkSmartPointer<vtkXMLDataElement> cmdElement = vtkSmartPointer<vtkXMLDataElement>::Take(vtkXMLUtilities::ReadElementFromString(rtsCommandMsg->GetCommandContent().c_str()));
258 outCommandName = rtsCommandMsg->GetCommandName();
259 outOriginalCommandId = rtsCommandMsg->GetCommandId();
261 XML_FIND_NESTED_ELEMENT_OPTIONAL(resultElement, cmdElement,
"Result");
262 if (resultElement != NULL)
266 XML_FIND_NESTED_ELEMENT_OPTIONAL(errorElement, cmdElement,
"Error");
267 if (!result && errorElement == NULL)
269 LOG_ERROR(
"Server sent error without reason. Notify server developers.");
271 else if (!result && errorElement != NULL)
273 outErrorString = errorElement->GetCharacterData();
275 XML_FIND_NESTED_ELEMENT_REQUIRED(messageElement, cmdElement,
"Message");
276 outContent = messageElement->GetCharacterData();
278 outParameters = rtsCommandMsg->GetMetaData();
280 else if (
typeid(*
message) ==
typeid(igtl::RTSTrackingDataMessage))
282 igtl::RTSTrackingDataMessage* rtsTrackingMsg = dynamic_cast<igtl::RTSTrackingDataMessage*>(
message.GetPointer());
285 outContent = (rtsTrackingMsg->GetStatus() == 0 ?
"SUCCESS" :
"FAILURE");
286 outCommandName =
"RTSTrackingDataMessage";
287 outOriginalCommandId = -1;
294 if (vtkIGSIOAccurateTimer::GetSystemTime() - startTimeSec > timeoutSec)
296 LOG_DEBUG(
"vtkPlusOpenIGTLinkClient::ReceiveReply timeout passed (" << timeoutSec <<
"sec)");
299 vtkIGSIOAccurateTimer::Delay(0.010);
307 this->Superclass::PrintSelf(os, indent);
316 while (self->DataReceiverActive.first)
319 igtl::MessageHeader::Pointer headerMsg =
self->IgtlMessageFactory->CreateHeaderMessage(IGTL_HEADER_VERSION_1);
322 igtlUint64 numOfBytesReceived = 0;
325 igsioLockGuard<vtkIGSIORecursiveCriticalSection> socketGuard(self->SocketMutex);
326 numOfBytesReceived =
self->ClientSocket->Receive(headerMsg->GetBufferPointer(), headerMsg->GetBufferSize(), timeout);
328 if (numOfBytesReceived == 0
329 || numOfBytesReceived != headerMsg->GetPackSize()
333 vtkIGSIOAccurateTimer::Delay(0.1);
337 int c = headerMsg->Unpack(1);
338 if (!(c & igtl::MessageHeader::UNPACK_HEADER))
340 LOG_ERROR(
"Failed to receive reply (invalid header)");
344 if (self->OnMessageReceived(headerMsg.GetPointer()))
350 igtl::MessageBase::Pointer bodyMsg =
self->IgtlMessageFactory->CreateReceiveMessage(headerMsg);
351 if (bodyMsg.IsNull())
353 LOG_ERROR(
"Unable to create message of type: " << headerMsg->GetMessageType());
358 if ((
typeid(*bodyMsg) ==
typeid(igtl::StringMessage)
360 ||
typeid(*bodyMsg) ==
typeid(igtl::RTSCommandMessage))
362 bodyMsg->SetMessageHeader(headerMsg);
363 bodyMsg->AllocateBuffer();
366 igsioLockGuard<vtkIGSIORecursiveCriticalSection> socketGuard(self->SocketMutex);
367 self->ClientSocket->Receive(bodyMsg->GetBufferBodyPointer(), bodyMsg->GetBufferBodySize(), timeout);
370 int c = bodyMsg->Unpack(1);
371 if (!(c & igtl::MessageHeader::UNPACK_BODY))
373 LOG_ERROR(
"Failed to receive reply (invalid body)");
378 igsioLockGuard<vtkIGSIORecursiveCriticalSection> updateMutexGuardedLock(self->Mutex);
379 self->Replies.push_back(bodyMsg);
382 else if (
typeid(*bodyMsg) ==
typeid(igtl::RTSTrackingDataMessage))
384 bodyMsg->SetMessageHeader(headerMsg);
385 bodyMsg->AllocateBuffer();
388 igsioLockGuard<vtkIGSIORecursiveCriticalSection> socketGuard(self->SocketMutex);
389 self->ClientSocket->Receive(bodyMsg->GetBufferBodyPointer(), bodyMsg->GetBufferBodySize(), timeout);
392 int c = bodyMsg->Unpack(1);
393 if (!(c & igtl::MessageHeader::UNPACK_BODY))
395 LOG_ERROR(
"Failed to receive reply (invalid body)");
400 igsioLockGuard<vtkIGSIORecursiveCriticalSection> updateMutexGuardedLock(self->Mutex);
401 self->Replies.push_back(bodyMsg);
407 LOG_TRACE(
"Received message: " << headerMsg->GetMessageType() <<
" (not processed)");
409 igsioLockGuard<vtkIGSIORecursiveCriticalSection> socketGuard(self->SocketMutex);
410 self->ClientSocket->Skip(headerMsg->GetBodySizeToRead(), 0);
416 self->DataReceiverThreadId = -1;
417 self->DataReceiverActive.second =
false;
425 igsioLockGuard<vtkIGSIORecursiveCriticalSection> socketGuard(this->
SocketMutex);
virtual PlusStatus WriteConfiguration(vtkXMLDataElement *aConfig)
vtkPlusOpenIGTLinkClient()
This is an abstract superclass for commands in the OpenIGTLink network interface for Plus.
igtlUint64 SocketReceive(void *data, igtlUint64 length)
virtual ~vtkPlusOpenIGTLinkClient()
PlusStatus SendMessage(igtl::MessageBase::Pointer packedMessage)
igtlUint32 LastGeneratedCommandId
vtkSmartPointer< vtkIGSIORecursiveCriticalSection > Mutex
PlusStatus Connect(double timeoutSec=-1)
vtkStandardNewMacro(vtkPlusOpenIGTLinkClient)
Factory class of supported OpenIGTLink message types.
static void * DataReceiverThread(vtkMultiThreader::ThreadInfo *data)
virtual void PrintSelf(ostream &os, vtkIndent indent)
static std::string GetUidFromCommandDeviceName(const std::string &deviceName)
vtkSmartPointer< vtkPlusIgtlMessageFactory > IgtlMessageFactory
static const float CLIENT_SOCKET_TIMEOUT_SEC
This class provides a network interface to access Plus functions as an OpenIGTLink client.
vtkSmartPointer< vtkIGSIORecursiveCriticalSection > SocketMutex
std::deque< igtl::MessageBase::Pointer > Replies
static PlusStatus GenerateCommandDeviceName(const std::string &uid, std::string &outDeviceName)
igtl::ClientSocket::Pointer ClientSocket
static bool IsReplyDeviceName(const std::string &deviceName, const std::string &uid=std::string(""))
PlusStatus ReceiveReply(PlusStatus &result, int32_t &outOriginalCommandId, std::string &outErrorString, std::string &outContent, igtl::MessageBase::MetaDataMap &outParameters, std::string &outCommandName, double timeoutSec=0)
vtkSmartPointer< vtkMultiThreader > Threader
std::pair< bool, bool > DataReceiverActive
PlusStatus SendCommand(vtkPlusCommand *command)