Appearance
file /Users/ios_developer/workspace/coldwave-os/build/_deps/flake-src/routing/Router.cpp
Source code
cpp
/*******************************************************************************
* @file Router.cpp
* @license This file is part of the ImagineOn Flake software package
* licensed under the ImagineOn software-licensing terms available
* under https://www.imagineon.de/de/info/licensing-terms
* @copyright Copyright (c) 2025 ImagineOn GmbH. www.imagineon.de.
*******************************************************************************/
#include <routing/Router.h>
#include <protocol/Message.h>
#include <flake.h>
#include <TableImpl.h>
#include <cstring>
namespace
{
constexpr char ROUTER_SERVER_WIRE_THREAD_NAME[] = "flk:r:acc";
constexpr char ROUTER_SEND_THREAD_NAME[] = "flk:r:snd";
constexpr char ROUTER_PROCESS_THREAD_NAME[] = "flk:r:prc";
constexpr char ROUTER_MAINTENANCE_THREAD_NAME[] = "flk:r:mnt";
}
namespace flake
{
//---
//
// Router
//
//---
Router::Router()
: m_authSink(nullptr)
{
m_runMutex = {};
m_runCond = {};
m_connMutex = {};
m_idMutex = {};
m_pvm_mutex = {};
m_sendThread = {};
m_maxConnections = 0U;
flakeMutexNew(&m_idMutex, nullptr);
flakeMutexNew(&m_pvm_mutex, nullptr);
flakeMutexNew(&m_runMutex, nullptr);
flakeMutexAttr_t _attr = {};
_attr.attr_bits = flakeMutexRecursive;
flakeMutexNew(&m_connMutex, &_attr);
flakeSemaphoreNew(&m_runCond, 1U, 1U);
m_isShutdown = false;
m_doShutdown = false;
m_isRunning = false;
m_msgInQueue.ref();
m_msgInQueue.lock();
m_nextConnectionId = 0;
(void)m_objectIds.insert(U16(0U));
(void)m_objectIds.insert(U16(0xFFFF));
m_nextObjectId = 1U;
//TODO: Dynamic Memory
auto* so = new ServersideObject(m_rootValues);
m_rootGroup = nextObjectId();
m_rootNode = m_registry.setRoot(0U, m_rootGroup, *so);
so->ref();
flakeThreadAttr_t attr = {};
attr.name = ROUTER_SEND_THREAD_NAME;
attr.stack_size = 2048U;
attr.priority = flakeThreadPriorityNormal;
flakeThreadNew(&m_sendThread, &Router::sendThread, this, &attr);
}
Router::~Router()
{
shutdown();
m_msgInQueue.unlock();
m_msgOutQueue.unref();
}
void Router::setAuthenticationSink(AuthenticationSink* a)
{
m_authSink = a;
}
addr_t Router::nextObjectId()
{
(void)flakeMutexAcquire(m_idMutex);
auto iter = m_objectIds.lower_bound(m_nextObjectId);
while (iter != m_objectIds.end() && UNSIGNED(*iter) == m_nextObjectId)
{
++iter;
++m_nextObjectId;
}
addr_t res = m_nextObjectId++;
(void)m_objectIds.insert(res);
flakeMutexRelease(m_idMutex);
return res;
}
void Router::releaseObjectId(addr_t id)
{
(void)flakeMutexAcquire(m_idMutex);
(void)m_objectIds.erase(id);
flakeMutexRelease(m_idMutex);
}
void Router::disconnect(ServerConnection* c)
{
ObjectRegistry::NodeList* nl = m_registry.findAll(m_rootGroup);
auto i = nl->begin();
addr_t id = 0U;
for (; i != nl->end(); ++i)
{
if ((*i)->ref->transport() == c)
{
id = (*i)->id;
}
(*i)->ref->unref();
}
if (id != 0U)
{
unregisterObject(id);
}
//TODO: Dynamic Memory
delete nl;
(void)flakeMutexAcquire(m_connMutex);
auto j = m_connections.begin();
while (j != m_connections.end())
{
if (*j == c)
{
j = m_connections.erase(j);
}
else
{
++j;
}
}
flakeMutexRelease(m_connMutex);
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlInfo>("connection %04x ended\n", c->id());
#endif
c->unref();
m_registry.dumpDirectory();
}
bool Router::registerObject(addr_t id, addr_t parentId, addr_t broadcastId, const PropArray& va,
Transport* c, bool requires_auth)
{
bool isWritable = true;
//TODO: Dynamic Memory
auto obj = new ServersideObject(va, c, requires_auth);
ObjectRegistry::Node* n = nullptr;
if (m_registry.add(parentId, id, broadcastId, *obj, &n) == E_OK)
{
obj->setNode(n);
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlRaw>(" - registered object %04X as child of %04X\n", UNSIGNED(id), UNSIGNED(parentId));
#endif
auto parentObj = m_registry.findFirst(parentId);
if (nullptr != parentObj && parentObj->id != 0U)
{
PropArray vaIndi;
Property v(ParameterProperties::OBJECT_ADDR);
v.setData(id);
vaIndi.set(v);
Property v2(ParameterProperties::OBJECT_TYPE);
v2 = va.get(ParameterProperties::OBJECT_TYPE);
if (!v2.isErrorValue())
{
vaIndi.set(v2);
}
indicate(MessageType::objectCreated, parentId, m_rootGroup, &vaIndi, nullptr, EMPTY_ADDR, 0);
parentObj->ref->unref();
}
}
else
{
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlRaw>(" - parent %04X not found!\n", UNSIGNED(parentId));
#endif
}
return isWritable;
}
void Router::unregisterObject(addr_t id)
{
#if FLAKE_DEBUG_LOGGING
uint32_t childCount = 0U;
logging::logf<lvlRaw>(" - unregistering object %04X\n", UNSIGNED(id));
#endif
ObjectRegistry::NodeList* children = m_registry.remove(id);
if (children != nullptr)
{
#if FLAKE_DEBUG_LOGGING
childCount = U32(children->size());
#endif
while (!children->empty())
{
ObjectRegistry::Node* ch = children->front();
const std::vector<addr_t>& groupMemberships = m_registry.findAllGroupMemberships(ch->id);
for (unsigned short groupMembership : groupMemberships)
{
ungroup(ch->id, groupMembership);
}
releaseObjectId(ch->id);
releaseObjectId(ch->group);
ch->ref->unref();
children->pop_front();
}
//TODO: Dynamic Memory
delete children;
}
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlRaw>(" - unregistered object %04X (and %ld children)\n", UNSIGNED(id), childCount - 1U);
#endif
}
addr_t Router::group(addr_t objectId, addr_t groupId)
{
addr_t realGroupId = m_registry.group(objectId, groupId);
if (realGroupId == 0U)
{
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlRaw>(" - requested object (%04X) doesn't exist\n", UNSIGNED(objectId));
#endif
}
else if (UNSIGNED(realGroupId) != groupId)
{
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlRaw>(" - object %04X joined object %04X's group (%04X)\n", UNSIGNED(objectId),
UNSIGNED(groupId), UNSIGNED(realGroupId));
#endif
}
else
{
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlRaw>(" - object %04X joined group %04X\n", UNSIGNED(objectId), UNSIGNED(groupId));
#endif
}
if (realGroupId != 0U)
{
indicate(MessageType::joined, objectId, realGroupId,
nullptr,nullptr, EMPTY_ADDR, 0);
}
return realGroupId;
}
void Router::ungroup(addr_t objectId, addr_t groupId)
{
m_registry.ungroup(objectId, groupId);
indicate(MessageType::left, objectId, groupId, nullptr, nullptr, EMPTY_ADDR, 0);
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlRaw>(" - object %04X ungrouped\n", UNSIGNED(objectId));
#endif
}
void Router::setMaxConnections(unsigned max) {
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlInfo>("setting max connections to %d\n", max);
#endif
m_maxConnections = max;
}
bool Router::acceptsMoreConnections() const {
return m_maxConnections == 0U || m_connections.size() < m_maxConnections;
}
ServerConnection* Router::addClient(Wire* w)
{
(void)flakeMutexAcquire(m_connMutex);
auto i = m_connections.begin();
ServerConnection* duplicate = nullptr;
for (; i != m_connections.end(); ++i)
{
ServerConnection* conn = *i;
if (conn->wire() == w)
{
duplicate = conn;
break;
}
}
flakeMutexRelease(m_connMutex);
if (nullptr != duplicate)
{
// duplicate->shutdown(); TODO: could happen twice through stale check
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlInfo>("incoming connection on %p replaces id%04x as id%04x\n", w, duplicate
->id(), m_nextConnectionId);
#endif
}
//TODO: Dynamic Memory
ServerConnection* c = new ServerConnection(w, this, &m_msgInQueue, m_nextConnectionId++);
c->ref();
m_connections.push_back(c);
flakeMutexRelease(m_connMutex);
#if FLAKE_DEBUG_LOGGING
if (nullptr == duplicate)
{
logging::logf<lvlInfo>("incoming connection on %p - id%04x\n", w, c->id());
}
#endif
return c;
}
void Router::indicate(uint8_t messageType, addr_t source,
addr_t destination, const PropArray* data, Message* req,
addr_t exclude_addr, uint32_t timeout)
{
ObjectRegistry::NodeList* nl = m_registry.findAll(destination);
std::deque<Transport*> served_transports;
if (m_registry.isInGroup(source, destination) && nl->size() == 1U)
{
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlRaw>(" - src (%d) == dst (%d) - 0 other members in group -> no indication\n",
UNSIGNED(source), UNSIGNED(destination));
#endif
for (auto& i : *nl)
{
i->ref->unref();
}
if (nullptr != req)
{
req->ack(I8(E_OK));
req->destroy();
}
}
else if (nl->empty())
{
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlRaw>(" - destination (%d) unreachable\n", UNSIGNED(destination));
#endif
if (nullptr != req)
{
req->ack(I8(E_DESTINATION_UNREACHABLE));
req->destroy();
}
}
else
{
for (auto& it : *nl)
{
ObjectRegistry::Node& node = *it;
ServersideObject* object = node.ref;
if ((UNSIGNED(node.id) == exclude_addr) && nl->size() > 1U)
{
object->unref();
continue;
}
auto* d = Message::create(mstIndication, messageType, source, destination);
if (nullptr != d)
{
if (nullptr != data)
{
for (unsigned int i = 0U; i < data->count(); ++i)
{
d->setData(data->getAt(i));
}
}
if (nullptr != object->transport() && nullptr != object->transport()->wire())
{
bool already_served = false;
for (auto& served_transport : served_transports)
{
if (served_transport == object->transport())
{
d->destroy();
object->unref();
already_served = true;
break;
}
}
if (!already_served)
{
ResponseContext* rc = nullptr;
if (nullptr != req && timeout > 0U)
{
//TODO: Dynamic Memory
rc = new ResponseContext();
req->setClientCount(req->clientCount() + 1U);
rc->pendingRequest = Message::create(*req);
rc->assoc_data = nullptr;
}
served_transports.push_back(object->transport());
int indi_res = object->transport()->sendIndication(d, this, timeout, rc);
if (indi_res == E_NO_ALLOC)
{
d->destroy();
if (nullptr != rc && nullptr != rc->pendingRequest)
{
rc->pendingRequest->ack(I8(E_NO_ALLOC));
rc->pendingRequest->destroy();
}
}
#if FLAKE_DEBUG_LOGGING
else
{
logging::logf<lvlRaw>(" - indication for %04X sent to Wire %p\n", UNSIGNED(destination),
object->transport()->wire());
}
#endif
object->unref();
}
}
else
{
d->destroy();
object->unref();
}
}
}
}
//TODO: Dynamic Memory
delete nl;
}
void Router::onResponse(Message* response, ResponseContext* ctx)
{
int32_t result = response->result();
Transport* transport = response->transport();
addr_t sender = response->source();
const PropArray& data = response->getData();
ObjectRegistry::Node* n = m_registry.findFirst(sender);
Message* pendingRequest = nullptr;
if (ctx != nullptr)
{
pendingRequest = ctx->pendingRequest;
}
switch (UNSIGNED(response->type()))
{
case MessageType::ping:
case MessageType::customMsgReceived: //NOLINT MISRA 5-4-4 fallthrough on purpose
break;
case MessageType::auth:
{
if (m_authSink != nullptr)
{
Property pSign = data.get(ParameterProperties::SIGNATURE);
if (!pSign.isErrorValue())
{
struct assoc_auth_data* ad = (struct assoc_auth_data*)ctx->assoc_data;
if (0 == m_authSink->onAuthResponseReceived(ad->sign_algorithm, ad->challenge,
ad->challenge_len,
pSign.value<tt_bin_t>()->lpb,
SIGNED(pSign.value<tt_bin_t>()->cb)))
{
auto conn = transport->linkedServerConnection();
conn->setAuthenticated(true);
}
}
}
}
break;
case MessageType::getPropertiesReq:
{
if (nullptr != n)
{
if (result == E_OK)
{
for (uint32_t i = 0U; i < data.count(); i++)
{
if (!data[i].isErrorValue())
{
n->ref->set(data[i]);
}
else
{
result = E_PARTIAL_SUCCESS;
}
}
}
}
}
break;
case MessageType::openPropertyReq:
{
uint16_t id = *pendingRequest->getData().get(ParameterProperties::OBJECT_ADDR).value<uint16_t>();
if (result != E_OK)
{
releaseObjectId(id);
}
else
{
addr_t bcId = nextObjectId();
group(id, bcId);
(void)registerObject(id,
pendingRequest->source(),
bcId,
data,
response->transport(),
false);
}
}
break;
case MessageType::setPropertiesReq:
{
if (nullptr != n)
{
if (result == E_IGNORED)
{
result = E_OK;
}
PropArray vaIndi;
uint16_t pc = U16(data.count());
uint16_t ec = 0U;
if (result == E_OK || result == E_PARTIAL_SUCCESS)
{
for (uint32_t i = 0U; i < data.count(); i++)
{
if (!data[i].isErrorValue())
{
n->ref->set(data[i]);
vaIndi.set(data[i]);
}
else
{
ec++;
}
}
}
if (UNSIGNED(ec) == pc && ec != 0U)
{
result = E_FAILED;
}
else if (vaIndi.count() > 0U)
{
addr_t grp = n->group;
indicate(MessageType::changed, sender, grp, &vaIndi, nullptr,
pendingRequest->source(), 0);
}
else
{
// MISRA 6-4-2: switch preferred over else-if for known cases
}
}
}
break;
default:
break;
}
if (nullptr != n)
{
n->ref->unref();
}
if (nullptr != pendingRequest)
{
pendingRequest->setClientCount(pendingRequest->clientCount() - 1U);
}
if (nullptr != pendingRequest && pendingRequest->clientCount() == 0U)
{
if (UNSIGNED(pendingRequest->type()) == MessageType::connect)
{
if (result == E_OK)
{
addr_t id = nextObjectId();
addr_t parent_id = EMPTY_ADDR;
addr_t bcId = nextObjectId();
// This is new to allow for virtual connections - experimental atm.
if (pendingRequest->source() == EMPTY_ADDR)
{
parent_id = 0U; // 0 is us, the router
// the next part only gets used for the 'ping' messages, so not needed on v-conn
if (auto conn = pendingRequest->transport()->linkedServerConnection())
{
conn->setRootObjectAddr(id);
}
}
else
{
parent_id = pendingRequest->source();
}
PropArray vaReg(pendingRequest->getData());
(void)registerObject(id, parent_id, bcId, vaReg, pendingRequest
->transport(), false);
PropArray va;
Property v(ParameterProperties::OBJECT_ADDR);
v.setData(id);
va.set(v);
pendingRequest->ack(I8(E_OK), va);
(void)group(id, parent_id);
}
else
{
pendingRequest->ack(I8(E_FAILED));
}
}
else if (data.count() == 0U)
{
pendingRequest->ack(I8(result));
}
else
{
pendingRequest->ack(I8(result), data);
}
pendingRequest->destroy();
}
}
int Router::authorize(Message* message, ServerConnection* connection, ServersideObject* caller, ServersideObject* callee)
{
int result = E_REFUSED;
if (caller != nullptr)
{
if (caller->transport() == nullptr)
{
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlExplicit>(
"Security: received datagram from source %d on closed transport - dropping\n",
caller);
#endif
}
else if (message->isRequest() && (connection != nullptr && caller->transport()->wire() != connection->wire()))
{
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlExplicit>(
"Security: received datagram from source %d on wrong wire (is %p, should be %p) - dropping\n",
UNSIGNED(caller->node()->id), connection->wire(),
caller->transport()->wire());
#endif
}
else
{
result = E_OK;
}
}
else
{
result = E_OK;
}
if (result == E_OK && nullptr != callee && callee->requiresAuth() && connection->isAuthenticated() == 0)
{
result = E_UNAUTHORIZED;
}
return result;
}
void Router::receive()
{
Message* message{};
if (m_msgInQueue.wait_and_pop(&message) && nullptr != message)
{
auto servingConnection = message->transport()->linkedServerConnection();
unsigned callerId = message->source();
const PropArray& callerData = message->getData();
unsigned calleeId = message->destination();
unsigned calleeParentId = EMPTY_ADDR;
ServersideObject* caller = nullptr;
ServersideObject* callee = nullptr;
ServersideObject* calleeParent = nullptr;
ObjectRegistry::Node* callingObject = m_registry.findFirst(U16(callerId));
if (callingObject != nullptr)
{
caller = callingObject->ref;
}
ObjectRegistry::Node* calledObject = m_registry.findFirst(U16(calleeId));
if (nullptr != calledObject)
{
callee = calledObject->ref;
if (nullptr != calledObject->parent)
{
calleeParent = calledObject->parent->ref;
calleeParentId = calledObject->parent->id;
}
}
if ((nullptr == calledObject || nullptr == callingObject) &&
(UNSIGNED(message->type()) != MessageType::connect) &&
(UNSIGNED(message->type()) != MessageType::auth))
{
if (message->isRequest())
{
message->ack(I8(E_NOT_CONNECTED));
}
}
else
{
int res = authorize(message, servingConnection, caller, callee);
if (res != E_OK && message->isRequest())
{
message->ack(I8(res));
}
else
{
if (message->isResponse())
{
message->transport()->processResponse(message);
}
else if (message->isRequest())
{
switch (UNSIGNED(message->type()))
{
case MessageType::connect:
{
if (m_objectIds.size() >= UNSIGNED(UINT16_MAX - 2))
{
message->ack(I8(E_NO_ALLOC));
break;
}
#if FLAKE_AUTH_SUPPORT
else if ((m_authSink != nullptr) && servingConnection->isAuthenticated() == 0)
{
servingConnection->setConnected();
flakeAuthType at = m_authSink->authenticationType();
switch (at)
{
case flakeAuthType::atInteractive:
if (0 != m_authSink->onConnect(callerData))
{
message->ack(I8(E_UNAUTHORIZED));
}
break;
case flakeAuthType::atSignature:
{
PropArray paAuth;
char* sign_algo{};
tt_bin_t sign_hash{};
flakeAuthType auth_type{};
int c_len{};
int res = m_authSink->onAuthChallengeRequested(servingConnection->wire(),
&sign_algo,
&sign_hash.lpb,
&c_len);
if (E_OK == res)
{
if (c_len < UINT16_MAX)
{
sign_hash.cb = U16(c_len);
}
else
{
message->ack(I8(E_REFUSED));
}
auth_type = flakeAuthType::atSignature;
paAuth.set<ParameterProperties::SIGN_HASH>(sign_hash);
paAuth.set<ParameterProperties::SIGN_ALGO>(sign_algo);
paAuth.set<ParameterProperties::AUTH_TYPE>(U8(auth_type));
//TODO: Dynamic Memory
struct assoc_auth_data* ad = new struct assoc_auth_data;
ad->challenge_len = c_len;
ad->challenge = sign_hash.lpb;
ad->sign_algorithm = sign_algo;
auto* d = Message::create(mstIndication, MessageType::auth, 0U,EMPTY_ADDR);
for (unsigned int i = 0U; i < paAuth.count(); ++i)
{
d->setData(paAuth.getAt(i));
}
//TODO: Dynamic Memory
ResponseContext* rc = new ResponseContext();
rc->pendingRequest = Message::create(*d);
rc->assoc_data = ad;
int res = d->transport()->sendIndication(d,
this,
FLAKE_DEFAULT_TIMEOUT_MS,
rc);
if (res == E_NO_ALLOC)
{
rc->pendingRequest->ack(I8(E_NO_ALLOC));
}
}
}
break;
}
}
else
{
#endif
servingConnection->setConnected();
addr_t id = nextObjectId();
addr_t bcId = nextObjectId();
addr_t parent_id = 0U;
// This is new to allow for virtual connections - experimental atm.
if (callerId == EMPTY_ADDR)
{
// the next part only gets used for the 'ping' messages, so not needed on v-conn
if (nullptr != servingConnection)
{
servingConnection->setRootObjectAddr(id);
}
}
else
{
parent_id = U16(callerId);
}
PropArray vaReq(callerData);
(void)registerObject(id, parent_id, bcId, vaReq, servingConnection, false);
PropArray va;
Property v(ParameterProperties::OBJECT_ADDR);
v.setData(id);
va.set(v);
message->ack(I8(E_OK), va);
(void)group(id, parent_id);
}
#if FLAKE_AUTH_SUPPORT
}
#endif
break;
case MessageType::custom:
{
uint32_t timeout = FLAKE_DEFAULT_TIMEOUT_MS;
if (calledObject != nullptr && calleeId == calledObject->group)
{
timeout = 0U;
}
indicate(MessageType::customMsgReceived,
U16(callerId),
U16(calleeId),
&callerData,
message, EMPTY_ADDR, timeout);
}
break;
case MessageType::destroyObject:
{
if (nullptr != calleeParent &&
(UNSIGNED(calleeParent->node()->id) == callerId || callerId == calleeId))
{
indicate(MessageType::objectDestroyed, U16(callerId), U16(calleeId),
nullptr, message, EMPTY_ADDR, 0);
unregisterObject(U16(calleeId));
message->ack(I8(E_OK));
}
else
{
message->ack(I8(E_FAILED));
}
}
break;
case MessageType::getProperties:
{
PropArray vaOut;
PropArray vaIndi;
uint32_t i = 0U;
uint16_t cErr = 0U;
int8_t res = I8(E_OK);
if (callerData.count() > 0U)
{
for (; i < callerData.count(); i++)
{
Property v = callerData[i];
if (IS_VOLATILE(v.tag()))
{
vaIndi.set(Property(v.tag()));
}
else if (callee->has(v.tag()))
{
vaOut.set(callee->get(v.tag()));
}
else
{
cErr++;
res = I8(E_PARTIAL_SUCCESS);
v.setError(I8(E_NOT_FOUND));
vaOut.set(v);
}
}
}
else
{
for (; i < callee->count(); i++)
{
if (IS_VOLATILE((*callee)[i].tag()))
{
vaIndi.set(Property((*callee)[i].tag()));
}
else
{
vaOut.set((*callee)[i]);
}
}
}
if (vaIndi.count() > 0U)
{
indicate(MessageType::getPropertiesReq,
0U,
U16(calleeId),
&vaIndi,
message);
}
else
{
if (cErr == vaOut.count() && cErr > 0U)
{
res = I8(E_FAILED);
}
message->ack(res, vaOut);
}
}
break;
case MessageType::openProperty:
{
PropArray propertyData= callerData;
uint16_t id = nextObjectId();
Property p(ParameterProperties::OBJECT_ADDR);
p.setData(id);
propertyData.set(p);
message->setData(p);
// (void)registerObject(id, U16(calleeParentId), EMPTY_ADDR, propertyData, servingConnection, false);
indicate(MessageType::openPropertyReq,
U16(callerId),
U16(calleeId),
&propertyData,
message);
}
break;
case MessageType::streamWrite:
case MessageType::streamRead: //NOLINT MISRA 5-4-4 fallthrough on purpose
case MessageType::streamSeek: //NOLINT MISRA 5-4-4 fallthrough on purpose
{
uint8_t msg = U8(0U);
if (UNSIGNED(message->type()) == MessageType::streamRead)
{
msg = MessageType::streamReadReq;
}
else if (UNSIGNED(message->type()) == MessageType::streamWrite)
{
if (calleeParentId == callerId)
{
msg = MessageType::changed;
}
else
{
msg = MessageType::streamWriteReq;
}
}
else // (UNSIGNED(d->type()) == MessageType::streamSeek)
{
msg = MessageType::streamSeekReq;
}
indicate(msg,
U16(callerId),
U16(calleeId),
&callerData,
message);
}
break;
case MessageType::ping:
if (calleeId == 0U)
{
message->ack(I8(E_OK));
}
else
{
indicate(MessageType::ping,
U16(callerId),
U16(calleeId));
}
break;
case MessageType::setProperties:
{
PropArray vaOut{};
PropArray vaIndi{};
PropArray vaIndiActionable{};
int8_t res = I8(E_OK);
unsigned i = 0U;
unsigned cErr = 0U;
for (; i < callerData.count(); i++)
{
Property vNew (callerData[i].tag());
// to adjust missing modifiers
if (!(calleeParentId == message->source() || calleeId == message->source()))// && callerId != message->source())
{
if (!callee->has(vNew.tag()))
{
vNew.setError(I8(E_NOT_FOUND));
}
else
{
vNew = Property(callee->get(vNew.tag()).tag());
vNew.setData(callerData[i].data());
}
}
else
{
vNew.setData(callerData[i].data());
}
if (IS_READONLY(vNew.tag()) &&
!(calleeParentId == message->source() || calleeId == message->source()))
// latter is for bw compat.
{
Property vErr;
vErr.setError(I8(E_READ_ONLY));
vaOut.set(vErr);
cErr++;
res = I8(E_PARTIAL_SUCCESS);
continue;
}
Property vOld = callee->get(vNew.tag());
// if (vOld != vNew) ### bandwidth saving
{
if (calleeParentId != message->source() && calleeId != message->source())
// dtp. latter is for bw compat
{
if (vOld.isErrorValue())
{
vaOut.set(vOld);
cErr++;
}
else if (IS_RESERVED(vNew.tag()))
{
vOld.setError( I8(E_READ_ONLY));
vaOut.set(vOld);
cErr++;
}
else if (IS_ACTIONABLE(vNew.tag()))
{
vaIndiActionable.set(vNew);
}
else
{
callee->set(vNew);
if (!IS_META(vNew.tag())) {
vaOut.set(vNew);
vaIndi.set(vNew);
}
}
}
else
{
#if FLAKE_DEBUG_LOGGING
if (logging::logLevel >= lvlRaw)
{
char* str = vNew.toString();
logging::logf<lvlRaw>(" - property changed or new: %s\n", str);
delete[] str;
}
#endif
callee->set(vNew);
if (!IS_META(vNew.tag()))
{
vaOut.set(vNew);
vaIndi.set(vNew);
}
}
}
}
if (vaIndiActionable.count() > 0U)
{
Message* pendingRequest = Message::create(*message);
pendingRequest->clearData();
for (unsigned int i = 0U; i < vaOut.count(); ++i)
{
pendingRequest->setData(vaOut.getAt(i));
}
indicate(MessageType::setPropertiesReq,
U16(callerId),
U16(calleeId),
&vaIndiActionable,
pendingRequest);
pendingRequest->destroy();
}
else if (vaIndi.count() > 0U)
{
indicate(MessageType::changed,
U16(calleeId),
callee->node()->group,
&vaIndi,
nullptr, U16(callerId), 0);
message->ack(res, vaOut);
}
else
{
if (cErr == callerData.count())
{
message->ack(I8(E_FAILED), vaOut);
}
else
{
message->ack(I8(E_NO_CHANGES), vaOut);
}
}
}
break;
#if FLAKE_AUTH_SUPPORT
case MessageType::auth:
{
if (calleeId == 0U)
{
if (m_authSink == nullptr)
{
message->ack(I8(E_FAILED));
break;
}
if (callerData.has(ParameterProperties::AUTH_TYPE))
{
Property pAuthType = callerData.get(ParameterProperties::AUTH_TYPE);
PropArray pAuthResp;
flakeAuthType auth_type = flakeAuthType::atSignature;
if (*pAuthType.value<tt_uint8_t>() == 2U)
{
auth_type = flakeAuthType::atInteractive;
}
switch (auth_type)
{
case flakeAuthType::atSignature:
{
Property pSignAlgo = callerData.get(ParameterProperties::SIGN_ALGO);
if (pSignAlgo.isErrorValue())
{
pSignAlgo.setError(I8(E_INCOMPLETE));
pAuthResp.set(pSignAlgo);
message->ack(I8(E_FAILED), pAuthResp);
break;
}
Property pAuthHash = callerData.get(ParameterProperties::SIGN_HASH);
if (pAuthHash.isErrorValue())
{
pAuthHash.setError( I8(E_INCOMPLETE));
pAuthResp.set(pAuthHash);
message->ack(I8(E_FAILED), pAuthResp);
break;
}
uint8_t* signature{};
int signature_len{};
tt_bin_t auth_hash = *pAuthHash.value<tt_bin_t>();
auto sign_algo = pSignAlgo.value<tt_str_t>()->c_str();
m_authSink->onAuthChallengeReceived(
sign_algo, auth_hash.lpb, SIGNED(auth_hash.cb),
&signature, &signature_len);
if (signature_len != 0 && signature != nullptr)
{
tt_bin_t auth_sign;
auth_sign.cb = U16(signature_len);
//TODO: Dynamic Memory
auth_sign.lpb = (uint8_t*)malloc(U16(signature_len));
utils::memcpy(auth_sign.lpb, signature, UNSIGNED(signature_len));
pAuthResp.set<ParameterProperties::SIGNATURE>(auth_sign);
free(signature);
message->ack(I8(E_OK), pAuthResp);
}
else
{
message->ack(I8(E_FAILED));
}
}
break;
default:
{
pAuthType.setError( I8(E_UNSUPPORTED));
pAuthResp.set(pAuthType);
message->ack(I8(E_FAILED), pAuthResp);
}
break;
}
}
else
{
message->ack(I8(E_INCOMPLETE));
}
}
else
{
message->ack(I8(E_UNSUPPORTED));
}
}
break;
#endif
case MessageType::queryObjects:
{
Property request_uuid = callerData.get(ParameterProperties::OBJECT_TYPE);
//TODO: Dynamic Memory
ValueTable* tbl = new ValueTable();
auto nl = m_registry.all();
for (auto i = nl->begin(); i != nl->end(); ++i)
{
Property v = (*i)->ref->get(ParameterProperties::OBJECT_TYPE);
if (!v.isErrorValue())
{
if ((*i)->ref->requiresAuth() && servingConnection->isAuthenticated() == 0)
{
continue;
}
if (request_uuid.isErrorValue() || v == request_uuid)
{
//TODO: Dynamic Memory
auto va = new PropArray();
Property vAddr(ParameterProperties::OBJECT_ADDR);
vAddr.setData( (*i)->id);
va->set(vAddr);
if (request_uuid.isErrorValue())
{
va->set(v);
}
tbl->m_rows.push_back(va);
}
}
(*i)->ref->unref();
}
tt_bin_t bin;
(void)tbl->serialize(&bin.cb, &bin.lpb);
Property v(ParameterProperties::OBJECT_TABLE);
v.setData(bin);
//TODO: Dynamic Memory
delete tbl;
PropArray vaOut;
vaOut.set(v);
message->ack(I8(E_OK), vaOut);
//TODO: Dynamic Memory
delete nl;
if (bin.cb != 0U && bin.lpb != nullptr)
{
free(bin.lpb);
}
}
break;
case MessageType::joinGroup:
{
if (m_registry.isInGroup(U16(callerId), U16(calleeId)))
{
message->ack(I8(E_OK));
break;
}
uint16_t group_id = group(U16(callerId), U16(calleeId));
if (group_id != 0U)
{
PropArray pa;
Property p(ParameterProperties::BROADCAST_ADDR);
p.setData(group_id);
pa.set(p);
message->ack(I8(E_OK), pa);
}
else
{
message->ack(I8(E_NOT_FOUND));
}
}
break;
case MessageType::leaveGroup:
{
ungroup(U16(callerId), U16(calleeId));
message->ack(I8(E_OK));
}
break;
case MessageType::createObject:
{
if (m_objectIds.size() >= U16(UINT16_MAX - 2))
{
message->ack(I8(E_NO_ALLOC));
}
else
{
addr_t id = nextObjectId();
addr_t broadcastId = nextObjectId();
PropArray objectData = callerData;
bool requires_auth = false;
Property req_auth = callerData.get(ParameterProperties::REQUIRES_AUTH);
if (!req_auth.isErrorValue())
{
requires_auth = *req_auth.value<tt_bool_t>();
}
(void)registerObject(id, U16(callerId), broadcastId, callerData,
servingConnection, requires_auth);
ObjectRegistry::Node* newObject = m_registry.findFirst(id);
if (nullptr != newObject)
{
for (unsigned int i = 0U; i < callerData.count(); i++)
{
if (callerData[i].isErrorValue())
{
continue;
}
newObject->ref->set(callerData[i]);
}
newObject->ref->unref();
}
Property v1 (ParameterProperties::BROADCAST_ADDR);
v1.setData(broadcastId);
objectData.set(v1);
Property v2(ParameterProperties::OBJECT_ADDR);
v2.setData( id);
objectData.set(v2);
message->ack(I8(E_OK), objectData);
}
}
break;
case MessageType::disconnect:
{
message->ack(I8(E_OK));
if (servingConnection->wire()->type() == WireType::wtDatagram)
{
servingConnection->shutdown();
}
else
{
unregisterObject(U16(callerId));
}
}
break;
default:
message->ack(I8(E_UNSUPPORTED));
break;
}
}
else
{
// MISRA 6-4-2: switch preferred over else-if for known cases
}
}
if (nullptr != callee)
{
callee->unref();
}
if (nullptr != caller)
{
caller->unref();
}
}
message->destroy();
}
}
void Router::checkConnections()
{
(void)flakeMutexAcquire(m_connMutex);
auto i = m_connections.begin();
for (; i != m_connections.end(); ++i)
{
ServerConnection* conn = *i;
if (m_doShutdown)
{
m_closedConnections.push_back(conn);
}
else if (conn->stale())
{
if (conn->pingCount() < 3U)
{
conn->ping();
}
else
{
m_closedConnections.push_back(conn);
}
}
else if (!conn->connected())
{
conn->increaseConnectionWaitCounter();
if (conn->connectionWaitCounter() > 3)
{
m_closedConnections.push_back(conn);
}
}
else
{
conn->resetPingCount();
}
}
flakeMutexRelease(m_connMutex);
for (i = m_closedConnections.begin(); i != m_closedConnections.end(); ++i)
{
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlInfo>("Closing stale connection id%04x\n", (*i)->id());
#endif
(*i)->shutdown();
}
m_closedConnections.clear();
if (m_doShutdown && m_connections.empty())
{
m_isShutdown = true;
flakeSemaphoreRelease(m_runCond);
}
}
/*
MISRA-C:2012 Rule 5-2-8 deviation justification:
- POSIX thread entry functions require the signature `void*(*)(void*)`.
- The cast from `void*` to `Router*` is verified and safe,
as we pass exactly the same pointer object that was originally allocated.
*/
void Router::sendThread(void* arg)
{
auto r = static_cast<Router*>(arg); //NOLINT, Rule 5-2-8 - seea above
while (!r->m_doShutdown)
{
Message* message = nullptr;
(void)r->m_msgOutQueue.wait_and_pop(&message);
if (message != nullptr)
{
Transport* t = message->transport();
if (nullptr != t)
{
(void)t->reply(message);
}
}
}
flakeThreadExit();
}
void Router::shutdown()
{
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlInfo>("Shutting down.\n");
#endif
m_doShutdown = true;
while (!finishedShutdown())
{
flakeUSleep(100U * 1000U);
}
(void)m_msgInQueue.push(nullptr);
(void)m_msgOutQueue.push(nullptr);
for (unsigned i = 0U; i < m_transportThreads.size(); i++)
{
flakeThreadTerminate(m_transportThreads[i]);
}
for (unsigned i = 0U; i < m_serverWires.size(); i++)
{
delete m_serverWires[i];
}
}
void Router::addServerWire(ServerWire* w)
{
//TODO: Dynamic Memory
auto srvw = new struct ServerWireRecord;
srvw->wire = w;
srvw->router = this;
m_serverWires.push_back(w);
flakeThreadAttr_t attr = {};
attr.name = ROUTER_SERVER_WIRE_THREAD_NAME;
attr.priority = flakeThreadPriorityBelowNormal;
attr.stack_size = U32(w->stackSizeBytes());
flakeThreadId_t thread{};
flakeThreadNew(&thread, &Router::acceptThread, srvw, &attr);
m_transportThreads.push_back(thread);
}
/*
MISRA-C:2012 Rule 5-2-8 deviation justification:
- POSIX thread entry functions require the signature `void*(*)(void*)`.
- The cast from `void*` to `Router*` is verified and safe,
as we pass exactly the same pointer object that was originally allocated.
*/
__attribute__((noreturn))
void Router::acceptThread(void* p)
{
auto swr = static_cast<struct ServerWireRecord*>(p); //NOLINT, Rule 5-2-8 - seea above
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlError>("Initializing transport %s\n", swr->wire->describe());
#endif
while (true)
{
if (swr->wire->available())
{
if (swr->wire->init() != E_OK)
{
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlError>("Initializing %s failed: %d \n", swr->wire->describe(), errno);
#endif
flakeUSleep(5U * MICROSEC_PER_SEC);
continue;
}
}
else
{
flakeUSleep(1U * MICROSEC_PER_SEC);
continue;
}
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlError>("Transport %s is available.\n", swr->wire->describe());
#endif
while (swr->wire->available())
{
if (swr->router->acceptsMoreConnections())
{
Wire* clientWire = swr->wire->accept();
if (clientWire != nullptr)
{
(void)swr->router->addClient(clientWire);
}
else
{
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlError>("accept() on %s failed: %d \n",swr->wire->describe(), errno);
#endif
}
}
else
{
flakeUSleep(1U * MICROSEC_PER_SEC);
}
}
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlError>("Transport %s is unavailable.\n", swr->wire->describe());
#endif
}
}
/*
MISRA-C:2012 Rule 5-2-8 deviation justification:
- POSIX thread entry functions require the signature `void*(*)(void*)`.
- The cast from `void*` to `Router*` is verified and safe,
as we pass exactly the same pointer object that was originally allocated.
*/
void Router::maintenanceThread(void* p)
{
auto r = static_cast<Router*>(p); //NOLINT, Rule 5-2-8 - seea above
while (!r->finishedShutdown())
{
r->doHousekeeping();
flakeUSleep(1U * MICROSEC_PER_SEC);
}
flakeThreadExit();
}
/*
MISRA-C:2012 Rule 5-2-8 deviation justification:
- POSIX thread entry functions require the signature `void*(*)(void*)`.
- The cast from `void*` to `Router*` is verified and safe,
as we pass exactly the same pointer object that was originally allocated.
*/
void Router::receiveThread(void* p)
{
auto r = static_cast<Router*>(p); //NOLINT, Rule 5-2-8 - seea above
r->m_isRunning = true;
while (!r->finishedShutdown())
{
r->receive();
}
flakeThreadExit();
}
void Router::run()
{
if (!m_isRunning)
{
flakeThreadAttr_t attr_process = {};
attr_process.name = ROUTER_PROCESS_THREAD_NAME;
attr_process.stack_size = 8192U;
attr_process.priority = flakeThreadPriorityNormal;
flakeThreadAttr_t attr_maint = {};
attr_maint.name = ROUTER_MAINTENANCE_THREAD_NAME;
attr_maint.stack_size = 2048U;
attr_maint.priority = flakeThreadPriorityNormal;
flakeThreadId_t thread{};
flakeThreadNew(&thread, &receiveThread, this, &attr_process);
flakeThreadNew(&thread, &maintenanceThread, this, &attr_maint);
#if FLAKE_DEBUG_LOGGING
logging::logf<lvlInfo>("Dispatcher running.\n");
#endif
}
}
}