在eclipse中调用JavaAPI实现HDFS中的相关操作
1、创建一个java工程
2、右键工程,在属性里添加上hadoop解压后的相关jar包(hadoop目录下的jar包和lib目录下的jar包)
3、调用相关代码,实现相关hdfs操作
1 package hdfs; 2 3 import java.io.InputStream; 4 import java.net.URL; 5 6 import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; 7 import org.apache.hadoop.io.IOUtils; 8 9 public class App1 {10 /**11 * 异常:unknown host: chaoren 本机没有解析主机名chaoren12 * 在C:\Windows\System32\drivers\etc\hosts文件中添加192.168.80.10013 * chaoren(win10中要添加写入权限才能写入)14 */15 static final String PATH = "hdfs://chaoren:9000/hello";16 17 public static void main(String[] args) throws Exception {18 // 让URL能够解析hdfs协议19 URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());20 URL url = new URL(PATH);21 InputStream in = url.openStream();22 /**23 * @param in24 * 输入流25 * @param out26 * 输出流27 * @param buffSize28 * 缓冲大小29 * @param close30 * 在传输结束后是否关闭流31 */32 IOUtils.copyBytes(in, System.out, 1024, true);// 读取文件hello中的内容33 }34 35 }
1 package hdfs; 2 3 import java.io.FileInputStream; 4 import java.io.FileNotFoundException; 5 import java.io.IOException; 6 import java.net.URI; 7 import java.net.URISyntaxException; 8 9 import org.apache.hadoop.conf.Configuration;10 import org.apache.hadoop.fs.FSDataInputStream;11 import org.apache.hadoop.fs.FSDataOutputStream;12 import org.apache.hadoop.fs.FileStatus;13 import org.apache.hadoop.fs.FileSystem;14 import org.apache.hadoop.fs.Path;15 import org.apache.hadoop.io.IOUtils;16 17 public class App2 {18 static final String PATH = "hdfs://chaoren:9000/";19 static final String DIR = "/d1";20 static final String FILE = "/d1/hello";21 22 public static void main(String[] args) throws Exception {23 FileSystem fileSystem = getFileSystem();24 25 // 创建文件夹 hadoop fs -mkdir /d126 mkDir(fileSystem);27 28 // 上传文件 hadoop fs -put src des29 putData(fileSystem);30 31 // 下载文件 hadoop fs -get src des32 getData(fileSystem);33 34 // 浏览文件夹 hadoop fs -lsr path35 list(fileSystem);36 37 // 删除文件夹 hadoop fs -rmr /d138 remove(fileSystem);39 }40 41 private static void remove(FileSystem fileSystem) throws IOException {42 fileSystem.delete(new Path(DIR), true);43 }44 45 private static void list(FileSystem fileSystem) throws IOException {46 FileStatus[] listStatus = fileSystem.listStatus(new Path("/"));47 for (FileStatus fileStatus : listStatus) {48 String isDir = fileStatus.isDir() ? "文件夹" : "文件";49 String permission = fileStatus.getPermission().toString();50 int replication = fileStatus.getReplication();51 long len = fileStatus.getLen();52 String path = fileStatus.getPath().toString();53 System.out.println(isDir + "\t" + permission + "\t" + replication54 + "\t" + len + "\t" + path);55 }56 }57 58 private static void getData(FileSystem fileSystem) throws IOException {59 FSDataInputStream inputStream = fileSystem.open(new Path(FILE));60 IOUtils.copyBytes(inputStream, System.out, 1024, true);61 }62 63 private static void putData(FileSystem fileSystem) throws IOException,64 FileNotFoundException {65 FSDataOutputStream out = fileSystem.create(new Path(FILE));66 FileInputStream in = new FileInputStream("C:/Users/ahu_lichang/cp.txt");// 斜杠方向跟Windows下是相反的67 IOUtils.copyBytes(in, out, 1024, true);68 }69 70 private static void mkDir(FileSystem fileSystem) throws IOException {71 fileSystem.mkdirs(new Path(DIR));72 }73 74 private static FileSystem getFileSystem() throws IOException,75 URISyntaxException {76 FileSystem fileSystem = FileSystem.get(new URI(PATH),77 new Configuration());78 return fileSystem;79 }80 81 }
RPC
1.1 RPC (remote procedure call)远程过程调用. 远程过程指的是不是同一个进程。 1.2 RPC至少有两个过程。调用方(client),被调用方(server)。 1.3 client主动发起请求,调用指定ip和port的server中的方法,把调用结果返回给client。 1.4 RPC是hadoop构建的基础。
示例:
1 package rpc;2 3 import org.apache.hadoop.ipc.VersionedProtocol;4 5 public interface MyBizable extends VersionedProtocol{6 long VERSION = 2345L;7 public abstract String hello(String name);8 }
1 package rpc; 2 3 import java.io.IOException; 4 5 public class MyBiz implements MyBizable{ 6 7 public long getProtocolVersion(String arg0, long arg1) throws IOException { 8 return VERSION; 9 }10 11 public String hello(String name) {12 System.out.println("方法被调用了(检测方法是不是在服务器上被调用的?)");13 return "hello "+name;14 }15 16 }
1 package rpc; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.ipc.RPC; 5 import org.apache.hadoop.ipc.RPC.Server; 6 7 public class MyServer { 8 static final String ADDRESS = "localhost"; 9 static final int PORT = 12345;10 public static void main(String[] args) throws Exception {11 /**12 * 构造一个RPC的服务端13 * @param instance 这个实例中的方法会被调用14 * @param bindAddress 绑定的地址是用于监听连接的15 * @param port 绑定的端口是用于监听连接的16 * @pparam conf17 */18 Server server = RPC.getServer(new MyBiz(), ADDRESS, PORT, new Configuration());19 server.start();20 }21 22 }
1 package rpc; 2 3 import java.net.InetSocketAddress; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.ipc.RPC; 7 8 public class MyClient { 9 public static void main(String[] args) throws Exception {10 /**11 * 构造一个客户端代理对象,该代理对象实现了命名的协议。代理对象会与指定地址的服务器通话12 */13 MyBizable proxy = (MyBizable) RPC.waitForProxy(MyBizable.class,14 MyBizable.VERSION, new InetSocketAddress(MyServer.ADDRESS,15 MyServer.PORT), new Configuration());16 String result = proxy.hello("hadoop!!!");17 System.out.println("客户端RPC后的结果:" + result);18 // 关闭网络连接19 RPC.stopProxy(proxy);20 }21 }
通过例子获得的认识
2.1 RPC是一个远程过程调用。 2.2 客户端调用服务端的方法,意味着调用服务端的对象中的方法。 2.3 如果服务端的对象允许客户端调用,那么这个对象必须实现接口。 2.4 如果客户端能够调用到服务端对象的方法,那么这些方法一定位于对象的接口中。