Mapper.Context already declares the getInputSplit() method, so using context covers nearly everything the old '''OutputCollector and Reporter''' interfaces provided.

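As a minimal sketch (not part of the example below; the DemoMapper class and the counter names are hypothetical), the new-API Context exposes in one object the calls that used to be split between OutputCollector and Reporter:

{{{
#!java
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

// Hypothetical mapper, only to illustrate which Context calls replace the
// old OutputCollector and Reporter methods in the org.apache.hadoop.mapreduce API.
public class DemoMapper extends Mapper<LongWritable, Text, Text, Text> {
	public void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		// OutputCollector.collect(k, v)   ->  context.write(k, v)
		context.write(new Text("demo"), value);
		// Reporter.getInputSplit()        ->  context.getInputSplit()
		FileSplit split = (FileSplit) context.getInputSplit();
		// Reporter.incrCounter(...)       ->  context.getCounter(group, name).increment(n)
		context.getCounter("demo", "records").increment(1);
		// Reporter.setStatus() / progress() are also available on context
		context.setStatus("processing " + split.getPath().getName());
		context.progress();
	}
}
}}}
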
{{{
#!java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordIndex {

	public static class wordindexM extends
			Mapper<LongWritable, Text, Text, Text> {
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {

			// context.getInputSplit() replaces the old Reporter.getInputSplit()
			FileSplit fileSplit = (FileSplit) context.getInputSplit();

			Text map_key = new Text();
			Text map_value = new Text();
			String line = value.toString();
			StringTokenizer st = new StringTokenizer(line.toLowerCase());
			while (st.hasMoreTokens()) {
				String word = st.nextToken();
				// emit <word, "filename:line"> for every token
				map_key.set(word);
				map_value.set(fileSplit.getPath().getName() + ":" + line);
				context.write(map_key, map_value);
			}
		}
	}

	static public class wordindexR extends Reducer<Text, Text, Text, Text> {

		public void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			// collect every "filename:line" entry for this word
			StringBuilder ret = new StringBuilder("\n");
			for (Text val : values) {
				String v = val.toString().trim();
				if (v.length() > 0)
					ret.append(v + "\n");
			}
			// context.write() replaces the old OutputCollector.collect()
			context.write(key, new Text(ret.toString()));
		}
	}

	public static void main(String[] args) throws IOException,
			InterruptedException, ClassNotFoundException {
		// for debugging only: hard-coded input/output paths
		String[] argv = { "/user/waue/input", "/user/waue/output-wi" };
		args = argv;

		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();
		if (otherArgs.length < 2) {
			System.out.println("hadoop jar WordIndex.jar <inDir> <outDir>");
			return;
		}
		Job job = new Job(conf, "word index");
		job.setJobName("word inverted index");
		job.setJarByClass(WordIndex.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);

		job.setMapperClass(wordindexM.class);
		job.setReducerClass(wordindexR.class);
		job.setCombinerClass(wordindexR.class);

		FileInputFormat.setInputPaths(job, args[0]);
		// remove the output directory if it already exists (helper from HelloHadoopV2)
		HelloHadoopV2.checkAndDelete(args[1], conf);
		FileOutputFormat.setOutputPath(job, new Path(args[1]));

		long start = System.nanoTime();

		job.waitForCompletion(true);

		long time = System.nanoTime() - start;
		System.err.println(time * (1E-9) + " secs.");
	}
}

}}}
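
The call to HelloHadoopV2.checkAndDelete() above reuses a helper from the earlier HelloHadoopV2 example: it deletes the output directory if it already exists, since FileOutputFormat aborts a job whose output path is already present. A minimal sketch of such a helper, assuming only the (String, Configuration) signature visible at the call site:

{{{
#!java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckAndDeleteSketch {
	// Hypothetical re-implementation: remove dirName from HDFS if it exists,
	// so the job's output path check does not fail.
	public static boolean checkAndDelete(String dirName, Configuration conf)
			throws IOException {
		Path dstPath = new Path(dirName);
		FileSystem hdfs = dstPath.getFileSystem(conf);
		if (hdfs.exists(dstPath)) {
			// "true" means delete recursively
			return hdfs.delete(dstPath, true);
		}
		return true;
	}
}
}}}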