Below you will find pages that utilize the taxonomy term “lambda”
Posts
收集 AWS IoT data到 AWS kinesis 處理資料流
使用AWS IoT服務來開發大量的資料傳輸系統,主要是想彙整資料或是做數據分析,AWS同樣提供資料串流的分析服務 kinesis. kinesis服務裡面有三個類別,Data Streams, Data Firehose, Data Analytics. Data Streams: streams 這個服務其實就有點類似 Kafka. 在IoT mqtt的protocol下,publish/subscribe之間系統是不會保存任何傳遞的資料。所以一但需要做資料流分析時,需要將收到的資料存在Message Queue裡,kinesis streams就擔任這個初階的角色,stream可以搭配firhorsec,analytics,lambda…一起使用。
Data Firehose: firehose可以將stream的資料倒至其他AWS的服務,例如: S3, Redshift…,firehose服務裡提供transformation功能將資料整理成需要的格式。
Data Analytics: 利用收到的data stream,只要提供需要sql程式,設定好Source(data stream)與Destination,AWS就會提供即時分析服務。
IoT trigger Kinese stream
只要在IoT的Service上建立rule,只要推送資料到設定的topi就會將資料導入Kinese stream。 選定 Send messages to an Kinesis Stream 就可以在符合Rule時將資料導入data stream 選定預先創立好的stream,設定Partition key,還有執行角色就完成 Partition key說明
利用Partition key,IoT的資料可以藉由group的方式去將資料分配的儲存在stream的shards裡面,最容易的方式是用網頁上提供的function: newuuid(),去隨機產出亂數平均的儲存資料,或是利用IoT的資料裡面的Key直做group。 [{'key':'a1', 'value':1}, {'key':'a2', 'value':2}, {'key':'a1', 'value':3}] 如果Partition key填入${key},那麼第1,3比資料會被送到stream裡的同一個shard,2則被送到另一個shard。
trigger lambda處理收集好的batch資料
shard只由一個lambda服務 pull 多shard由多個lambda執行task, semphore問題
Posts
AWS IoT | 進階資訊分享
AWS IoT - 進階 IoT reserved topic 在 AWS IoT的資源上,有部份的 topic name是被保留。 $aws/events/presence/connected/# : 如果有任何的使用者連上 IoT 就會推播訊息到這個 Topic $aws/events/presence/disconnected/#: 如果有任何的使用者斷線,就會推播訊息
這個是當使用者 connected/disconnected 推播到 topic 裡的訊息範例 { "clientId": "a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6", "timestamp": 1460065214626, "eventType": "connected", "sessionIdentifier": "00000000-0000-0000-0000-000000000000", "principalIdentifier": "000000000000/ABCDEFGHIJKLMNOPQRSTU:some-user/ABCDEFGHIJKLMNOPQRSTU:some-user" } $aws/events/subscriptions/subscribed/#: 如果有任何的使用者訂閱了任何的 topic就會推播訊息到這個topic $aws/events/subscriptions/unsubscribed/#: 如果有任何的使用者解訂閱了任何的 topic就會推播訊息
這個是 subscribed/unsubscribed 推播的訊息範例 { "clientId": "186b5", "timestamp": 1460065214626, "eventType": "subscribed" | "unsubscribed", "sessionIdentifier": "00000000-0000-0000-0000-000000000000", "principalIdentifier": "000000000000/ABCDEFGHIJKLMNOPQRSTU:some-user/ABCDEFGHIJKLMNOPQRSTU:some-user" "topics" : ["foo/bar","device/data","dog/cat"] } :::info
同時要聆聽相同 Topic 底下不同的子 Topic 可用 + 號串聯,例如: $aws/events/subscriptions/+/#: 可以同時聆聽到 connected, disconnected, subscribed, unsubscribed …的訊息 company/+/member: + 號可以是任意的字串,只要符合這個 Topic 的 name 的規則,都可以收到資訊 :::
Posts
Dev issue: Lambda reuse
當系統透過 APIGateway 呼叫 Lambda 執行時,我以為Lambda會全部重新執行打包好的程式。所以開發時,我們將 database connection 物件當成 Global 物件使用,認為每次重新呼叫 Lambda 都會全部重新生成物件。因為這個觀念而導致系統錯誤,當 Project 的物件被 require 後或變成 global 參數,再次執行 Lambda container 會出現 reuse 特性,將不會再被重新執行而生成新物件,以下是一個範例。 DB 的 client 為 global 物件
const pg = require('pg'); const client = new pg.Client('postgres://myrds:5432/dbname'); client.connect(); exports.handler = (event, context, cb) => { const {test_print} = require('./example_module'); test_print(); client.query('SELECT * FROM users WHERE ', (err, users) => { // Do stuff with users cb(null); // Finish the function cleanly }); }; 上面的範例,因為 client 為 global物件,所以如果在程式裡面有 disconnect client 的狀況下,再次執行 Lambda 就會出現錯誤,因為 client 物件沒被重新 connect。 Global 程式不會每次都被執行