1 /*
2  * hunt-amqp: AMQP library for D programming language, based on hunt-net.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.streams.impl.ProtonPublisherImpl;
12 
13 import hunt.collection.ArrayList;
14 //import java.util.concurrent.atomic.AtomicBoolean;
15 
16 import hunt.proton.amqp.Symbol;
17 import hunt.proton.amqp.messaging.Released;
18 import hunt.proton.amqp.messaging.TerminusDurability;
19 import hunt.proton.amqp.messaging.TerminusExpiryPolicy;
20 import hunt.proton.amqp.transport.Source;
21 import hunt.proton.amqp.transport.Target;
22 import hunt.amqp.streams.Subscriber;
23 import hunt.amqp.streams.Subscription;
24 
25 import hunt.amqp.ProtonLinkOptions;
26 import hunt.amqp.ProtonReceiver;
27 import hunt.amqp.impl.ProtonConnectionImpl;
28 import hunt.amqp.streams.Delivery;
29 import hunt.amqp.streams.ProtonPublisher;
30 import hunt.amqp.streams.ProtonPublisherOptions;
31 import hunt.net.Connection;
32 import std.concurrency : initOnce;
33 import hunt.Exceptions;
34 import hunt.amqp.impl.ProtonClientImpl;
35 import hunt.amqp.Handler;
36 import hunt.amqp.ProtonMessageHandler;
37 import hunt.amqp.ProtonDelivery;
38 import hunt.proton.message.Message;
39 import std.algorithm;
40 import hunt.logging;
41 import hunt.Long;
42 import hunt.math.Helper;
43 
44 import hunt.proton.amqp.messaging.Source;
45 import hunt.amqp.streams.impl.DeliveryImpl;
46 import hunt.Boolean;
47 
48 class ProtonPublisherImpl : ProtonPublisher!Delivery {
49 
50  // private static  Symbol SHARED = Symbol.valueOf("shared");
51  // private static  Symbol GLOBAL = Symbol.valueOf("global");
52 
53    static Symbol  SHARED() {
54      __gshared Symbol  inst;
55      return initOnce!inst(Symbol.valueOf("shared"));
56    }
57 
58    static Symbol  GLOBAL() {
59      __gshared Symbol  inst;
60      return initOnce!inst(Symbol.valueOf("global"));
61    }
62 
  // Network context obtained from conn.getContext() in the constructor;
  // passed to every DeliveryImpl created for an incoming message.
  private Connection connCtx;
  // Connection on which the receiver link is created.
  private  ProtonConnectionImpl conn;
 // private  AtomicBoolean subscribed = new AtomicBoolean();
  // Flag intended to record that subscribe() has been called. It is a plain
  // bool standing in for the AtomicBoolean above, so it is not atomic.
  private bool subscribed ;
  // Subscription wrapping the subscriber; created in subscribe().
  private AmqpSubscription subscription;
  // Receiver link created in the constructor; all credit flows through it.
  private ProtonReceiver receiver;
  // Exposed through isEmitOnConnectionEnd()/setEmitOnConnectionEnd().
  private bool emitOnConnectionEnd = true;
  // Upper bound on link credit; replaced by the options value when that is > 0.
  private int maxOutstandingCredit = 1000;

  // Taken from options.isDurable(); cancel() detaches instead of closing when set.
  private bool durable;
73 
74   this(string address, ProtonConnectionImpl conn, ProtonPublisherOptions options) {
75     this.connCtx = conn.getContext();
76     this.conn = conn;
77 
78     ProtonLinkOptions linkOptions = new ProtonLinkOptions();
79     if(options.getLinkName() !is null) {
80       linkOptions.setLinkName(options.getLinkName());
81     }
82 
83     receiver = conn.createReceiver(address, linkOptions);
84     receiver.setAutoAccept(false);
85     receiver.setPrefetch(0);
86 
87     if(options.getMaxOutstandingCredit() > 0) {
88       maxOutstandingCredit = options.getMaxOutstandingCredit();
89     }
90 
91     //hunt.proton.amqp.messaging.Source
92     hunt.proton.amqp.messaging.Source.Source source = cast(hunt.proton.amqp.messaging.Source.Source) receiver.getSource();
93     durable = options.isDurable();
94     if(durable) {
95       source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
96       source.setDurable(TerminusDurability.UNSETTLED_STATE);
97     }
98 
99     if(options.isDynamic()) {
100       source.setAddress(null);
101       source.setDynamic(new Boolean(true));
102     }
103 
104     ArrayList!Symbol capabilities = new ArrayList!Symbol();
105     if(options.isShared()) {
106       capabilities.add(SHARED);
107     }
108     if(options.isGlobal()) {
109       capabilities.add(GLOBAL);
110     }
111 
112     if(!capabilities.isEmpty()) {
113     //  Symbol[] caps = capabilities.toArray(new Symbol[capabilities.size()]);
114       source.setCapabilities(capabilities);
115     }
116   }
117 
118   
119   public void subscribe(Subscriber!Delivery subscriber) {
120     //LOG.trace("Subscribe called");
121     //Objects.requireNonNull(subscriber, "A subscriber must be supplied");
122 
123         //if(subscribed = (true)) {
124         //  throw new IllegalStateException("Only a single susbcriber supported, and subscribe already called.");
125         //}
126 
127     //if(subscribed.getAndSet(true)) {
128     //  throw new IllegalStateException("Only a single susbcriber supported, and subscribe already called.");
129     //}
130 
131     subscription = new AmqpSubscription(subscriber);
132 
133     //ConnectionEventBaseHandler handler =  cast(ConnectionEventBaseHandler)connCtx.getHandler();
134 
135    // receiver.closeHandler()
136 
137     receiver.closeHandler(
138       new class Handler!ProtonReceiver{
139           void handle(ProtonReceiver o)
140           {
141             receiver.close();
142           }
143       }
144     );
145 
146     receiver.detachHandler(
147       new class Handler!ProtonReceiver{
148         void handle(ProtonReceiver o)
149         {
150           receiver.detach();
151         }
152       }
153     );
154 
155     receiver.openHandler(
156       new class Handler!ProtonReceiver{
157         void handle(ProtonReceiver o)
158         {
159           subscription.indicateSubscribed();
160         }
161       }
162     );
163 
164    receiver.handler(
165        new class ProtonMessageHandler{
166          void handle(ProtonDelivery delivery, Message message)
167          {
168            Delivery envelope = new DeliveryImpl(message, delivery, connCtx);
169            if(!subscription.onNextWrapper(envelope)){
170              delivery.disposition(Released.getInstance(), true);
171            }
172          }
173        }
174    );
175 
176      receiver.open();
177 
178 
179     //connCtx.runOnContext(x-> {
180     //  conn.addEndHandler(v -> {
181     //    if(emitOnConnectionEnd) {
182     //      subscription.indicateError(new Exception("Connection closed: " + conn.getContainer()));
183     //    }
184     //  });
185     //
186     //  receiver.closeHandler(res-> {
187     //    subscription.indicateError(new Exception("Link closed unexpectedly"));
188     //    receiver.close();
189     //  });
190     //
191     //  receiver.detachHandler(res-> {
192     //    subscription.indicateError(new Exception("Link detached unexpectedly"));
193     //    receiver.detach();
194     //  });
195     //
196     //  receiver.openHandler(res -> {
197     //    subscription.indicateSubscribed();
198     //  });
199     //
200     //  receiver.handler((delivery, message) -> {
201     //    Delivery envelope = new DeliveryImpl(message, delivery, connCtx);
202     //    if(!subscription.onNextWrapper(envelope)){
203     //      delivery.disposition(Released.getInstance(), true);
204     //    }
205     //  });
206     //
207     //  receiver.open();
208     //});
209   }
210 
211   // ==================================================
212 
213   class AmqpSubscription : Subscription {
214 
    // Subscriber receiving the signals; reset to null once a terminal signal
    // (error or completion) has been processed.
    private Subscriber!Delivery subcriber;
    //private  AtomicBoolean cancelled = new AtomicBoolean();
    //private  AtomicBoolean completed = new AtomicBoolean();
    // Set by cancel(). Plain bool standing in for the AtomicBoolean above.
    private bool cancelled;
    // Set by indicateError()/indicateCompletion(); stops further signals.
    private bool completed;
    // Demand requested by the subscriber and not yet satisfied by onNext.
    private long outstandingRequests = 0;
221 
222     this(Subscriber!Delivery sub) {
223       this.subcriber = sub;
224     }
225 
226     private bool onNextWrapper(Delivery next) {
227       if(!completed&& !cancelled){
228         subcriber.onNext(next);
229 
230         // Now top up credits if still needed
231         outstandingRequests = outstandingRequests - 1;
232 
233         if(!cancelled) {
234           int currentCredit = receiver.getCredit();
235           if(currentCredit < (maxOutstandingCredit * 0.5) && outstandingRequests > currentCredit) {
236             int creditLimit = cast(int) min(outstandingRequests, maxOutstandingCredit);
237 
238             int credits = creditLimit - currentCredit;
239             if(credits > 0) {
240               //if (LOG.isTraceEnabled()) {
241               //  LOG.trace("Updating credit for outstanding requests: " + credits);
242               //}
243               logInfo("Updating credit for outstanding requests");
244               flowCreditIfNeeded(credits);
245             }
246           }
247         }
248 
249         return true;
250       } else {
251        // LOG.trace("skipped calling onNext, already completed or cancelled");
252         logInfo("skipped calling onNext, already completed or cancelled");
253         return false;
254       }
255     }
256 
257     
258     public void request(long n) {
259       if(n <= 0 && !cancelled) {
260         logError("non-positive subscription request, requests must be > 0");
261         //connCtx.runOnContext(x -> {
262         //  indicateError(new IllegalArgumentException("non-positive subscription request, requests must be > 0"));
263         //});
264       } else if(!cancelled) {
265 
266           if(n == Long.MAX_VALUE) {
267             outstandingRequests = Long.MAX_VALUE;
268           } else {
269             try {
270               outstandingRequests = MathHelper.addExact(n, outstandingRequests);
271             } catch (ArithmeticException ae) {
272               outstandingRequests = Long.MAX_VALUE;
273             }
274           }
275 
276           if(cancelled) {
277            // LOG.trace("Not sending more credit, subscription cancelled since request was originally scheduled");
278             logInfo("Not sending more credit, subscription cancelled since request was originally scheduled");
279             return;
280           }
281 
282           flowCreditIfNeeded(n);
283 
284 
285         //connCtx.runOnContext(x -> {
286         //  if (LOG.isTraceEnabled()) {
287         //    LOG.trace("Processing request: " + n);
288         //  }
289         //
290         //  if(n == Long.MAX_VALUE) {
291         //    outstandingRequests = Long.MAX_VALUE;
292         //  } else {
293         //    try {
294         //      outstandingRequests = Math.addExact(n, outstandingRequests);
295         //    } catch (ArithmeticException ae) {
296         //      outstandingRequests = Long.MAX_VALUE;
297         //    }
298         //  }
299         //
300         //  if(cancelled.get()) {
301         //    LOG.trace("Not sending more credit, subscription cancelled since request was originally scheduled");
302         //    return;
303         //  }
304         //
305         //  flowCreditIfNeeded(n);
306         //});
307       }
308     }
309 
310     private void flowCreditIfNeeded(long n) {
311       int currentCredit = receiver.getCredit();
312       if(currentCredit < maxOutstandingCredit) {
313         int limit = maxOutstandingCredit - currentCredit;
314         int addedCredit  = cast(int) min(n, limit);
315 
316         if(addedCredit > 0) {
317           if(!completed) {
318             //if (LOG.isTraceEnabled()) {
319             //  LOG.trace("Flowing additional credits : " + addedCredit);
320             //}
321             receiver.flow(addedCredit);
322           } else {
323             logInfo("Skipping flowing additional credits as already completed");
324             //if (LOG.isTraceEnabled()) {
325             //  LOG.trace("Skipping flowing additional credits as already completed: " + addedCredit);
326             //}
327           }
328         }
329       }
330     }
331 
332     
333     public void cancel() {
334       if (!cancelled) {
335         cancelled = true;
336 
337         receiver.closeHandler(
338         new class Handler!ProtonReceiver{
339           void handle(ProtonReceiver o)
340           {
341             indicateCompletion();
342             receiver.close();
343           }
344         }
345         );
346 
347         receiver.detachHandler(
348         new class Handler!ProtonReceiver{
349           void handle(ProtonReceiver o)
350           {
351             indicateCompletion();
352             receiver.detach();
353           }
354         }
355         );
356 
357         if (durable) {
358           receiver.detach();
359         } else {
360           receiver.close();
361         }
362 
363 
364         //  connCtx.runOnContext(x -> {
365         //    LOG.trace("Cancelling");
366         //    receiver.closeHandler(y -> {
367         //      indicateCompletion();
368         //      receiver.close();
369         //    });
370         //    receiver.detachHandler(y -> {
371         //      indicateCompletion();
372         //      receiver.detach();
373         //    });
374         //
375         //    if(durable) {
376         //      receiver.detach();
377         //    } else {
378         //      receiver.close();
379         //    }
380         //  });
381         //} else {
382         //  LOG.trace("Cancel no-op, already called.");
383         //}
384       }
385     }
386     private void indicateError(Throwable t) {
387       if(!completed){
388         completed = true;
389         Subscriber!Delivery sub = subcriber;
390         subcriber = null;
391         if(sub !is null && !cancelled) {
392           //LOG.trace("Indicating error");
393           logError("Indicating error");
394           sub.onError(t);
395         } else {
396           //LOG.trace("Skipping error indication, no sub or already cancelled");
397           logError("Skipping error indication, no sub or already cancelled");
398         }
399       }
400       else {
401         //LOG.trace("indicateError no-op, already completed");
402         logInfo("indicateError no-op, already completed");
403       }
404     }
405 
406     private void indicateSubscribed() {
407       if(!completed){
408         logInfo("Indicating subscribed");
409         if(subcriber !is null) {
410           subcriber.onSubscribe(this);
411         }
412       } else {
413         logInfo("indicateSubscribed no-op, already completed");
414       }
415     }
416 
417     private void indicateCompletion() {
418       if(!completed){
419         completed = true;
420         Subscriber!Delivery sub = subcriber;
421         subcriber = null;
422 
423         bool canned = cancelled;
424         if(sub !is null && ((outstandingRequests > 0  && canned) || !canned)) {
425           logInfo("Indicating completion");
426           sub.onComplete();
427         } else {
428           logInfo("Skipping completion indication");
429         }
430       } else {
431         logInfo("indicateCompletion no-op, already completed");
432       }
433     }
434   }
435 
436   public bool isEmitOnConnectionEnd() {
437     return emitOnConnectionEnd;
438   }
439 
  /// Stores the given value in the emitOnConnectionEnd flag.
  public void setEmitOnConnectionEnd(bool emitOnConnectionEnd) {
    this.emitOnConnectionEnd = emitOnConnectionEnd;
  }
443 
444   public ProtonReceiver getLink() {
445     return receiver;
446   }
447 
448   // ==================================================
449 
450   
451   public ProtonPublisher!Delivery setSource(hunt.proton.amqp.transport.Source.Source source) {
452     receiver.setSource(source);
453     return this;
454   }
455 
456   
457   public hunt.proton.amqp.transport.Source.Source getSource() {
458     return receiver.getSource();
459   }
460 
461   
462   public ProtonPublisher!Delivery setTarget(Target target) {
463     receiver.setTarget(target);
464     return this;
465   }
466 
467   
468   public Target getTarget() {
469     return receiver.getTarget();
470   }
471 
472   
473   public hunt.proton.amqp.transport.Source.Source getRemoteSource() {
474     return receiver.getRemoteSource();
475   }
476 
477   
478   public Target getRemoteTarget() {
479     return receiver.getRemoteTarget();
480   }
481 
482   
483   public string getRemoteAddress() {
484     hunt.proton.amqp.transport.Source.Source remoteSource = getRemoteSource();
485 
486     return remoteSource is null ? null : (cast(string)(remoteSource.getAddress().getBytes()));
487   }
488 }