wiki:R-MPI

Version 12 (modified by chris, 17 years ago) (diff)

--

  • The sample R program is from here
    • Original R Program of 10 loops (without Rmpi)
      start_time <- Sys.time()
      # first make some data
      n <- 1000 # number of obs
      p <- 30 # number of variables
      
      x <- matrix(rnorm(n*p),n,p)
      beta <- c(rnorm(p/2,0,5),rnorm(p/2,0,.25))
      y <- x %*% beta + rnorm(n,0,20)
      thedata <- data.frame(y=y,x=x)
      
      # summary(lm(y~x))
      
      fold <- rep(1:10,length=n)
      fold <- sample(fold)
      
      rssresult <- matrix(0,p,10)
      
      for (j in 1:10) {
          for (i in 1:p) {
              templm <- lm(y~.,data=thedata[fold!=j,1:(i+1)])
              yhat <- predict(templm,newdata=thedata[fold==j,1:(i+1)])
              rssresult[i,j] <- sum((yhat-y[fold==j])^2)
          }
      }
      
      end_time <- Sys.time()
      runTime<-difftime(end_time,start_time,units="secs")
      write(runTime,file="single_10_runTime",append=TRUE)
      write(rssresult,file="single_10_result")
      q(save="no")
      
      
    • Parallel R Program of 10 loops (with Rmpi)
      # Create Start Timer
      start_time<-Sys.time()
      
      # Initialize MPI
      library("Rmpi")
      
      # Notice we just say "give us all the slaves you've got."
      mpi.spawn.Rslaves(nslaves=2)
      
      if (mpi.comm.size() < 2) {
          print("More slave processes are required.")
          mpi.quit()
          }
      
      .Last <- function(){
          if (is.loaded("mpi_initialize")){
              if (mpi.comm.size(1) > 0){
                  print("Please use mpi.close.Rslaves() to close slaves.")
                  mpi.close.Rslaves()
              }
              print("Please use mpi.quit() to quit R")
              .Call("mpi_finalize")
          }
      }
      
      # Function the slaves will call to perform a validation on the
      # fold equal to their slave number.
      # Assumes: thedata,fold,foldNumber,p
      foldslave <- function() {
          # Note the use of the tag for sent messages:
          #     1=ready_for_task, 2=done_task, 3=exiting
          # Note the use of the tag for received messages:
          #     1=task, 2=done_tasks
          junk <- 0
      
          done <- 0
          while (done != 1) {
              # Signal being ready to receive a new task
              mpi.send.Robj(junk,0,1)
      
              # Receive a task
              task <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
              task_info <- mpi.get.sourcetag()
              tag <- task_info[2]
      
              if (tag == 1) {
                  foldNumber <- task$foldNumber
      
                  rss <- double(p)
                  for (i in 1:p) {
                      # produce a linear model on the first i variables on
                      # training data
                      templm <- lm(y~.,data=thedata[fold!=foldNumber,1:(i+1)])
      
                      # produce predicted data from test data
                      yhat <- predict(templm,newdata=thedata[fold==foldNumber,1:(i+1)])
      
                      # get rss of yhat-y
                      localrssresult <- sum((yhat-thedata[fold==foldNumber,1])^2)
                      rss[i] <- localrssresult
                      }
      
                  # Send a results message back to the master
                  results <- list(result=rss,foldNumber=foldNumber)
                  mpi.send.Robj(results,0,2)
                  }
              else if (tag == 2) {
                  done <- 1
                  }
              # We'll just ignore any unknown messages
              }
      
          mpi.send.Robj(junk,0,3)
          }
      
      # We're in the parent.
      # first make some data
      n <- 1000       # number of obs
      p <- 30         # number of variables
      
      # Create data as a set of n samples of p independent variables,
      # make a "random" beta with higher weights in the front.
      # Generate y's as y = beta*x + random
      x <- matrix(rnorm(n*p),n,p)
      beta <- c(rnorm(p/2,0,5),rnorm(p/2,0,.25))
      y <- x %*% beta + rnorm(n,0,20)
      thedata <- data.frame(y=y,x=x)
      
      fold <- rep(1:10,length=n)
      fold <- sample(fold)
      
      #summary(lm(y~x))
      
      # Now, send the data to the slaves
      mpi.bcast.Robj2slave(thedata)
      mpi.bcast.Robj2slave(fold)
      mpi.bcast.Robj2slave(p)
      
      # Send the function to the slaves
      mpi.bcast.Robj2slave(foldslave)
      
      # Call the function in all the slaves to get them ready to
      # undertake tasks
      mpi.bcast.cmd(foldslave())
      
      
      # Create task list
      tasks <- vector('list')
      for (i in 1:10) {
          tasks[[i]] <- list(foldNumber=i)
          }
      
      # Create data structure to store the results
      rssresult = matrix(0,p,10)
      
      junk <- 0
      closed_slaves <- 0
      n_slaves <- mpi.comm.size()-1
      
      while (closed_slaves < n_slaves) {
          # Receive a message from a slave
          message <- mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
          message_info <- mpi.get.sourcetag()
          slave_id <- message_info[1]
          tag <- message_info[2]
      
          if (tag == 1) {
              # slave is ready for a task. Give it the next task, or tell it tasks
              # are done if there are none.
              if (length(tasks) > 0) {
                  # Send a task, and then remove it from the task list
                  mpi.send.Robj(tasks[[1]], slave_id, 1);
                  tasks[[1]] <- NULL
                  }
              else {
                  mpi.send.Robj(junk, slave_id, 2)
                  }
              }
          else if (tag == 2) {
              # The message contains results. Do something with the results.
              # Store them in the data structure
              foldNumber <- message$foldNumber
              rssresult[,foldNumber] <- message$result
              }
          else if (tag == 3) {
              # A slave has closed down.
              closed_slaves <- closed_slaves + 1
              }
          }
      
      
      # plot the results
      # plot(apply(rssresult,1,mean))
      end_time<-Sys.time()
      runTime<-difftime(end_time,start_time,units="secs")
      write(runTime, file="parallel_10_runTime",append=TRUE)
      write(rssresult, file="parallel_10_result")
      mpi.close.Rslaves()
      mpi.quit(save="no")
      
      
  • Here are the running times(secs) with different times of loops of origianl R:
    10 loops : 1.087226
    20 loops : 3.395167
    30 loops : 7.027248
    40 loops : 11.95344
    50 loops : 17.02942
    60 loops : 23.54457
    70 loops : 32.23314
    80 loops : 42.41104
    90 loops : 53.88992
    100 loops : 64.43279
    
  • And Here are the running times(secs) with diffrent times of loops of PARALLEL R (Rmpi):
    10 loops : 1.759266
    20 loops : 2.582305
    30 loops : 4.262631
    40 loops : 6.176133
    50 loops : 8.695369
    60 loops : 12.00457
    70 loops : 16.19619
    80 loops : 20.33303
    90 loops : 25.45973
    100 loops : 30.71952
    
  • The Curve is as below:
    The Y-aix represents runtime of both single mode and parallel mode.(units: seconds)
    The X-aix represents how many times of loop set by us in the sample program (from 10 to 100)

  • We could get a conclusion : more times of loop, more useful of Rmpi.

Attachments (3)

Download all attachments as: .zip