1 /*
2  * hunt-amqp: AMQP library for D programming language, based on hunt-net.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.impl.ProtonSessionImpl;
12 
13 import hunt.amqp.ProtonReceiver;
14 import hunt.amqp.ProtonSender;
15 import hunt.amqp.ProtonSession;
16 import hunt.amqp.ProtonConnection;
17 import hunt.amqp.ProtonHelper;
18 import hunt.amqp.ProtonQoS;
19 import hunt.amqp.ProtonLinkOptions;
20 import hunt.amqp.impl.ProtonConnectionImpl;
21 import hunt.amqp.impl.ProtonReceiverImpl;
22 import hunt.amqp.impl.ProtonSenderImpl;
23 
24 import hunt.proton.amqp.Symbol;
25 import hunt.proton.amqp.messaging.Accepted;
26 import hunt.proton.amqp.messaging.Modified;
27 import hunt.proton.amqp.messaging.Rejected;
28 import hunt.proton.amqp.messaging.Released;
29 import hunt.proton.amqp.messaging.Source;
30 import hunt.proton.amqp.messaging.Target;
31 import hunt.proton.amqp.transport.ErrorCondition;
32 import hunt.proton.engine.EndpointState;
33 import hunt.proton.engine.Receiver;
34 import hunt.proton.engine.Record;
35 import hunt.proton.engine.Sender;
36 import hunt.proton.engine.Session;
37 import hunt.Exceptions;
38 import hunt.String;
39 import hunt.collection.ArrayList;
40 import std.conv: to;
41 import hunt.Boolean;
42 import hunt.amqp.Handler;
43 import hunt.logging;
44 /**
45  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
46  */
47 class ProtonSessionImpl : ProtonSession {
48 
49     private  Session session;
50     private int autoLinkCounter = 0;
51     private Handler!ProtonSession _openHandler ;//= (result) -> {
52     //  LOG.trace("Session open completed");
53     //};
54     private Handler!ProtonSession _closeHandler; // = (result) -> {
55     //  if (result.succeeded()) {
56     //    LOG.trace("Session closed");
57     //  } else {
58     //    LOG.warn("Session closed with error", result.cause());
59     //  }
60     //};
61 
62     this(Session session) {
63         this.session = session;
64         this.session.setContext(this);
65         session.setIncomingCapacity(2147483647);
66         _openHandler = new class Handler!ProtonSession
67         {
68             void handle(ProtonSession var1)
69             {
70                 version(HUNT_DEBUG) logInfo("Session open completed");
71             }
72         };
73 
74         _closeHandler = new class Handler!ProtonSession
75         {
76             void handle(ProtonSession var1)
77             {
78                 error("Session closed with error");
79             }
80         };
81 
82     }
83 
84     
85     ProtonConnection getConnection() {
86         return getConnectionImpl();
87     }
88 
89     ProtonConnectionImpl getConnectionImpl() {
90         return cast(ProtonConnectionImpl) (this.session.getConnection().getContext());
91     }
92 
93     long getOutgoingWindow() {
94         return session.getOutgoingWindow();
95     }
96 
97     
98     ProtonSession setIncomingCapacity(int bytes) {
99         session.setIncomingCapacity(bytes);
100         return this;
101     }
102 
103     int getOutgoingBytes() {
104         return session.getOutgoingBytes();
105     }
106 
107     EndpointState getRemoteState() {
108         return session.getRemoteState();
109     }
110 
111     int getIncomingBytes() {
112         return session.getIncomingBytes();
113     }
114 
115     
116     ErrorCondition getRemoteCondition() {
117         return session.getRemoteCondition();
118     }
119 
120     
121     int getIncomingCapacity() {
122         return session.getIncomingCapacity();
123     }
124 
125     EndpointState getLocalState() {
126         return session.getLocalState();
127     }
128 
129     
130     ProtonSession setCondition(ErrorCondition condition) {
131         session.setCondition(condition);
132         return this;
133     }
134 
135     
136     ErrorCondition getCondition() {
137         return session.getCondition();
138     }
139 
140     void setOutgoingWindow(long outgoingWindowSize) {
141         session.setOutgoingWindow(outgoingWindowSize);
142     }
143 
144     
145     ProtonSessionImpl open() {
146         session.open();
147         version(HUNT_AMQP_DEBUG) logInfo("session open");
148         getConnectionImpl().flush();
149         return this;
150     }
151 
152     
153     ProtonSessionImpl close() {
154         version(HUNT_AMQP_DEBUG) logInfo("session close");
155         session.close();
156         getConnectionImpl().flush();
157         return this;
158     }
159 
160     
161     ProtonSessionImpl openHandler(Handler!ProtonSession openHandler) {
162         this._openHandler = openHandler;
163         return this;
164     }
165 
166 
167     ProtonSessionImpl closeHandler(Handler!ProtonSession closeHandler) {
168         this._closeHandler = closeHandler;
169         return this;
170     }
171 
172     private string generateLinkName() {
173         // TODO: include useful details in name, like address and container?
174         return "auto-" ~ to!string((autoLinkCounter++));
175     }
176 
177     private string getOrCreateLinkName(ProtonLinkOptions linkOptions) {
178         return linkOptions.getLinkName() is null ? generateLinkName() : linkOptions.getLinkName();
179     }
180 
181     
182     ProtonReceiver createReceiver(string address) {
183         return createReceiver(address, new ProtonLinkOptions());
184     }
185 
186     
187     ProtonReceiver createReceiver(string address, ProtonLinkOptions receiverOptions) {
188         Receiver receiver = session.receiver(getOrCreateLinkName(receiverOptions));
189 
190         Symbol[] outcomes = [ Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL,
191                 Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL ];
192 
193         Source source = new Source();
194         source.setAddress(new String(address));
195         //source.setOutcomes(new ArrayList!Symbol(outcomes));
196         source.setDefaultOutcome(Released.getInstance());
197         if(receiverOptions.isDynamic()) {
198             source.setDynamic(new Boolean(true));
199         }
200 
201         Target target = new Target();
202 
203         receiver.setSource(source);
204         receiver.setTarget(target);
205 
206         ProtonReceiverImpl r = new ProtonReceiverImpl(receiver);
207         //r.openHandler((result) -> {
208         //  LOG.trace("Receiver open completed");
209         //});
210         //r.closeHandler((result) -> {
211         //  if (result.succeeded()) {
212         //    LOG.trace("Receiver closed");
213         //  } else {
214         //    LOG.warn("Receiver closed with error", result.cause());
215         //  }
216         //});
217 
218         // Default to at-least-once
219         r.setQoS(ProtonQoS.AT_LEAST_ONCE);
220 
221         return r;
222     }
223 
224     
225     ProtonSender createSender(string address) {
226         return createSender(address, new ProtonLinkOptions());
227     }
228 
229     
230     ProtonSender createSender(string address, ProtonLinkOptions senderOptions) {
231         Sender sender = session.sender(getOrCreateLinkName(senderOptions));
232 
233         Symbol[] outcomes = [ Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL,
234                 Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL ];
235         Source source = new Source();
236         //source.setOutcomes(new ArrayList!Symbol(outcomes));
237 
238         Target target = new Target();
239         target.setAddress(address is null ? null : new String(address));
240         if(senderOptions.isDynamic()) {
241             target.setDynamic(new Boolean(true));
242         }
243 
244         sender.setSource(source);
245         sender.setTarget(target);
246 
247         ProtonSenderImpl s = new ProtonSenderImpl(sender);
248         if (address is null) {
249             s.setAnonymousSender(true);
250         }
251 
252         //s.openHandler((result) -> {
253         //  LOG.trace("Sender open completed");
254         //});
255         //s.closeHandler((result) -> {
256         //  if (result.succeeded()) {
257         //    LOG.trace("Sender closed");
258         //  } else {
259         //    LOG.warn("Sender closed with error", result.cause());
260         //  }
261         //});
262 
263         // Default to at-least-once
264         s.setQoS(ProtonQoS.AT_LEAST_ONCE);
265 
266         return s;
267     }
268 
269     
270     Record attachments() {
271         return session.attachments();
272     }
273 
274     
275     void free() {
276         session.free();
277         getConnectionImpl().flush();
278     }
279 
280     /////////////////////////////////////////////////////////////////////////////
281     //
282     // Implementation details hidden from api.
283     //
284     /////////////////////////////////////////////////////////////////////////////
285     void fireRemoteOpen() {
286         version(HUNT_DEBUG) {
287             string s = this.session.getConnection().getRemoteHostname();
288             warningf("The remote opened: %s", s);
289         }
290         if (_openHandler !is null)
291         {
292             _openHandler.handle(null);
293         }
294     }
295 
296     void fireRemoteClose() {
297         version(HUNT_DEBUG) {
298             string s = this.session.getConnection().getRemoteHostname();
299             warningf("The remote closed: %s", s);
300         }
301 
302         if (_closeHandler !is null) {
303             _closeHandler.handle(null);
304         }
305     }
306 
307 }