[20] | 1 | /* |
---|
| 2 | * Cloud9: A MapReduce Library for Hadoop |
---|
| 3 | * |
---|
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); you |
---|
| 5 | * may not use this file except in compliance with the License. You may |
---|
| 6 | * obtain a copy of the License at |
---|
| 7 | * |
---|
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
---|
| 9 | * |
---|
| 10 | * Unless required by applicable law or agreed to in writing, software |
---|
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
---|
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
---|
| 13 | * implied. See the License for the specific language governing |
---|
| 14 | * permissions and limitations under the License. |
---|
| 15 | */ |
---|
| 16 | |
---|
| 17 | package tw.org.nchc.demo; |
---|
| 18 | |
---|
| 19 | import java.io.BufferedReader; |
---|
| 20 | import java.io.FileInputStream; |
---|
| 21 | import java.io.IOException; |
---|
| 22 | import java.io.InputStreamReader; |
---|
| 23 | import java.util.StringTokenizer; |
---|
| 24 | |
---|
| 25 | import org.apache.hadoop.io.Text; |
---|
| 26 | |
---|
| 27 | import tw.org.nchc.tuple.ListWritable; |
---|
| 28 | import tw.org.nchc.tuple.Schema; |
---|
| 29 | import tw.org.nchc.tuple.Tuple; |
---|
| 30 | import tw.org.nchc.util.LocalTupleRecordWriter; |
---|
| 31 | |
---|
| 32 | /** |
---|
| 33 | * <p> |
---|
| 34 | * Demo that packs the sample collection into records using the tuple library, |
---|
| 35 | * illustrating the use of the {@link tw.org.nchc.tuple.Tuple} and |
---|
| 36 | * {@link tw.org.nchc.tuple.ListWritable} classes. The records are stored in |
---|
| 37 | * a local SequenceFile; this file can then be transferred over to HDFS to serve |
---|
| 38 | * as the starting point for a MapReduce operation. |
---|
| 39 | * </p> |
---|
| 40 | * |
---|
| 41 | * <p> |
---|
| 42 | * Each record is a tuple with two fields: |
---|
| 43 | * </p> |
---|
| 44 | * |
---|
| 45 | * <ul> |
---|
| 46 | * |
---|
| 47 | * <li>the first field of the tuple is an Integer with the field name "length"; |
---|
| 48 | * its value is the length of the record in number of characters.</li> |
---|
| 49 | * |
---|
| 50 | * <li>the second field of the tuple is a {@code ListWritable<Text>} with the field |
---|
| 51 | * name "tokens"; its value is a list of tokens that comprise the text of the |
---|
| 52 | * record.</li> |
---|
| 53 | * |
---|
| 54 | * </ul> |
---|
| 55 | * |
---|
| 56 | * @see DemoPackRecords |
---|
| 57 | * @see DemoReadPackedRecords2 |
---|
| 58 | */ |
---|
| 59 | public class DemoPackRecords2 { |
---|
| 60 | private DemoPackRecords2() { |
---|
| 61 | } |
---|
| 62 | |
---|
| 63 | // define the tuple schema for the input record |
---|
| 64 | private static final Schema RECORD_SCHEMA = new Schema(); |
---|
| 65 | static { |
---|
| 66 | RECORD_SCHEMA.addField("length", Integer.class); |
---|
| 67 | RECORD_SCHEMA.addField("tokens", ListWritable.class, ""); |
---|
| 68 | } |
---|
| 69 | |
---|
| 70 | // instantiate a single tuple |
---|
| 71 | private static Tuple tuple = RECORD_SCHEMA.instantiate(); |
---|
| 72 | |
---|
| 73 | /** |
---|
| 74 | * Runs the demo. |
---|
| 75 | */ |
---|
| 76 | public static void main(String[] args) throws IOException { |
---|
| 77 | String infile = "../umd-hadoop-dist/sample-input/bible+shakes.nopunc"; |
---|
| 78 | String outfile = "../umd-hadoop-dist/sample-input/bible+shakes.nopunc.packed2"; |
---|
| 79 | |
---|
| 80 | // create LocalTupleRecordWriter to write tuples to a local SequenceFile |
---|
| 81 | LocalTupleRecordWriter writer = new LocalTupleRecordWriter(outfile); |
---|
| 82 | |
---|
| 83 | // read in raw text records, line separated |
---|
| 84 | BufferedReader data = new BufferedReader(new InputStreamReader( |
---|
| 85 | new FileInputStream(infile))); |
---|
| 86 | |
---|
| 87 | String line; |
---|
| 88 | while ((line = data.readLine()) != null) { |
---|
| 89 | ListWritable<Text> tokens = new ListWritable<Text>(); |
---|
| 90 | StringTokenizer itr = new StringTokenizer(line); |
---|
| 91 | while (itr.hasMoreTokens()) { |
---|
| 92 | tokens.add(new Text(itr.nextToken())); |
---|
| 93 | } |
---|
| 94 | |
---|
| 95 | // write the record |
---|
| 96 | tuple.set("length", line.length()); |
---|
| 97 | tuple.set("tokens", tokens); |
---|
| 98 | writer.add(tuple); |
---|
| 99 | } |
---|
| 100 | |
---|
| 101 | data.close(); |
---|
| 102 | writer.close(); |
---|
| 103 | |
---|
| 104 | System.out.println("Wrote " + writer.getRecordCount() + " records."); |
---|
| 105 | } |
---|
| 106 | } |
---|