• Index

Hadoop MapReduce

Reads: 2121 Edit

AppendFile.java

package org.hduser.hadoop;
import java.io.*;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;

/**
 * Command-line utility that appends the contents of a local file to an
 * existing file on HDFS.
 *
 * <p>Related commands:
 * <pre>
 * Web UI:  http://192.168.0.100:50030/
 * hadoop fs -cat hdfs://master.hadoop:9000/user/hduser/test.txt
 * # append test.txt from the local file system to test.txt on HDFS
 * hadoop dfs -appendToFile /home/hduser/test.txt /user/hduser/test.txt
 *
 * hadoop jar append.jar org.hduser.hadoop.AppendFile /home/hduser/test.txt /user/hduser/test.txt
 *
 * # when the main class was selected while packaging the jar:
 * hadoop jar /home/hduser/append.jar /home/hduser/test.txt /user/hduser/test.txt
 * hadoop dfs -ls /user/hduser
 * hadoop dfs -cat /user/hduser/test.txt
 * </pre>
 */
public class AppendFile {

	/**
	 * Appends a local file to an HDFS file.
	 *
	 * @param args {@code args[0]} is the local source file, {@code args[1]}
	 *             is the HDFS destination file to append to
	 * @throws Exception declared for interface compatibility; failures are
	 *                   reported on stdout and through a non-zero exit status
	 */
	public static void main(String[] args) throws Exception {
		if (args.length < 2) {
			System.err.println("Usage: AppendFile <localSrc> <hdfsDst>");
			System.exit(2);
		}
		String localSrc = args[0];
		String dst = args[1];

		// The local stream is opened first (fail fast on a missing file) and is
		// closed by try-with-resources even if connecting to HDFS or opening
		// the append stream fails.
		try (InputStream in = new BufferedInputStream(new FileInputStream(localSrc))) {
			Configuration conf = new Configuration();
			conf.setBoolean("dfs.support.append", true);
			// NOTE(review): "user" does not look like a standard Hadoop key — confirm it has any effect.
			conf.set("user", "hduser");
			conf.set("fs.defaultFS", "hdfs://master.hadoop:9000");
			FileSystem fs = FileSystem.get(URI.create(dst), conf);

			// Output stream positioned at the end of the existing HDFS file.
			try (OutputStream out = fs.append(new Path(dst))) {
				// close=false: the streams are closed by try-with-resources so
				// that an error while closing the HDFS stream is not swallowed.
				IOUtils.copyBytes(in, out, 4096, false);
			}
			System.out.println("append success");
		} catch (Exception e) {
			System.out.println("fail" + e.toString());
			// Signal the failure to the calling shell instead of exiting with 0.
			System.exit(1);
		}
	}
}

DelFile.java

package org.hduser.hadoop;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;

/**
 * Command-line utility that deletes a single path on HDFS
 * (non-recursively).
 *
 * <p>Example: {@code hadoop jar delfile.jar org.hduser.hadoop.DelFile out1}
 */
public class DelFile {

	/**
	 * Deletes the given HDFS path.
	 *
	 * @param args {@code args[0]} is the HDFS path to delete
	 * @throws Exception if the file system cannot be reached or the delete
	 *                   operation raises an error
	 */
	public static void main(String[] args) throws Exception {
		if (args.length < 1) {
			System.err.println("Usage: DelFile <hdfsPath>");
			System.exit(2);
		}
		String src = args[0];

		String url = "hdfs://master.hadoop:9000/";
		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(url), conf);

		// recursive=false: the delete call itself is non-recursive.
		// delete() reports failure through its return value, so it must be
		// checked rather than discarded.
		boolean deleted = fs.delete(new Path(src), false);
		if (!deleted) {
			System.err.println("delete failed (path missing or not deletable): " + src);
			System.exit(1);
		}
	}
}

GetFile.java

package org.hduser.hadoop;

import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * Command-line utility that copies a file from HDFS to the local file
 * system and prints the elapsed time for the chosen buffer size.
 *
 * <p>Examples:
 * <pre>
 * hadoop jar getfile.jar org.hduser.hadoop.GetFile hdfs://192.168.0.100:9000/user/hduser/test.txt /home/hduser/test.txt 512
 * hadoop jar getfile.jar org.hduser.hadoop.GetFile test.txt /home/hduser/test.txt 1024
 * hadoop jar getfile.jar org.hduser.hadoop.GetFile /user/hduser/test.txt /home/hduser/test.txt 1024
 *
 * When run with plain "java -jar", the HDFS file must be given as a full
 * URI, otherwise a file-not-found error is reported:
 * java -jar getfile.jar org.hduser.hadoop.GetFile hdfs://192.168.0.100:9000/user/hduser/test.txt /home/hduser/test.txt 512
 * </pre>
 */
public class GetFile {

	/**
	 * Copies an HDFS file to a local file.
	 *
	 * @param args {@code args[0]} is the HDFS source, {@code args[1]} is the
	 *             local destination, optional {@code args[2]} is the copy
	 *             buffer size in bytes (default 40960)
	 * @throws Exception if the arguments are malformed or the copy fails
	 */
	public static void main(String[] args) throws Exception {
		if (args.length < 2) {
			System.err.println("Usage: GetFile <hdfsSrc> <localDst> [bufferSize]");
			System.exit(2);
		}
		String src = args[0];
		String dst = args[1];
		int buffersize = 40960;
		if (args.length > 2) {
			buffersize = Integer.parseInt(args[2]);
		}

		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create(src), conf);

		long start = System.currentTimeMillis();
		// The HDFS source is opened before the local destination so that a
		// missing or unreadable source does not create or truncate the local file.
		// Both streams are closed by try-with-resources; the local output
		// stream in particular must be closed to release the file handle.
		try (InputStream in = fs.open(new Path(src));
				OutputStream out = new FileOutputStream(dst)) {
			IOUtils.copyBytes(in, out, buffersize, false);
		}
		System.out.println("Buffer Size:" + buffersize + " Elapsed Time(ms):"
				+ (System.currentTimeMillis() - start));
	}
}

ListFiles.java

package org.hduser.hadoop;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

/**
 * Command-line utility that lists the entries of an HDFS directory and, for
 * each regular file, prints the block locations returned for a small byte
 * range of that file.
 *
 * <p>Example:
 * {@code hadoop jar /home/hduser/ListFiles.jar org.hduser.hadoop.ListFiles /user/hduser}
 */
public class ListFiles {

	/**
	 * Lists the given path and dumps block-location details to stdout.
	 *
	 * @param args {@code args[0]} is the HDFS path to list
	 * @throws Exception if the file system cannot be reached or listing fails
	 */
	public static void main(String[] args) throws Exception {
		String dir = args[0];
		FileSystem fs = FileSystem.get(URI.create(dir), new Configuration());

		Path[] targets = { new Path(dir) };
		FileStatus[] entries = fs.listStatus(targets);
		System.out.println("status.length:" + entries.length);

		for (FileStatus entry : entries) {
			boolean regularFile = entry.isFile();
			System.out.println("fileStatus.isFile:" + regularFile);
			if (!regularFile) {
				// Only regular files are queried for block locations.
				continue;
			}

			// Query the byte range starting at offset 1 with length 16.
			BlockLocation[] locations = fs.getFileBlockLocations(entry, 1, 16);
			System.out.println("fileStatus.path:" + entry.getPath());
			System.out.println("bls.length:" + locations.length);

			for (BlockLocation location : locations) {
				System.out.println("bls.blockLength:" + location.getLength());
				String[] names = location.getNames();
				// names, hosts and topology paths are read at the same index.
				for (int idx = 0; idx < names.length; idx++) {
					System.out.println("bls.name:" + names[idx]);
					System.out.println("bls.host:" + location.getHosts()[idx]);
					System.out.println("bls.topologyPath:" + location.getTopologyPaths()[idx]);
				}
			}
		}

		// Finally print the plain path of every listed entry.
		for (Path listed : FileUtil.stat2Paths(entries)) {
			System.out.println(listed);
		}
	}
}

UploadFile.java

package org.hduser.hadoop;
import java.io.*;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

/**
 * Command-line utility that uploads a local file to HDFS, creating the
 * destination file.
 *
 * <p>Related commands:
 * <pre>
 * Web UI:  http://192.168.0.100:50030/
 * hadoop fs -cat hdfs://localhost:9000/user/hduser/test.txt
 * # copy test.txt from the local file system to HDFS
 * hadoop dfs -put /home/hduser/test.txt /user/hduser/test.txt
 *
 * hadoop jar upload.jar org.hduser.hadoop.UploadFile /home/hduser/test.txt /user/hduser/test.txt
 *
 * # when the main class was selected while packaging the jar:
 * hadoop jar /home/hduser/upload.jar /home/hduser/test.txt /user/hduser/output
 * hadoop jar /home/hduser/upload.jar /home/hduser/test.txt /user/hduser/test.txt
 * hadoop dfs -ls /home/hduser/test.txt
 * hadoop dfs -cat /user/hduser/test.txt
 * </pre>
 */
public class UploadFile {

	/**
	 * Uploads a local file to HDFS.
	 *
	 * @param args {@code args[0]} is the local source file (e.g.
	 *             {@code d:/test/test1.txt}), {@code args[1]} is the HDFS
	 *             destination (e.g. {@code /home/hduser/test1.txt})
	 * @throws Exception declared for interface compatibility; failures are
	 *                   reported on stdout and through a non-zero exit status
	 */
	public static void main(String[] args) throws Exception {
		if (args.length < 2) {
			System.err.println("Usage: UploadFile <localSrc> <hdfsDst>");
			System.exit(2);
		}
		String localSrc = args[0];
		String dst = args[1];

		// The local stream is opened first (fail fast on a missing file) and is
		// closed by try-with-resources even if connecting to HDFS or creating
		// the destination fails.
		try (InputStream in = new BufferedInputStream(new FileInputStream(localSrc))) {
			Configuration conf = new Configuration();
			// NOTE(review): "user" does not look like a standard Hadoop key — confirm it has any effect.
			conf.set("user", "hduser");
			conf.set("fs.defaultFS", "hdfs://master.hadoop:9000");
			FileSystem fs = FileSystem.get(URI.create(dst), conf);

			try (OutputStream out = fs.create(new Path(dst))) {
				// close=false: the streams are closed by try-with-resources so
				// that an error while closing the HDFS stream is not swallowed.
				IOUtils.copyBytes(in, out, 4096, false);
			}
			System.out.println("success");
		} catch (Exception e) {
			System.out.println("fail" + e.toString());
			// Signal the failure to the calling shell instead of exiting with 0.
			System.exit(1);
		}
	}
}

关于作者

王硕,十年软件开发经验,业余产品经理,精通Java/Python/Go等,喜欢研究技术,著有《PyQt快速开发与实战》《Python 3.* 全栈开发》,多个业余开源项目托管在GitHub上,欢迎微博交流:


Comments

Make a comment

www.ultrapower.com ,王硕的博客,专注于研究互联网产品和技术,提供中文精品教程。 本网站与其它任何公司及/或商标无任何形式关联或合作。
  • Index
aaaaa