2012年9月10日 星期一

Eclipse開發MapReduce程式(2)

前言
無止境的一直學習,深恐年紀大了,記憶力不好,趕快寫上來。

基於前一篇Eclipse開發MapReduce程式(1)中,有一段我寫到

"請記得把輸入輸出的路徑改回args[0]以及args[1]再把/hduser/cite/output這資料夾先刪除,不然會出錯。"
想了又想好像不應該是這樣的,於是把MyJob改寫了一段,可以避免輸出資料夾(/hduser/cite/output)已經存在了而發生的Exception。順道附上兩段利用HDFS API來實現hadoop dfs -put跟hadoop dfs -cat這兩個指令。
這都是基本的API操作,還有delete,merge...等等的,好兄弟google真的有一堆,嚇死你。

ps:output資料夾已存在就不能執行程式這是hadoop的安全機制, 以防意外蓋掉之前產生所產生的檔案

環境

Eclipse開發MapReduce程式(1)一樣。

實作

第一步, 在Eclipse中new一個PutHdfs.java
這段code就是"hadoop dfs -put <local input file> <hdfs input dir>" 的意思


import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;


public class PutHdfs {


public static void main(String[] args) throws IOException {

String local_path = args[0];
String hdfs_path = args[1];

InputStream in = new BufferedInputStream(new FileInputStream(local_path));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
OutputStream out = fs.create(new Path(hdfs_path), new Progressable() {

@Override

public void progress() {
System.out.print("=");
}
});
IOUtils.copyBytes(in, out, 4096, true);
}

}


一樣在terminal中編譯成.class再包裝成.jar,利用hadoop jar執行。

這次示範把cite75_99.txt利用java code上傳到/hduser/cite/input中,並改名成cite.txt。以下是指令:
(我把PutHdfs.java放在/home/hadoopbooks/HDFS/put底下,指令請依照自己存放的路徑 )
javac -classpath hadoop-1.0.3/hadoop-core-1.0.3.jar -d hadoopbooks/HDFS/put/ hadoopbooks/HDFS/put/PutHdfs.java
jar -cvf hadoopbooks/HDFS/put/PutHdfs.jar -C hadoopbooks/HDFS/put/ .
hadoop jar hadoopbooks/HDFS/put/PutHdfs.jar PutHdfs hadoopbooks/cite/data/cite75_99.txt /hduser/cite/input/cite.txt


第二步就是本篇主要要講的,改寫MyJob.java,避免發生資料夾已存在的錯誤發生。
在之前的MyJob.java中加入一段CheckAndDelete的class。並在run()中加入CheckAndDelete.checkAndDelete(args[1], conf)呼叫static class來做判斷。見以下的code。


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJob extends Configured implements Tool{

public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
public void map (LongWritable key, Text value, Context context) throws IOException , InterruptedException {
String[] citation = value.toString().split(",");
context.write(new Text(citation[0]), new Text(citation[1]));
}
}

public static class ReduceClass extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
String csv = "";
for (Text val : values) {
if (csv.length() > 0) {
csv += ",";
}
csv += val.toString();
}
context.write(key, new Text(csv));
}
}

public static class CheckAndDelete {
static boolean checkAndDelete(final String path , Configuration conf) {
Path dst_path = new Path(path);
try {
FileSystem hdfs = dst_path.getFileSystem(conf);
if (hdfs.exists(dst_path)) {
hdfs.delete(dst_path, true);
}
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
}

public int run(String [] args) throws IOException , InterruptedException, ClassNotFoundException {
Configuration conf = getConf();
Job job = new Job(conf , "MyJob");
job.setJarByClass(MyJob.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

CheckAndDelete.checkAndDelete(args[1], conf);

job.setMapperClass(MapClass.class);
job.setReducerClass(ReduceClass.class);

job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);

return 0;
}

public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new MyJob(), args);
System.exit(res);
}

}

一樣在terminal中編譯成.class再包裝成.jar,利用hadoop jar執行。
javac -classpath hadoop-1.0.3/hadoop-core-1.0.3.jar -d hadoopbooks/cite/class/MyJob/ hadoopbooks/cite/java/MyJob/MyJob.java
jar -cvf hadoopJar/MyJob.jar -C hadoopbooks/cite/class/MyJob/ .
hadoop jar hadoopJar/MyJob.jar MyJob /hduser/cite/input /hduser/cite/output

可以重複試試看,就算/hduser/cite/output已經存在了,再執行第二次也步會發生錯誤,避免要刻意去刪掉存放產出檔案的資料夾。


第三步利用CatHdfs.java來觀看執行MyJob.jar後的產出結果,以下是java code


import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;


public class CatHdfs {

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
FSDataInputStream in = null;
try {
in = fs.open(new Path(args[0]));
IOUtils.copyBytes(in, System.out, 4096, false);
in.seek(0);
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
IOUtils.closeStream(in);
}
}

}

一樣在terminal中編譯成.class再包裝成.jar,利用hadoop jar執行。
javac -classpath hadoop-1.0.3/hadoop-core-1.0.3.jar -d hadoopbooks/HDFS/cat/ hadoopbooks/HDFS/cat/CatHdfs.java
jar -cvf hadoopbooks/HDFS/cat/CatHdfs.jar -C hadoopbooks/HDFS/cat/ .
hadoop jar hadoopbooks/HDFS/catCatHdfs.jar CatHdfs /hduser/cite/output/part-r-00000


按下enter(對岸的阿六兄弟似乎說"回車"),就會看到一堆字在跑,沒錯你沒眼花。
想停止就crtl+c。

以上,over。

沒有留言:

張貼留言