Vert.x Event loop - How is this asynchronous?

user378101 · Nov 25, 2015 · Viewed 9.3k times

I'm playing around with Vert.x and quite new to servers based on an event loop, as opposed to the thread/connection model.

public void start(Future<Void> fut) {
    vertx
        .createHttpServer()
        .requestHandler(r -> {
            LocalDateTime start = LocalDateTime.now();
            System.out.println("Request received - "+start.format(DateTimeFormatter.ISO_DATE_TIME));
            final MyModel model = new MyModel();
            try {

                for(int i=0;i<10000000;i++){
                    //some simple operation
                }

                model.data = start.format(DateTimeFormatter.ISO_DATE_TIME) +" - "+LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);

            } catch (Exception e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }

            r.response().end(new Gson().toJson(model));
        })
        .listen(4568, result -> {
          if (result.succeeded()) {
            fut.complete();
          } else {
            fut.fail(result.cause());
          }
        });
    System.out.println("Server started ..");
  }
  • I'm just trying to simulate a long running request handler to understand how this model works.
  • What I've observed is that the so-called event loop is blocked until my first request completes. However little time that takes, a subsequent request is not acted upon until the previous one completes.
  • Obviously I'm missing a piece here and that's the question that I have here.

Edited based on the answers so far:

  1. Isn't accepting all requests supposed to be asynchronous? If a new connection can only be accepted when the previous one is cleared off, how is it async?
    • Assume a typical request takes anywhere between 100 ms and 1 sec (based on the kind and nature of the request). That means the event loop can't accept a new connection until the previous request finishes (even if it wraps up within a second). And if I as a programmer have to think through all this and push such request handlers to a worker thread, then how does it differ from a thread/connection model?
    • I'm just trying to understand how this model is better than the traditional thread/connection server model. Assume there is no I/O op, or all the I/O ops are handled asynchronously. How does it even solve the c10k problem, when it can't start all concurrent requests in parallel and has to wait till the previous one terminates?
  2. Even if I decide to push all these operations to a worker thread (pooled), then I'm back to the same problem, isn't it? Context switching between threads?

Edit: topping this question with a bounty

    • I do not completely understand how this model is claimed to be asynchronous.
    • Vert.x has an async JDBC client (asynchronous is the keyword) which I tried to adapt with RxJava.
    • Here is a code sample (relevant portions):

    server.requestStream().toObservable().subscribe(req -> {

        LocalDateTime start = LocalDateTime.now();
        System.out.println("Request for " + req.absoluteURI() + " received - " + start.format(DateTimeFormatter.ISO_DATE_TIME));
        jdbc.getConnectionObservable().subscribe(
                conn -> {
                    // Now chain some statements using flatMap composition
                    Observable<ResultSet> resa = conn.queryObservable("SELECT * FROM CALL_OPTION WHERE UNDERLYING='NIFTY'");
                    // Subscribe to the final result
                    resa.subscribe(resultSet -> {
                        req.response().end(resultSet.getRows().toString());
                        System.out.println("Request for " + req.absoluteURI() + " Ended - " + LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
                    }, err -> {
                        System.out.println("Database problem");
                        err.printStackTrace();
                    });
                },
                // Could not connect
                err -> err.printStackTrace()
        );
    });
    server.listen(4568);
    
    • The select query there takes approximately 3 seconds to return the complete table dump.
    • When I fire concurrent requests (tried with just 2), I see that the second request waits entirely for the first one to complete.
    • If the JDBC select is asynchronous, isn't it a fair expectation that the framework handles the second connection while it waits for the select query to return?

Answer

Dmitry Spikhalskiy · Dec 2, 2015

The Vert.x event loop is, in fact, a classical event loop that exists on many platforms. And of course, most explanations and docs can be found for Node.js, as it's the most popular framework based on this architecture pattern. Take a look at one more or less good explanation of the mechanics of the Node.js event loop. The Vert.x tutorial has a fine explanation between "Don't call us, we'll call you" and "Verticles" too.

Edit for your updates:

First of all, when you are working with an event loop, the main thread should handle every request very quickly. You shouldn't do any long job in this loop, and you certainly shouldn't wait for the response to your call to the database. Instead:

  - Schedule the call asynchronously.
  - Assign a callback (handler) to the result.
  - The callback will be executed on a worker thread, not the event loop thread. This callback will, for example, write the response back to the socket.

So your code in the event loop should just schedule all asynchronous operations with callbacks and move on to the next request without awaiting any results.
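The scheduling pattern above can be sketched in plain Java, without Vert.x: a single-thread executor stands in for the event loop and a fixed pool stands in for the worker threads (both names here are illustrative, not framework API). The loop thread only logs the request and hands the slow work off, so all requests are "received" before any of them finishes:

```java
import java.util.*;
import java.util.concurrent.*;

public class EventLoopSketch {
    // Minimal sketch: one "event loop" thread schedules blocking work on a
    // small worker pool and never waits for results itself.
    static List<String> demo() throws Exception {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        ExecutorService workers = Executors.newFixedThreadPool(4);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(4);

        for (int i = 0; i < 4; i++) {
            final int id = i;
            eventLoop.submit(() -> {
                log.add("received:" + id);     // fast work on the loop thread
                workers.submit(() -> {         // long/blocking work off the loop
                    try { Thread.sleep(100); } catch (InterruptedException ignored) {}
                    log.add("done:" + id);     // "callback" runs on a worker thread
                    done.countDown();
                });
            });
        }
        done.await();
        eventLoop.shutdown();
        workers.shutdown();
        return log;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

Running it shows all four `received` entries logged before any `done` entry: the loop thread was never blocked by the sleeping workers.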

Assume a typical request takes anywhere between 100 ms and 1 sec (based on the kind and nature of the request).

In that case, your request has some computationally expensive parts or IO access, and your code in the event loop shouldn't wait for the results of those operations.

I'm just trying to understand how this model is better than the traditional thread/connection server model. Assume there is no I/O op, or all the I/O ops are handled asynchronously.

When you have too many concurrent requests and a traditional programming model, you create a thread per request. What will this thread do? It will mostly wait for IO operations (for example, a result from the database). That's a waste of resources. In our event loop model, you have one main thread that schedules operations and a preallocated pool of worker threads for long tasks. None of these workers actually waits for the response; they can execute other code while waiting for an IO result (this can be implemented as callbacks or as periodically checking the status of the IO jobs currently in progress). I would recommend you go through Java NIO and Java NIO 2 to understand how this async IO can actually be implemented inside the framework. Green threads are a closely related concept that is also good to understand. Green threads and coroutines are a kind of shadowed event loop trying to achieve the same thing: fewer threads, because we can reuse a system thread while a green thread is waiting for something.

How does it even solve the c10k problem, when it can't start all concurrent requests in parallel and has to wait till the previous one terminates?

For sure we don't wait in the main thread for the response to the previous request to be sent. Get a request, schedule the long/IO tasks, move on to the next request.

Even if I decide to push all these operations to a worker thread (pooled), then I'm back to the same problem, isn't it? Context switching between threads?

If you do everything right - no. Even better, you will get good data locality and execution flow prediction. One CPU core will execute your short event loop and schedule async work without context switching, and nothing more. The other cores make the calls to the database and return responses, and only that. Switching between callbacks or checking different channels for IO status doesn't actually require any system-thread context switch - it all happens in one worker thread. So we have one worker thread per core, and this one system thread awaits/checks result availability from multiple database connections, for example. Revisit the Java NIO concept to understand how it can work this way. (A classical example for NIO is a proxy server that accepts many parallel connections (thousands), proxies requests to some other remote servers, listens for responses, and sends responses back to clients, all with one or two threads.)
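The "one thread watching many channels" mechanism that NIO provides can be sketched with a `Selector` over in-memory pipes, a toy stand-in for real sockets. One thread registers interest in several channels, blocks once in `select()`, and learns which of them became readable:

```java
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

public class SelectorSketch {
    // One thread multiplexes readiness over two channels; returns how many
    // channels select() reported as ready.
    static int demo() throws Exception {
        Selector selector = Selector.open();
        Pipe a = Pipe.open();
        Pipe b = Pipe.open();
        a.source().configureBlocking(false);
        b.source().configureBlocking(false);
        a.source().register(selector, SelectionKey.OP_READ);
        b.source().register(selector, SelectionKey.OP_READ);

        // Some other party (here: us) makes both channels readable.
        a.sink().write(ByteBuffer.wrap("x".getBytes()));
        b.sink().write(ByteBuffer.wrap("y".getBytes()));

        // A single blocking call reports readiness for all registered channels.
        int ready = selector.select();
        for (SelectionKey key : selector.selectedKeys()) {
            ByteBuffer buf = ByteBuffer.allocate(16);
            ((Pipe.SourceChannel) key.channel()).read(buf); // drain without blocking
        }
        selector.close();
        a.sink().close(); a.source().close();
        b.sink().close(); b.source().close();
        return ready;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo());
    }
}
```

Scale the two pipes up to thousands of sockets and this is the shape of the proxy-server example: one system thread, many in-flight connections, no per-connection thread waiting on IO.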

About your code, I made a sample project for you to demonstrate that everything works as expected:

public class MyFirstVerticle extends AbstractVerticle {

    @Override
    public void start(Future<Void> fut) {
        JDBCClient client = JDBCClient.createShared(vertx, new JsonObject()
                .put("url", "jdbc:hsqldb:mem:test?shutdown=true")
                .put("driver_class", "org.hsqldb.jdbcDriver")
                .put("max_pool_size", 30));


        client.getConnection(conn -> {
            if (conn.failed()) {throw new RuntimeException(conn.cause());}
            final SQLConnection connection = conn.result();

            // create a table
            connection.execute("create table test(id int primary key, name varchar(255))", create -> {
                if (create.failed()) {throw new RuntimeException(create.cause());}
            });
        });

        vertx
            .createHttpServer()
            .requestHandler(r -> {
                int requestId = new Random().nextInt();
                System.out.println("Request " + requestId + " received");
                client.getConnection(conn -> {
                    if (conn.failed()) { throw new RuntimeException(conn.cause()); }

                    final SQLConnection connection = conn.result();

                    connection.execute("insert into test values ('" + requestId + "', 'World')", insert -> {
                        // query some data with arguments
                        connection
                            .queryWithParams("select * from test where id = ?", new JsonArray().add(requestId), rs -> {
                                connection.close(done -> { if (done.failed()) { throw new RuntimeException(done.cause()); } });
                                System.out.println("Result " + requestId + " returned");
                                r.response().end("Hello");
                            });
                    });
                });
            })
            .listen(8080, result -> {
                if (result.succeeded()) {
                    fut.complete();
                } else {
                    fut.fail(result.cause());
                }
            });
    }
}

@RunWith(VertxUnitRunner.class)
public class MyFirstVerticleTest {

  private Vertx vertx;

  @Before
  public void setUp(TestContext context) {
    vertx = Vertx.vertx();
    vertx.deployVerticle(MyFirstVerticle.class.getName(),
        context.asyncAssertSuccess());
  }

  @After
  public void tearDown(TestContext context) {
    vertx.close(context.asyncAssertSuccess());
  }

  @Test
  public void testMyApplication(TestContext context) {
      for (int i = 0; i < 10; i++) {
          final Async async = context.async();
          vertx.createHttpClient().getNow(8080, "localhost", "/",
                            response -> response.handler(body -> {
                                context.assertTrue(body.toString().contains("Hello"));
                                async.complete();
                            })
        );
    }
  }
}

Output:

Request 1412761034 received
Request -1781489277 received
Request 1008255692 received
Request -853002509 received
Request -919489429 received
Request 1902219940 received
Request -2141153291 received
Request 1144684415 received
Request -1409053630 received
Request -546435082 received
Result 1412761034 returned
Result -1781489277 returned
Result 1008255692 returned
Result -853002509 returned
Result -919489429 returned
Result 1902219940 returned
Result -2141153291 returned
Result 1144684415 returned
Result -1409053630 returned
Result -546435082 returned

So, we accept a request, schedule a query to the database, and go to the next request; we consume all of them and send a response for each request only when everything is done with the database.

About your code sample, I see two possible issues. First, it looks like you don't close() the connection, which is important for returning it to the pool. Second, how is your pool configured? If there is only one free connection, these requests will serialize, waiting for that connection.
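The pool-of-one serialization effect can be sketched by modeling the connection pool as a Semaphore whose permits are free connections (all names here are illustrative, not Vert.x API). Two concurrent "queries" against a pool of one take roughly twice as long as against a pool of two:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class PoolSketch {
    // Runs `tasks` concurrent "queries", each holding a connection for 100 ms;
    // returns total elapsed milliseconds.
    static long runWithPool(int poolSize, int tasks) throws Exception {
        Semaphore pool = new Semaphore(poolSize);          // permits = free connections
        ExecutorService exec = Executors.newFixedThreadPool(tasks);
        CountDownLatch done = new CountDownLatch(tasks);
        long start = System.nanoTime();
        for (int i = 0; i < tasks; i++) {
            exec.submit(() -> {
                try {
                    pool.acquire();                        // borrow a connection
                    Thread.sleep(100);                     // the "3 second query", scaled down
                    pool.release();                        // close() returns it to the pool
                } catch (InterruptedException ignored) {}
                done.countDown();
            });
        }
        done.await();
        exec.shutdown();
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("pool=1: " + runWithPool(1, 2) + " ms"); // roughly 200 ms
        System.out.println("pool=2: " + runWithPool(2, 2) + " ms"); // roughly 100 ms
    }
}
```

If a forgotten close() leaks the only connection, the effect is the same as a permanent `acquire()`: every later request waits forever, which looks exactly like "the second request waits for the first".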

I recommend adding some timestamp printing for both requests to find the place where you serialize. You have something that makes the calls in the event loop blocking. Or... check that you send the requests in parallel in your test, not the next one only after getting the response to the previous one.