1 /*
2 * Copyright 2016, 2017 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 module hunt.amqp.impl.ProtonClientImpl;
17 
18 import hunt.amqp.ProtonClient;
19 import hunt.amqp.ProtonClientOptions;
20 import hunt.amqp.ProtonConnection;
21 import hunt.amqp.ProtonTransportOptions;
22 import hunt.amqp.impl.ProtonConnectionImpl;
23 import hunt.amqp.Handler;
24 import hunt.io.ByteBuffer;
25 import hunt.net.NetClient;
26 import hunt.net.NetUtil;
27 import hunt.net;
28 import hunt.logging;
29 import hunt.amqp.impl.ProtonSaslClientAuthenticatorImpl;
30 import hunt.amqp.ProtonReceiver;
31 import hunt.amqp.ProtonMessageHandler;
32 import hunt.amqp.ProtonDelivery;
33 import hunt.proton.message.Message;
34 import hunt.proton.amqp.messaging.Section;
35 import hunt.proton.amqp.messaging.AmqpValue;
36 import hunt.String;
37 import std.stdio;
38 import hunt.amqp.ProtonSender;
39 import hunt.amqp.ProtonHelper;
40 import core.thread;
41 
42 /**
43  * @author <a href="http://tfox.org">Tim Fox</a>
44  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
45  */
46 
47 class ConnectionEventBaseHandler : NetConnectionHandler {
48     alias ConnCallBack = void delegate(Connection connection);
49     alias MsgCallBack = void delegate(Connection connection, ByteBuffer message);
50 
51     private {
52         ProtonClientOptions _options;
53         string _host;
54         string _username;
55         string _password;
56         NetClient _client;
57         MsgCallBack _msgCallBack;
58         ConnCallBack _closeCallBack;
59         Handler!ProtonConnection _handler;
60     }
61 
62     this(ProtonClientOptions options, string host, NetClient client, string name,
63             string pwd, Handler!ProtonConnection handler) {
64         this._options = options;
65         this._host = host;
66         _client = client;
67         _username = name;
68         _password = pwd;
69         _handler = handler;
70     }
71 
72     override void connectionOpened(Connection connection) {
73         string virtualHost = _options.getVirtualHost() !is null ? _options.getVirtualHost() : _host;
74         ProtonConnectionImpl conn = new ProtonConnectionImpl(virtualHost, connection);
75 
76         ProtonSaslClientAuthenticatorImpl authenticator = new ProtonSaslClientAuthenticatorImpl(_username,
77                 _password, _options.getEnabledSaslMechanisms(), _handler);
78 
79         ProtonTransportOptions transportOptions = new ProtonTransportOptions();
80         transportOptions.setHeartbeat(_options.getHeartbeat());
81         transportOptions.setMaxFrameSize(_options.getMaxFrameSize());
82 
83         conn.bindClient(_client, authenticator, transportOptions);
84 
85         // Need to flush here to get the SASL process going, or it will wait until calls on the connection are processed
86         // later (e.g open()).
87         version (HUNT_DEBUG)
88             logInfof("Connection %d opened", connection.getId());
89         conn.flush();
90 
91         // Thread.sleep(1.seconds);
92 
93         // conn.open();
94         //string address = "queue://foo";
95         //
96         //conn.createReceiver(address).handler(
97         //    new class ProtonMessageHandler {
98         //        void handle(ProtonDelivery delivery, Message message)
99         //        {
100         //              Section bd = message.getBody();
101         //   // if (body instanceof AmqpValue) {
102         //              String content = (cast(AmqpValue) bd).getValue();
103         //              writefln("Received message with content: %s " , cast(string)(content.getBytes()));
104         //    //}
105         //        }
106         //    }
107         //).open();
108         //
109         //
110         //ProtonSender sender = conn.createSender(null);
111         //Message message = ProtonHelper.message(new String(address), new String("Hello World from client"));
112         //sender.open();
113         //sender.send(message, new class Handler!ProtonDelivery
114         //{
115         //    void handle(ProtonDelivery var1)
116         //    {
117         //        writefln("The message was received by the server: remote state=" );
118         //    }
119         //});
120 
121         //private static void helloWorldSendAndConsumeExample(ProtonConnection connection) {
122         //  connection.open();
123         //
124         //  // Receive messages from queue "foo" (using an ActiveMQ style address as example).
125         //  String address = "queue://foo";
126         //
127         //  connection.createReceiver(address).handler((delivery, msg) -> {
128         //    Section body = msg.getBody();
129         //    if (body instanceof AmqpValue) {
130         //      String content = (String) ((AmqpValue) body).getValue();
131         //      System.out.println("Received message with content: " + content);
132         //    }
133         //    // By default, the receiver automatically accepts (and settles) the delivery
134         //    // when the handler returns, if no other disposition has been applied.
135         //    // To change this and always manage dispositions yourself, use the
136         //    // setAutoAccept method on the receiver.
137         //  }).open();
138         //
139         //  // Create an anonymous (no address) sender, have the message carry its destination
140         //  ProtonSender sender = connection.createSender(null);
141         //
142         //  // Create a message to send, have it carry its destination for use with the anonymous sender
143         //  Message message = message(address, "Hello World from client");
144         //
145         //  // Can optionally add an openHandler or sendQueueDrainHandler
146         //  // to await remote sender open completing or credit to send being
147         //  // granted. But here we will just buffer the send immediately.
148         //  sender.open();
149         //  System.out.println("Sending message to server");
150         //  sender.send(message, delivery -> {
151         //	System.out.println(String.format("The message was received by the server: remote state=%s, remotely settled=%s",
152         //	delivery.getRemoteState(), delivery.remotelySettled()));
153         //  });
154         //}
155 
156     }
157 
158     override void connectionClosed(Connection connection) {
159         if (_closeCallBack !is null) {
160             _closeCallBack(connection);
161         }
162     }
163 
164     override void messageReceived(Connection connection, Object message) { //ByteBuffer {
165         ByteBuffer buf = cast(ByteBuffer) message;
166         if (_msgCallBack !is null) {
167             try {
168                 _msgCallBack(connection, buf);
169             } catch(Throwable ex) {
170                 warning(ex.msg);
171                 version(HUNT_DEBUG) warning(ex);
172             }
173         }
174     }
175 
176     override void exceptionCaught(Connection connection, Throwable t) {
177     }
178 
179     override void failedOpeningConnection(int connectionId, Throwable t) {
180     }
181 
182     override void failedAcceptingConnection(int connectionId, Throwable t) {
183     }
184 
185     void setOnConnection(ConnCallBack callback) {
186 
187     }
188 
189     void setOnClosed(ConnCallBack callback) {
190         _closeCallBack = callback;
191     }
192 
193     void setOnMessage(MsgCallBack callback) {
194         _msgCallBack = callback;
195     }
196 
197 }
198 
199 /**
200  * 
201  */
202 class ProtonClientImpl : ProtonClient {
203 
204     this() {
205     }
206 
207     public void connect(string host, int port, Handler!ProtonConnection handler) {
208         connect(host, port, null, null, handler);
209     }
210 
211     public void connect(string host, int port, string username, string password,
212             Handler!ProtonConnection handler) {
213         connect(new ProtonClientOptions(), host, port, username, password, handler);
214     }
215 
216     public void connect(ProtonClientOptions options, string host, int port,
217             Handler!ProtonConnection handler) {
218         connect(options, host, port, null, null, handler);
219     }
220 
221     public void connect(ProtonClientOptions options, string host, int port,
222             string username, string password, Handler!ProtonConnection handler) {
223         NetClient netClient = NetUtil.createNetClient(options);
224         connectNetClient(netClient, host, port, username, password, options, handler);
225     }
226 
227     private void connectNetClient(NetClient netClient, string host, int port, string username,
228             string password, ProtonClientOptions options, Handler!ProtonConnection handler) {
229 
230         string serverName = options.getSniServerName() !is null ? options.getSniServerName() : (
231                 options.getVirtualHost() !is null ? options.getVirtualHost() : null);
232 
233         netClient.setHandler(new ConnectionEventBaseHandler(options, host,
234                 netClient, username, password, handler));
235         netClient.connect(host, port, serverName);
236 
237         //netClient.connect(port, host, serverName, res -> {
238         //  if (res.succeeded()) {
239         //    String virtualHost = options.getVirtualHost() !is null ? options.getVirtualHost() : host;
240         //    ProtonConnectionImpl conn = new ProtonConnectionImpl(vertx, virtualHost, (ContextInternal) Vertx.currentContext());
241         //    conn.disconnectHandler(h -> {
242         //      LOG.trace("Connection disconnected");
243         //      if(!connectHandler.isComplete()) {
244         //        connectHandler.handle(Future.failedFuture(new VertxException("Disconnected")));
245         //      }
246         //    });
247         //
248         //    ProtonSaslClientAuthenticatorImpl authenticator = new ProtonSaslClientAuthenticatorImpl(username, password,
249         //            options.getEnabledSaslMechanisms(), connectHandler);
250         //
251         //    ProtonTransportOptions transportOptions = new ProtonTransportOptions();
252         //    transportOptions.setHeartbeat(options.getHeartbeat());
253         //    transportOptions.setMaxFrameSize(options.getMaxFrameSize());
254         //
255         //    conn.bindClient(netClient, res.result(), authenticator, transportOptions);
256         //
257         //    // Need to flush here to get the SASL process going, or it will wait until calls on the connection are processed
258         //    // later (e.g open()).
259         //    conn.flush();
260         //  } else {
261         //    connectHandler.handle(Future.failedFuture(res.cause()));
262         //  }
263         //});
264     }
265 
266     //static class ConnectCompletionHandler implements Handler<AsyncResult<ProtonConnection>> {
267     //  private AtomicBoolean completed = new AtomicBoolean();
268     //  private Handler<AsyncResult<ProtonConnection>> applicationConnectHandler;
269     //  private NetClient netClient;
270     //
271     //  ConnectCompletionHandler(Handler<AsyncResult<ProtonConnection>> applicationConnectHandler, NetClient netClient) {
272     //    this.applicationConnectHandler = Objects.requireNonNull(applicationConnectHandler);
273     //    this.netClient = Objects.requireNonNull(netClient);
274     //  }
275     //
276     //  public boolean isComplete() {
277     //    return completed.get();
278     //  }
279     //
280     //  @Override
281     //  public void handle(AsyncResult<ProtonConnection> event) {
282     //    if (completed.compareAndSet(false, true)) {
283     //      if (event.failed()) {
284     //        netClient.close();
285     //      }
286     //      applicationConnectHandler.handle(event);
287     //    }
288     //  }
289     //}
290 }