Blog

Price Checker - a FRAAS and AWS application

Thu, 05/25/2017 - 23:54

In this article we describe Price Checker - application that uses FRAAS and AWS platforms to help Ryanair users fly for cheapest prices. 

Price Checker uses FRAAS API to check prices for specific routes in specific time frames. Documentation on how to call the API can be found here. In this article we will be talking about following tehnologies:

  • FRAAS API Gateway
  • AWS API Gateway
  • AWS Lambda functions
  • AWS SNS Topics
  • AWS DynamoDB
  • AWS Cloudformation
  • AWS SES
  • AWS S3 buckets
  1. FRAAS API Gateway

FRAAS API Gateway gives to users a number of Ryanair's services. Go here to get full documentation and here to see some helpful guides. Endpoints we can use allow users to search for the flights, check cheapest prices, see flight schedule, airports etc. Aplication Price Checker uses one endpoint from farefinder api - getCheapest flights method from oneWayFares controller. The endpoint allows users to check what are the cheapest prices for defined date range and route. 

Endpoint's resource URL:

http://apigateway.ryanair.com/pub/v1/farefinder/3/oneWayFares

Parameters:

  • departureAirportIataCode
  • arrivalAirportIataCode
  • outboudDepartureDateFrom
  • outboudDepartureDateTo
  • currency
  • apikey - it's not endpoint specific, however still it's required in the call. More about it can be found here

Let's create some sample curl command:

curl -X GET "http://apigateway.ryanair.com/pub/v1/farefinder/3/oneWayFares?departureAirportIataCode=WRO&arrivalAirportIataCode=DUB&outboundDepartureDateFrom=2017-04-20&outboundDepartureDateTo=2017-05-20&currency=EUR&apikey={apikey}"

The command requests for flights from Wrocław (IATA code is WRO) to Dublin (IATA is DUB) between 20th the April and 20th the May. Parameter currency=EUR was added to display prices in Euros. From the response we can see that:

{
  "total": 1,
  "fares": [
    {
      "outbound": {
        "departureAirport": {
          "iataCode": "WRO",
          "name": "Wroclaw",
          "seoName": "wroclaw",
          "countryName": "Poland"
        },
        "arrivalAirport": {
          "iataCode": "DUB",
          "name": "Dublin",
          "seoName": "dublin",
          "countryName": "Ireland"
        },
        "departureDate": "2017-05-19T22:20:00",
        "arrivalDate": "2017-05-19T23:59:00",
        "price": {
          "value": 50.89,
          "valueMainUnit": "50",
          "valueFractionalUnit": "89",
          "currencyCode": "EUR",
          "currencySymbol": "€"
        }
      },
      "summary": {
        "price": {
          "value": 50.89,
          "valueMainUnit": "50",
          "valueFractionalUnit": "89",
          "currencyCode": "EUR",
          "currencySymbol": "€"
        },
        "newRoute": false
      }
    }
  ],
  "arrivalAirportCategories": null,
  "size": 1
}

Cheapest flight for the route is on 19th of May and it costs at the time 50.89€. With a usage of this endpoint Price Checker will periodically check flights's prices and notify users with the cheapest price.

  1. Functionality

At first let's have a look what we are building from business point of view. The application has 3 core functionalities:

  • As a user I want to be able to register in the application
  • As a user I want to add route details I'm interested in
  • As a user I want to be notified when my route's price is lower than submitted value

Price Checker doesn't have any UI. The communication is resolved via RESTful services. Endpoints were made available to the world using AWS API Gateway, which is on the top of the compute services. Following methods are available to use:

  • POST /auth/signUp - method for registration
  • POST /auth/signIn - method for logging user in
  • GET /auth/userProfile - method for getting user's profile 
  • GET /pricewatch - method for retrieving all user's pricewatch items. Pricewatch is an object describing route details with price. More about what exactly pricewatch item is, is described in model section
  • POST /pricewatch - method to add new pricewatch entity
  1. Architecture

Price Checker's arichtecture is presented on diagram below:

 

The architecture can be divided into two logical parts:

  • User management
  • Back-end services

Both parts use same compute technology - AWS Lambda functions. From AWS documentation we can find out that: "Lambda function is a compute service that lets you run code without provisioning or managing servers". Basically you worry about the code and AWS manages the rest - it's super easy and intuitive. You pay only for copute time, first 1M requests and 400,000 GB-seconds of compute time are monthly free. However before you start coding you need to make a couple of configuration decisions:

  • Runtime - probably the biggest decision to make. From a number of available programming languages you need to pick one that you feel most comfortable with. Price Checker back-end services use Node.js 4.3 (and all code snippets will be in this runtime)
  • Handler - this is path to the name of the main method in your code, e.g. handler name for method 'signUp' in file sign-up which is in directory Authentication is 'Authentication/sign-up.signUp'
  • Role - name of the role that Lambda function will run with. The role needs to be created in IAM and needs to have appropriate permissions, like e.g. read database permission
  • other - settings like Memory size, timeout etc. Those were defaulted in the application

The handler method needs to be exported and take 3 arguments - event, context and callback function

exports.signUp = function(event, context, callback) {
}

where in event we get input data to the function and callback is a function that should be used in order to return value or rise an error (this is a standard convention in Node.js programming)

User management

This part is used directly by AWS API Gateway and it responds for user requests. The requests are received by API Gateway. It takes the payload and headers, modifies them if needed (more about it in API Gateway chapter) and passes it to the Lambda function in event method parameter. There is one lambda funciton for every API method that handles the request, e.g.:

  • POST /auth/signIn - function name: signIn
  • POST /auth/signUp - function name: singUp
  • GET /auth/userProfile - function name: getUserProfile 
  • GET /pricewatch - function name: getPricewatchItems
  • POST /pricewatch - funciton name: addPricewatchItem

Back-end services

This part contains the logic of the application. What it does is prestented on the flow below:

 

ReadPricewatchList reads all records from DynamoDB table PricewatchList and pushes them to the SNS queue PricewatchEntry (1 message per record). An entry in PricewatchEntry triggers function PriceGet with the message from the queue. PriceGet function gets lowest price for PricewatchItem from FRAAS API Gateway. The lowest price for the route is pushed to LowestPriceSNS.

 

An entry in LowestPriceSNS topic triggers LowestPriceHandler with lowest price value for the route object. Responsibility of the function is to get all subscribers for the route and send notification via SES to subscribers.

 

  1. AWS DynamoDB

Now let's talk a little bit about DynamoDB. This is an AWS NoSQL database. It's all stored and managed on AWS cloud and there is very little configuration you need to do before start persisting data. You will be asked for following properties when creating a table:

  • Table name - no suprises here
  • Primary key - unique identifier of an item
  • Provisioned capacity - read and write capacity units - those values depend on your application. The bigger value you set the more you pay to AWS

There is no need of specifying a model. Once your object has correct primary key you can save any JSON object you want. However when creating a primary key things get a little complicated. Mainly because there are two possible types of primary key in DynamoDB:

  • partition key - a simple primary key, composed of one attribute known as partition key. DynamoDB uses the partition key's value as input to internal hash function. The output of the function determines the partition in which the item will be stored. In this case the partition key must be unique
  • partition key and sort key - composite primary key - this primary key is composed of two attributes. First one is partition key, and the second is the sort key. In this type of primary key, the partition key's value is also an input to the internal hash function, and its ouput determines the partition in which the item will be stored. All items with the same partition key are stored together, in sorted order determined by sort key value. In this example pair partition key and sort key has to be unique.

Creating primary key is crucial moment for modelling a table because DynamoDB tables can only be queried by primary key. You can read more about DynamoDB on AWS docs page.

  1. Model

Here all model objects used by Price Checker will be described. 

PricewatchList

This table has only one property: routeId. However one routeId contains information about 4 properties: source airport IATA code, destination airport IATA code, dates from and to. All those properties are composed in to one string (they are divided by $), sample routeId is:

WRO$DUB$2017-10-06$2017-11-11

For above example route is from Wrocław to Dublin, between 2017-10-06 and 2017-11-11. As it is a DynamoDB table, schema is presented in DynamoDB format (please notice that primary key consists only from partition key):

{
     "AttributeDefinitions": [
         {
             "AttributeName": "routeId", 
             "AttributeType": "S"
         }
     ],
     "KeySchema": [
         {
             "KeyType": "HASH", 
             "AttributeName": "routeId"
         }
     ]
}

This object is used by two Lambda fucntions - AddPricewatchItem and ReadPricewatchList.

PricewatchSubscribers

This table contains subscribers for notifications about low prices for a route. Its primary key is composed of routeId and lowPriceAlert. Please have a look at the model:

{
   "AttributeDefinitions": [
            {
                "AttributeName": "lowPriceAlert", 
                "AttributeType": "N"
            }, 
            {
                "AttributeName": "routeId", 
                "AttributeType": "S"
            }
        ],
        "KeySchema": [
            {
                "KeyType": "HASH", 
                "AttributeName": "routeId"
            }, 
            {
                "KeyType": "RANGE", 
                "AttributeName": "lowPriceAlert"
            }
        ],
}

Range key attribute is needed to query the table for subscribers, because there is a possibility that two users are interested in the same route but their notification threshold (lowPriceAlert) is different. 

To explain it better let's assume following - for the same routeId there are subscribers that want to be notified when routeId's prcice goes below value 100‎€, for the same routeId other subscirbers want to be notified when route's price is below 80‎€ and other subscribers with this value set to 50‎€. In a case when route's price is 60 we want to notify those subscribers with lowPriceAlert greater than 60‎€, which is 80and 100. In order to query for those subscribers the tool needs to look for subscribers with routeId and lowPriceAlert > new price.

Additionally to the primary key, the Price Checker saves subscribers list. Please have a loot at sample full table item:

{
  "lowPriceAlert": {
    "N": "80"
  },
  "routeId": {
    "S": "CIA$DUB$2017-07-10$2017-07-14"
  },
  "subscribers": {
    "L": [
      {
        "M": {
          "notificationId": {
            "S": "smith@ryanair.com"
          },
          "notificationType": {
            "S": "EMAIL"
          }
        }
      }
    ]
  }
}

Basically it means that for the routeId and lowPriceAlert another property "subscribers" is stored. Its type is list (L). This is a list of objects with two properties:

  • notificationId - in current implementation an email address
  • notificationType - in current implementtion only EMAIL notificationType is supported

This part of the application can be extended in next releases to store push notifications or twitter subscriptions.

User

In this table all user data is stored. It can be divided into following groups:

  • username - primary key of the table. Contains user's username (email)
  • authentication - an object which in current implementation contains only password field. The password is not hashed. There are few possible improvements, like hashing password or adding new factor of authentication
  • pricewatchItems - a list of objects which stores all user's pricewatchItems
  • userProfile - an object with user first name, last name and email

 Please have a look at DB's schema:

{
 "AttributeDefinitions": [
   {
    "AttributeName": "username", 
    "AttributeType": "S"
   }
 ], 
 "KeySchema": [
   {
    "KeyType": "HASH", 
    "AttributeName": "username"
   }
 ], 
}

Full Users item:

{
  "authentication": {
    "M": {
      "password": {
        "S": "password"
      }
    }
  },
  "pricewatchItems": {
    "L": [
      {
        "M": {
          "dateFrom": {
            "S": "2017-07-10"
          },
          "dateTo": {
            "S": "2017-07-14"
          },
          "destination": {
            "S": "DUB"
          },
          "lowPriceAlert": {
            "N": "80"
          },
          "source": {
            "S": "CIA"
          }
        }
      }
    ]
  },
  "username": {
    "S": "smith@ryanair.com"
  },
  "userProfile": {
    "M": {
      "email": {
        "S": "smith@ryanair.com"
      },
      "firstName": {
        "S": "John"
      },
      "lastName": {
        "S": "Smith"
      }
    }
  }
}
  1. Lambda functions

All Price Checker's back-end services were implemented in Lambda functions. The application has 7 of them:

  • SignUp - registers users (creates Users's item)
  • SignIn - logs user in 
  • GetUserProfile - returns userProfile from User table for logged in user
  • AddNewPricewatch - adds pricewatchItem for logged in user, subscribers the user for route's notifications
  • GetMyPricewatchItems - returns all user's pricewatchItems
  • ReadPricewatchList - reads all items from table PricewatchList and pushes them to SNS queue (1 message per record)
  • GetPrices - for given routeId checks its lowest price and pushes the result to SNS queue
  • HandleLowPriceAlert - for given routeId and new price value loads all subscribers and sends them email notification

Before describing those functions let's have a look at helper modules. There are three of them:

  • Dynamo operations
  • SNS operations
  • Email operations

Dynamo operations

This module contains very basic DynamoDB operations - put, scan, query, update and delete - also known as CRUD. As you will notice all DynamoDB's operations (they all come from AWS.DynamoDB.DocumentClient object) are very simple when you look at functions's design, e.g.:

  • scan(params, callback) - function to scan table - returns all items in callback function
  • put(params, callback) - function to put an object to a table
  • delete(params, callback) - function to delete

To extract as much code as possible and at the same time to stay generic some of the functions builds up params inside function body (like scanTable) and other takes parameters from calling function (building up inside function's body and staying generic wouldn't be possible in some cases).

First let's have a look at scanTable function:

var scanTable = function (tableName) {
    return new Promise(function (resolve, reject) {
        var docClient = new AWS.DynamoDB.DocumentClient();
        var params = {
            TableName: tableName
        };

        docClient.scan(params, function (err, data) {
            if (err) {
                console.error(err);
                return reject(err);
            } else {
                return resolve(data.Items);
            }
        });
    });
};

From the code we can see that function returns a promise - this is a standard JS concept. Params variable is created inside the function and it needs to have only one property - TableName. The name of the table is passed by calling function. The rest is just calling AWS's built in function and returning list of items.

Put function - very similar function to previous one, but takes additional argument - item which needs to be persisted. 

var put = function (tableName, item) {
    return new Promise(function (resolve, reject) {
        var docClient = new AWS.DynamoDB.DocumentClient();
        var params = {
            TableName: tableName,
            Item: item
        };
        docClient.put(params, function (err) {
            if (err) {
                console.error('ERR', err);
                return reject(err);
            }
            console.log('Item successfully put');
            return resolve();
        });
    });
};

Functions query, updateDocument and deleteItem just extracts some of the boiler plate code, but don't do anything specific. Just takes parameters, calls proper function and in case of an error logs an error, and in case of success returns correct results. Let's see the code:

var query = function (params) {
    return new Promise(function (resolve, reject) {
        var docClient = new AWS.DynamoDB.DocumentClient();
        docClient.query(params, function (err, data) {
            if (err) {
                console.error(err);
                return reject(err);
            } else {
                return resolve(data.Items);
            }
        });
    });
};
var updateDocument = function (params) {
    return new Promise(function (resolve, reject) {
        var docClient = new AWS.DynamoDB.DocumentClient();
        docClient.update(params, function (err, data) {
            if (err) {
                console.error(err);
                return reject(err);
            } else {
                return resolve(data);
            }
        });
    });
};
var deleteItem = function (params) {
    return new Promise(function (resolve, reject) {
        var docClient = new AWS.DynamoDB.DocumentClient();
        docClient.delete(params, function(err, item) {
            if(err) {
                console.error(err);
                return reject(err);
            } else {
                return resolve(item);
            }
        });
    });
};

AWS docs with examples can be found here

Let's see some sample parameters for updating an item: 

var params = {
    TableName:table,
    Key:{
        "year": year,
        "title": title
    },
    UpdateExpression: "set info.rating = :r, info.plot=:p, info.actors=:a",
    ExpressionAttributeValues:{
        ":r":5.5,
        ":p":"Everything happens all at once.",
        ":a":["Larry", "Moe", "Curly"]
    },
    ReturnValues:"UPDATED_NEW"
};
  • TableName - name of the table
  • Key - primary key
  • Update expression - this is a place whre you actually specify what you are updating
  • ExpressionAttributeValues - you specify values you want to set
  • ReturnValues - you can return values from update command - it can be whole new object, only updated properties, whole old object etc.

Pushing to SNS:

Responsibility of this module is to extract pushing an item to the SNS queue. Let's see the code:

var subject = process.env.SNS_TOPIC;
var snsArn = process.env.SNS_ARN;

var publish = function (item) {
    return new Promise(function(resolve, reject) {
        var message = JSON.stringify(item);
        var params = {
            Message: message,
            Subject: subject,
            TopicArn: snsArn
        };

        SNS.publish(params, function(err, response) {
            if(err) {
                console.error(err);
                return reject(err);
            } else {
                resolve('Published item:\n' + message);
                return resolve(response);
            }
        });
    });
};

To send a SNS message to the topic we need to call function publish(params, callback) from SNS object. Params need to have properties Message, Subject and TopicArn which are respectively stringified item we want to send, topic subject and topic Arn. Subject and Arn are environment variables in this example but they could be hardcoded as well. The rest is just as simple as query DynamoDB. We call publish function from SNS object and return the results. 

Sending email

Responsibility of this module is to extract code that sends an email. The function takes 4 parameters:

  • to - email address of addressee
  • subject - subject of an email
  • bodyHtml - email body in html
  • callback - callback function
var fromEmail = "pricecheckerservice@ryanair.com";
var sendEmail = function(to, subject, bodyHtml, callback) {
    var eparam = {
        Destination: {
            ToAddresses: [to]
        },
        Message: {
            Body: {
                Text: {
                    Data: bodyHtml
                }
            },
            Subject: {
                Data: subject
            }
        },
        Source: fromEmail,
        ReplyToAddresses: [fromEmail],
        ReturnPath: fromEmail
    };

    console.log("Gonna send following object:\n" + JSON.stringify(eparam));

    ses.sendEmail(eparam, function(err, data) {
        if(err) {
            console.log("Error sending emails. Error:\n");
            console.error(err);
        } else {
            console.log("Emails send successfully");
            callback(null, data);
        }
    });
}

To send an email using SES we just need to prepare parameters in correct format and call function sendEmail from SES object. There are lots of possible email parameters, they are all described in SES documentation which can be found here

SignUp

This is a function that is triggered by API Gateway. So there is a http request received by API Gateway. The API Gateway takes the request headers and payload and passes it to the function (in event object). Function has following steps:

  • Validate request payload
  • Sign user up - create user item in Users table (using helper module - dynamo-operations)
  • Return confirmation message to the API Gateway

SignUp:

exports.signUp = function(event, context, callback) {

    if(event.username == null || event.username == "" || 
        event.password == null || event.password == "") {
        callback(responses.badRequest('Username and password is required'), null);
    } else {
        var item = {
            username: event.username,
            authentication: {
                password: event.password
            },
            userProfile: {
                email: event.username,
                firstName: event.firstName,
                lastName: event.lastName
            },
            pricewatchItems: []
        };

        put(userTableName, item).then(function() {
            callback(null, responses.created('User has been successfully created'));
        }).catch(function(err) {
            if (err.code === 'ConditionalCheckFailedException') {
                callback(responses.badRequest('Given username already exists in database'), null);
            } else {
                callback(responses.internalServerError('Error occurred when writing to a database: '
                    + err.code), null);
            }
        });
    }
};

Functions sign-up, sign-in, get-my-pricewatch-items and get-user-profile - they all work very similar. They validate input parameters, make a call to DynamoDB using dynamo-operations module and return confirmation message to API Gateway. So as they are very similar to the signUp function they won't be described.

AddPricewatchItem

This function is triggered by API Gateway when request of adding new pricewatch item is received. Function's responsibility is to:

  • update user's item in Users table - new pricewatch item needs to be added to the list of pricewatchItems
  • add new item to PricewatchList table
  • subscribe user for notifications about the route's prices - add entry in PricewatchSubscribers table
exports.add = function(event, context, callback) {

//validate the input values
    if(event.sessionId == null || event.sessionId == "") {
        callback(responses.badRequest('SessionId (username) is required'), null);
    } else if(event.body.source == null ||
            event.body.destination == null ||
            event.body.dateFrom == null ||
            event.body.dateTo == null ||
            event.body.lowPriceAlert == null) {
        callback(responses.badRequest
            ('Post body should contain following fields: source, destination, dateFrom, dateTo, lowPriceAlert'), null);
    } else {
//input values are correct. now look for the user (user's email is sent as sessionId)
        var item = event.body;

        var params = {
            TableName: userTableName,
            KeyConditionExpression: "#username = :username",
            ExpressionAttributeNames: {
                "#username": "username"
            },
            ExpressionAttributeValues: {
                ":username": event.sessionId
            }
        };

        var routeId = item.source + '$' + item.destination + '$' +
            item.dateFrom + '$' + item.dateTo;
        db.query(params).then(function(items) {
            if (items.length === 0) {
//user wasn't found. return unaouthorized
                var errorMessage = 'Wrong session token';
                console.error(errorMessage);
                callback(responses.unauthorized(errorMessage), null);
            } else {
//user was found. now check if user hasn't added the pricewatch item before
                var newPriceWatch = {
                    source: item.source,
                    destination: item.destination,
                    dateFrom: item.dateFrom,
                    dateTo: item.dateTo,
                    lowPriceAlert: item.lowPriceAlert
                };

                var shouldAdd = true;
                for(var i = 0; i < items[0].pricewatchItems.length; i++) {
                    if(items[0].pricewatchItems[i].source == newPriceWatch.source &&
                        items[0].pricewatchItems[i].destination == newPriceWatch.destination &&
                        items[0].pricewatchItems[i].dateFrom == newPriceWatch.dateFrom &&
                        items[0].pricewatchItems[i].dateTo == newPriceWatch.dateTo &&
                        items[0].pricewatchItems[i].lowPriceAlert == newPriceWatch.lowPriceAlert) {
                            callback(responses.badRequest('Posted object already exists in DB'), null);
                            shouldAdd = false;
                            break;
                    }
                }
                if(shouldAdd) {
//user hasn't added the pricewatch item before. append new pricewatch item to the pricewatchItem list
                    var updateParams = {
                        TableName: userTableName,
                        Key: {
                            "username": items[0].username
                        },
                        UpdateExpression: "set pricewatchItems = list_append(pricewatchItems, :new)",
                        ExpressionAttributeValues: {
                            ":new": [newPriceWatch]
                        },
                        ReturnValues: "ALL_NEW"
                    };
//do the update
                    db.updateDocument(updateParams).then(function (item) {
//update was successfull. now get subscribers for the routeId and lowPriceAlert
                        console.log("Successfully added new pricewatch item to Users table:\n" + item);

                        var getSubscribers = {
                            TableName: pricewatchSubscribersTableName,
                            KeyConditionExpression: "routeId = :routeId and lowPriceAlert = :lowPriceAlert",
                            ExpressionAttributeValues: {
                                ":routeId": routeId,
                                ":lowPriceAlert": newPriceWatch.lowPriceAlert
                            }
                        };

                        return db.query(getSubscribers);
                    }).then(function (subscribers) {
                        if(subscribers.length === 0) {
//ther is no entry in PricewatchSubscriber with the routeId and the lowPriceAlert, create it.
                            var item = {
                                routeId: routeId,
                                lowPriceAlert: newPriceWatch.lowPriceAlert,
                                subscribers: [
                                    {
                                        notificationId: event.sessionId,
                                        notificationType: "EMAIL"
                                    }
                                ]
                            };

                            return db.put(pricewatchSubscribersTableName, item);
                        } else {
//the is an entry in PricewatchSubscribers with the routeId and the lowPriceAlert, add user to the subscribers list
                            var updateParams = {
                                TableName: pricewatchSubscribersTableName,
                                Key: {
                                    "routeId": routeId,
                                    "lowPriceAlert": newPriceWatch.lowPriceAlert
                                },
                                UpdateExpression: "set subscribers = list_append(subscribers, :new)",
                                ExpressionAttributeValues: {
                                    ":new": [{
                                        notificationId: event.sessionId,
                                        notificationType: "EMAIL"
                                    }]
                                },
                                ReturnValues: "ALL_NEW"
                            };

                            return db.updateDocument(updateParams);
                        }                        
                    }).then(function () {
//User has been subscribed, now add the routeId to table PricewachList
                        var pricewatchItem = {
                            routeId: routeId
                        };
                        return db.put(pricewatchListTableName, pricewatchItem);
                    }).then(function () {
                        var response = {
                            sessionId: event.sessionId,
                            response: responses.created('Successfully added new pricewatch item.')
                        };
                        callback(null, response);
                    });
                }
            }
        }).catch(function(err) {
            console.error(err);
            callback(responses.internalServerError('Unexpected error occurred'), null);
        });
    }
};

ReadPricewatchlist

This function is triggered by AWS once per day. Triggering by AWS can be done by navigating to function's details in Triggers tab and adding a trigger. Function's responsibility is to read all records from PricewatchList table (using scanTable from dynamo-operatios) and pushing in one-by-one to SNS queue (using publish from sns-operation). Let's have a look:

exports.priceSyncHandler = function(event, context, callback) {
    scanTable(pricewatchListTable).then(function(items) {
        var itemsLength = items.length;
        if(itemsLength == 0) {
            callback(null, "Success");
        }
        items.forEach(function (item) {
            publish(item).then(function() {
                if(--itemsLength == 0) {
                    callback(null, "Success");
                }
            }).catch(function(err) {
                console.error(err);
                callback(err, null);
            });
        });
    }).catch(function(err) {
        callback(err, null);
    });
};

CheckFlightPrices

This function is triggered by SNS entry being pushed to the queue. This function uses FRAAS API Gateway in order to check the cheapest flight price. Entry method of the function:

  • unstrigifies the routeId
  • splits it by '$' char - to have all 4 route properties which are required in next step
  • calls getFlightPrices function
  • pushes message to the queue with routeId and new cheapest flight price (using sns-operations)

Let's analyze the getFlightPrices:

var getFlightPrices = function (origin, destination, dateFrom, dateTo) {
    var serviceURL = process.env.SERVICE_URL;
    var apikey = process.env.API_KEY;
    var callUrl = serviceURL + "farefinder/3/oneWayFares?" +
        "departureAirportIataCode=" + origin +
        "&arrivalAirportIataCode=" + destination +
        "&outboundDepartureDateFrom=" + dateFrom +
        "&outboundDepartureDateTo=" + dateTo +
        "&currency=EUR" +
        "&apikey=" + apikey;

    return new Promise(function(resolve, reject) {
        http.get(callUrl, function (response) {
            console.log("Check prices response: " + response.statusCode);

            var bodyStream = "";
//response object emits two events. data and end
            response.on('data', function (data) {
//data event is raised when new chunk of data was received
                bodyStream += data;
            });
            response.on('end', function () {
//end event is raised when response was fully received
                if (response.statusCode !== 200) {
                    return reject('GET request failed:\n' + bodyStream);
                }

                var body = JSON.parse(bodyStream);
                if (body.total < 1) {
                    return resolve(null);
                }
                return resolve(body.fares[0].summary.price.value);
            });
        });
    });
};

The function at first takes serviceURL and apikey from environment variables (it needs to be FRAAS API Gateway URL and valid apikey of the application on Ryanair's devportal). In next couple of lines it builds up a URL which will be called (see chapter 1. FRAAS API Gateway for endpoint's details). In next step a http module from Node.js is used to make a get call to the API. If the response has a code of 200 OK and price was received the function returns it to calling method.

HandleLowPriceAlert

This function is triggered by SNS entry in LowestPriceSNS queue. The message that is received is routeId and current lowest price. Function's steps are:

  • Take routeId and it's new price
  • Query table PricewatchSubscribers - get all subscribers which lowPriceAlert is greater than new price
  • For each subscriber send an email (use email-operations helper)
  • For each subscriber object, update its lowPriceAlert to the new value so the email will be sent again when flight price will go down again
exports.handleLowPrice = function (event, context, callback) {
    var message = JSON.parse(event.Records[0].Sns.Message);
    console.log("Message received from SNS: " + JSON.stringify(message));
// get all PricewatchSubscribers for the routeId with lowPriceAlert >= new lowest route price
    var params = {
        TableName: pricewatchSubscribersTableName,
        KeyConditionExpression: "#routeId = :routeId and #lowPriceAlert >= :lowPriceAlert",
        ExpressionAttributeNames: {
            "#routeId": "routeId",
            "#lowPriceAlert": "lowPriceAlert"
        },
        ExpressionAttributeValues: {
            ":routeId": message.routeId,
            ":lowPriceAlert": message.price
        }
    };

    db.query(params).then(function (items) {
        var itemsLength = items.length;
        if(itemsLength === 0) {
            callback(null, 'no emails sent');
        }
        items.forEach(function (item) {
//for each PricewatchSubscriber item
            var subscribersCount = item.subscribers.length;
            item.subscribers.forEach(function (subscriber) {
//for each subscriber from the PricewatchSubscriber item
                var route = message.routeId.split('$');
                console.log("Sending price alert to : " + subscriber.notificationId +
                    ", found price: " + message.price + " is lower than the price added by subscriber which is: "
                    + item.lowPriceAlert + ". Gonna send it via: " + subscriber.notificationType);
//prepare an email for the user and the route
                var email = createLowPriceAlertEmail(route, message.price);
//send the email
                ses.sendEmail(subscriber.notificationId, email.subject, email.body, function () {
                    console.log("Email has been sent to: " + subscriber.notificationId);
                    if (--subscribersCount == 0 && --itemsLength == 0) {
//for all PricewatchSubscribers items for whom the emails were sent, update lowPriceAlert value to new lowest price
                        updateWithPrimaryKey(message.routeId, message.price).then(function() {
                            console.log('Sent all notifications...');
                            callback(null, 'Sent all notifications');
                        });
                    }
                });
            });
        });
    }).catch(function (err) {
        console.error('ERR', err);
        callback(err, null);
    });
};

Method createLowPriceAlertEmail has nothing fancy, just builds notification email with passed parameters (route, price etc.). Method updateWithPrimaryKey requires clarification. 

var updateWithPrimaryKey = function(routeId, lowPriceAlert) {
    var allWithSameRouteId =  {
        TableName: pricewatchSubscribersTableName,
        KeyConditionExpression: "#routeId = :routeId and #lowPriceAlert >= :lowPriceAlert",
        ExpressionAttributeNames: {
            "#routeId": "routeId",
            "#lowPriceAlert": "lowPriceAlert"
        },
        ExpressionAttributeValues: {
            ":routeId": routeId,
            ":lowPriceAlert": lowPriceAlert
        }
    };
    var allSubscribers = [];
    var deleteIt = {
        TableName: pricewatchSubscribersTableName
    };

    return new Promise(function(resolve, reject) {
        db.query(allWithSameRouteId).then(function(allItems) {
//get all PricewatchSubscribers items for whom the notification was sent
            var allItemsLength = allItems.length;
            allItems.forEach(function(item) {
//get all subscriber from every PricewatchSubscribers item and add it to allSubscribers list
                allSubscribers = allSubscribers.concat(item.subscribers);
                deleteIt.Key = {
                    'routeId': routeId,
                    'lowPriceAlert': item.lowPriceAlert
                };
                db.deleteItem(deleteIt).then(function() {
//delete the PricewatchSubscribers item with the routeId and old lowPriceAlert
                    if(--allItemsLength == 0) {
//now when PricewatchSubscribers items were deleted add new PricewatchSubscribers item - with the routeId, 
//new lowPriceAlert and allSubscribers
                        var newItem = {
                            routeId: routeId,
                            lowPriceAlert: parseFloat((lowPriceAlert - 0.01).toFixed(2)),
                            subscribers: allSubscribers
                        };
                        db.put(pricewatchSubscribersTableName, newItem).then(function() {
                            resolve();
                        }).catch(function(err) {
                            console.error(err);
                            reject(err);
                        });
                    }
                });
            });
        });
    });
};

 

  1. API Gateway

In this chapter API Gateway will be described which handles RESTful requests to the Price Checker application. Before implementation details will be described let's recall available API methods:

  • POST /auth/signUp - method for registration
  • POST /auth/signIn - method for logging user in
  • GET /auth/userProfile - method for getting user's profile 
  • GET /pricewatch - method for retrieving all user's pricewatch items. Pricewatch is an object describing route details with price. More about what exactly pricewatch item is, is described in model section
  • POST /pricewatch - method to add new pricewatch entity

As you can see there are three POST methods. Let's describe POST object's model. As the application accepts application/json content type, the model will be presented in JSON schema:

POST auth/signUp

{  
   "type":"object",
   "$schema":"http://json-schema.org/schema#",
   "required": ["username", "password"],
   "properties":{  
      "firstName":{  
         "type":"string"
      },
      "lastName":{  
         "type":"string"
      },
      "password":{  
         "type":"string"
      },
      "username":{  
         "type":"string",
         "description": "Username must be in email format"
      }
   }
}

POST /pricewatch

This is an object that represents route details and price:

{
  "type": "object",
  "$schema": "http://json-schema.org/schema#",
  "required": ["source", "destination", "dateFrom", "dateTo", "lowPriceAlert"],
  "properties": {
    "source": {
      "type": "string",
      "description": "IATA code of source airport"
    }
    "destination": {
      "type": "string",
      "description": "IATA code of destination airport"
    }
    "dateFrom": {
      "type": "string",
      "description": "Correct string format is yyyy-MM-dd. Price Checker looks for flight from this date"
    }
    "dateTo": {
      "type": "string",
      "description": "Correct string format is yyyy-MM-dd. Price Checker looks for flight to this date"
    }
    "lowPriceAlert": {
      "type": "number",
      "description": "Price Checker will notify user about new low price when flight's price goes below lowPriceAlert"
    }
  }
}

POST auth/signIn

{  
   "type":"object",
   "$schema":"http://json-schema.org/schema#",
   "required": ["username", "password"],
   "properties":{  
      "password":{  
         "type":"string"
      },
      "username":{  
         "type":"string"
      }
   }
}

Authentication

Authentication in Price Checker wasn't actually implemented. X-Session-Token property is retured as the response when user logs in. Its value is just user's username (while it should be some kind of generated session token). In current implementation to authenticate calls to Price Checker a X-Session-Token headers is required - its values should be user's username. 

 

Resolving one API Gateway call consists of 5 smaller steps. Let's present the flow for POST pricewatch method:

Let's describe all steps:

  • Method Request - here you specify method authorization settings and parameters it can receive 
  • Integration Request - this is a place where you specify integration type. Price Checker uses Lamda type in order to pass the payload to Lambda function. In this step payload can be changed. Price's cechecrs uses body mapping for content type application/json and makes following transformation:
    {
      "sessionId": "$input.params('X-Session-Token')",
      "body": $input.body
    }

Which means that to 'event' object in Lambda function will contain two properties: sessionId with value from header X-Session-Token and body property with request body value.

  • Running Lambda function
  • Integration Response - this is mirror step to Integration Request. Response from Lambda function is get and it can be modified in this step before sending it back to the caller. Depending on what we get from the function we can return different responses. Responses can be defined based on Lambda function's returned value. The returned value is regex-tested and based on the result different integration response is picked to build http response. Each integration response defines response headers and body
  • Method Response - in this step you specify possible http status codes and headers
  1. Cloudformation

Now let's talk about how to create all described resources on AWS automatically. The service that should be used is Cloudformation. You create a template that describes all resources you want to create and AWS takes care of privisioning and configuring those resources for you. Please have a look at Cloudformation docs for more details. Those resources can all be defined in one or more files, it can be JSON formatted or YAML formatted. Price Checker's cloudformation scripts were divided into 4 tiers:

  • Data - DynamoDB resources
  • Messaging - SNS topics
  • Application-services - Lambda functions, roles and environment variables
  • Presentation-services - API Gateway

There are few sections in CloudFormation script:

  • AWSTemplateFormatVersion - only one value is correct at the time - '2010-09-09'
  • Parameters - for parameters specification - its values, types etc. 
  • Resources - for resources specification
  • Outputs - for exporting output values for later use

Now each Cloudformation section will be described as well as all types of resources created in Price Checker.

Parameters

Parameters:
  ENV:
    Description: Environment on which the tool will work
    Type: String
    Default: DEV
    AllowedValues:
      - DEV
      - TEST
      - PROD

This examples comes from Data cloudformation script. One parameter is specified - ENV. Its default value is DEV,  there are two more possible values: TEST and PROD and parameter value is String. Multiple parameters can be defined for each CloudFormation script.

Resources

A section where all application resources are specified. Very good documentation on CloudFormation resources can be found here.

AWS::DynamoDB::Table

Users:
    Type: 'AWS::DynamoDB::Table'
    Properties:
      TableName: !Sub
        - FR-PRICE_CHECKER-USERS-TBL-${Env}
        - { Env: !Ref ENV }
      AttributeDefinitions:
        - AttributeName: username
          AttributeType: S
      KeySchema:
        - AttributeName: username
          KeyType: HASH
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

Name of the resource is at root level of the resource. It can be used later in the script when there is a need of referencing it. The type of the resource in the example is AWS::DynamoDB::Table - this defines what we are creating here. Now let's see parameters:

  • TableName - a name of the table. It can be just a String value, e.g. TableName: Users, but here we are using !Sub function. It substitutes first string values from ${} with second string values. In this example we are substituting ${Env}. In second string we specify that Env is !Ref ENV. !Ref is a function and it gets reference value of the following resource. Function !Ref ENV is returning value of ENV parameter. Assuming that ENV parameter is set to DEV the whole second string then resolves to { Env: DEV }. So TableName resolves to: FR-PRICE_CHECKER-USERS-TBL-DEV. For each object type !Ref function can return different type of values. For parameters it's just a String, but for other it can be e.g. Arn of the resource - information about what is returned to !Ref function can be found AWS docs
  • AttributeDefinitions, KeySchema - this is just a DynamoDB table schema
  • ProvisionedThroughtput - read and write capacity units

AWS::SNS::Topic

  PricewatchEntrySNS:
    Type: 'AWS::SNS::Topic'
    Properties:
      DisplayName: !Sub
        - FR-PRICE_CHECKER-PRICEWATCH_ENTRY-${Env}
        - { Env: !Ref ENV }
      TopicName: !Sub
        - FR-PRICE_CHECKER-PRICEWATCH_ENTRY-${Env}
        - { Env: !Ref ENV }

In order to create a SNS topic we have to give the resource a type of AWS::SNS::Topic. Properties DisplanyName and TopicName can be the same and this is just a String value. In this example names are composed of a name and environment. 

AWS::Lambda::Function

Usually configuration of Lambda function requires some additional resources to be created. The reason behind it is that Lambda might need to have an access to some other resources (so for the function an IAM role needs to be configured) or it might have to be subscribed to SNS topic (in this case SNS subscription resource must be created) etc. Let's analyze sets that are used in Price Checker.

Lambda function with IAM role:

  ReadPricewatchListLambda:
    Type: 'AWS::Lambda::Function'
    DependsOn: LambdaExecutorRole
    Properties:
      Code:
        S3Bucket: !Ref S3BucketForLambdaCode
        S3Key: 'ReadPricewatchList.zip'
      Description: 'Reads all rows from PricewatchList table and pushes each row one-by-one to PricewatchEntry SNS topic'
      FunctionName: !Sub
        - FR-PRICE_CHECKER-readPricewatchList-${Env}
        - { Env: !Ref ENV }
      Handler: ReadPricewatchList/read-pricewatch-list.priceSyncHandler
      Role: !GetAtt LambdaExecutorRole.Arn
      Runtime: 'nodejs4.3'
      Environment:
        Variables:
          SNS_TOPIC: 
            'Fn::ImportValue': !Sub '${ParentStackMessagingName}-PricewatchEntryTopicName'
          SNS_ARN: 
            'Fn::ImportValue': !Sub '${ParentStackMessagingName}-PricewatchEntryTopicArn'   
          PRICEWATCH_LIST_TABLE: 
            'Fn::ImportValue': !Sub '${ParentStackDataName}-PricewatchListTableName'

Type of the resource must be AWS::Lambda::Function. Here in resource we have new property DependsOn. It means that resource ReadPricewatchListLambda will wait until resource with a name LambdaExecutorRole is created. This functionality resolves issues with dependencies between resources. Code property defines where the function code should be get from. Lambda code could be also inlined in the script. Things get interesting in Role property. In this case it must be role's Arn. In order to get it we have to call function !GetAtt and request for resources Arn. As shown on example we can also set environment variables which we do in order to set table name, and SNS topic values.

AWS::IAM::Role

  LambdaExecutorRole:
    Type: 'AWS::IAM::Role'
    Properties:
      RoleName: !Sub
        - FR-PRICE_CHECKER-LAMBDA-EXECUTOR-${Env}
        - { Env: !Ref ENV }
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Action: 'sts:AssumeRole'
          Effect: 'Allow'
          Principal:
            Service: 'lambda.amazonaws.com'
  DBPOLICY:
    Type: 'AWS::IAM::ManagedPolicy'
    DependsOn: 
      - LambdaExecutorRole
    Properties:
      Description: 'Policy used for accessing PriceChecker dynamo db specific tables'
      Roles: 
      - !Sub
        - FR-PRICE_CHECKER-LAMBDA-EXECUTOR-${Env}
        - { Env: !Ref ENV }
      PolicyDocument:
        Version: '2012-10-17'        
        Statement:
          - Effect: 'Allow'
            Action: 'dynamodb:*'
            Resource:    
              - !Join [
                '',
                [
                  !Sub 'arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/',
                  'Fn::ImportValue': !Sub '${ParentStackDataName}-UsersTableName'
                ]
              ]
              - !Join [
                '',
                [
                  !Sub 'arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/',
                  'Fn::ImportValue': !Sub '${ParentStackDataName}-PricewatchListTableName'
                ]
              ] 
              - !Join [
                '',
                [
                  !Sub 'arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/',
                  'Fn::ImportValue': !Sub '${ParentStackDataName}-PricewatchSubscribersTableName'
                ]
              ]

First AWS::IAM::Role is created - the name and assumed role is added. In next resource a managed policy (AWS::IAM:ManagedPolicy) is created and attached to Role resource. In ManagedPolicy resource there is specification of which actions the role will allow to perform, in this case it allows action dynamodb:* to resources: UsersTableName, PricewatchListTableName and PricewatchSubscribersTableName. 

Lambda with subscription to SNS:

  GetPricesLambdaSNSSubscription:
    Type: 'AWS::SNS::Subscription'
    DependsOn: GetPricesLambda
    Properties:
      Endpoint: !GetAtt GetPricesLambda.Arn
      Protocol: lambda
      TopicArn: 
        'Fn::ImportValue': !Sub '${ParentStackMessagingName}-PricewatchEntryTopicArn'
  PricewatchEntrySNSLambdaInvokePermission:
    Type: 'AWS::Lambda::Permission'
    DependsOn: GetPricesLambda
    Properties:
      Action: 'lambda:InvokeFunction'
      Principal: 'sns.amazonaws.com'
      SourceArn: 
        'Fn::ImportValue': !Sub '${ParentStackMessagingName}-PricewatchEntryTopicArn'
      FunctionName: !GetAtt GetPricesLambda.Arn

GetPricesLambdaSNSSubscription we configure SNS subscription resource. In Endpoint property we have to specify Lambda's Arn and in TopicArn we specify topic's Arn. Command 'Fn::ImportValue': !Sub ${ParentStacjMessagingName}-PricewatchEntryTopicArtn' imports value that was exported from previous CloudFormation tier. Next a permission is added to GetPricewatchItemsLambda to be invoked by sns.amazonaws.com. We have to specify the actual resource arn which is done in SourceArn. 

Lambda permission to be invoked by API Gateway:

  RunGetUserProfileLambdaPermission:
    Type: 'AWS::Lambda::Permission'
    DependsOn: GetUserProfileLambda
    Properties:
      Action: 'lambda:invokeFunction'
      FunctionName: !GetAtt GetUserProfileLambda.Arn
      Principal: 'apigateway.amazonaws.com'

Last additional resource to Lambda function - invoke permission by amazon API Gateway.

AWS::ApiGateway::*

API method. To create a method we need to create:

  • AWS::ApiGateway::RestApi
  • AWS::ApiGateway::Resource on top of AWS::ApiGateway::RestApi resource
  • AWS::ApiGateway::Method on top of AWS::ApiGateway::Resource resource - each method must specify all 4 method stages which were described in API Gateway
  PriceCheckerApi:
    Type: AWS::ApiGateway::RestApi
    Properties:
      Name: !Sub
        - 'FR-PRICE_CHECKER-${Env}'
        - { Env: !Ref ENV }
      Description: 'Handles user management REST operations'
  PricewatchResource:
    Type: 'AWS::ApiGateway::Resource'
    DependsOn: PriceCheckerApi
    Properties:
      ParentId: !GetAtt PriceCheckerApi.RootResourceId
      PathPart: pricewatch
      RestApiId: 
        Ref: PriceCheckerApi
  GetPricewatchMethod:
    Type: 'AWS::ApiGateway::Method'
    DependsOn: 
      - PricewatchResource
      - PriceCheckerApi
    Properties:
      ApiKeyRequired: false
      AuthorizationType: NONE
      HttpMethod: GET
      ResourceId:
        Ref: PricewatchResource        
      RestApiId: 
        Ref: PriceCheckerApi      
      MethodResponses:
        - StatusCode: 200
          ResponseParameters:
            method.response.header.X-Session-Token: integration.response.body.sessionId
        - StatusCode: 400
        - StatusCode: 401
        - StatusCode: 500
      Integration:
        Type: AWS
        IntegrationHttpMethod: POST
        Uri: 
          !Join [
            '',
            [
              !Sub 'arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/',
              'Fn::ImportValue': !Sub '${ParentStackName}-GetPricewatchItemsLambdaArn',
              '/invocations'
            ]
          ]
        PassthroughBehavior: WHEN_NO_TEMPLATES
        IntegrationResponses:
        - StatusCode: 200
          ResponseParameters:
            method.response.header.X-Session-Token: integration.response.body.sessionId
          ResponseTemplates:
            application/json: "$input.path('$.pricewatchItems')"
        - StatusCode: 400
          SelectionPattern: ".*BadRequest.*"
          ResponseTemplates:
            application/json: "$input.path('$.errorMessage')"
        - StatusCode: 401
          SelectionPattern: ".*Unauthorized.*"
          ResponseTemplates:
            application/json: "$input.path('$.errorMessage')"
        - StatusCode: 500
          SelectionPattern: ".*InternalServerError.*"
          ResponseTemplates:
            application/json: "$input.path('$.errorMessage')"
        RequestTemplates:
          application/json: 
            !Sub |
              {
                "sessionId": "$input.params('X-Session-Token')"
              }
  1. Summary

The presented application Price Checker profits from using FRAAS API Gateway. It checks if Ryanair's fligts prices go down and if they are the app notifies users. Presented examples show how easy is to use Ryanair's APIs and its online documentation. Ryanair team has made very easy to understand and use their endpoint. Now building helpful tools for users has become very easy and a sky is the limit.

Price Checker was fully built on AWS platform. Services which were used like Lambdas, API Gateway, SNS were very easy to configure and developers could focus purely on code.


FRAAS up and running in less than 4 minutes

Thu, 05/25/2017 - 23:49

Price Checker - a FRAAS and AWS application

Thu, 05/25/2017 - 23:54

Travel Labs Spain

Thu, 01/04/2018 - 10:01