Lee Hambley

Founder, Consultant, Software Engineer

Hamburg, Germany

I recently needed to consume the CouchDB _changes feed in a streaming fashion.

Most of the recommendations I found online sang the praises of EventMachine or some other event-driven solution, but in fact it's much easier than that. Here's the code I came up with:

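    require 'json'
    require 'thread'
    require 'socket'

    changes = Queue.new

    # Reader: streams the continuous _changes feed and queues each parsed change.
    reader = Thread.new(changes) do |changes|
      s = TCPSocket.new 'localhost', 5984
      s.write "GET /omio_development/_changes?since=0&feed=continuous\r\n"
      s.write "\r\n"
      while line = s.gets
        # Only change rows start with '{'; everything else on the wire is skipped.
        changes.push JSON.parse(line) if line.start_with?('{')
      end
      changes.push NilClass # end-of-queue marker
    end

    # Handler: pops changes until it sees the end-of-queue marker.
    handler = Thread.new(changes) do |changes|
      while change = changes.pop
        # NilClass.is_a?(NilClass) is false, so compare identity instead.
        break if change.equal? NilClass
        warn change.inspect
      end
    end

    [reader, handler].map(&:join)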

This is about the simplest thing that can be made to work, and it idiomatically uses Ruby's Queue class (loaded by require 'thread' from the standard library) to communicate between the two threads.

There's nothing groundbreaking here. It should probably be moved into a class and made so that it can be started and stopped from the background: I'd run these two threads inside a new thread, so that start and stop methods could be called easily and conveniently. I'd probably pass a locked mutex to the reader thread, which it would try to acquire in the first line of its body, so that it stays parked until start is called on my class. The second thread wouldn't need a mutex, as it would block immediately on the empty queue.
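As a rough sketch of what such a class might look like: the name ChangesFollower, its methods, and the idea of injecting the IO through a lambda are all assumptions made for illustration, and it uses a plain stop flag rather than the locked mutex described above. A StringIO stands in for the socket so the example runs without CouchDB.

```ruby
require 'json'
require 'stringio'

# Hypothetical wrapper; class and method names are illustrative only.
class ChangesFollower
  DONE = Object.new # private end-of-queue marker

  # io_factory: callable returning an object that responds to #gets,
  # e.g. a TCPSocket on which the GET request has already been written.
  def initialize(io_factory, &on_change)
    @io_factory = io_factory
    @on_change  = on_change
    @queue      = Queue.new
    @stopping   = false
  end

  def start
    @reader = Thread.new do
      io = @io_factory.call
      begin
        while !@stopping && (line = io.gets)
          @queue.push JSON.parse(line) if line.start_with?('{')
        end
      ensure
        @queue.push DONE # always release the handler, even on errors
      end
    end
    @handler = Thread.new do
      while (change = @queue.pop) && !change.equal?(DONE)
        @on_change.call(change)
      end
    end
    self
  end

  # Sets the flag and waits; a reader blocked in #gets only notices
  # the flag once the next line arrives.
  def stop
    @stopping = true
    join
  end

  def join
    [@reader, @handler].each(&:join)
    self
  end
end

# Usage with canned input instead of a live socket.
seen = []
feed = StringIO.new(%({"seq":1,"id":"a"}\n\n{"seq":2,"id":"b"}\n))
ChangesFollower.new(-> { feed }) { |change| seen << change }.start.join
```

Because the reader blocks in gets, stop can only take effect when another line arrives; closing the socket from the outside would be a more abrupt alternative.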

I've taken some liberties here with the specifics of the CouchDB continuous changes protocol, as I simply don't care about the HTTP headers (they'd only tell me it's a chunked response with no Content-Length).
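To make that shortcut concrete, here is a small sketch of the line filter on its own. The helper name parse_change_line and the sample lines are made up for illustration (they are not captured from a real server); the point is only that status lines, headers, chunk sizes and blank lines never start with '{', so they fall through.

```ruby
require 'json'

# Hypothetical helper: returns the parsed change for a JSON line, or nil
# for anything else (status line, headers, chunk sizes, blank lines).
def parse_change_line(line)
  return nil unless line.start_with?('{')
  JSON.parse(line)
rescue JSON::ParserError
  nil # a truncated or malformed row is skipped rather than raised
end

# Illustrative input only, shaped like a chunked HTTP response.
sample = [
  "HTTP/1.1 200 OK\r\n",
  "Transfer-Encoding: chunked\r\n",
  "\r\n",
  "33\r\n",
  %({"seq":1,"id":"doc-a","changes":[{"rev":"1-abc"}]}\n),
  "\r\n",
]

changes = sample.map { |line| parse_change_line(line) }.compact
```

The rescue is an addition the original snippet does not have; without it, a malformed row would raise inside the reader thread.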

It would be prudent to test this a bit more thoroughly, but writing a test case for it would involve writing a server as well: I'm not aware of any web-mocking tool that lets you mock streaming bodies, though I haven't researched it extensively.
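The server needed for such a test can be quite small. The following is a minimal sketch using only the standard library; the helper name with_stub_changes_server, the /db path and the canned lines are assumptions for illustration, and it sends no HTTP headers at all, which is fine for the reader above since it ignores them anyway.

```ruby
require 'json'
require 'socket'

# Hypothetical test helper: serves the given lines to one client, then closes.
def with_stub_changes_server(lines)
  server = TCPServer.new('127.0.0.1', 0) # port 0: let the OS pick a free port
  port   = server.addr[1]
  thread = Thread.new do
    client = server.accept
    # Drain the request up to the blank line that ends it.
    loop do
      request_line = client.gets
      break if request_line.nil? || request_line == "\r\n"
    end
    lines.each { |line| client.write(line) }
    client.close
  end
  yield port
ensure
  thread&.join
  server&.close
end

received = []
canned = [%({"seq":1,"id":"a"}\n), "\n", %({"seq":2,"id":"b"}\n)]
with_stub_changes_server(canned) do |port|
  s = TCPSocket.new('127.0.0.1', port)
  s.write "GET /db/_changes?since=0&feed=continuous\r\n"
  s.write "\r\n"
  while (line = s.gets)
    received << JSON.parse(line) if line.start_with?('{')
  end
  s.close
end
```

This only covers the happy path: a client that fails before sending its request would leave the stub thread waiting, so a real test would want timeouts.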

There's probably a better way of signalling the handler thread that the queue is finished than passing NilClass. Curiously, Ruby before 2.3 has no way to close or terminate a queue (Queue#close arrived in 2.3), other than sending a known end-of-queue value to make the thread break out of its loop. In the Go language, queues are called channels, and they can be closed with close(), signalling to anyone reading the channel that there's nothing else to read, and never will be.
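On Ruby 2.3 and later, Queue#close gives much the same behaviour as closing a Go channel: once the queue is closed and drained, pop returns nil instead of blocking. A minimal sketch, with made-up data standing in for the parsed changes:

```ruby
changes = Queue.new

producer = Thread.new do
  3.times { |i| changes.push("seq" => i) }
  changes.close # Ruby 2.3+: nothing more will ever be pushed
end

handled = []
consumer = Thread.new do
  # pop returns nil once the queue is closed and empty, ending the loop
  while (change = changes.pop)
    handled << change
  end
end

[producer, consumer].each(&:join)
```

With this in place the end-of-queue marker and the explicit break both disappear, and pushing to a closed queue raises ClosedQueueError.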

This work is licensed under a Creative Commons Attribution 3.0 Unported License.

Licences for my individual projects, and mini-code snippets can be found within each.