1 /* 2 * hunt-amqp: AMQP library for D programming language, based on hunt-net. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 module hunt.amqp.impl.ProtonReceiverImpl; 12 13 import hunt.amqp.ProtonMessageHandler; 14 import hunt.amqp.ProtonReceiver; 15 import hunt.proton.Proton; 16 import hunt.proton.amqp.messaging.Modified; 17 import hunt.proton.amqp.transport.Source; 18 import hunt.proton.codec.CompositeReadableBuffer; 19 import hunt.proton.codec.ReadableBuffer; 20 import hunt.proton.engine.Delivery; 21 import hunt.proton.engine.Receiver; 22 import hunt.proton.engine.Session; 23 import hunt.proton.message.impl.MessageImpl; 24 import hunt.amqp.Handler; 25 import hunt.Object; 26 import hunt.amqp.ProtonHelper; 27 import hunt.amqp.impl.ProtonLinkImpl; 28 import hunt.amqp.impl.ProtonDeliveryImpl; 29 import hunt.Long; 30 import hunt.Exceptions; 31 import hunt.logging; 32 import hunt.Boolean; 33 34 /** 35 * @author <a href="http://hiramchirino.com">Hiram Chirino</a> 36 */ 37 class ProtonReceiverImpl : ProtonLinkImpl!ProtonReceiver , ProtonReceiver { 38 39 40 private ProtonMessageHandler _handler; 41 private int prefetch = 1000; 42 private Handler!Void drainCompleteHandler; 43 private Long drainTimeoutTaskId = null; 44 private Session session; 45 private int maxFrameSize; 46 private long sessionIncomingCapacity; 47 private long windowFullThreshhold; 48 49 this(Receiver receiver) { 50 super(receiver); 51 session = receiver.getSession(); 52 sessionIncomingCapacity = session.getIncomingCapacity(); 53 maxFrameSize = session.getConnection().getTransport().getMaxFrameSize(); 54 windowFullThreshhold = sessionIncomingCapacity - maxFrameSize; 55 } 56 57 override 58 public ProtonReceiverImpl self() { 59 return this; 60 } 61 62 private Receiver getReceiver() { 63 return cast(Receiver) link; 64 } 65 66 public int recv(byte[] bytes, int offset, int size) { 67 return getReceiver().recv(bytes, offset, size); 68 } 69 70 override 71 public string getRemoteAddress() { 72 Source remoteSource = getRemoteSource(); 73 74 return remoteSource is null ? null : cast(string)(remoteSource.getAddress().getBytes()); 75 } 76 77 public ProtonReceiver drain(long timeout ,Handler!Void completionHandler) { 78 if (prefetch > 0) { 79 throw new IllegalStateException("Manual credit management not available while prefetch is non-zero"); 80 } 81 82 //if (completionHandler is null) { 83 // throw new IllegalArgumentException("A completion handler must be provided"); 84 //} 85 // 86 //if (drainCompleteHandler !is null) { 87 // throw new IllegalStateException("A previous drain operation has not yet completed"); 88 //} 89 90 if ((getCredit() - getQueued()) <= 0) { 91 // We have no remote credit 92 if (getQueued() == 0) { 93 // All the deliveries have been processed, drain is a no-op, nothing to do but complete. 94 //completionHandler.handle(Future.succeededFuture()); 95 96 } else { 97 // There are still deliveries to process, wait for them to be. 98 setDrainHandlerAndTimeoutTask(timeout); 99 } 100 } else { 101 setDrainHandlerAndTimeoutTask(timeout); 102 103 getReceiver().drain(0); 104 flushConnection(); 105 } 106 107 return this; 108 } 109 110 private void setDrainHandlerAndTimeoutTask(long delay) { 111 implementationMissing(false); 112 // if(delay > 0) { 113 // Vertx vertx = Vertx.currentContext().owner(); 114 // drainTimeoutTaskId = vertx.setTimer(delay, x -> { 115 // drainTimeoutTaskId = null; 116 // drainCompleteHandler = null; 117 // completionHandler.handle(Future.failedFuture("Drain attempt timed out")); 118 // }); 119 // } 120 } 121 122 override 123 public ProtonReceiver flow(int credits) { 124 flow(credits, true); 125 return this; 126 } 127 128 private void flow(int credits, bool checkPrefetch) { 129 if (checkPrefetch && prefetch > 0) { 130 throw new IllegalStateException("Manual credit management not available while prefetch is non-zero"); 131 } 132 133 //if (drainCompleteHandler !is null) { 134 // throw new IllegalStateException("A previous drain operation has not yet completed"); 135 //} 136 137 getReceiver().flow(credits); 138 flushConnection(); 139 } 140 141 public bool draining() { 142 return getReceiver().draining(); 143 } 144 145 public ProtonReceiver setDrain(bool drain) { 146 getReceiver().setDrain(drain); 147 return this; 148 } 149 150 override 151 public ProtonReceiver handler(ProtonMessageHandler handler) { 152 this._handler = handler; 153 onDelivery(); 154 return this; 155 } 156 157 private void flushConnection() { 158 getSession().getConnectionImpl().flush(); 159 } 160 161 ///////////////////////////////////////////////////////////////////////////// 162 // 163 // Implementation details hidden from public api. 164 // 165 ///////////////////////////////////////////////////////////////////////////// 166 167 private bool autoAccept = true; 168 private CompositeReadableBuffer splitContent; 169 170 void onDelivery() { 171 if (this._handler is null) { 172 return; 173 } 174 175 Receiver receiver = getReceiver(); 176 Delivery delivery = receiver.current(); 177 178 if (delivery !is null) { 179 180 if(delivery.isAborted()) { 181 handleAborted(receiver, delivery); 182 return; 183 } 184 185 if (delivery.isPartial()) { 186 handlePartial(receiver, delivery); 187 188 // Delivery is not yet completely received, 189 // return and allow further frames to arrive. 190 return; 191 } 192 193 // Complete prior partial content if needed, or grab it all. 194 ReadableBuffer data = receiver.recv(); 195 if(splitContent !is null) { 196 data = completePartial(data); 197 } 198 199 receiver.advance(); 200 201 MessageImpl msg = cast(MessageImpl) Proton.message(); 202 ProtonDeliveryImpl delImpl = new ProtonDeliveryImpl(delivery); 203 try { 204 msg.decode(data); 205 } catch (Throwable t) { 206 logError("Unable to decode message, undeliverable"); 207 handleDecodeFailure(receiver, delImpl); 208 return; 209 } 210 211 _handler.handle(delImpl, msg); 212 213 if (autoAccept && delivery.getLocalState() is null) { 214 ProtonHelper.accepted(delImpl, true); 215 } 216 217 if (prefetch > 0) { 218 // Replenish credit if prefetch is configured. 219 // TODO: batch credit replenish, optionally flush if exceeding a given threshold? 220 flow(1, false); 221 } else { 222 processForDrainCompletion(); 223 } 224 } 225 } 226 227 private void handleDecodeFailure(Receiver receiver, ProtonDeliveryImpl delImpl) { 228 Modified modified = new Modified(); 229 modified.setDeliveryFailed(new Boolean(true)); 230 modified.setUndeliverableHere(new Boolean(true)); 231 232 delImpl.disposition(modified, true); 233 234 if(!receiver.getDrain()) { 235 flow(1, false); 236 } else { 237 processForDrainCompletion(); 238 } 239 } 240 241 private void handleAborted(Receiver receiver, Delivery delivery) { 242 splitContent = null; 243 244 receiver.advance(); 245 delivery.settle(); 246 247 if(!receiver.getDrain()) { 248 flow(1, false); 249 } else { 250 processForDrainCompletion(); 251 } 252 } 253 254 private void handlePartial( Receiver receiver, Delivery delivery) { 255 if (sessionIncomingCapacity <= 0 || maxFrameSize <= 0 || session.getIncomingBytes() < windowFullThreshhold) { 256 // No window, or there is still capacity, so do nothing. 257 } else { 258 // The session window could be effectively full, we need to 259 // read part of the delivery content to ensure there is 260 // room made for receiving more of the delivery. 261 if(delivery.available() > 0) { 262 ReadableBuffer buff = receiver.recv(); 263 264 if(splitContent is null && cast(CompositeReadableBuffer)buff !is null) { 265 // Its a composite and there is no prior partial content, use it. 266 splitContent = cast(CompositeReadableBuffer) buff; 267 } else { 268 int remaining = buff.remaining(); 269 if(remaining > 0) { 270 if (splitContent is null) { 271 splitContent = new CompositeReadableBuffer(); 272 } 273 274 byte[] chunk = new byte[remaining]; 275 buff.get(chunk); 276 277 splitContent.append(chunk); 278 } 279 } 280 } 281 } 282 } 283 284 private ReadableBuffer completePartial( ReadableBuffer Content) { 285 int pending = Content.remaining(); 286 if(pending > 0) { 287 byte[] chunk = new byte[pending]; 288 Content.get(chunk); 289 290 splitContent.append(chunk); 291 } 292 293 ReadableBuffer data = splitContent; 294 splitContent = null; 295 296 return data; 297 } 298 299 override 300 public bool isAutoAccept() { 301 return autoAccept; 302 } 303 304 override 305 public ProtonReceiver setAutoAccept(bool autoAccept) { 306 this.autoAccept = autoAccept; 307 return this; 308 } 309 310 override 311 public ProtonReceiver setPrefetch(int messages) { 312 if (messages < 0) { 313 throw new IllegalArgumentException("Value must not be negative"); 314 } 315 316 prefetch = messages; 317 return this; 318 } 319 320 override 321 public int getPrefetch() { 322 return prefetch; 323 } 324 325 override 326 public ProtonReceiver open() { 327 super.open(); 328 if (prefetch > 0) { 329 // Grant initial credit if prefetching. 330 flow(prefetch, false); 331 } 332 333 return this; 334 } 335 336 override 337 void handleLinkFlow(){ 338 processForDrainCompletion(); 339 } 340 341 private void processForDrainCompletion() { 342 //implementationMissing(false); 343 Handler!Void h = drainCompleteHandler; 344 if(h !is null && getCredit() <= 0 && getQueued() <= 0) { 345 bool timeoutTaskCleared = false; 346 347 Long timerId = drainTimeoutTaskId; 348 if(timerId !is null) { 349 //Vertx vertx = Vertx.currentContext().owner(); 350 // timeoutTaskCleared = vertx.cancelTimer(timerId); 351 timeoutTaskCleared = true; 352 } else { 353 timeoutTaskCleared = true; 354 } 355 356 drainTimeoutTaskId = null; 357 drainCompleteHandler = null; 358 359 if(timeoutTaskCleared) { 360 h.handle(new Boolean(true)); 361 } 362 } 363 } 364 }