package ball.upnp.ssdp;
/*-
 * ##########################################################################
 * UPnP/SSDP Implementation Classes
 * %%
 * Copyright (C) 2013 - 2022 Allen D. Ball
 * %%
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ##########################################################################
 */
import ball.upnp.RootDevice;
import ball.upnp.SSDP;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.MulticastSocket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.stream.Stream;
import lombok.ToString;
import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.protocol.HttpDateGenerator;

import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.commons.lang3.StringUtils.SPACE;
import static org.apache.hc.core5.http.HttpStatus.SC_OK;

/**
 * SSDP discovery {@link ScheduledThreadPoolExecutor} implementation.
 * <p>
 * The service owns one {@link MulticastSocket} and one unicast
 * {@link DatagramSocket}.  Two long-running tasks (one per socket) receive
 * datagrams and dispatch every successfully parsed {@link SSDPMessage} to
 * the registered {@link Listener}s.  All outgoing messages are sent through
 * the unicast socket.  The receive tasks stop once the executor has been
 * shut down and both sockets are closed when the executor terminates.
 * </p>
 *
 * {@bean.info}
 *
 * @author {@link.uri mailto:ball@hcf.dev Allen D. Ball}
 */
public class SSDPDiscoveryService extends ScheduledThreadPoolExecutor {
    private static final String OS =
        Stream.of("os.name", "os.version")
        .map(System::getProperty)
        .map(t -> t.replaceAll("[\\p{Space}]+", EMPTY))
        .collect(joining("/"));
    private static final String UPNP = "UPnP/2.0";

    /* Inclusive bounds of the port range tried for the unicast socket. */
    private static final int EPHEMERAL_PORT_MIN = 49152;
    private static final int EPHEMERAL_PORT_MAX = 65535;
    /* Number of randomly chosen ports tried before giving up. */
    private static final int BIND_ATTEMPTS = 256;
    /* Socket receive timeout; also bounds how long a receive task may
     * take to notice that the executor has been shut down. */
    private static final int RECEIVE_TIMEOUT_SECONDS = 15;
    private static final int RECEIVE_BUFFER_SIZE = 8 * 1024;

    private final String server;
    private final int bootId = (int) (System.currentTimeMillis() / 1000);
    private final Random random = new Random();
    private final MulticastSocket multicast;
    private final DatagramSocket unicast;
    private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();
    private final ConcurrentHashMap<RootDevice,ScheduledFuture<?>> advertisers = new ConcurrentHashMap<>();

    /**
     * Sole constructor.
     *
     * @param   product         The {@code product/version} {@link String}
     *                          identifying this UPnP application.
     *
     * @throws  IOException     If the underlying {@link MulticastSocket}
     *                          cannot be conditioned or no unicast port
     *                          could be bound.
     */
    public SSDPDiscoveryService(String product) throws IOException {
        super(8);

        server =
            Stream.of(OS, UPNP, product)
            .filter(Objects::nonNull)
            .collect(joining(SPACE));

        multicast = new SSDPMulticastSocket();
        /*
         * Bind to an {@link.rfc 4340} ephemeral port.
         */
        DatagramSocket socket;

        try {
            socket = bind(random);
        } catch (SocketException exception) {
            /* Do not leak the already opened multicast socket. */
            multicast.close();
            throw exception;
        }

        unicast = socket;

        addListener(new MSEARCH());

        submit(() -> receive(multicast));
        submit(() -> receive(unicast));
    }

    /**
     * Tries up to {@link #BIND_ATTEMPTS} randomly chosen ports in the
     * ephemeral range and returns the first socket that binds.
     *
     * @param   random          The source of candidate ports.
     *
     * @return  The bound {@link DatagramSocket}.
     *
     * @throws  SocketException The failure of the last attempt if no
     *                          candidate port could be bound.
     */
    private static DatagramSocket bind(Random random) throws SocketException {
        SocketException failure = null;
        int[] ports = random.ints(BIND_ATTEMPTS, EPHEMERAL_PORT_MIN, EPHEMERAL_PORT_MAX + 1).toArray();

        for (int port : ports) {
            try {
                return new DatagramSocket(port);
            } catch (SocketException exception) {
                failure = exception;
            }
        }
        /*
         * BIND_ATTEMPTS is positive so at least one attempt was made and
         * failure is non-null here.
         */
        throw failure;
    }

    /**
     * {@code SERVER} and {@code USER-AGENT}
     *
     * @return  {@code SERVER} and {@code USER-AGENT}
     */
    public String getUserAgent() { return server; }

    /**
     * {@code BOOTID.UPNP.ORG}
     *
     * @return  {@code bootId}
     */
    public int getBootId() { return bootId; }

    /**
     * {@code NEXTBOOTID.UPNP.ORG}
     *
     * @return  {@code nextBootId}
     *
     * @throws  UnsupportedOperationException
     *                          Always (not implemented).
     */
    public int getNextBootId() { throw new UnsupportedOperationException(); }

    /**
     * {@code SEARCHPORT.UPNP.ORG}
     *
     * @return  Search port (the local port of the unicast socket).
     */
    public int getSearchPort() { return unicast.getLocalPort(); }

    /**
     * Method to add a {@link Listener}.  A {@link Listener} that is already
     * registered is not added (or notified) a second time.
     *
     * @param   listener        The {@link Listener}.
     *
     * @return  {@link.this}
     */
    public SSDPDiscoveryService addListener(Listener listener) {
        /* addIfAbsent makes the check-then-add a single atomic step. */
        if (listeners.addIfAbsent(listener)) {
            listener.register(this);
        }

        return this;
    }

    /**
     * Method to remove a {@link Listener}.
     *
     * @param   listener        The {@link Listener}.
     *
     * @return  {@link.this}
     */
    public SSDPDiscoveryService removeListener(Listener listener) {
        if (listeners.remove(listener)) {
            listener.unregister(this);
        }

        return this;
    }

    private void fireSendEvent(DatagramSocket socket, SSDPMessage message) {
        listeners.forEach(t -> t.sendEvent(this, socket, message));
    }

    private void fireReceiveEvent(DatagramSocket socket, SSDPMessage message) {
        listeners.forEach(t -> t.receiveEvent(this, socket, message));
    }

    /**
     * Method to add a {@link RootDevice} to advertise.  Any advertisement
     * previously scheduled for the same {@link RootDevice} is cancelled.
     *
     * @param   device          The {@link RootDevice} to advertise.
     * @param   rate            The rate (in seconds) to repeat
     *                          advertisements.
     *
     * @return  {@link.this}
     */
    public SSDPDiscoveryService advertise(RootDevice device, int rate) {
        /* The initial delay (in seconds) is the number of devices already
         * advertised. */
        ScheduledFuture<?> future = scheduleAtFixedRate(() -> alive(device), advertisers.size(), rate, SECONDS);
        ScheduledFuture<?> previous = advertisers.put(device, future);

        if (previous != null) {
            previous.cancel(true);
        }

        return this;
    }

    private void alive(RootDevice device) {
        device.notify((nt, usn) -> multicast(new Alive(nt, usn, device)));
    }

    private void byebye(RootDevice device) {
        device.notify((nt, usn) -> multicast(new ByeBye(nt, usn, device)));
    }

    /**
     * Send multicast {@code M-SEARCH} message.
     *
     * @param   mx              The {@code MX} header value.
     * @param   st              The {@code ST} header value.
     */
    public void msearch(int mx, URI st) { multicast(0, new MSearch(mx, st)); }

    /**
     * Method to queue an {@link SSDPMessage} for multicast without delay.
     *
     * @param   message         The {@link SSDPMessage} to send.
     */
    public void multicast(SSDPMessage message) {
        send(0, SSDPMulticastSocket.INET_SOCKET_ADDRESS, message);
    }

    /**
     * Method to queue an {@link SSDPMessage} for multicast with delay.
     *
     * @param   delay           Time to delay (in milliseconds) before
     *                          sending.
     * @param   message         The {@link SSDPMessage} to send.
     */
    public void multicast(long delay, SSDPMessage message) {
        send(delay, SSDPMulticastSocket.INET_SOCKET_ADDRESS, message);
    }

    /**
     * Method to queue an {@link SSDPMessage} for sending without delay.
     *
     * @param   address         The destination {@link SocketAddress}.
     * @param   message         The {@link SSDPMessage} to send.
     */
    public void send(SocketAddress address, SSDPMessage message) {
        send(0, address, message);
    }

    /**
     * Method to queue an {@link SSDPMessage} for sending with delay.  The
     * message is serialized (as UTF-8) when it is queued, not when it is
     * sent.
     *
     * @param   delay           Time to delay (in milliseconds) before
     *                          sending.
     * @param   address         The destination {@link SocketAddress}.
     * @param   message         The {@link SSDPMessage} to send.
     */
    public void send(long delay, SocketAddress address, SSDPMessage message) {
        byte[] bytes = message.toString().getBytes(UTF_8);
        DatagramPacket packet = new DatagramPacket(bytes, 0, bytes.length, address);

        schedule(() -> task(message, packet), delay, MILLISECONDS);
    }

    private void task(SSDPMessage message, DatagramPacket packet) {
        try {
            fireSendEvent(unicast, message);
            unicast.send(packet);
        } catch (IOException ignored) {
            /* Sending is best-effort: a datagram that cannot be sent is
             * dropped. */
        }
    }

    /**
     * Method to queue a {@link List} of {@link SSDPMessage}s for sending
     * with a {@code MX} parameter.  Messages are sent in list order: one
     * random delay is drawn per message and the delays are sorted before
     * being assigned.  Each delay is one second plus a random amount less
     * than {@code MX} seconds.  A non-positive {@code MX} leaves only the
     * fixed one-second delay.
     * <p>
     * NOTE(review): the fixed one-second offset means replies may be sent
     * later than {@code MX} seconds after the request -- confirm that this
     * is intended.
     * </p>
     *
     * @param   mx              Width (in seconds) of the random delay
     *                          window.
     * @param   address         The destination {@link SocketAddress}.
     * @param   messages        The {@link List} of {@link SSDPMessage}s to
     *                          send.
     */
    public void send(int mx, SocketAddress address, List<? extends SSDPMessage> messages) {
        long offset = SECONDS.toMillis(1);
        /*
         * The bound handed to Random must be positive and fit in an int: a
         * zero/negative MX (or one large enough to overflow) would
         * otherwise raise IllegalArgumentException and drop every message.
         */
        int window = (int) Math.max(1L, Math.min(SECONDS.toMillis(mx), (long) Integer.MAX_VALUE));
        List<Long> delays =
            random.ints(messages.size(), 0, window)
            .asLongStream()
            .map(t -> offset + t)
            .boxed()
            .sorted(Comparator.naturalOrder())
            .collect(toList());
        int index = 0;

        for (SSDPMessage message : messages) {
            long delay = delays.get(index++);

            send(delay, address, message);
        }
    }

    /**
     * Receive loop for one socket.  Each received datagram that parses as
     * an {@link SSDPMessage} is dispatched to the {@link Listener}s.  The
     * loop ends when the executor has been shut down (noticed at the
     * latest after one receive timeout) or when the socket fails.
     *
     * @param   socket          The {@link DatagramSocket} to read.
     */
    private void receive(DatagramSocket socket) {
        try {
            socket.setSoTimeout((int) SECONDS.toMillis(RECEIVE_TIMEOUT_SECONDS));

            while (! isShutdown()) {
                try {
                    byte[] bytes = new byte[RECEIVE_BUFFER_SIZE];
                    DatagramPacket packet = new DatagramPacket(bytes, bytes.length);

                    socket.receive(packet);

                    SSDPMessage message = parse(packet);

                    if (message != null) {
                        fireReceiveEvent(socket, message);
                    }
                } catch (SocketTimeoutException ignored) {
                    /* Nothing received: loop to re-check for shutdown. */
                }
            }
        } catch (IOException ignored) {
            /* The socket was closed or is unusable: stop receiving. */
        }
    }

    /**
     * Parses a datagram first as an {@link SSDPResponse} and, failing
     * that, as an {@link SSDPRequest}.
     *
     * @param   packet          The received {@link DatagramPacket}.
     *
     * @return  The parsed {@link SSDPMessage} or {@code null} if the
     *          datagram is neither.
     */
    private SSDPMessage parse(DatagramPacket packet) {
        SSDPMessage message = null;

        try {
            message = SSDPResponse.from(packet);
        } catch (ParseException ignored) {
            /* Not a response: try as a request below. */
        }

        if (message == null) {
            try {
                message = SSDPRequest.from(packet);
            } catch (ParseException ignored) {
                /* Not a request either: report as unparseable. */
            }
        }

        return message;
    }

    /**
     * Cancels all advertisements, queues {@code ssdp:byebye} notifications
     * for every advertised {@link RootDevice} and then initiates an
     * orderly shutdown.
     */
    @Override
    public void shutdown() {
        advertisers.values().forEach(t -> t.cancel(true));
        /* The byebye messages must be queued before the executor stops
         * accepting new tasks. */
        advertisers.keySet().forEach(this::byebye);
        advertisers.clear();

        super.shutdown();
    }

    /**
     * Closes both sockets once the executor has terminated (no task can
     * be using them any longer).
     */
    @Override
    protected void terminated() {
        try {
            multicast.close();
            unicast.close();
        } finally {
            super.terminated();
        }
    }

    /**
     * {@link SSDPDiscoveryService} listener interface definition.
     */
    public interface Listener {

        /**
         * Callback when a {@link Listener} is added to a
         * {@link SSDPDiscoveryService}.
         *
         * @param   service     The {@link SSDPDiscoveryService}.
         */
        public void register(SSDPDiscoveryService service);

        /**
         * Callback when a {@link Listener} is removed from a
         * {@link SSDPDiscoveryService}.
         *
         * @param   service     The {@link SSDPDiscoveryService}.
         */
        public void unregister(SSDPDiscoveryService service);

        /**
         * Callback made just before sending a {@link SSDPMessage}.
         *
         * @param   service     The {@link SSDPDiscoveryService}.
         * @param   socket      The {@link DatagramSocket}.
         * @param   message     The {@link SSDPMessage}.
         */
        public void sendEvent(SSDPDiscoveryService service, DatagramSocket socket, SSDPMessage message);

        /**
         * Callback made after receiving a {@link SSDPMessage}.
         *
         * @param   service     The {@link SSDPDiscoveryService}.
         * @param   socket      The {@link DatagramSocket}.
         * @param   message     The {@link SSDPMessage}.
         */
        public void receiveEvent(SSDPDiscoveryService service, DatagramSocket socket, SSDPMessage message);
    }

    /**
     * {@link SSDPDiscoveryService} {@link SSDPRequest} handler.  Received
     * requests whose method matches are handed to
     * {@link #run(SSDPDiscoveryService,DatagramSocket,SSDPRequest)} on the
     * service's executor.
     */
    public static abstract class RequestHandler implements Listener {
        private final SSDPRequest.Method method;

        /**
         * Sole constructor.
         *
         * @param   method      The {@link SSDPRequest.Method} to
         *                      handle.
         */
        protected RequestHandler(SSDPRequest.Method method) {
            this.method = Objects.requireNonNull(method);
        }

        /**
         * Callback to handle a matching {@link SSDPRequest}.
         *
         * @param   service     The {@link SSDPDiscoveryService}.
         * @param   socket      The {@link DatagramSocket} the request was
         *                      received on.
         * @param   request     The {@link SSDPRequest}.
         */
        public abstract void run(SSDPDiscoveryService service, DatagramSocket socket, SSDPRequest request);

        @Override
        public void register(SSDPDiscoveryService service) { }

        @Override
        public void unregister(SSDPDiscoveryService service) { }

        @Override
        public void receiveEvent(SSDPDiscoveryService service, DatagramSocket socket, SSDPMessage message) {
            if (message instanceof SSDPRequest) {
                SSDPRequest request = (SSDPRequest) message;

                if (method.is(request.getMethod())) {
                    service.submit(() -> run(service, socket, request));
                }
            }
        }

        @Override
        public void sendEvent(SSDPDiscoveryService service, DatagramSocket socket, SSDPMessage message) {
        }
    }

    /**
     * {@link SSDPDiscoveryService} {@link SSDPResponse} handler.  Every
     * received response is handed to
     * {@link #run(SSDPDiscoveryService,DatagramSocket,SSDPResponse)} on the
     * service's executor.
     */
    public static abstract class ResponseHandler implements Listener {

        /**
         * Sole constructor.
         */
        protected ResponseHandler() { }

        /**
         * Callback to handle a received {@link SSDPResponse}.
         *
         * @param   service     The {@link SSDPDiscoveryService}.
         * @param   socket      The {@link DatagramSocket} the response was
         *                      received on.
         * @param   request     The {@link SSDPResponse}.
         */
        public abstract void run(SSDPDiscoveryService service, DatagramSocket socket, SSDPResponse request);

        @Override
        public void register(SSDPDiscoveryService service) { }

        @Override
        public void unregister(SSDPDiscoveryService service) { }

        @Override
        public void receiveEvent(SSDPDiscoveryService service, DatagramSocket socket, SSDPMessage message) {
            if (message instanceof SSDPResponse) {
                service.submit(() -> run(service, socket, (SSDPResponse) message));
            }
        }

        @Override
        public void sendEvent(SSDPDiscoveryService service, DatagramSocket socket, SSDPMessage message) {
        }
    }

    /**
     * {@code M-SEARCH} request handler: answers {@code "ssdp:discover"}
     * requests with one response per matching notification type of every
     * advertised {@link RootDevice}.
     */
    @ToString
    private class MSEARCH extends RequestHandler {
        public MSEARCH() { super(SSDPRequest.Method.MSEARCH); }

        @Override
        public void run(SSDPDiscoveryService service, DatagramSocket socket, SSDPRequest request) {
            try {
                if (isHeaderValue(request, SSDPMessage.MAN, "\"ssdp:discover\"")) {
                    int mx = request.getMX();
                    SocketAddress address = request.getSocketAddress();
                    List<SSDPMessage> list = new LinkedList<>();
                    URI st = request.getST();
                    /* For an ssdp:all search each response carries the
                     * matched NT instead of the requested ST. */
                    boolean all = SSDPMessage.SSDP_ALL.equals(st);

                    advertisers.keySet()
                        .forEach(device -> device.notify((nt, usn) -> {
                            if (SSDP.matches(st, nt)) {
                                list.add(new MSearch(service, all ? nt : st, usn, device));
                            }
                        }));

                    service.send(mx, address, list);
                }
            } catch (Exception exception) {
                /* log.error("{}", exception.getMessage(), exception); */
            }
        }

        private boolean isHeaderValue(SSDPRequest request, String header, String value) {
            return Objects.equals(request.getHeaderValue(header), value);
        }

        /**
         * {@code M-SEARCH} response.
         */
        private class MSearch extends SSDPResponse {
            private static final long serialVersionUID = -2963023442177743880L;

            public MSearch(SSDPDiscoveryService service, URI st, URI usn, RootDevice device) {
                super(SC_OK, "OK");

                header(CACHE_CONTROL, MAX_AGE + "=" + device.getMaxAge());
                header(DATE, HttpDateGenerator.INSTANCE.getCurrentDate());
                header(EXT, (String) null);
                header(LOCATION, device.getLocation());
                header(SERVER, service.getUserAgent());
                header(ST, st);
                header(USN, usn);
                header(BOOTID_UPNP_ORG, service.getBootId());
                header(CONFIGID_UPNP_ORG, device.getConfigId());
                header(SEARCHPORT_UPNP_ORG, service.getSearchPort());
            }
        }
    }

    /**
     * {@code M-SEARCH} request.
     */
    private class MSearch extends SSDPRequest {
        private static final long serialVersionUID = -7606319039115145787L;

        public MSearch(int mx, URI st) {
            super(Method.MSEARCH);

            header(HOST, SSDPMulticastSocket.INET_SOCKET_ADDRESS);
            header(MAN, "\"ssdp:discover\"");
            header(MX, mx);
            header(ST, st);
            header(USER_AGENT, getUserAgent());
        }
    }

    /**
     * {@code NOTIFY} request with {@code NTS} of {@code SSDP_ALIVE}.
     */
    private class Alive extends SSDPRequest {
        private static final long serialVersionUID = 1405796530322683498L;

        public Alive(URI nt, URI usn, RootDevice device) {
            super(Method.NOTIFY);

            header(HOST, SSDPMulticastSocket.INET_SOCKET_ADDRESS);
            header(CACHE_CONTROL, MAX_AGE + "=" + device.getMaxAge());
            header(NT, nt);
            header(NTS, SSDP_ALIVE);
            header(SERVER, getUserAgent());
            header(USN, usn);
            header(LOCATION, device.getLocation());
            header(BOOTID_UPNP_ORG, getBootId());
            header(CONFIGID_UPNP_ORG, device.getConfigId());
            header(SEARCHPORT_UPNP_ORG, getSearchPort());
        }
    }

    /**
     * {@code NOTIFY} request with {@code NTS} of {@code SSDP_BYEBYE}.
     */
    private class ByeBye extends SSDPRequest {
        private static final long serialVersionUID = -4058077320857161188L;

        public ByeBye(URI nt, URI usn, RootDevice device) {
            super(Method.NOTIFY);

            header(HOST, SSDPMulticastSocket.INET_SOCKET_ADDRESS);
            header(NT, nt);
            header(NTS, SSDP_BYEBYE);
            header(USN, usn);
            header(BOOTID_UPNP_ORG, getBootId());
            header(CONFIGID_UPNP_ORG, device.getConfigId());
        }
    }

    /**
     * {@code NOTIFY} request with {@code NTS} of {@code SSDP_UPDATE}.
     * Constructing one currently fails because {@link #getNextBootId()}
     * always throws {@link UnsupportedOperationException}.
     */
    private class Update extends SSDPRequest {
        private static final long serialVersionUID = -3155461457458248824L;

        public Update(URI nt, URI usn, RootDevice device) {
            super(Method.NOTIFY);

            header(HOST, SSDPMulticastSocket.INET_SOCKET_ADDRESS);
            header(LOCATION, device.getLocation());
            header(NT, nt);
            header(NTS, SSDP_UPDATE);
            header(USN, usn);
            header(BOOTID_UPNP_ORG, getBootId());
            header(CONFIGID_UPNP_ORG, device.getConfigId());
            header(NEXTBOOTID_UPNP_ORG, getNextBootId());
            header(SEARCHPORT_UPNP_ORG, getSearchPort());
        }
    }
}