comparison LossyCom.c @ 4:7ba5a3a6102d

System with multiple point-to-point triggers working
author Merten Sach <msach@mailbox.tu-berlin.de>
date Tue, 13 Mar 2012 18:22:12 +0100
parents 5c0fb7c519d7
children 95a03e431480
comparison
equal deleted inserted replaced
3:9c4abe9486e0 4:78b75071ac2a
1 1
2 /* 2 /*
3 * For a detailed description see header file. 3 * For a detailed description see header file.
4 */ 4 */
5 5
6 #include <string.h>
7
6 #include "LossyCom.h" 8 #include "LossyCom.h"
7
8 #include "VMS_Implementations/VMS_impl/vmalloc.h" 9 #include "VMS_Implementations/VMS_impl/vmalloc.h"
9 10
10 /* 11 /*
11 * Initializes the central exchange structure. 12 * Initializes the central exchange structure.
12 * Allocates memory to fit the number of endpoints. 13 * Allocates memory to fit the number of endpoints.
18 19
19 exchange = VMS_WL__malloc(sizeof(lossyCom__exchange_t)); 20 exchange = VMS_WL__malloc(sizeof(lossyCom__exchange_t));
20 if(exchange == NULL) 21 if(exchange == NULL)
21 return NULL; 22 return NULL;
22 23
23 exchange->triggerCounter = 0; 24 exchange->BroadcastTriggerCounter = 0;
24 exchange->numEndpoints = numEndpoints; 25 exchange->numEndpoints = numEndpoints;
25 exchange->outboxArray = VMS_WL__malloc(sizeof(lossyCom__msg_t)*numEndpoints); 26 exchange->outboxArray = VMS_WL__malloc(sizeof(lossyCom__msg_t)*numEndpoints);
26 if(exchange->outboxArray == NULL){ 27 if(exchange->outboxArray == NULL){
27 VMS_WL__free(exchange); 28 VMS_WL__free(exchange);
28 return NULL; 29 return NULL;
29 } 30 }
30 31
32 exchange->p2pTriggerCounter = VMS_WL__malloc(sizeof(uint16_t)*numEndpoints);
33 if(exchange->p2pTriggerCounter == NULL){
34 VMS_WL__free(exchange->outboxArray);
35 VMS_WL__free(exchange);
36 return NULL;
37 }
38
39 //reset all point 2 point trigger counter
40 memset((void*)exchange->p2pTriggerCounter, 0, sizeof(uint16_t)*numEndpoints);
41
31 return exchange; 42 return exchange;
32 } 43 }
33 44
34 /* 45 /*
35 * Connects the local endpoint to the central exchange structure. 46 * Connects the local endpoint to the central exchange structure.
41 lossyCom__exchange_t* centralExchange, 52 lossyCom__exchange_t* centralExchange,
42 lossyCom__endpointID_t endpointID, 53 lossyCom__endpointID_t endpointID,
43 lossyCom__msgHandler msgHandler, 54 lossyCom__msgHandler msgHandler,
44 void* msgHandlerData) 55 void* msgHandlerData)
45 { 56 {
46 localEndpoint->localTriggerCopy = 0; 57 localEndpoint->lastReceivedBroadcastTrigger = 0;
47 localEndpoint->endpointID = endpointID; 58 localEndpoint->endpointID = endpointID;
48 localEndpoint->centralExchange = centralExchange; 59 localEndpoint->centralExchange = centralExchange;
49 localEndpoint->msgHandler = msgHandler; 60 localEndpoint->msgHandler = msgHandler;
50 localEndpoint->msgHandlerData = msgHandlerData; 61 localEndpoint->msgHandlerData = msgHandlerData;
51 } 62 }
52 63
64 inline lossyCom__msg_t prepareMsg(uint16_t triggerValue,
65 lossyCom__endpointID_t receiverEndpointID,
66 lossyCom__msgBody_t msg)
67 {
68 lossyCom__msg_t msgDraft;
69
70 msgDraft = (0 | msg);
71 msgDraft |= ((lossyCom__msg_t)receiverEndpointID << ENDPOINT_ID_SHIFT);
72 msgDraft |= ((lossyCom__msg_t)triggerValue << TRIGGER_SHIFT);
73
74 return msgDraft;
75 }
76
53 /* 77 /*
54 * This broadcasts a message to all connected receivers 78 * This broadcasts a message to all connected receivers
55 */ 79 */
56 void inline lossyCom__broadcastMsg(lossyCom__endpoint_t* localEndpoint, 80 void lossyCom__broadcastMsg(lossyCom__endpoint_t* localEndpoint,
57 lossyCom__msgBody_t msg) 81 lossyCom__msgBody_t msgBody)
58 { 82 {
59 lossyCom__sendMsg(localEndpoint, 83 lossyCom__exchange_t* centralExchange = localEndpoint->centralExchange;
60 BROADCAST_ID, 84 uint16_t increasedTrigger;
61 msg); 85 lossyCom__msg_t msg;
86
87 increasedTrigger = centralExchange->BroadcastTriggerCounter +1;
88
89 //build message
90 msg = prepareMsg(increasedTrigger, BROADCAST_ID, msgBody);
91
92 //write msg to central exchange
93 centralExchange->outboxArray[localEndpoint->endpointID] = msg;
94
95 //update broadcast trigger counter
96 centralExchange->BroadcastTriggerCounter = increasedTrigger;
62 } 97 }
63 98
64 /* 99 /*
65 * This sends a message to another endpoint. Again, it is not guaranteed that the 100 * This sends a message to another endpoint. Again, it is not guaranteed that the
66 * message is received, but in most cases it will be. 101 * message is received, but in most cases it will be.
67 */ 102 */
68 void inline lossyCom__sendMsg(lossyCom__endpoint_t* localEndpoint, 103 void lossyCom__sendMsg(lossyCom__endpoint_t* localEndpoint,
69 lossyCom__endpointID_t receiverEndpointID, 104 lossyCom__endpointID_t receiverEndpointID,
70 lossyCom__msgBody_t msg) 105 lossyCom__msgBody_t msgBody)
71 { 106 {
72 lossyCom__msg_t msgDraft; 107 lossyCom__exchange_t* centralExchange = localEndpoint->centralExchange;
73 uint16_t triggerCopy; 108 lossyCom__msg_t msg;
74 109 uint16_t increasedTrigger;
75 //build message 110
76 msgDraft = (0 | msg); 111 increasedTrigger =
77 msgDraft |= ((lossyCom__msg_t)receiverEndpointID << ENDPOINT_ID_SHIFT); 112 centralExchange->p2pTriggerCounter[receiverEndpointID] +1;
78 113
79 triggerCopy = localEndpoint->centralExchange->triggerCounter +1; 114 //build message
80 msgDraft |= ((lossyCom__msg_t)triggerCopy << TRIGGER_SHIFT); 115 msg = prepareMsg(increasedTrigger, receiverEndpointID, msgBody);
81 116
82 //write msg to central exchange 117 //write msg to central exchange
83 localEndpoint->centralExchange->outboxArray[localEndpoint->endpointID] = 118 centralExchange->outboxArray[localEndpoint->endpointID] = msg;
84 msgDraft; 119
85 120 //write back increased trigger counter
86 localEndpoint->centralExchange->triggerCounter = triggerCopy; 121 centralExchange->p2pTriggerCounter[receiverEndpointID] = increasedTrigger;
87 //printf("send updated trigger to %d\n", triggerCopy); 122 }
88 } 123
89 124 int inline isUnreceivedMsg(uint16_t msgTrigger,
90 void inline lossyCom__receiveMsg(lossyCom__endpoint_t* localEndpoint) 125 uint16_t lastReceivedTrigger,
91 { 126 uint16_t triggerSnapshot)
92 uint16_t currentTriggerCopy; 127 {
128 if(msgTrigger > lastReceivedTrigger ||
129 msgTrigger <= triggerSnapshot)
130 {
131 // check if the message is new (msg trigger > archived trigger)
132 // and already valid (msgTrigger <= currentTriggerCopy)
133 if((msgTrigger > lastReceivedTrigger &&
134 msgTrigger <= triggerSnapshot) ||
135 ((int64_t) triggerSnapshot- // check for triggerCounterOverflow
136 (int64_t)lastReceivedTrigger < -MAX_TRIGGER/2))
137 {
138 return TRUE;
139 }
140 }
141 return FALSE;
142 }
143
144 void lossyCom__receiveMsg(lossyCom__endpoint_t* localEndpoint)
145 {
146 uint16_t broadcastTriggerSnapshot, p2pTriggerSnapshot;
147 lossyCom__exchange_t* centralExchange;
93 lossyCom__endpointID_t senderEndpointID; 148 lossyCom__endpointID_t senderEndpointID;
94 lossyCom__endpointID_t receiverID; 149 lossyCom__endpointID_t receiverEndpointID;
95 lossyCom__msg_t msgCopy; 150 lossyCom__msg_t msgCopy;
96 uint16_t msgTrigger; 151 uint16_t msgTrigger;
97 lossyCom__msgBody_t msgBody; 152 lossyCom__msgBody_t msgBody;
98 153
99 senderEndpointID = 0; 154 centralExchange = localEndpoint->centralExchange;
100 currentTriggerCopy = localEndpoint->centralExchange->triggerCounter; 155 //save trigger counters so that valid messages can be identified
156 broadcastTriggerSnapshot = centralExchange->BroadcastTriggerCounter;
157 p2pTriggerSnapshot =
158 centralExchange->p2pTriggerCounter[localEndpoint->endpointID];
101 159
102 //new message arrived if trigger counter is higher than the last time read 160 //new message arrived if trigger counter is higher than the last time read
103 if(currentTriggerCopy > localEndpoint->localTriggerCopy) 161 if( broadcastTriggerSnapshot > localEndpoint->lastReceivedBroadcastTrigger ||
162 p2pTriggerSnapshot > localEndpoint->lastReceivedp2pTrigger)
104 { 163 {
105 while(senderEndpointID < localEndpoint->centralExchange->numEndpoints) 164 senderEndpointID = 0;
165 //search outboxes for new messages
166 while(senderEndpointID < centralExchange->numEndpoints)
106 { 167 {
168 //ignore own outbox
107 if(senderEndpointID != localEndpoint->endpointID) 169 if(senderEndpointID != localEndpoint->endpointID)
108 { 170 {
109 msgCopy = localEndpoint->centralExchange->outboxArray[senderEndpointID]; 171 msgCopy = centralExchange->outboxArray[senderEndpointID];
110 msgTrigger = 0xFFFF & (msgCopy >> TRIGGER_SHIFT); 172 msgTrigger = 0xFFFF & (msgCopy >> TRIGGER_SHIFT);
173 receiverEndpointID = 0xFFFF & (msgCopy >> ENDPOINT_ID_SHIFT);
111 174
112 175 if(receiverEndpointID == BROADCAST_ID){//receive broadcast message
113 if(msgTrigger > localEndpoint->localTriggerCopy || 176 if(isUnreceivedMsg(msgTrigger,
114 msgTrigger <= currentTriggerCopy) 177 localEndpoint->lastReceivedBroadcastTrigger,
115 { 178 broadcastTriggerSnapshot))
116 // check if the message is new (msg trigger > archived trigger)
117 // and already valid (msgTrigger <= currentTriggerCopy)
118 if((msgTrigger > localEndpoint->localTriggerCopy &&
119 msgTrigger <= currentTriggerCopy) ||
120 ((int64_t)currentTriggerCopy- // check for triggerCounterOverflow
121 (int64_t)localEndpoint->localTriggerCopy < -MAX_TRIGGER/2))
122 { 179 {
123 //let the message handler parse the message 180 //let the message handler parse the message
124 msgBody = 0xFFFFFFFF & msgCopy; 181 msgBody = 0xFFFFFFFF & msgCopy;
125 receiverID = 0xFFFF & (msgCopy >> ENDPOINT_ID_SHIFT);
126 //only receive broadcast and p2p for own receiverID 182 //only receive broadcast and p2p for own receiverID
127 if(receiverID == BROADCAST_ID || 183
128 receiverID == localEndpoint->endpointID) 184 (*(localEndpoint->msgHandler))(senderEndpointID,
129 { 185 msgBody,
130 (*(localEndpoint->msgHandler))(senderEndpointID, 186 localEndpoint->msgHandlerData);
131 msgBody, 187 }
132 localEndpoint->msgHandlerData); 188 }else{//point 2 point message
133 } 189 if(receiverEndpointID == localEndpoint->endpointID &&
134 } 190 isUnreceivedMsg(msgTrigger,
191 localEndpoint->lastReceivedp2pTrigger,
192 p2pTriggerSnapshot))
193 {
194 //let the message handler parse the message
195 msgBody = 0xFFFFFFFF & msgCopy;
196 //only receive broadcast and p2p for own receiverID
197
198 (*(localEndpoint->msgHandler))(senderEndpointID,
199 msgBody,
200 localEndpoint->msgHandlerData);
201 }
135 } 202 }
136 } 203 }
137 senderEndpointID++; 204 senderEndpointID++;
138 } 205 }//search outbox loop
139 } 206 }
140 //save last TriggerCounter of last parsed Msg 207 //save last TriggerCounter of last parsed Msg
141 localEndpoint->localTriggerCopy = currentTriggerCopy; 208 localEndpoint->lastReceivedBroadcastTrigger = broadcastTriggerSnapshot;
142 } 209 localEndpoint->lastReceivedp2pTrigger = p2pTriggerSnapshot;
210 }