1 /*
2  * hunt-amqp: AMQP library for D programming language, based on hunt-net.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.impl.ProtonLinkImpl;
12 
13 //import io.vertx.core.AsyncResult;
14 //import io.vertx.core.Handler;
15 import hunt.amqp.ProtonHelper;
16 import hunt.amqp.ProtonLink;
17 import hunt.amqp.ProtonQoS;
18 
19 import hunt.proton.amqp.Symbol;
20 import hunt.proton.amqp.UnsignedLong;
21 import hunt.proton.amqp.transport.ErrorCondition;
22 import hunt.proton.amqp.transport.ReceiverSettleMode;
23 import hunt.proton.amqp.transport.SenderSettleMode;
24 import hunt.proton.amqp.transport.Source;
25 import hunt.proton.amqp.transport.Target;
26 import hunt.proton.engine.Delivery;
27 import hunt.proton.engine.EndpointState;
28 import hunt.proton.engine.Link;
29 import hunt.proton.engine.Record;
30 import hunt.collection.Map;
31 import hunt.amqp.Handler;
32 import hunt.amqp.impl.ProtonSessionImpl;
33 import hunt.Exceptions;
34 import hunt.logging;
35 /**
36  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
37  */
38 abstract class ProtonLinkImpl(T) : ProtonLink!T {
39 
40   protected  Link link;
41   private Handler!T _openHandler;
42   private Handler!T _closeHandler;
43   private Handler!T _detachHandler;
44 
45   this(Link link) {
46     this.link = link;
47     this.link.setContext(this);
48 
49     setQoS(getRemoteQoS());
50   }
51 
52   public abstract T self();
53 
54   public ProtonSessionImpl getSession() {
55     return cast(ProtonSessionImpl) (this.link.getSession().getContext());
56   }
57 
58   public Record attachments() {
59     return link.attachments();
60   }
61 
62   public ErrorCondition getCondition() {
63     return link.getCondition();
64   }
65 
66   public int getCredit() {
67     return link.getCredit();
68   }
69 
70   public bool getDrain() {
71     return link.getDrain();
72   }
73 
74   public EndpointState getLocalState() {
75     return link.getLocalState();
76   }
77 
78   public string getName() {
79     return link.getName();
80   }
81 
82   public ErrorCondition getRemoteCondition() {
83     return link.getRemoteCondition();
84   }
85 
86   public int getRemoteCredit() {
87     return link.getRemoteCredit();
88   }
89 
90   public EndpointState getRemoteState() {
91     return link.getRemoteState();
92   }
93 
94   public Target getRemoteTarget() {
95     return link.getRemoteTarget();
96   }
97 
98   public Target getTarget() {
99     return link.getTarget();
100   }
101 
102   public T setTarget(Target target) {
103     link.setTarget(target);
104     return self();
105   }
106 
107   public Source getRemoteSource() {
108     return link.getRemoteSource();
109   }
110 
111   public Source getSource() {
112     return link.getSource();
113   }
114 
115   public T setSource(Source source) {
116     link.setSource(source);
117     return self();
118   }
119 
120   public int getUnsettled() {
121     return link.getUnsettled();
122   }
123 
124   public int getQueued() {
125     return link.getQueued();
126   }
127 
128   public bool advance() {
129     return link.advance();
130   }
131 
132   public int drained() {
133     int drained = link.drained();
134     getSession().getConnectionImpl().flush();
135     return drained;
136   }
137 
138   public bool detached() {
139     return link.detached();
140   }
141 
142   public Delivery delivery(byte[] tag, int offset, int length) {
143     return link.delivery(tag, offset, length);
144   }
145 
146   public Delivery current() {
147     return link.current();
148   }
149 
150   public T setCondition(ErrorCondition condition) {
151     link.setCondition(condition);
152     return self();
153   }
154 
155   public Delivery delivery(byte[] tag) {
156     return link.delivery(tag);
157   }
158 
159   public T open() {
160     link.open();
161     version(HUNT_AMQP_DEBUG) logInfo("link open flush");
162     getSession().getConnectionImpl().flush();
163     return self();
164   }
165 
166   public T close() {
167     link.close();
168     getSession().getConnectionImpl().flush();
169     return self();
170   }
171 
172   public T detach() {
173     link.detach();
174     getSession().getConnectionImpl().flush();
175     return self();
176   }
177 
178   public T openHandler(Handler!T openHandler) {
179     this._openHandler = openHandler;
180     return self();
181   }
182 
183   public T closeHandler(Handler!T closeHandler) {
184     this._closeHandler = closeHandler;
185     return self();
186   }
187 
188   public T detachHandler(Handler!T detachHandler) {
189     this._detachHandler = detachHandler;
190     return self();
191   }
192 
193   public bool isOpen() {
194     return getLocalState() == EndpointState.ACTIVE;
195   }
196 
197   public ProtonQoS getQoS() {
198     if (link.getSenderSettleMode() == SenderSettleMode.SETTLED) {
199       return ProtonQoS.AT_MOST_ONCE;
200     }
201 
202     return ProtonQoS.AT_LEAST_ONCE;
203   }
204 
205   public ProtonQoS getRemoteQoS() {
206     if (link.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED) {
207       return ProtonQoS.AT_MOST_ONCE;
208     }
209 
210     return ProtonQoS.AT_LEAST_ONCE;
211   }
212 
213   public T setQoS(ProtonQoS qos) {
214     int type = qos.ordinal();
215     enum int AT_MOST_ONCE = ProtonQoS.AT_MOST_ONCE.ordinal;
216     enum int AT_LEAST_ONCE = ProtonQoS.AT_LEAST_ONCE.ordinal;
217     switch (type) {
218     case AT_MOST_ONCE:
219       link.setSenderSettleMode(SenderSettleMode.SETTLED);
220       link.setReceiverSettleMode(ReceiverSettleMode.FIRST);
221       break;
222     case AT_LEAST_ONCE:
223       link.setSenderSettleMode(SenderSettleMode.UNSETTLED);
224       link.setReceiverSettleMode(ReceiverSettleMode.FIRST);
225       break;
226     default:
227       break;
228     }
229     return self();
230   }
231 
232   public UnsignedLong getMaxMessageSize() {
233     return link.getMaxMessageSize();
234   }
235 
236   public void setMaxMessageSize(UnsignedLong maxMessageSize) {
237     link.setMaxMessageSize(maxMessageSize);
238   }
239 
240   public UnsignedLong getRemoteMaxMessageSize() {
241     return link.getRemoteMaxMessageSize();
242   }
243 
244   public Map!(Symbol, Object) getRemoteProperties() {
245     return link.getRemoteProperties();
246   }
247 
248   public void setProperties(Map!(Symbol, Object) properties) {
249     link.setProperties(properties);
250   }
251 
252   public void setOfferedCapabilities(Symbol[] capabilities) {
253     link.setOfferedCapabilities(capabilities);
254   }
255 
256   public Symbol[] getRemoteOfferedCapabilities() {
257     return link.getRemoteOfferedCapabilities();
258   }
259 
260   public void setDesiredCapabilities(Symbol[] capabilities) {
261     link.setDesiredCapabilities(capabilities);
262   }
263 
264   public Symbol[] getRemoteDesiredCapabilities() {
265     return link.getRemoteDesiredCapabilities();
266   }
267 
268   public void free() {
269     link.free();
270     getSession().getConnectionImpl().flush();
271   }
272 
273   /////////////////////////////////////////////////////////////////////////////
274   //
275   // Implementation details hidden from public api.
276   //
277   /////////////////////////////////////////////////////////////////////////////
278   void fireRemoteOpen() {
279     if (_openHandler !is null) {
280       //openHandler.handle(ProtonHelper.future(self(), getRemoteCondition()));
281       _openHandler.handle(self());
282     }
283   }
284 
285   void fireRemoteDetach() {
286     if (_detachHandler !is null) {
287       //detachHandler.handle(ProtonHelper.future(self(), getRemoteCondition()));
288       _detachHandler.handle(self());
289     }
290   }
291 
292   void fireRemoteClose() {
293     if (_closeHandler !is null) {
294       //closeHandler.handle(ProtonHelper.future(self(), getRemoteCondition()));
295       _closeHandler.handle(self());
296     }
297   }
298 
299   abstract void handleLinkFlow();
300 }