Re: [jetty-dev] Reactive Streams

Simone,

my point is that you can't get recycling within just the standard RS APIs. I.e. you can't go get some 3rd party ByteBuffer compressing/encoding/encrypting processor and expect it to be able to call your complete() method (or to take your PooledBuffer for that matter).

If you go outside the API or have control of the implementation of the Processors, then there are lots and lots of ways that you can recycle.

But the idea of the standard RS APIs is that you'll be able to plug into 3rd party libraries with them... so they really need some standard mechanism for acknowledgement or recycling or something.

cheers



On 3 June 2015 at 17:09, Simone Bordet <sbordet@xxxxxxxxxxx> wrote:
Hi,

On Wed, Jun 3, 2015 at 3:46 AM, Greg Wilkins <gregw@xxxxxxxxxxx> wrote:
>
> Simone,
>
> yep I'm understanding it better now. If you work with the standard RS
> APIs, you can be asynchronous and non-queuing, but only if you give up on
> recyclable items. You can have recyclable items, but then have to give up
> either on asynchronous or non-queuing. You can't have all three: async,
> bounded memory & recycling.

No, I disagree :)
You can be recycling provided you pass the right T.

So far we have conflated 2 meanings in our Callback.succeeded():
1) We are done with this buffer, so it can be recycled.
2) We are done with this buffer, so we can process another.

This is evident, for example in every implementation of
IteratingCallback: we are overriding IteratingCallback.succeeded() to
"clean up" things, and then we call super.succeeded().
The "clean up" typically involves recycling the buffer we have stored
during process().
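
To make the conflation concrete, here is a minimal sketch of that pattern. MiniIteratingCallback is a hypothetical, heavily reduced stand-in for IteratingCallback (not the real Jetty class), Flusher and its two queues are made up for illustration, and write() completes synchronously only to keep the example short.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for IteratingCallback: succeeded() just drives
// the next call to process(). The real class does much more.
abstract class MiniIteratingCallback {
    /** Starts one unit of work, if there is any left. */
    protected abstract void process();

    public void iterate() { process(); }

    public void succeeded() { process(); }
}

class Flusher extends MiniIteratingCallback {
    final Deque<ByteBuffer> pending = new ArrayDeque<>(); // buffers waiting to be written
    final Deque<ByteBuffer> pool = new ArrayDeque<>();    // hypothetical free list
    private ByteBuffer current;                           // buffer stored during process()

    @Override
    protected void process() {
        current = pending.poll();
        if (current != null)
            write(current);
    }

    @Override
    public void succeeded() {
        // Meaning 1: we are done with this buffer, so recycle it.
        current.clear();
        pool.push(current);
        current = null;
        // Meaning 2: we are done with this buffer, so process another.
        super.succeeded();
    }

    // Hypothetical write that consumes the buffer and completes immediately.
    private void write(ByteBuffer buffer) {
        buffer.position(buffer.limit());
        succeeded();
    }
}
```

Both meanings live in the single succeeded() override, which is exactly what has no direct equivalent in the RS APIs.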

With the RS APIs, only semantic 2) above is explicit; 1) is not.

However:

class PooledBuffer {
    private final ByteBufferPool pool;
    private final ByteBuffer buffer;
    public void complete() { pool.release(buffer); }
}

void onNext(PooledBuffer pooled) {
    write(pooled.buffer, new Callback() {
        public void succeeded() {
            pooled.complete();
            subscription.request(1);
        }
    });
}

You can now argue that you have to allocate the per-PooledBuffer
Callback, but that is easily resolved by either having
PooledBuffer implement Callback or by having write() accept
PooledBuffers.
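
A rough sketch of the first option, under stated assumptions: Callback and SimplePool below are simplified stand-ins written for this example (not Jetty's Callback or ByteBufferPool), java.util.concurrent.Flow (JDK 9+) is used in place of the org.reactivestreams interfaces, and demandFrom() and WritingSubscriber are hypothetical names. The write() here completes synchronously; a real one would invoke the callback later.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Flow;

// Simplified stand-in for a completion callback (assumption, not Jetty's type).
interface Callback {
    void succeeded();
    void failed(Throwable cause);
}

// Simplified stand-in for a buffer pool (assumption, not Jetty's ByteBufferPool).
class SimplePool {
    private final Deque<ByteBuffer> free = new ArrayDeque<>();

    ByteBuffer acquire() {
        ByteBuffer b = free.poll();
        return b != null ? b : ByteBuffer.allocate(1024);
    }

    void release(ByteBuffer buffer) { buffer.clear(); free.push(buffer); }

    int available() { return free.size(); }
}

// The pooled item is its own callback, so onNext() allocates nothing per item.
class PooledBuffer implements Callback {
    private final SimplePool pool;
    final ByteBuffer buffer;
    private Flow.Subscription subscription;

    PooledBuffer(SimplePool pool, ByteBuffer buffer) {
        this.pool = pool;
        this.buffer = buffer;
    }

    // Hypothetical hook: the subscriber tells the item where to signal demand.
    void demandFrom(Flow.Subscription subscription) { this.subscription = subscription; }

    @Override
    public void succeeded() {
        pool.release(buffer);     // 1) the buffer can be recycled
        subscription.request(1);  // 2) we can process another
    }

    @Override
    public void failed(Throwable cause) {
        pool.release(buffer);
        subscription.cancel();
    }
}

class WritingSubscriber implements Flow.Subscriber<PooledBuffer> {
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }

    @Override
    public void onNext(PooledBuffer pooled) {
        pooled.demandFrom(subscription);
        write(pooled.buffer, pooled); // no per-item Callback allocation
    }

    @Override
    public void onError(Throwable t) { }

    @Override
    public void onComplete() { }

    // Hypothetical write that consumes the buffer and completes immediately.
    void write(ByteBuffer buffer, Callback callback) {
        buffer.position(buffer.limit());
        callback.succeeded();
    }
}
```

Note that this only works because the subscriber knows T is a PooledBuffer; nothing in the RS interfaces themselves says so.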

Again, I don't think the RS APIs impose any inefficiency per se.
They may perhaps require some changes to the types we use (for example
there is extensive use, in the current Jetty codebase, of a type that
wraps a Buffer and a Callback) to be efficient, but I don't see any
blocker.

--
Simone Bordet
----
http://cometd.org
http://webtide.com
http://intalio.com
Developer advice, training, services and support
from the Jetty & CometD experts.
Intalio, the modern way to build business applications.



--
Greg Wilkins <gregw@xxxxxxxxxxx>  - an Intalio.com subsidiary
http://eclipse.org/jetty HTTP, SPDY, Websocket server and client that scales
http://www.webtide.com  advice and support for jetty and cometd.