「javahdfs集成」HDFS搭建

博主:adminadmin 2023-01-01 04:57:05 747

本篇文章给大家谈谈javahdfs集成,以及HDFS搭建对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

编写一个JAVA类方法,通过该方法可以获取出存储在HDFS集群中根目录的所有文件?

public void listMyFile() throws Exception {

//获取FileSystem

//"hdfs"为伪造用户,使用hdfs用户进行访问

FileSystem fileSystem = FileSystem.get(new URI("hdfs://192.168.83.141:8020"), new Configuration(), "hdfs");

//获取指定目标目录下的所有文件信息

RemoteIteratorLocatedFileStatus iterator =

fileSystem.listFiles(new Path("/"), true);

//遍历迭代器

while (iterator.hasNext()) {

//获取每个文件详细信息

LocatedFileStatus fileStatus = iterator.next();

//获取每个文件的存储路径

System.out.println("路径:" + fileStatus.getPath() +

"---" + fileStatus.getPath().getName());

//获取文件的block存储信息

BlockLocation[] blockLocations = fileStatus.getBlockLocations();

//打印每个文件的block数

System.out.println("block数量:" + blockLocations.length);

//打印每一个block副本的存储位置

for (BlockLocation blockLocation : blockLocations) {

String[] hosts = blockLocation.getHosts();

for (String host : hosts) {

System.out.println("主机:" + host);

}

}

}

}

java怎么连接hdfs文件系统,需要哪些包?

apache的Hadoop项目提供一类api可以通过java工程操作hdfs中的文件,包括:文件打开,读写,删除等、目录的创建,删除,读取目录中所有文件等。

1、到下载Hadoop,解压后把所有jar加入项目的lib里

2、程序处理步骤: 1)得到Configuration对象,2)得到FileSystem对象,3)进行文件操作,简单示例如下:

/**

*

*/

package org.jrs.wlh;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FSDataInputStream;

import org.apache.hadoop.fs.FSDataOutputStream;

import org.apache.hadoop.fs.FileStatus;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

/**

* @PutMeger.java

* java操作hdfs 往 hdfs中上传数据

* @version $Revision$/br

* update: $Date$

*/

public class PutMeger {

public static void main(String[] args) throws IOException {

String[] str = new String[]{"E:\\hadoop\\UploadFileClient.java","hdfs://master:9000/user/hadoop/inccnt.java"};

Configuration conf = new Configuration();

FileSystem fileS= FileSystem.get(conf);

FileSystem localFile = FileSystem.getLocal(conf); //得到一个本地的FileSystem对象

Path input = new Path(str[0]); //设定文件输入保存路径

Path out = new Path(str[1]); //文件到hdfs输出路径

try{

FileStatus[] inputFile = localFile.listStatus(input); //listStatus得到输入文件路径的文件列表

FSDataOutputStream outStream = fileS.create(out); //创建输出流

for (int i = 0; i inputFile.length; i++) {

System.out.println(inputFile[i].getPath().getName());

FSDataInputStream in = localFile.open(inputFile[i].getPath());

byte buffer[] = new byte[1024];

int bytesRead = 0;

while((bytesRead = in.read(buffer))0){ //按照字节读取数据

System.out.println(buffer);

outStream.write(buffer,0,bytesRead);

}

in.close();

}

}catch(Exception e){

e.printStackTrace();

}

}

}

我正在捣腾hadoop,用java编写了一个程序,想要连接到hdfs上,运行后显示如下,这是什么问题呢?求解!

你的hadoop是2.X的,但是还是按1.X的配置,需修改配置,或者还原hadoop版本。

如何使用Java API读写HDFS

Java API读写HDFS

public class FSOptr {

/**

* @param args

*/

public static void main(String[] args) throws Exception {

// TODO Auto-generated method stub

Configuration conf = new Configuration();

makeDir(conf);

rename(conf);

delete(conf);

}

// 创建文件目录

private static void makeDir(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf);

Path dir = new Path("/user/hadoop/data/20140318");

boolean result = fs.mkdirs(dir);// 创建文件夹

System.out.println("make dir :" + result);

// 创建文件,并写入内容

Path dst = new Path("/user/hadoop/data/20140318/tmp");

byte[] buff = "hello,hadoop!".getBytes();

FSDataOutputStream outputStream = fs.create(dst);

outputStream.write(buff, 0, buff.length);

outputStream.close();

FileStatus files[] = fs.listStatus(dst);

for (FileStatus file : files) {

System.out.println(file.getPath());

}

fs.close();

}

// 重命名文件

private static void rename(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf);

Path oldName = new Path("/user/hadoop/data/20140318/1.txt");

Path newName = new Path("/user/hadoop/data/20140318/2.txt");

fs.rename(oldName, newName);

FileStatus files[] = fs.listStatus(new Path(

"/user/hadoop/data/20140318"));

for (FileStatus file : files) {

System.out.println(file.getPath());

}

fs.close();

}

// 删除文件

@SuppressWarnings("deprecation")

private static void delete(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf);

Path path = new Path("/user/hadoop/data/20140318");

if (fs.isDirectory(path)) {

FileStatus files[] = fs.listStatus(path);

for (FileStatus file : files) {

fs.delete(file.getPath());

}

} else {

fs.delete(path);

}

// 或者

fs.delete(path, true);

fs.close();

}

/**

* 下载,将hdfs文件下载到本地磁盘

*

* @param localSrc1

* 本地的文件地址,即文件的路径

* @param hdfsSrc1

* 存放在hdfs的文件地址

*/

public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {

Configuration conf = new Configuration();

FileSystem fs = null;

try {

fs = FileSystem.get(URI.create(hdfsSrc1), conf);

Path hdfs_path = new Path(hdfsSrc1);

Path local_path = new Path(localSrc1);

fs.copyToLocalFile(hdfs_path, local_path);

return true;

} catch (IOException e) {

e.printStackTrace();

}

return false;

}

/**

* 上传,将本地文件copy到hdfs系统中

*

* @param localSrc

* 本地的文件地址,即文件的路径

* @param hdfsSrc

* 存放在hdfs的文件地址

*/

public boolean sendToHdfs1(String localSrc, String hdfsSrc) {

InputStream in;

try {

in = new BufferedInputStream(new FileInputStream(localSrc));

Configuration conf = new Configuration();// 得到配置对象

FileSystem fs; // 文件系统

try {

fs = FileSystem.get(URI.create(hdfsSrc), conf);

// 输出流,创建一个输出流

OutputStream out = fs.create(new Path(hdfsSrc),

new Progressable() {

// 重写progress方法

public void progress() {

// System.out.println("上传完一个设定缓存区大小容量的文件!");

}

});

// 连接两个流,形成通道,使输入流向输出流传输数据,

IOUtils.copyBytes(in, out, 10240, true); // in为输入流对象,out为输出流对象,4096为缓冲区大小,true为上传后关闭流

return true;

} catch (IOException e) {

e.printStackTrace();

}

} catch (FileNotFoundException e) {

e.printStackTrace();

}

return false;

}

/**

* 移动

*

* @param old_st原来存放的路径

* @param new_st移动到的路径

*/

public boolean moveFileName(String old_st, String new_st) {

try {

// 下载到服务器本地

boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文档/temp");

Configuration conf = new Configuration();

FileSystem fs = null;

// 删除源文件

try {

fs = FileSystem.get(URI.create(old_st), conf);

Path hdfs_path = new Path(old_st);

fs.delete(hdfs_path);

} catch (IOException e) {

e.printStackTrace();

}

// 从服务器本地传到新路径

new_st = new_st + old_st.substring(old_st.lastIndexOf("/"));

boolean uplod_flag = sendToHdfs1("/home/hadoop/文档/temp", new_st);

if (down_flag uplod_flag) {

return true;

}

} catch (Exception e) {

e.printStackTrace();

}

return false;

}

// copy本地文件到hdfs

private static void CopyFromLocalFile(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf);

Path src = new Path("/home/hadoop/word.txt");

Path dst = new Path("/user/hadoop/data/");

fs.copyFromLocalFile(src, dst);

fs.close();

}

// 获取给定目录下的所有子目录以及子文件

private static void getAllChildFile(Configuration conf) throws Exception {

FileSystem fs = FileSystem.get(conf);

Path path = new Path("/user/hadoop");

getFile(path, fs);

}

private static void getFile(Path path, FileSystem fs)throws Exception {

FileStatus[] fileStatus = fs.listStatus(path);

for (int i = 0; i fileStatus.length; i++) {

if (fileStatus[i].isDir()) {

Path p = new Path(fileStatus[i].getPath().toString());

getFile(p, fs);

} else {

System.out.println(fileStatus[i].getPath().toString());

}

}

}

//判断文件是否存在

private static boolean isExist(Configuration conf,String path)throws Exception{

FileSystem fileSystem = FileSystem.get(conf);

return fileSystem.exists(new Path(path));

}

//获取hdfs集群所有主机结点数据

private static void getAllClusterNodeInfo(Configuration conf)throws Exception{

FileSystem fs = FileSystem.get(conf);

DistributedFileSystem hdfs = (DistributedFileSystem)fs;

DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();

String[] names = new String[dataNodeStats.length];

System.out.println("list of all the nodes in HDFS cluster:"); //print info

for(int i=0; i dataNodeStats.length; i++){

names[i] = dataNodeStats[i].getHostName();

System.out.println(names[i]); //print info

}

}

//get the locations of a file in HDFS

private static void getFileLocation(Configuration conf)throws Exception{

FileSystem fs = FileSystem.get(conf);

Path f = new Path("/user/cluster/dfs.txt");

FileStatus filestatus = fs.getFileStatus(f);

BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());

int blkCount = blkLocations.length;

for(int i=0; i blkCount; i++){

String[] hosts = blkLocations[i].getHosts();

//Do sth with the block hosts

System.out.println(hosts);

}

}

//get HDFS file last modification time

private static void getModificationTime(Configuration conf)throws Exception{

FileSystem fs = FileSystem.get(conf);

Path f = new Path("/user/cluster/dfs.txt");

FileStatus filestatus = fs.getFileStatus(f);

long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch

Date d = new Date(modificationTime);

System.out.println(d);

}

}

关于javahdfs集成和HDFS搭建的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。