Accéder au contenu.
Menu Sympa

starpu-devel - [Starpu-devel] How to use starpu_mpi_task_insert

Objet : Developers list for StarPU

Archives de la liste

[Starpu-devel] How to use starpu_mpi_task_insert


Chronologique Discussions 
  • From: Ravi Teja Mullapudi <raviteja.mullapudi@gmail.com>
  • To: starpu-devel@lists.gforge.inria.fr
  • Cc: Roshan D <roshan18@gmail.com>, Uday Reddy <udayreddy@gmail.com>
  • Subject: [Starpu-devel] How to use starpu_mpi_task_insert
  • Date: Sat, 30 Mar 2013 19:19:29 +0530
  • List-archive: <http://lists.gforge.inria.fr/pipermail/starpu-devel>
  • List-id: "Developers list. For discussion of new features, code changes, etc." <starpu-devel.lists.gforge.inria.fr>

Hi,

I am trying to use starpu_mpi_task_insert to avoid explicitly doing MPI communication and letting starpu handle it. I am trying to do a floyd warshall implantation. The following is the outline of the code.

Task Creation
create_dist_tasks(int matrix_size, int block_size, starpu_data_handle_t * data_handles)
{
    int i,j,k;
    int ret;
    int num_blocks = matrix_size/block_size;

    /* Create non-entry tasks */
    for(k = 0; k < matrix_size; k++) {
        for(i = 0; i < num_blocks ; i++) {
            for(j = 0; j < num_blocks; j++) {
                starpu_mpi_insert_task(MPI_COMM_WORLD, &dist_cl,
                                       STARPU_VALUE, &k,
                                       STARPU_VALUE, &i,
                                       STARPU_VALUE, &j,
                                       STARPU_RW, data_handles[i*num_blocks +j],
                                       STARPU_R,  data_handles[i*num_blocks +(k/block_size)],
                                       STARPU_R,  data_handles[(k/block_size)*num_blocks + j],
                                       STARPU_EXECUTE_ON_DATA, data_handles[i*num_blocks +j],
                                       0);
            }
        }
    }
    return 0;
}

Registering Data
    for(i = 0; i < num_blocks; i++) {
        for (j = 0; j < num_blocks; j++) {
            int mpi_rank = data_distribution(i, j, num_nodes);
            if (mpi_rank == my_rank)
            {
                /* Owning data */
                starpu_vector_data_register(&data_handles[i*num_blocks + j], 0,
                        (uintptr_t)(tiles[i*num_blocks + j]), block_size *block_size, sizeof(double));
            }
            else
            {
                /* This can probably be further optimized */
                starpu_vector_data_register(&data_handles[i*num_blocks + j], -1,
                        (uintptr_t)(tiles[i*num_blocks + j]), block_size *block_size, sizeof(double));
            }

            /* Marking where to find what data */
            starpu_data_set_rank(data_handles[i*num_blocks + j], mpi_rank);
        }
    }

Codelet

void dist_task(void *buffers[], void *cl_arg)
{
    int k, i, j, block_size;
    starpu_codelet_unpack_args(cl_arg, &k, &i, &j);

    /* Get block size */
    block_size = STARPU_VECTOR_GET_NX(buffers[0]);

    double *block_ij = (double *)STARPU_VECTOR_GET_PTR(buffers[0]);
    double *block_ik = (double *)STARPU_VECTOR_GET_PTR(buffers[1]);
    double *block_kj = (double *)STARPU_VECTOR_GET_PTR(buffers[2]);

    for (int p = 0; p < block_size; p++){
        for (int q = 0; q < block_size; q++) {
            for (int r = 0; r < block_size; r++) {
                block_ij[q*block_size +r] = min(block_ik[q*block_size + p] + block_kj[p*block_size + r], block_ij[q*block_size +r]);
            }
        }
    }
}

The problem with the above approach is I hit the following assertion 
starpu_mpi_insert_task: Assertion `mpi_tag >= 0 && "StarPU needs to be told the MPI rank of this data, using starpu_data_set_rank"' failed.

I do not clearly understand how the mpi_tag must be specified I was under the assumption that the the above is all that is required for starpu to handle the communication automatically. I understand that I can rewrite using starpu_mpi_send and starpu_mpi_recv but I do not explicitly want to specify the communication. I am basically evaluating StarPU vs Intel CnC in a distributed setting. Attaching the implementation for a complete context. 

Any help or inputs are appreciated. 

Thanks,
Ravi Teja.
#include <stdint.h>
#include <inttypes.h>
#include <starpu.h>
#include <starpu_mpi.h>
#include <unistd.h>
#include "../utils/utils.h"
#include "floyd_starpu.h"

#define FPRINTF(ofile, fmt, args ...) do { if (!getenv("STARPU_SSILENT")) {fprintf(ofile, fmt, ##args); }} while(0)
#define TAG(iter, i, j) ((starpu_tag_t) ( ((uint64_t)(iter)<<47) |  ((uint64_t)(j)<<11) | (i)) )

struct params
{
    int k;
    int i;
    int j;
    // This is redundant for every task, but I do not like globals and secondly this 
    // is the way to for a distributed memory setting. CnC has context where the
    // parameters for the task graph can be stored. It can manage this redundancy much
    // better. 
    double   *matrix; 
    int matrix_size;
    int block_size;
};

static struct starpu_perfmodel mult_perf_model = {
    .type = STARPU_HISTORY_BASED,
    .symbol = "mult_perf_model"
};

static void express_deps(int k, int i, int j, int block_size)
{
    if (k > 0)
        starpu_tag_declare_deps(TAG(k, i, j), 3, TAG(k-1, i, k/block_size), TAG(k-1, k/block_size, j), TAG(k-1, i, j)); 
}

static void tag_cleanup(int matrix_size, int block_size)
{
    int i,j,k;
    int num_blocks = matrix_size/block_size;
    for (k = 0; k < matrix_size; k++)
    {
        for (i = 0; i < num_blocks ; i++)
        { 
            for(j = 0; j < num_blocks; j++)
            {
                starpu_tag_remove(TAG(k, i, j));
            }
        }
    }
}

/* Every implementation of a codelet must have this prototype, the first
 * argument (buffers) describes the buffers/streams that are managed by the
 * DSM; the second arguments references read-only data that is passed as an
 * argument of the codelet (task->cl_arg). Here, "buffers" is unused as there
 * are no data input/output managed by the DSM (cl.nbuffers = 0) */

void shared_mem_task(void *buffers[], void *cl_arg)
{
    struct params *params = (struct params *) cl_arg;
    int i, j, k, size, block_size;
    k = params->k;
    double* path_distance_matrix = params->matrix;
    size = params->matrix_size;
    block_size = params->block_size;
    //printf("Task Scheduled(%d %d %d) TAG=%" PRIu64 "\n", params->k, params->i, params->j, TAG(k ,i, j));
    for(i = params->i * block_size; i < (params->i + 1) * block_size; i++)
    {
        for(j = params->j * block_size; j < (params->j + 1) * block_size; j++)
        {
            path_distance_matrix[i*size + j] = min(path_distance_matrix[i*size + k] + path_distance_matrix[k*size + j], path_distance_matrix[i*size + j]);
        }
    }
}

struct starpu_codelet shared_mem_cl = 
{
    .where = STARPU_CPU,
    .cpu_funcs = { shared_mem_task, NULL },
    .nbuffers = 0,
    .model = &mult_perf_model,
    .name = "Floyd"
};

static int create_shared_mem_tasks(int matrix_size, int block_size, struct params* params)
{
    int i,j,k;
    int ret;
    int num_blocks = matrix_size/block_size;
    /* Create non-entry tasks */
    for(k = 0; k < matrix_size; k++)
    {
        for(i = 0; i < num_blocks ; i++)
        { 
            for(j = 0; j < num_blocks; j++)
            {
                /* Create a new task */
                struct starpu_task *task = starpu_task_create();
                task->callback_func      = NULL;
                task->cl                 = &shared_mem_cl;
                task->cl_arg             = &params[k*num_blocks*num_blocks + i*num_blocks + j];
                task->cl_arg_size        = sizeof(struct params);

                task->use_tag = 1;
                task->tag_id = TAG(k, i, j);
                //printf("Task(%d %d %d) TAG=%" PRIu64 "\n", k, i, j, task->tag_id);

                express_deps(k, i, j, block_size);

                ret = starpu_task_submit(task);
                if (ret == -ENODEV) return 77;
                STARPU_CHECK_RETURN_VALUE(ret, "starpu_task_submit");
            }
        }
    }
    return 0;
}

int floyd_shared_mem_starpu(double* path_distance_matrix, int matrix_size, int block_size)
{
    int num_blocks, k, i, j, ret; 
    num_blocks = matrix_size/block_size;   
    // Creating parameters for tasks 
    struct params *params = (struct params*) malloc (sizeof(struct params) * matrix_size * (num_blocks) * (num_blocks));
    for( k = 0; k < matrix_size; k++)
    {
        for( i = 0; i < num_blocks; i++)
        {
            for( j = 0; j < num_blocks; j++)
            {                
                struct params* p = &params[(k*num_blocks*num_blocks) + (i*num_blocks) + j];
                p->k = k; p->i = i; p->j = j;
                p->matrix      = path_distance_matrix;
                p->matrix_size = matrix_size;
                p->block_size  = block_size; 
            }
        }
    }

    /* initialize StarPU : passing a NULL argument means that we use
     * default configuration for the scheduling policies and the number of
     * processors/accelerators */
    ret = starpu_init(NULL);
    if (ret == -ENODEV)
        return 77;
    STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");

    /* Create tasks*/
    ret = create_shared_mem_tasks(matrix_size, block_size, params);
    if (ret == 0)
        starpu_task_wait_for_all();

    tag_cleanup(matrix_size, block_size);

    /* terminate StarPU: statistics and other debug outputs are not
     * guaranteed to be generated unless this function is called. Once it
     * is called, it is not possible to submit tasks anymore, and the user
     * is responsible for making sure all tasks have already been executed:
     * calling starpu_shutdown() before the termination of all the tasks
     * results in an undefined behaviour */
    starpu_shutdown();
    free(params);
    return ret;
}

void dist_task(void *buffers[], void *cl_arg)
{
    int k, i, j, block_size;
    starpu_codelet_unpack_args(cl_arg, &k, &i, &j);

    /* Get block size */
    block_size = STARPU_VECTOR_GET_NX(buffers[0]);

    double *block_ij = (double *)STARPU_VECTOR_GET_PTR(buffers[0]);
    double *block_ik = (double *)STARPU_VECTOR_GET_PTR(buffers[1]);
    double *block_kj = (double *)STARPU_VECTOR_GET_PTR(buffers[2]);

    //printf("Task Scheduled(%d %d %d)\n", k, i, j);
    for (int p = 0; p < block_size; p++){
        for (int q = 0; q < block_size; q++) {
            for (int r = 0; r < block_size; r++) {    
                block_ij[q*block_size +r] = min(block_ik[q*block_size + p] + block_kj[p*block_size + r], block_ij[q*block_size +r]); 
            }
        }
    }
}

struct starpu_codelet dist_cl = 
{
    .where = STARPU_CPU,
    .cpu_funcs = { dist_task, NULL },
    .nbuffers = 3,
    .model = &mult_perf_model,
    .name = "Floyd Dist"
};

static int create_dist_tasks(int matrix_size, int block_size, starpu_data_handle_t * data_handles)
{
    int i,j,k;
    int ret;
    int num_blocks = matrix_size/block_size;
    
    /* Create non-entry tasks */
    for(k = 0; k < matrix_size; k++) {
        for(i = 0; i < num_blocks ; i++) { 
            for(j = 0; j < num_blocks; j++) {
                starpu_mpi_insert_task(MPI_COMM_WORLD, &dist_cl,
                                       STARPU_VALUE, &k,
                                       STARPU_VALUE, &i,
                                       STARPU_VALUE, &j,
                                       STARPU_RW, data_handles[i*num_blocks +j],
                                       STARPU_R,  data_handles[i*num_blocks +(k/block_size)],
                                       STARPU_R,  data_handles[(k/block_size)*num_blocks + j],
                                       0);
            }
        }
    }    
    return 0;
}

/* Returns the MPI node number where data is */
int data_distribution(int x, int y, int num_nodes) {
    
    /* Block distrib */
    //return ((int)(x / sqrt(num_nodes) + (y / sqrt(num_nodes)) * sqrt(num_nodes))) % num_nodes;

    // /* Other examples useful for other kinds of computations */
    // /* / distrib */
    // return (x+y) % num_nodes;

    // /* Block cyclic distrib */
    // unsigned side = sqrt(num_nodes);
    // return x % side + (y % side) * size;
    return 0;
}

int floyd_dist_starpu(double* path_distance_matrix, int matrix_size, int block_size)
{
    int num_blocks, k, i, j, ret, my_rank, num_nodes; 
    num_blocks = matrix_size/block_size;   
    
    /* Initialize StarPU : passing a NULL argument means that we use
     * default configuration for the scheduling policies and the number of
     * processors/accelerators */
    ret = starpu_init(NULL);
    if (ret == -ENODEV)
        return 77;
    STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");

    /* Initializaing StarPU MPI */
    ret = starpu_mpi_initialize_extended(&my_rank, &num_nodes);
    if (ret == -ENODEV)
        return 77;
    STARPU_CHECK_RETURN_VALUE(ret, "starpu_init");


    starpu_data_handle_t* data_handles = (starpu_data_handle_t*) malloc(sizeof(starpu_data_handle_t) * num_blocks * num_blocks);
    double** tiles = (double**) malloc(sizeof(double*) * num_blocks * num_blocks);

    for(i = 0; i < num_blocks; i++) {
        for(j = 0; j < num_blocks; j++) {
            /* Allocate memory for the tile*/
            tiles[i*num_blocks + j] = (double*) malloc (sizeof(double) * block_size * block_size);
            double * tile = tiles[i*num_blocks + j];
            for(int org_i = i*block_size,t_i = 0; t_i < block_size; org_i++,t_i++) {
                for(int org_j = j*block_size,t_j = 0; t_j < block_size; org_j++,t_j++) {
                    tile[t_i*block_size + t_j] = path_distance_matrix[org_i*matrix_size + org_j];
                }
            }
        }
    }

    for(i = 0; i < num_blocks; i++) {
        for (j = 0; j < num_blocks; j++) {
            int mpi_rank = data_distribution(i, j, num_nodes);
            if (mpi_rank == my_rank)
            {
                /* Owning data */
                starpu_vector_data_register(&data_handles[i*num_blocks + j], 0,
                        (uintptr_t)(tiles[i*num_blocks + j]), block_size *block_size, sizeof(double));
            }
            else 
            {
                /* This can probably be further optimized */ 
                starpu_vector_data_register(&data_handles[i*num_blocks + j], -1,
                        (uintptr_t)(tiles[i*num_blocks + j]), block_size *block_size, sizeof(double));
            }

            /* Marking where to find what data */
            starpu_data_set_rank(data_handles[i*num_blocks + j], mpi_rank);
        }
    }
   
    /* Create tasks*/
    ret = create_dist_tasks(matrix_size, block_size, data_handles);
    if (ret == 0)
        starpu_task_wait_for_all();

    for(i = 0; i < num_blocks; i++) {
        for (j = 0; j < num_blocks; j++) {
            starpu_data_unregister(data_handles[i*num_blocks + j]);
        }
    }

    /* Collecting all the data at rank 0*/

    /* terminate StarPU: statistics and other debug outputs are not
     * guaranteed to be generated unless this function is called. Once it
     * is called, it is not possible to submit tasks anymore, and the user
     * is responsible for making sure all tasks have already been executed:
     * calling starpu_shutdown() before the termination of all the tasks
     * results in an undefined behaviour */
    starpu_mpi_shutdown();
    starpu_shutdown();
    for (i = 0; i < num_blocks; i++) {
        for (j = 0; j < num_blocks; j++) {
            free(tiles[i*num_blocks + j]);
        }
    }
    free(tiles);
    free(data_handles);
    return ret;
}



Archives gérées par MHonArc 2.6.19+.

Haut de le page