rpc框架搭建
consumer 消费者应用
provider 提供的服务
Provider-common 公共类模块
rpc 架构
service-Registration 服务发现
nacos nacos配置中心
load-balancing 负载均衡
redis-trench 手写redis实现和链接
package com.trench.protocol;import com.trench.enumUtil.RedisRepEnum;import redis.clients.jedis.util.SafeEncoder;import java.io.IOException;import java.io.OutputStream;import java.nio.charset.StandardCharsets;import java.util.Arrays;public class Protocol {public static finalString DOLLAR="$";public static final String STAR="*";public static final String BLANK="\r\n";public static voidsendCommand(OutputStream outputStream, RedisRepEnum redisRepEnum,byte [] ... args){StringBuffer str=new StringBuffer();str.append(STAR).append(args.length-1).append(BLANK);str.append(DOLLAR).append(redisRepEnum.name().length()).append(BLANK);str.append(redisRepEnum).append(BLANK);Arrays.stream(args).forEach(arg->{str.append(DOLLAR).append(arg.length).append(BLANK);str.append(new String(arg)).append(BLANK);});try {outputStream.write(str.toString().getBytes(StandardCharsets.UTF_8));} catch (IOException e) {e.printStackTrace();}}public static final byte[] toByteArray(long value) {return SafeEncoder.encode(String.valueOf(value));}}
package com.trench.api;import com.trench.connection.Connetion;import com.trench.enumUtil.RedisRepEnum;import com.trench.protocol.Protocol;import com.trench.util.SerializeUtils;import redis.clients.jedis.BuilderFactory;import java.nio.charset.StandardCharsets;import java.util.List;import java.util.Set;public class Client {Connetion connetion;public Client(String host,Integer port){connetion=new Connetion(port,host);}public void set(finalString key, final String values){connetion.sendCommand( RedisRepEnum.SET,key.getBytes(StandardCharsets.UTF_8), SerializeUtils.serialize(values));}public Object get(final String key){connetion.sendCommand(RedisRepEnum.GET,key.getBytes(StandardCharsets.UTF_8));return connetion.getData();}public voiddelete(final String key){connetion.sendCommand(RedisRepEnum.GETDEL,key.getBytes(StandardCharsets.UTF_8));}//封装redis的过期时间public void expire(String key, long seconds){connetion.sendCommand(RedisRepEnum.EXISTS, key.getBytes(StandardCharsets.UTF_8), Protocol.toByteArray(seconds));}//是否存在keypublic boolean exists(final String key) {connetion.sendCommand(RedisRepEnum.EXISTS, key.getBytes(StandardCharsets.UTF_8));return (Long)connetion.getData()==1L;}//查找key中set包含public Set<String> keys(final String key){connetion.sendCommand(RedisRepEnum.KEYS,key.getBytes(StandardCharsets.UTF_8));return (Set) BuilderFactory.STRING_SET.build((List)connetion.getData());}}
rpc框架核心代码
package com.trench.protocol;import com.trench.SerializeUtils;import com.trench.frawork.Invocation;import com.trench.nacos.dome.NacosHttp;import org.apache.commons.io.IOUtils;import java.io.*;import java.net.HttpURLConnection;import java.net.MalformedURLException;import java.net.URL;import java.nio.charset.StandardCharsets;public class HttpClient {public String send(String hostName, Integer port, Invocation invocation) throws IOException {//读取nacos中的配置用户的请求方式。如http POST get等NacosHttp nacosHttp=new NacosHttp();try {URL url=new URL(nacosHttp.getHttp(),hostName,port,nacosHttp.getFile());HttpURLConnection httpURLConnection=(HttpURLConnection)url.openConnection();httpURLConnection.setRequestMethod(nacosHttp.getRequestMethod());httpURLConnection.setDoOutput(true);//配置OutputStream outputStream=httpURLConnection.getOutputStream();ObjectOutputStream oss = new ObjectOutputStream(outputStream);oss.writeObject(SerializeUtils.serialize(invocation));oss.flush();oss.close();InputStream inputStream = httpURLConnection.getInputStream(); return (String) SerializeUtils.deSerialize(IOUtils.toString(inputStream).getBytes(StandardCharsets.UTF_8));} catch (MalformedURLException e) { throw e;} catch (IOException e) { throw e;}}}
启动tomcatpackage com.trench.protocol;import org.apache.catalina.*;import org.apache.catalina.connector.Connector;import org.apache.catalina.core.StandardContext;import org.apache.catalina.core.StandardEngine;import org.apache.catalina.core.StandardHost;import org.apache.catalina.startup.Tomcat;public class HttpServer {public void start(String hostname,Integer port){//读取用户配置Tomcat tomcat=new Tomcat();Server server = tomcat.getServer();Service service = server.findService("Tomcat");Connector connector=new Connector();connector.setPort(port);Engine engine=new StandardEngine();engine.setDefaultHost(hostname);Host host=new StandardHost();host.setName(hostname);String contextPash="";Context context=new StandardContext();context.setPath(contextPash);context.addLifecycleListener(new Tomcat.FixContextListener());host.addChild(context);engine.addChild(host);service.setContainer(engine);service.addConnector(connector);tomcat.addServlet(contextPash,"dispatcher",new DispatcherServlet());context.addServletMappingDecoded("/*","dispatcher");try {tomcat.start();tomcat.getServer().await();}catch (LifecycleException e){e.printStackTrace();}}}
package com.trench.protocol;import com.trench.frawork.Invocation;import com.trench.register.LocalRegister;import jakarta.servlet.http.HttpServletRequest;import jakarta.servlet.http.HttpServletResponse;import org.apache.commons.io.IOUtils;import java.io.IOException;import java.io.ObjectInputStream;import java.lang.reflect.InvocationTargetException;import java.lang.reflect.Method;import java.nio.charset.StandardCharsets;public class HttpServerHandler extends HttpServer {public void handler(HttpServletRequest request, HttpServletResponse response){//可自行添加为心跳监听等操作//处理请求try {Invocation invocation = (Invocation)new ObjectInputStream(request.getInputStream()).readObject();String interfaceName =invocation.getInterfaceName();//接口名称Class aClass = LocalRegister.get(interfaceName,invocation.getVersion());Method method = aClass.getMethod(invocation.getMethodName(), invocation.getParameterTypes());String invoke = (String) method.invoke(aClass.newInstance(), invocation.getParameter());IOUtils.write(invoke.getBytes(StandardCharsets.UTF_8),response.getOutputStream());} catch (IOException e) {e.printStackTrace();}catch (ClassNotFoundException e){e.printStackTrace();} catch (NoSuchMethodException e) {e.printStackTrace();} catch (InvocationTargetException e) {e.printStackTrace();} catch (IllegalAccessException e) {e.printStackTrace();} catch (InstantiationException e) {e.printStackTrace();}}}
相关的gitub仓库地址:(https://github.com/zhaoyiwen-wuxian/RpcTrench.git) master分支,进行切换分支到master
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END