「javahdfs写文件」java读取hdfs文件夹下的文件
本篇文章给大家谈谈javahdfs写文件,以及java读取hdfs文件夹下的文件对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、如何使用Java API读写HDFS
- 2、hdfs api创建文件并写入内容
- 3、Hadoop文档(2.9.2) - HDFS架构
- 4、HDFS文件
- 5、HDFS读写和冷备份原理
- 6、关于用java写程序把本地文件上传到HDFS中的问题
如何使用Java API读写HDFS
com.wyc.hadoop.fs;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
public class FSOptr {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
makeDir(conf);
rename(conf);
delete(conf);
}
// 创建文件目录
private static void makeDir(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path dir = new Path("/user/hadoop/data/20140318");
boolean result = fs.mkdirs(dir);// 创建文件夹
System.out.println("make dir :" + result);
// 创建文件,并写入内容
Path a href=";tn=44039180_cprfenlei=mv6quAkxTZn0IZRqIHckPjm4nH00T1YdmvmvuhfdPHRzP16kuyPW0AP8IA3qPjfsn1bkrjKxmLKz0ZNzUjdCIZwsrBtEXh9GuA7EQhF9pywdQhPEUiqkIyN1IA-EUBtvrjbvPHfdP1bdP10snH6dnWf" target="_blank" class="baidu-highlight"dst/a = new Path("/user/hadoop/data/20140318/tmp");
byte[] buff = "hello,hadoop!".getBytes();
FSDataOutputStream outputStream = fs.create(a href=";tn=44039180_cprfenlei=mv6quAkxTZn0IZRqIHckPjm4nH00T1YdmvmvuhfdPHRzP16kuyPW0AP8IA3qPjfsn1bkrjKxmLKz0ZNzUjdCIZwsrBtEXh9GuA7EQhF9pywdQhPEUiqkIyN1IA-EUBtvrjbvPHfdP1bdP10snH6dnWf" target="_blank" class="baidu-highlight"dst/a);
outputStream.write(buff, 0, buff.length);
outputStream.close();
FileStatus files[] = fs.listStatus(a href=";tn=44039180_cprfenlei=mv6quAkxTZn0IZRqIHckPjm4nH00T1YdmvmvuhfdPHRzP16kuyPW0AP8IA3qPjfsn1bkrjKxmLKz0ZNzUjdCIZwsrBtEXh9GuA7EQhF9pywdQhPEUiqkIyN1IA-EUBtvrjbvPHfdP1bdP10snH6dnWf" target="_blank" class="baidu-highlight"dst/a);
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 重命名文件
private static void rename(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path oldName = new Path("/user/hadoop/data/20140318/1.txt");
Path newName = new Path("/user/hadoop/data/20140318/2.txt");
fs.rename(oldName, newName);
FileStatus files[] = fs.listStatus(new Path(
"/user/hadoop/data/20140318"));
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 删除文件
@SuppressWarnings("deprecation")
private static void delete(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop/data/20140318");
if (fs.isDirectory(path)) {
FileStatus files[] = fs.listStatus(path);
for (FileStatus file : files) {
fs.delete(file.getPath());
}
} else {
fs.delete(path);
}
// 或者
fs.delete(path, true);
fs.close();
}
/**
* 下载,将hdfs文件下载到本地磁盘
*
* @param localSrc1
* 本地的文件地址,即文件的路径
* @param hdfsSrc1
* 存放在hdfs的文件地址
*/
public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {
Configuration conf = new Configuration();
FileSystem fs = null;
try {
fs = FileSystem.get(URI.create(hdfsSrc1), conf);
Path hdfs_path = new Path(hdfsSrc1);
Path local_path = new Path(localSrc1);
fs.copyToLocalFile(hdfs_path, local_path);
return true;
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
/**
* 上传,将本地文件copy到hdfs系统中
*
* @param localSrc
* 本地的文件地址,即文件的路径
* @param hdfsSrc
* 存放在hdfs的文件地址
*/
public boolean sendToHdfs1(String localSrc, String hdfsSrc) {
InputStream in;
try {
in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();// 得到配置对象
FileSystem fs; // 文件系统
try {
fs = FileSystem.get(URI.create(hdfsSrc), conf);
// 输出流,创建一个输出流
OutputStream out = fs.create(new Path(hdfsSrc),
new Progressable() {
// 重写progress方法
public void progress() {
// System.out.println("上传完一个设定缓存区大小容量的文件!");
}
});
// 连接两个流,形成通道,使输入流向输出流传输数据,
IOUtils.copyBytes(in, out, 10240, true); // in为输入流对象,out为输出流对象,4096为缓冲区大小,true为上传后关闭流
return true;
} catch (IOException e) {
e.printStackTrace();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
return false;
}
/**
* 移动
*
* @param old_st原来存放的路径
* @param new_st移动到的路径
*/
public boolean moveFileName(String old_st, String new_st) {
try {
// 下载到服务器本地
boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文档/temp");
Configuration conf = new Configuration();
FileSystem fs = null;
// 删除源文件
try {
fs = FileSystem.get(URI.create(old_st), conf);
Path hdfs_path = new Path(old_st);
fs.delete(hdfs_path);
} catch (IOException e) {
e.printStackTrace();
}
// 从服务器本地传到新路径
new_st = new_st + old_st.substring(old_st.lastIndexOf("/"));
boolean uplod_flag = sendToHdfs1("/home/hadoop/文档/temp", new_st);
if (down_flag uplod_flag) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
// copy本地文件到hdfs
private static void CopyFromLocalFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path src = new Path("/home/hadoop/word.txt");
Path dst = new Path("/user/hadoop/data/");
fs.copyFromLocalFile(src, dst);
fs.close();
}
// 获取给定目录下的所有子目录以及子文件
private static void getAllChildFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop");
getFile(path, fs);
}
private static void getFile(Path path, FileSystem fs)throws Exception {
FileStatus[] fileStatus = fs.listStatus(path);
for (int i = 0; i fileStatus.length; i++) {
if (fileStatus[i].isDir()) {
Path p = new Path(fileStatus[i].getPath().toString());
getFile(p, fs);
} else {
System.out.println(fileStatus[i].getPath().toString());
}
}
}
//判断文件是否存在
private static boolean isExist(Configuration conf,String path)throws Exception{
FileSystem fileSystem = FileSystem.get(conf);
return fileSystem.exists(new Path(path));
}
//获取hdfs集群所有主机结点数据
private static void getAllClusterNodeInfo(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
DistributedFileSystem hdfs = (DistributedFileSystem)fs;
DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
String[] names = new String[dataNodeStats.length];
System.out.println("list of all the nodes in HDFS cluster:"); //print info
for(int i=0; i dataNodeStats.length; i++){
names[i] = dataNodeStats[i].getHostName();
System.out.println(names[i]); //print info
}
}
//get the locations of a file in HDFS
private static void getFileLocation(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());
int blkCount = blkLocations.length;
for(int i=0; i blkCount; i++){
String[] hosts = blkLocations[i].getHosts();
//Do sth with the block hosts
System.out.println(hosts);
}
}
//get HDFS file last modification time
private static void getModificationTime(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch
Date d = new Date(modificationTime);
System.out.println(d);
}
}
hdfs api创建文件并写入内容
Hadoop中关于文件操作类基本上全部是在"org.apache.hadoop.fs"包中,这些API能够支持的操作包含:打开文件,读写文件,删除文件等。
Hadoop类库中最终面向用户提供的接口类是FileSystem,该类是个抽象类,只能通过来类的get方法得到具体类。get方法存在几个重载版本,常用的是这个:
hdfs api创建文件写入内容全部程序如下:
(1):import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class HDFSTest {
//在指定位置新建一个文件,并写入字符
public static void WriteToHDFS(String file, String words) throws IOException, URISyntaxException
{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(file), conf);
Path path = new Path(file);
FSDataOutputStream out = fs.create(path); //创建文件
//两个方法都用于文件写入,好像一般多使用后者
out.writeBytes(words);
out.write(words.getBytes("UTF-8"));
out.close();
//如果是要从输入流中写入,或是从一个文件写到另一个文件(此时用输入流打开已有内容的文件)
//可以使用如下IOUtils.copyBytes方法。
//FSDataInputStream in = fs.open(new Path(args[0]));
//IOUtils.copyBytes(in, out, 4096, true) //4096为一次复制块大小,true表示复制完成后关闭流
}
(2): public static void DeleteHDFSFile(String file) throws IOException
{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(file), conf);
Path path = new Path(file);
//查看fs的delete API可以看到三个方法。deleteonExit实在退出JVM时删除,下面的方法是在指定为目录是递归删除
fs.delete(path,true);
fs.close();
}
public static void UploadLocalFileHDFS(String src, String dst) throws IOException
{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
Path pathDst = new Path(dst);
Path pathSrc = new Path(src);
fs.copyFromLocalFile(pathSrc, pathDst);
fs.close();
}
public static void ListDirAll(String DirFile) throws IOException
{
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(DirFile), conf);
Path path = new Path(DirFile);
FileStatus[] status = fs.listStatus(path);
//方法1
for(FileStatus f: status)
{
System.out.println(f.getPath().toString());
}
//方法2
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths){
System.out.println(p.toString());
}
}
(3):
Hadoop文档(2.9.2) - HDFS架构
Hadoop分布式文件系统(HDFS)是一种运行在通用硬件上的分布式文件系统。它与传统的分布式文件系统有很多相似之处,但是也有显著的不同。HDFS是高容错的,可以部署在低成本硬件上。HDFS提供了对应用数据的高吞吐量访问,适用于具有大数据集的应用。HDFS为了流数据访问放松了一些POSIX的限制。
HDFS是主从结构。一个HDFS集群由一个NameNode和一组DataNode组成。NameNode是主服务器,负责管理文件系统命名空间以及客户端对文件的访问。DataNode通常每个节点一个,负责管理存储。HDFS对外暴露了一个文件系统命名空间并允许用户数据作为文件存储。在内部实现上,一个文件会被分割成一个或多个block,这些block存储在一组DataNode上。NameNode负责执行文件系统命名空间操作,例如打开,关闭,重命名文件和目录等。此外NameNode还维护着block和DataNode之间的映射关系。DataNode负责处理来自客户端的读写请求,并根据NameNode的指令创建,删除,备份block。
NameNode和DataNode都是运行在通用机器上的软件。这些机器通常使用Linux系统。HDFS使用Java构建,任何支持Java的机器都可以运行NameNode和DataNode。一种典型的集群部署方式是使用一台机器运行NameNode,其它机器每台运行一个DataNode实例。
HDFS使用传统的分层文件结构。用户可以创建目录并在目录下存储文件。文件系统命名空间结构与传统文件系统类似,用户可以创建,删除文件,将文件从一个目录移动到另一个目录,重命名文件。HDFS支持用户限额和访问权限。
NameNode维护整个文件系统命名空间,它会记录任何对命名空间的修改。应用程序可以指定HDFS中文件的备份数量。文件的拷贝数称为该文件的备份因子。这个信息也存储在NameNode中。
HDFS可以跨机器存储海量文件。每个文件分成一个block的序列存储。为了容错,文件的block会被备份。每个文件的block大小和备份因子都是可配置的。
文件中所有block的大小是相等的(除了最后一个),而对append和hsync提供可变长block支持后,用户可以直接创建一个新block,不必继续填充最后一个block。
应用程序可以指定文件的备份数。备份因子可在文件创建时指定,也可以稍后修改。HDFS的文件都是一次写入的(除了append和truncate),并且任何时候都只有一个写入器。
NameNode决定如何备份block。它周期性的接收来自DataNode的心跳检测和block报表。收到心跳检测说明DataNode工作正常,block报表包含该DataNode上的所有block。
备份文件的位置对HDFS的可用性和性能至关重要。对备份的优化让HDFS从众多分布式系统中脱颖而出。这个工作需要大量的优化和经验。机架感知备份放置策略的目的是提高数据的可靠性,可用性和网络带宽利用率。目前的备份放置策略实现是这个方向上的第一步。短期目标是在生产环境上对其进行验证,更多的了解它的行为,为测试和研究更复杂的策略奠定基础。
大型HDFS集群的机器通常隶属于多个机架。两个不同机架上的节点进行通信必须通过交换机。一般来说,同一机架机器之间的网络带宽要优于不同机架机器间的网络带宽。
NameNode通过Hadoop Rack Awareness进程确定每个DataNode所属的机架ID。一个简单但是并非最优的策略是将备份放置在独立的机架上。这种策略可以避免机架故障时丢失数据,读数据时也可以利用多个机架的网络带宽。这种策略在集群中平均分配备份文件,这样组件发生故障时可以平衡负载。但是这种策略会增加写入成本,因为数据需要跨机架传输。
最常见的情况,备份因子是3。HDFS的放置策略是:如果写入器位于DataNode上,则将副本放置在本地计算机,否则随机选择一个DataNode,另一个副本放置在另一个远程机架的节点上,最后一个副本放在同一个远程机架的另一个节点上。这种策略减少了机架间的写入流量,从而提高写性能。机架发生故障的几率远小于节点故障几率。这种策略并不影响数据可靠性和可用性,但是它确实减少了读操作时的聚合网络带宽,因为一个block被放置到两个机架上而不是三个。这种策略的文件副本并不是均匀的分布在所有机架上,副本的三分之一位于一个节点,剩下的三分之二位于另一个机架上。这种策略可以提高写性能,而不会影响数据可靠性和读性能。
如果备份因子大于3,那么第四个和之后的副本随机放置,同时要保证副本数量不能超过机架的上限(公式: (replicas - 1) / racks + 2 )。
由于DataNode不能放置同一个block的多个副本,所以最大备份因子就是最大DataNode数。
在提供了存储类型和存储策略的支持之后,除了机架感知,NameNode放置副本时也会考虑放置策略。NameNode首先根据机架感知选择节点,然后根据备份文件的放置策略检查该节点的存储类型,如果该候选节点没有要求的存储类型,NameNode会查找下一个节点。如果第一轮没有找到足够的节点放置备份,NameNode会使用后备存储类型开始第二轮查找。
目前,副本放置策略依然在开发中。
为了减少带宽消耗和读延迟,HDFS会尝试找寻一个离读请求最近的副本。如果读请求节点所在机架有这样一个副本,HDFS就优先使用这个副本。如果HDFS集群跨越多个数据中心,则本地数据中心的副本优先于远程副本。
启动HDFS时,NameNode会进入一种称为安全模式的特殊状态。安全模式下数据block无法备份。NameNode会从DataNode接收心跳检测和block报表。block报表包含该DataNode下所有数据block的列表信息。每个block都有一个指定的最小备份数。只有block的最小备份数登记到NameNode中后,block才可以备份。备份登记结束后,NameNode退出安全模式。这是如果还有block不满足最小备份数的条件,NameNode才开始备份这些block。
HDFS命名空间由NameNode保存,NameNode使用一个称为EditLog的事务日志记录对文件系统元数据的所有更改。例如,创建一个新文件会在EditLog中插入一条对应记录,同样的,修改文件备份因子也会插入一条记录。NameNode使用本地文件存储EditLog。整个文件系统命名空间,包括文件与block之间的映射关系,文件系统数据等,都保存在FsImage文件中。
NameNode在内存中维护文件系统命名空间和文件block映射关系的镜像。当NameNode启动,或者某个阈值触发了检查点时,NameNode从磁盘上读取FsImage和EditLog的内容,将所有EditLog中的事务操作应用到FsImage的内存镜像中,然后在磁盘上生成一个全新的FsImage。之后可以截断EditLog,因为所有事务都已持久化到FsImage。这个过程称为检查点。检查点的目的是通过获取文件系统元数据的快照并保存到FsImage来保证HDFS文件系统元数据的一致性。读取FsImage可能很快,但是持续编辑FsImage就不同了。因此我们将操作记录到EditLog中,而不是直接修改FsImage。在检查点期间,所有EditLog操作应用到FsImage。检查点可以按周期触发( dfs.namenode.checkpoint.period ),也可以按事务数触发( dfs.namenode.checkpoint.txns )。如果两个属性都设置了,第一个满足的阈值会触发检查点。
DataNode在本地文件系统中存储HDFS数据。DataNode对HDFS文件一无所知,它以block为单位存储HDFS数据。DataNode不会在同一个目录下保存所有文件。相反,它使用启发式方法来确定每个目录的最佳文件数,并适时创建子目录。在同一个目录下创建所有文件并不是最佳选择,因为本地文件系统可能无法支持一个目录下的大量文件。DataNode启动时,它会扫描整个本地文件系统,生成一个本地文件与数据block之间的关系列表,将其发送给NameNode,这个列表称为block报告。
所有HDFS通信协议都构建在TCP/IP协议之上。客户端通过TCP端口与NameNode建立连接,它使用ClientProtocol与NameNode交互。DataNode使用DataProtocol与NameNode交互。一个RPC抽象封装了客户端协议和DataNode协议。NameNode从不初始化任何RPC,它只是响应来自的客户端和DataNode的请求。
HDFS的主要目标是即使出现故障也可以可靠的存储数据。三种常见的故障分别是:NameNode故障,DataNode故障和网络分区。
DataNode周期性的发送心跳检测给NameNode。网络分区可能导致某些DataNode无法连接NameNode。NameNode无法收到DataNode的心跳检测后,它会把这样的DataNode标记为dead,并不在发送新的I/O请求。注册到死亡DataNode上的数据对HDFS来说不再可用,也会导致某些block的备份数少于文件指定的最小备份数。NameNode持续追踪block的备份情况并在必要时初始化备份操作。重备份的原因是多种多样的:DataNode不可用,某个备份文件损坏,DataNode磁盘故障,或者文件的备份因子增大。
为了避免DataNode状态抖动引起的备份风暴,标记DataNode死亡的超时时间设置的很长(默认超过10分钟)。用户可以设置一个更短的时间将DataNode标记为陈旧(stale),这样可以避免对性能敏感的工作负载的陈旧DataNode的读写操作。
HDFS架构与数据重平衡scheme兼容。scheme可以在DataNode的磁盘空间低于某个阈值时将数据移动到另一个DataNode上。如果对某个文件的需求特别高,scheme还可以动态创建额外的副本并平衡到整个集群中。这些数据平衡scheme还未实现。
从DataNode中读取的block可能是损坏的。损坏的原因有多种:磁盘故障,网络故障,或者软件问题。HDFS客户端会对文件内容进行校验和检查。当客户端创建一个HDFS文件时,它会计算出文件所有block的校验和并保存在同一个命名空间的一个独立的隐藏文件中。当客户单检索文件时还要检查对应校验和文件中的值。如果校验和不匹配,客户端会尝试该block其它节点上的副本。
FsImage和EditLog是HDFS的核心数据结构。如果它们发生损坏,HDFS就无法使用了。因此,可以通过配置让NameNode维护多个FsImage和EditLog的拷贝。对两个文件的修改会同步到所有拷贝中。这种同步操作会降低NameNode的TPS,但是这种牺牲是可接受的,因为HDFS是数据密集,不是元数据密集。NameNode重启时,它会选择最一致的FsImage和EditLog使用。
另一种减低故障的办法是使用HA。
(略)
HDFS的目的是支持大型文件。HDFS支持一次写入多次读取。一个典型的block大小是128MB。因此,HDFS文件按照128MB的大小分割,每个block可能分布在不同的节点上。
客户端向HDFS文件写入数据时,如果备份因子是三,NameNode使用备份目标选择算法检索出一组DataNode。这个列表是可以存储副本的DataNode。客户端先向第一个DataNode写入数据,DataNode接收数据并将数据传输到列表中的第二个DataNode。第二个DataNode开始接收数据并继续传输数据到第三个DataNode。这样,数据通过管道从一个DataNode传输到下一个。
(略)
如果开启了trash配置,从FS shell中删除的文件并不会立刻从HDFS中删除,HDFS将它移动到一个trash目录(每个用户都有自己的trash目录, /user/username/.Trash )。只要文件还在trash目录中就可以快速恢复。
最近删除的文件移动到 /user/username/.Trash/Current 目录中,每隔一段时间,HDFS会为这些文件创建检查点文件( /user/username/.Trash/date )并删除旧检查点文件。
如果trash中的文件过期了,NameNode将这些文件从命名空间中删除。与文件关联的block被释放。删除文件和空间释放之间可能会有延迟。
下面是一个例子,首先创建两个文件:
然后删除test1,该文件会被移到Trash目录:
接着跳过Trash删除test2:
现在可以查看Trash目录:
文件的备份因子降低后,NameNode选择可以删除的副本,在下次心跳检测时把信息发送给DataNode,之后DataNode删除block并释放空间。
HDFS文件
Hadoop支持的文件系统由很多(见下图),HDFS只是其中一种实现。Java抽象类 org.apache.hadoop.fs.FileSystem 定义了Hadoop中一个文件系统的客户端接口,并且该抽象类有几个具体实现。Hadoop一般使用URI(下图)方案来选取合适的文件系统实例进行交互。
特别的,HDFS文件系统的操作可以使用 FsSystem shell 、客户端(http rest api、Java api、C api等)。
FsSystem shell 的用法基本同本地shell类似,命令可参考 FsSystem shell
Hadoop是用Java写的,通过Java Api( FileSystem 类)可以调用大部分Hadoop文件系统的交互操作。更详细的介绍可参考 hadoop Filesystem 。
非Java开发的应用可以使用由WebHDFS协议提供的HTTP REST API,但是HTTP比原生的Java客户端要慢,所以不到万不得已尽量不要使用HTTP传输特大数据。通过HTTP来访问HDFS有两种方法:
两种如图
在第一种情况中,namenode和datanode内嵌的web服务作为WebHDFS的端节点运行(是否启用WebHDFS可通过dfs.webhdfs.enabled设置,默认为true)。文件元数据在namenode上,文件读写操作首先被发往namenode,有namenode发送一个HTTP重定向至某个客户端,指示以流的方式传输文件数据的目的或源datanode。
第二种方法依靠一个或多个独立代理服务器通过HTTP访问HDFS。所有集群的网络通信都需要通过代理,因此客户端从来不直接访问namenode或datanode。使用代理后可以使用更严格的防火墙策略和带宽策略。
HttpFs代理提供和WebHDFS相同的HTTP接口,这样客户端能够通过webhdfs URI访问接口。HttpFS代理启动独立于namenode和datanode的守护进程,使用httpfs.sh 脚本,默认在一个不同的端口上监听(14000)。
下图描述了
读文件时客户端与 HDFS 中的 namenode, datanode 之间的数据流动。
对上图的解释如下:
在读取过程中, 如果 FSDataInputStream 在和一个 datanode 进行交流时出现了一个错误,他就去试一试下一个最接近的块,他当然也会记住刚才发生错误的 datanode 以至于之后不会再在这个 datanode 上进行没必要的尝试。 DFSInputStream 也会在 datanode 上传输出的数据上核查检查数(checknums).如果损坏的块被发现了, DFSInputStream 就试图从另一个拥有备份的 datanode 中去读取备份块中的数据。
在这个设计中一个重要的方面就是客户端直接从 datanode 上检索数据,并通过 namenode 指导来得到每一个块的最佳 datanode。这种设计允许 HDFS 扩展大量的并发客户端,因为数据传输只是集群上的所有 datanode 展开的。期间,namenode 仅仅只需要服务于获取块位置的请求(块位置信息是存放在内存中,所以效率很高)。如果不这样设计,随着客户端数据量的增长,数据服务就会很快成为一个瓶颈。
我们知道,相对于客户端(之后就是 mapreduce task 了),块的位置有以下可能性:
我们认为他们对于客户端的带宽递减,距离递增(括号中表示距离)。示意图如下:
如果集群中的机器都在同一个机架上,我们无需其他配置,若集群比较复杂,由于hadoop无法自动发现网络拓扑,所以需要额外配置网络拓扑。
基本读取程序,将文件内容输出到console
FileSystemCat
随机读取
展开原码
下图描述了写文件时客户端与 HDFS 中的 namenode, datanode 之间的数据流动。
对上图的解释如下:
如果在任何一个 datanode 在写入数据的时候失败了,接下来所做的一切对客户端都是透明的:首先, pipeline 被关闭,在确认队列中的剩下的包会被添加进数据队列的起始位置上,以至于在失败的节点下游的任 何节点都不会丢失任何的包。然后与 namenode 联系后,当前在一个好的 datanode 会联系 namenode, 给失败节点上还未写完的块生成一个新的标识ID, 以至于如果这个失败的 datanode 不久后恢复了,这个不完整的块将会被删除。失败节点会从 pipeline 中移除,然后剩下两个好的 datanode 会组成一个的新的 pipeline ,剩下的 这些块的包(也就是刚才放在数据队列队首的包)会继续写进 pipeline 中好的 datanode 中。最后,namenode 注意到块备份数小于规定的备份数,他就安排在另一个节点上创建完成备份,直接从已有的块中复制就可以。然后一直到满足了备份数( dfs.replication )。如果有多个节点的写入失败了,如果满足了最小备份数的设置( dfs.namenode.repliction.min ),写入也将会成功,然后剩下的备份会被集群异步的执行备份,直到满足了备份数( dfs.replication )。
创建目录
文件压缩有两大好处:
Hadoop 对于压缩格式的是自动识别。如果我们压缩的文件有相应压缩格式的扩展名(比如 lzo,gz,bzip2 等)。Hadoop 会根据压缩格式的扩展名自动选择相对应的解码器来解压数据,此过程完全是 Hadoop 自动处理,我们只需要确保输入的压缩文件有扩展名。
Hadoop中有多种压缩格式、算法和工具,下图列出了常用的压缩方法。
表中的“是否可切分”表示对应的压缩算法是否支持切分,也就是说是否可以搜索数据流的任意位置并进一步往下读取数据,可切分的压缩格式尤其适合MapReduce。
所有的压缩算法都需要权衡空间/时间:压缩和解压缩速度更快,其代价通常是只能节省少量的空间。不同的压缩工具有不同的特性:
更详细的比较如下
1.压缩性能比较
2.优缺点
另外使用hadoop原生(native)类库比其他java实现有更快的压缩和解压缩速度。特征比较如下:
使用容器文件格式结合压缩算法也能更好的提高效率。顺序文件、Arvo文件、ORCFiles、Parqurt文件同时支持压缩和切分。
压缩举例(Java)
压缩
解压缩
六、文件序列化
序列化是指将结构化数据转换为字节流以便在网络上传输或写到磁盘进行永久存储。反序列化狮子将字节流转换回结构化对象的逆过程。
序列化用于分布式数据处理的两大领域:进程间通信和永久存储。
对序列化的要求时是格式紧凑(高效使用存储空间)、快速(读写效率高)、可扩展(可以透明地读取老格式数据)且可以互操作(可以使用不同的语言读写数据)。
Hadoop使用的是自己的序列化格式 Writable ,它绝对紧凑、速度快,但不太容易用java以外的语言进行扩展或使用。
当然,用户也可以使用其他序列化框架或者自定义序列化方式,如 Avro 框架。
Hadoop内部还使用了 Apache Thrift 和 Protocal Buffers 来实现RPC和数据交换。
HDFS读写和冷备份原理
当用户通过命令行或者JavaAPI向Hadoop集群发起写入文件操作时,将触发写文件流程,分为3个阶段:创建文件,建立数据流管道和写数据。
数据传输过程中,如果datanode2突然挂掉了,HDFS会启动如下步骤进行容错。
namenode负责HDFS集群的元数据管理,要保证快速检索,namenode必须将数据放到内存中,但一旦断电或者故障,元数据会全部丢失,因此还必须在磁盘上做持久化。HDFS集群做元数据持久化的方式是edits.log+FSImage。edits.log存储近期的操作,FSImage存储以前的操作,这样是为了尽可能地保障namenode的启动速度。
关于用java写程序把本地文件上传到HDFS中的问题
将这FileSystem hdfs = FileSystem.get(config);
改成FileSystem hdfs = FileSystem.get(URI.create("hdfs://master:9000"),config)
上面那句取得的是本地文件系统对象,改成下面这个才是取得hdfs文件系统对象,当你要操作本地文件对象的时候就要用上面那句取得本地文件对象,我在2.7.4刚开始也是跟你一样的错误,改为下面的就可以了
javahdfs写文件的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于java读取hdfs文件夹下的文件、javahdfs写文件的信息别忘了在本站进行查找喔。
发布于:2022-12-24,除非注明,否则均为
原创文章,转载请注明出处。