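Import online-shopping order data, total the spending for the clothing, food, and home-goods categories, and output the category with the lowest spending.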
2026/1/16 11:04:57
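The task does not say how the order data is stored. This is therefore only a minimal plain-Java sketch, assuming the orders are already loaded as (category, amount) pairs. The `Order` record, the category labels `clothing` / `food` / `home`, and the sample amounts are all hypothetical. The sketch sums spending per category with a grouping collector, then picks the category with the smallest total.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class CategorySpending {

    // Hypothetical order shape: a category label and the amount paid.
    record Order(String category, BigDecimal amount) {}

    // The three categories the task asks about; any other category is ignored.
    static final Set<String> CATEGORIES = Set.of("clothing", "food", "home");

    // Sums the amount per category; TreeMap keeps the printed order stable.
    static Map<String, BigDecimal> totalsByCategory(List<Order> orders) {
        return orders.stream()
                .filter(o -> CATEGORIES.contains(o.category()))
                .collect(Collectors.groupingBy(
                        Order::category,
                        TreeMap::new,
                        Collectors.reducing(BigDecimal.ZERO, Order::amount, BigDecimal::add)));
    }

    // Returns the category with the lowest total spending.
    static String cheapestCategory(Map<String, BigDecimal> totals) {
        return totals.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElseThrow(); // thrown when none of the three categories has any orders
    }

    public static void main(String[] args) {
        List<Order> orders = List.of( // hypothetical sample data
                new Order("clothing", new BigDecimal("199.00")),
                new Order("food", new BigDecimal("35.50")),
                new Order("home", new BigDecimal("89.90")),
                new Order("food", new BigDecimal("12.80")),
                new Order("clothing", new BigDecimal("59.00")));

        Map<String, BigDecimal> totals = totalsByCategory(orders);
        System.out.println(totals);
        System.out.println("Lowest spending: " + cheapestCategory(totals));
    }
}
```

With the sample data, `main` prints `{clothing=258.00, food=48.30, home=89.90}` and then `Lowest spending: food`. `BigDecimal` is used instead of `double` so that money totals are not affected by binary floating-point rounding.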
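…into how many segments (sub-sequences) the Long sequence is split.
GeneratorFunction<Long, OUT>: maps the input index (a Long) to an event of any type.
In one sentence: Flink emits the indices; you turn each index into an event.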
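The next example is a simplified plain-Java model of this division of labour, not Flink's actual implementation. It makes two assumptions:
- The index range [0, count) is cut into one contiguous sub-sequence per parallel subtask.
- Every index passes through a user function, playing the role of the GeneratorFunction lambda.

The names `SubSequenceModel`, `Range`, `split`, and `generate` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

public class SubSequenceModel {

    // A contiguous index range [from, to) handled by one subtask.
    record Range(long from, long to) {}

    // Cuts [0, count) into `parts` contiguous ranges whose sizes differ by at most one.
    static List<Range> split(long count, int parts) {
        List<Range> ranges = new ArrayList<>();
        long base = count / parts;
        long remainder = count % parts;
        long start = 0;
        for (int i = 0; i < parts; i++) {
            long size = base + (i < remainder ? 1 : 0);
            ranges.add(new Range(start, start + size));
            start += size;
        }
        return ranges;
    }

    // Produces the events of one sub-sequence in index order.
    // (Materialized into a list here only so it can be inspected.)
    static <T> List<T> generate(Range range, LongFunction<T> generator) {
        List<T> out = new ArrayList<>();
        for (long i = range.from(); i < range.to(); i++) {
            out.add(generator.apply(i));
        }
        return out;
    }

    public static void main(String[] args) {
        LongFunction<String> generator = index -> "Number: " + index; // same mapping as the demo below
        for (Range r : split(1000, 4)) {
            List<String> events = generate(r, generator);
            System.out.println(r + " -> first=" + events.get(0)
                    + ", last=" + events.get(events.size() - 1));
        }
    }
}
```

With 1000 records and 4 subtasks, each range holds 250 indices, and the first range prints `Range[from=0, to=250] -> first=Number: 0, last=Number: 249`. The user code only ever sees single indices, which is why the mapping function can stay a simple lambda.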
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DataGenDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Map each index (0, 1, 2, ...) to an event; here the event is just a string.
        GeneratorFunction<Long, String> generator = index -> "Number: " + index;
        long numberOfRecords = 1000;

        // Bounded source: produces exactly numberOfRecords events, then finishes.
        DataGeneratorSource<String> source =
                new DataGeneratorSource<>(generator, numberOfRecords, Types.STRING);

        DataStreamSource<String> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Generator Source");

        stream.print();
        env.execute("datagen-demo");
    }
}
```

Key points:
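The output runs from "Number: 0" to "Number: 999". Each parallel subtask emits its own sub-sequence in index order, so the printed lines are strictly sequential only at parallelism 1. At higher parallelism the sub-sequences interleave.

Rate-limited variant:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;

// Drop-in replacement for the source in DataGenDemo; the rest of main() stays the same.
GeneratorFunction<Long, String> generator = index -> "Number: " + index;

DataGeneratorSource<String> source =
        new DataGeneratorSource<>(
                generator,
                Long.MAX_VALUE,                     // in practice the end is never reached
                RateLimiterStrategy.perSecond(100), // all source subtasks combined: <= 100 records/s
                Types.STRING);
```

Use cases: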
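numberOfRecords = Long.MAX_VALUE means the source practically never ends. It is still formally bounded, but at 100 records/s it would take roughly 2.9 billion years to reach the end. In practice it therefore behaves like an unbounded source.

Recommendation: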
Long.MAX_VALUE plus a rate limit is fine, but there is one hard requirement:
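The GeneratorFunction must be fully deterministic with respect to its input index: the same index must always produce the same event.

This matters after a restore from a checkpoint. The source resumes from the checkpointed indices and regenerates events from them, and those events match the originals only if the function is deterministic. The source's exactly-once guarantee depends on this.

Counterexamples (these break determinism):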
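random() (e.g. Math.random() or an unseeded Random), System.currentTimeMillis(), reading mutable external configuration, reading the results of network requests.

Correct approach: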
new Random(index) (one fixed seed per index).

The default example uses noWatermarks(), but you can just as well:
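use a WatermarkStrategy to generate deterministic watermarks, provided the event timestamps are themselves derived from the index rather than from the wall clock.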
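Here is a minimal plain-Java sketch that combines the "correct approach" with the watermark remark. Each event is derived from its index alone:
- pseudo-random fields come from `new Random(index)`;
- the event timestamp is `BASE_MILLIS + index * 10`, i.e. 10 ms of event time per index, matching a nominal 100 records/s.

The `OrderEvent` record, the category labels, `BASE_MILLIS`, and the 10 ms spacing are assumptions for illustration only.

```java
import java.util.List;
import java.util.Random;

public class DeterministicOrders {

    // Hypothetical event type; every field is derived from the index alone.
    record OrderEvent(long index, String category, long amountCents, long eventTimeMillis) {}

    static final List<String> CATEGORIES = List.of("clothing", "food", "home");
    static final long BASE_MILLIS = 1_700_000_000_000L; // assumed fixed start time, never "now"
    static final long MILLIS_PER_INDEX = 10;             // 100 events per second of event time

    // Pure function of `index`: the same index yields the same event on every run and restore.
    static OrderEvent fromIndex(long index) {
        Random rnd = new Random(index);                  // seeded per index, unlike Math.random()
        String category = CATEGORIES.get(rnd.nextInt(CATEGORIES.size()));
        long amountCents = 100 + rnd.nextInt(50_000);    // 1.00 .. 500.99
        long eventTime = BASE_MILLIS + index * MILLIS_PER_INDEX; // from the index, not the clock
        return new OrderEvent(index, category, amountCents, eventTime);
    }

    public static void main(String[] args) {
        for (long i = 0; i < 3; i++) {
            System.out.println(fromIndex(i));
        }
        // Regenerating an index (as after a restore) yields an identical record.
        System.out.println(fromIndex(42).equals(fromIndex(42)));
    }
}
```

In a Flink job this could be plugged in as `GeneratorFunction<Long, OrderEvent> generator = DeterministicOrders::fromIndex;`. Within one subtask, indices increase, so timestamps increase as well. One option is therefore `WatermarkStrategy.<OrderEvent>forMonotonousTimestamps().withTimestampAssigner((event, ts) -> event.eventTimeMillis())`. Which strategy fits best depends on the job.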