Wow! I am now working at, just got married, and a few other things. Looks like I need to blog more.


At BradsDeals, we looked for a way to streamline out event process.

Currently the process looks something like this:

[Web App] → [log.tgz files] → [S3] → (*) [Redshift]

After in Redshift:

[Redshift] (*) → [Postgres Data Mart(s)]

There are some architectural concerns here, but what I seek to eliminate: - use of cron to poll for new files - use of logs as intermediary between redshift - use of redshift as a staging for the data marts

I would like to use a pipeline

[producer] → [pipe] → [consumer]


tl;dr: peep the code gist

We decided to leverage AWS Kinesis for the pipe, and the producer and consumer had to be that Go. The pipeline will look like this:

[Web App] → [Kinesis] → [Workers]

Once we get the data to the workers we can write that the data anywhere:

[Workers] → [Postgres Data Mart(s)]

[Workers] → [S3 log files ]

[Workers] → [Redshift]

[Workers] → […other]

What I like about this is that Kinesis can scale vertically to terabytes of data per hour. The Web App and the Workers can scale horizontally. If you need more power than that well… thats why this post is called “Getting started…”

Amazon bills Kinesis as “a fully managed service for real-time processing of streaming data at massive scale.”

Create the Amazon Kinesis Stream

Read the basic instructions for creating a data stream or just go here.


  1. Click “Create Stream”
  2. Give the stream a name
  3. Set shards* to 2
  4. Click “Create”

*A stream is composed of shards. A single shard allows for 2MB/1MB read/write capacity, respectively. It also caps the read/write transactions at 5/1000, respectively.

You should see your new stream in the table, along with a status of ACTIVE once AWS is done building it.

Note: the code samples actually create and destroy the stream so you don’t need to create the stream, by hand, to run the code

The Codez

We are using Crowdmob’s Goamz library

Now Kinesis is a byte-stream so we need to use Go’s gob package to encode and decode our struct.

In this example, I have a Car struct.

Note: I didn’t refactor this, so it is not unpolished.

# Dont forget your AWS credentials 
package main 

import (


	aws ""
	kinesis ""

type Car struct {
	Id   int
	Make string

var (
	maxCars     = 100
	timeOut     = 5
	recordLimit = 100

This should be fairly straight forward. We of important all of the packages required.

We create a Car struct

We add some variables at are demo-centric: - the number of cars I am creating - time out for the main thread to wait for consumers to finish - max number of records to grab

func main() {

	shards     := 5
	streamName := "test"

	ksis := createStream(streamName, shards)
	defer deleteStream(ksis, streamName)

	streamDescription := waitForActive(ksis, streamName)

	putRecords(ksis, streamName, buildCars())

	for _, shard := range streamDescription.Shards {
		go getRecords(ksis, streamName, shard.ShardId)

	<-time.After(time.Duration(timeOut) * time.Second)

We call a function to attempt to create a new Kinesis stream called “test” with five (5) shards.

We defer a call to delete the stream.

We call a function to put records into the stream, this method calls buildCards to generate a slice of cars

We iterate through the shards of the stream and fire of goroutines to get the records

So that the main thread doesn’t exit, we wait for a few seconds. I could have used sleep but whats the fun of that?

func createStream(streamName string, shardCount int) *kinesis.Kinesis {
	region := aws.Regions[strings.ToLower(os.Getenv("AWS_REGION_NAME"))]

	auth, err := aws.EnvAuth()
	if err != nil {

	ksis := kinesis.New(auth, region)

	if err = ksis.CreateStream(streamName, shardCount); err != nil {
		fmt.Printf("CreateStream ERROR: %v\n", err)

	return ksis

func deleteStream(ksis *kinesis.Kinesis, streamName string) {
	if err := ksis.DeleteStream(streamName); err != nil {
		fmt.Printf("DeleteStream ERROR: %v\n", err)

We grab the region for the stream from the environment variables. Here we use the EnvAuth method to grab the authentication credentials from the environment vars.

We build the Kinesis object and the attempt to create a stream of a size shardCount

The deleteStream method is straight forward. Send a message to delete the stream or die trying

func waitForActive(ksis *kinesis.Kinesis, streamName string) *kinesis.StreamDescription {
	streamDescription := &kinesis.StreamDescription{}

	timeout := make(chan bool, 30)

	for {

		streamDescription, _ = ksis.DescribeStream(streamName)

		if streamDescription.StreamStatus == "ACTIVE" {
		} else {
			fmt.Printf("Stream be '%s'\n", streamDescription.StreamStatus)
			time.Sleep(4 * time.Second)
			timeout <- true

	return streamDescription

We get a description of the stream and wait 120 seconds for it to become active, otherwise we panic. PANIC!!

func publish(ksis *kinesis.Kinesis, streamName string, collection []*Car) error {
	var partitionKey string
	var buffer bytes.Buffer

	enc := gob.NewEncoder(&buffer)

	for i, item := range collection {
		enc = gob.NewEncoder(&buffer)
		if err := enc.Encode(item); err != nil {

		partitionKey = fmt.Sprintf("partitionKey-%d", i)
		_, err := ksis.PutRecord(streamName, partitionKey, buffer.Bytes(), "", "")

		if err != nil {
			return err

		fmt.Printf("publish: %v\n", item)

	return nil

Pretty ugly ain’t it?

We encode the records of the collection use the Gob encode and just them into the stream as bytes

We reset the buffer each iteration so that we don’t resend what was previously sent.

func getRecords(ksis *kinesis.Kinesis, streamName, shardId string) {

	shardIteratorRes, _ := ksis.GetShardIterator(shardId, streamName, "TRIM_HORIZON", "")
	shardIterator := shardIteratorRes.ShardIterator

	for {
		records, err := ksis.GetRecords(shardIterator, recordLimit)

		if len(records.Records) > 0 {
			for _, record := range records.Records {
				car := decodeCar(record.Data)
				fmt.Printf("getRecords [%s]: %s\n", shardId, car.String())
		} else if records.NextShardIterator == "" || shardIterator == records.NextShardIterator || err != nil {
			fmt.Printf("GetRecords ERROR: %v\n", err)
		shardIterator = records.NextShardIterator


func buildCars() (cars []*Car) {
	for i := 0; i < maxCars; i++ {
		cars = append(cars, &Car{i, fmt.Sprintf("Honda %d", i)})
	return cars

Create a slice of pointers to cars, Hondas. Because Hondas are pretty sweet cars.

func decodeCar(data []byte) (car Car) {
	dec := gob.NewDecoder(bytes.NewBuffer(data))
	return car

Decode a car from the byte stream. We use named return values so that we don’t need no stinkin variable declaration. We don’t use naked returns were I come from, either.

func (c *Car) String() string {
	return fmt.Sprintf("%s [# %d]", c.Make, c.Id)

Print out the car description in a specific format


There you have it, a basic pipeline for passing arbitrary data over Kinesis using Go.

blog comments powered by Disqus


25 September 2014