关于Java连hdfs的信息
本篇文章给大家谈谈Java连hdfs,以及对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、我正在捣腾hadoop,用java编写了一个程序,想要连接到hdfs上,运行后显示如下,这是什么问题呢?求解!
- 2、使用java api调用HDFS文件系统时,遇到重复的代码,怎么解决
- 3、Java程序访问不了HDFS下的文件,报套接字超时失败异常,请高手解决一下
- 4、java api 连接HDFS出现报错
- 5、如何使用Java API读写HDFS
- 6、怎么使用java代码直接将从外部拿到的数据存入hdfs
我正在捣腾hadoop,用java编写了一个程序,想要连接到hdfs上,运行后显示如下,这是什么问题呢?求解!
你的hadoop是2.X的,但是还是按1.X的配置,需修改配置,或者还原hadoop版本。
使用java api调用HDFS文件系统时,遇到重复的代码,怎么解决
利用符集编码。
因为HDFS支持6种字符集编码,每个本地文件编码方式又是极可能不一样的,我们上传本地文件的时候其实就是把文件编码成字节流上传到文件系统存储。
Java程序访问不了HDFS下的文件,报套接字超时失败异常,请高手解决一下
连接超时呀,你看看 当前这台机器能ping通hdfs的机器吗。再看看 是不是能成功连上hdfs
java api 连接HDFS出现报错
HDFS是Hadoop生态系统的根基,也是Hadoop生态系统中的重要一员,大部分时候,我们都会使用Linux shell命令来管理HDFS,包括一些文件的创建,删除,修改,上传等等,因为使用shell命令操作HDFS的方式,相对比较简单,方便,但是有时候,我们也需要通过编程的方式来实现对文件系统的管理。
比如有如下的一个小需求,要求我们实现读取HDFS某个文件夹下所有日志,经过加工处理后在写入到HDFS上,或者存进Hbase里,或者存进其他一些存储系统。这时候使用shell的方式就有点麻烦了,所以这时候我们就可以使用编程的方式来完成这件事了,当然散仙在这里使用的是原生的Java语言的方式,其他的一些语言例如C++,PHP,Python都可以实现,散仙在这里不给出演示了,(其实散仙也不会那些语言,除了刚入门的Python) 。
下面,散仙给出代码,以供参考:
view sourceprint?
001 package com.java.api.hdfs;
002
003 import java.io.BufferedReader;
004 import java.io.IOException;
005 import java.io.InputStream;
006 import java.io.InputStreamReader;
007
008 import org.apache.hadoop.conf.Configuration;
009 import org.apache.hadoop.fs.FileStatus;
010 import org.apache.hadoop.fs.FileSystem;
011 import org.apache.hadoop.fs.Path;
012
013
014 /**
015 * @author 三劫散仙
016 * Java API操作HDFS
017 * 工具类
018 *
019 * **/
020 public class OperaHDFS {
021
022
023 public static void main(String[] args)throws Exception {
024
025 //System.out.println("aaa");
026 // uploadFile();
027 //createFileOnHDFS();
028 //deleteFileOnHDFS();
029 //createDirectoryOnHDFS();
030 //deleteDirectoryOnHDFS();
031 // renameFileOrDirectoryOnHDFS();
032 //downloadFileorDirectoryOnHDFS();
033 readHDFSListAll();
034 }
035
036
037
038
039 /***
040 * 加载配置文件
041 * **/
042 static Configuration conf=new Configuration();
043
044
045
046 /**
047 * 重名名一个文件夹或者文件
048 *
049 * **/
050 public static void renameFileOrDirectoryOnHDFS()throws Exception{
051
052 FileSystem fs=FileSystem.get(conf);
053 Path p1 =new Path("hdfs://10.2.143.5:9090/root/myfile/my.txt");
054 Path p2 =new Path("hdfs://10.2.143.5:9090/root/myfile/my2.txt");
055 fs.rename(p1, p2);
056
057 fs.close();//释放资源
058 System.out.println("重命名文件夹或文件成功.....");
059
060 }
如何使用Java API读写HDFS
com.wyc.hadoop.fs;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.util.Date;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
public class FSOptr {
/**
* @param args
*/
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
Configuration conf = new Configuration();
makeDir(conf);
rename(conf);
delete(conf);
}
// 创建文件目录
private static void makeDir(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path dir = new Path("/user/hadoop/data/20140318");
boolean result = fs.mkdirs(dir);// 创建文件夹
System.out.println("make dir :" + result);
// 创建文件,并写入内容
Path a href=";tn=44039180_cprfenlei=mv6quAkxTZn0IZRqIHckPjm4nH00T1YdmvmvuhfdPHRzP16kuyPW0AP8IA3qPjfsn1bkrjKxmLKz0ZNzUjdCIZwsrBtEXh9GuA7EQhF9pywdQhPEUiqkIyN1IA-EUBtvrjbvPHfdP1bdP10snH6dnWf" target="_blank" class="baidu-highlight"dst/a = new Path("/user/hadoop/data/20140318/tmp");
byte[] buff = "hello,hadoop!".getBytes();
FSDataOutputStream outputStream = fs.create(a href=";tn=44039180_cprfenlei=mv6quAkxTZn0IZRqIHckPjm4nH00T1YdmvmvuhfdPHRzP16kuyPW0AP8IA3qPjfsn1bkrjKxmLKz0ZNzUjdCIZwsrBtEXh9GuA7EQhF9pywdQhPEUiqkIyN1IA-EUBtvrjbvPHfdP1bdP10snH6dnWf" target="_blank" class="baidu-highlight"dst/a);
outputStream.write(buff, 0, buff.length);
outputStream.close();
FileStatus files[] = fs.listStatus(a href=";tn=44039180_cprfenlei=mv6quAkxTZn0IZRqIHckPjm4nH00T1YdmvmvuhfdPHRzP16kuyPW0AP8IA3qPjfsn1bkrjKxmLKz0ZNzUjdCIZwsrBtEXh9GuA7EQhF9pywdQhPEUiqkIyN1IA-EUBtvrjbvPHfdP1bdP10snH6dnWf" target="_blank" class="baidu-highlight"dst/a);
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 重命名文件
private static void rename(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path oldName = new Path("/user/hadoop/data/20140318/1.txt");
Path newName = new Path("/user/hadoop/data/20140318/2.txt");
fs.rename(oldName, newName);
FileStatus files[] = fs.listStatus(new Path(
"/user/hadoop/data/20140318"));
for (FileStatus file : files) {
System.out.println(file.getPath());
}
fs.close();
}
// 删除文件
@SuppressWarnings("deprecation")
private static void delete(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop/data/20140318");
if (fs.isDirectory(path)) {
FileStatus files[] = fs.listStatus(path);
for (FileStatus file : files) {
fs.delete(file.getPath());
}
} else {
fs.delete(path);
}
// 或者
fs.delete(path, true);
fs.close();
}
/**
* 下载,将hdfs文件下载到本地磁盘
*
* @param localSrc1
* 本地的文件地址,即文件的路径
* @param hdfsSrc1
* 存放在hdfs的文件地址
*/
public boolean sendFromHdfs(String hdfsSrc1, String localSrc1) {
Configuration conf = new Configuration();
FileSystem fs = null;
try {
fs = FileSystem.get(URI.create(hdfsSrc1), conf);
Path hdfs_path = new Path(hdfsSrc1);
Path local_path = new Path(localSrc1);
fs.copyToLocalFile(hdfs_path, local_path);
return true;
} catch (IOException e) {
e.printStackTrace();
}
return false;
}
/**
* 上传,将本地文件copy到hdfs系统中
*
* @param localSrc
* 本地的文件地址,即文件的路径
* @param hdfsSrc
* 存放在hdfs的文件地址
*/
public boolean sendToHdfs1(String localSrc, String hdfsSrc) {
InputStream in;
try {
in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();// 得到配置对象
FileSystem fs; // 文件系统
try {
fs = FileSystem.get(URI.create(hdfsSrc), conf);
// 输出流,创建一个输出流
OutputStream out = fs.create(new Path(hdfsSrc),
new Progressable() {
// 重写progress方法
public void progress() {
// System.out.println("上传完一个设定缓存区大小容量的文件!");
}
});
// 连接两个流,形成通道,使输入流向输出流传输数据,
IOUtils.copyBytes(in, out, 10240, true); // in为输入流对象,out为输出流对象,4096为缓冲区大小,true为上传后关闭流
return true;
} catch (IOException e) {
e.printStackTrace();
}
} catch (FileNotFoundException e) {
e.printStackTrace();
}
return false;
}
/**
* 移动
*
* @param old_st原来存放的路径
* @param new_st移动到的路径
*/
public boolean moveFileName(String old_st, String new_st) {
try {
// 下载到服务器本地
boolean down_flag = sendFromHdfs(old_st, "/home/hadoop/文档/temp");
Configuration conf = new Configuration();
FileSystem fs = null;
// 删除源文件
try {
fs = FileSystem.get(URI.create(old_st), conf);
Path hdfs_path = new Path(old_st);
fs.delete(hdfs_path);
} catch (IOException e) {
e.printStackTrace();
}
// 从服务器本地传到新路径
new_st = new_st + old_st.substring(old_st.lastIndexOf("/"));
boolean uplod_flag = sendToHdfs1("/home/hadoop/文档/temp", new_st);
if (down_flag uplod_flag) {
return true;
}
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
// copy本地文件到hdfs
private static void CopyFromLocalFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path src = new Path("/home/hadoop/word.txt");
Path dst = new Path("/user/hadoop/data/");
fs.copyFromLocalFile(src, dst);
fs.close();
}
// 获取给定目录下的所有子目录以及子文件
private static void getAllChildFile(Configuration conf) throws Exception {
FileSystem fs = FileSystem.get(conf);
Path path = new Path("/user/hadoop");
getFile(path, fs);
}
private static void getFile(Path path, FileSystem fs)throws Exception {
FileStatus[] fileStatus = fs.listStatus(path);
for (int i = 0; i fileStatus.length; i++) {
if (fileStatus[i].isDir()) {
Path p = new Path(fileStatus[i].getPath().toString());
getFile(p, fs);
} else {
System.out.println(fileStatus[i].getPath().toString());
}
}
}
//判断文件是否存在
private static boolean isExist(Configuration conf,String path)throws Exception{
FileSystem fileSystem = FileSystem.get(conf);
return fileSystem.exists(new Path(path));
}
//获取hdfs集群所有主机结点数据
private static void getAllClusterNodeInfo(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
DistributedFileSystem hdfs = (DistributedFileSystem)fs;
DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
String[] names = new String[dataNodeStats.length];
System.out.println("list of all the nodes in HDFS cluster:"); //print info
for(int i=0; i dataNodeStats.length; i++){
names[i] = dataNodeStats[i].getHostName();
System.out.println(names[i]); //print info
}
}
//get the locations of a file in HDFS
private static void getFileLocation(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
BlockLocation[] blkLocations = fs.getFileBlockLocations(filestatus,0,filestatus.getLen());
int blkCount = blkLocations.length;
for(int i=0; i blkCount; i++){
String[] hosts = blkLocations[i].getHosts();
//Do sth with the block hosts
System.out.println(hosts);
}
}
//get HDFS file last modification time
private static void getModificationTime(Configuration conf)throws Exception{
FileSystem fs = FileSystem.get(conf);
Path f = new Path("/user/cluster/dfs.txt");
FileStatus filestatus = fs.getFileStatus(f);
long modificationTime = filestatus.getModificationTime(); // measured in milliseconds since the epoch
Date d = new Date(modificationTime);
System.out.println(d);
}
}
怎么使用java代码直接将从外部拿到的数据存入hdfs
存入HDFS有好几种数据格式,我这里给你列出一种格式的存储,sequence的
public class SeqWrite {
private static final String[] data = { "a,b,c,d,e,f,g", "h,i,j,k,l,m,n", "o,p,q,r,s,t", "u,v,w,x,y,z", "0,1,2,3,4", "5,6,7,8,9" };
public static void main(String[] args) throws IOException, Exception {
Configuration configuration = new Configuration();
//这里是你主机的地址
configuration.set("fs.defaultFS", "192.168.51.140");
//这个是存储的路径
Path path = new Path("/tmp/test1.seq");
Option option = SequenceFile.Writer.file(path);
Option optKey = SequenceFile.Writer.keyClass(IntWritable.class);
Option optValue = SequenceFile.Writer.valueClass(Text.class);
SequenceFile.Writer writer = null;
IntWritable key = new IntWritable(10);
Text value = new Text();
writer = SequenceFile.createWriter(configuration, option, optKey, optValue);
for (int i = 0; i data.length; i++) {
key.set(i);
value.set(data[i]);
writer.append(key, value);
writer.hsync();
Thread.sleep(10000L);
}
IOUtils.closeStream(writer);
}
}
关于Java连hdfs和的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。
发布于:2022-11-23,除非注明,否则均为
原创文章,转载请注明出处。