|
Urbi SDK Remote for C++
2.7.3
|
00001 /* 00002 * Copyright (C) 2005-2011, Gostai S.A.S. 00003 * 00004 * This software is provided "as is" without warranty of any kind, 00005 * either expressed or implied, including but not limited to the 00006 * implied warranties of fitness for a particular purpose. 00007 * 00008 * See the LICENSE file for more information. 00009 */ 00010 00012 00013 #if !defined WIN32 00014 # include <libport/ctime> 00015 # include <libport/csignal> 00016 #endif 00017 00018 #include <boost/lambda/bind.hpp> 00019 00020 #include <libport/boost-error.hh> 00021 #include <libport/format.hh> 00022 00023 #include <urbi/uclient.hh> 00024 #include <urbi/utag.hh> 00025 00026 #include <liburbi/compatibility.hh> 00027 00028 GD_CATEGORY(Urbi.Client); 00029 00030 namespace urbi 00031 { 00032 00033 /*-------------------. 00034 | UClient::options. | 00035 `-------------------*/ 00036 00037 UClient::options::options(bool server) 00038 : server_(server) 00039 // Unless stated otherwise, auto start. 00040 , start_(true) 00041 , asynchronous_(false) 00042 { 00043 } 00044 00045 UCLIENT_OPTION_IMPL(UClient, bool, server) 00046 UCLIENT_OPTION_IMPL(UClient, bool, start) 00047 UCLIENT_OPTION_IMPL(UClient, bool, asynchronous) 00048 00049 /*----------. 00050 | UClient. | 00051 `----------*/ 00052 00053 UClient::UClient(const std::string& host, unsigned port, 00054 size_t buflen, 00055 const options& opt) 00056 : UAbstractClient(host, port, buflen, opt.server()) 00057 , ping_interval_(0) 00058 , pong_timeout_(0) 00059 , link_(new UClient*(this)) 00060 , ping_sent_(libport::utime()) 00061 , ping_sem_(0) 00062 , asynchronous_(opt.asynchronous()) 00063 , synchronous_send_(false) 00064 { 00065 if (opt.start()) 00066 start(); 00067 } 00068 00069 UClient::error_type 00070 UClient::start() 00071 { 00072 return rc = server_ ? listen_() : connect_(); 00073 } 00074 00075 UClient::error_type 00076 UClient::connect_() 00077 { 00078 if (boost::system::error_code erc = connect(host_, port_, false, 0, 00079 asynchronous_)) 00080 { 00081 libport::boost_error(libport::format("UClient::UClient connect(%s, %s)", 00082 host_, port_), 00083 erc); 00084 return -1; 00085 } 00086 else 00087 return 0; 00088 } 00089 00090 UClient::error_type 00091 UClient::listen_() 00092 { 00093 if (boost::system::error_code erc = 00094 listen(boost::bind(&UClient::mySocketFactory, this), host_, port_)) 00095 { 00096 libport::boost_error(libport::format("UClient::UClient listen(%s, %s)", 00097 host_, port_), 00098 erc); 00099 return -1; 00100 } 00101 else 00102 return 0; 00103 } 00104 00105 UClient::~UClient() 00106 { 00107 *link_ = 0; 00108 closeUClient(); 00109 } 00110 00111 UClient::error_type 00112 UClient::onClose() 00113 { 00114 if (!closed_) 00115 UAbstractClient::onClose(); 00116 return !!closed_; 00117 } 00118 00119 UClient::error_type 00120 UClient::closeUClient() 00121 { 00122 close(); 00123 onClose(); 00124 return 0; 00125 } 00126 00127 UClient::error_type 00128 UClient::effectiveSend(const void* buffer, size_t size) 00129 { 00130 if (rc) 00131 return -1; 00132 if (synchronous_send_) 00133 libport::Socket::syncWrite(buffer, size); 00134 else 00135 libport::Socket::write(buffer, size); 00136 return 0; 00137 } 00138 00139 libport::Socket* 00140 UClient::mySocketFactory() 00141 { 00142 return this; 00143 } 00144 00145 void 00146 UClient::onConnect() 00147 { 00148 init_ = true; 00149 onConnection(); 00150 00151 // Declare ping channel for kernel that requires it. Do not try 00152 // to depend on kernelMajor, because it has not been computed yet. 00153 // And computing kernelMajor requires this code to be run. So we 00154 // need to write something that both k1 and k2 will like. 00155 send(SYNCLINE_WRAP( 00156 "if (isdef(Channel))\n" 00157 " var lobby.%s = Channel.new(\"%s\")|;", 00158 internalPongTag, internalPongTag)); 00159 // The folowwing calls may fail if we got disconnected. 00160 try 00161 { 00162 host_ = getRemoteHost(); 00163 port_ = getRemotePort(); 00164 } 00165 catch (const std::exception& e) 00166 { 00167 // Ignore the error, next read attempt will trigger onError. 00168 GD_FINFO_DUMP("ignore std::exception: %s", e.what()); 00169 } 00170 if (ping_interval_) 00171 sendPing(link_); 00172 } 00173 00174 void 00175 UClient::onError(boost::system::error_code erc) 00176 { 00177 rc = -1; 00178 resetAsyncCalls_(); 00179 clientError("!!! " + erc.message()); 00180 notifyCallbacks(UMessage(*this, 0, CLIENTERROR_TAG, 00181 "!!! " + erc.message())); 00182 return; 00183 } 00184 00185 size_t 00186 UClient::onRead(const void* data, size_t length) 00187 { 00188 size_t capacity = recvBufSize - recvBufferPosition - 1; 00189 00190 if (ping_interval_ && ping_sem_.uget(1)) 00191 { 00192 pong_timeout_handler_->cancel(); 00193 send_ping_handler_ = 00194 libport::asyncCall(boost::bind(&UClient::sendPing, 00195 this, link_), 00196 ping_interval_ - (libport::utime() - ping_sent_)); 00197 } 00198 if (capacity < length) 00199 { 00200 size_t nsz = std::max(recvBufSize*2, recvBufferPosition + length+1); 00201 char* nbuf = new char[nsz]; 00202 memcpy(nbuf, recvBuffer, recvBufferPosition); 00203 delete[] recvBuffer; 00204 recvBuffer = nbuf; 00205 recvBufSize = nsz; 00206 } 00207 memcpy(&recvBuffer[recvBufferPosition], data, length); 00208 recvBufferPosition += length; 00209 recvBuffer[recvBufferPosition] = 0; 00210 processRecvBuffer(); 00211 return length; 00212 } 00213 00214 void 00215 UClient::pongTimeout(link_type l) 00216 { 00217 if (*l) 00218 { 00219 const char* err = "!!! Lost connection with server: ping timeout"; 00220 // FIXME: Choose between two differents way to alert user program. 00221 clientError(err); 00222 notifyCallbacks(UMessage(*this, 0, connectionTimeoutTag, err)); 00223 close(); 00224 } 00225 } 00226 00227 void 00228 UClient::sendPing(link_type l) 00229 { 00230 if (*l) 00231 { 00232 pong_timeout_handler_ = 00233 libport::asyncCall(boost::bind(&UClient::pongTimeout, this, link_), 00234 pong_timeout_); 00235 send("%s << 1,", internalPongTag); 00236 ping_sent_ = libport::utime(); 00237 ping_sem_++; 00238 } 00239 } 00240 00241 void 00242 UClient::printf(const char * format, ...) 00243 { 00244 va_list arg; 00245 va_start(arg, format); 00246 vfprintf(stderr, format, arg); 00247 va_end(arg); 00248 } 00249 00250 unsigned int UClient::getCurrentTime() const 00251 { 00252 // FIXME: Put this into libport. 00253 #ifdef WIN32 00254 return GetTickCount(); 00255 #else 00256 struct timeval tv; 00257 gettimeofday(&tv, NULL); 00258 return tv.tv_sec*1000+tv.tv_usec/1000; 00259 #endif 00260 } 00261 00262 void 00263 UClient::setKeepAliveCheck(unsigned ping_interval, 00264 unsigned pong_timeout) 00265 { 00266 // Always interrupt previous ping handler. 00267 resetAsyncCalls_(); 00268 // From milliseconds to microseconds. 00269 ping_interval_ = ping_interval * 1000; 00270 pong_timeout_ = pong_timeout * 1000; 00271 if (ping_interval_) 00272 sendPing(link_); 00273 } 00274 00275 void 00276 UClient::resetAsyncCalls_() 00277 { 00278 if (pong_timeout_handler_) 00279 { 00280 pong_timeout_handler_->cancel(); 00281 pong_timeout_handler_.reset(); 00282 } 00283 if (send_ping_handler_) 00284 { 00285 send_ping_handler_->cancel(); 00286 send_ping_handler_.reset(); 00287 } 00288 } 00289 00290 void 00291 UClient::waitForKernelVersion() const 00292 { 00293 // FIXME: use a condition. 00294 while (kernelMajor_ < 0 && !error()) 00295 sleep(100000); 00296 } 00297 00298 void 00299 UClient::setSynchronousSend(bool enable) 00300 { 00301 synchronous_send_ = enable; 00302 } 00303 00304 00305 00306 /*-----------------------. 00307 | Standalone functions. | 00308 `-----------------------*/ 00309 00310 void execute() 00311 { 00312 while (true) 00313 sleep(100); 00314 } 00315 00316 void exit(int code) 00317 { 00318 ::exit(code); 00319 } 00320 00321 UClient& 00322 connect(const std::string& host) 00323 { 00324 return *new UClient(host); 00325 } 00326 00327 void disconnect(UClient &client) 00328 { 00329 // Asynchronous deletion to let our async handlers terminate. 00330 client.destroy(); 00331 } 00332 00333 } // namespace urbi