Flutter + AWS Realtime database (No Amplify): Part 2
- Flutter + AWS Realtime database (No Amplify): Part 1
- Flutter + AWS Realtime database (No Amplify): Part 2
- Flutter + AWS Realtime database (No Amplify): Part 3
Let’s continue from the previous guide.
So far, we’ve discussed how to implement web sockets using serverless and tested with Flutter web. Again, this guide works the same whether it's a web or a mobile application.
Here, we need to discuss “how to plan for the database”.
This is what I’m thinking,
- The user enters the chat room with a user name.
The user name can be anything because we’ll differentiate users by their IP Addresses. - DynamoDB table is updated with the user data.
- When the user sends a message, MESSAGES data is updated.
- When MESSAGES is updated, it triggers DynamoDB Stream.
- Within the DynamoDB Stream, it searches logged in users and the new message is notified using the web sockets.
Simple, right? But would it work? Let’s find out.
One thing to mention before we begin. This guide will not cover everything we need in order to make real-time chat work.
This will cover the basic flow only. For small details, it’s your job to finish.
First, we need lambda to update the table.
We need two types of updates. USER data and MESSAGES data.
Let’s see the code first.
module.exports.createConnectedUser = async (event, context) => {
let response = {};
console.log('Received event: ', JSON.stringify(event, null, 2));
const {connectionId, ipAddress, connectedAt, userName} = event; var params = {
TableName: process.env.TABLE_NAME,
Item: {
"collection": "CONNECTED",
"subCollection": "CONNECTED#USER#" + ipAddress,
"data": {
"connectionId": connectionId,
"ipAddress": ipAddress,
"connectedAt": connectedAt,
"userName": userName
}
}
} try {
const data = await docClient.put(params).promise();
response = {
statusCode: 200,
body: data,
};
console.log("Added item:", JSON.stringify(data, null, 2));
} catch(err) {
response = {
statusCode: 400,
body: err,
};
console.error("Unable to create item. Error JSON:", JSON.stringify(err, null, 2));
} return JSON.stringify(response);
}module.exports.updateConnected = async (event, context, callback) => {
let response = {};
console.log('Received event: ', JSON.stringify(event, null, 2));
// destruct
const {connectionId, ipAddress, connectedAt, userName} = event; let updateExpressionString =
"set "+
"#data.#userName = :userName, " +
"#data.#connectionId = :connectionId, " +
"#data.#ipAddress = :ipAddress, " +
"#data.#connectedAt = :connectedAt"; var params = {
TableName: process.env.TABLE_NAME,
Key:{
"collection": "CONNECTED",
"subCollection": "CONNECTED#USER#" + ipAddress,
},
UpdateExpression: updateExpressionString,
ExpressionAttributeNames:{
"#data": "data",
"#userName": "userName",
"#connectionId": "connectionId",
"#ipAddress": "ipAddress",
"#connectedAt": "connectedAt"
},
ExpressionAttributeValues:{
":userName": userName,
":connectionId": connectionId,
":ipAddress": ipAddress,
":connectedAt": connectedAt ? connectedAt : 0
},
ReturnValues:"UPDATED_NEW"
}; try {
const data = await docClient.update(params).promise();
response = {
statusCode: 200,
message: "Update completed",
body: data,
};
console.log("UpdateItem succeeded:", JSON.stringify(data, null, 2));
} catch(err) {
response = {
statusCode: 400,
message: "Update error",
body: err,
};
console.error("Unable to update item. Error JSON:", JSON.stringify(err, null, 2));
} //callback(null, response);
return JSON.stringify(response);
};module.exports.getConnected = async (event, context, callback) => { let response = {}; console.log('Received parameter in event: ', JSON.stringify(event, null, 2)); var params = {
TableName: process.env.TABLE_NAME,
FilterExpression: "#collectionName = :cname AND (attribute_exists(#data.#ipAddress) AND NOT #data.#ipAddress = :null)",
ExpressionAttributeNames:{
"#collectionName": "collection",
"#data": "data",
"#ipAddress": "ipAddress"
},
ExpressionAttributeValues: {
":cname": "CONNECTED",
":null": null
}
} try {
const data = await docClient.scan(params).promise();
response = {
statusCode: 200,
message: "Scan completed",
body: data.Items,
};
console.log('Printing all scanned items....');
data.Items.forEach((el) => {
JSON.stringify(el, null, 2)
});
} catch(err) {
response = {
statusCode: 400,
message: "Scan error",
body: err,
};
console.error("Unable to get item. Error JSON:", JSON.stringify(err, null, 2));
} //callback(null, response);
return JSON.stringify(response);
};
createConnectedUser
creates a new USER data.updateConnected
updates existing USER data.getConnected
is used to get USER data when DynamoDB Stream kicks in.
Let’s go into a bit more detail.
When you take a look at createConnectedUser
, it creates an item into the table. One thing to notice is how the item is named and categorized.
As you see, I just gave names as collection
, subCollection
and data
.
These are nothing complicated. You may think of them as folders. So, data
is in the folder subCollection
which is in the folder collection
.
For more complicated data relationships, you may need to refer to DynamoDB data modeling.
USER is differentiated by its IP Addresses. Therefore, USER data is formatted as CONNECTED#USER#<IP_ADDRESS>
.
Later in updateConnected
function, we may find the exact target user by CONNECTED#USER#<IP_ADDRESS>
and update its value.
getConnected
function uses FilterExpression
and it searches whether collection
name is CONNECTED
and checks if ipAddress
key exists in the data
object.
Now we need functions to deal with MESSAGES items.
Let’s see the code first.
module.exports.createMessageUser = async (event, context) => {
let response = {};
console.log('Received event: ', JSON.stringify(event, null, 2));
// destruct
const {ipAddress, content, userName} = event; // create uuid
const uuid = uuidv4(); var params = {
TableName: process.env.TABLE_NAME,
Item: {
"collection": "MESSAGES",
"subCollection": "MESSAGES#ID#" + uuid,
"data": {
"id": uuid,
"ipAddress": ipAddress,
"content": content,
"userName": userName,
"created": Date.now()
}
}
} try {
const data = await docClient.put(params).promise();
response = {
statusCode: 200,
body: data,
};
console.log("Added item:", JSON.stringify(data, null, 2));
} catch(err) {
response = {
statusCode: 400,
body: err,
};
console.error("Unable to create item. Error JSON:", JSON.stringify(err, null, 2));
} return JSON.stringify(response);
}// This is called only once when user logs in.
module.exports.getMessage = async (event, context) => {
let response = {}; console.log('Received parameter in event: ', JSON.stringify(event, null, 2)); var params = {
TableName: process.env.TABLE_NAME,
FilterExpression: "#collectionName = :cname AND (attribute_exists(#data.#ipAddress) AND NOT #data.#ipAddress = :null)",
Limit: 40,
ExpressionAttributeNames:{
"#collectionName": "collection",
"#data": "data",
"#ipAddress": "ipAddress"
},
ExpressionAttributeValues: {
":cname": "MESSAGES",
":null": null
}
} try {
const data = await docClient.scan(params).promise();
response = {
statusCode: 200,
message: "Scan completed",
body: data.Items,
};
console.log('Printing all scanned items....');
data.Items.forEach((el) => {
JSON.stringify(el, null, 2)
});
} catch(err) {
response = {
statusCode: 400,
message: "Scan error",
body: err,
};
console.error("Unable to get item. Error JSON:", JSON.stringify(err, null, 2));
} return JSON.stringify(response);
};
Very similar to USER items.
All messages are differentiated by their unique ids created by the package uuid
.
So we now have functions to created / update the DynamoDB table.
Then we need the DynamoDB stream function which is called every time the table has been updated.
module.exports.chatTableStreamHandler = async (event, context, callback) => { let allListenedData = {};
event.Records.forEach(function(record) {
console.log(record.eventID);
console.log(record.eventName);
console.log('DynamoDB Record: %j', record.dynamodb);
// Handle MESSAGES collection only
if(record.eventName === "INSERT" && record.dynamodb.StreamViewType === "NEW_IMAGE" && record.dynamodb.Keys['collection']['S'] === 'MESSAGES') {
// Collect data
allListenedData = {
"created": record.dynamodb.NewImage['data']['M']['created']['N'],
"ipAddress": record.dynamodb.NewImage['data']['M']['ipAddress']['S'],
"id": record.dynamodb.NewImage['data']['M']['id']['S'],
"userName": record.dynamodb.NewImage['data']['M']['userName']['S'],
"content": record.dynamodb.NewImage['data']['M']['content']['S'],
};
}
}); console.log('Picked Data: ', allListenedData); // Get connected users
var lambda = new AWS.Lambda({
region: process.env.REGION_NAME
}); const lambdaParams = {
FunctionName: process.env.SERVICE_NAME + '-' + process.env.STAGE_NAME + '-getConnected'
}; try {
const res = await lambda.invoke(lambdaParams).promise();
console.log('Get all connected user Lambda called: ', JSON.stringify(res, null, 2));
console.log('Res type is ' + typeof res.Payload); if (res.StatusCode === 200) {
let parsedData = JSON.parse(res.Payload);
let parsedAgainData = JSON.parse(parsedData);
let connectedUserList = parsedAgainData.body ? [...parsedAgainData.body] : [];
console.log('Connected List: ', connectedUserList); // create task
let taskContainer = [];
function tempLambdaFunction(targetUserData) {
return new Promise(function async (resolve, reject){
let payload = {
"requestContext": {
"stage": "dev",
"domainName": "us2q8s4g99.execute-api.us-east-1.amazonaws.com", // Use Websocket URL
"connectionId": targetUserData['connectionId']
},
"body": {...allListenedData}
} let lambda2Params = {
FunctionName: process.env.SERVICE_NAME + '-' + process.env.STAGE_NAME + '-webSocketMessageHandler',
Payload: JSON.stringify(payload, null, 2)
}
console.log('Executing for target: ', lambda2Params);
lambda.invoke(lambda2Params).promise().then(() => {
resolve();
}).catch((e) => {
reject(e);
});
});
} for(let i = 0; i < connectedUserList.length; i++) {
const targetUserData = connectedUserList[i]['data'];
taskContainer.push(tempLambdaFunction(targetUserData));
} Promise.all(taskContainer).then(() => {
callback(null, 'Done!');
}).catch(e => {
console.log('Error Promising ALL' + JSON.stringify(e));
callback(null, 'Error!');
})
}
} catch(e) {
console.log('Error Calling Lambda: ', JSON.stringify(e, null, 2));
callback(null, 'Error!');
}};
Let’s see what’s happening here.
Whenever the DynamoDB table is updated, this function is called with the event
. The event
contains what items have updated the table.
So we loop the event records and find the right item we are looking for.
We are looking for an item that had been INSERTED
, NEW_IMAGE
and the collection
name to be MESSAGE
.
Once we have the list, it’s a list of new messages.
Then we get the connected user data using getConnected
lambda function.
How do we know the name of the lambda function?
It’s <SERVICE_NAME>-<STAGE_NAME>-<FUNCTION_NAME>
now. This can be changed by AWS so please check if not working.
In the DynamoDB Stream function, it calls webSocketMessageHandler
function to all users. It’s the web socket that notifies everyone with the new message.
Let’s see the code.
module.exports.webSocketMessageHandler = async (event, context) => {
console.log(JSON.stringify(event, null, 2));
console.log('Type is ' + typeof event);
let parsedEvent = event; //JSON.parse(event);
let parsedBody = event.body;
let connectionId = parsedEvent.requestContext.connectionId;
const endpoint = parsedEvent.requestContext.domainName + "/" + parsedEvent.requestContext.stage;
console.log(`[WebSocket Message Handler] Connection Id: ${connectionId} and end point: ${endpoint}`);
const apigwManagementApi = new AWS.ApiGatewayManagementApi({
apiVersion: "2018-11-29",
endpoint: endpoint
});
const params = {
ConnectionId: connectionId,
Data: JSON.stringify({fromLogin: false, connectionID: connectionId, messageData: parsedBody}),
};
return apigwManagementApi.postToConnection(params).promise();};
As we concluded in part 1, it’s possible to notify users if the endpoint
and ConnectionId
is correct.
Now we have all functions set.
As the final step, we need to modify the serverless file to contain all these.
service: flutter-aws-serverless
frameworkVersion: '2'provider:
name: aws
runtime: nodejs12.x
region: us-east-1
stage: dev
profile: taehoon-flutter-study
environment:
TABLE_NAME: flutter-chat-tbl
SERVICE_NAME: ${self:service}
STAGE_NAME: ${self:provider.stage}
REGION_NAME: ${self:provider.region}
iamRoleStatements:
- Effect: "Allow"
Action:
- dynamodb:*
Resource: "arn:aws:dynamodb:${self:provider.region}:*:*"
- Effect: "Allow"
Action:
- lambda:*
Resource: "arn:aws:lambda:${self:provider.region}:*:*"
- Effect: Allow
Action:
#- "execute-api:ManageConnections"
- "execute-api:*"
Resource: "arn:aws:execute-api:*:*:**/@connections/*"functions:
# Web sockets handlers
connectionHandler:
handler: webSocketHandler.connectionHandler
events:
- websocket:
route: $connect
- websocket:
route: $disconnect
defaultHandler:
handler: webSocketHandler.defaultHandler
events:
- websocket: $default
webSocketMessageHandler:
handler: webSocketHandler.webSocketMessageHandler
events:
- websocket:
route: chatMessages# DynamoDB Stream Handler
chatTableStreamHandler:
handler: tableStreamHandler.chatTableStreamHandler
events:
- stream:
type: dynamodb
batchSize: 1
startingPosition: LATEST
arn:
Fn::GetAtt: [flutterChatTable, StreamArn]# CONNECTED collection CRUD
createConnected:
handler: connectedHandler.createConnected
events:
- http:
path: ${self:service}/connected/createConnected
method: get
createConnectedUser:
handler: connectedHandler.createConnectedUser
events:
- http:
path: ${self:service}/connected/createConnectedUser
method: post
cors: true
integration: lambda
getConnected:
handler: connectedHandler.getConnected
events:
- http:
path: ${self:service}/connected/getConnected
method: get
integration: lambda
getConnectedUser:
handler: connectedHandler.getConnectedUser
events:
- http:
path: ${self:service}/connected/getConnectedUser
method: get
integration: lambda
updateConnected:
handler: connectedHandler.updateConnected
events:
- http:
path: ${self:service}/connected/updateConnected
method: post
cors: true
integration: lambda
# MSG Collection CRUD
createMessage:
handler: messageHandler.createMessage
events:
- http:
path: ${self:service}/messages/createMessage
method: get
createMessageUser:
handler: messageHandler.createMessageUser
events:
- http:
path: ${self:service}/messages/createMessageUser
method: post
cors: true
integration: lambda
getMessage:
handler: messageHandler.getMessage
events:
- http:
path: ${self:service}/messages/getMessage
method: get
integration: lambda
updateMessage:
handler: messageHandler.updateMessage
events:
- http:
path: ${self:service}/messages/updateMessage
method: postresources:
Resources:
flutterChatTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: flutter-chat-tbl
AttributeDefinitions:
- AttributeName: collection
AttributeType: S
- AttributeName: subCollection
AttributeType: S
KeySchema:
- AttributeName: collection
KeyType: HASH # Partition Key
- AttributeName: subCollection
KeyType: RANGE # Sort key
ProvisionedThroughput:
ReadCapacityUnits: 10
WriteCapacityUnits: 10
StreamSpecification:
StreamViewType: NEW_IMAGE
chatTableStreamHandler
make a connection for the DynamoDB Stream. Therefore, when flutterChatTable
has a new update, chatTableStreamHandler
kicks in.
Also, don’t forget to specify StreamSpecification
under flutterChatTable
.
Once it's done, please deploy.
sls deploy
In the very next part, we’ll finish it up with building UI’s using Flutter Web and test if it works.
The result might look like this,
For more info, please refer to the git here: