RxSSE is a Retrofit call adapter factory that emits Server Sent Events streamed as RxJava observables.
A MessageProcessor
is an interface that you must implement. This class is responsible for converting the raw text data sent in an SSE data element into a concrete class. The data element may be json, xml, or any other structure you choose.
For example, a simple Gson message processor might be
class GsonMessageProcessor: MessageProcessor {
companion object {
private val gson = GsonBuilder()
.setFieldNamingPolicy(FieldNamingPolicy.UPPER_CAMEL_CASE)
.create()
}
override fun <T> processMessage(message: String, type: Type): T {
return gson.fromJson<T>(message, type)
}
}
val api = Retrofit.Builder()
.baseUrl(baseUrl)
.addCallAdapterFactory(RxJavaSseCallAdapterFactory(
scheduler = Schedulers.io(),
messageProcessor = MyGsonMessageProcessor()
))
.build()
@Headers("Content-Type:application/json", "Accept:text/event-stream", "Cache-Control:no-cache")
@POST("/example/thing")
@Streaming
@JvmSuppressWildcards
fun postSomethingNeat(
@Body details: PostDetails
) : Observable<NeatResponse>
api.postSomethingNeat(PostDetails(...))
.subscribe {
neatResponse ->
Log.d(TAG, "$neatResponse received onNext!")
}
RxSSE is hosted on jcenter. You can import it into your project by adding the following to your gradle dependencies:
implementation 'com.gasbuddy.mobile:rxsse:0.9'