/*
 * hunt-amqp: AMQP library for D programming language, based on hunt-net.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module hunt.amqp.streams.impl.ProtonPublisherImpl;

import hunt.collection.ArrayList;
//import java.util.concurrent.atomic.AtomicBoolean;

import hunt.proton.amqp.Symbol;
import hunt.proton.amqp.messaging.Released;
import hunt.proton.amqp.messaging.TerminusDurability;
import hunt.proton.amqp.messaging.TerminusExpiryPolicy;
import hunt.proton.amqp.transport.Source;
import hunt.proton.amqp.transport.Target;
import hunt.amqp.streams.Subscriber;
import hunt.amqp.streams.Subscription;

import hunt.amqp.ProtonLinkOptions;
import hunt.amqp.ProtonReceiver;
import hunt.amqp.impl.ProtonConnectionImpl;
import hunt.amqp.streams.Delivery;
import hunt.amqp.streams.ProtonPublisher;
import hunt.amqp.streams.ProtonPublisherOptions;
import hunt.net.Connection;
import std.concurrency : initOnce;
import hunt.Exceptions;
import hunt.amqp.impl.ProtonClientImpl;
import hunt.amqp.Handler;
import hunt.amqp.ProtonMessageHandler;
import hunt.amqp.ProtonDelivery;
import hunt.proton.message.Message;
import std.algorithm;
import hunt.logging;
import hunt.Long;
import hunt.math.Helper;

import hunt.proton.amqp.messaging.Source;
import hunt.amqp.streams.impl.DeliveryImpl;
import hunt.Boolean;

/**
 * Publisher that wraps a single ProtonReceiver link and hands every incoming
 * message to one Subscriber as a Delivery.
 *
 * The receiver is created with auto-accept disabled and a prefetch of 0, so
 * link credit is only granted in response to Subscription.request() calls
 * (bounded by maxOutstandingCredit).
 *
 * NOTE(review): the flags in this class and in AmqpSubscription are plain
 * bools, not atomics; the code appears to assume that all calls arrive on one
 * thread - confirm against the callers.
 */
class ProtonPublisherImpl : ProtonPublisher!Delivery {

    /// Lazily created, process-wide "shared" capability symbol.
    static Symbol SHARED() {
        __gshared Symbol inst;
        return initOnce!inst(Symbol.valueOf("shared"));
    }

    /// Lazily created, process-wide "global" capability symbol.
    static Symbol GLOBAL() {
        __gshared Symbol inst;
        return initOnce!inst(Symbol.valueOf("global"));
    }

    private Connection connCtx;
    private ProtonConnectionImpl conn;
    // Set by the first successful subscribe() call; later calls are rejected.
    private bool subscribed;
    private AmqpSubscription subscription;
    private ProtonReceiver receiver;
    // Stored and exposed through the accessors below; no other logic in this
    // class reads it.
    private bool emitOnConnectionEnd = true;
    // Upper bound on the link credit this publisher keeps outstanding.
    private int maxOutstandingCredit = 1000;

    // Decides whether cancel() detaches (durable) or closes the link.
    private bool durable;

    /**
     * Creates the receiver link for the given address and configures its
     * source terminus from the supplied options.
     *
     * Params:
     *   address = address passed to conn.createReceiver()
     *   conn    = connection that owns the receiver
     *   options = link name, credit limit and durable/dynamic/shared/global flags
     */
    this(string address, ProtonConnectionImpl conn, ProtonPublisherOptions options) {
        this.connCtx = conn.getContext();
        this.conn = conn;

        ProtonLinkOptions linkOptions = new ProtonLinkOptions();
        if (options.getLinkName() !is null) {
            linkOptions.setLinkName(options.getLinkName());
        }

        receiver = conn.createReceiver(address, linkOptions);
        receiver.setAutoAccept(false);
        receiver.setPrefetch(0);

        // Non-positive values keep the default limit.
        if (options.getMaxOutstandingCredit() > 0) {
            maxOutstandingCredit = options.getMaxOutstandingCredit();
        }

        // Fully qualified because both the transport and the messaging
        // `Source` modules are imported.
        hunt.proton.amqp.messaging.Source.Source source =
            cast(hunt.proton.amqp.messaging.Source.Source) receiver.getSource();

        durable = options.isDurable();
        if (durable) {
            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
            source.setDurable(TerminusDurability.UNSETTLED_STATE);
        }

        if (options.isDynamic()) {
            source.setAddress(null);
            source.setDynamic(new Boolean(true));
        }

        ArrayList!Symbol capabilities = new ArrayList!Symbol();
        if (options.isShared()) {
            capabilities.add(SHARED);
        }
        if (options.isGlobal()) {
            capabilities.add(GLOBAL);
        }

        if (!capabilities.isEmpty()) {
            source.setCapabilities(capabilities);
        }
    }

    /**
     * Attaches the one and only subscriber, installs the link handlers and
     * opens the receiver.
     *
     * Params:
     *   subscriber = consumer of the deliveries; must not be null
     *
     * Throws:
     *   NullPointerException if subscriber is null.
     *   IllegalStateException if subscribe() was already called.
     */
    public void subscribe(Subscriber!Delivery subscriber) {
        if (subscriber is null) {
            throw new NullPointerException("A subscriber must be supplied");
        }

        // Only one subscriber is supported: a second call would otherwise
        // silently replace the subscription and re-register the handlers.
        if (subscribed) {
            throw new IllegalStateException(
                "Only a single subscriber supported, and subscribe already called.");
        }
        subscribed = true;

        subscription = new AmqpSubscription(subscriber);

        // These two handlers stay installed until cancel() replaces them, so
        // when they fire the close/detach was not requested through cancel():
        // report it to the subscriber and mirror the close/detach locally.
        receiver.closeHandler(
            new class Handler!ProtonReceiver {
                void handle(ProtonReceiver o) {
                    subscription.indicateError(new Exception("Link closed unexpectedly"));
                    receiver.close();
                }
            }
        );

        receiver.detachHandler(
            new class Handler!ProtonReceiver {
                void handle(ProtonReceiver o) {
                    subscription.indicateError(new Exception("Link detached unexpectedly"));
                    receiver.detach();
                }
            }
        );

        receiver.openHandler(
            new class Handler!ProtonReceiver {
                void handle(ProtonReceiver o) {
                    subscription.indicateSubscribed();
                }
            }
        );

        receiver.handler(
            new class ProtonMessageHandler {
                void handle(ProtonDelivery delivery, Message message) {
                    Delivery envelope = new DeliveryImpl(message, delivery, connCtx);
                    // A delivery the subscription refuses (already completed
                    // or cancelled) is released and settled.
                    if (!subscription.onNextWrapper(envelope)) {
                        delivery.disposition(Released.getInstance(), true);
                    }
                }
            }
        );

        receiver.open();
    }

    // ==================================================

    /**
     * Subscription handed to the subscriber; translates request()/cancel()
     * into link credit and link close/detach on the enclosing receiver.
     */
    class AmqpSubscription : Subscription {

        private Subscriber!Delivery subcriber;
        private bool cancelled;
        private bool completed;
        // Demand requested by the subscriber that has not been delivered yet.
        private long outstandingRequests = 0;

        this(Subscriber!Delivery sub) {
            this.subcriber = sub;
        }

        /**
         * Passes a delivery to the subscriber and tops up link credit when
         * needed.
         *
         * Returns: true if the delivery was passed on, false if it was skipped
         *          because the subscription is already completed or cancelled.
         */
        private bool onNextWrapper(Delivery next) {
            if (completed || cancelled) {
                logInfo("skipped calling onNext, already completed or cancelled");
                return false;
            }

            subcriber.onNext(next);

            // Now top up credits if still needed
            outstandingRequests = outstandingRequests - 1;

            // Checked again after onNext() has returned.
            if (!cancelled) {
                int currentCredit = receiver.getCredit();
                // Refill only once credit fell below half of the limit and
                // there is more demand than credit currently on the link.
                if (currentCredit < (maxOutstandingCredit * 0.5) && outstandingRequests > currentCredit) {
                    int creditLimit = cast(int) min(outstandingRequests, maxOutstandingCredit);

                    int credits = creditLimit - currentCredit;
                    if (credits > 0) {
                        logInfo("Updating credit for outstanding requests");
                        flowCreditIfNeeded(credits);
                    }
                }
            }

            return true;
        }

        /**
         * Registers demand for n more deliveries and flows link credit for it.
         *
         * A non-positive n is reported to the subscriber through onError with
         * an IllegalArgumentException. Calls made after cancel() are ignored.
         *
         * Params:
         *   n = number of additional deliveries requested; Long.MAX_VALUE
         *       sets the outstanding demand to Long.MAX_VALUE
         */
        public void request(long n) {
            if (cancelled) {
                return;
            }

            if (n <= 0) {
                logError("non-positive subscription request, requests must be > 0");
                indicateError(new IllegalArgumentException(
                    "non-positive subscription request, requests must be > 0"));
                return;
            }

            if (n == Long.MAX_VALUE) {
                outstandingRequests = Long.MAX_VALUE;
            } else {
                try {
                    outstandingRequests = MathHelper.addExact(n, outstandingRequests);
                } catch (ArithmeticException ae) {
                    // Demand saturates at Long.MAX_VALUE instead of overflowing.
                    outstandingRequests = Long.MAX_VALUE;
                }
            }

            // Retained from the original flow; with the early return above it
            // can only be hit if `cancelled` changes while this method runs.
            if (cancelled) {
                logInfo("Not sending more credit, subscription cancelled since request was originally scheduled");
                return;
            }

            flowCreditIfNeeded(n);
        }

        /**
         * Flows up to n credits without letting the link credit exceed
         * maxOutstandingCredit; does nothing once completed.
         */
        private void flowCreditIfNeeded(long n) {
            int currentCredit = receiver.getCredit();
            if (currentCredit < maxOutstandingCredit) {
                int limit = maxOutstandingCredit - currentCredit;
                int addedCredit = cast(int) min(n, limit);

                if (addedCredit > 0) {
                    if (!completed) {
                        receiver.flow(addedCredit);
                    } else {
                        logInfo("Skipping flowing additional credits as already completed");
                    }
                }
            }
        }

        /**
         * Cancels the subscription: replaces the close/detach handlers with
         * ones that indicate completion, then detaches (durable) or closes the
         * link. Repeated calls are no-ops.
         */
        public void cancel() {
            if (!cancelled) {
                cancelled = true;

                receiver.closeHandler(
                    new class Handler!ProtonReceiver {
                        void handle(ProtonReceiver o) {
                            indicateCompletion();
                            receiver.close();
                        }
                    }
                );

                receiver.detachHandler(
                    new class Handler!ProtonReceiver {
                        void handle(ProtonReceiver o) {
                            indicateCompletion();
                            receiver.detach();
                        }
                    }
                );

                if (durable) {
                    receiver.detach();
                } else {
                    receiver.close();
                }
            }
        }

        /**
         * Marks the subscription completed and, unless it was cancelled,
         * forwards the error to the subscriber. Only the first
         * completion/error indication has any effect.
         */
        private void indicateError(Throwable t) {
            if (!completed) {
                completed = true;
                Subscriber!Delivery sub = subcriber;
                subcriber = null;
                if (sub !is null && !cancelled) {
                    logError("Indicating error");
                    sub.onError(t);
                } else {
                    logError("Skipping error indication, no sub or already cancelled");
                }
            } else {
                logInfo("indicateError no-op, already completed");
            }
        }

        /// Calls onSubscribe(this) on the subscriber unless already completed.
        private void indicateSubscribed() {
            if (!completed) {
                logInfo("Indicating subscribed");
                if (subcriber !is null) {
                    subcriber.onSubscribe(this);
                }
            } else {
                logInfo("indicateSubscribed no-op, already completed");
            }
        }

        /**
         * Marks the subscription completed and calls onComplete() when the
         * subscription was not cancelled, or was cancelled while demand was
         * still outstanding.
         */
        private void indicateCompletion() {
            if (!completed) {
                completed = true;
                Subscriber!Delivery sub = subcriber;
                subcriber = null;

                bool canned = cancelled;
                if (sub !is null && ((outstandingRequests > 0 && canned) || !canned)) {
                    logInfo("Indicating completion");
                    sub.onComplete();
                } else {
                    logInfo("Skipping completion indication");
                }
            } else {
                logInfo("indicateCompletion no-op, already completed");
            }
        }
    }

    public bool isEmitOnConnectionEnd() {
        return emitOnConnectionEnd;
    }

    public void setEmitOnConnectionEnd(bool emitOnConnectionEnd) {
        this.emitOnConnectionEnd = emitOnConnectionEnd;
    }

    /// Returns the receiver link backing this publisher.
    public ProtonReceiver getLink() {
        return receiver;
    }

    // ==================================================

    public ProtonPublisher!Delivery setSource(hunt.proton.amqp.transport.Source.Source source) {
        receiver.setSource(source);
        return this;
    }

    public hunt.proton.amqp.transport.Source.Source getSource() {
        return receiver.getSource();
    }

    public ProtonPublisher!Delivery setTarget(Target target) {
        receiver.setTarget(target);
        return this;
    }

    public Target getTarget() {
        return receiver.getTarget();
    }

    public hunt.proton.amqp.transport.Source.Source getRemoteSource() {
        return receiver.getRemoteSource();
    }

    public Target getRemoteTarget() {
        return receiver.getRemoteTarget();
    }

    /**
     * Returns the address of the remote source, or null when there is no
     * remote source or it carries no address.
     */
    public string getRemoteAddress() {
        hunt.proton.amqp.transport.Source.Source remoteSource = getRemoteSource();
        if (remoteSource is null) {
            return null;
        }

        // Guard the address too, so a source without one yields null instead
        // of a null dereference.
        auto address = remoteSource.getAddress();
        return address is null ? null : (cast(string)(address.getBytes()));
    }
}