/*
 * Copyright 2016, 2017 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
module hunt.amqp.impl.ProtonClientImpl;

import hunt.amqp.ProtonClient;
import hunt.amqp.ProtonClientOptions;
import hunt.amqp.ProtonConnection;
import hunt.amqp.ProtonTransportOptions;
import hunt.amqp.impl.ProtonConnectionImpl;
import hunt.amqp.Handler;
import hunt.io.ByteBuffer;
import hunt.net.NetClient;
import hunt.net.NetUtil;
import hunt.net;
import hunt.logging;
import hunt.amqp.impl.ProtonSaslClientAuthenticatorImpl;
import hunt.amqp.ProtonReceiver;
import hunt.amqp.ProtonMessageHandler;
import hunt.amqp.ProtonDelivery;
import hunt.proton.message.Message;
import hunt.proton.amqp.messaging.Section;
import hunt.proton.amqp.messaging.AmqpValue;
import hunt.String;
import std.stdio;
import hunt.amqp.ProtonSender;
import hunt.amqp.ProtonHelper;
import core.thread;

/**
 * Network event handler that turns a freshly opened transport connection into
 * a ProtonConnectionImpl and starts SASL negotiation on it.
 *
 * Optional callbacks can be registered for the open, close and message events.
 *
 * @author <a href="http://tfox.org">Tim Fox</a>
 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
 */
class ConnectionEventBaseHandler : NetConnectionHandler {
    alias ConnCallBack = void delegate(Connection connection);
    alias MsgCallBack = void delegate(Connection connection, ByteBuffer message);

    private {
        ProtonClientOptions _options;
        string _host;
        string _username;
        string _password;
        NetClient _client;
        MsgCallBack _msgCallBack;
        ConnCallBack _openCallBack;
        ConnCallBack _closeCallBack;
        Handler!ProtonConnection _handler;
    }

    /**
     * Params:
     *   options = client options (virtual host, SASL mechanisms, heartbeat, max frame size)
     *   host    = host being connected to; used as the virtual host when the options define none
     *   client  = the net client this handler is installed on
     *   name    = user name handed to the SASL authenticator (may be null)
     *   pwd     = password handed to the SASL authenticator (may be null)
     *   handler = application handler handed to the SASL authenticator
     */
    this(ProtonClientOptions options, string host, NetClient client, string name,
            string pwd, Handler!ProtonConnection handler) {
        this._options = options;
        this._host = host;
        _client = client;
        _username = name;
        _password = pwd;
        _handler = handler;
    }

    /**
     * Builds the proton connection for the opened transport, binds it to the
     * net client together with a SASL authenticator and transport options,
     * flushes it, and finally notifies the registered open callback (if any).
     */
    override void connectionOpened(Connection connection) {
        string configuredHost = _options.getVirtualHost();
        string virtualHost = configuredHost !is null ? configuredHost : _host;
        ProtonConnectionImpl conn = new ProtonConnectionImpl(virtualHost, connection);

        ProtonSaslClientAuthenticatorImpl authenticator = new ProtonSaslClientAuthenticatorImpl(
                _username, _password, _options.getEnabledSaslMechanisms(), _handler);

        ProtonTransportOptions transportOptions = new ProtonTransportOptions();
        transportOptions.setHeartbeat(_options.getHeartbeat());
        transportOptions.setMaxFrameSize(_options.getMaxFrameSize());

        conn.bindClient(_client, authenticator, transportOptions);

        version (HUNT_DEBUG) {
            logInfof("Connection %d opened", connection.getId());
        }

        // Need to flush here to get the SASL process going, or it will wait until
        // calls on the connection are processed later (e.g. open()).
        conn.flush();

        if (_openCallBack !is null) {
            _openCallBack(connection);
        }
    }

    /** Forwards the close event to the callback registered via setOnClosed, if any. */
    override void connectionClosed(Connection connection) {
        if (_closeCallBack !is null) {
            _closeCallBack(connection);
        }
    }

    /**
     * Forwards a received payload to the callback registered via setOnMessage.
     *
     * Payloads that are not a ByteBuffer are dropped with a warning instead of
     * being passed on as a null reference. Anything thrown by the callback is
     * logged and not propagated.
     */
    override void messageReceived(Connection connection, Object message) {
        if (_msgCallBack is null) {
            return;
        }

        ByteBuffer buf = cast(ByteBuffer) message;
        if (buf is null) {
            warning("Ignoring received message that is not a ByteBuffer");
            return;
        }

        try {
            _msgCallBack(connection, buf);
        } catch (Throwable ex) {
            warning(ex.msg);
            version (HUNT_DEBUG) warning(ex);
        }
    }

    /** Logs an error reported for an established connection. */
    override void exceptionCaught(Connection connection, Throwable t) {
        reportFailure("Exception caught on connection", t);
    }

    /** Logs a failed outbound connection attempt. */
    override void failedOpeningConnection(int connectionId, Throwable t) {
        import std.conv : to;

        // NOTE(review): Handler!ProtonConnection offers no failure channel here, so the
        // application handler is not notified; the failure is only logged.
        reportFailure("Failed to open connection " ~ to!string(connectionId), t);
    }

    /** Logs a failed inbound connection attempt. */
    override void failedAcceptingConnection(int connectionId, Throwable t) {
        import std.conv : to;

        reportFailure("Failed to accept connection " ~ to!string(connectionId), t);
    }

    /** Registers the callback invoked at the end of connectionOpened. */
    void setOnConnection(ConnCallBack callback) {
        _openCallBack = callback;
    }

    /** Registers the callback invoked from connectionClosed. */
    void setOnClosed(ConnCallBack callback) {
        _closeCallBack = callback;
    }

    /** Registers the callback invoked from messageReceived. */
    void setOnMessage(MsgCallBack callback) {
        _msgCallBack = callback;
    }

    /// Logs `context`, followed by the throwable's message when one is available.
    private void reportFailure(string context, Throwable t) {
        if (t is null) {
            warning(context);
            return;
        }
        warning(context ~ ": " ~ t.msg);
        version (HUNT_DEBUG) warning(t);
    }
}

/**
 * ProtonClient implementation that creates a NetClient per connect call and
 * installs a ConnectionEventBaseHandler on it.
 *
 * Compared with the Vert.x original this port does not yet install a
 * disconnect handler and does not report connect failures to the application
 * handler.
 */
class ProtonClientImpl : ProtonClient {

    this() {
    }

    /** Connects without credentials using default client options. */
    public void connect(string host, int port, Handler!ProtonConnection handler) {
        connect(host, port, null, null, handler);
    }

    /** Connects with the given credentials using default client options. */
    public void connect(string host, int port, string username, string password,
            Handler!ProtonConnection handler) {
        connect(new ProtonClientOptions(), host, port, username, password, handler);
    }

    /** Connects without credentials using the given client options. */
    public void connect(ProtonClientOptions options, string host, int port,
            Handler!ProtonConnection handler) {
        connect(options, host, port, null, null, handler);
    }

    /**
     * Connects with the given credentials and client options.
     *
     * Params:
     *   options  = client options; default options are used when null
     *   host     = host to connect to
     *   port     = port to connect to
     *   username = SASL user name (may be null)
     *   password = SASL password (may be null)
     *   handler  = application handler passed on to the SASL authenticator
     */
    public void connect(ProtonClientOptions options, string host, int port,
            string username, string password, Handler!ProtonConnection handler) {
        // Fall back to defaults instead of dereferencing a null options object below.
        ProtonClientOptions effectiveOptions = options !is null ? options : new ProtonClientOptions();
        NetClient netClient = NetUtil.createNetClient(effectiveOptions);
        connectNetClient(netClient, host, port, username, password, effectiveOptions, handler);
    }

    /// Installs the connection event handler on `netClient` and starts connecting.
    private void connectNetClient(NetClient netClient, string host, int port, string username,
            string password, ProtonClientOptions options, Handler!ProtonConnection handler) {

        // Prefer the explicit SNI server name; otherwise use the virtual host (which may be null).
        string sniName = options.getSniServerName();
        string serverName = sniName !is null ? sniName : options.getVirtualHost();

        netClient.setHandler(new ConnectionEventBaseHandler(options, host,
                netClient, username, password, handler));
        netClient.connect(host, port, serverName);
    }
}