Tag: java

  • If Virtual Threads are the solution, what is the problem?

    Java’s Virtual Threads (aka Project Loom or JEP 444) have arrived as a full platform feature in Java 21, which has generated considerable interest and many projects (including Eclipse Jetty) are adding support.

    I have previously been somewhat skeptical about how significant the advantages of Virtual Threads actually are over Platform Threads (aka Native Threads). I’ve also pointed out that cheap Threads can do expensive things, so using Virtual Threads may not be a universal panacea for concurrent programming.

    However, even with those doubts, it is clear that Virtual Threads do have advantages in memory utilization and speed of startup. In this blog we look at what kinds of applications may benefit from those advantages.

    In short, we investigate which scalability problems Virtual Threads are the solution for.

    Axioms

    Firstly let’s agree on what is accepted about Virtual Thread usage:

    • Writing asynchronous code is extraordinarily difficult. “Yeah I know” you say… yeah but no, it is harder than that! Avoiding the need to write application logic in an asynchronous style is key to improving the quality and stability of an application. This blog is not generally advocating that you write your applications in an asynchronous style.
    • Virtual Threads are very cheap to create. From a performance perspective there is no reason to pool already-started Virtual Threads, and such pools are considered an anti-pattern. If a Virtual Thread is needed, then just create a new one (see the sketch after this list).
    • Virtual Threads use less memory. This is accepted, but with some significant caveats. Specifically, the memory saving is achieved because Virtual Threads only allocate stack memory as needed, whilst Platform Threads provision stack size based on a worst-case maximal usage. This is not exactly an apples-to-apples comparison.
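
    As a minimal illustration of the “just create a new one” approach, here is a sketch using the standard Java 21 APIs (the printed messages are purely illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class VirtualThreadDemo
    {
      public static void main(String[] args) throws Exception
      {
        // Start a single Virtual Thread directly; no pooling required.
        Thread vt = Thread.ofVirtual().name("worker-1").start(() ->
          System.out.println("Hello from " + Thread.currentThread()));
        vt.join();

        // Or create one Virtual Thread per submitted task.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor())
        {
          executor.submit(() -> System.out.println("Hello from " + Thread.currentThread()));
        } // close() waits for submitted tasks to complete
      }
    }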

    If some are good, are more even better?

    Consider a blocking-style application running on a traditional application server that is not scaling sufficiently. On inspection you see that all the Threads in the pool (default size 200) are allocated and there are no Threads available to do more work!

    Would making more Threads available be the solution to this scalability problem? Perhaps 2000 Platform Threads will help? Still slow? Let’s try 10,000 Platform Threads! Running out of memory? Then perhaps unlimited Virtual Threads will solve the scalability problems?

    What if, on further inspection, it is found that the pool Threads are mostly blocked waiting for a JDBC Database connection from the JDBC Connection Pool (default size 8), and that as a result the Thread pool is exhausted?

    If every request needs the database, then any additional Threads will all just block on the same JDBC pool, so more Threads will not make the solution more scalable.

    Alternatively, if only some requests need to use the database, then having more Threads would allow requests that do not need the database to proceed to completion. However, a fraction of requests would still end up blocked on the JDBC pool. Thus any limited Platform Thread pool could still become exhausted.

    With unlimited Virtual Threads there is no effective limit on the number of Threads, so non database requests could always continue, but the queue of Threads waiting on JDBC would also be unlimited, as would the total of any resources held by those Threads whilst waiting. Thus the application would only scale for some types of request, whilst giving JDBC dependent requests the same poor Quality of Service as before.

    Finite Resources

    If an application’s scalability is constrained by access to a finite resource, then it is unlikely that “more Threads” is the solution to any scalability problems. Just like you can’t solve traffic by adding cars to a congested road, adding Threads to an already busy server may make things worse.

    Some common examples of finite resources that applications can encounter are:

    • CPU: If the server CPU is near 100% utilization, then the existing number of Threads are sufficient to keep it fully loaded. More and/or faster CPUs are needed before any increase in Threads could be beneficial.
    • Database: Many database technologies cannot handle many concurrent requests, so parallelism is restricted. If the bottleneck is the database, then it needs to be re-engineered rather than laid siege to by more concurrent Threads.
    • Local Network: An application may block reading or writing data because it has reached the limit on the local network. In such cases, more Threads will not increase throughput, but they might improve latency if some threads can progress reading new requests and have responses ready to write once network becomes less congested. However there is a cost in waiting (see below).
    • Locks: Parallel applications often use some form of lock or mutual exclusion to serialize access to common data structures. Contention on those locks can limit parallelism and require redesign rather than just more Threads.
    • Caches: CPU, memory, file system and object caches are key tools in speeding up execution. However, if too many different tasks are executed concurrently, the capacity of these caches to hold relevant data may be exceeded and execution with a cold cache can be very slow. Sometimes it is better to do fewer things concurrently and serialize the excess so that caches can be more effective than trying to do everything at once.

    If an application’s lack of scalability is due to Threads waiting for finite resources, then any additional Threads (Platform or Virtual) are unlikely to help and may make your application less stable. At best, careful redesign is needed before Thread counts can be increased in any advantageous way.

    Infinite (OK Scalable) Resources

    Not all resources are finite and some can be considered infinite, at least for some purposes. But let’s call them “Scalable” rather than infinite. Examples of scalable resources that an application may block on include:

    • Database: Not all databases are created equal and some types of database have scalability in excess of the request rates experienced by a server. However, such scalability often comes at a latency cost as the database may be remote and/or distributed, thus applications may block waiting for the database, even if it has capacity to handle more requests in parallel.
    • Micro services: A scalable database is really just a specific example of a micro service that may be provided by a remote and/or distributed system that has effectively infinite capacity at the cost of some latency. Applications can often find themselves waiting on one or more such services.
    • Remote Networks: Local data center networks are often very VERY fast and in many situations they can outstrip even the combined capacity of many client systems. An application sending/receiving larger content may block writing/reading it due to a slow client, but still have enough local network capacity to communicate with many other clients in parallel.
    • Local Filesystems: Typically file systems are faster than networks, but slower than CPU. They also may have significant latency vs throughput tradeoffs (less so now that drives seldom need to spin physical disks). Thus Threads may block on local IO even though there is additional capacity available.

    Applications that lack scalability due to Threads waiting for such scalable resources may benefit from more Threads. Whilst some Threads are waiting for a database, micro service, slow client network or file system, it is likely that other Threads can progress even if they need to access the same types of resources.

    Platform Thread pools can easily be increased to many thousands of Threads before typical servers will have memory issues. If scalability is needed beyond that, then Virtual Threads can offer practically unlimited additional Threads, but see the caveats below.

    Furthermore, the fast-starting Virtual Threads can be of significant benefit in situations where small jobs with long latency can be carried out in parallel. Consider an application that processes requests using data from several micro services, each with some access latency. If these are done serially, then the total request latency is the sum of all of them. Sometimes asynchronous code is used to execute micro service requests in parallel, but spinning up a couple of Virtual Threads in this situation is simpler, less error prone and applicable to more APIs.
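
    As a hedged sketch of that pattern (the two micro service calls and their return types below are hypothetical placeholders), one Virtual Thread per call keeps the code in a simple blocking style while the calls overlap:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelCalls
    {
      record Profile(String name) {}
      record Orders(int count) {}

      // Hypothetical blocking calls to two remote micro services.
      static Profile fetchProfile(String userId) { return new Profile("user-" + userId); }
      static Orders fetchOrders(String userId) { return new Orders(3); }

      public static void main(String[] args) throws Exception
      {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor())
        {
          // Each blocking call runs in its own cheap Virtual Thread.
          Future<Profile> profile = executor.submit(() -> fetchProfile("42"));
          Future<Orders> orders = executor.submit(() -> fetchOrders("42"));

          // Total latency is roughly that of the slowest call, not the sum of both.
          System.out.println(profile.get() + " " + orders.get());
        }
      }
    }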

    Too Much of a Good Thing?

    There is also some concern with low latency scalable resources that seldom block with Virtual Threads. Since Virtual Threads are not preempted, there can be starvation and/or fairness problems if they are not blocked by slow resources. This is probably a good problem to have, but will need some management on extreme scales for some applications.

    The Cost of Waiting

    We have identified that there are indeed scalable resources on which an application may wait with many Threads. However, there is no such thing as a free lunch and waiting Threads may have a significant cost, even if they are Virtual. Specifically how/where an application waits can greatly affect resource usage.

    Consider a traditional application server with a limited Thread pool that is running near capacity, but with additional demand. While the 200 odd Threads are busy handling 200 concurrent requests, there are additional requests waiting to be handled. However, in an asynchronous server like Jetty, those additional requests can be cheaply parked and may be represented by just a single set bit in a selector, or perhaps a tiny entry in a queue that holds only a reference to a connection that is ready to be read.

    Now consider if requests were serviced by Virtual Threads instead of waiting for a pooled Platform Thread to become available. Pending requests would be allowed to proceed to some blocking point in the application. Waiting like this within the application can have additional expenses including:

    • An input buffer will be allocated to read the request and any content it has.
    • A read is performed into the input buffer, thus removing network back pressure, so a client is able to send more requests/data even if the server is unable to handle them.
    • An object representation of the request will be built, containing at least the meta data and frequently some application data if there is an XML or JSON payload.
    • Sessions may be activated and brought into memory from caches or passivation stores.
    • The allocated Thread runs deep inside the application code, potentially reaching near maximal stack depth.
    • Application objects created on the heap are held in memory with references from the stack.
    • An output buffer may be allocated, along with additional character conversion resources.

    When request handling blocks within the application, all these additional resources may be allocated and held during that wait. Worse still, because of the lack of back pressure, a client may send more requests/data, resulting in more Threads and associated resources being allocated and also being held whilst the application waits for some resource.

    Provisioning for the Worst Case

    We have seen that there are indeed applications that may benefit from having additional Threads available to service requests. But we have also seen that such additional Threads may incur additional costs beyond just the stack size. Waiting/Blocking within an application will typically be done with a deep stack and other resources allocated. Whilst Virtual Threads might be effectively infinite, it is unlikely that these other required resources are equally scalable.

    When an application experiences a worst case peak in load, then ultimately some resource will run out. To provide good Quality of Service, it is vital that such resource exhaustion is handled gracefully, allowing some request handling to continue rather than suffering catastrophic failure.

    With traditional Platform Thread based pools, stack memory is already provisioned for worst case stacks for all Threads, and the thread pool size limit is also an indirect limit on the number of concurrent resources used. Threads have sufficient resources available to complete their handling, whilst any excess requests suffer latency whilst waiting cheaply for an available Thread. Furthermore, the back pressure resulting from not reading all offered requests can prevent additional load from being sent by the clients. Thread limits are imperfect resource limits, but at least they are some kind of limit that can provide some graceful degradation under load.

    Alternatively, an application using Virtual Threads that has no explicit resource management will be likely to exhaust some of the resources used by those Threads. This can result in an OutOfMemoryError or similar, as the unlimited Virtual Threads each allocate deep stacks and other resources needed for request handling. The cost of the average-case memory savings may be insufficient provisioning for the worst case, resulting in catastrophic failure rather than graceful degradation. An analogy is that building more roads can actually make traffic worse if the added cars overwhelm other infrastructure.

    Many applications are written without explicit resource limitations/management. Instead they rely on the imperfect Thread pool for at least some minimal protection. If that is removed, then some form of explicit resource limitation/management is likely to be needed in its place. Stable servers need to be provisioned for the worst case, not the average one.
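
    One common form of explicit limit (suggested by JEP 444 itself) is to bound access to the finite resource with a Semaphore rather than with the Thread pool. The sketch below is illustrative only; the permit count and the placeholder “database call” are assumptions:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;

    public class BoundedAccess
    {
      // Sized to the finite resource, e.g. a JDBC pool of 8 connections.
      private static final Semaphore DB_PERMITS = new Semaphore(8);

      static String queryDatabase(String sql) throws InterruptedException
      {
        DB_PERMITS.acquire();          // excess Virtual Threads wait here
        try
        {
          return "result of " + sql;   // placeholder for the real JDBC call
        }
        finally
        {
          DB_PERMITS.release();
        }
      }

      public static void main(String[] args) throws Exception
      {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor())
        {
          for (int i = 0; i < 10_000; i++)
            executor.submit(() -> queryDatabase("SELECT 1"));
        }
      }
    }

    Note that, as argued above, where such a limit is applied matters: waiting on a semaphore deep inside the application still holds buffers and stack, so limits are best applied as early as possible.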

    Conclusion

    There are applications that can scale better if more Threads are available, but it is not all applications (at least not without significant redesign). Consideration needs to be given to what will limit the worst case load for a server/application if it is not to be Threads. Specifically, the costs of waiting within the application may be such that scalability is likely to have a limit that will not be enforced by practically infinite Virtual Threads.

    It may be that resources have limitations well within the capacity of large but limited Platform Thread pools, which are perfectly capable of scaling to many thousands of threads. So experiments with scaling a Platform Thread pool should first be used to see what limits do apply to an application.

    If no upper limit is found before Platform Threads exhaust kernel memory, then Virtual Threads will allow scaling beyond that limit until some other limit is found. Thus the ultimate resource limit will need to be explicitly managed if catastrophic failure is to be avoided (but, to be fair, applications using Thread pools should also do some explicit resource limit management rather than rely just on the coarse limits of a Thread pool).

    Recommendation

    If Virtual Threads are not the general solution to scalability then what is? There is no one-size-fits-all solution, but I believe many applications that are limited by blocking on the network would benefit from being deployed in a server like Eclipse Jetty, which can do much of the handling for them asynchronously. Let Jetty read your requests asynchronously and prepare the content as parsed JSON, XML, or form data. Only then allocate a Thread (Virtual or Platform) with a large output buffer so the application can be written in blocking style, but will not block on either reading the request or writing the response. Finally, once the response is prepared, let Jetty flush it to the network asynchronously. Jetty has always somewhat supported this model (e.g. by delaying dispatch to a Servlet until the first packet of data arrives), but with Jetty-12 we are adding more mechanisms to asynchronously prepare requests and flush responses, whilst leaving the application written in blocking style. More to come on this in future blogs!

  • Security Audit with Trail of Bits

    Several months ago, the Eclipse Foundation approached the Eclipse Jetty project with the offer of a security audit. The effort was being supported through a collaboration with the Open Source Technology Improvement Fund (OSTIF), with the actual funding coming from the Alpha-Omega Project.

    Upon reflection, this collaboration could not have come at a better time for the Jetty open-source project. Completing this security audit before the first release of Jetty 12 was serendipitous. While the collaboration results with Trail of Bits are just now being published, the work has been largely complete for a couple of months.

    When we started this audit effort, Jetty 12 was quickly shaping up to be one of the most exciting releases we have ever worked on in the history of Jetty. Support for protocols like HTTP/3 further refined the internals of Jetty into a modern, scalable network component. Coupled with a refactoring of the internals to remove the strict dependency on Servlet Request and Response objects, the core of Jetty became a more general server component, allowing scalable and performant applications to be developed directly on Jetty without the strict requirement for Servlets. This ultimately allowed us to add a new Environment concept that supports multiple versions of the Servlet API on the same Jetty server simultaneously; mixing javax.servlet and jakarta.servlet on the same server allows for many exciting options for our users.

    However, these changes and exciting new features mean that quite a bit has changed, and when many moving parts are evolving, there is always a risk of unwanted behaviors.

    Our committers had low expectations of what this engagement would lead to, as our previous experience with various code analysis tooling often resulted in too many false positives. Part of the prep work to start this review required us to draw an appropriate-sized box around the Jetty project code where we felt a review was most warranted. It should come as no surprise that much of this code is some of the more nuanced and complex within Jetty. So, throwing caution to the wind, we prepared and submitted the paperwork.

    We could not have been more pleased with how the engagement proceeded from here. Trail of Bits was chosen as the company to perform the review, and it met and exceeded our expectations by far. Sitting down with their engineers, it was apparent they were excited to be working on an open-source project of Jetty’s maturity, and when their work was completed, they demonstrated a much more complete understanding of the reviewed code than the Jetty team expected.

    Ultimately, we could not have been happier with how this effort was executed. The Eclipse Jetty project members are very thankful to the Eclipse Foundation, OSTIF, and Trail of Bits for making this collaboration a resounding success!


  • Jetty 12 – Virtual Threads Support

    Executive Summary

    Virtual Threads, introduced in Java 19, are supported in Jetty 12, as they have been in Jetty 10 and Jetty 11 since 10.0.12 and 11.0.12, respectively.

    When virtual threads are supported by the JVM and enabled in Jetty (see embedded usage and standalone usage), applications are invoked using a virtual thread, which allows them to use simple blocking APIs, but with the scalability benefits of virtual threads.
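
    For embedded usage the wiring is roughly as follows; this is a sketch based on my reading of the Jetty documentation (the setVirtualThreadsExecutor call on QueuedThreadPool is how I understand the feature is enabled; see the linked embedded usage guide for the authoritative form):

    import java.util.concurrent.Executors;

    import org.eclipse.jetty.server.Server;
    import org.eclipse.jetty.util.thread.QueuedThreadPool;

    public class VirtualThreadsServer
    {
      public static void main(String[] args) throws Exception
      {
        // Platform threads still run Jetty's non-blocking internals...
        QueuedThreadPool threadPool = new QueuedThreadPool();
        // ...while blocking application code is invoked on virtual threads.
        threadPool.setVirtualThreadsExecutor(Executors.newVirtualThreadPerTaskExecutor());

        Server server = new Server(threadPool);
        // connectors and handlers configured as usual
        server.start();
        server.join();
      }
    }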

    Introduction

    Virtual threads were introduced as a preview feature in Java 19 via JEP 425 and in Java 20 via JEP 436, and finally integrated as an official feature in Java 21 via JEP 444.

    Historically, the APIs provided to application developers, especially web application developers, were blocking APIs based on InputStream and OutputStream, or based on JDBC.
    These APIs are very simple to use, so applications are simple to develop, understand and troubleshoot.

    However, these APIs come with a cost: when a thread blocks, typically waiting for I/O or a contended lock, all the resources associated with that thread are retained, waiting for the thread to unblock and continue processing: the native thread and its native memory are retained, as well as network buffers, lock structures, etc.

    This means that blocking APIs are less scalable because they retain the resources they use.
    For example, if you have configured your server thread pool with 256 threads, and all are blocked, your server cannot process other requests until one of the blocked threads unblocks, limiting the server’s scalability.

    Furthermore, if you increase the server thread pool capacity, you will use more memory and likely require bigger hardware.

    Asynchronous/Reactive

    For these reasons, non-blocking asynchronous and reactive APIs have been introduced. The primary examples are the asynchronous I/O APIs introduced in Servlet 3.1 and reactive APIs provided by libraries such as RxJava and Spring’s Project Reactor, based on Reactive Streams.
    Unfortunately, REST APIs such as JAX-RS or Jakarta RESTful Web Services have not been (fully) updated with non-blocking APIs, so web applications that use REST are stuck with blocking APIs and scalability problems.

    Essential to note is that asynchronous and reactive APIs are more difficult to use, understand, and troubleshoot than blocking APIs, but are more scalable and typically achieve similar performance at a fraction of the resources. We have seen web applications that, when switched from blocking APIs to non-blocking APIs, reduced their thread usage from 1000+ to 10+.

    Virtual threads aim to be the best of both worlds: simple-to-use blocking APIs for developers, with the scalability of non-blocking APIs provided by the JVM.

    Jetty 12 Architecture

    The Jetty 12 architecture, at its core, is completely non-blocking and uses an AdaptiveExecutionStrategy (formerly known as “eat what you kill” which was covered in previous blogs here and here) to determine how to consume tasks.

    The key feature of AdaptiveExecutionStrategy is that it has a strong preference for consuming tasks in the same thread that produces them, so they are executed with a hot CPU cache, without parallel slowdown and without context-switch latency, yet it avoids the risk of the server exhausting its thread pool.

    Simplifying a bit, each task is marked either as blocking or non-blocking; AdaptiveExecutionStrategy looks at the task and at how many threads are available to decide how to consume the task.

    If the task is non-blocking, the current thread runs it immediately.
    If the task is blocking and another thread is available to take over the production of tasks, the current thread hands over production and consumes the blocking task itself.
    Otherwise, if no other threads are available to continue producing, the current thread keeps producing and the blocking task is given to an Executor, where it is likely queued and executed later by a different thread.

    Virtual Threads Integration

    This architecture made it easy to integrate virtual threads in Jetty: when virtual threads are supported by the JVM and Jetty’s virtual threads support is enabled (see embedded usage and standalone usage), AdaptiveExecutionStrategy consumes a blocking task by offering the task to the virtual thread Executor rather than the native thread Executor, so that a newly spawned virtual thread runs the blocking task.

    That’s it.

    As a Servlet Container implementation, Jetty calls Servlets assuming they will use blocking APIs, so the task that invokes the Servlet is a blocking task.
    When virtual threads are supported and enabled, the thread that calls Servlet Filters and eventually the HttpServlet.service(...) method is a virtual thread.
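
    A trivial way to observe this from application code (a sketch assuming a Jakarta Servlet environment and Java 21):

    import java.io.IOException;

    import jakarta.servlet.http.HttpServlet;
    import jakarta.servlet.http.HttpServletRequest;
    import jakarta.servlet.http.HttpServletResponse;

    public class WhichThreadServlet extends HttpServlet
    {
      @Override
      protected void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException
      {
        Thread current = Thread.currentThread();
        // With virtual threads enabled in Jetty, isVirtual() reports true here.
        response.getWriter().printf("Handled by %s (virtual=%b)%n", current.getName(), current.isVirtual());
      }
    }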

    For non-blocking tasks, it is more efficient to have them run by the same native thread that created them; it is only for blocking tasks that you may want to use virtual threads.

    Conclusions

    Jetty’s AdaptiveExecutionStrategy allows the best of all worlds.
    Jetty provides a fast scalable asynchronous implementation, which avoids any possible limitations of virtual threads, whilst giving applications the full benefits of virtual threads. 

    Jetty deals with complex asynchronous concerns, so you don’t have to!

  • Jetty, ALPN & Java 8u252

    Introduction

    The Jetty Project provided the Java community with support first for NPN (the precursor of ALPN) in Java 7, and then for ALPN in Java 8.
    The ALPN support was implemented by modifying sun.security.ssl classes, and this required that the modified classes were prepended to the bootclasspath, so the command line would look like this:

    java -Xbootclasspath/p:/path/to/alpn-boot-8.1.13.v20181017.jar ...

    However, every different OpenJDK version could require a different version of the alpn-boot jar. ALPN support is there, but cumbersome to use.
    Things improved a bit with the Jetty ALPN agent. The agent could detect the Java version and redefine on-the-fly the sun.security.ssl classes (using the alpn-boot jars). However, a new OpenJDK version could require a new alpn-boot jar, which in turn required a new agent version, and therefore for every new OpenJDK version applications needed to verify whether the agent needed to be updated. A bit less cumbersome, but still another hoop to jump through.

    OpenJDK ALPN APIs in Java 9

    The Jetty Project worked with the OpenJDK project to introduce proper ALPN APIs, and this was introduced in Java 9 with JEP 244.

    ALPN APIs backported to Java 8u252

    On April 14th 2020, OpenJDK 8u252 was released (the Oracle version is named 8u251, and it may be equivalent to 8u252, but there is no source code available to confirm this; we will be referring only to the OpenJDK version in the following sections).
    OpenJDK 8u252 contains the ALPN APIs backported from Java 9.
    This means that for OpenJDK 8u252 the alpn-boot jar is no longer necessary, because it’s no longer necessary to modify sun.security.ssl classes as now the official OpenJDK ALPN APIs can be used instead of the Jetty ALPN APIs.
    The Jetty ALPN agent version 2.0.10 does not perform class redefinition if it detects that the OpenJDK version is 8u252 or later. It can still be left in the command line (although we recommend not to), but will do nothing.

    Changes for libraries/applications directly using Jetty ALPN APIs

    Libraries or applications that are directly using the Jetty ALPN APIs (the org.eclipse.jetty.alpn.ALPN and related classes) should now be modified to detect whether the backported OpenJDK ALPN APIs are available. If the backported OpenJDK ALPN APIs are available, libraries or applications must use them; otherwise they must fall back to use the Jetty ALPN APIs (like they were doing in the past).
    For example, the Jetty project is using the Jetty ALPN APIs in artifact jetty-alpn-openjdk8-[client|server]. The classes in that Jetty artifact have been changed as described above, starting from Jetty version 9.4.28.v20200408.
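
    A minimal sketch of such detection (the method probed here, SSLParameters.setApplicationProtocols(String[]), is one of the ALPN APIs introduced by JEP 244 and backported in 8u252; the fallback action is just a placeholder):

    import javax.net.ssl.SSLParameters;

    public class AlpnSupport
    {
      // Returns true if the JDK 9 ALPN APIs (backported to OpenJDK 8u252) are available.
      public static boolean hasJdkAlpnApis()
      {
        try
        {
          SSLParameters.class.getMethod("setApplicationProtocols", String[].class);
          return true;
        }
        catch (NoSuchMethodException x)
        {
          return false;
        }
      }

      public static void main(String[] args)
      {
        if (hasJdkAlpnApis())
          System.out.println("Use the standard OpenJDK ALPN APIs");
        else
          System.out.println("Fall back to the Jetty ALPN APIs (org.eclipse.jetty.alpn.ALPN)");
      }
    }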

    Changes for applications indirectly using ALPN

    Applications that are using Java 8 and that depend on libraries that provide ALPN support (such as the jetty-alpn-openjdk8-[client|server] artifact described above) must modify the way they are started.
    For an application that is still using an OpenJDK version prior to 8u252, the typical command line requires the alpn-boot jar in the bootclasspath and a library that uses the Jetty ALPN APIs (here as an example, jetty-alpn-openjdk8-server) in the classpath:

    /opt/openjdk-8u242/bin/java -Xbootclasspath/p:/path/to/alpn-boot-8.1.13.v20181017.jar -classpath jetty-alpn-openjdk8-server-9.4.27.v20200227:...

    For the same application that wants to use OpenJDK 8u252 or later, the command line becomes:

    /opt/openjdk-8u252/bin/java -classpath jetty-alpn-openjdk8-server-9.4.28.v20200408:...

    That is, the -Xbootclasspath option must be removed and the library must be upgraded to a version that supports the backported OpenJDK ALPN APIs.
    Alternatively, applications can switch to use the Jetty ALPN agent as described below.

    Frequently Asked Questions

    Was the ALPN APIs backport a good idea?

    Yes, because Java vendors are planning to support Java 8 until (at least) 2030, and this would have required providing alpn-boot jars for another 10 years.
    Now instead, applications and libraries will be able to use the official OpenJDK ALPN APIs provided by Java for as long as Java 8 is supported.

    Can I use OpenJDK 8u252 with an old version of Jetty?

    Yes, if your application does not need ALPN (in particular, it does not need artifacts jetty-alpn-openjdk8-[client|server]).
    No, if your application needs ALPN (in particular, it needs artifacts jetty-alpn-openjdk8-[client|server]). You must upgrade to Jetty 9.4.28.v20200408 or later.

    Can I use OpenJDK 8u252 with library X, which depends on the Jetty ALPN APIs?

    You must verify that library X has been updated to support OpenJDK 8u252.
    In practice, library X must detect whether the backported OpenJDK ALPN APIs are available; if so, it must use the backported OpenJDK ALPN APIs; otherwise, it must use the Jetty ALPN APIs.

    I don’t know what OpenJDK version my application will be run with!

    You need to make a version of your application that works with both OpenJDK 8u252 and earlier versions; see the previous question.
    For example, if you depend on Jetty, you need to depend on jetty-alpn-openjdk8-[client|server]-9.4.28.v20200408 or later.
    Then you must add the Jetty ALPN agent to the command line, since it supports all Java 8 versions.

    /opt/openjdk-8u242/bin/java -javaagent:/path/to/jetty-alpn-agent-2.0.10.jar -classpath jetty-alpn-openjdk8-server-9.4.28.v20200408:...
    /opt/openjdk-8u252/bin/java -javaagent:/path/to/jetty-alpn-agent-2.0.10.jar -classpath jetty-alpn-openjdk8-server-9.4.28.v20200408:...

    The command lines are identical apart from the Java version.
    In this way, if your application is run with OpenJDK 8u242 or earlier, the Jetty ALPN agent will apply the sun.security.ssl class redefinitions, and jetty-alpn-openjdk8-[client|server]-9.4.28.v20200408 will use the Jetty ALPN APIs.
    Otherwise, if your application is run with OpenJDK 8u252 or later, the Jetty ALPN agent will do nothing (no class redefinition is necessary), and jetty-alpn-openjdk8-[client|server]-9.4.28.v20200408 will use the OpenJDK ALPN APIs.

  • Eat What You Kill without Starvation!

    Jetty 9 introduced the Eat-What-You-Kill execution strategy (the strategy is named after a hunting proverb, in the sense that one should only kill to eat; the use of this phrase is not an endorsement of hunting nor killing of wildlife for food or sport) to apply mechanically sympathetic techniques to the scheduling of threads in the producer-consumer pattern that are used for core capabilities in the server. The initial implementations proved vulnerable to thread starvation, and Jetty-9.3 introduced dual scheduling strategies to keep the server running, which in turn suffered from lock contention on machines with more than 16 cores. The Jetty-9.4 release now contains the latest incarnation of the Eat-What-You-Kill scheduling strategy, which provides mechanical sympathy without the risk of thread starvation in a single strategy. This blog is an update of the original post with the latest refinements.

    Parallel Mechanical Sympathy

    Parallel computing is a “false friend” for many web applications. The textbooks will tell you that parallelism is about decomposing large tasks into smaller ones that can be executed simultaneously by different computing engines to complete the task faster. While this is true, the issue is that, for web application containers, there is no agreement on what the “large task” that needs to be decomposed actually is.

    From the application's point of view, the large task to be solved is how to render a complex page for a user, combining multiple requests and resources, using many services for authentication and perhaps RESTful access to a data model on multiple back-end servers. For the application, parallelism can improve the quality of service of rendering a single page by spreading the decomposed tasks over all the available CPUs of the server.

    However, a web application container has a different large task to solve: how to provide service to hundreds or thousands, maybe even hundreds of thousands, of simultaneous users. Unfortunately, for the container, the optimal way to allocate its decomposed tasks to CPUs is the complete opposite of how the application would like its decomposed tasks to be executed.

    Consider a server with 4 CPUs serving 4 users, each of which has 4 tasks. The application's ideal view of parallel decomposition looks like:

    Labels UxTy represent Task y for User x. Tasks for the same user are coloured alike.

    This view suggests that each user's combined task will be executed in minimum time. However, some users must wait for prior users' tasks to complete before their execution can start, so average latency is higher.

    Furthermore, we know from Mechanical Sympathy that such ideal execution is rarely possible, especially if there is data shared between tasks. Each CPU needs time to load its cache and registers with data before that data can be acted on. If that data is specific to the problem each user is trying to solve, then the real view of the parallel execution looks more like the following, the orange blocks indicating the time taken to load the CPU cache with user and task related data:

    Labels UxTy represent Task y for User x. Tasks for the same user are coloured alike. Orange blocks represent cache load time.

    So from the container's point of view, the last thing it wants is the data from one user's large problem spread over all its CPUs, because that means that when it executes the next task, it will have a cold cache that must be reloaded with the data of the next user.  Furthermore, executing tasks for the same user on different CPUs risks Parallel Slowdown, where the cost of mutual exclusion, synchronisation and communication between CPUs can increase the total time needed to execute the tasks to more than serial execution.  If the tasks are fully mutually excluded on user data (unlikely but a bounding case), then the execution could look like:

    For optimal execution from the container's point of view, it is far better if tasks from each user, which use common data, are kept on the same CPU so the cache only needs to be loaded once and there is no mutual exclusion on user data:

    While this style of execution does not achieve the minimal latency and throughput of the idealised application view, in reality it is the fairest and most optimal execution, with all users receiving similar quality of service and the optimal average latency.

    In summary, when scheduling the execution of parallel tasks, it is best to keep tasks that share data on the same CPU so that they may benefit from a hot cache (the original blog contains some micro benchmark results that quantify the benefit).

    Produce Consume (PC)

    In order to facilitate the decomposition of large problems into smaller ones, the Jetty container uses the Producer-Consumer pattern:

    • The NIO Selector produces IO events that need to be consumed by reading, parsing and handling the data.
    • A multiplexed HTTP/2 connection produces Frames that need to be consumed by calling the Servlet Container. Note that the producer of HTTP/2 frames is itself a consumer of IO events!

    The producer-consumer pattern adds another way that tasks can be related by data. Not only might they be for the same user, but consuming a task will share the data that results from producing the task. A simple implementation can achieve this by using only a single CPU to both produce and consume the tasks:

    while (true)
    {
      Runnable task = _producer.produce();
      if (task == null)
        break;
       task.run();
    }

    The resulting execution pattern has good mechanical sympathy characteristics:

    Label UxPy represent Produce Task y for User x, Label UxCy represent Consume Task y for User x. Tasks for the same user are coloured in similar tones. Orange blocks are cache load times.

    Here all the produced tasks are immediately consumed on the same CPU with a hot cache!  Cache load times are minimised, but the cost is that the server will suffer from Head of Line (HOL) Blocking, where the serial execution of tasks from a queue means that tasks are forced to wait for the completion of unrelated tasks.  In this case task U1C0 need not wait for U0C0, and U2C0 need not wait for U1C1 or U0C1, etc. There is no parallel execution and thus this is not an optimal usage of the server resources.

    Produce Execute Consume (PEC)

    To solve the HOL blocking problem, multiple CPUs must be used so that produced tasks can be executed in parallel, and even if one task is slow or blocks, the other CPUs can progress the other tasks.  To achieve this, a typical solution is to have one Thread executing on a CPU that will only produce tasks, which are then placed in a queue of tasks to be executed by Threads running on other CPUs.  Typically the task queue is abstracted into an Executor:

    while (true)
    {
        Runnable task = _producer.produce();
        if (task == null)
            break;
        _executor.execute(task);
    }

    This strategy could be considered the canonical solution to the producer-consumer problem, where producers are separated from consumers by a queue, and is at the heart of architectures such as SEDA. This strategy solves the head-of-line blocking issue well, since all tasks produced can complete independently in different Threads on different CPUs:

    This represents a good improvement in throughput and average latency over the simple Produce Consume solution, but the cost is that every consumed task is executed on a different Thread (and thus likely a different CPU) from the one that produced the task.  While this may appear a small cost for avoiding HOL blocking, our experience is that CPU cache misses significantly reduced the performance of early Jetty 9 releases.

    Eat What You Kill (EWYK) AKA Execute Produce Consume (EPC)

    To achieve good mechanical sympathy and avoid HOL blocking, Jetty has developed the Execute Produce Consume strategy, which we have nicknamed Eat What You Kill (EWYK) after the hunting expression which states that a hunter should only kill an animal they intend to eat. Applied to the producer-consumer problem, this policy says that a thread should only produce (kill) a task if it intends to consume (eat) it. A task queue is still used to achieve parallel execution, but it is the producer that is dispatched rather than the produced task:

        while (true)
        {
            Runnable task = _producer.produce();
            if (task == null)
                break;
            _executor.execute(this); // dispatch production
            task.run(); // consume the task ourselves
        }

    The result is that a task is consumed by the same Thread, and thus likely the same CPU, that produced it, so that consumption is always done with a hot cache:

    Moreover, because any thread that completes consuming a task will immediately attempt to produce another task, there is the possibility of a single Thread/CPU executing multiple produce/consume cycles for the same user. The result is improved average latency and reduced total CPU time.

    Starvation!

    Unfortunately, a pure implementation of EWYK suffers from a fatal flaw! Since any thread producing a task will go on to consume that task, it is possible for all threads/CPUs to be consuming at once.  This was initially seen as a feature, as it exerted good back pressure on the network: a busy server used all its resources consuming existing tasks rather than producing new tasks. However, in an application server, consuming a task may be a blocking process that waits for more data/frames to be produced. Unfortunately, if every thread/CPU ends up consuming such a blocking task, then there are no threads left available to produce the tasks to unblock them. Deadlock!

    A real example of this occurred with HTTP/2, when every Thread from the pool was blocked in a HTTP/2 request because it had used up its flow control window. The windows can be expanded by flow control frames from the other end, but there were no threads available to process the flow control frames!

    Thus the EWYK execution strategy used in Jetty is now adaptive and can use the most appropriate of the three strategies outlined above, ensuring there is always at least one thread/CPU producing so that starvation does not occur. To be adaptive, Jetty uses two mechanisms:

    • Tasks that are produced can be interrogated via the Invocable interface to determine if they are non-blocking, blocking, or can be run in either mode. NON_BLOCKING or EITHER tasks can be directly consumed by the PC model.
    • The thread pools used by Jetty implement the TryExecutor interface, which supports the method boolean tryExecute(Runnable task). This allows the scheduler to know if a thread was available to continue producing, and thus allows EWYK/EPC mode; otherwise the task must be passed to an executor to be consumed in PEC mode. To implement this semantic, Jetty maintains a dynamically sized pool of reserved threads that can respond to tryExecute(Runnable) calls.

    Thus the simple produce consume (PC) model is used for non-blocking tasks; for blocking tasks the EWYK, aka Execute Produce Consume (EPC) mode is used if a reserved thread is available, otherwise the SEDA style Produce Execute Consume (PEC) model is used.

    The adaptive EWYK strategy can be written as:

        while (true)
        {
            Runnable task = _producer.produce();
            if (task == null)
                break;
            if (Invocable.getInvocationType(task)==NON_BLOCKING)
                task.run();                     // Produce Consume
            else if (executor.tryExecute(this)) // recruit a new producer?
                task.run();                     // Execute Produce Consume (EWYK!)
            else
                executor.execute(task);         // Produce Execute Consume
        }
    

    Chained Execution Strategies

    As stated above, in the Jetty use-case it is common for the execution strategy used by the IO layer to call tasks that are themselves an execution strategy for producing and consuming HTTP/2 frames.  Thus EWYK strategies can be chained and by knowing some information about the mode in which the prior  strategy has executed them the strategies can be even more adaptive.

    The adaptable chainable EWYK strategy is outlined here:

      while (true)
      {
        Runnable task = _producer.produce();
        if (task == null)
          break;
        if (thisThreadIsNonBlocking())
        {
          switch (Invocable.getInvocationType(task))
          {
            case NON_BLOCKING:
              task.run();                   // Produce Consume
              break;
            case BLOCKING:
              _executor.execute(task);      // Produce Execute Consume
              break;
            case EITHER:
              executeAsNonBlocking(task);   // Produce Consume
              break;
          }
        }
        else
        {
          switch (Invocable.getInvocationType(task))
          {
            case NON_BLOCKING:
              task.run();                   // Produce Consume
              break;
            case BLOCKING:
              if (_executor.tryExecute(this))
                task.run();                 // Execute Produce Consume (EWYK!)
              else
                _executor.execute(task);    // Produce Execute Consume
              break;
            case EITHER:
              if (_executor.tryExecute(this))
                task.run();                 // Execute Produce Consume (EWYK!)
              else
                executeAsNonBlocking(task); // Produce Consume
              break;
          }
        }
      }

    An example of how the chaining works is that the HTTP/2 task declares itself as invocable EITHER, in blocking or non-blocking mode. If the IO strategy is operating in PEC mode, then the HTTP/2 task is in its own thread and free to block, so it can itself use EWYK and potentially execute a blocking task that it produced.

    However, if the IO strategy has no reserved threads, it cannot risk queuing an important Flow Control frame in a job queue. Instead it can execute the HTTP/2 task as a non-blocking task in PC mode.  So even if the last available thread was running the IO strategy, it can use PC mode to execute HTTP/2 tasks in non-blocking mode. The HTTP/2 strategy is then always able to handle flow control frames, as they are non-blocking tasks run with PC, while all other frames that may block are queued with PEC.

    Conclusion

    The EWYK execution strategy has been implemented in Jetty to improve performance through mechanical sympathy, whilst avoiding the issues of Head of Line blocking, Thread Starvation and Parallel Slowdown.   The team at Webtide continue to work with our clients and users to analyse and innovate better solutions to serve high performance real world applications.

  • CometD 2.4.0.beta1 Released

    CometD 2.4.0.beta1 has been released.
    This is a major release that brings in a few new Java APIs (see this issue) – client-side channels can now be released to save memory – along with an API deprecation (see this issue) – client-side publish() should not specify the message id.
    On the WebSocket front, the WebSocket transports have been overhauled and made up-to-date with the latest WebSocket drafts (currently Jetty implements up to draft 13, while browsers are still a bit back on draft 7/8 or so), and made scalable as well in both threading and memory usage.
    Following these changes, BayeuxClient has been updated to negotiate transports with the server, and Oort has also been updated to use WebSocket by default for server-to-server communication, making server-to-server communication more efficient and with less latency.
    WebSocket is now supported on Firefox 6 through the use of the Firefox-specific MozWebSocket object in the javascript library.
    We have performed some preliminary benchmarks with WebSocket; they look really promising, although they were done before the latest changes to the CometD WebSocket transports.
    We plan to do more accurate benchmarking in the next days/weeks.
    The other major change is the pluggability of the JSON library to handle JSON generation and parsing (see this issue).
    CometD has long been based on Jetty’s JSON library, but now Jackson can also be used (the default will still be Jetty’s, however, to avoid breaking deployed applications that were using the Jetty JSON classes).
    Jackson proved to be faster than Jetty in both parsing and generation, and will likely become the default in a few releases, to allow gradual migration of applications that made use of the Jetty JSON classes directly.
    Applications should be written independently of the JSON library used.
    Of course Jackson also brings in its powerful configurability and annotation processing so that your custom classes can be de/serialized from/to JSON.
    Here you can find the release notes.
    Download it, use it, and report back; any feedback is important before the final 2.4.0 release.

  • Websocket Example: Server, Client and LoadTest

    The websocket protocol specification is approaching final, and the Jetty implementation and API have been tracking the drafts and will be ready when the spec is finalised and browsers support it.  Moreover, Jetty release 7.5.0 now includes a capable websocket java client that can be used for non-browser applications or load testing. It is fully asynchronous and can create thousands of connections simultaneously.

    This blog uses the classic chat example to introduce a websocket server, client and load test.

    The project

    The websocket example has been created as a maven project with groupId com.example.  The entire project can be downloaded from here.  The pom.xml defines a dependency on org.eclipse.jetty:jetty-websocket:7.5.0.RC1 (you should update to 7.5.0 when the final release is available), which provides the websocket API and transitively the jetty implementation.  There is also a dependency on org.eclipse.jetty:jetty-servlet which provides the ability to create an embedded servlet container to run the server example.

    While the project implements a Servlet, it is not in a typical webapp layout, as I wanted to provide both client and server in the same project.    Instead of a webapp, this project uses embedded jetty in a simple Main class to provide the server and the static content is served from the classpath from src/resources/com/example/docroot.

    Typically developers will want to build a war file containing a webapp, but I leave it as an exercise for the reader to put the servlet and static content described here into a webapp format.

    The Servlet

    The Websocket connection starts with a HTTP handshake.  Thus the websocket API in Jetty is also initiated by the handling of a HTTP request (typically) by a Servlet.  The advantage of this approach is that websocket connections are terminated in the same rich application space provided by HTTP servers, thus a websocket-enabled web application can be developed in a single environment rather than by collaboration between a HTTP server and a separate websocket server.

    We create the ChatServlet with an init() method that instantiates and configures a WebSocketFactory instance:

    public class ChatServlet extends HttpServlet
    {
      private WebSocketFactory _wsFactory;
      private final Set<ChatWebSocket> _members = new CopyOnWriteArraySet<ChatWebSocket>();
      @Override
      public void init() throws ServletException
      {
        // Create and configure WS factory
        _wsFactory=new WebSocketFactory(new WebSocketFactory.Acceptor()
        {
          public boolean checkOrigin(HttpServletRequest request, String origin)
          {
            // Allow all origins
            return true;
          }
          public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
          {
             if ("chat".equals(protocol))
               return new ChatWebSocket();
             return null;
          }
        });
        _wsFactory.setBufferSize(4096);
        _wsFactory.setMaxIdleTime(60000);
      }
      ...

    The WebSocketFactory is instantiated by passing it an Acceptor instance, which in this case is an anonymous instance. The Acceptor must implement two methods: checkOrigin, which in this case accepts all origins; and doWebSocketConnect, which must accept a WebSocket connection by creating and returning an instance of the WebSocket interface to handle incoming messages.  In this case, an instance of the nested ChatWebSocket class is created if the protocol is “chat”.   The other WebSocketFactory fields have been initialised with hard coded buffer size and timeout, but typically these would be configurable from servlet init parameters (see the fragment below).
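
    For example, the two hard coded lines above could be replaced with something like the following fragment of init(), which reads servlet init parameters instead (the parameter names here are made up for illustration):

      @Override
      public void init() throws ServletException
      {
        // ... create the WebSocketFactory as above ...
        String bufferSize = getInitParameter("bufferSize");
        _wsFactory.setBufferSize(bufferSize == null ? 4096 : Integer.parseInt(bufferSize));
        String maxIdleTime = getInitParameter("maxIdleTime");
        _wsFactory.setMaxIdleTime(maxIdleTime == null ? 60000 : Integer.parseInt(maxIdleTime));
      }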

    The servlet handles GET requests by passing them to the WebSocketFactory to be accepted or not:

      ...
      protected void doGet(HttpServletRequest request,
                           HttpServletResponse response)
        throws IOException
      {
        if (_wsFactory.acceptWebSocket(request,response))
          return;
        response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
                           "Websocket only");
      }
      ...

    All that is left for the Servlet is the ChatWebSocket itself.  This is just a POJO that receives callbacks for events.  For this example we have implemented the WebSocket.OnTextMessage interface to restrict the callbacks to only connection management and full text messages:

      private class ChatWebSocket implements WebSocket.OnTextMessage
      {
        Connection _connection;
        public void onOpen(Connection connection)
        {
          _connection=connection;
          _members.add(this);
        }
        public void onClose(int closeCode, String message)
        {
          _members.remove(this);
        }
        public void onMessage(String data)
        {
          for (ChatWebSocket member : _members)
          {
            try
            {
              member._connection.sendMessage(data);
            }
            catch(IOException e)
            {
              e.printStackTrace();
            }
          }
        }
      }

    The handling of the onOpen callback is to add the ChatWebSocket to the set of all members (and remembering the Connection object for subsequent sends).  The onClose handling simply removes the member from the set.   The onMessage handling iterates through all the members and sends the received message to them (and prints any resulting exceptions).


    The Server

    To run the servlet, there is a simple Main method that creates an embedded Jetty server with a ServletHandler for the chat servlet, a ResourceHandler for the static content needed by the browser client, and a DefaultHandler to generate errors for all other requests:

    public class Main
    {
      public static void main(String[] arg) throws Exception
      {
        int port=arg.length>1?Integer.parseInt(arg[1]):8080;
        Server server = new Server(port);
        ServletHandler servletHandler = new ServletHandler();
        servletHandler.addServletWithMapping(ChatServlet.class,"/chat/*");
        ResourceHandler resourceHandler = new ResourceHandler();
        resourceHandler.setBaseResource(Resource.newClassPathResource("com/example/docroot/"));
        DefaultHandler defaultHandler = new DefaultHandler();
        HandlerList handlers = new HandlerList();
        handlers.setHandlers(new Handler[] {servletHandler,resourceHandler,defaultHandler});
        server.setHandler(handlers);
        server.start();
        server.join();
      }
    }

    The server can be run from an IDE or via maven using the following command line:

    mvn -Pserver exec:exec

    The Browser Client

    The HTML for the chat room simply imports some CSS and the javascript before creating a few simple divs to contain the chat text, the join dialog and the joined dialog:

    <html>
     <head>
     <title>WebSocket Chat Example</title>
     <script type='text/javascript' src="chat.js"></script>
     <link rel="stylesheet" type="text/css" href="chat.css" />
     </head>
     <body>
      <div id='chat'></div>
      <div id='input'>
       <div id='join' >
        Username:&nbsp;<input id='username' type='text'/>
        <input id='joinB' class='button' type='submit' name='join' value='Join'/>
       </div>
       <div id='joined' class='hidden'>
        Chat:&nbsp;<input id='phrase' type='text'/>
        <input id='sendB' class='button' type='submit' name='join' value='Send'/>
       </div>
      </div>
      <script type='text/javascript'>init();</script>
     </body>
    </html>

    The javascript creates a room object with methods to handle the various operations of a chat room.  The first operation is to join the chat room, which is triggered by entering a user name.  This creates a new WebSocket object pointing to the /chat URL path on the same server the HTML was loaded from:

    var room = {
      join : function(name) {
        this._username = name;
        var location = document.location.toString()
          .replace('http://', 'ws://')
          .replace('https://', 'wss://')+ "chat";
        this._ws = new WebSocket(location, "chat");
        this._ws.onopen = this.onopen;
        this._ws.onmessage = this.onmessage;
        this._ws.onclose = this.onclose;
      },
      onopen : function() {
        $('join').className = 'hidden';
        $('joined').className = '';
        $('phrase').focus();
        room.send(room._username, 'has joined!');
      },
      ...

    The javascript websocket object is initialised with callbacks for onopen, onclose and onmessage. The onopen callback is handled above by switching the join div to the joined div and sending a “has joined” message.

    Sending is implemented by creating a string of username:message and sending that via the WebSocket instance:

      ...
      send : function(user, message) {
        user = user.replace(':', '_');
        if (this._ws)
          this._ws.send(user + ':' + message);
      },
      ...

    If the chat room receives a message, the onmessage callback is called, which sanitises the message, parses out the username and appends the text to the chat div:

      ...
      onmessage : function(m) {
        if (m.data) {
          var c = m.data.indexOf(':');
          var from = m.data.substring(0, c)
            .replace('<', '&lt;')
            .replace('>', '&gt;');
          var text = m.data.substring(c + 1)
            .replace('<', '&lt;')
            .replace('>', '&gt;');
          var chat = $('chat');
          var spanFrom = document.createElement('span');
          spanFrom.className = 'from';
          spanFrom.innerHTML = from + ': ';
          var spanText = document.createElement('span');
          spanText.className = 'text';
          spanText.innerHTML = text;
          var lineBreak = document.createElement('br');
          chat.appendChild(spanFrom);
          chat.appendChild(spanText);
          chat.appendChild(lineBreak);
          chat.scrollTop = chat.scrollHeight - chat.clientHeight;
        }
      },
      ...

    Finally, the onclose handling empties the chat div and switches back to the join div so that a new username may be entered:

      ...
      onclose : function(m) {
        this._ws = null;
        $('join').className = '';
        $('joined').className = 'hidden';
        $('username').focus();
        $('chat').innerHTML = '';
      }
    };

    With this simple client being served from the server, you can now point your websocket capable browsers at http://localhost:8080 and interact with the chat room. Of course this example glosses over a lot of details and complications a real chat application would need, so I suggest you read my blog “is websocket chat simpler” to learn what else needs to be handled.

    The Load Test Client

    The jetty websocket java client is an excellent tool for both functional and load testing of a websocket based service.  It uses the same endpoint API as the server side, and for this example we create a simple implementation of the OnTextMessage interface that keeps track of all the open connections and counts the number of messages sent and received:

    public class ChatLoadClient implements WebSocket.OnTextMessage
    {
      private static final AtomicLong sent = new AtomicLong(0);
      private static final AtomicLong received = new AtomicLong(0);
      private static final Set<ChatLoadClient> members = new CopyOnWriteArraySet<ChatLoadClient>();
      private final String name;
      private final Connection connection;
      public ChatLoadClient(String username,WebSocketClient client,String host, int port)
      throws Exception
      {
        name=username;
        connection=client.open(new URI("ws://"+host+":"+port+"/chat"),this).get();
      }
      public void send(String message) throws IOException
      {
        connection.sendMessage(name+":"+message);
      }
      public void onOpen(Connection connection)
      {
        members.add(this);
      }
      public void onClose(int closeCode, String message)
      {
        members.remove(this);
      }
      public void onMessage(String data)
      {
        received.incrementAndGet();
      }
      public void disconnect() throws IOException
      {
        connection.disconnect();
      }

    The Websocket is initialized by calling open on the WebSocketClient instance passed to the constructor.  The WebSocketClient instance is shared by multiple connections and contains the thread pool and other common resources for the client.

    This load test example comes with a main method that creates a WebSocketClient from command line options and then creates a number of ChatLoadClient instances:

    public static void main(String... arg) throws Exception
    {
      String host=arg.length>0?arg[0]:"localhost";
      int port=arg.length>1?Integer.parseInt(arg[1]):8080;
      int clients=arg.length>2?Integer.parseInt(arg[2]):1000;
      int mesgs=arg.length>3?Integer.parseInt(arg[3]):1000;
      WebSocketClient client = new WebSocketClient();
      client.setBufferSize(4096);
      client.setMaxIdleTime(30000);
      client.setProtocol("chat");
      client.start();
      // Create client serially
      ChatLoadClient[] chat = new ChatLoadClient[clients];
      for (int i=0;i<chat.length;i++)
        chat[i]=new ChatLoadClient("user"+i,client,host,port);
      ...

    Once the connections are opened, the main method loops around picking a random client to speak in the chat room:

      ...
      // Send messages
      Random random = new Random();
      for (int i=0;i<mesgs;i++)
      {
        ChatLoadClient c = chat[random.nextInt(chat.length)];
        String msg = "Hello random "+random.nextLong();
        c.send(msg);
      }
      ...
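
    Before closing, the example waits until every reply has arrived. A minimal sketch of that wait, polling the shared received counter (a timeout is omitted for brevity), could be:

      ...
      // Wait until every client has received every message
      long expected = (long)clients * mesgs;
      while (received.get() < expected)
        Thread.sleep(10);
      ...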

    Once all the messages have been sent and all the replies have been received, the connections are closed:

      ...
      // close all connections
      for (int i=0;i<chat.length;i++)
        chat[i].disconnect();

    The project is setup so that the load client can be run with the following maven command:

    mvn -Pclient exec:exec

    And the resulting output should look something like:

    Opened 1000 of 1000 connections to localhost:8080 in 1109ms
    Sent/Received 10000/10000000 messages in 15394ms: 649603msg/s
    Closed 1000 connections to localhost:8080 in 45ms

    Yes that is 649603 messages per second!!!!!!!!!!! This is a pretty simple test, but it is still scheduling 1000 local sockets plus generating and parsing all the websocket frames. Real applications on real networks are unlikely to achieve close to this level, but the indications are good for the capability of high throughput, and stand by for more rigorous benchmarks shortly.