Ignore:
Timestamp:
Jul 3, 2008, 4:45:54 PM (16 years ago)
Author:
waue
Message:

downgrade from 0.17 to 0.16
test for work -> not yet

Location:
sample/hadoop-0.16/tw/org/nchc/demo
Files:
17 edited

Legend:

Unmodified
Added
Removed
  • sample/hadoop-0.16/tw/org/nchc/demo/DemoWordCount.java

    r21 r25  
    2828import org.apache.hadoop.mapred.Reducer;
    2929import org.apache.hadoop.mapred.Reporter;
    30 
    31 import tw.org.nchc.code.Convert;
    3230
    3331/**
     
    107105    conf.setNumMapTasks(mapTasks);
    108106    conf.setNumReduceTasks(reduceTasks);
    109     //0.16
    110 //    conf.setInputPath(new Path(filename));
    111     Convert.setInputPath(conf, new Path(filename));
     107
     108    conf.setInputPath(new Path(filename));
     109
    112110    conf.setOutputKeyClass(Text.class);
    113111    conf.setOutputValueClass(IntWritable.class);
    114     // 0.16
    115 //    conf.setOutputPath(new Path(outputPath));
    116     Convert.setInputPath(conf, new Path(outputPath));
     112
     113    conf.setOutputPath(new Path(outputPath));
    117114    conf.setMapperClass(MapClass.class);
    118115    conf.setCombinerClass(ReduceClass.class);
     
    121118    // Delete the output directory if it exists already
    122119    Path outputDir = new Path(outputPath);
    123     // 0.16
    124 //    FileSystem.get(conf).delete(outputDir);
    125     FileSystem.get(conf).delete(outputDir,true);
     120    FileSystem.get(conf).delete(outputDir);
    126121    JobClient.runJob(conf);
    127122  }
  • sample/hadoop-0.16/tw/org/nchc/demo/LogFetcher.java

    r21 r25  
    2323import java.text.ParseException;
    2424
     25import org.apache.hadoop.fs.FileStatus;
    2526import org.apache.hadoop.fs.FileSystem;
    2627import org.apache.hadoop.fs.Path;
     
    4041import org.apache.hadoop.mapred.OutputCollector;
    4142import org.apache.hadoop.mapred.Reporter;
    42 
    43 import tw.org.nchc.code.Convert;
    4443
    4544/**
     
    9695    }
    9796  }
    98 
     97  static public Path[] listPaths(FileSystem fsm,Path path) throws IOException
     98  {
     99    FileStatus[] fss = fsm.listStatus(path);
     100    int length = fss.length;
     101    Path[] pi = new Path[length];
     102    for (int i=0 ; i< length; i++)
     103    {
     104      pi[i] = fss[i].getPath();
     105    }
     106    return pi;
     107  }
    99108  public static void runMapReduce(String table, String dir)
    100109      throws IOException {
     
    106115    jobConf.set(TABLE, table);
    107116    // my convert function from 0.16 to 0.17
    108     Path[] in = Convert.listPaths(fs, InputDir);
     117    Path[] in = listPaths(fs, InputDir);
    109118    if (fs.isFile(InputDir)) {
    110       // 0.16
    111 //      jobConf.setInputPath(InputDir);
    112       Convert.setInputPath(jobConf, InputDir);
     119      jobConf.setInputPath(InputDir);
    113120    } else {
    114121      for (int i = 0; i < in.length; i++) {
    115122        if (fs.isFile(in[i])) {
    116           // 0.16
    117 //          jobConf.addInputPath(in[i]);
    118           Convert.addInputPath(jobConf,in[i]);
     123          jobConf.addInputPath(in[i]);
    119124        } else {
    120125          // my convert function from 0.16 to 0.17
    121           Path[] sub = Convert.listPaths(fs, in[i]);
     126          Path[] sub = listPaths(fs, in[i]);
    122127          for (int j = 0; j < sub.length; j++) {
    123128            if (fs.isFile(sub[j])) {
    124               // 0.16
    125 //              jobConf.addInputPath(sub[j]);
    126               Convert.addInputPath(jobConf, sub[j]);
     129              jobConf.addInputPath(sub[j]);
    127130            }
    128131          }
     
    130133      }
    131134    }
    132     // 0.16
    133 //    jobConf.setOutputPath(tempDir);
    134     Convert.setOutputPath(jobConf, tempDir);
     135    jobConf.setOutputPath(tempDir);
    135136   
    136137    jobConf.setMapperClass(MapClass.class);
     
    142143
    143144    JobClient.runJob(jobConf);
    144     // 0.16
    145 //    fs.delete(tempDir);
    146     fs.delete(tempDir,true);
    147    
     145
     146    fs.delete(tempDir);   
    148147    fs.close();
    149148  }
Note: See TracChangeset for help on using the changeset viewer.