001package ball.upnp.ssdp;
002/*-
003 * ##########################################################################
004 * UPnP/SSDP Implementation Classes
005 * %%
006 * Copyright (C) 2013 - 2022 Allen D. Ball
007 * %%
008 * Licensed under the Apache License, Version 2.0 (the "License");
009 * you may not use this file except in compliance with the License.
010 * You may obtain a copy of the License at
011 *
012 *      http://www.apache.org/licenses/LICENSE-2.0
013 *
014 * Unless required by applicable law or agreed to in writing, software
015 * distributed under the License is distributed on an "AS IS" BASIS,
016 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
017 * See the License for the specific language governing permissions and
018 * limitations under the License.
019 * ##########################################################################
020 */
021import ball.upnp.RootDevice;
022import ball.upnp.SSDP;
023import java.io.IOException;
024import java.net.DatagramPacket;
025import java.net.DatagramSocket;
026import java.net.MulticastSocket;
027import java.net.SocketAddress;
028import java.net.SocketException;
029import java.net.SocketTimeoutException;
030import java.net.URI;
031import java.util.Comparator;
032import java.util.LinkedList;
033import java.util.List;
034import java.util.Objects;
035import java.util.Random;
036import java.util.concurrent.ConcurrentHashMap;
037import java.util.concurrent.CopyOnWriteArrayList;
038import java.util.concurrent.ScheduledFuture;
039import java.util.concurrent.ScheduledThreadPoolExecutor;
040import java.util.stream.Stream;
041import lombok.ToString;
042import org.apache.hc.core5.http.ParseException;
043import org.apache.hc.core5.http.protocol.HttpDateGenerator;
044
045import static java.nio.charset.StandardCharsets.UTF_8;
046import static java.util.concurrent.TimeUnit.MILLISECONDS;
047import static java.util.concurrent.TimeUnit.SECONDS;
048import static java.util.stream.Collectors.joining;
049import static java.util.stream.Collectors.toList;
050import static org.apache.commons.lang3.StringUtils.EMPTY;
051import static org.apache.commons.lang3.StringUtils.SPACE;
052import static org.apache.hc.core5.http.HttpStatus.SC_OK;
053
054/**
055 * SSDP discovery {@link ScheduledThreadPoolExecutor} implementation.
056 *
057 * {@bean.info}
058 *
059 * @author {@link.uri mailto:ball@hcf.dev Allen D. Ball}
060 */
061public class SSDPDiscoveryService extends ScheduledThreadPoolExecutor {
062    private static final String OS =
063        Stream.of("os.name", "os.version")
064        .map(System::getProperty)
065        .map(t -> t.replaceAll("[\\p{Space}]+", EMPTY))
066        .collect(joining("/"));
067    private static final String UPNP = "UPnP/2.0";
068
069    private final String server;
070    private final int bootId = (int) (System.currentTimeMillis() / 1000);
071    private final Random random = new Random();
072    private final MulticastSocket multicast;
073    private final DatagramSocket unicast;
074    private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();
075    private final ConcurrentHashMap<RootDevice,ScheduledFuture<?>> advertisers = new ConcurrentHashMap<>();
076
077    /**
078     * Sole constructor.
079     *
080     * @param   product         The {@code product/version} {@link String}
081     *                          identifying this UPnP application.
082     *
083     * @throws  IOException     If the underlying {@link MulticastSocket}
084     *                          cannot be conditioned.
085     */
086    public SSDPDiscoveryService(String product) throws IOException {
087        super(8);
088
089        server =
090            Stream.of(OS, UPNP, product)
091            .filter(Objects::nonNull)
092            .collect(joining(SPACE));
093
094        random.setSeed(System.currentTimeMillis());
095
096        multicast = new SSDPMulticastSocket();
097        /*
098         * Bind to an {@link.rfc 4340} ephemeral port.
099         */
100        DatagramSocket socket = null;
101        List<Integer> ports = random.ints(49152, 65536).limit(256).boxed().collect(toList());
102
103        for (;;) {
104            try {
105                socket = new DatagramSocket(ports.remove(0));
106                break;
107            } catch (SocketException exception) {
108                if (ports.isEmpty()) {
109                    throw exception;
110                } else {
111                    continue;
112                }
113            }
114        }
115
116        unicast = socket;
117
118        addListener(new MSEARCH());
119
120        submit(() -> receive(multicast));
121        submit(() -> receive(unicast));
122    }
123
124    /**
125     * {@code SERVER} and {@code USER-AGENT}
126     *
127     * @return  {@code SERVER} and {@code USER-AGENT}
128     */
129    public String getUserAgent() { return server; }
130
131    /**
132     * {@code BOOTID.UPNP.ORG}
133     *
134     * @return  {@code bootId}
135     */
136    public int getBootId() { return bootId; }
137
138    /**
139     * {@code NEXTBOOTID.UPNP.ORG}
140     *
141     * @return  {@code nextBootId}
142     */
143    public int getNextBootId() { throw new UnsupportedOperationException(); }
144
145    /**
146     * {@code SEARCHPORT.UPNP.ORG}
147     *
148     * @return  Search port.
149     */
150    public int getSearchPort() { return unicast.getLocalPort(); }
151
152    /**
153     * Method to add a {@link Listener}.
154     *
155     * @param   listener        The {@link Listener}.
156     *
157     * @return  {@link.this}
158     */
159    public SSDPDiscoveryService addListener(Listener listener) {
160        if ((! listeners.contains(listener)) && listeners.add(listener)) {
161            listener.register(this);
162        }
163
164        return this;
165    }
166
167    /**
168     * Method to remove a {@link Listener}.
169     *
170     * @param   listener        The {@link Listener}.
171     *
172     * @return  {@link.this}
173     */
174    public SSDPDiscoveryService removeListener(Listener listener) {
175        if (listeners.remove(listener)) {
176            listener.register(this);
177        }
178
179        return this;
180    }
181
182    private void fireSendEvent(DatagramSocket socket, SSDPMessage message) {
183        listeners.stream().forEach(t -> t.sendEvent(this, socket, message));
184    }
185
186    private void fireReceiveEvent(DatagramSocket socket, SSDPMessage message) {
187        listeners.stream().forEach(t -> t.receiveEvent(this, socket, message));
188    }
189
190    /**
191     * Method to add a {@link RootDevice} to advertise.
192     *
193     * @param   device          The {@link RootDevice} to advertise.
194     * @param   rate            The rate (in seconds) to repeat
195     *                          advertisements.
196     *
197     * @return  {@link.this}
198     */
199    public SSDPDiscoveryService advertise(RootDevice device, int rate) {
200        ScheduledFuture<?> future = scheduleAtFixedRate(() -> alive(device), advertisers.size(), rate, SECONDS);
201
202        future = advertisers.put(device, future);
203
204        if (future != null) {
205            future.cancel(true);
206        }
207
208        return this;
209    }
210
211    private void alive(RootDevice device) {
212        device.notify((nt, usn) -> multicast(new Alive(nt, usn, device)));
213    }
214
215    private void byebye(RootDevice device) {
216        device.notify((nt, usn) -> multicast(new ByeBye(nt, usn, device)));
217    }
218
219    /**
220     * Send multicast {@code M-SEARCH} messsage.
221     *
222     * @param   mx              The {@code MX} header value.
223     * @param   st              The {@code ST} header value.
224     */
225    public void msearch(int mx, URI st) { multicast(0, new MSearch(mx, st)); }
226
227    /**
228     * Method to queue an {@link SSDPMessage} for multicast without delay.
229     *
230     * @param   message         The {@link SSDPMessage} to send.
231     */
232    public void multicast(SSDPMessage message) {
233        send(0, SSDPMulticastSocket.INET_SOCKET_ADDRESS, message);
234    }
235
236    /**
237     * Method to queue an {@link SSDPMessage} for multicast with delay.
238     *
239     * @param   delay           Time to delay (in milliseconds) before
240     *                          sending.
241     * @param   message         The {@link SSDPMessage} to send.
242     */
243    public void multicast(long delay, SSDPMessage message) {
244        send(delay, SSDPMulticastSocket.INET_SOCKET_ADDRESS, message);
245    }
246
247    /**
248     * Method to queue an {@link SSDPMessage} for sending without delay.
249     *
250     * @param   address         The destination {@link SocketAddress}.
251     * @param   message         The {@link SSDPMessage} to send.
252     */
253    public void send(SocketAddress address, SSDPMessage message) {
254        send(0, address, message);
255    }
256
257    /**
258     * Method to queue an {@link SSDPMessage} for sending with delay.
259     *
260     * @param   delay           Time to delay (in milliseconds) before
261     *                          sending.
262     * @param   address         The destination {@link SocketAddress}.
263     * @param   message         The {@link SSDPMessage} to send.
264     */
265    public void send(long delay, SocketAddress address, SSDPMessage message) {
266        byte[] bytes = message.toString().getBytes(UTF_8);
267        DatagramPacket packet = new DatagramPacket(bytes, 0, bytes.length, address);
268
269        schedule(() -> task(message, packet), delay, MILLISECONDS);
270    }
271
272    private void task(SSDPMessage message, DatagramPacket packet) {
273        try {
274            fireSendEvent(unicast, message);
275            unicast.send(packet);
276        } catch (IOException exception) {
277        }
278    }
279
280    /**
281     * Method to queue a {@link List} of {@link SSDPMessage}s for sending
282     * with a {@code MX} parameter.  Messages are sent in list order with
283     * random delays none greater that {@code MX} seconds.
284     *
285     * @param   mx              Maximum delay (in seconds) before sending.
286     * @param   address         The destination {@link SocketAddress}.
287     * @param   messages        The {@link List} of {@link SSDPMessage}s to
288     *                          send.
289     */
290    public void send(int mx, SocketAddress address, List<? extends SSDPMessage> messages) {
291        List<Long> delays =
292            messages.stream()
293            .map(t -> random.nextInt((int) SECONDS.toMillis(mx)))
294            .map(t -> SECONDS.toMillis(1) + t)
295            .collect(toList());
296
297        delays.sort(Comparator.naturalOrder());
298
299        messages.stream().forEach(t -> send(delays.remove(0), address, t));
300    }
301
302    private void receive(DatagramSocket socket) {
303        try {
304            socket.setSoTimeout((int) MILLISECONDS.convert(15, SECONDS));
305
306            for (;;) {
307                try {
308                    byte[] bytes = new byte[8 * 1024];
309                    DatagramPacket packet = new DatagramPacket(bytes, bytes.length);
310
311                    socket.receive(packet);
312
313                    SSDPMessage message = parse(packet);
314
315                    if (message != null) {
316                        fireReceiveEvent(socket, message);
317                    }
318                } catch (SocketTimeoutException exception) {
319                }
320            }
321        } catch (IOException exception) {
322        }
323    }
324
325    private SSDPMessage parse(DatagramPacket packet) {
326        SSDPMessage message = null;
327
328        if (message == null) {
329            try {
330                message = SSDPResponse.from(packet);
331            } catch (ParseException exception) {
332            }
333        }
334
335        if (message == null) {
336            try {
337                message = SSDPRequest.from(packet);
338            } catch (ParseException exception) {
339            }
340        }
341
342        return message;
343    }
344
345    @Override
346    public void shutdown() {
347        advertisers.values().stream().forEach(t -> t.cancel(true));
348        advertisers.keySet().stream().forEach(t -> byebye(t));
349        advertisers.clear();
350
351        super.shutdown();
352    }
353
354    /**
355     * {@link SSDPDiscoveryService} listener interface definition.
356     */
357    public interface Listener {
358
359        /**
360         * Callback when a {@link Listener} is added to a
361         * {@link SSDPDiscoveryService}.
362         *
363         * @param       service         The {@link SSDPDiscoveryService}.
364         */
365        public void register(SSDPDiscoveryService service);
366
367        /**
368         * Callback when a {@link Listener} is removed from a
369         * {@link SSDPDiscoveryService}.
370         *
371         * @param       service         The {@link SSDPDiscoveryService}.
372         */
373        public void unregister(SSDPDiscoveryService service);
374
375        /**
376         * Callback made just before sending a {@link SSDPMessage}.
377         *
378         * @param       service         The {@link SSDPDiscoveryService}.
379         * @param       socket          The {@link DatagramSocket}.
380         * @param       message         The {@link SSDPMessage}.
381         */
382        public void sendEvent(SSDPDiscoveryService service, DatagramSocket socket, SSDPMessage message);
383
384        /**
385         * Callback made after receiving a {@link SSDPMessage}.
386         *
387         * @param       service         The {@link SSDPDiscoveryService}.
388         * @param       socket          The {@link DatagramSocket}.
389         * @param       message         The {@link SSDPMessage}.
390         */
391        public void receiveEvent(SSDPDiscoveryService service, DatagramSocket socket, SSDPMessage message);
392    }
393
394    /**
395     * {@link SSDPDiscoveryService} {@link SSDPRequest} handler.
396     */
397    public static abstract class RequestHandler implements Listener {
398        private final SSDPRequest.Method method;
399
400        /**
401         * Sole constructor.
402         *
403         * @param       method          The {@link SSDPRequest.Method} to
404         *                              handle.
405         */
406        protected RequestHandler(SSDPRequest.Method method) {
407            this.method = Objects.requireNonNull(method);
408        }
409
410        public abstract void run(SSDPDiscoveryService service, DatagramSocket socket, SSDPRequest request);
411
412        @Override
413        public void register(SSDPDiscoveryService service) { }
414
415        @Override
416        public void unregister(SSDPDiscoveryService service) { }
417
418        @Override
419        public void receiveEvent(SSDPDiscoveryService service, DatagramSocket socket, SSDPMessage message) {
420            if (message instanceof SSDPRequest) {
421                SSDPRequest request = (SSDPRequest) message;
422
423                if (method.is(request.getMethod())) {
424                    service.submit(() -> run(service, socket, request));
425                }
426            }
427        }
428
429        @Override
430        public void sendEvent(SSDPDiscoveryService service, DatagramSocket socket, SSDPMessage message) {
431        }
432    }
433
434    /**
435     * {@link SSDPDiscoveryService} {@link SSDPResponse} handler.
436     */
437    public static abstract class ResponseHandler implements Listener {
438
439        /**
440         * Sole constructor.
441         */
442        protected ResponseHandler() { }
443
444        public abstract void run(SSDPDiscoveryService service, DatagramSocket socket, SSDPResponse request);
445
446        @Override
447        public void register(SSDPDiscoveryService service) { }
448
449        @Override
450        public void unregister(SSDPDiscoveryService service) { }
451
452        @Override
453        public void receiveEvent(SSDPDiscoveryService service, DatagramSocket socket, SSDPMessage message) {
454            if (message instanceof SSDPResponse) {
455                service.submit(() -> run(service, socket, (SSDPResponse) message));
456            }
457        }
458
459        @Override
460        public void sendEvent(SSDPDiscoveryService service, DatagramSocket socket, SSDPMessage message) {
461        }
462    }
463
464    @ToString
465    private class MSEARCH extends RequestHandler {
466        public MSEARCH() { super(SSDPRequest.Method.MSEARCH); }
467
468        @Override
469        public void run(SSDPDiscoveryService service, DatagramSocket socket, SSDPRequest request) {
470            try {
471                if (isHeaderValue(request, SSDPMessage.MAN, "\"ssdp:discover\"")) {
472                    int mx = request.getMX();
473                    SocketAddress address = request.getSocketAddress();
474                    List<SSDPMessage> list = new LinkedList<>();
475                    URI st = request.getST();
476                    boolean all = SSDPMessage.SSDP_ALL.equals(st);
477
478                    advertisers.keySet().stream()
479                        .forEach(device -> device.notify((nt, usn) -> {
480                                    if (SSDP.matches(st, nt)) {
481                                        list.add(new MSearch(service, all ? nt : st, usn, device));
482                                    }
483                                }));
484
485                    service.send(mx, address, list);
486                }
487            } catch (Exception exception) {
488                /* log.error("{}", exception.getMessage(), exception); */
489            }
490        }
491
492        private boolean isHeaderValue(SSDPRequest request, String header, String value) {
493            return Objects.equals(request.getHeaderValue(header), value);
494        }
495
496        private class MSearch extends SSDPResponse {
497            private static final long serialVersionUID = -2963023442177743880L;
498
499            public MSearch(SSDPDiscoveryService service, URI st, URI usn, RootDevice device) {
500                super(SC_OK, "OK");
501
502                header(CACHE_CONTROL, MAX_AGE + "=" + device.getMaxAge());
503                header(DATE, HttpDateGenerator.INSTANCE.getCurrentDate());
504                header(EXT, (String) null);
505                header(LOCATION, device.getLocation());
506                header(SERVER, service.getUserAgent());
507                header(ST, st);
508                header(USN, usn);
509                header(BOOTID_UPNP_ORG, service.getBootId());
510                header(CONFIGID_UPNP_ORG, device.getConfigId());
511                header(SEARCHPORT_UPNP_ORG, service.getSearchPort());
512            }
513        }
514    }
515
516    private class MSearch extends SSDPRequest {
517        private static final long serialVersionUID = -7606319039115145787L;
518
519        public MSearch(int mx, URI st) {
520            super(Method.MSEARCH);
521
522            header(HOST, SSDPMulticastSocket.INET_SOCKET_ADDRESS);
523            header(MAN, "\"ssdp:discover\"");
524            header(MX, mx);
525            header(ST, st);
526            header(USER_AGENT, getUserAgent());
527        }
528    }
529
530    private class Alive extends SSDPRequest {
531        private static final long serialVersionUID = 1405796530322683498L;
532
533        public Alive(URI nt, URI usn, RootDevice device) {
534            super(Method.NOTIFY);
535
536            header(HOST, SSDPMulticastSocket.INET_SOCKET_ADDRESS);
537            header(CACHE_CONTROL, MAX_AGE + "=" + device.getMaxAge());
538            header(NT, nt);
539            header(NTS, SSDP_ALIVE);
540            header(SERVER, getUserAgent());
541            header(USN, usn);
542            header(LOCATION, device.getLocation());
543            header(BOOTID_UPNP_ORG, getBootId());
544            header(CONFIGID_UPNP_ORG, device.getConfigId());
545            header(SEARCHPORT_UPNP_ORG, getSearchPort());
546        }
547    }
548
549    private class ByeBye extends SSDPRequest {
550        private static final long serialVersionUID = -4058077320857161188L;
551
552        public ByeBye(URI nt, URI usn, RootDevice device) {
553            super(Method.NOTIFY);
554
555            header(HOST, SSDPMulticastSocket.INET_SOCKET_ADDRESS);
556            header(NT, nt);
557            header(NTS, SSDP_BYEBYE);
558            header(USN, usn);
559            header(BOOTID_UPNP_ORG, getBootId());
560            header(CONFIGID_UPNP_ORG, device.getConfigId());
561        }
562    }
563
564    private class Update extends SSDPRequest {
565        private static final long serialVersionUID = -3155461457458248824L;
566
567        public Update(URI nt, URI usn, RootDevice device) {
568            super(Method.NOTIFY);
569
570            header(HOST, SSDPMulticastSocket.INET_SOCKET_ADDRESS);
571            header(LOCATION, device.getLocation());
572            header(NT, nt);
573            header(NTS, SSDP_UPDATE);
574            header(USN, usn);
575            header(BOOTID_UPNP_ORG, getBootId());
576            header(CONFIGID_UPNP_ORG, device.getConfigId());
577            header(NEXTBOOTID_UPNP_ORG, getNextBootId());
578            header(SEARCHPORT_UPNP_ORG, getSearchPort());
579        }
580    }
581}