/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
/*   File:         pthreads_kmeans.c  (OpenMP version)                     */
/*   Description:  Implementation of simple k-means clustering algorithm     */
/*                 This program takes an array of N data objects, each with  */
/*                 M coordinates and performs a k-means clustering given a   */
/*                 user-provided value of the number of clusters (K). The    */
/*                 clustering results are saved in 2 arrays:                 */
/*                 1. a returned array of size [K][N] indicating the center  */
/*                    coordinates of K clusters                              */
/*                 2. membership[N] stores the cluster center ids, each      */
/*                    corresponding to the cluster a data object is assigned */
/*                                                                           */
/*   Author:  Wei-keng Liao                                                  */
/*            ECE Department, Northwestern University                        */
/*            email: wkliao@ece.northwestern.edu                             */
/*   Copyright, 2005, Wei-keng Liao                                          */
/*                                                                           */
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <time.h>
#include <math.h>
#include "kmeans.h"

#include "VPThread_lib/VPThread.h"

#define PREC 300

struct barrier_t
{
    int counter;
    int nthreads;
    int32 mutex;
    int32 cond;
};
typedef struct barrier_t barrier;

extern int nthreads; /* Thread count */
double delta; /* Delta is a value between 0 and 1 describing the percentage of objects which changed cluster membership */
volatile int finished;

barrier barr;
int32 lock1;
pthread_attr_t attr;

void inline barrier_init(barrier *barr, int nthreads, VirtProcr *VProc)
{
    barr->counter = 0;
    barr->nthreads = nthreads;
    barr->mutex   = VPThread__make_mutex(VProc);
    barr->cond    = VPThread__make_cond(barr->mutex, VProc);
}

void inline barrier_wait(barrier *barr, VirtProcr *VProc)
{
    int i;
    
    VPThread__mutex_lock(barr->mutex, VProc);
    barr->counter++;
    if(barr->counter == barr->nthreads)
    {
        barr->counter = 0;
        for(i=0; i < barr->nthreads; i++)
            VPThread__cond_signal(barr->cond, VProc);
    }
    else
    {
        VPThread__cond_wait(barr->cond, VProc);
    }
    VPThread__mutex_unlock(barr->mutex, VProc);
}

/*
*	Struct: input
*	-------------
*	Encapsulates all the input data for the benchmark, i.e. the object list,
*	clustering output and the input statistics.
*/
struct input {
    int t;
    double **objects;				/* Object list */
    double  **clusters;       		/* Cluster list, out: [numClusters][numCoords] */
    int    *membership;				/* For each object, contains the index of the cluster it currently belongs to */
    int    **local_newClusterSize; 	/* Thread-safe, [nthreads][numClusters] */
    double ***local_newClusters;    	/* Thread-safe, [nthreads][numClusters][numCoords] */
    int numObjs,numClusters,numCoords;
};

/*
*	Function: euclid_dist_2
*	-----------------------
*	Computes the square of the euclidean distance between two multi-dimensional points.
*/
__inline static double euclid_dist_2(int numdims, double *coord1, double *coord2) {
    int i;
    double ans=0.0;

    for (i=0; i<numdims; i++)
        ans += (coord1[i]-coord2[i]) * (coord1[i]-coord2[i]);

    return(ans);
}

/*
*	Function: find_nearest_cluster
*	------------------------------
*	Function determining the cluster center which is closest to the given object.
*	Returns the index of that cluster center.
*/
__inline static int find_nearest_cluster(int numClusters, int numCoords, double *object, double **clusters) {
    int   index, i;
    double dist, min_dist;
    
    /* find the cluster id that has min distance to object */
    index    = 0;
    min_dist = euclid_dist_2(numCoords, object, clusters[0]);
    for (i=1; i<numClusters; i++) {
        dist = euclid_dist_2(numCoords, object, clusters[i]);
    
        /* no need square root */
        if (dist < min_dist) { /* find the min and its array index */
            min_dist = dist;
            index    = i;
        }
    }
    return index;
}

void work(struct input *x, VirtProcr *VProc){
    int tid = x->t;
    double local_delta=0;
    int i;
    for (i = tid; i < x->numObjs; i += nthreads) {
        /* find the array index of nearest cluster center */
        int index = find_nearest_cluster(x->numClusters, x->numCoords,
                                    x->objects[i], x->clusters);

        /* if membership changes, increase delta by 1 */        
        if (x->membership[i] != index)
            local_delta += 1.0;
        /* assign the membership to object i */
        x->membership[i] = index;

        /* update new cluster centers : sum of all objects located
        within (average will be performed later) */
        x->local_newClusterSize[tid][index]++;
	int j;
        for (j=0; j < x->numCoords; j++)
            x->local_newClusters[tid][index][j] += x->objects[i][j];

    }
    VPThread__mutex_lock(lock1, VProc);
    delta +=local_delta;
    VPThread__mutex_unlock(lock1, VProc);
}

/*
*	Function: thread function work
*	--------------
*	Worker function for threading. Work distribution is done so that each thread computers
*/
void tfwork(void *ip, VirtProcr *VProc)
{
    struct input *x;
    x = (struct input *)ip;    
    
    for(;;){
        barrier_wait(&barr, VProc);
        if (finished){            
            break;
        }
        work(x, VProc);
        barrier_wait(&barr, VProc);
    }
    
	//pthread_exit(NULL);
    VPThread__dissipate_thread(VProc);
}

/*
*	Function: create_array_2d_f
*	--------------------------
*	Allocates memory for a 2-dim double array as needed for the algorithm.
*/
double** create_array_2d_f(int height, int width, VirtProcr *VProc) {
	double** ptr;
	int i;
	ptr = VPThread__malloc(height * sizeof(double*), VProc);
	assert(ptr != NULL);
	ptr[0] = VPThread__malloc(width * height * sizeof(double), VProc);
	assert(ptr[0] != NULL);
	/* Assign pointers correctly */
	for(i = 1; i < height; i++)
		ptr[i] = ptr[i-1] + width; 
	return ptr;
}

/*
*	Function: create_array_2Dd_i
*	--------------------------
*	Allocates memory for a 2-dim integer array as needed for the algorithm.
*/
int** create_array_2d_i(int height, int width, VirtProcr *VProc) {
	int** ptr;
	int i;
	ptr = VPThread__malloc(height * sizeof(int*), VProc);
	assert(ptr != NULL);
	ptr[0] = VPThread__malloc(width * height * sizeof(int), VProc);
	assert(ptr[0] != NULL);
	/* Assign pointers correctly */
	for(i = 1; i < height; i++)
		ptr[i] = ptr[i-1] + width; 
	return ptr;
}

/*
*	Function: pthreads_kmeans
*	-------------------------
*	Algorithm main function. Returns a 2D array of cluster centers of size [numClusters][numCoords].
*/
void pthreads_kmeans(void *data, VirtProcr *VProc)
{
    struct call_data *cluster_data = (struct call_data*)data;
    //int is_perform_atomic = cluster_data->is_perform_atomic;	/* in: */
    double **objects      = cluster_data->objects;           	/* in: [numObjs][numCoords] */
    int     numCoords     = cluster_data->numCoords;         	/* no. coordinates */
    int     numObjs       = cluster_data->numObjs;           	/* no. objects */
    int     numClusters   = cluster_data->numClusters;     	/* no. clusters */
    double  threshold     = cluster_data->threshold;         	/* % objects change membership */
    int    *membership    = cluster_data->membership;        	/* out: [numObjs] */ 

    int      i, j, k, loop = 0;
    int     *newClusterSize; /* [numClusters]: no. objects assigned in each
                                new cluster */
    double  **clusters    = cluster_data->clusters;       /* out: [numClusters][numCoords] */
    double  **newClusters;    /* [numClusters][numCoords] */
    //double   timing;
    int    **local_newClusterSize; /* [nthreads][numClusters] */
    double ***local_newClusters;    /* [nthreads][numClusters][numCoords] */

	VirtProcr **thread;

	/* === MEMORY SETUP === */

	/* [numClusters] clusters of [numCoords] double coordinates each */
        //Set pointers
        for(i = 1; i < numClusters; i++)
		clusters[i] = clusters[i-1] + numCoords; 

    /* Pick first numClusters elements of objects[] as initial cluster centers */
    for (i=0; i < numClusters; i++)
        for (j=0; j < numCoords; j++)
            clusters[i][j] = objects[i][j];

    /* Initialize membership, no object belongs to any cluster yet */
    for (i = 0; i < numObjs; i++) 
		membership[i] = -1;

	/* newClusterSize holds information on the count of members in each cluster */
    newClusterSize = (int*)VPThread__malloc(numClusters * sizeof(int), VProc);
    assert(newClusterSize != NULL);

	/* newClusters holds the coordinates of the freshly created clusters */
	newClusters = create_array_2d_f(numClusters, numCoords, VProc);
	local_newClusterSize = create_array_2d_i(nthreads, numClusters, VProc);

    /* local_newClusters is a 3D array */
    local_newClusters    = (double***)VPThread__malloc(nthreads * sizeof(double**), VProc);
    assert(local_newClusters != NULL);
    local_newClusters[0] = (double**) VPThread__malloc(nthreads * numClusters * sizeof(double*), VProc);
    assert(local_newClusters[0] != NULL);

	/* Set up the pointers */
    for (i = 1; i < nthreads; i++)
        local_newClusters[i] = local_newClusters[i-1] + numClusters;

    for (i = 0; i < nthreads; i++) {
        for (j = 0; j < numClusters; j++) {
            local_newClusters[i][j] = (double*)VPThread__malloc(numCoords * sizeof(double), VProc);
            assert(local_newClusters[i][j] != NULL);
        }
    }
    /* Perform thread setup */
    thread = (VirtProcr**)VPThread__malloc(nthreads * sizeof(VirtProcr*), VProc);

    barrier_init(&barr, nthreads, VProc);
    lock1 = VPThread__make_mutex(VProc);
    finished=0;
    
    struct input *ip = VPThread__malloc(nthreads * sizeof(struct input), VProc);
    /* Provide thread-safe memory locations for each worker */
    for(i = 0; i < nthreads; i++){
        ip[i].t = i;
        ip[i].objects=objects;
        ip[i].clusters=clusters;
        ip[i].membership=membership;
        ip[i].local_newClusterSize=local_newClusterSize;
        ip[i].local_newClusters=local_newClusters;
        ip[i].numObjs=numObjs;
        ip[i].numClusters=numClusters;
        ip[i].numCoords=numCoords;

        if (i>0){
            thread[i] = VPThread__create_thread(tfwork, (void*)&ip[i], VProc);
        }
    }
    
	/* === COMPUTATIONAL PHASE === */
    
    do {
        delta = 0.0;
        barrier_wait(&barr, VProc);
        work(&ip[0], VProc);
        
        barrier_wait(&barr, VProc);
		/* Let the main thread perform the array reduction */
		for (i = 0; i < numClusters; i++) {
			for (j = 0; j < nthreads; j++) {
				newClusterSize[i] += local_newClusterSize[j][i];
				local_newClusterSize[j][i] = 0.0;
				for (k = 0; k < numCoords; k++) {
					newClusters[i][k] += local_newClusters[j][i][k];
					local_newClusters[j][i][k] = 0.0;
				}
			}
		}
	
		/* Average the sum and replace old cluster centers with newClusters */
		for (i = 0; i < numClusters; i++) {
			for (j = 0; j < numCoords; j++) {
				if (newClusterSize[i] > 1)
				clusters[i][j] = newClusters[i][j] / newClusterSize[i];
				newClusters[i][j] = 0.0;   /* set back to 0 */
			}
			newClusterSize[i] = 0;   /* set back to 0 */
		}
		
		delta /= numObjs;
    } while (loop++ < PREC && delta > threshold);
    
    // Changing to a fixed number of iterations is for benchmarking reasons. I know it affects the results compared to the original program,
    // but minor double precision floating point inaccuracies caused by threading would otherwise lead to huge differences in computed
    // iterations, therefore making benchmarking completely unreliable.

    finished=1;
    barrier_wait(&barr, VProc);
    

    VPThread__free(ip, VProc);
    VPThread__free(thread, VProc);

    VPThread__free(local_newClusterSize[0], VProc);
    VPThread__free(local_newClusterSize, VProc);
        
    for (i = 0; i < nthreads; i++)
        for (j = 0; j < numClusters; j++)
            VPThread__free(local_newClusters[i][j], VProc);
    VPThread__free(local_newClusters[0], VProc);
    VPThread__free(local_newClusters, VProc);

    VPThread__free(newClusters[0], VProc);
    VPThread__free(newClusters, VProc);
    VPThread__free(newClusterSize, VProc);
    
    (cluster_data)->clusters = clusters;
    
    VPThread__dissipate_thread(VProc);
}

