Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AMQP Transport #746

Merged
merged 1 commit into from
Nov 8, 2018
Merged

AMQP Transport #746

merged 1 commit into from
Nov 8, 2018

Conversation

mattfung
Copy link
Contributor

@mattfung mattfung commented Aug 7, 2018

Hello,

I noticed that here had been interest in AMQP transport similiar to NATS (#681) as well as talks of pubsub support (#295) so I thought I might like to share an implementation of a simple AMQP client & server transport.

Comments/Reviews would be appreciated.

@peterbourgon
Copy link
Member

Cool, this looks reasonable at a first glance. As a next step, please run go vet, golint, and staticcheck and fix the relevant errors. A couple of problems that stood out to me: underscores in variable names, and superfluous newlines in function declarations.

@mattfung
Copy link
Contributor Author

May bad with the underscores.. bad python habits.
The problems should be fixed now, lemme know if anything can be improved

@jstoja
Copy link

jstoja commented Oct 16, 2018

Hey @peterbourgon @mattfung any chance that this moves forward? This would be a great addition along NATS!

@jstoja
Copy link

jstoja commented Oct 17, 2018

@mattfung I had some issues with running the tests on my end. Rebasing solves all the tests, would you be able to do that?

@jstoja
Copy link

jstoja commented Oct 17, 2018

I tried the subscriber par only and it worked great! I seems to me that it could be enhanced when looking at Nats.
With the Nats transporter, you can directly pass a Handler when subscribing, like this

cSub, err := nc.QueueSubscribe("stringsvc.count", "stringsvc", countHandler.ServeMsg(nc))

Which doesn't seem possible without additional code for this implementation since the AMQP client library returns a channel of Deliveries:

func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error)
func (s Subscriber) ServeDelivery(ch Channel) func(deliv *amqp.Delivery)

and the ServeDelivery process message per message.

So you have to create a helper function to listen to the channel and map the ServiceDelivery return to each message. Have you any idea on how to improve this? Or maybe I'm missing something in the implementation...

@mattfung
Copy link
Contributor Author

make a PR to https://github.com/streadway/amqp with channel listener functionality :)

jokes aside, Id be interested on improvements

@jstoja
Copy link

jstoja commented Oct 17, 2018 via email

@jstoja
Copy link

jstoja commented Nov 4, 2018

@peterbourgon Do you have any feedback on this for merging it? I've used it for a professional project as a demonstration (so with small usage), and it ran several days without any issues.

@peterbourgon
Copy link
Member

peterbourgon commented Nov 4, 2018 via email

@jstoja
Copy link

jstoja commented Nov 4, 2018 via email

Copy link
Member

@peterbourgon peterbourgon left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of relatively minor design questions, and lots of style nits, most notably that I want proper punctuation for all block comments :) But in general this looks good, with those changes and some explanation, I'm see no problem to merge. Thank you for the great contribution!

@@ -0,0 +1,2 @@
// Package amqp implements a go-kit transport for AMQP
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Package amqp implements a go-kit transport for AMQP
// Package amqp implements an AMQP transport.

type DecodeRequestFunc func(context.Context, *amqp.Delivery) (request interface{}, err error)

// EncodeRequestFunc encodes the passed request object into
// an AMQP channel for publishing. It is designed to be used in AMQP Publishers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// an AMQP channel for publishing. It is designed to be used in AMQP Publishers
// an AMQP channel for publishing. It is designed to be used in AMQP Publishers.

type EncodeRequestFunc func(context.Context, *amqp.Publishing, interface{}) error

// EncodeResponseFunc encodes the passed reponse object to
// an AMQP channel for publishing. It is designed to be used in AMQP Subscribers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// an AMQP channel for publishing. It is designed to be used in AMQP Subscribers
// an AMQP channel for publishing. It is designed to be used in AMQP Subscribers.

*amqp.Delivery, Channel, *amqp.Publishing, interface{}) error

// DecodeResponseFunc extracts a user-domain response object from
// an AMQP Delivery object. It is designed to be used in AMQP Publishers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// an AMQP Delivery object. It is designed to be used in AMQP Publishers
// an AMQP Delivery object. It is designed to be used in AMQP Publishers.

"github.com/streadway/amqp"
)

// Publisher wraps AMQP channel and queue and provides a method that
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Publisher wraps AMQP channel and queue and provides a method that
// Publisher wraps an AMQP channel and queue, and provides a method that

type PublisherResponseFunc func(context.Context, *amqp.Delivery) context.Context

// SetPublishExchange returns a RequestFunc that sets the Exchange field
// of AMQP Publish call
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add ending punctuation (.) to this and all other header comment blocks in the PR.

// ReplyErrorEncoder serializes the error message as a DefaultErrorResponse
// JSON and sends the message to the ReplyTo address
func ReplyErrorEncoder(ctx context.Context,
err error, deliv *amqp.Delivery, ch Channel, pub *amqp.Publishing) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please either put the function definition on a single line, or use one parameter per line, e.g.

func Foo(
    ctx context.Context,
    err error,
    deliv *amqp.Delivery,
) {

}

deliv, err := p.publishAndConsumeFirstMatchingResponse(ctx, &pub)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

}

request, err := s.dec(ctx, deliv)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

ctx = f(ctx, deliv, ch, &pub)
}

if err := s.enc(ctx, deliv, ch, &pub, response); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're combining "encoding the response to the delivery" with "sending the delivery over AMQP" into a single operation, but my intuition is these should be separate steps, with separate failure modes. Is there a technical reason for that that I'm overlooking?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No good reason other than "cuz the nats transport did it"

I'll separate it to two functions

@mattfung
Copy link
Contributor Author

mattfung commented Nov 6, 2018

Sounds good, I'll get them fixed within a day's time.
thx again for putting in the time to do this

@mattfung
Copy link
Contributor Author

mattfung commented Nov 6, 2018

Committed the changes. I'm guessing the tests failed because I didn't rebase the code with the latest master branch?

Anyhow, let me know if anything needs to be changed.

@jstoja
Copy link

jstoja commented Nov 7, 2018

Tried without (and also with) the rebase locally and I didn't manage to make them fail... Maybe Peter will have an idea.

@peterbourgon
Copy link
Member

Yes, this seems fine; can you rebase on master?

amqp transport publisher

amqp transport tests

lint fixes for amqp transport

fixed formatting and punctuation

zzz

default ContentType is null, increased max length of correlationId to 255

refractored subscriber EncodeResponseFunc into encode func, and send func

zzz
@peterbourgon peterbourgon merged commit 12210fb into go-kit:master Nov 8, 2018
@peterbourgon
Copy link
Member

Dang nice

@tlogik
Copy link

tlogik commented Nov 15, 2018

@mattfung I am new to gokit and struggling a bit getting AMQP up and working.
Besides the test do you have a sample app or something that shows how to use the AMQP setup.
I am starting prototyping moving some of out NET services to GO and we rely heavily on RabbitMQ.
Any help is very much appreciated.

@mattfung
Copy link
Contributor Author

@tlogik for now, heres an implementation of stringsvc4 pub & sub
https://github.com/cloudpbx/kit/tree/amqp/examples/stringsvc4/amqp

Hope this helps.

@jstoja
Copy link

jstoja commented Nov 16, 2018

Awesome! Would be great to have it in this repository too, no?

@mattfung
Copy link
Contributor Author

mattfung commented Nov 16, 2018

Thats the plan, my only reservation currently is that the commit makes some breaking changes with the existing stringsvc4 implementation. When I resolve that, I'll create a new merge request.

@tlogik
Copy link

tlogik commented Nov 17, 2018

Hope this helps.

It helped a lot. Much appreciated.

@tlogik
Copy link

tlogik commented Nov 18, 2018

Hi again @mattfung
I am using your example as a template, trying to get the behaviour i need.
Questions arise as i work with the implementation.
I find that in the Subscriber.go, func ServeDelivery, the channel for consuming is also used for publishing.
That is somewhat ok, except for at common requirement for subscribing from one Vhost and publishing to another Vhost.
AFAIK you need to create a new connection to each Vhost, create individual channels etc.

i do not seem to be able to setup the internal publisher, in the subscriber.go, for an alternate channel - can you confirm that is not possible in the current implementation?

@mattfung
Copy link
Contributor Author

@tlogik correct, current implementation does not allow for a subscriber to subscribe to one channel (& consequently, Vhost) and publish to another. I am curious on how other frameworks handle this problem if you happen to know of any.

Currently, publishers and subscribers are (for simplicity's sake) tied to one channel. That can change if it is a significant limitation.

An alternative approach would be to have a subscriber for vhost A trigger a publisher for vhost B.

@tlogik
Copy link

tlogik commented Nov 20, 2018

@mattfung currently we use individual instances of subscriber and publisher with individual connstrings.
I was looking into using go-kit as the toolbox of choice to create a standard to metrics, health, service registration etc, but it is a little hard to extend the transport layer.
i am currently working on a service that needs to batch process messages consumed from MQ and then inserted into SOLR. It is a lot faster to insert in SOLR via batching than individual requests.
This will require a custom amqp implementation to support this.

@jstoja
Copy link

jstoja commented Nov 23, 2018

@tlogik @mattfung From steadway/amqp's package documentation:

It's advisable to use separate connections for Channel.Publish and Channel.Consume so not to have TCP pushback on publishing affect the ability to consume messages, so this parameter is here mostly for completeness.

source: https://godoc.org/github.com/streadway/amqp#Channel.Consume

So to me the implementation of @mattfung respect perfectly that I like that it's not possible to shoot myself in the foot using this implementation with go-kit.

Moreover, the vhosts are part of the connection string, you cannot have 1 channel listening on several vhost, since an amqp channel is some kind of wrapper adding multiplexing to the parent amqp connection which is itself tied to one vhost.

@tlogik I guess that if you want to implement that, you can easily consume messages from one channel following @mattfung examples and implement some kind of buffering inside your program in addition of using prefetching (https://godoc.org/github.com/streadway/amqp#Channel.Qos).

@freemanoid
Copy link

@mattfung thanks for the amqp implementation, I've adopted it and it works perfectly! I see that consuming function automatically publishes some "response". It's not needed for me and I don't see any possibility to turn it off. Can you explain why did you added it? Maybe I'm using amqp wrong.

@peterbourgon
Copy link
Member

If you're not publishing responses, you're not doing RPC.

@freemanoid
Copy link

If you're not publishing responses, you're not doing RPC.

Do you mean this implementation is not about AMQP, but RPC over AMQP?

@mattfung
Copy link
Contributor Author

@freemanoid Good point, one way AMQP communication is not uncommon. Currently the closest thing to a "no-op" is to use the NopEncoder such that an empty response is sent back.

There are two solutions to implement true "no-op" subscribers, one is to make the "encoder" both encode and send, and use EncodeNopResponse (though Im not sure if thats a good idea )

The other option is to add an options/ change the parameter to NewSubscriber

@peterbourgon
Copy link
Member

@freemanoid

Do you mean this implementation is not about AMQP, but RPC over AMQP?

Yes. Go kit is concerned exclusively with RPC.

Non-goals

  • Supporting messaging patterns other than RPC (for now) — e.g. MPI, pub/sub, CQRS, etc.

@mattfung

The other option is to add an options/ change the parameter to NewSubscriber

For the record, if this is easy, I'd happily take a PR.

@madshov
Copy link

madshov commented Mar 2, 2019

@mattfung Do you plan any work around adding an option/change parameter to NewSubscriber to no-op subscribers?

@mattfung
Copy link
Contributor Author

@madshov sorry for the slow response, PR is posted #850

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants