Changeset 25 for sample/hadoop-0.16/tw/org/nchc/demo
- Timestamp:
- Jul 3, 2008, 4:45:54 PM (16 years ago)
- Location:
- sample/hadoop-0.16/tw/org/nchc/demo
- Files:
-
- 17 edited
Legend:
- Unmodified
- Added
- Removed
-
sample/hadoop-0.16/tw/org/nchc/demo/DemoWordCount.java
r21 r25 28 28 import org.apache.hadoop.mapred.Reducer; 29 29 import org.apache.hadoop.mapred.Reporter; 30 31 import tw.org.nchc.code.Convert;32 30 33 31 /** … … 107 105 conf.setNumMapTasks(mapTasks); 108 106 conf.setNumReduceTasks(reduceTasks); 109 //0.16 110 //conf.setInputPath(new Path(filename));111 Convert.setInputPath(conf, new Path(filename)); 107 108 conf.setInputPath(new Path(filename)); 109 112 110 conf.setOutputKeyClass(Text.class); 113 111 conf.setOutputValueClass(IntWritable.class); 114 // 0.16 115 // conf.setOutputPath(new Path(outputPath)); 116 Convert.setInputPath(conf, new Path(outputPath)); 112 113 conf.setOutputPath(new Path(outputPath)); 117 114 conf.setMapperClass(MapClass.class); 118 115 conf.setCombinerClass(ReduceClass.class); … … 121 118 // Delete the output directory if it exists already 122 119 Path outputDir = new Path(outputPath); 123 // 0.16 124 // FileSystem.get(conf).delete(outputDir); 125 FileSystem.get(conf).delete(outputDir,true); 120 FileSystem.get(conf).delete(outputDir); 126 121 JobClient.runJob(conf); 127 122 } -
sample/hadoop-0.16/tw/org/nchc/demo/LogFetcher.java
r21 r25 23 23 import java.text.ParseException; 24 24 25 import org.apache.hadoop.fs.FileStatus; 25 26 import org.apache.hadoop.fs.FileSystem; 26 27 import org.apache.hadoop.fs.Path; … … 40 41 import org.apache.hadoop.mapred.OutputCollector; 41 42 import org.apache.hadoop.mapred.Reporter; 42 43 import tw.org.nchc.code.Convert;44 43 45 44 /** … … 96 95 } 97 96 } 98 97 static public Path[] listPaths(FileSystem fsm,Path path) throws IOException 98 { 99 FileStatus[] fss = fsm.listStatus(path); 100 int length = fss.length; 101 Path[] pi = new Path[length]; 102 for (int i=0 ; i< length; i++) 103 { 104 pi[i] = fss[i].getPath(); 105 } 106 return pi; 107 } 99 108 public static void runMapReduce(String table, String dir) 100 109 throws IOException { … … 106 115 jobConf.set(TABLE, table); 107 116 // my convert function from 0.16 to 0.17 108 Path[] in = Convert.listPaths(fs, InputDir);117 Path[] in = listPaths(fs, InputDir); 109 118 if (fs.isFile(InputDir)) { 110 // 0.16 111 // jobConf.setInputPath(InputDir); 112 Convert.setInputPath(jobConf, InputDir); 119 jobConf.setInputPath(InputDir); 113 120 } else { 114 121 for (int i = 0; i < in.length; i++) { 115 122 if (fs.isFile(in[i])) { 116 // 0.16 117 // jobConf.addInputPath(in[i]); 118 Convert.addInputPath(jobConf,in[i]); 123 jobConf.addInputPath(in[i]); 119 124 } else { 120 125 // my convert function from 0.16 to 0.17 121 Path[] sub = Convert.listPaths(fs, in[i]);126 Path[] sub = listPaths(fs, in[i]); 122 127 for (int j = 0; j < sub.length; j++) { 123 128 if (fs.isFile(sub[j])) { 124 // 0.16 125 // jobConf.addInputPath(sub[j]); 126 Convert.addInputPath(jobConf, sub[j]); 129 jobConf.addInputPath(sub[j]); 127 130 } 128 131 } … … 130 133 } 131 134 } 132 // 0.16 133 // jobConf.setOutputPath(tempDir); 134 Convert.setOutputPath(jobConf, tempDir); 135 jobConf.setOutputPath(tempDir); 135 136 136 137 jobConf.setMapperClass(MapClass.class); … … 142 143 143 144 JobClient.runJob(jobConf); 144 // 0.16 145 // fs.delete(tempDir); 146 fs.delete(tempDir,true); 147 145 146 fs.delete(tempDir); 148 147 fs.close(); 149 148 }
Note: See TracChangeset
for help on using the changeset viewer.