手撕Java之Rpc调用

  • 时间:
  • 浏览:0
  • 来源:5分排列3_5分排列3官网

package echorpc;

import java.net.InetSocketAddress;

public class echorpc {

	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		new Thread(new Runnable() {
			@Override
			public void run() {
				try {
					RpcExporter.exporter("localhost", 3088);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
		).start();
		RpcImporter<EchoService> importer = new RpcImporter<EchoService>();
		EchoService echo = importer.importer(EchoServiceImpl.class, 
				new InetSocketAddress("localhost", 3088));
		System.out.println(echo.echo("Are you all ok?"));

	}

}

向更高的服务架构技能进发!

参照《分布式服务框架原理与实践-李林锋》的书,

代码调出功能来了。

颤抖吧,老IT!

package echorpc;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class RpcExporter {
	static Executor executor =
			Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
	
	public static void exporter(String hostName, int port) throws Exception {
		ServerSocket server = new ServerSocket();
		server.bind(new InetSocketAddress(hostName, port));
		try {
			while(true) {
				executor.execute(new ExporterTask(server.accept()));
			}
		} finally {
			server.close();
		}
	}
	
	private static class ExporterTask  implements Runnable{ 
		Socket client = null;
		public ExporterTask(Socket client) {
			this.client = client;
		}
		
		@Override
		public void run() {
			ObjectInputStream input = null;
			ObjectOutputStream output = null;
			
			try {
				input = new ObjectInputStream(client.getInputStream());
				String interfaceName = input.readUTF();
				Class<?> service = Class.forName(interfaceName);
				String methodName = input.readUTF();
				Class<?>[] parameterTypes = (Class<?>[])input.readObject();
				Object[] arguments = (Object[])input.readObject();
				Method method = service.getMethod(methodName, parameterTypes);
				Object result = method.invoke(service.newInstance(),arguments);
				output = new ObjectOutputStream(client.getOutputStream());
				output.writeObject(result);
			} catch(Exception e) {
				e.printStackTrace();
			} finally {
				if (output != null)
					try {
						output.close();
					} catch(IOException e) {
						e.printStackTrace();
					}
				if (input != null)
					try {
						input.close();
					} catch(IOException e) {
						e.printStackTrace();
					}
				if (client != null)
					try {
						client.close();
					} catch(IOException e) {
						e.printStackTrace();
					}
			}
			
		}

	}

}




package echorpc;

public class EchoServiceImpl implements EchoService{
	@Override
	public String echo(String ping) {
		return ping != null ? ping + " --> I am ok." : " I am ok.";
	}

}

package echorpc;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

public class RpcImporter<S> {
	@SuppressWarnings("unchecked")
	public S importer(final Class<?> serviceClass, final InetSocketAddress addr) {
		return (S) Proxy.newProxyInstance(serviceClass.getClassLoader(), 
				new Class<?>[] {serviceClass.getInterfaces()[0]},
				new InvocationHandler() {

					@Override
					public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
						Socket socket = null;
						ObjectOutputStream output = null;
						ObjectInputStream input = null;
						try {
							socket = new Socket();
							socket.connect(addr);
							output = new ObjectOutputStream(socket.getOutputStream());
							output.writeUTF(serviceClass.getName());
							output.writeUTF(method.getName());
							output.writeObject(method.getParameterTypes());
							output.writeObject(args);
							input = new ObjectInputStream(socket.getInputStream());
							return input.readObject();
						} finally {
							if (socket != null)
								socket.close();
							if (input != null)
								input.close();
							if (output != null)
								output.close();
							
						}
						
					}
					
		});
	}

}