| rev |
line source |
|
seanhalle@0
|
1 /*
|
|
seanhalle@0
|
2 *
|
|
seanhalle@0
|
3 */
|
|
seanhalle@0
|
4
|
|
seanhalle@0
|
5 #include "main.h"
|
|
seanhalle@0
|
6 #include <pthread.h>
|
|
seanhalle@0
|
7 #include <sched.h>
|
|
seanhalle@0
|
8
|
|
seanhalle@0
|
9 /*
|
|
seanhalle@0
|
10 * Producer.
|
|
seanhalle@0
|
11 *
|
|
seanhalle@0
|
12 * Birth function for thread that performs the producer behavior
|
|
seanhalle@0
|
13 *
|
|
seanhalle@0
|
14 * Note: is pinned to a core, to facilitate collecting measurements
|
|
seanhalle@0
|
15 */
|
|
seanhalle@0
|
16 void*
|
|
seanhalle@0
|
17 producer_birthFn( void* _params )
|
|
seanhalle@0
|
18 {
|
|
seanhalle@0
|
19 cpu_set_t cpuinfo;
|
|
seanhalle@0
|
20 int lastTupleIter, oldConsumerReceivedACKNum;
|
|
seanhalle@0
|
21
|
|
seanhalle@0
|
22 ProducerParams *params = (ProducerParams *)_params;
|
|
seanhalle@0
|
23
|
|
seanhalle@0
|
24 lastTupleIter = 0; //compared to global tupleIter while waiting
|
|
seanhalle@0
|
25 oldConsumerReceivedACKNum = 0; //used when waiting for consumer to receive
|
|
seanhalle@0
|
26
|
|
seanhalle@0
|
27 /* --------------------------------------------------
|
|
seanhalle@0
|
28 * Pin thread to core, the producers are divided
|
|
seanhalle@0
|
29 * equally over all cores. Pinning prohibits the
|
|
seanhalle@0
|
30 * switching of cores so that perf counter and TSC values remain
|
|
seanhalle@0
|
31 * from the same core between readings. Pinning shouldn't
|
|
seanhalle@0
|
32 * affect results.. may be odd case when num thds doesn't divide into
|
|
seanhalle@0
|
33 * num Cores
|
|
seanhalle@0
|
34 * --------------------------------------------------
|
|
seanhalle@0
|
35 */
|
|
seanhalle@0
|
36 /*
|
|
seanhalle@0
|
37 CPU_ZERO( &cpuinfo );
|
|
seanhalle@0
|
38 CPU_SET( params->coreID, &cpuinfo );
|
|
seanhalle@0
|
39 pthread_setaffinity_np( pthread_self(), sizeof(cpuinfo), &cpuinfo );
|
|
seanhalle@0
|
40 pthread_yield(); //get off the core, so next can be created on it
|
|
seanhalle@0
|
41 uint32_t cpuid = sched_getcpu();
|
|
seanhalle@0
|
42 */
|
|
seanhalle@0
|
43
|
|
seanhalle@0
|
44
|
|
seanhalle@0
|
45 /*Protocol:
|
|
seanhalle@0
|
46 * wait for change in tupleIter (save updated tuple num for next time)
|
|
seanhalle@0
|
47 * Get producer lock (only one producer at a time)
|
|
seanhalle@0
|
48 * write into comm vars
|
|
seanhalle@0
|
49 * get current ACK number
|
|
seanhalle@0
|
50 * notify consumer
|
|
seanhalle@0
|
51 * wait for ACK (get ACK lock, check on change in ACK number)
|
|
seanhalle@0
|
52 * release producer lock
|
|
seanhalle@0
|
53 * if not done, repeat
|
|
seanhalle@0
|
54 */
|
|
seanhalle@0
|
55 while( lastTupleIter < params->numTuplesToCreate )
|
|
seanhalle@0
|
56 {
|
|
seanhalle@0
|
57 //wait for change in tupleNum (save updated tuple num for next time)
|
|
seanhalle@0
|
58 pthread_mutex_lock( &tupleIterLock );
|
|
seanhalle@0
|
59 while( lastTupleIter == tupleIter )
|
|
seanhalle@0
|
60 {
|
|
seanhalle@0
|
61 pthread_cond_wait( &tupleIterCond,
|
|
seanhalle@0
|
62 &tupleIterLock );
|
|
seanhalle@0
|
63 }
|
|
seanhalle@0
|
64 pthread_mutex_unlock( &tupleIterLock );
|
|
seanhalle@0
|
65
|
|
seanhalle@0
|
66 lastTupleIter = tupleIter; //save for next time through loop
|
|
seanhalle@0
|
67
|
|
seanhalle@0
|
68 DEBUG__printf2("Producer: %d starting tuple: %d\n", params->producerID, tupleIter);
|
|
seanhalle@0
|
69
|
|
seanhalle@0
|
70 //Two vars used to comm with consumer. One holds message to send,
|
|
seanhalle@0
|
71 // other holds ID of producer sending.
|
|
seanhalle@0
|
72 //Protect the two variables with a lock, that only one
|
|
seanhalle@0
|
73 // producer can get. Update the variable with the message to be
|
|
seanhalle@0
|
74 // communicated, and write ID of sender in second var.
|
|
seanhalle@0
|
75
|
|
seanhalle@0
|
76 //Get producer lock
|
|
seanhalle@0
|
77 pthread_mutex_lock( &producerAccessMutex );
|
|
seanhalle@0
|
78
|
|
seanhalle@0
|
79 // write into comm vars
|
|
seanhalle@0
|
80 producerMessage = tupleIter; //just a dummy -- overhead meas, do nothing
|
|
seanhalle@0
|
81 currProductionNum += 1;
|
|
seanhalle@0
|
82
|
|
seanhalle@0
|
83 // get current ACK number
|
|
seanhalle@0
|
84 oldConsumerReceivedACKNum = currConsumerReceivedACKNum;
|
|
seanhalle@0
|
85
|
|
seanhalle@0
|
86 // notify consumer (don't think need the cond lock here -- teeter-totter)
|
|
seanhalle@0
|
87 pthread_mutex_lock( &productionReadyLock );
|
|
seanhalle@0
|
88 DEBUG__printf1("producer %d wrote msg, about to wake up consumer\n", params->producerID );
|
|
seanhalle@0
|
89 pthread_cond_broadcast( &productionReadyCond );
|
|
seanhalle@0
|
90 pthread_mutex_unlock( &productionReadyLock );
|
|
seanhalle@0
|
91
|
|
seanhalle@0
|
92 // wait for ACK (get ACK lock, check on change in ACK number)
|
|
seanhalle@0
|
93 pthread_mutex_lock( &consumerReceivedAckLock );
|
|
seanhalle@0
|
94 while( currConsumerReceivedACKNum == oldConsumerReceivedACKNum )
|
|
seanhalle@0
|
95 {
|
|
seanhalle@0
|
96 pthread_cond_wait( &consumerReceivedAckCond,
|
|
seanhalle@0
|
97 &consumerReceivedAckLock );
|
|
seanhalle@0
|
98 }
|
|
seanhalle@0
|
99 pthread_mutex_unlock( &consumerReceivedAckLock );
|
|
seanhalle@0
|
100 DEBUG__printf2("producer %d got ack %d\n", params->producerID, currConsumerReceivedACKNum );
|
|
seanhalle@0
|
101
|
|
seanhalle@0
|
102 // release producer lock (so different producer can get and send)
|
|
seanhalle@0
|
103 pthread_mutex_unlock( &producerAccessMutex );
|
|
seanhalle@0
|
104 } //if not done, do again
|
|
seanhalle@0
|
105
|
|
seanhalle@0
|
106 //Shutdown producer
|
|
seanhalle@0
|
107 pthread_exit(NULL);
|
|
seanhalle@0
|
108
|
|
seanhalle@0
|
109 }
|
|
seanhalle@0
|
110
|