/*
 * hunt-amqp: AMQP library for D programming language, based on hunt-net.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module hunt.amqp.streams.impl.ProtonSubscriberImpl;


import hunt.proton.amqp.transport.Source;
import hunt.proton.amqp.transport.Target;
import hunt.amqp.streams.Subscription;

import hunt.amqp.Handler;
import hunt.amqp.ProtonDelivery;
import hunt.amqp.ProtonLinkOptions;
import hunt.amqp.ProtonSender;
import hunt.amqp.impl.ProtonConnectionImpl;
import hunt.amqp.impl.ProtonDeliveryImpl;
import hunt.amqp.streams.ProtonSubscriber;
import hunt.amqp.streams.ProtonSubscriberOptions;
import hunt.amqp.streams.Tracker;
import hunt.net.Connection;
import hunt.Exceptions;
import hunt.logging;
import hunt.Object;
import hunt.amqp.streams.impl.TrackerImpl;

/**
 * Subscriber that forwards each received Tracker's message to a ProtonSender
 * created on the supplied connection.
 *
 * Demand is derived from the sender's credit: whenever the send-queue-drain
 * handler fires, the difference between the sender's credit and the number of
 * requests still outstanding is requested from the Subscription.
 *
 * The state flags below are plain bool fields and no synchronization is applied
 * in this class (the original declarations, kept as comments next to them, were
 * AtomicBoolean).
 */
class ProtonSubscriberImpl : ProtonSubscriber!Tracker {

    private Subscription sub;
    private Connection connCtx;
    private ProtonConnectionImpl conn;
    private ProtonSender sender;
    //private AtomicBoolean subscribed = new AtomicBoolean();
    //private AtomicBoolean completed = new AtomicBoolean();
    //private AtomicBoolean cancelledSub = new AtomicBoolean();

    /// True once a Subscription has been accepted by onSubscribe.
    private bool subscribed;
    /// True once onError or onComplete has been processed.
    private bool completed;
    /// True once cancelSub has cancelled the Subscription.
    private bool cancelledSub;
    /// When true, the Subscription is cancelled when the connection's end handler fires.
    private bool emitOnConnectionEnd = true;
    /// Elements requested from the Subscription and not yet received via onNext.
    private long outstandingRequests = 0;

    /**
     * Creates a subscriber for the given address using default options.
     *
     * Params:
     *   address = address passed to conn.createSender
     *   conn    = connection on which the sender is created
     */
    this(string address, ProtonConnectionImpl conn) {
        this(address, conn, new ProtonSubscriberOptions());
    }

    /**
     * Creates a subscriber for the given address.
     *
     * Params:
     *   address = address passed to conn.createSender
     *   conn    = connection on which the sender is created
     *   options = subscriber options; a non-null link name is copied to the link options
     */
    this(string address, ProtonConnectionImpl conn, ProtonSubscriberOptions options) {
        this.connCtx = conn.getContext();
        this.conn = conn;

        ProtonLinkOptions linkOptions = new ProtonLinkOptions();
        if (options.getLinkName() !is null) {
            linkOptions.setLinkName(options.getLinkName());
        }

        sender = conn.createSender(address, linkOptions);
        sender.setAutoDrained(false);
    }

    /**
     * Accepts the Subscription, registers the connection/sender handlers and
     * opens the sender. Only one Subscription is supported: any further call
     * cancels the Subscription it is given and returns without other effect.
     *
     * Params:
     *   subscription = the subscription to pull elements from
     */
    public void onSubscribe(Subscription subscription) {
        // Objects.requireNonNull(subscription, "A subscription must be supplied");
        // NOTE(review): the null check above is disabled; a null subscription is
        // dereferenced below or later in cancelSub - confirm callers never pass null.

        if (subscribed) {
            logInfo("Only a single Subscription is supported and already subscribed, cancelling new subscriber.");
            subscription.cancel();
            return;
        }
        // Record the first subscription so that later calls take the branch above.
        // (Previously this assignment sat inside that branch, so the flag was
        // never set and the guard could never trigger.)
        subscribed = true;

        this.sub = subscription;

        // Cancel the subscription when the connection ends, unless disabled
        // through setEmitOnConnectionEnd(false).
        conn.addEndHandler(
            new class Handler!Void {
                void handle(Void o)
                {
                    if (emitOnConnectionEnd) {
                        cancelSub();
                    }
                }
            }
        );

        // Translate sender credit into demand on the subscription.
        sender.sendQueueDrainHandler(
            new class Handler!ProtonSender {
                void handle(ProtonSender o)
                {
                    if (!completed && !cancelledSub) {
                        long credit = sender.getCredit();
                        long newRequests = credit - outstandingRequests;

                        // Only request the portion of the credit not already asked for.
                        if (newRequests > 0) {
                            outstandingRequests += newRequests;
                            sub.request(newRequests);
                        }
                    }
                }
            }
        );

        // Detach handler: cancel the subscription and detach our end.
        sender.detachHandler(
            new class Handler!ProtonSender {
                void handle(ProtonSender o)
                {
                    cancelSub();
                    sender.detach();
                }
            }
        );

        // Close handler: cancel the subscription and close our end.
        sender.closeHandler(
            new class Handler!ProtonSender {
                void handle(ProtonSender o)
                {
                    cancelSub();
                    sender.close();
                }
            }
        );

        sender.open();

        // Porting note: a commented-out earlier form of this method performed the
        // same registrations inside connCtx.runOnContext(...) and additionally
        // installed a sender.openHandler that only traced "Attach received".
        // This version performs the registrations inline and installs no open handler.
    }

    /// Cancels the Subscription at most once.
    private void cancelSub() {
        if (!cancelledSub) {
            cancelledSub = true;
            sub.cancel();
        }
    }

    /**
     * Sends the tracker's message on the sender and attaches the resulting
     * delivery to the tracker. Ignored once the subscriber has completed.
     *
     * Params:
     *   tracker = element to send; cast to TrackerImpl without a check
     */
    public void onNext(Tracker tracker) {
        // Objects.requireNonNull(tracker, "An element must be supplied when calling onNext");

        if (!completed) {
            outstandingRequests--;
            // NOTE(review): if tracker is not a TrackerImpl this cast yields null
            // and env is dereferenced below - confirm all producers use TrackerImpl.
            TrackerImpl env = cast(TrackerImpl) tracker;
            ProtonDelivery delivery = sender.send(tracker.message(), new class Handler!ProtonDelivery
            {
                void handle(ProtonDelivery var1)
                {
                    // Forward the delivery callback to the tracker's own handler, if any.
                    Handler!Tracker h = env.handler();
                    if (h !is null) {
                        h.handle(env);
                    }
                }
            });
            env.setDelivery(cast(ProtonDeliveryImpl) delivery);
        }
    }

    /**
     * Marks the subscriber completed and clears the sender handlers.
     *
     * NOTE(review): unlike onComplete, the sender is not closed here - confirm
     * whether leaving the link open after an error is intended.
     *
     * Params:
     *   t = the error signalled by the publisher (not inspected here)
     */
    public void onError(Throwable t) {

        if (!completed) {
            completed = true;
            sender.sendQueueDrainHandler(null);
            sender.detachHandler(null);
            sender.closeHandler(null);
        }
    }

    /**
     * Marks the subscriber completed, clears the sender handlers and closes
     * the sender. Has no effect if already completed.
     */
    public void onComplete() {
        if (!completed) {
            completed = true;
            sender.sendQueueDrainHandler(null);
            sender.detachHandler(null);
            sender.closeHandler(null);
            sender.close();
        }
    }

    /// Sets the source on the underlying sender; returns this for chaining.
    public ProtonSubscriber!Tracker setSource(Source source) {
        sender.setSource(source);
        return this;
    }

    /// Returns the source currently set on the underlying sender.
    public Source getSource() {
        return sender.getSource();
    }

    /// Sets the target on the underlying sender; returns this for chaining.
    public ProtonSubscriber!Tracker setTarget(Target target) {
        sender.setTarget(target);
        return this;
    }

    /// Returns the target currently set on the underlying sender.
    public Target getTarget() {
        return sender.getTarget();
    }

    /// Returns the underlying sender's remote source.
    public Source getRemoteSource() {
        return sender.getRemoteSource();
    }

    /// Returns the underlying sender's remote target.
    public Target getRemoteTarget() {
        return sender.getRemoteTarget();
    }

    /// Returns whether the subscription is cancelled when the connection ends.
    public bool isEmitOnConnectionEnd() {
        return emitOnConnectionEnd;
    }

    /// Sets whether the subscription is cancelled when the connection ends.
    public void setEmitOnConnectionEnd(bool emitOnConnectionEnd) {
        this.emitOnConnectionEnd = emitOnConnectionEnd;
    }

    /// Returns the underlying sender link.
    public ProtonSender getLink() {
        return sender;
    }
}