diff --git a/cloudslam/app.go b/cloudslam/app.go index d1fa89b..77fb6ee 100644 --- a/cloudslam/app.go +++ b/cloudslam/app.go @@ -28,7 +28,8 @@ type AppClient struct { PackageClient pbPackage.PackageServiceClient SyncClient pbDataSync.DataSyncServiceClient RobotClient pbApp.RobotServiceClient - HTTPClient *http.Client // used for downloading pcds of the current cloudslam session + HTTPClient *http.Client // used for downloading pcds of the current cloudslam session + logger logging.Logger } // CreateCloudSLAMClient creates a new grpc cloud configured to communicate with the robot service based on the cloud config given. @@ -63,6 +64,7 @@ func CreateCloudSLAMClient(ctx context.Context, apiKey, apiKeyID, baseURL string // This might be redundant with CloseIdleConnections in Close(), // and unsure if the extra cost of redoing the TLS handshake makes this change worth it HTTPClient: &http.Client{Transport: &http.Transport{DisableKeepAlives: true}}, + logger: logger, }, nil } @@ -106,21 +108,17 @@ func (app *AppClient) GetDataFromHTTP(ctx context.Context, dataURL string) ([]by // CheckSensorsDataCapture verifies that all of the provided sensors have at least one enabled // data capture method configured in the machine part's config. Returns an error listing any sensors // that are missing enabled capture. -func (app *AppClient) CheckSensorsDataCapture(ctx context.Context, partID string, sensors []*cloudslamSensorInfo, logger logging.Logger) error { +func (app *AppClient) CheckSensorsDataCapture(ctx context.Context, partID string, sensors []*cloudslamSensorInfo) error { req := pbApp.ConfigRequest{Id: partID} resp, err := app.RobotClient.Config(ctx, &req) if err != nil { return err } - // index sensor names for quick lookup, track which ones we've verified pending := make(map[string]struct{}, len(sensors)) - for _, s := range sensors { - pending[s.name] = struct{}{} - } - sensorTypes := make(map[string]slam.SensorType, len(sensors)) for _, s := range sensors { + pending[s.name] = struct{}{} sensorTypes[s.name] = s.sensorType } @@ -128,9 +126,9 @@ func (app *AppClient) CheckSensorsDataCapture(ctx context.Context, partID string if _, ok := pending[comp.GetName()]; !ok { continue } - logger.Debugf("checking data capture for sensor %q (type %v)", comp.GetName(), sensorTypes[comp.GetName()]) + app.logger.Debugf("checking data capture for sensor %q (type %v)", comp.GetName(), sensorTypes[comp.GetName()]) for _, svcConfig := range comp.GetServiceConfigs() { - logger.Debugf(" service config type: %q, attributes: %v", svcConfig.GetType(), svcConfig.GetAttributes()) + app.logger.Debugf(" service config type: %q, attributes: %v", svcConfig.GetType(), svcConfig.GetAttributes()) } if hasEnabledDataCapture(comp, sensorTypes[comp.GetName()]) { delete(pending, comp.GetName()) diff --git a/cloudslam/cloudslam.go b/cloudslam/cloudslam.go index 85f6ea2..9621a3a 100644 --- a/cloudslam/cloudslam.go +++ b/cloudslam/cloudslam.go @@ -7,16 +7,18 @@ import ( "embed" "errors" "fmt" + "math" + "os" "strconv" "strings" "sync/atomic" "time" - "os" - + "github.com/golang/geo/r3" pbCloudSLAM "go.viam.com/api/app/cloudslam/v1" "go.viam.com/rdk/grpc" "go.viam.com/rdk/logging" + "go.viam.com/rdk/pointcloud" "go.viam.com/rdk/resource" "go.viam.com/rdk/services/slam" "go.viam.com/rdk/spatialmath" @@ -67,11 +69,24 @@ type Config struct { BaseURL string `json:"base_url,omitempty"` // this should only be used for testing in staging } +// activeJobState holds the current job ID and when it was started. +// startedAt is zero if the job was already running when the wrapper started up. +type activeJobState struct { + id string + startedAt time.Time +} + +// updatingMapInfo holds the name and version of an existing map to continue from. +type updatingMapInfo struct { + name string + version string +} + type cloudslamWrapper struct { resource.Named resource.AlwaysRebuild - activeJob atomic.Pointer[string] + activeJob atomic.Pointer[activeJobState] // nil when no job is running lastPose atomic.Pointer[spatialmath.Pose] lastPointCloudURL atomic.Pointer[string] defaultpcd []byte @@ -87,9 +102,7 @@ type cloudslamWrapper struct { viamVersion string // optional cloudslam setting, describes which viam-server appimage to use(stable/latest/pr/pinned) slamVersion string // optional cloudslam setting, describes which cartographer appimage to use(stable/latest/pr/pinned) - // updating mode values. A user can only use updating mode if the partID is configured - updatingMapName string // empty if slam is not in updating mode - updatingMapVersion string // empty if slam is not in updating mode + updatingMap *updatingMapInfo // nil if not in updating mode // app clients for talking to app app *AppClient @@ -161,6 +174,9 @@ func newSLAM( if partID == "" { partID = os.Getenv(rdkutils.MachinePartIDEnvVar) } + if partID == "" { + return nil, fmt.Errorf("machine_part_id is required but not set in config or %s env var", rdkutils.MachinePartIDEnvVar) + } viamVersion := newConf.VIAMVersion if viamVersion == "" { @@ -226,8 +242,7 @@ func newSLAM( func (svc *cloudslamWrapper) initialize(mappingMode slam.MappingMode) error { var err error svc.lastPose.Store(&initPose) - initJob := "" - svc.activeJob.Store(&initJob) + svc.activeJob.Store(nil) svc.lastPointCloudURL.Store(&initPCDURL) // using this as a placeholder image. need to determine the right way to have the module use it @@ -236,15 +251,18 @@ func (svc *cloudslamWrapper) initialize(mappingMode slam.MappingMode) error { return err } - // only attempt updating mode if a partID is configured and the user is not trying to make a new map. + // only attempt updating mode if the user is not trying to make a new map. // the cloudslam can still make a new map if no slam_map package is found. // the webapp does not remove the package from the config when swapping from updating mode to mapping mode, so this code // needs the extra check to ensure we only update maps when the user wants to. - if svc.partID != "" && mappingMode != slam.MappingModeNewMap { - svc.updatingMapName, svc.updatingMapVersion, err = svc.app.GetSLAMMapPackageOnRobot(svc.cancelCtx, svc.partID) + if mappingMode != slam.MappingModeNewMap { + name, version, err := svc.app.GetSLAMMapPackageOnRobot(svc.cancelCtx, svc.partID) if err != nil { return err } + if name != "" { + svc.updatingMap = &updatingMapInfo{name: name, version: version} + } } // check if the machine has an active job @@ -253,8 +271,9 @@ func (svc *cloudslamWrapper) initialize(mappingMode slam.MappingMode) error { if err != nil { return err } - sessionID := resp.GetSessionId() - svc.activeJob.Store(&sessionID) + if resp.GetSessionId() != "" { + svc.activeJob.Store(&activeJobState{id: resp.GetSessionId()}) + } svc.workers = goutils.NewBackgroundStoppableWorkers(svc.activeMappingSessionThread) return nil @@ -267,14 +286,14 @@ func (svc *cloudslamWrapper) activeMappingSessionThread(ctx context.Context) { return } - currJob := *svc.activeJob.Load() + job := svc.activeJob.Load() // do nothing if no active jobs - if currJob == "" { + if job == nil { continue } // get the most recent pointcloud and position if there is an active job - req := &pbCloudSLAM.GetMappingSessionPointCloudRequest{SessionId: currJob} + req := &pbCloudSLAM.GetMappingSessionPointCloudRequest{SessionId: job.id} resp, err := svc.app.CSClient.GetMappingSessionPointCloud(ctx, req) if err != nil { svc.logger.Error(err) @@ -296,8 +315,16 @@ func (svc *cloudslamWrapper) Position(ctx context.Context) (spatialmath.Pose, er func (svc *cloudslamWrapper) PointCloudMap(ctx context.Context, returnEditedMap bool) (func() ([]byte, error), error) { currMap := *svc.lastPointCloudURL.Load() - // return the placeholder map when no maps are present if currMap == "" { + job := svc.activeJob.Load() + if job != nil && !job.startedAt.IsZero() { + progressPCD, err := generateProgressRingPCD(time.Since(job.startedAt)) + if err != nil { + svc.logger.Warnf("failed to generate progress PCD: %v", err) + return toChunkedFunc(svc.defaultpcd), nil + } + return toChunkedFunc(progressPCD), nil + } return toChunkedFunc(svc.defaultpcd), nil } pcdBytes, err := svc.app.GetDataFromHTTP(ctx, currMap) @@ -316,7 +343,7 @@ func (svc *cloudslamWrapper) Properties(ctx context.Context) (slam.Properties, e } func (svc *cloudslamWrapper) Close(ctx context.Context) error { - if *svc.activeJob.Load() != "" { + if svc.activeJob.Load() != nil { _, err := svc.StopJob(ctx) if err != nil { svc.logger.Errorf("error while stopping job: %v", err) @@ -330,22 +357,20 @@ func (svc *cloudslamWrapper) Close(ctx context.Context) error { func (svc *cloudslamWrapper) DoCommand(ctx context.Context, req map[string]interface{}) (map[string]interface{}, error) { resp := map[string]interface{}{} if name, ok := req[startJobKey]; ok { - if svc.partID != "" { - if err := svc.app.CheckSensorsDataCapture(ctx, svc.partID, svc.sensors, svc.logger); err != nil { - return nil, err - } + if err := svc.app.CheckSensorsDataCapture(ctx, svc.partID, svc.sensors); err != nil { + return nil, err } jobID, isUpdating, err := svc.StartJob(svc.cancelCtx, name.(string)) if err != nil { return nil, err } - svc.activeJob.Store(&jobID) + svc.activeJob.Store(&activeJobState{id: jobID, startedAt: time.Now()}) svc.lastPose.Store(&initPose) svc.lastPointCloudURL.Store(&initPCDURL) if isUpdating { resp[updatingModeKey] = fmt.Sprintf("slam map found on machine, starting cloudslam in updating mode. Map "+ - "Name = %v // Updating Version = %v", svc.updatingMapName, svc.updatingMapVersion) + "Name = %v // Updating Version = %v", svc.updatingMap.name, svc.updatingMap.version) } resp[startJobKey] = "Starting cloudslam session, the machine should appear in ~1 minute. Job ID: " + jobID } @@ -368,28 +393,28 @@ func (svc *cloudslamWrapper) DoCommand(ctx context.Context, req map[string]inter // StopJob stops the current active cloudslam job. func (svc *cloudslamWrapper) StopJob(ctx context.Context) (string, error) { - // grab the active job but do not clear it from the module. that way users can still see the final map on the machine - currJob := *svc.activeJob.Load() - if currJob == "" { + job := svc.activeJob.Load() + if job == nil { return "", errors.New("no active jobs") } - req := &pbCloudSLAM.StopMappingSessionRequest{SessionId: currJob} + req := &pbCloudSLAM.StopMappingSessionRequest{SessionId: job.id} resp, err := svc.app.CSClient.StopMappingSession(ctx, req) if err != nil { return "", err } metaResp, err := svc.app.CSClient.GetMappingSessionMetadataByID(ctx, - &pbCloudSLAM.GetMappingSessionMetadataByIDRequest{SessionId: currJob}) + &pbCloudSLAM.GetMappingSessionMetadataByIDRequest{SessionId: job.id}) if err != nil { - svc.logger.Warnf("could not retrieve session metadata for job %s: %v", currJob, err) + svc.logger.Warnf("could not retrieve session metadata for job %s: %v", job.id, err) } else if metaResp.GetSessionMetadata().GetEndStatus() == pbCloudSLAM.EndStatus_END_STATUS_FAIL { return "", fmt.Errorf("cloudslam session failed: %s", metaResp.GetSessionMetadata().GetErrorMsg()) } + svc.activeJob.Store(nil) packageName := strings.Split(resp.GetPackageId(), "/")[1] - packageURL := svc.app.baseURL + "/robots?name=" + packageName + "&version=" + resp.GetVersion() + packageURL := svc.app.baseURL + "/robots?page=slam&name=" + packageName + "&version=" + resp.GetVersion() return packageURL, nil } @@ -422,9 +447,9 @@ func (svc *cloudslamWrapper) StartJob(ctx context.Context, mapName string) (stri Sensors: svc.sensorInfoToProto(), SlamConfig: configParams, } - if svc.updatingMapName != "" { - req.MapName = svc.updatingMapName - req.ExistingMapVersion = svc.updatingMapVersion + if svc.updatingMap != nil { + req.MapName = svc.updatingMap.name + req.ExistingMapVersion = svc.updatingMap.version updatingMode = true } resp, err := svc.app.CSClient.StartMappingSession(ctx, req) @@ -480,6 +505,37 @@ func (svc *cloudslamWrapper) ParseSensorsForPackage() ([]interface{}, error) { return sensorMetadata, nil } +// generateProgressRingPCD generates a point cloud of a progress arc indicating elapsed time. +// The arc grows clockwise from 0 to a full circle over progressRingDuration, giving the user +// visual feedback while waiting for the first cloudslam map to appear. +func generateProgressRingPCD(elapsed time.Duration) ([]byte, error) { + const ( + numPoints = 360 + radius = 1000.0 // mm + progressRingDuration = 5 * time.Minute + ) + + // Always show at least a small arc so the user sees something immediately. + fraction := math.Max(0.02, math.Min(elapsed.Seconds()/progressRingDuration.Seconds(), 1.0)) + filledPoints := int(fraction * numPoints) + + pc := pointcloud.NewBasicEmpty() + for i := 0; i < filledPoints; i++ { + angle := float64(i) / numPoints * 2 * math.Pi + x := radius * math.Cos(angle) + y := radius * math.Sin(angle) + if err := pc.Set(r3.Vector{X: x, Y: y, Z: 0}, pointcloud.NewBasicData()); err != nil { + return nil, err + } + } + + var buf bytes.Buffer + if err := pointcloud.ToPCD(pc, &buf, pointcloud.PCDAscii); err != nil { + return nil, err + } + return buf.Bytes(), nil +} + // toChunkedFunc takes binary data and wraps it in a helper function that converts it into chunks for streaming APIs. func toChunkedFunc(b []byte) func() ([]byte, error) { chunk := make([]byte, chunkSizeBytes)