1 /* 2 * hunt-amqp: AMQP library for D programming language, based on hunt-net. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 module hunt.amqp.streams.ProtonStreams; 12 13 import hunt.proton.message.Message; 14 15 import hunt.amqp.ProtonConnection; 16 import hunt.amqp.impl.ProtonConnectionImpl; 17 import hunt.amqp.streams.impl.ProtonPublisherImpl; 18 import hunt.amqp.streams.impl.ProtonPublisherWrapperImpl; 19 import hunt.amqp.streams.impl.ProtonSubscriberImpl; 20 import hunt.amqp.streams.impl.ProtonSubscriberWrapperImpl; 21 import hunt.amqp.streams.ProtonPublisher; 22 import hunt.amqp.streams.Delivery; 23 import hunt.amqp.streams.ProtonPublisherOptions; 24 import hunt.amqp.streams.ProtonSubscriber; 25 import hunt.amqp.streams.Tracker; 26 import hunt.amqp.streams.ProtonSubscriberOptions; 27 28 class ProtonStreams { 29 30 /** 31 * Creates an AMQP consumer, presented as a reactive streams {@link org.reactivestreams.Publisher Publisher}. 32 * Messages are carried by {@link Delivery} elements of the stream, which are used by the consuming application 33 * to explicitly acknowledge each message after processing it. 34 * 35 * The publisher may only be subscribed to a single time. 36 * Must be called on the {@link io.vertx.core.Context} thread for the given connection. 37 * 38 * @param connection 39 * the connection to create the consumer with. 40 * @param address 41 * The source address to attach the consumer to. 42 * @return the consumers Publisher stream. 43 */ 44 public static ProtonPublisher!Delivery createDeliveryConsumer(ProtonConnection connection, string address) { 45 return createDeliveryConsumer(connection, address, new ProtonPublisherOptions()); 46 } 47 48 /** 49 * Creates an AMQP consumer, presented as a reactive streams {@link org.reactivestreams.Publisher Publisher}. 50 * Messages are carried by {@link Delivery} elements of the stream, which are used by the consuming 51 * application to explicitly acknowledge each message after processing it. 52 * 53 * The publisher may only be subscribed to a single time. 54 * Must be called on the {@link io.vertx.core.Context} thread for the given connection. 55 * 56 * The consumer link is closed when the subscription is cancelled, unless the passed options request a 57 * durable sub, in which case the link is only detached. A Dynamic address can be requested by setting 58 * the dynamic option true. 59 * 60 * @param connection 61 * the connection to create the consumer with. 62 * @param address 63 * The source address to attach the consumer to, or null the 'dynamic' option is being used. 64 * @param options 65 * The options. 66 * @return the consumers Publisher stream. 67 */ 68 public static ProtonPublisher!Delivery createDeliveryConsumer(ProtonConnection connection, string address, ProtonPublisherOptions options) { 69 return new ProtonPublisherImpl(address, cast(ProtonConnectionImpl) connection, options); 70 } 71 72 /** 73 * Creates an AMQP consumer, presented as a reactive streams {@link org.reactivestreams.Publisher Publisher}. 74 * Messages will be automatically accepted when the {@link org.reactivestreams.Subscriber#onNext(Object) Subscriber#onNext(Object)} 75 * method returns. If you require more control over when the message is accepted, you should use 76 * {@link #createDeliveryConsumer(ProtonConnection, String)} instead. 77 * 78 * The publisher may only be subscribed to a single time. 79 * Must be called on the {@link io.vertx.core.Context} thread for the given connection. 80 * 81 * @param connection 82 * the connection to create the consumer with. 83 * @param address 84 * The source address to attach the consumer to. 85 * @return the consumers Publisher stream. 86 */ 87 public static ProtonPublisher!Message createConsumer(ProtonConnection connection, string address) { 88 return createConsumer(connection, address, new ProtonPublisherOptions()); 89 } 90 91 /** 92 * Creates an AMQP consumer, presented as a reactive streams {@link org.reactivestreams.Publisher Publisher}. 93 * Messages will be automatically accepted when the {@link org.reactivestreams.Subscriber#onNext(Object) Subscriber#onNext(Object)} 94 * method returns. If you require more control over when the message is accepted, you should use 95 * {@link #createDeliveryConsumer(ProtonConnection, String, ProtonPublisherOptions)} instead. 96 * 97 * The publisher may only be subscribed to a single time. 98 * Must be called on the {@link io.vertx.core.Context} thread for the given connection. 99 * 100 * @param connection 101 * the connection to create the consumer with. 102 * @param address 103 * The source address to attach the consumer to. 104 * @param options 105 * The options. 106 * @return the consumers Publisher stream. 107 */ 108 public static ProtonPublisher!Message createConsumer(ProtonConnection connection, string address, ProtonPublisherOptions options) { 109 ProtonPublisherImpl publisher = new ProtonPublisherImpl(address, cast(ProtonConnectionImpl) connection, options); 110 111 return new ProtonPublisherWrapperImpl(publisher); 112 } 113 114 /** 115 * Creates an AMQP producer, presented as a reactive streams {@link org.reactivestreams.Subscriber Subscriber}. 116 * 117 * The status of the message delivery, i.e whether the server peer accepts it etc, can be checked 118 * using its containing tracker, which are created using {@link Tracker#create(Message, io.vertx.core.Handler)} 119 * or {@link Tracker#create(Message)}. 120 * 121 * The subscriber may only be subscribed once. 122 * Must be called on the {@link io.vertx.core.Context} thread for the given connection. 123 * 124 * If no address (i.e null) is specified then a producer will be established to the 'anonymous relay' 125 * and each message sent must specify its individual destination address. 126 * 127 * @param connection 128 * the connection to create the consumer with. 129 * @param address 130 * The target address to attach the producer to (or null to send to the anonymous relay). 131 * @return the producers Subscriber stream. 132 */ 133 public static ProtonSubscriber!Tracker createTrackerProducer(ProtonConnection connection, string address) { 134 return new ProtonSubscriberImpl(address, cast(ProtonConnectionImpl) connection); 135 } 136 137 /** 138 * Creates an AMQP producer, presented as a reactive streams {@link org.reactivestreams.Subscriber Subscriber}. 139 * 140 * The status of the message delivery, i.e whether the server peer accepts it etc, can be checked 141 * using its containing tracker, which are created using {@link Tracker#create(Message, io.vertx.core.Handler)} 142 * or {@link Tracker#create(Message)}. 143 * 144 * The subscriber may only be subscribed once. 145 * Must be called on the {@link io.vertx.core.Context} thread for the given connection. 146 * 147 * If no address (i.e null) is specified then a producer will be established to the 'anonymous relay' 148 * and each message sent must specify its individual destination address. 149 * 150 * @param connection 151 * the connection to create the consumer with. 152 * @param address 153 * The target address to attach the producer to (or null to send to the anonymous relay). 154 * @param options 155 * The options. 156 * @return the producers Subscriber stream. 157 */ 158 public static ProtonSubscriber!Tracker createTrackerProducer(ProtonConnection connection, string address, ProtonSubscriberOptions options) { 159 return new ProtonSubscriberImpl(address, cast(ProtonConnectionImpl) connection, options); 160 } 161 162 /** 163 * Creates an AMQP producer, presented as a reactive streams {@link org.reactivestreams.Subscriber Subscriber}. 164 * The status of the message delivery can not be tracked after send, if you need that ability use 165 * {@link ProtonStreams#createTrackerProducer(ProtonConnection, String)}. 166 * 167 * The subscriber may only be subscribed once. 168 * Must be called on the {@link io.vertx.core.Context} thread for the given connection. 169 * 170 * If no address (i.e null) is specified then a producer will be established to the 'anonymous relay' 171 * and each message sent must specify its individual destination address. 172 * 173 * @param connection 174 * the connection to create the consumer with. 175 * @param address 176 * The target address to attach the producer to (or null to send to the anonymous relay). 177 * @return the producers Subscriber stream. 178 */ 179 public static ProtonSubscriber!Message createProducer(ProtonConnection connection, string address) { 180 ProtonSubscriberImpl subscriber = new ProtonSubscriberImpl(address, cast(ProtonConnectionImpl) connection); 181 182 return new ProtonSubscriberWrapperImpl(subscriber); 183 } 184 185 /** 186 * Creates an AMQP producer, presented as a reactive streams {@link org.reactivestreams.Subscriber Subscriber}. 187 * The status of the message delivery can not be tracked after send, if you need that ability use 188 * {@link ProtonStreams#createTrackerProducer(ProtonConnection, String, ProtonSubscriberOptions)}. 189 * 190 * The subscriber may only be subscribed once. 191 * Must be called on the {@link io.vertx.core.Context} thread for the given connection. 192 * 193 * If no address (i.e null) is specified then a producer will be established to the 'anonymous relay' 194 * and each message sent must specify its individual destination address. 195 * 196 * @param connection 197 * the connection to create the consumer with. 198 * @param address 199 * The target address to attach the producer to (or null to send to the anonymous relay). 200 * @param options 201 * The options. 202 * @return the producers Subscriber stream. 203 */ 204 public static ProtonSubscriber!Message createProducer(ProtonConnection connection, string address, ProtonSubscriberOptions options) { 205 ProtonSubscriberImpl subscriber = new ProtonSubscriberImpl(address, cast(ProtonConnectionImpl) connection, options); 206 207 return new ProtonSubscriberWrapperImpl(subscriber); 208 } 209 }