Ktor 1.6.8 Help

Server WebSockets

Ktor supports the WebSocket protocol, which lets you create applications that require real-time data transfer to and from the server. For example, WebSockets can be used to create a chat application.

Ktor allows you to:

  • Configure basic WebSocket settings, such as frame size, a ping period, and so on.

  • Handle a WebSocket session for exchanging messages between the server and client.

  • Add WebSocket extensions. For example, you can use the Deflate extension or implement a custom extension.

Add dependencies

To enable WebSockets support, you need to include the ktor-websockets artifact in the build script:

Gradle (Groovy):

implementation "io.ktor:ktor-websockets:$ktor_version"

Gradle (Kotlin):

implementation("io.ktor:ktor-websockets:$ktor_version")

Maven:

<dependency>
    <groupId>io.ktor</groupId>
    <artifactId>ktor-websockets</artifactId>
    <version>${ktor_version}</version>
</dependency>

Install WebSockets

To install the WebSockets plugin, pass it to the install function in the application initialization code. Depending on how you create the server, this can be inside the embeddedServer function call ...

import io.ktor.websocket.*
// ...

fun main() {
    embeddedServer(Netty, port = 8080) {
        install(WebSockets)
        // ...
    }.start(wait = true)
}

... or a specified module.

import io.ktor.websocket.*
// ...

fun Application.module() {
    install(WebSockets)
    // ...
}

Configure WebSockets settings

Optionally, you can configure the plugin by passing WebSocketOptions to the install function:

install(WebSockets) {
    pingPeriod = Duration.ofSeconds(15)
    timeout = Duration.ofSeconds(15)
    maxFrameSize = Long.MAX_VALUE
    masking = false
}
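As a variation on the configuration above, the sketch below caps the frame size instead of leaving it unbounded and annotates each option. The function name configureWebSockets and the 64 KiB limit are assumptions chosen for illustration, not values recommended by Ktor.

```kotlin
import io.ktor.application.*
import io.ktor.websocket.*
import java.time.Duration

// Hypothetical module function; the name is an assumption for this sketch.
fun Application.configureWebSockets() {
    install(WebSockets) {
        // Interval between ping frames sent to the peer.
        pingPeriod = Duration.ofSeconds(15)
        // How long to wait before the connection is considered dead.
        timeout = Duration.ofSeconds(15)
        // Illustrative cap of 64 KiB per frame (an arbitrary value for this sketch).
        maxFrameSize = 64 * 1024L
        // Frame masking stays disabled, as in the example above.
        masking = false
    }
}
```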

Handle WebSockets sessions

API overview

After you have installed and configured the WebSockets plugin, you are ready to handle a WebSocket session. First, define a WebSocket endpoint on the server by calling the webSocket function inside the routing block:

routing {
    webSocket("/echo") {
        // Handle a WebSocket session
    }
}

With the default configuration, the server accepts WebSocket requests for this endpoint at ws://localhost:8080/echo.

Inside the webSocket block, you need to handle a WebSocket session, which is represented by the DefaultWebSocketServerSession class. Handling a session typically involves the following:

  1. Use the send function to send text content to the client.

  2. Use the incoming and outgoing properties to access the channels for receiving and sending WebSocket frames. A frame is represented by the Frame class.

  3. When handling a session, you can check a frame type, for example:

    • Frame.Text is a text frame. For this frame type, you can read its content using Frame.Text.readText().

    • Frame.Binary is a binary frame. For this type, you can read its content using Frame.Binary.readBytes().

    • Frame.Close is a closing frame. You can call Frame.Close.readReason() to get a close reason for the current session.

  4. Use the close function to send a close frame with the specified reason.
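The steps above can be sketched as follows. This is a minimal illustration rather than the full sample: the describe helper, the frameTypesModule function, and the /frames path are hypothetical names introduced here. The endpoint replies to each received frame with a short description of it.

```kotlin
import io.ktor.application.*
import io.ktor.http.cio.websocket.*
import io.ktor.routing.*
import io.ktor.websocket.*

// Hypothetical helper: summarizes a frame according to its type.
fun describe(frame: Frame): String = when (frame) {
    is Frame.Text -> "text: ${frame.readText()}"
    is Frame.Binary -> "binary: ${frame.readBytes().size} bytes"
    is Frame.Close -> "close: ${frame.readReason()?.message}"
    else -> "other frame: ${frame.frameType}"
}

// Hypothetical module; the endpoint path is an assumption for this sketch.
fun Application.frameTypesModule() {
    install(WebSockets)
    routing {
        webSocket("/frames") {
            // Read frames from the incoming channel until it is closed.
            for (frame in incoming) {
                // Reply with a text frame describing what was received.
                send(describe(frame))
            }
            // Send a close frame with an explicit reason once reading is done.
            close(CloseReason(CloseReason.Codes.NORMAL, "Done"))
        }
    }
}
```

Keeping the frame inspection in a plain function makes it possible to check the type handling without starting a server.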

The examples below show how to use this API.

Example: Handle a single session

The example below shows how to create the echo WebSocket endpoint to handle a session with one client:

routing {
    webSocket("/echo") {
        send("Please enter your name")
        for (frame in incoming) {
            when (frame) {
                is Frame.Text -> {
                    val receivedText = frame.readText()
                    if (receivedText.equals("bye", ignoreCase = true)) {
                        close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE"))
                    } else {
                        send(Frame.Text("Hi, $receivedText!"))
                    }
                }
                else -> {
                    // Ignore non-text frames
                }
            }
        }
    }
}

You can find the full example here: server-websockets.

Example: Handle multiple sessions

To handle multiple WebSocket sessions (for example, for a chat application), you need to store each session on the server. For example, you can define a connection with a unique name and associate it with a specific session. The sample Connection class below shows how to do this:

class Connection(val session: DefaultWebSocketSession) {
    companion object {
        // Shared counter used to generate unique user names
        val lastId = AtomicInteger(0)
    }

    val name = "user${lastId.getAndIncrement()}"
}

Then, you can create a new connection inside the webSocket handler when a new client connects to the WebSocket endpoint:

routing {
    val connections = Collections.synchronizedSet<Connection>(LinkedHashSet())
    webSocket("/chat") {
        val thisConnection = Connection(this)
        connections += thisConnection
        send("You've logged in as [${thisConnection.name}]")
        for (frame in incoming) {
            when (frame) {
                is Frame.Text -> {
                    val receivedText = frame.readText()
                    val textWithUsername = "[${thisConnection.name}]: $receivedText"
                    // Broadcast the message to every connected client
                    connections.forEach {
                        it.session.send(textWithUsername)
                    }
                }
                else -> {
                    // Ignore non-text frames
                }
            }
        }
    }
}

You can find the full example here: server-websockets.
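The chat handler above keeps every connection in the set for the lifetime of the server. One possible refinement, sketched below, removes a connection when its session ends by wrapping the handler body in try/finally. The cleanup step, the formatChatMessage helper, and the chatModule name are assumptions added for illustration and are not taken from this page.

```kotlin
import io.ktor.application.*
import io.ktor.http.cio.websocket.*
import io.ktor.routing.*
import io.ktor.websocket.*
import java.util.Collections
import java.util.concurrent.atomic.AtomicInteger

class Connection(val session: DefaultWebSocketSession) {
    companion object {
        val lastId = AtomicInteger(0)
    }

    val name = "user${lastId.getAndIncrement()}"
}

// Hypothetical helper: formats a chat line the same way as the handler above.
fun formatChatMessage(name: String, text: String): String = "[$name]: $text"

// Hypothetical module name for this sketch.
fun Application.chatModule() {
    install(WebSockets)
    routing {
        val connections = Collections.synchronizedSet<Connection>(LinkedHashSet())
        webSocket("/chat") {
            val thisConnection = Connection(this)
            connections += thisConnection
            try {
                send("You've logged in as [${thisConnection.name}]")
                for (frame in incoming) {
                    if (frame !is Frame.Text) continue
                    val message = formatChatMessage(thisConnection.name, frame.readText())
                    // Copy the set under its lock so sending does not race with add/remove.
                    val snapshot = synchronized(connections) { connections.toList() }
                    snapshot.forEach { it.session.send(message) }
                }
            } finally {
                // Runs on normal close and on errors, so closed sessions are not kept.
                connections -= thisConnection
            }
        }
    }
}
```

Taking a snapshot before broadcasting is a design choice of this sketch: it avoids iterating the shared set while other sessions connect or disconnect.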

Testing

You can test WebSocket conversations by using the handleWebSocketConversation function inside the withTestApplication block:

class ModuleTest {
    @Test
    fun testConversation() {
        withTestApplication(Application::module) {
            handleWebSocketConversation("/echo") { incoming, outgoing ->
                val greetingText = (incoming.receive() as Frame.Text).readText()
                assertEquals("Please enter your name", greetingText)

                outgoing.send(Frame.Text("JetBrains"))
                val responseText = (incoming.receive() as Frame.Text).readText()
                assertEquals("Hi, JetBrains!", responseText)

                outgoing.send(Frame.Text("bye"))
                val closeReason = (incoming.receive() as Frame.Close).readReason()?.message
                assertEquals("Client said BYE", closeReason)
            }
        }
    }
}

The WebSocket API and Ktor

The standard events from the WebSocket API map to Ktor in the following way:

  • onConnect happens at the start of the block.

  • onMessage happens after successfully reading a message (for example, with incoming.receive()) or when using suspending iteration with for (frame in incoming).

  • onClose happens when the incoming channel is closed. This completes the suspending iteration, or throws a ClosedReceiveChannelException when trying to receive a message.

  • onError corresponds to any other exception thrown while handling the session.

In both onClose and onError, the closeReason property is set.

webSocket("/echo") {
    println("onConnect")
    try {
        for (frame in incoming) {
            val text = (frame as Frame.Text).readText()
            println("onMessage")
            // 'received' is assumed to be a mutable collection declared elsewhere
            received += text
            outgoing.send(Frame.Text(text))
        }
    } catch (e: ClosedReceiveChannelException) {
        println("onClose ${closeReason.await()}")
    } catch (e: Throwable) {
        println("onError ${closeReason.await()}")
        e.printStackTrace()
    }
}

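In this sample, the loop over incoming ends in one of two ways: the iteration completes normally when the incoming channel is closed, or an exception is thrown. The ClosedReceiveChannelException branch applies when frames are read with incoming.receive() rather than with the for loop.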
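Because incoming is a kotlinx.coroutines channel, the two ways of detecting a close can be illustrated without a server. The sketch below uses a plain Channel<String> as a stand-in for incoming; the function names drainWithLoop, drainWithReceive, and closedChannelOf are hypothetical and exist only for this illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.runBlocking

// Reads with a for loop: the loop simply ends once the channel is closed and empty.
suspend fun drainWithLoop(channel: Channel<String>): List<String> {
    val seen = mutableListOf<String>()
    for (item in channel) {
        seen += item
    }
    return seen
}

// Reads with receive(): closure is reported as a ClosedReceiveChannelException.
suspend fun drainWithReceive(channel: Channel<String>): List<String> {
    val seen = mutableListOf<String>()
    try {
        while (true) {
            seen += channel.receive()
        }
    } catch (e: ClosedReceiveChannelException) {
        // This is the point that corresponds to onClose.
    }
    return seen
}

// Builds a channel that already holds the given items and is then closed.
suspend fun closedChannelOf(vararg items: String): Channel<String> {
    val channel = Channel<String>(Channel.UNLIMITED)
    for (item in items) {
        channel.send(item)
    }
    channel.close()
    return channel
}

fun main() = runBlocking<Unit> {
    println(drainWithLoop(closedChannelOf("a", "b")))    // prints [a, b]
    println(drainWithReceive(closedChannelOf("a", "b"))) // prints [a, b]
}
```

Both functions see the same items; they differ only in how the end of the stream is signalled to the caller.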

Last modified: 11 May 2022