|
Urbi SDK Remote for C++
2.7.3
|
00001 /* 00002 * Copyright (C) 2005-2011, Gostai S.A.S. 00003 * 00004 * This software is provided "as is" without warranty of any kind, 00005 * either expressed or implied, including but not limited to the 00006 * implied warranties of fitness for a particular purpose. 00007 * 00008 * See the LICENSE file for more information. 00009 */ 00010 00012 00013 #include <libport/format.hh> 00014 00015 #include <libport/containers.hh> 00016 #include <libport/debug.hh> 00017 #include <libport/escape.hh> 00018 #include <libport/lexical-cast.hh> 00019 00020 #include <urbi/uabstractclient.hh> 00021 #include <urbi/ublend-type.hh> 00022 #include <urbi/uexternal.hh> 00023 #include <urbi/umessage.hh> 00024 #include <urbi/uobject.hh> 00025 #include <urbi/usyncclient.hh> 00026 #include <urbi/uvalue-serialize.hh> 00027 00028 #include <liburbi/compatibility.hh> 00029 00030 #include <libuobject/remote-ucontext-impl.hh> 00031 00032 namespace urbi 00033 { 00034 namespace impl 00035 { 00036 00037 GD_CATEGORY(Urbi.LibUObject); 00038 00040 void 00041 RemoteUVarImpl::initialize(UVar* owner) 00042 { 00043 GD_FINFO_TRACE("RemoteUVarImpl::initialize %s %s", owner->get_name(), 00044 this); 00045 owner_ = owner; 00046 bypass_ = false; 00047 RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_); 00048 client_ = ctx->backend_; 00049 LockableOstream* outputStream = ctx->outputStream; 00050 std::string name = owner_->get_name(); 00051 { 00052 libport::BlockLock bl(ctx->tableLock); 00053 UVarTable::callbacks_type& ct = ctx->varmap()[name]; 00054 bool first = ct.empty(); 00055 ct.push_back(owner_); 00056 if (first) 00057 { 00058 value_ = new UValue(); 00059 timestamp_ = new time_t; 00060 } 00061 else 00062 { 00063 RemoteUVarImpl* impl = static_cast<RemoteUVarImpl*>(ct.front()->impl_); 00064 value_ = impl->value_; 00065 timestamp_ = impl->timestamp_; 00066 } 00067 } 00068 URBI_SEND_PIPED_COMMAND_C((*outputStream), "if (!isdef(" << name << ")) var " 00069 << name); 00070 URBI_SEND_PIPED_COMMAND_C 00071 ((*outputStream), 00072 libport::format("external var %s from %s", 00073 owner_->get_name(), ctx->hookPointName())); 00074 ctx->markDataSent(); 00075 } 00076 00077 bool RemoteUVarImpl::setBypass(bool enable) 00078 { 00079 bypass_ = enable; 00080 return true; 00081 } 00082 00084 ufloat& 00085 RemoteUVarImpl::out() 00086 { 00087 return const_cast<ufloat&>(get().val); 00088 } 00089 00091 ufloat& 00092 RemoteUVarImpl::in() 00093 { 00094 return const_cast<ufloat&>(get().val); 00095 } 00096 00097 00098 void 00099 RemoteUVarImpl::setProp(UProperty p, const UValue& v) 00100 { 00101 RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_); 00102 LockableOstream* outputStream = ctx->outputStream; 00103 URBI_SEND_PIPED_COMMAND_C((*outputStream), owner_->get_name() << "->" 00104 << urbi::name(p) << " = " << v); 00105 ctx->markDataSent(); 00106 } 00107 00108 void 00109 RemoteUVarImpl::keepSynchronized() 00110 { 00111 //FIXME: do something? 00112 } 00113 00114 UValue 00115 RemoteUVarImpl::getProp(UProperty p) 00116 { 00117 RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_); 00118 UMessage* m = ctx->syncGet(owner_->get_name() +"->" 00119 + urbi::name(p)); 00120 if (!m->value) 00121 throw std::runtime_error("Error fetching property on " 00122 + owner_->get_name()); 00123 UValue res = *m->value; 00124 delete m; 00125 return res; 00126 } 00127 00129 void 00130 RemoteUVarImpl::clean() 00131 { 00132 RemoteUContextImpl* ctx = dynamic_cast<RemoteUContextImpl*>(owner_->ctx_); 00133 libport::BlockLock bl(ctx->tableLock); 00134 ctx->varmap().clean(*owner_); 00135 if (ctx->varmap()[owner_->get_name()].empty()) 00136 { 00137 delete value_; 00138 delete timestamp_; 00139 } 00140 } 00141 00142 static 00143 std::string 00144 rtp_id() 00145 { 00146 // Compute once in some thread implementations, each thread has different 00147 // PID. 00148 static std::string res = 00149 libport::format("URTP_%s_%s", getFilteredHostname(), 00150 #ifdef __UCLIBC__ 00151 "default" 00152 #else 00153 getpid() 00154 #endif 00155 ); 00156 return res; 00157 } 00158 00159 static std::string makeLinkName(const std::string& key) 00160 { 00161 // We cannot have '.' in here, but we want to be able to regenerate the 00162 // original key unambiguously, so use something unlikely (as in reserved 00163 // idealy) 00164 std::string res = rtp_id() + "___" + key; 00165 res[res.find_first_of(".")] = '_'; 00166 return res; 00167 } 00168 00169 void 00170 RemoteUContextImpl::makeRTPLink(const std::string& key) 00171 { 00172 /* Setup RTP mode 00173 * We create two instances of the URTP UObject: one local to this 00174 * remote, and one plugged in the engine, and connect them together. 00175 */ 00176 // Spawn a new local RTP instance 00177 std::string localRTP = rtp_id(); 00178 RemoteUContextImpl::objects_type::iterator oi = objects.find(localRTP); 00179 if (oi == objects.end()) 00180 return; 00181 baseURBIStarter* bsa = oi->second->cloner; 00182 std::string linkName = makeLinkName(key); 00183 GD_SINFO_TRACE("Instanciating local RTP " << linkName); 00184 bsa->instanciate(this, linkName); 00185 // Call init 00186 localCall(linkName, "init"); 00187 00188 // Spawn a remote RTP instance and bind it. 00189 // Also destroy it when this remote disconnects. 00190 std::string rLinkName = linkName + "_l"; 00191 URBI_SEND_COMMAND_C 00192 (*outputStream, 00193 libport::format("var %s = URTP.new|\n" 00194 "%s.sourceContext = lobby.uid|\n", 00195 rLinkName, rLinkName)); 00196 // Now asynchronously ask the remote object to listen and to report 00197 // the port number. 00198 GD_SINFO_TRACE("fetching engine listen port..."); 00199 backend_->setCallback( 00200 callback(*this, &RemoteUContextImpl::onRTPListenMessage), 00201 (URBI_REMOTE_RTP_INIT_CHANNEL + key).c_str()); 00202 URBI_SEND_COMMA_COMMAND_C 00203 (*outputStream, 00204 libport::format("Channel.new(\"%s%s\") << %s.listen(\"0.0.0.0\", \"0\")", 00205 URBI_REMOTE_RTP_INIT_CHANNEL, key, rLinkName)); 00206 rtpLinks[key] = 0; // Not ready yet. 00207 } 00208 00209 UCallbackAction RemoteUContextImpl::onRTPListenMessage(const UMessage& mport) 00210 { 00211 // Second stage of RTP initialization: the remote is listening. 00212 if (mport.type != MESSAGE_DATA 00213 || mport.value->type != DATA_DOUBLE) 00214 { 00215 GD_SWARN("Failed to get remote RTP port, disabling RTP"); 00216 enableRTP = false; 00217 return URBI_REMOVE; 00218 } 00219 // Extract key from channel. 00220 std::string key = mport.tag.substr(strlen(URBI_REMOTE_RTP_INIT_CHANNEL), 00221 mport.tag.npos); 00222 // Regenerate link name 00223 std::string linkName = makeLinkName(key); 00224 std::string rLinkName = linkName + "_l"; 00225 // And uvar name 00226 std::string varname = key; 00227 size_t p = varname.find("___"); 00228 if (p != varname.npos) 00229 varname = varname.substr(0, p) + varname.substr(p+3, varname.npos); 00230 int port = int(mport.value->val); 00231 GD_FINFO_TRACE("Finishing RTP init, link %s port %s variable %s", 00232 linkName, port, varname); 00233 // Invoke the connect method on our RTP instance. Having a reference 00234 // to URTP symbols would be painful, so pass through our 00235 // UGenericCallback mechanism. 00236 localCall(linkName, "connect", backend_->getRemoteHost(), port); 00237 UObject* ob = getUObject(linkName); 00238 // Monitor this RTP link. 00239 URBI_SEND_COMMA_COMMAND_C(*outputStream, 00240 "detach('external'.monitorRTP(" << linkName << "," 00241 << rLinkName << ", closure() {'external'.failRTP}))|" 00242 << rLinkName << ".receiveVar(\"" << varname 00243 << "\")"); 00244 rtpLinks[key] = ob; 00245 return URBI_REMOVE; 00246 } 00247 00248 void 00249 RemoteUVarImpl::set(const UValue& v) 00250 { 00251 RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_); 00252 libport::utime_t time = libport::utime(); 00253 if (!owner_->get_local()) 00254 transmit(v, time); 00255 // Loopback notification 00256 ctx->assignMessage(owner_->get_name(), v, time, bypass_); 00257 } 00258 00259 void 00260 RemoteUVarImpl::transmitSerialized(const UValue& v, libport::utime_t time) 00261 { 00262 GD_INFO_DEBUG("transmitSerialized"); 00263 char av = UEM_ASSIGNVALUE; 00264 std::string n = owner_->get_name(); 00265 unsigned int tlow = (unsigned int)time; 00266 unsigned int thi = (unsigned int)(time >> 32); 00267 RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_); 00268 ctx->backend_->startPack(); 00269 ctx->outputStream->flush(); 00270 *static_cast<RemoteUContextImpl*>(owner_->ctx_)-> 00271 oarchive 00272 << av 00273 << n 00274 << v 00275 << tlow << thi; 00276 client_->flush(); 00277 ctx->backend_->endPack(); 00278 } 00279 00280 void 00281 RemoteUVarImpl::transmit(const UValue& v, libport::utime_t time) 00282 { 00283 RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_); 00284 std::string fullname = owner_->get_name(); 00285 size_t pos = fullname.rfind("."); 00286 assert(pos != std::string::npos); 00287 std::string owner = fullname.substr(0, pos); 00288 std::string name = fullname.substr(pos + 1); 00289 GD_FINFO_DUMP("transmit new value for %s", fullname); 00290 bool rtp = false; 00291 if (v.type == DATA_BINARY) 00292 { 00293 std::string localRTP = rtp_id(); 00294 if (ctx->enableRTP && getUObject(localRTP) 00295 && owner_->get_rtp() != UVar::RTP_NO) 00296 { 00297 GD_SINFO_TRACE("Trying RTP mode using " << localRTP); 00298 RemoteUContextImpl::RTPLinks::iterator i 00299 = ctx->rtpLinks.find(owner_->get_name()); 00300 if (i == ctx->rtpLinks.end()) 00301 { 00302 // Initiate rtp link asynchronously 00303 GD_INFO_TRACE("Asynchronous RTP link initialization."); 00304 ctx->makeRTPLink(owner_->get_name()); 00305 goto rtpfail; 00306 } 00307 else if (i->second == 0) 00308 { 00309 GD_INFO_TRACE("RTP link not ready yet, fallback"); 00310 goto rtpfail; // init started, link not ready yet 00311 } 00312 GD_FINFO_TRACE("Link ready, using cache if %s", ctx->rtpSend); 00313 if (ctx->rtpSend) 00314 ctx->rtpSend(i->second, v); 00315 else 00316 ctx->localCall(i->second->__name, "send", v); 00317 rtp = true; 00318 } 00319 rtpfail: 00320 if (!rtp) 00321 { 00322 if (ctx->serializationMode) 00323 transmitSerialized(v, time); 00324 else 00325 { 00326 ctx->backend_->startPack(); 00327 *ctx->outputStream 00328 << owner 00329 << ".getSlot(\"" << libport::escape(name) 00330 << "\").update_timed("; 00331 // Sendbinary is not using the stream, so we must flush. 00332 ctx->outputStream->flush(); 00333 UBinary& b = *(v.binary); 00334 ctx->backend_->sendBinary(b.common.data, b.common.size, 00335 b.getMessage()); 00336 *ctx->outputStream << ", " << time << ")|"; 00337 ctx->backend_->endPack(); 00338 } 00339 } 00340 } 00341 else 00342 { 00343 if (ctx->enableRTP && owner_->get_rtp()) 00344 { 00345 if (!ctx->sharedRTP_) 00346 { 00347 RemoteUContextImpl::RTPLinks::iterator i 00348 = ctx->rtpLinks.find("_shared_"); 00349 if (i == ctx->rtpLinks.end()) 00350 { 00351 GD_INFO_DUMP("Async init of RTP shared link"); 00352 ctx->makeRTPLink("_shared_"); 00353 goto rtpfail2; 00354 } 00355 else if (!i->second) 00356 { 00357 GD_INFO_DUMP("RTP shared link not yet ready"); 00358 goto rtpfail2; 00359 } 00360 ctx->sharedRTP_ = i->second; 00361 } 00362 GD_INFO_DUMP("localCalling sendGrouped"); 00363 if (ctx->rtpSendGrouped) 00364 ctx->rtpSendGrouped(ctx->sharedRTP_, owner_->get_name(), v, time); 00365 else 00366 ctx->localCall(ctx->sharedRTP_->__name, "sendGrouped", 00367 owner_->get_name(), v, time); 00368 rtp = true; 00369 } 00370 rtpfail2: 00371 if (!rtp) 00372 { 00373 if (ctx->serializationMode) 00374 transmitSerialized(v, time); 00375 else 00376 { 00377 ctx->backend_->startPack(); 00378 *ctx->outputStream 00379 << owner 00380 << ".getSlot(\"" << libport::escape(name) 00381 << "\").update_timed("; 00382 if (v.type == DATA_STRING) 00383 (*ctx->outputStream) << "\"" << libport::escape(*v.stringValue, '"') << "\""; 00384 else 00385 *ctx->outputStream << v ; 00386 *ctx->outputStream << ", " << time << ")|"; 00387 ctx->backend_->endPack(); 00388 } 00389 } 00390 } 00391 if (!rtp) 00392 { 00393 ctx->markDataSent(); 00394 } 00395 GD_FINFO_DUMP("transmit new value for %s done", fullname); 00396 } 00397 00398 const UValue& RemoteUVarImpl::get() const 00399 { 00400 return *value_; 00401 }; 00402 00404 void 00405 RemoteUVarImpl::setOwned() 00406 { 00407 owner_->owned = true; 00408 } 00409 00411 UDataType 00412 RemoteUVarImpl::type() const 00413 { 00414 return get().type; 00415 } 00416 00417 void 00418 RemoteUVarImpl::request() 00419 { 00420 RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_); 00421 std::string name = owner_->get_name(); 00422 //build a getvalue message that will be parsed and returned by the server 00423 URBI_SEND_PIPED_COMMAND_C((*ctx->outputStream), externalModuleTag << "<<" 00424 <<'[' << UEM_ASSIGNVALUE << "," 00425 << '"' << name << '"' << ',' << name << ']'); 00426 ctx->markDataSent(); 00427 } 00428 00429 void 00430 RemoteUVarImpl::sync() 00431 { 00432 RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_); 00433 std::string name = owner_->get_name(); 00434 UMessage* m = ctx->syncGet(name + ".uvalueSerialize"); 00435 if (m->type == MESSAGE_DATA) 00436 value_->set(*m->value); 00437 delete m; 00438 } 00439 00440 time_t 00441 RemoteUVarImpl::timestamp() const 00442 { 00443 return *timestamp_; 00444 } 00445 00446 void RemoteUVarImpl::unnotify() 00447 { 00448 GD_FINFO_TRACE("RemoteUVarImpl::unnotify on %s (%s)", owner_->get_name(), 00449 this); 00450 RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_); 00451 std::string name = owner_->get_name(); 00452 size_t p = name.find_first_of("."); 00453 if (p == name.npos) 00454 throw std::runtime_error("unnotify: invalid argument: " + name); 00455 // Each UVar creation and each notifychange causes an 'external 00456 // var' message, so when the UVar dies, creation count is 00457 // callbacks.size +1. 00458 ctx->send(libport::format("UObject.unnotify(\"%s\", \"%s\", %s)|", 00459 name.substr(0, p), name.substr(p+1, name.npos), 00460 callbacks_.size()+1)); 00461 libport::BlockLock bl(ctx->tableLock); 00462 foreach(RemoteUGenericCallbackImpl* c, callbacks_) 00463 { 00464 UTable& t = 00465 dynamic_cast<RemoteUContextImpl*>(c->owner_->ctx_) 00466 ->tableByName(c->owner_->type); 00467 UTable::callbacks_type& ct = t[c->owner_->name]; 00468 UTable::callbacks_type::iterator i = libport::find(ct, c->owner_); 00469 if (i != ct.end()) 00470 ct.erase(i); 00471 owner_->ctx_->addCleanup(c->owner_); // Will clean the impl_ too. 00472 } 00473 callbacks_.clear(); 00474 ctx->markDataSent(); 00475 if (std::list<UVar*> *us = ctx->varmap().find0(name)) 00476 us->remove(owner_); 00477 } 00478 00479 void RemoteUVarImpl::useRTP(bool enable) 00480 { 00481 RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_); 00482 std::string name = owner_->get_name(); 00483 size_t p = name.find_first_of("."); 00484 if (p == name.npos) 00485 throw std::runtime_error("invalid argument to useRTP: "+name); 00486 ctx->send(libport::format("%s.getSlot(\"%s\").rtp = %s|", 00487 name.substr(0, p), name.substr(p+1, name.npos), 00488 enable ? "true" : "false")); 00489 ctx->markDataSent(); 00490 } 00491 00492 void RemoteUVarImpl::setInputPort(bool enable) 00493 { 00494 RemoteUContextImpl* ctx = static_cast<RemoteUContextImpl*>(owner_->ctx_); 00495 std::string name = owner_->get_name(); 00496 size_t p = name.find_first_of("."); 00497 if (p == name.npos) 00498 throw std::runtime_error("invalid argument to setInputPort: "+name); 00499 ctx->send(libport::format("%s.getSlot(\"%s\").%s|", 00500 name.substr(0, p), name.substr(p+1, name.npos), 00501 enable 00502 ? "setSlot(\"inputPort\", true)" 00503 : "removeLocalSlot(\"inputPort\")")); 00504 ctx->markDataSent(); 00505 } 00506 00507 } 00508 } //namespace urbi