Ktor 3.0.0-beta-2 Help

WebSockets

WebSocket is a protocol which provides a full-duplex communication session between the user's browser and a server over a single TCP connection. It is particularly useful for creating applications that require real-time data transfer from and to the server.

Ktor supports the WebSocket protocol both on the server-, and the client-side.

Ktor allows you to:

  • Configure basic WebSocket settings, such as frame size, a ping period, and so on.

  • Handle a WebSocket session for exchanging messages between the server and client.

  • Add WebSocket extensions. For example, you can use the Deflate extension or implement a custom extension.

Add dependencies

To use WebSockets, you need to include the ktor-server-websockets artifact in the build script:

implementation("io.ktor:ktor-server-websockets:$ktor_version")
implementation "io.ktor:ktor-server-websockets:$ktor_version"
<dependency> <groupId>io.ktor</groupId> <artifactId>ktor-server-websockets-jvm</artifactId> <version>${ktor_version}</version> </dependency>

Install WebSockets

To install the WebSockets plugin to the application, pass it to the install function in the specified module. The code snippets below show how to install WebSockets...

  • ... inside the embeddedServer function call.

  • ... inside the explicitly defined module, which is an extension function of the Application class.

import io.ktor.server.engine.* import io.ktor.server.netty.* import io.ktor.server.application.* import io.ktor.server.websocket.* fun main() { embeddedServer(Netty, port = 8080) { install(WebSockets) // ... }.start(wait = true) }
import io.ktor.server.application.* import io.ktor.server.websocket.* // ... fun Application.module() { install(WebSockets) // ... }

Configure WebSockets

Optionally, you can configure the plugin inside the install block by passing WebSocketOptions:

  • Use the pingPeriod property to specify the duration between pings.

  • Use the timeout property to set a timeout after which the connection to be closed.

  • Use the maxFrameSize property to set a maximum Frame that could be received or sent.

  • Use the masking property to specify whether masking is enabled.

  • Use the contentConverter property to set a converter for serialization/deserialization.

install(WebSockets) { pingPeriod = Duration.ofSeconds(15) timeout = Duration.ofSeconds(15) maxFrameSize = Long.MAX_VALUE masking = false }

Handle WebSockets sessions

API overview

Once you have installed and configured the WebSockets plugin, you can define an endpoint to handle a Websocket session. To define a WebSocket endpoint on a server, call the webSocket function inside the routing block:

routing { webSocket("/echo") { // Handle a WebSocket session } }

In this example, the server accepts WebSocket requests to ws://localhost:8080/echo when a default configuration is used.

Inside the webSocket block, you define the handler for the WebSocket session, which is represented by the DefaultWebSocketServerSession class. The following functions and properties are available within the block:

  • Use the send function to send text content to the client.

  • Use the incoming and outgoing properties to access the channels for receiving and sending WebSocket frames. A frame is represented by the Frame class.

  • Use the close function to send a close frame with the specified reason.

When handling a session, you can check a frame type, for example:

  • Frame.Text is a text frame. For this frame type, you can read its content using Frame.Text.readText().

  • Frame.Binary is a binary frame. For this type, you can read its content using Frame.Binary.readBytes().

Below, we'll take a look at the examples of using this API.

Example: Handle a single session

The example below shows how to create the echo WebSocket endpoint to handle a session with one client:

routing { webSocket("/echo") { send("Please enter your name") for (frame in incoming) { frame as? Frame.Text ?: continue val receivedText = frame.readText() if (receivedText.equals("bye", ignoreCase = true)) { close(CloseReason(CloseReason.Codes.NORMAL, "Client said BYE")) } else { send(Frame.Text("Hi, $receivedText!")) } } } }

For the full example, see server-websockets.

Example: Handle multiple sessions

To efficiently manage multiple WebSocket sessions and handle broadcasting, you can utilize Kotlin's SharedFlow. This approach provides a scalable and concurrency-friendly method for managing WebSocket communications. Here's how to implement this pattern:

  1. Define a SharedFlow for broadcasting messages:

val messageResponseFlow = MutableSharedFlow<MessageResponse>() val sharedFlow = messageResponseFlow.asSharedFlow()
  1. In your WebSocket route, implement the broadcasting and message handling logic:

webSocket("/ws") { send("You are connected to WebSocket!") val job = launch { sharedFlow.collect { message -> send(message.message) } } runCatching { incoming.consumeEach { frame -> if (frame is Frame.Text) { val receivedText = frame.readText() val messageResponse = MessageResponse(receivedText) messageResponseFlow.emit(messageResponse) } } }.onFailure { exception -> println("WebSocket exception: ${exception.localizedMessage}") }.also { job.cancel() } }

The runCatching block processes incoming messages and emits them to the SharedFlow, which then broadcasts to all collectors.

By using this pattern, you can efficiently manage multiple WebSocket sessions without manually tracking individual connections. This approach scales well for applications with many concurrent WebSocket connections and provides a clean, reactive way to handle message broadcasting.

For the full example, see server-websockets-sharedflow.

The WebSocket API and Ktor

The standard events from the WebSocket API map to Ktor in the following way:

  • onConnect happens at the start of the block.

  • onMessage happens after successfully reading a message (for example, with incoming.receive()) or using suspended iteration with for(frame in incoming).

  • onClose happens when the incoming channel is closed. That would complete the suspended iteration, or throw a ClosedReceiveChannelException when trying to receive a message.

  • onError is equivalent to other exceptions.

In both onClose and onError, the closeReason property is set.

In the following example, the infinite loop will only be exited when an exception has risen (either a ClosedReceiveChannelException or another exception):

webSocket("/echo") { println("onConnect") try { for (frame in incoming){ val text = (frame as Frame.Text).readText() println("onMessage") received += text outgoing.send(Frame.Text(text)) } } catch (e: ClosedReceiveChannelException) { println("onClose ${closeReason.await()}") } catch (e: Throwable) { println("onError ${closeReason.await()}") e.printStackTrace() } }
Last modified: 18 July 2024