msach@0: /* msach@0: * For a detailed description see header file. msach@0: */ msach@0: msach@4: #include msach@4: msach@0: #include "LossyCom.h" msach@0: #include "VMS_Implementations/VMS_impl/vmalloc.h" msach@0: msach@0: /* msach@5: * Initializes the central exchange structure and sets all trigger counter to 0 msach@0: * Allocates memory to fit the number of endpoints. msach@0: * Returns NULL if an error occurs. msach@0: */ msach@0: lossyCom__exchange_t* lossyCom__initialize(uint16_t numEndpoints) msach@0: { msach@5: lossyCom__exchange_t* centralExchange; msach@0: msach@5: centralExchange = VMS_WL__malloc(sizeof(lossyCom__exchange_t)); msach@5: if(centralExchange == NULL) msach@0: return NULL; msach@0: msach@5: centralExchange->broadcastTriggerCounter = 0; msach@5: centralExchange->numEndpoints = numEndpoints; msach@5: centralExchange->outboxArray = VMS_WL__malloc(sizeof(lossyCom__msg_t)*numEndpoints); msach@5: if(centralExchange->outboxArray == NULL){ msach@5: VMS_WL__free(centralExchange); msach@0: return NULL; msach@0: } msach@0: msach@5: centralExchange->p2pTriggerCounters = VMS_WL__malloc(sizeof(uint16_t)*numEndpoints); msach@5: if(centralExchange->p2pTriggerCounters == NULL){ msach@5: VMS_WL__free(centralExchange->outboxArray); msach@5: VMS_WL__free(centralExchange); msach@4: return NULL; msach@4: } msach@4: msach@4: //reset all point 2 point trigger counter msach@5: memset((void*)centralExchange->p2pTriggerCounters, 0, sizeof(uint16_t)*numEndpoints); msach@4: msach@5: return centralExchange; msach@0: } msach@0: msach@1: /* msach@1: * Connects the local endpoint to the central exchange structure. msach@5: * This registers a message handler that handles all incoming messages. msach@1: * Also an endpointID is set this ID has to be between 0 and total number of msach@5: * endpoints-1 and unique. msach@1: */ msach@0: void lossyCom__initialize_endpoint(lossyCom__endpoint_t* localEndpoint, msach@0: lossyCom__exchange_t* centralExchange, msach@0: lossyCom__endpointID_t endpointID, msach@1: lossyCom__msgHandler msgHandler, msach@1: void* msgHandlerData) msach@0: { msach@4: localEndpoint->lastReceivedBroadcastTrigger = 0; msach@0: localEndpoint->endpointID = endpointID; msach@1: localEndpoint->centralExchange = centralExchange; msach@1: localEndpoint->msgHandler = msgHandler; msach@1: localEndpoint->msgHandlerData = msgHandlerData; msach@0: } msach@0: msach@5: /* msach@5: * Prepare a lossyCom__msg_t in the correct format that contains the trigger msach@5: * value, the receiver endpoint ID and the message body msach@5: */ msach@4: inline lossyCom__msg_t prepareMsg(uint16_t triggerValue, msach@4: lossyCom__endpointID_t receiverEndpointID, msach@5: lossyCom__msgBody_t msgBody) msach@4: { msach@4: lossyCom__msg_t msgDraft; msach@4: msach@5: msgDraft = (0 | msgBody); msach@4: msgDraft |= ((lossyCom__msg_t)receiverEndpointID << ENDPOINT_ID_SHIFT); msach@4: msgDraft |= ((lossyCom__msg_t)triggerValue << TRIGGER_SHIFT); msach@4: msach@4: return msgDraft; msach@4: } msach@4: msach@1: /* msach@1: * This broadcasts a message to all connected receivers msach@1: */ msach@4: void lossyCom__broadcastMsg(lossyCom__endpoint_t* localEndpoint, msach@4: lossyCom__msgBody_t msgBody) msach@0: { msach@4: lossyCom__exchange_t* centralExchange = localEndpoint->centralExchange; msach@4: uint16_t increasedTrigger; msach@4: lossyCom__msg_t msg; msach@4: msach@5: increasedTrigger = centralExchange->broadcastTriggerCounter +1; msach@4: msach@4: //build message msach@4: msg = prepareMsg(increasedTrigger, BROADCAST_ID, msgBody); msach@4: msach@4: //write msg to central exchange msach@4: centralExchange->outboxArray[localEndpoint->endpointID] = msg; msach@4: msach@4: //update broadcast trigger counter msach@5: centralExchange->broadcastTriggerCounter = increasedTrigger; msach@0: } msach@0: msach@1: /* msach@1: * This sends a message another endpoint. Again it is not guaranteed that the msach@1: * message is received. But in most cases it will. msach@1: */ msach@4: void lossyCom__sendMsg(lossyCom__endpoint_t* localEndpoint, msach@0: lossyCom__endpointID_t receiverEndpointID, msach@4: lossyCom__msgBody_t msgBody) msach@0: { msach@4: lossyCom__exchange_t* centralExchange = localEndpoint->centralExchange; msach@4: lossyCom__msg_t msg; msach@4: uint16_t increasedTrigger; msach@0: msach@4: increasedTrigger = msach@5: centralExchange->p2pTriggerCounters[receiverEndpointID] +1; msach@0: msach@4: //build message msach@4: msg = prepareMsg(increasedTrigger, receiverEndpointID, msgBody); msach@0: msach@0: //write msg to central exchange msach@4: centralExchange->outboxArray[localEndpoint->endpointID] = msg; msach@0: msach@4: //write back increased trigger counter msach@5: centralExchange->p2pTriggerCounters[receiverEndpointID] = increasedTrigger; msach@0: } msach@0: msach@4: int inline isUnreceivedMsg(uint16_t msgTrigger, msach@4: uint16_t lastReceivedTrigger, msach@4: uint16_t triggerSnapshot) msach@0: { msach@4: if(msgTrigger > lastReceivedTrigger || msach@4: msgTrigger <= triggerSnapshot) msach@4: { msach@4: // check if the message is new (msg trigger > archived trigger) msach@4: // and already valid (msgTrigger <= currentTriggerCopy) msach@4: if((msgTrigger > lastReceivedTrigger && msach@4: msgTrigger <= triggerSnapshot) || msach@4: ((int64_t) triggerSnapshot- // check for triggerCounterOverflow msach@4: (int64_t)lastReceivedTrigger < -MAX_TRIGGER/2)) msach@4: { msach@4: return TRUE; msach@4: } msach@4: } msach@4: return FALSE; msach@4: } msach@4: msach@4: void lossyCom__receiveMsg(lossyCom__endpoint_t* localEndpoint) msach@4: { msach@4: uint16_t broadcastTriggerSnapshot, p2pTriggerSnapshot; msach@4: lossyCom__exchange_t* centralExchange; msach@0: lossyCom__endpointID_t senderEndpointID; msach@4: lossyCom__endpointID_t receiverEndpointID; msach@0: lossyCom__msg_t msgCopy; msach@0: uint16_t msgTrigger; msach@0: lossyCom__msgBody_t msgBody; msach@0: msach@4: centralExchange = localEndpoint->centralExchange; msach@4: //save trigger counter to know find valid messages msach@5: broadcastTriggerSnapshot = centralExchange->broadcastTriggerCounter; msach@4: p2pTriggerSnapshot = msach@5: centralExchange->p2pTriggerCounters[localEndpoint->endpointID]; msach@1: msach@0: //new message arrived if trigger counter is higher than the last time read msach@4: if( broadcastTriggerSnapshot > localEndpoint->lastReceivedBroadcastTrigger || msach@4: p2pTriggerSnapshot > localEndpoint->lastReceivedp2pTrigger) msach@0: { msach@4: senderEndpointID = 0; msach@4: //search outboxes for new messages msach@4: while(senderEndpointID < centralExchange->numEndpoints) msach@0: { msach@4: //ignore own outbox msach@2: if(senderEndpointID != localEndpoint->endpointID) msach@0: { msach@4: msgCopy = centralExchange->outboxArray[senderEndpointID]; msach@2: msgTrigger = 0xFFFF & (msgCopy >> TRIGGER_SHIFT); msach@4: receiverEndpointID = 0xFFFF & (msgCopy >> ENDPOINT_ID_SHIFT); msach@3: msach@4: if(receiverEndpointID == BROADCAST_ID){//receive broadcast message msach@4: if(isUnreceivedMsg(msgTrigger, msach@4: localEndpoint->lastReceivedBroadcastTrigger, msach@4: broadcastTriggerSnapshot)) msach@2: { msach@3: //let the message handler parse the message msach@4: msgBody = 0xFFFFFFFF & msgCopy; msach@3: //only receive broadcast and p2p for own receiverID msach@4: msach@4: (*(localEndpoint->msgHandler))(senderEndpointID, msach@4: msgBody, msach@4: localEndpoint->msgHandlerData); msach@4: } msach@4: }else{//point 2 point message msach@4: if(receiverEndpointID == localEndpoint->endpointID && msach@4: isUnreceivedMsg(msgTrigger, msach@4: localEndpoint->lastReceivedp2pTrigger, msach@4: p2pTriggerSnapshot)) msach@4: { msach@4: //let the message handler parse the message msach@4: msgBody = 0xFFFFFFFF & msgCopy; msach@4: //only receive broadcast and p2p for own receiverID msach@4: msach@4: (*(localEndpoint->msgHandler))(senderEndpointID, msach@4: msgBody, msach@4: localEndpoint->msgHandlerData); msach@4: } msach@2: } msach@0: } msach@2: senderEndpointID++; msach@4: }//search outbox loop msach@0: } msach@2: //save last TriggerCounter of last parsed Msg msach@4: localEndpoint->lastReceivedBroadcastTrigger = broadcastTriggerSnapshot; msach@4: localEndpoint->lastReceivedp2pTrigger = p2pTriggerSnapshot; msach@0: }