Edge-Cloud systems are layered and heterogeneous.
This infrastructures provide opportunities but also challenges.
Cloud: high scalability and power but high latency in comms
Fog: hierarchical architecture near the edge devices
Edge: low latency in comms but limited computational power
Several approaches have been proposed in literature to manage this complexity.
Pulverization breaks down the system into smaller computational units that are continuously executed across the available hosts.
A logical device and one of its neighbours
Example of peer-to-peer deployment.
Example of edge-cloud deployment.
This approach promotes the deployment independence of the system by separating functional aspects from deployment aspects.
Main features:
The framework is written in Kotlin Multiplatform supporting:
JVM, Android, JS, iOS, Linux, macOS, and Windows.
ComponentRef
The reference to a component abstracting from its physical place.
Some communication optimizations can be performed by the framework.
Communicator
Represents the communication between components abstracting from the specific protocol.
In-Memory, RabbitMQ and MQTT communicators
One person looks around and chooses an object to be the mystery object, then gives the other players suggestions to find the object.
The game is realized in a modern version using a smartphone device.
The smartphone is used to sense the signal strength of the object via Bluetooth.
On the smartphone UI, the user can see the distance from the object and the distance of the teammates.
The goal is to find the object.
We define the structure of each device that took part into the system, by providing the system’s capability.
The interface Capability
represent the capability that a component requires to be executed.
object EmbeddedDevice : Capability
object HighCpu : Capability
val systemConfig = pulverizationSystem {
device("wearable") {
Behaviour and Communication deployableOn setOf(EmbeddedDevice, HighCpu)
Sensors and Actuators deployableOn setOf(EmbeddedDevice)
}
}
In this configuration the Behaviour
and Communication
components can be deployed either on an embedded device and a server.
Sensors
and Actuators
can be deployed only on an embedded device.
The Behaviour
component represents the logic of the device.
It can be seen as a function with a dependency to the other four components.
class WearableBehaviour : Behaviour<Unit, DistanceFromSource, SignalStrengthValue, WearableDisplayInfo, Unit> {
override val context: Context by inject()
private val filter = Filter<Int>(WINDOW)
override fun invoke(
state: Unit,
export: List<DistanceFromSource>,
sensedValues: SignalStrengthValue,
): BehaviourOutput<Unit, DistanceFromSource, WearableDisplayInfo, Unit> {
filter.register(sensedValues.rssi)
val filteredRssi = filter.get().toInt()
val distance = 10.0.pow((RSSI_ONE_METER - filteredRssi) / SIGNAL_PATH_LOSS)
val neighbourDistance = export.associate { it.deviceId to it.distance }
val displayInfo = WearableDisplayInfo(neighbourDistance, distance)
return BehaviourOutput(Unit, DistanceFromSource(context.deviceID, distance), displayInfo, Unit)
}
}
The Communication
component is used to communicate with other instances of logical devices in the system.
send()
and receive()
are the only methods to be implemented for this component.
@Serializable
data class DistanceFromSource(val deviceId: String, val distance: Double)
class WearableComm : Communication<DistanceFromSource> {
override val context: Context by inject()
private val mqttClient = MqttClient(...)
private val commTopic = "communication"
private val defaultQoS = 2
override fun receive(): Flow<DistanceFromSource> = callbackFlow<DistanceFromSource> {
val callback = object : MqttCallback { ... }
mqttCliet.setCallback(callback)
}.filterNot { it.deviceId == context.deviceID }
override suspend fun send(payload: DistanceFromSource) {
mqttClient.publish(commTopic, Json.encodeToString(payload).encodeToByteArray(), defaultQoS, false)
}
}
The implementation of this component is “application-dependent” based on the communication strategy to adopt.
The definition of sensors requires two concepts: Sensor
and SensorsContainer
.
The former represents the the actual sensor; the latter aggregate all the sensors belonging to the device.
@Serializable
data class SignalStrengthValue(val rssi: Int)
class WearableSensor(private val context: AndroidContext) : Sensor<SignalStrengthValue> {
private var rssi = 0
override suspend initialize() {
// Setup Android BT
// Update the variable `rssi` via callback
}
override suspend fun sense(): SignalStrengthValue = SignalStrengthValue(rssi)
}
class WearableSensorsContainer(private val aContext: AndroidContext) : SensorsContainer() {
override val context: Context by inject()
override suspend fun initialize() {
this += WearableSensor(aContext).apply { initialize() }
}
}
Similarly, for the actuators we have Actuator
and ActuatorsContainer
.
class WearableActuator(private val display: DisplayViewModel) : Actuator<WearableDisplayInfo> {
override suspend fun actuate(payload: WearableDisplayInfo) {
Log.i("WearableActuator", "Actuate: $payload")
display.update(payload)
}
}
class WearableActuatorsContainer(private val display: DisplayViewModel) : ActuatorsContainer() {
override val context: Context by inject()
override suspend fun initialize() {
this += WearableActuator(display)
}
}
We define the available hosts in the system, by providing the system’s capability.
object Smartphone : Host {
override val hostname = "android"
override val capabilities = setOf(EmbeddedDevice)
}
object Laptop : Host {
override val hostname = "laptop"
override val capabilities = setOf(HighCpu)
}
We define a ReconfigurationEvent
to reconfigure the system when the battery falls below 20%.
Then, we register all the implemented components and their startup hosts
Finally, we specify the new configuration associated to the reconfiguration event.
class LowBatteryEvent() : ReconfigurationEvent<Double>(), Initializable {
override val events: Flow<Double> = generateSequence(FULL) { it - DISCHARGE }.asFlow().onEach { delay(DELAY) }
override val predicate: (Double) -> Boolean = { it < THRESHOLD }
}
val lowBatteryEvent = LowBatteryEvent().apply { initialize() }
val runtimeConfig = pulverizationRuntime(config, "wearable", infrastructure) {
WearableBehaviour() withLogic ::wearableBehaviourLogic startsOn Smartphone
WearableComm() withLogic ::wearableCommLogic startsOn Smartphone
WearableSensorsContainer(context) withLogic ::wearableSensorsLogic startsOn Smartphone
WearableActuatorsContainer(display) withLogic ::wearableActuatorsLogic startsOn Smartphone
reconfigurationRules {
onDevice {
lowBatteryEvent reconfigures { Behaviour movesTo Laptop }
}
}
withCommunicator { MqttCommunicator(hostname = "192.168.8.1", port = 1883) }
withReconfigurator { MqttReconfigurator(hostname = "192.168.8.1", port = 1883) }
withRemotePlaceProvider { defaultMqttRemotePlace() }
}
After the generation of the runtime configuration, it will be used by the runtime to execute the deployment unit.
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
// Other initializations
val platform = PulverizationRuntime(deviceId, "android", runtimeConfig)
platform.start()
}
suspend fun main() {
// Other initializations
val platform = PulverizationRuntime(deviceId, "laptop", runtimeConfig)
platform.start()
}
Stabilize the framework’s API
Support global reconfiguration rules
Support openness (new host added at runtime)
Other newtwork protocols (ZeroMQ, socket, …)
Improve error handling and failure recovery