Flutter + AWS Realtime database (No Amplify): Part 2

Taehoon Kim
8 min readDec 29, 2020

--

Let’s continue from the previous guide.
So far, we’ve discussed how to implement web sockets using serverless and tested with Flutter web. Again, this guide works the same whether it's a web or a mobile application.

Here, we need to discuss “how to plan for the database”.
This is what I’m thinking,

  • The user enters the chat room with a user name.
    The user name can be anything because we’ll differentiate users by their IP Addresses.
  • DynamoDB table is updated with the user data.
  • When the user sends a message, MESSAGES data is updated.
  • When MESSAGES is updated, it triggers DynamoDB Stream.
  • Within the DynamoDB Stream, it searches logged in users and the new message is notified using the web sockets.

Simple, right? But would it work? Let’s find out.

One thing to mention before we begin. This guide will not cover everything we need in order to make real-time chat work.
This will cover the basic flow only. For small details, it’s your job to finish.

First, we need lambda to update the table.
We need two types of updates. USER data and MESSAGES data.
Let’s see the code first.

module.exports.createConnectedUser = async (event, context) => {
let response = {};
console.log('Received event: ', JSON.stringify(event, null, 2));

const {connectionId, ipAddress, connectedAt, userName} = event;
var params = {
TableName: process.env.TABLE_NAME,
Item: {
"collection": "CONNECTED",
"subCollection": "CONNECTED#USER#" + ipAddress,
"data": {
"connectionId": connectionId,
"ipAddress": ipAddress,
"connectedAt": connectedAt,
"userName": userName
}
}
}
try {
const data = await docClient.put(params).promise();
response = {
statusCode: 200,
body: data,
};
console.log("Added item:", JSON.stringify(data, null, 2));
} catch(err) {
response = {
statusCode: 400,
body: err,
};
console.error("Unable to create item. Error JSON:", JSON.stringify(err, null, 2));
}
return JSON.stringify(response);
}
module.exports.updateConnected = async (event, context, callback) => {
let response = {};
console.log('Received event: ', JSON.stringify(event, null, 2));

// destruct
const {connectionId, ipAddress, connectedAt, userName} = event;
let updateExpressionString =
"set "+
"#data.#userName = :userName, " +
"#data.#connectionId = :connectionId, " +
"#data.#ipAddress = :ipAddress, " +
"#data.#connectedAt = :connectedAt";
var params = {
TableName: process.env.TABLE_NAME,
Key:{
"collection": "CONNECTED",
"subCollection": "CONNECTED#USER#" + ipAddress,
},
UpdateExpression: updateExpressionString,
ExpressionAttributeNames:{
"#data": "data",
"#userName": "userName",
"#connectionId": "connectionId",
"#ipAddress": "ipAddress",
"#connectedAt": "connectedAt"
},
ExpressionAttributeValues:{
":userName": userName,
":connectionId": connectionId,
":ipAddress": ipAddress,
":connectedAt": connectedAt ? connectedAt : 0
},
ReturnValues:"UPDATED_NEW"
};
try {
const data = await docClient.update(params).promise();
response = {
statusCode: 200,
message: "Update completed",
body: data,
};
console.log("UpdateItem succeeded:", JSON.stringify(data, null, 2));
} catch(err) {
response = {
statusCode: 400,
message: "Update error",
body: err,
};
console.error("Unable to update item. Error JSON:", JSON.stringify(err, null, 2));
}
//callback(null, response);
return JSON.stringify(response);
};
module.exports.getConnected = async (event, context, callback) => { let response = {}; console.log('Received parameter in event: ', JSON.stringify(event, null, 2)); var params = {
TableName: process.env.TABLE_NAME,
FilterExpression: "#collectionName = :cname AND (attribute_exists(#data.#ipAddress) AND NOT #data.#ipAddress = :null)",
ExpressionAttributeNames:{
"#collectionName": "collection",
"#data": "data",
"#ipAddress": "ipAddress"
},
ExpressionAttributeValues: {
":cname": "CONNECTED",
":null": null
}
}
try {
const data = await docClient.scan(params).promise();

response = {
statusCode: 200,
message: "Scan completed",
body: data.Items,
};
console.log('Printing all scanned items....');
data.Items.forEach((el) => {
JSON.stringify(el, null, 2)
});
} catch(err) {
response = {
statusCode: 400,
message: "Scan error",
body: err,
};
console.error("Unable to get item. Error JSON:", JSON.stringify(err, null, 2));
}
//callback(null, response);
return JSON.stringify(response);
};

createConnectedUser creates a new USER data.
updateConnected updates existing USER data.
getConnected is used to get USER data when DynamoDB Stream kicks in.

Let’s go into a bit more detail.

When you take a look at createConnectedUser , it creates an item into the table. One thing to notice is how the item is named and categorized.
As you see, I just gave names as collection, subCollection and data.

These are nothing complicated. You may think of them as folders. So, data is in the folder subCollection which is in the folder collection.
For more complicated data relationships, you may need to refer to DynamoDB data modeling.

USER is differentiated by its IP Addresses. Therefore, USER data is formatted as CONNECTED#USER#<IP_ADDRESS> .
Later in updateConnected function, we may find the exact target user by CONNECTED#USER#<IP_ADDRESS> and update its value.

getConnected function uses FilterExpression and it searches whether collection name is CONNECTED and checks if ipAddress key exists in the data object.

Now we need functions to deal with MESSAGES items.
Let’s see the code first.

module.exports.createMessageUser = async (event, context) => {

let response = {};
console.log('Received event: ', JSON.stringify(event, null, 2));

// destruct
const {ipAddress, content, userName} = event;
// create uuid
const uuid = uuidv4();
var params = {
TableName: process.env.TABLE_NAME,
Item: {
"collection": "MESSAGES",
"subCollection": "MESSAGES#ID#" + uuid,
"data": {
"id": uuid,
"ipAddress": ipAddress,
"content": content,
"userName": userName,
"created": Date.now()
}
}
}
try {
const data = await docClient.put(params).promise();
response = {
statusCode: 200,
body: data,
};
console.log("Added item:", JSON.stringify(data, null, 2));
} catch(err) {
response = {
statusCode: 400,
body: err,
};
console.error("Unable to create item. Error JSON:", JSON.stringify(err, null, 2));
}
return JSON.stringify(response);
}
// This is called only once when user logs in.
module.exports.getMessage = async (event, context) => {
let response = {};
console.log('Received parameter in event: ', JSON.stringify(event, null, 2)); var params = {
TableName: process.env.TABLE_NAME,
FilterExpression: "#collectionName = :cname AND (attribute_exists(#data.#ipAddress) AND NOT #data.#ipAddress = :null)",
Limit: 40,
ExpressionAttributeNames:{
"#collectionName": "collection",
"#data": "data",
"#ipAddress": "ipAddress"
},
ExpressionAttributeValues: {
":cname": "MESSAGES",
":null": null
}
}
try {
const data = await docClient.scan(params).promise();

response = {
statusCode: 200,
message: "Scan completed",
body: data.Items,
};
console.log('Printing all scanned items....');
data.Items.forEach((el) => {
JSON.stringify(el, null, 2)
});
} catch(err) {
response = {
statusCode: 400,
message: "Scan error",
body: err,
};
console.error("Unable to get item. Error JSON:", JSON.stringify(err, null, 2));
}
return JSON.stringify(response);
};

Very similar to USER items.
All messages are differentiated by their unique ids created by the package uuid.

So we now have functions to created / update the DynamoDB table.
Then we need the DynamoDB stream function which is called every time the table has been updated.

module.exports.chatTableStreamHandler = async (event, context, callback) => {  let allListenedData = {};
event.Records.forEach(function(record) {
console.log(record.eventID);
console.log(record.eventName);
console.log('DynamoDB Record: %j', record.dynamodb);
// Handle MESSAGES collection only
if(record.eventName === "INSERT" && record.dynamodb.StreamViewType === "NEW_IMAGE" && record.dynamodb.Keys['collection']['S'] === 'MESSAGES') {

// Collect data
allListenedData = {
"created": record.dynamodb.NewImage['data']['M']['created']['N'],
"ipAddress": record.dynamodb.NewImage['data']['M']['ipAddress']['S'],
"id": record.dynamodb.NewImage['data']['M']['id']['S'],
"userName": record.dynamodb.NewImage['data']['M']['userName']['S'],
"content": record.dynamodb.NewImage['data']['M']['content']['S'],
};

}
});
console.log('Picked Data: ', allListenedData); // Get connected users
var lambda = new AWS.Lambda({
region: process.env.REGION_NAME
});
const lambdaParams = {
FunctionName: process.env.SERVICE_NAME + '-' + process.env.STAGE_NAME + '-getConnected'
};
try {
const res = await lambda.invoke(lambdaParams).promise();

console.log('Get all connected user Lambda called: ', JSON.stringify(res, null, 2));
console.log('Res type is ' + typeof res.Payload);
if (res.StatusCode === 200) {
let parsedData = JSON.parse(res.Payload);
let parsedAgainData = JSON.parse(parsedData);
let connectedUserList = parsedAgainData.body ? [...parsedAgainData.body] : [];
console.log('Connected List: ', connectedUserList);
// create task
let taskContainer = [];
function tempLambdaFunction(targetUserData) {
return new Promise(function async (resolve, reject){

let payload = {
"requestContext": {
"stage": "dev",
"domainName": "us2q8s4g99.execute-api.us-east-1.amazonaws.com", // Use Websocket URL
"connectionId": targetUserData['connectionId']
},
"body": {...allListenedData}
}
let lambda2Params = {
FunctionName: process.env.SERVICE_NAME + '-' + process.env.STAGE_NAME + '-webSocketMessageHandler',
Payload: JSON.stringify(payload, null, 2)
}
console.log('Executing for target: ', lambda2Params);
lambda.invoke(lambda2Params).promise().then(() => {
resolve();
}).catch((e) => {
reject(e);
});
});
}
for(let i = 0; i < connectedUserList.length; i++) {
const targetUserData = connectedUserList[i]['data'];
taskContainer.push(tempLambdaFunction(targetUserData));
}
Promise.all(taskContainer).then(() => {
callback(null, 'Done!');
}).catch(e => {
console.log('Error Promising ALL' + JSON.stringify(e));
callback(null, 'Error!');
})
}
} catch(e) {
console.log('Error Calling Lambda: ', JSON.stringify(e, null, 2));
callback(null, 'Error!');
}
};

Let’s see what’s happening here.

Whenever the DynamoDB table is updated, this function is called with the event. The event contains what items have updated the table.
So we loop the event records and find the right item we are looking for.
We are looking for an item that had been INSERTED, NEW_IMAGE and the collection name to be MESSAGE.

Once we have the list, it’s a list of new messages.
Then we get the connected user data using getConnected lambda function.

How do we know the name of the lambda function?
It’s <SERVICE_NAME>-<STAGE_NAME>-<FUNCTION_NAME> now. This can be changed by AWS so please check if not working.

In the DynamoDB Stream function, it calls webSocketMessageHandler function to all users. It’s the web socket that notifies everyone with the new message.

Let’s see the code.

module.exports.webSocketMessageHandler = async (event, context) => {
console.log(JSON.stringify(event, null, 2));
console.log('Type is ' + typeof event);

let parsedEvent = event; //JSON.parse(event);
let parsedBody = event.body;

let connectionId = parsedEvent.requestContext.connectionId;
const endpoint = parsedEvent.requestContext.domainName + "/" + parsedEvent.requestContext.stage;

console.log(`[WebSocket Message Handler] Connection Id: ${connectionId} and end point: ${endpoint}`);
const apigwManagementApi = new AWS.ApiGatewayManagementApi({
apiVersion: "2018-11-29",
endpoint: endpoint
});
const params = {
ConnectionId: connectionId,
Data: JSON.stringify({fromLogin: false, connectionID: connectionId, messageData: parsedBody}),
};
return apigwManagementApi.postToConnection(params).promise();
};

As we concluded in part 1, it’s possible to notify users if the endpoint and ConnectionId is correct.

Now we have all functions set.
As the final step, we need to modify the serverless file to contain all these.

service: flutter-aws-serverless
frameworkVersion: '2'
provider:
name: aws
runtime: nodejs12.x
region: us-east-1
stage: dev
profile: taehoon-flutter-study
environment:
TABLE_NAME: flutter-chat-tbl
SERVICE_NAME: ${self:service}
STAGE_NAME: ${self:provider.stage}
REGION_NAME: ${self:provider.region}
iamRoleStatements:
- Effect: "Allow"
Action:
- dynamodb:*
Resource: "arn:aws:dynamodb:${self:provider.region}:*:*"
- Effect: "Allow"
Action:
- lambda:*
Resource: "arn:aws:lambda:${self:provider.region}:*:*"
- Effect: Allow
Action:
#- "execute-api:ManageConnections"
- "execute-api:*"
Resource: "arn:aws:execute-api:*:*:**/@connections/*"
functions:
# Web sockets handlers
connectionHandler:
handler: webSocketHandler.connectionHandler
events:
- websocket:
route: $connect
- websocket:
route: $disconnect
defaultHandler:
handler: webSocketHandler.defaultHandler
events:
- websocket: $default
webSocketMessageHandler:
handler: webSocketHandler.webSocketMessageHandler
events:
- websocket:
route: chatMessages
# DynamoDB Stream Handler
chatTableStreamHandler:
handler: tableStreamHandler.chatTableStreamHandler
events:
- stream:
type: dynamodb
batchSize: 1
startingPosition: LATEST
arn:
Fn::GetAtt: [flutterChatTable, StreamArn]
# CONNECTED collection CRUD
createConnected:
handler: connectedHandler.createConnected
events:
- http:
path: ${self:service}/connected/createConnected
method: get
createConnectedUser:
handler: connectedHandler.createConnectedUser
events:
- http:
path: ${self:service}/connected/createConnectedUser
method: post
cors: true
integration: lambda

getConnected:
handler: connectedHandler.getConnected
events:
- http:
path: ${self:service}/connected/getConnected
method: get
integration: lambda

getConnectedUser:
handler: connectedHandler.getConnectedUser
events:
- http:
path: ${self:service}/connected/getConnectedUser
method: get
integration: lambda

updateConnected:
handler: connectedHandler.updateConnected
events:
- http:
path: ${self:service}/connected/updateConnected
method: post
cors: true
integration: lambda

# MSG Collection CRUD
createMessage:
handler: messageHandler.createMessage
events:
- http:
path: ${self:service}/messages/createMessage
method: get
createMessageUser:
handler: messageHandler.createMessageUser
events:
- http:
path: ${self:service}/messages/createMessageUser
method: post
cors: true
integration: lambda

getMessage:
handler: messageHandler.getMessage
events:
- http:
path: ${self:service}/messages/getMessage
method: get
integration: lambda

updateMessage:
handler: messageHandler.updateMessage
events:
- http:
path: ${self:service}/messages/updateMessage
method: post
resources:
Resources:
flutterChatTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: flutter-chat-tbl
AttributeDefinitions:
- AttributeName: collection
AttributeType: S
- AttributeName: subCollection
AttributeType: S
KeySchema:
- AttributeName: collection
KeyType: HASH # Partition Key
- AttributeName: subCollection
KeyType: RANGE # Sort key
ProvisionedThroughput:
ReadCapacityUnits: 10
WriteCapacityUnits: 10
StreamSpecification:
StreamViewType: NEW_IMAGE

chatTableStreamHandler make a connection for the DynamoDB Stream. Therefore, when flutterChatTable has a new update, chatTableStreamHandler kicks in.

Also, don’t forget to specify StreamSpecification under flutterChatTable.

Once it's done, please deploy.

sls deploy

In the very next part, we’ll finish it up with building UI’s using Flutter Web and test if it works.

The result might look like this,

For more info, please refer to the git here:

--

--

No responses yet