001package ganymede.server; 002/*- 003 * ########################################################################## 004 * Ganymede 005 * %% 006 * Copyright (C) 2021, 2022 Allen D. Ball 007 * %% 008 * Licensed under the Apache License, Version 2.0 (the "License"); 009 * you may not use this file except in compliance with the License. 010 * You may obtain a copy of the License at 011 * 012 * http://www.apache.org/licenses/LICENSE-2.0 013 * 014 * Unless required by applicable law or agreed to in writing, software 015 * distributed under the License is distributed on an "AS IS" BASIS, 016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 017 * See the License for the specific language governing permissions and 018 * limitations under the License. 019 * ########################################################################## 020 */ 021import java.lang.reflect.InvocationTargetException; 022import java.lang.reflect.Method; 023import java.util.Queue; 024import java.util.concurrent.ConcurrentLinkedQueue; 025import lombok.Data; 026import lombok.NonNull; 027import lombok.ToString; 028import lombok.extern.log4j.Log4j2; 029import org.zeromq.SocketType; 030import org.zeromq.ZMQ; 031 032/** 033 * {@link Server} {@link Channel}. 034 * 035 * {@bean.info} 036 * 037 * @author {@link.uri mailto:ball@hcf.dev Allen D. Ball} 038 */ 039@Data @Log4j2 040public abstract class Channel { 041 @NonNull private final Server server; 042 @NonNull private final SocketType socketType; 043 private final Queue<Dispatcher> dispatcherQueue = new ConcurrentLinkedQueue<>(); 044 045 /** 046 * Method to schedule creation of and binding to a {@link ZMQ.Socket} 047 * for this address. 048 * 049 * @param connection The kernel {@link Connection}. 050 * @param address The address of the {@link ZMQ.Socket} to be 051 * created. 052 */ 053 public void connect(Connection connection, String address) { 054 Dispatcher dispatcher = new Dispatcher(this, connection, address); 055 056 getDispatcherQueue().add(dispatcher); 057 getServer().submit(dispatcher); 058 059 getServer().setCorePoolSize(Math.max(getServer().getActiveCount() + 4, getServer().getCorePoolSize())); 060 } 061 062 /** 063 * Callback method to receive and dispatch a {@link Message}. This 064 * method is called on the same thread that the {@link ZMQ.Socket} was 065 * created on and the implementation may call {@link ZMQ.Socket} methods 066 * (including {@code send()}). 067 * 068 * @param dispatcher The {@link Dispatcher}. 069 * @param socket The {@link ZMQ.Socket}. 070 * @param frame The first message frame. 071 */ 072 protected abstract void dispatch(Dispatcher dispatcher, ZMQ.Socket socket, byte[] frame); 073 074 /** 075 * Callback method to {@link Server#stamp(Message) stamp} and dispatch a 076 * {@link Message}. This method is called on the same thread that the 077 * {@link ZMQ.Socket} was created on and the implementation may call 078 * {@link ZMQ.Socket} methods (including {@code send()}). 079 * 080 * @param dispatcher The {@link Dispatcher}. 081 * @param socket The {@link ZMQ.Socket}. 082 * @param message The {@link Message}. 083 */ 084 protected void send(Dispatcher dispatcher, ZMQ.Socket socket, Message message) { 085 getServer().stamp(message); 086 087 log.debug("{}\n{}", dispatcher.getAddress(), message); 088 089 message.send(dispatcher.getConnection(), socket); 090 } 091 092 /** 093 * Standard 094 * {@link.uri https://jupyter-client.readthedocs.io/en/latest/messaging.html#heartbeat-for-kernels Heartbeat} 095 * {@link Channel}. 096 * 097 * {@bean.info} 098 */ 099 @ToString @Log4j2 100 public static class Heartbeat extends Channel { 101 102 /** 103 * Sole constructor. 104 * 105 * @param server The {@link Server}. 106 */ 107 public Heartbeat(Server server) { super(server, SocketType.REP); } 108 109 @Override 110 protected void dispatch(Dispatcher dispatcher, ZMQ.Socket socket, byte[] frame) { 111 socket.send(frame); 112 } 113 } 114 115 /** 116 * Jupyter protocol {@link Channel} abstract base class. 117 * 118 * {@bean.info} 119 */ 120 @ToString @Log4j2 121 public static abstract class Protocol extends Channel { 122 123 /** 124 * Sole constructor. 125 * 126 * @param server The {@link Server}. 127 * @param type The {@link SocketType} for created 128 * {@link ZMQ.Socket}s. 129 */ 130 public Protocol(Server server, SocketType type) { super(server, type); } 131 132 /** 133 * Callback method to dispatch a {@link Message}. Default 134 * implementation looks for a declared method 135 * {@code action(Dispatcher,ZMQ.Socket,Message)}. 136 * 137 * @param dispatcher The {@link Dispatcher}. 138 * @param socket The {@link ZMQ.Socket}. 139 * @param message The {@link Message}. 140 */ 141 protected abstract void dispatch(Dispatcher dispatcher, ZMQ.Socket socket, Message message); 142 143 @Override 144 protected void dispatch(Dispatcher dispatcher, ZMQ.Socket socket, byte[] frame) { 145 try { 146 var message = Message.receive(dispatcher.getConnection(), socket, frame); 147 148 log.debug("{}\n{}", dispatcher.getAddress(), message); 149 150 dispatch(dispatcher, socket, message); 151 } catch (Exception exception) { 152 log.warn("{}", exception); 153 } 154 } 155 } 156 157 /** 158 * {@link Control Control} {@link Channel} abstract base class. The 159 * default implementation of 160 * {@link #dispatch(Dispatcher,ZMQ.Socket,Message)} constructs a 161 * {@link Message reply} skeleton, executes a declared method of the 162 * form {@code action(Dispatcher,Message,Message) throws Exception}, 163 * catches any {@link Exception} and updates the reply as necessary, and 164 * sends the reply. 165 * 166 * {@bean.info} 167 */ 168 @ToString @Log4j2 169 public static abstract class Control extends Protocol { 170 private interface PROTOTYPE { 171 public void action(Dispatcher dispatcher, Message request, Message reply) throws Exception; 172 } 173 174 private static final Method PROTOTYPE; 175 176 static { 177 PROTOTYPE = PROTOTYPE.class.getDeclaredMethods()[0]; 178 PROTOTYPE.setAccessible(true); 179 } 180 181 /** 182 * Sole constructor. 183 * 184 * @param server The {@link Server}. 185 */ 186 protected Control(Server server) { super(server, SocketType.ROUTER); } 187 188 @Override 189 protected void dispatch(Dispatcher dispatcher, ZMQ.Socket socket, Message message) { 190 var action = message.getMessageTypeAction(); 191 192 if (action != null) { 193 var reply = message.reply(); 194 195 try { 196 var method = getClass().getDeclaredMethod(action, PROTOTYPE.getParameterTypes()); 197 198 method.setAccessible(true); 199 method.invoke(this, dispatcher, message, reply); 200 } catch (Throwable throwable) { 201 if (throwable instanceof InvocationTargetException) { 202 if (throwable.getCause() != null) { 203 throwable = throwable.getCause(); 204 } 205 } 206 207 reply.status(throwable); 208 } finally { 209 if (reply != null) { 210 send(dispatcher, socket, reply); 211 } 212 } 213 } else { 214 log.warn("Could not determine action from {}", message.header()); 215 } 216 } 217 } 218 219 /** 220 * {@link IOPub IOPub} {@link Channel}. 221 * 222 * {@bean.info} 223 */ 224 @ToString @Log4j2 225 public static class IOPub extends Protocol { 226 227 /** 228 * Sole constructor. 229 * 230 * @param server The {@link Server}. 231 */ 232 public IOPub(Server server) { super(server, SocketType.PUB); } 233 234 /** 235 * Method to schedule a {@link Message} for publishing. 236 * 237 * @param message The {@link Message} to send. 238 */ 239 public void pub(Message message) { 240 getDispatcherQueue().forEach(t -> t.pub(message)); 241 } 242 243 @Override 244 protected void dispatch(Dispatcher dispatcher, ZMQ.Socket socket, byte[] frame) { 245 throw new UnsupportedOperationException(); 246 } 247 248 @Override 249 protected void dispatch(Dispatcher dispatcher, ZMQ.Socket socket, Message message) { 250 send(dispatcher, socket, message); 251 } 252 } 253 254 /** 255 * {@link Stdin Stdin} {@link Channel} abstract base class. 256 * 257 * {@bean.info} 258 */ 259 @ToString @Log4j2 260 public static abstract class Stdin extends Protocol { 261 262 /** 263 * Sole constructor. 264 * 265 * @param server The {@link Server}. 266 */ 267 protected Stdin(Server server) { super(server, SocketType.ROUTER); } 268 } 269 270 /** 271 * {@link Shell Shell} {@link Channel} abstract base class. 272 * 273 * {@bean.info} 274 */ 275 @ToString @Log4j2 276 public static abstract class Shell extends Control { 277 private final IOPub iopub; 278 private final Stdin stdin; 279 280 /** 281 * Sole constructor. 282 * 283 * @param server The {@link Server}. 284 * @param iopub The associated {@link IOPub IOPub} 285 * {@link Channel}. 286 * @param stdin The associated {@link Stdin Stdin} 287 * {@link Channel}. 288 */ 289 protected Shell(Server server, IOPub iopub, Stdin stdin) { 290 super(server); 291 292 this.iopub = iopub; 293 this.stdin = stdin; 294 } 295 296 @Override 297 protected void dispatch(Dispatcher dispatcher, ZMQ.Socket socket, Message message) { 298 try { 299 iopub.pub(message.status(Message.status.busy)); 300 301 super.dispatch(dispatcher, socket, message); 302 } finally { 303 iopub.pub(message.status(Message.status.idle)); 304 } 305 } 306 } 307}