# Pipelines Library

flow-kafka-pipelines is a Node.js library for building stream-processing microservices. The library is built around the idea of a pipeline that consumes messages from a source and produces messages to a sink. A simple domain-specific language (DSL) is provided to build a pipeline by specifying a source, a sink, and intermediate processing steps such as map and filter operations.

# Installation

To install the library from NPM, issue the following command in your project directory:

```sh
npm install flow-kafka-pipelines
```

# Pipeline DSL

| Pipeline Step | Description |
| --- | --- |
| `fromTopic` | Consume messages from a Kafka topic. |
| `toTopic` | Produce messages to a Kafka topic. |
| `fromReadable` | Consume messages from a generic Node.js Readable stream. |
| `toWritable` | Produce messages to a generic Node.js Writable stream. |
| `map` | Transform a message via a map function. |
| `filter` | Filter messages using a predicate function. |
| `aggregate` | Perform windowed aggregation. |
| `pipe` | Transform a message via a generic Node.js Transform. |

A pipeline example:

```ts
import * as kafkaPipelines from 'flow-kafka-pipelines';
const context = new kafkaPipelines.Context({...});

const pipeline = context
  .pipeline()
  .fromTopic({
    topic: 'topic1',
    groupId: 'group1',
    valueFormat: kafkaPipelines.ValueFormat.JSON
  })
  .filter(msg => msg.type === 'locationEvent')
  .map(msg => {
    msg.prop1 = msg.prop1 * 10;
    return msg;
  })
  .toTopic({
    topic: 'topic2',
    key: (msg: any) => msg.key,
    valueFormat: kafkaPipelines.ValueFormat.JSON
  });

pipeline.start().then(() => console.log('Pipeline started'));
```
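The `pipe` step accepts any generic Node.js Transform operating in object mode, which is useful when a processing step needs more control than a plain map function. As an illustrative sketch (the `ScaleTransform` name and scaling logic are hypothetical, not part of the library), a Transform equivalent to the `map` step in the example above might look like:

```typescript
import { Transform, TransformCallback } from 'stream';

// Hypothetical Transform: scales the prop1 field of each message by 10.
// Object mode is required because pipeline steps pass message objects,
// not Buffers or strings.
class ScaleTransform extends Transform {
  constructor() {
    super({ objectMode: true });
  }

  _transform(msg: any, _enc: BufferEncoding, callback: TransformCallback): void {
    msg.prop1 = msg.prop1 * 10;
    // Passing the result to the callback pushes it downstream.
    callback(null, msg);
  }
}
```

An instance of such a class could then be handed to the `pipe` step in place of the `map` call shown above.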

# Pipeline Sources

Pipeline sources are provided to consume messages from:

- Kafka topics, via `fromTopic`
- generic Node.js Readable streams, via `fromReadable`

# Pipeline Sinks

Pipeline sinks are provided to produce messages to:

- Kafka topics, via `toTopic`
- generic Node.js Writable streams, via `toWritable`
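Because `toWritable` accepts any object-mode Writable, a custom sink only needs to sub-class stream.Writable. As a sketch (the `ArraySink` name is hypothetical and not part of the library), a sink that collects messages in memory, e.g. for testing a pipeline, could look like:

```typescript
import { Writable } from 'stream';

// Hypothetical sink: buffers every received message in an array.
class ArraySink extends Writable {
  public received: any[] = [];

  constructor() {
    super({ objectMode: true }); // messages are objects, not Buffers
  }

  _write(msg: any, _enc: BufferEncoding, callback: (error?: Error | null) => void): void {
    this.received.push(msg);
    callback(); // signal that the message has been handled
  }
}
```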

# Implementing Custom Pipeline Sources

Any generic Node.js Readable can be used as a pipeline source. To implement a custom source, create a sub-class of stream.Readable.
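As a minimal sketch of such a sub-class (the `ArraySource` name is hypothetical, not part of the library), an object-mode Readable that emits a fixed list of messages and then ends could be written as:

```typescript
import { Readable } from 'stream';

// Hypothetical source: emits each message from a list, then ends the stream.
class ArraySource extends Readable {
  constructor(private messages: any[]) {
    super({ objectMode: true }); // emit message objects, not Buffers
  }

  _read(): void {
    if (this.messages.length === 0) {
      this.push(null); // no more messages: signal end of stream
    } else {
      this.push(this.messages.shift()); // emit the next message
    }
  }
}
```

An instance of this class could then be passed to `fromReadable`; in a real source, `_read` would typically fetch from an external system instead of an in-memory list.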