Appearance
file /Users/ios_developer/workspace/coldwave-os/build/_deps/flake-src/routing/Router.cpp
Defines
| Name |
|---|
| ROUTER_SEND_THREAD_NAME |
Macros Documentation
define ROUTER_SEND_THREAD_NAME
cpp
#define ROUTER_SEND_THREAD_NAME "rtr_snd_thread"
Source code
cpp
/*
* Router.cpp
*
* Created on: 18.10.2010
* Author: root
*/
#include <routing/Router.h>
#include <protocol/Message.h>
#include <flake.h>
#include <TableImpl.h>
#include <string.h>
#include <routing/IPCConnection.h>
using namespace flake;
using namespace std;
#if FLAKE_NAMED_TAG_SUPPORT
//---------------
//
// Name / Tag Mapping
//
//---
// Compares two names for equality, ignoring everything from the first
// UNIT_SEPARATOR_CHR onwards in either string.
//
// a, b: NUL-terminated strings (must not be null).
// Returns true when the parts in front of the separator (or the whole
// strings, where no separator is present) are identical.
bool namesEqual(const char* a, const char* b) {
  // strchr() yields nullptr when the separator is absent; subtracting the
  // base pointer from nullptr is undefined behaviour, so the prefix length is
  // resolved explicitly instead of relying on a "negative" difference.
  const char* sepA = strchr(a, UNIT_SEPARATOR_CHR);
  const char* sepB = strchr(b, UNIT_SEPARATOR_CHR);
  const size_t lenA = sepA ? static_cast<size_t>(sepA - a) : strlen(a);
  const size_t lenB = sepB ? static_cast<size_t>(sepB - b) : strlen(b);
  return lenA == lenB && strncmp(a, b, lenA) == 0;
}
// Looks up the tag registered for `name` in the object's name/tag mappings.
// Names are compared with namesEqual(); returns 0 when no mapping matches.
uint32_t tagFromName(ServersideObject* sfo, const char* name) {
  for (auto& mapping : sfo->mappings) {
    if (namesEqual(mapping.name, name))
      return mapping.tag;
  }
  return 0;
}
#endif
#define ROUTER_SEND_THREAD_NAME "rtr_snd_thread"
//---
//
// Router
//
//---
// Builds the router: creates its mutexes and run semaphore, marks every
// asynchronous-response slot as free, registers the root object and starts
// the send thread.
Router::Router ()
    : m_authSink (nullptr), m_connections ()
{
  // Mutexes created with default attributes.
  flakeMutexNew (&m_idMutex, nullptr);
  flakeMutexNew (&m_asynchRespmapMutex, nullptr);
  flakeMutexNew (&m_pvm_mutex, nullptr);
  flakeMutexNew (&m_runMutex, nullptr);

  // The connection mutex is the only one created as recursive.
  flakeMutexAttr_t recursiveAttr;
  memset (&recursiveAttr, 0, sizeof (recursiveAttr));
  recursiveAttr.attr_bits = flakeMutexRecursive;
  flakeMutexNew (&m_connMutex, &recursiveAttr);

  flakeSemaphoreNew (&m_runCond, 1, 1);

  m_isShutdown = false;
  m_doShutdown = false;
  m_isRunning = false;

  //### This ref exists only so that transportImpl can treat internal and
  //### external queues equally - not a good solution.
  m_dgramOutQueue.ref ();

  // A slot whose sink is nullptr counts as unused (see indicate ()).
  for (auto &slot : m_asynchResponseMap)
  {
    slot.token = 0;
    slot.creationTime = 0;
    slot.pendingRequest = nullptr;
    slot.refCount = 0;
    slot.sink = nullptr;
  }

  m_nextConnectionId = 0;
  m_nextObjectId = 1;
  m_nextToken = (uint16_t) random ();

  // Root object: registered under id 0, with a freshly allocated group id.
  auto *rootObject = new ServersideObject (m_rootValues);
  m_rootGroup = nextObjectId ();
  m_rootNode = m_registry.setRoot (0, m_rootGroup, *rootObject);
  rootObject->ref ();

  // Thread that runs sendThreadFunc () with this router as its argument.
  flakeThreadAttr_t threadAttr;
  memset (&threadAttr, 0, sizeof (threadAttr));
  threadAttr.name = ROUTER_SEND_THREAD_NAME;
  threadAttr.stack_size = 2048;
  threadAttr.priority = flakeThreadPriorityNormal;
  flakeThreadNew (&sendThread, &Router::sendThreadFunc, this, &threadAttr);
}
// Installs the sink consulted for authentication decisions.
// Passing nullptr leaves the router without an authentication sink.
void Router::setAuthenticationSink (AuthenticationSink *a)
{
  this->m_authSink = a;
}
// Allocates a message token that is not currently in use and records it in
// m_tokens. The token stays reserved until releaseToken () is called.
//
// Token 0 is never handed out: process () and the response map use 0 as the
// marker for "no token / empty slot", so a message carrying it would have its
// response dropped. The search also survives counter wrap-around, which the
// previous lower_bound walk did not (after wrapping it could return a token
// that was still in use).
//
// Returns the allocated token. If every non-zero 16-bit token is in use the
// search gives up after one full cycle and returns a token that is already
// reserved (the set then simply keeps its existing entry).
uint16_t Router::nextToken ()
{
  flakeMutexAcquire (m_idMutex);
  for (uint32_t probes = 0; probes < UINT16_MAX; ++probes)
  {
    if (m_nextToken != 0 && m_tokens.find (m_nextToken) == m_tokens.end ())
      break;
    m_nextToken = static_cast<uint16_t> (m_nextToken + 1);
  }
  if (m_nextToken == 0) // only reachable when the token space is exhausted
    m_nextToken = 1;
  const uint16_t res = static_cast<uint16_t> (m_nextToken);
  m_nextToken = static_cast<uint16_t> (m_nextToken + 1);
  m_tokens.insert (res);
  flakeMutexRelease (m_idMutex);
  return res;
}
// Returns a token obtained from nextToken () to the pool of free tokens.
// Releasing a token that is not reserved has no effect on the set.
void Router::releaseToken (addr_t token)
{
  flakeMutexAcquire (m_idMutex);
  {
    m_tokens.erase (token);
  }
  flakeMutexRelease (m_idMutex);
}
// Marks an object id obtained from nextObjectId () as free again.
void Router::releaseObjectId (addr_t id)
{
  flakeMutexAcquire (m_idMutex);
  {
    m_objectIds.erase (id);
  }
  flakeMutexRelease (m_idMutex);
}
// Allocates an object/group id that is not currently in use and records it
// in m_objectIds. The id stays reserved until releaseObjectId () is called.
//
// Id 0 is never handed out: it addresses the router's own root object and is
// the failure value returned by group (). The search also survives counter
// wrap-around, which the previous lower_bound walk did not (after wrapping it
// could return 0 or an id that was still in use).
//
// Returns the allocated id. If the search finds no free id within one full
// 16-bit cycle it gives up and returns an id that is already reserved;
// callers in this file refuse new objects before the id space gets that full.
addr_t Router::nextObjectId ()
{
  flakeMutexAcquire (m_idMutex);
  for (uint32_t probes = 0; probes < UINT16_MAX; ++probes)
  {
    if (m_nextObjectId != 0 && m_objectIds.find (m_nextObjectId) == m_objectIds.end ())
      break;
    m_nextObjectId = static_cast<addr_t> (m_nextObjectId + 1);
  }
  if (m_nextObjectId == 0) // only reachable when the id space is exhausted
    m_nextObjectId = 1;
  const addr_t res = m_nextObjectId;
  m_nextObjectId = static_cast<addr_t> (m_nextObjectId + 1);
  m_objectIds.insert (res);
  flakeMutexRelease (m_idMutex);
  return res;
}
// Tears down everything the router holds for connection `c`: unregisters the
// object in the root group that is bound to `c` (the last one found, if
// several match), removes `c` from the connection list and drops the
// router's reference on it.
void Router::disconnect (ServerConnection *c)
{
  ObjectRegistry::NodeList *rootMembers = m_registry.findAll (m_rootGroup);
  addr_t boundId = 0;
  for (auto member : *rootMembers)
  {
    if (member->ref->c == c)
      boundId = member->id;
    // findAll () handed out a reference per node; give it back.
    member->ref->unref ();
  }
  if (boundId != 0)
  {
    unregisterObject (boundId);
  }
  delete rootMembers;

  flakeMutexAcquire (m_connMutex);
  auto pos = m_connections.begin ();
  while (pos != m_connections.end ())
  {
    if (*pos != c)
    {
      pos++;
      continue;
    }
    pos = m_connections.erase (pos);
  }
  flakeMutexRelease (m_connMutex);

#if FLAKE_DEBUG_LOGGING
  logging::logf<lvlInfo> ("connection %04x ended\n", c->id ());
#endif

  // Counterpart of the ref () taken in addClient ().
  auto impl = dynamic_cast<ServerConnectionImpl *>(c);
  if (impl)
  {
    impl->unref ();
  }
  m_registry.dumpDirectory ();
}
// Creates a server-side object for `va` and adds it to the registry as a
// child of `parentId`. On success, and unless the parent is the root (id 0),
// an objectCreated indication carrying the new address (and the object type,
// if present in `va`) is sent to the root group.
//
// Always returns true, also when the parent could not be found.
// NOTE(review): when m_registry.add () fails the freshly created object is
// neither unref'd nor deleted here - looks like a leak; confirm ownership.
bool Router::registerObject (addr_t id, addr_t parentId, addr_t broadcastId, PropArray &va,
                             Transport *c, bool requires_auth)
{
  auto created = new ServersideObject (va, c, requires_auth);
  created->ref ();

  ObjectRegistry::Node *node;
  if (m_registry.add (parentId, id, broadcastId, *created, &node) != E_OK)
  {
#if FLAKE_DEBUG_LOGGING
    logging::logf<lvlRaw> (" - parent %06X not found!\n", parentId);
#endif
    return true;
  }

  created->node = node;
#if FLAKE_DEBUG_LOGGING
  logging::logf<lvlRaw> (" - registered object %06X as child of %06X\n", id,
                         parentId);
#endif

  auto parentNode = m_registry.findFirst (parentId, true);
  if (!parentNode || parentNode->id == 0)
    return true;

  PropArray announcement;
  Property addrProp (BaseProperties::OBJECT_ADDR);
  addrProp.u16 = id;
  announcement.set (addrProp);

  Property typeProp (BaseProperties::OBJECT_TYPE);
  typeProp = va.get (BaseProperties::OBJECT_TYPE);
  if (!typeProp.isErrorValue ())
  {
    announcement.set (typeProp);
  }

  // Sent to the root group (used to be the parent's group).
  indicate (MessageType::objectCreated, parentId, m_rootGroup, &announcement);
  parentNode->ref->unref ();
  return true;
}
// Removes object `id` from the registry. For every node the registry hands
// back, the node is taken out of all its groups, a `destroyed` indication is
// sent to its group, its ids are released and the node and its object are
// deleted.
void Router::unregisterObject (addr_t id)
{
#if FLAKE_DEBUG_LOGGING
  uint32_t childCount = 0;
  logging::logf<lvlRaw> (" - unregistering object %06X\n", id);
#endif
  ObjectRegistry::NodeList *removed = m_registry.remove (id);
  if (removed != nullptr)
  {
#if FLAKE_DEBUG_LOGGING
    childCount = removed->size ();
#endif
    // The front node is popped only after it has been fully processed.
    for (; !removed->empty (); removed->pop_front ())
    {
      ObjectRegistry::Node *node = removed->front ();
      //### There used to be a race condition here and the unref'ing is not
      //### done correctly; the object is deleted directly instead. Needs a
      //### deeper inspection even though it probably works for now.
      vector<addr_t> memberships = m_registry.findAllGroupMemberships (node->id);
      for (addr_t membership : memberships)
      {
        ungroup (node->id, membership);
      }
      indicate (MessageType::destroyed, node->id, node->group, nullptr);
      delete node->ref;
      releaseObjectId (node->id);
      releaseObjectId (node->group);
      delete node; // counterpart to ObjectTree::add
    }
    delete removed;
  }
#if FLAKE_DEBUG_LOGGING
  logging::logf<lvlRaw> (" - unregistered object %06X (and %ld children)\n", id, childCount - 1);
#endif
}
// Adds `objectId` to group `groupId` and announces the join with a `joined`
// indication. The registry may resolve `groupId` to a different group id
// (logged as joining that object's group).
//
// Returns the id of the group actually joined, or 0 when the registry
// reports failure (no indication is sent in that case).
addr_t Router::group (addr_t objectId, addr_t groupId)
{
  const addr_t joinedGroup = m_registry.group (objectId, groupId);
  if (joinedGroup == 0)
  {
#if FLAKE_DEBUG_LOGGING
    logging::logf<lvlRaw> (" - requested object (%06X) doesn't exist\n", objectId);
#endif
    return 0;
  }

#if FLAKE_DEBUG_LOGGING
  if (joinedGroup != groupId)
    logging::logf<lvlRaw> (" - object %06X joined object %06X's group (%06X)\n", objectId, groupId, joinedGroup);
  else
    logging::logf<lvlRaw> (" - object %06X joined group %06X\n", objectId, groupId);
#endif

  indicate (MessageType::joined, objectId, joinedGroup);
  return joinedGroup;
}
// Removes `objectId` from group `groupId` in the registry and then sends a
// `left` indication from the object to that group.
void Router::ungroup (addr_t objectId, addr_t groupId)
{
  m_registry.ungroup (objectId, groupId);
  indicate (MessageType::left, objectId, groupId);

#if FLAKE_DEBUG_LOGGING
  logging::logf<lvlRaw> (" - object %06X ungrouped\n", objectId);
#endif
}
void Router::indicateDisconnected (uint8_t messageType, Transport *t, PropArray *data, ResponseSink *sink,
Message *req, PropArray *pendingData,
void *assoc_data)
{
auto *d = new Message ();
d->header.mt = DATAGRAM_MAKE_INDICATION(messageType);
d->header.dst = Object::EMPTY_ADDR;
d->header.src = 0;
d->header.res = 0;
d->header.token = nextToken ();
if (data)
d->setData (*data);
if (sink != nullptr)
{
flakeMutexAcquire (m_asynchRespmapMutex);
auto iter = m_asynchResponseMap.begin ();
bool assigned = false;
for (; iter != m_asynchResponseMap.end (); ++iter)
{
if ((*iter).sink == nullptr)
{
(*iter).token = d->header.token;
(*iter).sink = sink;
(*iter).creationTime = utils::timestamp ();
(*iter).pendingRequest = req;
(*iter).associatedData = assoc_data;
if (pendingData)
(*iter).pendingData.copyFrom (*pendingData);
req->retain ();
assigned = true;
break;
}
}
flakeMutexRelease (m_asynchRespmapMutex);
if (!assigned)
{
req->ack (E_NO_ALLOC);
delete d;
delete req;
return;
}
}
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlRaw> (" - indication for unconnected client sent to Wire 0x%04X\n", t->wireId ());
#endif
t->pushQueue (d);
if (req && req->clientCount () == 0)
{
if (pendingData && pendingData->count () > 0)
req->ack (E_OK, *pendingData);
else
req->ack (E_OK);
delete req;
}
}
void Router::indicate (uint8_t messageType, addr_t source,
addr_t destination, PropArray *data, ResponseSink *sink, Message *req, PropArray *pendingData,
addr_t exclude_addr)
{
ObjectRegistry::NodeList *nl = m_registry.findAll (destination);
deque<Transport *> served_transports;
if (m_registry.isInGroup (source, destination) && nl->size () == 1)
{
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlRaw> (" - src (%d) == dst (%d) - 0 other members in group -> no indication\n", source, destination);
#endif
for (auto i = nl->begin (); i != nl->end (); ++i)
{
(*i)->ref->unref ();
}
delete nl;
if (req)
{
req->ack (E_OK);
delete req;
}
return;
}
else if (nl->size () == 0)
{
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlRaw> (" - destination (%d) unreachable\n", destination);
#endif
delete nl;
if (req)
{
req->ack (E_DESTINATION_UNREACHABLE);
delete req;
}
return;
}
auto i = nl->begin ();
for (; i != nl->end (); ++i)
{
if (!m_registry.has (*i) ||
(*i)->id == exclude_addr ||
((*i)->id == source && nl->size () > 1))
{
(*i)->ref->unref ();
continue;
}
auto *d = new Message ();
d->header.mt = DATAGRAM_MAKE_INDICATION(messageType);
d->header.dst = destination;
d->header.src = source;
d->header.res = 0;
d->header.token = nextToken ();
if (data)
{
d->setData (*data);
}
if (sink != nullptr)
{
flakeMutexAcquire (m_asynchRespmapMutex);
auto iter = m_asynchResponseMap.begin ();
bool assigned = false;
for (; iter != m_asynchResponseMap.end (); ++iter)
{
if ((*iter).sink == nullptr)
{
(*iter).token = d->header.token;
(*iter).sink = sink;
(*iter).creationTime = utils::timestamp ();
(*iter).pendingRequest = req;
(*iter).associatedData = nullptr;
if (pendingData)
(*iter).pendingData.copyFrom (*pendingData);
req->retain ();
assigned = true;
break;
}
}
flakeMutexRelease (m_asynchRespmapMutex);
if (!assigned)
{
if (req->clientCount () == 0)
{
req->ack (E_NO_ALLOC);
delete req;
}
releaseToken(d->header.token);
delete d;
(*i)->ref->unref ();
for (; i != nl->end (); ++i)
{
(*i)->ref->unref ();
}
delete nl;
return;
}
}
if ((*i)->ref->c)
{
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlRaw> (" - indication for %06X sent to Wire 0x%04X\n", destination, (*i)->ref->c->wireId ());
#endif
bool already_served = false;
for (auto iTrans = served_transports.begin (); iTrans != served_transports.end (); ++iTrans)
{
if (*iTrans == (*i)->ref->c)
{
releaseToken(d->header.token);
delete d;
(*i)->ref->unref ();
already_served = true;
break;
}
}
if (!already_served)
{
served_transports.push_back ((*i)->ref->c);
(*i)->ref->c->pushQueue (d);
(*i)->ref->unref ();
}
}
else
{
releaseToken(d->header.token);
delete d;
(*i)->ref->unref ();
}
}
if (req && req->clientCount () == 0)
{
if (pendingData && pendingData->count () > 0)
req->ack (E_OK, *pendingData);
else
req->ack (E_OK);
delete req;
}
delete nl;
}
// Registers an already constructed connection with the router and takes a
// reference on it. If the very same connection object is already in the
// list, shutdown () is called on it before it is appended again.
void Router::addClient (ServerConnection *c)
{
  ServerConnection *existing = nullptr;

  flakeMutexAcquire (m_connMutex);
  for (ServerConnection *candidate : m_connections)
  {
    if (candidate == c)
    {
      existing = candidate;
      break;
    }
  }
  flakeMutexRelease (m_connMutex);

  if (existing)
  {
    existing->shutdown ();
#if FLAKE_DEBUG_LOGGING
    logging::logf<lvlInfo> ("incoming IPC connection replaces existing one\n");
#endif
  }

  c->ref ();

  flakeMutexAcquire (m_connMutex);
  m_connections.push_back (c);
  flakeMutexRelease (m_connMutex);

#if FLAKE_DEBUG_LOGGING
  if (!existing)
    logging::logf<lvlInfo> ("incoming connection on %d - id%04x\n", c->wireId (), c->id ());
#endif
}
void Router::sendThreadFunc (void *arg)
{
auto r = static_cast<Router *> (arg);
while (!r->m_doShutdown)
{
Message *dgram = nullptr;
r->m_dgramOutQueue.wait_and_pop (&dgram);
if (dgram != nullptr)
{
dgram->send ();
delete dgram;
}
}
flakeThreadExit ();
}
#ifdef FLAKE_EXPERIMENTAL
// Experimental variant: creates a connection for wire `w` and registers it.
// With `spawnOwnThread` the connection gets no outgoing queue, otherwise it
// shares the router's outgoing queue. An existing connection on the same
// wire id is only reported in the log, not shut down.
// Returns the new connection (the router holds one reference on it).
ServerConnection* Router::addClient(Wire* w, bool spawnOwnThread) {
  ServerConnection* previous = 0;

  flakeMutexAcquire(m_connMutex);
  for (ServerConnection* candidate : m_connections) {
    if (w && candidate->wire() && candidate->wire()->getId() == w->getId()) {
      previous = candidate;
      break;
    }
  }
  flakeMutexRelease(m_connMutex);

  if (previous) {
    // previous->shutdown(); ### could happen twice with stale-check
    logging::logf<lvlInfo>("incoming connection on %d replaces id%04x as id%04x\n", w->getId(), previous->id(), m_nextConnectionId);
  }

  ServerConnection* created;
  if (!spawnOwnThread) {
    created = new ServerConnectionImpl(w, this, m_dgramInQueue, &m_dgramOutQueue, ++m_nextConnectionId);
  } else {
    created = new ServerConnectionImpl(w, this, m_dgramInQueue, nullptr, ++m_nextConnectionId);
  }
  created->ref();

  flakeMutexAcquire(m_connMutex);
  m_connections.push_back(created);
  flakeMutexRelease(m_connMutex);

  if (!previous)
    logging::logf<lvlInfo>("incoming connection on %d - id%04x\n", w->getId(), created->id());
  return created;
}
#else
// Creates a connection for wire `w`, starts it and registers it with the
// router. An existing connection with the same wire id is only reported in
// the debug log, not shut down.
// Returns the new connection (the router holds one reference on it, which
// disconnect () gives back).
ServerConnection *Router::addClient (Wire *w)
{
  ServerConnection *previous = 0;

  flakeMutexAcquire (m_connMutex);
  for (ServerConnection *candidate : m_connections)
  {
    if (w && candidate->wireId () == w->getId ())
    {
      previous = candidate;
      break;
    }
  }
  flakeMutexRelease (m_connMutex);

  if (previous)
  {
    // previous->shutdown(); ### could happen twice through stale check
#if FLAKE_DEBUG_LOGGING
    logging::logf<lvlInfo> ("incoming connection on %d replaces id%04x as id%04x\n", w->getId (), previous->id (), m_nextConnectionId);
#endif
  }

  ServerConnection *created = new ServerConnectionImpl (w, this, m_dgramInQueue, nullptr, m_nextConnectionId++);
  created->ref ();
  dynamic_cast<TransportImpl *>(created)->start ();

  flakeMutexAcquire (m_connMutex);
  m_connections.push_back (created);
  flakeMutexRelease (m_connMutex);

#if FLAKE_DEBUG_LOGGING
  if (!previous)
    logging::logf<lvlInfo> ("incoming connection on %d - id%04x\n", w->getId (), created->id ());
#endif
  return created;
}
#endif
int8_t Router::onResponse (const uint8_t messageType, int8_t result,
addr_t sender, addr_t destination, const PropArray ¶ms, const Token t, Transport *transport,
addr_t pendingSender, void *assoc_data)
{
#if !FLAKE_AUTH_SUPPORT
UNUSED(assoc_data);
#endif
UNUSED(destination);
UNUSED(transport);
UNUSED(t);
switch (messageType)
{
case MessageType::ping:
{
return result;
}
case MessageType::customMsgReceived:
{
return result;
}
#if FLAKE_AUTH_SUPPORT
case MessageType::auth:
{
if (m_authSink != nullptr)
{
Property pSign = params.get (BaseProperties::SIGNATURE);
if (!pSign.isErrorValue ())
{
struct assoc_auth_data *ad = (struct assoc_auth_data *) assoc_data;
if (0 == m_authSink->onAuthResponseReceived (ad->sign_algorithm, ad->challenge, ad->challenge_len,
pSign.bin.lpb, pSign.bin.cb))
{
auto conn = dynamic_cast<ServerConnectionImpl *>(transport);
conn->setAuthenticated (true);
return E_OK;
}
}
}
return E_FAILED;
}
#endif
case MessageType::getPropertiesReq
:
{
ObjectRegistry::Node *n = m_registry.findFirst (sender);
if (n)
{
if (result == E_OK)
{
for (uint32_t i = 0; i < params.count (); i++)
{
if (!params[i].isErrorValue ())
{
n->ref->set (params[i]);
}
else
{
result = E_PARTIAL_SUCCESS;
}
}
}
n->ref->unref ();
return result;
}
return E_NOT_FOUND;
}
break;
case MessageType::setPropertiesReq:
{
ObjectRegistry::Node *n = m_registry.findFirst (sender);
if (n)
{
if (result == E_IGNORED)
result = E_OK;
PropArray vaIndi;
uint16_t pc = params.count ();
uint16_t ec = 0;
if (result == E_OK || result == E_PARTIAL_SUCCESS)
{
for (uint32_t i = 0; i < params.count (); i++)
{
if (!params[i].isErrorValue ())
{
n->ref->set (params[i]);
vaIndi.set (params[i]);
}
else
{
ec++;
}
}
}
if (ec == pc)
result = E_FAILED;
else if (vaIndi.count ())
{
addr_t grp = n->group;
indicate (MessageType::changed, sender, grp, &vaIndi, 0, 0, 0, pendingSender);
}
n->ref->unref ();
return result;
}
return E_NOT_FOUND;
}
break;
}
return E_OK;
}
// Takes one message from the incoming queue and handles it.
//
// Responses are matched by token against the asynchronous-response map; the
// owning sink is called and, once no recipient is outstanding any more, the
// original request is acknowledged. Requests are checked against the
// transport their source object is bound to and then dispatched by message
// type. The message is deleted before returning.
//
// Fixes compared to the previous implementation:
//  - response matching stops at the matched slot; it used to run on without
//    the lock and then release the response-map mutex a second time;
//  - the slot's associated data is detached under the lock instead of being
//    re-read from the (possibly reused) slot afterwards;
//  - connect: a challenge that is too long is refused without also sending
//    the auth indication;
//  - a transport that is not a ServerConnection counts as unauthenticated
//    instead of being dereferenced;
//  - getProperties / setProperties / joinGroup / queryObjects no longer leak
//    a node reference on their early-exit paths;
//  - setProperties tolerates a node without parent;
//  - custom / destroyObject / ping answer E_NOT_FOUND for unknown
//    destinations like the other handlers do;
//  - ping hands a copy of the request to indicate () so that the response
//    can be acknowledged (a sink without a request cannot be completed);
//  - createObject no longer dereferences a node that was not registered;
//  - createProperty releases the mapping buffer only after its last use.
void Router::process ()
{
  Message *d = nullptr;
  m_dgramInQueue.wait_and_pop (&d);
  if (d == nullptr)
    return;

#if FLAKE_DEBUG_LOGGING
  logging::logf<lvlRaw> ("processing received datagram:\n");
#endif

  if (DATAGRAM_IS_RESPONSE(d))
  {
#if FLAKE_DEBUG_LOGGING
    logging::logf<lvlRaw> ("-> response.\n");
#endif
    flakeMutexAcquire (m_asynchRespmapMutex);
    if (d->header.token == 0)
    {
      //### TODO: This should be solved differently. Token 0 currently marks
      //### an empty slot in the response map.
      delete d;
      flakeMutexRelease (m_asynchRespmapMutex);
      return;
    }

    bool matched = false;
    for (auto rli = m_asynchResponseMap.begin (); rli != m_asynchResponseMap.end (); ++rli)
    {
      if ((*rli).token != d->header.token || (*rli).sink == nullptr)
        continue;
      matched = true;

      ResponseSink *sink = (*rli).sink;
      PropArray &pendingData = (*rli).pendingData;
      Message *request = (*rli).pendingRequest;
      void *assoc_data = (*rli).associatedData;

      // Payload handed to the sink: the response data (or the request data
      // if neither response nor pending data exist) plus the pending data.
      PropArray data;
      if (d->getData ().count () == 0 && pendingData.count () == 0)
        data = request->getData ();
      else
        data = d->getData ();
      for (uint32_t i = 0; i < pendingData.count (); i++)
        data.set (pendingData[i]);

      // Free the slot while the lock is still held.
      releaseToken ((*rli).token);
      (*rli).sink = nullptr;
      (*rli).token = (uint16_t) 0;
      (*rli).pendingData.clear ();
      (*rli).pendingRequest = nullptr;
      (*rli).associatedData = nullptr;
      flakeMutexRelease (m_asynchRespmapMutex);

      int8_t result = sink->onResponse (d->message (), d->header.res, d->header.src, d->header.dst, data,
                                        d->header.token, d->getTransport (), request->source (), assoc_data);
      // NOTE(review): the auth path allocates this record with `new`;
      // releasing it with free () is kept from the original - confirm.
      if (assoc_data != nullptr)
      {
        free (assoc_data);
      }

      request->release ();
      if (request->clientCount () == 0)
      {
        if (request->message () == MessageType::connect)
        {
          // A connect that was held back (e.g. for authentication) is
          // completed here.
          if (result == E_OK)
          {
            addr_t id = nextObjectId ();
            addr_t parent_id;
            addr_t bcId = nextObjectId ();
            // This is new to allow for virtual connections - experimental atm.
            if (request->source () == ConnectedObject::EMPTY_ADDR)
            {
              parent_id = 0; // 0 is us, the router
              // the next part only gets used for the 'ping' messages, so not needed on v-conn
              auto conn = dynamic_cast<ServerConnection *>(request->getTransport ());
              if (conn)
                conn->setRootObjectAddr (id);
            }
            else
            {
              parent_id = request->source ();
            }
            registerObject (id, parent_id, bcId, request->getData (), request->getTransport (), false);
            PropArray va;
            Property v (BaseProperties::OBJECT_ADDR);
            v.u16 = id;
            va.set (v);
            request->ack (E_OK, va);
            group (id, parent_id);
          }
          else
          {
            request->ack (E_FAILED);
          }
        }
        else if (data.count () == 0)
        {
          request->ack (result);
        }
        else
        {
          request->ack (result, data);
        }
        delete request;
      }
      break; // tokens are unique; the lock has already been released
    }

    if (!matched)
    {
      releaseToken (d->header.token);
      flakeMutexRelease (m_asynchRespmapMutex);
    }
  }

  Message &request = *d;
  if (DATAGRAM_IS_REQUEST(d))
  {
    auto c = dynamic_cast<ServerConnection *>(d->getTransport ());
    // A transport that is not a ServerConnection is treated as
    // unauthenticated.
    auto authenticated = [c] () { return c != nullptr && c->isAuthenticated (); };

    //--
    // validate sender && connection relation
    //--
    ObjectRegistry::Node *callingObject = m_registry.findFirst (d->header.src);
    if (callingObject != nullptr)
    {
      if (callingObject->ref->c == nullptr ||
          (d->getTransport () != nullptr
           && callingObject->ref->c->wireId () != d->getTransport ()->wireId ()))
      {
#if FLAKE_DEBUG_LOGGING
        logging::logf<lvlExplicit> ("Security: received datagram from source %d on wrong transport (is %d, should be %d) - dropping\n", d->header.src, d->getTransport ()->wireId (), callingObject->ref->c->wireId ());
#endif
        callingObject->ref->unref ();
        delete d;
        return;
      }
      callingObject->ref->unref ();
    }

#if FLAKE_DEBUG_LOGGING
    logging::logf<lvlRaw> ("-> request.\n");
#endif
    switch (request.message ())
    {
      case MessageType::connect:
      {
        if (m_objectIds.size () >= UINT16_MAX - 2)
        {
          request.ack (E_NO_ALLOC);
          break;
        }
#if FLAKE_AUTH_SUPPORT
        if ((m_authSink != nullptr) && !authenticated ())
        {
          flakeAuthType at = m_authSink->authenticationType ();
          if (at == atInteractive)
          {
            if (0 != m_authSink->onConnect(request.getData()))
            {
              request.ack (E_UNAUTHORIZED);
              break;
            }
          }
          else if (at == atSignature)
          {
            PropArray paAuth;
            Property pAlgo (BaseProperties::SIGN_ALGO);
            Property pHash (BaseProperties::SIGN_HASH);
            Property pType (BaseProperties::AUTH_TYPE);
            int c_len;
            // NOTE(review): when no challenge can be produced the connect
            // falls through and is completed unauthenticated - confirm that
            // this is intended.
            if (0 == m_authSink->onAuthChallengeRequested (d->getTransport ()->wire (), &pAlgo.str,
                                                           &pHash.bin.lpb, &c_len))
            {
              if (c_len >= UINT16_MAX)
              {
                // Challenge does not fit the 16-bit length field.
                request.ack (E_REFUSED);
                break;
              }
              pHash.bin.cb = (uint16_t) c_len;
              pType.u8 = atSignature;
              paAuth.set (pAlgo);
              paAuth.set (pHash);
              paAuth.set (pType);
              struct assoc_auth_data *ad = new struct assoc_auth_data;
              ad->challenge_len = c_len;
              ad->challenge = pHash.bin.lpb;
              ad->sign_algorithm = pAlgo.str;
              // The connect is completed in the response branch above once
              // the peer has answered the challenge.
              indicateDisconnected (MessageType::auth, request.getTransport (),
                                    &paAuth, this, new Message (*d), nullptr,
                                    (void *) ad);
              break;
            }
          }
        }
#endif
        addr_t id = nextObjectId ();
        addr_t parent_id;
        addr_t bcId = nextObjectId ();
        // This is new to allow for virtual connections - experimental atm.
        if (request.source () == ConnectedObject::EMPTY_ADDR)
        {
          parent_id = 0; // 0 is us, the router
          // the next part only gets used for the 'ping' messages, so not needed on v-conn
          if (c)
            c->setRootObjectAddr (id);
        }
        else
        {
          parent_id = request.source ();
        }
        registerObject (id, parent_id, bcId, request.getData (), request.getTransport (), false);
        PropArray va;
        Property v (BaseProperties::OBJECT_ADDR);
        v.u16 = id;
        va.set (v);
        request.ack (E_OK, va);
        group (id, parent_id);
      }
        break;

      case MessageType::custom:
      {
        ObjectRegistry::Node *n = m_registry.findFirst (request.destination ());
        if (n)
        {
          if (n->ref->requiresAuth () && !authenticated ())
          {
            request.ack (E_UNAUTHORIZED);
          }
          else
          {
            indicate (MessageType::customMsgReceived, request.source (),
                      request.destination (), &request.getData (), this, new Message (*d));
          }
          n->ref->unref ();
        }
        else
        {
          request.ack (E_NOT_FOUND);
        }
      }
        break;

      case MessageType::destroyObject:
      {
        ObjectRegistry::Node *n = m_registry.findFirst (request.destination ());
        if (n)
        {
          if (n->ref->requiresAuth () && !authenticated ())
          {
            request.ack (E_UNAUTHORIZED);
          }
          else
          {
            // NOTE(review): the *source* object is unregistered while the
            // authorization check looks at the destination - confirm.
            unregisterObject (request.source ());
            request.ack (E_OK);
          }
          n->ref->unref ();
        }
        else
        {
          request.ack (E_NOT_FOUND);
        }
      }
        break;

      case MessageType::getProperties:
      {
        ObjectRegistry::Node *n = m_registry.findFirst (request.destination ());
        PropArray vaOut;
        if (n != nullptr)
        {
          uint32_t i = 0;
          uint16_t cErr = 0;
          int8_t res = E_OK;
          if (n->ref->requiresAuth () && !authenticated ())
          {
            request.ack (E_UNAUTHORIZED);
            n->ref->unref ();
            break;
          }
          // Volatile properties are not answered from the stored values; they
          // are requested from the object itself (vaIndi).
          PropArray vaIndi;
          if (request.getData ().count () > 0)
          {
            for (; i < request.getData ().count (); i++)
            {
              Property v = request.getData ()[i];
              if (IS_VOLATILE(v.tag))
              {
                vaIndi.set (Property (v.tag));
              }
              else if (n->ref->has (v.tag))
              {
                vaOut.set (n->ref->get (v.tag));
              }
              else
              {
                cErr++;
                res = E_PARTIAL_SUCCESS;
                v.tag |= TAG_ERROR;
                v.err = E_NOT_FOUND;
                vaOut.set (v);
              }
            }
          }
          else
          {
            // No explicit selection: return everything except base properties.
            for (; i < n->ref->count (); i++)
            {
              if (IS_BASE_PROPERTY((*n->ref)[i].tag))
                continue;
              if (IS_VOLATILE((*n->ref)[i].tag))
              {
                vaIndi.set (Property ((*n->ref)[i].tag));
              }
              else
              {
                vaOut.set ((*n->ref)[i]);
              }
            }
          }
          n->ref->unref ();
          if (vaIndi.count () > 0)
          {
            indicate (MessageType::getPropertiesReq, 0, request.destination (), &vaIndi, this, new Message (*d), &vaOut);
          }
          else
          {
            if (cErr == vaOut.count () && cErr > 0)
              res = E_FAILED;
            request.ack (res, vaOut);
          }
        }
        else
          request.ack (E_NOT_FOUND);
      }
        break;

#if FLAKE_STREAM_SUPPORT
      case MessageType::createStream:
      case MessageType::streamData:
      case MessageType::closeStream:
      {
        ObjectRegistry::Node* n = m_registry.findFirst(request.destination());
        if (n != 0) {
          // Map the request type to the indication forwarded to the object.
          uint8_t msg = 0;
          if(request.message() == MessageType::createStream)
            msg = MessageType::createStreamReq;
          else if(request.message() == MessageType::streamData)
            msg = MessageType::streamDataReceived;
          else if(request.message() == MessageType::closeStream)
            msg = MessageType::closeStreamReq;
          n->ref->unref();
          indicate(msg, request.source(),
                   request.destination(), &request.getData(), this, new Message(*d));
        } else
          request.ack(E_NOT_FOUND);
      }
        break;
#endif

      case MessageType::ping:
      {
        ObjectRegistry::Node *n = m_registry.findFirst (request.destination ());
        if (n)
        {
          if (n->ref->requiresAuth () && !authenticated ())
          {
            request.ack (E_UNAUTHORIZED);
          }
          else
          {
            if (request.destination () == 0)
              request.ack (E_OK); // ping to the router itself
            else
              indicate (MessageType::ping, request.source (), request.destination (), 0, this, new Message (*d));
          }
          n->ref->unref ();
        }
        else
        {
          request.ack (E_NOT_FOUND);
        }
      }
        break;

      case MessageType::setProperties:
      {
        ObjectRegistry::Node *n = m_registry.findFirst (request.destination ());
        PropArray vaOut, vaIndi, vaIndiActionable;
        uint16_t grp = 0;
        if (n != nullptr)
        {
          uint32_t i = 0;
          int8_t res = E_OK;
          unsigned int cErr = 0;
          if (n->ref->requiresAuth () && !authenticated ())
          {
            request.ack (E_UNAUTHORIZED);
            n->ref->unref ();
            break;
          }
          // The object itself (kept for backward compatibility) and its
          // parent may write everything, including read-only properties.
          const bool fromOwner =
              (n->parent != nullptr && n->parent->id == request.source ())
              || (n->id == request.source ());
          for (; i < request.getData ().count (); i++)
          {
            Property vNew = request.getData ()[i];
            // to adjust missing modifiers
            if (!fromOwner)
              vNew.tag = n->ref->get (vNew.tag).tag;
            if (IS_READONLY(vNew.tag) && !fromOwner)
            {
              Property vErr (vNew.tag |= TAG_ERROR);
              vErr.err = E_READ_ONLY;
              vaOut.set (vErr);
              cErr++;
              res = E_PARTIAL_SUCCESS;
              continue;
            }
            Property vOld = n->ref->get (vNew.tag);
            if (vOld != vNew)
            {
              if (!fromOwner)
              {
                if (vOld.isErrorValue ())
                {
                  // Third parties cannot create properties.
                  vOld.tag = request.getData ()[i].tag | TAG_ERROR;
                  vaOut.set (vOld);
                  cErr++;
                }
                else if (IS_ACTIONABLE(vNew.tag))
                {
                  // The owning object decides; forwarded below.
                  vaIndiActionable.set (vNew);
                }
                else
                {
                  n->ref->set (vNew);
                  vaOut.set (vNew);
                  vaIndi.set (vNew);
                }
              }
              else
              {
#if FLAKE_DEBUG_LOGGING
                if (logging::logLevel >= lvlRaw)
                {
                  char *str = vNew.toString ();
                  logging::logf<lvlRaw> (" - property changed or new: %s\n", str);
                  delete[] str;
                }
#endif
                n->ref->set (vNew);
                vaOut.set (vNew);
                vaIndi.set (vNew);
              }
            }
          }
          grp = n->group;
          n->ref->unref ();
          if (vaIndiActionable.count () > 0)
          {
            indicate (MessageType::setPropertiesReq, request.source (), request.destination (), &vaIndiActionable, this, new Message (*d), &vaOut);
          }
          else if (vaIndi.count () > 0)
          {
            indicate (MessageType::changed, request.destination(), grp, &vaIndi);
            request.ack (res, vaOut);
          }
          else
          {
            if (cErr == request.getData ().count ())
              request.ack (E_FAILED, vaOut);
            else
              request.ack (E_NO_CHANGES, vaOut);
          }
        }
        else
        {
          request.ack (E_NOT_FOUND);
        }
      }
        break;

#if FLAKE_AUTH_SUPPORT
      case MessageType::auth:
      {
        if (request.destination () == 0)
        {
          if (m_authSink == nullptr)
          {
            request.ack (E_FAILED);
            break;
          }
          PropArray& paAuth = request.getData ();
          if (paAuth.has (BaseProperties::AUTH_TYPE))
          {
            Property pAuthType = paAuth.get (BaseProperties::AUTH_TYPE);
            PropArray pAuthResp;
            switch (pAuthType.u8)
            {
              case atSignature:
              {
                Property pSignAlgo = paAuth.get (BaseProperties::SIGN_ALGO);
                if (pSignAlgo.isErrorValue ())
                {
                  pSignAlgo.tag = (pSignAlgo.tag | TAG_ERROR);
                  pSignAlgo.err = E_INCOMPLETE;
                  pAuthResp.set (pSignAlgo);
                  request.ack (E_FAILED, pAuthResp);
                  break;
                }
                Property pAuthHash = paAuth.get (BaseProperties::SIGN_HASH);
                if (pAuthHash.isErrorValue ())
                {
                  pAuthHash.tag = (pAuthHash.tag | TAG_ERROR);
                  pAuthHash.err = E_INCOMPLETE;
                  pAuthResp.set (pAuthHash);
                  request.ack (E_FAILED, pAuthResp);
                  break;
                }
                // Initialised so that a sink that leaves them untouched is
                // treated as "no signature".
                byte *signature = nullptr;
                int signature_len = 0;
                m_authSink->onAuthChallengeReceived (pSignAlgo.str, pAuthHash.bin.lpb, pAuthHash.bin.cb,
                                                     &signature, &signature_len);
                if (signature_len != 0 && signature != nullptr)
                {
                  Property pAuthSignature (BaseProperties::SIGNATURE);
                  pAuthSignature.bin.cb = signature_len;
                  pAuthSignature.bin.lpb = (byte *) malloc (signature_len);
                  memcpy (pAuthSignature.bin.lpb, signature, signature_len);
                  pAuthResp.set (pAuthSignature);
                  free (signature);
                  request.ack (E_OK, pAuthResp);
                }
                else
                {
                  request.ack (E_FAILED);
                }
              }
                break;
              default:
                pAuthType.tag = (pAuthType.tag | TAG_ERROR);
                pAuthType.err = E_UNSUPPORTED;
                pAuthResp.set (pAuthType);
                request.ack (E_FAILED, pAuthResp);
            }
          }
          else
          {
            request.ack (E_INCOMPLETE);
          }
        }
        else
        {
          request.ack (E_UNSUPPORTED);
        }
      }
        break;
#endif

      case MessageType::queryObjects:
      {
        // Answers with a table of the addresses of all objects whose type
        // matches the requested one.
        ValueTable *tbl;
        auto nl = m_registry.all ();
        auto i = nl->begin ();
        tbl = new ValueTable ();
        for (; i != nl->end (); ++i)
        {
          Property v = (*i)->ref->get (BaseProperties::OBJECT_TYPE);
          const bool typeMatches =
              !v.isErrorValue () && v.uuid == request.getData (BaseProperties::OBJECT_TYPE).uuid;
          // Objects requiring authentication stay hidden from
          // unauthenticated connections.
          if (typeMatches && !((*i)->ref->requiresAuth () && !authenticated ()))
          {
            auto va = new PropArray ();
            Property vAddr (BaseProperties::OBJECT_ADDR);
            vAddr.u16 = (*i)->id;
            va->set (vAddr);
            tbl->m_rows.push_back (va);
          }
          (*i)->ref->unref ();
        }
        Property v (BaseProperties::OBJECT_TABLE);
        tbl->serialize (&v.bin.cb, &v.bin.lpb);
        delete tbl;
        PropArray vaOut;
        vaOut.set (v);
        request.ack (E_OK, vaOut);
        delete nl;
        if (v.bin.lpb)
          free (v.bin.lpb);
      }
        break;

      //---
      // Grouping Functions
      //---
      case MessageType::joinGroup:
      {
        ObjectRegistry::Node *n = m_registry.findFirst (request.destination ());
        if (nullptr == n)
        {
          request.ack (E_NOT_FOUND);
          break;
        }
        if (n->ref->requiresAuth () && !authenticated ())
        {
          request.ack (E_UNAUTHORIZED);
        }
        else if (m_registry.isInGroup (request.source (), request.destination ()))
        {
          request.ack (E_OK); // already a member
        }
        else
        {
          uint16_t group_id = group (request.source (), request.destination ());
          if (group_id != 0)
          {
            PropArray pa;
            Property p (BaseProperties::BROADCAST_ADDR);
            p.u16 = group_id;
            pa.set (p);
            request.ack (E_OK, pa);
          }
          else
          {
            request.ack (E_NOT_FOUND);
          }
        }
        n->ref->unref ();
      }
        break;

      case MessageType::leaveGroup:
      {
        ungroup (request.source (), request.destination ());
        request.ack (E_OK);
      }
        break;

      case MessageType::createProperty:
      {
#if FLAKE_NAMED_TAG_SUPPORT
        Property vName = request.getData().get(BaseProperties::PROP_NAME);
        Property vType = request.getData().get(BaseProperties::PROP_TYPE);
        Property vProp = request.getData().get(BaseProperties::PROP_TAG);
        if (vName.isErrorValue() || (vType.isErrorValue() && vProp.isErrorValue())) {
          request.ack(E_FAILED);
        }
        else {
          ObjectRegistry::Node* n = m_registry.findFirst(request.destination());
          if (n && n->ref) {
            Property v = n->ref->get(BaseProperties::PROP_MAPPINGS);
            vector<TagNameMapping>& mappings = n->ref->mappings;
            uint16_t maxNamedTag = NAMED_TAG_RANGE_START;
            if(!v.isErrorValue() && vProp.isErrorValue()) {
              // Merge the stored mappings into the in-memory list and find
              // the highest named tag in use.
              TagNameMapping pm;
              for (uint32_t i = 0; i < v.arr.numValues; i++) {
                memcpy(&pm, v.arr.lpValues + (i * sizeof(TagNameMapping)), sizeof(TagNameMapping));
                if (tagFromName(n->ref, pm.name) == 0)
                  mappings.push_back(pm);
                else {
                  for (uint32_t j = 0; j < mappings.size(); j++) {
                    if(namesEqual(mappings[j].name, pm.name)) {
                      mappings[j].tag = pm.tag;
                    }
                  }
                }
                maxNamedTag = max<uint32_t>(TAG_ID(pm.tag), maxNamedTag);
              }
              free(v.arr.lpValues);
            } else if (vProp.isErrorValue()) {
              v.tag = BaseProperties::PROP_MAPPINGS;
              v.arr.cValueSize = sizeof(TagNameMapping);
              v.arr.numValues = 0;
            } else {
              v = Property(BaseProperties::PROP_MAPPINGS);
              v.arr.cValueSize = sizeof(TagNameMapping);
            }
            // Either allocate the next named tag or use the one supplied.
            uint32_t tag;
            if (vProp.isErrorValue())
              tag = (++maxNamedTag << 16) | vType.u16;
            else
              tag = vProp.u32;
            TagNameMapping mapping(vName.str, tag);
            mappings.push_back(mapping);
            v.arr.numValues++;
            v.arr.lpValues = (flake::byte*) malloc(mappings.size() * sizeof(TagNameMapping));
            for (unsigned long i = 0; i < mappings.size(); i++)
              memcpy(v.arr.lpValues + (i * sizeof(TagNameMapping)), &mappings[i], sizeof(TagNameMapping));
            n->ref->set(v);
            Property vNewProp(tag);
            n->ref->set(vNewProp);
            PropArray va;
            PropArray vaIndi;
            if (vProp.isErrorValue()) {
              Property vNamedTag(BaseProperties::PROP_TAG);
              vNamedTag.u32 = tag;
              va.set(vNamedTag);
            }
            va.set(v);
            vaIndi.set(v);
            logging::logf<lvlInfo>("Object %04X created property \"%s\" -> 0x%08X\n", request.destination(), vName.str, tag);
            request.ack(E_OK, va);
            indicate(MessageType::propertyCreated, request.source(), n->group, &vaIndi);
            // Released only now: `v` was still handed to va / vaIndi above.
            free(v.arr.lpValues);
          } else
            request.ack(E_FAILED);
          if (n && n->ref)
            n->ref->unref();
        }
#else
        request.ack (E_NOT_IMPL);
#endif
      }
        break;

      case MessageType::createObject:
      {
        addr_t id;
        addr_t broadcastId;
        if (m_objectIds.size () >= UINT16_MAX - 2)
        {
          request.ack (E_NO_ALLOC);
        }
        else
        {
          id = nextObjectId ();
          broadcastId = nextObjectId ();
          bool requires_auth = false;
          PropArray va = request.getData ();
          Property req_auth = va.get (BaseProperties::REQUIRES_AUTH);
          if (!req_auth.isErrorValue ())
          {
            requires_auth = req_auth.b;
          }
          registerObject (id, request.source (), broadcastId, va,
                          request.getTransport (), requires_auth);
          // NOTE(review): E_OK is acknowledged even if the object could not
          // be registered (n == nullptr) - confirm.
          ObjectRegistry::Node *n = m_registry.findFirst (id);
          if (n != nullptr)
          {
            for (unsigned int i = 0; i < va.count (); i++)
            {
              if (va[i].isErrorValue ())
                continue;
              n->ref->set (va[i]);
            }
          }
          Property v (BaseProperties::BROADCAST_ADDR);
          v.u16 = broadcastId;
          va.set (v);
          v.tag = BaseProperties::OBJECT_ADDR;
          v.u16 = id;
          va.set (v);
          request.ack (E_OK, va);
          if (n != nullptr)
            n->ref->unref ();
        }
      }
        break;

      case MessageType::disconnect:
      {
        addr_t id = request.source ();
        request.ack (E_OK);
        unregisterObject (id);
      }
        break;
    }
  }
  delete d;
}
void Router::cleanupResponseMap ()
{
flakeMutexAcquire (m_asynchRespmapMutex);
auto rli = m_asynchResponseMap.begin ();
for (; rli != m_asynchResponseMap.end (); ++rli)
{
if ((*rli).sink == nullptr)
continue;
if ((*rli).creationTime + (FLAKE_DEFAULT_TIMEOUT_MS / 1000) < utils::timestamp ())
{
(*rli).pendingRequest->ack (E_TIMEOUT); //###OR SHOULD IT BE E_FAILED?
releaseToken ((*rli).token);
delete (*rli).pendingRequest;
(*rli).sink = nullptr;
(*rli).pendingData.clear ();
(*rli).token = (uint16_t) 0;
}
}
flakeMutexRelease (m_asynchRespmapMutex);
}
void Router::checkConnections ()
{
flakeMutexAcquire (m_connMutex);
auto i = m_connections.begin ();
for (; i != m_connections.end (); ++i)
{
ServerConnection *conn = *i;
if (m_doShutdown)
{
m_closedConnections.push_back (conn);
}
else if (conn->stale () && !conn->isShuttingDown ())
{
if (conn->pingCount () < 10)
{
conn->ping ();
}
else
{
m_closedConnections.push_back (conn);
}
}
}
flakeMutexRelease (m_connMutex);
for (i = m_closedConnections.begin (); i != m_closedConnections.end (); ++i)
{
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlInfo> ("Closing stale connection id%04x\n", (*i)->id ());
#endif
(*i)->shutdown ();
}
m_closedConnections.clear ();
if (m_doShutdown && m_connections.size () == 0)
{
m_isShutdown = true;
flakeSemaphoreRelease (m_runCond);
}
}