|
Urbi SDK Remote for C++
2.7.3
|
00001 /* 00002 * Copyright (C) 2005-2011, Gostai S.A.S. 00003 * 00004 * This software is provided "as is" without warranty of any kind, 00005 * either expressed or implied, including but not limited to the 00006 * implied warranties of fitness for a particular purpose. 00007 * 00008 * See the LICENSE file for more information. 00009 */ 00010 00012 00013 #include <algorithm> 00014 #include <libport/cassert> 00015 #include <libport/cerrno> 00016 #include <libport/cmath> 00017 #include <libport/cstdlib> 00018 #include <fstream> 00019 #include <iostream> 00020 00021 #include <libport/format.hh> 00022 #include <libport/io-stream.hh> 00023 #include <libport/lexical-cast.hh> 00024 00025 #include <libport/cstdio> 00026 #include <libport/cstring> 00027 #include <libport/containers.hh> 00028 #include <libport/debug.hh> 00029 #include <libport/escape.hh> 00030 #include <libport/lexical-cast.hh> 00031 #include <libport/lockable.hh> 00032 #include <libport/sys/stat.h> 00033 #include <libport/unistd.h> 00034 #include <libport/windows.hh> 00035 00036 #include <urbi/uabstractclient.hh> 00037 #include <urbi/uconversion.hh> 00038 #include <urbi/umessage.hh> 00039 #include <urbi/utag.hh> 00040 00041 #include <liburbi/compatibility.hh> 00042 00043 GD_CATEGORY(Urbi.Client.Abstract); 00044 00045 namespace urbi 00046 { 00047 00049 const char* tag_error = "[error]"; 00051 const char* tag_wildcard = "[wildcard]"; 00052 00053 std::ostream& 00054 default_stream() 00055 { 00056 return (getDefaultClient() 00057 ? ((UAbstractClient*)getDefaultClient())->stream_get() 00058 : std::cerr); 00059 } 00060 00061 00062 /*-------------. 00063 | UCallbacks. | 00064 `-------------*/ 00065 00066 static UCallbackID nextId; 00067 00068 class UCallbackWrapperCB: public UCallbackWrapper 00069 { 00070 UCallback cb; 00071 public: 00072 UCallbackWrapperCB(UCallback cb) 00073 : cb(cb) 00074 { 00075 } 00076 virtual UCallbackAction operator()(const UMessage& msg) 00077 { 00078 return cb(msg); 00079 } 00080 }; 00081 00082 00083 class UCallbackWrapperCCB: public UCallbackWrapper 00084 { 00085 UCustomCallback cb; 00086 void * data; 00087 public: 00088 UCallbackWrapperCCB(UCustomCallback cb, void* data) 00089 : cb(cb) 00090 , data(data) 00091 { 00092 } 00093 virtual UCallbackAction operator()(const UMessage& msg) 00094 { 00095 return cb(data, msg); 00096 } 00097 }; 00098 00099 00100 /*-------------------. 00101 | UClientStreambuf. | 00102 `-------------------*/ 00103 00104 class UClientStreambuf: public libport::StreamBuffer 00105 { 00106 public: 00107 UClientStreambuf(UAbstractClient* cl) 00108 : client_(cl) 00109 {} 00110 00111 protected: 00112 virtual size_t read(char* buffer, size_t size); 00113 virtual void write(char* buffer, size_t size); 00114 00115 private: 00116 UAbstractClient* client_; 00117 }; 00118 00119 void 00120 UClientStreambuf::write(char* buffer, size_t size) 00121 { 00122 client_->effective_send(buffer, size); 00123 } 00124 00125 size_t 00126 UClientStreambuf::read(char*, size_t) 00127 { 00128 return 0; 00129 } 00130 00131 /*------------------. 00132 | UAbstractClient. | 00133 `------------------*/ 00134 00135 00136 const char* UAbstractClient::CLIENTERROR_TAG = "client_error"; 00137 00138 void 00139 UAbstractClient::bins_clear() 00140 { 00141 for (/* nothing */; !bins.empty(); bins.pop_front()) 00142 bins.front().clear(); 00143 } 00144 00145 inline 00146 bool 00147 matching_tag(const UMessage& msg, const char* tag) 00148 { 00149 return 00150 msg.tag == tag 00151 || (libport::streq(tag, tag_error) && msg.type == MESSAGE_ERROR) 00152 // The wild card does not match tags starting with 00153 // TAG_PRIVATE_PREFIX. 00154 || (libport::streq(tag, tag_wildcard) 00155 && msg.tag.compare(0, 00156 sizeof TAG_PRIVATE_PREFIX - 1, 00157 TAG_PRIVATE_PREFIX)); 00158 } 00159 00160 void 00161 UAbstractClient::notifyCallbacks(const UMessage& msg) 00162 { 00163 libport::BlockLock bl(listLock); 00164 bool inc = true; 00165 for (callbacks_type::iterator it = callbacks_.begin(); 00166 it != callbacks_.end(); 00167 inc ? it++ : it, inc = true) 00168 if (matching_tag(msg, it->tag)) 00169 { 00170 UCallbackAction ua = it->callback(msg); 00171 if (ua == URBI_REMOVE) 00172 { 00173 delete &it->callback; 00174 it = callbacks_.erase(it); 00175 inc = false; 00176 } 00177 } 00178 } 00179 00180 UAbstractClient::UAbstractClient(const std::string& host, 00181 unsigned port, 00182 size_t buflen, 00183 bool server) 00184 : LockableOstream(new UClientStreambuf(this)) 00185 , closed_ (false) 00186 , listLock() 00187 , host_(host) 00188 , port_(port) 00189 , server_(server) 00190 , sendBufSize(buflen) 00191 , recvBufSize(buflen) 00192 , rc(0) 00193 00194 , recvBuffer(new char[buflen]) 00195 , recvBufferPosition(0) 00196 , sendBuffer(new char[buflen]) 00197 00198 , kernelMajor_(-1) 00199 , kernelMinor_(-1) 00200 , binaryBuffer(0) 00201 , parsePosition(0) 00202 , inString(false) 00203 , nBracket(0) 00204 , binaryMode(false) 00205 , system(false) 00206 , init_(true) 00207 , counter_(0) 00208 , stream_(this) 00209 { 00210 exceptions(std::ostream::eofbit | std::ostream::failbit | 00211 std::ostream::badbit); 00212 recvBuffer[0] = 0; 00213 sendBuffer[0] = 0; 00214 } 00215 00216 UAbstractClient::~UAbstractClient() 00217 { 00218 // No more default client if delete. 00219 if ((void*)getDefaultClient() == (void*)this) 00220 setDefaultClient(0); 00221 delete [] recvBuffer; 00222 delete [] sendBuffer; 00223 } 00224 00229 UAbstractClient::error_type 00230 UAbstractClient::startPack() 00231 { 00232 sendBufferLock.lock(); 00233 return 0; 00234 } 00235 00236 UAbstractClient::error_type 00237 UAbstractClient::endPack() 00238 { 00239 error_type res = effective_send(sendBuffer); 00240 sendBuffer[0] = 0; 00241 sendBufferLock.unlock(); 00242 return res; 00243 } 00244 00245 UAbstractClient::error_type 00246 UAbstractClient::send(const char* command, ...) 00247 { 00248 if (rc) 00249 return -1; 00250 va_list arg; 00251 va_start(arg, command); 00252 sendBufferLock.lock(); 00253 rc = vpack(command, arg); 00254 va_end(arg); 00255 if (rc < 0) 00256 { 00257 sendBufferLock.unlock(); 00258 return rc; 00259 } 00260 return rc = endPack(); 00261 } 00262 00263 UAbstractClient::error_type 00264 UAbstractClient::send(const std::string& s) 00265 { 00266 return send("%s", s.c_str()); 00267 } 00268 00269 UAbstractClient::error_type 00270 UAbstractClient::send(const UValue& v) 00271 { 00272 switch (v.type) 00273 { 00274 // Bounce to UValue operator << for those types. 00275 case DATA_DOUBLE: 00276 case DATA_SLOTNAME: 00277 case DATA_STRING: 00278 return send(string_cast(v)); 00279 break; 00280 00281 // Use our own sendBinary for binary, who knows how to talk to k1 and k2. 00282 case DATA_BINARY: 00283 if (v.binary->type != BINARY_NONE 00284 && v.binary->type != BINARY_UNKNOWN) 00285 v.binary->buildMessage(); 00286 return sendBinary(v.binary->common.data, v.binary->common.size, 00287 v.binary->message); 00288 break; 00289 00290 // Lists can contain binary, so recurse using this function. 00291 case DATA_LIST: 00292 send("["); 00293 foreach (const UValue* u, *v.list) 00294 send(libport::format("%s,", u)); 00295 return send("]"); 00296 break; 00297 00298 case DATA_DICTIONARY: 00299 send("["); 00300 if (v.dictionary->empty()) 00301 send("=>"); 00302 else 00303 foreach (const UDictionary::value_type& d, *v.dictionary) 00304 send(libport::format("\"%s\"=>%s,", 00305 libport::escape(d.first), d.second)); 00306 return send("]"); 00307 break; 00308 00309 case DATA_VOID: 00310 break; 00311 }; 00312 return 0; 00313 } 00314 00315 UAbstractClient::error_type 00316 UAbstractClient::send(std::istream& is) 00317 { 00318 if (rc) 00319 return -1; 00320 sendBufferLock.lock(); 00321 while (is.good() && !rc) 00322 { 00323 is.read(sendBuffer, sendBufSize); 00324 rc = effective_send(sendBuffer, is.gcount()); 00325 } 00326 sendBuffer[0] = 0; 00327 sendBufferLock.unlock(); 00328 return rc; 00329 } 00330 00331 00332 00337 UAbstractClient::error_type 00338 UAbstractClient::pack(const char* command, ...) 00339 { 00340 if (rc) 00341 return -1; 00342 va_list arg; 00343 va_start(arg, command); 00344 rc = vpack(command, arg); 00345 va_end(arg); 00346 return rc; 00347 } 00348 00349 00350 UAbstractClient::error_type 00351 UAbstractClient::vpack(const char* command, va_list arg) 00352 { 00353 if (rc) 00354 return -1; 00355 sendBufferLock.lock(); 00356 if (command) 00357 { 00358 // Don't print if we overflow the buffer. It would be nice to 00359 // rely on the behavior of the GNU LibC which accepts 0 as 00360 // destination buffer to query the space needed. But it is not 00361 // portable (e.g., segv on OS X). So rather, try to vsnprintf, 00362 // and upon failure, revert the buffer in its previous state. 00363 size_t slen = strlen(sendBuffer); 00364 size_t msize = sendBufSize - slen; 00365 int r = vsnprintf(sendBuffer + slen, msize, command, arg); 00366 // vsnprintf returns the number of characters to write. Check 00367 // that it fits. Don't forget the ending '\0' that it does not 00368 // count, but wants to add. 00369 if (r < 0 || static_cast<int>(msize) <= r) 00370 { 00371 // Don't produce partial input. 00372 sendBuffer[slen] = 0; 00373 rc = -1; 00374 } 00375 } 00376 sendBufferLock.unlock(); 00377 return rc; 00378 } 00379 00380 00381 UAbstractClient::error_type 00382 UAbstractClient::sendFile(const std::string& f) 00383 { 00384 if (f == "/dev/stdin") 00385 return send(std::cin); 00386 else 00387 { 00388 std::ifstream is(f.c_str(), std::ios::binary); 00389 if (is.fail()) 00390 return -1; 00391 else 00392 return send(is); 00393 } 00394 } 00395 00396 00397 UAbstractClient::error_type 00398 UAbstractClient::sendBin(const void* buffer, size_t len) 00399 { 00400 return sendBin(buffer, len, 0); 00401 } 00402 00403 00404 UAbstractClient::error_type 00405 UAbstractClient::sendBin(const void* buffer, size_t len, 00406 const char* header, ...) 00407 { 00408 if (rc) 00409 return -1; 00410 sendBufferLock.lock(); 00411 if (header) 00412 { 00413 va_list arg; 00414 va_start(arg, header); 00415 vpack(header, arg); 00416 va_end(arg); 00417 effective_send(sendBuffer); 00418 } 00419 00420 error_type res = effective_send(buffer, len); 00421 sendBuffer[0] = 0; 00422 sendBufferLock.unlock(); 00423 return res; 00424 } 00425 00426 UAbstractClient::error_type 00427 UAbstractClient::sendBinary(const void* data, size_t len, 00428 const std::string& header) 00429 { 00430 if (kernelMajor() < 2) 00431 return sendBin(data, len, "BIN %lu %s;", 00432 static_cast<unsigned long>(len), header.c_str()); 00433 else 00434 { 00435 sendBufferLock.lock(); 00436 *this << libport::format("Global.Binary.new(\"%s\", \"\\B(%s)(", 00437 libport::escape(header), len); 00438 flush(); 00439 effective_send(data, len); 00440 *this << ")\")"; 00441 sendBufferLock.unlock(); 00442 return rc; 00443 } 00444 } 00445 00446 struct sendSoundData 00447 { 00448 char* buffer; 00449 int bytespersec; 00450 size_t length; 00451 size_t pos; 00452 char* device; 00453 char* tag; 00454 char formatString[50]; 00455 USoundFormat format; 00456 UAbstractClient* uc; 00457 bool startNotify; 00458 }; 00459 00460 static UCallbackAction sendSound_(void* cb, const UMessage &msg) 00461 { 00462 //the idea is to cut the sound into small chunks, 00463 //add a header and send each chunk separately 00464 00465 //create the header. 00466 static const size_t CHUNK_SIZE = 32 * 8*60; 00467 sendSoundData* s = (sendSoundData*)cb; 00468 //handle next chunk 00469 if (s->format == SOUND_WAV && s->pos==0) 00470 s->pos = sizeof (wavheader); 00471 size_t tosend = std::min(CHUNK_SIZE, s->length - s->pos); 00472 00473 //int playlength = tosend *1000 / s->bytespersec; 00474 std::string header = ((s->format == SOUND_WAV) ? "wav " : "raw ") 00475 + (std::string)s->formatString; 00476 00477 s->uc->send("%s.val = Global.Binary.new(\"%s\", \"\\B(%lu)(", 00478 s->device, 00479 header.c_str(), 00480 static_cast<unsigned long> 00481 (tosend + ((s->format == SOUND_WAV) ? sizeof (wavheader) : 0)) 00482 ); 00483 00484 if (s->format == SOUND_WAV) 00485 { 00486 wavheader wh; 00487 memcpy(&wh, s->buffer, sizeof wh); 00488 wh.datalength=tosend; 00489 wh.length=tosend+44-8; 00490 s->uc->sendBin(&wh, sizeof wh); 00491 } 00492 s->uc->sendBin(s->buffer+s->pos, tosend); 00494 s->uc->send(")\")|;waituntil(%s.remain < 1000);\n" 00495 " %s << 1;\n", s->device, msg.tag.c_str()); 00496 s->pos += tosend; 00497 if (s->pos >= s->length) 00498 { 00499 const char* dev = s->device ? s->device : "speaker"; 00500 s->uc->send("%s.val->blend = %s.sendsoundsaveblend;", dev, dev); 00501 00502 if (s->tag && s->tag[0]) 00503 s->uc->send("Channel.new(\"%s\") << 1;\n", s->tag); 00504 delete[] s->buffer; 00505 free(s->tag); 00506 free(s->device); 00507 delete s; 00508 return URBI_REMOVE; 00509 } 00510 return URBI_CONTINUE; 00511 } 00512 00519 UAbstractClient::error_type 00520 UAbstractClient::sendSound(const char* device, const USound& sound, 00521 const char* tag) 00522 { 00523 switch (sound.soundFormat) 00524 { 00525 case SOUND_MP3: 00526 case SOUND_OGG: 00527 // We don't handle chunking for these formats. 00528 return sendBin(sound.data, sound.size, 00529 "%s +report: %s.val = BIN %lu %s;", 00530 tag, device, static_cast<unsigned long>(sound.size), 00531 sound.soundFormat == SOUND_MP3 ? "mp3" : "ogg"); 00532 break; 00533 00534 case SOUND_WAV: 00535 case SOUND_RAW: 00536 { 00537 const char* dev = device ? device : "speaker"; 00538 send("%s.removeSlot(\"sendsoundsaveblend\") |" 00539 "var %s.sendsoundsaveblend = %s.val->blend;" 00540 "%s.val->blend=\"queue\";", 00541 dev, dev, dev, dev); 00542 sendSoundData* s = new sendSoundData(); 00543 s->bytespersec = sound.channels * sound.rate * (sound.sampleSize / 8); 00544 s->uc = this; 00545 s->buffer = new char[sound.size]; 00546 memcpy(s->buffer, sound.data, sound.size); 00547 s->length = sound.size; 00548 s->tag = tag ? strdup(tag) : 0; 00549 s->device = strdup(device); 00550 s->pos = 0; 00551 s->format = sound.soundFormat; 00552 if (sound.soundFormat == SOUND_RAW) 00553 sprintf(s->formatString, "%zd %zd %zd %d", 00554 sound.channels, sound.rate, sound.sampleSize, 00555 sound.sampleFormat); 00556 else 00557 s->formatString[0] = 0; 00558 s->startNotify = false; 00559 std::string utag = fresh(); 00560 (*this) << "var " + utag +" = Channel.new(\"" << utag << "\");"; 00561 UCallbackID cid = setCallback(sendSound_, s, utag.c_str()); 00562 // Invoke it 2 times to queue sound. 00563 if (sendSound_(s, UMessage(*this, 0, utag, "*** stop", 00564 binaries_type())) 00565 == URBI_CONTINUE) 00566 { 00567 if (sendSound_(s, UMessage(*this, 0, utag, "*** stop", 00568 binaries_type())) 00569 == URBI_REMOVE) 00570 deleteCallback(cid); 00571 } 00572 else 00573 deleteCallback(cid); 00574 return 0; 00575 } 00576 00577 default: 00578 // Unrecognized format. 00579 return 1; 00580 } 00581 } 00582 00583 UCallbackID 00584 UAbstractClient::setCallback(UCallback cb, const char* tag) 00585 { 00586 return addCallback(tag, *new UCallbackWrapperCB(cb)); 00587 } 00588 00589 UCallbackID 00590 UAbstractClient::setCallback(UCustomCallback cb, 00591 void* cbData, 00592 const char* tag) 00593 { 00594 return addCallback(tag, *new UCallbackWrapperCCB(cb, cbData)); 00595 } 00596 00597 00598 int 00599 UAbstractClient::getAssociatedTag(UCallbackID id, char* tag) 00600 { 00601 listLock.lock(); 00602 callbacks_type::iterator it = 00603 std::find(callbacks_.begin(), callbacks_.end(), id); 00604 if (it == callbacks_.end()) 00605 { 00606 listLock.unlock(); 00607 return 0; 00608 } 00609 strcpy(tag, it->tag); 00610 listLock.unlock(); 00611 return 1; 00612 } 00613 00614 00615 int 00616 UAbstractClient::deleteCallback(UCallbackID id) 00617 { 00618 listLock.lock(); 00619 callbacks_type::iterator it = 00620 std::find(callbacks_.begin(), callbacks_.end(), id); 00621 if (it == callbacks_.end()) 00622 { 00623 listLock.unlock(); 00624 return 0; 00625 } 00626 delete &(it->callback); 00627 callbacks_.erase(it); 00628 listLock.unlock(); 00629 return 1; 00630 } 00631 00632 UCallbackID 00633 UAbstractClient::sendCommand(UCallback cb, const char* cmd, ...) 00634 { 00635 std::string tag = fresh(); 00636 std::string mcmd = tag + " << " + cmd; 00637 UCallbackID res = setCallback(cb, tag.c_str()); 00638 sendBufferLock.lock(); 00639 va_list arg; 00640 va_start(arg, cmd); 00641 vpack(mcmd.c_str(), arg); 00642 va_end(arg); 00643 if (endPack()) 00644 { 00645 deleteCallback(res); 00646 return UINVALIDCALLBACKID; 00647 } 00648 return res; 00649 } 00650 00651 UCallbackID 00652 UAbstractClient::sendCommand(UCustomCallback cb, void *cbData, 00653 const char* cmd, ...) 00654 { 00655 std::string tag = fresh(); 00656 std::string mcmd = tag + " << " + cmd; 00657 UCallbackID res = setCallback(cb, cbData, tag.c_str()); 00658 sendBufferLock.lock(); 00659 va_list arg; 00660 va_start(arg, cmd); 00661 vpack(mcmd.c_str(), arg); 00662 va_end(arg); 00663 if (endPack()) 00664 { 00665 deleteCallback(res); 00666 return UINVALIDCALLBACKID; 00667 } 00668 return res; 00669 } 00670 00671 UAbstractClient::error_type 00672 UAbstractClient::putFile(const char* localName, const char* remoteName) 00673 { 00674 size_t len; 00675 struct stat st; 00676 if (stat(localName, &st) == -1) 00677 return 1; 00678 len = st.st_size; 00679 sendBufferLock.lock(); 00680 if (!remoteName) 00681 remoteName = localName; 00682 send("save(\"%s\", \"", remoteName); 00683 error_type res = sendFile(localName); 00684 send("\");"); 00685 sendBufferLock.unlock(); 00686 return res; 00687 } 00688 00689 UAbstractClient::error_type 00690 UAbstractClient::putFile(const void* buffer, size_t length, 00691 const char* remoteName) 00692 { 00693 send("save(\"%s\", \"", remoteName); 00694 sendBin(buffer, length); 00695 send("\");"); 00696 sendBufferLock.unlock(); 00697 return 0; 00698 } 00699 00700 std::string 00701 UAbstractClient::fresh() 00702 { 00703 static boost::format fmt("URBI_%s"); 00704 return str(fmt % ++counter_); 00705 } 00706 00707 void 00708 UAbstractClient::makeUniqueTag(char* tag) 00709 { 00710 strcpy(tag, fresh().c_str()); 00711 } 00712 00713 bool 00714 UAbstractClient::process_recv_buffer_binary_() 00715 { 00716 //Receiving binary. Append to binaryBuffer; 00717 size_t len = 00718 std::min(recvBufferPosition - endOfHeaderPosition, 00719 binaryBufferLength - binaryBufferPosition); 00720 if (binaryBuffer) 00721 memcpy (static_cast<char*> (binaryBuffer) + binaryBufferPosition, 00722 recvBuffer + endOfHeaderPosition, len); 00723 binaryBufferPosition += len; 00724 00725 if (binaryBufferPosition == binaryBufferLength) 00726 { 00727 //Finished receiving binary. 00728 //append 00729 BinaryData bd; 00730 bd.size = binaryBufferLength; 00731 bd.data = binaryBuffer; 00732 bins << bd; 00733 binaryBuffer = 0; 00734 00735 if (nBracket == 0) 00736 { 00737 //end of command, send 00738 //dumb listLock.lock(); 00739 UMessage msg(*this, currentTimestamp, currentTag, currentCommand, 00740 bins); 00741 notifyCallbacks(msg); 00742 //unlistLock.lock(); 00743 00744 bins_clear(); 00745 00746 //flush 00747 parsePosition = 0; 00748 //Move the extra we received 00749 recvBufferPosition -= len + endOfHeaderPosition; 00750 memmove(recvBuffer, 00751 recvBuffer + endOfHeaderPosition + len, 00752 recvBufferPosition); 00753 } 00754 else 00755 { 00756 // not over yet 00757 //leave parseposition where it is 00758 //move the extra (parsePosition = endOfHeaderPosition) 00759 recvBufferPosition -= len; 00760 memmove(recvBuffer + parsePosition, 00761 recvBuffer + endOfHeaderPosition + len, 00762 recvBufferPosition - endOfHeaderPosition); 00763 } 00764 binaryBuffer = 0; 00765 binaryMode = false; 00766 00767 // Reenter loop. 00768 return true; 00769 } 00770 else 00771 { 00772 // Not finished receiving binary. 00773 recvBufferPosition = endOfHeaderPosition; 00774 return false; 00775 } 00776 } 00777 00778 bool 00779 UAbstractClient::process_recv_buffer_text_() 00780 { 00781 // Not in binary mode. 00782 char* endline = 00783 static_cast<char*> (memchr(recvBuffer+parsePosition, '\n', 00784 recvBufferPosition - parsePosition)); 00785 if (!endline) 00786 return false; //no new end of command/start of binary: wait 00787 00788 if (parsePosition == 0) // parse header 00789 { 00790 // Ignore empty lines. 00791 if (endline == recvBuffer) 00792 { 00793 memmove(recvBuffer, recvBuffer+1, recvBufferPosition - 1); 00794 recvBufferPosition--; 00795 return true; 00796 } 00797 00798 if (2 != sscanf(recvBuffer, "[%d:%64[A-Za-z0-9_.]]", 00799 ¤tTimestamp, currentTag)) 00800 { 00801 if (1 == sscanf(recvBuffer, "[%d]", ¤tTimestamp)) 00802 currentTag[0] = 0; 00803 else 00804 { 00805 // failure 00806 GD_FERROR("read, error parsing header: '%s'", recvBuffer); 00807 currentTimestamp = 0; 00808 strcpy(currentTag, "UNKNWN"); 00809 //listLock.lock(); 00810 UMessage msg(*this, 0, tag_error, 00811 "!!! UAbstractClient::read, fatal error parsing header", 00812 binaries_type()); 00813 notifyCallbacks(msg); 00814 //unlistLock.lock(); 00815 } 00816 } 00817 00818 currentCommand = strstr(recvBuffer, "]"); 00819 if (!currentCommand) 00820 { 00821 //reset all 00822 nBracket = 0; 00823 inString = false; 00824 parsePosition = 0; 00825 recvBufferPosition = 0; 00826 return false; 00827 } 00828 00829 ++currentCommand; 00830 while (*currentCommand == ' ') 00831 ++currentCommand; 00832 system = (*currentCommand == '!' || *currentCommand == '*'); 00833 parsePosition = (long) currentCommand - (long) recvBuffer; 00834 00835 //reinit just to be sure: 00836 nBracket = 0; 00837 inString = false; 00838 } 00839 00840 for (/* nothing */; parsePosition < recvBufferPosition; ++parsePosition) 00841 { 00842 if (inString) 00843 switch (recvBuffer[parsePosition]) 00844 { 00845 case '\\': 00846 if (parsePosition == recvBufferPosition-1) 00847 //we cant handle the '\\' 00848 return false; 00849 ++parsePosition; //ignore next character 00850 continue; 00851 case '"': 00852 inString = false; 00853 continue; 00854 } 00855 else 00856 { 00857 switch (recvBuffer[parsePosition]) 00858 { 00859 case '"': 00860 inString = true; 00861 continue; 00862 case '[': 00863 ++nBracket; 00864 continue; 00865 case ']': 00866 --nBracket; 00867 continue; 00868 case '\n': 00869 // FIXME: handle '[' in echoed messages or errors nBracket == 0. 00870 // 00871 // end of command 00872 recvBuffer[parsePosition] = 0; 00873 //listLock.lock(); 00874 UMessage msg(*this, currentTimestamp, currentTag, 00875 currentCommand, 00876 bins); 00877 notifyCallbacks(msg); 00878 //unlistLock.lock(); 00879 //prepare for next read, copy the extra 00880 memmove(recvBuffer, recvBuffer + parsePosition + 1, 00881 recvBufferPosition - parsePosition - 1); 00882 // copy beginning of next cmd 00883 recvBufferPosition = recvBufferPosition - parsePosition - 1; 00884 recvBuffer[recvBufferPosition] = 0; 00885 parsePosition = 0; 00886 bins_clear(); 00887 goto line_finished; //restart 00888 } 00889 00890 if (!system && !strncmp(recvBuffer+parsePosition-3, "BIN ", 4)) 00891 { 00892 //very important: scan starts below current point 00893 //compute length 00894 char* endLength; 00895 binaryBufferLength = 00896 strtol(recvBuffer+parsePosition+1, &endLength, 0); 00897 if (endLength == recvBuffer+parsePosition+1) 00898 { 00899 GD_ERROR("read, error parsing bin data length."); 00900 recvBufferPosition = 0; 00901 return false; 00902 } 00903 //go to end of header 00904 while (recvBuffer[parsePosition] !='\n') 00905 ++parsePosition; //we now we will find a \n 00906 ++parsePosition; 00907 endOfHeaderPosition = parsePosition; 00908 binaryMode = true; 00909 binaryBuffer = malloc(binaryBufferLength); 00910 binaryBufferPosition = 0; 00911 break; //restart in binarymode to handle binary 00912 } 00913 } 00914 } 00915 line_finished: 00916 // Either we ate all characters, or we were asked to restart. 00917 return parsePosition != recvBufferPosition; 00918 } 00919 00924 void 00925 UAbstractClient::processRecvBuffer() 00926 { 00927 while (binaryMode 00928 ? process_recv_buffer_binary_() 00929 : process_recv_buffer_text_()) 00930 continue; 00931 } 00932 00933 UCallbackID 00934 UAbstractClient::setWildcardCallback(UCallbackWrapper& callback) 00935 { 00936 return addCallback(tag_wildcard, callback); 00937 } 00938 00939 UCallbackID 00940 UAbstractClient::setErrorCallback(UCallbackWrapper& callback) 00941 { 00942 return addCallback(tag_error, callback); 00943 } 00944 00945 UCallbackID 00946 UAbstractClient::setClientErrorCallback(UCallbackWrapper& callback) 00947 { 00948 return addCallback(CLIENTERROR_TAG, callback); 00949 } 00950 00951 UCallbackID 00952 UAbstractClient::setCallback(UCallbackWrapper& callback, 00953 const char* tag) 00954 { 00955 return addCallback(tag, callback); 00956 } 00957 00958 UCallbackID 00959 UAbstractClient::addCallback(const char* tag, 00960 UCallbackWrapper& w) 00961 { 00962 listLock.lock(); 00963 UCallbackInfo ci(w); 00964 strncpy(ci.tag, tag, URBI_MAX_TAG_LENGTH-1); 00965 ci.tag[URBI_MAX_TAG_LENGTH-1]=0; 00966 ci.id = ++nextId; 00967 callbacks_.push_front(ci); 00968 listLock.unlock(); 00969 return ci.id; 00970 } 00971 00972 void 00973 UAbstractClient::clientError(std::string message, int erc) 00974 { 00975 // Like in UMessage's constructor, skip the possible "!!! " 00976 // prefix. 00977 const char prefix[] = "!!! "; 00978 if (message.substr(0, sizeof prefix - 1) == prefix) 00979 message.erase(0, sizeof prefix - 1); 00980 00981 if (erc) 00982 { 00983 message += message.empty() ? "" : ": "; 00984 message += libport::strerror(erc); 00985 } 00986 00987 UMessage m(*this); 00988 m.type = MESSAGE_ERROR; 00989 // rawMessage is incorrect but we don't care. 00990 m.message = m.rawMessage = message; 00991 m.timestamp = 0; 00992 m.tag = CLIENTERROR_TAG; 00993 notifyCallbacks(m); 00994 } 00995 00996 void 00997 UAbstractClient::clientError(const char* message, int erc) 00998 { 00999 return clientError(std::string(message ? message : ""), erc); 01000 } 01001 01002 void 01003 UAbstractClient::onConnection() 01004 { 01005 # define VERSION_TAG TAG_PRIVATE_PREFIX "__version" 01006 setCallback(*this, &UAbstractClient::setVersion, VERSION_TAG); 01007 // We don't know our kernel version yet. 01008 send(SYNCLINE_WRAP( 01009 "{\n" 01010 " var __ver__ = 2;\n" 01011 " {var __ver__ = 1};\n" 01012 " var " VERSION_TAG ";\n" 01013 " if (__ver__ == 1)\n" 01014 " " VERSION_TAG " << system.version\n" 01015 " else\n" 01016 " {\n" 01017 " " VERSION_TAG " = Channel.new(\"" VERSION_TAG "\");\n" 01018 " " VERSION_TAG " << System.version;\n" 01019 " };\n" 01020 "};\n")); 01021 # undef VERSION_TAG 01022 } 01023 01024 UCallbackAction 01025 UAbstractClient::setConnectionID(const UMessage& msg) 01026 { 01027 GD_FINFO_TRACE("setConnectionId for client %p", this); 01028 if (msg.type == MESSAGE_DATA && msg.value) 01029 { 01030 std::string id(*msg.value); 01031 if (!id.empty()) 01032 { 01033 libport::BlockLock bl(sendBufferLock); 01034 connectionID_ = id; 01035 return URBI_REMOVE; 01036 } 01037 } 01038 return URBI_CONTINUE; 01039 } 01040 01041 UCallbackAction 01042 UAbstractClient::setVersion(const UMessage& msg) 01043 { 01044 GD_FINFO_TRACE("setVersion for client %p", this); 01045 libport::BlockLock bl(sendBufferLock); 01046 if (msg.type != MESSAGE_DATA) 01047 return URBI_CONTINUE; 01048 aver_eq(msg.value->type, DATA_STRING); 01049 kernelVersion_ = *msg.value->stringValue; 01050 size_t sep = kernelVersion_.find_first_of('.'); 01051 try 01052 { 01053 kernelMajor_ = boost::lexical_cast<int>(kernelVersion_.substr(0, sep)); 01054 size_t sep2 = kernelVersion_.find_first_of('.', sep+1); 01055 if (sep2 != kernelVersion_.npos) 01056 kernelMinor_ = 01057 boost::lexical_cast<int>(kernelVersion_.substr(sep+1, 01058 sep2-sep-1)); 01059 else 01060 kernelMinor_ = 0; 01061 } 01062 catch (boost::bad_lexical_cast&) 01063 { 01064 kernelMajor_ = 2; 01065 kernelMinor_ = 0; 01066 GD_FWARN("failed to parse kernel version string: '%s', assuming %s.%s.", 01067 kernelVersion_, 01068 kernelMajor_, 01069 kernelMinor_); 01070 } 01071 // Set the kernel version of our associated stream. 01072 ::urbi::kernelMajor(*this) = kernelMajor_; 01073 01074 // Have the connectionId sent on __ident. 01075 # define IDENT_TAG TAG_PRIVATE_PREFIX "__ident" 01076 setCallback(*this, &UAbstractClient::setConnectionID, IDENT_TAG); 01077 if (kernelMajor_ < 2) 01078 send(IDENT_TAG " << local.connectionID;\n"); 01079 else 01080 send(SYNCLINE_WRAP("Channel.new(\"" IDENT_TAG "\")" 01081 " << connectionTag.name;\n")); 01082 return URBI_REMOVE; 01083 # undef IDENT_TAG 01084 } 01085 01086 int 01087 UAbstractClient::getCurrentTimestamp() const 01088 { 01089 return currentTimestamp; 01090 } 01091 01092 const std::string& 01093 UAbstractClient::connectionID() const 01094 { 01095 libport::BlockLock bl(sendBufferLock); 01096 return connectionID_; 01097 } 01098 01099 std::string 01100 getClientConnectionID(const UAbstractClient* cli) 01101 { 01102 if (!cli) 01103 return ""; 01104 return cli->connectionID(); 01105 } 01106 01107 01108 /*-----------------. 01109 | Default client. | 01110 `-----------------*/ 01111 01112 UClient* defaultClient = 0; 01113 01114 UClient* getDefaultClient() 01115 { 01116 return defaultClient; 01117 } 01118 01119 UClient& get_default_client() 01120 { 01121 return *getDefaultClient(); 01122 } 01123 01124 void setDefaultClient(UClient* cl) 01125 { 01126 defaultClient = cl; 01127 } 01128 01129 01130 std::ostream& 01131 unarmorAndSend(const char* a, UAbstractClient* where) 01132 { 01133 aver(a); 01134 aver(where); 01135 std::ostream& s = *where; 01136 if (strlen(a)>2) 01137 { 01138 if (a[0]=='(' && a[strlen(a)-1]==')') 01139 s.rdbuf()->sputn(a+1, strlen(a)-2); 01140 else 01141 s << a; //this is baaad, user forgot the parenthesis but was lucky 01142 } 01143 return s; 01144 } 01145 01146 } // namespace urbi