001package ganymede.server;
002/*-
003 * ##########################################################################
004 * Ganymede
005 * %%
006 * Copyright (C) 2021, 2022 Allen D. Ball
007 * %%
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *      http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 * ##########################################################################
020 */
021import java.lang.reflect.InvocationTargetException;
022import java.lang.reflect.Method;
023import java.util.Queue;
024import java.util.concurrent.ConcurrentLinkedQueue;
025import lombok.Data;
026import lombok.NonNull;
027import lombok.ToString;
028import lombok.extern.log4j.Log4j2;
029import org.zeromq.SocketType;
030import org.zeromq.ZMQ;
031
032/**
033 * {@link Server} {@link Channel}.
034 *
035 * {@bean.info}
036 *
037 * @author {@link.uri mailto:ball@hcf.dev Allen D. Ball}
038 */
039@Data @Log4j2
040public abstract class Channel {
041    @NonNull private final Server server;
042    @NonNull private final SocketType socketType;
043    private final Queue<Dispatcher> dispatcherQueue = new ConcurrentLinkedQueue<>();
044
045    /**
046     * Method to schedule creation of and binding to a {@link ZMQ.Socket}
047     * for this address.
048     *
049     * @param   connection      The kernel {@link Connection}.
050     * @param   address         The address of the {@link ZMQ.Socket} to be
051     *                          created.
052     */
053    public void connect(Connection connection, String address) {
054        Dispatcher dispatcher = new Dispatcher(this, connection, address);
055
056        getDispatcherQueue().add(dispatcher);
057        getServer().submit(dispatcher);
058
059        getServer().setCorePoolSize(Math.max(getServer().getActiveCount() + 4, getServer().getCorePoolSize()));
060    }
061
    /**
     * Callback method to receive and dispatch a {@link Message} given its
     * first frame.  This method is called on the same thread that the
     * {@link ZMQ.Socket} was created on and the implementation may call
     * {@link ZMQ.Socket} methods (including {@code send()}).
     *
     * <p>Within this file, {@link Heartbeat} sends the frame straight back
     * on the socket, {@link Protocol} parses the frame into a
     * {@link Message} before dispatching it, and {@link IOPub} rejects
     * the call with an {@link UnsupportedOperationException}.</p>
     *
     * @param   dispatcher      The {@link Dispatcher}.
     * @param   socket          The {@link ZMQ.Socket}.
     * @param   frame           The first message frame.
     */
    protected abstract void dispatch(Dispatcher dispatcher, ZMQ.Socket socket, byte[] frame);
073
074    /**
075     * Callback method to {@link Server#stamp(Message) stamp} and dispatch a
076     * {@link Message}.  This method is called on the same thread that the
077     * {@link ZMQ.Socket} was created on and the implementation may call
078     * {@link ZMQ.Socket} methods (including {@code send()}).
079     *
080     * @param   dispatcher      The {@link Dispatcher}.
081     * @param   socket          The {@link ZMQ.Socket}.
082     * @param   message         The {@link Message}.
083     */
084    protected void send(Dispatcher dispatcher, ZMQ.Socket socket, Message message) {
085        getServer().stamp(message);
086
087        log.debug("{}\n{}", dispatcher.getAddress(), message);
088
089        message.send(dispatcher.getConnection(), socket);
090    }
091
092    /**
093     * Standard
094     * {@link.uri https://jupyter-client.readthedocs.io/en/latest/messaging.html#heartbeat-for-kernels Heartbeat}
095     * {@link Channel}.
096     *
097     * {@bean.info}
098     */
099    @ToString @Log4j2
100    public static class Heartbeat extends Channel {
101
102        /**
103         * Sole constructor.
104         *
105         * @param  server       The {@link Server}.
106         */
107        public Heartbeat(Server server) { super(server, SocketType.REP); }
108
109        @Override
110        protected void dispatch(Dispatcher dispatcher, ZMQ.Socket socket, byte[] frame) {
111            socket.send(frame);
112        }
113    }
114
115    /**
116     * Jupyter protocol {@link Channel} abstract base class.
117     *
118     * {@bean.info}
119     */
120    @ToString @Log4j2
121    public static abstract class Protocol extends Channel {
122
123        /**
124         * Sole constructor.
125         *
126         * @param  server       The {@link Server}.
127         * @param  type         The {@link SocketType} for created
128         *                      {@link ZMQ.Socket}s.
129         */
130        public Protocol(Server server, SocketType type) { super(server, type); }
131
132        /**
133         * Callback method to dispatch a {@link Message}.  Default
134         * implementation looks for a declared method
135         * {@code action(Dispatcher,ZMQ.Socket,Message)}.
136         *
137         * @param  dispatcher   The {@link Dispatcher}.
138         * @param  socket       The {@link ZMQ.Socket}.
139         * @param  message      The {@link Message}.
140         */
141        protected abstract void dispatch(Dispatcher dispatcher, ZMQ.Socket socket, Message message);
142
143        @Override
144        protected void dispatch(Dispatcher dispatcher, ZMQ.Socket socket, byte[] frame) {
145            try {
146                var message = Message.receive(dispatcher.getConnection(), socket, frame);
147
148                log.debug("{}\n{}", dispatcher.getAddress(), message);
149
150                dispatch(dispatcher, socket, message);
151            } catch (Exception exception) {
152                log.warn("{}", exception);
153            }
154        }
155    }
156
157    /**
158     * {@link Control Control} {@link Channel} abstract base class.  The
159     * default implementation of
160     * {@link #dispatch(Dispatcher,ZMQ.Socket,Message)} constructs a
161     * {@link Message reply} skeleton, executes a declared method of the
162     * form {@code action(Dispatcher,Message,Message) throws Exception},
163     * catches any {@link Exception} and updates the reply as necessary, and
164     * sends the reply.
165     *
166     * {@bean.info}
167     */
168    @ToString @Log4j2
169    public static abstract class Control extends Protocol {
170        private interface PROTOTYPE {
171            public void action(Dispatcher dispatcher, Message request, Message reply) throws Exception;
172        }
173
174        private static final Method PROTOTYPE;
175
176        static {
177            PROTOTYPE = PROTOTYPE.class.getDeclaredMethods()[0];
178            PROTOTYPE.setAccessible(true);
179        }
180
181        /**
182         * Sole constructor.
183         *
184         * @param  server       The {@link Server}.
185         */
186        protected Control(Server server) { super(server, SocketType.ROUTER); }
187
188        @Override
189        protected void dispatch(Dispatcher dispatcher, ZMQ.Socket socket, Message message) {
190            var action = message.getMessageTypeAction();
191
192            if (action != null) {
193                var reply = message.reply();
194
195                try {
196                    var method = getClass().getDeclaredMethod(action, PROTOTYPE.getParameterTypes());
197
198                    method.setAccessible(true);
199                    method.invoke(this, dispatcher, message, reply);
200                } catch (Throwable throwable) {
201                    if (throwable instanceof InvocationTargetException) {
202                        if (throwable.getCause() != null) {
203                            throwable = throwable.getCause();
204                        }
205                    }
206
207                    reply.status(throwable);
208                } finally {
209                    if (reply != null) {
210                        send(dispatcher, socket, reply);
211                    }
212                }
213            } else {
214                log.warn("Could not determine action from {}", message.header());
215            }
216        }
217    }
218
219    /**
220     * {@link IOPub IOPub} {@link Channel}.
221     *
222     * {@bean.info}
223     */
224    @ToString @Log4j2
225    public static class IOPub extends Protocol {
226
227        /**
228         * Sole constructor.
229         *
230         * @param  server       The {@link Server}.
231         */
232        public IOPub(Server server) { super(server, SocketType.PUB); }
233
234        /**
235         * Method to schedule a {@link Message} for publishing.
236         *
237         * @param   message     The {@link Message} to send.
238         */
239        public void pub(Message message) {
240            getDispatcherQueue().forEach(t -> t.pub(message));
241        }
242
243        @Override
244        protected void dispatch(Dispatcher dispatcher, ZMQ.Socket socket, byte[] frame) {
245            throw new UnsupportedOperationException();
246        }
247
248        @Override
249        protected void dispatch(Dispatcher dispatcher, ZMQ.Socket socket, Message message) {
250            send(dispatcher, socket, message);
251        }
252    }
253
254    /**
255     * {@link Stdin Stdin} {@link Channel} abstract base class.
256     *
257     * {@bean.info}
258     */
259    @ToString @Log4j2
260    public static abstract class Stdin extends Protocol {
261
262        /**
263         * Sole constructor.
264         *
265         * @param  server       The {@link Server}.
266         */
267        protected Stdin(Server server) { super(server, SocketType.ROUTER); }
268    }
269
270    /**
271     * {@link Shell Shell} {@link Channel} abstract base class.
272     *
273     * {@bean.info}
274     */
275    @ToString @Log4j2
276    public static abstract class Shell extends Control {
277        private final IOPub iopub;
278        private final Stdin stdin;
279
280        /**
281         * Sole constructor.
282         *
283         * @param  server       The {@link Server}.
284         * @param  iopub        The associated {@link IOPub IOPub}
285         *                      {@link Channel}.
286         * @param  stdin        The associated {@link Stdin Stdin}
287         *                      {@link Channel}.
288         */
289        protected Shell(Server server, IOPub iopub, Stdin stdin) {
290            super(server);
291
292            this.iopub = iopub;
293            this.stdin = stdin;
294        }
295
296        @Override
297        protected void dispatch(Dispatcher dispatcher, ZMQ.Socket socket, Message message) {
298            try {
299                iopub.pub(message.status(Message.status.busy));
300
301                super.dispatch(dispatcher, socket, message);
302            } finally {
303                iopub.pub(message.status(Message.status.idle));
304            }
305        }
306    }
307}