Changes between Version 2 and Version 3 of waue/2011/0708


Ignore:
Timestamp:
Jul 8, 2011, 1:26:54 PM (13 years ago)
Author:
waue
Comment:

--

Legend:

Unmodified
Added
Removed
Modified
  • waue/2011/0708

    v2 v3  
    2424> list output_words;
    2525}}}
     26
     27
     28{{{
     29#!java
     30
     31import java.io.IOException;
     32import java.nio.ByteBuffer;
     33import java.util.*;
     34
     35import org.apache.cassandra.thrift.Column;
     36import org.apache.cassandra.thrift.ColumnOrSuperColumn;
     37import org.apache.cassandra.thrift.Mutation;
     38import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat;
     39import org.slf4j.Logger;
     40import org.slf4j.LoggerFactory;
     41
     42import static com.google.common.base.Charsets.UTF_8;
     43
     44import org.apache.cassandra.db.IColumn;
     45import org.apache.cassandra.hadoop.ColumnFamilyInputFormat;
     46import org.apache.cassandra.hadoop.ConfigHelper;
     47import org.apache.cassandra.thrift.SlicePredicate;
     48import org.apache.cassandra.utils.ByteBufferUtil;
     49import org.apache.hadoop.conf.Configuration;
     50import org.apache.hadoop.conf.Configured;
     51import org.apache.hadoop.fs.Path;
     52import org.apache.hadoop.io.IntWritable;
     53import org.apache.hadoop.io.Text;
     54import org.apache.hadoop.mapreduce.Job;
     55import org.apache.hadoop.mapreduce.Mapper;
     56import org.apache.hadoop.mapreduce.Reducer;
     57import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
     58import org.apache.hadoop.util.Tool;
     59import org.apache.hadoop.util.ToolRunner;
     60
     61/**
     62 * This counts the occurrences of words in ColumnFamily Standard1, that has a single column (that we care about)
     63 * "text" containing a sequence of words.
     64 *
     65 * For each word, we output the total number of occurrences across all texts.
     66 *
     67 * When outputting to Cassandra, we write the word counts as a {word, count} column/value pair,
     68 * with a row key equal to the name of the source column we read the words from.
     69 */
     70public class WordCount extends Configured implements Tool
     71{
     72    private static final Logger logger = LoggerFactory.getLogger(WordCount.class);
     73
     74    static final String KEYSPACE = "wordcount";
     75    static final String COLUMN_FAMILY = "input_words";
     76
     77    static final String OUTPUT_REDUCER_VAR = "output_reducer";
     78    static final String OUTPUT_COLUMN_FAMILY = "output_words";
     79    private static final String OUTPUT_PATH_PREFIX = "/tmp/word_count";
     80
     81    private static final String CONF_COLUMN_NAME = "columnname";
     82
     83    public static void main(String[] args) throws Exception
     84    {
     85        // Let ToolRunner handle generic command-line options
     86        ToolRunner.run(new Configuration(), new WordCount(), args);
     87        System.exit(0);
     88    }
     89
     90    public static class TokenizerMapper extends Mapper<ByteBuffer, SortedMap<ByteBuffer, IColumn>, Text, IntWritable>
     91    {
     92        private final static IntWritable one = new IntWritable(1);
     93        private Text word = new Text();
     94        private ByteBuffer sourceColumn;
     95
     96        protected void setup(org.apache.hadoop.mapreduce.Mapper.Context context)
     97        throws IOException, InterruptedException
     98        {
     99            sourceColumn = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
     100        }
     101
     102        public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context) throws IOException, InterruptedException
     103        {
     104            IColumn column = columns.get(sourceColumn);
     105            if (column == null)
     106                return;
     107            String value = ByteBufferUtil.string(column.value());
     108            logger.debug("read " + key + ":" + value + " from " + context.getInputSplit());
     109
     110            StringTokenizer itr = new StringTokenizer(value);
     111            while (itr.hasMoreTokens())
     112            {
     113                word.set(itr.nextToken());
     114                context.write(word, one);
     115            }
     116        }
     117    }
     118
     119    public static class ReducerToFilesystem extends Reducer<Text, IntWritable, Text, IntWritable>
     120    {
     121        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
     122        {
     123            int sum = 0;
     124            for (IntWritable val : values)
     125                sum += val.get();
     126            context.write(key, new IntWritable(sum));
     127        }
     128    }
     129
     130    public static class ReducerToCassandra extends Reducer<Text, IntWritable, ByteBuffer, List<Mutation>>
     131    {
     132        private ByteBuffer outputKey;
     133
     134        protected void setup(org.apache.hadoop.mapreduce.Reducer.Context context)
     135        throws IOException, InterruptedException
     136        {
     137            outputKey = ByteBufferUtil.bytes(context.getConfiguration().get(CONF_COLUMN_NAME));
     138        }
     139
     140        public void reduce(Text word, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException
     141        {
     142            int sum = 0;
     143            for (IntWritable val : values)
     144                sum += val.get();
     145            context.write(outputKey, Collections.singletonList(getMutation(word, sum)));
     146        }
     147
     148        private static Mutation getMutation(Text word, int sum)
     149        {
     150            Column c = new Column();
     151            c.setName(Arrays.copyOf(word.getBytes(), word.getLength()));
     152            c.setValue(ByteBufferUtil.bytes(String.valueOf(sum)));
     153            c.setTimestamp(System.currentTimeMillis());
     154
     155            Mutation m = new Mutation();
     156            m.setColumn_or_supercolumn(new ColumnOrSuperColumn());
     157            m.column_or_supercolumn.setColumn(c);
     158            return m;
     159        }
     160    }
     161
     162    public int run(String[] args) throws Exception
     163    {
     164        String outputReducerType = "filesystem";
     165        if (args != null && args[0].startsWith(OUTPUT_REDUCER_VAR))
     166        {
     167            String[] s = args[0].split("=");
     168            if (s != null && s.length == 2)
     169                outputReducerType = s[1];
     170        }
     171        logger.info("output reducer type: " + outputReducerType);
     172
     173        for (int i = 0; i < WordCountSetup.TEST_COUNT; i++)
     174        {
     175            String columnName = "text" + i;
     176            getConf().set(CONF_COLUMN_NAME, columnName);
     177
     178            Job job = new Job(getConf(), "wordcount");
     179            job.setJarByClass(WordCount.class);
     180            job.setMapperClass(TokenizerMapper.class);
     181
     182            if (outputReducerType.equalsIgnoreCase("filesystem"))
     183            {
     184                job.setCombinerClass(ReducerToFilesystem.class);
     185                job.setReducerClass(ReducerToFilesystem.class);
     186                job.setOutputKeyClass(Text.class);
     187                job.setOutputValueClass(IntWritable.class);
     188                FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH_PREFIX + i));
     189            }
     190            else
     191            {
     192                job.setReducerClass(ReducerToCassandra.class);
     193
     194                job.setMapOutputKeyClass(Text.class);
     195                job.setMapOutputValueClass(IntWritable.class);
     196                job.setOutputKeyClass(ByteBuffer.class);
     197                job.setOutputValueClass(List.class);
     198
     199                job.setOutputFormatClass(ColumnFamilyOutputFormat.class);
     200
     201                ConfigHelper.setOutputColumnFamily(job.getConfiguration(), KEYSPACE, OUTPUT_COLUMN_FAMILY);
     202            }
     203
     204            job.setInputFormatClass(ColumnFamilyInputFormat.class);
     205
     206
     207            ConfigHelper.setRpcPort(job.getConfiguration(), "9160");
     208            ConfigHelper.setInitialAddress(job.getConfiguration(), "localhost");
     209            ConfigHelper.setPartitioner(job.getConfiguration(), "org.apache.cassandra.dht.RandomPartitioner");
     210            ConfigHelper.setInputColumnFamily(job.getConfiguration(), KEYSPACE, COLUMN_FAMILY);
     211            SlicePredicate predicate = new SlicePredicate().setColumn_names(Arrays.asList(ByteBufferUtil.bytes(columnName)));
     212            ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
     213
     214            job.waitForCompletion(true);
     215        }
     216        return 0;
     217    }
     218}
     219}}}