Skip to content

file /Users/ios_developer/workspace/coldwave-os/build/_deps/flake-src/routing/Router.cpp

Defines

Name
ROUTER_SEND_THREAD_NAME

Macros Documentation

define ROUTER_SEND_THREAD_NAME

cpp
#define ROUTER_SEND_THREAD_NAME "rtr_snd_thread"

Source code

cpp
/*
 * Router.cpp
 *
 *  Created on: 18.10.2010
 *      Author: root
 */
#include <routing/Router.h>
#include <protocol/Message.h>
#include <flake.h>
#include <TableImpl.h>
#include <string.h>
#include <routing/IPCConnection.h>

using namespace flake;
using namespace std;

#if FLAKE_NAMED_TAG_SUPPORT

//---------------
//
//  Name / Tag Mapping
//
//---


/**
 * Compares two names while ignoring everything from the first
 * UNIT_SEPARATOR_CHR onwards in each of them.
 *
 * Only the part of each string that precedes its first separator (or the
 * whole string when it contains no separator) takes part in the comparison.
 *
 * @param a first NUL-terminated name; must not be null
 * @param b second NUL-terminated name; must not be null
 * @return true when both prefixes have the same length and the same bytes
 */
bool namesEqual(const char* a, const char* b) {

    // strchr() yields a null pointer when there is no separator. Subtracting
    // the string start from a null pointer is undefined, so the "no
    // separator" case is handled explicitly instead of via a negative offset.
    const char* sepA = strchr(a, UNIT_SEPARATOR_CHR);
    const char* sepB = strchr(b, UNIT_SEPARATOR_CHR);

    const size_t lenA = sepA ? (size_t)(sepA - a) : strlen(a);
    const size_t lenB = sepB ? (size_t)(sepB - b) : strlen(b);

    return lenA == lenB && strncmp(a, b, lenA) == 0;
}



/**
 * Looks up the tag stored for a name in an object's name/tag mappings.
 *
 * Names are matched with namesEqual(); the first matching mapping wins.
 *
 * @param sfo  object whose `mappings` are searched
 * @param name name to look up
 * @return the tag of the first matching mapping, or 0 when none matches
 */
uint32_t tagFromName(ServersideObject* sfo, const char* name) {

    for (auto& mapping : sfo->mappings) {
        if (!namesEqual(mapping.name, name))
            continue;

        return mapping.tag;
    }

    return 0;
}

#endif

// Name given to the thread that the Router constructor starts for
// Router::sendThreadFunc (passed via the thread attributes' `name` field).
#define ROUTER_SEND_THREAD_NAME "rtr_snd_thread"

//---
//
// Router
//
//---
/**
 * Builds a router: creates its mutexes and run semaphore, resets the run
 * flags, marks every asynchronous-response slot as free, registers the root
 * object (id 0) with a freshly allocated root group, and starts the send
 * thread running Router::sendThreadFunc.
 */
Router::Router ()
    : m_authSink (nullptr), m_connections ()
{
  // Mutexes created with default attributes.
  flakeMutexNew (&m_idMutex, nullptr);
  flakeMutexNew (&m_asynchRespmapMutex, nullptr);
  flakeMutexNew (&m_pvm_mutex, nullptr);
  flakeMutexNew (&m_runMutex, nullptr);

  // The connection mutex is created with the recursive attribute bit set.
  flakeMutexAttr_t recursiveAttr;
  memset (&recursiveAttr, 0, sizeof (recursiveAttr));
  recursiveAttr.attr_bits = flakeMutexRecursive;
  flakeMutexNew (&m_connMutex, &recursiveAttr);

  flakeSemaphoreNew (&m_runCond, 1, 1);

  m_isRunning = false;
  m_doShutdown = false;
  m_isShutdown = false;

  //### Taken only so that transportImpl can treat internal and external
  //### queues the same way - not a nice solution.
  m_dgramOutQueue.ref ();

  // A slot whose sink is null counts as free; start with all slots free.
  for (auto &slot : m_asynchResponseMap)
    {
      slot.sink = nullptr;
      slot.pendingRequest = nullptr;
      slot.token = 0;
      slot.creationTime = 0;
      slot.refCount = 0;
    }

  m_nextConnectionId = 0;
  m_nextObjectId = 1;
  m_nextToken = static_cast<uint16_t> (random ());

  // Root object: registered under id 0 with a newly allocated group id.
  auto *rootObject = new ServersideObject (m_rootValues);
  m_rootGroup = nextObjectId ();
  m_rootNode = m_registry.setRoot (0, m_rootGroup, *rootObject);
  rootObject->ref ();

  // Start the thread that drains m_dgramOutQueue.
  flakeThreadAttr_t threadAttr;
  memset (&threadAttr, 0, sizeof (threadAttr));
  threadAttr.name = ROUTER_SEND_THREAD_NAME;
  threadAttr.stack_size = 2048;
  threadAttr.priority = flakeThreadPriorityNormal;

  flakeThreadNew (&sendThread, &Router::sendThreadFunc, this, &threadAttr);
}

/**
 * Stores the authentication sink used by the router.
 *
 * @param a sink to store; a null pointer leaves the router without a sink
 */
void Router::setAuthenticationSink (AuthenticationSink *a)
{
  m_authSink = a;
}

/**
 * Allocates a message token that is not currently recorded in m_tokens.
 *
 * Token 0 is never handed out: Router::process() discards responses that
 * carry token 0 because 0 marks an empty asynchronous-response slot. The
 * search wraps around the 16-bit range, so tokens below the starting point
 * are still checked against m_tokens.
 *
 * @return the allocated token; it stays reserved until releaseToken() is
 *         called for it. If every non-zero token is already reserved, a
 *         token that is still in use is returned.
 */
uint16_t Router::nextToken ()
{
  flakeMutexAcquire (m_idMutex);

  uint16_t candidate = m_nextToken;

  // Probe each 16-bit value at most once; ++candidate wraps from 0xFFFF to 0.
  uint32_t remaining = UINT16_MAX + 1u;
  while (remaining-- > 0 && (candidate == 0 || m_tokens.count (candidate) != 0))
    {
      ++candidate;
    }

  // Only reachable when all non-zero tokens are taken: still avoid 0.
  if (candidate == 0)
    candidate = 1;

  m_tokens.insert (candidate);

  m_nextToken = candidate;
  ++m_nextToken;

  flakeMutexRelease (m_idMutex);
  return candidate;
}

/**
 * Removes a token from the set of reserved tokens so that nextToken() may
 * hand it out again. m_idMutex is held while the set is modified.
 *
 * @param token token to release
 */
void
Router::releaseToken (addr_t token)
{
  flakeMutexAcquire (m_idMutex);

  m_tokens.erase (token);

  flakeMutexRelease (m_idMutex);
}

/**
 * Removes an object id from the set of reserved ids so that nextObjectId()
 * may hand it out again. m_idMutex is held while the set is modified.
 *
 * @param id object id to release
 */
void Router::releaseObjectId (addr_t id)
{
  flakeMutexAcquire (m_idMutex);

  m_objectIds.erase (id);

  flakeMutexRelease (m_idMutex);
}

/**
 * Allocates an object id that is not currently recorded in m_objectIds.
 *
 * Id 0 is never handed out: the router registers its root object under
 * id 0, and Router::group() uses 0 to report "object doesn't exist". The
 * search continues past a wrap-around of the id counter, so ids below the
 * starting point are still checked against m_objectIds.
 *
 * NOTE(review): Object::EMPTY_ADDR is not skipped here; if it lies inside
 * the allocatable range it should probably be excluded too - confirm.
 *
 * @return the allocated id; it stays reserved until releaseObjectId() is
 *         called for it. If no free id is found within the probe limit, an
 *         id that is still in use is returned.
 */
addr_t Router::nextObjectId ()
{
  flakeMutexAcquire (m_idMutex);

  addr_t candidate = m_nextObjectId;

  // Bounded probe (one pass over a 16-bit id space); ++candidate may wrap.
  uint32_t remaining = UINT16_MAX + 1u;
  while (remaining-- > 0 && (candidate == 0 || m_objectIds.count (candidate) != 0))
    {
      ++candidate;
    }

  // Only reachable when the probe found nothing free: still avoid 0.
  if (candidate == 0)
    candidate = 1;

  m_objectIds.insert (candidate);

  m_nextObjectId = candidate;
  ++m_nextObjectId;

  flakeMutexRelease (m_idMutex);
  return candidate;
}

/**
 * Tears down everything the router holds for a connection.
 *
 * Looks through the members of the root group for an object whose transport
 * is @p c and unregisters it (the last match wins), removes @p c from the
 * connection list, drops one reference on the connection if it is a
 * ServerConnectionImpl, and finally dumps the registry directory.
 *
 * @param c connection that ended
 */
void Router::disconnect (ServerConnection *c)
{
  ObjectRegistry::NodeList *rootMembers = m_registry.findAll (m_rootGroup);

  addr_t ownedId = 0;

  for (auto node : *rootMembers)
    {
      if (node->ref->c == c)
        ownedId = node->id;

      // Every node returned by findAll() is unref'd once here.
      node->ref->unref ();
    }

  if (ownedId != 0)
    unregisterObject (ownedId);

  delete rootMembers;

  flakeMutexAcquire (m_connMutex);

  auto pos = m_connections.begin ();
  while (pos != m_connections.end ())
    {
      if (*pos != c)
        {
          ++pos;
          continue;
        }
      pos = m_connections.erase (pos);
    }

  flakeMutexRelease (m_connMutex);
#if FLAKE_DEBUG_LOGGING
  logging::logf<lvlInfo> ("connection %04x ended\n", c->id ());
#endif

  // Counterpart to the ref() taken when the connection was added.
  if (auto impl = dynamic_cast<ServerConnectionImpl *>(c))
    impl->unref ();

  m_registry.dumpDirectory ();
}

/**
 * Creates a server-side object and adds it to the registry below a parent.
 *
 * On success, and when the parent can be found and is not the root (id 0),
 * an objectCreated indication carrying the new object's address (and its
 * OBJECT_TYPE property, if present in @p va) is sent for the parent to the
 * root group.
 *
 * When the registry refuses the object, the freshly created object is
 * destroyed again instead of being leaked.
 *
 * @param id            address of the new object
 * @param parentId      address of the parent object
 * @param broadcastId   broadcast/group id passed on to the registry
 * @param va            properties used to construct the object
 * @param c             transport the object is reachable through
 * @param requires_auth passed on to the ServersideObject constructor
 * @return always true (also when the registry rejected the object)
 */
bool Router::registerObject (addr_t id, addr_t parentId, addr_t broadcastId, PropArray &va,
                             Transport *c, bool requires_auth)
{

  bool isWritable = true;

  auto obj = new ServersideObject (va, c, requires_auth);
  obj->ref ();

  ObjectRegistry::Node *n = nullptr;

  if (m_registry.add (parentId, id, broadcastId, *obj, &n) != E_OK)
    {
#if FLAKE_DEBUG_LOGGING
      logging::logf<lvlRaw> (" - parent %06X not found!\n", parentId);
#endif
      // Nothing else refers to the object yet, so destroy it here.
      // NOTE(review): assumes a failed add() keeps no pointer to obj -
      // confirm against ObjectRegistry::add.
      delete obj;
      return isWritable;
    }

  obj->node = n;
#if FLAKE_DEBUG_LOGGING
  logging::logf<lvlRaw> (" - registered object %06X as child of %06X\n", id,
                         parentId);
#endif

  auto parentObj = m_registry.findFirst (parentId, true);
  if (parentObj && parentObj->id != 0)
    {
      PropArray vaIndi;

      Property v (BaseProperties::OBJECT_ADDR);
      v.u16 = id;
      vaIndi.set (v);

      Property v2 (BaseProperties::OBJECT_TYPE);
      v2 = va.get (BaseProperties::OBJECT_TYPE);
      if (!v2.isErrorValue ())
        vaIndi.set (v2);

      // Used to be sent to parentObj->group instead of the root group.
      indicate (MessageType::objectCreated, parentId, m_rootGroup, &vaIndi);
      parentObj->ref->unref ();
    }

  return isWritable;

}

/**
 * Removes an object from the registry and destroys every node the registry
 * hands back for it.
 *
 * For each returned node the object is taken out of all groups it is a
 * member of, a `destroyed` indication is sent to the node's group, the
 * server-side object is deleted, the node's id and group id are released
 * and the node itself is deleted.
 *
 * @param id address of the object to unregister
 */
void Router::unregisterObject (addr_t id)
{
#if FLAKE_DEBUG_LOGGING
  uint32_t childCount = 0;
  logging::logf<lvlRaw> (" - unregistering object %06X\n", id);
#endif
  ObjectRegistry::NodeList *children = m_registry.remove (id);

  if (children != nullptr)
    {
#if FLAKE_DEBUG_LOGGING
      childCount = children->size ();
#endif

      while (!children->empty ())
        {
          ObjectRegistry::Node *ch = children->front ();

          // ch->ref->unref () was removed here since the object is destroyed
          // below anyway.
          //### There was a race condition here and unref'ing is not done
          //### correctly; the code needs a deeper inspection even though it
          //### probably works now.

          vector<addr_t> groupMemberships = m_registry.findAllGroupMemberships (ch->id);

          for (unsigned long j = 0; j < groupMemberships.size (); j++)
            {
              ungroup (ch->id, groupMemberships[j]);
            }

          indicate (MessageType::destroyed, ch->id, ch->group, nullptr);

          // ch->ref->unref () replaced by a direct delete (temporary, experimental)
          delete ch->ref;

          releaseObjectId (ch->id);
          releaseObjectId (ch->group);

          delete ch;  // counterpart to ObjectTree::add

          children->pop_front ();

        }

      delete children;
    }

//  releaseObjectId(id);
#if FLAKE_DEBUG_LOGGING
  // The reported count is one less than the number of removed nodes. Guard
  // the subtraction so that "nothing removed" does not underflow the
  // unsigned counter, and pass a long to match the %ld conversion.
  const long reportedChildren = childCount > 0 ? static_cast<long> (childCount - 1) : 0L;
  logging::logf<lvlRaw> (" - unregistered object %06X (and %ld children)\n", id, reportedChildren);
#endif
}

/**
 * Adds an object to a group and announces it with a `joined` indication.
 *
 * The registry may resolve @p groupId to a different group id (logged as
 * joining "object X's group"); the indication is sent to the resolved group.
 *
 * @param objectId object that joins
 * @param groupId  requested group
 * @return the group id reported by the registry, or 0 when the registry
 *         reports 0 (logged as "requested object doesn't exist"); in that
 *         case no indication is sent
 */
addr_t Router::group (addr_t objectId, addr_t groupId)
{
  const addr_t joinedGroup = m_registry.group (objectId, groupId);

  if (joinedGroup == 0)
    {
#if FLAKE_DEBUG_LOGGING
      logging::logf<lvlRaw> (" - requested object (%06X) doesn't exist\n", objectId);
#endif
      return 0;
    }

#if FLAKE_DEBUG_LOGGING
  if (joinedGroup == groupId)
    logging::logf<lvlRaw> (" - object %06X joined group %06X\n", objectId, groupId);
  else
    logging::logf<lvlRaw> (" - object %06X joined object %06X's group (%06X)\n", objectId, groupId, joinedGroup);
#endif

  indicate (MessageType::joined, objectId, joinedGroup);

  return joinedGroup;
}

/**
 * Removes an object from a group in the registry and then sends a `left`
 * indication for the object to that group.
 *
 * @param objectId object that leaves
 * @param groupId  group that is left
 */
void Router::ungroup (addr_t objectId, addr_t groupId)
{
  m_registry.ungroup (objectId, groupId);
  indicate (MessageType::left, objectId, groupId);

#if FLAKE_DEBUG_LOGGING
  logging::logf<lvlRaw> (" - object %06X ungrouped\n", objectId);
#endif
}

void Router::indicateDisconnected (uint8_t messageType, Transport *t, PropArray *data, ResponseSink *sink,
                                   Message *req, PropArray *pendingData,
                                   void *assoc_data)
{

  auto *d = new Message ();

  d->header.mt = DATAGRAM_MAKE_INDICATION(messageType);
  d->header.dst = Object::EMPTY_ADDR;
  d->header.src = 0;
  d->header.res = 0;
  d->header.token = nextToken ();

  if (data)
    d->setData (*data);

  if (sink != nullptr)
    {
      flakeMutexAcquire (m_asynchRespmapMutex);

      auto iter = m_asynchResponseMap.begin ();

      bool assigned = false;
      for (; iter != m_asynchResponseMap.end (); ++iter)
        {
          if ((*iter).sink == nullptr)
            {
              (*iter).token = d->header.token;
              (*iter).sink = sink;
              (*iter).creationTime = utils::timestamp ();
              (*iter).pendingRequest = req;
              (*iter).associatedData = assoc_data;

              if (pendingData)
                (*iter).pendingData.copyFrom (*pendingData);
              req->retain ();
              assigned = true;
              break;
            }
        }

      flakeMutexRelease (m_asynchRespmapMutex);

      if (!assigned)
        {
          req->ack (E_NO_ALLOC);
          delete d;
          delete req;
          return;
        }
    }

#if FLAKE_DEBUG_LOGGING
  logging::logf<lvlRaw> (" - indication for unconnected client sent to Wire 0x%04X\n", t->wireId ());
#endif

  t->pushQueue (d);

  if (req && req->clientCount () == 0)
    {
      if (pendingData && pendingData->count () > 0)
        req->ack (E_OK, *pendingData);
      else
        req->ack (E_OK);
      delete req;
    }
}

void Router::indicate (uint8_t messageType, addr_t source,
                       addr_t destination, PropArray *data, ResponseSink *sink, Message *req, PropArray *pendingData,
                       addr_t exclude_addr)
{

  ObjectRegistry::NodeList *nl = m_registry.findAll (destination);

  deque<Transport *> served_transports;

  if (m_registry.isInGroup (source, destination) && nl->size () == 1)
    {
#if FLAKE_DEBUG_LOGGING
      logging::logf<lvlRaw> (" - src (%d) == dst (%d) - 0 other members in group -> no indication\n", source, destination);
#endif
      for (auto i = nl->begin (); i != nl->end (); ++i)
        {
          (*i)->ref->unref ();
        }
      delete nl;
      if (req)
        {
          req->ack (E_OK);
          delete req;
        }
      return;
    }
  else if (nl->size () == 0)
    {
#if FLAKE_DEBUG_LOGGING
      logging::logf<lvlRaw> (" - destination (%d) unreachable\n", destination);
#endif
      delete nl;
      if (req)
        {
          req->ack (E_DESTINATION_UNREACHABLE);
          delete req;
        }
      return;
    }

  auto i = nl->begin ();
  for (; i != nl->end (); ++i)
    {
      if (!m_registry.has (*i) ||
          (*i)->id == exclude_addr ||
          ((*i)->id == source && nl->size () > 1))
        {
          (*i)->ref->unref ();
          continue;
        }

      auto *d = new Message ();

      d->header.mt = DATAGRAM_MAKE_INDICATION(messageType);
      d->header.dst = destination;
      d->header.src = source;
      d->header.res = 0;
      d->header.token = nextToken ();

      if (data)
        {
          d->setData (*data);
        }

      if (sink != nullptr)
        {
          flakeMutexAcquire (m_asynchRespmapMutex);

          auto iter = m_asynchResponseMap.begin ();

          bool assigned = false;
          for (; iter != m_asynchResponseMap.end (); ++iter)
            {
              if ((*iter).sink == nullptr)
                {
                  (*iter).token = d->header.token;
                  (*iter).sink = sink;
                  (*iter).creationTime = utils::timestamp ();
                  (*iter).pendingRequest = req;
                  (*iter).associatedData = nullptr;

                  if (pendingData)
                    (*iter).pendingData.copyFrom (*pendingData);
                  req->retain ();
                  assigned = true;
                  break;
                }
            }

          flakeMutexRelease (m_asynchRespmapMutex);

          if (!assigned)
            {
              if (req->clientCount () == 0)
                {
                  req->ack (E_NO_ALLOC);
                  delete req;
                }
              releaseToken(d->header.token);
              delete d;
              (*i)->ref->unref ();
              for (; i != nl->end (); ++i)
                {
                  (*i)->ref->unref ();
                }
              delete nl;
              return;
            }
        }

      if ((*i)->ref->c)
        {
#if FLAKE_DEBUG_LOGGING
          logging::logf<lvlRaw> (" - indication for %06X sent to Wire 0x%04X\n", destination, (*i)->ref->c->wireId ());
#endif

          bool already_served = false;

          for (auto iTrans = served_transports.begin (); iTrans != served_transports.end (); ++iTrans)
            {
              if (*iTrans == (*i)->ref->c)
                {
                  releaseToken(d->header.token);
                  delete d;
                  (*i)->ref->unref ();
                  already_served = true;
                  break;
                }
            }

          if (!already_served)
            {
              served_transports.push_back ((*i)->ref->c);
              (*i)->ref->c->pushQueue (d);
              (*i)->ref->unref ();
            }
        }
      else
        {
          releaseToken(d->header.token);
          delete d;
          (*i)->ref->unref ();
        }

    }

  if (req && req->clientCount () == 0)
    {
      if (pendingData && pendingData->count () > 0)
        req->ack (E_OK, *pendingData);
      else
        req->ack (E_OK);
      delete req;
    }

  delete nl;

}

/**
 * Adds an already constructed connection to the router's connection list
 * and takes a reference on it.
 *
 * If the same connection pointer is already in the list, shutdown() is
 * called on that entry before the pointer is appended again.
 * NOTE(review): the "existing" entry is found by pointer equality, so it is
 * the very connection being added - confirm that shutting it down and
 * listing it twice is intended.
 *
 * @param c connection to add
 */
void Router::addClient (ServerConnection *c)
{
  ServerConnection *existing = nullptr;

  flakeMutexAcquire (m_connMutex);
  for (ServerConnection *conn : m_connections)
    {
      if (conn == c)
        {
          existing = conn;
          break;
        }
    }
  flakeMutexRelease (m_connMutex);

  const bool replacesExisting = (existing != nullptr);

  if (replacesExisting)
    {
      existing->shutdown ();
#if FLAKE_DEBUG_LOGGING
      logging::logf<lvlInfo> ("incoming IPC connection replaces existing one\n");
#endif
    }

  c->ref ();

  flakeMutexAcquire (m_connMutex);
  m_connections.push_back (c);
  flakeMutexRelease (m_connMutex);

#if FLAKE_DEBUG_LOGGING
  if (!replacesExisting)
    logging::logf<lvlInfo> ("incoming connection on %d - id%04x\n", c->wireId (), c->id ());
#endif
}

void Router::sendThreadFunc (void *arg)
{

  auto r = static_cast<Router *> (arg);

  while (!r->m_doShutdown)
    {

      Message *dgram = nullptr;

      r->m_dgramOutQueue.wait_and_pop (&dgram);

      if (dgram != nullptr)
        {

          dgram->send ();
          delete dgram;

        }

    }

  flakeThreadExit ();

}

#ifdef FLAKE_EXPERIMENTAL
/**
 * Creates a server connection for a wire and adds it to the connection list
 * (FLAKE_EXPERIMENTAL variant).
 *
 * An existing connection on a wire with the same id is only reported in the
 * log; it is not shut down here.
 *
 * @param w              wire the new connection uses
 * @param spawnOwnThread when true the connection is created without the
 *                       router's outgoing queue (null is passed instead)
 * @return the new connection, with one reference taken by the router
 */
ServerConnection* Router::addClient(Wire* w, bool spawnOwnThread)  {

    ServerConnection* existing = nullptr;

    flakeMutexAcquire(m_connMutex);
    for (ServerConnection* conn : m_connections) {
        if (w && conn->wire() && conn->wire()->getId() == w->getId()) {
            existing = conn;
            break;
        }
    }
    flakeMutexRelease(m_connMutex);

    if (existing) {
        // existing->shutdown() is not called: ### it could run twice together with the stale check
        logging::logf<lvlInfo>("incoming connection on %d replaces id%04x as id%04x\n", w->getId(), existing->id(), m_nextConnectionId);
    }

    // Only a connection without its own thread shares the router's outgoing queue.
    ServerConnection* c = new ServerConnectionImpl(w, this,  m_dgramInQueue,
                                                   spawnOwnThread ? nullptr : &m_dgramOutQueue,
                                                   ++m_nextConnectionId);

    c->ref();

    flakeMutexAcquire(m_connMutex);
    m_connections.push_back(c);
    flakeMutexRelease(m_connMutex);

    if (existing == nullptr)
        logging::logf<lvlInfo>("incoming connection on %d - id%04x\n", w->getId(), c->id());

    return c;
}
#else

/**
 * Creates a server connection for a wire, starts it and adds it to the
 * connection list.
 *
 * An existing connection with the same wire id is only reported in the log;
 * it is not shut down here.
 *
 * @param w wire the new connection uses
 * @return the new connection, with one reference taken by the router
 */
ServerConnection *Router::addClient (Wire *w)
{
  ServerConnection *existing = nullptr;

  flakeMutexAcquire (m_connMutex);
  for (ServerConnection *conn : m_connections)
    {
      if (w && conn->wireId () == w->getId ())
        {
          existing = conn;
          break;
        }
    }
  flakeMutexRelease (m_connMutex);

  if (existing)
    {
      // existing->shutdown() is not called: ### it could run twice through the stale check
#if FLAKE_DEBUG_LOGGING
      logging::logf<lvlInfo> ("incoming connection on %d replaces id%04x as id%04x\n", w->getId (), existing->id (), m_nextConnectionId);
#endif
    }

  ServerConnection *c = new ServerConnectionImpl (w, this, m_dgramInQueue, nullptr, m_nextConnectionId++);

  // Reference held by the router; released again in Router::disconnect.
  c->ref ();

  dynamic_cast<TransportImpl *>(c)->start ();

  flakeMutexAcquire (m_connMutex);
  m_connections.push_back (c);
  flakeMutexRelease (m_connMutex);

#if FLAKE_DEBUG_LOGGING
  if (existing == nullptr)
    logging::logf<lvlInfo> ("incoming connection on %d - id%04x\n", w->getId (), c->id ());
#endif

  return c;
}

#endif

/**
 * Handles a response to an indication the router sent on behalf of a
 * request, and computes the result that is reported for that request.
 *
 * - ping / customMsgReceived: @p result is passed through.
 * - auth (FLAKE_AUTH_SUPPORT only): the SIGNATURE property is checked by the
 *   authentication sink against the challenge stored in @p assoc_data; on
 *   success the transport is marked authenticated. Any missing piece (no
 *   sink, no signature, no associated data, transport that is not a
 *   ServerConnectionImpl) yields E_FAILED instead of a null dereference.
 * - getPropertiesReq: on E_OK every non-error property is stored in the
 *   sender's object; an error property downgrades the result to
 *   E_PARTIAL_SUCCESS.
 * - setPropertiesReq: E_IGNORED is treated as E_OK; on E_OK or
 *   E_PARTIAL_SUCCESS every non-error property is stored and collected for
 *   a `changed` indication to the sender's group (excluding
 *   @p pendingSender). The result becomes E_FAILED when the number of error
 *   properties equals the number of properties (this includes an empty
 *   @p params).
 *
 * @param messageType   type of the message that was answered
 * @param result        result code carried by the response
 * @param sender        address of the responding object
 * @param destination   unused
 * @param params        properties delivered with the response
 * @param t             unused
 * @param transport     transport the response arrived on (auth only)
 * @param pendingSender source of the original request
 * @param assoc_data    data stored with the response slot (auth only)
 * @return result code to acknowledge the original request with;
 *         E_NOT_FOUND when the sender is not registered (get/set
 *         properties), E_OK for any other message type
 */
int8_t Router::onResponse (const uint8_t messageType, int8_t result,
                           addr_t sender, addr_t destination, const PropArray &params, const Token t, Transport *transport,
                           addr_t pendingSender, void *assoc_data)
{
#if !FLAKE_AUTH_SUPPORT
  UNUSED(assoc_data);
#endif
  UNUSED(destination);
  UNUSED(transport);
  UNUSED(t);

  switch (messageType)
    {

      case MessageType::ping:
        {
          return result;
        }

      case MessageType::customMsgReceived:
        {
          return result;
        }
#if FLAKE_AUTH_SUPPORT
      case MessageType::auth:
        {
          if (m_authSink == nullptr)
            return E_FAILED;

          Property pSign = params.get (BaseProperties::SIGNATURE);
          auto *ad = static_cast<struct assoc_auth_data *> (assoc_data);

          // Without a signature or the stored challenge there is nothing to verify.
          if (pSign.isErrorValue () || ad == nullptr)
            return E_FAILED;

          if (0 != m_authSink->onAuthResponseReceived (ad->sign_algorithm, ad->challenge, ad->challenge_len,
                                                       pSign.bin.lpb, pSign.bin.cb))
            return E_FAILED;

          // Only a ServerConnectionImpl can carry the authenticated state.
          auto conn = dynamic_cast<ServerConnectionImpl *>(transport);
          if (conn == nullptr)
            return E_FAILED;

          conn->setAuthenticated (true);
          return E_OK;
        }
#endif
      case MessageType::getPropertiesReq:
        {

          ObjectRegistry::Node *n = m_registry.findFirst (sender);
          if (n == nullptr)
            return E_NOT_FOUND;

          if (result == E_OK)
            {
              for (uint32_t i = 0; i < params.count (); i++)
                {
                  if (!params[i].isErrorValue ())
                    {
                      n->ref->set (params[i]);
                    }
                  else
                    {
                      result = E_PARTIAL_SUCCESS;
                    }
                }
            }
          n->ref->unref ();

          return result;
        }

      case MessageType::setPropertiesReq:
        {

          ObjectRegistry::Node *n = m_registry.findFirst (sender);
          if (n == nullptr)
            return E_NOT_FOUND;

          if (result == E_IGNORED)
            result = E_OK;

          PropArray vaIndi;

          uint16_t pc = params.count ();  // number of properties in the response
          uint16_t ec = 0;                // number of those that are error values

          if (result == E_OK || result == E_PARTIAL_SUCCESS)
            {
              for (uint32_t i = 0; i < params.count (); i++)
                {
                  if (!params[i].isErrorValue ())
                    {
                      n->ref->set (params[i]);
                      vaIndi.set (params[i]);
                    }
                  else
                    {
                      ec++;
                    }
                }
            }

          if (ec == pc)
            result = E_FAILED;
          else if (vaIndi.count ())
            {
              // Tell the sender's group about the changed properties, but
              // not the object that issued the original request.
              addr_t grp = n->group;
              indicate (MessageType::changed, sender, grp, &vaIndi, nullptr, nullptr, nullptr, pendingSender);
            }

          n->ref->unref ();

          return result;
        }
    }

  return E_OK;

}

void Router::process ()
{

  Message *d;

  m_dgramInQueue.wait_and_pop (&d);

  if (d != 0)
    {
#if FLAKE_DEBUG_LOGGING
      logging::logf<lvlRaw> ("processing received datagram:\n");
#endif

      if (DATAGRAM_IS_RESPONSE(d))
        {
#if FLAKE_DEBUG_LOGGING
          logging::logf<lvlRaw> ("-> response.\n");
#endif
          flakeMutexAcquire (m_asynchRespmapMutex);

          auto rli = m_asynchResponseMap.begin ();

          if (d->header.token == 0)
            {
              delete d; //### TODO: This should be solved differently. the "0" atm. is for empty slot in Array...
              flakeMutexRelease (m_asynchRespmapMutex);
              return;
            }

          for (; rli != m_asynchResponseMap.end (); ++rli)
            {
              if ((*rli).token == d->header.token && (*rli).sink != nullptr)
                {

                  ResponseSink *sink = (*rli).sink;
                  PropArray &pendingData = (*rli).pendingData;
                  Message *request = (*rli).pendingRequest;
                  void *assoc_data = (*rli).associatedData;

                  PropArray data;
                  if (d->getData ().count () == 0 && pendingData.count () == 0)
                    data = request->getData ();
                  else
                    data = d->getData ();

                  for (uint32_t i = 0; i < pendingData.count (); i++)
                    data.set (pendingData[i]);

                  releaseToken ((*rli).token);
                  (*rli).sink = nullptr;
                  (*rli).token = (uint16_t) 0;
                  (*rli).pendingData.clear ();

                  flakeMutexRelease (m_asynchRespmapMutex);

                  int8_t result = sink->onResponse (d->message (), d->header.res, d->header.src, d->header.dst, data,
                                                    d->header.token, d->getTransport (), request->source (), assoc_data);

                  if ((*rli).associatedData != nullptr)
                    {
                      free ((*rli).associatedData);
                      (*rli).associatedData = nullptr;
                    }

                  request->release ();

                  if (request->clientCount () == 0)
                    {
                      if (request->message () == MessageType::connect)
                        {
                          if (result == E_OK)
                            {
                              addr_t id = nextObjectId ();
                              addr_t parent_id;
                              addr_t bcId = nextObjectId ();

                              // This is new to allow for virtual connections - experimental atm.
                              if (request->source () == ConnectedObject::EMPTY_ADDR)
                                {
                                  parent_id = 0;  // 0 is us, the router

                                  // the next part only gets used for the 'ping' messages, so not needed on v-conn
                                  auto conn = dynamic_cast<ServerConnection *>(request->getTransport ());
                                  if (conn)
                                    conn->setRootObjectAddr (id);
                                }
                              else
                                {
                                  parent_id = request->source ();
                                }

                              registerObject (id, parent_id, bcId, request->getData (), request->getTransport (), false);

                              PropArray va;
                              Property v (BaseProperties::OBJECT_ADDR);
                              v.u16 = id;
                              va.set (v);
                              request->ack (E_OK, va);
                              group (id, parent_id);
                            }
                          else
                            {
                              request->ack (E_FAILED);
                            }
                        }
                      else if (data.count () == 0)
                        {
                          request->ack (result);
                        }
                      else
                        {
                          request->ack (result, data);
                        }
                      delete request;
                    }
                }
            }

          if (rli == m_asynchResponseMap.end ())
            {
              releaseToken (d->header.token);
              flakeMutexRelease (m_asynchRespmapMutex);
            }

        }

      Message &request = *d;

      if (DATAGRAM_IS_REQUEST(d))
        {

          auto c = dynamic_cast<ServerConnection *>(d->getTransport ());
          //--
          // validate sender && connection relation
          //--
          ObjectRegistry::Node *callingObject = m_registry.findFirst (d->header.src);

          if (callingObject != nullptr || d->message () == MessageType::connect)
            {
              if (callingObject != nullptr)
                {
                  if (callingObject->ref->c == nullptr ||
                      (d->getTransport () != nullptr
                       && callingObject->ref->c->wireId () != d->getTransport ()->wireId ()))
                    {
#if FLAKE_DEBUG_LOGGING
                      logging::logf<lvlExplicit> ("Security: received datagram from source %d on wrong transport (is %d, should be %d) - dropping\n", d->header.src, d->getTransport ()->wireId (), callingObject->ref->c->wireId ());
#endif
                      callingObject->ref->unref ();
                      delete d;
                      return;
                    }
                  else
                    callingObject->ref->unref ();
                }
            }
#if FLAKE_DEBUG_LOGGING
          logging::logf<lvlRaw> ("-> request.\n");
#endif
          switch (request.message ())
            {

              case MessageType::connect:
                {
                  if (m_objectIds.size () >= UINT16_MAX - 2)
                    {
                      request.ack (E_NO_ALLOC);
                      break;
                    }
#if FLAKE_AUTH_SUPPORT
                  else if ((m_authSink != nullptr) && !c->isAuthenticated ())
                {
                  flakeAuthType at = m_authSink->authenticationType ();

                  if (at == atInteractive)
                    {
                      if (0 != m_authSink->onConnect(request.getData()))
                        {
                          request.ack (E_UNAUTHORIZED);
                          break;
                        }

                    }
                  else if (at == atSignature)
                    {
                      PropArray paAuth;

                      Property pAlgo (BaseProperties::SIGN_ALGO);
                      Property pHash (BaseProperties::SIGN_HASH);
                      Property pType (BaseProperties::AUTH_TYPE);

                      int c_len;

                      if (0 == m_authSink->onAuthChallengeRequested (d->getTransport ()->wire (), &pAlgo.str,
                                                                     &pHash.bin.lpb, &c_len))
                        {
                          if (c_len < UINT16_MAX)
                            {
                              pHash.bin.cb = (uint16_t) c_len;
                            }
                          else
                            {
                              request.ack (E_REFUSED);
                            }

                          pType.u8 = atSignature;
                          paAuth.set (pAlgo);
                          paAuth.set (pHash);
                          paAuth.set (pType);

                          struct assoc_auth_data *ad = new struct assoc_auth_data;

                          ad->challenge_len = c_len;
                          ad->challenge = pHash.bin.lpb;
                          ad->sign_algorithm = pAlgo.str;

                          indicateDisconnected (MessageType::auth, request.getTransport (),
                                                &paAuth, this, new Message (*d), nullptr,
                                                (void *) ad);
                          break;
                        }
                    }
                }
#endif
                  addr_t id = nextObjectId ();
                  addr_t parent_id;
                  addr_t bcId = nextObjectId ();

                  // This is new to allow for virtual connections - experimental atm.
                  if (request.source () == ConnectedObject::EMPTY_ADDR)
                    {
                      parent_id = 0;  // 0 is us, the router

                      // the next part only gets used for the 'ping' messages, so not needed on v-conn
                      if (c)
                        c->setRootObjectAddr (id);
                    }
                  else
                    {
                      parent_id = request.source ();
                    }
                  registerObject (id, parent_id, bcId, request.getData (), request.getTransport (), false);

                  PropArray va;
                  Property v (BaseProperties::OBJECT_ADDR);
                  v.u16 = id;
                  va.set (v);
                  request.ack (E_OK, va);
                  group (id, parent_id);

                }
              break;

              case MessageType::custom:
                {
                  ObjectRegistry::Node *n = m_registry.findFirst (request.destination ());

                  if (n)
                    {

                      if (n->ref->requiresAuth () && !c->isAuthenticated ())
                        {
                          request.ack (E_UNAUTHORIZED);
                        }
                      else
                        {
                          indicate (MessageType::customMsgReceived, request.source (),
                                    request.destination (), &request.getData (), this, new Message (*d));
                        }
                      n->ref->unref ();
                    }
                }
              break;
              case MessageType::destroyObject:
                {
                  ObjectRegistry::Node *n = m_registry.findFirst (request.destination ());

                  if (n)
                    {
                      if (n->ref->requiresAuth () && !c->isAuthenticated ())
                        {
                          request.ack (E_UNAUTHORIZED);
                        }
                      else
                        {
                          unregisterObject (request.source ());

                          request.ack (E_OK);
                        }
                      n->ref->unref ();
                    }
                }

              break;
              case MessageType::getProperties:
                {

                  ObjectRegistry::Node *n = m_registry.findFirst (request.destination ());

                  PropArray vaOut;

                  if (n != nullptr)
                    {
                      uint32_t i = 0;
                      uint16_t cErr = 0;

                      int8_t res = E_OK;

                      if (n->ref->requiresAuth () && !c->isAuthenticated ())
                        {
                          request.ack (E_UNAUTHORIZED);
                          break;
                        }

                      PropArray vaIndi;

                      if (request.getData ().count () > 0)
                        {
                          for (; i < request.getData ().count (); i++)
                            {
                              Property v = request.getData ()[i];
                              if (IS_VOLATILE(v.tag))
                                {
                                  vaIndi.set (Property (v.tag));
                                }
                              else if (n->ref->has (v.tag))
                                {
                                  vaOut.set (n->ref->get (v.tag));
                                }
                              else
                                {
                                  cErr++;
                                  res = E_PARTIAL_SUCCESS;
                                  v.tag |= TAG_ERROR;
                                  v.err = E_NOT_FOUND;
                                  vaOut.set (v);
                                }
                            }
                        }
                      else
                        {

                          for (; i < n->ref->count (); i++)
                            {
                              if (IS_BASE_PROPERTY((*n->ref)[i].tag))
                                continue;
                              if (IS_VOLATILE((*n->ref)[i].tag))
                                {
                                  vaIndi.set (Property ((*n->ref)[i].tag));
                                }
                              else
                                {
                                  vaOut.set ((*n->ref)[i]);
                                }
                            }
                        }

                      n->ref->unref ();
                      if (vaIndi.count () > 0)
                        {
                          indicate (MessageType::getPropertiesReq, 0, request.destination (), &vaIndi, this, new Message (*d), &vaOut);
                        }
                      else
                        {
                          if (cErr == vaOut.count () && cErr > 0)
                            res = E_FAILED;
                          request.ack (res, vaOut);
                        }

                    }
                  else
                    request.ack (E_NOT_FOUND);

                }
              break;
#if FLAKE_STREAM_SUPPORT
              case MessageType::createStream:
            case MessageType::streamData:
            case MessageType::closeStream:
            {
                ObjectRegistry::Node* n = m_registry.findFirst(request.destination());

                if (n != 0) {
                    uint8_t msg = 0;
                    if(request.message() == MessageType::createStream)
                        msg = MessageType::createStreamReq;
                    else if(request.message() == MessageType::streamData)
                        msg = MessageType::streamDataReceived;
                    else  if(request.message() == MessageType::closeStream)
                        msg = MessageType::closeStreamReq;
                    n->ref->unref();
                   indicate(msg, request.source(),
                             request.destination(), &request.getData(), this, new Message(*d));
                } else
                    request.ack(E_NOT_FOUND);


            }
                break;
#endif

              case MessageType::ping:
                {
                  ObjectRegistry::Node *n = m_registry.findFirst (request.destination ());

                  if (n )
                    {
                      if (n->ref->requiresAuth () && !c->isAuthenticated ())
                        {
                          request.ack (E_UNAUTHORIZED);

                        }
                      else
                        {
                          if (request.destination () == 0)
                            request.ack (E_OK);
                          else
                            indicate (MessageType::ping, request.source (), request.destination (), 0, this);
                        }

                      n->ref->unref ();
                    }
                }
              break;

              case MessageType::setProperties:
                {
                  ObjectRegistry::Node *n = m_registry.findFirst (request.destination ());

                  PropArray vaOut, vaIndi, vaIndiActionable;

                  uint16_t grp = 0;

                  if (n != nullptr)
                    {
                      uint32_t i = 0;
                      int8_t res = E_OK;
                      unsigned int cErr = 0;

                      if (n->ref->requiresAuth () && !c->isAuthenticated ())
                        {
                          request.ack (E_UNAUTHORIZED);
                          break;
                        }

                      for (; i < request.getData ().count (); i++)
                        {
                          Property vNew = request.getData ()[i];

                          // to adjust missing modifiers
                          if ((n->parent->id != request.source ()) && (n->id != request.source ()))
                            vNew.tag = n->ref->get (vNew.tag).tag;

                          if (IS_READONLY(vNew.tag) &&
                              !(((n->parent->id == request.source ())
                                 || (n->id == request.source ())))) // latter is for bw compat.
                            {
                              Property vErr (vNew.tag |= TAG_ERROR);
                              vErr.err = E_READ_ONLY;
                              vaOut.set (vErr);
                              cErr++;
                              res = E_PARTIAL_SUCCESS;
                              continue;
                            }

                          Property vOld = n->ref->get (vNew.tag);

                          if (vOld != vNew)
                            {

                              if ((n->parent->id != request.source ())
                                  && (n->id != request.source ())) // dtp. latter is for bw compat
                                {
                                  if (vOld.isErrorValue ())
                                    {
                                      vOld.tag = request.getData ()[i].tag | TAG_ERROR;
                                      vaOut.set (vOld);
                                      cErr++;
                                    }
                                  else if (IS_ACTIONABLE(vNew.tag))
                                    {
                                      vaIndiActionable.set (vNew);
                                    }
                                  else
                                    {
                                      n->ref->set (vNew);
                                      vaOut.set (vNew);
                                      vaIndi.set (vNew);
                                    }
                                }
                              else
                                {
#if FLAKE_DEBUG_LOGGING
                                  if (logging::logLevel >= lvlRaw)
                                    {
                                      char *str = vNew.toString ();
                                      logging::logf<lvlRaw> (" - property changed or new: %s\n", str);
                                      delete[] str;
                                    }
#endif
                                  n->ref->set (vNew);
                                  vaOut.set (vNew);
                                  vaIndi.set (vNew);
                                }
                            }
                        }

                      grp = n->group;
                      n->ref->unref ();

                      if (vaIndiActionable.count () > 0)
                        {
                          indicate (MessageType::setPropertiesReq, request.source (), request.destination (), &vaIndiActionable, this, new Message (*d), &vaOut);
                        }
                      else if (vaIndi.count () > 0)
                        {
                          indicate (MessageType::changed, request.destination(), grp, &vaIndi);
                          request.ack (res, vaOut);
                        }
                      else
                        {
                          if (cErr == request.getData ().count ())
                            request.ack (E_FAILED, vaOut);
                          else
                            request.ack (E_NO_CHANGES, vaOut);
                        }

                    }
                  else
                    {
                      request.ack (E_NOT_FOUND);
                    }

                }
              break;
#if FLAKE_AUTH_SUPPORT
              case MessageType::auth:
            {

              if (request.destination () == 0)
                {
                  if (m_authSink == nullptr)
                    {
                      request.ack (E_FAILED);
                      break;
                    }

                  PropArray& paAuth = request.getData ();
                  if (paAuth.has (BaseProperties::AUTH_TYPE))
                    {
                      Property pAuthType = paAuth.get (BaseProperties::AUTH_TYPE);
                      PropArray pAuthResp;

                      switch (pAuthType.u8)
                        {
                      case atSignature:
                        {
                          Property pSignAlgo = paAuth.get (BaseProperties::SIGN_ALGO);

                          if (pSignAlgo.isErrorValue ())
                            {
                              pSignAlgo.tag = (pSignAlgo.tag | TAG_ERROR);
                              pSignAlgo.err = E_INCOMPLETE;
                              pAuthResp.set (pSignAlgo);
                              request.ack (E_FAILED, pAuthResp);
                              break;
                            }

                          Property pAuthHash = paAuth.get (BaseProperties::SIGN_HASH);

                          if (pAuthHash.isErrorValue ())
                            {
                              pAuthHash.tag = (pAuthHash.tag | TAG_ERROR);
                              pAuthHash.err = E_INCOMPLETE;
                              pAuthResp.set (pAuthHash);
                              request.ack (E_FAILED, pAuthResp);
                              break;
                            }

                          byte *signature;
                          int signature_len;

                          m_authSink->onAuthChallengeReceived (pSignAlgo.str, pAuthHash.bin.lpb, pAuthHash.bin.cb,
                                                               &signature, &signature_len);

                          if (signature_len != 0 && signature != nullptr)
                            {
                              Property pAuthSign (BaseProperties::SIGNATURE);
                              Property pAuthSignature (BaseProperties::SIGNATURE);
                              pAuthSignature.bin.cb = signature_len;
                              pAuthSignature.bin.lpb = (byte *) malloc (signature_len);
                              memcpy (pAuthSignature.bin.lpb, signature, signature_len);
                              pAuthResp.set (pAuthSignature);
                              free (signature);
                              request.ack (E_OK, pAuthResp);
                            }
                          else
                            {
                              request.ack (E_FAILED);
                            }

                        }
                          break;

                      default:
                        pAuthType.tag = (pAuthType.tag | TAG_ERROR);
                          pAuthType.err = E_UNSUPPORTED;
                          pAuthResp.set (pAuthType);
                          request.ack (E_FAILED, pAuthResp);
                        }

                    }
                  else
                    {
                      request.ack (E_INCOMPLETE);
                    }
                }
              else
                {
                  request.ack (E_UNSUPPORTED);
                }
            }
              break;
#endif
              case MessageType::queryObjects:
                {

                  ValueTable *tbl;

                  auto nl = m_registry.all ();
                  auto i = nl->begin ();
                  tbl = new ValueTable ();

                  for (; i != nl->end (); ++i)
                    {
                      Property v = (*i)->ref->get (BaseProperties::OBJECT_TYPE);

                      if (!v.isErrorValue () && v.uuid == request.getData (BaseProperties::OBJECT_TYPE).uuid)
                        {
                          if ((*i)->ref->requiresAuth () && !c->isAuthenticated ())
                            {
                              continue;
                            }
                          auto va = new PropArray ();

                          Property vAddr (BaseProperties::OBJECT_ADDR);
                          vAddr.u16 = (*i)->id;
                          va->set (vAddr);

                          tbl->m_rows.push_back (va);

                        }
                      (*i)->ref->unref ();
                    }
                  Property v (BaseProperties::OBJECT_TABLE);

                  tbl->serialize (&v.bin.cb, &v.bin.lpb);

                  delete tbl;

                  PropArray vaOut;
                  vaOut.set (v);

                  request.ack (E_OK, vaOut);

                  delete nl;

                  if (v.bin.lpb)
                    free (v.bin.lpb);

                }

              break;


              //---
              // Grouping Functions
              //---
              case MessageType::joinGroup:
                {
                  ObjectRegistry::Node *n = m_registry.findFirst (request.destination ());

                  if (nullptr == n)
                  {
                      request.ack (E_NOT_FOUND);
                      break;
                  }

                  if (n->ref->requiresAuth () && !c->isAuthenticated ())
                    {
                      request.ack (E_UNAUTHORIZED);
                    }
                  else
                    {

                      if (m_registry.isInGroup (request.source (), request.destination ()))
                        {
                          request.ack (E_OK);
                          break;
                        }

                      uint16_t group_id = group (request.source (), request.destination ());
                      if (group_id != 0)
                        {
                          PropArray pa;
                          Property p (BaseProperties::BROADCAST_ADDR);
                          p.u16 = group_id;
                          pa.set (p);
                          request.ack (E_OK, pa);

                        }
                      else
                        {
                          request.ack (E_NOT_FOUND);
                        }
                    }
                  n->ref->unref ();
                }
              break;

              case MessageType::leaveGroup:
                {
                  ungroup (request.source (), request.destination ());
                  request.ack (E_OK);

                }
              break;

              case MessageType::createProperty:
                {

#if FLAKE_NAMED_TAG_SUPPORT
                  Property vName = request.getData().get(BaseProperties::PROP_NAME);
                Property vType = request.getData().get(BaseProperties::PROP_TYPE);
                Property vProp = request.getData().get(BaseProperties::PROP_TAG);

                if (vName.isErrorValue() || (vType.isErrorValue() && vProp.isErrorValue())) {

                    request.ack(E_FAILED);

                }

                else {

                    ObjectRegistry::Node* n = m_registry.findFirst(request.destination());

                    if (n && n->ref && n->ref) {
                        Property v = n->ref->get(BaseProperties::PROP_MAPPINGS);

                        vector<TagNameMapping>& mappings = n->ref->mappings;
                        uint16_t maxNamedTag = NAMED_TAG_RANGE_START;


                        if(!v.isErrorValue() && vProp.isErrorValue()) {

                            TagNameMapping pm;

                            for (uint32_t i = 0; i < v.arr.numValues; i++) {
                                memcpy(&pm, v.arr.lpValues + (i * sizeof(TagNameMapping)), sizeof(TagNameMapping));

                                if (tagFromName(n->ref, pm.name) == 0)
                                    mappings.push_back(pm);
                                else {
                                    for (uint32_t j = 0; j < mappings.size(); j++) {
                                        if(namesEqual(mappings[j].name, pm.name)) {
                                            mappings[j].tag = pm.tag;
                                        }
                                    }
                                }

                                maxNamedTag = max<uint32_t>(TAG_ID(pm.tag), maxNamedTag);
                            }


                            free(v.arr.lpValues);

                        } else if (vProp.isErrorValue()) {
                            v.tag = BaseProperties::PROP_MAPPINGS;
                            v.arr.cValueSize = sizeof(TagNameMapping);
                            v.arr.numValues = 0;
                        } else {
                            v = Property(BaseProperties::PROP_MAPPINGS);
                            v.arr.cValueSize = sizeof(TagNameMapping);
                        }

                        uint32_t tag;
                        if (vProp.isErrorValue())
                            tag = (++maxNamedTag << 16) | vType.u16;
                        else
                            tag = vProp.u32;


                        TagNameMapping mapping(vName.str, tag);
                        mappings.push_back(mapping);


                        v.arr.numValues++;
                        v.arr.lpValues = (flake::byte*) malloc(mappings.size() * sizeof(TagNameMapping));
                        for (unsigned long i = 0; i < mappings.size(); i++)
                            memcpy(v.arr.lpValues + (i * sizeof(TagNameMapping)), &mappings[i], sizeof(TagNameMapping));

                        n->ref->set(v);

                        free(v.arr.lpValues); //### is this right?

                        Property vNewProp(tag);

                        n->ref->set(vNewProp);

                        PropArray va;
                        PropArray vaIndi;

                        if (vProp.isErrorValue()) {
                            Property vNamedTag(BaseProperties::PROP_TAG);
                            vNamedTag.u32 = tag;
                            va.set(vNamedTag);
                        }
                        va.set(v);
                        vaIndi.set(v);

                        logging::logf<lvlInfo>("Object %04X created property \"%s\" -> 0x%08X\n", request.destination(), vName.str, tag);

                        request.ack(E_OK, va);
                        indicate(MessageType::propertyCreated, request.source(), n->group, &vaIndi);

                    } else
                        request.ack(E_FAILED);

                    if (n)
                      n->ref->unref();
                }
#else
                  request.ack (E_NOT_IMPL);
#endif
                }
              break;

              case MessageType::createObject:
                {

                  addr_t id;
                  addr_t broadcastId;

                  if (m_objectIds.size () >= UINT16_MAX - 2)
                    {
                      request.ack (E_NO_ALLOC);
                    }
                  else
                    {

                      id = nextObjectId ();
                      broadcastId = nextObjectId ();

                      bool requires_auth = false;

                      PropArray va = request.getData ();
                      Property req_auth = va.get (BaseProperties::REQUIRES_AUTH);
                      if (!req_auth.isErrorValue ())
                        {
                          requires_auth = req_auth.b;
                        }

                      registerObject (id, request.source (), broadcastId, va,
                                      request.getTransport (), requires_auth);

                      ObjectRegistry::Node *n = m_registry.findFirst (id);

                      if (n != 0)
                        for (unsigned int i = 0; i < va.count (); i++)
                          {
                            if (va[i].isErrorValue ())
                              continue;
                            n->ref->set (va[i]);
                          }

                      Property v (BaseProperties::BROADCAST_ADDR);
                      v.u16 = broadcastId;
                      va.set (v);

                      v.tag = BaseProperties::OBJECT_ADDR;
                      v.u16 = id;
                      va.set (v);

                      request.ack (E_OK, va);

                      n->ref->unref ();
                    }
                }
              break;

              case MessageType::disconnect:
                {
                  addr_t id = request.source ();

                  request.ack (E_OK);

                  unregisterObject (id);

                }
              break;

            }
        }
      delete d;
    }

}

/**
 * Expires pending asynchronous responses that have outlived the default timeout.
 *
 * Walks m_asynchResponseMap while holding m_asynchRespmapMutex. Entries whose
 * sink is nullptr are skipped. Every other entry whose creationTime plus
 * (FLAKE_DEFAULT_TIMEOUT_MS / 1000) lies before utils::timestamp () is expired:
 * its pending request is acknowledged with E_TIMEOUT, its token is released,
 * the request is deleted, and the entry's fields are reset so that later passes
 * skip it.
 */
void Router::cleanupResponseMap ()
{

  flakeMutexAcquire (m_asynchRespmapMutex);

  for (auto rli = m_asynchResponseMap.begin (); rli != m_asynchResponseMap.end (); ++rli)
    {
      auto &entry = *rli;

      // A cleared sink marks an entry that has already been reset.
      if (entry.sink == nullptr)
        continue;

      // NOTE(review): the millisecond timeout is divided by 1000, so creationTime
      // and utils::timestamp () are presumably expressed in seconds - confirm.
      if (!(entry.creationTime + (FLAKE_DEFAULT_TIMEOUT_MS / 1000) < utils::timestamp ()))
        continue;

      // Guard the dereference: an entry may carry a sink without a pending request.
      if (entry.pendingRequest != nullptr)
        {
          entry.pendingRequest->ack (E_TIMEOUT); //###OR SHOULD IT BE E_FAILED?
        }
      releaseToken (entry.token);

      // Deleting a null pointer is a no-op; clear the member afterwards so the
      // reset entry does not keep a dangling pointer to the freed request.
      delete entry.pendingRequest;
      entry.pendingRequest = nullptr;

      entry.sink = nullptr;
      entry.pendingData.clear ();
      entry.token = (uint16_t) 0;
    }

  flakeMutexRelease (m_asynchRespmapMutex);
}

/**
 * Pings stale connections, closes dead ones and completes a pending router shutdown.
 *
 * While holding m_connMutex, every entry of m_connections is inspected:
 *  - if m_doShutdown is set, the connection is queued in m_closedConnections;
 *  - otherwise, a connection that is stale and not already shutting down is
 *    pinged while its pingCount () is below 10 and queued for closing once
 *    that count is reached.
 *
 * The queued connections are shut down after the mutex has been released and
 * the queue is then cleared. When m_doShutdown is set and no connections
 * remain, m_isShutdown is set and m_runCond is released.
 */
void Router::checkConnections ()
{

  flakeMutexAcquire (m_connMutex);

  for (auto i = m_connections.begin (); i != m_connections.end (); ++i)
    {
      ServerConnection *conn = *i;

      if (m_doShutdown)
        {
          m_closedConnections.push_back (conn);
          continue;
        }

      // Only stale connections that are not already being torn down are handled.
      if (!conn->stale () || conn->isShuttingDown ())
        continue;

      if (conn->pingCount () < 10)
        {
          conn->ping ();
        }
      else
        {
          m_closedConnections.push_back (conn);
        }
    }

  flakeMutexRelease (m_connMutex);

  // shutdown () is invoked without m_connMutex held.
  // NOTE(review): presumably because shutdown () may need that mutex itself - confirm.
  for (auto closed = m_closedConnections.begin (); closed != m_closedConnections.end (); ++closed)
    {
#if FLAKE_DEBUG_LOGGING
      logging::logf<lvlInfo> ("Closing stale connection id%04x\n", (*closed)->id ());
#endif
      (*closed)->shutdown ();
    }

  m_closedConnections.clear ();

  if (m_doShutdown)
    {
      // m_connections is guarded by m_connMutex above, so read its size under
      // the same mutex instead of racing with concurrent modifications.
      flakeMutexAcquire (m_connMutex);
      const bool noConnectionsLeft = (m_connections.size () == 0);
      flakeMutexRelease (m_connMutex);

      if (noConnectionsLeft)
        {
          m_isShutdown = true;
          flakeSemaphoreRelease (m_runCond);
        }
    }

}