Skip to content

file /Users/ios_developer/workspace/coldwave-os/build/_deps/flake-src/routing/Router.cpp

Source code

cpp
/*******************************************************************************
 * @file      Router.cpp
 * @license   This file is part of the ImagineOn Flake software package
 *            licensed under the ImagineOn software-licensing terms available
 *            under https://www.imagineon.de/de/info/licensing-terms
 * @copyright Copyright (c) 2025 ImagineOn GmbH. www.imagineon.de.
*******************************************************************************/

#include <routing/Router.h>
#include <protocol/Message.h>
#include <flake.h>
#include <TableImpl.h>
#include <cstring>

// File-local thread names used by the router.
// In the visible part of this file only ROUTER_SEND_THREAD_NAME is referenced
// (as the name of the thread that runs Router::sendThread).
// NOTE(review): judging by the identifiers the other three presumably name the
// accept, processing and maintenance threads - confirm against their users.
namespace
{
    constexpr char ROUTER_SERVER_WIRE_THREAD_NAME[] = "flk:r:acc";
    constexpr char ROUTER_SEND_THREAD_NAME[] = "flk:r:snd";
    constexpr char ROUTER_PROCESS_THREAD_NAME[] = "flk:r:prc";
    constexpr char ROUTER_MAINTENANCE_THREAD_NAME[] = "flk:r:mnt";
}


namespace flake
{


    //---
    //
    // Router
    //
    //---
    /**
     * Builds a router in its idle state.
     *
     * In order, the constructor:
     *  - value-initialises the OS handles and creates the id, pvm and run mutexes,
     *    a recursive connection mutex and the run semaphore,
     *  - clears the shutdown/running flags,
     *  - takes a reference on and locks the inbound message queue,
     *  - reserves the object ids 0 and 0xFFFF and starts id allocation at 1,
     *  - creates the root object, registers it as registry root (id 0) with a
     *    freshly allocated root group and takes an extra reference on it,
     *  - starts the thread that runs Router::sendThread.
     */
    Router::Router()
        : m_authSink(nullptr)
    {
        // Start from value-initialised handles before any of them is created.
        m_runMutex = {};
        m_runCond = {};
        m_connMutex = {};
        m_idMutex = {};
        m_pvm_mutex = {};
        m_sendThread = {};
        m_maxConnections = 0U;

        // Plain (default-attribute) mutexes.
        flakeMutexNew(&m_idMutex, nullptr);
        flakeMutexNew(&m_pvm_mutex, nullptr);
        flakeMutexNew(&m_runMutex, nullptr);

        // The connection mutex is created with the recursive attribute bit set.
        flakeMutexAttr_t recursiveAttr = {};
        recursiveAttr.attr_bits = flakeMutexRecursive;
        flakeMutexNew(&m_connMutex, &recursiveAttr);

        flakeSemaphoreNew(&m_runCond, 1U, 1U);

        m_isShutdown = false;
        m_doShutdown = false;
        m_isRunning = false;

        m_msgInQueue.ref();
        m_msgInQueue.lock();

        // Ids 0 and 0xFFFF are pre-inserted so nextObjectId() skips them.
        m_nextConnectionId = 0;
        (void)m_objectIds.insert(U16(0U));
        (void)m_objectIds.insert(U16(0xFFFF));
        m_nextObjectId = 1U;

        //TODO: Dynamic Memory
        auto* rootObject = new ServersideObject(m_rootValues);

        m_rootGroup = nextObjectId();
        m_rootNode = m_registry.setRoot(0U, m_rootGroup, *rootObject);
        rootObject->ref();

        // Start the sender thread last, once the members above are set up.
        flakeThreadAttr_t sendThreadAttr = {};

        sendThreadAttr.name = ROUTER_SEND_THREAD_NAME;
        sendThreadAttr.stack_size = 2048U;
        sendThreadAttr.priority = flakeThreadPriorityNormal;

        flakeThreadNew(&m_sendThread, &Router::sendThread, this, &sendThreadAttr);
    }

    /**
     * Tears the router down: calls shutdown(), then unlocks m_msgInQueue and
     * drops a reference on m_msgOutQueue.
     *
     * NOTE(review): the constructor calls ref() and lock() on m_msgInQueue, while
     * this destructor calls unlock() on m_msgInQueue but unref() on
     * m_msgOutQueue. Confirm that m_msgOutQueue is ref()'d elsewhere and that
     * m_msgInQueue's reference is released elsewhere, otherwise one of the two
     * calls targets the wrong queue.
     * NOTE(review): the mutexes and the semaphore created in the constructor are
     * not released here; presumably shutdown() takes care of that - confirm.
     */
    Router::~Router()
    {
        shutdown();
        m_msgInQueue.unlock();
        m_msgOutQueue.unref();
    }

    /**
     * Installs the authentication sink used by the router.
     *
     * @param a  sink to store; the pointer is stored as-is, no ownership is
     *           taken here. Passing nullptr clears the sink.
     */
    void Router::setAuthenticationSink(AuthenticationSink* a)
    {
        this->m_authSink = a;
    }

    /**
     * Allocates an unused object id and marks it as in use.
     *
     * The search starts at the allocation cursor (m_nextObjectId) and walks
     * upwards over the ids already stored in m_objectIds. The previous
     * implementation only ever moved the cursor upwards: once the walk ran over
     * the reserved 0xFFFF entry the value wrapped/truncated and the function
     * handed out id 0 (the router itself) or an id that was still in use. The
     * search now wraps around explicitly and makes a second pass from the bottom
     * of the id space.
     *
     * @return the newly reserved id, or EMPTY_ADDR when every id is in use.
     *         NOTE(review): callers in this file do not check for EMPTY_ADDR;
     *         the connect path refuses new objects before the set is full, other
     *         callers should be reviewed.
     */
    addr_t Router::nextObjectId()
    {
        (void)flakeMutexAcquire(m_idMutex);

        addr_t candidate = U16(m_nextObjectId);
        bool found = false;

        // At most two passes: cursor -> top of the id space, then bottom -> cursor.
        for (unsigned pass = 0U; (pass < 2U) && !found; ++pass)
        {
            bool wrapped = false;
            auto iter = m_objectIds.lower_bound(candidate);

            // Skip the run of consecutive ids that are already taken.
            while (!wrapped && iter != m_objectIds.end() && UNSIGNED(*iter) == UNSIGNED(candidate))
            {
                ++iter;
                ++candidate;
                // The 16 bit candidate rolling over to 0 means the top was reached.
                wrapped = (candidate == 0U);
            }

            found = !wrapped;
        }

        addr_t res = EMPTY_ADDR;

        if (found)
        {
            res = candidate;
            (void)m_objectIds.insert(res);
            // Keep the cursor inside the 16 bit id space.
            m_nextObjectId = U16(UNSIGNED(candidate) + 1U);
        }

        flakeMutexRelease(m_idMutex);
        return res;
    }

    /**
     * Returns an object id to the pool of free ids.
     *
     * @param id  id to remove from m_objectIds; removing an id that is not in
     *            the set is a no-op. The set is modified under m_idMutex.
     */
    void Router::releaseObjectId(addr_t id)
    {
        static_cast<void>(flakeMutexAcquire(m_idMutex));
        static_cast<void>(m_objectIds.erase(id));
        flakeMutexRelease(m_idMutex);
    }

    /**
     * Removes a server connection from the router.
     *
     * Steps:
     *  1. Look through the members of the root group for an object whose
     *     transport is @p c; if several match, the last one wins. Every node
     *     returned by the lookup has its object reference released.
     *  2. Unregister the matching object (if any, i.e. its id is not 0).
     *  3. Erase every occurrence of @p c from m_connections under m_connMutex.
     *  4. Drop the router's reference on @p c and dump the registry directory.
     *
     * @param c  connection that ended.
     */
    void Router::disconnect(ServerConnection* c)
    {
        ObjectRegistry::NodeList* rootMembers = m_registry.findAll(m_rootGroup);

        addr_t ownedObjectId = 0U;

        for (auto& member : *rootMembers)
        {
            if (member->ref->transport() == c)
            {
                ownedObjectId = member->id;
            }
            // Release the reference handed out by findAll().
            member->ref->unref();
        }

        if (ownedObjectId != 0U)
        {
            unregisterObject(ownedObjectId);
        }

        //TODO: Dynamic Memory
        delete rootMembers;

        (void)flakeMutexAcquire(m_connMutex);

        for (auto pos = m_connections.begin(); pos != m_connections.end();)
        {
            if (*pos != c)
            {
                ++pos;
                continue;
            }
            pos = m_connections.erase(pos);
        }

        flakeMutexRelease(m_connMutex);
#if FLAKE_DEBUG_LOGGING
        logging::logf<lvlInfo>("connection %04x ended\n", c->id());
#endif

        c->unref();

        m_registry.dumpDirectory();
    }

    /**
     * Creates a server-side object and adds it to the registry.
     *
     * On success the new object is linked to its registry node and, unless the
     * parent is the root (id 0), an objectCreated indication carrying the new
     * object's address (and its OBJECT_TYPE, when present in @p va) is sent from
     * the parent to the root group.
     *
     * Fix: the reference obtained through findFirst(parentId) is now released
     * whenever a parent node was returned. Previously it was only released when
     * the parent was not the root, which left one extra reference on the root
     * object per registration below it.
     *
     * NOTE(review): when m_registry.add() fails, the freshly allocated object is
     * neither deleted nor unref'd here - confirm whether add() cleans up or
     * whether this leaks.
     *
     * @param id             address of the new object
     * @param parentId       address of the parent object
     * @param broadcastId    group id handed to the registry for the object
     * @param va             initial properties of the object
     * @param c              transport the object is reachable through
     * @param requires_auth  whether access to the object requires authentication
     * @return always true (the value is not derived from the registration result).
     */
    bool Router::registerObject(addr_t id, addr_t parentId, addr_t broadcastId, const PropArray& va,
                                Transport* c, bool requires_auth)
    {
        const bool isWritable = true;

        //TODO: Dynamic Memory
        auto obj = new ServersideObject(va, c, requires_auth);

        ObjectRegistry::Node* n = nullptr;

        if (m_registry.add(parentId, id, broadcastId, *obj, &n) == E_OK)
        {
            obj->setNode(n);
#if FLAKE_DEBUG_LOGGING
            logging::logf<lvlRaw>(" - registered object %04X as child of %04X\n", UNSIGNED(id), UNSIGNED(parentId));
#endif
            auto parentObj = m_registry.findFirst(parentId);
            if (nullptr != parentObj)
            {
                // Children of the root are not announced.
                if (parentObj->id != 0U)
                {
                    PropArray vaIndi;
                    Property v(ParameterProperties::OBJECT_ADDR);
                    v.setData(id);
                    vaIndi.set(v);

                    Property v2(ParameterProperties::OBJECT_TYPE);
                    v2 = va.get(ParameterProperties::OBJECT_TYPE);
                    if (!v2.isErrorValue())
                    {
                        vaIndi.set(v2);
                    }
                    indicate(MessageType::objectCreated, parentId, m_rootGroup, &vaIndi, nullptr, EMPTY_ADDR, 0);
                }

                // Always balance the reference taken by findFirst().
                parentObj->ref->unref();
            }
        }
        else
        {
#if FLAKE_DEBUG_LOGGING
            logging::logf<lvlRaw>(" - parent %04X not found!\n", UNSIGNED(parentId));
#endif
        }

        return isWritable;
    }

    /**
     * Removes an object from the registry and releases everything attached to
     * the nodes the registry hands back.
     *
     * For each returned node the object leaves all groups it is a member of, its
     * id and group id are returned to the id pool and the object reference is
     * dropped. The returned list itself is deleted afterwards.
     *
     * Fixes:
     *  - the group membership list is copied before ungroup() is called, so the
     *    loop no longer iterates a container the registry may modify while
     *    ungrouping (the original bound it by const reference);
     *  - the debug-only child counter no longer underflows when nothing was
     *    removed and is passed to "%ld" as a long.
     *
     * NOTE(review): the "- 1" in the log suggests the list returned by remove()
     * contains the removed object itself in addition to its children - confirm.
     *
     * @param id  address of the object to remove.
     */
    void Router::unregisterObject(addr_t id)
    {
#if FLAKE_DEBUG_LOGGING
        uint32_t childCount = 0U;
        logging::logf<lvlRaw>(" - unregistering object %04X\n", UNSIGNED(id));
#endif
        ObjectRegistry::NodeList* children = m_registry.remove(id);

        if (children != nullptr)
        {
#if FLAKE_DEBUG_LOGGING
            childCount = U32(children->size());
#endif

            while (!children->empty())
            {
                ObjectRegistry::Node* ch = children->front();

                // Snapshot the memberships: ungroup() changes the registry.
                const std::vector<addr_t> groupMemberships = m_registry.findAllGroupMemberships(ch->id);

                for (const addr_t groupMembership : groupMemberships)
                {
                    ungroup(ch->id, groupMembership);
                }

                releaseObjectId(ch->id);
                releaseObjectId(ch->group);

                ch->ref->unref();

                children->pop_front();
            }

            //TODO: Dynamic Memory
            delete children;
        }


#if FLAKE_DEBUG_LOGGING
        const uint32_t removedChildren = (childCount > 0U) ? (childCount - 1U) : 0U;
        logging::logf<lvlRaw>(" - unregistered object %04X (and %ld children)\n", UNSIGNED(id),
                              static_cast<long>(removedChildren));
#endif
    }

    /**
     * Adds an object to a group and announces the join.
     *
     * The registry decides which group is actually joined; that id can differ
     * from @p groupId. A result of 0 means the registry did not perform the join
     * (logged as "requested object doesn't exist"), in which case no indication
     * is sent.
     *
     * @param objectId  object that joins
     * @param groupId   requested group (or object whose group is to be joined)
     * @return the id of the group that was joined, or 0 on failure.
     */
    addr_t Router::group(addr_t objectId, addr_t groupId)
    {
        const addr_t joinedGroup = m_registry.group(objectId, groupId);
        const bool joined = (joinedGroup != 0U);

#if FLAKE_DEBUG_LOGGING
        if (!joined)
        {
            logging::logf<lvlRaw>(" - requested object (%04X) doesn't exist\n", UNSIGNED(objectId));
        }
        else if (UNSIGNED(joinedGroup) != groupId)
        {
            logging::logf<lvlRaw>(" - object %04X joined object %04X's group (%04X)\n", UNSIGNED(objectId),
                                  UNSIGNED(groupId), UNSIGNED(joinedGroup));
        }
        else
        {
            logging::logf<lvlRaw>(" - object %04X joined group %04X\n", UNSIGNED(objectId), UNSIGNED(groupId));
        }
#endif

        if (joined)
        {
            // Tell the members of the joined group about the new member.
            indicate(MessageType::joined, objectId, joinedGroup, nullptr, nullptr, EMPTY_ADDR, 0);
        }

        return joinedGroup;
    }

    /**
     * Removes an object from a group and sends a "left" indication from the
     * object to that group afterwards.
     *
     * @param objectId  object that leaves
     * @param groupId   group that is left
     */
    void Router::ungroup(addr_t objectId, addr_t groupId)
    {
        m_registry.ungroup(objectId, groupId);

        // Announce after the registry update, so the indication targets the remaining members.
        indicate(MessageType::left,
                 objectId,
                 groupId,
                 nullptr,
                 nullptr,
                 EMPTY_ADDR,
                 0);
#if FLAKE_DEBUG_LOGGING
        logging::logf<lvlRaw>(" - object %04X ungrouped\n", UNSIGNED(objectId));
#endif
    }

    void Router::setMaxConnections(unsigned max) {
#if FLAKE_DEBUG_LOGGING
      logging::logf<lvlInfo>("setting max connections to %d\n", max);
#endif
      m_maxConnections = max;
    }


    /**
     * Tells whether another connection may be added.
     *
     * @return true when no limit is configured (m_maxConnections == 0) or the
     *         number of tracked connections is below the limit.
     */
    bool Router::acceptsMoreConnections() const
    {
        bool accepts = true;

        if (m_maxConnections != 0U)
        {
            accepts = m_connections.size() < m_maxConnections;
        }

        return accepts;
    }

    /**
     * Creates a ServerConnection for a wire and adds it to the connection list.
     *
     * If a connection using the same wire is already tracked, this is only
     * logged; the existing connection is left untouched.
     *
     * Fix: m_connMutex was released after the duplicate search and then released
     * a second time after push_back() without being re-acquired. That left the
     * id increment and the list insertion unprotected and over-released the
     * mutex. The mutex is now held from the duplicate search until the new
     * connection has been inserted, and released exactly once.
     *
     * @param w  wire the new connection communicates over
     * @return the new connection; the router holds one reference on it, which
     *         disconnect() releases.
     */
    ServerConnection* Router::addClient(Wire* w)
    {
        (void)flakeMutexAcquire(m_connMutex);

        ServerConnection* duplicate = nullptr;

        for (ServerConnection* conn : m_connections)
        {
            if (conn->wire() == w)
            {
                duplicate = conn;
                break;
            }
        }

        if (nullptr != duplicate)
        {
            // duplicate->shutdown(); TODO: could happen twice through stale check
#if FLAKE_DEBUG_LOGGING
            logging::logf<lvlInfo>("incoming connection on %p replaces id%04x as id%04x\n", w, duplicate
                                   ->id(), m_nextConnectionId);
#endif
        }

        //TODO: Dynamic Memory
        ServerConnection* c = new ServerConnection(w, this, &m_msgInQueue, m_nextConnectionId++);

        c->ref();

        m_connections.push_back(c);
        flakeMutexRelease(m_connMutex);
#if FLAKE_DEBUG_LOGGING
        if (nullptr == duplicate)
        {
            logging::logf<lvlInfo>("incoming connection on %p - id%04x\n", w, c->id());
        }
#endif
        return c;
    }

    void Router::indicate(uint8_t messageType, addr_t source,
                          addr_t destination, const PropArray* data, Message* req,
                          addr_t exclude_addr, uint32_t timeout)
    {
        ObjectRegistry::NodeList* nl = m_registry.findAll(destination);

        std::deque<Transport*> served_transports;

        if (m_registry.isInGroup(source, destination) && nl->size() == 1U)
        {
#if FLAKE_DEBUG_LOGGING
            logging::logf<lvlRaw>(" - src (%d) == dst (%d) - 0 other members in group -> no indication\n",
                                  UNSIGNED(source), UNSIGNED(destination));
#endif
            for (auto& i : *nl)
            {
                i->ref->unref();
            }
            if (nullptr != req)
            {
                req->ack(I8(E_OK));
                req->destroy();
            }
        }
        else if (nl->empty())
        {
#if FLAKE_DEBUG_LOGGING
            logging::logf<lvlRaw>(" - destination (%d) unreachable\n", UNSIGNED(destination));
#endif
            if (nullptr != req)
            {
                req->ack(I8(E_DESTINATION_UNREACHABLE));
                req->destroy();
            }
        }
        else
        {
            for (auto& it : *nl)
            {
                ObjectRegistry::Node& node = *it;
                ServersideObject* object = node.ref;

                if ((UNSIGNED(node.id) == exclude_addr) && nl->size() > 1U)
                {
                    object->unref();
                    continue;
                }

                auto* d = Message::create(mstIndication, messageType, source, destination);

                if (nullptr != d)
                {
                    if (nullptr != data)
                    {
                        for (unsigned int i = 0U; i < data->count(); ++i)
                        {
                            d->setData(data->getAt(i));
                        }
                    }

                    if (nullptr != object->transport() && nullptr != object->transport()->wire())
                    {
                        bool already_served = false;

                        for (auto& served_transport : served_transports)
                        {
                            if (served_transport == object->transport())
                            {
                                d->destroy();
                                object->unref();
                                already_served = true;
                                break;
                            }
                        }

                        if (!already_served)
                        {
                            ResponseContext* rc = nullptr;
                            if (nullptr != req && timeout > 0U)
                            {
                                //TODO: Dynamic Memory
                                rc = new ResponseContext();
                                req->setClientCount(req->clientCount() + 1U);
                                rc->pendingRequest = Message::create(*req);

                                rc->assoc_data = nullptr;
                            }
                            served_transports.push_back(object->transport());
                            int indi_res = object->transport()->sendIndication(d, this, timeout, rc);
                            if (indi_res == E_NO_ALLOC)
                            {
                                d->destroy();
                                if (nullptr != rc && nullptr != rc->pendingRequest)
                                {
                                    rc->pendingRequest->ack(I8(E_NO_ALLOC));
                                    rc->pendingRequest->destroy();
                                }
                            }
#if FLAKE_DEBUG_LOGGING
                            else
                            {
                                logging::logf<lvlRaw>(" - indication for %04X sent to Wire %p\n", UNSIGNED(destination),
                                                      object->transport()->wire());
                            }
#endif
                            object->unref();
                        }
                    }
                    else
                    {
                        d->destroy();
                        object->unref();
                    }
                }
            }

        }
        //TODO: Dynamic Memory
        delete nl;
    }

    /**
     * Processes the response to an indication the router sent earlier.
     *
     * Per response type:
     *  - ping / customMsgReceived: nothing to do.
     *  - auth: verifies the SIGNATURE property through the authentication sink,
     *    using the challenge data stored in the context, and marks the linked
     *    server connection as authenticated when the sink returns 0.
     *  - getPropertiesReq: stores the returned properties on the sender's
     *    object; any error value downgrades the result to E_PARTIAL_SUCCESS.
     *  - openPropertyReq: on failure releases the object id carried by the
     *    pending request, on success registers the new object.
     *  - setPropertiesReq: stores the accepted properties, reports E_FAILED if
     *    every property was rejected, otherwise sends a "changed" indication to
     *    the sender's group (excluding the original requester).
     *
     * Afterwards the pending request's client count is decremented; when it
     * reaches 0 the request is acknowledged (a pending connect additionally
     * registers the new object) and destroyed.
     *
     * Fixes: the function already treats @p ctx as optional, but dereferenced
     * ctx, ctx->assoc_data, the pending request, the response transport and the
     * linked server connection without checks. These are now guarded, and the
     * OBJECT_ADDR property of an openPropertyReq is only used when it is not an
     * error value.
     *
     * NOTE(review): the assoc_auth_data attached to an auth context is not
     * released here - confirm who frees it.
     *
     * @param response  received response message
     * @param ctx       context attached when the indication was sent; may be null
     */
    void Router::onResponse(Message* response, ResponseContext* ctx)
    {
        int32_t result = response->result();
        Transport* transport = response->transport();
        addr_t sender = response->source();

        const PropArray& data = response->getData();
        ObjectRegistry::Node* n = m_registry.findFirst(sender);
        Message* pendingRequest = nullptr;

        if (ctx != nullptr)
        {
            pendingRequest = ctx->pendingRequest;
        }

        switch (UNSIGNED(response->type()))
        {
        case MessageType::ping:
        case MessageType::customMsgReceived: //NOLINT MISRA 5-4-4 fallthrough on purpose
            break;

        case MessageType::auth:
            {
                // Without a sink or without the stored challenge there is nothing to verify.
                if (m_authSink != nullptr && ctx != nullptr && ctx->assoc_data != nullptr)
                {
                    Property pSign = data.get(ParameterProperties::SIGNATURE);

                    if (!pSign.isErrorValue())
                    {
                        struct assoc_auth_data* ad = (struct assoc_auth_data*)ctx->assoc_data;

                        if (0 == m_authSink->onAuthResponseReceived(ad->sign_algorithm, ad->challenge,
                                                                    ad->challenge_len,
                                                                    pSign.value<tt_bin_t>()->lpb,
                                                                    SIGNED(pSign.value<tt_bin_t>()->cb)))
                        {
                            if (nullptr != transport)
                            {
                                auto conn = transport->linkedServerConnection();
                                if (nullptr != conn)
                                {
                                    conn->setAuthenticated(true);
                                }
                            }
                        }
                    }
                }
            }
            break;
        case MessageType::getPropertiesReq:
            {
                if (nullptr != n)
                {
                    if (result == E_OK)
                    {
                        for (uint32_t i = 0U; i < data.count(); i++)
                        {
                            if (!data[i].isErrorValue())
                            {
                                n->ref->set(data[i]);
                            }
                            else
                            {
                                result = E_PARTIAL_SUCCESS;
                            }
                        }
                    }
                }
            }
            break;
        case MessageType::openPropertyReq:
            {
                if (nullptr != pendingRequest)
                {
                    Property pAddr = pendingRequest->getData().get(ParameterProperties::OBJECT_ADDR);

                    if (!pAddr.isErrorValue())
                    {
                        uint16_t id = *pAddr.value<uint16_t>();

                        if (result != E_OK)
                        {
                            releaseObjectId(id);
                        }
                        else
                        {
                            addr_t bcId = nextObjectId();
                            (void)group(id, bcId);
                            (void)registerObject(id,
                                                 pendingRequest->source(),
                                                 bcId,
                                                 data,
                                                 response->transport(),
                                                 false);
                        }
                    }
                }
            }
            break;
        case MessageType::setPropertiesReq:
            {
                if (nullptr != n)
                {
                    if (result == E_IGNORED)
                    {
                        result = E_OK;
                    }

                    PropArray vaIndi;

                    uint16_t pc = U16(data.count());
                    uint16_t ec = 0U;

                    if (result == E_OK || result == E_PARTIAL_SUCCESS)
                    {
                        for (uint32_t i = 0U; i < data.count(); i++)
                        {
                            if (!data[i].isErrorValue())
                            {
                                n->ref->set(data[i]);
                                vaIndi.set(data[i]);
                            }
                            else
                            {
                                ec++;
                            }
                        }
                    }

                    if (UNSIGNED(ec) == pc && ec != 0U)
                    {
                        result = E_FAILED;
                    }
                    else if (vaIndi.count() > 0U)
                    {
                        addr_t grp = n->group;
                        // The requester already knows the new values; exclude it when known.
                        addr_t requester = EMPTY_ADDR;
                        if (nullptr != pendingRequest)
                        {
                            requester = pendingRequest->source();
                        }
                        indicate(MessageType::changed, sender, grp, &vaIndi, nullptr,
                                  requester, 0);
                    }
                    else
                    {
                        // MISRA 6-4-2: switch preferred over else-if for known cases
                    }
                }
            }
            break;

        default:
            break;
        }

        if (nullptr != n)
        {
            // Balance the reference taken by findFirst().
            n->ref->unref();
        }
        if (nullptr != pendingRequest)
        {
            pendingRequest->setClientCount(pendingRequest->clientCount() - 1U);
        }
        if (nullptr != pendingRequest && pendingRequest->clientCount() == 0U)
        {
            if (UNSIGNED(pendingRequest->type()) == MessageType::connect)
            {
                if (result == E_OK)
                {
                    addr_t id = nextObjectId();
                    addr_t parent_id = EMPTY_ADDR;
                    addr_t bcId = nextObjectId();

                    // This is new to allow for virtual connections - experimental atm.
                    if (pendingRequest->source() == EMPTY_ADDR)
                    {
                        parent_id = 0U; // 0 is us, the router

                        // the next part only gets used for the 'ping' messages, so not needed on v-conn
                        if (auto conn = pendingRequest->transport()->linkedServerConnection())
                        {
                            conn->setRootObjectAddr(id);
                        }
                    }
                    else
                    {
                        parent_id = pendingRequest->source();
                    }

                    PropArray vaReg(pendingRequest->getData());
                    (void)registerObject(id, parent_id, bcId, vaReg, pendingRequest
                                         ->transport(), false);

                    PropArray va;
                    Property v(ParameterProperties::OBJECT_ADDR);
                    v.setData(id);
                    va.set(v);
                    pendingRequest->ack(I8(E_OK), va);
                    (void)group(id, parent_id);
                }
                else
                {
                    pendingRequest->ack(I8(E_FAILED));
                }
            }
            else if (data.count() == 0U)
            {
                pendingRequest->ack(I8(result));
            }
            else
            {
                pendingRequest->ack(I8(result), data);
            }

            pendingRequest->destroy();
        }
    }

    /**
     * Decides whether a message may be processed.
     *
     * Checks, in order:
     *  1. A known caller must still have a transport, and a request must arrive
     *     on the wire the caller is registered on (only checked when a serving
     *     connection is known). Violations yield E_REFUSED.
     *  2. If the callee requires authentication, the serving connection must be
     *     authenticated, otherwise E_UNAUTHORIZED is returned.
     *
     * Fixes:
     *  - @p connection was checked for nullptr in step 1 but dereferenced
     *    unconditionally in step 2; a missing connection now counts as
     *    "not authenticated" instead of being dereferenced.
     *  - the debug message for a closed transport passed the caller pointer to
     *    a "%d" conversion; it now logs the caller's object id like the
     *    neighbouring message does.
     *
     * @param message     message being checked
     * @param connection  connection the message arrived on; may be nullptr
     * @param caller      sending object, or nullptr if unknown
     * @param callee      addressed object, or nullptr if unknown
     * @return E_OK, E_REFUSED or E_UNAUTHORIZED.
     */
    int Router::authorize(Message* message, ServerConnection* connection, ServersideObject* caller, ServersideObject* callee)
    {
        int result = E_REFUSED;
        if (caller != nullptr)
        {
            if (caller->transport() == nullptr)
            {
#if FLAKE_DEBUG_LOGGING
                logging::logf<lvlExplicit>(
                    "Security: received datagram from source %d on closed transport - dropping\n",
                    (nullptr != caller->node()) ? UNSIGNED(caller->node()->id) : 0U);
#endif
            }
            else if (message->isRequest() && (connection != nullptr && caller->transport()->wire() != connection->wire()))
            {
#if FLAKE_DEBUG_LOGGING
                logging::logf<lvlExplicit>(
                    "Security: received datagram from source %d on wrong wire (is %p, should be %p) - dropping\n",
                    UNSIGNED(caller->node()->id), connection->wire(),
                    caller->transport()->wire());
#endif
            }
            else
            {
                result = E_OK;
            }
        }
        else
        {
            // Unknown callers are not rejected here.
            result = E_OK;
        }

        if (result == E_OK && nullptr != callee && callee->requiresAuth())
        {
            // Without a serving connection there is no authentication state to rely on.
            if (nullptr == connection || connection->isAuthenticated() == 0)
            {
                result = E_UNAUTHORIZED;
            }
        }

        return result;
    }

    void Router::receive()
    {
        Message* message{};

        if (m_msgInQueue.wait_and_pop(&message) && nullptr != message)
        {

            auto servingConnection = message->transport()->linkedServerConnection();

            unsigned callerId = message->source();
            const PropArray& callerData = message->getData();

            unsigned calleeId = message->destination();
            unsigned calleeParentId = EMPTY_ADDR;

            ServersideObject* caller = nullptr;
            ServersideObject* callee = nullptr;
            ServersideObject* calleeParent = nullptr;

            ObjectRegistry::Node* callingObject = m_registry.findFirst(U16(callerId));

            if (callingObject != nullptr)
            {
                caller = callingObject->ref;
            }

            ObjectRegistry::Node* calledObject = m_registry.findFirst(U16(calleeId));

            if (nullptr != calledObject)
            {
                callee = calledObject->ref;
                if (nullptr != calledObject->parent)
                {
                    calleeParent = calledObject->parent->ref;
                    calleeParentId = calledObject->parent->id;
                }
            }

            if ((nullptr == calledObject || nullptr == callingObject) &&
                (UNSIGNED(message->type()) != MessageType::connect) &&
                (UNSIGNED(message->type()) != MessageType::auth))
                {
                    if (message->isRequest())
                    {
                        message->ack(I8(E_NOT_CONNECTED));
                    }
            }
            else
            {
                int res = authorize(message, servingConnection, caller, callee);

                if (res != E_OK && message->isRequest())
                {
                    message->ack(I8(res));
                }
                else
                {
                    if (message->isResponse())
                    {
                        message->transport()->processResponse(message);
                    }
                    else if (message->isRequest())
                    {
                        switch (UNSIGNED(message->type()))
                        {
                        case MessageType::connect:
                            {
                                if (m_objectIds.size() >= UNSIGNED(UINT16_MAX - 2))
                                {
                                    message->ack(I8(E_NO_ALLOC));
                                    break;
                                }
#if FLAKE_AUTH_SUPPORT
                                else if ((m_authSink != nullptr) && servingConnection->isAuthenticated() == 0)
                                {
                                    servingConnection->setConnected();

                                    flakeAuthType at = m_authSink->authenticationType();

                                    switch (at)
                                    {
                                    case flakeAuthType::atInteractive:
                                        if (0 != m_authSink->onConnect(callerData))
                                        {
                                            message->ack(I8(E_UNAUTHORIZED));
                                        }
                                        break;
                                    case flakeAuthType::atSignature:
                                        {
                                            PropArray paAuth;
                                            char* sign_algo{};
                                            tt_bin_t sign_hash{};
                                            flakeAuthType auth_type{};
                                            int c_len{};

                                            int res = m_authSink->onAuthChallengeRequested(servingConnection->wire(),
                                                &sign_algo,
                                                &sign_hash.lpb,
                                                &c_len);

                                            if (E_OK == res)
                                            {
                                                if (c_len < UINT16_MAX)
                                                {
                                                    sign_hash.cb = U16(c_len);
                                                }
                                                else
                                                {
                                                    message->ack(I8(E_REFUSED));
                                                }

                                                auth_type = flakeAuthType::atSignature;
                                                paAuth.set<ParameterProperties::SIGN_HASH>(sign_hash);
                                                paAuth.set<ParameterProperties::SIGN_ALGO>(sign_algo);
                                                paAuth.set<ParameterProperties::AUTH_TYPE>(U8(auth_type));

                                                //TODO: Dynamic Memory
                                                struct assoc_auth_data* ad = new struct assoc_auth_data;

                                                ad->challenge_len = c_len;
                                                ad->challenge = sign_hash.lpb;
                                                ad->sign_algorithm = sign_algo;

                                                auto* d = Message::create(mstIndication, MessageType::auth, 0U,EMPTY_ADDR);

                                                for (unsigned int i = 0U; i < paAuth.count(); ++i)
                                                {
                                                    d->setData(paAuth.getAt(i));
                                                }

                                                //TODO: Dynamic Memory
                                                ResponseContext* rc = new ResponseContext();
                                                rc->pendingRequest = Message::create(*d);
                                                rc->assoc_data = ad;

                                                int res = d->transport()->sendIndication(d,
                                                    this,
                                                    FLAKE_DEFAULT_TIMEOUT_MS,
                                                    rc);
                                                if (res == E_NO_ALLOC)
                                                {
                                                   rc->pendingRequest->ack(I8(E_NO_ALLOC));
                                                }
                                            }
                                        }
                                        break;
                                    }
                                }
                                else
                                {
#endif
                                    servingConnection->setConnected();

                                    addr_t id = nextObjectId();
                                    addr_t bcId = nextObjectId();
                                    addr_t parent_id = 0U;

                                    // This is new to allow for virtual connections - experimental atm.
                                    if (callerId == EMPTY_ADDR)
                                    {
                                        // the next part only gets used for the 'ping' messages, so not needed on v-conn
                                        if (nullptr != servingConnection)
                                        {
                                            servingConnection->setRootObjectAddr(id);
                                        }
                                    }
                                    else
                                    {
                                        parent_id = U16(callerId);
                                    }

                                    PropArray vaReq(callerData);

                                    (void)registerObject(id, parent_id, bcId, vaReq, servingConnection, false);

                                    PropArray va;
                                    Property v(ParameterProperties::OBJECT_ADDR);
                                    v.setData(id);
                                    va.set(v);

                                    message->ack(I8(E_OK), va);

                                    (void)group(id, parent_id);
                                }
#if FLAKE_AUTH_SUPPORT
                            }
#endif
                            break;

                        case MessageType::custom:
                            {
                                uint32_t timeout = FLAKE_DEFAULT_TIMEOUT_MS;
                                if (calledObject != nullptr && calleeId == calledObject->group)
                                {
                                  timeout = 0U;
                                }
                                indicate(MessageType::customMsgReceived,
                                         U16(callerId),
                                         U16(calleeId),
                                         &callerData,
                                         message, EMPTY_ADDR, timeout);
                            }
                            break;

                        case MessageType::destroyObject:
                            {
                                if (nullptr != calleeParent &&
                                    (UNSIGNED(calleeParent->node()->id) == callerId || callerId == calleeId))
                                {
                                    indicate(MessageType::objectDestroyed, U16(callerId), U16(calleeId),
                                           nullptr, message, EMPTY_ADDR, 0);

                                    unregisterObject(U16(calleeId));

                                    message->ack(I8(E_OK));
                                }
                                else
                                {
                                    message->ack(I8(E_FAILED));
                                }
                            }
                            break;

                        case MessageType::getProperties:
                            {
                                PropArray vaOut;
                                PropArray vaIndi;

                                uint32_t i = 0U;
                                uint16_t cErr = 0U;
                                int8_t res = I8(E_OK);

                                if (callerData.count() > 0U)
                                {
                                    for (; i < callerData.count(); i++)
                                    {
                                        Property v = callerData[i];

                                        if (IS_VOLATILE(v.tag()))
                                        {
                                            vaIndi.set(Property(v.tag()));
                                        }
                                        else if (callee->has(v.tag()))
                                        {
                                            vaOut.set(callee->get(v.tag()));
                                        }
                                        else
                                        {
                                            cErr++;
                                            res = I8(E_PARTIAL_SUCCESS);
                                            v.setError(I8(E_NOT_FOUND));
                                            vaOut.set(v);
                                        }
                                    }
                                }
                                else
                                {
                                    for (; i < callee->count(); i++)
                                    {
                                        if (IS_VOLATILE((*callee)[i].tag()))
                                        {
                                            vaIndi.set(Property((*callee)[i].tag()));
                                        }
                                        else
                                        {
                                            vaOut.set((*callee)[i]);
                                        }
                                    }
                                }

                                if (vaIndi.count() > 0U)
                                {
                                    indicate(MessageType::getPropertiesReq,
                                             0U,
                                             U16(calleeId),
                                             &vaIndi,
                                             message);
                                }
                                else
                                {
                                    if (cErr == vaOut.count() && cErr > 0U)
                                    {
                                        res = I8(E_FAILED);
                                    }
                                    message->ack(res, vaOut);
                                }
                            }
                            break;

                        case MessageType::openProperty:
                            {
                                PropArray propertyData= callerData;
                                uint16_t id = nextObjectId();
                                Property p(ParameterProperties::OBJECT_ADDR);
                                p.setData(id);
                                propertyData.set(p);
                                message->setData(p);
                               // (void)registerObject(id, U16(calleeParentId), EMPTY_ADDR, propertyData, servingConnection, false);
                                indicate(MessageType::openPropertyReq,
                                         U16(callerId),
                                         U16(calleeId),
                                         &propertyData,
                                         message);
                            }
                            break;


                        case MessageType::streamWrite:
                        case MessageType::streamRead: //NOLINT MISRA 5-4-4 fallthrough on purpose
                        case MessageType::streamSeek: //NOLINT MISRA 5-4-4 fallthrough on purpose
                            {
                                uint8_t msg = U8(0U);

                                if (UNSIGNED(message->type()) == MessageType::streamRead)
                                {
                                    msg = MessageType::streamReadReq;
                                }
                                else if (UNSIGNED(message->type()) == MessageType::streamWrite)
                                {
                                    if (calleeParentId == callerId)
                                    {
                                        msg = MessageType::changed;
                                    }
                                    else
                                    {
                                        msg = MessageType::streamWriteReq;
                                    }
                                }
                                else // (UNSIGNED(d->type()) == MessageType::streamSeek)
                                {
                                    msg = MessageType::streamSeekReq;
                                }

                                indicate(msg,
                                         U16(callerId),
                                         U16(calleeId),
                                         &callerData,
                                         message);
                            }
                            break;

                        case MessageType::ping:
                            if (calleeId == 0U)
                            {
                                message->ack(I8(E_OK));
                            }
                            else
                            {
                                indicate(MessageType::ping,
                                         U16(callerId),
                                         U16(calleeId));
                            }
                            break;

                        case MessageType::setProperties:
                            {
                                PropArray vaOut{};
                                PropArray vaIndi{};
                                PropArray vaIndiActionable{};

                                int8_t res = I8(E_OK);

                                unsigned i = 0U;
                                unsigned cErr = 0U;

                                for (; i < callerData.count(); i++)
                                {
                                    Property vNew (callerData[i].tag());
                                    // to adjust missing modifiers
                                    if (!(calleeParentId == message->source() || calleeId == message->source()))// && callerId != message->source())
                                    {
                                        if (!callee->has(vNew.tag()))
                                        {
                                           vNew.setError(I8(E_NOT_FOUND));
                                        }
                                        else
                                        {
                                           vNew = Property(callee->get(vNew.tag()).tag());
                                           vNew.setData(callerData[i].data());
                                        }
                                    }
                                    else
                                    {
                                        vNew.setData(callerData[i].data());
                                    }

                                    if (IS_READONLY(vNew.tag()) &&
                                        !(calleeParentId == message->source() || calleeId == message->source()))
                                        // latter is for bw compat.
                                    {
                                        Property vErr;
                                        vErr.setError(I8(E_READ_ONLY));
                                        vaOut.set(vErr);
                                        cErr++;
                                        res = I8(E_PARTIAL_SUCCESS);
                                        continue;
                                    }

                                    Property vOld = callee->get(vNew.tag());

                                //    if (vOld != vNew) ### bandwidth saving
                                    {
                                        if (calleeParentId != message->source() && calleeId != message->source())
                                            // dtp. latter is for bw compat
                                        {
                                            if (vOld.isErrorValue())
                                            {
                                                vaOut.set(vOld);
                                                cErr++;
                                            }
                                            else if (IS_RESERVED(vNew.tag()))
                                            {
                                                vOld.setError( I8(E_READ_ONLY));
                                                vaOut.set(vOld);
                                                cErr++;
                                            }
                                            else if (IS_ACTIONABLE(vNew.tag()))
                                            {
                                                vaIndiActionable.set(vNew);
                                            }
                                            else
                                            {
                                                callee->set(vNew);
                                                if (!IS_META(vNew.tag())) {
                                                    vaOut.set(vNew);
                                                    vaIndi.set(vNew);
                                                }
                                            }
                                        }
                                        else
                                        {
#if FLAKE_DEBUG_LOGGING
                                            if (logging::logLevel >= lvlRaw)
                                            {
                                                char* str = vNew.toString();
                                                logging::logf<lvlRaw>(" - property changed or new: %s\n", str);
                                                delete[] str;
                                            }
#endif

                                            callee->set(vNew);

                                            if (!IS_META(vNew.tag()))
                                            {
                                                vaOut.set(vNew);
                                                vaIndi.set(vNew);
                                            }
                                        }
                                    }
                                }

                                if (vaIndiActionable.count() > 0U)
                                {
                                    Message* pendingRequest = Message::create(*message);
                                    pendingRequest->clearData();

                                    for (unsigned int i = 0U; i < vaOut.count(); ++i)
                                    {
                                        pendingRequest->setData(vaOut.getAt(i));
                                    }

                                    indicate(MessageType::setPropertiesReq,
                                             U16(callerId),
                                             U16(calleeId),
                                             &vaIndiActionable,
                                             pendingRequest);

                                    pendingRequest->destroy();
                                }
                                else if (vaIndi.count() > 0U)
                                {
                                    indicate(MessageType::changed,
                                             U16(calleeId),
                                             callee->node()->group,
                                             &vaIndi,
                                             nullptr, U16(callerId), 0);

                                    message->ack(res, vaOut);
                                }
                                else
                                {
                                    if (cErr == callerData.count())
                                    {
                                        message->ack(I8(E_FAILED), vaOut);
                                    }
                                    else
                                    {
                                        message->ack(I8(E_NO_CHANGES), vaOut);
                                    }
                                }
                            }
                            break;
#if FLAKE_AUTH_SUPPORT
                        case MessageType::auth:
                            {
                                if (calleeId == 0U)
                                {
                                    if (m_authSink == nullptr)
                                    {
                                        message->ack(I8(E_FAILED));
                                        break;
                                    }

                                    if (callerData.has(ParameterProperties::AUTH_TYPE))
                                    {
                                        Property pAuthType = callerData.get(ParameterProperties::AUTH_TYPE);
                                        PropArray pAuthResp;

                                        flakeAuthType auth_type = flakeAuthType::atSignature;

                                        if (*pAuthType.value<tt_uint8_t>() == 2U)
                                        {
                                            auth_type = flakeAuthType::atInteractive;
                                        }

                                        switch (auth_type)
                                        {
                                        case flakeAuthType::atSignature:
                                            {
                                                Property pSignAlgo = callerData.get(ParameterProperties::SIGN_ALGO);

                                                if (pSignAlgo.isErrorValue())
                                                {
                                                    pSignAlgo.setError(I8(E_INCOMPLETE));
                                                    pAuthResp.set(pSignAlgo);
                                                    message->ack(I8(E_FAILED), pAuthResp);
                                                    break;
                                                }

                                                Property pAuthHash = callerData.get(ParameterProperties::SIGN_HASH);

                                                if (pAuthHash.isErrorValue())
                                                {
                                                    pAuthHash.setError( I8(E_INCOMPLETE));
                                                    pAuthResp.set(pAuthHash);
                                                    message->ack(I8(E_FAILED), pAuthResp);
                                                    break;
                                                }

                                                uint8_t* signature{};
                                                int signature_len{};
                                                tt_bin_t auth_hash = *pAuthHash.value<tt_bin_t>();
                                                auto sign_algo = pSignAlgo.value<tt_str_t>()->c_str();

                                                m_authSink->onAuthChallengeReceived(
                                                    sign_algo, auth_hash.lpb, SIGNED(auth_hash.cb),
                                                    &signature, &signature_len);

                                                if (signature_len != 0 && signature != nullptr)
                                                {
                                                    tt_bin_t auth_sign;
                                                    auth_sign.cb = U16(signature_len);
                                                    //TODO: Dynamic Memory
                                                    auth_sign.lpb = (uint8_t*)malloc(U16(signature_len));
                                                    utils::memcpy(auth_sign.lpb, signature, UNSIGNED(signature_len));
                                                    pAuthResp.set<ParameterProperties::SIGNATURE>(auth_sign);
                                                    free(signature);
                                                    message->ack(I8(E_OK), pAuthResp);
                                                }
                                                else
                                                {
                                                    message->ack(I8(E_FAILED));
                                                }
                                            }
                                            break;
                                        default:
                                            {
                                                pAuthType.setError( I8(E_UNSUPPORTED));
                                                pAuthResp.set(pAuthType);
                                                message->ack(I8(E_FAILED), pAuthResp);
                                            }
                                            break;
                                        }
                                    }
                                    else
                                    {
                                        message->ack(I8(E_INCOMPLETE));
                                    }
                                }
                                else
                                {
                                    message->ack(I8(E_UNSUPPORTED));
                                }
                            }
                            break;
#endif
                        case MessageType::queryObjects:
                            {
                                Property request_uuid = callerData.get(ParameterProperties::OBJECT_TYPE);
                                //TODO: Dynamic Memory
                                ValueTable* tbl = new ValueTable();

                                auto nl = m_registry.all();

                                for (auto i = nl->begin(); i != nl->end(); ++i)
                                {
                                    Property v = (*i)->ref->get(ParameterProperties::OBJECT_TYPE);

                                    if (!v.isErrorValue())
                                    {
                                        if ((*i)->ref->requiresAuth() && servingConnection->isAuthenticated() == 0)
                                        {
                                            continue;
                                        }

                                        if (request_uuid.isErrorValue() || v == request_uuid)
                                        {
                                            //TODO: Dynamic Memory
                                            auto va = new PropArray();

                                            Property vAddr(ParameterProperties::OBJECT_ADDR);
                                            vAddr.setData( (*i)->id);
                                            va->set(vAddr);

                                            if (request_uuid.isErrorValue())
                                            {
                                                va->set(v);
                                            }

                                            tbl->m_rows.push_back(va);
                                        }
                                    }
                                    (*i)->ref->unref();
                                }

                                tt_bin_t bin;
                                (void)tbl->serialize(&bin.cb, &bin.lpb);

                                Property v(ParameterProperties::OBJECT_TABLE);
                                v.setData(bin);

                                //TODO: Dynamic Memory
                                delete tbl;

                                PropArray vaOut;
                                vaOut.set(v);

                                message->ack(I8(E_OK), vaOut);

                                //TODO: Dynamic Memory
                                delete nl;

                                if (bin.cb != 0U && bin.lpb != nullptr)
                                {
                                    free(bin.lpb);
                                }
                            }
                            break;

                        case MessageType::joinGroup:
                            {
                                if (m_registry.isInGroup(U16(callerId), U16(calleeId)))
                                {
                                    message->ack(I8(E_OK));
                                    break;
                                }

                                uint16_t group_id = group(U16(callerId), U16(calleeId));

                                if (group_id != 0U)
                                {
                                    PropArray pa;
                                    Property p(ParameterProperties::BROADCAST_ADDR);
                                    p.setData(group_id);
                                    pa.set(p);
                                    message->ack(I8(E_OK), pa);
                                }
                                else
                                {
                                    message->ack(I8(E_NOT_FOUND));
                                }
                            }
                            break;

                        case MessageType::leaveGroup:
                            {
                                ungroup(U16(callerId), U16(calleeId));
                                message->ack(I8(E_OK));
                            }
                            break;

                        case MessageType::createObject:
                            {
                                if (m_objectIds.size() >= U16(UINT16_MAX - 2))
                                {
                                    message->ack(I8(E_NO_ALLOC));
                                }
                                else
                                {
                                    addr_t id = nextObjectId();
                                    addr_t broadcastId = nextObjectId();
                                    PropArray objectData = callerData;

                                    bool requires_auth = false;

                                    Property req_auth = callerData.get(ParameterProperties::REQUIRES_AUTH);

                                    if (!req_auth.isErrorValue())
                                    {
                                        requires_auth = *req_auth.value<tt_bool_t>();
                                    }

                                    (void)registerObject(id, U16(callerId), broadcastId, callerData,
                                                         servingConnection, requires_auth);

                                    ObjectRegistry::Node* newObject = m_registry.findFirst(id);

                                    if (nullptr != newObject)
                                    {
                                        for (unsigned int i = 0U; i < callerData.count(); i++)
                                        {
                                            if (callerData[i].isErrorValue())
                                            {
                                                continue;
                                            }
                                            newObject->ref->set(callerData[i]);
                                        }
                                        newObject->ref->unref();
                                    }

                                    Property v1 (ParameterProperties::BROADCAST_ADDR);
                                    v1.setData(broadcastId);
                                    objectData.set(v1);

                                    Property v2(ParameterProperties::OBJECT_ADDR);
                                    v2.setData( id);
                                    objectData.set(v2);

                                    message->ack(I8(E_OK), objectData);
                                }
                            }
                            break;

                        case MessageType::disconnect:
                            {
                                message->ack(I8(E_OK));
                                if (servingConnection->wire()->type() == WireType::wtDatagram)
                                {
                                    servingConnection->shutdown();
                                }
                                else
                                {
                                    unregisterObject(U16(callerId));
                                }
                            }
                            break;

                        default:
                            message->ack(I8(E_UNSUPPORTED));
                            break;
                        }
                    }
                    else
                    {
                        // MISRA 6-4-2: switch preferred over else-if for known cases
                    }
                }

                if (nullptr != callee)
                {
                    callee->unref();
                }

                if (nullptr != caller)
                {
                    caller->unref();
                }
            }

            message->destroy();
        }
    }

    void Router::checkConnections()
    {
        (void)flakeMutexAcquire(m_connMutex);

        auto i = m_connections.begin();

        for (; i != m_connections.end(); ++i)
        {
            ServerConnection* conn = *i;

            if (m_doShutdown)
            {
                m_closedConnections.push_back(conn);
            }
            else if (conn->stale())
            {
                if (conn->pingCount() < 3U)
                {
                    conn->ping();
                }
                else
                {
                    m_closedConnections.push_back(conn);
                }
            }
            else if (!conn->connected())
            {
                conn->increaseConnectionWaitCounter();
                if (conn->connectionWaitCounter() > 3)
                {
                    m_closedConnections.push_back(conn);
                }
            }
            else
            {
                conn->resetPingCount();
            }
        }

        flakeMutexRelease(m_connMutex);

        for (i = m_closedConnections.begin(); i != m_closedConnections.end(); ++i)
        {
#if FLAKE_DEBUG_LOGGING
            logging::logf<lvlInfo>("Closing stale connection id%04x\n", (*i)->id());
#endif
            (*i)->shutdown();
        }

        m_closedConnections.clear();

        if (m_doShutdown && m_connections.empty())
        {
            m_isShutdown = true;
            flakeSemaphoreRelease(m_runCond);
        }
    }


    /*
      MISRA C++:2008 Rule 5-2-8 deviation justification:
      - POSIX thread entry functions require the signature `void*(*)(void*)`.
      - The cast from `void*` to `Router*` is verified and safe,
        as we pass exactly the same pointer object that was originally allocated.
    */
    void Router::sendThread(void* arg)
    {
        auto r = static_cast<Router*>(arg);  //NOLINT, Rule 5-2-8 - seea above

        while (!r->m_doShutdown)
        {
            Message* message = nullptr;

            (void)r->m_msgOutQueue.wait_and_pop(&message);

            if (message != nullptr)
            {
                Transport* t = message->transport();
                if (nullptr != t)
                {
                    (void)t->reply(message);
                }
            }
        }

        flakeThreadExit();
    }

    // Stops the router.
    //
    // Sequence:
    //   1. set m_doShutdown so the worker loops stop iterating,
    //   2. poll finishedShutdown() every 100 ms until it reports completion,
    //   3. push a null message onto both message queues,
    //   4. terminate every thread recorded in m_transportThreads,
    //   5. delete every wire recorded in m_serverWires.
    void Router::shutdown()
    {
#if FLAKE_DEBUG_LOGGING
        logging::logf<lvlInfo>("Shutting down.\n");
#endif
        m_doShutdown = true;

        bool finished = finishedShutdown();
        while (!finished)
        {
            flakeUSleep(100U * 1000U);
            finished = finishedShutdown();
        }

        // Null entries are tolerated by the queue consumers (sendThread checks
        // for nullptr); presumably they wake threads blocked on the queues.
        (void)m_msgInQueue.push(nullptr);
        (void)m_msgOutQueue.push(nullptr);

        unsigned threadIdx = 0U;
        while (threadIdx < m_transportThreads.size())
        {
            flakeThreadTerminate(m_transportThreads[threadIdx]);
            ++threadIdx;
        }

        unsigned wireIdx = 0U;
        while (wireIdx < m_serverWires.size())
        {
            delete m_serverWires[wireIdx];
            ++wireIdx;
        }
    }

    // Registers a server wire and starts its accept thread.
    //
    // w is stored in m_serverWires (shutdown() deletes the entries of that
    // container). A heap-allocated ServerWireRecord tying the wire to this
    // router is handed to a new thread running Router::acceptThread; the thread
    // uses the wire's own stack size and below-normal priority, and its id is
    // stored in m_transportThreads.
    //
    // NOTE(review): nothing in the code visible here frees the record - confirm
    // whether that is intended.
    void Router::addServerWire(ServerWire* w)
    {
        //TODO: Dynamic Memory
        auto* record = new struct ServerWireRecord;
        record->wire = w;
        record->router = this;

        m_serverWires.push_back(w);

        flakeThreadAttr_t threadAttr = {};
        threadAttr.name = ROUTER_SERVER_WIRE_THREAD_NAME;
        threadAttr.priority = flakeThreadPriorityBelowNormal;
        threadAttr.stack_size = U32(w->stackSizeBytes());

        flakeThreadId_t acceptor{};
        flakeThreadNew(&acceptor, &Router::acceptThread, record, &threadAttr);
        m_transportThreads.push_back(acceptor);
    }



    /*
      MISRA C++:2008 Rule 5-2-8 deviation justification:
      - POSIX thread entry functions require the signature `void*(*)(void*)`.
      - The cast from `void*` to `ServerWireRecord*` is verified and safe,
        as we pass exactly the same pointer object that was originally allocated
        (see Router::addServerWire).
    */
    // Thread entry point that accepts incoming connections on one server wire.
    //
    // p points to the ServerWireRecord created by addServerWire(); it names the
    // wire to listen on and the router that receives the accepted client wires.
    //
    // The function never returns. It repeats the following cycle forever:
    //  - wait (polling once per MICROSEC_PER_SEC) until the wire is available,
    //  - initialise the wire, retrying after 5 * MICROSEC_PER_SEC on failure,
    //  - while the wire stays available, accept clients and hand them to the
    //    router, backing off for 1 * MICROSEC_PER_SEC while the router does not
    //    accept more connections.
    //
    // Only real failures are logged at error level; state changes are logged at
    // info level.
    __attribute__((noreturn))
    void Router::acceptThread(void* p)
    {
        auto swr = static_cast<struct ServerWireRecord*>(p); //NOLINT, Rule 5-2-8 - see above

#if FLAKE_DEBUG_LOGGING
        logging::logf<lvlInfo>("Initializing transport %s\n", swr->wire->describe());
#endif
        while (true)
        {
            if (swr->wire->available())
            {
                if (swr->wire->init() != E_OK)
                {
#if FLAKE_DEBUG_LOGGING
                    logging::logf<lvlError>("Initializing %s failed: %d \n", swr->wire->describe(), errno);
#endif
                    // Initialisation failed: back off before trying again.
                    flakeUSleep(5U * MICROSEC_PER_SEC);
                    continue;
                }
            }
            else
            {
                // Wire not usable yet: poll again later.
                flakeUSleep(1U * MICROSEC_PER_SEC);
                continue;
            }
#if FLAKE_DEBUG_LOGGING
            logging::logf<lvlInfo>("Transport %s is available.\n", swr->wire->describe());
#endif
            while (swr->wire->available())
            {
                if (swr->router->acceptsMoreConnections())
                {
                    Wire* clientWire = swr->wire->accept();

                    if (clientWire != nullptr)
                    {
                        (void)swr->router->addClient(clientWire);
                    }
                    else
                    {
#if FLAKE_DEBUG_LOGGING
                        logging::logf<lvlError>("accept() on %s failed: %d \n",swr->wire->describe(), errno);
#endif
                    }
                }
                else
                {
                    // Router is saturated: do not accept, just wait.
                    flakeUSleep(1U * MICROSEC_PER_SEC);
                }
            }
#if FLAKE_DEBUG_LOGGING
            logging::logf<lvlInfo>("Transport %s is unavailable.\n", swr->wire->describe());
#endif
        }
    }

    /*
     MISRA C++:2008 Rule 5-2-8 deviation justification:
     - The thread API hands the entry function its argument as an untyped `void*`.
     - The cast from `void*` to `Router*` is safe, because run() passes
       `this`, i.e. exactly the Router object the thread is meant to serve.
    */
    void Router::maintenanceThread(void* p)
    {
        auto r = static_cast<Router*>(p); //NOLINT, Rule 5-2-8 - seea above

        while (!r->finishedShutdown())
        {
            r->doHousekeeping();

            flakeUSleep(1U * MICROSEC_PER_SEC);
        }

        flakeThreadExit();
    }

    /*
      MISRA C++:2008 Rule 5-2-8 deviation justification:
      - The thread API hands the entry function its argument as an untyped `void*`.
      - The cast from `void*` to `Router*` is safe, because run() passes
        `this`, i.e. exactly the Router object the thread is meant to serve.
    */
    void Router::receiveThread(void* p)
    {
        auto r = static_cast<Router*>(p);  //NOLINT, Rule 5-2-8 - seea above

        r->m_isRunning = true;
        while (!r->finishedShutdown())
        {
            r->receive();
        }
        flakeThreadExit();
    }

    void Router::run()
    {
        if (!m_isRunning)
        {

            flakeThreadAttr_t attr_process = {};
            attr_process.name = ROUTER_PROCESS_THREAD_NAME;
            attr_process.stack_size = 8192U;
            attr_process.priority = flakeThreadPriorityNormal;

            flakeThreadAttr_t attr_maint = {};
            attr_maint.name = ROUTER_MAINTENANCE_THREAD_NAME;
            attr_maint.stack_size = 2048U;
            attr_maint.priority = flakeThreadPriorityNormal;

            flakeThreadId_t thread{};
            flakeThreadNew(&thread, &receiveThread, this, &attr_process);
            flakeThreadNew(&thread, &maintenanceThread, this, &attr_maint);
#if FLAKE_DEBUG_LOGGING
            logging::logf<lvlInfo>("Dispatcher running.\n");
#endif

        }
    }
}