feat(cache): add object versioning and standalone cache binary (RFE426) #427
k82cn wants to merge 17 commits into xflops:main
Extract object cache into standalone flame-object-cache binary and implement client-side caching with version tracking for efficient conditional gets.

Server-side changes:
- Convert object_cache to standalone binary (remove lib.rs, add main.rs)
- Initialize object version to 1 on PUT, increment on PATCH
- Implement conditional GET via do_get ticket format: key:version
- Return empty stream when client version matches (not modified)
- Add wildcard session support for bulk delete (app/*)
- Remove embedded cache from executor-manager

Client-side changes (Python SDK):
- Add client-side object cache with thread-safe access
- Implement version checking in get_object (always verify with server)
- Support version=0 for unconditional fetch (bypass cache)
- Invalidate cache on update_object/patch_object mutations
- Add delete_objects function with wildcard support

Deployment:
- Add Dockerfile.foc for standalone cache container
- Add flame-object-cache service to docker-compose.yaml
- Update CI configs to point cache endpoint to flame-object-cache
- Add --cache profile to flmadm install command
- Add systemd service template with MemoryMax=12G

Documentation:
- Add RFE426 design document and status
- Update RFE318 design with ObjectKey and wildcard session
- Update object_cache README with wire protocol details
- Update flmadm README with cache installation instructions
Code Review
This pull request extracts the object cache from the executor manager into a standalone service, flame-object-cache, and implements object versioning with client-side caching in the Python SDK. Key changes include updating deployment configurations, adding a new installation profile in flmadm, and modifying the storage layer to support application-scoped keys and wildcard session deletions. Feedback highlights critical issues where the client-side cache ignores custom deserializers on cache hits and the server-side versioning logic incorrectly resets version numbers to one during updates or when metadata is evicted from memory. Additionally, improvements are suggested for the validation of wildcard characters in object keys to prevent inconsistent state.
if cached_version > 0:
    with _cache_lock:
        cached = _object_cache.get(cache_key)
    if cached is not None:
        return cached.data
On a cache hit (server returns not_modified), the code returns cached.data directly. However, cached.data contains the result of the first deserialization performed when the object was originally cached. If a user calls get_object with a different deserializer function on a cache hit, the new deserializer is ignored, and the old result is returned. The cache should either store the raw base and deltas to allow re-deserialization, or the cache key must include the deserializer identity.
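One way to address this is to cache the raw payload and apply the caller's deserializer on every hit. The sketch below assumes a single raw byte payload per key (deltas are omitted for brevity); `remember` and `cache_hit` are hypothetical helper names, not functions from the SDK.

```python
import json
import threading
from typing import Any, Callable, Dict, Tuple

# key -> (version, raw bytes). Nothing deserialized is stored.
_raw_cache: Dict[str, Tuple[int, bytes]] = {}
_raw_cache_lock = threading.Lock()


def remember(key: str, version: int, raw: bytes) -> None:
    """Store the raw payload exactly as received from the server."""
    with _raw_cache_lock:
        _raw_cache[key] = (version, raw)


def cache_hit(key: str, server_version: int,
              deserializer: Callable[[bytes], Any]) -> Any:
    """Deserialize cached bytes with the deserializer of *this* call."""
    with _raw_cache_lock:
        entry = _raw_cache.get(key)
    if entry is None or entry[0] != server_version:
        raise KeyError(key)  # caller should fall back to a full fetch
    return deserializer(entry[1])


remember("app/s1/o1", 1, b'{"a": 1}')
as_dict = cache_hit("app/s1/o1", 1, json.loads)    # parsed JSON
as_text = cache_hit("app/s1/o1", 1, bytes.decode)  # plain string
```

The trade-off is that deserialization cost is paid on every hit; keying the cache on the deserializer identity instead would avoid that at the price of duplicate entries.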
let size = object.data.len() as u64;

self.storage.write_object(&key, &object).await?;
let versioned_object = Object::new(1, object.data);
The put operation always initializes the object version to 1. This is correct for new objects, but put is also used for UPDATE operations (overwriting existing objects). In the case of an update, the version should be incremented from the current version rather than reset to 1. Resetting the version breaks the monotonicity requirement for client-side caching.
let current_version = {
    let metadata = lock_ptr!(self.metadata)?;
    metadata.get(&key_str).map(|m| m.version).unwrap_or(0)
};
let new_version = current_version + 1;
let versioned_object = Object::new(new_version, object.data);
self.storage.write_object(&key, &versioned_object).await?;
let meta = self.create_metadata(key_str.clone(), new_version, size, 0);

let current_version = {
    let metadata = lock_ptr!(self.metadata)?;
    metadata.get(&key_str).map(|m| m.version).unwrap_or(0)
};
let new_version = current_version + 1;
The patch operation calculates the new_version based on the in-memory metadata map. If an object has been evicted from memory but remains on disk, metadata.get(&key_str) will return None, causing the version to reset to 1 (0 + 1). This violates version monotonicity. The current version should be retrieved from the storage engine if it's not present in the in-memory metadata.
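The memory-then-storage lookup can be modelled in a few lines. The sketch below is in Python rather than Rust purely for brevity; `VersionedStore` and its `disk` dictionary are hypothetical stand-ins for the cache and its storage engine.

```python
from typing import Dict, Tuple


class VersionedStore:
    """Toy model: in-memory metadata that can be evicted, plus durable storage."""

    def __init__(self) -> None:
        self.metadata: Dict[str, int] = {}            # key -> version (evictable)
        self.disk: Dict[str, Tuple[int, bytes]] = {}  # key -> (version, data)

    def current_version(self, key: str) -> int:
        # Check memory first, then fall back to storage; 0 means "never written".
        version = self.metadata.get(key)
        if version is None:
            stored = self.disk.get(key)
            version = stored[0] if stored else 0
        return version

    def write(self, key: str, data: bytes) -> int:
        # Used for both PUT and PATCH in this model: always current + 1.
        new_version = self.current_version(key) + 1
        self.disk[key] = (new_version, data)
        self.metadata[key] = new_version
        return new_version

    def evict(self, key: str) -> None:
        # Drops only the in-memory entry; the object stays on disk.
        self.metadata.pop(key, None)
```

With this lookup, a write after eviction continues from the stored version instead of restarting at 1, so versions stay monotonic.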
if *part == WILDCARD_SESSION && i != 1 {
    return Err(FlameError::InvalidConfig(
        "Wildcard '*' only allowed for session_id".to_string(),
    ));
}
The wildcard character * is currently only forbidden for app_name and object_id via the loop index check, but it should be explicitly forbidden in all components except when used as a standalone session_id. Additionally, app/session/* is currently accepted as a valid 3-part key where the object ID is literally *, which contradicts the design that wildcard keys cannot reference specific objects.
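A stricter check along these lines might look like the following sketch. It is written in Python for illustration and assumes keys have the form app/session or app/session/object; `validate_key` is a hypothetical name, not the function in the PR.

```python
from typing import Tuple

WILDCARD_SESSION = "*"


def validate_key(key: str) -> Tuple[str, ...]:
    """Accept '*' only as a standalone session_id in a two-part key."""
    parts = key.split("/")
    # Assumption: valid keys are app/session or app/session/object.
    if len(parts) not in (2, 3) or any(not p for p in parts):
        raise ValueError(f"Malformed object key: {key!r}")
    for i, part in enumerate(parts):
        is_session_wildcard = i == 1 and part == WILDCARD_SESSION
        # Reject '*' anywhere else, including embedded uses such as 's*'.
        if "*" in part and not is_session_wildcard:
            raise ValueError("Wildcard '*' only allowed as a standalone session_id")
    # A wildcard session must not be combined with a specific object.
    if parts[1] == WILDCARD_SESSION and len(parts) == 3:
        raise ValueError("Wildcard keys cannot reference a specific object")
    return tuple(parts)
```

Under these rules `app/*` is accepted for bulk delete, while `app/session/*`, `*/session/object`, and `app/*/object` are all rejected.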
- Fix version monotonicity: PUT now increments version for existing objects instead of resetting to 1. Check memory first, then storage for current version.
- Fix PATCH version lookup: fetch current version from storage when metadata is evicted from memory to maintain monotonicity.
- Fix wildcard validation: explicitly reject '*' in object_id position.
- Apply cargo fmt formatting fixes.
Remove hardcoded /etc/flame/flame-cluster.yaml default and use FlameClusterContext::from_file(None) which searches standard locations (~/.flame/flame-cluster.yaml) like other Flame services.
Arrow Flight requires at least a schema message even for empty responses. Return schema-only FlightData instead of completely empty stream when client version matches server version (not modified case).
- Stop services before restarting to avoid race conditions
- Start services in correct dependency order with delays
- Add ./work/cache mount to flame-console and flame-executor-manager
- Enable RUST_LOG=debug in compose.yaml for all services
- Add detailed logging to RecursiveService.compute_recursive() with exception tracing
- Add timing measurements to test_runner_recursive_same_session
- Add logging to RunnerService session opening and task submission
- Add logging to Runner._start() for app existence checking
- Add logging to cache.py for put/get/update/patch operations
- Use common::init_logger for object_cache to write to log file
…entation
- Revert RUST_LOG to info in compose.yaml
- Remove verbose debug logs from cache.py, keep only error logs
- Simplify RunnerService logs to essential session opening info
- Simplify RecursiveService logs to show depth and key state changes
- Simplify test logs to show depth, result, and timing only
- Change logging.basicConfig to INFO level in recursive test
Summary
This PR implements RFE426: Object Versioning for Client-Side Cache, with two major components:
- flame-object-cache binary - Extract object cache from executor-manager into dedicated process

Changes
Server-Side (Rust)
Standalone Binary:
- Convert object_cache crate from library to binary (remove lib.rs, add main.rs)
- Remove embedded cache from executor_manager/src/main.rs
- Add Dockerfile.foc for container builds
- compose.yaml with depends_on for proper ordering

Version Tracking:
- Initialize object version to 1 on PUT operation
- Conditional GET via do_get ticket format: {key}:{client_version}

ObjectKey Enhancements:
- Wildcard session support (app/*) for bulk delete operations
- ObjectKey struct throughout
- is_all_sessions(), matches(), with_object_id() methods

Client-Side (Python SDK)
Client-Side Cache:
- _object_cache: Dict[tuple, Object] with thread-safe _cache_lock
- get_object() always checks with server, returns cached data on version match
- version=0 bypasses cache (unconditional fetch)
- update_object() and patch_object() invalidate cache after mutation

New Functions:
- delete_objects(key_prefix) - Delete objects by prefix with wildcard support

Deployment
flmadm:
- InstallProfile::Cache and --cache flag
- flame-object-cache to BuildArtifacts
- MemoryMax=12G
- --all profile

CI/Docker:
- ci/flame-cluster*.yaml cache endpoints to flame-object-cache:9090
- flame-object-cache service to compose.yaml
- depends_on object-cache

Documentation
- docs/designs/RFE426-cache-versioning/FS.md - Full design document
- docs/designs/RFE426-cache-versioning/STATUS.md - Implementation checklist
- docs/designs/RFE318-cache/FS.md - Add ObjectKey and wildcard docs
- object_cache/README.md - Wire protocol details, accurate API table
- flmadm/README.md - Cache installation instructions

Wire Protocol
- do_put
- do_put (PATCH cmd, FlightDescriptor.for_command("PATCH:{key}"))
- do_get ({key}:{version}, empty stream = not modified)
- do_action(DELETE) ({app}/* wildcard)

Testing
Closes #426