1 /* 2 * Copyright 2016, 2017 the original author or authors. 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 module hunt.amqp.impl.ProtonTransport; 17 18 import hunt.net.NetClient; 19 import hunt.amqp.ProtonReceiver; 20 import hunt.amqp.ProtonSender; 21 import hunt.net.buffer.ByteBuf; 22 import hunt.io.ByteBuffer; 23 // import hunt.io.Buffer; 24 import hunt.amqp.ProtonConnection; 25 import hunt.amqp.ProtonTransportOptions; 26 import hunt.amqp.sasl.ProtonSaslAuthenticator; 27 import hunt.proton.Proton; 28 import hunt.proton.engine.BaseHandler; 29 import hunt.proton.engine.Collector; 30 import hunt.proton.engine.Connection; 31 import hunt.proton.engine.EndpointState; 32 import hunt.proton.engine.Event; 33 import hunt.proton.engine.Transport; 34 import hunt.proton.engine.impl.TransportInternal; 35 import hunt.amqp.impl.ProtonClientImpl; 36 import hunt.net.Connection; 37 import hunt.net.buffer.WrappedByteBuf; 38 import hunt.amqp.impl.ProtonConnectionImpl; 39 import hunt.amqp.impl.ProtonSessionImpl; 40 import hunt.amqp.impl.ProtonLinkImpl; 41 import hunt.amqp.impl.ProtonDeliveryImpl; 42 import hunt.amqp.impl.ProtonReceiverImpl; 43 import hunt.net.buffer.Unpooled; 44 import hunt.Exceptions; 45 import hunt.logging; 46 import std.algorithm; 47 import hunt.amqp.Handler; 48 import std.stdio; 49 import hunt.time.LocalDateTime; 50 import hunt.util.DateTime; 51 // import hunt.io.BufferUtils; 52 import hunt.concurrency.ScheduledThreadPoolExecutor; 53 
import hunt.concurrency.Executors;
import hunt.concurrency.ExecutorService;
import hunt.concurrency.Scheduler;
import hunt.concurrency.Delayed;
import hunt.util.Common;
import core.time;
import std.concurrency : initOnce;
import hunt.time.LocalDateTime;
import hunt.util.Runnable;

/**
 * @author <a href="http://tfox.org">Tim Fox</a>
 * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
 */

alias NetSocket = hunt.net.Connection;
alias ConnCallBack = void delegate(NetSocket.Connection connection);
alias MsgCallBack = void delegate(NetSocket.Connection connection, ByteBuffer message);

/**
 * Holder for the scheduler that transports use for their periodic
 * idle-timeout checks.
 */
struct CommonUtil {

    /**
     * Returns the shared scheduler. The pool (5 threads) is created through
     * `initOnce` the first time this is called.
     */
    static ScheduledThreadPoolExecutor scheduler() {
        return initOnce!_scheduler(
                cast(ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(5));
    }

    private __gshared ScheduledThreadPoolExecutor _scheduler;

    /**
     * Shuts the shared scheduler down. Does nothing if it was never created.
     */
    static void stopScheduler() {
        if (_scheduler !is null) {
            _scheduler.shutdown();
        }
    }
}

/**
 * Glue between a network socket and the Proton engine: bytes received on the
 * socket are pushed into the Proton transport, the resulting engine events are
 * dispatched to the hunt.amqp wrapper objects stored as contexts, and any
 * output produced by the engine is written back to the socket.
 */
class ProtonTransport : BaseHandler {
    /// Frame size used when the options report a max frame size of 0 (32kb).
    private enum int DEFAULT_MAX_FRAME_SIZE = 32 * 1024;

    private hunt.proton.engine.Connection.Connection connection;
    private NetClient netClient;
    private hunt.net.Connection.Connection socket;
    private Transport transport;
    private Collector collector;
    private ProtonSaslAuthenticator authenticator;
    private ScheduledThreadPoolExecutor executor;

    /// Handle of the periodic idle-timeout check; null while none is scheduled.
    private ScheduledFuture!(void) idleCheckFuture;

    /// Set once the engine reported a transport error or input processing threw.
    private bool failed;

    /**
     * Creates the transport, binds it to the given Proton connection and
     * registers the close/message callbacks on the socket's handler.
     *
     * Params:
     *   connection    = Proton engine connection to bind to
     *   netClient     = client owning the socket; may be null, in which case
     *                   the socket itself is closed on disconnect
     *   socket        = network connection used for reading and writing
     *   authenticator = SASL authenticator, or null when no SASL processing is wanted
     *   options       = supplies the max frame size and the heartbeat
     */
    this(hunt.proton.engine.Connection.Connection connection, NetClient netClient,
            hunt.net.Connection.Connection socket,
            ProtonSaslAuthenticator authenticator, ProtonTransportOptions options) {
        this.transport = Proton.transport();
        this.collector = Proton.collector();
        this.connection = connection;
        this.netClient = netClient;
        this.socket = socket;

        auto configuredFrameSize = options.getMaxFrameSize();
        int maxFrameSize = configuredFrameSize == 0
            ? DEFAULT_MAX_FRAME_SIZE : configuredFrameSize;
        transport.setMaxFrameSize(maxFrameSize);
        transport.setOutboundFrameSizeLimit(maxFrameSize);
        transport.setEmitFlowEventOnSend(false); // TODO: make configurable
        transport.setIdleTimeout(2 * options.getHeartbeat());
        (cast(TransportInternal) transport).setUseReadOnlyOutputBuffer(false);

        if (authenticator !is null) {
            authenticator.init(this.socket,
                    cast(ProtonConnection)(this.connection.getContext()), transport);
        }
        this.authenticator = authenticator;

        transport.bind(connection);
        connection.collect(collector);

        auto socketHandler = cast(ConnectionEventBaseHandler)(socket.getHandler());
        socketHandler.setOnClosed(&this.handleSocketEnd);
        socketHandler.setOnMessage(&this.handleSocketBuffer);
    }

    /**
     * Socket-closed callback: stops the idle-timeout check, tears down the
     * Proton transport, closes the client (or the socket when there is no
     * client) and notifies the connection wrapper of the disconnect.
     */
    private void handleSocketEnd(NetSocket.Connection arg) {
        // Without this the periodic check would keep running after the socket is gone.
        cancelIdleTimeoutChecks();

        transport.unbind();
        transport.close();
        if (this.netClient !is null) {
            this.netClient.close();
        } else {
            this.socket.close();
        }
        (cast(ProtonConnectionImpl) this.connection.getContext()).fireDisconnect();
    }

    /**
     * Socket-message callback: feeds the received bytes to the engine,
     * dispatches every event the collector holds, runs SASL processing if it
     * is still pending, flushes engine output and disconnects after a failure.
     */
    private void handleSocketBuffer(hunt.net.Connection.Connection connection, ByteBuffer buff) {
        pumpInbound(buff);

        // Ordinals of the event types that are acted upon. All other event
        // types (the *_INIT, *_LOCAL_*, *_FINAL, CONNECTION_BOUND/UNBOUND ones,
        // etc.) fall through to `default` and are ignored.
        enum CONNECTION_REMOTE_OPEN = AmqpEventType.CONNECTION_REMOTE_OPEN.ordinal;
        enum CONNECTION_REMOTE_CLOSE = AmqpEventType.CONNECTION_REMOTE_CLOSE.ordinal;
        enum SESSION_REMOTE_OPEN = AmqpEventType.SESSION_REMOTE_OPEN.ordinal;
        enum SESSION_REMOTE_CLOSE = AmqpEventType.SESSION_REMOTE_CLOSE.ordinal;
        enum LINK_REMOTE_OPEN = AmqpEventType.LINK_REMOTE_OPEN.ordinal;
        enum LINK_REMOTE_DETACH = AmqpEventType.LINK_REMOTE_DETACH.ordinal;
        enum LINK_REMOTE_CLOSE = AmqpEventType.LINK_REMOTE_CLOSE.ordinal;
        enum LINK_FLOW = AmqpEventType.LINK_FLOW.ordinal;
        enum DELIVERY = AmqpEventType.DELIVERY.ordinal;
        enum TRANSPORT_ERROR = AmqpEventType.TRANSPORT_ERROR.ordinal;

        Event protonEvent = null;
        while ((protonEvent = collector.peek()) !is null) {
            ProtonConnectionImpl conn = cast(ProtonConnectionImpl) protonEvent.getConnection()
                .getContext();

            Type eventType = protonEvent.getType();
            int type = eventType.ordinal;

            version (HUNT_AMQP_DEBUG) {
                if (eventType != (Type.TRANSPORT)) {
                    warningf("New Proton Event: %s, ordinal: %d", eventType.toString(), type);
                }
            }

            switch (type) {
            case CONNECTION_REMOTE_OPEN: {
                    conn.fireRemoteOpen();
                    initiateIdleTimeoutChecks();
                    break;
                }

            case CONNECTION_REMOTE_CLOSE: {
                    conn.fireRemoteClose();
                    break;
                }

            case SESSION_REMOTE_OPEN: {
                    // No wrapper yet means the peer initiated the session.
                    if (auto session = cast(ProtonSessionImpl) protonEvent.getSession()
                            .getContext()) {
                        session.fireRemoteOpen();
                    } else {
                        conn.fireRemoteSessionOpen(protonEvent.getSession());
                    }
                    break;
                }

            case SESSION_REMOTE_CLOSE: {
                    if (auto session = cast(ProtonSessionImpl) protonEvent.getSession()
                            .getContext()) {
                        session.fireRemoteClose();
                    } else {
                        warningf("Ignoring %s: session has no context", eventType.toString());
                    }
                    break;
                }

            case LINK_REMOTE_OPEN: {
                    // The link wrapper is a template instance, so both
                    // instantiations have to be tried before concluding that
                    // the link was opened by the peer.
                    auto linkContext = protonEvent.getLink().getContext();
                    if (auto receiverLink = cast(ProtonLinkImpl!ProtonReceiver) linkContext) {
                        receiverLink.fireRemoteOpen();
                    } else if (auto senderLink = cast(ProtonLinkImpl!ProtonSender) linkContext) {
                        senderLink.fireRemoteOpen();
                    } else {
                        conn.fireRemoteLinkOpen(protonEvent.getLink());
                    }
                    break;
                }

            case LINK_REMOTE_DETACH: {
                    auto linkContext = protonEvent.getLink().getContext();
                    if (auto receiverLink = cast(ProtonLinkImpl!ProtonReceiver) linkContext) {
                        receiverLink.fireRemoteDetach();
                    } else if (auto senderLink = cast(ProtonLinkImpl!ProtonSender) linkContext) {
                        senderLink.fireRemoteDetach();
                    } else {
                        warningf("Ignoring %s: link has no usable context", eventType.toString());
                    }
                    break;
                }

            case LINK_REMOTE_CLOSE: {
                    auto linkContext = protonEvent.getLink().getContext();
                    if (auto receiverLink = cast(ProtonLinkImpl!ProtonReceiver) linkContext) {
                        receiverLink.fireRemoteClose();
                    } else if (auto senderLink = cast(ProtonLinkImpl!ProtonSender) linkContext) {
                        senderLink.fireRemoteClose();
                    } else {
                        warningf("Ignoring %s: link has no usable context", eventType.toString());
                    }
                    break;
                }

            case LINK_FLOW: {
                    auto linkContext = protonEvent.getLink().getContext();
                    if (auto receiverLink = cast(ProtonLinkImpl!ProtonReceiver) linkContext) {
                        receiverLink.handleLinkFlow();
                    } else if (auto senderLink = cast(ProtonLinkImpl!ProtonSender) linkContext) {
                        senderLink.handleLinkFlow();
                    } else {
                        warningf("Ignoring %s: link has no usable context", eventType.toString());
                    }
                    break;
                }

            case DELIVERY: {
                    // A delivery that already has a wrapper gets an update;
                    // otherwise the event is handed to the receiving link.
                    if (auto delivery = cast(ProtonDeliveryImpl) protonEvent.getDelivery()
                            .getContext()) {
                        delivery.fireUpdate();
                    } else if (auto receiver = cast(ProtonReceiverImpl) protonEvent.getLink()
                            .getContext()) {
                        receiver.onDelivery();
                    } else {
                        warningf("Ignoring %s: link context is not a receiver",
                                eventType.toString());
                    }
                    break;
                }

            case TRANSPORT_ERROR: {
                    failed = true;
                    break;
                }

            default:
                break;
            }

            collector.pop();
        }

        if (!failed) {
            processSaslAuthentication();
        }

        flush();

        if (failed) {
            disconnect();
        }
    }

    /**
     * Lets the authenticator make progress. Once it reports completion the
     * authenticator reference is dropped so later calls return immediately.
     */
    private void processSaslAuthentication() {
        if (authenticator is null) {
            return;
        }

        // dfmt off
        authenticator.process(new class Handler!bool {
            void handle(bool var1)
            {
                if (var1)
                {
                    authenticator = null;
                }
            }
        });
        // dfmt on
    }

    /**
     * Schedules the periodic idle-timeout check on the shared scheduler. The
     * check ticks the transport, flushes any resulting output and disconnects
     * when the tick closed the transport. Calling this while a check is
     * already scheduled is a no-op.
     *
     * NOTE(review): the period is fixed at 20 s and is not derived from
     * options.getHeartbeat() or from the deadline returned by
     * transport.tick() - confirm this is intended.
     */
    private void initiateIdleTimeoutChecks() {
        synchronized (this) {
            if (idleCheckFuture !is null) {
                return;
            }

            executor = CommonUtil.scheduler();
            // Lets a cancelled check be removed from the scheduler's queue.
            executor.setRemoveOnCancelPolicy(true);

            // dfmt off
            idleCheckFuture = executor.scheduleWithFixedDelay(new class Runnable {
                void run() {
                    version(HUNT_AMQP_DEBUG) logInfo("beating ...");
                    if (connection.getLocalState() == EndpointState.ACTIVE) {
                        // NOTE(review): this is wall-clock time; a monotonic
                        // source would be unaffected by clock adjustments.
                        long now = LocalDateTime.now().toEpochMilli();
                        transport.tick(now);

                        flush();

                        if (transport.isClosed()) {
                            logError("IdleTimeoutCheck closed the transport due to the peer exceeding our requested idle-timeout.");
                            disconnect();
                        }
                    } else {
                        version(HUNT_DEBUG) logInfo("IdleTimeoutCheck skipping check, connection is not active.");
                    }
                }
            },
            msecs(20000),
            msecs(20000));
            // dfmt on
        }
    }

    /**
     * Cancels the periodic idle-timeout check if one is scheduled. Safe to
     * call repeatedly.
     */
    private void cancelIdleTimeoutChecks() {
        ScheduledFuture!(void) pending;
        synchronized (this) {
            pending = idleCheckFuture;
            idleCheckFuture = null;
        }
        if (pending !is null) {
            // false: this may be invoked from inside the check itself (via
            // disconnect), so the running task must not be interrupted.
            pending.cancel(false);
        }
    }

    /**
     * Copies the received bytes into the engine's input buffer, as much as
     * fits per round, and lets the engine process them. Input is dropped once
     * the transport has failed; an exception during processing marks it failed.
     */
    private void pumpInbound(ByteBuffer buffer) {
        if (failed) {
            logError("Skipping processing of data following transport error");
            return;
        }

        // Push the bytes from the socket to the proton engine.
        try {
            ByteBuf data = Unpooled.wrappedBuffer(buffer);
            do {
                ByteBuffer transportBuffer = transport.tail();
                // Never hand the engine more than its tail buffer can take.
                int amount = min(transportBuffer.remaining(), data.readableBytes());
                transportBuffer.limit(transportBuffer.position() + amount);
                version (HUNT_AMQP_DEBUG) {
                    tracef("recv(%d bytes): [%(%02X %)]",
                            data.getReadableBytes.length, data.getReadableBytes);
                }
                data.readBytes(transportBuffer);

                transport.process();
            }
            while (data.isReadable());
        } catch (Exception te) {
            failed = true;
            logErrorf("Exception while processing transport input: %s", te.msg);
        }
    }

    /**
     * Writes everything the engine currently has pending to the socket.
     * Serialized on this object.
     */
    void flush() {
        synchronized (this) {
            bool done = false;
            while (!done) {
                ByteBuffer outputBuffer = transport.getOutputBuffer();
                if (outputBuffer !is null && outputBuffer.hasRemaining()) {
                    version (HUNT_AMQP_DEBUG) {
                        logInfof("send(%d bytes): [%(%02X %)]",
                                outputBuffer.getRemaining.length, outputBuffer.getRemaining());
                    }
                    socket.write(outputBuffer);
                    transport.outputConsumed();
                } else {
                    done = true;
                }
            }
        }
    }

    /**
     * Stops the idle-timeout check and closes the client, or the socket when
     * no client was supplied.
     */
    public void disconnect() {
        cancelIdleTimeoutChecks();

        if (netClient !is null) {
            netClient.close();
        } else {
            socket.close();
        }
    }
}