1 /*
2  * hunt-amqp: AMQP library for D programming language, based on hunt-net.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.streams.ProtonStreams;
12 
13 import hunt.proton.message.Message;
14 
15 import hunt.amqp.ProtonConnection;
16 import hunt.amqp.impl.ProtonConnectionImpl;
17 import hunt.amqp.streams.impl.ProtonPublisherImpl;
18 import hunt.amqp.streams.impl.ProtonPublisherWrapperImpl;
19 import hunt.amqp.streams.impl.ProtonSubscriberImpl;
20 import hunt.amqp.streams.impl.ProtonSubscriberWrapperImpl;
21 import hunt.amqp.streams.ProtonPublisher;
22 import hunt.amqp.streams.Delivery;
23 import hunt.amqp.streams.ProtonPublisherOptions;
24 import hunt.amqp.streams.ProtonSubscriber;
25 import hunt.amqp.streams.Tracker;
26 import hunt.amqp.streams.ProtonSubscriberOptions;
27 
/**
 * Static factory functions that expose AMQP links as reactive-streams style
 * publishers (consumers) and subscribers (producers).
 *
 * Every factory downcasts the supplied ProtonConnection to ProtonConnectionImpl.
 * In D a class downcast that does not match the dynamic type yields null rather
 * than throwing, and that value is handed to the implementation unchecked, so
 * callers are expected to pass a connection backed by ProtonConnectionImpl.
 */
class ProtonStreams {

  /**
   * Narrows the public connection interface to the concrete implementation type
   * required by the stream implementations. Yields null when the given object is
   * not a ProtonConnectionImpl; no check is performed here.
   */
  private static ProtonConnectionImpl implOf(ProtonConnection connection) {
    return cast(ProtonConnectionImpl) connection;
  }

  /**
   * Creates an AMQP consumer, presented as a reactive streams Publisher, using
   * default ProtonPublisherOptions.
   *
   * Messages are carried by Delivery elements of the stream, which the consuming
   * application uses to explicitly acknowledge each message after processing it.
   *
   * The publisher may only be subscribed to a single time.
   * Must be called on the context thread of the given connection.
   *
   * @param connection
   *          the connection to create the consumer with.
   * @param address
   *          the source address to attach the consumer to.
   * @return the consumer's Publisher stream.
   */
  public static ProtonPublisher!Delivery createDeliveryConsumer(ProtonConnection connection, string address) {
    auto defaultOptions = new ProtonPublisherOptions();
    return createDeliveryConsumer(connection, address, defaultOptions);
  }

  /**
   * Creates an AMQP consumer, presented as a reactive streams Publisher.
   *
   * Messages are carried by Delivery elements of the stream, which the consuming
   * application uses to explicitly acknowledge each message after processing it.
   *
   * The publisher may only be subscribed to a single time.
   * Must be called on the context thread of the given connection.
   *
   * The consumer link is closed when the subscription is cancelled, unless the
   * passed options request a durable subscription, in which case the link is only
   * detached. A dynamic address can be requested by setting the 'dynamic' option
   * to true.
   *
   * @param connection
   *          the connection to create the consumer with.
   * @param address
   *          the source address to attach the consumer to, or null if the
   *          'dynamic' option is being used.
   * @param options
   *          the publisher options.
   * @return the consumer's Publisher stream.
   */
  public static ProtonPublisher!Delivery createDeliveryConsumer(ProtonConnection connection, string address, ProtonPublisherOptions options) {
    auto deliveryPublisher = new ProtonPublisherImpl(address, implOf(connection), options);
    return deliveryPublisher;
  }

  /**
   * Creates an AMQP consumer, presented as a reactive streams Publisher, using
   * default ProtonPublisherOptions.
   *
   * Messages will be automatically accepted when the Subscriber's onNext method
   * returns. If you require more control over when the message is accepted, use
   * createDeliveryConsumer(ProtonConnection, string) instead.
   *
   * The publisher may only be subscribed to a single time.
   * Must be called on the context thread of the given connection.
   *
   * @param connection
   *          the connection to create the consumer with.
   * @param address
   *          the source address to attach the consumer to.
   * @return the consumer's Publisher stream.
   */
  public static ProtonPublisher!Message createConsumer(ProtonConnection connection, string address) {
    auto defaultOptions = new ProtonPublisherOptions();
    return createConsumer(connection, address, defaultOptions);
  }

  /**
   * Creates an AMQP consumer, presented as a reactive streams Publisher.
   *
   * Messages will be automatically accepted when the Subscriber's onNext method
   * returns. If you require more control over when the message is accepted, use
   * createDeliveryConsumer(ProtonConnection, string, ProtonPublisherOptions)
   * instead.
   *
   * The publisher may only be subscribed to a single time.
   * Must be called on the context thread of the given connection.
   *
   * @param connection
   *          the connection to create the consumer with.
   * @param address
   *          the source address to attach the consumer to.
   * @param options
   *          the publisher options.
   * @return the consumer's Publisher stream.
   */
  public static ProtonPublisher!Message createConsumer(ProtonConnection connection, string address, ProtonPublisherOptions options) {
    // Build the Delivery-based publisher first, then wrap it so that the stream
    // carries Message elements.
    auto deliveryPublisher = new ProtonPublisherImpl(address, implOf(connection), options);
    return new ProtonPublisherWrapperImpl(deliveryPublisher);
  }

  /**
   * Creates an AMQP producer, presented as a reactive streams Subscriber.
   *
   * The status of the message delivery, i.e. whether the server peer accepts it
   * etc., can be checked using its containing Tracker (see
   * hunt.amqp.streams.Tracker).
   *
   * The subscriber may only be subscribed once.
   * Must be called on the context thread of the given connection.
   *
   * If no address (i.e. null) is specified then a producer will be established to
   * the 'anonymous relay' and each message sent must specify its individual
   * destination address.
   *
   * @param connection
   *          the connection to create the producer with.
   * @param address
   *          the target address to attach the producer to (or null to send to the
   *          anonymous relay).
   * @return the producer's Subscriber stream.
   */
  public static ProtonSubscriber!Tracker createTrackerProducer(ProtonConnection connection, string address) {
    auto trackerSubscriber = new ProtonSubscriberImpl(address, implOf(connection));
    return trackerSubscriber;
  }

  /**
   * Creates an AMQP producer, presented as a reactive streams Subscriber.
   *
   * The status of the message delivery, i.e. whether the server peer accepts it
   * etc., can be checked using its containing Tracker (see
   * hunt.amqp.streams.Tracker).
   *
   * The subscriber may only be subscribed once.
   * Must be called on the context thread of the given connection.
   *
   * If no address (i.e. null) is specified then a producer will be established to
   * the 'anonymous relay' and each message sent must specify its individual
   * destination address.
   *
   * @param connection
   *          the connection to create the producer with.
   * @param address
   *          the target address to attach the producer to (or null to send to the
   *          anonymous relay).
   * @param options
   *          the subscriber options.
   * @return the producer's Subscriber stream.
   */
  public static ProtonSubscriber!Tracker createTrackerProducer(ProtonConnection connection, string address, ProtonSubscriberOptions options) {
    auto trackerSubscriber = new ProtonSubscriberImpl(address, implOf(connection), options);
    return trackerSubscriber;
  }

  /**
   * Creates an AMQP producer, presented as a reactive streams Subscriber.
   *
   * The status of the message delivery cannot be tracked after send; if you need
   * that ability use createTrackerProducer(ProtonConnection, string).
   *
   * The subscriber may only be subscribed once.
   * Must be called on the context thread of the given connection.
   *
   * If no address (i.e. null) is specified then a producer will be established to
   * the 'anonymous relay' and each message sent must specify its individual
   * destination address.
   *
   * @param connection
   *          the connection to create the producer with.
   * @param address
   *          the target address to attach the producer to (or null to send to the
   *          anonymous relay).
   * @return the producer's Subscriber stream.
   */
  public static ProtonSubscriber!Message createProducer(ProtonConnection connection, string address) {
    // Build the Tracker-based subscriber first, then wrap it so that the stream
    // accepts plain Message elements.
    auto trackerSubscriber = new ProtonSubscriberImpl(address, implOf(connection));
    return new ProtonSubscriberWrapperImpl(trackerSubscriber);
  }

  /**
   * Creates an AMQP producer, presented as a reactive streams Subscriber.
   *
   * The status of the message delivery cannot be tracked after send; if you need
   * that ability use
   * createTrackerProducer(ProtonConnection, string, ProtonSubscriberOptions).
   *
   * The subscriber may only be subscribed once.
   * Must be called on the context thread of the given connection.
   *
   * If no address (i.e. null) is specified then a producer will be established to
   * the 'anonymous relay' and each message sent must specify its individual
   * destination address.
   *
   * @param connection
   *          the connection to create the producer with.
   * @param address
   *          the target address to attach the producer to (or null to send to the
   *          anonymous relay).
   * @param options
   *          the subscriber options.
   * @return the producer's Subscriber stream.
   */
  public static ProtonSubscriber!Message createProducer(ProtonConnection connection, string address, ProtonSubscriberOptions options) {
    // Build the Tracker-based subscriber first, then wrap it so that the stream
    // accepts plain Message elements.
    auto trackerSubscriber = new ProtonSubscriberImpl(address, implOf(connection), options);
    return new ProtonSubscriberWrapperImpl(trackerSubscriber);
  }
}