好玩系列 | 如果后端让你用SSE接收多次返回,那我们改造Retrofit吧
前言
说来惭愧,最近都没写技术帖,每月写一篇原创的Flag也倒了,倒并非无内容可写,实则是因为懒惰, 虽然工作和生活牵扯了很多精力,但仍是有碎片化时间的。一篇关于DataBinding非常有趣的文章还在编写中,今天我们先看一篇轻松点的内容。
随着ChatGpt等生成式AI大火,SSE(Server Send Events)又回到了技术讨论当中,今天我们聊一聊,如果客户端需要获取服务端长耗时任务结果,有哪些实践方式。
本篇博客将聚焦于4个方面:
- 归纳获取服务端长耗时任务结果的常见实践方式
- 对SSE进行详细的讨论,尤其是其协议细节
- 实操:搭建SSE服务端、使用OKHttp搭建客户端
- 探索:对Retrofit进行了分析,扩展Retrofit使用SSE
4种常见的实践方式
不外乎两种思路:
- 客户端 polling
- 服务端 push
客户端polling
不难理解,服务端处理一项任务,只有服务端才清晰地知道任务的状态,客户端可以通过 周期性主动轮询 直至获取任务的最终结果。
按照此类实践方式,可使用 HTTP/HTTPs
协议。
short polling
客户端向服务端发起请求,如果服务端状态已经更新,返回结果并关闭链接;如果服务端状态尚未更新,返回一个特定的结果告知还在处理中,并关闭链接
long polling
和 short polling
类似,客户端向服务端发起请求,如果服务端状态已经更新,返回结果并关闭链接;
但如果服务端状态尚未更新,
- 服务端会在超时时间内维持链接,进行等待,直至状态更新,返回结果并关闭链接;
- 若等待至超时也未更新状态,返回一个特定的结果告知还在处理中,并关闭链接
服务端push
这类实践方式中,不需要客户端采取"问",而是当服务端状态更新时,自行通知客户端,典型的实践方式有:
- Server Send Events (SSE)
- WebSocket
SSE
SSE是本文的讨论核心,从服务端发往客户端的消息,存在一个限制,即仅可以发送 纯文本
类型的消息。
SSE基于http协议的持久连接,SSE
具有 W3C 标准化的网络协议和 EventSource 客户端接口,属于 HTML5
标准套件。
有兴趣的读者可以访问 扩展阅读 了解协议细节
WebSocket
定制性、扩展性最强的实践方案,有趣的话题实在太多,不做任何展开。
SSE协议细节
不难理解,SSE形如消息订阅,由客户端主动发起订阅,双方维持链接,服务端向客户端推送message。
Header主要部分
按照协议约定,Method
为 GET
,并且需要在 Header
中包含以下内容:
- Accept: text/event-stream 指定Media Type
- Cache-Control: no-cache 不使用cache
- Connection: keep-alive 使用持久性连接
以下是Header部分的示例:
GET /{path} HTTP/1.1
Accept: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
服务端的响应Header至少包含:
- Content-Type: text/event-stream;charset=UTF-8 指定Media Type
- Transfer-Encoding: chunked
以下是响应Header部分的示例:
HTTP/1.1 200
Content-Type: text/event-stream;charset=UTF-8
Transfer-Encoding: chunked
Event-Stream 解析
event-stream的内容(结构和字符)需满足以下 ABNF 语法范式约定
stream = [ bom ] *event
event = *( comment / field ) end-of-line
comment = colon *any-char end-of-line
field = 1*name-char [ colon [ space ] *any-char ] end-of-line
end-of-line = ( cr lf / cr / lf )
; characters
lf = %x000A ; U+000A LINE FEED (LF)
cr = %x000D ; U+000D CARRIAGE RETURN (CR)
space = %x0020 ; U+0020 SPACE
colon = %x003A ; U+003A COLON (:)
bom = %xFEFF ; U+FEFF BYTE ORDER MARK
name-char = %x0000-0009 / %x000B-000C / %x000E-0039 / %x003B-10FFFF
; a scalar value other than U+000A LINE FEED (LF), U+000D CARRIAGE RETURN (CR), or U+003A COLON (:)
any-char = %x0000-0009 / %x000B-000C / %x000E-10FFFF
; a scalar value other than U+000A LINE FEED (LF) or U+000D CARRIAGE RETURN (CR)
需要逐行读取内容并进行解析,按照语法约定:end-of-line = ( cr lf / cr / lf )
,CRLF、CR、LF都代表换行
- 如果是空行,后续是新Event,可通俗理解为事件之间用空行分隔
- 如果是以
:
冒号,UniCode编码 U+003A 开始,忽略此行 - 如果包含
:
冒号,UniCode编码 U+003A,从第一个冒号开始分割- 冒号前的内容为
field
- 冒号后的内容为
value
,如果冒号后的第一个字符为空格
U+0020,从value
中移除
- 冒号前的内容为
- 非空但不包含
:
, 内容全部作为field
,value
为空字符串
field的约定:
- "event":其value为 event type
- "data":其value为事件数据,解析后单独再尾部拼接
LF
- "id":若其value不包含
U+0000 即NULL
,值为事件的ID,否则忽略 - "retry":其value若只包含ASCII码表示的数值,认为是十进制数解析为Int,作为毫秒级的重连时间,当链接断开,客户端应当在此时间后自动发起重连,否则忽略
- 其他情况均忽略
Last-Event-ID
前文已经提到了如下内容:
- Header
- Event-Stream中field为"id"、"retry"
不难理解,当出现意外情况时,例如网络原因导致的链接断开,涉及到重连;显然,重连时一般不希望再获取已经收到的信息。
Header中存在 Last-Event-ID
field约定,指定上次收到的Event的id,因Event有序,可避免冗余信息传输。
纸上得来终觉浅
至此,协议部分的主要内容已经讨论完毕,让我们进入愉快的Demo环节。
编写服务端
对部分读者而言,这部分可能有点超纲,但没有关系,已经准备好了Demo代码,各位只需要准备好Java环境和路由器即可。
在Demo中,准备了两条接口:
- http://{ip}:8080/sse/mvc/words , 模拟多次生成文本段落并推送至客户端
- http://{ip}:8080/sse/mvc/folder-watch , 模拟文件夹监听
启动服务后,可使用控制台和curl进行测试:
curl -v http://localhost:8080/sse/mvc/words
编写客户端
很显然,一个非新事物往往不需要普通开发者造轮子,OkHttp 已经支持SSE
不同于大家熟知的 Call
, Okhttp-SSE中封装了 EventSource
,并通过 Factory
创建实例、发起请求:
一个朴素地使用示例如下:
class Demo {
void demoCode() {
OkHttpClient okHttpClient = new OkHttpClient();
EventSource.Factory factory = EventSources.createFactory(okHttpClient);
Request.Builder builder = new Request.Builder().get().url(url);
builder.addHeader("Content-Type", "text/event-stream")
.addHeader("Accept-Encoding", "")
.addHeader("Accept", "text/event-stream")
.addHeader("Cache-Control", "no-cache");
builder.addHeader("Last-Event-ID", "2");
Request request = builder.build();
factory.newEventSource(request, new EventSourceListener() {
//ignore
});
}
}
请注意,如果将其作为 Call
,将无法获得多次推送的效果,在不出现错误或超时的情况下,形如一次响应较慢的Get请求。
是否可以使用Retrofit
截至目前为止,Retrofit未对其进行适配,其实在多年前即展开过讨论,我找到了此条 讨论issue
Retrofit的设计原理并不复杂:
- 通过运行时反射,建立动态代理,依据注解构建Okhttp的Call
- 通过CallAdapter,将Call的结果处理方式,进行不同的适配
但设计之初,仅设计了 Retrofit.Call
对 Okhttp.Call
进行Wrap,而 EventSource
并不继承 Okhttp.Call
,当然,依据其特性也不应当继承自Call
鉴于此,若将 EventSource
强行包装为 Retrofit.Call
将会很容易引起错误,例如调用
Response<T> execute() throws IOException;
void enqueue(Callback<T> callback);
若在Retrofit中平行展开一条 EventSource
的处理逻辑,需要对库进行很多修改,并且考虑对生态的影响,并不划算。
但是使用者自己的行为是不受限的,自己结合项目情况开发,自己对其负责。
动手写Retrofit扩展库
正如我上文所言,直接在Retrofit中进行扩展需要考虑对生态的影响,读者诸君可以详细阅读Jake Wharton 的post,其设计构思在考虑 "一致性" 问题。
但如果在Retrofit之外进行扩展,则可以回避生态问题,即便有非一致性设计,也可以不用直面。
代码仓库 需切换到 retrofit-sse
分支!
作者按:读者诸君请注意,在编写此文时,我尚未有充足的时间投入扩展库的设计与编写,代码库中的Sample代码,仅可作为指导,仍存在设计盲点,不要直接投入商用。
很容易得出以下核心设计思路,可看导图或下文的文字说明,较为啰嗦:
- 将连续的Event内容转变为可观测的Source,例如Rxjava的观测源、LiveData、kotlin协程的Flow
- 内建
EventSourceListener
实现类,从回调触发源的更新 - 定义
EventSourceAdapter
,用于将Okhttp-EventSource
转变为 可观测的Source
- 内建
- 需要存在一种机制,依据Interface中Method定义的返回类型,获取
EventSourceAdapter
实例- 应当有Factory、Factory注册池,亦可以考虑直接使用实例、实例注册池
- 需要利用反射
- Factory注册池或Adapter实例注册池需能区分目标类型
- 需要能够创建(完整功能应当为管理)
EventSource
- 需可注册
EventSource.Factory
实例
- 需可注册
- 存在一条扩展路径,符合SSE时,走新设计;否则走原Retrofit设计
- 增加入口,例如
RetrofitSSE
,承载以上设计,并包含Retrofit
- 扩展
ServiceMethod<?> loadServiceMethod(Method method)
,新增SSE情况的执行路径 - 扩展
ServiceMethod
,内建SSE情况的处理逻辑,即承载设计1、2
- 增加入口,例如
简要的代码如下:
内建的EventSourceListener -- 对应1
内建选择了Flow,可自行扩展其他,通过回调实现源的更新
class FlowAdapterEventListener(
val channel: Channel<Event>,
) : EventSourceListener() {
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
override fun onEvent(eventSource: EventSource, id: String?, type: String?, data: String) {
super.onEvent(eventSource, id, type, data)
scope.launch {
channel.send(Event(id, type, data))
}
}
override fun onFailure(eventSource: EventSource, t: Throwable?, response: Response?) {
super.onFailure(eventSource, t, response)
scope.launch {
channel.send(
Event(null, null, "", t)
)
channel.close(t)
}
}
}
定义EventSourceAdapter -- 对应2
interface EventSourceAdapter<T> {
fun adapt(request: Request, factory: EventSource.Factory): T
abstract class Factory {
abstract operator fun get(
returnType: Type, annotations: Array<Annotation>, retrofitSSE: RetrofitSSE
): EventSourceAdapter<*>?
}
}
内建EventSourceAdapter 和 Factory -- 对应2
这是一个示例,以内建选择的Flow情况为例,展示Factory能区分目标类型, Factory注册池遍历执行至非NULL返回时,认为得到目标类型的Factory
class FlowAdapter : EventSourceAdapter<Flow<Event>> {
override fun adapt(request: Request, factory: EventSource.Factory): Flow<Event> {
val channel = Channel<Event>()
return channel.receiveAsFlow()
.shareIn(CoroutineScope(Dispatchers.IO), SharingStarted.Eagerly)
.onSubscription {
factory.newEventSource(
request, FlowAdapterEventListener(channel)
)
}
}
companion object {
val Factory = object : Factory() {
override fun get(
returnType: Type,
annotations: Array<Annotation>,
retrofitSSE: RetrofitSSE
): EventSourceAdapter<*>? {
if (Flow::class.java.isAssignableFrom(getRawType(returnType))) {
return FlowAdapter()
}
return null
}
}
}
}
ServiceMethod扩展以及路径
新路径:
abstract class ServiceMethodV2<T> extends ServiceMethod<T> {
static <T> ServiceMethod<T> parseAnnotationsV2(RetrofitSSE retrofit, Method method) {
RequestFactory requestFactory = RequestFactory.parseAnnotations(retrofit.retrofit, method);
Type returnType = method.getGenericReturnType();
if (Utils.hasUnresolvableType(returnType)) {
throw methodError(/*ignore*/);
}
if (returnType == void.class) {
throw methodError(method, "Service methods cannot return void.");
}
return HttpServiceMethodV2.parseAnnotations(retrofit, method, requestFactory);
}
}
扩展
ServiceMethod
,内建SSE情况的处理逻辑,即承载设计1、2
详见:HttpServiceMethodV2
,代码较多,不做展开,当满足SSE情况时,走新逻辑,否则回归至 HttpServiceMethod#parseAnnotations
新入口
- 具备EventSource.Factory注册
- 具备EventSourceAdapter.Factory池,以及识别目标的能力
- 走新逻辑:
ServiceMethodV2.parseAnnotationsV2(this, method)
public final class RetrofitSSE {
private final Map<Method, ServiceMethod<?>> serviceMethodCache = new ConcurrentHashMap<>();
final Retrofit retrofit;
@NotNull
final EventSource.Factory eventSourceFactory;
final List<EventSourceAdapter.Factory> eventSourceAdapterFactories = new ArrayList<>();
public RetrofitSSE(Retrofit retrofit, @NotNull EventSource.Factory eventSourceFactory) {
this.retrofit = retrofit;
this.eventSourceFactory = eventSourceFactory;
}
@SuppressWarnings("unchecked") // Single-interface proxy creation guarded by parameter safety.
public <T> T create(final Class<T> service) {
//ignore
}
public RetrofitSSE addEventSourceAdapterFactory(EventSourceAdapter.Factory factory) {
eventSourceAdapterFactories.add(Objects.requireNonNull(factory, "factory == null"));
return this;
}
ServiceMethod<?> loadServiceMethod(Method method) {
ServiceMethod<?> result = serviceMethodCache.get(method);
if (result != null) return result;
synchronized (serviceMethodCache) {
result = serviceMethodCache.get(method);
if (result == null) {
result = ServiceMethodV2.parseAnnotationsV2(this, method);
serviceMethodCache.put(method, result);
}
}
return result;
}
public EventSourceAdapter<?> eventSourceAdapter(Type returnType, Annotation[] annotations) {
return nextEventSourceAdapter(null, returnType, annotations);
}
public EventSourceAdapter<?> nextEventSourceAdapter(
@Nullable EventSourceAdapter.Factory skipPast, Type returnType, Annotation[] annotations) {
Objects.requireNonNull(returnType, "returnType == null");
Objects.requireNonNull(annotations, "annotations == null");
int start = eventSourceAdapterFactories.indexOf(skipPast) + 1;
for (int i = start, count = eventSourceAdapterFactories.size(); i < count; i++) {
EventSourceAdapter<?> adapter = eventSourceAdapterFactories.get(i).get(returnType, annotations, this);
if (adapter != null) {
return adapter;
}
}
throw new IllegalArgumentException(/*ignore*/);
}
}
作者按:更多的代码细节,还请移步 代码仓库 需切换到 retrofit-sse
分支!
结语
又到了说再见的时候,这篇文章中,我们一同完成了:
- 获取服务端长耗时任务结果的常见实践方式的归纳
- 对SSE进行了详细的讨论,尤其是其协议细节
- 搭建SSE服务端、使用OKHttp搭建客户端,进行实操
- 对Retrofit进行了分析,探讨并发现了一条扩展路径,可通过Retrofit使用SSE
应当是一次有趣、好玩的历程!
在结尾,还是再次提醒,代码仓库 中Retrofit-SSE的代码,并未经过严谨的设计论证并进行充分的测试, 我仅花费了数小时时间实现博客内容所需的最少设计,势必会存在BUG和设计不健全的内容,不要直接商用,以免影响绩效。
汇总链接如下:
Android代码仓库 注意分支