Skip to content

Commit be5cab4

Browse files
authored
lib shouldn't use Fn (#136)
1 parent 6decded commit be5cab4

110 files changed

Lines changed: 2922 additions & 3182 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.github/workflows/build_and_test.yml

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
1+
22
name: Build And Test
33

44
on: [pull_request]
@@ -8,12 +8,28 @@ jobs:
88
runs-on: ubuntu-latest
99
steps:
1010
- uses: actions/checkout@v1
11-
- uses: ./docker/gh-build
11+
12+
- name: Build
13+
uses: ./docker/gh-build
14+
env:
15+
DBX_TEST_CLIENT_ID: ${{ secrets.DBX_TEST_CLIENT_ID }}
16+
DBX_TEST_ACCESS_TOKEN: ${{ secrets.DBX_TEST_ACCESS_TOKEN }}
17+
with:
18+
args: ./gradlew clean build -x test -x jvmTest -x jsTest -x jsNodeTest -x jsBrowserTest --info --max-workers 1 --no-daemon
19+
20+
- name: JVM Tests
21+
uses: ./docker/gh-build
1222
env:
1323
DBX_TEST_CLIENT_ID: ${{ secrets.DBX_TEST_CLIENT_ID }}
1424
DBX_TEST_ACCESS_TOKEN: ${{ secrets.DBX_TEST_ACCESS_TOKEN }}
1525
with:
16-
args: ./gradlew build --info --max-workers 1 --no-daemon
26+
args: ./gradlew test jvmTest --info --max-workers 1 --no-daemon
27+
28+
- name: JS (Browser/Node) Tests
29+
uses: ./docker/gh-build
30+
with:
31+
args: ./gradlew jsBrowserTest jsNodeTest --info --max-workers 1 --no-daemon
32+
1733
# Upload HTML test reports
1834
- name: Upload Test Reports
1935
uses: actions/upload-artifact@v4

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ ds
99
logs/
1010
.kotlin/
1111
kotlin-js-store/
12+
.output.txt
13+
debug.log

.release/migration-off-fn.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
* Migrated the `lib` module away from the custom `Fn` class to standard Kotlin lambdas.
2+
* Introduced `ExecutionScope` to provide contextual parameters to lambdas during execution, especially in distributed environments.
3+
* Added custom serializers in the `exe` module for all migrated components to maintain backward compatibility with existing distributed execution infrastructure.
4+
* Migrated major components including `map`, `merge`, `window`, `resample`, `flatten`, `flatMap`, `toCsv`, `toWav`, `toTable`, and `out`.

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ import io.wavebeans.lib.stream.*
7777
import java.io.File
7878

7979
fun main() {
80+
// register the driver
81+
WbFileDriver.registerDriver("file", LocalWbFileDriver)
82+
8083
// describe what you want compute
8184
val out = 440.sine()
8285
.trim(1000)

cli/src/main/kotlin/io/wavebeans/cli/WaveBeansCli.kt

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import ch.qos.logback.classic.Level
44
import ch.qos.logback.classic.LoggerContext
55
import io.wavebeans.cli.script.RunMode
66
import io.wavebeans.cli.script.ScriptRunner
7+
import io.wavebeans.fs.local.LocalWbFileDriver
78
import io.wavebeans.http.WbHttpService
9+
import io.wavebeans.lib.io.WbFileDriver
810
import io.wavebeans.lib.table.TableRegistry
911
import io.wavebeans.lib.table.TableRegistryImpl
1012
import org.apache.commons.cli.CommandLine
@@ -87,6 +89,12 @@ class WaveBeansCli(
8789
runOptions["httpLocations"] = cli.getRequired(httpCommunicator) { listOf("127.0.0.1:$it") }
8890
}
8991
}
92+
93+
// register local file driver by default
94+
try {
95+
WbFileDriver.registerDriver("file", LocalWbFileDriver)
96+
} catch (ignore: IllegalStateException) {}
97+
9098
val sampleRate = cli.get(s) { it.toFloat() } ?: 44100.0f
9199

92100
val httpWait = cli.get(httpWait) { it.toLong() } ?: 0

cli/src/test/kotlin/io/wavebeans/cli/WaveBeansCliSpec.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,10 @@ class WaveBeansCliSpec : DescribeSpec({
111111
val portRange = createPorts(2)
112112
val facilitators = portRange.map {
113113
Facilitator(
114-
communicatorPort = it,
115114
threadsNumber = 2,
115+
communicatorPort = it,
116116
onServerShutdownTimeoutMillis = 100,
117-
podDiscovery = object : PodDiscovery() {}
117+
podDiscovery = object : PodDiscovery() {},
118118
)
119119
}
120120
facilitators.forEach { it.start() }
@@ -178,10 +178,10 @@ class WaveBeansCliSpec : DescribeSpec({
178178
val httpCommunicatorPort = findFreePort()
179179
val gardeners = portRange.map {
180180
Facilitator(
181-
communicatorPort = it,
182181
threadsNumber = 2,
182+
communicatorPort = it,
183183
onServerShutdownTimeoutMillis = 100,
184-
podDiscovery = object : PodDiscovery() {}
184+
podDiscovery = object : PodDiscovery() {},
185185
)
186186
}
187187

cli/src/test/kotlin/io/wavebeans/cli/script/ScriptRunnerSpec.kt

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ import io.kotest.core.spec.style.DescribeSpec
77
import io.kotest.datatest.withData
88
import io.wavebeans.execution.PodDiscovery
99
import io.wavebeans.execution.distributed.Facilitator
10+
import io.wavebeans.execution.distributed.FacilitatorConfig
11+
import io.wavebeans.fs.local.LocalWbFileDriver
1012
import io.wavebeans.lib.WaveBeansClassLoader
13+
import io.wavebeans.lib.io.WbFileDriver
1114
import io.wavebeans.tests.createPorts
1215
import java.io.File
1316
import java.lang.Thread.sleep
@@ -20,10 +23,13 @@ class ScriptRunnerSpec : DescribeSpec({
2023
val facilitators = portRange
2124
.map {
2225
Facilitator(
23-
communicatorPort = it,
2426
threadsNumber = 2,
27+
communicatorPort = it,
2528
onServerShutdownTimeoutMillis = 100,
26-
podDiscovery = object : PodDiscovery() {}
29+
podDiscovery = object : PodDiscovery() {},
30+
fileSystems = listOf(
31+
FacilitatorConfig.FileSystemDescriptor("file", LocalWbFileDriver::class.java.canonicalName),
32+
)
2733
)
2834
}
2935

@@ -190,31 +196,10 @@ class ScriptRunnerSpec : DescribeSpec({
190196
}
191197
}
192198

193-
context("Defining function as class") {
194-
withData(modes) { mode ->
195-
val script = """
196-
class InputFn: Fn<Pair<Long, Float>, Sample?>() {
197-
override fun apply(argument: Pair<Long, Float>): Sample? {
198-
return sampleOf(argument.first)
199-
}
200-
}
201-
202-
input(InputFn())
203-
.map { it }
204-
.trim(1)
205-
.toDevNull()
206-
.out()
207-
""".trimIndent()
208-
209-
assertThat(mode.eval(script)).isNull()
210-
}
211-
212-
}
213-
214199
context("Defining function as lambda") {
215200
withData(modes) { mode ->
216201
val script = """
217-
input { (i, _) -> sampleOf(i) }
202+
input { i, _ -> sampleOf(i) }
218203
.map { it }
219204
.trim(1)
220205
.toDevNull()

docker/gh-build/Dockerfile

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,21 @@ FROM eclipse-temurin:11-jdk
22

33
LABEL maintainer="WaveBeans"
44
LABEL "com.github.actions.name"="JDK 11 with Kotlin 2.2.20"
5-
LABEL "com.github.actions.description"="Can run java app and uses Kotlin SDK"
5+
LABEL "com.github.actions.description"="Can run java app and uses Kotlin SDK, have Google Chrome installed for tests"
66

7-
RUN apt-get update &&\
8-
apt-get install unzip
7+
RUN apt-get update && \
8+
apt-get install -y wget gnupg unzip && \
9+
wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - && \
10+
echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list && \
11+
apt-get update && \
12+
apt-get install -y google-chrome-stable && \
13+
rm -rf /var/lib/apt/lists/*
14+
15+
ENV CHROME_BIN=/usr/bin/google-chrome
916

1017
RUN cd /usr/lib && \
1118
wget -q https://github.com/JetBrains/kotlin/releases/download/v2.2.20/kotlin-compiler-2.2.20.zip && \
1219
unzip kotlin-compiler-*.zip && \
1320
rm kotlin-compiler-*.zip
1421

15-
ENV PATH=$PATH:/usr/lib/kotlinc/bin
22+
ENV PATH=$PATH:/usr/lib/kotlinc/bin

docs/dev/distributed-execution.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
- [Registering Bush Endpoints](#registering-bush-endpoints)
1414
- [Starting job and tracking its progress](#starting-job-and-tracking-its-progress)
1515
- [Pods distribution](#pods-distribution)
16+
- [Lambda Serialization and ExecutionScope](#lambda-serialization-and-executionscope)
1617

1718
<!-- END doctoc generated TOC please keep comment here to allow auto update -->
1819

@@ -140,6 +141,14 @@ Distributed overseer, while created, provided with the list of [Facilitators](de
140141

141142
The idea behind this planner is to be able to distribute based on Facilitator states, i.e. taking into account current capacity and assignments, as well as Bean aware deployment like, for example, inputs and outputs are better spread across different overseers as they may have high IO, or even requires special types of the nodes. All this things Planner can fetch upon start and make a better judgement what to deploy where. And the overseer will blindly follow the lead.
142143

144+
### Lambda Serialization and ExecutionScope
145+
146+
In distributed mode, the topology is serialized into JSON. Functional beans (like `MapStream`) use `LambdaSerializer` to handle the serialization of Kotlin lambdas.
147+
148+
Since Kotlin lambdas are not natively serializable across different JVM processes without the exact same context, WaveBeans wraps them into an internal `Fn` representation during serialization.
149+
150+
The `ExecutionScope` is serialized alongside the functional bean parameters. When the pod is instantiated on a worker node, the `ExecutionScope` is reconstructed, and the lambda is invoked with this scope as its receiver. This ensures that parameters passed via `executionScope { ... }` are available on all worker nodes.
151+
143152

144153

145154
[actors-hierarchy]: assets/distributed-execution-actors-hierarchy.png "Actors Hierarchy"

docs/migration_off_fn.md

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
### Migration off `Fn` within `lib`
2+
3+
This document tracks the progress of migrating away from the `Fn` class and its related infrastructure within the `lib` module. The goal is to replace `Fn` with more standard or efficient functional representations where applicable.
4+
5+
#### Migration Instructions
6+
7+
The goal of this migration is to replace the use of the `Fn` class with standard Kotlin functional interfaces (lambdas) in the `lib` module while maintaining serialization compatibility in the `exe` module.
8+
9+
##### Step 1: Update the `lib` module
10+
11+
Modify the classes in the `lib` module to use standard Kotlin functional interfaces instead of `Fn`.
12+
13+
- Change constructor parameters and properties from `Fn<T, R>` to `(T) -> R` (or appropriate functional type).
14+
- Update the implementation to call the lambda directly instead of using `.apply()`.
15+
- Keep the `BeanParams` classes and other structures, but update their properties to use lambdas.
16+
- If the component needs to access parameters from the environment (e.g., multiplier in `changeAmplitude`), it should use `ExecutionScope`.
17+
- `ExecutionScope` should be added to the `BeanParams` class and passed to the lambda as a receiver: `ExecutionScope.(T) -> R`.
18+
- If the `BeanParams` had a custom serializer within the `lib` module, it should be moved or replaced by a more general approach, as lambdas cannot be directly serialized by `kotlinx.serialization` without extra help.
19+
20+
Example with `ExecutionScope` (`MapStreamParams` in `io.wavebeans.lib.stream.MapStream`):
21+
```kotlin
22+
class MapStreamParams<T : Any, R : Any>(
23+
val scope: ExecutionScope,
24+
val transform: ExecutionScope.(T) -> R
25+
) : BeanParams
26+
```
27+
28+
Example (`InputParams` in `io.wavebeans.lib.io.FunctionInput`):
29+
```kotlin
30+
// Before
31+
class InputParams<T : Any>(
32+
val generator: Fn<Pair<Long, Float>, T?>,
33+
val sampleRate: Float? = null
34+
) : BeanParams
35+
36+
// After
37+
class InputParams<T : Any>(
38+
val generator: (Long, Float) -> T?,
39+
val sampleRate: Float? = null
40+
) : BeanParams
41+
```
42+
43+
##### Step 2: Create a custom serializer in the `exe` module
44+
45+
Since lambdas are not serializable, create a custom `KSerializer` in the `exe` module (typically under `io.wavebeans.execution.serializer`) that wraps the lambda into an `Fn` during serialization and unwraps it during deserialization.
46+
47+
- The `serialize` method should use `io.wavebeans.lib.wrap()` to convert the lambda to an `Fn`.
48+
- If using `ExecutionScope`, ensure it is also serialized (using `ExecutionScope.serializer()`) and passed to `wrap()` if necessary, or handled in the lambda returned by `deserialize`.
49+
- The `deserialize` method should decode the `Fn` and then return a lambda that calls `fn.apply()`.
50+
- Use `FnSerializer` to handle the actual serialization/deserialization of the wrapped `Fn`.
51+
52+
Example with `ExecutionScope` (`MapStreamParamsSerializer` in `io.wavebeans.execution.serializer`):
53+
```kotlin
54+
object MapStreamParamsSerializer : KSerializer<MapStreamParams<*, *>> {
55+
56+
override val descriptor: SerialDescriptor = buildClassSerialDescriptor(MapStreamParams::class.className()) {
57+
element("scope", ExecutionScope.serializer().descriptor)
58+
element("transformFn", FnSerializer.descriptor)
59+
}
60+
61+
override fun deserialize(decoder: Decoder): MapStreamParams<*, *> {
62+
return decoder.decodeStructure(descriptor) {
63+
lateinit var fn: Fn<Any, Any>
64+
lateinit var scope: ExecutionScope
65+
loop@ while (true) {
66+
when (val i = decodeElementIndex(descriptor)) {
67+
CompositeDecoder.DECODE_DONE -> break@loop
68+
0 -> scope = decodeSerializableElement(descriptor, i, ExecutionScope.serializer())
69+
1 -> fn = decodeSerializableElement(descriptor, i, FnSerializer) as Fn<Any, Any>
70+
else -> throw SerializationException("Unknown index $i")
71+
}
72+
}
73+
MapStreamParams<Any, Any>(scope) { fn.apply(it) }
74+
}
75+
}
76+
77+
override fun serialize(encoder: Encoder, value: MapStreamParams<*, *>) {
78+
encoder.encodeStructure(descriptor) {
79+
encodeSerializableElement(descriptor, 0, ExecutionScope.serializer(), value.scope)
80+
encodeSerializableElement(descriptor, 1, FnSerializer, wrap(value.transform))
81+
}
82+
}
83+
}
84+
```
85+
86+
Example (`InputParamsSerializer` in `io.wavebeans.execution.serializer`):
87+
```kotlin
88+
object InputParamsSerializer : KSerializer<InputParams<*>> {
89+
override val descriptor: SerialDescriptor = buildClassSerialDescriptor(InputParams::class.className()) {
90+
element("generateFn", FnSerializer.descriptor)
91+
element("sampleRate", Float.serializer().nullable.descriptor)
92+
}
93+
94+
override fun deserialize(decoder: Decoder): InputParams<*> {
95+
return decoder.decodeStructure(descriptor) {
96+
var sampleRate: Float? = null
97+
lateinit var func: Fn<Pair<Long, Float>, Any?>
98+
loop@ while (true) {
99+
when (val i = decodeElementIndex(descriptor)) {
100+
CompositeDecoder.DECODE_DONE -> break@loop
101+
0 -> func = decodeSerializableElement(descriptor, i, FnSerializer) as Fn<Pair<Long, Float>, Any?>
102+
1 -> sampleRate = decodeNullableSerializableElement(descriptor, i, Float.serializer().nullable)
103+
else -> throw SerializationException("Unknown index $i")
104+
}
105+
}
106+
InputParams({ a, b -> func.apply(a to b) }, sampleRate)
107+
}
108+
}
109+
110+
override fun serialize(encoder: Encoder, value: InputParams<*>) {
111+
encoder.encodeStructure(descriptor) {
112+
encodeSerializableElement(descriptor, 0, FnSerializer, wrap(value.generator))
113+
encodeNullableSerializableElement(descriptor, 1, Float.serializer().nullable, value.sampleRate)
114+
}
115+
}
116+
}
117+
```
118+
119+
##### Step 3: Register the serializer in `SerializationUtils.kt`
120+
121+
Update `io.wavebeans.execution.SerializationUtils.kt` to register the new serializer in the `beanParams()` method. This ensures that when a `BeanParams` is encountered during topology serialization, it uses your custom serializer.
122+
123+
```kotlin
124+
fun SerializersModuleBuilder.beanParams() {
125+
polymorphic(BeanParams::class) {
126+
// ...
127+
subclass(InputParams::class, InputParamsSerializer)
128+
// ...
129+
}
130+
}
131+
```
132+
133+
#### Technical Debt
134+
135+
The following items are temporary measures introduced during the migration and should be resolved once the migration is complete:
136+
137+
- [ ] Migrate `sincResampleFunc` and `SincResampleFn` to use lambdas instead of `Fn`.
138+
- [ ] Migrate `SimpleResampleFn` to use lambdas instead of `Fn`.
139+
140+
#### Classes to Migrate
141+
142+
- [ ] `io.wavebeans.lib.stream.SincResampleFn`
143+
- [x] `io.wavebeans.lib.io.CsvStreamOutput`
144+
- [x] `io.wavebeans.lib.io.CsvStreamOutputParams`
145+
- [x] `io.wavebeans.lib.io.CsvPartialStreamOutput`
146+
- [x] `io.wavebeans.lib.stream.window.MapWindowFn`
147+
- [x] `io.wavebeans.lib.stream.ResampleStreamParams`
148+
- [x] `io.wavebeans.lib.stream.ResampleBeanStream`
149+
- [x] `io.wavebeans.lib.stream.ResampleFiniteStream`
150+
- [x] `io.wavebeans.lib.stream.AbstractResampleStream`
151+
- [x] `io.wavebeans.lib.io.InputParams` (in `io.wavebeans.lib.io.FunctionInput`)
152+
- [x] `io.wavebeans.lib.io.Input` (in `io.wavebeans.lib.io.FunctionInput`)
153+
- [x] `io.wavebeans.lib.io.FunctionStreamOutput`
154+
- [x] `io.wavebeans.lib.io.FunctionStreamOutputParams`
155+
- [x] `io.wavebeans.lib.stream.FlattenStreamsParams` (in `io.wavebeans.lib.stream.FlattenStream`)
156+
- [x] `io.wavebeans.lib.stream.FlattenStream`
157+
- [x] `io.wavebeans.lib.stream.FlattenWindowStreamsParams` (in `io.wavebeans.lib.stream.FlattenWindowStream`)
158+
- [x] `io.wavebeans.lib.stream.FlattenWindowStream`
159+
- [x] `io.wavebeans.lib.stream.FunctionMergedStreamParams`
160+
- [x] `io.wavebeans.lib.stream.FunctionMergedStream`
161+
- [x] Identify areas for `ExecutionScope` documentation.
162+
- [x] Update `docs/user/api/functions.md` with `ExecutionScope` and `ScopeParameters`.
163+
- [x] Update operation-specific docs (`map`, `merge`, `input`, `out`) with `ExecutionScope` examples.
164+
- [x] Update `distributed-execution.md` with technical details of `ExecutionScope` serialization.
165+
- [x] Update `docs/user/api/readme.md` with `ExecutionScope` as a key concept.

0 commit comments

Comments
 (0)