1 /*
2  * hunt-amqp: AMQP library for D programming language, based on hunt-net.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.streams.impl.ProtonSubscriberImpl;
12 
13 
14 import hunt.proton.amqp.transport.Source;
15 import hunt.proton.amqp.transport.Target;
16 import hunt.amqp.streams.Subscription;
17 
18 import hunt.amqp.Handler;
19 import hunt.amqp.ProtonDelivery;
20 import hunt.amqp.ProtonLinkOptions;
21 import hunt.amqp.ProtonSender;
22 import hunt.amqp.impl.ProtonConnectionImpl;
23 import hunt.amqp.impl.ProtonDeliveryImpl;
24 import hunt.amqp.streams.ProtonSubscriber;
25 import hunt.amqp.streams.ProtonSubscriberOptions;
26 import hunt.amqp.streams.Tracker;
27 import hunt.net.Connection;
28 import hunt.Exceptions;
29 import hunt.logging;
30 import hunt.Object;
31 import hunt.amqp.streams.impl.TrackerImpl;
32 
/**
 * Subscriber that sends the message of every Tracker it receives over an
 * AMQP sender link created on the supplied connection.
 *
 * Demand is derived from link credit: whenever the sender's send-queue-drain
 * handler fires, the difference between the current credit and the number of
 * elements already requested (and not yet received) is requested from the
 * upstream Subscription.
 *
 * The state flags below are plain (non-atomic) bools.
 */
class ProtonSubscriberImpl : ProtonSubscriber!Tracker {

  private Subscription sub;
  private Connection connCtx;
  private ProtonConnectionImpl conn;
  private ProtonSender sender;

  // Set once the first Subscription has been accepted.
  private bool subscribed;
  // Set once onError/onComplete has been processed.
  private bool completed;
  // Set once the upstream Subscription has been cancelled by this subscriber.
  private bool cancelledSub;
  // When true, the upstream Subscription is cancelled if the connection ends.
  private bool emitOnConnectionEnd = true;
  // Elements requested from upstream that have not yet arrived via onNext.
  private long outstandingRequests = 0;

  /**
   * Creates a subscriber sending to the given address using default options.
   *
   * Params:
   *   address = address passed to the connection when creating the sender
   *   conn    = connection on which the sender link is created
   */
  this(string address, ProtonConnectionImpl conn) {
    this(address, conn, new ProtonSubscriberOptions());
  }

  /**
   * Creates a subscriber sending to the given address.
   *
   * Params:
   *   address = address passed to the connection when creating the sender
   *   conn    = connection on which the sender link is created
   *   options = subscriber options; its link name, when non-null, is applied
   *             to the link options used to create the sender
   */
  this(string address, ProtonConnectionImpl conn, ProtonSubscriberOptions options) {
    this.connCtx = conn.getContext();
    this.conn = conn;

    ProtonLinkOptions linkOptions = new ProtonLinkOptions();
    if(options.getLinkName() !is null) {
      linkOptions.setLinkName(options.getLinkName());
    }

    sender = conn.createSender(address, linkOptions);
    // Drain handling is left to this class's own credit accounting.
    sender.setAutoDrained(false);
  }

  /**
   * Accepts the upstream Subscription, installs the connection/link handlers
   * and opens the sender. Only the first Subscription is accepted; any later
   * one is cancelled immediately and otherwise ignored.
   *
   * Params:
   *   subscription = the upstream subscription, must not be null
   *
   * Throws: NullPointerException if subscription is null.
   */
  public void onSubscribe(Subscription subscription) {
    if(subscription is null) {
      throw new NullPointerException("A subscription must be supplied");
    }

    if(subscribed) {
      logInfo("Only a single Subscription is supported and already subscribed, cancelling new subscriber.");
      subscription.cancel();
      return;
    }
    // Mark as subscribed so that any further Subscription is rejected above.
    subscribed = true;

    this.sub = subscription;

    // Cancel upstream when the connection ends, unless that was disabled.
    conn.addEndHandler(
      new class Handler!Void {
        void handle(Void o)
        {
          if(emitOnConnectionEnd) {
            cancelSub();
          }
        }
      }
    );

    // Translate newly available link credit into upstream demand.
    sender.sendQueueDrainHandler(
      new class Handler!ProtonSender {
        void handle(ProtonSender o)
        {
          if(!completed && !cancelledSub) {
            long credit = sender.getCredit();
            long newRequests = credit - outstandingRequests;

            if(newRequests > 0) {
              outstandingRequests += newRequests;
              sub.request(newRequests);
            }
          }
        }
      }
    );

    // Remote detach: cancel upstream and detach our end.
    sender.detachHandler(
      new class Handler!ProtonSender {
        void handle(ProtonSender o)
        {
          cancelSub();
          sender.detach();
        }
      }
    );

    // Remote close: cancel upstream and close our end.
    sender.closeHandler(
      new class Handler!ProtonSender {
        void handle(ProtonSender o)
        {
          cancelSub();
          sender.close();
        }
      }
    );

    sender.open();
  }

  /// Cancels the upstream Subscription at most once.
  private void cancelSub() {
    if(!cancelledSub) {
      cancelledSub = true;
      sub.cancel();
    }
  }

  /**
   * Sends the tracker's message on the sender and records the resulting
   * delivery on the tracker. When the send handler fires, the tracker's own
   * handler (if any) is invoked with the tracker. Ignored once the stream has
   * completed.
   *
   * Params:
   *   tracker = element to send; must be a non-null TrackerImpl
   *
   * Throws:
   *   NullPointerException if tracker is null;
   *   IllegalArgumentException if tracker is not a TrackerImpl.
   */
  public void onNext(Tracker tracker) {
    if(tracker is null) {
      throw new NullPointerException("An element must be supplied when calling onNext");
    }

    if(!completed) {
      // A D class cast yields null on type mismatch; reject before anything
      // is sent rather than dereferencing null after the message has gone out.
      TrackerImpl env = cast(TrackerImpl) tracker;
      if(env is null) {
        throw new IllegalArgumentException("The supplied Tracker must be a TrackerImpl");
      }

      outstandingRequests--;
      ProtonDelivery delivery = sender.send(tracker.message(), new class Handler!ProtonDelivery
      {
        void handle(ProtonDelivery var1)
        {
          Handler!Tracker h = env.handler();
          if(h !is null) {
            h.handle(env);
          }
        }
      });
      env.setDelivery(cast(ProtonDeliveryImpl) delivery);
    }
  }

  /**
   * Terminates the stream after an upstream error: clears the link handlers
   * and closes the sender. Only the first terminal signal has any effect.
   *
   * Params:
   *   t = the upstream error (not inspected here)
   */
  public void onError(Throwable t) {
    if(!completed) {
      completed = true;
      // Clear the handlers, we are done.
      sender.sendQueueDrainHandler(null);
      sender.detachHandler(null);
      sender.closeHandler(null);
      // Close the link as onComplete does, so it is not left open with no handlers.
      sender.close();
    }
  }

  /**
   * Terminates the stream normally: clears the link handlers and closes the
   * sender. Only the first terminal signal has any effect.
   */
  public void onComplete() {
    if(!completed) {
      completed = true;
      // Clear the handlers, we are done.
      sender.sendQueueDrainHandler(null);
      sender.detachHandler(null);
      sender.closeHandler(null);
      sender.close();
    }
  }

  /// Sets the source on the underlying sender and returns this subscriber.
  public ProtonSubscriber!Tracker setSource(Source source) {
    sender.setSource(source);
    return this;
  }

  /// Returns the source currently set on the underlying sender.
  public Source getSource() {
    return sender.getSource();
  }

  /// Sets the target on the underlying sender and returns this subscriber.
  public ProtonSubscriber!Tracker setTarget(Target target) {
    sender.setTarget(target);
    return this;
  }

  /// Returns the target currently set on the underlying sender.
  public Target getTarget() {
    return sender.getTarget();
  }

  /// Returns the remote source reported by the underlying sender.
  public Source getRemoteSource() {
    return sender.getRemoteSource();
  }

  /// Returns the remote target reported by the underlying sender.
  public Target getRemoteTarget() {
    return sender.getRemoteTarget();
  }

  /// Whether the upstream Subscription is cancelled when the connection ends.
  public bool isEmitOnConnectionEnd() {
    return emitOnConnectionEnd;
  }

  /// Sets whether the upstream Subscription is cancelled when the connection ends.
  public void setEmitOnConnectionEnd(bool emitOnConnectionEnd) {
    this.emitOnConnectionEnd = emitOnConnectionEnd;
  }

  /// Returns the underlying sender link.
  public ProtonSender getLink() {
    return sender;
  }
}