Servlet-3.0 API allows to detach a request/response context and answer to it later.
However if I try to write a big amount of data, something like:
AsyncContext ac = getWaitingContext() ;
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(some_big_data);
out.flush()
It may actually block - and it does block in trivial test cases - for both Tomcat 7 and Jetty 8. The tutorials recommend to create a thread pool that would handle such a setup - witch is generally the counter-positive to a traditional 10K architecture.
However if I have 10,000 open connections and a thread pool of let's say 10 threads, it is enough for even 1% of clients that have low speed connections or just blocked connection to block the thread pool and completely block the comet response or slow it down significantly.
The expected practice is to get "write-ready" notification or I/O completion notification and than continue to push the data.
How can this be done using Servlet-3.0 API, i.e. how do I get either:
If this is not supported by the Servlet-3.0 API, are there any Web Server specific APIs (like Jetty Continuation or Tomcat CometEvent) that allow to handle such events truly asynchronously without faking asynchronous I/O using thread pool.
Does anybody know?
And if this is not possible can you confirm it with a reference to documentation?
I had attached the code below that emulates event-stream.
Notes:
ServletOutputStream
that throws IOException
to detect disconnected clientskeep-alive
messages to make sure clients are still thereIn such an example I explicitly defined thread pool of size 1 to show the problem:
curl http://localhost:8080/path/to/app
(twice)curd -d m=message http://localhost:8080/path/to/app
curd -d m=message http://localhost:8080/path/to/app
I want to solve such a problem without using thread pool, because with 1000-5000 open connections I can exhaust the thread pool very fast.
The sample code below.
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;
import javax.servlet.AsyncContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.ServletOutputStream;
@WebServlet(urlPatterns = "", asyncSupported = true)
public class HugeStreamWithThreads extends HttpServlet {
private long id = 0;
private String message = "";
private final ThreadPoolExecutor pool =
new ThreadPoolExecutor(1, 1, 50000L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
// it is explicitly small for demonstration purpose
private final Thread timer = new Thread(new Runnable() {
public void run()
{
try {
while(true) {
Thread.sleep(1000);
sendKeepAlive();
}
}
catch(InterruptedException e) {
// exit
}
}
});
class RunJob implements Runnable {
volatile long lastUpdate = System.nanoTime();
long id = 0;
AsyncContext ac;
RunJob(AsyncContext ac)
{
this.ac = ac;
}
public void keepAlive()
{
if(System.nanoTime() - lastUpdate > 1000000000L)
pool.submit(this);
}
String formatMessage(String msg)
{
StringBuilder sb = new StringBuilder();
sb.append("id");
sb.append(id);
for(int i=0;i<100000;i++) {
sb.append("data:");
sb.append(msg);
sb.append("\n");
}
sb.append("\n");
return sb.toString();
}
public void run()
{
String message = null;
synchronized(HugeStreamWithThreads.this) {
if(this.id != HugeStreamWithThreads.this.id) {
this.id = HugeStreamWithThreads.this.id;
message = HugeStreamWithThreads.this.message;
}
}
if(message == null)
message = ":keep-alive\n\n";
else
message = formatMessage(message);
if(!sendMessage(message))
return;
boolean once_again = false;
synchronized(HugeStreamWithThreads.this) {
if(this.id != HugeStreamWithThreads.this.id)
once_again = true;
}
if(once_again)
pool.submit(this);
}
boolean sendMessage(String message)
{
try {
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(message);
out.flush();
lastUpdate = System.nanoTime();
return true;
}
catch(IOException e) {
ac.complete();
removeContext(this);
return false;
}
}
};
private HashSet<RunJob> asyncContexts = new HashSet<RunJob>();
@Override
public void init(ServletConfig config) throws ServletException
{
super.init(config);
timer.start();
}
@Override
public void destroy()
{
for(;;){
try {
timer.interrupt();
timer.join();
break;
}
catch(InterruptedException e) {
continue;
}
}
pool.shutdown();
super.destroy();
}
protected synchronized void removeContext(RunJob ac)
{
asyncContexts.remove(ac);
}
// GET method is used to establish a stream connection
@Override
protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
// Content-Type header
response.setContentType("text/event-stream");
response.setCharacterEncoding("utf-8");
// Access-Control-Allow-Origin header
response.setHeader("Access-Control-Allow-Origin", "*");
final AsyncContext ac = request.startAsync();
ac.setTimeout(0);
RunJob job = new RunJob(ac);
asyncContexts.add(job);
if(id!=0) {
pool.submit(job);
}
}
private synchronized void sendKeepAlive()
{
for(RunJob job : asyncContexts) {
job.keepAlive();
}
}
// POST method is used to communicate with the server
@Override
protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException
{
request.setCharacterEncoding("utf-8");
id++;
message = request.getParameter("m");
for(RunJob job : asyncContexts) {
pool.submit(job);
}
}
}
The sample above uses threads to prevent blocking... However if the number of blocking clients is bigger than the size of the thread pool it would block.
How could it be implemented without blocking?
I've found the Servlet 3.0
Asynchronous
API tricky to implement correctly and helpful documentation to be sparse. After a lot of trial and error and trying many different approaches, I was able to find a robust solution that I've been very happy with. When I look at my code and compare it to yours, I notice one major difference that may help you with your particular problem. I use a ServletResponse
to write the data and not a ServletOutputStream
.
Here my go-to Asynchronous Servlet class adapted slightly for your some_big_data
case:
import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebInitParam;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;
import org.apache.log4j.Logger;
@javax.servlet.annotation.WebServlet(urlPatterns = { "/async" }, asyncSupported = true, initParams = { @WebInitParam(name = "threadpoolsize", value = "100") })
public class AsyncServlet extends HttpServlet {
private static final Logger logger = Logger.getLogger(AsyncServlet.class);
public static final int CALLBACK_TIMEOUT = 10000; // ms
/** executor service */
private ExecutorService exec;
@Override
public void init(ServletConfig config) throws ServletException {
super.init(config);
int size = Integer.parseInt(getInitParameter("threadpoolsize"));
exec = Executors.newFixedThreadPool(size);
}
@Override
public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {
final AsyncContext ctx = req.startAsync();
final HttpSession session = req.getSession();
// set the timeout
ctx.setTimeout(CALLBACK_TIMEOUT);
// attach listener to respond to lifecycle events of this AsyncContext
ctx.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent event) throws IOException {
logger.info("onComplete called");
}
@Override
public void onTimeout(AsyncEvent event) throws IOException {
logger.info("onTimeout called");
}
@Override
public void onError(AsyncEvent event) throws IOException {
logger.info("onError called: " + event.toString());
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException {
logger.info("onStartAsync called");
}
});
enqueLongRunningTask(ctx, session);
}
/**
* if something goes wrong in the task, it simply causes timeout condition that causes the async context listener to be invoked (after the fact)
* <p/>
* if the {@link AsyncContext#getResponse()} is null, that means this context has already timed out (and context listener has been invoked).
*/
private void enqueLongRunningTask(final AsyncContext ctx, final HttpSession session) {
exec.execute(new Runnable() {
@Override
public void run() {
String some_big_data = getSomeBigData();
try {
ServletResponse response = ctx.getResponse();
if (response != null) {
response.getWriter().write(some_big_data);
ctx.complete();
} else {
throw new IllegalStateException(); // this is caught below
}
} catch (IllegalStateException ex) {
logger.error("Request object from context is null! (nothing to worry about.)"); // just means the context was already timeout, timeout listener already called.
} catch (Exception e) {
logger.error("ERROR IN AsyncServlet", e);
}
}
});
}
/** destroy the executor */
@Override
public void destroy() {
exec.shutdown();
}
}