In JDK 19, virtual threads are available, though still as a preview feature, so we can take advantage of them as well.
The implementation is quite similar to MultiThreadNioServer.
/**
 * NIO server that reads and parses every request on its own virtual thread.
 *
 * <p>Each readable connection is handed to a newly started virtual thread, which reads and
 * parses the command and then puts the client on {@code queue}. Commands are executed on the
 * thread that calls {@link #process(Iterator)}, which drains the queue after each pass over
 * the selected keys.
 */
public class VirtualThreadNioServer extends AbstractNioServer {

    /**
     * Handles every selected key, then processes the clients that are waiting in the queue.
     *
     * @param iterator iterator over the selected keys; every key visited is removed from it
     * @throws IOException if registering a newly accepted connection fails
     */
    @Override
    protected void process(Iterator<SelectionKey> iterator) throws IOException {
        while (iterator.hasNext()) {
            SelectionKey selectionKey = iterator.next();
            try {
                if (selectionKey.isAcceptable()) {
                    registerReadOperation(selectionKey);
                } else if (selectionKey.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                    // The key is cancelled before the channel is handed to a virtual thread.
                    selectionKey.cancel();
                    process(socketChannel);
                }
            } finally {
                // Remove the key even when handling fails; a key left in the selected-key set
                // would be handed out again on the next pass.
                iterator.remove();
            }
        }
        handleClients(false);
    }

    /**
     * Starts a virtual thread that reads and parses one command from the channel and then
     * enqueues the resulting client, whether or not reading and parsing succeeded.
     *
     * @param socketChannel channel to read the request from
     */
    @Override
    protected void process(SocketChannel socketChannel) {
        Thread.ofVirtual().start(() -> {
            Client client = new Client(socketChannel);
            MeasureData measureData = new MeasureData();
            client.setMeasureData(measureData);
            try {
                measureData.add(RequestMeasureCollector.FIELD_NAME_CONNECTED);
                DataReader dataReader = new ChannelDataReader(socketChannel, bufSize, MAX_COMMAND_LENGTH);
                List<ByteWord> words = commandReader.read(dataReader);
                measureData.add(RequestMeasureCollector.FIELD_NAME_READ);
                Command command = CommandFactory.parseCommand(words);
                measureData.add(RequestMeasureCollector.FIELD_NAME_PARSE);
                client.setCommand(command);
            } catch (JimdsException e) {
                client.setClientError(true);
                measureData.add(RequestMeasureCollector.FIELD_NAME_ERROR);
                // Internal exception: its message becomes the reply written to the client.
                NetworkData data = new NetworkError(e.getMessage());
                client.setResult(data);
            } catch (Throwable e) {
                client.setClientError(true);
                measureData.add(RequestMeasureCollector.FIELD_NAME_ERROR);
                // Unexpected failure: no result is set here, so handleClients substitutes a
                // generic error reply when the client has none.
                e.printStackTrace();
            }
            // Always enqueue, so failed requests are answered as well.
            queue.offer(client);
        });
    }

    /**
     * Handle all clients in queue.
     *
     * <p>Clients flagged with an error are answered with their stored result, or with a generic
     * {@code "Error"} when none is set, provided their channel is still open. For all other
     * clients the command is executed and its result is written back.
     *
     * @param forceSync whether to handle clients synchronized; when {@code false} and
     *                  {@code writeAsync} is set, each result is written on a new virtual thread
     */
    private void handleClients(boolean forceSync) {
        List<Client> clients = new ArrayList<>();
        // Only process clients in queue currently. Polling until null avoids the
        // isEmpty()/poll() check-then-act pair, so no null can end up in the list.
        Client next;
        while ((next = queue.poll()) != null) {
            clients.add(next);
        }
        for (Client client : clients) {
            if (client.isClientError()) {
                SocketChannel socketChannel = client.getSocketChannel();
                if (socketChannel.isOpen()) {
                    NetworkData data = client.getResult();
                    if (data == null) {
                        data = new NetworkError("Error");
                    }
                    writeData(data, socketChannel, true);
                }
            } else {
                executeCommand(client);
                if (!forceSync && writeAsync) {
                    Thread.ofVirtual().start(
                            () -> writeData(client.getResult(), client.getSocketChannel(), true));
                } else {
                    writeData(client.getResult(), client.getSocketChannel(), true);
                }
            }
        }
    }
}
The implementation doesn't create its own ForkJoinPool; it uses the built-in one instead.