wiki:hadoop_hbase_sample2

說明:

This sample code will put the indicate data to Hbase.

執行方式 :

 1. put test.txt in t1 directory which content is 

---------------

name:locate:years 

waue:taiwan:1981

shellon:taiwan:1981

---------------

 2. hadoop_root/$ bin/hadoop dfs -put t1 t1

 3. hbase_root/$ bin/hbase shell

 4. hql > create table t1_table("person");

 5. Come to Eclipse and run this code, and we will let database as that 

 t1_table -> person

  ----------------

  |  name | locate | years |

  ----------------

  | waue  | taiwan | 1981 |

  ----------------

  | shellon | taiwan | 1981 |

  * 6. Go to hbase console, type : hql > select * from t1_table;

結果:

08/06/06 12:20:48 INFO hbase.HTable: Creating scanner over t1_table starting at key 

+-------------------------+-------------------------+-------------------------+

| Row                     | Column                  | Cell                    |

+-------------------------+-------------------------+-------------------------+

| 0                       | person:locate           | locate                  |

+-------------------------+-------------------------+-------------------------+

| 0                       | person:name             | name                    |

+-------------------------+-------------------------+-------------------------+

| 0                       | person:years            | years                   |

+-------------------------+-------------------------+-------------------------+

| 19                      | person:locate           | taiwan                  |

+-------------------------+-------------------------+-------------------------+

| 19                      | person:name             | waue                    |

+-------------------------+-------------------------+-------------------------+

| 19                      | person:years            | 1981                    |

+-------------------------+-------------------------+-------------------------+

| 36                      | person:locate           | taiwan                  |

+-------------------------+-------------------------+-------------------------+

| 36                      | person:name             | shellon                 |

+-------------------------+-------------------------+-------------------------+

| 36                      | person:years            | 1981                    |

+-------------------------+-------------------------+-------------------------+

3 row(s) in set. (0.04 sec)

程式碼 :

/*

 *  NCHC Hbase with map reduce sample code 

 *  DemoHBaseSlink.java

 */



package tw.org.nchc.demo;



import java.io.IOException;

import java.util.Iterator;



import org.apache.hadoop.fs.Path;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapred.TableReduce;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.MapWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.mapred.lib.IdentityMapper;

import org.apache.hadoop.mapred.lib.IdentityReducer;

public class DemoHBaseSink {



	private static class ReduceClass extends TableReduce<LongWritable, Text> {



		// Column id is created dymanically, 

		private static final Text col_name = new Text("person:name");

		private static final Text col_local = new Text("person:locate");

		private static final Text col_year = new Text("person:years");

		

		// this map holds the columns per row

		private MapWritable map = new MapWritable();	

		

		// on this sample, map is nonuse, we use reduce to handle

		public void reduce(LongWritable key, Iterator<Text> values,

				OutputCollector<Text, MapWritable> output, Reporter reporter)

				throws IOException {



			// values.next().getByte() can get value and transfer to byte form, there is an other way that let decode()

			// to substitude getByte() 

			String stro = new String(values.next().getBytes());

			String str[] = stro.split(":");

			byte b_local[] = str[0].getBytes();

			byte b_name[] = str[1].getBytes();

			byte b_year[] = str[2].getBytes();

			

			// contents must be ImmutableBytesWritable

			ImmutableBytesWritable w_local = new ImmutableBytesWritable( b_local);

			ImmutableBytesWritable w_name = new ImmutableBytesWritable( b_name );

			ImmutableBytesWritable w_year = new ImmutableBytesWritable( b_year );



			// populate the current row

			map.clear();

			map.put(col_name, w_local);

			map.put(col_local, w_name);

			map.put(col_year, w_year);



			// add the row with the key as the row id

			output.collect(new Text(key.toString()), map);

		}

	}



	private DemoHBaseSink() {

	}



	/**

	 * Runs the demo.

	 */

	public static void main(String[] args) throws IOException {

		// which path of input files in Hadoop file system 

		String file_path = "/user/waue/t1";



		int mapTasks = 1;

		int reduceTasks = 1;



		JobConf conf = new JobConf(DemoHBaseSink.class);



		//Job name; you can modify to any you like  

		conf.setJobName("DemoPersonBase");



		// Hbase table name must be correct , in our profile is t1_table

		TableReduce.initJob("t1_table", ReduceClass.class, conf);

		

		// below are map-reduce profile

		conf.setNumMapTasks(mapTasks);

		conf.setNumReduceTasks(reduceTasks);

		conf.setInputPath(new Path(file_path));

		conf.setMapperClass(IdentityMapper.class);

		conf.setCombinerClass(IdentityReducer.class);

		conf.setReducerClass(ReduceClass.class);

		JobClient.runJob(conf);

	}

}

Last modified 16 years ago Last modified on Jun 6, 2008, 2:08:03 PM