msach@0: /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ msach@0: /* File: pthreads_kmeans.c (OpenMP version) */ msach@0: /* Description: Implementation of simple k-means clustering algorithm */ msach@0: /* This program takes an array of N data objects, each with */ msach@0: /* M coordinates and performs a k-means clustering given a */ msach@0: /* user-provided value of the number of clusters (K). The */ msach@0: /* clustering results are saved in 2 arrays: */ msach@0: /* 1. a returned array of size [K][N] indicating the center */ msach@0: /* coordinates of K clusters */ msach@0: /* 2. membership[N] stores the cluster center ids, each */ msach@0: /* corresponding to the cluster a data object is assigned */ msach@0: /* */ msach@0: /* Author: Wei-keng Liao */ msach@0: /* ECE Department, Northwestern University */ msach@0: /* email: wkliao@ece.northwestern.edu */ msach@0: /* Copyright, 2005, Wei-keng Liao */ msach@0: /* */ msach@0: /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */ msach@0: msach@0: #include msach@0: #include msach@0: #include msach@0: #include msach@0: #include msach@0: #include "kmeans.h" msach@0: msach@0: #define PREC 300 msach@0: msach@0: char __ProgrammName[] = "kmeans"; msach@0: char __DataSet[255]; msach@0: msach@0: extern int nthreads; /* Thread count */ msach@0: double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */ msach@0: volatile int finished; msach@0: msach@0: pthread_barrier_t barr; msach@0: pthread_mutex_t lock1; msach@0: pthread_attr_t attr; msach@0: msach@0: /* msach@0: * Struct: input msach@0: * ------------- msach@0: * Encapsulates all the input data for the benchmark, i.e. the object list, msach@0: * clustering output and the input statistics. msach@0: */ msach@0: struct input { msach@0: int t; msach@0: double **objects; /* Object list */ msach@0: double **clusters; /* Cluster list, out: [numClusters][numCoords] */ msach@0: int *membership; /* For each object, contains the index of the cluster it currently belongs to */ msach@0: int **local_newClusterSize; /* Thread-safe, [nthreads][numClusters] */ msach@0: double ***local_newClusters; /* Thread-safe, [nthreads][numClusters][numCoords] */ msach@0: int numObjs,numClusters,numCoords; msach@0: }; msach@0: msach@0: /* msach@0: * Function: euclid_dist_2 msach@0: * ----------------------- msach@0: * Computes the square of the euclidean distance between two multi-dimensional points. msach@0: */ msach@0: __inline static double euclid_dist_2(int numdims, double *coord1, double *coord2) { msach@0: int i; msach@0: double ans=0.0; msach@0: msach@0: for (i=0; it; msach@0: double local_delta=0; msach@0: int i; msach@0: for (i = tid; i < x->numObjs; i += nthreads) { msach@0: /* find the array index of nearest cluster center */ msach@0: int index = find_nearest_cluster(x->numClusters, x->numCoords, msach@0: x->objects[i], x->clusters); msach@0: msach@0: /* if membership changes, increase delta by 1 */ msach@0: if (x->membership[i] != index) msach@0: local_delta += 1.0; msach@0: /* assign the membership to object i */ msach@0: x->membership[i] = index; msach@0: msach@0: /* update new cluster centers : sum of all objects located msach@0: within (average will be performed later) */ msach@0: x->local_newClusterSize[tid][index]++; msach@0: int j; msach@0: for (j=0; j < x->numCoords; j++) msach@0: x->local_newClusters[tid][index][j] += x->objects[i][j]; msach@0: msach@0: } msach@0: pthread_mutex_lock(&lock1); msach@0: delta +=local_delta; msach@0: pthread_mutex_unlock(&lock1); msach@0: } msach@0: /* msach@0: * Function: thread function work msach@0: * -------------- msach@0: * Worker function for threading. Work distribution is done so that each thread computers msach@0: */ msach@0: void* tfwork(void *ip) msach@0: { msach@0: struct input *x; msach@0: x = (struct input *)ip; msach@0: msach@0: for(;;){ msach@0: pthread_barrier_wait(&barr); msach@0: if (finished){ msach@0: break; msach@0: } msach@0: work(x); msach@0: pthread_barrier_wait(&barr); msach@0: } msach@0: msach@0: pthread_exit(NULL); msach@0: } msach@0: msach@0: /* msach@0: * Function: create_array_2d_f msach@0: * -------------------------- msach@0: * Allocates memory for a 2-dim double array as needed for the algorithm. msach@0: */ msach@0: double** create_array_2d_f(int height, int width) { msach@0: double** ptr; msach@0: int i; msach@0: ptr = calloc(height, sizeof(double*)); msach@0: assert(ptr != NULL); msach@0: ptr[0] = calloc(width * height, sizeof(double)); msach@0: assert(ptr[0] != NULL); msach@0: /* Assign pointers correctly */ msach@0: for(i = 1; i < height; i++) msach@0: ptr[i] = ptr[i-1] + width; msach@0: return ptr; msach@0: } msach@0: msach@0: /* msach@0: * Function: create_array_2Dd_i msach@0: * -------------------------- msach@0: * Allocates memory for a 2-dim integer array as needed for the algorithm. msach@0: */ msach@0: int** create_array_2d_i(int height, int width) { msach@0: int** ptr; msach@0: int i; msach@0: ptr = calloc(height, sizeof(int*)); msach@0: assert(ptr != NULL); msach@0: ptr[0] = calloc(width * height, sizeof(int)); msach@0: assert(ptr[0] != NULL); msach@0: /* Assign pointers correctly */ msach@0: for(i = 1; i < height; i++) msach@0: ptr[i] = ptr[i-1] + width; msach@0: return ptr; msach@0: } msach@0: msach@0: /* msach@0: * Function: pthreads_kmeans msach@0: * ------------------------- msach@0: * Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords]. msach@0: */ msach@0: double** pthreads_kmeans(int is_perform_atomic, /* in: */ msach@0: double **objects, /* in: [numObjs][numCoords] */ msach@0: int numCoords, /* no. coordinates */ msach@0: int numObjs, /* no. objects */ msach@0: int numClusters, /* no. clusters */ msach@0: double threshold, /* % objects change membership */ msach@0: int *membership) /* out: [numObjs] */ msach@0: { msach@0: msach@0: int i, j, k, index, loop = 0, rc; msach@0: int *newClusterSize; /* [numClusters]: no. objects assigned in each msach@0: new cluster */ msach@0: double **clusters; /* out: [numClusters][numCoords] */ msach@0: double **newClusters; /* [numClusters][numCoords] */ msach@0: double timing; msach@0: int **local_newClusterSize; /* [nthreads][numClusters] */ msach@0: double ***local_newClusters; /* [nthreads][numClusters][numCoords] */ msach@0: msach@0: pthread_t *thread; msach@0: msach@0: /* === MEMORY SETUP === */ msach@0: msach@0: /* [numClusters] clusters of [numCoords] double coordinates each */ msach@0: clusters = create_array_2d_f(numClusters, numCoords); msach@0: msach@0: /* Pick first numClusters elements of objects[] as initial cluster centers */ msach@0: for (i=0; i < numClusters; i++) msach@0: for (j=0; j < numCoords; j++) msach@0: clusters[i][j] = objects[i][j]; msach@0: msach@0: /* Initialize membership, no object belongs to any cluster yet */ msach@0: for (i = 0; i < numObjs; i++) msach@0: membership[i] = -1; msach@0: msach@0: /* newClusterSize holds information on the count of members in each cluster */ msach@0: newClusterSize = (int*)calloc(numClusters, sizeof(int)); msach@0: assert(newClusterSize != NULL); msach@0: msach@0: /* newClusters holds the coordinates of the freshly created clusters */ msach@0: newClusters = create_array_2d_f(numClusters, numCoords); msach@0: local_newClusterSize = create_array_2d_i(nthreads, numClusters); msach@0: msach@0: /* local_newClusters is a 3D array */ msach@0: local_newClusters = (double***)malloc(nthreads * sizeof(double**)); msach@0: assert(local_newClusters != NULL); msach@0: local_newClusters[0] = (double**) malloc(nthreads * numClusters * sizeof(double*)); msach@0: assert(local_newClusters[0] != NULL); msach@0: msach@0: /* Set up the pointers */ msach@0: for (i = 1; i < nthreads; i++) msach@0: local_newClusters[i] = local_newClusters[i-1] + numClusters; msach@0: msach@0: for (i = 0; i < nthreads; i++) { msach@0: for (j = 0; j < numClusters; j++) { msach@0: local_newClusters[i][j] = (double*)calloc(numCoords, sizeof(double)); msach@0: assert(local_newClusters[i][j] != NULL); msach@0: } msach@0: } msach@0: /* Perform thread setup */ msach@0: thread = (pthread_t*)calloc(nthreads, sizeof(pthread_t)); msach@0: msach@0: printf("nthreads %d\n", nthreads); msach@0: pthread_barrier_init(&barr, NULL, nthreads); msach@0: pthread_attr_init(&attr); msach@0: pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); msach@0: pthread_mutex_init(&lock1, NULL); msach@0: finished=0; msach@0: msach@0: struct input *ip = malloc(nthreads * sizeof(struct input)); msach@0: /* Provide thread-safe memory locations for each worker */ msach@0: for(i = 0; i < nthreads; i++){ msach@0: ip[i].t = i; msach@0: ip[i].objects=objects; msach@0: ip[i].clusters=clusters; msach@0: ip[i].membership=membership; msach@0: ip[i].local_newClusterSize=local_newClusterSize; msach@0: ip[i].local_newClusters=local_newClusters; msach@0: ip[i].numObjs=numObjs; msach@0: ip[i].numClusters=numClusters; msach@0: ip[i].numCoords=numCoords; msach@0: msach@0: if (i>0){ msach@0: rc = pthread_create(&thread[i], &attr, tfwork, (void *)&ip[i]); msach@0: if (rc) { msach@0: fprintf(stderr, "ERROR: Return Code For Thread Creation Is %d\n", rc); msach@0: exit(EXIT_FAILURE); msach@0: } msach@0: } msach@0: } msach@0: msach@0: /* === COMPUTATIONAL PHASE === */ msach@0: msach@0: do { msach@0: delta = 0.0; msach@0: pthread_barrier_wait(&barr); msach@0: work(&ip[0]); msach@0: msach@0: pthread_barrier_wait(&barr); msach@0: /* Let the main thread perform the array reduction */ msach@0: for (i = 0; i < numClusters; i++) { msach@0: for (j = 0; j < nthreads; j++) { msach@0: newClusterSize[i] += local_newClusterSize[j][i]; msach@0: local_newClusterSize[j][i] = 0.0; msach@0: for (k = 0; k < numCoords; k++) { msach@0: newClusters[i][k] += local_newClusters[j][i][k]; msach@0: local_newClusters[j][i][k] = 0.0; msach@0: } msach@0: } msach@0: } msach@0: msach@0: /* Average the sum and replace old cluster centers with newClusters */ msach@0: for (i = 0; i < numClusters; i++) { msach@0: for (j = 0; j < numCoords; j++) { msach@0: if (newClusterSize[i] > 1) msach@0: clusters[i][j] = newClusters[i][j] / newClusterSize[i]; msach@0: newClusters[i][j] = 0.0; /* set back to 0 */ msach@0: } msach@0: newClusterSize[i] = 0; /* set back to 0 */ msach@0: } msach@0: msach@0: delta /= numObjs; msach@0: } while (loop++ < PREC && delta > threshold); msach@0: msach@0: // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program, msach@0: // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed msach@0: // iterations, therefore making benchmarking completely unreliable. msach@0: msach@0: finished=1; msach@0: pthread_barrier_wait(&barr); msach@0: msach@0: for(i = 1; i < nthreads; i++) { msach@0: rc = pthread_join(thread[i], NULL); msach@0: if (rc) { msach@0: fprintf(stderr, "ERROR: Return Code For Thread Join Is %d\n", rc); msach@0: exit(EXIT_FAILURE); msach@0: } msach@0: } msach@0: msach@0: free(ip); msach@0: free(thread); msach@0: pthread_barrier_destroy(&barr); msach@0: pthread_mutex_destroy(&lock1); msach@0: pthread_attr_destroy(&attr); msach@0: msach@0: free(local_newClusterSize[0]); msach@0: free(local_newClusterSize); msach@0: msach@0: for (i = 0; i < nthreads; i++) msach@0: for (j = 0; j < numClusters; j++) msach@0: free(local_newClusters[i][j]); msach@0: free(local_newClusters[0]); msach@0: free(local_newClusters); msach@0: msach@0: free(newClusters[0]); msach@0: free(newClusters); msach@0: free(newClusterSize); msach@0: return clusters; msach@0: } msach@0: