1 /*
2 * Copyright 2016, 2017 the original author or authors.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 module hunt.amqp.impl.ProtonTransport;
17 
18 import hunt.net.NetClient;
19 import hunt.amqp.ProtonReceiver;
20 import hunt.amqp.ProtonSender;
21 import hunt.net.buffer.ByteBuf;
22 import hunt.io.ByteBuffer;
23 // import hunt.io.Buffer;
24 import hunt.amqp.ProtonConnection;
25 import hunt.amqp.ProtonTransportOptions;
26 import hunt.amqp.sasl.ProtonSaslAuthenticator;
27 import hunt.proton.Proton;
28 import hunt.proton.engine.BaseHandler;
29 import hunt.proton.engine.Collector;
30 import hunt.proton.engine.Connection;
31 import hunt.proton.engine.EndpointState;
32 import hunt.proton.engine.Event;
33 import hunt.proton.engine.Transport;
34 import hunt.proton.engine.impl.TransportInternal;
35 import hunt.amqp.impl.ProtonClientImpl;
36 import hunt.net.Connection;
37 import hunt.net.buffer.WrappedByteBuf;
38 import hunt.amqp.impl.ProtonConnectionImpl;
39 import hunt.amqp.impl.ProtonSessionImpl;
40 import hunt.amqp.impl.ProtonLinkImpl;
41 import hunt.amqp.impl.ProtonDeliveryImpl;
42 import hunt.amqp.impl.ProtonReceiverImpl;
43 import hunt.net.buffer.Unpooled;
44 import hunt.Exceptions;
45 import hunt.logging;
46 import std.algorithm;
47 import hunt.amqp.Handler;
48 import std.stdio;
49 import hunt.time.LocalDateTime;
50 import hunt.util.DateTime;
51 // import hunt.io.BufferUtils;
52 import hunt.concurrency.ScheduledThreadPoolExecutor;
53 import hunt.concurrency.Executors;
54 import hunt.concurrency.ExecutorService;
55 import hunt.concurrency.Scheduler;
56 import hunt.concurrency.Delayed;
57 import hunt.util.Common;
58 import core.time;
59 import std.concurrency : initOnce;
60 import hunt.time.LocalDateTime;
61 import hunt.util.Runnable;
62 
63 /**
64  * @author <a href="http://tfox.org">Tim Fox</a>
65  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
66  */
67 
// Short name for the hunt.net.Connection module; `NetSocket.Connection` below is
// the socket connection type declared in that module.
alias NetSocket = hunt.net.Connection;
// Callback that receives only the socket connection.
alias ConnCallBack = void delegate(NetSocket.Connection connection);
// Callback that receives the socket connection together with a message buffer.
alias MsgCallBack = void delegate(NetSocket.Connection connection, ByteBuffer message);
71 
/**
 * Holder for a single scheduled thread pool shared through a `__gshared` field.
 *
 * The pool is created on the first call to `scheduler()` and can be shut down
 * with `stopScheduler()`.
 */
struct CommonUtil {

    // Backing storage for the shared pool; filled in by scheduler().
    private __gshared ScheduledThreadPoolExecutor _scheduler;

    /**
     * Returns the shared scheduler, creating a pool with 5 threads the first
     * time it is requested (initialisation is guarded by `initOnce`).
     */
    static ScheduledThreadPoolExecutor scheduler() {
        return initOnce!_scheduler(
                cast(ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(5));
    }

    /**
     * Shuts the shared scheduler down. Does nothing when it was never created.
     */
    static void stopScheduler() {
        if (_scheduler is null) {
            return;
        }
        _scheduler.shutdown();
    }
}
90 
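/**
 * Connects a hunt.net socket connection to a Proton engine transport.
 *
 * Bytes received on the socket are pushed into the transport, the engine events
 * collected as a result are dispatched to the hunt.amqp implementation objects
 * stored as contexts on the engine objects, and the transport's pending output
 * is written back to the socket.
 */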
94 class ProtonTransport : BaseHandler {
95     private static int DEFAULT_MAX_FRAME_SIZE = 32 * 1024; // 32kb
96 
97     private hunt.proton.engine.Connection.Connection connection;
98     private NetClient netClient;
99     private hunt.net.Connection.Connection socket;
100     private Transport transport; //= Proton.transport();
101     private Collector collector; // = Proton.collector();
102     private ProtonSaslAuthenticator authenticator;
103     private ScheduledThreadPoolExecutor executor;
104 
105     // private volatile Long idleTimeoutCheckTimerId; // TODO: cancel when closing etc?
106 
107     private bool failed;
108 
109     this(hunt.proton.engine.Connection.Connection connection, NetClient netClient,
110             hunt.net.Connection.Connection socket,
111             ProtonSaslAuthenticator authenticator, ProtonTransportOptions options) {
112         this.transport = Proton.transport();
113         this.collector = Proton.collector();
114         this.connection = connection;
115         this.netClient = netClient;
116         this.socket = socket;
117         int maxFrameSize = options.getMaxFrameSize() == 0
118             ? DEFAULT_MAX_FRAME_SIZE : options.getMaxFrameSize();
119         transport.setMaxFrameSize(maxFrameSize);
120         transport.setOutboundFrameSizeLimit(maxFrameSize);
121         transport.setEmitFlowEventOnSend(false); // TODO: make configurable
122         transport.setIdleTimeout(2 * options.getHeartbeat());
123         (cast(TransportInternal) transport).setUseReadOnlyOutputBuffer(false);
124         if (authenticator !is null) {
125             authenticator.init(this.socket,
126                     cast(ProtonConnection)(this.connection.getContext()), transport);
127         }
128         this.authenticator = authenticator;
129         transport.bind(connection);
130         connection.collect(collector);
131         (cast(ConnectionEventBaseHandler)(socket.getHandler())).setOnClosed(&this.handleSocketEnd);
132         (cast(ConnectionEventBaseHandler)(socket.getHandler())).setOnMessage(
133                 &this.handleSocketBuffer);
134         // socket.endHandler(this::handleSocketEnd);
135         //socket.handler(this::handleSocketBuffer);
136     }
137 
138     private void handleSocketEnd(NetSocket.Connection arg) {
139         transport.unbind();
140         transport.close();
141         if (this.netClient !is null) {
142             this.netClient.close();
143         } else {
144             this.socket.close();
145         }
146         (cast(ProtonConnectionImpl) this.connection.getContext()).fireDisconnect();
147     }
148 
149     private void handleSocketBuffer(hunt.net.Connection.Connection connection, ByteBuffer buff) {
150         pumpInbound(buff);
151 
152         Event protonEvent = null;
153 
154         enum CONNECTION_REMOTE_OPEN = AmqpEventType.CONNECTION_REMOTE_OPEN.ordinal;
155         enum CONNECTION_REMOTE_CLOSE = AmqpEventType.CONNECTION_REMOTE_CLOSE.ordinal;
156         enum SESSION_REMOTE_OPEN = AmqpEventType.SESSION_REMOTE_OPEN.ordinal;
157         enum SESSION_REMOTE_CLOSE = AmqpEventType.SESSION_REMOTE_CLOSE.ordinal;
158         enum LINK_REMOTE_OPEN = AmqpEventType.LINK_REMOTE_OPEN.ordinal;
159         enum LINK_REMOTE_DETACH = AmqpEventType.LINK_REMOTE_DETACH.ordinal;
160         enum LINK_REMOTE_CLOSE = AmqpEventType.LINK_REMOTE_CLOSE.ordinal;
161         enum LINK_FLOW = AmqpEventType.LINK_FLOW.ordinal;
162         enum DELIVERY = AmqpEventType.DELIVERY.ordinal;
163         enum TRANSPORT_ERROR = AmqpEventType.TRANSPORT_ERROR.ordinal;
164         enum CONNECTION_INIT = AmqpEventType.CONNECTION_INIT.ordinal;
165         enum CONNECTION_BOUND = AmqpEventType.CONNECTION_BOUND.ordinal;
166         enum CONNECTION_UNBOUND = AmqpEventType.CONNECTION_UNBOUND.ordinal;
167         enum CONNECTION_LOCAL_OPEN = AmqpEventType.CONNECTION_LOCAL_OPEN.ordinal;
168         enum CONNECTION_LOCAL_CLOSE = AmqpEventType.CONNECTION_LOCAL_CLOSE.ordinal;
169         enum CONNECTION_FINAL = AmqpEventType.CONNECTION_FINAL.ordinal;
170 
171         enum SESSION_INIT = AmqpEventType.SESSION_INIT.ordinal;
172         enum SESSION_LOCAL_OPEN = AmqpEventType.SESSION_LOCAL_OPEN.ordinal;
173         enum SESSION_LOCAL_CLOSE = AmqpEventType.SESSION_LOCAL_CLOSE.ordinal;
174         enum SESSION_FINAL = AmqpEventType.SESSION_FINAL.ordinal;
175 
176         enum LINK_INIT = AmqpEventType.LINK_INIT.ordinal;
177         enum LINK_LOCAL_OPEN = AmqpEventType.LINK_LOCAL_OPEN.ordinal;
178         enum LINK_LOCAL_DETACH = AmqpEventType.LINK_LOCAL_DETACH.ordinal;
179         enum LINK_LOCAL_CLOSE = AmqpEventType.LINK_LOCAL_CLOSE.ordinal;
180         enum LINK_FINAL = AmqpEventType.LINK_FINAL.ordinal;
181 
182         while ((protonEvent = collector.peek()) !is null) {
183             ProtonConnectionImpl conn = cast(ProtonConnectionImpl) protonEvent.getConnection()
184                 .getContext();
185 
186             Type eventType = protonEvent.getType();
187             int type = eventType.ordinal;
188 
189             version (HUNT_AMQP_DEBUG) {
190                 if (eventType != (Type.TRANSPORT)) {
191                     warningf("New Proton Event: %s, ordinal: %d", eventType.toString(), type);
192                 }
193             }
194             // warningf("New Proton Event: %s, ordinal: %d", eventType.toString(), type);
195 
196             switch (type) {
197             case CONNECTION_REMOTE_OPEN: {
198                     conn.fireRemoteOpen();
199                     initiateIdleTimeoutChecks();
200                     break;
201                 }
202 
203             case CONNECTION_REMOTE_CLOSE: {
204                     conn.fireRemoteClose();
205                     break;
206                 }
207 
208             case SESSION_REMOTE_OPEN: {
209                     ProtonSessionImpl session = cast(ProtonSessionImpl) protonEvent.getSession()
210                         .getContext();
211                     if (session is null) {
212                         conn.fireRemoteSessionOpen(protonEvent.getSession());
213                     } else {
214                         session.fireRemoteOpen();
215                     }
216                     break;
217                 }
218             case SESSION_REMOTE_CLOSE: {
219                     ProtonSessionImpl session = cast(ProtonSessionImpl) protonEvent.getSession()
220                         .getContext();
221                     session.fireRemoteClose();
222                     break;
223                 }
224             case LINK_REMOTE_OPEN: {
225                     ProtonLinkImpl!ProtonReceiver link = cast(ProtonLinkImpl!ProtonReceiver) protonEvent.getLink()
226                         .getContext();
227                     if (link !is null) {
228                         link.fireRemoteOpen();
229                         break;
230                     }
231 
232                     ProtonLinkImpl!ProtonSender lins = cast(ProtonLinkImpl!ProtonSender) protonEvent.getLink()
233                         .getContext();
234                     if (lins !is null) {
235                         lins.fireRemoteOpen();
236                         break;
237                     }
238 
239                     conn.fireRemoteLinkOpen(protonEvent.getLink());
240                     //if (link is null) {
241                     //  conn.fireRemoteLinkOpen(protonEvent.getLink());
242                     //} else {
243                     //  link.fireRemoteOpen();
244                     //}
245                     break;
246                 }
247             case LINK_REMOTE_DETACH: {
248                     ProtonLinkImpl!ProtonReceiver link = cast(ProtonLinkImpl!ProtonReceiver) protonEvent.getLink()
249                         .getContext();
250                     if (link !is null) {
251                         link.fireRemoteDetach();
252                         break;
253                     } else {
254                         ProtonLinkImpl!ProtonSender lk = cast(ProtonLinkImpl!ProtonSender) protonEvent.getLink()
255                             .getContext();
256                         lk.fireRemoteDetach();
257                         break;
258                     }
259                 }
260 
261             case LINK_REMOTE_CLOSE: {
262                     ProtonLinkImpl!ProtonReceiver link = cast(ProtonLinkImpl!ProtonReceiver) protonEvent.getLink()
263                         .getContext();
264                     if (link !is null) {
265                         link.fireRemoteClose();
266                         break;
267                     } else {
268                         ProtonLinkImpl!ProtonSender lk = cast(ProtonLinkImpl!ProtonSender) protonEvent.getLink()
269                             .getContext();
270                         lk.fireRemoteClose();
271                         break;
272                     }
273                     // link.fireRemoteClose();
274                 }
275             case LINK_FLOW: {
276                     ProtonLinkImpl!ProtonReceiver link = cast(ProtonLinkImpl!ProtonReceiver) protonEvent.getLink()
277                         .getContext();
278                     if (link !is null) {
279                         link.handleLinkFlow();
280                         break;
281                     } else {
282                         ProtonLinkImpl!ProtonSender lk = cast(ProtonLinkImpl!ProtonSender) protonEvent.getLink()
283                             .getContext();
284                         lk.handleLinkFlow();
285                         break;
286                     }
287                     // ProtonLinkImpl<?> link = (ProtonLinkImpl<?>) protonEvent.getLink().getContext();
288                     //link.handleLinkFlow();
289                 }
290             case DELIVERY: {
291                     ProtonDeliveryImpl delivery = cast(ProtonDeliveryImpl) protonEvent.getDelivery()
292                         .getContext();
293                     if (delivery !is null) {
294                         delivery.fireUpdate();
295                     } else {
296                         ProtonReceiverImpl receiver = cast(ProtonReceiverImpl) protonEvent.getLink()
297                             .getContext();
298                         receiver.onDelivery();
299                     }
300                     break;
301                 }
302             case TRANSPORT_ERROR: {
303                     failed = true;
304                     break;
305                 }
306 
307             case CONNECTION_INIT:
308                 break;
309             case CONNECTION_BOUND:
310                 break;
311             case CONNECTION_UNBOUND:
312                 break;
313             case CONNECTION_LOCAL_OPEN:
314                 break;
315             case CONNECTION_LOCAL_CLOSE:
316                 break;
317             case CONNECTION_FINAL:
318                 break;
319             case SESSION_INIT:
320                 break;
321             case SESSION_LOCAL_OPEN:
322                 break;
323             case SESSION_LOCAL_CLOSE:
324                 break;
325             case SESSION_FINAL:
326                 break;
327             case LINK_INIT:
328                 break;
329             case LINK_LOCAL_OPEN:
330                 break;
331             case LINK_LOCAL_DETACH:
332                 break;
333             case LINK_LOCAL_CLOSE:
334                 break;
335             case LINK_FINAL:
336                 break;
337             default:
338                 break;
339             }
340 
341             collector.pop();
342         }
343 
344         if (!failed) {
345             processSaslAuthentication();
346         }
347 
348         flush();
349 
350         if (failed) {
351             disconnect();
352         }
353     }
354 
355     private void processSaslAuthentication() {
356         if (authenticator is null) {
357             return;
358         }
359 
360         // socket.pause();
361         // dfmt off
362         authenticator.process(new class Handler!bool {
363             void handle(bool var1)
364             {
365                     if (var1)
366                     {
367                         authenticator = null;
368                     }
369             }
370         });
371         // dfmt on
372 
373         //  authenticator.process(complete -> {
374         //    if(complete) {
375         //      authenticator = null;
376         //    }
377         //
378         //    socket.resume();
379         //  });
380     }
381 
382     private void initiateIdleTimeoutChecks() {
383         executor = CommonUtil.scheduler();
384         executor.setRemoveOnCancelPolicy(true);
385 
386         // dfmt off
387         ScheduledFuture!(void) pingFuture = executor.scheduleWithFixedDelay(new class Runnable {
388             void run() {
389                 bool checkScheduled = false;
390                 version(HUNT_AMQP_DEBUG) logInfo("beating ...");
391                 if (connection.getLocalState() == EndpointState.ACTIVE) {
392                     // Using nano time since it is not related to the wall clock, which may change
393                     long now = LocalDateTime.now().toEpochMilli();
394                     long deadline = transport.tick(now);
395 
396                     flush();
397 
398 
399                     if (transport.isClosed()) {
400                         logError("IdleTimeoutCheck closed the transport due to the peer exceeding our requested idle-timeout.");
401                         disconnect();
402                     } else {
403                         checkScheduled = true;
404                         //if (deadline != 0) {
405                         //  // timer treats 0 as error, ensure value is at least 1 as there was a deadline
406                         //  long delay = Math.max(deadline - now, 1);
407                         //  checkScheduled = true;
408                         //  if (LOG.isTraceEnabled()) {
409                         //    LOG.trace("IdleTimeoutCheck rescheduling with delay: " + delay);
410                         //  }
411                         //  idleTimeoutCheckTimerId = vertx.setTimer(delay, this);
412                         //}
413                     }
414                 } else {
415                     version(HUNT_DEBUG) logInfo("IdleTimeoutCheck skipping check, connection is not active.");
416                 }
417             }
418         },
419         msecs(20000),
420         msecs(20000));
421 
422         // dfmt on
423 
424         //implementationMissing(false);
425         // Using nano time since it is not related to the wall clock, which may change
426         // long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
427         // long now = DateTime.currentTimeNsecs();
428         // long deadline = transport.tick(now);
429         // if (deadline != 0)
430         // {
431         //   logError("!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
432         // }
433         //if (deadline != 0) {
434         //  // timer treats 0 as error, ensure value is at least 1 as there was a deadline
435         //  long delay = Math.max(deadline - now, 1);
436         //  if (LOG.isTraceEnabled()) {
437         //    LOG.trace("IdleTimeoutCheck being initiated, initial delay: " + delay);
438         //  }
439         //  idleTimeoutCheckTimerId = vertx.setTimer(delay, new IdleTimeoutCheck());
440         //}
441     }
442 
443     private void pumpInbound(ByteBuffer buffer) {
444         if (failed) {
445             logError("Skipping processing of data following transport error");
446             return;
447         }
448 
449         //ByteBuf data = buffer.getByteBuf();
450         //do {
451         //  ByteBuffer transportBuffer = transport.tail();
452         //
453         //  int amount = Math.min(transportBuffer.remaining(), data.readableBytes());
454         //  transportBuffer.limit(transportBuffer.position() + amount);
455         //  data.readBytes(transportBuffer);
456         //
457         //  transport.process();
458         //} while (data.isReadable());
459 
460         // Lets push bytes from vert.x to proton engine.
461         try {
462             // ByteBuf data = buffer.getByteBuf();
463             // WrappedByteBuf data = new WrappedByteBuf;
464             // data.readBytes(buffer.getRemaining());
465 
466             ByteBuf data = Unpooled.wrappedBuffer(buffer);
467             do {
468                 ByteBuffer transportBuffer = transport.tail();
469                 // writeln("%s",transportBuffer.array);
470                 int amount = min(transportBuffer.remaining(), data.readableBytes());
471                 transportBuffer.limit(transportBuffer.position() + amount);
472                 // byte[] tmb = new byte[transportBuffer.position() + amount];
473                 version (HUNT_AMQP_DEBUG) {
474                     tracef("recv(%d bytes): [%(%02X %)]",
475                             data.getReadableBytes.length, data.getReadableBytes);
476                 }
477                 data.readBytes(transportBuffer);
478 
479                 // transportBuffer = BufferUtils.toBuffer(tmb);
480                 // logError("recevbef : %s",transportBuffer.toString());
481                 // transportBuffer.put(tmb);
482 
483                 // logError("recevafter : %s",transportBuffer.toString());
484 
485                 //transportBuffer.flip();
486                 // logError("recevafter flip : %s",transportBuffer.getRemaining());
487                 transport.process();
488             }
489             while (data.isReadable());
490         } catch (Exception te) {
491             failed = true;
492             logError("Exception while processing transport input");
493             //LOG.trace("Exception while processing transport input", te);
494         }
495     }
496 
497     void flush() {
498         synchronized (this) {
499             bool done = false;
500             while (!done) {
501                 ByteBuffer outputBuffer = transport.getOutputBuffer();
502                 if (outputBuffer !is null && outputBuffer.hasRemaining()) {
503                     //NetSocketInternal internal = (NetSocketInternal) socket;
504                     //ByteBuf bb = internal.channelHandlerContext().alloc().directBuffer(outputBuffer.remaining());
505                     // bb.writeBytes(outputBuffer);
506                     //logError("send : %s --- %d" , outputBuffer.array(),outputBuffer.array().length);
507                     version (HUNT_AMQP_DEBUG) {
508                         logInfof("send(%d bytes): [%(%02X %)]",
509                                 outputBuffer.getRemaining.length, outputBuffer.getRemaining());
510                     }
511                     socket.write(outputBuffer);
512                     // internal.writeMessage(bb);
513                     transport.outputConsumed();
514                 } else {
515                     done = true;
516                 }
517             }
518         }
519     }
520 
521     public void disconnect() {
522         if (netClient !is null) {
523             netClient.close();
524         } else {
525             socket.close();
526         }
527     }
528 
529     //private  class IdleTimeoutCheck implements Handler<Long> {
530     //  @Override
531     //  public void handle(Long event) {
532     //    boolean checkScheduled = false;
533     //
534     //    if (connection.getLocalState() == EndpointState.ACTIVE) {
535     //      // Using nano time since it is not related to the wall clock, which may change
536     //      long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
537     //      long deadline = transport.tick(now);
538     //
539     //      flush();
540     //
541     //      if (transport.isClosed()) {
542     //        LOG.info("IdleTimeoutCheck closed the transport due to the peer exceeding our requested idle-timeout.");
543     //        disconnect();
544     //      } else {
545     //        if (deadline != 0) {
546     //          // timer treats 0 as error, ensure value is at least 1 as there was a deadline
547     //          long delay = Math.max(deadline - now, 1);
548     //          checkScheduled = true;
549     //          if (LOG.isTraceEnabled()) {
550     //            LOG.trace("IdleTimeoutCheck rescheduling with delay: " + delay);
551     //          }
552     //          idleTimeoutCheckTimerId = vertx.setTimer(delay, this);
553     //        }
554     //      }
555     //    } else {
556     //      LOG.trace("IdleTimeoutCheck skipping check, connection is not active.");
557     //    }
558     //
559     //    if (!checkScheduled) {
560     //      idleTimeoutCheckTimerId = null;
561     //      LOG.trace("IdleTimeoutCheck exiting");
562     //    }
563     //  }
564     //}
565 }