1. 实现回调消息抽象基类 SuperCallBack.java
/** * @Description: 回调消息抽象基类 */public abstract class SuperCallBack { /** * 随机字符串,识别服务器应答消息的唯一标识 */ private String callBackId; /** * 构造方法 * * @param callBackId 识别服务器应答消息的唯一标识 */ public SuperCallBack(String callBackId) { this.callBackId = callBackId; } /** * 获取回调ID * * @return */ public String getCallBackId() { return callBackId; } /** * 开始 */ public abstract void onStart(); /** * 完成 */ public abstract void onCompleted(); /** * 错误 * * @param e 异常信息 */ public abstract void onError(Exception e); /** * 返回消息内容 * * @param data 原始数据 */ public abstract void onResponse(OriginReadData data); /** * 成功 * * @param data 原始数据 */ public void onSuccess(OriginReadData data) { onCompleted(); onResponse(data); }}
2. 实现延时队列的Item,TimeOutItem.java
/** * @Description: 延时队列的 Item */public class TimeOutItem implements Delayed { public String callBackId; // 当前 callBack 的 callBackId private long executeTime; // 触发时间 public TimeOutItem(String callBackId, long delayTime, TimeUnit timeUnit) { this.callBackId = callBackId; this.executeTime = System.currentTimeMillis() + (delayTime > 0 ? timeUnit.toMillis(delayTime) : 0); //Log.i(TAG, " executeTime: " + executeTime + " timeUnit: " + timeUnit.toMillis(delayTime)); } /** * 获取剩余时间 * 需要实现的接口,获得延迟时间 用过期时间-当前时间 * * @param unit the time unit * @return */ @Override public long getDelay(TimeUnit unit) { return executeTime - System.currentTimeMillis(); } /** * 比较延时,队列里元素的排序依据 * 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间 * * @param o the object to be compared. * @return */ @Override public int compareTo(Delayed o) { //Log.i(TAG, " surplusTime: " + surplusTime + " currentTime: " + currentTime + " compare: " + compare); return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS)); }}
3. 创建 Socket 回调功能,实现 callBackID 作为唯一标识抽象类,CallBackIDFactory.java
/** * @Description: 实现 Socket 回调功能,必须实现此工厂类,callBackID 作为回调消息的唯一标识 */public abstract class CallBackIDFactory { /** * 返回 callBackID * * @param data * @return 如果没有 callBackID 请返回 null */ public abstract String getCallBackID(OriginReadData data);}
4. 创建请求超时异常类,RequestTimeOutException.java
/** * @Description: 请求超时异常 */public class RequestTimeOutException extends Exception { public RequestTimeOutException(String message) { super(message); }}
5. 回调消息时,添加 Socket 配置参数
5.1 实现回调功能需要的 callBackID 工厂
5.2 请求超时时间
5.3 是否开启请求超时检测
/** * @Description: Socket 配置参数 */public class IOSocketOptions { /** * 是否为调试模式,默认为 true */ private static boolean isDebug = true; /** * Socket 主机地址 */ private SocketAddress socketAddress; /** * Socket 备用主机地址 */ private SocketAddress backupAddress; /** * 写入 Socket 字节时的字节序 */ private ByteOrder writeOrder; /** * 读取 Socket 字节时的字节序 */ private ByteOrder readOrder; /** * 写数据时单个数据包的最大值 默认 100 */ private int maxWriteBytes; /** * 读取数据时,单次读取最大缓存值,默认 50,数值越大效率越高,但是系统消耗也越大 */ private int maxReadBytes; /** * 从 Socket 读取数据时,遵从的数据包结构协议,在业务层进行定义 */ private IMessageProtocol messageProtocol; /** * 服务器返回数据的最大值 (单位Mb) ,防止客服端内存溢出 */ private int maxResponseDataMb; /** * 连接超时时间(单位毫秒) */ private int connectTimeout; /** * 是否重连 Socket,默认为 true */ private boolean isReConnection; /** * Socket 重连管理器 */ private AbsReConnection reConnectionManager; /** * Socket 工厂 */ private SocketFactory socketFactory; /** * Socket 安全套接字协议相关配置 */ private SocketSSLConfig socketSSLConfig; /** * 实现回调功能需要 callBackID,而 callBackID 是保存在发送消息和应答消息中的,此工厂用来获取 Socket 消息中 * 保存 callBackID 值的 key,比如 JSON 格式中的 key-value 中的 key */ private CallBackIDFactory callBackIDFactory; /** * 请求超时时间,单位毫秒,默认 十秒 */ private long requestTimeOut; /** * 是否开启请求超时检测 */ private boolean isOpenRequestTimeOut; /** * IO 字节流的编码方式,默认 UTF-8 */ private Charset charsetName; /** * 静态内部类 */ public static class Builder { IOSocketOptions socketOptions; // 首先获取一个默认的配置 public Builder() { this(getDefaultOptions()); } public Builder(IOSocketOptions defaultOptions) { socketOptions = defaultOptions; } /** * 设置 Socket 主机地址 * * @param socketAddress * @return */ public Builder setSocketAddress(SocketAddress socketAddress) { socketOptions.socketAddress = socketAddress; return this; } /** * 设置 Socket 备用主机地址 * * @param backupAddress * @return */ public Builder setBackupAddress(SocketAddress backupAddress) { socketOptions.backupAddress = backupAddress; return this; } /** * 设置服务器返回数据的允许的最大值,单位兆/Mb * * @param maxResponseDataMb * @return */ public Builder setMaxResponseDataMb(int maxResponseDataMb) { socketOptions.maxResponseDataMb = maxResponseDataMb; return this; } /** * 设置连接超时时间(单位毫秒) * * @param connectTimeout * @return */ public Builder setConnectTimeout(int connectTimeout) { socketOptions.connectTimeout = connectTimeout; return this; } /** * 是否重连 Socket * * @param reConnection * @return */ public Builder setReConnection(boolean reConnection) { socketOptions.isReConnection = reConnection; return this; } /** * 设置 Socket 重连管理器 * * @param reConnectionManager * @return */ public Builder setReConnectionManager(AbsReConnection reConnectionManager) { socketOptions.reConnectionManager = reConnectionManager; return this; } /** * 自定义创建 socket 工厂 * * @param socketFactory */ public Builder setSocketFactory(SocketFactory socketFactory) { socketOptions.socketFactory = socketFactory; return this; } /** * 安全套接字协议的配置 * * @param socketSSLConfig * @return */ public Builder setSocketSSLConfig(SocketSSLConfig socketSSLConfig) { socketOptions.socketSSLConfig = socketSSLConfig; return this; } /** * 设置请求 ack 回调功能的工厂 * * @param callBackIDFactory * @return */ public Builder setCallBackIDFactory(CallBackIDFactory callBackIDFactory) { socketOptions.callBackIDFactory = callBackIDFactory; return this; } /** * 设置请求超时时间 * * @param requestTimeOut 毫秒 */ public Builder setRequestTimeOut(long requestTimeOut) { socketOptions.requestTimeOut = requestTimeOut; return this; } /** * 设置是否开启请求超时的检测 * * @param openRequestTimeOut */ public Builder setOpenRequestTimeOut(boolean openRequestTimeOut) { socketOptions.isOpenRequestTimeOut = openRequestTimeOut; return this; } /** * 设置 Socket 写字节时的字节序 * * @param writeOrder */ public Builder setWriteOrder(ByteOrder writeOrder) { socketOptions.writeOrder = writeOrder; return this; } /** * 设置 Socket 读取字节时的字节序 * * @param readOrder * @return */ public Builder setReadOrder(ByteOrder readOrder) { socketOptions.readOrder = readOrder; return this; } /** * 设置写数据时,单个数据包的最大值 * * @param maxWriteBytes * @return */ public Builder setMaxWriteBytes(int maxWriteBytes) { socketOptions.maxWriteBytes = maxWriteBytes; return this; } /** * 设置读取数据时,单次读取最大缓存值 * * @param maxReadBytes * @return */ public Builder setMaxReadBytes(int maxReadBytes) { socketOptions.maxReadBytes = maxReadBytes; return this; } /** * 设置读取数据的数据结构协议 * * @param messageProtocol */ public Builder setMessageProtocol(IMessageProtocol messageProtocol) { socketOptions.messageProtocol = messageProtocol; return this; } /** * 设置 IO 字节流的编码方式,默认 UTF-8 * * @param charsetName * @return */ public Builder setCharsetName(Charset charsetName) { socketOptions.charsetName = charsetName; return this; } public IOSocketOptions build() { return socketOptions; } } /** * 获取默认的配置 */ public static IOSocketOptions getDefaultOptions() { IOSocketOptions options = new IOSocketOptions(); options.socketAddress = null; options.backupAddress = null; options.isReConnection = true;// 是否重连主机,默认为 true options.maxWriteBytes = 100; options.maxReadBytes = 50; options.writeOrder = ByteOrder.BIG_ENDIAN;// 大端序 options.readOrder = ByteOrder.BIG_ENDIAN;// 大端序 options.messageProtocol = null;// 默认数据包结构协议为 null options.maxResponseDataMb = 5;// 默认接收最大值 5MB options.reConnectionManager = new DefaultReConnection(); // 默认 Socket 重连器 options.connectTimeout = 10 * 1000;// 连接超时默认 5 秒 options.socketFactory = null; options.socketSSLConfig = null; options.callBackIDFactory = null;// 实现回调功能需要的 callBackID 工厂 options.requestTimeOut = 10 * 1000;// 请求超时时间,默认10秒 options.isOpenRequestTimeOut = true;// 是否开启请求超时检测,默认 true 开启 options.charsetName = StandardCharsets.UTF_8; return options; } public Charset getCharsetName() { return charsetName; } public boolean isIsDebug() { return isDebug; } public SocketAddress getSocketAddress() { return socketAddress; } public SocketAddress getBackupAddress() { return backupAddress; } public ByteOrder getWriteOrder() { return writeOrder; } public int getMaxWriteBytes() { return maxWriteBytes; } public ByteOrder getReadOrder() { return readOrder; } public int getMaxReadBytes() { return maxReadBytes; } public IMessageProtocol getMessageProtocol() { return messageProtocol; } public int getMaxResponseDataMb() { return maxResponseDataMb; } public int getConnectTimeout() { return connectTimeout; } public boolean isReConnection() { return isReConnection; } public AbsReConnection getReConnectionManager() { return reConnectionManager; } public SocketFactory getSocketFactory() { return socketFactory; } public SocketSSLConfig getSocketSSLConfig() { return socketSSLConfig; } public CallBackIDFactory getCallBackIDFactory() { return callBackIDFactory; } public long getRequestTimeOut() { return requestTimeOut; } public boolean isOpenRequestTimeOut() { return isOpenRequestTimeOut; } public static void setIsDebug(boolean isDebug) { IOSocketOptions.isDebug = isDebug; }}
6. 发送消息时,定义接口及抽象类
6.1 发送消息接口类,ISender.java
/** * @Description: 发送消息的接口 */public interface ISender extends Serializable {}
6.2 发送消息基础类,SuperSender.java
/** * @Description: 发送消息基础类 */public class SuperSender implements ISender {}
6.3 发送消息抽象类,SuperCallBackSender.java
/** * @Description: 发送消息抽象类 */public abstract class SuperCallBackSender extends SuperSender { /** * 随机回调标识 callbackId */ private String callbackId; public SuperCallBackSender() { generateCallBackId(); } public String getCallbackId() { return callbackId; } /** * 根据自己的协议打包消息 * * @return */ public abstract byte[] pack(); /** * 随机生成一个回调标识 CallBackId,在消息发送前执行,CallBackId 作为消息的唯一标识一起传给服务器, * 服务器反馈当前消息的时候,也是携带同样的 CallBackId 给客户端,用以识别调用的方法 */ public void generateCallBackId() { callbackId = Util.getRandomChar(20); }}
7. 工具类添加,生成随机数和非空检测
/** * @Description: 工具类 */public class Util { /** * 判断字符串是否为空 * * @param str * @return */ public static boolean isStringEmpty(String str) { return str == null || str.trim().length() == 0; } /** * 拼接连个 byte 数组 * * @param byte1 * @param byte2 * @return */ public static byte[] concatBytes(byte[] byte1, byte[] byte2) { if (byte1 == null) { return byte2; } if (byte2 == null) { return byte1; } byte[] byte3 = new byte[byte1.length + byte2.length]; System.arraycopy(byte1, 0, byte3, 0, byte1.length); System.arraycopy(byte2, 0, byte3, byte1.length, byte2.length); return byte3; } /** * 生成随机字符串 * * @param length 长度 * @return */ public static String getRandomChar(int length) { char[] chars = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'}; Random random = new Random(); StringBuilder builder = new StringBuilder(); for (int i = 0; i < length; i++) { builder.append(chars[random.nextInt(36)]); } return builder.toString(); } /** * 非空检测 * * @param object * @param msg */ public static void checkNotNull(Object object, String msg) { try { if (object == null) throw new Exception(msg); } catch (Exception e) { throw new RuntimeException(e); } }}
8. 创建回调消息分发器,CallbackResponseDispatcher.java
/** * @Description: 回调消息分发器 */public class CallbackResponseDispatcher { /** * 保存发送的每个回调消息监听示例,key 为回调标识 callBackId,这样有回调消息反馈的时候, * 就可以找到并调用对应的监听对象 */ private Map callBacks = new HashMap(); /** * 保存需要进行超时检测的请求,这是一个延时队列,元素超时的时候会被取出来 */ private DelayQueue timeOutItemQueue = new DelayQueue(); /** * 超时检测的线程管理器 */ private ExecutorService timeOutExecutor; /** * Socket 连接管理器 */ IConnectionManager connectionManager; /** * Socket 配置参数 */ IOSocketOptions socketOptions; public CallbackResponseDispatcher(IConnectionManager connectionManager) { this.connectionManager = connectionManager; //this.socketOptions = connectionManager.getOptions(); //注册 Socket 行为监听 //connectionManager.subscribeSocketAction(socketActionListener); } /** * 设置 Socket 配置参数 * * @param socketOptions */ public void setSocketOptions(IOSocketOptions socketOptions) { this.socketOptions = socketOptions; } /** * Socket 行为监听,重写反馈消息的回调方法 */ private SocketActionListener socketActionListener = new SocketActionListener() { @Override public void onSocketResponse(SocketAddress socketAddress, OriginReadData originReadData) { // 回调消息集合为 0 if (callBacks.size() == 0) return; // 实现回调功能需要的 callBackID 工厂,拿到 callBackID 的 key if (socketOptions.getCallBackIDFactory() == null) return; // 获取回调的ID String callBackID = socketOptions.getCallBackIDFactory().getCallBackID(originReadData); if (callBackID != null) { // 获取 callBackID 对应的 callBack SuperCallBack callBack = callBacks.get(callBackID); if (callBack != null) { //回调消息 callBack.onSuccess(originReadData); //移除完成任务的 callBack callBacks.remove(callBackID); LogUtil.i("移除的 callBackID: " + callBackID); } } } }; /** * 请求超时检测线程 */ public void engineThread() { if (timeOutExecutor == null || timeOutExecutor.isShutdown()) { timeOutExecutor = Executors.newSingleThreadExecutor(); timeOutExecutor.execute(new Runnable() { @Override public void run() { try { // 超时的元素会被取出,没有元素将等待 TimeOutItem item = timeOutItemQueue.take(); if (item != null) { SuperCallBack callBack = callBacks.remove(item.callBackId); if (callBack != null) { callBack.onError(new RequestTimeOutException("Request timeout")); } } } catch (InterruptedException e) { e.printStackTrace(); } //继续循环执行 if (timeOutExecutor != null && !timeOutExecutor.isShutdown()) { run(); } } }); } } /** * 每发一条回调消息,都要在这里添加监听对象 * * @param superCallBack */ public void addSocketCallBack(SuperCallBack superCallBack) { //存放回调消息监听器 callBacks.put(superCallBack.getCallBackId(), superCallBack); //放入延时队列中 //TODO long delayTime = socketOptions == null ? 2 * 1000 : socketOptions.getRequestTimeOut(); //回调消息监听添加到延时队列 timeOutItemQueue.add(new TimeOutItem(superCallBack.getCallBackId(), delayTime, TimeUnit.MILLISECONDS)); } /** * 移除 CallBack 请求 */ public void removeSocketCallBack(String callBackKey){ callBacks.remove(callBackKey); } /** * 同一个消息发送多次, callBackId 是不能一样的,所以这里要先 check 一下,否则服务端反馈的时候,客户端接收就会乱套 * * @param callBackSender */ public void checkCallBackSender(SuperCallBackSender callBackSender) { Util.checkNotNull(socketOptions.getCallBackIDFactory(), "要想实现 Socket 的回调功能,CallBackIDFactory 不能为 null," + "请实现一个 CallBackIDFactory 并在初始化的时候通过 SocketOptions 的 setCallBackIDFactory 进行配置"); String callBackId = callBackSender.getCallbackId(); // 同一个消息发送两次以上,callBackId 是不能一样的,否则服务端反馈的时候,客户端接收就会乱套 if (callBacks.containsKey(callBackId)) { callBackSender.generateCallBackId(); } } /** * 关闭线程 */ public void shutdownThread() { if (timeOutExecutor != null && !timeOutExecutor.isShutdown()) { // shutdown 和 shutdownNow 的区别,前者中断未执行的线程,后者中断所有线程 timeOutExecutor.shutdownNow(); timeOutExecutor = null; } }}
9. 测试方法
//测试方法 public void initView(View view) { responseDispatcher = new CallbackResponseDispatcher(null); String callBackId = Util.getRandomChar(20); view.findViewById(R.id.but_add).setOnClickListener(v -> { responseDispatcher.engineThread(); responseDispatcher.addSocketCallBack(new SimpleCallBack(callBackId) { @Override public void onResponse(OriginReadData data) { } }); }); view.findViewById(R.id.but_remove).setOnClickListener(v -> { responseDispatcher.removeSocketCallBack(callBackId); }); view.findViewById(R.id.but_stop).setOnClickListener(v -> responseDispatcher.shutdownThread()); }