001package ball.upnp.ssdp;
002/*-
003 * ##########################################################################
004 * UPnP/SSDP Implementation Classes
005 * $Id: SSDPDiscoveryService.java 7215 2021-01-03 18:39:51Z ball $
006 * $HeadURL: svn+ssh://svn.hcf.dev/var/spool/scm/repository.svn/ball-upnp/trunk/src/main/java/ball/upnp/ssdp/SSDPDiscoveryService.java $
007 * %%
008 * Copyright (C) 2013 - 2021 Allen D. Ball
009 * %%
010 * Licensed under the Apache License, Version 2.0 (the "License");
011 * you may not use this file except in compliance with the License.
012 * You may obtain a copy of the License at
013 *
014 *      http://www.apache.org/licenses/LICENSE-2.0
015 *
016 * Unless required by applicable law or agreed to in writing, software
017 * distributed under the License is distributed on an "AS IS" BASIS,
018 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
019 * See the License for the specific language governing permissions and
020 * limitations under the License.
021 * ##########################################################################
022 */
023import ball.upnp.RootDevice;
024import ball.upnp.SSDP;
025import java.io.IOException;
026import java.net.DatagramPacket;
027import java.net.DatagramSocket;
028import java.net.MulticastSocket;
029import java.net.SocketAddress;
030import java.net.SocketException;
031import java.net.SocketTimeoutException;
032import java.net.URI;
033import java.util.Comparator;
034import java.util.LinkedList;
035import java.util.List;
036import java.util.Objects;
037import java.util.Random;
038import java.util.concurrent.ConcurrentHashMap;
039import java.util.concurrent.CopyOnWriteArrayList;
040import java.util.concurrent.ScheduledFuture;
041import java.util.concurrent.ScheduledThreadPoolExecutor;
042import java.util.stream.Stream;
043import lombok.ToString;
044import org.apache.http.ParseException;
045
046import static java.nio.charset.StandardCharsets.UTF_8;
047import static java.util.concurrent.TimeUnit.MILLISECONDS;
048import static java.util.concurrent.TimeUnit.SECONDS;
049import static java.util.stream.Collectors.joining;
050import static java.util.stream.Collectors.toList;
051import static org.apache.commons.lang3.StringUtils.EMPTY;
052import static org.apache.commons.lang3.StringUtils.SPACE;
053
054/**
055 * SSDP discovery {@link ScheduledThreadPoolExecutor} implementation.
056 *
057 * {@bean.info}
058 *
059 * @author {@link.uri mailto:ball@hcf.dev Allen D. Ball}
060 * @version $Revision: 7215 $
061 */
062public class SSDPDiscoveryService extends ScheduledThreadPoolExecutor {
063    private static final String OS =
064        Stream.of("os.name", "os.version")
065        .map(System::getProperty)
066        .map(t -> t.replaceAll("[\\p{Space}]+", EMPTY))
067        .collect(joining("/"));
068    private static final String UPNP = "UPnP/2.0";
069
070    private final String server;
071    private final int bootId = (int) (System.currentTimeMillis() / 1000);
072    private final Random random = new Random();
073    private final MulticastSocket multicast;
074    private final DatagramSocket unicast;
075    private final CopyOnWriteArrayList<Listener> listeners =
076        new CopyOnWriteArrayList<>();
077    private final ConcurrentHashMap<RootDevice,ScheduledFuture<?>> advertisers =
078        new ConcurrentHashMap<>();
079
080    /**
081     * Sole constructor.
082     *
083     * @param   product         The {@code product/version} {@link String}
084     *                          identifying this UPnP application.
085     *
086     * @throws  IOException     If the underlying {@link MulticastSocket}
087     *                          cannot be conditioned.
088     */
089    public SSDPDiscoveryService(String product) throws IOException {
090        super(8);
091
092        server =
093            Stream.of(OS, UPNP, product)
094            .filter(Objects::nonNull)
095            .collect(joining(SPACE));
096
097        random.setSeed(System.currentTimeMillis());
098
099        multicast = new SSDPMulticastSocket();
100        /*
101         * Bind to an {@link.rfc 4340} ephemeral port.
102         */
103        DatagramSocket socket = null;
104        List<Integer> ports =
105            random.ints(49152, 65536).limit(256).boxed().collect(toList());
106
107        for (;;) {
108            try {
109                socket = new DatagramSocket(ports.remove(0));
110                break;
111            } catch (SocketException exception) {
112                if (ports.isEmpty()) {
113                    throw exception;
114                } else {
115                    continue;
116                }
117            }
118        }
119
120        unicast = socket;
121
122        addListener(new MSEARCH());
123
124        submit(() -> receive(multicast));
125        submit(() -> receive(unicast));
126    }
127
128    /**
129     * {@code SERVER} and {@code USER-AGENT}
130     *
131     * @return  {@code SERVER} and {@code USER-AGENT}
132     */
133    public String getUserAgent() { return server; }
134
135    /**
136     * {@code BOOTID.UPNP.ORG}
137     *
138     * @return  {@code bootId}
139     */
140    public int getBootId() { return bootId; }
141
142    /**
143     * {@code NEXTBOOTID.UPNP.ORG}
144     *
145     * @return  {@code nextBootId}
146     */
147    public int getNextBootId() { throw new UnsupportedOperationException(); }
148
149    /**
150     * {@code SEARCHPORT.UPNP.ORG}
151     *
152     * @return  Search port.
153     */
154    public int getSearchPort() { return unicast.getLocalPort(); }
155
156    /**
157     * Method to add a {@link Listener}.
158     *
159     * @param   listener        The {@link Listener}.
160     *
161     * @return  {@link.this}
162     */
163    public SSDPDiscoveryService addListener(Listener listener) {
164        if ((! listeners.contains(listener)) && listeners.add(listener)) {
165            listener.register(this);
166        }
167
168        return this;
169    }
170
171    /**
172     * Method to remove a {@link Listener}.
173     *
174     * @param   listener        The {@link Listener}.
175     *
176     * @return  {@link.this}
177     */
178    public SSDPDiscoveryService removeListener(Listener listener) {
179        if (listeners.remove(listener)) {
180            listener.register(this);
181        }
182
183        return this;
184    }
185
186    private void fireSendEvent(DatagramSocket socket, SSDPMessage message) {
187        listeners.stream().forEach(t -> t.sendEvent(this, socket, message));
188    }
189
190    private void fireReceiveEvent(DatagramSocket socket, SSDPMessage message) {
191        listeners.stream().forEach(t -> t.receiveEvent(this, socket, message));
192    }
193
194    /**
195     * Method to add a {@link RootDevice} to advertise.
196     *
197     * @param   device          The {@link RootDevice} to advertise.
198     * @param   rate            The rate (in seconds) to repeat
199     *                          advertisements.
200     *
201     * @return  {@link.this}
202     */
203    public SSDPDiscoveryService advertise(RootDevice device,
204                                          int rate) {
205        ScheduledFuture<?> future =
206            scheduleAtFixedRate(() -> alive(device),
207                                advertisers.size(), rate, SECONDS);
208
209        future = advertisers.put(device, future);
210
211        if (future != null) {
212            future.cancel(true);
213        }
214
215        return this;
216    }
217
218    private void alive(RootDevice device) {
219        device.notify((nt, usn) -> multicast(new Alive(nt, usn, device)));
220    }
221
222    private void byebye(RootDevice device) {
223        device.notify((nt, usn) -> multicast(new ByeBye(nt, usn, device)));
224    }
225
226    /**
227     * Send multicast {@code M-SEARCH} messsage.
228     *
229     * @param   mx              The {@code MX} header value.
230     * @param   st              The {@code ST} header value.
231     */
232    public void msearch(int mx, URI st) { multicast(0, new MSearch(mx, st)); }
233
234    /**
235     * Method to queue an {@link SSDPMessage} for multicast without delay.
236     *
237     * @param   message         The {@link SSDPMessage} to send.
238     */
239    public void multicast(SSDPMessage message) {
240        send(0, SSDPMulticastSocket.INET_SOCKET_ADDRESS, message);
241    }
242
243    /**
244     * Method to queue an {@link SSDPMessage} for multicast with delay.
245     *
246     * @param   delay           Time to delay (in milliseconds) before
247     *                          sending.
248     * @param   message         The {@link SSDPMessage} to send.
249     */
250    public void multicast(long delay, SSDPMessage message) {
251        send(delay, SSDPMulticastSocket.INET_SOCKET_ADDRESS, message);
252    }
253
254    /**
255     * Method to queue an {@link SSDPMessage} for sending without delay.
256     *
257     * @param   address         The destination {@link SocketAddress}.
258     * @param   message         The {@link SSDPMessage} to send.
259     */
260    public void send(SocketAddress address, SSDPMessage message) {
261        send(0, address, message);
262    }
263
264    /**
265     * Method to queue an {@link SSDPMessage} for sending with delay.
266     *
267     * @param   delay           Time to delay (in milliseconds) before
268     *                          sending.
269     * @param   address         The destination {@link SocketAddress}.
270     * @param   message         The {@link SSDPMessage} to send.
271     */
272    public void send(long delay, SocketAddress address, SSDPMessage message) {
273        byte[] bytes = message.toString().getBytes(UTF_8);
274        DatagramPacket packet =
275            new DatagramPacket(bytes, 0, bytes.length, address);
276
277        schedule(() -> task(message, packet), delay, MILLISECONDS);
278    }
279
280    private void task(SSDPMessage message, DatagramPacket packet) {
281        try {
282            fireSendEvent(unicast, message);
283            unicast.send(packet);
284        } catch (IOException exception) {
285        }
286    }
287
288    /**
289     * Method to queue a {@link List} of {@link SSDPMessage}s for sending
290     * with a {@code MX} parameter.  Messages are sent in list order with
291     * random delays none greater that {@code MX} seconds.
292     *
293     * @param   mx              Maximum delay (in seconds) before sending.
294     * @param   address         The destination {@link SocketAddress}.
295     * @param   messages        The {@link List} of {@link SSDPMessage}s to
296     *                          send.
297     */
298    public void send(int mx,
299                     SocketAddress address,
300                     List<? extends SSDPMessage> messages) {
301        List<Long> delays =
302            messages.stream()
303            .map(t -> random.nextInt((int) SECONDS.toMillis(mx)))
304            .map(t -> SECONDS.toMillis(1) + t)
305            .collect(toList());
306
307        delays.sort(Comparator.naturalOrder());
308
309        messages.stream().forEach(t -> send(delays.remove(0), address, t));
310    }
311
312    private void receive(DatagramSocket socket) {
313        try {
314            socket.setSoTimeout((int) MILLISECONDS.convert(15, SECONDS));
315
316            for (;;) {
317                try {
318                    byte[] bytes = new byte[8 * 1024];
319                    DatagramPacket packet =
320                        new DatagramPacket(bytes, bytes.length);
321
322                    socket.receive(packet);
323
324                    SSDPMessage message = parse(packet);
325
326                    if (message != null) {
327                        fireReceiveEvent(socket, message);
328                    }
329                } catch (SocketTimeoutException exception) {
330                }
331            }
332        } catch (IOException exception) {
333        }
334    }
335
336    private SSDPMessage parse(DatagramPacket packet) {
337        SSDPMessage message = null;
338
339        if (message == null) {
340            try {
341                message = SSDPResponse.from(packet);
342            } catch (ParseException exception) {
343            }
344        }
345
346        if (message == null) {
347            try {
348                message = SSDPRequest.from(packet);
349            } catch (ParseException exception) {
350            }
351        }
352
353        return message;
354    }
355
356    @Override
357    public void shutdown() {
358        advertisers.values().stream().forEach(t -> t.cancel(true));
359        advertisers.keySet().stream().forEach(t -> byebye(t));
360        advertisers.clear();
361
362        super.shutdown();
363    }
364
365    /**
366     * {@link SSDPDiscoveryService} listener interface definition.
367     */
368    public interface Listener {
369
370        /**
371         * Callback when a {@link Listener} is added to a
372         * {@link SSDPDiscoveryService}.
373         *
374         * @param       service         The {@link SSDPDiscoveryService}.
375         */
376        public void register(SSDPDiscoveryService service);
377
378        /**
379         * Callback when a {@link Listener} is removed from a
380         * {@link SSDPDiscoveryService}.
381         *
382         * @param       service         The {@link SSDPDiscoveryService}.
383         */
384        public void unregister(SSDPDiscoveryService service);
385
386        /**
387         * Callback made just before sending a {@link SSDPMessage}.
388         *
389         * @param       service         The {@link SSDPDiscoveryService}.
390         * @param       socket          The {@link DatagramSocket}.
391         * @param       message         The {@link SSDPMessage}.
392         */
393        public void sendEvent(SSDPDiscoveryService service,
394                              DatagramSocket socket,
395                              SSDPMessage message);
396
397        /**
398         * Callback made after receiving a {@link SSDPMessage}.
399         *
400         * @param       service         The {@link SSDPDiscoveryService}.
401         * @param       socket          The {@link DatagramSocket}.
402         * @param       message         The {@link SSDPMessage}.
403         */
404        public void receiveEvent(SSDPDiscoveryService service,
405                                 DatagramSocket socket,
406                                 SSDPMessage message);
407    }
408
409    /**
410     * {@link SSDPDiscoveryService} {@link SSDPRequest} handler.
411     */
412    public static abstract class RequestHandler implements Listener {
413        private final SSDPRequest.Method method;
414
415        /**
416         * Sole constructor.
417         *
418         * @param       method          The {@link SSDPRequest.Method} to
419         *                              handle.
420         */
421        protected RequestHandler(SSDPRequest.Method method) {
422            this.method = Objects.requireNonNull(method);
423        }
424
425        public abstract void run(SSDPDiscoveryService service,
426                                 DatagramSocket socket, SSDPRequest request);
427
428        @Override
429        public void register(SSDPDiscoveryService service) { }
430
431        @Override
432        public void unregister(SSDPDiscoveryService service) { }
433
434        @Override
435        public void receiveEvent(SSDPDiscoveryService service,
436                                 DatagramSocket socket, SSDPMessage message) {
437            if (message instanceof SSDPRequest) {
438                SSDPRequest request = (SSDPRequest) message;
439
440                if (method.is(request.getMethod())) {
441                    service.submit(() -> run(service, socket, request));
442                }
443            }
444        }
445
446        @Override
447        public void sendEvent(SSDPDiscoveryService service,
448                              DatagramSocket socket, SSDPMessage message) {
449        }
450    }
451
452    /**
453     * {@link SSDPDiscoveryService} {@link SSDPResponse} handler.
454     */
455    public static abstract class ResponseHandler implements Listener {
456
457        /**
458         * Sole constructor.
459         */
460        protected ResponseHandler() { }
461
462        public abstract void run(SSDPDiscoveryService service,
463                                 DatagramSocket socket, SSDPResponse request);
464
465        @Override
466        public void register(SSDPDiscoveryService service) { }
467
468        @Override
469        public void unregister(SSDPDiscoveryService service) { }
470
471        @Override
472        public void receiveEvent(SSDPDiscoveryService service,
473                                 DatagramSocket socket, SSDPMessage message) {
474            if (message instanceof SSDPResponse) {
475                service.submit(() -> run(service, socket, (SSDPResponse) message));
476            }
477        }
478
479        @Override
480        public void sendEvent(SSDPDiscoveryService service,
481                              DatagramSocket socket, SSDPMessage message) {
482        }
483    }
484
485    @ToString
486    private class MSEARCH extends RequestHandler {
487        public MSEARCH() { super(SSDPRequest.Method.MSEARCH); }
488
489        @Override
490        public void run(SSDPDiscoveryService service,
491                        DatagramSocket socket, SSDPRequest request) {
492            try {
493                if (isHeaderValue(request, SSDPMessage.MAN, "\"ssdp:discover\"")) {
494                    int mx = request.getMX();
495                    SocketAddress address = request.getSocketAddress();
496                    List<SSDPMessage> list = new LinkedList<>();
497                    URI st = request.getST();
498                    boolean all = SSDPMessage.SSDP_ALL.equals(st);
499
500                    advertisers.keySet()
501                        .stream()
502                        .forEach(device -> device.notify((nt, usn) -> {
503                                    if (SSDP.matches(st, nt)) {
504                                        list.add(new MSearch(service, all ? nt : st, usn, device));
505                                    }
506                                }));
507
508                    service.send(mx, address, list);
509                }
510            } catch (Exception exception) {
511                /* log.error("{}", exception.getMessage(), exception); */
512            }
513        }
514
515        private boolean isHeaderValue(SSDPRequest request,
516                                      String header, String value) {
517            return Objects.equals(request.getHeaderValue(header), value);
518        }
519
520        private class MSearch extends SSDPResponse {
521            public MSearch(SSDPDiscoveryService service,
522                           URI st, URI usn, RootDevice device) {
523                super(SC_OK, "OK");
524
525                header(CACHE_CONTROL, MAX_AGE + "=" + device.getMaxAge());
526                header(DATE, GENERATOR.getCurrentDate());
527                header(EXT, (String) null);
528                header(LOCATION, device.getLocation());
529                header(SERVER, service.getUserAgent());
530                header(ST, st);
531                header(USN, usn);
532                header(BOOTID_UPNP_ORG, service.getBootId());
533                header(CONFIGID_UPNP_ORG, device.getConfigId());
534                header(SEARCHPORT_UPNP_ORG, service.getSearchPort());
535            }
536        }
537    }
538
539    private class MSearch extends SSDPRequest {
540        public MSearch(int mx, URI st) {
541            super(Method.MSEARCH);
542
543            header(HOST, SSDPMulticastSocket.INET_SOCKET_ADDRESS);
544            header(MAN, "\"ssdp:discover\"");
545            header(MX, mx);
546            header(ST, st);
547            header(USER_AGENT, getUserAgent());
548        }
549    }
550
551    private class Alive extends SSDPRequest {
552        public Alive(URI nt, URI usn, RootDevice device) {
553            super(Method.NOTIFY);
554
555            header(HOST, SSDPMulticastSocket.INET_SOCKET_ADDRESS);
556            header(CACHE_CONTROL, MAX_AGE + "=" + device.getMaxAge());
557            header(NT, nt);
558            header(NTS, SSDP_ALIVE);
559            header(SERVER, getUserAgent());
560            header(USN, usn);
561            header(LOCATION, device.getLocation());
562            header(BOOTID_UPNP_ORG, getBootId());
563            header(CONFIGID_UPNP_ORG, device.getConfigId());
564            header(SEARCHPORT_UPNP_ORG, getSearchPort());
565        }
566    }
567
568    private class ByeBye extends SSDPRequest {
569        public ByeBye(URI nt, URI usn, RootDevice device) {
570            super(Method.NOTIFY);
571
572            header(HOST, SSDPMulticastSocket.INET_SOCKET_ADDRESS);
573            header(NT, nt);
574            header(NTS, SSDP_BYEBYE);
575            header(USN, usn);
576            header(BOOTID_UPNP_ORG, getBootId());
577            header(CONFIGID_UPNP_ORG, device.getConfigId());
578        }
579    }
580
581    private class Update extends SSDPRequest {
582        public Update(URI nt, URI usn, RootDevice device) {
583            super(Method.NOTIFY);
584
585            header(HOST, SSDPMulticastSocket.INET_SOCKET_ADDRESS);
586            header(LOCATION, device.getLocation());
587            header(NT, nt);
588            header(NTS, SSDP_UPDATE);
589            header(USN, usn);
590            header(BOOTID_UPNP_ORG, getBootId());
591            header(CONFIGID_UPNP_ORG, device.getConfigId());
592            header(NEXTBOOTID_UPNP_ORG, getNextBootId());
593            header(SEARCHPORT_UPNP_ORG, getSearchPort());
594        }
595    }
596}