Load hosted instance
import pytest
import shutil
import lamindb_setup as ln_setup
from lamindb_setup.core.upath import UPath
from lamindb_setup.core._hub_core import delete_instance
from lamindb_setup.core._hub_client import connect_hub_with_auth
from lamindb_setup.core.upath import InstanceNotEmpty
# Name of the instance this notebook connects to; later steps reuse it.
instance_name = "my-hosted"
# The session must be logged in as testuser1, whose handle prefixes the slug below.
current_handle = ln_setup.settings.user.handle
assert current_handle == "testuser1"
ln_setup.connect(f"testuser1/{instance_name}")
# Keep the storage root of the connected instance for the upload checks.
storage_settings = ln_setup.settings.storage
root = storage_settings.root
root.view_tree()
# Upload one local file into the storage root under the same name and
# confirm that the remote path exists afterwards.
local_file = "test_notebooks.py"
target_path = root / local_file
target_path.upload_from(local_file)
assert target_path.exists()
# Local fixture tree used by the directory-upload checks:
#   test-dir-upload/file1
#   test-dir-upload/subdir/file2
#   test-dir-upload/subdir/subsubdir/file3
test_dir = UPath("./test-dir-upload")
subdir = test_dir / "subdir"
subsubdir = subdir / "subsubdir"
# Create each level from the outside in, adding its file right after the
# folder itself so parents always exist before their children.
for folder, file_name in ((test_dir, "file1"), (subdir, "file2"), (subsubdir, "file3")):
    folder.mkdir()
    (folder / file_name).touch()
# Remote destination for the directory-upload checks.
target_dir = root / "test-dir-upload"
# create_folder=True: the asserts expect the local folder to show up as a
# sub-folder of the target, with its files underneath.
target_dir.upload_from(test_dir, create_folder=True)  # default
assert target_dir.is_dir()
for nested_path in ("test-dir-upload", "test-dir-upload/file1"):
    assert (target_dir / nested_path).exists()
# Remove the remote target so the second mode starts from nothing.
target_dir.rmdir()
assert not target_dir.exists()
# create_folder=False: the asserts expect the folder's contents directly
# under the target, without the extra nesting level.
target_dir.upload_from(test_dir, create_folder=False)
assert target_dir.is_dir()
assert (target_dir / "file1").exists()
assert not (target_dir / "test-dir-upload").exists()
Test that the instance cannot be deleted from the hub:
# Uploaded files are still present in the instance storage at this point,
# so the hub-side delete is expected to be refused with InstanceNotEmpty.
with pytest.raises(InstanceNotEmpty):
    delete_instance(f"testuser1/{instance_name}")
Test that the storage record for the root exists:
# Authenticated hub client; reused later to re-check the storage table.
hub = connect_hub_with_auth()
# Look up rows in the hub's "storage" table whose root matches this instance's root.
storage_query = hub.table("storage").select("*").eq("root", root.as_posix())
response = storage_query.execute().data
# Exactly one record is expected, and it must be flagged as default.
assert len(response) == 1
storage_record = response[0]
assert storage_record["is_default"]
Delete everything:
# Teardown: the statements below are order-dependent, so keep them in sequence.
# Remove the local fixture directory.
shutil.rmtree(test_dir)
# Delete the uploaded file from storage and check that it is gone.
target_path.unlink()
assert not target_path.exists()
# Delete the uploaded folder from storage and check that it is gone.
target_dir.rmdir()
assert not target_dir.exists()
# The storage root marker must still exist after the test uploads were removed.
assert ln_setup.settings.storage._mark_storage_root.exists()
# Delete the instance through ln_setup, then call the hub-side delete_instance
# for the same instance slug.
# NOTE(review): presumably the hub delete succeeds now because the storage is
# empty apart from the marker - confirm against delete_instance.
ln_setup.delete(instance_name, force=True)
delete_instance(f"testuser1/{instance_name}")
Check that the storage record has been deleted:
# Repeat the lookup by root in the hub's "storage" table; no record may remain.
remaining_query = hub.table("storage").select("*").eq("root", root.as_posix())
response = remaining_query.execute().data
assert len(response) == 0