1 /*
2  * hunt-amqp: AMQP library for D programming language, based on hunt-net.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.impl.ProtonConnectionImpl;
12 
13 import hunt.Exceptions;
14 import hunt.amqp.ProtonConnection;
15 import hunt.amqp.ProtonHelper;
16 import hunt.amqp.ProtonLinkOptions;
17 import hunt.amqp.ProtonReceiver;
18 import hunt.amqp.ProtonSender;
19 import hunt.amqp.ProtonSession;
20 import hunt.amqp.ProtonTransportOptions;
21 import hunt.amqp.sasl.ProtonSaslAuthenticator;
22 
23 import hunt.logging;
24 import hunt.proton.Proton;
25 import hunt.proton.amqp.Symbol;
26 import hunt.proton.amqp.transport.ErrorCondition;
27 import hunt.proton.engine.Connection;
28 import hunt.proton.engine.EndpointState;
29 import hunt.proton.engine.Link;
30 import hunt.proton.engine.Receiver;
31 import hunt.proton.engine.Record;
32 import hunt.proton.engine.Sender;
33 import hunt.proton.engine.Session;
34 import hunt.amqp.impl.ProtonSenderImpl;
35 import hunt.amqp.impl.ProtonReceiverImpl;
36 
37 import hunt.collection.ArrayList;
38 import hunt.collection.LinkedHashMap;
39 import hunt.collection.List;
40 import hunt.collection.Map;
41 import hunt.net.NetClient;
42 
43 
44 import hunt.amqp.impl.ProtonTransport;
45 import hunt.amqp.Handler;
46 import std.concurrency : initOnce;
47 import std.uuid;
48 import std.random;
49 import hunt.amqp.impl.ProtonMetaDataSupportImpl;
50 import hunt.amqp.impl.ProtonSessionImpl;
51 import hunt.net.Connection;
52 import hunt.amqp.impl.ProtonSaslClientAuthenticatorImpl;
53 import hunt.Object;
54 import hunt.String;
55 
56 /**
57  * @author <a href="http://tfox.org">Tim Fox</a>
58  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
59  */
60 class ProtonConnectionImpl : ProtonConnection {
61 
62     // static  Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
63 
64     static Symbol ANONYMOUS_RELAY() {
65         __gshared Symbol inst;
66         return initOnce!inst(Symbol.valueOf("ANONYMOUS-RELAY"));
67     }
68 
69     private hunt.proton.engine.Connection.Connection connection; //= Proton.connection();
70     private ProtonTransport transport;
71     private Handler!(Void)[] endHandlers;
72 
73     private Handler!ProtonConnection _openHandler; //= (result) -> {
74     //  LOG.trace("Connection open completed");
75     //};
76     private AsyncResultHandler!ProtonConnection _closeHandler;
77 
78     private AmqpEventHandler!ProtonConnection _disconnectHandler; 
79     //
80     private AmqpEventHandler!ProtonSession _sessionOpenHandler;
81 
82     private Handler!ProtonSender _senderOpenHandler; //= (sender) -> {
83     //  sender.setCondition(new ErrorCondition(Symbol.getSymbol("Not Supported"), ""));
84     //};
85     private Handler!ProtonReceiver _receiverOpenHandler; //= (receiver) -> {
86     //  receiver.setCondition(new ErrorCondition(Symbol.getSymbol("Not Supported"), ""));
87     //};
88     private bool anonymousRelaySupported;
89     private ProtonSession defaultSession;
90     private hunt.net.Connection.Connection _conn;
91 
92     this(string hostname, hunt.net.Connection.Connection conn) {
93         _closeHandler = (result) {
94             if (result.succeeded()) {
95                 trace("Connection closed");
96             } else {
97                 warning("Connection closed with error", result.cause());
98             }
99         };
100 
101         _disconnectHandler = (connection) {
102             trace("Connection disconnected");
103         };
104 
105         _sessionOpenHandler = (session) {
106             session.setCondition(new ErrorCondition(Symbol.getSymbol("Not Supported"), new String("")));
107         };
108 
109         this.connection = Proton.connection();
110         this.connection.setContext(this);
111         string tmp = "vert.x-";
112         tmp ~= randomUUID().toString();
113         this.connection.setContainer(tmp);
114         this.connection.setHostname(hostname);
115         this._conn = conn;
116         Map!(Symbol, Object) props = createInitialPropertiesMap();
117         connection.setProperties(props);
118     }
119 
120     private Map!(Symbol, Object) createInitialPropertiesMap() {
121         Map!(Symbol, Object) props = new LinkedHashMap!(Symbol, Object)();
122         props.put(ProtonMetaDataSupportImpl.PRODUCT_KEY, new String("vertx-proton"));
123         props.put(ProtonMetaDataSupportImpl.VERSION_KEY, new String("version"));
124         return props;
125     }
126 
127     /////////////////////////////////////////////////////////////////////////////
128     //
129     // Delegated state tracking
130     //
131     /////////////////////////////////////////////////////////////////////////////
132 
133     ProtonConnectionImpl setProperties(Map!(Symbol, Object) properties) {
134         Map!(Symbol, Object) newProps = null;
135         if (properties !is null) {
136             newProps = createInitialPropertiesMap();
137             newProps.putAll(properties);
138         }
139 
140         connection.setProperties(newProps);
141         return this;
142     }
143 
144     ProtonConnectionImpl setOfferedCapabilities(Symbol[] capabilities) {
145         connection.setOfferedCapabilities(capabilities);
146         return this;
147     }
148 
149     ProtonConnectionImpl setHostname(string hostname) {
150         connection.setHostname(hostname);
151         return this;
152     }
153 
154     ProtonConnectionImpl setDesiredCapabilities(Symbol[] capabilities) {
155         connection.setDesiredCapabilities(capabilities);
156         return this;
157     }
158 
159     ProtonConnectionImpl setContainer(string container) {
160         connection.setContainer(container);
161         return this;
162     }
163 
164     ProtonConnectionImpl setCondition(ErrorCondition condition) {
165         connection.setCondition(condition);
166         return this;
167     }
168 
169     ErrorCondition getCondition() {
170         return connection.getCondition();
171     }
172 
173     string getContainer() {
174         return connection.getContainer();
175     }
176 
177     string getHostname() {
178         return connection.getHostname();
179     }
180 
181     EndpointState getLocalState() {
182         return connection.getLocalState();
183     }
184 
185     ErrorCondition getRemoteCondition() {
186         return connection.getRemoteCondition();
187     }
188 
189     string getRemoteContainer() {
190         return connection.getRemoteContainer();
191     }
192 
193     Symbol[] getRemoteDesiredCapabilities() {
194         return connection.getRemoteDesiredCapabilities();
195     }
196 
197     string getRemoteHostname() {
198         return connection.getRemoteHostname();
199     }
200 
201     Symbol[] getRemoteOfferedCapabilities() {
202         return connection.getRemoteOfferedCapabilities();
203     }
204 
205     Map!(Symbol, Object) getRemoteProperties() {
206         return connection.getRemoteProperties();
207     }
208 
209     EndpointState getRemoteState() {
210         return connection.getRemoteState();
211     }
212 
213     bool isAnonymousRelaySupported() {
214         return anonymousRelaySupported;
215     }
216 
217     Record attachments() {
218         return connection.attachments();
219     }
220 
221     /////////////////////////////////////////////////////////////////////////////
222     //
223     // Handle/Trigger connection level state changes
224     //
225     /////////////////////////////////////////////////////////////////////////////
226 
227     ProtonConnection open() {
228         version (HUNT_AMQP_DEBUG)
229             logInfo("open-------");
230         connection.open();
231         flush();
232         return this;
233     }
234 
235     ProtonConnection close() {
236         connection.close();
237         flush();
238         return this;
239     }
240 
241     ProtonSessionImpl createSession() {
242         return new ProtonSessionImpl(connection.session());
243     }
244 
245     private ProtonSession getDefaultSession() {
246         if (defaultSession is null) {
247             defaultSession = createSession();
248             //defaultSession.closeHandler(result -> {
249             //  string msg = "The connections default session closed unexpectedly";
250             //  if (!result.succeeded()) {
251             //    msg += ": ";
252             //    msg += ": " + string.valueOf(result.cause());
253             //  }
254             //  Future<ProtonConnection> failure = Future.failedFuture(msg);
255             //  Handler<AsyncResult<ProtonConnection>> connCloseHandler = closeHandler;
256             //  if (connCloseHandler !is null) {
257             //    connCloseHandler.handle(failure);
258             //  }
259             //});
260 
261             //defaultSession.closeHandler ( new class Handler!ProtonSession {
262             //  void handle(ProtonSession var1)
263             //      {
264             //        string msg = "The connections default session closed unexpectedly";
265             //          if (closeHandler !is null) {
266             //            closeHandler.handle();
267             //          }
268             //      }
269             //  } );
270 
271             defaultSession.open();
272             // Deliberately not flushing, the sender/receiver open
273             // call will do that (if it doesn't happen otherwise).
274         }
275         return defaultSession;
276     }
277 
278     ProtonSender createSender(string address) {
279         return getDefaultSession().createSender((address));
280     }
281 
282     ProtonSender createSender(string address, ProtonLinkOptions senderOptions) {
283         return getDefaultSession().createSender((address), senderOptions);
284     }
285 
286     ProtonReceiver createReceiver(string address) {
287         return getDefaultSession().createReceiver((address));
288     }
289 
290     ProtonReceiver createReceiver(string address, ProtonLinkOptions receiverOptions) {
291         return getDefaultSession().createReceiver((address), receiverOptions);
292     }
293 
294     void flush() {
295         if (transport !is null) {
296             version (HUNT_AMQP_DEBUG)
297                 logInfo("transport flush");
298             transport.flush();
299         }
300     }
301 
302     void disconnect() {
303         if (transport !is null) {
304             transport.disconnect();
305         }
306     }
307 
308     bool isDisconnected() {
309         return transport is null;
310     }
311 
312     ProtonConnection openHandler(Handler!ProtonConnection openHandler) {
313         //implementationMissing(false);
314         this._openHandler = openHandler;
315         return this;
316     }
317 
318     ProtonConnection closeHandler(AsyncResultHandler!ProtonConnection closeHandler) {
319         this._closeHandler = closeHandler;
320         return this;
321     }
322 
323     ProtonConnection disconnectHandler(AmqpEventHandler!ProtonConnection disconnectHandler) {
324         this._disconnectHandler = disconnectHandler;
325         return this;
326     }
327 
328     ProtonConnection sessionOpenHandler(AmqpEventHandler!ProtonSession remoteSessionOpenHandler) {
329         this._sessionOpenHandler = remoteSessionOpenHandler;
330         return this;
331     }
332 
333     ProtonConnection senderOpenHandler(Handler!ProtonSender remoteSenderOpenHandler) {
334         //implementationMissing(false);
335         this._senderOpenHandler = remoteSenderOpenHandler;
336         return this;
337     }
338 
339     ProtonConnection receiverOpenHandler(Handler!ProtonReceiver remoteReceiverOpenHandler) {
340         //implementationMissing(false);
341         this._receiverOpenHandler = remoteReceiverOpenHandler;
342         return this;
343     }
344 
345     /////////////////////////////////////////////////////////////////////////////
346     //
347     // Implementation details hidden from api.
348     //
349     /////////////////////////////////////////////////////////////////////////////
350 
351     private void processCapabilities() {
352         Symbol[] capabilities = getRemoteOfferedCapabilities();
353         anonymousRelaySupported = true;
354         if (capabilities.length != 0) {
355             List!Symbol list = new ArrayList!Symbol(capabilities);
356             if (list.contains(ANONYMOUS_RELAY)) {
357                 anonymousRelaySupported = true;
358             }
359         }
360     }
361 
362     void fireRemoteOpen() {
363         processCapabilities();
364 
365         if (_openHandler !is null) {
366             _openHandler.handle(this);
367         }
368     }
369 
370     void fireRemoteClose() {
371         version(HUNT_DEBUG) info("Remote closed");
372         if (_closeHandler !is null) {
373             _closeHandler(ProtonHelper.future!(ProtonConnection)(this, getRemoteCondition()));
374         }
375     }
376 
377     void fireDisconnect() {
378         version(HUNT_DEBUG) info("Disconnecting...");
379         transport = null;
380         if (_disconnectHandler !is null) {
381             _disconnectHandler(this);
382         }
383 
384         foreach(Handler!Void handler; endHandlers) {
385             handler.handle(null);
386         }
387 
388         endHandlers = null;
389     }
390 
391     void bindClient(NetClient client, ProtonSaslClientAuthenticatorImpl authenticator,
392             ProtonTransportOptions transportOptions) {
393         transport = new ProtonTransport(connection, client, _conn,
394                 authenticator, transportOptions);
395     }
396 
397     //void bindServer(NetSocket socket, ProtonSaslAuthenticator authenticator, ProtonTransportOptions transportOptions) {
398     //  transport = new ProtonTransport(connection, vertx, null, socket, authenticator, transportOptions);
399     //}
400 
401     void fireRemoteSessionOpen(hunt.proton.engine.Session.Session session) {
402         implementationMissing(false);
403         //if (sessionOpenHandler !is null) {
404         //  sessionOpenHandler.handle(new ProtonSessionImpl(session));
405         //}
406     }
407 
408     void fireRemoteLinkOpen(Link link) {
409         if (cast(Sender) link !is null) {
410             if (_senderOpenHandler !is null) {
411                 _senderOpenHandler.handle(new ProtonSenderImpl(cast(Sender) link));
412             }
413         } else {
414             if (_receiverOpenHandler !is null) {
415                 _receiverOpenHandler.handle(new ProtonReceiverImpl(cast(Receiver) link));
416             }
417         }
418     }
419 
420     void addEndHandler(Handler!Void handler) {
421         endHandlers ~= handler;
422     }
423 
424     hunt.net.Connection.Connection getContext() {
425         return _conn;
426     }
427 }