comparison src/Application/producer.c @ 1:88db7b62b961

Working queue based version
author Sean Halle <seanhalle@yahoo.com>
date Wed, 10 Jul 2013 14:17:04 -0700
parents 9cf9b2091eeb
children
comparison
equal deleted inserted replaced
0:ee1e1e746014 1:70163cf233d8
8 8
9 /* 9 /*
10 * Producer. 10 * Producer.
11 * 11 *
12 * Birth function for thread that performs the producer behavior 12 * Birth function for thread that performs the producer behavior
13 *
14 * Note: the thread is pinned to a core, to facilitate collecting measurements
15 */ 13 */
16 void* 14 void*
17 producer_birthFn( void* _params ) 15 producer_birthFn( void* _params )
18 { 16 {
19 cpu_set_t cpuinfo; 17 cpu_set_t cpuinfo;
20 int lastTupleIter, oldConsumerReceivedACKNum; 18 int lastTupleIter, oldConsumerReceivedACKNum;
21 19
22 ProducerParams *params = (ProducerParams *)_params; 20 ProducerParams *params = (ProducerParams *)_params;
23 21
24 lastTupleIter = 0; //compared to global tupleIter while waiting 22 lastTupleIter = 0; //compared to global tupleIter while waiting
25 oldConsumerReceivedACKNum = 0; //used when waiting for consumer to receive
26 23
27 /* -------------------------------------------------- 24 /* --------------------------------------------------
28 * Pin thread to core, the producers are divided 25 * Pin thread to core, the producers are divided
29 * equally over all cores. Pinning prohibits the 26 * equally over all cores. Pinning prohibits the
30 * switching of cores so that perf counter and TSC values remain 27 * switching of cores so that perf counter and TSC values remain
42 */ 39 */
43 40
44 41
45 /*Protocol: 42 /*Protocol:
46 * wait for change in tupleIter (save updated tuple num for next time) 43 * wait for change in tupleIter (save updated tuple num for next time)
47 * Get producer lock (only one producer at a time) 44 * Get queue lock
48 * write into comm vars 45 * write into queue
49 * get current ACK number 46 * release queue lock
50 * notify consumer
51 * wait for ACK (get ACK lock, check on change in ACK number)
52 * release producer lock
53 * if not done, repeat 47 * if not done, repeat
54 */ 48 */
55 while( lastTupleIter < params->numTuplesToCreate ) 49 while( lastTupleIter < params->numTuplesToCreate )
56 { 50 {
57 //wait for change in tupleIter (save updated tuple num for next time) 51 //wait for change in tupleIter (save updated tuple num for next time)
59 while( lastTupleIter == tupleIter ) 53 while( lastTupleIter == tupleIter )
60 { 54 {
61 pthread_cond_wait( &tupleIterCond, 55 pthread_cond_wait( &tupleIterCond,
62 &tupleIterLock ); 56 &tupleIterLock );
63 } 57 }
64 pthread_mutex_unlock( &tupleIterLock ); 58 pthread_mutex_unlock( &tupleIterLock );
65 59
66 lastTupleIter = tupleIter; //save for next time through loop 60 lastTupleIter = tupleIter; //save for next time through loop
67 61
68 DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, tupleIter); 62 DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, tupleIter);
69 63
70 //Two vars used to comm with consumer. One holds message to send, 64 //Q used to comm with consumer, protected with a lock
71 // other holds ID of producer sending.
72 //Protect the two variables with a lock, that only one
73 // producer can get. Update the variable with the message to be
74 // communicated, and write ID of sender in second var.
75 65
76 //Get producer lock 66 //Get queue lock
77 pthread_mutex_lock( &producerAccessMutex ); 67 pthread_mutex_lock( &queueAccessLock );
78 68 writePrivQ( params, commQ ); //params is just a dummy pointer
79 // write into comm vars 69 pthread_mutex_unlock( &queueAccessLock );
80 producerMessage = tupleIter; //just a dummy -- overhead meas, do nothing
81 currProductionNum += 1;
82
83 // get current ACK number
84 oldConsumerReceivedACKNum = currConsumerReceivedACKNum;
85
86 // notify consumer (don't think need the cond lock here -- teeter-totter)
87 pthread_mutex_lock( &productionReadyLock );
88 DEBUG__printf1("producer %d wrote msg, about to wake up consumer\n", params->producerID );
89 pthread_cond_broadcast( &productionReadyCond );
90 pthread_mutex_unlock( &productionReadyLock );
91
92 // wait for ACK (get ACK lock, check on change in ACK number)
93 pthread_mutex_lock( &consumerReceivedAckLock );
94 while( currConsumerReceivedACKNum == oldConsumerReceivedACKNum )
95 {
96 pthread_cond_wait( &consumerReceivedAckCond,
97 &consumerReceivedAckLock );
98 }
99 pthread_mutex_unlock( &consumerReceivedAckLock );
100 DEBUG__printf2("producer %d got ack %d\n", params->producerID, currConsumerReceivedACKNum );
101
102 // release producer lock (so different producer can get and send)
103 pthread_mutex_unlock( &producerAccessMutex );
104 } //if not done, do again 70 } //if not done, do again
105 71
106 //Shutdown producer 72 //Shutdown producer
107 pthread_exit(NULL); 73 pthread_exit(NULL);
108 74