{{{
#!java

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;

import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.common.base.Charsets.UTF_8;

import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Counts the occurrences of words in the ColumnFamily "input_words", reading one source
 * column per job ("text0", "text1", ...), each of which contains a sequence of words.
 *
 * For each word, we output the total number of occurrences across all texts.
 *
 * When outputting to Cassandra, we write each word count as a {word, count} column/value pair,
 * with a row key equal to the name of the source column the words were read from.
 */
public class WordCount extends Configured implements Tool
{
    private static final Logger logger = LoggerFactory.getLogger(WordCount.class);

    static final String KEYSPACE = "wordcount";
    static final String COLUMN_FAMILY = "input_words";

    static final String OUTPUT_REDUCER_VAR = "output_reducer";
    static final String OUTPUT_COLUMN_FAMILY = "output_words";
    private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count";

    private static final String CONF_COLUMN_NAME = "columnname";

    public static void main(String[] args) throws Exception
    {
        // Let ToolRunner handle generic command-line options
        ToolRunner.run(new Configuration(), new WordCount(), args);
        System.exit(0);
    }

    public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable>
    {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private ByteBuffer sourceColumn;

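        // The name of the source column ("text" + i) is passed in through the job
        // configuration; run() sets CONF_COLUMN_NAME before submitting each job.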
        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
        throws IOException, InterruptedException
        {
            sourceColumn = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
        }

        public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException
        {
            IColumn column = columns.get(sourceColumn);
            if (column == null)
                return;
            String value = ByteBufferUtil.string(column.value());
            logger.debug("read " + key + ":" + value + " from " + context.getInputSplit());

            StringTokenizer itr = new StringTokenizer(value);
            while (itr.hasMoreTokens())
            {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values)
                sum += val.get();
            context.write(key, new IntWritable(sum));
        }
    }

    public static class ReducerToCassandra extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
    {
        private ByteBuffer outputKey;

        protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
        throws IOException, InterruptedException
        {
            outputKey = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
        }

        public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
        {
            int sum = 0;
            for (IntWritable val : values)
                sum += val.get();
            context.write(outputKey, Collections.singletonList(getMutation(word, sum)));
        }

        private static Mutation getMutation(Text word, int sum)
        {
            Column c = new Column();
            c.setName(Arrays.copyOf(word.getBytes(), word.getLength()));
            c.setValue(ByteBufferUtil.bytes(String.valueOf(sum)));
            c.setTimestamp(System.currentTimeMillis());

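            // Wrap the column in a Thrift Mutation: ColumnFamilyOutputFormat applies every
            // Mutation in the reducer's output list to the row named by the output key.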
            Mutation m = new Mutation();
            m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
            m.column_or_supercolumn.setColumn(c);
            return m;
        }
    }

    public int run(String[] args) throws Exception
    {
        String outputReducerType = "filesystem";
        if (args != null && args.length > 0 && args[0].startsWith(OUTPUT_REDUCER_VAR))
        {
            String[] s = args[0].split("=");
            if (s != null && s.length == 2)
                outputReducerType = s[1];
        }
        logger.info("output reducer type: " + outputReducerType);

        for (int i = 0; i < WordCountSetup.TEST_COUNT; i++)
        {
            String columnName = "text" + i;
            getConf().set(CONF_COLUMN_NAME, columnName);

            Job job = new Job(getConf(), "wordcount");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(TokenizerMapper.class);

            if (outputReducerType.equalsIgnoreCase("filesystem"))
            {
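                // Summing is associative and commutative, so the reducer can also
                // serve as the combiner to cut down shuffle traffic.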
                job.setCombinerClass(ReducerToFilesystem.class);
                job.setReducerClass(ReducerToFilesystem.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
                FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX + i));
            }
            else
            {
                job.setReducerClass(ReducerToCassandra.class);

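                // The reducer's output types (ByteBuffer, List<Mutation>) differ from the
                // mapper's, so the map output types have to be declared explicitly.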
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(IntWritable.class);
                job.setOutputKeyClass(ByteBuffer.class);
                job.setOutputValueClass(List.class);

                job.setOutputFormatClass(ColumnFamilyOutputFormat.class);

                ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
            }

            job.setInputFormatClass(ColumnFamilyInputFormat.class);

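            // Tell the Cassandra input/output formats how to reach the cluster: the Thrift
            // RPC port, a node to use as the initial contact point, and the partitioner in use.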
            ConfigHelper.setRpcPort(job.getConfiguration(), "9160");
            ConfigHelper.setInitialAddress(job.getConfiguration(), "localhost");
            ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
            ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
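            // Only fetch the single column this pass is counting ("text" + i).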
            SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes(columnName)));
            ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);

            job.waitForCompletion(true);
        }
        return 0;
    }
}
}}}
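
By default the counts are written as text files under `/tmp/word_count0`, `/tmp/word_count1`, and so on; passing `output_reducer=cassandra` as the first argument switches the job to ColumnFamilyOutputFormat, so the counts are written back into the `output_words` column family instead. As a minimal sketch of driving the job from code (the `RunWordCount` wrapper class below is purely illustrative; `WordCount` and the argument string come from the listing above):

{{{
#!java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

// Illustrative driver, not part of the example itself: runs the WordCount tool with
// the argument that selects the Cassandra output path (output_reducer=cassandra).
public class RunWordCount
{
    public static void main(String[] args) throws Exception
    {
        int rc = ToolRunner.run(new Configuration(), new WordCount(),
                                new String[]{ "output_reducer=cassandra" });
        System.exit(rc);
    }
}
}}}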