PlusLib  2.9.0
Software library for tracked ultrasound image acquisition, calibration, and processing.
vtkPlusOpenIGTLinkClient.cxx
Go to the documentation of this file.
1 /*=Plus=header=begin======================================================
2 Program: Plus
3 Copyright (c) Laboratory for Percutaneous Surgery. All rights reserved.
4 See License.txt for details.
5 =========================================================Plus=header=end*/
6 
7 #include "PlusConfigure.h"
8 #include "igtlCommandMessage.h"
9 #include "igtlCommon.h"
10 #include "igtlMessageHeader.h"
11 #include "igtlOSUtil.h"
12 #include "igtlServerSocket.h"
13 #include "vtkMultiThreader.h"
14 #include "vtkPlusCommand.h"
17 #include "vtkIGSIORecursiveCriticalSection.h"
18 #include "vtkXMLUtilities.h"
19 
21 
23 
24 //----------------------------------------------------------------------------
27  : IgtlMessageFactory(vtkSmartPointer<vtkPlusIgtlMessageFactory>::New())
28  , DataReceiverActive(std::make_pair(false, false))
29  , DataReceiverThreadId(-1)
30  , Threader(vtkSmartPointer<vtkMultiThreader>::New())
31  , Mutex(vtkSmartPointer<vtkIGSIORecursiveCriticalSection>::New())
32  , SocketMutex(vtkSmartPointer<vtkIGSIORecursiveCriticalSection>::New())
33  , ClientSocket(igtl::ClientSocket::New())
34  , LastGeneratedCommandId(0)
35  , ServerPort(-1)
36  , ServerHost("")
37  , ServerIGTLVersion(IGTL_HEADER_VERSION_1)
38 {
39 
40 }
41 
42 //----------------------------------------------------------------------------
44 {
45 }
46 
47 //----------------------------------------------------------------------------
49 {
50  const double retryDelaySec = 1.0;
51  int errorCode = 1;
52  double startTimeSec = vtkIGSIOAccurateTimer::GetSystemTime();
53  while (errorCode != 0)
54  {
55  errorCode = this->ClientSocket->ConnectToServer(this->ServerHost.c_str(), this->ServerPort);
56  if (vtkIGSIOAccurateTimer::GetSystemTime() - startTimeSec > timeoutSec)
57  {
58  // time is up
59  break;
60  }
61  vtkIGSIOAccurateTimer::DelayWithEventProcessing(retryDelaySec);
62  }
63 
64  if (errorCode != 0)
65  {
66  LOG_ERROR("Cannot connect to the server.");
67  return PLUS_FAIL;
68  }
69  LOG_TRACE("Client successfully connected to server.");
70 
71  this->ClientSocket->SetTimeout(CLIENT_SOCKET_TIMEOUT_SEC * 1000);
72 
73  if (this->DataReceiverThreadId < 0)
74  {
75  this->DataReceiverActive.first = true;
76  this->DataReceiverThreadId = this->Threader->SpawnThread((vtkThreadFunctionType)&DataReceiverThread, this);
77  }
78 
79  return PLUS_SUCCESS;
80 }
81 
82 //----------------------------------------------------------------------------
84 {
85  {
86  igsioLockGuard<vtkIGSIORecursiveCriticalSection> socketGuard(this->SocketMutex);
87  this->ClientSocket->CloseSocket();
88  }
89 
90  // Stop data receiver thread
91  if (this->DataReceiverThreadId >= 0)
92  {
93  this->DataReceiverActive.first = false;
94  while (this->DataReceiverActive.second)
95  {
96  // Wait until the thread stops
97  vtkIGSIOAccurateTimer::Delay(0.2);
98  }
99  this->DataReceiverThreadId = -1;
100  }
101 
102  return PLUS_SUCCESS;
103 }
104 
105 //----------------------------------------------------------------------------
107 {
108  // Get the XML string
109  vtkSmartPointer<vtkXMLDataElement> cmdConfig = vtkSmartPointer<vtkXMLDataElement>::New();
110  command->WriteConfiguration(cmdConfig);
111  std::ostringstream xmlStr;
112  vtkXMLUtilities::FlattenElement(cmdConfig, xmlStr);
113  xmlStr << std::ends;
114 
115  std::ostringstream commandUidStringStream;
116 
117  // Ensure commandUid is populated
118  igtlUint32 commandUid;
119  if (command->GetId())
120  {
121  commandUid = command->GetId();
122  }
123  else
124  {
125  if (igtl::IGTLProtocolToHeaderLookup(this->GetServerIGTLVersion()) < IGTL_HEADER_VERSION_2)
126  {
127  // command UID is not specified, generate one automatically from the timestamp
128  commandUid = vtkIGSIOAccurateTimer::GetUniversalTime();
129  }
130  else
131  {
132  // command UID is not specified, generate one automatically
133  commandUid = LastGeneratedCommandId;
135  }
136  }
137 
138  // Generate the device name
139  std::ostringstream deviceNameSs;
140  if (igtl::IGTLProtocolToHeaderLookup(this->GetServerIGTLVersion()) >= IGTL_HEADER_VERSION_2)
141  {
142  // TODO : determine a way of configurable client name
143  deviceNameSs << "PlusClient_" << PLUSLIB_VERSION;
144  }
145  else
146  {
147  std::string deviceName;
148  commandUidStringStream << commandUid;
149  vtkPlusCommand::GenerateCommandDeviceName(commandUidStringStream.str(), deviceName);
150  deviceNameSs << deviceName;
151  }
152 
153  igtl::MessageBase::Pointer message;
154  if (igtl::IGTLProtocolToHeaderLookup(this->GetServerIGTLVersion()) < IGTL_HEADER_VERSION_2)
155  {
156  igtl::StringMessage::Pointer strMsg = dynamic_cast<igtl::StringMessage*>(this->IgtlMessageFactory->CreateSendMessage("STRING", igtl::IGTLProtocolToHeaderLookup(this->GetServerIGTLVersion())).GetPointer());
157  strMsg->SetDeviceName(deviceNameSs.str().c_str());
158  std::string xmlString = xmlStr.str();
159  strMsg->SetString(xmlString.c_str());
160  strMsg->Pack();
161  message = strMsg;
162  }
163  else
164  {
165  igtl::CommandMessage::Pointer cmdMsg = dynamic_cast<igtl::CommandMessage*>(this->IgtlMessageFactory->CreateSendMessage("COMMAND", igtl::IGTLProtocolToHeaderLookup(this->GetServerIGTLVersion())).GetPointer());
166  cmdMsg->SetDeviceName(deviceNameSs.str().c_str());
167  cmdMsg->SetCommandId(commandUid);
168  cmdMsg->SetCommandName(command->GetName());
169  cmdMsg->SetCommandContent(xmlStr.str().c_str());
170  cmdMsg->Pack();
171  message = cmdMsg;
172  }
173 
174  // Send the string message to the server.
175  LOG_DEBUG("Sending message: " << xmlStr.str());
176  int success = 0;
177  {
178  igsioLockGuard<vtkIGSIORecursiveCriticalSection> socketGuard(this->SocketMutex);
179  success = this->ClientSocket->Send(message->GetBufferPointer(), message->GetBufferSize());
180  }
181  if (!success)
182  {
183  LOG_ERROR("OpenIGTLink client couldn't send command to server.");
184  return PLUS_FAIL;
185  }
186  return PLUS_SUCCESS;
187 }
188 
189 //----------------------------------------------------------------------------
190 PlusStatus vtkPlusOpenIGTLinkClient::SendMessage(igtl::MessageBase::Pointer packedMessage)
191 {
192  int success = 0;
193  {
194  igsioLockGuard<vtkIGSIORecursiveCriticalSection> socketGuard(this->SocketMutex);
195  success = this->ClientSocket->Send(packedMessage->GetBufferPointer(), packedMessage->GetBufferSize());
196  }
197  if (!success)
198  {
199  LOG_ERROR("OpenIGTLink client couldn't send message to server.");
200  return PLUS_FAIL;
201  }
202  return PLUS_SUCCESS;
203 }
204 
205 //----------------------------------------------------------------------------
206 PlusStatus vtkPlusOpenIGTLinkClient::ReceiveReply(PlusStatus& result, int32_t& outOriginalCommandId, std::string& outErrorString,
207  std::string& outContent, igtl::MessageBase::MetaDataMap& outParameters,
208  std::string& outCommandName, double timeoutSec/*=0*/)
209 {
210  double startTimeSec = vtkIGSIOAccurateTimer::GetSystemTime();
211  while (1)
212  {
213  {
214  // save command reply
215  igsioLockGuard<vtkIGSIORecursiveCriticalSection> updateMutexGuardedLock(this->Mutex);
216  if (!this->Replies.empty())
217  {
218  igtl::MessageBase::Pointer message = this->Replies.front();
219  if (typeid(*message) == typeid(igtl::StringMessage))
220  {
221  // Process the command as v1/v2 string reply
222  igtl::StringMessage::Pointer strMsg = dynamic_cast<igtl::StringMessage*>(message.GetPointer());
223 
224  if (vtkPlusCommand::IsReplyDeviceName(strMsg->GetDeviceName()))
225  {
226  if (igsioCommon::StringToInt<int32_t>(vtkPlusCommand::GetUidFromCommandDeviceName(strMsg->GetDeviceName()).c_str(), outOriginalCommandId) != PLUS_SUCCESS)
227  {
228  LOG_ERROR("Failed to get UID from command device name.");
229  continue;
230  }
231  }
232  vtkSmartPointer<vtkXMLDataElement> cmdElement = vtkSmartPointer<vtkXMLDataElement>::Take(vtkXMLUtilities::ReadElementFromString(strMsg->GetString()));
233  if (cmdElement == NULL)
234  {
235  LOG_ERROR("Unable to parse command reply as XML. Skipping.");
236  continue;
237  }
238  if (cmdElement->GetAttribute("Status") == NULL)
239  {
240  LOG_ERROR("No status returned. Skipping.");
241  continue;
242  }
243  result = std::string(cmdElement->GetAttribute("Status")) == "SUCCESS" ? PLUS_SUCCESS : PLUS_FAIL;
244  if (cmdElement->GetAttribute("Message") == NULL)
245  {
246  LOG_ERROR("No message returned. Skipping.");
247  continue;
248  }
249  outContent = cmdElement->GetAttribute("Message");
250  }
251  else if (typeid(*message) == typeid(igtl::RTSCommandMessage))
252  {
253  // Process the command as v3 RTS_Command
254  igtl::RTSCommandMessage::Pointer rtsCommandMsg = dynamic_cast<igtl::RTSCommandMessage*>(message.GetPointer());
255 
256  vtkSmartPointer<vtkXMLDataElement> cmdElement = vtkSmartPointer<vtkXMLDataElement>::Take(vtkXMLUtilities::ReadElementFromString(rtsCommandMsg->GetCommandContent().c_str()));
257 
258  outCommandName = rtsCommandMsg->GetCommandName();
259  outOriginalCommandId = rtsCommandMsg->GetCommandId();
260 
261  XML_FIND_NESTED_ELEMENT_OPTIONAL(resultElement, cmdElement, "Result");
262  if (resultElement != NULL)
263  {
264  result = STRCASECMP(resultElement->GetCharacterData(), "true") == 0 ? PLUS_SUCCESS : PLUS_FAIL;
265  }
266  XML_FIND_NESTED_ELEMENT_OPTIONAL(errorElement, cmdElement, "Error");
267  if (!result && errorElement == NULL)
268  {
269  LOG_ERROR("Server sent error without reason. Notify server developers.");
270  }
271  else if (!result && errorElement != NULL)
272  {
273  outErrorString = errorElement->GetCharacterData();
274  }
275  XML_FIND_NESTED_ELEMENT_REQUIRED(messageElement, cmdElement, "Message");
276  outContent = messageElement->GetCharacterData();
277 
278  outParameters = rtsCommandMsg->GetMetaData();
279  }
280  else if (typeid(*message) == typeid(igtl::RTSTrackingDataMessage))
281  {
282  igtl::RTSTrackingDataMessage* rtsTrackingMsg = dynamic_cast<igtl::RTSTrackingDataMessage*>(message.GetPointer());
283 
284  result = rtsTrackingMsg->GetStatus() == 0 ? PLUS_SUCCESS : PLUS_FAIL;
285  outContent = (rtsTrackingMsg->GetStatus() == 0 ? "SUCCESS" : "FAILURE");
286  outCommandName = "RTSTrackingDataMessage";
287  outOriginalCommandId = -1;
288  }
289 
290  this->Replies.pop_front();
291  return PLUS_SUCCESS;
292  }
293  }
294  if (vtkIGSIOAccurateTimer::GetSystemTime() - startTimeSec > timeoutSec)
295  {
296  LOG_DEBUG("vtkPlusOpenIGTLinkClient::ReceiveReply timeout passed (" << timeoutSec << "sec)");
297  return PLUS_FAIL;
298  }
299  vtkIGSIOAccurateTimer::Delay(0.010);
300  }
301  return PLUS_FAIL;
302 }
303 
304 //----------------------------------------------------------------------------
305 void vtkPlusOpenIGTLinkClient::PrintSelf(ostream& os, vtkIndent indent)
306 {
307  this->Superclass::PrintSelf(os, indent);
308 }
309 
310 //----------------------------------------------------------------------------
311 void* vtkPlusOpenIGTLinkClient::DataReceiverThread(vtkMultiThreader::ThreadInfo* data)
312 {
314  self->DataReceiverActive.second = true;
315 
316  while (self->DataReceiverActive.first)
317  {
318 
319  igtl::MessageHeader::Pointer headerMsg = self->IgtlMessageFactory->CreateHeaderMessage(IGTL_HEADER_VERSION_1);
320 
321  // Receive generic header from the socket
322  igtlUint64 numOfBytesReceived = 0;
323  {
324  bool timeout(false);
325  igsioLockGuard<vtkIGSIORecursiveCriticalSection> socketGuard(self->SocketMutex);
326  numOfBytesReceived = self->ClientSocket->Receive(headerMsg->GetBufferPointer(), headerMsg->GetBufferSize(), timeout);
327  }
328  if (numOfBytesReceived == 0 // No message received
329  || numOfBytesReceived != headerMsg->GetPackSize() // Received data is not as we expected
330  )
331  {
332  // Failed to receive data, maybe the socket is disconnected
333  vtkIGSIOAccurateTimer::Delay(0.1);
334  continue;
335  }
336 
337  int c = headerMsg->Unpack(1);
338  if (!(c & igtl::MessageHeader::UNPACK_HEADER))
339  {
340  LOG_ERROR("Failed to receive reply (invalid header)");
341  continue;
342  }
343 
344  if (self->OnMessageReceived(headerMsg.GetPointer()))
345  {
346  // The message body is read and processed
347  continue;
348  }
349 
350  igtl::MessageBase::Pointer bodyMsg = self->IgtlMessageFactory->CreateReceiveMessage(headerMsg);
351  if (bodyMsg.IsNull())
352  {
353  LOG_ERROR("Unable to create message of type: " << headerMsg->GetMessageType());
354  continue;
355  }
356 
357  // Only accept string messages if they have a deviceName of the format ACK_xyz
358  if ((typeid(*bodyMsg) == typeid(igtl::StringMessage)
359  && vtkPlusCommand::IsReplyDeviceName(headerMsg->GetDeviceName(), ""))
360  || typeid(*bodyMsg) == typeid(igtl::RTSCommandMessage))
361  {
362  bodyMsg->SetMessageHeader(headerMsg);
363  bodyMsg->AllocateBuffer();
364  {
365  bool timeout(false);
366  igsioLockGuard<vtkIGSIORecursiveCriticalSection> socketGuard(self->SocketMutex);
367  self->ClientSocket->Receive(bodyMsg->GetBufferBodyPointer(), bodyMsg->GetBufferBodySize(), timeout);
368  }
369 
370  int c = bodyMsg->Unpack(1);
371  if (!(c & igtl::MessageHeader::UNPACK_BODY))
372  {
373  LOG_ERROR("Failed to receive reply (invalid body)");
374  continue;
375  }
376  {
377  // save command reply
378  igsioLockGuard<vtkIGSIORecursiveCriticalSection> updateMutexGuardedLock(self->Mutex);
379  self->Replies.push_back(bodyMsg);
380  }
381  }
382  else if (typeid(*bodyMsg) == typeid(igtl::RTSTrackingDataMessage))
383  {
384  bodyMsg->SetMessageHeader(headerMsg);
385  bodyMsg->AllocateBuffer();
386  {
387  bool timeout(false);
388  igsioLockGuard<vtkIGSIORecursiveCriticalSection> socketGuard(self->SocketMutex);
389  self->ClientSocket->Receive(bodyMsg->GetBufferBodyPointer(), bodyMsg->GetBufferBodySize(), timeout);
390  }
391 
392  int c = bodyMsg->Unpack(1);
393  if (!(c & igtl::MessageHeader::UNPACK_BODY))
394  {
395  LOG_ERROR("Failed to receive reply (invalid body)");
396  continue;
397  }
398  {
399  // save command reply
400  igsioLockGuard<vtkIGSIORecursiveCriticalSection> updateMutexGuardedLock(self->Mutex);
401  self->Replies.push_back(bodyMsg);
402  }
403  }
404  else
405  {
406  // if the incoming message is not a reply to a command, we discard it and continue
407  LOG_TRACE("Received message: " << headerMsg->GetMessageType() << " (not processed)");
408  {
409  igsioLockGuard<vtkIGSIORecursiveCriticalSection> socketGuard(self->SocketMutex);
410  self->ClientSocket->Skip(headerMsg->GetBodySizeToRead(), 0);
411  }
412  }
413  }
414 
415  // Close thread
416  self->DataReceiverThreadId = -1;
417  self->DataReceiverActive.second = false;
418  return NULL;
419 }
420 
421 //----------------------------------------------------------------------------
422 igtlUint64 vtkPlusOpenIGTLinkClient::SocketReceive(void* data, igtlUint64 length)
423 {
424  bool timeout(false);
425  igsioLockGuard<vtkIGSIORecursiveCriticalSection> socketGuard(this->SocketMutex);
426  return ClientSocket->Receive(data, length, timeout);
427 }
const uint32_t * data
Definition: phidget22.h:3971
virtual PlusStatus WriteConfiguration(vtkXMLDataElement *aConfig)
This is an abstract superclass for commands in the OpenIGTLink network interface for Plus.
igsioStatus PlusStatus
Definition: PlusCommon.h:40
igtlUint64 SocketReceive(void *data, igtlUint64 length)
PlusStatus SendMessage(igtl::MessageBase::Pointer packedMessage)
#define PLUS_FAIL
Definition: PlusCommon.h:43
const char ** deviceName
Definition: phidget22.h:1316
#define PLUS_SUCCESS
Definition: PlusCommon.h:44
vtkSmartPointer< vtkIGSIORecursiveCriticalSection > Mutex
PlusStatus Connect(double timeoutSec=-1)
vtkStandardNewMacro(vtkPlusOpenIGTLinkClient)
Factory class of supported OpenIGTLink message types.
static void * DataReceiverThread(vtkMultiThreader::ThreadInfo *data)
virtual void PrintSelf(ostream &os, vtkIndent indent)
static std::string GetUidFromCommandDeviceName(const std::string &deviceName)
vtkSmartPointer< vtkPlusIgtlMessageFactory > IgtlMessageFactory
static const float CLIENT_SOCKET_TIMEOUT_SEC
const char * message
Definition: phidget22.h:2457
This class provides a network interface to access Plus functions as an OpenIGTLink client.
vtkSmartPointer< vtkIGSIORecursiveCriticalSection > SocketMutex
virtual uint32_t GetId()
std::deque< igtl::MessageBase::Pointer > Replies
static PlusStatus GenerateCommandDeviceName(const std::string &uid, std::string &outDeviceName)
igtl::ClientSocket::Pointer ClientSocket
static bool IsReplyDeviceName(const std::string &deviceName, const std::string &uid=std::string(""))
PlusStatus ReceiveReply(PlusStatus &result, int32_t &outOriginalCommandId, std::string &outErrorString, std::string &outContent, igtl::MessageBase::MetaDataMap &outParameters, std::string &outCommandName, double timeoutSec=0)
vtkSmartPointer< vtkMultiThreader > Threader
std::pair< bool, bool > DataReceiverActive
PlusStatus SendCommand(vtkPlusCommand *command)