⚠ This page is served via a proxy. Original site: https://github.com
This service does not collect credentials or authentication data.
Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
EnvironmentVariableDefinition,
GlobalVariableDefinition,
LibraryArgumentDefinition,
VariableDefinition,
)
from robotcode.robot.diagnostics.library_doc import LibraryDoc
from robotcode.robot.diagnostics.library_doc import KeywordDoc, LibraryDoc
from robotcode.robot.diagnostics.namespace import Namespace

from ...common.parts.diagnostics import DiagnosticsCollectType, DiagnosticsResult
Expand Down Expand Up @@ -54,20 +55,18 @@ def _on_initialized(self, sender: Any) -> None:
self.parent.documents_cache.variables_changed.add(self._on_variables_changed)

def _on_libraries_changed(self, sender: Any, libraries: List[LibraryDoc]) -> None:
for doc in self.parent.documents.documents:
namespace = self.parent.documents_cache.get_only_initialized_namespace(doc)
if namespace is not None:
lib_docs = (e.library_doc for e in namespace.get_libraries().values())
if any(lib_doc in lib_docs for lib_doc in libraries):
self.parent.diagnostics.force_refresh_document(doc)
docs_to_refresh: set[TextDocument] = set()
for lib_doc in libraries:
docs_to_refresh.update(self.parent.documents_cache.get_library_users(lib_doc))
for doc in docs_to_refresh:
self.parent.diagnostics.force_refresh_document(doc)

def _on_variables_changed(self, sender: Any, variables: List[LibraryDoc]) -> None:
for doc in self.parent.documents.documents:
namespace = self.parent.documents_cache.get_only_initialized_namespace(doc)
if namespace is not None:
lib_docs = (e.library_doc for e in namespace.get_variables_imports().values())
if any(lib_doc in lib_docs for lib_doc in variables):
self.parent.diagnostics.force_refresh_document(doc)
docs_to_refresh: set[TextDocument] = set()
for var_doc in variables:
docs_to_refresh.update(self.parent.documents_cache.get_variables_users(var_doc))
for doc in docs_to_refresh:
self.parent.diagnostics.force_refresh_document(doc)

@language_id("robotframework")
def analyze(self, sender: Any, document: TextDocument) -> None:
Expand All @@ -83,37 +82,25 @@ def _on_get_related_documents(self, sender: Any, document: TextDocument) -> Opti
namespace = self.parent.documents_cache.get_only_initialized_namespace(document)
if namespace is None:
return None

result = []

lib_doc = namespace.get_library_doc()
for doc in self.parent.documents.documents:
if doc.language_id != "robotframework":
continue

doc_namespace = self.parent.documents_cache.get_only_initialized_namespace(doc)
if doc_namespace is None:
continue

if doc_namespace.is_analyzed():
for ref in doc_namespace.get_namespace_references():
if ref.library_doc == lib_doc:
result.append(doc)

return result
source = str(document.uri.to_path())
return self.parent.documents_cache.get_importers(source)

def modify_diagnostics(self, document: TextDocument, diagnostics: List[Diagnostic]) -> List[Diagnostic]:
return self.parent.documents_cache.get_diagnostic_modifier(document).modify_diagnostics(diagnostics)

@language_id("robotframework")
def collect_namespace_diagnostics(
self, sender: Any, document: TextDocument, diagnostics_type: DiagnosticsCollectType
self,
sender: Any,
document: TextDocument,
diagnostics_type: DiagnosticsCollectType,
) -> DiagnosticsResult:
try:
namespace = self.parent.documents_cache.get_namespace(document)

return DiagnosticsResult(
self.collect_namespace_diagnostics, self.modify_diagnostics(document, namespace.get_diagnostics())
self.collect_namespace_diagnostics,
self.modify_diagnostics(document, namespace.get_diagnostics()),
)
except (CancelledError, SystemExit, KeyboardInterrupt):
raise
Expand All @@ -138,10 +125,47 @@ def collect_namespace_diagnostics(
],
)

def _is_keyword_used_anywhere(
self,
document: TextDocument,
kw: KeywordDoc,
namespace: Namespace,
) -> bool:
"""Check if keyword is used anywhere, using index with safe fallback."""
if self.parent.documents_cache.get_keyword_ref_users(kw):
return True

if namespace.get_keyword_references().get(kw):
return True

# Safe fallback: workspace scan if index might be incomplete
refs = self.parent.robot_references.find_keyword_references(document, kw, False, True)
return bool(refs)

def _is_variable_used_anywhere(
self,
document: TextDocument,
var: VariableDefinition,
namespace: Namespace,
) -> bool:
"""Check if variable is used anywhere, using index with safe fallback."""
if self.parent.documents_cache.get_variable_ref_users(var):
return True

if namespace.get_variable_references().get(var):
return True

# Safe fallback: workspace scan if index might be incomplete
refs = self.parent.robot_references.find_variable_references(document, var, False, True)
return bool(refs)

@language_id("robotframework")
@_logger.call
def collect_unused_keyword_references(
self, sender: Any, document: TextDocument, diagnostics_type: DiagnosticsCollectType
self,
sender: Any,
document: TextDocument,
diagnostics_type: DiagnosticsCollectType,
) -> DiagnosticsResult:
config = self.parent.workspace.get_configuration(AnalysisConfig, document.uri)

Expand All @@ -161,8 +185,7 @@ def _collect_unused_keyword_references(self, document: TextDocument) -> Diagnost
for kw in (namespace.get_library_doc()).keywords.values():
check_current_task_canceled()

references = self.parent.robot_references.find_keyword_references(document, kw, False, True)
if not references:
if not self._is_keyword_used_anywhere(document, kw, namespace):
result.append(
Diagnostic(
range=kw.name_range,
Expand All @@ -174,7 +197,10 @@ def _collect_unused_keyword_references(self, document: TextDocument) -> Diagnost
)
)

return DiagnosticsResult(self.collect_unused_keyword_references, self.modify_diagnostics(document, result))
return DiagnosticsResult(
self.collect_unused_keyword_references,
self.modify_diagnostics(document, result),
)
except (CancelledError, SystemExit, KeyboardInterrupt):
raise
except BaseException as e:
Expand All @@ -200,7 +226,10 @@ def _collect_unused_keyword_references(self, document: TextDocument) -> Diagnost
@language_id("robotframework")
@_logger.call
def collect_unused_variable_references(
self, sender: Any, document: TextDocument, diagnostics_type: DiagnosticsCollectType
self,
sender: Any,
document: TextDocument,
diagnostics_type: DiagnosticsCollectType,
) -> DiagnosticsResult:
config = self.parent.workspace.get_configuration(AnalysisConfig, document.uri)

Expand All @@ -222,15 +251,19 @@ def _collect_unused_variable_references(self, document: TextDocument) -> Diagnos
check_current_task_canceled()

if isinstance(
var, (LibraryArgumentDefinition, EnvironmentVariableDefinition, GlobalVariableDefinition)
var,
(
LibraryArgumentDefinition,
EnvironmentVariableDefinition,
GlobalVariableDefinition,
),
):
continue

if var.name_token is not None and var.name_token.value and var.name_token.value.startswith("_"):
continue

references = self.parent.robot_references.find_variable_references(document, var, False, True)
if not references:
if not self._is_variable_used_anywhere(document, var, namespace):
result.append(
Diagnostic(
range=var.name_range,
Expand All @@ -243,7 +276,10 @@ def _collect_unused_variable_references(self, document: TextDocument) -> Diagnos
)
)

return DiagnosticsResult(self.collect_unused_variable_references, self.modify_diagnostics(document, result))
return DiagnosticsResult(
self.collect_unused_variable_references,
self.modify_diagnostics(document, result),
)
except (CancelledError, SystemExit, KeyboardInterrupt):
raise
except BaseException as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,23 +224,30 @@ def _find_variable_references(
include_declaration: bool = True,
stop_at_first: bool = False,
) -> List[Location]:
result = []
result: List[Location] = []

if include_declaration and variable.source:
result.append(Location(str(Uri.from_path(variable.source)), variable.name_range))

if variable.type == VariableDefinitionType.LOCAL_VARIABLE:
result.extend(self.find_variable_references_in_file(document, variable, False))
else:
result.extend(
self._find_references_in_workspace(
document,
stop_at_first,
self.find_variable_references_in_file,
variable,
False,
# Use reverse index for lookup instead of workspace scan
docs_to_search = self.parent.documents_cache.get_variable_ref_users(variable)
if docs_to_search:
for doc in docs_to_search:
check_current_task_canceled()
result.extend(self.find_variable_references_in_file(doc, variable, False))
if result and stop_at_first:
break
else:
# Fallback to workspace scan if index is empty
result.extend(
self._find_references_in_workspace(
document, stop_at_first, self.find_variable_references_in_file, variable, False
)
)
)

return result

@_logger.call
Expand Down Expand Up @@ -317,20 +324,26 @@ def _find_keyword_references(
include_declaration: bool = True,
stop_at_first: bool = False,
) -> List[Location]:
result = []
result: List[Location] = []

if include_declaration and kw_doc.source:
result.append(Location(str(Uri.from_path(kw_doc.source)), kw_doc.range))

result.extend(
self._find_references_in_workspace(
document,
stop_at_first,
self.find_keyword_references_in_file,
kw_doc,
False,
# Use reverse index for lookup instead of workspace scan
docs_to_search = self.parent.documents_cache.get_keyword_ref_users(kw_doc)
if docs_to_search:
for doc in docs_to_search:
check_current_task_canceled()
result.extend(self.find_keyword_references_in_file(doc, kw_doc, False))
if result and stop_at_first:
break
else:
# Fallback to workspace scan if index is empty
result.extend(
self._find_references_in_workspace(
document, stop_at_first, self.find_keyword_references_in_file, kw_doc, False
)
)
)

return result

Expand Down
26 changes: 24 additions & 2 deletions packages/robot/src/robotcode/robot/diagnostics/data_cache.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import os
import pickle
import tempfile
from abc import ABC, abstractmethod
from enum import Enum
from pathlib import Path
Expand All @@ -12,6 +14,8 @@
class CacheSection(Enum):
LIBRARY = "libdoc"
VARIABLES = "variables"
RESOURCE = "resource"
NAMESPACE = "namespace"


class DataCache(ABC):
Expand Down Expand Up @@ -85,5 +89,23 @@ def save_cache_data(self, section: CacheSection, entry_name: str, data: Any) ->
cached_file = self.build_cache_data_filename(section, entry_name)

cached_file.parent.mkdir(parents=True, exist_ok=True)
with cached_file.open("wb") as f:
pickle.dump(data, f)

# Atomic write: write to temp file, then rename
# This ensures readers never see partial/corrupt data
temp_fd, temp_path = tempfile.mkstemp(
dir=cached_file.parent,
prefix=cached_file.stem + "_",
suffix=".tmp",
)
try:
with os.fdopen(temp_fd, "wb") as f:
pickle.dump(data, f)
# Atomic rename (POSIX guarantees atomicity; Windows may fail if target exists)
Path(temp_path).replace(cached_file)
except Exception:
# Clean up temp file on failure (temp file may be left behind on SystemExit/KeyboardInterrupt)
try:
os.unlink(temp_path)
except OSError:
pass
raise
Loading