Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions cloudslam/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ type AppClient struct {
PackageClient pbPackage.PackageServiceClient
SyncClient pbDataSync.DataSyncServiceClient
RobotClient pbApp.RobotServiceClient
HTTPClient *http.Client // used for downloading pcds of the current cloudslam session
HTTPClient *http.Client // used for downloading pcds of the current cloudslam session
logger logging.Logger
}

// CreateCloudSLAMClient creates a new grpc cloud configured to communicate with the robot service based on the cloud config given.
Expand Down Expand Up @@ -63,6 +64,7 @@ func CreateCloudSLAMClient(ctx context.Context, apiKey, apiKeyID, baseURL string
// This might be redundant with CloseIdleConnections in Close(),
// and unsure if the extra cost of redoing the TLS handshake makes this change worth it
HTTPClient: &http.Client{Transport: &http.Transport{DisableKeepAlives: true}},
logger: logger,
}, nil
}

Expand Down Expand Up @@ -106,31 +108,27 @@ func (app *AppClient) GetDataFromHTTP(ctx context.Context, dataURL string) ([]by
// CheckSensorsDataCapture verifies that all of the provided sensors have at least one enabled
// data capture method configured in the machine part's config. Returns an error listing any sensors
// that are missing enabled capture.
func (app *AppClient) CheckSensorsDataCapture(ctx context.Context, partID string, sensors []*cloudslamSensorInfo, logger logging.Logger) error {
func (app *AppClient) CheckSensorsDataCapture(ctx context.Context, partID string, sensors []*cloudslamSensorInfo) error {
req := pbApp.ConfigRequest{Id: partID}
resp, err := app.RobotClient.Config(ctx, &req)
if err != nil {
return err
}

// index sensor names for quick lookup, track which ones we've verified
pending := make(map[string]struct{}, len(sensors))
for _, s := range sensors {
pending[s.name] = struct{}{}
}

sensorTypes := make(map[string]slam.SensorType, len(sensors))
for _, s := range sensors {
pending[s.name] = struct{}{}
sensorTypes[s.name] = s.sensorType
}

for _, comp := range resp.GetConfig().GetComponents() {
if _, ok := pending[comp.GetName()]; !ok {
continue
}
logger.Debugf("checking data capture for sensor %q (type %v)", comp.GetName(), sensorTypes[comp.GetName()])
app.logger.Debugf("checking data capture for sensor %q (type %v)", comp.GetName(), sensorTypes[comp.GetName()])
for _, svcConfig := range comp.GetServiceConfigs() {
logger.Debugf(" service config type: %q, attributes: %v", svcConfig.GetType(), svcConfig.GetAttributes())
app.logger.Debugf(" service config type: %q, attributes: %v", svcConfig.GetType(), svcConfig.GetAttributes())
}
if hasEnabledDataCapture(comp, sensorTypes[comp.GetName()]) {
delete(pending, comp.GetName())
Expand Down
124 changes: 90 additions & 34 deletions cloudslam/cloudslam.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,18 @@ import (
"embed"
"errors"
"fmt"
"math"
"os"
"strconv"
"strings"
"sync/atomic"
"time"

"os"

"github.com/golang/geo/r3"
pbCloudSLAM "go.viam.com/api/app/cloudslam/v1"
"go.viam.com/rdk/grpc"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/pointcloud"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/services/slam"
"go.viam.com/rdk/spatialmath"
Expand Down Expand Up @@ -67,11 +69,24 @@ type Config struct {
BaseURL string `json:"base_url,omitempty"` // this should only be used for testing in staging
}

// activeJobState holds the current job ID and when it was started.
// startedAt is zero if the job was already running when the wrapper started up.
type activeJobState struct {
id string
startedAt time.Time
}

// updatingMapInfo holds the name and version of an existing map to continue from.
type updatingMapInfo struct {
name string
version string
}

type cloudslamWrapper struct {
resource.Named
resource.AlwaysRebuild

activeJob atomic.Pointer[string]
activeJob atomic.Pointer[activeJobState] // nil when no job is running
lastPose atomic.Pointer[spatialmath.Pose]
lastPointCloudURL atomic.Pointer[string]
defaultpcd []byte
Expand All @@ -87,9 +102,7 @@ type cloudslamWrapper struct {
viamVersion string // optional cloudslam setting, describes which viam-server appimage to use(stable/latest/pr/pinned)
slamVersion string // optional cloudslam setting, describes which cartographer appimage to use(stable/latest/pr/pinned)

// updating mode values. A user can only use updating mode if the partID is configured
updatingMapName string // empty if slam is not in updating mode
updatingMapVersion string // empty if slam is not in updating mode
updatingMap *updatingMapInfo // nil if not in updating mode

// app clients for talking to app
app *AppClient
Expand Down Expand Up @@ -161,6 +174,9 @@ func newSLAM(
if partID == "" {
partID = os.Getenv(rdkutils.MachinePartIDEnvVar)
}
if partID == "" {
return nil, fmt.Errorf("machine_part_id is required but not set in config or %s env var", rdkutils.MachinePartIDEnvVar)
}

viamVersion := newConf.VIAMVersion
if viamVersion == "" {
Expand Down Expand Up @@ -226,8 +242,7 @@ func newSLAM(
func (svc *cloudslamWrapper) initialize(mappingMode slam.MappingMode) error {
var err error
svc.lastPose.Store(&initPose)
initJob := ""
svc.activeJob.Store(&initJob)
svc.activeJob.Store(nil)
svc.lastPointCloudURL.Store(&initPCDURL)

// using this as a placeholder image. need to determine the right way to have the module use it
Expand All @@ -236,15 +251,18 @@ func (svc *cloudslamWrapper) initialize(mappingMode slam.MappingMode) error {
return err
}

// only attempt updating mode if a partID is configured and the user is not trying to make a new map.
// only attempt updating mode if the user is not trying to make a new map.
// the cloudslam can still make a new map if no slam_map package is found.
// the webapp does not remove the package from the config when swapping from updating mode to mapping mode, so this code
// needs the extra check to ensure we only update maps when the user wants to.
if svc.partID != "" && mappingMode != slam.MappingModeNewMap {
svc.updatingMapName, svc.updatingMapVersion, err = svc.app.GetSLAMMapPackageOnRobot(svc.cancelCtx, svc.partID)
if mappingMode != slam.MappingModeNewMap {
name, version, err := svc.app.GetSLAMMapPackageOnRobot(svc.cancelCtx, svc.partID)
if err != nil {
return err
}
if name != "" {
svc.updatingMap = &updatingMapInfo{name: name, version: version}
}
}

// check if the machine has an active job
Expand All @@ -253,8 +271,9 @@ func (svc *cloudslamWrapper) initialize(mappingMode slam.MappingMode) error {
if err != nil {
return err
}
sessionID := resp.GetSessionId()
svc.activeJob.Store(&sessionID)
if resp.GetSessionId() != "" {
svc.activeJob.Store(&activeJobState{id: resp.GetSessionId()})
}

svc.workers = goutils.NewBackgroundStoppableWorkers(svc.activeMappingSessionThread)
return nil
Expand All @@ -267,14 +286,14 @@ func (svc *cloudslamWrapper) activeMappingSessionThread(ctx context.Context) {
return
}

currJob := *svc.activeJob.Load()
job := svc.activeJob.Load()
// do nothing if no active jobs
if currJob == "" {
if job == nil {
continue
}

// get the most recent pointcloud and position if there is an active job
req := &pbCloudSLAM.GetMappingSessionPointCloudRequest{SessionId: currJob}
req := &pbCloudSLAM.GetMappingSessionPointCloudRequest{SessionId: job.id}
resp, err := svc.app.CSClient.GetMappingSessionPointCloud(ctx, req)
if err != nil {
svc.logger.Error(err)
Expand All @@ -296,8 +315,16 @@ func (svc *cloudslamWrapper) Position(ctx context.Context) (spatialmath.Pose, er
func (svc *cloudslamWrapper) PointCloudMap(ctx context.Context, returnEditedMap bool) (func() ([]byte, error), error) {
currMap := *svc.lastPointCloudURL.Load()

// return the placeholder map when no maps are present
if currMap == "" {
job := svc.activeJob.Load()
if job != nil && !job.startedAt.IsZero() {
progressPCD, err := generateProgressRingPCD(time.Since(job.startedAt))
if err != nil {
svc.logger.Warnf("failed to generate progress PCD: %v", err)
return toChunkedFunc(svc.defaultpcd), nil
}
return toChunkedFunc(progressPCD), nil
}
return toChunkedFunc(svc.defaultpcd), nil
}
pcdBytes, err := svc.app.GetDataFromHTTP(ctx, currMap)
Expand All @@ -316,7 +343,7 @@ func (svc *cloudslamWrapper) Properties(ctx context.Context) (slam.Properties, e
}

func (svc *cloudslamWrapper) Close(ctx context.Context) error {
if *svc.activeJob.Load() != "" {
if svc.activeJob.Load() != nil {
_, err := svc.StopJob(ctx)
if err != nil {
svc.logger.Errorf("error while stopping job: %v", err)
Expand All @@ -330,22 +357,20 @@ func (svc *cloudslamWrapper) Close(ctx context.Context) error {
func (svc *cloudslamWrapper) DoCommand(ctx context.Context, req map[string]interface{}) (map[string]interface{}, error) {
resp := map[string]interface{}{}
if name, ok := req[startJobKey]; ok {
if svc.partID != "" {
if err := svc.app.CheckSensorsDataCapture(ctx, svc.partID, svc.sensors, svc.logger); err != nil {
return nil, err
}
if err := svc.app.CheckSensorsDataCapture(ctx, svc.partID, svc.sensors); err != nil {
return nil, err
}
jobID, isUpdating, err := svc.StartJob(svc.cancelCtx, name.(string))
if err != nil {
return nil, err
}
svc.activeJob.Store(&jobID)
svc.activeJob.Store(&activeJobState{id: jobID, startedAt: time.Now()})
svc.lastPose.Store(&initPose)
svc.lastPointCloudURL.Store(&initPCDURL)

if isUpdating {
resp[updatingModeKey] = fmt.Sprintf("slam map found on machine, starting cloudslam in updating mode. Map "+
"Name = %v // Updating Version = %v", svc.updatingMapName, svc.updatingMapVersion)
"Name = %v // Updating Version = %v", svc.updatingMap.name, svc.updatingMap.version)
}
resp[startJobKey] = "Starting cloudslam session, the machine should appear in ~1 minute. Job ID: " + jobID
}
Expand All @@ -368,28 +393,28 @@ func (svc *cloudslamWrapper) DoCommand(ctx context.Context, req map[string]inter

// StopJob stops the current active cloudslam job.
func (svc *cloudslamWrapper) StopJob(ctx context.Context) (string, error) {
// grab the active job but do not clear it from the module. that way users can still see the final map on the machine
currJob := *svc.activeJob.Load()
if currJob == "" {
job := svc.activeJob.Load()
if job == nil {
return "", errors.New("no active jobs")
}

req := &pbCloudSLAM.StopMappingSessionRequest{SessionId: currJob}
req := &pbCloudSLAM.StopMappingSessionRequest{SessionId: job.id}
resp, err := svc.app.CSClient.StopMappingSession(ctx, req)
if err != nil {
return "", err
}

metaResp, err := svc.app.CSClient.GetMappingSessionMetadataByID(ctx,
&pbCloudSLAM.GetMappingSessionMetadataByIDRequest{SessionId: currJob})
&pbCloudSLAM.GetMappingSessionMetadataByIDRequest{SessionId: job.id})
if err != nil {
svc.logger.Warnf("could not retrieve session metadata for job %s: %v", currJob, err)
svc.logger.Warnf("could not retrieve session metadata for job %s: %v", job.id, err)
} else if metaResp.GetSessionMetadata().GetEndStatus() == pbCloudSLAM.EndStatus_END_STATUS_FAIL {
return "", fmt.Errorf("cloudslam session failed: %s", metaResp.GetSessionMetadata().GetErrorMsg())
}

svc.activeJob.Store(nil)
packageName := strings.Split(resp.GetPackageId(), "/")[1]
packageURL := svc.app.baseURL + "/robots?name=" + packageName + "&version=" + resp.GetVersion()
packageURL := svc.app.baseURL + "/robots?page=slam&name=" + packageName + "&version=" + resp.GetVersion()
return packageURL, nil
}

Expand Down Expand Up @@ -422,9 +447,9 @@ func (svc *cloudslamWrapper) StartJob(ctx context.Context, mapName string) (stri
Sensors: svc.sensorInfoToProto(),
SlamConfig: configParams,
}
if svc.updatingMapName != "" {
req.MapName = svc.updatingMapName
req.ExistingMapVersion = svc.updatingMapVersion
if svc.updatingMap != nil {
req.MapName = svc.updatingMap.name
req.ExistingMapVersion = svc.updatingMap.version
updatingMode = true
}
resp, err := svc.app.CSClient.StartMappingSession(ctx, req)
Expand Down Expand Up @@ -480,6 +505,37 @@ func (svc *cloudslamWrapper) ParseSensorsForPackage() ([]interface{}, error) {
return sensorMetadata, nil
}

// generateProgressRingPCD generates a point cloud of a progress arc indicating elapsed time.
// The arc grows clockwise from 0 to a full circle over progressRingDuration, giving the user
// visual feedback while waiting for the first cloudslam map to appear.
func generateProgressRingPCD(elapsed time.Duration) ([]byte, error) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when running cloudslam there is always a period of waiting for the job to start, because we have to spin up a job and setup the machine.

now we display something that moves, and in the next pr we expand on that further

const (
numPoints = 360
radius = 1000.0 // mm
progressRingDuration = 5 * time.Minute
)

// Always show at least a small arc so the user sees something immediately.
fraction := math.Max(0.02, math.Min(elapsed.Seconds()/progressRingDuration.Seconds(), 1.0))
filledPoints := int(fraction * numPoints)

pc := pointcloud.NewBasicEmpty()
for i := 0; i < filledPoints; i++ {
angle := float64(i) / numPoints * 2 * math.Pi
x := radius * math.Cos(angle)
y := radius * math.Sin(angle)
if err := pc.Set(r3.Vector{X: x, Y: y, Z: 0}, pointcloud.NewBasicData()); err != nil {
return nil, err
}
}

var buf bytes.Buffer
if err := pointcloud.ToPCD(pc, &buf, pointcloud.PCDAscii); err != nil {
return nil, err
}
return buf.Bytes(), nil
}

// toChunkedFunc takes binary data and wraps it in a helper function that converts it into chunks for streaming APIs.
func toChunkedFunc(b []byte) func() ([]byte, error) {
chunk := make([]byte, chunkSizeBytes)
Expand Down