Changes between Version 6 and Version 7 of R-MPI


Ignore:
Timestamp:
May 5, 2008, 2:41:16 PM (17 years ago)
Author:
chris
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • R-MPI

    v6 v7  
    1 * Testing the sample R program from www
    2  * Original R Program (without Rmpi)
    3  * Parallel R Program (with Rmpi)
     1* Testing the sample R program from [here http://ace.acadiau.ca/math/ACMMaC/Rmpi/sample.html]
     2 * Original R Program of 10 loops (without Rmpi)
     3{{{
     4start_time <- Sys.time()
     5# first make some data
     6n <- 1000 # number of obs
     7p <- 30 # number of variables
     8
     9x <- matrix(rnorm(n*p),n,p)
     10beta <- c(rnorm(p/2,0,5),rnorm(p/2,0,.25))
     11y <- x %*% beta + rnorm(n,0,20)
     12thedata <- data.frame(y=y,x=x)
     13
     14# summary(lm(y~x))
     15
     16fold <- rep(1:10,length=n)
     17fold <- sample(fold)
     18
     19rssresult <- matrix(0,p,10)
     20
     21for (j in 1:10) {
     22    for (i in 1:p) {
     23        templm <- lm(y~.,data=thedata[fold!=j,1:(i+1)])
     24        yhat <- predict(templm,newdata=thedata[fold==j,1:(i+1)])
     25        rssresult[i,j] <- sum((yhat-y[fold==j])^2)
     26    }
     27}
     28
     29end_time <- Sys.time()
     30runTime<-difftime(end_time,start_time,units="secs")
     31write(runTime,file="single_10_runTime",append=TRUE)
     32write(rssresult,file="single_10_result")
     33q(save="no")
     34
     35}}}
     36 * Parallel R Program of 10 loops (with Rmpi)
     37{{{
     38# Create Start Timer
     39start_time<-Sys.time()
     40
     41# Initialize MPI
     42library("Rmpi")
     43
     44# Notice we just say "give us all the slaves you've got."
     45mpi.spawn.Rslaves(nslaves=2)
     46
     47if (mpi.comm.size() < 2) {
     48    print("More slave processes are required.")
     49    mpi.quit()
     50    }
     51
     52.Last <- function(){
     53    if (is.loaded("mpi_initialize")){
     54        if (mpi.comm.size(1) > 0){
     55            print("Please use mpi.close.Rslaves() to close slaves.")
     56            mpi.close.Rslaves()
     57        }
     58        print("Please use mpi.quit() to quit R")
     59        .Call("mpi_finalize")
     60    }
     61}
     62
     63# Function the slaves will call to perform a validation on the
     64# fold equal to their slave number.
     65# Assumes: thedata,fold,foldNumber,p
     66foldslave <- function() {
     67    # Note the use of the tag for sent messages:
     68    #     1=ready_for_task, 2=done_task, 3=exiting
     69    # Note the use of the tag for received messages:
     70    #     1=task, 2=done_tasks
     71    junk <- 0
     72
     73    done <- 0
     74    while (done != 1) {
     75        # Signal being ready to receive a new task
     76        mpi.send.Robj(junk,0,1)
     77
     78        # Receive a task
     79        task <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
     80        task_info <- mpi.get.sourcetag()
     81        tag <- task_info[2]
     82
     83        if (tag == 1) {
     84            foldNumber <- task$foldNumber
     85
     86            rss <- double(p)
     87            for (i in 1:p) {
     88                # produce a linear model on the first i variables on
     89                # training data
     90                templm <- lm(y~.,data=thedata[fold!=foldNumber,1:(i+1)])
     91
     92                # produce predicted data from test data
     93                yhat <- predict(templm,newdata=thedata[fold==foldNumber,1:(i+1)])
     94
     95                # get rss of yhat-y
     96                localrssresult <- sum((yhat-thedata[fold==foldNumber,1])^2)
     97                rss[i] <- localrssresult
     98                }
     99
     100            # Send a results message back to the master
     101            results <- list(result=rss,foldNumber=foldNumber)
     102            mpi.send.Robj(results,0,2)
     103            }
     104        else if (tag == 2) {
     105            done <- 1
     106            }
     107        # We'll just ignore any unknown messages
     108        }
     109
     110    mpi.send.Robj(junk,0,3)
     111    }
     112
     113# We're in the parent.
     114# first make some data
     115n <- 1000       # number of obs
     116p <- 30         # number of variables
     117
     118# Create data as a set of n samples of p independent variables,
     119# make a "random" beta with higher weights in the front.
     120# Generate y's as y = beta*x + random
     121x <- matrix(rnorm(n*p),n,p)
     122beta <- c(rnorm(p/2,0,5),rnorm(p/2,0,.25))
     123y <- x %*% beta + rnorm(n,0,20)
     124thedata <- data.frame(y=y,x=x)
     125
     126fold <- rep(1:10,length=n)
     127fold <- sample(fold)
     128
     129#summary(lm(y~x))
     130
     131# Now, send the data to the slaves
     132mpi.bcast.Robj2slave(thedata)
     133mpi.bcast.Robj2slave(fold)
     134mpi.bcast.Robj2slave(p)
     135
     136# Send the function to the slaves
     137mpi.bcast.Robj2slave(foldslave)
     138
     139# Call the function in all the slaves to get them ready to
     140# undertake tasks
     141mpi.bcast.cmd(foldslave())
     142
     143
     144# Create task list
     145tasks <- vector('list')
     146for (i in 1:10) {
     147    tasks[[i]] <- list(foldNumber=i)
     148    }
     149
     150# Create data structure to store the results
     151rssresult = matrix(0,p,10)
     152
     153junk <- 0
     154closed_slaves <- 0
     155n_slaves <- mpi.comm.size()-1
     156
     157while (closed_slaves < n_slaves) {
     158    # Receive a message from a slave
     159    message <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
     160    message_info <- mpi.get.sourcetag()
     161    slave_id <- message_info[1]
     162    tag <- message_info[2]
     163
     164    if (tag == 1) {
     165        # slave is ready for a task. Give it the next task, or tell it tasks
     166        # are done if there are none.
     167        if (length(tasks) > 0) {
     168            # Send a task, and then remove it from the task list
     169            mpi.send.Robj(tasks[[1]], slave_id, 1);
     170            tasks[[1]] <- NULL
     171            }
     172        else {
     173            mpi.send.Robj(junk, slave_id, 2)
     174            }
     175        }
     176    else if (tag == 2) {
     177        # The message contains results. Do something with the results.
     178        # Store them in the data structure
     179        foldNumber <- message$foldNumber
     180        rssresult[,foldNumber] <- message$result
     181        }
     182    else if (tag == 3) {
     183        # A slave has closed down.
     184        closed_slaves <- closed_slaves + 1
     185        }
     186    }
     187
     188
     189# plot the results
     190# plot(apply(rssresult,1,mean))
     191end_time<-Sys.time()
     192runTime<-difftime(end_time,start_time,units="secs")
     193write(runTime, file="parallel_10_runTime",append=TRUE)
     194write(rssresult, file="parallel_10_result")
     195mpi.close.Rslaves()
     196mpi.quit(save="no")
     197
     198}}}
     199
    4200 * Here are the running times(secs) with different times of loops of origianl R:
    5201{{{