Socket 回调消息分发器

1. 实现回调消息抽象基类

/** * @Description: 回调消息抽象基类 */public abstract class SuperCallBack {    /**     * 随机字符串,识别服务器应答消息的唯一标识     */    private String callBackId;    /**     * 构造方法     *     * @param callBackId 识别服务器应答消息的唯一标识     */    public SuperCallBack(String callBackId) {        this.callBackId = callBackId;    }    /**     * 获取回调ID     *     * @return     */    public String getCallBackId() {        return callBackId;    }    /**     * 开始     */    public abstract void onStart();    /**     * 完成     */    public abstract void onCompleted();    /**     * 错误     *     * @param e 异常信息     */    public abstract void onError(Exception e);    /**     * 返回消息内容     *     * @param data 原始数据     */    public abstract void onResponse(OriginReadData data);    /**     * 成功     *     * @param data 原始数据     */    public void onSuccess(OriginReadData data) {        onCompleted();        onResponse(data);    }}

2. 实现延时队列的Item,

/** * @Description: 延时队列的 Item */public class TimeOutItem implements Delayed {    public String callBackId; // 当前 callBack 的 callBackId    private long executeTime; // 触发时间    public TimeOutItem(String callBackId, long delayTime, TimeUnit timeUnit) {        this.callBackId = callBackId;        this.executeTime = System.currentTimeMillis() + (delayTime > 0 ? timeUnit.toMillis(delayTime) : 0);        //Log.i(TAG, " executeTime: " + executeTime + " timeUnit: " + timeUnit.toMillis(delayTime));    }    /**     * 获取剩余时间     * 需要实现的接口,获得延迟时间   用过期时间-当前时间     *     * @param unit the time unit     * @return     */    @Override    public long getDelay(TimeUnit unit) {        return executeTime - System.currentTimeMillis();    }    /**     * 比较延时,队列里元素的排序依据     * 用于延迟队列内部比较排序   当前时间的延迟时间 - 比较对象的延迟时间     *     * @param o the object to be compared.     * @return     */    @Override    public int compareTo(Delayed o) {        //Log.i(TAG, " surplusTime: " + surplusTime + " currentTime: " + currentTime + " compare: " + compare);        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));    }}

3. 创建 Socket 回调功能,实现 callBackID 作为唯一标识抽象类,

/** * @Description: 实现 Socket 回调功能,必须实现此工厂类,callBackID 作为回调消息的唯一标识 */public abstract class CallBackIDFactory {    /**     * 返回 callBackID     *     * @param data     * @return 如果没有 callBackID 请返回 null     */    public abstract String getCallBackID(OriginReadData data);}

4. 创建请求超时异常类,

/** * @Description: 请求超时异常 */public class RequestTimeOutException extends Exception {    public RequestTimeOutException(String message) {        super(message);    }}

5. 回调消息时,添加 Socket 配置参数

5.1 实现回调功能需要的 callBackID 工厂

5.2 请求超时时间

​​​​5.3 是否开启请求超时检测

/** * @Description: Socket 配置参数 */public class IOSocketOptions {    /**     * 是否为调试模式,默认为 true     */    private static boolean isDebug = true;    /**     * Socket 主机地址     */    private SocketAddress socketAddress;    /**     * Socket 备用主机地址     */    private SocketAddress backupAddress;    /**     * 写入 Socket 字节时的字节序     */    private ByteOrder writeOrder;    /**     * 读取 Socket 字节时的字节序     */    private ByteOrder readOrder;    /**     * 写数据时单个数据包的最大值 默认 100     */    private int maxWriteBytes;    /**     * 读取数据时,单次读取最大缓存值,默认 50,数值越大效率越高,但是系统消耗也越大     */    private int maxReadBytes;    /**     * 从 Socket 读取数据时,遵从的数据包结构协议,在业务层进行定义     */    private IMessageProtocol messageProtocol;    /**     * 服务器返回数据的最大值 (单位Mb) ,防止客服端内存溢出     */    private int maxResponseDataMb;    /**     * 连接超时时间(单位毫秒)     */    private int connectTimeout;    /**     * 是否重连 Socket,默认为 true     */    private boolean isReConnection;    /**     * Socket 重连管理器     */    private AbsReConnection reConnectionManager;    /**     * Socket 工厂     */    private SocketFactory socketFactory;    /**     * Socket 安全套接字协议相关配置     */    private SocketSSLConfig socketSSLConfig;    /**     * 实现回调功能需要 callBackID,而 callBackID 是保存在发送消息和应答消息中的,此工厂用来获取 Socket 消息中     * 保存 callBackID 值的 key,比如 JSON 格式中的 key-value 中的 key     */    private CallBackIDFactory callBackIDFactory;    /**     * 请求超时时间,单位毫秒,默认 十秒     */    private long requestTimeOut;    /**     * 是否开启请求超时检测     */    private boolean isOpenRequestTimeOut;    /**     * IO 字节流的编码方式,默认 UTF-8     */    private Charset charsetName;    /**     * 静态内部类     */    public static class Builder {        IOSocketOptions socketOptions;        // 首先获取一个默认的配置        public Builder() {            this(getDefaultOptions());        }        public Builder(IOSocketOptions defaultOptions) {            socketOptions = defaultOptions;        }        /**         * 设置 Socket 主机地址         *         * @param socketAddress         * @return         */        public Builder setSocketAddress(SocketAddress socketAddress) {            socketOptions.socketAddress = socketAddress;            return this;        }        /**         * 设置 Socket 备用主机地址         *         * @param backupAddress         * @return         */        public Builder setBackupAddress(SocketAddress backupAddress) {            socketOptions.backupAddress = backupAddress;            return this;        }        /**         * 设置服务器返回数据的允许的最大值,单位兆/Mb         *         * @param maxResponseDataMb         * @return         */        public Builder setMaxResponseDataMb(int maxResponseDataMb) {            socketOptions.maxResponseDataMb = maxResponseDataMb;            return this;        }        /**         * 设置连接超时时间(单位毫秒)         *         * @param connectTimeout         * @return         */        public Builder setConnectTimeout(int connectTimeout) {            socketOptions.connectTimeout = connectTimeout;            return this;        }        /**         * 是否重连 Socket         *         * @param reConnection         * @return         */        public Builder setReConnection(boolean reConnection) {            socketOptions.isReConnection = reConnection;            return this;        }        /**         * 设置 Socket 重连管理器         *         * @param reConnectionManager         * @return         */        public Builder setReConnectionManager(AbsReConnection reConnectionManager) {            socketOptions.reConnectionManager = reConnectionManager;            return this;        }        /**         * 自定义创建 socket 工厂         *         * @param socketFactory         */        public Builder setSocketFactory(SocketFactory socketFactory) {            socketOptions.socketFactory = socketFactory;            return this;        }        /**         * 安全套接字协议的配置         *         * @param socketSSLConfig         * @return         */        public Builder setSocketSSLConfig(SocketSSLConfig socketSSLConfig) {            socketOptions.socketSSLConfig = socketSSLConfig;            return this;        }        /**         * 设置请求 ack 回调功能的工厂         *         * @param callBackIDFactory         * @return         */        public Builder setCallBackIDFactory(CallBackIDFactory callBackIDFactory) {            socketOptions.callBackIDFactory = callBackIDFactory;            return this;        }        /**         * 设置请求超时时间         *         * @param requestTimeOut 毫秒         */        public Builder setRequestTimeOut(long requestTimeOut) {            socketOptions.requestTimeOut = requestTimeOut;            return this;        }        /**         * 设置是否开启请求超时的检测         *         * @param openRequestTimeOut         */        public Builder setOpenRequestTimeOut(boolean openRequestTimeOut) {            socketOptions.isOpenRequestTimeOut = openRequestTimeOut;            return this;        }        /**         * 设置 Socket 写字节时的字节序         *         * @param writeOrder         */        public Builder setWriteOrder(ByteOrder writeOrder) {            socketOptions.writeOrder = writeOrder;            return this;        }        /**         * 设置 Socket 读取字节时的字节序         *         * @param readOrder         * @return         */        public Builder setReadOrder(ByteOrder readOrder) {            socketOptions.readOrder = readOrder;            return this;        }        /**         * 设置写数据时,单个数据包的最大值         *         * @param maxWriteBytes         * @return         */        public Builder setMaxWriteBytes(int maxWriteBytes) {            socketOptions.maxWriteBytes = maxWriteBytes;            return this;        }        /**         * 设置读取数据时,单次读取最大缓存值         *         * @param maxReadBytes         * @return         */        public Builder setMaxReadBytes(int maxReadBytes) {            socketOptions.maxReadBytes = maxReadBytes;            return this;        }        /**         * 设置读取数据的数据结构协议         *         * @param messageProtocol         */        public Builder setMessageProtocol(IMessageProtocol messageProtocol) {            socketOptions.messageProtocol = messageProtocol;            return this;        }        /**         * 设置 IO 字节流的编码方式,默认 UTF-8         *         * @param charsetName         * @return         */        public Builder setCharsetName(Charset charsetName) {            socketOptions.charsetName = charsetName;            return this;        }        public IOSocketOptions build() {            return socketOptions;        }    }    /**     * 获取默认的配置     */    public static IOSocketOptions getDefaultOptions() {        IOSocketOptions options = new IOSocketOptions();        options.socketAddress = null;        options.backupAddress = null;        options.isReConnection = true;// 是否重连主机,默认为 true        options.maxWriteBytes = 100;        options.maxReadBytes = 50;        options.writeOrder = ByteOrder.BIG_ENDIAN;// 大端序        options.readOrder = ByteOrder.BIG_ENDIAN;// 大端序        options.messageProtocol = null;// 默认数据包结构协议为 null        options.maxResponseDataMb = 5;// 默认接收最大值 5MB        options.reConnectionManager = new DefaultReConnection(); // 默认 Socket 重连器        options.connectTimeout = 10 * 1000;// 连接超时默认 5 秒        options.socketFactory = null;        options.socketSSLConfig = null;        options.callBackIDFactory = null;// 实现回调功能需要的 callBackID 工厂        options.requestTimeOut = 10 * 1000;// 请求超时时间,默认10秒        options.isOpenRequestTimeOut = true;// 是否开启请求超时检测,默认 true 开启        options.charsetName = StandardCharsets.UTF_8;        return options;    }    public Charset getCharsetName() {        return charsetName;    }    public boolean isIsDebug() {        return isDebug;    }    public SocketAddress getSocketAddress() {        return socketAddress;    }    public SocketAddress getBackupAddress() {        return backupAddress;    }    public ByteOrder getWriteOrder() {        return writeOrder;    }    public int getMaxWriteBytes() {        return maxWriteBytes;    }    public ByteOrder getReadOrder() {        return readOrder;    }    public int getMaxReadBytes() {        return maxReadBytes;    }    public IMessageProtocol getMessageProtocol() {        return messageProtocol;    }    public int getMaxResponseDataMb() {        return maxResponseDataMb;    }    public int getConnectTimeout() {        return connectTimeout;    }    public boolean isReConnection() {        return isReConnection;    }    public AbsReConnection getReConnectionManager() {        return reConnectionManager;    }    public SocketFactory getSocketFactory() {        return socketFactory;    }    public SocketSSLConfig getSocketSSLConfig() {        return socketSSLConfig;    }    public CallBackIDFactory getCallBackIDFactory() {        return callBackIDFactory;    }    public long getRequestTimeOut() {        return requestTimeOut;    }    public boolean isOpenRequestTimeOut() {        return isOpenRequestTimeOut;    }    public static void setIsDebug(boolean isDebug) {        IOSocketOptions.isDebug = isDebug;    }}

6. 发送消息时,定义接口及抽象类

6.1 发送消息接口类,

/** * @Description: 发送消息的接口 */public interface ISender extends Serializable {}

6.2 发送消息基础类,

/** * @Description: 发送消息基础类 */public class SuperSender implements ISender {}

6.3 发送消息抽象类,

/** * @Description: 发送消息抽象类 */public abstract class SuperCallBackSender extends SuperSender {    /**     * 随机回调标识 callbackId     */    private String callbackId;    public SuperCallBackSender() {        generateCallBackId();    }    public String getCallbackId() {        return callbackId;    }    /**     * 根据自己的协议打包消息     *     * @return     */    public abstract byte[] pack();    /**     * 随机生成一个回调标识 CallBackId,在消息发送前执行,CallBackId 作为消息的唯一标识一起传给服务器,     * 服务器反馈当前消息的时候,也是携带同样的 CallBackId 给客户端,用以识别调用的方法     */    public void generateCallBackId() {        callbackId = Util.getRandomChar(20);    }}

7. 工具类添加,生成随机数和非空检测

/** * @Description: 工具类 */public class Util {    /**     * 判断字符串是否为空     *     * @param str     * @return     */    public static boolean isStringEmpty(String str) {        return str == null || str.trim().length() == 0;    }    /**     * 拼接连个 byte 数组     *     * @param byte1     * @param byte2     * @return     */    public static byte[] concatBytes(byte[] byte1, byte[] byte2) {        if (byte1 == null) {            return byte2;        }        if (byte2 == null) {            return byte1;        }        byte[] byte3 = new byte[byte1.length + byte2.length];        System.arraycopy(byte1, 0, byte3, 0, byte1.length);        System.arraycopy(byte2, 0, byte3, byte1.length, byte2.length);        return byte3;    }    /**     * 生成随机字符串     *     * @param length 长度     * @return     */    public static String getRandomChar(int length) {        char[] chars = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F',                'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z'};        Random random = new Random();        StringBuilder builder = new StringBuilder();        for (int i = 0; i < length; i++) {            builder.append(chars[random.nextInt(36)]);        }        return builder.toString();    }    /**     * 非空检测     *     * @param object     * @param msg     */    public static void checkNotNull(Object object, String msg) {        try {            if (object == null) throw new Exception(msg);        } catch (Exception e) {            throw new RuntimeException(e);        }    }}

8. 创建回调消息分发器,

/** * @Description: 回调消息分发器 */public class CallbackResponseDispatcher {    /**     * 保存发送的每个回调消息监听示例,key 为回调标识 callBackId,这样有回调消息反馈的时候,     * 就可以找到并调用对应的监听对象     */    private Map callBacks = new HashMap();    /**     * 保存需要进行超时检测的请求,这是一个延时队列,元素超时的时候会被取出来     */    private DelayQueue timeOutItemQueue = new DelayQueue();    /**     * 超时检测的线程管理器     */    private ExecutorService timeOutExecutor;    /**     * Socket 连接管理器     */    IConnectionManager connectionManager;    /**     * Socket 配置参数     */    IOSocketOptions socketOptions;    public CallbackResponseDispatcher(IConnectionManager connectionManager) {        this.connectionManager = connectionManager;        //this.socketOptions = connectionManager.getOptions();        //注册 Socket 行为监听        //connectionManager.subscribeSocketAction(socketActionListener);    }    /**     * 设置 Socket 配置参数     *     * @param socketOptions     */    public void setSocketOptions(IOSocketOptions socketOptions) {        this.socketOptions = socketOptions;    }    /**     * Socket 行为监听,重写反馈消息的回调方法     */    private SocketActionListener socketActionListener = new SocketActionListener() {        @Override        public void onSocketResponse(SocketAddress socketAddress, OriginReadData originReadData) {            // 回调消息集合为 0            if (callBacks.size() == 0) return;            // 实现回调功能需要的 callBackID 工厂,拿到 callBackID 的 key            if (socketOptions.getCallBackIDFactory() == null) return;            // 获取回调的ID            String callBackID = socketOptions.getCallBackIDFactory().getCallBackID(originReadData);            if (callBackID != null) {                // 获取 callBackID 对应的 callBack                SuperCallBack callBack = callBacks.get(callBackID);                if (callBack != null) {                    //回调消息                    callBack.onSuccess(originReadData);                    //移除完成任务的 callBack                    callBacks.remove(callBackID);                    LogUtil.i("移除的 callBackID: " + callBackID);                }            }        }    };    /**     * 请求超时检测线程     */    public void engineThread() {        if (timeOutExecutor == null || timeOutExecutor.isShutdown()) {            timeOutExecutor = Executors.newSingleThreadExecutor();            timeOutExecutor.execute(new Runnable() {                @Override                public void run() {                    try {                        // 超时的元素会被取出,没有元素将等待                        TimeOutItem item = timeOutItemQueue.take();                        if (item != null) {                            SuperCallBack callBack = callBacks.remove(item.callBackId);                            if (callBack != null) {                                callBack.onError(new RequestTimeOutException("Request timeout"));                            }                        }                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                    //继续循环执行                    if (timeOutExecutor != null && !timeOutExecutor.isShutdown()) {                        run();                    }                }            });        }    }    /**     * 每发一条回调消息,都要在这里添加监听对象     *     * @param superCallBack     */    public void addSocketCallBack(SuperCallBack superCallBack) {        //存放回调消息监听器        callBacks.put(superCallBack.getCallBackId(), superCallBack);        //放入延时队列中        //TODO        long delayTime = socketOptions == null ? 2 * 1000 : socketOptions.getRequestTimeOut();        //回调消息监听添加到延时队列        timeOutItemQueue.add(new TimeOutItem(superCallBack.getCallBackId(), delayTime, TimeUnit.MILLISECONDS));    }    /**     * 移除 CallBack 请求     */    public void removeSocketCallBack(String callBackKey){        callBacks.remove(callBackKey);    }    /**     * 同一个消息发送多次, callBackId 是不能一样的,所以这里要先 check 一下,否则服务端反馈的时候,客户端接收就会乱套     *     * @param callBackSender     */    public void checkCallBackSender(SuperCallBackSender callBackSender) {        Util.checkNotNull(socketOptions.getCallBackIDFactory(), "要想实现 Socket 的回调功能,CallBackIDFactory 不能为 null," +                "请实现一个 CallBackIDFactory 并在初始化的时候通过 SocketOptions 的 setCallBackIDFactory 进行配置");        String callBackId = callBackSender.getCallbackId();        // 同一个消息发送两次以上,callBackId 是不能一样的,否则服务端反馈的时候,客户端接收就会乱套        if (callBacks.containsKey(callBackId)) {            callBackSender.generateCallBackId();        }    }    /**     * 关闭线程     */    public void shutdownThread() {        if (timeOutExecutor != null && !timeOutExecutor.isShutdown()) {            // shutdown 和 shutdownNow 的区别,前者中断未执行的线程,后者中断所有线程            timeOutExecutor.shutdownNow();            timeOutExecutor = null;        }    }}

9. 测试方法

    //测试方法    public void initView(View view) {        responseDispatcher = new CallbackResponseDispatcher(null);        String callBackId = Util.getRandomChar(20);        view.findViewById( -> {            responseDispatcher.engineThread();            responseDispatcher.addSocketCallBack(new SimpleCallBack(callBackId) {                @Override                public void onResponse(OriginReadData data) {                }            });        });        view.findViewById( -> {            responseDispatcher.removeSocketCallBack(callBackId);        });        view.findViewById( -> responseDispatcher.shutdownThread());    }

© 版权声明