AppendFile.java
package org.hduser.hadoop;
import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
public class AppendFile {
    // http://192.168.0.100:50030/
    // hadoop fs -cat hdfs://master.hadoop:9000/user/hduser/test.txt
    // hadoop dfs -appendToFile /home/hduser/test.txt /user/hduser/test.txt   # append the local test.txt to the test.txt file on HDFS
    // hadoop jar append.jar org.hduser.hadoop.AppendFile /home/hduser/test.txt /user/hduser/test.txt
    // select the main class when packaging the jar
    // hadoop jar /home/hduser/append.jar /home/hduser/test.txt(local file) /user/hduser/test.txt
    // hadoop dfs -ls /user/hduser
    // hadoop dfs -cat /user/hduser/test.txt
    public static void main(String[] args) throws Exception {
        try {
            String localSrc = args[0];
            String dst = args[1];
            InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
            Configuration conf = new Configuration();
            // required on older Hadoop versions where append is disabled by default
            conf.setBoolean("dfs.support.append", true);
            conf.set("fs.defaultFS", "hdfs://master.hadoop:9000");
            // connect as user "hduser"; conf.set("user", ...) has no effect in Hadoop
            FileSystem fs = FileSystem.get(URI.create(dst), conf, "hduser");
            // open the destination for append; dst must point to an existing HDFS file
            OutputStream out = fs.append(new Path(dst));
            IOUtils.copyBytes(in, out, 4096, true);
            System.out.println("append success");
        } catch (Exception e) {
            System.out.println("fail: " + e);
        }
    }
}
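fs.append() fails when the destination file does not yet exist, so a common convenience is to create the file on the first write and append afterwards. Below is a minimal sketch of that pattern against the same cluster settings; the class name AppendOrCreate and the helper openForAppend are illustrative and not part of the listing above.

package org.hduser.hadoop;
import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
public class AppendOrCreate {
    // Open dst for writing: create it if missing, otherwise append to it.
    static OutputStream openForAppend(FileSystem fs, Path dst) throws IOException {
        if (!fs.exists(dst)) {
            // create() returns a stream positioned at the start of the new file
            return fs.create(dst);
        }
        // append() requires an existing file
        return fs.append(dst);
    }
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master.hadoop:9000");
        FileSystem fs = FileSystem.get(URI.create(args[1]), conf);
        InputStream in = new BufferedInputStream(new FileInputStream(args[0]));
        OutputStream out = openForAppend(fs, new Path(args[1]));
        IOUtils.copyBytes(in, out, 4096, true); // true: close both streams when done
    }
}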
DelFile.java
package org.hduser.hadoop;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
public class DelFile {
    public static void main(String[] args) throws Exception {
        // $ hadoop jar delfile.jar org.hduser.hadoop.DelFile out1
        String src = args[0];
        String url = "hdfs://master.hadoop:9000/";
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(url), conf);
        // second argument is "recursive"; false removes only files or empty directories
        boolean deleted = fs.delete(new Path(src), false);
        System.out.println("deleted: " + deleted);
    }
}
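fs.delete(path, false) refuses to remove a non-empty directory. The sketch below switches to recursive deletion when the target is a directory and checks existence first; the class name DelDir is illustrative, not part of the original listing.

package org.hduser.hadoop;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
public class DelDir {
    public static void main(String[] args) throws Exception {
        String src = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://master.hadoop:9000/"), conf);
        Path p = new Path(src);
        if (!fs.exists(p)) {
            System.out.println(src + " does not exist");
            return;
        }
        // pass recursive=true when the target is a directory so its contents are removed as well
        boolean recursive = fs.getFileStatus(p).isDirectory();
        System.out.println("deleted: " + fs.delete(p, recursive));
    }
}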
GetFile.java
package org.hduser.hadoop;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class GetFile {
    public static void main(String[] args) throws Exception {
        /**
         * hadoop jar getfile.jar org.hduser.hadoop.GetFile hdfs://192.168.0.100:9000/user/hduser/test.txt /home/hduser/test.txt 512
         * hadoop jar getfile.jar org.hduser.hadoop.GetFile test.txt /home/hduser/test.txt 1024
         * hadoop jar getfile.jar org.hduser.hadoop.GetFile /user/hduser/test.txt /home/hduser/test.txt 1024
         *
         * The HDFS path must be given as a fully qualified hdfs:// URI here, otherwise a file-not-found error is reported:
         * java -jar getfile.jar org.hduser.hadoop.GetFile hdfs://192.168.0.100:9000/user/hduser/test.txt /home/hduser/test.txt 512
         */
        String src = args[0];
        String dst = args[1];
        int buffersize = 40960;
        if (args.length > 2) {
            buffersize = Integer.parseInt(args[2]);
        }
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(src), conf);
        long start = System.currentTimeMillis();
        OutputStream out = new FileOutputStream(dst);
        InputStream in = null;
        try {
            in = fs.open(new Path(src));
            IOUtils.copyBytes(in, out, buffersize, false);
        } finally {
            // close both streams; the original left the local output stream open
            IOUtils.closeStream(in);
            IOUtils.closeStream(out);
        }
        System.out.println("Buffer Size: " + buffersize + " Elapsed Time (ms): "
                + (System.currentTimeMillis() - start));
    }
}
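For a plain download, the manual stream copy above can also be replaced by FileSystem.copyToLocalFile, which opens, copies, and closes the streams internally (at the cost of the tunable buffer size used for the timing experiment). A minimal sketch, with the illustrative class name GetFileSimple:

package org.hduser.hadoop;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class GetFileSimple {
    public static void main(String[] args) throws Exception {
        String src = args[0]; // fully qualified HDFS path, e.g. hdfs://master.hadoop:9000/user/hduser/test.txt
        String dst = args[1]; // local destination path
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(src), conf);
        // copyToLocalFile handles opening, buffering, and closing internally
        fs.copyToLocalFile(new Path(src), new Path(dst));
        System.out.println("downloaded " + src + " to " + dst);
    }
}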
ListFiles.java
package org.hduser.hadoop;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
public class ListFiles {
    // hadoop jar /home/hduser/ListFiles.jar org.hduser.hadoop.ListFiles /user/hduser
    public static void main(String[] args) throws Exception {
        String dir = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dir), conf);
        FileStatus[] status = fs.listStatus(new Path(dir));
        System.out.println("status.length: " + status.length);
        for (FileStatus fileStatus : status) {
            System.out.println("fileStatus.isFile: " + fileStatus.isFile());
            if (fileStatus.isFile()) {
                // request the block locations covering the whole file (offset 0 to file length)
                BlockLocation[] bls = fs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());
                System.out.println("fileStatus.path: " + fileStatus.getPath());
                System.out.println("bls.length: " + bls.length);
                for (BlockLocation blockLocation : bls) {
                    System.out.println("bls.blockLength: " + blockLocation.getLength());
                    for (int i = 0; i < blockLocation.getNames().length; i++) {
                        System.out.println("bls.name: " + blockLocation.getNames()[i]);
                        System.out.println("bls.host: " + blockLocation.getHosts()[i]);
                        System.out.println("bls.topologyPath: " + blockLocation.getTopologyPaths()[i]);
                    }
                }
            }
        }
        Path[] listedPaths = FileUtil.stat2Paths(status);
        for (Path p : listedPaths) {
            System.out.println(p);
        }
    }
}
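listStatus only looks at the immediate children of the directory. To walk an entire tree, FileSystem.listFiles(path, true) returns a RemoteIterator of LocatedFileStatus objects that already carry their block locations, so no separate getFileBlockLocations call is needed. A minimal sketch, with the illustrative class name ListFilesRecursive:

package org.hduser.hadoop;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
public class ListFilesRecursive {
    public static void main(String[] args) throws Exception {
        String dir = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dir), conf);
        // listFiles returns files only (no directories), descending into subdirectories when recursive=true
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path(dir), true);
        while (it.hasNext()) {
            LocatedFileStatus f = it.next();
            System.out.println(f.getPath() + " len=" + f.getLen());
            for (BlockLocation b : f.getBlockLocations()) {
                System.out.println("  block offset=" + b.getOffset() + " hosts=" + String.join(",", b.getHosts()));
            }
        }
    }
}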
UploadFile.java
package org.hduser.hadoop;
import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
public class UploadFile {
    // http://192.168.0.100:50030/
    // hadoop fs -cat hdfs://localhost:9000/user/hduser/test.txt
    // hadoop dfs -put /home/hduser/test.txt /user/hduser/test.txt   # copy test.txt from the local file system to HDFS
    // hadoop jar upload.jar org.hduser.hadoop.UploadFile /home/hduser/test.txt /user/hduser/test.txt
    // select the main class when packaging the jar
    // hadoop jar /home/hduser/upload.jar /home/hduser/test.txt(local file) /user/hduser/output
    // hadoop jar /home/hduser/upload.jar /home/hduser/test.txt(local file) /user/hduser/test.txt
    // hadoop dfs -ls /user/hduser/test.txt
    // hadoop dfs -cat /user/hduser/test.txt
    public static void main(String[] args) throws Exception {
        try {
            String localSrc = args[0];
            String dst = args[1];
            // String localSrc = "d:/test/test1.txt";
            // String dst = "/home/hduser/test1.txt";
            InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "hdfs://master.hadoop:9000");
            // connect as user "hduser"; conf.set("user", ...) has no effect in Hadoop
            FileSystem fs = FileSystem.get(URI.create(dst), conf, "hduser");
            // print a '.' periodically as data is written to the cluster
            OutputStream out = fs.create(new Path(dst), new Progressable() {
                public void progress() {
                    System.out.print(".");
                }
            });
            IOUtils.copyBytes(in, out, 4096, true);
            System.out.println("success");
        } catch (Exception e) {
            System.out.println("fail: " + e);
        }
    }
}
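The same upload can be done without managing streams by calling FileSystem.copyFromLocalFile, which mirrors the hadoop dfs -put command shown in the comments. A minimal sketch, with the illustrative class name UploadFileSimple:

package org.hduser.hadoop;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
public class UploadFileSimple {
    public static void main(String[] args) throws Exception {
        String localSrc = args[0]; // local file, e.g. /home/hduser/test.txt
        String dst = args[1];      // HDFS destination, e.g. /user/hduser/test.txt
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://master.hadoop:9000");
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        // copyFromLocalFile handles opening, copying, and closing internally
        fs.copyFromLocalFile(new Path(localSrc), new Path(dst));
        System.out.println("uploaded " + localSrc + " to " + dst);
    }
}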
About the Author
Wang Shuo has ten years of software development experience and works as a product manager in his spare time. He is proficient in Java, Python, Go, and other languages, enjoys digging into technology, and is the author of 《PyQt快速开发与实战》 and 《Python 3.* 全栈开发》. Several of his hobby open-source projects are hosted on GitHub, and he welcomes discussion on Weibo: