Managed Service for Apache Flink

Apache Flink is a framework for building applications that process and analyze streaming data. Managed Service for Apache Flink (MSF) is an AWS service that provides the underlying infrastructure and a hosted Apache Flink cluster that can run Apache Flink applications.

LocalStack lets you run Flink applications locally and implements several AWS-compatible API operations.

A separate Apache Flink cluster is started in application mode for every Managed Flink application created. Each Flink cluster deployment on LocalStack consists of two separate containers: one for the JobManager and one for the TaskManager.

The default container runtime for MSF on LocalStack is Docker. LocalStack creates Flink JobManager and TaskManager containers on the Docker network specified by MAIN_DOCKER_NETWORK (defaults to bridge).

Running LocalStack inside a Kubernetes cluster requires additional configuration. Mounting the host Docker socket (/var/run/docker.sock) into a LocalStack pod is not sufficient — the Docker executor cannot create Flink containers in this topology and applications will remain stuck in STARTING indefinitely.

To run MSF when LocalStack is deployed inside Kubernetes, set CONTAINER_RUNTIME=kubernetes. This uses the Kubernetes executor, which creates Flink workloads as Kubernetes pods instead of Docker containers.

If you are running LocalStack outside of Kubernetes (for example, with Docker Compose or the LocalStack CLI), no additional configuration is required and the Docker executor is used automatically.
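
For the Docker case, the network setting described above can be sketched as a small launcher function. This is a minimal sketch under stated assumptions, not an official start-up method: the function name `start_localstack_on_network`, the container name `localstack-main`, the `localstack/localstack-pro` image, and passing `LOCALSTACK_AUTH_TOKEN` through from the host are all choices made for this example. Only `MAIN_DOCKER_NETWORK` and the Docker socket mount come from the text above.

```shell
# Start LocalStack on a user-defined Docker network and tell it to attach
# the Flink JobManager and TaskManager containers to the same network.
# Assumptions (not from the guide): image name, container name, port, auth token variable.
# usage: start_localstack_on_network <network-name>
start_localstack_on_network() {
  local network="$1"
  # Create the network if it does not exist yet; ignore "already exists".
  docker network create "$network" >/dev/null 2>&1 || true
  docker run --rm -d \
    --name localstack-main \
    --network "$network" \
    -p 4566:4566 \
    -e MAIN_DOCKER_NETWORK="$network" \
    -e LOCALSTACK_AUTH_TOKEN \
    -v /var/run/docker.sock:/var/run/docker.sock \
    localstack/localstack-pro
}
```

Calling `start_localstack_on_network flink-net` would then place LocalStack and the Flink containers it creates on `flink-net`. The network name is arbitrary.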

This guide builds a demo Flink application and deploys it to LocalStack. The application generates synthetic records, processes them and sends the output to an S3 bucket.

Start the LocalStack container using your preferred method.

Begin by cloning the AWS sample repository. We will use the S3 Sink application in this example.

Terminal window
git clone https://github.com/localstack-samples/amazon-managed-service-for-apache-flink-examples.git
cd amazon-managed-service-for-apache-flink-examples/java/S3Sink

Next, use Maven to compile and package the Flink application into a jar.

Terminal window
mvn package

The Flink application jar file will be written to ./target/flink-kds-s3.jar.

MSF requires that all application code resides in S3.

Create an S3 bucket and upload the compiled Flink application jar.

Terminal window
awslocal s3api create-bucket --bucket flink-bucket
awslocal s3api put-object --bucket flink-bucket --key job.jar --body ./target/flink-kds-s3.jar
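
Because MSF reads the application code from S3, it can help to confirm that the jar exists locally before uploading and that the object is readable afterwards. The helper below is a sketch only; the function name `upload_flink_jar` is hypothetical, and it assumes `awslocal` is on the PATH.

```shell
# Upload a jar to S3 and print the stored object's size in bytes.
# usage: upload_flink_jar <jar-path> <bucket> <key>
upload_flink_jar() {
  local jar="$1" bucket="$2" key="$3"
  if [ ! -f "$jar" ]; then
    echo "jar not found: $jar (did 'mvn package' succeed?)" >&2
    return 1
  fi
  awslocal s3api put-object --bucket "$bucket" --key "$key" --body "$jar" >/dev/null || return 1
  # head-object confirms the object can be read back under the expected key.
  awslocal s3api head-object --bucket "$bucket" --key "$key" \
    --query 'ContentLength' --output text
}
```

For the bucket and key used in this guide, the call would be `upload_flink_jar ./target/flink-kds-s3.jar flink-bucket job.jar`.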

As mentioned earlier, this Flink application writes the output to an S3 bucket.

Create the S3 bucket that will serve as the sink.

Terminal window
awslocal s3api create-bucket --bucket sink-bucket

MSF requires a service execution role that allows it to connect to other services. Without the proper role and permissions policy, this example application cannot connect to the S3 sink bucket to write its output.

Create an IAM role for the running MSF application to assume.

role.json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "kinesisanalytics.amazonaws.com"},
"Action": "sts:AssumeRole"
}
]
}
Terminal window
awslocal iam create-role --role-name msaf-role --assume-role-policy-document file://role.json

Next, add a permissions policy to this role that permits read and write access to S3.

policy.json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["s3:GetObject", "s3:GetObjectVersion", "s3:PutObject"],
"Resource": "*"
}
]
}
Terminal window
awslocal iam put-role-policy --role-name msaf-role --policy-name msaf-policy --policy-document file://policy.json

Now, when the running MSF application assumes this role, it will have the necessary permissions to write to the S3 sink.
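
To double-check that the inline policy was attached as intended, the role policy can be read back and searched for a specific action. This is a rough sketch: the function names are hypothetical, and the check is a literal string match on the listed actions, not a real IAM policy evaluation (it does not expand wildcards such as `s3:*`).

```shell
# Print the actions listed in an inline role policy, tab-separated.
# usage: role_policy_actions <role-name> <policy-name>
role_policy_actions() {
  awslocal iam get-role-policy --role-name "$1" --policy-name "$2" \
    --query 'PolicyDocument.Statement[].Action[]' --output text
}

# Succeed if the given action is listed verbatim in the policy.
# usage: role_allows <role-name> <policy-name> <action>
role_allows() {
  role_policy_actions "$1" "$2" | tr '\t' '\n' | grep -qxF "$3"
}
```

For example, `role_allows msaf-role msaf-policy s3:PutObject && echo ok` prints `ok` when the policy from this guide is in place.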

With all prerequisite resources in place, the Flink application can now be created and started.

Terminal window
awslocal kinesisanalyticsv2 create-application \
--application-name msaf-app \
--runtime-environment FLINK-1_20 \
--application-mode STREAMING \
--service-execution-role arn:aws:iam::000000000000:role/msaf-role \
--application-configuration '{
"ApplicationCodeConfiguration": {
"CodeContent": {
"S3ContentLocation": {
"BucketARN": "arn:aws:s3:::flink-bucket",
"FileKey": "job.jar"
}
},
"CodeContentType": "ZIPFILE"
},
"EnvironmentProperties": {
"PropertyGroups": [{
"PropertyGroupId": "bucket", "PropertyMap": {"name": "sink-bucket"}
}]
}
}'
awslocal kinesisanalyticsv2 start-application --application-name msaf-app
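
Starting the application is asynchronous, so a script usually needs to wait until the status becomes RUNNING. The following is a minimal polling sketch with its own deadline, which matters because an application whose cluster fails to start can stay in STARTING. The function names and the default timeout and interval are assumptions chosen for illustration.

```shell
# Print the current application status, for example STARTING or RUNNING.
# usage: app_status <application-name>
app_status() {
  awslocal kinesisanalyticsv2 describe-application \
    --application-name "$1" \
    --query 'ApplicationDetail.ApplicationStatus' --output text
}

# Poll until the application is RUNNING, or give up once the deadline passes.
# usage: wait_for_running <application-name> [timeout-seconds] [interval-seconds]
wait_for_running() {
  local app="$1" timeout="${2:-120}" interval="${3:-5}"
  local deadline status
  deadline=$(( $(date +%s) + timeout ))
  while :; do
    status="$(app_status "$app")"
    if [ "$status" = "RUNNING" ]; then
      echo "$status"
      return 0
    fi
    if [ "$(date +%s)" -ge "$deadline" ]; then
      echo "gave up waiting; last status: ${status}" >&2
      return 1
    fi
    sleep "$interval"
  done
}
```

A typical call would be `wait_for_running msaf-app 180`. A non-zero exit status means the deadline passed before the application reported RUNNING.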

Once the Flink cluster is up and running, the application will stream the results to the sink S3 bucket. You can verify this with:

Terminal window
awslocal s3api list-objects --bucket sink-bucket

LocalStack MSF supports CloudWatch Logs integration to help monitor the Flink cluster for application events or configuration problems. The logging option can be added at the time of creating the Flink application using the CreateApplication operation. Logging options can also be managed at a later point using the AddApplicationCloudWatchLoggingOption and DeleteApplicationCloudWatchLoggingOption operations.

The following prerequisites apply to CloudWatch Logs integration:

  • You must create the application’s log group and log stream. Flink will not create them for you.
  • You must add the permissions your application needs to write to the log stream to the service execution role. Generally, the following IAM actions are sufficient: logs:DescribeLogGroups, logs:DescribeLogStreams and logs:PutLogEvents.
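
The two prerequisites above can be sketched as follows. This is an illustrative helper, not a required procedure: the function names and the policy name `msaf-logs-policy` are hypothetical, and `"Resource": "*"` is used only to keep the example short.

```shell
# Build a log stream ARN in the format used by the logging option.
# usage: log_stream_arn <region> <account-id> <log-group> <log-stream>
log_stream_arn() {
  echo "arn:aws:logs:$1:$2:log-group:$3:log-stream:$4"
}

# Create the log group and log stream, then let the role write to them.
# usage: setup_msf_logging <log-group> <log-stream> <role-name>
setup_msf_logging() {
  local group="$1" stream="$2" role="$3"
  awslocal logs create-log-group --log-group-name "$group" &&
  awslocal logs create-log-stream --log-group-name "$group" --log-stream-name "$stream" &&
  awslocal iam put-role-policy --role-name "$role" \
    --policy-name msaf-logs-policy \
    --policy-document '{
      "Version": "2012-10-17",
      "Statement": [{
        "Effect": "Allow",
        "Action": ["logs:DescribeLogGroups", "logs:DescribeLogStreams", "logs:PutLogEvents"],
        "Resource": "*"
      }]
    }'
}
```

With the names used in this guide, `setup_msf_logging msaf-log-group msaf-log-stream msaf-role` prepares the resources, and `log_stream_arn us-east-1 000000000000 msaf-log-group msaf-log-stream` prints the matching LogStreamARN.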

To add a logging option:

Terminal window
awslocal kinesisanalyticsv2 add-application-cloud-watch-logging-option \
--application-name msaf-app \
--cloud-watch-logging-option '{"LogStreamARN": "arn:aws:logs:us-east-1:000000000000:log-group:msaf-log-group:log-stream:msaf-log-stream"}'
Output
{
"ApplicationARN": "arn:aws:kinesisanalytics:us-east-1:000000000000:application/msaf-app",
"ApplicationVersionId": 2,
"CloudWatchLoggingOptionDescriptions": [
{
"CloudWatchLoggingOptionId": "1.1",
"LogStreamARN": "arn:aws:logs:us-east-1:000000000000:log-group:msaf-log-group:log-stream:msaf-log-stream"
}
]
}

Configured logging options can be retrieved using DescribeApplication:

Terminal window
awslocal kinesisanalyticsv2 describe-application --application-name msaf-app | jq .ApplicationDetail.CloudWatchLoggingOptionDescriptions
Output
[
{
"CloudWatchLoggingOptionId": "1.1",
"LogStreamARN": "arn:aws:logs:us-east-1:000000000000:log-group:msaf-log-group:log-stream:msaf-log-stream"
}
]

Log events can be retrieved from CloudWatch Logs using the appropriate operation. To retrieve all events:

Terminal window
awslocal logs get-log-events --log-group-name msaf-log-group --log-stream-name msaf-log-stream
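
The raw response contains timestamps and metadata for each event. When only the message text is of interest, a query expression can narrow the output. The helpers below are a sketch; the function names and the `error|exception` filter pattern are assumptions made for this example.

```shell
# Print one log message per line for the given log group and stream.
# usage: msf_log_messages <log-group> <log-stream>
msf_log_messages() {
  awslocal logs get-log-events \
    --log-group-name "$1" --log-stream-name "$2" \
    --query 'events[].[message]' --output text
}

# Print only messages that mention an error or exception (case-insensitive).
# Exits non-zero when nothing matches, following grep's convention.
# usage: msf_log_errors <log-group> <log-stream>
msf_log_errors() {
  msf_log_messages "$1" "$2" | grep -i -E 'error|exception'
}
```

For the stream created in this guide, `msf_log_errors msaf-log-group msaf-log-stream` lists the matching messages.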

LocalStack reports both Flink application and Flink framework logs to CloudWatch. However, certain extended information such as stack traces may be missing. You can obtain this information by opening a shell in the Flink Docker container created by LocalStack (for example, with docker exec) and inspecting /opt/flink/log.

You can manage resource tags using TagResource, UntagResource and ListTagsForResource. Tags can also be specified when creating the Flink application using the CreateApplication operation.

Terminal window
awslocal kinesisanalyticsv2 tag-resource \
--resource-arn arn:aws:kinesisanalytics:us-east-1:000000000000:application/msaf-app \
--tags Key=country,Value=SE
awslocal kinesisanalyticsv2 list-tags-for-resource \
--resource-arn arn:aws:kinesisanalytics:us-east-1:000000000000:application/msaf-app
Output
{
"Tags": [
{
"Key": "country",
"Value": "SE"
}
]
}

You can also untag the resource:

Terminal window
awslocal kinesisanalyticsv2 untag-resource \
--resource-arn arn:aws:kinesisanalytics:us-east-1:000000000000:application/msaf-app \
--tag-keys country

Supported Flink versions:

Flink version   Supported by LocalStack   Supported by Apache
1.20.0          yes                       yes
1.19.1          yes                       yes
1.18.1          yes                       yes
1.15.2          yes                       no
1.13.1          yes                       no

If describe-application returns STARTING indefinitely and no Flink containers appear, the most likely cause is that LocalStack cannot reach the Docker daemon or Kubernetes API to create the Flink JobManager and TaskManager.

When this occurs, LocalStack logs an internal error after CLUSTER_READY_WAIT_TIMEOUT elapses:

Exception: Error submitting job: Flink cluster <id> is not running

The application does not automatically transition to FAILED — it remains in STARTING. This means IaC tools that use state waiters (such as the Terraform AWS provider or Crossplane) will block until their own timeout expires.

Common causes and fixes:

  • Running LocalStack inside Kubernetes with the Docker executor — the Docker executor is not supported in this topology. Set CONTAINER_RUNTIME=kubernetes and ensure you have an Enterprise licence. See Deployment Considerations for details.
  • Wrong or missing Docker network — if LocalStack is running in a non-default Docker network, set MAIN_DOCKER_NETWORK to the name of that network so Flink containers are attached to the correct network.
  • Docker socket not accessible — confirm that the Docker socket is mounted and functional. You can verify by listing containers from inside the LocalStack container: curl --unix-socket /var/run/docker.sock http://localhost/containers/json.

Current limitations:

  • Application versions are not maintained.
  • Application code is supported only as a ZIPFILE stored in S3.
  • Checkpointing uses fixed values of 20,000 ms for execution.checkpointing.interval and 5,000 ms for execution.checkpointing.min-pause. They cannot be overridden.
  • In-place version upgrades and roll-backs are not supported.
  • Snapshot/savepoint management is not implemented.
  • CloudTrail integration and CloudWatch metrics are not implemented. The application logging level defaults to INFO and cannot be overridden.
  • Parallelism is limited to the default value of 1, with one TaskManager that has one Task Slot allocated. Parallelism configuration provided on Flink application creation or update is ignored.
  • When a Flink cluster fails to start, the application remains in STARTING rather than transitioning to FAILED. Check LocalStack logs and see Troubleshooting for guidance.
  • The Docker executor is not supported when LocalStack runs as a pod inside a Kubernetes cluster. Use CONTAINER_RUNTIME=kubernetes (Enterprise only) in this topology.