Mercurial > cgi-bin > hgwebdir.cgi > VMS > C_Libraries > BestEffortMessaging
comparison LossyCom.c @ 4:7ba5a3a6102d
System with multiple point-to-point triggers working
| author | Merten Sach <msach@mailbox.tu-berlin.de> |
|---|---|
| date | Tue, 13 Mar 2012 18:22:12 +0100 |
| parents | 5c0fb7c519d7 |
| children | 95a03e431480 |
comparison
equal
deleted
inserted
replaced
| 3:9c4abe9486e0 | 4:78b75071ac2a |
|---|---|
| 1 | 1 |
| 2 /* | 2 /* |
| 3 * For a detailed description see header file. | 3 * For a detailed description see header file. |
| 4 */ | 4 */ |
| 5 | 5 |
| 6 #include <string.h> | |
| 7 | |
| 6 #include "LossyCom.h" | 8 #include "LossyCom.h" |
| 7 | |
| 8 #include "VMS_Implementations/VMS_impl/vmalloc.h" | 9 #include "VMS_Implementations/VMS_impl/vmalloc.h" |
| 9 | 10 |
| 10 /* | 11 /* |
| 11 * Initializes the central exchange structure. | 12 * Initializes the central exchange structure. |
| 12 * Allocates memory to fit the number of endpoints. | 13 * Allocates memory to fit the number of endpoints. |
| 18 | 19 |
| 19 exchange = VMS_WL__malloc(sizeof(lossyCom__exchange_t)); | 20 exchange = VMS_WL__malloc(sizeof(lossyCom__exchange_t)); |
| 20 if(exchange == NULL) | 21 if(exchange == NULL) |
| 21 return NULL; | 22 return NULL; |
| 22 | 23 |
| 23 exchange->triggerCounter = 0; | 24 exchange->BroadcastTriggerCounter = 0; |
| 24 exchange->numEndpoints = numEndpoints; | 25 exchange->numEndpoints = numEndpoints; |
| 25 exchange->outboxArray = VMS_WL__malloc(sizeof(lossyCom__msg_t)*numEndpoints); | 26 exchange->outboxArray = VMS_WL__malloc(sizeof(lossyCom__msg_t)*numEndpoints); |
| 26 if(exchange->outboxArray == NULL){ | 27 if(exchange->outboxArray == NULL){ |
| 27 VMS_WL__free(exchange); | 28 VMS_WL__free(exchange); |
| 28 return NULL; | 29 return NULL; |
| 29 } | 30 } |
| 30 | 31 |
| 32 exchange->p2pTriggerCounter = VMS_WL__malloc(sizeof(uint16_t)*numEndpoints); | |
| 33 if(exchange->p2pTriggerCounter == NULL){ | |
| 34 VMS_WL__free(exchange->outboxArray); | |
| 35 VMS_WL__free(exchange); | |
| 36 return NULL; | |
| 37 } | |
| 38 | |
| 39 //reset all point 2 point trigger counter | |
| 40 memset((void*)exchange->p2pTriggerCounter, 0, sizeof(uint16_t)*numEndpoints); | |
| 41 | |
| 31 return exchange; | 42 return exchange; |
| 32 } | 43 } |
| 33 | 44 |
| 34 /* | 45 /* |
| 35 * Connects the local endpoint to the central exchange structure. | 46 * Connects the local endpoint to the central exchange structure. |
| 41 lossyCom__exchange_t* centralExchange, | 52 lossyCom__exchange_t* centralExchange, |
| 42 lossyCom__endpointID_t endpointID, | 53 lossyCom__endpointID_t endpointID, |
| 43 lossyCom__msgHandler msgHandler, | 54 lossyCom__msgHandler msgHandler, |
| 44 void* msgHandlerData) | 55 void* msgHandlerData) |
| 45 { | 56 { |
| 46 localEndpoint->localTriggerCopy = 0; | 57 localEndpoint->lastReceivedBroadcastTrigger = 0; |
| 47 localEndpoint->endpointID = endpointID; | 58 localEndpoint->endpointID = endpointID; |
| 48 localEndpoint->centralExchange = centralExchange; | 59 localEndpoint->centralExchange = centralExchange; |
| 49 localEndpoint->msgHandler = msgHandler; | 60 localEndpoint->msgHandler = msgHandler; |
| 50 localEndpoint->msgHandlerData = msgHandlerData; | 61 localEndpoint->msgHandlerData = msgHandlerData; |
| 51 } | 62 } |
| 52 | 63 |
| 64 inline lossyCom__msg_t prepareMsg(uint16_t triggerValue, | |
| 65 lossyCom__endpointID_t receiverEndpointID, | |
| 66 lossyCom__msgBody_t msg) | |
| 67 { | |
| 68 lossyCom__msg_t msgDraft; | |
| 69 | |
| 70 msgDraft = (0 | msg); | |
| 71 msgDraft |= ((lossyCom__msg_t)receiverEndpointID << ENDPOINT_ID_SHIFT); | |
| 72 msgDraft |= ((lossyCom__msg_t)triggerValue << TRIGGER_SHIFT); | |
| 73 | |
| 74 return msgDraft; | |
| 75 } | |
| 76 | |
| 53 /* | 77 /* |
| 54 * This broadcasts a message to all connected receivers | 78 * This broadcasts a message to all connected receivers |
| 55 */ | 79 */ |
| 56 void inline lossyCom__broadcastMsg(lossyCom__endpoint_t* localEndpoint, | 80 void lossyCom__broadcastMsg(lossyCom__endpoint_t* localEndpoint, |
| 57 lossyCom__msgBody_t msg) | 81 lossyCom__msgBody_t msgBody) |
| 58 { | 82 { |
| 59 lossyCom__sendMsg(localEndpoint, | 83 lossyCom__exchange_t* centralExchange = localEndpoint->centralExchange; |
| 60 BROADCAST_ID, | 84 uint16_t increasedTrigger; |
| 61 msg); | 85 lossyCom__msg_t msg; |
| 86 | |
| 87 increasedTrigger = centralExchange->BroadcastTriggerCounter +1; | |
| 88 | |
| 89 //build message | |
| 90 msg = prepareMsg(increasedTrigger, BROADCAST_ID, msgBody); | |
| 91 | |
| 92 //write msg to central exchange | |
| 93 centralExchange->outboxArray[localEndpoint->endpointID] = msg; | |
| 94 | |
| 95 //update broadcast trigger counter | |
| 96 centralExchange->BroadcastTriggerCounter = increasedTrigger; | |
| 62 } | 97 } |
| 63 | 98 |
| 64 /* | 99 /* |
| 65 * This sends a message to another endpoint. Again it is not guaranteed that the | 100 * This sends a message to another endpoint. Again it is not guaranteed that the |
| 66 * message is received. But in most cases it will. | 101 * message is received. But in most cases it will. |
| 67 */ | 102 */ |
| 68 void inline lossyCom__sendMsg(lossyCom__endpoint_t* localEndpoint, | 103 void lossyCom__sendMsg(lossyCom__endpoint_t* localEndpoint, |
| 69 lossyCom__endpointID_t receiverEndpointID, | 104 lossyCom__endpointID_t receiverEndpointID, |
| 70 lossyCom__msgBody_t msg) | 105 lossyCom__msgBody_t msgBody) |
| 71 { | 106 { |
| 72 lossyCom__msg_t msgDraft; | 107 lossyCom__exchange_t* centralExchange = localEndpoint->centralExchange; |
| 73 uint16_t triggerCopy; | 108 lossyCom__msg_t msg; |
| 74 | 109 uint16_t increasedTrigger; |
| 75 //build message | 110 |
| 76 msgDraft = (0 | msg); | 111 increasedTrigger = |
| 77 msgDraft |= ((lossyCom__msg_t)receiverEndpointID << ENDPOINT_ID_SHIFT); | 112 centralExchange->p2pTriggerCounter[receiverEndpointID] +1; |
| 78 | 113 |
| 79 triggerCopy = localEndpoint->centralExchange->triggerCounter +1; | 114 //build message |
| 80 msgDraft |= ((lossyCom__msg_t)triggerCopy << TRIGGER_SHIFT); | 115 msg = prepareMsg(increasedTrigger, receiverEndpointID, msgBody); |
| 81 | 116 |
| 82 //write msg to central exchange | 117 //write msg to central exchange |
| 83 localEndpoint->centralExchange->outboxArray[localEndpoint->endpointID] = | 118 centralExchange->outboxArray[localEndpoint->endpointID] = msg; |
| 84 msgDraft; | 119 |
| 85 | 120 //write back increased trigger counter |
| 86 localEndpoint->centralExchange->triggerCounter = triggerCopy; | 121 centralExchange->p2pTriggerCounter[receiverEndpointID] = increasedTrigger; |
| 87 //printf("send updated trigger to %d\n", triggerCopy); | 122 } |
| 88 } | 123 |
| 89 | 124 int inline isUnreceivedMsg(uint16_t msgTrigger, |
| 90 void inline lossyCom__receiveMsg(lossyCom__endpoint_t* localEndpoint) | 125 uint16_t lastReceivedTrigger, |
| 91 { | 126 uint16_t triggerSnapshot) |
| 92 uint16_t currentTriggerCopy; | 127 { |
| 128 if(msgTrigger > lastReceivedTrigger || | |
| 129 msgTrigger <= triggerSnapshot) | |
| 130 { | |
| 131 // check if the message is new (msg trigger > archived trigger) | |
| 132 // and already valid (msgTrigger <= currentTriggerCopy) | |
| 133 if((msgTrigger > lastReceivedTrigger && | |
| 134 msgTrigger <= triggerSnapshot) || | |
| 135 ((int64_t) triggerSnapshot- // check for triggerCounterOverflow | |
| 136 (int64_t)lastReceivedTrigger < -MAX_TRIGGER/2)) | |
| 137 { | |
| 138 return TRUE; | |
| 139 } | |
| 140 } | |
| 141 return FALSE; | |
| 142 } | |
| 143 | |
| 144 void lossyCom__receiveMsg(lossyCom__endpoint_t* localEndpoint) | |
| 145 { | |
| 146 uint16_t broadcastTriggerSnapshot, p2pTriggerSnapshot; | |
| 147 lossyCom__exchange_t* centralExchange; | |
| 93 lossyCom__endpointID_t senderEndpointID; | 148 lossyCom__endpointID_t senderEndpointID; |
| 94 lossyCom__endpointID_t receiverID; | 149 lossyCom__endpointID_t receiverEndpointID; |
| 95 lossyCom__msg_t msgCopy; | 150 lossyCom__msg_t msgCopy; |
| 96 uint16_t msgTrigger; | 151 uint16_t msgTrigger; |
| 97 lossyCom__msgBody_t msgBody; | 152 lossyCom__msgBody_t msgBody; |
| 98 | 153 |
| 99 senderEndpointID = 0; | 154 centralExchange = localEndpoint->centralExchange; |
| 100 currentTriggerCopy = localEndpoint->centralExchange->triggerCounter; | 155 //save the trigger counters so that valid messages can be identified |
| 156 broadcastTriggerSnapshot = centralExchange->BroadcastTriggerCounter; | |
| 157 p2pTriggerSnapshot = | |
| 158 centralExchange->p2pTriggerCounter[localEndpoint->endpointID]; | |
| 101 | 159 |
| 102 //new message arrived if trigger counter is higher than the last time read | 160 //new message arrived if trigger counter is higher than the last time read |
| 103 if(currentTriggerCopy > localEndpoint->localTriggerCopy) | 161 if( broadcastTriggerSnapshot > localEndpoint->lastReceivedBroadcastTrigger || |
| 162 p2pTriggerSnapshot > localEndpoint->lastReceivedp2pTrigger) | |
| 104 { | 163 { |
| 105 while(senderEndpointID < localEndpoint->centralExchange->numEndpoints) | 164 senderEndpointID = 0; |
| 165 //search outboxes for new messages | |
| 166 while(senderEndpointID < centralExchange->numEndpoints) | |
| 106 { | 167 { |
| 168 //ignore own outbox | |
| 107 if(senderEndpointID != localEndpoint->endpointID) | 169 if(senderEndpointID != localEndpoint->endpointID) |
| 108 { | 170 { |
| 109 msgCopy = localEndpoint->centralExchange->outboxArray[senderEndpointID]; | 171 msgCopy = centralExchange->outboxArray[senderEndpointID]; |
| 110 msgTrigger = 0xFFFF & (msgCopy >> TRIGGER_SHIFT); | 172 msgTrigger = 0xFFFF & (msgCopy >> TRIGGER_SHIFT); |
| 173 receiverEndpointID = 0xFFFF & (msgCopy >> ENDPOINT_ID_SHIFT); | |
| 111 | 174 |
| 112 | 175 if(receiverEndpointID == BROADCAST_ID){//receive broadcast message |
| 113 if(msgTrigger > localEndpoint->localTriggerCopy || | 176 if(isUnreceivedMsg(msgTrigger, |
| 114 msgTrigger <= currentTriggerCopy) | 177 localEndpoint->lastReceivedBroadcastTrigger, |
| 115 { | 178 broadcastTriggerSnapshot)) |
| 116 // check if the message is new (msg trigger > archived trigger) | |
| 117 // and already valid (msgTrigger <= currentTriggerCopy) | |
| 118 if((msgTrigger > localEndpoint->localTriggerCopy && | |
| 119 msgTrigger <= currentTriggerCopy) || | |
| 120 ((int64_t)currentTriggerCopy- // check for triggerCounterOverflow | |
| 121 (int64_t)localEndpoint->localTriggerCopy < -MAX_TRIGGER/2)) | |
| 122 { | 179 { |
| 123 //let the message handler parse the message | 180 //let the message handler parse the message |
| 124 msgBody = 0xFFFFFFFF & msgCopy; | 181 msgBody = 0xFFFFFFFF & msgCopy; |
| 125 receiverID = 0xFFFF & (msgCopy >> ENDPOINT_ID_SHIFT); | |
| 126 //only receive broadcast and p2p for own receiverID | 182 //only receive broadcast and p2p for own receiverID |
| 127 if(receiverID == BROADCAST_ID || | 183 |
| 128 receiverID == localEndpoint->endpointID) | 184 (*(localEndpoint->msgHandler))(senderEndpointID, |
| 129 { | 185 msgBody, |
| 130 (*(localEndpoint->msgHandler))(senderEndpointID, | 186 localEndpoint->msgHandlerData); |
| 131 msgBody, | 187 } |
| 132 localEndpoint->msgHandlerData); | 188 }else{//point 2 point message |
| 133 } | 189 if(receiverEndpointID == localEndpoint->endpointID && |
| 134 } | 190 isUnreceivedMsg(msgTrigger, |
| 191 localEndpoint->lastReceivedp2pTrigger, | |
| 192 p2pTriggerSnapshot)) | |
| 193 { | |
| 194 //let the message handler parse the message | |
| 195 msgBody = 0xFFFFFFFF & msgCopy; | |
| 196 //only receive broadcast and p2p for own receiverID | |
| 197 | |
| 198 (*(localEndpoint->msgHandler))(senderEndpointID, | |
| 199 msgBody, | |
| 200 localEndpoint->msgHandlerData); | |
| 201 } | |
| 135 } | 202 } |
| 136 } | 203 } |
| 137 senderEndpointID++; | 204 senderEndpointID++; |
| 138 } | 205 }//search outbox loop |
| 139 } | 206 } |
| 140 //save last TriggerCounter of last parsed Msg | 207 //save last TriggerCounter of last parsed Msg |
| 141 localEndpoint->localTriggerCopy = currentTriggerCopy; | 208 localEndpoint->lastReceivedBroadcastTrigger = broadcastTriggerSnapshot; |
| 142 } | 209 localEndpoint->lastReceivedp2pTrigger = p2pTriggerSnapshot; |
| 210 } |
