Accéder au contenu.
Menu Sympa

starpu-devel - Re: [Starpu-devel] Assert failure with regenerate flag enabled

Objet : Developers list for StarPU

Archives de la liste

Re: [Starpu-devel] Assert failure with regenerate flag enabled


Chronologique Discussions 
  • From: Jeff Hand <jeffrey.hand@gmail.com>
  • To: Samuel Thibault <samuel.thibault@ens-lyon.org>, Jeff Hand <jeffrey.hand@gmail.com>, Starpu-devel@lists.gforge.inria.fr
  • Subject: Re: [Starpu-devel] Assert failure with regenerate flag enabled
  • Date: Wed, 5 Nov 2014 15:27:06 -0600
  • List-archive: <http://lists.gforge.inria.fr/pipermail/starpu-devel/>
  • List-id: "Developers list. For discussion of new features, code changes, etc." <starpu-devel.lists.gforge.inria.fr>

Here is my test example.

With "#undef FUNC_USLEEP" the test fails almost 100% of the time within 10 minutes.
With "#define FUNC_USLEEP 100" I have seen it run for up to 20 minutes without failing.

cheers,
Jeff


On Tue, Nov 4, 2014 at 11:15 AM, Samuel Thibault <samuel.thibault@ens-lyon.org> wrote:
Jeff Hand, le Tue 04 Nov 2014 11:05:25 -0600, a écrit :
> I have 4 simple CPU based tasks in series with regenerate flag enabled.  If I
> execute the code normally, I receive the assert below (failure occurs within 5
> seconds).  If I add a 2 msec sleep in the first task, the program executes
> successfully (will operate 10 minutes plus).
>
> Is this expected behaviour?

It is not an expected behavior.  We would gladly integrate this test
case in our testsuite, since we already have some, but they don't
produce such crashes :)

Samuel

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <starpu.h>

/* Number of float elements in each of the three vectors below. */
#define      NX    25600000

/* Uncomment to make cpu_input_func and cpu_output_func print a trace line
 * and the first element of their buffer on every execution. */
//#define DEBUG_CPU_FUNC

/* Application-owned storage for the three vectors registered with StarPU in
 * TestOne: pipeline input, gain stage output, constant stage output. */
float Input[NX];
float GainOutput[NX];
float ConstantOutput[NX];

// Enable if you want to see Number of Task Executions
// (TestOne attaches the cpu_*_callback counters to the tasks when defined)
#define CALLBACKS_ENABLED

/* FUNC_USLEEP: when defined to a non-zero value, cpu_input_func calls
 * usleep(FUNC_USLEEP) on every execution. The notes after each candidate
 * value record what was observed with it. */
//#define FUNC_USLEEP 1 // I have seen crash
//#define FUNC_USLEEP 2 // I have seen crash
//#define FUNC_USLEEP 4 // TBD
//#define FUNC_USLEEP 5 // I have seen crash
//#define FUNC_USLEEP 10 // I have seen crash
//#define FUNC_USLEEP 20 // I have seen crash
//#define FUNC_USLEEP 100 // I have seen it work for 10 minutes and 20 minutes
#undef FUNC_USLEEP // Crashes reliably [simple: ../../src/core/dependencies/cg.c:155: _starpu_notify_cg: Assertion `job_successors->ndeps >= ndeps_completed' failed.]

/* The four pipeline tasks. They are created in TestOne and kept global so
 * that SetAllRegenerate can toggle their regenerate flag. */
struct starpu_task *starpuInputTask;
struct starpu_task *starpuGainTask;
struct starpu_task *starpuConstantTask;
struct starpu_task *starpuOutputTask;

/* Number of times cpu_input_callback has run. */
size_t inputCount = 0;

/*
 * CPU kernel of the input stage.
 *
 * Checks that the first element of the vector in buffers[0] equals 2.0 and
 * terminates the process with exit(-1) when it does not. When FUNC_USLEEP is
 * non-zero the kernel sleeps for that many microseconds before the check.
 *
 * buffers: StarPU buffer interfaces; only buffers[0] (a vector) is used.
 * cl_arg:  unused.
 */
void cpu_input_func(void *buffers[], void *cl_arg)
{
    float *samples = (float *)STARPU_VECTOR_GET_PTR(buffers[0]);

    (void)cl_arg;
#ifdef DEBUG_CPU_FUNC
    printf("cpu_input_func\n");
    printf("cpu_input_func %.1f\n", samples[0]);
#endif
#if FUNC_USLEEP
    usleep(FUNC_USLEEP);
#endif
    /* Guard clause: nothing more to do while the data is intact. */
    if(samples[0] == 2.0)
        return;

    printf("cpu_input_func  Failed!!\n");
    exit(-1);
}

/*
 * Callback attached to the input task: counts one more execution in
 * inputCount. callback_arg is ignored.
 */
void cpu_input_callback(void *callback_arg)
{
    (void)callback_arg;
    inputCount += 1;
}

/*
 * CPU kernel of the gain stage: writes source[i] * 10 into destination[i]
 * for every element of the source vector.
 *
 * buffers: buffers[0] is the source vector, buffers[1] the destination
 *          vector; the element count is taken from buffers[0].
 * cl_arg:  unused (the multiplier is the literal 10).
 */
void cpu_gain_func(void *buffers[], void *cl_arg)
{
    float *source = (float *)STARPU_VECTOR_GET_PTR(buffers[0]);
    float *destination = (float *)STARPU_VECTOR_GET_PTR(buffers[1]);
    int count = STARPU_VECTOR_GET_NX(static_cast<starpu_vector_interface *> (buffers[0]));
    int idx = 0;

    (void)cl_arg;

    while(idx < count)
    {
        destination[idx] = source[idx] * 10;
        ++idx;
    }
}

/* Number of times cpu_gain_callback has run. */
size_t gainCount = 0;

/*
 * Callback attached to the gain task: counts one more execution in
 * gainCount. callback_arg is ignored.
 */
void cpu_gain_callback(void *callback_arg)
{
    (void)callback_arg;
    gainCount += 1;
}

/*
 * CPU kernel of the constant stage: writes source[i] + 10 into
 * destination[i] for every element of the source vector.
 *
 * buffers: buffers[0] is the source vector, buffers[1] the destination
 *          vector; the element count is taken from buffers[0].
 * cl_arg:  unused (the offset is the literal 10).
 */
void cpu_constant_func(void *buffers[], void *cl_arg)
{
    float *source = (float *)STARPU_VECTOR_GET_PTR(buffers[0]);
    float *destination = (float *)STARPU_VECTOR_GET_PTR(buffers[1]);
    int count = STARPU_VECTOR_GET_NX(static_cast<starpu_vector_interface *> (buffers[0]));
    int idx = 0;

    (void)cl_arg;

    while(idx < count)
    {
        destination[idx] = source[idx] + 10;
        ++idx;
    }
}

/* Number of times cpu_constant_callback has run. */
size_t constantCount = 0;

/*
 * Callback attached to the constant task: counts one more execution in
 * constantCount. callback_arg is ignored.
 */
void cpu_constant_callback(void *callback_arg)
{
    (void)callback_arg;
    constantCount += 1;
}

/* Number of times cpu_output_callback has run. */
size_t outputCount = 0;

/*
 * CPU kernel of the output stage.
 *
 * Checks that the first element of the vector in buffers[0] equals 30.0 and
 * terminates the process with exit(-1) when it does not.
 *
 * buffers: StarPU buffer interfaces; only buffers[0] (a vector) is used.
 * cl_arg:  unused.
 */
void cpu_output_func(void *buffers[], void *cl_arg)
{
    float *result = (float *)STARPU_VECTOR_GET_PTR(buffers[0]);

    (void)cl_arg;
#ifdef DEBUG_CPU_FUNC
    printf("cpu_output_func\n");
    printf("cpu_output_func %.1f\n", result[0]);
#endif
    /* Guard clause: nothing more to do while the pipeline result is right. */
    if(result[0] == 30.0)
        return;

    printf("cpu_output_func  Failed!!\n");
    exit(-1);
}

/*
 * Callback attached to the output task: counts one more execution in
 * outputCount. callback_arg is ignored.
 */
void cpu_output_callback(void *callback_arg)
{
    (void)callback_arg;
    outputCount += 1;
}

/* One codelet per pipeline stage. As file-scope objects they start out
 * zero-initialised; the Initialize*Program functions fill in the fields. */
struct starpu_codelet starpuInputCodelet;
struct starpu_codelet starpuGainCodelet;
struct starpu_codelet starpuConstantCodelet;
struct starpu_codelet starpuOutputCodelet;


/* Performance model shared by all four codelets; its type and symbol are
 * set at the start of TestOne. */
static struct starpu_perfmodel mult_perf_model;

/* Forward declarations: Report calls both helpers before their definitions. */
void ReportTask(struct starpu_codelet *starpuCodelet, struct starpu_task *starpuTask);
void ReportCodelet(struct starpu_codelet *starpuCodelet);

/*
 * Print a debugging report to stdout: the four per-stage callback counters,
 * followed by a dump of the given task (ReportTask) and codelet
 * (ReportCodelet).
 *
 * starpuCodelet: codelet to dump; also passed to ReportTask, which uses its
 *                nbuffers to decide how many task handles to print.
 * starpuTask:    task to dump.
 */
void Report(struct starpu_codelet *starpuCodelet, struct starpu_task *starpuTask)
{
    printf("\n Report Task: \n");
    /* The counters are size_t, so %zu is the matching conversion
     * (%ld expects a signed long). */
    printf("     inputCount = %zu\n", inputCount);
    printf("     gainCount = %zu\n", gainCount);
    printf("     constantCount = %zu\n", constantCount);
    printf("     outputCount = %zu\n", outputCount);

    ReportTask(starpuCodelet, starpuTask);
    ReportCodelet(starpuCodelet);

}

/*
 * Dump the fields of a StarPU task to stdout, one per line.
 *
 * Pointer-valued fields are printed in hexadecimal after a cast to
 * unsigned long; the status field is printed by name.
 *
 * starpuCodelet: only its nbuffers field is read, to know how many entries
 *                of starpuTask->handles to print.
 * starpuTask:    task to dump.
 */
void ReportTask(struct starpu_codelet *starpuCodelet, struct starpu_task *starpuTask)
{
    printf("\n   starpuTask->\n");
    printf("     cl = %lx\n", (unsigned long int)starpuTask->cl);
    printf("     cl_arg = %lx\n", (unsigned long int)starpuTask->cl_arg);
    printf("     cl_arg_size = %lx\n", (unsigned long int)starpuTask->cl_arg_size);
    printf("     callback_func = %lx\n", (unsigned long int)starpuTask->callback_func);
    printf("     callback_arg = %lx\n", (unsigned long int)starpuTask->callback_arg);
    printf("     priority = %x\n", starpuTask->priority);
    printf("     use_tag = %x\n", starpuTask->use_tag);
    printf("     synchronous = %x\n", starpuTask->synchronous);
    printf("     execute_on_a_specific_worker = %x\n", starpuTask->execute_on_a_specific_worker);
    printf("     bundle = %lx\n", (unsigned long int)starpuTask->bundle);
    printf("     detach = %x\n", starpuTask->detach);
    printf("     destroy = %x\n", starpuTask->destroy);
    printf("     regenerate = %x\n", starpuTask->regenerate);

    printf("     status = ");
    switch(starpuTask->status)
    {
        case STARPU_TASK_INVALID:
            printf("STARPU_TASK_INVALID");
            break;
        case STARPU_TASK_BLOCKED:
            printf("STARPU_TASK_BLOCKED");
            break;
        case STARPU_TASK_READY:
            printf("STARPU_TASK_READY");
            break;
        case STARPU_TASK_RUNNING:
            printf("STARPU_TASK_RUNNING");
            break;
        case STARPU_TASK_FINISHED:
            printf("STARPU_TASK_FINISHED");
            break;
        case STARPU_TASK_BLOCKED_ON_TAG:
            printf("STARPU_TASK_BLOCKED_ON_TAG");
            break;
        case STARPU_TASK_BLOCKED_ON_TASK:
            printf("STARPU_TASK_BLOCKED_ON_TASK");
            break;
        case STARPU_TASK_BLOCKED_ON_DATA:
            printf("STARPU_TASK_BLOCKED_ON_DATA");
            break;
        default:
            /* A status this program does not know by name: show the raw
             * value rather than an empty field. */
            printf("UNKNOWN (%d)", (int)starpuTask->status);
            break;
    }
    printf("\n");

    printf("     profiling_info = %lx\n", (unsigned long int)starpuTask->profiling_info);
    printf("     predicted = %f\n", starpuTask->predicted);
    printf("     predicted_transfer = %f\n", starpuTask->predicted_transfer);
    printf("     starpu_private = %lx\n", (unsigned long int)starpuTask->starpu_private);
    printf("     magic = %x\n", starpuTask->magic);
    printf("     sched_ctx = %x\n", starpuTask->sched_ctx);
    printf("     hypervisor_tag = %x\n", starpuTask->hypervisor_tag);
    printf("     flops = %f\n", starpuTask->flops);
    printf("     scheduled = %x\n", starpuTask->scheduled);

    for(unsigned int i=0;i<starpuCodelet->nbuffers;i++)
    {
        /* i is unsigned, so it is printed with %u. */
        printf( "     handles[%u] = %lx\n", i, (unsigned long int)starpuTask->handles[i]);
    }

    printf("     dyn_handles = %lx\n", (unsigned long int)starpuTask->dyn_handles);
    printf("     dyn_interfaces = %lx\n", (unsigned long int)starpuTask->dyn_interfaces);

}

/*
 * Dump the fields of a StarPU codelet to stdout: the set of worker kinds in
 * `where`, the codelet type, the first two CPU and OpenCL function pointers,
 * the access mode of each of the nbuffers buffers, the performance model
 * pointer and nbuffers itself.
 *
 * starpuCodelet: codelet to dump.
 */
void ReportCodelet(struct starpu_codelet *starpuCodelet)
{
    printf("\n   starpuCodelet->\n");
    printf("     where =");
    if (starpuCodelet->where == 0)
    {
        printf(" None");
    }
    if (starpuCodelet->where & STARPU_CPU)
    {
        printf(" STARPU_CPU");
    }
    if (starpuCodelet->where & STARPU_OPENCL)
    {
        printf(" STARPU_OPENCL");
    }
    if (starpuCodelet->where & STARPU_CUDA)
    {
        printf(" STARPU_CUDA");
    }
    printf("\n");

    printf("     type =");
    switch(starpuCodelet->type)
    {
        case STARPU_SEQ:
            printf(" STARPU_SEQ");
            break;
        case STARPU_SPMD:
            printf(" STARPU_SPMD");
            break;
        case STARPU_FORKJOIN:
            printf(" STARPU_FORKJOIN");
            break;
        default:
            /* Unnamed type: show the raw value instead of nothing. */
            printf(" UNKNOWN (%d)", (int)starpuCodelet->type);
            break;
    }
    printf("\n");
    printf("     cpu_funcs[0] = %lx \n", (unsigned long int)starpuCodelet->cpu_funcs[0]);
    printf("     cpu_funcs[1] = %lx \n", (unsigned long int)starpuCodelet->cpu_funcs[1]);
    printf("     opencl_funcs[0] = %lx \n", (unsigned long int)starpuCodelet->opencl_funcs[0]);
    printf("     opencl_funcs[1] = %lx \n", (unsigned long int)starpuCodelet->opencl_funcs[1]);

    /* i is unsigned, so it is printed with %u throughout. */
    for(unsigned int i=0;i<starpuCodelet->nbuffers;i++)
    {
        switch(starpuCodelet->modes[i])
        {
        case STARPU_NONE:
            printf("     modes[%u] = STARPU_NONE\n", i);
            break;
        case STARPU_RW:
            printf("     modes[%u] = STARPU_RW\n", i);
            break;
        case STARPU_R:
            printf("     modes[%u] = STARPU_R\n", i);
            break;
        case STARPU_W:
            printf("     modes[%u] = STARPU_W\n", i);
            break;
        case STARPU_SCRATCH:
            printf("     modes[%u] = STARPU_SCRATCH\n", i);
            break;
        case STARPU_REDUX:
            printf("     modes[%u] = STARPU_REDUX\n", i);
            break;
        default:
            /* Any other value (e.g. a combination of flags) is shown raw so
             * the buffer is never silently skipped. */
            printf("     modes[%u] = 0x%x\n", i, (unsigned int)starpuCodelet->modes[i]);
            break;
        }
    }

    printf("     model = %lx\n", (unsigned long)starpuCodelet->model);
    printf("     nbuffers = %d\n", starpuCodelet->nbuffers);


}

void SetAllRegenerate(int regenerate)
{
    starpuInputTask->regenerate = regenerate;
    starpuGainTask->regenerate = regenerate;
    starpuConstantTask->regenerate = regenerate;
    starpuOutputTask->regenerate = regenerate;
}

void InitializeInputProgram(void)
{
    starpuInputCodelet.where = STARPU_CPU;
    starpuInputCodelet.nbuffers = 1;
    starpuInputCodelet.modes[0] = STARPU_RW;
    starpuInputCodelet.cpu_funcs[0] = (starpu_cpu_func_t) cpu_input_func;
    starpuInputCodelet.cpu_funcs[1] = NULL;
    starpuInputCodelet.opencl_funcs[0] = NULL;
    starpuInputCodelet.model = &mult_perf_model;
}

void InitializeGainProgram(void)
{
    starpuGainCodelet.where = STARPU_CPU;
    starpuGainCodelet.nbuffers = 2;
    starpuGainCodelet.modes[0] = STARPU_RW;
    starpuGainCodelet.modes[1] = STARPU_RW;
    starpuGainCodelet.cpu_funcs[0] = (starpu_cpu_func_t) cpu_gain_func;
    starpuGainCodelet.cpu_funcs[1] = NULL;
    starpuGainCodelet.model = &mult_perf_model;
}

void InitializeConstantProgram(void)
{
    starpuConstantCodelet.where = STARPU_CPU;
    starpuConstantCodelet.nbuffers = 2;
    starpuConstantCodelet.modes[0] = STARPU_RW;
    starpuConstantCodelet.modes[1] = STARPU_RW;
    starpuConstantCodelet.cpu_funcs[0] = (starpu_cpu_func_t) cpu_constant_func;
    starpuConstantCodelet.cpu_funcs[1] = NULL;
    starpuConstantCodelet.model = &mult_perf_model;
}

void InitializeOutputProgram(void)
{
    starpuOutputCodelet.where = STARPU_CPU;
    starpuOutputCodelet.nbuffers = 1;
    starpuOutputCodelet.modes[0] = STARPU_RW;
    starpuOutputCodelet.cpu_funcs[0] = (starpu_cpu_func_t) cpu_output_func;
    starpuOutputCodelet.cpu_funcs[1] = NULL;
    starpuOutputCodelet.opencl_funcs[0] = NULL;
    starpuOutputCodelet.model = &mult_perf_model;
}


int TestOne(size_t testTime)
{
    int ret = 0;

    printf("Troubleshooting StarPU Tasks\n");

    mult_perf_model.type = STARPU_HISTORY_BASED;//STARPU_NL_REGRESSION_BASED; //STARPU_REGRESSION_BASED;//STARPU_HISTORY_BASED;
    mult_perf_model.symbol = "mult_perf_model";

    struct starpu_conf conf;
    ret = starpu_conf_init(&conf);
    if(ret)
        printf("%s %s:%d starpu_conf_init err: %d\n", __FILE__, __FUNCTION__, __LINE__, ret);

    ret = starpu_init(&conf);
    if(ret)
        printf("%s %s:%d starpu_init err: %d\n", __FILE__, __FUNCTION__, __LINE__, ret);


    starpu_data_handle_t vectorInputHandle;
    /*
     * Home node is 0. If it was set to -1, the handle would have been dynamically created and destroyed
     * The data would be freed upon the unregister
     */
    starpu_vector_data_register(&vectorInputHandle, 0, (uintptr_t)Input, NX, sizeof(Input[0]));

    starpu_data_handle_t vectorGainOutputHandle;
    /*
     * Home node is 0. If it was set to -1, the handle would have been dynamically created and destroyed
     * The data would be freed upon the unregister
     */
    starpu_vector_data_register(&vectorGainOutputHandle, 0, (uintptr_t)GainOutput, NX, sizeof(GainOutput[0]));

    starpu_data_handle_t vectorConstantOutputHandle;
    /*
     * Home node is 0. If it was set to -1, the handle would have been dynamically created and destroyed
     * The data would be freed upon the unregister
     */
    starpu_vector_data_register(&vectorConstantOutputHandle, 0, (uintptr_t)ConstantOutput, NX, sizeof(ConstantOutput[0]));

    /***********************************************************
     * Task One
     ***********************************************************/
    for (int i = 0; i < NX; i++)
    {
        Input[i] = 2.0f;
        GainOutput[i] = 0;
        ConstantOutput[i] = 0;
    }
    printf("Initial Values: \n");
    printf("   Input[0]: %.1f \n", Input[0]);
    printf("   GainOutput[0]: %.1f \n", GainOutput[0]);
    printf("   ConstantOutput[0]: %.1f \n", ConstantOutput[0]);

    /********************* Input Task *********************/
    starpuInputTask = starpu_task_create( );
    inputCount = 0;
    starpuInputTask->cl = &starpuInputCodelet;
    InitializeInputProgram();
    starpuInputTask->detach = 1;
    starpuInputTask->destroy = 0;

#ifdef CALLBACKS_ENABLED
    starpuInputTask->callback_func = cpu_input_callback;
    starpuInputTask->callback_arg = (void *)0x42;
#endif

    /* starpu_submit_task will be a blocking call */
    starpuInputTask->synchronous = 0;

    starpuInputTask->handles[0] = vectorInputHandle;
    starpuInputTask->cl_arg = 0;
    starpuInputTask->cl_arg_size = 0;

    /********************* Gain Task *********************/
    starpuGainTask = starpu_task_create( );
    gainCount = 0;
    starpuGainTask->cl = &starpuGainCodelet;
    InitializeGainProgram();
    starpuGainTask->detach = 1;
    starpuGainTask->destroy = 0;

#ifdef CALLBACKS_ENABLED
    starpuGainTask->callback_func = cpu_gain_callback;
    starpuGainTask->callback_arg = (void *)0x42;
#endif

    /* starpu_submit_task will be a blocking call */
    starpuGainTask->synchronous = 0;

    starpuGainTask->handles[0] = vectorInputHandle;
    starpuGainTask->handles[1] = vectorGainOutputHandle;
    void *arg_buffer;
    size_t arg_buffer_size;
    float factor = 5;
    starpu_codelet_pack_args( &arg_buffer,
                              &arg_buffer_size,
                              STARPU_VALUE, &factor, sizeof(factor),
                              0);
    starpuGainTask->cl_arg = arg_buffer;
    starpuGainTask->cl_arg_size = arg_buffer_size;


    /********************* Constant Task *********************/
    starpuConstantTask = starpu_task_create( );
    constantCount = 0;
    starpuConstantTask->cl = &starpuConstantCodelet;
    InitializeConstantProgram();
    starpuConstantTask->detach = 1;
    starpuConstantTask->destroy = 0;

#ifdef CALLBACKS_ENABLED
    starpuConstantTask->callback_func = cpu_constant_callback;
    starpuConstantTask->callback_arg = (void *)0x42;
#endif

    /* starpu_submit_task will be a blocking call */
    starpuConstantTask->synchronous = 0;

    starpuConstantTask->handles[0] = vectorGainOutputHandle;
    starpuConstantTask->handles[1] = vectorConstantOutputHandle;
    void *arg_buffer2;
    size_t arg_buffer_size2;
    float constant = 5;
    starpu_codelet_pack_args( &arg_buffer2,
                              &arg_buffer_size2,
                              STARPU_VALUE, &constant, sizeof(constant),
                              0);
    starpuConstantTask->cl_arg = arg_buffer2;
    starpuConstantTask->cl_arg_size = arg_buffer_size2;

    /********************* Output Task *********************/
    starpuOutputTask = starpu_task_create( );
    outputCount = 0;
    starpuOutputTask->cl = &starpuOutputCodelet;
    InitializeOutputProgram();
    starpuOutputTask->detach = 1;
    starpuOutputTask->destroy = 0;

#ifdef CALLBACKS_ENABLED
    starpuOutputTask->callback_func = cpu_output_callback;
    starpuOutputTask->callback_arg = (void *)0x42;
#endif

    /* starpu_submit_task will be a blocking call */
    starpuOutputTask->synchronous = 0;

    starpuOutputTask->handles[0] = vectorConstantOutputHandle;
    starpuOutputTask->cl_arg = 0;
    starpuOutputTask->cl_arg_size = 0;

    /********************* Submit Tasks *********************/
    starpu_data_set_sequential_consistency_flag(vectorInputHandle, 0);
    starpu_data_set_sequential_consistency_flag(vectorGainOutputHandle, 0);
    starpu_data_set_sequential_consistency_flag(vectorConstantOutputHandle, 0);

    starpu_task_declare_deps_array ( starpuGainTask , 1, &starpuInputTask );
    starpu_task_declare_deps_array ( starpuConstantTask , 1, &starpuGainTask );
    starpu_task_declare_deps_array ( starpuOutputTask , 1, &starpuConstantTask );

    SetAllRegenerate(1);

    /********************* Submit Tasks *********************/
    ret = starpu_task_submit(starpuInputTask);
    if(ret)
        printf("%s %s:%d starpu_task_submit err: %d\n", __FILE__, __FUNCTION__, __LINE__, ret);

    ret = starpu_task_submit(starpuConstantTask);
    if(ret)
        printf("%s %s:%d starpu_task_submit err: %d\n", __FILE__, __FUNCTION__, __LINE__, ret);


    ret = starpu_task_submit(starpuGainTask);
    if(ret)
        printf("%s %s:%d starpu_task_submit err: %d\n", __FILE__, __FUNCTION__, __LINE__, ret);


    ret = starpu_task_submit(starpuOutputTask);
    if(ret)
        printf("%s %s:%d starpu_task_submit err: %d\n", __FILE__, __FUNCTION__, __LINE__, ret);

    printf("%s %s:%d Running\n", __FILE__, __FUNCTION__, __LINE__);

    Report(&starpuInputCodelet, starpuInputTask);

    /* Wait for Period of time */
    sleep(testTime);

    SetAllRegenerate(0);

    starpu_task_wait_for_all();

    starpu_data_unregister(vectorInputHandle);
    starpu_data_unregister(vectorGainOutputHandle);
    starpu_data_unregister(vectorConstantOutputHandle);


    /* terminate StarPU */
    starpu_shutdown();

    printf("Results: \n");
    printf("   GainInput[0]: %.1f\n", Input[0]);
    printf("   GainOutput[0]: %.1f\n", GainOutput[0]);
    printf("   ConstantOutput[0]: %.1f\n", ConstantOutput[0]);

    printf("\n   inputCount: %ld\n", inputCount);
    printf("   gainCount: %ld\n", gainCount);
    printf("   constantCount: %ld\n", constantCount);
    printf("   outputCount: %ld\n", outputCount);

    return 0;
}

int main ( int argc, char *argv[] )
{
    int i = 0;

    /* Testing for 10 minutes */
    if(TestOne(600))
    {
        printf("Tests Failed\n");
        return -1;
    }
    printf("Test:  %d - Passed\n", i+1);

}






Archives gérées par MHonArc 2.6.19+.

Haut de la page