Urbi SDK Remote for C++  2.7.3
uabstractclient.cc
Go to the documentation of this file.
00001 /*
00002  * Copyright (C) 2005-2011, Gostai S.A.S.
00003  *
00004  * This software is provided "as is" without warranty of any kind,
00005  * either expressed or implied, including but not limited to the
00006  * implied warranties of fitness for a particular purpose.
00007  *
00008  * See the LICENSE file for more information.
00009  */
00010 
00012 
00013 #include <algorithm>
00014 #include <libport/cassert>
00015 #include <libport/cerrno>
00016 #include <libport/cmath>
00017 #include <libport/cstdlib>
00018 #include <fstream>
00019 #include <iostream>
00020 
00021 #include <libport/format.hh>
00022 #include <libport/io-stream.hh>
00023 #include <libport/lexical-cast.hh>
00024 
00025 #include <libport/cstdio>
00026 #include <libport/cstring>
00027 #include <libport/containers.hh>
00028 #include <libport/debug.hh>
00029 #include <libport/escape.hh>
00030 #include <libport/lexical-cast.hh>
00031 #include <libport/lockable.hh>
00032 #include <libport/sys/stat.h>
00033 #include <libport/unistd.h>
00034 #include <libport/windows.hh>
00035 
00036 #include <urbi/uabstractclient.hh>
00037 #include <urbi/uconversion.hh>
00038 #include <urbi/umessage.hh>
00039 #include <urbi/utag.hh>
00040 
00041 #include <liburbi/compatibility.hh>
00042 
00043 GD_CATEGORY(Urbi.Client.Abstract);
00044 
00045 namespace urbi
00046 {
00047 
/// Pseudo tag: a callback registered on it receives every error message.
const char* tag_error("[error]");
/// Pseudo tag: a callback registered on it receives every message whose
/// tag does not start with TAG_PRIVATE_PREFIX.
const char* tag_wildcard("[wildcard]");
00052 
00053   std::ostream&
00054   default_stream()
00055   {
00056     return (getDefaultClient()
00057             ? ((UAbstractClient*)getDefaultClient())->stream_get()
00058             : std::cerr);
00059   }
00060 
00061 
00062   /*-------------.
00063   | UCallbacks.  |
00064   `-------------*/
00065 
00066   static UCallbackID nextId;
00067 
00068   class UCallbackWrapperCB: public UCallbackWrapper
00069   {
00070     UCallback cb;
00071   public:
00072     UCallbackWrapperCB(UCallback cb)
00073       : cb(cb)
00074     {
00075     }
00076     virtual UCallbackAction operator()(const UMessage& msg)
00077     {
00078       return cb(msg);
00079     }
00080   };
00081 
00082 
00083   class UCallbackWrapperCCB: public UCallbackWrapper
00084   {
00085     UCustomCallback cb;
00086     void * data;
00087   public:
00088     UCallbackWrapperCCB(UCustomCallback cb, void* data)
00089       : cb(cb)
00090       , data(data)
00091     {
00092     }
00093     virtual UCallbackAction operator()(const UMessage& msg)
00094     {
00095       return cb(data, msg);
00096     }
00097   };
00098 
00099 
00100   /*-------------------.
00101   | UClientStreambuf.  |
00102   `-------------------*/
00103 
00104   class UClientStreambuf: public libport::StreamBuffer
00105   {
00106   public:
00107     UClientStreambuf(UAbstractClient* cl)
00108       : client_(cl)
00109     {}
00110 
00111   protected:
00112      virtual size_t read(char* buffer, size_t size);
00113      virtual void write(char* buffer, size_t size);
00114 
00115   private:
00116     UAbstractClient* client_;
00117   };
00118 
00119   void
00120   UClientStreambuf::write(char* buffer, size_t size)
00121   {
00122     client_->effective_send(buffer, size);
00123   }
00124 
00125   size_t
00126   UClientStreambuf::read(char*, size_t)
00127   {
00128     return 0;
00129   }
00130 
00131   /*------------------.
00132   | UAbstractClient.  |
00133   `------------------*/
00134 
00135 
00136   const char* UAbstractClient::CLIENTERROR_TAG = "client_error";
00137 
00138   void
00139   UAbstractClient::bins_clear()
00140   {
00141     for (/* nothing */; !bins.empty(); bins.pop_front())
00142       bins.front().clear();
00143   }
00144 
00145   inline
00146   bool
00147   matching_tag(const UMessage& msg, const char* tag)
00148   {
00149     return
00150       msg.tag == tag
00151       || (libport::streq(tag, tag_error) && msg.type == MESSAGE_ERROR)
00152       // The wild card does not match tags starting with
00153       // TAG_PRIVATE_PREFIX.
00154       || (libport::streq(tag, tag_wildcard)
00155           && msg.tag.compare(0,
00156                              sizeof TAG_PRIVATE_PREFIX - 1,
00157                              TAG_PRIVATE_PREFIX));
00158   }
00159 
00160   void
00161   UAbstractClient::notifyCallbacks(const UMessage& msg)
00162   {
00163     libport::BlockLock bl(listLock);
00164     bool inc = true;
00165     for (callbacks_type::iterator it = callbacks_.begin();
00166          it != callbacks_.end();
00167          inc ? it++ : it, inc = true)
00168       if (matching_tag(msg, it->tag))
00169       {
00170         UCallbackAction ua = it->callback(msg);
00171         if (ua == URBI_REMOVE)
00172         {
00173           delete &it->callback;
00174           it = callbacks_.erase(it);
00175           inc = false;
00176         }
00177       }
00178   }
00179 
00180   UAbstractClient::UAbstractClient(const std::string& host,
00181                                    unsigned port,
00182                                    size_t buflen,
00183                                    bool server)
00184     : LockableOstream(new UClientStreambuf(this))
00185     , closed_ (false)
00186     , listLock()
00187     , host_(host)
00188     , port_(port)
00189     , server_(server)
00190     , sendBufSize(buflen)
00191     , recvBufSize(buflen)
00192     , rc(0)
00193 
00194     , recvBuffer(new char[buflen])
00195     , recvBufferPosition(0)
00196     , sendBuffer(new char[buflen])
00197 
00198     , kernelMajor_(-1)
00199     , kernelMinor_(-1)
00200     , binaryBuffer(0)
00201     , parsePosition(0)
00202     , inString(false)
00203     , nBracket(0)
00204     , binaryMode(false)
00205     , system(false)
00206     , init_(true)
00207     , counter_(0)
00208     , stream_(this)
00209   {
00210     exceptions(std::ostream::eofbit | std::ostream::failbit |
00211                std::ostream::badbit);
00212     recvBuffer[0] = 0;
00213     sendBuffer[0] = 0;
00214   }
00215 
00216   UAbstractClient::~UAbstractClient()
00217   {
00218     // No more default client if delete.
00219     if ((void*)getDefaultClient() == (void*)this)
00220       setDefaultClient(0);
00221     delete [] recvBuffer;
00222     delete [] sendBuffer;
00223   }
00224 
00229   UAbstractClient::error_type
00230   UAbstractClient::startPack()
00231   {
00232     sendBufferLock.lock();
00233     return 0;
00234   }
00235 
00236   UAbstractClient::error_type
00237   UAbstractClient::endPack()
00238   {
00239     error_type res = effective_send(sendBuffer);
00240     sendBuffer[0] = 0;
00241     sendBufferLock.unlock();
00242     return res;
00243   }
00244 
00245   UAbstractClient::error_type
00246   UAbstractClient::send(const char* command, ...)
00247   {
00248     if (rc)
00249       return -1;
00250     va_list arg;
00251     va_start(arg, command);
00252     sendBufferLock.lock();
00253     rc = vpack(command, arg);
00254     va_end(arg);
00255     if (rc < 0)
00256     {
00257       sendBufferLock.unlock();
00258       return rc;
00259     }
00260     return rc = endPack();
00261   }
00262 
00263   UAbstractClient::error_type
00264   UAbstractClient::send(const std::string& s)
00265   {
00266     return send("%s", s.c_str());
00267   }
00268 
00269   UAbstractClient::error_type
00270   UAbstractClient::send(const UValue& v)
00271   {
00272     switch (v.type)
00273     {
00274     // Bounce to UValue operator << for those types.
00275     case DATA_DOUBLE:
00276     case DATA_SLOTNAME:
00277     case DATA_STRING:
00278       return send(string_cast(v));
00279       break;
00280 
00281     // Use our own sendBinary for binary, who knows how to talk to k1 and k2.
00282     case DATA_BINARY:
00283       if (v.binary->type != BINARY_NONE
00284           && v.binary->type != BINARY_UNKNOWN)
00285         v.binary->buildMessage();
00286       return sendBinary(v.binary->common.data, v.binary->common.size,
00287                         v.binary->message);
00288       break;
00289 
00290     // Lists can contain binary, so recurse using this function.
00291     case DATA_LIST:
00292       send("[");
00293       foreach (const UValue* u, *v.list)
00294         send(libport::format("%s,", u));
00295       return send("]");
00296       break;
00297 
00298     case DATA_DICTIONARY:
00299       send("[");
00300       if (v.dictionary->empty())
00301         send("=>");
00302       else
00303         foreach (const UDictionary::value_type& d, *v.dictionary)
00304           send(libport::format("\"%s\"=>%s,",
00305                                libport::escape(d.first), d.second));
00306       return send("]");
00307       break;
00308 
00309     case DATA_VOID:
00310       break;
00311     };
00312     return 0;
00313   }
00314 
00315   UAbstractClient::error_type
00316   UAbstractClient::send(std::istream& is)
00317   {
00318     if (rc)
00319       return -1;
00320     sendBufferLock.lock();
00321     while (is.good() && !rc)
00322     {
00323       is.read(sendBuffer, sendBufSize);
00324       rc = effective_send(sendBuffer, is.gcount());
00325     }
00326     sendBuffer[0] = 0;
00327     sendBufferLock.unlock();
00328     return rc;
00329   }
00330 
00331 
00332 
00337   UAbstractClient::error_type
00338   UAbstractClient::pack(const char* command, ...)
00339   {
00340     if (rc)
00341       return -1;
00342     va_list arg;
00343     va_start(arg, command);
00344     rc = vpack(command, arg);
00345     va_end(arg);
00346     return rc;
00347   }
00348 
00349 
00350   UAbstractClient::error_type
00351   UAbstractClient::vpack(const char* command, va_list arg)
00352   {
00353     if (rc)
00354       return -1;
00355     sendBufferLock.lock();
00356     if (command)
00357     {
00358       // Don't print if we overflow the buffer.  It would be nice to
00359       // rely on the behavior of the GNU LibC which accepts 0 as
00360       // destination buffer to query the space needed.  But it is not
00361       // portable (e.g., segv on OS X).  So rather, try to vsnprintf,
00362       // and upon failure, revert the buffer in its previous state.
00363       size_t slen = strlen(sendBuffer);
00364       size_t msize = sendBufSize - slen;
00365       int r = vsnprintf(sendBuffer + slen, msize, command, arg);
00366       // vsnprintf returns the number of characters to write.  Check
00367       // that it fits.  Don't forget the ending '\0' that it does not
00368       // count, but wants to add.
00369       if (r < 0 || static_cast<int>(msize) <= r)
00370       {
00371         // Don't produce partial input.
00372         sendBuffer[slen] = 0;
00373         rc = -1;
00374       }
00375     }
00376     sendBufferLock.unlock();
00377     return rc;
00378   }
00379 
00380 
00381   UAbstractClient::error_type
00382   UAbstractClient::sendFile(const std::string& f)
00383   {
00384     if (f == "/dev/stdin")
00385       return send(std::cin);
00386     else
00387     {
00388       std::ifstream is(f.c_str(), std::ios::binary);
00389       if (is.fail())
00390         return -1;
00391       else
00392         return send(is);
00393     }
00394   }
00395 
00396 
00397   UAbstractClient::error_type
00398   UAbstractClient::sendBin(const void* buffer, size_t len)
00399   {
00400     return sendBin(buffer, len, 0);
00401   }
00402 
00403 
00404   UAbstractClient::error_type
00405   UAbstractClient::sendBin(const void* buffer, size_t len,
00406                            const char* header, ...)
00407   {
00408     if (rc)
00409       return -1;
00410     sendBufferLock.lock();
00411     if (header)
00412     {
00413       va_list arg;
00414       va_start(arg, header);
00415       vpack(header, arg);
00416       va_end(arg);
00417       effective_send(sendBuffer);
00418     }
00419 
00420     error_type res = effective_send(buffer, len);
00421     sendBuffer[0] = 0;
00422     sendBufferLock.unlock();
00423     return res;
00424   }
00425 
00426   UAbstractClient::error_type
00427   UAbstractClient::sendBinary(const void* data, size_t len,
00428                               const std::string& header)
00429   {
00430     if (kernelMajor() < 2)
00431       return sendBin(data, len, "BIN %lu %s;",
00432                      static_cast<unsigned long>(len), header.c_str());
00433     else
00434     {
00435       sendBufferLock.lock();
00436       *this << libport::format("Global.Binary.new(\"%s\", \"\\B(%s)(",
00437                                libport::escape(header), len);
00438       flush();
00439       effective_send(data, len);
00440       *this << ")\")";
00441       sendBufferLock.unlock();
00442       return rc;
00443     }
00444   }
00445 
00446   struct sendSoundData
00447   {
00448     char* buffer;
00449     int bytespersec;
00450     size_t length;
00451     size_t pos;
00452     char* device;
00453     char* tag;
00454     char formatString[50];
00455     USoundFormat format;
00456     UAbstractClient* uc;
00457     bool startNotify;
00458   };
00459 
00460   static UCallbackAction sendSound_(void* cb, const UMessage &msg)
00461   {
00462     //the idea is to cut the sound into small chunks,
00463     //add a header and send each chunk separately
00464 
00465     //create the header.
00466     static const size_t CHUNK_SIZE = 32 * 8*60;
00467     sendSoundData* s = (sendSoundData*)cb;
00468     //handle next chunk
00469     if (s->format == SOUND_WAV && s->pos==0)
00470       s->pos = sizeof (wavheader);
00471     size_t tosend = std::min(CHUNK_SIZE, s->length - s->pos);
00472 
00473     //int playlength = tosend *1000 / s->bytespersec;
00474     std::string header = ((s->format == SOUND_WAV) ? "wav " : "raw ")
00475       + (std::string)s->formatString;
00476 
00477     s->uc->send("%s.val = Global.Binary.new(\"%s\", \"\\B(%lu)(",
00478                 s->device,
00479                 header.c_str(),
00480                 static_cast<unsigned long>
00481                 (tosend + ((s->format == SOUND_WAV) ? sizeof (wavheader) : 0))
00482                 );
00483 
00484     if (s->format == SOUND_WAV)
00485     {
00486       wavheader wh;
00487       memcpy(&wh, s->buffer, sizeof wh);
00488       wh.datalength=tosend;
00489       wh.length=tosend+44-8;
00490       s->uc->sendBin(&wh, sizeof wh);
00491     }
00492     s->uc->sendBin(s->buffer+s->pos, tosend);
00494     s->uc->send(")\")|;waituntil(%s.remain < 1000);\n"
00495                 " %s << 1;\n", s->device, msg.tag.c_str());
00496     s->pos += tosend;
00497     if (s->pos >= s->length)
00498     {
00499       const char* dev = s->device ? s->device : "speaker";
00500       s->uc->send("%s.val->blend = %s.sendsoundsaveblend;", dev, dev);
00501 
00502       if (s->tag && s->tag[0])
00503         s->uc->send("Channel.new(\"%s\") << 1;\n", s->tag);
00504       delete[] s->buffer;
00505       free(s->tag);
00506       free(s->device);
00507       delete s;
00508       return URBI_REMOVE;
00509     }
00510     return URBI_CONTINUE;
00511   }
00512 
00519   UAbstractClient::error_type
00520   UAbstractClient::sendSound(const char* device, const USound& sound,
00521                              const char* tag)
00522   {
00523     switch (sound.soundFormat)
00524     {
00525     case SOUND_MP3:
00526     case SOUND_OGG:
00527       // We don't handle chunking for these formats.
00528       return sendBin(sound.data, sound.size,
00529                      "%s +report:  %s.val = BIN %lu %s;",
00530                      tag, device, static_cast<unsigned long>(sound.size),
00531                      sound.soundFormat == SOUND_MP3 ? "mp3" : "ogg");
00532       break;
00533 
00534     case SOUND_WAV:
00535     case SOUND_RAW:
00536     {
00537       const char* dev = device ? device : "speaker";
00538       send("%s.removeSlot(\"sendsoundsaveblend\") |"
00539            "var %s.sendsoundsaveblend = %s.val->blend;"
00540            "%s.val->blend=\"queue\";",
00541            dev, dev, dev, dev);
00542       sendSoundData* s = new sendSoundData();
00543       s->bytespersec = sound.channels * sound.rate * (sound.sampleSize / 8);
00544       s->uc = this;
00545       s->buffer = new char[sound.size];
00546       memcpy(s->buffer, sound.data, sound.size);
00547       s->length = sound.size;
00548       s->tag = tag ? strdup(tag) : 0;
00549       s->device = strdup(device);
00550       s->pos = 0;
00551       s->format = sound.soundFormat;
00552       if (sound.soundFormat == SOUND_RAW)
00553         sprintf(s->formatString, "%zd %zd %zd %d",
00554                 sound.channels, sound.rate, sound.sampleSize,
00555                 sound.sampleFormat);
00556       else
00557         s->formatString[0] = 0;
00558       s->startNotify = false;
00559       std::string utag = fresh();
00560       (*this) << "var " + utag +" = Channel.new(\"" << utag << "\");";
00561       UCallbackID cid = setCallback(sendSound_, s, utag.c_str());
00562       // Invoke it 2 times to queue sound.
00563       if (sendSound_(s, UMessage(*this, 0, utag, "*** stop",
00564                                  binaries_type()))
00565           == URBI_CONTINUE)
00566       {
00567         if (sendSound_(s, UMessage(*this, 0, utag, "*** stop",
00568                                    binaries_type()))
00569             == URBI_REMOVE)
00570           deleteCallback(cid);
00571       }
00572       else
00573         deleteCallback(cid);
00574       return 0;
00575     }
00576 
00577     default:
00578       // Unrecognized format.
00579       return 1;
00580     }
00581   }
00582 
00583   UCallbackID
00584   UAbstractClient::setCallback(UCallback cb, const char* tag)
00585   {
00586     return addCallback(tag, *new UCallbackWrapperCB(cb));
00587   }
00588 
00589   UCallbackID
00590   UAbstractClient::setCallback(UCustomCallback cb,
00591                                void* cbData,
00592                                const char* tag)
00593   {
00594     return addCallback(tag, *new UCallbackWrapperCCB(cb, cbData));
00595   }
00596 
00597 
00598   int
00599   UAbstractClient::getAssociatedTag(UCallbackID id, char* tag)
00600   {
00601     listLock.lock();
00602     callbacks_type::iterator it =
00603       std::find(callbacks_.begin(), callbacks_.end(), id);
00604     if (it == callbacks_.end())
00605     {
00606       listLock.unlock();
00607       return 0;
00608     }
00609     strcpy(tag, it->tag);
00610     listLock.unlock();
00611     return 1;
00612   }
00613 
00614 
00615   int
00616   UAbstractClient::deleteCallback(UCallbackID id)
00617   {
00618     listLock.lock();
00619     callbacks_type::iterator it =
00620       std::find(callbacks_.begin(), callbacks_.end(), id);
00621     if (it == callbacks_.end())
00622     {
00623       listLock.unlock();
00624       return 0;
00625     }
00626     delete &(it->callback);
00627     callbacks_.erase(it);
00628     listLock.unlock();
00629     return 1;
00630   }
00631 
00632   UCallbackID
00633   UAbstractClient::sendCommand(UCallback cb, const char* cmd, ...)
00634   {
00635     std::string tag = fresh();
00636     std::string mcmd = tag + " << " + cmd;
00637     UCallbackID res = setCallback(cb, tag.c_str());
00638     sendBufferLock.lock();
00639     va_list arg;
00640     va_start(arg, cmd);
00641     vpack(mcmd.c_str(), arg);
00642     va_end(arg);
00643     if (endPack())
00644     {
00645       deleteCallback(res);
00646       return UINVALIDCALLBACKID;
00647     }
00648     return res;
00649   }
00650 
00651   UCallbackID
00652   UAbstractClient::sendCommand(UCustomCallback cb, void *cbData,
00653                                const char* cmd, ...)
00654   {
00655     std::string tag = fresh();
00656     std::string mcmd = tag + " << " + cmd;
00657     UCallbackID res = setCallback(cb, cbData, tag.c_str());
00658     sendBufferLock.lock();
00659     va_list arg;
00660     va_start(arg, cmd);
00661     vpack(mcmd.c_str(), arg);
00662     va_end(arg);
00663     if (endPack())
00664     {
00665       deleteCallback(res);
00666       return UINVALIDCALLBACKID;
00667     }
00668     return res;
00669   }
00670 
00671   UAbstractClient::error_type
00672   UAbstractClient::putFile(const char* localName, const char* remoteName)
00673   {
00674     size_t len;
00675     struct stat st;
00676     if (stat(localName, &st) == -1)
00677       return 1;
00678     len = st.st_size;
00679     sendBufferLock.lock();
00680     if (!remoteName)
00681       remoteName = localName;
00682     send("save(\"%s\", \"", remoteName);
00683     error_type res = sendFile(localName);
00684     send("\");");
00685     sendBufferLock.unlock();
00686     return res;
00687   }
00688 
00689   UAbstractClient::error_type
00690   UAbstractClient::putFile(const void* buffer, size_t length,
00691                            const char* remoteName)
00692   {
00693     send("save(\"%s\", \"", remoteName);
00694     sendBin(buffer, length);
00695     send("\");");
00696     sendBufferLock.unlock();
00697     return 0;
00698   }
00699 
00700   std::string
00701   UAbstractClient::fresh()
00702   {
00703     static boost::format fmt("URBI_%s");
00704     return str(fmt % ++counter_);
00705   }
00706 
00707   void
00708   UAbstractClient::makeUniqueTag(char* tag)
00709   {
00710     strcpy(tag, fresh().c_str());
00711   }
00712 
00713   bool
00714   UAbstractClient::process_recv_buffer_binary_()
00715   {
00716     //Receiving binary. Append to binaryBuffer;
00717     size_t len =
00718       std::min(recvBufferPosition - endOfHeaderPosition,
00719                binaryBufferLength - binaryBufferPosition);
00720     if (binaryBuffer)
00721       memcpy (static_cast<char*> (binaryBuffer) + binaryBufferPosition,
00722               recvBuffer + endOfHeaderPosition, len);
00723     binaryBufferPosition += len;
00724 
00725     if (binaryBufferPosition == binaryBufferLength)
00726     {
00727       //Finished receiving binary.
00728       //append
00729       BinaryData bd;
00730       bd.size = binaryBufferLength;
00731       bd.data = binaryBuffer;
00732       bins << bd;
00733       binaryBuffer = 0;
00734 
00735       if (nBracket == 0)
00736       {
00737         //end of command, send
00738         //dumb listLock.lock();
00739         UMessage msg(*this, currentTimestamp, currentTag, currentCommand,
00740                      bins);
00741         notifyCallbacks(msg);
00742         //unlistLock.lock();
00743 
00744         bins_clear();
00745 
00746         //flush
00747         parsePosition = 0;
00748         //Move the extra we received
00749         recvBufferPosition -= len + endOfHeaderPosition;
00750         memmove(recvBuffer,
00751                 recvBuffer + endOfHeaderPosition + len,
00752                 recvBufferPosition);
00753       }
00754       else
00755       {
00756         // not over yet
00757         //leave parseposition where it is
00758         //move the extra (parsePosition = endOfHeaderPosition)
00759         recvBufferPosition -= len;
00760         memmove(recvBuffer + parsePosition,
00761                 recvBuffer + endOfHeaderPosition + len,
00762                 recvBufferPosition - endOfHeaderPosition);
00763       }
00764       binaryBuffer = 0;
00765       binaryMode = false;
00766 
00767       // Reenter loop.
00768       return true;
00769     }
00770     else
00771     {
00772       // Not finished receiving binary.
00773       recvBufferPosition = endOfHeaderPosition;
00774       return false;
00775     }
00776   }
00777 
00778   bool
00779   UAbstractClient::process_recv_buffer_text_()
00780   {
00781     // Not in binary mode.
00782     char* endline =
00783       static_cast<char*> (memchr(recvBuffer+parsePosition, '\n',
00784                                  recvBufferPosition - parsePosition));
00785     if (!endline)
00786       return false; //no new end of command/start of binary: wait
00787 
00788     if (parsePosition == 0) // parse header
00789     {
00790       // Ignore empty lines.
00791       if (endline == recvBuffer)
00792       {
00793         memmove(recvBuffer, recvBuffer+1, recvBufferPosition - 1);
00794         recvBufferPosition--;
00795         return true;
00796       }
00797 
00798       if (2 != sscanf(recvBuffer, "[%d:%64[A-Za-z0-9_.]]",
00799                       &currentTimestamp, currentTag))
00800       {
00801         if (1 == sscanf(recvBuffer, "[%d]", &currentTimestamp))
00802           currentTag[0] = 0;
00803         else
00804         {
00805           // failure
00806           GD_FERROR("read, error parsing header: '%s'", recvBuffer);
00807           currentTimestamp = 0;
00808           strcpy(currentTag, "UNKNWN");
00809           //listLock.lock();
00810           UMessage msg(*this, 0, tag_error,
00811                        "!!! UAbstractClient::read, fatal error parsing header",
00812                        binaries_type());
00813           notifyCallbacks(msg);
00814           //unlistLock.lock();
00815         }
00816       }
00817 
00818       currentCommand = strstr(recvBuffer, "]");
00819       if (!currentCommand)
00820       {
00821         //reset all
00822         nBracket = 0;
00823         inString = false;
00824         parsePosition = 0;
00825         recvBufferPosition = 0;
00826         return false;
00827       }
00828 
00829       ++currentCommand;
00830       while (*currentCommand == ' ')
00831         ++currentCommand;
00832       system = (*currentCommand == '!' || *currentCommand == '*');
00833       parsePosition = (long) currentCommand - (long) recvBuffer;
00834 
00835       //reinit just to be sure:
00836       nBracket = 0;
00837       inString = false;
00838     }
00839 
00840     for (/* nothing */; parsePosition < recvBufferPosition; ++parsePosition)
00841     {
00842       if (inString)
00843         switch (recvBuffer[parsePosition])
00844         {
00845         case '\\':
00846           if (parsePosition == recvBufferPosition-1)
00847             //we cant handle the '\\'
00848             return false;
00849           ++parsePosition; //ignore next character
00850           continue;
00851         case '"':
00852           inString = false;
00853           continue;
00854         }
00855       else
00856       {
00857         switch (recvBuffer[parsePosition])
00858         {
00859         case '"':
00860           inString = true;
00861           continue;
00862         case '[':
00863           ++nBracket;
00864           continue;
00865         case ']':
00866           --nBracket;
00867           continue;
00868         case '\n':
00869           // FIXME: handle '[' in echoed messages or errors nBracket == 0.
00870           //
00871           // end of command
00872           recvBuffer[parsePosition] = 0;
00873           //listLock.lock();
00874           UMessage msg(*this, currentTimestamp, currentTag,
00875                        currentCommand,
00876                        bins);
00877           notifyCallbacks(msg);
00878           //unlistLock.lock();
00879           //prepare for next read, copy the extra
00880           memmove(recvBuffer, recvBuffer + parsePosition + 1,
00881                   recvBufferPosition - parsePosition - 1);
00882           // copy beginning of next cmd
00883           recvBufferPosition = recvBufferPosition - parsePosition - 1;
00884           recvBuffer[recvBufferPosition] = 0;
00885           parsePosition = 0;
00886           bins_clear();
00887           goto line_finished; //restart
00888         }
00889 
00890         if (!system && !strncmp(recvBuffer+parsePosition-3, "BIN ", 4))
00891         {
00892           //very important: scan starts below current point
00893           //compute length
00894           char* endLength;
00895           binaryBufferLength =
00896             strtol(recvBuffer+parsePosition+1, &endLength, 0);
00897           if (endLength == recvBuffer+parsePosition+1)
00898           {
00899             GD_ERROR("read, error parsing bin data length.");
00900             recvBufferPosition = 0;
00901             return false;
00902           }
00903           //go to end of header
00904           while (recvBuffer[parsePosition] !='\n')
00905             ++parsePosition; //we now we will find a \n
00906           ++parsePosition;
00907           endOfHeaderPosition = parsePosition;
00908           binaryMode = true;
00909           binaryBuffer = malloc(binaryBufferLength);
00910           binaryBufferPosition = 0;
00911           break; //restart in binarymode to handle binary
00912         }
00913       }
00914     }
00915   line_finished:
00916     // Either we ate all characters, or we were asked to restart.
00917     return parsePosition != recvBufferPosition;
00918   }
00919 
00924   void
00925   UAbstractClient::processRecvBuffer()
00926   {
00927     while (binaryMode
00928            ? process_recv_buffer_binary_()
00929            : process_recv_buffer_text_())
00930       continue;
00931   }
00932 
00933   UCallbackID
00934   UAbstractClient::setWildcardCallback(UCallbackWrapper& callback)
00935   {
00936     return addCallback(tag_wildcard, callback);
00937   }
00938 
00939   UCallbackID
00940   UAbstractClient::setErrorCallback(UCallbackWrapper& callback)
00941   {
00942     return addCallback(tag_error, callback);
00943   }
00944 
00945   UCallbackID
00946   UAbstractClient::setClientErrorCallback(UCallbackWrapper& callback)
00947   {
00948     return addCallback(CLIENTERROR_TAG, callback);
00949   }
00950 
00951   UCallbackID
00952   UAbstractClient::setCallback(UCallbackWrapper& callback,
00953                                const char* tag)
00954   {
00955     return addCallback(tag, callback);
00956   }
00957 
00958   UCallbackID
00959   UAbstractClient::addCallback(const char* tag,
00960                                UCallbackWrapper& w)
00961   {
00962     listLock.lock();
00963     UCallbackInfo ci(w);
00964     strncpy(ci.tag, tag, URBI_MAX_TAG_LENGTH-1);
00965     ci.tag[URBI_MAX_TAG_LENGTH-1]=0;
00966     ci.id = ++nextId;
00967     callbacks_.push_front(ci);
00968     listLock.unlock();
00969     return ci.id;
00970   }
00971 
00972   void
00973   UAbstractClient::clientError(std::string message, int erc)
00974   {
00975     // Like in UMessage's constructor, skip the possible "!!! "
00976     // prefix.
00977     const char prefix[] = "!!! ";
00978     if (message.substr(0, sizeof prefix - 1) == prefix)
00979       message.erase(0, sizeof prefix - 1);
00980 
00981     if (erc)
00982     {
00983       message += message.empty() ? "" : ": ";
00984       message += libport::strerror(erc);
00985     }
00986 
00987     UMessage m(*this);
00988     m.type = MESSAGE_ERROR;
00989     // rawMessage is incorrect but we don't care.
00990     m.message = m.rawMessage = message;
00991     m.timestamp = 0;
00992     m.tag = CLIENTERROR_TAG;
00993     notifyCallbacks(m);
00994   }
00995 
00996   void
00997   UAbstractClient::clientError(const char* message, int erc)
00998   {
00999     return clientError(std::string(message ? message : ""), erc);
01000   }
01001 
01002   void
01003   UAbstractClient::onConnection()
01004   {
01005 # define VERSION_TAG TAG_PRIVATE_PREFIX "__version"
01006     setCallback(*this, &UAbstractClient::setVersion, VERSION_TAG);
01007     // We don't know our kernel version yet.
01008     send(SYNCLINE_WRAP(
01009            "{\n"
01010            "  var __ver__ = 2;\n"
01011            "  {var __ver__ = 1};\n"
01012            "  var " VERSION_TAG ";\n"
01013            "  if (__ver__ == 1)\n"
01014            "    " VERSION_TAG " << system.version\n"
01015            "  else\n"
01016            "  {\n"
01017            "    " VERSION_TAG " = Channel.new(\"" VERSION_TAG "\");\n"
01018            "    " VERSION_TAG " << System.version;\n"
01019            "  };\n"
01020            "};\n"));
01021 # undef VERSION_TAG
01022   }
01023 
01024   UCallbackAction
01025   UAbstractClient::setConnectionID(const UMessage& msg)
01026   {
01027     GD_FINFO_TRACE("setConnectionId for client %p", this);
01028     if (msg.type == MESSAGE_DATA && msg.value)
01029     {
01030       std::string id(*msg.value);
01031       if (!id.empty())
01032       {
01033         libport::BlockLock bl(sendBufferLock);
01034         connectionID_ = id;
01035         return URBI_REMOVE;
01036       }
01037     }
01038     return URBI_CONTINUE;
01039   }
01040 
01041   UCallbackAction
01042   UAbstractClient::setVersion(const UMessage& msg)
01043   {
01044     GD_FINFO_TRACE("setVersion for client %p", this);
01045     libport::BlockLock bl(sendBufferLock);
01046     if (msg.type != MESSAGE_DATA)
01047       return URBI_CONTINUE;
01048     aver_eq(msg.value->type, DATA_STRING);
01049     kernelVersion_ = *msg.value->stringValue;
01050     size_t sep = kernelVersion_.find_first_of('.');
01051     try
01052     {
01053       kernelMajor_ = boost::lexical_cast<int>(kernelVersion_.substr(0, sep));
01054       size_t sep2 = kernelVersion_.find_first_of('.', sep+1);
01055       if (sep2 != kernelVersion_.npos)
01056         kernelMinor_ =
01057           boost::lexical_cast<int>(kernelVersion_.substr(sep+1,
01058                                                          sep2-sep-1));
01059       else
01060         kernelMinor_ = 0;
01061     }
01062     catch (boost::bad_lexical_cast&)
01063     {
01064       kernelMajor_ = 2;
01065       kernelMinor_ = 0;
01066       GD_FWARN("failed to parse kernel version string: '%s', assuming %s.%s.",
01067                kernelVersion_,
01068                kernelMajor_,
01069                kernelMinor_);
01070     }
01071     // Set the kernel version of our associated stream.
01072     ::urbi::kernelMajor(*this) = kernelMajor_;
01073 
01074     // Have the connectionId sent on __ident.
01075 # define IDENT_TAG TAG_PRIVATE_PREFIX "__ident"
01076     setCallback(*this, &UAbstractClient::setConnectionID, IDENT_TAG);
01077     if (kernelMajor_ < 2)
01078       send(IDENT_TAG " << local.connectionID;\n");
01079     else
01080       send(SYNCLINE_WRAP("Channel.new(\"" IDENT_TAG "\")"
01081                          " << connectionTag.name;\n"));
01082     return URBI_REMOVE;
01083 # undef IDENT_TAG
01084   }
01085 
01086   int
01087   UAbstractClient::getCurrentTimestamp() const
01088   {
01089     return currentTimestamp;
01090   }
01091 
01092   const std::string&
01093   UAbstractClient::connectionID() const
01094   {
01095     libport::BlockLock bl(sendBufferLock);
01096     return connectionID_;
01097   }
01098 
01099   std::string
01100   getClientConnectionID(const UAbstractClient* cli)
01101   {
01102     if (!cli)
01103       return "";
01104     return cli->connectionID();
01105   }
01106 
01107 
01108   /*-----------------.
01109   | Default client.  |
01110   `-----------------*/
01111 
01112   UClient* defaultClient = 0;
01113 
01114   UClient* getDefaultClient()
01115   {
01116     return defaultClient;
01117   }
01118 
01119   UClient& get_default_client()
01120   {
01121     return *getDefaultClient();
01122   }
01123 
01124   void setDefaultClient(UClient* cl)
01125   {
01126     defaultClient = cl;
01127   }
01128 
01129 
01130   std::ostream&
01131   unarmorAndSend(const char* a, UAbstractClient* where)
01132   {
01133     aver(a);
01134     aver(where);
01135     std::ostream& s = *where;
01136     if (strlen(a)>2)
01137     {
01138       if (a[0]=='(' && a[strlen(a)-1]==')')
01139         s.rdbuf()->sputn(a+1, strlen(a)-2);
01140       else
01141         s << a; //this is baaad, user forgot the parenthesis but was lucky
01142     }
01143     return s;
01144   }
01145 
01146 } // namespace urbi