Advanced

Keep-alives

For HTTP streaming and WebSocket connections, it is usually desirable to send keep-alive packets periodically, so that idle connections aren’t timed out by clients or routers. Pushpin has built-in support for sending such packets.

For HTTP streaming, include a Grip-Keep-Alive response header:

HTTP/1.1 200 OK
Grip-Hold: stream
Grip-Channel: mychannel
Grip-Keep-Alive: \n; format=cstring; timeout=30

For example, the above response would tell Pushpin to send a newline whenever the stream has been idle for 30 seconds. The format field can be set to raw (data is sent as-is; this is the default if format is omitted), cstring (data is backslash-escaped, useful for sending line breaks or multi-line data), or base64 (data is Base64-encoded, useful for sending binary data).
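A minimal sketch of building the Grip-Keep-Alive header value from Python, assuming the backend has the keep-alive data as bytes. The helper name keep_alive_header is hypothetical, and the set of characters escaped in cstring mode (backslash, CR, LF, tab) is an assumption rather than the complete escape table Pushpin accepts.

```python
import base64


def keep_alive_header(data: bytes, fmt: str = "raw", timeout: int = 30) -> str:
    """Build a Grip-Keep-Alive header value (hypothetical helper)."""
    if fmt == "raw":
        # Sent as-is, so the data must be header-safe (no newlines or ';').
        value = data.decode("ascii")
    elif fmt == "cstring":
        # Assumed escape set: backslash first, then CR, LF and tab.
        value = (
            data.decode("ascii")
            .replace("\\", "\\\\")
            .replace("\r", "\\r")
            .replace("\n", "\\n")
            .replace("\t", "\\t")
        )
    elif fmt == "base64":
        value = base64.b64encode(data).decode("ascii")
    else:
        raise ValueError(f"unknown format: {fmt}")
    # raw is the default, so the format param can be left out for it.
    params = f"timeout={timeout}" if fmt == "raw" else f"format={fmt}; timeout={timeout}"
    return f"{value}; {params}"


print(keep_alive_header(b"\n", "cstring", 30))  # \n; format=cstring; timeout=30
```

The cstring form is what makes a bare newline usable here, since a literal line break cannot appear inside an HTTP header.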

For WebSockets, send a control message of type keep-alive:

c:{"type": "keep-alive", "content": "{}", "timeout": 30}

For example, the above control message would tell Pushpin to send a message containing {} whenever the connection has been idle for 30 seconds. To send binary content, set content-bin to a Base64-encoded value instead of setting content. To send message types other than TEXT, set message-type to binary, ping, or pong. To turn keep-alives off, send the control message with the content and content-bin parameters omitted.
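The keep-alive control message is plain JSON behind a c: prefix, so it can be generated with the standard library. This is a sketch; the function name keep_alive_control is hypothetical, and calling it with no content produces the "turn keep-alives off" form described above.

```python
import base64
import json
from typing import Optional


def keep_alive_control(content: Optional[str] = None,
                       content_bin: Optional[bytes] = None,
                       timeout: int = 30,
                       message_type: Optional[str] = None) -> str:
    """Build a WebSocket keep-alive control message (hypothetical helper)."""
    if content is not None and content_bin is not None:
        raise ValueError("set content or content_bin, not both")
    msg = {"type": "keep-alive"}
    if content is not None:
        msg["content"] = content
    elif content_bin is not None:
        # Binary payloads travel Base64-encoded in content-bin.
        msg["content-bin"] = base64.b64encode(content_bin).decode("ascii")
    if content is not None or content_bin is not None:
        msg["timeout"] = timeout
        if message_type is not None:
            msg["message-type"] = message_type  # binary, ping, or pong
    # With neither content field set, this disables keep-alives.
    return "c:" + json.dumps(msg)


print(keep_alive_control("{}"))  # c:{"type": "keep-alive", "content": "{}", "timeout": 30}
```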

Commands

Publishing is just one of several commands supported by Pushpin. Commands are available over ZeroMQ and/or HTTP.

To call commands over ZeroMQ, connect to Pushpin’s REP command socket (port 5563 by default). Requests are tnetstring-encoded and must contain a method field. If a command requires arguments, set them in the args field, which contains an object of named arguments. If you make multiple requests over the same connection using a DEALER socket, also set the id field so that replies can be matched to their requests.
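Since command requests are tnetstring-encoded, a small encoder is enough to build them without extra dependencies. The sketch below is a minimal encoder covering the types commands use, plus a hypothetical command_request helper; the id is shown as a string, which is an assumption since its type isn't specified here.

```python
from typing import Any, Optional


def tnet_encode(value: Any) -> bytes:
    """Minimal tnetstring encoder: <length>:<payload><type tag>."""
    if value is None:
        return b"0:~"
    if isinstance(value, bool):  # check bool before int (bool subclasses int)
        payload, tag = (b"true" if value else b"false"), b"!"
    elif isinstance(value, int):
        payload, tag = str(value).encode("ascii"), b"#"
    elif isinstance(value, str):
        payload, tag = value.encode("utf-8"), b","
    elif isinstance(value, bytes):
        payload, tag = value, b","
    elif isinstance(value, list):
        payload, tag = b"".join(tnet_encode(v) for v in value), b"]"
    elif isinstance(value, dict):
        payload = b"".join(tnet_encode(str(k)) + tnet_encode(v) for k, v in value.items())
        tag = b"}"
    else:
        raise TypeError(f"cannot encode {type(value).__name__}")
    return str(len(payload)).encode("ascii") + b":" + payload + tag


def command_request(method: str, args: Optional[dict] = None,
                    req_id: Optional[str] = None) -> bytes:
    """Encode a command request (hypothetical helper)."""
    req: dict = {"method": method}
    if args:
        req["args"] = args
    if req_id is not None:
        req["id"] = req_id  # lets a DEALER client match replies
    return tnet_encode(req)


print(command_request("get-zmq-uris"))  # b'25:6:method,12:get-zmq-uris,}'
```

With a pyzmq REQ socket the result can be sent directly with sock.send(...). A DEALER socket talking to a REP socket must add the empty delimiter frame itself, for example sock.send_multipart([b"", payload]).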

Publish

As a command, publishing is supported over HTTP only. To publish via ZeroMQ, connect to Pushpin’s PULL or SUB sockets (since these inputs are not request/response, they are not considered to be “commands”). See Publishing.

Get ZeroMQ URIs

Discover Pushpin’s ZeroMQ socket addresses. This command is supported over ZeroMQ only.

Pushpin binds to many ZeroMQ socket addresses, and configuring your application with all of them can be cumbersome. With this command, your application only needs to know the address of the command socket, which it can then use to discover the others.

Request:

{
  "method": "get-zmq-uris"
}

Response:

{
  "success": true,
  "value": {
    "command": "tcp://127.0.0.1:5563",
    "publish-pull": "tcp://127.0.0.1:5560",
    "publish-sub": "tcp://127.0.0.1:5562"
  }
}
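Once a response has been decoded, the application checks success and picks the socket it needs. This sketch assumes the response was decoded into a dict with str keys (some tnetstring libraries return bytes, which would need converting first); both helper names and the "prefer the PULL input" policy are hypothetical.

```python
from typing import Any, Dict


def command_value(response: Dict[str, Any]) -> Any:
    """Return the 'value' of a decoded command response, or raise on failure."""
    if not response.get("success"):
        raise RuntimeError(f"command failed: {response!r}")
    return response.get("value")


def publish_uri(uris: Dict[str, str], prefer: str = "publish-pull") -> str:
    """Pick a publish socket address from get-zmq-uris output (hypothetical policy)."""
    for key in (prefer, "publish-pull", "publish-sub"):
        if key in uris:
            return uris[key]
    raise KeyError("no publish socket advertised")


response = {
    "success": True,
    "value": {
        "command": "tcp://127.0.0.1:5563",
        "publish-pull": "tcp://127.0.0.1:5560",
        "publish-sub": "tcp://127.0.0.1:5562",
    },
}
print(publish_uri(command_value(response)))  # tcp://127.0.0.1:5560
```

A publisher would then connect a PUSH socket to the publish-pull address, or a PUB socket to the publish-sub address.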

Recover

Cause all reliable connections to enter a recovery state. This command is supported over both HTTP and ZeroMQ.

Use this command if your publisher has recently crashed, to speed up subscription recovery time.

HTTP request:

POST /recover HTTP/1.1
Host: localhost:5561

(empty body)

HTTP response:

HTTP/1.1 200 OK
Content-Type: text/plain

Updated

ZeroMQ request:

{
  "method": "recover"
}

ZeroMQ response:

{
  "success": true
}
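The HTTP variant of recover needs only the standard library. This sketch assumes the default HTTP command address localhost:5561 from the example above; the helper names are hypothetical.

```python
import urllib.request


def recover_request(base_url: str = "http://localhost:5561") -> urllib.request.Request:
    """Build the POST /recover request with an empty body (hypothetical helper)."""
    return urllib.request.Request(base_url.rstrip("/") + "/recover", data=b"", method="POST")


def recover(base_url: str = "http://localhost:5561") -> str:
    """Send the request; the body is expected to read "Updated" on success."""
    with urllib.request.urlopen(recover_request(base_url)) as resp:
        return resp.read().decode("utf-8").strip()
```

A publisher's startup code could call recover() once after restarting following a crash.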

Refresh

Cause a WebSocket-over-HTTP session to make a request. This command is supported over ZeroMQ only.

This command requires an argument cid containing the connection ID. Its value should be the same as the Connection-ID header present in received WebSocket-over-HTTP requests.

Request:

{
  "method": "refresh",
  "args": {
    "cid": "1b5e2c2e-e8ce-11e6-88b6-a72f3e34e30c"
  }
}

Response:

{
  "success": true
}
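A backend typically has the connection ID from the headers of a WebSocket-over-HTTP request it just received. This sketch builds the refresh request from those headers; refresh_request is a hypothetical name, and the resulting dict still has to be tnetstring-encoded before it is sent to the command socket.

```python
from typing import Mapping


def refresh_request(headers: Mapping[str, str]) -> dict:
    """Build a refresh command from a received request's headers (hypothetical)."""
    # Header names are case-insensitive, so normalize before looking up.
    lowered = {k.lower(): v for k, v in headers.items()}
    cid = lowered.get("connection-id")
    if not cid:
        raise ValueError("no Connection-ID header; not a WebSocket-over-HTTP request")
    return {"method": "refresh", "args": {"cid": cid}}


print(refresh_request({"Connection-Id": "1b5e2c2e-e8ce-11e6-88b6-a72f3e34e30c"}))
```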

Connection check

Check if one or more connections are still present. This command is supported over ZeroMQ only.

This command requires an argument ids containing a list of connection IDs to look up. These IDs should be the same as those received over the stats socket. The command returns the IDs that are still present.

Request:

{
  "method": "conncheck",
  "args": {
    "ids": [
      "m2zhttp_45517:pushpin-m2-7999_1_0",
      "m2zhttp_45517:pushpin-m2-7999_2_0"
    ]
  }
}

Response:

{
  "success": true,
  "value": [
    "m2zhttp_45517:pushpin-m2-7999_1_0"
  ]
}
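Because conncheck returns only the IDs that are still present, the IDs that have gone away are the ones missing from the reply. A minimal sketch (gone_connections is a hypothetical name), using the example above:

```python
from typing import Iterable, List


def gone_connections(requested: Iterable[str], still_present: Iterable[str]) -> List[str]:
    """IDs that were asked about but are absent from the conncheck reply."""
    present = set(still_present)
    return [cid for cid in requested if cid not in present]


requested = ["m2zhttp_45517:pushpin-m2-7999_1_0", "m2zhttp_45517:pushpin-m2-7999_2_0"]
reply_value = ["m2zhttp_45517:pushpin-m2-7999_1_0"]
print(gone_connections(requested, reply_value))  # ['m2zhttp_45517:pushpin-m2-7999_2_0']
```

This is useful for pruning stale entries from a connection table the application built from stats events.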

Stats socket

Pushpin exposes various events using a ZeroMQ PUB socket. Each message is prefixed with an event type followed by a space, then a tnetstring-encoded payload prefixed with the letter T. By default, the stats socket is at ipc://{rundir}/pushpin-stats, but this can be changed using the stats_spec configuration option.

For example, a received message might look like this: (lines wrapped for readability)

message T88:4:from,21:pushpin-handler_18537,7:channel,4:test,
9:transport,11:http-stream,5:count,1:1#}
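To make the wire format concrete, here is a sketch of a minimal tnetstring decoder and a stats message parser, tested against the message above. It decodes string payloads as UTF-8, which is an assumption that would not hold for arbitrary binary content; the function names are hypothetical.

```python
from typing import Any, Tuple


def tnet_decode(data: bytes) -> Tuple[Any, bytes]:
    """Decode one tnetstring; return (value, remaining bytes)."""
    size_str, sep, rest = data.partition(b":")
    if not sep:
        raise ValueError("missing length prefix")
    size = int(size_str)
    payload, tag, remain = rest[:size], rest[size:size + 1], rest[size + 1:]
    if tag == b",":
        return payload.decode("utf-8"), remain
    if tag == b"#":
        return int(payload), remain
    if tag == b"^":
        return float(payload), remain
    if tag == b"!":
        return payload == b"true", remain
    if tag == b"~":
        return None, remain
    if tag == b"]":
        items = []
        while payload:
            item, payload = tnet_decode(payload)
            items.append(item)
        return items, remain
    if tag == b"}":
        obj = {}
        while payload:
            key, payload = tnet_decode(payload)
            val, payload = tnet_decode(payload)
            obj[key] = val
        return obj, remain
    raise ValueError(f"unknown type tag {tag!r}")


def parse_stats_message(raw: bytes) -> Tuple[str, Any]:
    """Split '<event type> T<tnetstring>' and decode the payload."""
    mtype, _, body = raw.partition(b" ")
    if body[:1] != b"T":
        raise ValueError("unsupported format")
    value, _ = tnet_decode(body[1:])
    return mtype.decode("ascii"), value


raw = (b"message T88:4:from,21:pushpin-handler_18537,7:channel,4:test,"
       b"9:transport,11:http-stream,5:count,1:1#}")
print(parse_stats_message(raw))
```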

The following events are defined:

Connection:

Connection (conn) events indicate when connections come and go. They may contain the following fields:

Subscription:

Subscription (sub) events indicate when subscriptions come and go. They may contain the following fields:

Message:

Message (message) events indicate when one or more messages have been delivered to receivers. They may contain the following fields:

Activity:

Activity (activity) events indicate aggregate activity levels. They can be used as a rough measure of how busy connections are, excluding published messages. An activity is counted for each new request or connection, for every keep-alive sent, and for every non-published WebSocket message sent, within the last minute.

Report:

Report (report) events indicate aggregate connection, message, and activity information. They are sent periodically.

Here’s a simple Python program that connects to the stats socket and prints the decoded messages:

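import tnetstring  # requires a Python 3-compatible tnetstring module
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.SUB)
# stats socket path; depends on rundir and the stats_spec option
sock.connect('ipc:///var/run/pushpin/pushpin-stats')
# an empty prefix subscribes to every event type
sock.setsockopt(zmq.SUBSCRIBE, b'')

while True:
    m_raw = sock.recv()
    # each message is "<event type> T<tnetstring payload>"
    mtype, mdata = m_raw.split(b' ', 1)
    if mdata[:1] != b'T':
        print('unsupported format')
        continue
    m = tnetstring.loads(mdata[1:])
    print('%s %s' % (mtype.decode(), m))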

The zmq.SUBSCRIBE option specifies a prefix filter on messages. If you only want to receive events of a certain type, set it to a non-empty prefix such as b'report ' (the trailing space keeps it from matching other event types that merely start with the same letters).

Message filtering

The backend can assign arbitrary meta values to a subscriber and also to a published message, and these values can be used for filtering logic. Currently there is only one supported filter: skip-self, which will drop a message if the subscriber has a user meta value and the published message has a sender meta value, and these two values are equal.
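The skip-self rule can be stated as a small predicate. This is an illustration of the rule as described, not Pushpin's implementation; skip_self is a hypothetical name.

```python
from typing import Mapping


def skip_self(subscriber_meta: Mapping[str, str], message_meta: Mapping[str, str]) -> bool:
    """Return True if the skip-self filter would drop this message."""
    user = subscriber_meta.get("user")
    sender = message_meta.get("sender")
    # Both values must be present and equal for the message to be dropped.
    return user is not None and sender is not None and user == sender


print(skip_self({"user": "alice"}, {"sender": "alice"}))  # True: alice doesn't get her own message
print(skip_self({"user": "bob"}, {"sender": "alice"}))    # False: delivered
```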

To set the user meta value and enable the skip-self filter for HTTP, include an extra Grip-Set-Meta header and add a filter param to any Grip-Channel:

Grip-Set-Meta: user=alice
Grip-Channel: test; filter=skip-self

To apply more than one filter to a channel, pass additional filter params to the same channel.

For WebSockets, this info goes into control messages:

c:{"type": "set-meta", "name": "user", "value": "alice"}
c:{"type": "subscribe", "channel": "test", "filters": ["skip-self"]}

In the case of WebSockets, the list of filters is passed as a single list rather than multiple filter params.

To include meta values in a published message, include a meta object in the message item, containing the values:

{
  "items": [
    {
      "channel": "test",
      "meta": {
        "sender": "alice"
      },
      "formats": {
        "http-stream": {
          "content": "hello\n"
        },
        "ws-message": {
          "content": "hello\n"
        }
      }
    }
  ]
}
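A publisher can generate this item with a small helper. The sketch below reproduces the JSON above; publish_item is a hypothetical name, and sending the body to Pushpin's HTTP publish endpoint (assumed here to be http://localhost:5561/publish/ in a default setup) is left out.

```python
import json
from typing import Optional


def publish_item(channel: str, content: str, sender: Optional[str] = None) -> dict:
    """Build one publish item, optionally tagged with a sender meta value."""
    item: dict = {"channel": channel}
    if sender is not None:
        item["meta"] = {"sender": sender}  # matched against subscribers' user meta
    item["formats"] = {
        "http-stream": {"content": content},
        "ws-message": {"content": content},
    }
    return item


body = json.dumps({"items": [publish_item("test", "hello\n", sender="alice")]})
print(body)
```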

The pushpin-publish tool has a --sender option for easy experimenting.

Paged streaming

If the backend needs to return a lot of initial content for an HTTP streaming request, then it can return a portion of the content and have Pushpin make a new request to fetch the next part. This is done by providing a Grip-Link header:

HTTP/1.1 200 OK
Content-Type: text/plain
Grip-Link: </stream/?after=nextID>; rel=next

{... first part of content ...}

Once Pushpin has finished sending the current response data to the client, it will request the next link and send that response to the client as well (body only). This process will repeat until a response from the backend either omits a next link or sets Grip-Hold. If the response contains both Grip-Hold and a next link, then the request will enter a hold state and the next link may be used for data recovery (see Reliability).
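On the backend side, paging comes down to returning one slice of content plus a next link until the last slice, which switches to a hold. This is a framework-free sketch: the page size, the channel name mychannel, and stream_page itself are hypothetical, and an unknown after ID simply raises ValueError.

```python
from typing import Dict, List, Optional, Tuple

PAGE_SIZE = 2  # hypothetical page size


def stream_page(items: List[Tuple[str, str]], after: Optional[str]) -> Tuple[Dict[str, str], str]:
    """Return (headers, body) for one page of initial stream content."""
    ids = [item_id for item_id, _ in items]
    start = ids.index(after) + 1 if after is not None else 0
    page = items[start:start + PAGE_SIZE]
    headers = {"Content-Type": "text/plain"}
    body = "".join(text for _, text in page)
    if start + PAGE_SIZE < len(items):
        # More content remains: ask Pushpin to fetch the next page.
        headers["Grip-Link"] = f"</stream/?after={page[-1][0]}>; rel=next"
    else:
        # Last page: hold the stream open instead of linking onward.
        headers["Grip-Hold"] = "stream"
        headers["Grip-Channel"] = "mychannel"
    return headers, body


items = [("1", "a\n"), ("2", "b\n"), ("3", "c\n")]
print(stream_page(items, None))  # first page, with a Grip-Link to after=2
```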

There is a blog post that walks through this feature.

Reliability

Publish-subscribe systems are unreliable by design. However, because Pushpin interfaces with a backend server that likely has access to durable storage, it is possible for Pushpin to leverage the backend in order to provide reliable transmission.

Currently, this feature works for HTTP streaming and long-polling, but not WebSockets. Also, the way it is used depends on the transport. See below for per-transport details.

There is a blog post that walks through this feature.

Reliable HTTP streaming

When creating a stream hold, any channel to be subscribed must include a prev-id value. A next link must also be provided:

HTTP/1.1 200 OK
Content-Type: text/plain
Grip-Hold: stream
Grip-Channel: fruit; prev-id=3
Grip-Link: </fruit/?after=3>; rel=next

{... initial response ...}

Pushpin will enter a hold state, and may request the next link in order to repair the data stream under the following conditions:

If a published message’s prev-id matches the last known prev-id of the subscription, then the last known prev-id of the subscription is set to the published message’s id and the message is delivered. If it does not match, then the message is dropped and the next link is requested.
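The prev-id check can be modeled in a few lines to make the gap detection concrete. This is an illustrative model of the rule above, not Pushpin's code; the class and attribute names are hypothetical.

```python
from typing import List


class ReliableSubscription:
    """Models the prev-id check for one subscribed channel."""

    def __init__(self, prev_id: str):
        self.last_id = prev_id
        self.delivered: List[str] = []
        self.needs_repair = False

    def on_publish(self, msg_id: str, prev_id: str, content: str) -> bool:
        if prev_id == self.last_id:
            # In sequence: advance the checkpoint and deliver.
            self.last_id = msg_id
            self.delivered.append(content)
            return True
        # Gap detected: drop the message; the next link is requested to repair.
        self.needs_repair = True
        return False


sub = ReliableSubscription("3")
sub.on_publish("4", "3", "apple")   # delivered, last_id becomes "4"
sub.on_publish("6", "5", "cherry")  # dropped: message 5 was missed
```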

When the next link is requested, the behavior is similar to Paged streaming: subsequent next links are followed if provided, until a response contains Grip-Hold, at which point the connection returns to a hold state.

In each request, Pushpin will include a Grip-Last header indicating the last known ID received on a given channel. This should take precedence over whatever checkpoint information may have been encoded in the link.

GET /fruit/?after=3 HTTP/1.1
Grip-Last: fruit; last-id=7

For example, if the backend server received the above request, then the last-id of 7 would be used as the basis for determining the response content rather than the after query param.
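A backend handler can parse Grip-Last and let it override the link's query parameter. This sketch takes a list of header values, one per header line; also splitting each value on commas is an assumption, as are the helper names.

```python
from typing import Dict, Iterable


def parse_grip_last(values: Iterable[str]) -> Dict[str, str]:
    """Map channel -> last-id from one or more Grip-Last header values."""
    result: Dict[str, str] = {}
    for value in values:
        for entry in value.split(","):  # assumption: comma-separated entries
            channel, *params = [p.strip() for p in entry.split(";")]
            for param in params:
                key, _, val = param.partition("=")
                if key == "last-id":
                    result[channel] = val
    return result


def resume_from(grip_last: Dict[str, str], channel: str, after: str) -> str:
    """Grip-Last takes precedence over the checkpoint encoded in the link."""
    return grip_last.get(channel, after)


last = parse_grip_last(["fruit; last-id=7"])
print(resume_from(last, "fruit", "3"))  # 7
```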

Reliable HTTP long-polling

When creating a response hold, any channel to be subscribed must include a prev-id value:

HTTP/1.1 200 OK
Content-Type: text/plain
Grip-Hold: response
Grip-Channel: fruit; prev-id=3

{... timeout response ...}

Pushpin will enter a hold state, and may retry the request with the backend in order to repair the data stream under the following conditions:

If a published message’s prev-id matches the last known prev-id of the subscription, then the last known prev-id of the subscription is set to the published message’s id and the message is delivered. If it does not match, then the message is dropped and the request is retried.

Multiple processes

Pushpin consists of six processes: mongrel2, zurl, m2adapter, pushpin-proxy, pushpin-handler, and pushpin (the “runner”). In a basic setup you don’t need to think about this. Just run pushpin to start everything up, and terminate the process (or press Ctrl-C) to shut everything down.

If you’d prefer to manage any of these processes individually, adjust the services field in pushpin.conf. You can even choose not to use the runner at all. In that case, Pushpin’s own processes can be launched as follows:

m2adapter process:

m2adapter --config=/path/to/m2adapter.conf

Proxy process:

pushpin-proxy --config=/path/to/pushpin.conf

Handler process:

pushpin-handler --config=/path/to/pushpin.conf
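If you manage these processes from your own tooling, the three commands above translate directly into subprocess calls. This is a minimal sketch: the config directory is a placeholder, mongrel2 and zurl are not covered because their command lines aren't given here, and nothing restarts a process that exits.

```python
import subprocess
from typing import List


def pushpin_commands(config_dir: str) -> List[List[str]]:
    """argv lists for Pushpin's own processes (config paths are placeholders)."""
    return [
        ["m2adapter", f"--config={config_dir}/m2adapter.conf"],
        ["pushpin-proxy", f"--config={config_dir}/pushpin.conf"],
        ["pushpin-handler", f"--config={config_dir}/pushpin.conf"],
    ]


def launch(config_dir: str) -> List[subprocess.Popen]:
    """Start each process; the caller is responsible for supervising them."""
    return [subprocess.Popen(argv) for argv in pushpin_commands(config_dir)]
```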