[
Date Prev][
Date Next][
Thread Prev][
Thread Next][
Date Index][
Thread Index]
[
List Home]
[jetty-dev] SPDY client gets overwhelmed when subscribing to high volume incoming messages
|
Hello,
Please find below a sample subscriber and a publisher. The subscriber becomes unresponsive and stops receiving messages after the message count crosses two hundred thousand. Could you please let me know what I am doing wrong?
OS: MAC
VMArgs: -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.KQueueSelectorProvider -Xbootclasspath/p:/Users/abhinavsunderrajan/.m2/repository/org/mortbay/jetty/npn/npn-boot/7.6.2.v20120308/npn-boot-7.6.2.v20120308.jar
************************ PUBLISHER *********************************************************************
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.spdy.SPDYServerConnector;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
/**
 * Sample SPDY publisher: listens on port 5082 and, for every stream a client
 * opens, replies and then keeps pushing text data frames on that stream.
 */
public class TestServer {
    /**
     * Main method: enables JSSE debug output and starts the server.
     *
     * @param args command-line arguments (unused)
     * @throws Exception propagated from {@link #startTLSServer()}
     */
    public static void main(final String[] args) throws Exception {
        // The system property key is "javax.net.debug". The leading "D" only
        // belongs to the -D command-line flag; passing it to setProperty sets
        // an unrelated property and the debug output is never enabled.
        System.setProperty("javax.net.debug", "all");
        startTLSServer();
    }

    /**
     * Builds the SPDY frame listener, wires it into a connector on port 5082,
     * starts the server and blocks until the server stops.
     * <p>
     * NOTE(review): despite the method name, only the frame listener is passed
     * to the connector here; no SSL context is configured in this method.
     * Confirm whether the transport is actually encrypted.
     *
     * @throws Exception propagated from starting or joining the server
     */
    private static void startTLSServer() throws Exception {
        // Frame listener that handles the communication over SPDY
        ServerSessionFrameListener frameListener = new ServerSessionFrameListener.Adapter() {
            /**
             * As soon as we receive a SYN we reply, start the periodic
             * publisher for this stream and return the handler for data
             * arriving on the stream.
             */
            @Override
            public StreamFrameListener onSyn(final Stream stream,
                    final SynInfo synInfo) {
                System.out.println("onSyn");
                // Send a reply to this message
                stream.reply(new ReplyInfo(false));

                // Start a timer that pushes a data frame on this stream at a
                // fixed rate: no initial delay, period of 1 microsecond (i.e.
                // as fast as the single scheduler thread can go).
                // NOTE(review): one executor is created per stream and is never
                // shut down in this class, so its thread outlives the stream.
                ScheduledExecutorService executor = Executors
                        .newSingleThreadScheduledExecutor();
                Runnable periodicTask = new Runnable() {
                    // Sequence number appended to every message sent
                    private int i = 0;

                    @Override
                    public void run() {
                        stream.data(new StringDataInfo(System.currentTimeMillis()
                                + ", some data from the server, " + i++, false));
                    }
                };
                executor.scheduleAtFixedRate(periodicTask, 0, 1,
                        TimeUnit.MICROSECONDS);

                // Next create an adapter to further handle the client input
                // from this specific stream.
                return new StreamFrameListener.Adapter() {
                    /**
                     * We're only interested in the data, not the headers, in
                     * this example.
                     */
                    @Override
                    public void onData(final Stream stream, final DataInfo dataInfo) {
                        String clientData = dataInfo.asString("UTF-8", true);
                        System.out.println("Received the following client data: "
                                + clientData);
                    }
                };
            }
        };

        // Wire up and start the connector
        Server server = new Server();
        SPDYServerConnector connector = new SPDYServerConnector(frameListener);
        connector.setPort(5082);
        server.addConnector(connector);
        server.start();
        server.join();
    }
}
*********************** SPDY SUBSCRIBER***********************
import java.net.InetSocketAddress;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.spdy.SPDYClient;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
/**
* Put <code>-Xbootclasspath/p:lib/spdy/npn-boot-7.6.2.v20120308.jar</code> as
* vmarg.
*/
public class TestClient {
/**
* Main method.
* @param args
* @throws Exception
*/
public static void main(final String[] args) throws Exception {
// create client
SPDYClient.Factory clientFactory = new SPDYClient.Factory();
clientFactory.start();
SPDYClient client = clientFactory.newSPDYClient(SPDY.V2);
// create a session to the server running on localhost port 8181
Future<Session> future = client.connect(new InetSocketAddress(
"localhost", 5082), null);
Session session = future.get(5, TimeUnit.SECONDS);
// this listener receives data from the server and prints it
StreamFrameListener streamListener = new StreamFrameListener.Adapter() {
public void onData(final Stream stream, final DataInfo dataInfo) {
// Data received from server
String content = dataInfo.asString("UTF-8", true);
System.out.println("SPDY content: " + content);
}
};
// Start a new session, and configure the stream listener
final Stream stream = session.syn(new SynInfo(false), streamListener)
.get(5, TimeUnit.SECONDS);
stream.data(new StringDataInfo("hello publisher!!", false));
}
}
Many thanks,
Abhinav