1 /*
2  * hunt-amqp: AMQP library for D programming language, based on hunt-net.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.impl.ProtonReceiverImpl;
12 
13 import hunt.amqp.ProtonMessageHandler;
14 import hunt.amqp.ProtonReceiver;
15 import hunt.proton.Proton;
16 import hunt.proton.amqp.messaging.Modified;
17 import hunt.proton.amqp.transport.Source;
18 import hunt.proton.codec.CompositeReadableBuffer;
19 import hunt.proton.codec.ReadableBuffer;
20 import hunt.proton.engine.Delivery;
21 import hunt.proton.engine.Receiver;
22 import hunt.proton.engine.Session;
23 import hunt.proton.message.impl.MessageImpl;
24 import hunt.amqp.Handler;
25 import hunt.Object;
26 import hunt.amqp.ProtonHelper;
27 import hunt.amqp.impl.ProtonLinkImpl;
28 import hunt.amqp.impl.ProtonDeliveryImpl;
29 import hunt.Long;
30 import hunt.Exceptions;
31 import hunt.logging;
32 import hunt.Boolean;
33 
34 /**
35  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
36  */
37 class ProtonReceiverImpl : ProtonLinkImpl!ProtonReceiver , ProtonReceiver {
38 
39 
  // Message callback; onDelivery() does nothing while this is null.
  private ProtonMessageHandler _handler;
  // Credit granted by open() and topped up by one per processed delivery.
  // While greater than zero, drain() and the public flow() refuse to run.
  private int prefetch = 1000;
  // Handler that processForDrainCompletion() notifies and then clears.
  private Handler!Void drainCompleteHandler;
  // Identifier of a drain-timeout task; the visible code only ever resets it to null.
  private Long drainTimeoutTaskId = null;
  // Session owning the wrapped receiver, captured in the constructor.
  private Session session;
  // Transport max frame size, captured in the constructor.
  private int maxFrameSize;
  // Session incoming capacity, captured in the constructor.
  private long sessionIncomingCapacity;
  // sessionIncomingCapacity - maxFrameSize; handlePartial() compares the
  // session's incoming byte count against this value.
  private long windowFullThreshhold;
48 
49   this(Receiver receiver) {
50     super(receiver);
51     session = receiver.getSession();
52     sessionIncomingCapacity = session.getIncomingCapacity();
53     maxFrameSize = session.getConnection().getTransport().getMaxFrameSize();
54     windowFullThreshhold = sessionIncomingCapacity - maxFrameSize;
55   }
56 
57   override
58   public ProtonReceiverImpl self() {
59     return this;
60   }
61 
62   private Receiver getReceiver() {
63     return cast(Receiver) link;
64   }
65 
66   public int recv(byte[] bytes, int offset, int size) {
67     return getReceiver().recv(bytes, offset, size);
68   }
69 
70   override
71   public string getRemoteAddress() {
72     Source remoteSource = getRemoteSource();
73 
74     return remoteSource is null ? null : cast(string)(remoteSource.getAddress().getBytes());
75   }
76 
77   public ProtonReceiver drain(long timeout ,Handler!Void completionHandler) {
78     if (prefetch > 0) {
79       throw new IllegalStateException("Manual credit management not available while prefetch is non-zero");
80     }
81 
82     //if (completionHandler is null) {
83     //  throw new IllegalArgumentException("A completion handler must be provided");
84     //}
85     //
86     //if (drainCompleteHandler !is null) {
87     //  throw new IllegalStateException("A previous drain operation has not yet completed");
88     //}
89 
90     if ((getCredit() - getQueued()) <= 0) {
91       // We have no remote credit
92       if (getQueued() == 0) {
93         // All the deliveries have been processed, drain is a no-op, nothing to do but complete.
94         //completionHandler.handle(Future.succeededFuture());
95 
96       } else {
97           // There are still deliveries to process, wait for them to be.
98           setDrainHandlerAndTimeoutTask(timeout);
99       }
100     } else {
101       setDrainHandlerAndTimeoutTask(timeout);
102 
103       getReceiver().drain(0);
104       flushConnection();
105     }
106 
107     return this;
108   }
109 
110   private void setDrainHandlerAndTimeoutTask(long delay) {
111     implementationMissing(false);
112    // if(delay > 0) {
113    //   Vertx vertx = Vertx.currentContext().owner();
114    //   drainTimeoutTaskId = vertx.setTimer(delay, x -> {
115    //     drainTimeoutTaskId = null;
116    //     drainCompleteHandler = null;
117    //     completionHandler.handle(Future.failedFuture("Drain attempt timed out"));
118    //   });
119    // }
120   }
121 
122   override
123   public ProtonReceiver flow(int credits) {
124     flow(credits, true);
125     return this;
126   }
127 
128   private void flow(int credits, bool checkPrefetch) {
129     if (checkPrefetch && prefetch > 0) {
130       throw new IllegalStateException("Manual credit management not available while prefetch is non-zero");
131     }
132 
133     //if (drainCompleteHandler !is null) {
134     //  throw new IllegalStateException("A previous drain operation has not yet completed");
135     //}
136 
137     getReceiver().flow(credits);
138     flushConnection();
139   }
140 
141   public bool draining() {
142     return getReceiver().draining();
143   }
144 
145   public ProtonReceiver setDrain(bool drain) {
146     getReceiver().setDrain(drain);
147     return this;
148   }
149 
150   override
151   public ProtonReceiver handler(ProtonMessageHandler handler) {
152     this._handler = handler;
153     onDelivery();
154     return this;
155   }
156 
157   private void flushConnection() {
158     getSession().getConnectionImpl().flush();
159   }
160 
161   /////////////////////////////////////////////////////////////////////////////
162   //
163   // Implementation details hidden from public api.
164   //
165   /////////////////////////////////////////////////////////////////////////////
166 
  // When true, onDelivery() accepts a delivery whose local state is still
  // null after the handler has run.
  private bool autoAccept = true;
  // Content already read from a delivery that is still partial; consumed and
  // reset by completePartial(), discarded by handleAborted().
  private CompositeReadableBuffer splitContent;
169 
170   void onDelivery() {
171     if (this._handler is null) {
172       return;
173     }
174 
175     Receiver receiver = getReceiver();
176     Delivery delivery = receiver.current();
177 
178     if (delivery !is null) {
179 
180       if(delivery.isAborted()) {
181         handleAborted(receiver, delivery);
182         return;
183       }
184 
185       if (delivery.isPartial()) {
186         handlePartial(receiver, delivery);
187 
188         // Delivery is not yet completely received,
189         // return and allow further frames to arrive.
190         return;
191       }
192 
193       // Complete prior partial content if needed, or grab it all.
194       ReadableBuffer data = receiver.recv();
195       if(splitContent !is null) {
196         data = completePartial(data);
197       }
198 
199       receiver.advance();
200 
201       MessageImpl msg = cast(MessageImpl) Proton.message();
202       ProtonDeliveryImpl delImpl = new ProtonDeliveryImpl(delivery);
203       try {
204         msg.decode(data);
205       } catch (Throwable t) {
206         logError("Unable to decode message, undeliverable");
207         handleDecodeFailure(receiver, delImpl);
208         return;
209       }
210 
211       _handler.handle(delImpl, msg);
212 
213       if (autoAccept && delivery.getLocalState() is null) {
214         ProtonHelper.accepted(delImpl, true);
215       }
216 
217       if (prefetch > 0) {
218         // Replenish credit if prefetch is configured.
219         // TODO: batch credit replenish, optionally flush if exceeding a given threshold?
220         flow(1, false);
221       } else {
222         processForDrainCompletion();
223       }
224     }
225   }
226 
227   private void handleDecodeFailure(Receiver receiver, ProtonDeliveryImpl delImpl) {
228     Modified modified = new Modified();
229     modified.setDeliveryFailed(new Boolean(true));
230     modified.setUndeliverableHere(new Boolean(true));
231 
232     delImpl.disposition(modified, true);
233 
234     if(!receiver.getDrain()) {
235       flow(1, false);
236     } else {
237       processForDrainCompletion();
238     }
239   }
240 
241   private void handleAborted(Receiver receiver, Delivery delivery) {
242     splitContent = null;
243 
244     receiver.advance();
245     delivery.settle();
246 
247     if(!receiver.getDrain()) {
248       flow(1, false);
249     } else {
250       processForDrainCompletion();
251     }
252   }
253 
254   private void handlePartial( Receiver receiver,  Delivery delivery) {
255     if (sessionIncomingCapacity <= 0 || maxFrameSize <= 0 || session.getIncomingBytes() < windowFullThreshhold) {
256       // No window, or there is still capacity, so do nothing.
257     } else {
258       // The session window could be effectively full, we need to
259       // read part of the delivery content to ensure there is
260       // room made for receiving more of the delivery.
261       if(delivery.available() > 0) {
262         ReadableBuffer buff = receiver.recv();
263 
264         if(splitContent is null && cast(CompositeReadableBuffer)buff !is null) {
265           // Its a composite and there is no prior partial content, use it.
266           splitContent = cast(CompositeReadableBuffer) buff;
267         } else {
268           int remaining = buff.remaining();
269           if(remaining > 0) {
270             if (splitContent is null) {
271               splitContent = new CompositeReadableBuffer();
272             }
273 
274             byte[] chunk = new byte[remaining];
275             buff.get(chunk);
276 
277             splitContent.append(chunk);
278           }
279         }
280       }
281     }
282   }
283 
284   private ReadableBuffer completePartial( ReadableBuffer Content) {
285     int pending = Content.remaining();
286     if(pending > 0) {
287       byte[] chunk = new byte[pending];
288       Content.get(chunk);
289 
290       splitContent.append(chunk);
291     }
292 
293     ReadableBuffer data = splitContent;
294     splitContent = null;
295 
296     return data;
297   }
298 
299   override
300   public bool isAutoAccept() {
301     return autoAccept;
302   }
303 
304   override
305   public ProtonReceiver setAutoAccept(bool autoAccept) {
306     this.autoAccept = autoAccept;
307     return this;
308   }
309 
310   override
311   public ProtonReceiver setPrefetch(int messages) {
312     if (messages < 0) {
313       throw new IllegalArgumentException("Value must not be negative");
314     }
315 
316     prefetch = messages;
317     return this;
318   }
319 
320   override
321   public int getPrefetch() {
322     return prefetch;
323   }
324 
325   override
326   public ProtonReceiver open() {
327     super.open();
328     if (prefetch > 0) {
329       // Grant initial credit if prefetching.
330       flow(prefetch, false);
331     }
332 
333     return this;
334   }
335 
336   override
337   void handleLinkFlow(){
338     processForDrainCompletion();
339   }
340 
341   private void processForDrainCompletion() {
342     //implementationMissing(false);
343     Handler!Void h = drainCompleteHandler;
344     if(h !is null && getCredit() <= 0 && getQueued() <= 0) {
345       bool timeoutTaskCleared = false;
346 
347       Long timerId = drainTimeoutTaskId;
348       if(timerId !is null) {
349         //Vertx vertx = Vertx.currentContext().owner();
350        // timeoutTaskCleared = vertx.cancelTimer(timerId);
351         timeoutTaskCleared = true;
352       } else {
353         timeoutTaskCleared = true;
354       }
355 
356       drainTimeoutTaskId = null;
357       drainCompleteHandler = null;
358 
359       if(timeoutTaskCleared) {
360         h.handle(new Boolean(true));
361       }
362     }
363   }
364 }