好玩系列 | 如果后端让你用SSE接收多次返回,那我们改造Retrofit吧

前言

说来惭愧,最近都没写技术帖,每月写一篇原创的Flag也倒了,倒并非无内容可写,实则是因为懒惰, 虽然工作和生活牵扯了很多精力,但仍是有碎片化时间的。一篇关于DataBinding非常有趣的文章还在编写中,今天我们先看一篇轻松点的内容。

随着ChatGpt等生成式AI大火,SSE(Server Send Events)又回到了技术讨论当中,今天我们聊一聊,如果客户端需要获取服务端长耗时任务结果,有哪些实践方式

本篇博客将聚焦于4个方面:

  • 归纳获取服务端长耗时任务结果的常见实践方式
  • 对SSE进行详细的讨论,尤其是其协议细节
  • 实操:搭建SSE服务端、使用OKHttp搭建客户端
  • 探索:对Retrofit进行了分析,扩展Retrofit使用SSE

4种常见的实践方式

不外乎两种思路:

  • 客户端 polling
  • 服务端 push

4种方式.png

客户端polling

不难理解,服务端处理一项任务,只有服务端才清晰地知道任务的状态,客户端可以通过 周期性主动轮询 直至获取任务的最终结果。

按照此类实践方式,可使用 HTTP/HTTPs 协议。

short polling

客户端向服务端发起请求,如果服务端状态已经更新,返回结果并关闭链接;如果服务端状态尚未更新,返回一个特定的结果告知还在处理中,并关闭链接

long polling

short polling 类似,客户端向服务端发起请求,如果服务端状态已经更新,返回结果并关闭链接;

但如果服务端状态尚未更新,

  • 服务端会在超时时间内维持链接,进行等待,直至状态更新,返回结果并关闭链接;
  • 若等待至超时也未更新状态,返回一个特定的结果告知还在处理中,并关闭链接

服务端push

这类实践方式中,不需要客户端采取"问",而是当服务端状态更新时,自行通知客户端,典型的实践方式有:

  • Server Send Events (SSE)
  • WebSocket

SSE

SSE是本文的讨论核心,从服务端发往客户端的消息,存在一个限制,即仅可以发送 纯文本 类型的消息。

SSE基于http协议的持久连接,SSE 具有 W3C 标准化的网络协议和 EventSource 客户端接口,属于 HTML5 标准套件。

有兴趣的读者可以访问 扩展阅读 了解协议细节

WebSocket

定制性、扩展性最强的实践方案,有趣的话题实在太多,不做任何展开。

SSE协议细节

不难理解,SSE形如消息订阅,由客户端主动发起订阅,双方维持链接,服务端向客户端推送message。

Header主要部分

按照协议约定,MethodGET,并且需要在 Header 中包含以下内容:

  • Accept: text/event-stream 指定Media Type
  • Cache-Control: no-cache 不使用cache
  • Connection: keep-alive 使用持久性连接

以下是Header部分的示例:

GET /{path} HTTP/1.1
Accept: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

服务端的响应Header至少包含:

  • Content-Type: text/event-stream;charset=UTF-8 指定Media Type
  • Transfer-Encoding: chunked

以下是响应Header部分的示例:

HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked

Event-Stream 解析

event-stream的内容(结构和字符)需满足以下 ABNF 语法范式约定

stream        = [ bom ] *event
event         = *( comment / field ) end-of-line
comment       = colon *any-char end-of-line
field         = 1*name-char [ colon [ space ] *any-char ] end-of-line
end-of-line   = ( cr lf / cr / lf )

; characters
lf            = %x000A ; U+000A LINE FEED (LF)
cr            = %x000D ; U+000D CARRIAGE RETURN (CR)
space         = %x0020 ; U+0020 SPACE
colon         = %x003A ; U+003A COLON (:)
bom           = %xFEFF ; U+FEFF BYTE ORDER MARK
name-char     = %x0000-0009 / %x000B-000C / %x000E-0039 / %x003B-10FFFF
; a scalar value other than U+000A LINE FEED (LF), U+000D CARRIAGE RETURN (CR), or U+003A COLON (:)
any-char      = %x0000-0009 / %x000B-000C / %x000E-10FFFF
; a scalar value other than U+000A LINE FEED (LF) or U+000D CARRIAGE RETURN (CR)

需要逐行读取内容并进行解析,按照语法约定:end-of-line = ( cr lf / cr / lf ),CRLF、CR、LF都代表换行

  • 如果是空行,后续是新Event,可通俗理解为事件之间用空行分隔
  • 如果是以 : 冒号,UniCode编码 U+003A 开始,忽略此行
  • 如果包含 : 冒号,UniCode编码 U+003A,从第一个冒号开始分割
    • 冒号前的内容为 field
    • 冒号后的内容为 value,如果冒号后的第一个字符为 空格 U+0020,从 value 中移除
  • 非空但不包含 :, 内容全部作为 field, value 为空字符串

field的约定:

  • "event":其value为 event type
  • "data":其value为事件数据,解析后单独再尾部拼接 LF
  • "id":若其value不包含 U+0000 即NULL,值为事件的ID,否则忽略
  • "retry":其value若只包含ASCII码表示的数值,认为是十进制数解析为Int,作为毫秒级的重连时间,当链接断开,客户端应当在此时间后自动发起重连,否则忽略
  • 其他情况均忽略

Last-Event-ID

前文已经提到了如下内容:

  • Header
  • Event-Stream中field为"id"、"retry"

不难理解,当出现意外情况时,例如网络原因导致的链接断开,涉及到重连;显然,重连时一般不希望再获取已经收到的信息。

Header中存在 Last-Event-ID field约定,指定上次收到的Event的id,因Event有序,可避免冗余信息传输。

纸上得来终觉浅

至此,协议部分的主要内容已经讨论完毕,让我们进入愉快的Demo环节。

编写服务端

对部分读者而言,这部分可能有点超纲,但没有关系,已经准备好了Demo代码,各位只需要准备好Java环境和路由器即可。

代码仓库

在Demo中,准备了两条接口:

  • http://{ip}:8080/sse/mvc/words , 模拟多次生成文本段落并推送至客户端
  • http://{ip}:8080/sse/mvc/folder-watch , 模拟文件夹监听

启动服务后,可使用控制台和curl进行测试:

curl -v http://localhost:8080/sse/mvc/words

编写客户端

很显然,一个非新事物往往不需要普通开发者造轮子,OkHttp 已经支持SSE

不同于大家熟知的 Call, Okhttp-SSE中封装了 EventSource,并通过 Factory 创建实例、发起请求:

一个朴素地使用示例如下:

class Demo {
    void demoCode() {

        OkHttpClient okHttpClient = new OkHttpClient();
        EventSource.Factory factory = EventSources.createFactory(okHttpClient);

        Request.Builder builder = new Request.Builder().get().url(url);

        builder.addHeader("Content-Type", "text/event-stream")
                .addHeader("Accept-Encoding", "")
                .addHeader("Accept", "text/event-stream")
                .addHeader("Cache-Control", "no-cache");

        builder.addHeader("Last-Event-ID", "2");

        Request request = builder.build();

        factory.newEventSource(request, new EventSourceListener() {
            //ignore
        });
    }
}

请注意,如果将其作为 Call,将无法获得多次推送的效果,在不出现错误或超时的情况下,形如一次响应较慢的Get请求。

demo代码仓库

是否可以使用Retrofit

截至目前为止,Retrofit未对其进行适配,其实在多年前即展开过讨论,我找到了此条 讨论issue

Retrofit的设计原理并不复杂:

  • 通过运行时反射,建立动态代理,依据注解构建Okhttp的Call
  • 通过CallAdapter,将Call的结果处理方式,进行不同的适配

但设计之初,仅设计了 Retrofit.CallOkhttp.Call 进行Wrap,而 EventSource 并不继承 Okhttp.Call,当然,依据其特性也不应当继承自Call

鉴于此,若将 EventSource 强行包装为 Retrofit.Call 将会很容易引起错误,例如调用

  • Response<T> execute() throws IOException;
  • void enqueue(Callback<T> callback);

若在Retrofit中平行展开一条 EventSource 的处理逻辑,需要对库进行很多修改,并且考虑对生态的影响,并不划算。

但是使用者自己的行为是不受限的,自己结合项目情况开发,自己对其负责。

动手写Retrofit扩展库

正如我上文所言,直接在Retrofit中进行扩展需要考虑对生态的影响,读者诸君可以详细阅读Jake Wharton 的post,其设计构思在考虑 "一致性" 问题。

但如果在Retrofit之外进行扩展,则可以回避生态问题,即便有非一致性设计,也可以不用直面。

代码仓库 需切换到 retrofit-sse 分支!

作者按:读者诸君请注意,在编写此文时,我尚未有充足的时间投入扩展库的设计与编写,代码库中的Sample代码,仅可作为指导,仍存在设计盲点,不要直接投入商用。

很容易得出以下核心设计思路,可看导图或下文的文字说明,较为啰嗦

扩展库.png

  • 将连续的Event内容转变为可观测的Source,例如Rxjava的观测源、LiveData、kotlin协程的Flow
    • 内建 EventSourceListener 实现类,从回调触发源的更新
    • 定义 EventSourceAdapter,用于将 Okhttp-EventSource 转变为 可观测的Source
  • 需要存在一种机制,依据Interface中Method定义的返回类型,获取 EventSourceAdapter 实例
    • 应当有Factory、Factory注册池,亦可以考虑直接使用实例、实例注册池
    • 需要利用反射
    • Factory注册池或Adapter实例注册池需能区分目标类型
  • 需要能够创建(完整功能应当为管理EventSource
    • 需可注册 EventSource.Factory 实例
  • 存在一条扩展路径,符合SSE时,走新设计;否则走原Retrofit设计
    • 增加入口,例如 RetrofitSSE,承载以上设计,并包含 Retrofit
    • 扩展 ServiceMethod<?> loadServiceMethod(Method method),新增SSE情况的执行路径
    • 扩展 ServiceMethod,内建SSE情况的处理逻辑,即承载设计1、2

简要的代码如下:

内建的EventSourceListener -- 对应1

内建选择了Flow,可自行扩展其他,通过回调实现源的更新

class FlowAdapterEventListener(
    val channel: Channel<Event>,
) : EventSourceListener() {

    private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

    override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) {
        super.onEvent(eventSource, id, type, data)
        scope.launch {
            channel.send(Event(id, type, data))
        }
    }

    override fun onFailure(eventSource: EventSource, t: Throwable?, response: Response?) {
        super.onFailure(eventSource, t, response)
        scope.launch {
            channel.send(
                Event(null, null, "", t)
            )
            channel.close(t)
        }
    }
}

定义EventSourceAdapter -- 对应2

interface EventSourceAdapter<T> {
    fun adapt(request: Request, factory: EventSource.Factory): T

    abstract class Factory {
        abstract operator fun get(
            returnType: Type, annotations: Array<Annotation>, retrofitSSE: RetrofitSSE
        ): EventSourceAdapter<*>?
    }
}

内建EventSourceAdapter 和 Factory -- 对应2

这是一个示例,以内建选择的Flow情况为例,展示Factory能区分目标类型, Factory注册池遍历执行至非NULL返回时,认为得到目标类型的Factory

class FlowAdapter : EventSourceAdapter<Flow<Event>> {

        override fun adapt(request: Request, factory: EventSource.Factory): Flow<Event> {

            val channel = Channel<Event>()
            return channel.receiveAsFlow()
                .shareIn(CoroutineScope(Dispatchers.IO), SharingStarted.Eagerly)
                .onSubscription {
                    factory.newEventSource(
                        request, FlowAdapterEventListener(channel)
                    )
                }
        }

        companion object {

            val Factory = object : Factory() {
                override fun get(
                    returnType: Type,
                    annotations: Array<Annotation>,
                    retrofitSSE: RetrofitSSE
                ): EventSourceAdapter<*>? {
                    if (Flow::class.java.isAssignableFrom(getRawType(returnType))) {
                        return FlowAdapter()
                    }
                    return null
                }

            }
        }
    }

ServiceMethod扩展以及路径

新路径:

abstract class ServiceMethodV2<T> extends ServiceMethod<T> {

    static <T> ServiceMethod<T> parseAnnotationsV2(RetrofitSSE retrofit, Method method) {
        RequestFactory requestFactory = RequestFactory.parseAnnotations(retrofit.retrofit, method);

        Type returnType = method.getGenericReturnType();
        if (Utils.hasUnresolvableType(returnType)) {
            throw methodError(/*ignore*/);
        }
        if (returnType == void.class) {
            throw methodError(method, "Service methods cannot return void.");
        }

        return HttpServiceMethodV2.parseAnnotations(retrofit, method, requestFactory);
    }
}

扩展 ServiceMethod,内建SSE情况的处理逻辑,即承载设计1、2

详见:HttpServiceMethodV2,代码较多,不做展开,当满足SSE情况时,走新逻辑,否则回归至 HttpServiceMethod#parseAnnotations

新入口

  • 具备EventSource.Factory注册
  • 具备EventSourceAdapter.Factory池,以及识别目标的能力
  • 走新逻辑:ServiceMethodV2.parseAnnotationsV2(this, method)

public final class RetrofitSSE {
    private final Map<Method, ServiceMethod<?>> serviceMethodCache = new ConcurrentHashMap<>();

    final Retrofit retrofit;

    @NotNull
    final EventSource.Factory eventSourceFactory;

    final List<EventSourceAdapter.Factory> eventSourceAdapterFactories = new ArrayList<>();

    public RetrofitSSE(Retrofit retrofit, @NotNull EventSource.Factory eventSourceFactory) {
        this.retrofit = retrofit;
        this.eventSourceFactory = eventSourceFactory;
    }


    @SuppressWarnings("unchecked") // Single-interface proxy creation guarded by parameter safety.
    public <T> T create(final Class<T> service) {
        //ignore
    }

    public RetrofitSSE addEventSourceAdapterFactory(EventSourceAdapter.Factory factory) {
        eventSourceAdapterFactories.add(Objects.requireNonNull(factory, "factory == null"));
        return this;
    }

    ServiceMethod<?> loadServiceMethod(Method method) {
        ServiceMethod<?> result = serviceMethodCache.get(method);
        if (result != null) return result;

        synchronized (serviceMethodCache) {
            result = serviceMethodCache.get(method);
            if (result == null) {
                result = ServiceMethodV2.parseAnnotationsV2(this, method);
                serviceMethodCache.put(method, result);
            }
        }
        return result;
    }


    public EventSourceAdapter<?> eventSourceAdapter(Type returnType, Annotation[] annotations) {
        return nextEventSourceAdapter(null, returnType, annotations);
    }


    public EventSourceAdapter<?> nextEventSourceAdapter(
            @Nullable EventSourceAdapter.Factory skipPast, Type returnType, Annotation[] annotations) {
        Objects.requireNonNull(returnType, "returnType == null");
        Objects.requireNonNull(annotations, "annotations == null");

        int start = eventSourceAdapterFactories.indexOf(skipPast) + 1;
        for (int i = start, count = eventSourceAdapterFactories.size(); i < count; i++) {
            EventSourceAdapter<?> adapter = eventSourceAdapterFactories.get(i).get(returnType, annotations, this);
            if (adapter != null) {
                return adapter;
            }
        }

        throw new IllegalArgumentException(/*ignore*/);
    }
}

作者按:更多的代码细节,还请移步 代码仓库 需切换到 retrofit-sse 分支!

结语

又到了说再见的时候,这篇文章中,我们一同完成了:

  • 获取服务端长耗时任务结果的常见实践方式的归纳
  • 对SSE进行了详细的讨论,尤其是其协议细节
  • 搭建SSE服务端、使用OKHttp搭建客户端,进行实操
  • 对Retrofit进行了分析,探讨并发现了一条扩展路径,可通过Retrofit使用SSE

应当是一次有趣、好玩的历程!

在结尾,还是再次提醒,代码仓库 中Retrofit-SSE的代码,并未经过严谨的设计论证并进行充分的测试, 我仅花费了数小时时间实现博客内容所需的最少设计,势必会存在BUG和设计不健全的内容,不要直接商用,以免影响绩效

汇总链接如下:

SSE协议内容扩展阅读

ABNF语法RFC

服务端 SSE demo代码仓库

Retrofit关于SSE的讨论issue

Android代码仓库 注意分支