001package ball.upnp.ssdp; 002/*- 003 * ########################################################################## 004 * UPnP/SSDP Implementation Classes 005 * $Id: SSDPDiscoveryService.java 7215 2021-01-03 18:39:51Z ball $ 006 * $HeadURL: svn+ssh://svn.hcf.dev/var/spool/scm/repository.svn/ball-upnp/trunk/src/main/java/ball/upnp/ssdp/SSDPDiscoveryService.java $ 007 * %% 008 * Copyright (C) 2013 - 2021 Allen D. Ball 009 * %% 010 * Licensed under the Apache License, Version 2.0 (the "License"); 011 * you may not use this file except in compliance with the License. 012 * You may obtain a copy of the License at 013 * 014 * http://www.apache.org/licenses/LICENSE-2.0 015 * 016 * Unless required by applicable law or agreed to in writing, software 017 * distributed under the License is distributed on an "AS IS" BASIS, 018 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 019 * See the License for the specific language governing permissions and 020 * limitations under the License. 021 * ########################################################################## 022 */ 023import ball.upnp.RootDevice; 024import ball.upnp.SSDP; 025import java.io.IOException; 026import java.net.DatagramPacket; 027import java.net.DatagramSocket; 028import java.net.MulticastSocket; 029import java.net.SocketAddress; 030import java.net.SocketException; 031import java.net.SocketTimeoutException; 032import java.net.URI; 033import java.util.Comparator; 034import java.util.LinkedList; 035import java.util.List; 036import java.util.Objects; 037import java.util.Random; 038import java.util.concurrent.ConcurrentHashMap; 039import java.util.concurrent.CopyOnWriteArrayList; 040import java.util.concurrent.ScheduledFuture; 041import java.util.concurrent.ScheduledThreadPoolExecutor; 042import java.util.stream.Stream; 043import lombok.ToString; 044import org.apache.http.ParseException; 045 046import static java.nio.charset.StandardCharsets.UTF_8; 047import static java.util.concurrent.TimeUnit.MILLISECONDS; 048import static java.util.concurrent.TimeUnit.SECONDS; 049import static java.util.stream.Collectors.joining; 050import static java.util.stream.Collectors.toList; 051import static org.apache.commons.lang3.StringUtils.EMPTY; 052import static org.apache.commons.lang3.StringUtils.SPACE; 053 054/** 055 * SSDP discovery {@link ScheduledThreadPoolExecutor} implementation. 056 * 057 * {@bean.info} 058 * 059 * @author {@link.uri mailto:ball@hcf.dev Allen D. Ball} 060 * @version $Revision: 7215 $ 061 */ 062public class SSDPDiscoveryService extends ScheduledThreadPoolExecutor { 063 private static final String OS = 064 Stream.of("os.name", "os.version") 065 .map(System::getProperty) 066 .map(t -> t.replaceAll("[\\p{Space}]+", EMPTY)) 067 .collect(joining("/")); 068 private static final String UPNP = "UPnP/2.0"; 069 070 private final String server; 071 private final int bootId = (int) (System.currentTimeMillis() / 1000); 072 private final Random random = new Random(); 073 private final MulticastSocket multicast; 074 private final DatagramSocket unicast; 075 private final CopyOnWriteArrayList<Listener> listeners = 076 new CopyOnWriteArrayList<>(); 077 private final ConcurrentHashMap<RootDevice,ScheduledFuture<?>> advertisers = 078 new ConcurrentHashMap<>(); 079 080 /** 081 * Sole constructor. 082 * 083 * @param product The {@code product/version} {@link String} 084 * identifying this UPnP application. 085 * 086 * @throws IOException If the underlying {@link MulticastSocket} 087 * cannot be conditioned. 088 */ 089 public SSDPDiscoveryService(String product) throws IOException { 090 super(8); 091 092 server = 093 Stream.of(OS, UPNP, product) 094 .filter(Objects::nonNull) 095 .collect(joining(SPACE)); 096 097 random.setSeed(System.currentTimeMillis()); 098 099 multicast = new SSDPMulticastSocket(); 100 /* 101 * Bind to an {@link.rfc 4340} ephemeral port. 102 */ 103 DatagramSocket socket = null; 104 List<Integer> ports = 105 random.ints(49152, 65536).limit(256).boxed().collect(toList()); 106 107 for (;;) { 108 try { 109 socket = new DatagramSocket(ports.remove(0)); 110 break; 111 } catch (SocketException exception) { 112 if (ports.isEmpty()) { 113 throw exception; 114 } else { 115 continue; 116 } 117 } 118 } 119 120 unicast = socket; 121 122 addListener(new MSEARCH()); 123 124 submit(() -> receive(multicast)); 125 submit(() -> receive(unicast)); 126 } 127 128 /** 129 * {@code SERVER} and {@code USER-AGENT} 130 * 131 * @return {@code SERVER} and {@code USER-AGENT} 132 */ 133 public String getUserAgent() { return server; } 134 135 /** 136 * {@code BOOTID.UPNP.ORG} 137 * 138 * @return {@code bootId} 139 */ 140 public int getBootId() { return bootId; } 141 142 /** 143 * {@code NEXTBOOTID.UPNP.ORG} 144 * 145 * @return {@code nextBootId} 146 */ 147 public int getNextBootId() { throw new UnsupportedOperationException(); } 148 149 /** 150 * {@code SEARCHPORT.UPNP.ORG} 151 * 152 * @return Search port. 153 */ 154 public int getSearchPort() { return unicast.getLocalPort(); } 155 156 /** 157 * Method to add a {@link Listener}. 158 * 159 * @param listener The {@link Listener}. 160 * 161 * @return {@link.this} 162 */ 163 public SSDPDiscoveryService addListener(Listener listener) { 164 if ((! listeners.contains(listener)) && listeners.add(listener)) { 165 listener.register(this); 166 } 167 168 return this; 169 } 170 171 /** 172 * Method to remove a {@link Listener}. 173 * 174 * @param listener The {@link Listener}. 175 * 176 * @return {@link.this} 177 */ 178 public SSDPDiscoveryService removeListener(Listener listener) { 179 if (listeners.remove(listener)) { 180 listener.register(this); 181 } 182 183 return this; 184 } 185 186 private void fireSendEvent(DatagramSocket socket, SSDPMessage message) { 187 listeners.stream().forEach(t -> t.sendEvent(this, socket, message)); 188 } 189 190 private void fireReceiveEvent(DatagramSocket socket, SSDPMessage message) { 191 listeners.stream().forEach(t -> t.receiveEvent(this, socket, message)); 192 } 193 194 /** 195 * Method to add a {@link RootDevice} to advertise. 196 * 197 * @param device The {@link RootDevice} to advertise. 198 * @param rate The rate (in seconds) to repeat 199 * advertisements. 200 * 201 * @return {@link.this} 202 */ 203 public SSDPDiscoveryService advertise(RootDevice device, 204 int rate) { 205 ScheduledFuture<?> future = 206 scheduleAtFixedRate(() -> alive(device), 207 advertisers.size(), rate, SECONDS); 208 209 future = advertisers.put(device, future); 210 211 if (future != null) { 212 future.cancel(true); 213 } 214 215 return this; 216 } 217 218 private void alive(RootDevice device) { 219 device.notify((nt, usn) -> multicast(new Alive(nt, usn, device))); 220 } 221 222 private void byebye(RootDevice device) { 223 device.notify((nt, usn) -> multicast(new ByeBye(nt, usn, device))); 224 } 225 226 /** 227 * Send multicast {@code M-SEARCH} messsage. 228 * 229 * @param mx The {@code MX} header value. 230 * @param st The {@code ST} header value. 231 */ 232 public void msearch(int mx, URI st) { multicast(0, new MSearch(mx, st)); } 233 234 /** 235 * Method to queue an {@link SSDPMessage} for multicast without delay. 236 * 237 * @param message The {@link SSDPMessage} to send. 238 */ 239 public void multicast(SSDPMessage message) { 240 send(0, SSDPMulticastSocket.INET_SOCKET_ADDRESS, message); 241 } 242 243 /** 244 * Method to queue an {@link SSDPMessage} for multicast with delay. 245 * 246 * @param delay Time to delay (in milliseconds) before 247 * sending. 248 * @param message The {@link SSDPMessage} to send. 249 */ 250 public void multicast(long delay, SSDPMessage message) { 251 send(delay, SSDPMulticastSocket.INET_SOCKET_ADDRESS, message); 252 } 253 254 /** 255 * Method to queue an {@link SSDPMessage} for sending without delay. 256 * 257 * @param address The destination {@link SocketAddress}. 258 * @param message The {@link SSDPMessage} to send. 259 */ 260 public void send(SocketAddress address, SSDPMessage message) { 261 send(0, address, message); 262 } 263 264 /** 265 * Method to queue an {@link SSDPMessage} for sending with delay. 266 * 267 * @param delay Time to delay (in milliseconds) before 268 * sending. 269 * @param address The destination {@link SocketAddress}. 270 * @param message The {@link SSDPMessage} to send. 271 */ 272 public void send(long delay, SocketAddress address, SSDPMessage message) { 273 byte[] bytes = message.toString().getBytes(UTF_8); 274 DatagramPacket packet = 275 new DatagramPacket(bytes, 0, bytes.length, address); 276 277 schedule(() -> task(message, packet), delay, MILLISECONDS); 278 } 279 280 private void task(SSDPMessage message, DatagramPacket packet) { 281 try { 282 fireSendEvent(unicast, message); 283 unicast.send(packet); 284 } catch (IOException exception) { 285 } 286 } 287 288 /** 289 * Method to queue a {@link List} of {@link SSDPMessage}s for sending 290 * with a {@code MX} parameter. Messages are sent in list order with 291 * random delays none greater that {@code MX} seconds. 292 * 293 * @param mx Maximum delay (in seconds) before sending. 294 * @param address The destination {@link SocketAddress}. 295 * @param messages The {@link List} of {@link SSDPMessage}s to 296 * send. 297 */ 298 public void send(int mx, 299 SocketAddress address, 300 List<? extends SSDPMessage> messages) { 301 List<Long> delays = 302 messages.stream() 303 .map(t -> random.nextInt((int) SECONDS.toMillis(mx))) 304 .map(t -> SECONDS.toMillis(1) + t) 305 .collect(toList()); 306 307 delays.sort(Comparator.naturalOrder()); 308 309 messages.stream().forEach(t -> send(delays.remove(0), address, t)); 310 } 311 312 private void receive(DatagramSocket socket) { 313 try { 314 socket.setSoTimeout((int) MILLISECONDS.convert(15, SECONDS)); 315 316 for (;;) { 317 try { 318 byte[] bytes = new byte[8 * 1024]; 319 DatagramPacket packet = 320 new DatagramPacket(bytes, bytes.length); 321 322 socket.receive(packet); 323 324 SSDPMessage message = parse(packet); 325 326 if (message != null) { 327 fireReceiveEvent(socket, message); 328 } 329 } catch (SocketTimeoutException exception) { 330 } 331 } 332 } catch (IOException exception) { 333 } 334 } 335 336 private SSDPMessage parse(DatagramPacket packet) { 337 SSDPMessage message = null; 338 339 if (message == null) { 340 try { 341 message = SSDPResponse.from(packet); 342 } catch (ParseException exception) { 343 } 344 } 345 346 if (message == null) { 347 try { 348 message = SSDPRequest.from(packet); 349 } catch (ParseException exception) { 350 } 351 } 352 353 return message; 354 } 355 356 @Override 357 public void shutdown() { 358 advertisers.values().stream().forEach(t -> t.cancel(true)); 359 advertisers.keySet().stream().forEach(t -> byebye(t)); 360 advertisers.clear(); 361 362 super.shutdown(); 363 } 364 365 /** 366 * {@link SSDPDiscoveryService} listener interface definition. 367 */ 368 public interface Listener { 369 370 /** 371 * Callback when a {@link Listener} is added to a 372 * {@link SSDPDiscoveryService}. 373 * 374 * @param service The {@link SSDPDiscoveryService}. 375 */ 376 public void register(SSDPDiscoveryService service); 377 378 /** 379 * Callback when a {@link Listener} is removed from a 380 * {@link SSDPDiscoveryService}. 381 * 382 * @param service The {@link SSDPDiscoveryService}. 383 */ 384 public void unregister(SSDPDiscoveryService service); 385 386 /** 387 * Callback made just before sending a {@link SSDPMessage}. 388 * 389 * @param service The {@link SSDPDiscoveryService}. 390 * @param socket The {@link DatagramSocket}. 391 * @param message The {@link SSDPMessage}. 392 */ 393 public void sendEvent(SSDPDiscoveryService service, 394 DatagramSocket socket, 395 SSDPMessage message); 396 397 /** 398 * Callback made after receiving a {@link SSDPMessage}. 399 * 400 * @param service The {@link SSDPDiscoveryService}. 401 * @param socket The {@link DatagramSocket}. 402 * @param message The {@link SSDPMessage}. 403 */ 404 public void receiveEvent(SSDPDiscoveryService service, 405 DatagramSocket socket, 406 SSDPMessage message); 407 } 408 409 /** 410 * {@link SSDPDiscoveryService} {@link SSDPRequest} handler. 411 */ 412 public static abstract class RequestHandler implements Listener { 413 private final SSDPRequest.Method method; 414 415 /** 416 * Sole constructor. 417 * 418 * @param method The {@link SSDPRequest.Method} to 419 * handle. 420 */ 421 protected RequestHandler(SSDPRequest.Method method) { 422 this.method = Objects.requireNonNull(method); 423 } 424 425 public abstract void run(SSDPDiscoveryService service, 426 DatagramSocket socket, SSDPRequest request); 427 428 @Override 429 public void register(SSDPDiscoveryService service) { } 430 431 @Override 432 public void unregister(SSDPDiscoveryService service) { } 433 434 @Override 435 public void receiveEvent(SSDPDiscoveryService service, 436 DatagramSocket socket, SSDPMessage message) { 437 if (message instanceof SSDPRequest) { 438 SSDPRequest request = (SSDPRequest) message; 439 440 if (method.is(request.getMethod())) { 441 service.submit(() -> run(service, socket, request)); 442 } 443 } 444 } 445 446 @Override 447 public void sendEvent(SSDPDiscoveryService service, 448 DatagramSocket socket, SSDPMessage message) { 449 } 450 } 451 452 /** 453 * {@link SSDPDiscoveryService} {@link SSDPResponse} handler. 454 */ 455 public static abstract class ResponseHandler implements Listener { 456 457 /** 458 * Sole constructor. 459 */ 460 protected ResponseHandler() { } 461 462 public abstract void run(SSDPDiscoveryService service, 463 DatagramSocket socket, SSDPResponse request); 464 465 @Override 466 public void register(SSDPDiscoveryService service) { } 467 468 @Override 469 public void unregister(SSDPDiscoveryService service) { } 470 471 @Override 472 public void receiveEvent(SSDPDiscoveryService service, 473 DatagramSocket socket, SSDPMessage message) { 474 if (message instanceof SSDPResponse) { 475 service.submit(() -> run(service, socket, (SSDPResponse) message)); 476 } 477 } 478 479 @Override 480 public void sendEvent(SSDPDiscoveryService service, 481 DatagramSocket socket, SSDPMessage message) { 482 } 483 } 484 485 @ToString 486 private class MSEARCH extends RequestHandler { 487 public MSEARCH() { super(SSDPRequest.Method.MSEARCH); } 488 489 @Override 490 public void run(SSDPDiscoveryService service, 491 DatagramSocket socket, SSDPRequest request) { 492 try { 493 if (isHeaderValue(request, SSDPMessage.MAN, "\"ssdp:discover\"")) { 494 int mx = request.getMX(); 495 SocketAddress address = request.getSocketAddress(); 496 List<SSDPMessage> list = new LinkedList<>(); 497 URI st = request.getST(); 498 boolean all = SSDPMessage.SSDP_ALL.equals(st); 499 500 advertisers.keySet() 501 .stream() 502 .forEach(device -> device.notify((nt, usn) -> { 503 if (SSDP.matches(st, nt)) { 504 list.add(new MSearch(service, all ? nt : st, usn, device)); 505 } 506 })); 507 508 service.send(mx, address, list); 509 } 510 } catch (Exception exception) { 511 /* log.error("{}", exception.getMessage(), exception); */ 512 } 513 } 514 515 private boolean isHeaderValue(SSDPRequest request, 516 String header, String value) { 517 return Objects.equals(request.getHeaderValue(header), value); 518 } 519 520 private class MSearch extends SSDPResponse { 521 public MSearch(SSDPDiscoveryService service, 522 URI st, URI usn, RootDevice device) { 523 super(SC_OK, "OK"); 524 525 header(CACHE_CONTROL, MAX_AGE + "=" + device.getMaxAge()); 526 header(DATE, GENERATOR.getCurrentDate()); 527 header(EXT, (String) null); 528 header(LOCATION, device.getLocation()); 529 header(SERVER, service.getUserAgent()); 530 header(ST, st); 531 header(USN, usn); 532 header(BOOTID_UPNP_ORG, service.getBootId()); 533 header(CONFIGID_UPNP_ORG, device.getConfigId()); 534 header(SEARCHPORT_UPNP_ORG, service.getSearchPort()); 535 } 536 } 537 } 538 539 private class MSearch extends SSDPRequest { 540 public MSearch(int mx, URI st) { 541 super(Method.MSEARCH); 542 543 header(HOST, SSDPMulticastSocket.INET_SOCKET_ADDRESS); 544 header(MAN, "\"ssdp:discover\""); 545 header(MX, mx); 546 header(ST, st); 547 header(USER_AGENT, getUserAgent()); 548 } 549 } 550 551 private class Alive extends SSDPRequest { 552 public Alive(URI nt, URI usn, RootDevice device) { 553 super(Method.NOTIFY); 554 555 header(HOST, SSDPMulticastSocket.INET_SOCKET_ADDRESS); 556 header(CACHE_CONTROL, MAX_AGE + "=" + device.getMaxAge()); 557 header(NT, nt); 558 header(NTS, SSDP_ALIVE); 559 header(SERVER, getUserAgent()); 560 header(USN, usn); 561 header(LOCATION, device.getLocation()); 562 header(BOOTID_UPNP_ORG, getBootId()); 563 header(CONFIGID_UPNP_ORG, device.getConfigId()); 564 header(SEARCHPORT_UPNP_ORG, getSearchPort()); 565 } 566 } 567 568 private class ByeBye extends SSDPRequest { 569 public ByeBye(URI nt, URI usn, RootDevice device) { 570 super(Method.NOTIFY); 571 572 header(HOST, SSDPMulticastSocket.INET_SOCKET_ADDRESS); 573 header(NT, nt); 574 header(NTS, SSDP_BYEBYE); 575 header(USN, usn); 576 header(BOOTID_UPNP_ORG, getBootId()); 577 header(CONFIGID_UPNP_ORG, device.getConfigId()); 578 } 579 } 580 581 private class Update extends SSDPRequest { 582 public Update(URI nt, URI usn, RootDevice device) { 583 super(Method.NOTIFY); 584 585 header(HOST, SSDPMulticastSocket.INET_SOCKET_ADDRESS); 586 header(LOCATION, device.getLocation()); 587 header(NT, nt); 588 header(NTS, SSDP_UPDATE); 589 header(USN, usn); 590 header(BOOTID_UPNP_ORG, getBootId()); 591 header(CONFIGID_UPNP_ORG, device.getConfigId()); 592 header(NEXTBOOTID_UPNP_ORG, getNextBootId()); 593 header(SEARCHPORT_UPNP_ORG, getSearchPort()); 594 } 595 } 596}