Skip to content

file /Users/ios_developer/workspace/coldwave-os/build/_deps/flake-src/ObjectImpl.cpp

Namespaces

Name
flake

Namespaces

Name
flake

Source code

cpp

#include <ObjectImpl.h>
#include <protocol/ClientConnection.h>
#include <protocol/Message.h>
#include <stdint.h>
#include <TableImpl.h>

#ifdef _WIN32
#include <sys/types.h>
#include <time.h>
#endif

namespace flake
{

    const addr_t ObjectImpl::EMPTY_ADDR = 0xffff;
    const addr_t ConnectedObject::EMPTY_ADDR = 0xffff;

    ObjectImpl::ObjectImpl (Service *srv)
        : ObjectImpl ()
    {

      m_serviceRef = srv;
      flakeMutexAttr_t _attr;
      memset (&_attr, 0, sizeof (flakeMutexAttr_t));
      _attr.attr_bits = flakeMutexRecursive;
      flakeMutexNew (&m_blockMutex, &_attr);
    }

    ObjectImpl::ObjectImpl ()
        : ConnectedObjectImpl (nullptr)
    {
      m_initialized = false;
      m_serviceRef = nullptr;
      m_maxNamedTag = NAMED_TAG_RANGE_START;
      flakeMutexAttr_t _attr;
      memset (&_attr, 0, sizeof (flakeMutexAttr_t));
      _attr.attr_bits = flakeMutexRecursive;
      flakeMutexNew (&m_blockMutex, &_attr);
    }

    ObjectImpl::ObjectImpl (ConnectionImpl *conn)
        :
        ConnectedObjectImpl (conn)
    {
      m_serviceRef = nullptr;
      m_maxNamedTag = NAMED_TAG_RANGE_START;
      flakeMutexAttr_t _attr;
      memset (&_attr, 0, sizeof (flakeMutexAttr_t));
      _attr.attr_bits = flakeMutexRecursive;
      flakeMutexNew (&m_blockMutex, &_attr);
    }

    ObjectImpl::~ObjectImpl ()
    {

      flakeMutexDelete (m_blockMutex);

      if (m_connection && m_addr != 0)
        {
          flake::ConnectionImpl::NodeList *nl = nullptr;
          /*    try {*/
          nl = m_connection->remove (m_addr);
          /*  } catch (std::exception e) {
              // don't care
          };*/
          if (nl && nl->size () > 1)
            {
#if FLAKE_DEBUG_LOGGING
              logging::logf<lvlError> (
                  "Destroyed Object [vId=%d] with %ld Children! shouldn't happen\n",
                  m_addr, nl->size ());
#endif
            }
          if (m_connection->isConnected ())
            {
              auto d = new Message (mstRequest, MessageType::destroyObject, m_addr, 0);
              m_connection->sendRequest (&d, 0);
            }
        }
    }


//---------------
//
//  Object
//
//---

    void
    ObjectImpl::init (addr_t addr, addr_t parentAddr,
                      const PropArray &params, addr_t broadcastAddr, bool read_only)
    {
      UNUSED (read_only); // this is for persistent objects only (which we're not doing in this version

      m_parentAddr = parentAddr;
      m_addr = addr;
      m_broadcastAddr = broadcastAddr;

      copyFrom (params);

      PropArray va;
#if FLAKE_NAMED_TAG_SUPPORT

      if (FLAKE_SUCCEEDED (getProperties (va)))
        {
          const Property& v = va.get(BaseProperties::PROP_MAPPINGS);
          updatePropMappings(v);
        }
      for (auto i = m_tagNameMappings.begin(); i != m_tagNameMappings.end(); ++i) {

          TagNameMapping& pm = *i;

          if (pm.tag == 0)
              pm.tag = (++m_maxNamedTag << 16);

      }

#endif
    }

    void ObjectImpl::postInit (PropArray &params)
    {

      PropArray va;
      va.clear ();
      TagArray ta (defaultPropset ());

      for (unsigned int i = 0; i < ta.numTags; i++)
        {
          if (!params.has (ta.tags[i]))
            {
              Property v (ta.tags[i]);
              va.set (v);
            }
        }

      int res = E_OK;

      if (va.count ())
        res = registerProperties (va);

      if (res == E_OK)
        {
          m_initialized = true;
          onInitialized ();
        }

    }

    void
    ObjectImpl::onConfirmation (Confirmation &conf)
    {

      switch (conf.message ())
        {
          case MessageType::setProperties:
            {
              for (uint32_t i = 0; i < conf.getData ().count (); i++)
                {
                  PropArray::set (conf.getData ()[i]);
                }
            }
          break;
          case MessageType::getProperties:
            {
              for (uint32_t i = 0; i < conf.getData ().count (); i++)
                {
                  PropArray::set (conf.getData ()[i]);
                }
              break;
            }

          case MessageType::joinGroup:
          case MessageType::leaveGroup:

            break;

        }

    }

    void
    ObjectImpl::connect (ConnectionImpl *c)
    {

      m_connection = c;
    }

    void
    ObjectImpl::disconnect ()
    {
      m_addr = EMPTY_ADDR;
      m_broadcastAddr = EMPTY_ADDR;
      m_initialized = false;
      m_connection = nullptr;
    }

    int8_t
    ObjectImpl::handleCustomMessage (string name, PropArray &data, PropArray &outProps)
    {

      if (m_serviceRef)
        return m_serviceRef->handleCustomMessage (name, data, outProps);

      return E_NOT_IMPL;
    }

    int8_t
    ObjectImpl::handleStreamData (Stream *s, byte *cb, uint32_t len)
    {

      if (m_serviceRef)
        m_serviceRef->handleStreamData (s, cb, len);

      return E_NOT_IMPL;
    }

    int
    ObjectImpl::getPropertyRequested (uint32_t tag, Property &value)
    {

      if (m_serviceRef)
        return m_serviceRef->getPropertyRequested (tag, value);

      return E_IGNORED;

    }

    int
    ObjectImpl::setPropertyRequested (uint32_t tag, Property &value, const PropArray &transaction, bool internal)
    {

      if (m_serviceRef)
        return m_serviceRef->setPropertyRequested (tag, value, transaction, internal);

      return E_IGNORED;

    }

    void
    ObjectImpl::onInitialized ()
    {
      if (m_serviceRef)
        m_serviceRef->onInitialized ();

    }

    Connection *
    ObjectImpl::connection ()
    {
      return m_connection;
    }

    void
    ObjectImpl::onIndication (Indication &indication)
    {

      switch (indication.message ())
        {

          case MessageType::customMsgReceived:
            {
              Property vMessage = indication.getData ().get (BaseProperties::MESSAGE_NAME);

              if (!vMessage.isErrorValue ())
                {
                  PropArray props;
                  int8_t res = handleCustomMessage (utils::stringFromChar (vMessage.str), indication.getData (), props);
                  indication.ack (res, props);
                }
              else
                indication.ack (E_FAILED);
            }
          break;
          case MessageType::getPropertiesReq:
            {
              PropArray &va = indication.getData ();
              int result = E_OK;
              unsigned int err = 0;
              unsigned int changes = 0;

              for (unsigned int i = 0; i < va.count (); i++)
                {
                  int res = E_OK;

                  if (this->has (va[i].tag))
                    {
                      va.set (PropArray::get (va[i].tag));
                    }
                  else
                    {
                      res = getPropertyRequested (va[i].tag, (Property &) va[i]);
                    }

                  if (res == E_OK)
                    {
                      PropArray::set (va[i]);
                      changes++;
                    }
                  else
                    {
                      va[i].tag |= TAG_ERROR;
                      ((Property &) va[i]).err = res;
                      err++;
                    }

                }
              if (err == va.count ())
                {
                  result = E_FAILED;
                }
              else if (err > 0)
                {
                  result = E_PARTIAL_SUCCESS;
                }

              indication.ack (result, va);

            }
          break;
          case MessageType::setPropertiesReq:
            {

              PropArray &va = indication.getData ();
              Property v;

              int8_t result = E_OK;
              unsigned int err = 0;
              unsigned int changes = 0;

              for (unsigned int i = 0; i < va.count (); i++)
                {
                  Property vIn = get (va[i].tag);
                  if (vIn.isErrorValue ())
                    {

                      v = va[i];
                      v.tag |= TAG_ERROR;
                      v.err = E_NOT_FOUND;
                      va.set (v);
                      err++;

                    }
                  if (vIn != va[i])
                    {
                      Property p = va[i];
                      int res = setPropertyRequested (va[i].tag, p, va, false);

                      if (res == E_OK)
                        {
                          PropArray::set (p);
                          changes++;
                        }
                      else
                        {
                          v = p;
                          v.tag |= TAG_ERROR;
                          v.err = res;
                          va.set (v);
                          err++;
                        }
                    }
                }
              if (err == va.count ())
                {
                  result = E_FAILED;
                }
              else
                {
                  if (changes == 0)
                    {
                      result = E_NO_CHANGES;
                    }
                  else if (err > 0)
                    {
                      result = E_PARTIAL_SUCCESS;
                    }
                }

              indication.ack (result, va);

            }
          break;

#if FLAKE_STREAM_SUPPORT
          case MessageType::createStreamReq:
      {

          PropArray vaIn = indication.getData();
          if (vaIn.has(BaseProperties::STREAM_NAME)) {

              string streamName = vaIn.get(BaseProperties::STREAM_NAME).str;
              uint16_t streamId = (uint16_t)m_streams.size(); //###TODO: Evtl. über counter

              StreamImpl* stream = new StreamImpl(m_connection, m_addr, indication.source(), streamId, streamName);

              m_streams.push_back(stream);

              PropArray va;

              Property v(BaseProperties::STREAM_ID);
              v.u16 = streamId;
              va.set(v);

              indication.ack(E_OK, va);

          } else {

              indication.ack(E_FAILED);

          }

      } break;

      case MessageType::streamDataReceived:
      {

          PropArray vaIn = indication.getData();
          if (vaIn.has(BaseProperties::STREAM_ID)) {

              Property vData = vaIn.get(BaseProperties::STREAM_DATA);

              if (vData.isErrorValue()) {

                  indication.ack(E_FAILED);

              } else {

                  uint16_t streamId = vaIn.get(BaseProperties::STREAM_ID).u16;

                  auto i = m_streams.begin();

                  for (; i != m_streams.end(); ++i) {
                      if ((*i)->streamId() == streamId) {
                          handleStreamData(*i, vData.bin.lpb,  vData.bin.cb);
                          break;
                      }
                  }

                  indication.ack(E_OK);
              }

          } else {

              indication.ack(E_FAILED);

          }


      } break;


      case MessageType::closeStreamReq:
      {

          PropArray vaIn = indication.getData();

          if (vaIn.has(BaseProperties::STREAM_ID)) {

              uint16_t streamId = vaIn.get(BaseProperties::STREAM_ID).u16;

              auto i = m_streams.begin();

              for (; i != m_streams.end(); ++i) {
                  if ((*i)->streamId() == streamId) {
                      m_streams.erase(i);
                      break;
                  }
              }

              indication.ack(E_OK);

          } else {

              indication.ack(E_FAILED);

          }

      } break;
#endif

          case MessageType::changed:
            {
              //### TODO: Prop-Cache aktualisieren!
              PropArray data = indication.getData ();
              for (uint32_t i = 0; i < data.count (); i++)
                PropArray::set (data[i]);
            }
          break;

        }

    }

    int
    ObjectImpl::invoke (const string command, PropArray &params,
                        bool blocking, Confirmation **conf)
    {

      return sendCustomMessage (command, params, blocking, conf, false);
    }

    int
    ObjectImpl::broadcast (const string command, PropArray &params)
    {
      return sendCustomMessage (command, params, false, 0, true);
    }

    int
    ObjectImpl::sendCustomMessage (const string messageName, PropArray &params, bool blocking, Confirmation **conf,
                                   bool broadcast)
    {

      if (!m_connection || !m_connection->isConnected ())
        return E_NOT_CONNECTED;

      int res = m_connection->validateMessageParameters (broadcast ? m_addr : m_parentAddr, broadcast ? m_broadcastAddr
                                                                                                      : m_addr,
                                                         MessageType::custom);

      if (res != E_OK)
        return E_FAILED;

      auto dgram = new Message (mstRequest, MessageType::custom, broadcast ? m_addr : m_parentAddr, broadcast
                                                                                                    ? m_broadcastAddr
                                                                                                    : m_addr);
      Property vMsg (BaseProperties::MESSAGE_NAME);
      vMsg.str = (tt_str_t) messageName.c_str ();
      params.set (vMsg);
      dgram->setData (params);

      if (broadcast)
        {
          blocking = false;

        }
      res = m_connection->sendRequest (&dgram, this, blocking, broadcast ? 50 : FLAKE_DEFAULT_TIMEOUT_MS);

      if (blocking)
        {
          if (conf != nullptr)
            {
              *conf = dgram;
            }
          else
            {
              delete dgram;
            }
          return res;
        }
      else if (broadcast)
        {
          return E_OK;
        }
      return res;

    }

    int
    ObjectImpl::getProperties (PropArray &properties)
    {

      if (!m_connection || !m_connection->isConnected ())
        return E_NOT_CONNECTED;

#if FLAKE_NAMED_TAG_SUPPORT
      for (unsigned long i = 0; i < properties.count(); i++)
          resolveProperty(properties[i]);
#endif

      int res = E_OK;

      PropArray outProps;

      uint32_t i = 0;
      size_t nv = properties.count ();

      for (; i < nv; i++)
        if (this->has (
            properties[i].tag))
          { // && !this->get(properties[i].tag).isErrorValue()) -> only meaningful if error could have been removed on the server by someone else...
            properties.set (PropArray::get (properties[i].tag));
          }
        else
          {
            outProps.set (properties[i]);
          }

      if (outProps.count () > 0)
        {

          res = m_connection->validateMessageParameters (m_parentAddr, m_addr, MessageType::getProperties);

          if (res == E_OK)
            {
              auto dgram = new Message (mstRequest, MessageType::getProperties, m_parentAddr, m_addr);
              dgram->setData (outProps);

              res = m_connection->sendRequest (&dgram, 0, true);

              if (res == E_OK)
                {
                  PropArray &data = dgram->getData ();
                  for (i = 0; i < data.count (); i++)
                    {
#if FLAKE_NAMED_TAG_SUPPORT
                      resolveProperty(data[i]);
#endif
                      properties.set (data[i]);
                    }

                  delete dgram;
                }
            }

        }
      if (res == E_OK)
        {
          for (i = 0; i < properties.count (); i++)
            {
              if (outProps.has (properties[i].tag))
                {
                  PropArray::set (properties[i]);
                }
            }
        }

      return res;

    }

    void
    ObjectImpl::getPendingProperty (Property &property)
    {

      PropArray va;
      va.set (property);

      if (getPendingProperties (va) != E_FAILED)
        {
          property.copyFrom (va[0]);
        }
      else
        {
          property.tag |= TAG_ERROR;
          property.err = E_NOT_FOUND;
        }

    }

    int
    ObjectImpl::getPendingProperties (PropArray &properties)
    {

      unsigned error_count = 0;
      unsigned tid = flakeThreadId ();

      flakeMutexAcquire (m_blockMutex);

      if (m_blockUpdateValues.find (tid) == m_blockUpdateValues.end ())
        {
          flakeMutexRelease (m_blockMutex);
          return E_FAILED;
        }

      for (unsigned i = 0; i < properties.count (); i++)
        {
          if (!m_blockUpdateValues[tid].has (properties[i].tag))
            {
              Property errProp;
              errProp.tag = properties[i].tag |= TAG_ERROR;
              errProp.err = E_NOT_FOUND;
              properties.set (errProp);
              error_count++;
            }
          else
            {
              m_blockUpdateValues[tid].get (properties[i].tag);
            }
        }
      flakeMutexRelease (m_blockMutex);
      if (error_count == 0)
        {
          return E_OK;
        }
      if (error_count < properties.count ())
        {
          return E_PARTIAL_SUCCESS;
        }

      return E_NOT_FOUND;

    }

    int ObjectImpl::registerProperties (PropArray &properties)
    {
      return setPropertiesEx (properties, false);
    }

    int ObjectImpl::setProperties (PropArray &properties, int timeout_ms)
    {
      if (timeout_ms == 0)
        {
          return setPropertiesEx (properties, true);
        }
      else
        {
          return setPropertiesEx (properties, false, timeout_ms);
        }
    }

    int ObjectImpl::setPropertiesEx (PropArray &properties, bool dontBlock, int timeout_ms)
    {

#if FLAKE_NAMED_TAG_SUPPORT
      bool mappingChanged = false;
      for (unsigned long i = 0; i < properties.count(); i++)
          mappingChanged = !resolveProperty(properties[i]);
#endif

      if (!m_connection || m_addr == EMPTY_ADDR || !m_connection->isConnected ())
        return E_NOT_CONNECTED;

      Message *dgram;

      int res = E_FAILED;

      res = m_connection->validateMessageParameters (m_parentAddr, m_addr,
                                                     MessageType::setProperties);

      if (res != E_OK)
        return res;

      ConnectionImpl::Node *n = m_connection->findFirst (m_addr, false);
      if (n && n->group != 0)
        {
          dgram = new Message (mstRequest, MessageType::setProperties, m_addr, m_addr);
        }
      else
        {
          dgram = new Message (mstRequest, MessageType::setProperties, m_parentAddr, m_addr);
        }
      uint32_t i;

      for (i = 0; i < properties.count (); i++)
        {
          Property vIn = get (properties[i].tag);
          if (vIn != properties[i])
            {
              if (IS_VOLATILE (vIn.tag))
                {
                  PropArray::set (vIn);
                  ((Property &) properties[i]).flags |= DONT_SERIALIZE_FLAG;
                }
              else
                {
                  ((Property &) properties[i]).flags &= ~DONT_SERIALIZE_FLAG;
                }
              dgram->setData (properties[i]);
            }

        }

#if FLAKE_NAMED_TAG_SUPPORT
      if (mappingChanged)
          dgram->setData(generatePropMappings());
#endif

      if (dgram->getData ().count () == 0)
        {
          delete dgram;
          return E_NO_CHANGES;
        }

      res = m_connection->sendRequest (&dgram, this, !dontBlock, timeout_ms);

      if (!dontBlock)
        {
          if (res == E_OK || res == E_PARTIAL_SUCCESS)
            {
              for (i = 0; i < dgram->getData ().count (); i++)
                {
                  Property vRes = dgram->getData ()[i];
                  if (!vRes.isErrorValue ())
                    {
                      for (unsigned j = 0; j < properties.count (); j++)
                        {
                          if (TAG_ID (properties[j].tag) == TAG_ID (vRes.tag))
                            {
                              if ((properties[j].flags & DONT_CACHE_FLAG) == 0)
                                {
                                  PropArray::set (vRes);
                                }
                              break;
                            }
                        }
                    }
                  else if (vRes.err == E_OK)
                    {
                      for (unsigned j = 0; j < properties.count (); j++)
                        {
                          if (TAG_ID (properties[j].tag) == TAG_ID (vRes.tag))
                            {
                              if ((properties[j].flags & DONT_CACHE_FLAG) == 0)
                                {
                                  PropArray::set (properties[j]);
                                }
                              break;  // also if result didn't contain prop at all!
                            }
                        }
                    }
                }
            }
          if (res != E_TIMEOUT && res != E_NOT_CONNECTED)
            delete dgram;

        }

      if (res == E_NO_ALLOC)
        delete dgram;

      return res;

    }

    int
    ObjectImpl::setProperty (const Property &property)
    {

#if FLAKE_NAMED_TAG_SUPPORT
      if (TAG_ID(property.tag) == 0 && property.name.length() > 0)
          property.tag = tagFromName(property.name.c_str());
#endif

      if (property.tag == 0)
        return E_NOT_FOUND;

      unsigned tid = flakeThreadId ();
      flakeMutexAcquire (m_blockMutex);
      if ((m_blockUpdateRunning.find (tid) != m_blockUpdateRunning.end ()) && m_blockUpdateRunning[tid])
        {
          m_blockUpdateValues[tid].set (property);
          flakeMutexRelease (m_blockMutex);
          return E_PENDING;
        }
      else
        {
          flakeMutexRelease (m_blockMutex);
          PropArray va;
          va.set (property);
          int res = setProperties (va);
          if (res != E_OK && res != E_PARTIAL_SUCCESS)
            return res;

          Property v = va.get (property.tag);
          if (v.isErrorValue ())
            return v.err;

          return E_OK;

        }

    }

    void
    ObjectImpl::getProperty (Property &property)
    {

      PropArray va;
      va.set (property);

      getProperties (va);

      property.copyFrom (va[0]);

    }

    void
    ObjectImpl::beginUpdate ()
    {
      unsigned tid = flakeThreadId ();
      flakeMutexAcquire (m_blockMutex);
      if (m_blockUpdateRunning.find (tid) == m_blockUpdateRunning.end ())
        {
          m_blockUpdateRunning[tid] = true;
        }
      flakeMutexRelease (m_blockMutex);
    }

    int8_t
    ObjectImpl::commitUpdate (int timeout_ms)
    {

      return commitUpdate (NULL, timeout_ms);

    }

    int8_t
    ObjectImpl::commitUpdate (PropArray *result, int timeout_ms)
    {
      unsigned tid = flakeThreadId ();
      flakeMutexAcquire (m_blockMutex);
      if (m_blockUpdateRunning.find (tid) == m_blockUpdateRunning.end ())
        {
          flakeMutexRelease (m_blockMutex);
          return E_IGNORED;
        }

      if ((m_blockUpdateRunning[tid]) && (m_blockUpdateValues[tid].count () > 0))
        {
          int res = setProperties (m_blockUpdateValues[tid], timeout_ms);
          if (result)
            result->copyFrom (m_blockUpdateValues[tid]);
          m_blockUpdateValues[tid].clear ();
          m_blockUpdateValues.erase (tid);
          m_blockUpdateRunning.erase (tid);
          flakeMutexRelease (m_blockMutex);
          return res;
        }
      flakeMutexRelease (m_blockMutex);
      return E_IGNORED;

    }

    int ObjectImpl::subscribe (std::function<void (Indication &)> f)
    {

      auto ci = dynamic_cast<ConnectionImpl *>(m_connection);

      if (ci)
        return ci->subscribeObject (this, f);

      return E_FAILED;

    }

    void ObjectImpl::unsubscribe ()
    {
      auto ci = dynamic_cast<ConnectionImpl *>(m_connection);

      if (ci)
        ci->unsubscribeObject (this);

    }

#if FLAKE_STREAM_SUPPORT
    int
    ObjectImpl::openStream (string name, Stream **stream)
    {

      auto ci = dynamic_cast<ConnectionImpl *>(m_connection);

      if (ci)
        return ci->openStream (this, name, stream);

      return E_FAILED;

    }
#endif

    const char *
    ObjectImpl::getObjectType ()
    {
      return TYPE_UUID_SERVICE;
    }

    TagArray
    ObjectImpl::defaultPropset ()
    {
      if (m_serviceRef)
        return m_serviceRef->defaultPropset ();

      TagArray ta;
      return ta;
    }

    bool
    ObjectImpl::isInitialized ()
    {
      return m_initialized;
    }

#if FLAKE_STREAM_SUPPORT

    StreamImpl::StreamImpl (ConnectionImpl *conn, addr_t src, addr_t dst, uint16_t streamId, string name)
        :
        m_connection (conn), m_src (src), m_dst (dst), m_streamId (streamId), m_streamName (name)
    {
      m_closed = false;

    }

    StreamImpl::~StreamImpl ()
    {

      if (!m_closed)
        close ();

    }

    int StreamImpl::write (byte *data, unsigned int len)
    {

      if (m_closed)
        return E_FAILED;
      if (!m_connection || !m_connection->isConnected ())
        return E_NOT_CONNECTED;

      int res = m_connection->validateMessageParameters (m_src, m_dst, MessageType::streamData);

      if (res == E_OK)
        {

          auto dgram = new Message (mstRequest, MessageType::streamData, m_src, m_dst);

          PropArray props;
          Property v;

          v.tag = BaseProperties::STREAM_ID;
          v.u16 = m_streamId;
          props.set (v);

          v.tag = BaseProperties::STREAM_DATA;
          v.bin.cb = len;
          v.bin.lpb = (byte *) malloc (v.bin.cb);
          memcpy (v.bin.lpb, data, len);
          props.set (v);
          dgram->setData (props);

          res = m_connection->sendRequest (&dgram, 0, true);

        }

      return res;

    }

    int StreamImpl::read (byte **data, unsigned int *len)
    {

      if (m_closed)
        return E_FAILED;
      if (!m_connection || !m_connection->isConnected ())
        return E_NOT_CONNECTED;

      Property *v;

      m_readQueue.wait_and_pop (&v);

      *data = (byte *) malloc (v->bin.cb);
      memcpy (*data, v->bin.lpb, v->bin.cb);
      *len = v->bin.cb;

      delete v;

      return E_OK;

    }

    void StreamImpl::close ()
    {

      if (!m_connection || !m_connection->isConnected ())
        return;

      if (E_OK == m_connection->validateMessageParameters (m_src, m_dst, MessageType::closeStream))
        {
          auto dgram = new Message (mstRequest, MessageType::closeStream, m_src, m_dst);

          PropArray props;
          Property v;

          v.tag = BaseProperties::STREAM_ID;
          v.u16 = m_streamId;
          props.set (v);
          dgram->setData (props);

          if (m_connection->sendRequest (&dgram, 0, false) == E_OK)
            m_closed = true;

        }

    };

    void StreamImpl::pushReadQueue (Property *v)
    {

      m_readQueue.push (v);

    }
#endif

#if FLAKE_NAMED_TAG_SUPPORT

    //---------------
    //
    //  Name / Tag Mapping
    //
    //---

    bool ObjectImpl::namesEqual(const char* a, const char* b) {

        long hashA = strchr(a, UNIT_SEPARATOR_CHR) - a;
        long hashB = strchr(b, UNIT_SEPARATOR_CHR) - b;

        if (hashA >= 0) {
            if (hashB >= 0) {
                if (hashA == hashB && strncmp(a,b, hashA) == 0)
                    return true;
            } else {
                if ((long)strlen(b) == hashA && strncmp(a,b,hashA) == 0)
                    return true;
            }
        } else {
            if (hashB >= 0) {
                if ((long)strlen(a) ==  hashB && strncmp(a,b, hashB) == 0)
                    return true;
            } else {
                if ((long)strlen(b) == (long)strlen(a) && strcmp(a,b) == 0)
                    return true;
            }
        }
        return false;
    }



    uint32_t
    ObjectImpl::tagFromName(const char* name) {

        auto i = m_tagNameMappings.begin();

        for (; i != m_tagNameMappings.end(); ++i) {

            if (namesEqual((*i).name, name)) {
                return (*i).tag;
            }
        }

        return 0;

    }

    const char*
    ObjectImpl::nameFromTag(uint32_t tag) {


        auto i = m_tagNameMappings.begin();

        for (; i != m_tagNameMappings.end(); ++i) {

            if ((*i).tag == tag) {
                return (*i).name;
            }
        }

        return 0;

    }

    uint32_t
    ObjectImpl::createNamedProperty(const char* name, uint16_t type) {

        uint32_t tag = tagFromName(name);

        if (tag != 0)
            return tag;

        tag = (++m_maxNamedTag << 16) | type;


        if (m_connection->validateMessageParameters(m_parentAddr, m_addr, MessageType::createProperty) == E_OK) {

            auto dgram = new Message(mstRequest, MessageType::createProperty, m_parentAddr, m_addr);

            PropArray va;

            Property vName(BaseProperties::PROP_NAME);
            vName.str = utils::copyStr(name);
            va.set(vName);

            Property vProp(BaseProperties::PROP_TAG);
            vProp.u32 = tag;
            va.set(vProp);

            dgram->setData(va);

            if (m_connection->sendRequest(&dgram, this, true) == E_OK) {

                Property vMappings = dgram->getData().get(BaseProperties::PROP_MAPPINGS);

                if (!vMappings.isErrorValue())
                    updatePropMappings(vMappings);
                else
                    return 0;

            }

        } else
            return 0;

        return tag;
    }

    Property
    ObjectImpl::generatePropMappings() {

        Property v(BaseProperties::PROP_MAPPINGS);    //### Dealloc self needs to be set!!
        v.arr.cValueSize = sizeof(TagNameMapping);
        v.arr.numValues = m_tagNameMappings.size();
        v.arr.valueType = 0;
        v.arr.lpValues = (flake::byte*) malloc(m_tagNameMappings.size() * sizeof(TagNameMapping));
        for (unsigned long i = 0; i < m_tagNameMappings.size(); i++)
            memcpy(v.arr.lpValues + (i * sizeof(TagNameMapping)), &m_tagNameMappings[i], sizeof(TagNameMapping));

        return v;

    }

    void
    ObjectImpl::updatePropMappings(Property v) {
        if(!v.isErrorValue()) {

            TagNameMapping pm;

             for (uint32_t i = 0; i < v.arr.count(); i++) {
                memcpy(&pm, (uint32_t*)v.arr[0] + (i * sizeof(TagNameMapping)), sizeof(TagNameMapping));
                if (tagFromName(pm.name) == 0)
                    m_tagNameMappings.push_back(pm);
                else {
                    for (uint32_t j = 0; j < m_tagNameMappings.size(); j++) {
                        if(namesEqual(m_tagNameMappings[j].name, pm.name)) {
                            m_tagNameMappings[j].tag = pm.tag;
                        }
                    }
                }


                m_maxNamedTag = max<uint16_t>(TAG_ID(pm.tag), m_maxNamedTag);
            }
        }
    }

    bool
    ObjectImpl::resolveProperty(const Property& property) {

        if (property.name.length() > 0) {
            uint32_t tag = tagFromName(property.name.c_str());
            if (TAG_ID(tag) == 0) {
                property.tag = createNamedProperty(property.name.c_str(), property.tag);
                return false;
            }
            property.tag = tag;
        } else {
            property.name = utils::stringFromChar(nameFromTag(property.tag));
        }
        return true;

    }

#endif
}